1use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsval::UndefinedValue;
10use js::realm::CurrentRealm;
11use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
12use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto};
13
14use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriterMethods;
15use crate::dom::bindings::error::{Error, ErrorToJsval};
16use crate::dom::bindings::reflector::DomGlobal;
17use crate::dom::bindings::root::{DomRoot, MutNullableDom};
18use crate::dom::globalscope::GlobalScope;
19use crate::dom::promise::Promise;
20use crate::dom::stream::writablestream::WritableStream;
21use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
22
23#[dom_struct]
25pub struct WritableStreamDefaultWriter {
26 reflector_: Reflector,
27
28 #[conditional_malloc_size_of]
29 ready_promise: RefCell<Rc<Promise>>,
30
31 #[conditional_malloc_size_of]
33 closed_promise: RefCell<Rc<Promise>>,
34
35 stream: MutNullableDom<WritableStream>,
37}
38
39impl WritableStreamDefaultWriter {
40 fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> WritableStreamDefaultWriter {
43 WritableStreamDefaultWriter {
44 reflector_: Reflector::new(),
45 stream: Default::default(),
46 closed_promise: RefCell::new(Promise::new(global, can_gc)),
47 ready_promise: RefCell::new(Promise::new(global, can_gc)),
48 }
49 }
50
51 pub(crate) fn new(
52 global: &GlobalScope,
53 proto: Option<SafeHandleObject>,
54 can_gc: CanGc,
55 ) -> DomRoot<WritableStreamDefaultWriter> {
56 reflect_dom_object_with_proto(
57 Box::new(WritableStreamDefaultWriter::new_inherited(global, can_gc)),
58 global,
59 proto,
60 can_gc,
61 )
62 }
63
64 pub(crate) fn setup(
67 &self,
68 cx: SafeJSContext,
69 stream: &WritableStream,
70 can_gc: CanGc,
71 ) -> Result<(), Error> {
72 if stream.is_locked() {
74 return Err(Error::Type(c"Stream is locked".to_owned()));
75 }
76
77 self.stream.set(Some(stream));
79
80 stream.set_writer(Some(self));
82
83 if stream.is_writable() {
87 if !stream.close_queued_or_in_flight() && stream.get_backpressure() {
90 } else {
93 self.ready_promise.borrow().resolve_native(&(), can_gc);
96 }
97
98 return Ok(());
101 }
102
103 if stream.is_erroring() {
105 rooted!(in(*cx) let mut error = UndefinedValue());
106 stream.get_stored_error(error.handle_mut());
107
108 let ready_promise = self.ready_promise.borrow();
112 ready_promise.reject_native(&error.handle(), can_gc);
113 ready_promise.set_promise_is_handled();
114
115 return Ok(());
118 }
119
120 if stream.is_closed() {
122 self.ready_promise.borrow().resolve_native(&(), can_gc);
125
126 self.closed_promise.borrow().resolve_native(&(), can_gc);
129 return Ok(());
130 }
131
132 assert!(stream.is_errored());
135
136 rooted!(in(*cx) let mut error = UndefinedValue());
138 stream.get_stored_error(error.handle_mut());
139
140 let ready_promise = self.ready_promise.borrow();
144 ready_promise.reject_native(&error.handle(), can_gc);
145 ready_promise.set_promise_is_handled();
146
147 let ready_promise = self.closed_promise.borrow();
151 ready_promise.reject_native(&error.handle(), can_gc);
152 ready_promise.set_promise_is_handled();
153
154 Ok(())
155 }
156
157 pub(crate) fn reject_closed_promise_with_stored_error(
158 &self,
159 cx: &mut js::context::JSContext,
160 error: &SafeHandleValue,
161 ) {
162 self.closed_promise
163 .borrow()
164 .reject_native(error, CanGc::from_cx(cx));
165 }
166
167 pub(crate) fn set_close_promise_is_handled(&self) {
168 self.closed_promise.borrow().set_promise_is_handled();
169 }
170
171 pub(crate) fn set_ready_promise(&self, promise: Rc<Promise>) {
172 *self.ready_promise.borrow_mut() = promise;
173 }
174
175 pub(crate) fn resolve_ready_promise_with_undefined(&self, can_gc: CanGc) {
176 self.ready_promise.borrow().resolve_native(&(), can_gc);
177 }
178
179 pub(crate) fn resolve_closed_promise_with_undefined(&self, can_gc: CanGc) {
180 self.closed_promise.borrow().resolve_native(&(), can_gc);
181 }
182
183 pub(crate) fn ensure_ready_promise_rejected(
185 &self,
186 global: &GlobalScope,
187 error: SafeHandleValue,
188 can_gc: CanGc,
189 ) {
190 let ready_promise = self.ready_promise.borrow().clone();
191
192 if ready_promise.is_pending() {
194 ready_promise.reject_native(&error, can_gc);
196
197 ready_promise.set_promise_is_handled();
199 } else {
200 let promise = Promise::new(global, can_gc);
202 promise.reject_native(&error, can_gc);
203
204 promise.set_promise_is_handled();
206 *self.ready_promise.borrow_mut() = promise;
207 }
208 }
209
210 pub(crate) fn ensure_closed_promise_rejected(
212 &self,
213 global: &GlobalScope,
214 error: SafeHandleValue,
215 can_gc: CanGc,
216 ) {
217 let closed_promise = self.closed_promise.borrow().clone();
218
219 if closed_promise.is_pending() {
221 closed_promise.reject_native(&error, can_gc);
223
224 closed_promise.set_promise_is_handled();
226 } else {
227 let promise = Promise::new(global, can_gc);
229 promise.reject_native(&error, can_gc);
230
231 promise.set_promise_is_handled();
233 *self.closed_promise.borrow_mut() = promise;
234 }
235 }
236
237 fn abort(
239 &self,
240 cx: &mut CurrentRealm,
241 global: &GlobalScope,
242 reason: SafeHandleValue,
243 ) -> Rc<Promise> {
244 let Some(stream) = self.stream.get() else {
246 unreachable!("Stream should be set.");
248 };
249
250 stream.abort(cx, global, reason)
252 }
253
254 fn close(&self, cx: &mut js::context::JSContext, global: &GlobalScope) -> Rc<Promise> {
256 let Some(stream) = self.stream.get() else {
258 unreachable!("Stream should be set.");
260 };
261
262 stream.close(cx, global)
264 }
265
266 pub(crate) fn write(
268 &self,
269 cx: &mut js::context::JSContext,
270 global: &GlobalScope,
271 chunk: SafeHandleValue,
272 ) -> Rc<Promise> {
273 let Some(stream) = self.stream.get() else {
275 unreachable!("Stream should be set.");
277 };
278
279 let Some(controller) = stream.get_controller() else {
282 unreachable!("Controller should be set.");
283 };
284
285 let chunk_size = controller.get_chunk_size(cx, global, chunk);
287
288 if !self
291 .stream
292 .get()
293 .is_some_and(|current_stream| current_stream == stream)
294 {
295 let promise = Promise::new2(cx, global);
296 promise.reject_error(
297 Error::Type(c"Stream is not equal to writer stream".to_owned()),
298 CanGc::from_cx(cx),
299 );
300 return promise;
301 }
302
303 if stream.is_errored() {
306 rooted!(&in(cx) let mut error = UndefinedValue());
308 stream.get_stored_error(error.handle_mut());
309 let promise = Promise::new2(cx, global);
310 promise.reject_native(&error.handle(), CanGc::from_cx(cx));
311 return promise;
312 }
313
314 if stream.close_queued_or_in_flight() || stream.is_closed() {
317 let promise = Promise::new2(cx, global);
320 promise.reject_error(
321 Error::Type(c"Stream has been closed, or has close queued or in-flight".to_owned()),
322 CanGc::from_cx(cx),
323 );
324 return promise;
325 }
326
327 if stream.is_erroring() {
329 rooted!(&in(cx) let mut error = UndefinedValue());
331 stream.get_stored_error(error.handle_mut());
332 let promise = Promise::new2(cx, global);
333 promise.reject_native(&error.handle(), CanGc::from_cx(cx));
334 return promise;
335 }
336
337 assert!(stream.is_writable());
339
340 let promise = stream.add_write_request(global, CanGc::from_cx(cx));
342
343 controller.write(cx, global, chunk, chunk_size);
345
346 promise
348 }
349
350 pub(crate) fn release(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
352 let Some(stream) = self.stream.get() else {
354 unreachable!("Stream should be set.");
356 };
357
358 assert!(stream.get_writer().is_some_and(|writer| &*writer == self));
360
361 let released_error = Error::Type(c"Writer has been released".to_owned());
363
364 rooted!(in(*cx) let mut error = UndefinedValue());
366 released_error.to_jsval(cx, global, error.handle_mut(), can_gc);
367
368 self.ensure_ready_promise_rejected(global, error.handle(), can_gc);
370
371 self.ensure_closed_promise_rejected(global, error.handle(), can_gc);
373
374 stream.set_writer(None);
376
377 self.stream.set(None);
379 }
380
381 pub(crate) fn close_with_error_propagation(
383 &self,
384 cx: &mut js::context::JSContext,
385 global: &GlobalScope,
386 ) -> Rc<Promise> {
387 let Some(stream) = self.stream.get() else {
389 unreachable!("Stream should be set.");
391 };
392
393 if stream.close_queued_or_in_flight() || stream.is_closed() {
399 let promise = Promise::new2(cx, global);
401 promise.resolve_native(&(), CanGc::from_cx(cx));
402 return promise;
403 }
404
405 if stream.is_errored() {
407 rooted!(&in(cx) let mut error = UndefinedValue());
409 stream.get_stored_error(error.handle_mut());
410 let promise = Promise::new2(cx, global);
411 promise.reject_native(&error.handle(), CanGc::from_cx(cx));
412 return promise;
413 }
414
415 assert!(stream.is_writable() || stream.is_erroring());
417
418 self.close(cx, global)
420 }
421
422 pub(crate) fn get_stream(&self) -> Option<DomRoot<WritableStream>> {
423 self.stream.get()
424 }
425}
426
427impl WritableStreamDefaultWriterMethods<crate::DomTypeHolder> for WritableStreamDefaultWriter {
428 fn Closed(&self) -> Rc<Promise> {
430 return self.closed_promise.borrow().clone();
432 }
433
434 fn GetDesiredSize(&self) -> Result<Option<f64>, Error> {
436 let Some(stream) = self.stream.get() else {
438 return Err(Error::Type(c"Stream is undefined".to_owned()));
439 };
440
441 Ok(stream.get_desired_size())
443 }
444
445 fn Ready(&self) -> Rc<Promise> {
447 return self.ready_promise.borrow().clone();
449 }
450
451 fn Abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) -> Rc<Promise> {
453 let global = GlobalScope::from_current_realm(cx);
454
455 if self.stream.get().is_none() {
457 let promise = Promise::new2(cx, &global);
459 promise.reject_error(
460 Error::Type(c"Stream is undefined".to_owned()),
461 CanGc::from_cx(cx),
462 );
463 return promise;
464 }
465
466 self.abort(cx, &global, reason)
468 }
469
470 fn Close(&self, cx: &mut CurrentRealm) -> Rc<Promise> {
472 let global = GlobalScope::from_current_realm(cx);
473 let promise = Promise::new2(cx, &global);
474
475 let Some(stream) = self.stream.get() else {
477 promise.reject_error(
480 Error::Type(c"Stream is undefined".to_owned()),
481 CanGc::from_cx(cx),
482 );
483 return promise;
484 };
485
486 if stream.close_queued_or_in_flight() {
488 promise.reject_error(
490 Error::Type(c"Stream has closed queued or in-flight".to_owned()),
491 CanGc::from_cx(cx),
492 );
493 return promise;
494 }
495
496 self.close(cx, &global)
497 }
498
499 fn ReleaseLock(&self, can_gc: CanGc) {
501 let Some(stream) = self.stream.get() else {
503 return;
505 };
506
507 assert!(stream.get_writer().is_some());
509
510 let global = self.global();
511 let cx = GlobalScope::get_cx();
512
513 self.release(cx, &global, can_gc);
515 }
516
517 fn Write(&self, cx: &mut CurrentRealm, chunk: SafeHandleValue) -> Rc<Promise> {
519 let global = GlobalScope::from_current_realm(cx);
520
521 if self.stream.get().is_none() {
523 let promise = Promise::new2(cx, &global);
525 promise.reject_error(
526 Error::Type(c"Stream is undefined".to_owned()),
527 CanGc::from_cx(cx),
528 );
529 return promise;
530 }
531
532 self.write(cx, &global, chunk)
534 }
535
536 fn Constructor(
538 global: &GlobalScope,
539 proto: Option<SafeHandleObject>,
540 can_gc: CanGc,
541 stream: &WritableStream,
542 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
543 let writer = WritableStreamDefaultWriter::new(global, proto, can_gc);
544
545 let cx = GlobalScope::get_cx();
546
547 writer.setup(cx, stream, can_gc)?;
549
550 Ok(writer)
551 }
552}