1use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsval::UndefinedValue;
10use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
11
12use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriterMethods;
13use crate::dom::bindings::error::{Error, ErrorToJsval};
14use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
15use crate::dom::bindings::root::{DomRoot, MutNullableDom};
16use crate::dom::globalscope::GlobalScope;
17use crate::dom::promise::Promise;
18use crate::dom::writablestream::WritableStream;
19use crate::realms::InRealm;
20use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
21
/// DOM object backing the `WritableStreamDefaultWriter` interface.
///
/// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter>
#[dom_struct]
pub struct WritableStreamDefaultWriter {
    reflector_: Reflector,

    /// Promise handed out by the `Ready` getter. It lives in a `RefCell`
    /// because the whole promise is swapped for a new one in some code paths
    /// (see `set_ready_promise` and `ensure_ready_promise_rejected`).
    #[ignore_malloc_size_of = "Rc is hard"]
    ready_promise: RefCell<Rc<Promise>>,

    /// Promise handed out by the `Closed` getter. It lives in a `RefCell`
    /// because `ensure_closed_promise_rejected` may replace it with a new
    /// promise.
    #[ignore_malloc_size_of = "Rc is hard"]
    closed_promise: RefCell<Rc<Promise>>,

    /// The stream this writer is attached to. It is set by `setup` and
    /// cleared again by `release`.
    stream: MutNullableDom<WritableStream>,
}
37
38impl WritableStreamDefaultWriter {
39 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
40 fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> WritableStreamDefaultWriter {
43 WritableStreamDefaultWriter {
44 reflector_: Reflector::new(),
45 stream: Default::default(),
46 closed_promise: RefCell::new(Promise::new(global, can_gc)),
47 ready_promise: RefCell::new(Promise::new(global, can_gc)),
48 }
49 }
50
51 pub(crate) fn new(
52 global: &GlobalScope,
53 proto: Option<SafeHandleObject>,
54 can_gc: CanGc,
55 ) -> DomRoot<WritableStreamDefaultWriter> {
56 reflect_dom_object_with_proto(
57 Box::new(WritableStreamDefaultWriter::new_inherited(global, can_gc)),
58 global,
59 proto,
60 can_gc,
61 )
62 }
63
64 pub(crate) fn setup(
67 &self,
68 cx: SafeJSContext,
69 stream: &WritableStream,
70 can_gc: CanGc,
71 ) -> Result<(), Error> {
72 if stream.is_locked() {
74 return Err(Error::Type("Stream is locked".to_string()));
75 }
76
77 self.stream.set(Some(stream));
79
80 stream.set_writer(Some(self));
82
83 if stream.is_writable() {
87 if !stream.close_queued_or_in_flight() && stream.get_backpressure() {
90 } else {
93 self.ready_promise.borrow().resolve_native(&(), can_gc);
96 }
97
98 return Ok(());
101 }
102
103 if stream.is_erroring() {
105 rooted!(in(*cx) let mut error = UndefinedValue());
106 stream.get_stored_error(error.handle_mut());
107
108 let ready_promise = self.ready_promise.borrow();
112 ready_promise.reject_native(&error.handle(), can_gc);
113 ready_promise.set_promise_is_handled();
114
115 return Ok(());
118 }
119
120 if stream.is_closed() {
122 self.ready_promise.borrow().resolve_native(&(), can_gc);
125
126 self.closed_promise.borrow().resolve_native(&(), can_gc);
129 return Ok(());
130 }
131
132 assert!(stream.is_errored());
135
136 rooted!(in(*cx) let mut error = UndefinedValue());
138 stream.get_stored_error(error.handle_mut());
139
140 let ready_promise = self.ready_promise.borrow();
144 ready_promise.reject_native(&error.handle(), can_gc);
145 ready_promise.set_promise_is_handled();
146
147 let ready_promise = self.closed_promise.borrow();
151 ready_promise.reject_native(&error.handle(), can_gc);
152 ready_promise.set_promise_is_handled();
153
154 Ok(())
155 }
156
157 pub(crate) fn reject_closed_promise_with_stored_error(
158 &self,
159 error: &SafeHandleValue,
160 can_gc: CanGc,
161 ) {
162 self.closed_promise.borrow().reject_native(error, can_gc);
163 }
164
165 pub(crate) fn set_close_promise_is_handled(&self) {
166 self.closed_promise.borrow().set_promise_is_handled();
167 }
168
169 pub(crate) fn set_ready_promise(&self, promise: Rc<Promise>) {
170 *self.ready_promise.borrow_mut() = promise;
171 }
172
173 pub(crate) fn resolve_ready_promise_with_undefined(&self, can_gc: CanGc) {
174 self.ready_promise.borrow().resolve_native(&(), can_gc);
175 }
176
177 pub(crate) fn resolve_closed_promise_with_undefined(&self, can_gc: CanGc) {
178 self.closed_promise.borrow().resolve_native(&(), can_gc);
179 }
180
181 pub(crate) fn ensure_ready_promise_rejected(
183 &self,
184 global: &GlobalScope,
185 error: SafeHandleValue,
186 can_gc: CanGc,
187 ) {
188 let ready_promise = self.ready_promise.borrow().clone();
189
190 if ready_promise.is_pending() {
192 ready_promise.reject_native(&error, can_gc);
194
195 ready_promise.set_promise_is_handled();
197 } else {
198 let promise = Promise::new(global, can_gc);
200 promise.reject_native(&error, can_gc);
201
202 promise.set_promise_is_handled();
204 *self.ready_promise.borrow_mut() = promise;
205 }
206 }
207
208 pub(crate) fn ensure_closed_promise_rejected(
210 &self,
211 global: &GlobalScope,
212 error: SafeHandleValue,
213 can_gc: CanGc,
214 ) {
215 let closed_promise = self.closed_promise.borrow().clone();
216
217 if closed_promise.is_pending() {
219 closed_promise.reject_native(&error, can_gc);
221
222 closed_promise.set_promise_is_handled();
224 } else {
225 let promise = Promise::new(global, can_gc);
227 promise.reject_native(&error, can_gc);
228
229 promise.set_promise_is_handled();
231 *self.closed_promise.borrow_mut() = promise;
232 }
233 }
234
235 fn abort(
237 &self,
238 cx: SafeJSContext,
239 global: &GlobalScope,
240 reason: SafeHandleValue,
241 realm: InRealm,
242 can_gc: CanGc,
243 ) -> Rc<Promise> {
244 let Some(stream) = self.stream.get() else {
246 unreachable!("Stream should be set.");
248 };
249
250 stream.abort(cx, global, reason, realm, can_gc)
252 }
253
254 fn close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
256 let Some(stream) = self.stream.get() else {
258 unreachable!("Stream should be set.");
260 };
261
262 stream.close(cx, global, can_gc)
264 }
265
266 pub(crate) fn write(
268 &self,
269 cx: SafeJSContext,
270 global: &GlobalScope,
271 chunk: SafeHandleValue,
272 can_gc: CanGc,
273 ) -> Rc<Promise> {
274 let Some(stream) = self.stream.get() else {
276 unreachable!("Stream should be set.");
278 };
279
280 let Some(controller) = stream.get_controller() else {
283 unreachable!("Controller should be set.");
284 };
285
286 let chunk_size = controller.get_chunk_size(cx, global, chunk, can_gc);
288
289 if !self
292 .stream
293 .get()
294 .is_some_and(|current_stream| current_stream == stream)
295 {
296 let promise = Promise::new(global, can_gc);
297 promise.reject_error(
298 Error::Type("Stream is not equal to writer stream".to_string()),
299 can_gc,
300 );
301 return promise;
302 }
303
304 if stream.is_errored() {
307 rooted!(in(*cx) let mut error = UndefinedValue());
309 stream.get_stored_error(error.handle_mut());
310 let promise = Promise::new(global, can_gc);
311 promise.reject_native(&error.handle(), can_gc);
312 return promise;
313 }
314
315 if stream.close_queued_or_in_flight() || stream.is_closed() {
318 let promise = Promise::new(global, can_gc);
321 promise.reject_error(
322 Error::Type("Stream has been closed, or has close queued or in-flight".to_string()),
323 can_gc,
324 );
325 return promise;
326 }
327
328 if stream.is_erroring() {
330 rooted!(in(*cx) let mut error = UndefinedValue());
332 stream.get_stored_error(error.handle_mut());
333 let promise = Promise::new(global, can_gc);
334 promise.reject_native(&error.handle(), can_gc);
335 return promise;
336 }
337
338 assert!(stream.is_writable());
340
341 let promise = stream.add_write_request(global, can_gc);
343
344 controller.write(cx, global, chunk, chunk_size, can_gc);
346
347 promise
349 }
350
351 pub(crate) fn release(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
353 let Some(stream) = self.stream.get() else {
355 unreachable!("Stream should be set.");
357 };
358
359 assert!(stream.get_writer().is_some_and(|writer| &*writer == self));
361
362 let released_error = Error::Type("Writer has been released".to_string());
364
365 rooted!(in(*cx) let mut error = UndefinedValue());
367 released_error.to_jsval(cx, global, error.handle_mut(), can_gc);
368
369 self.ensure_ready_promise_rejected(global, error.handle(), can_gc);
371
372 self.ensure_closed_promise_rejected(global, error.handle(), can_gc);
374
375 stream.set_writer(None);
377
378 self.stream.set(None);
380 }
381
382 pub(crate) fn close_with_error_propagation(
384 &self,
385 cx: SafeJSContext,
386 global: &GlobalScope,
387 can_gc: CanGc,
388 ) -> Rc<Promise> {
389 let Some(stream) = self.stream.get() else {
391 unreachable!("Stream should be set.");
393 };
394
395 if stream.close_queued_or_in_flight() || stream.is_closed() {
401 let promise = Promise::new(global, can_gc);
403 promise.resolve_native(&(), can_gc);
404 return promise;
405 }
406
407 if stream.is_errored() {
409 rooted!(in(*cx) let mut error = UndefinedValue());
411 stream.get_stored_error(error.handle_mut());
412 let promise = Promise::new(global, can_gc);
413 promise.reject_native(&error.handle(), can_gc);
414 return promise;
415 }
416
417 assert!(stream.is_writable() || stream.is_erroring());
419
420 self.close(cx, global, can_gc)
422 }
423
424 pub(crate) fn get_stream(&self) -> Option<DomRoot<WritableStream>> {
425 self.stream.get()
426 }
427}
428
429impl WritableStreamDefaultWriterMethods<crate::DomTypeHolder> for WritableStreamDefaultWriter {
430 fn Closed(&self) -> Rc<Promise> {
432 return self.closed_promise.borrow().clone();
434 }
435
436 fn GetDesiredSize(&self) -> Result<Option<f64>, Error> {
438 let Some(stream) = self.stream.get() else {
440 return Err(Error::Type("Stream is undefined".to_string()));
441 };
442
443 Ok(stream.get_desired_size())
445 }
446
447 fn Ready(&self) -> Rc<Promise> {
449 return self.ready_promise.borrow().clone();
451 }
452
453 fn Abort(
455 &self,
456 cx: SafeJSContext,
457 reason: SafeHandleValue,
458 realm: InRealm,
459 can_gc: CanGc,
460 ) -> Rc<Promise> {
461 let global = GlobalScope::from_safe_context(cx, realm);
462
463 if self.stream.get().is_none() {
465 let promise = Promise::new(&global, can_gc);
467 promise.reject_error(Error::Type("Stream is undefined".to_string()), can_gc);
468 return promise;
469 }
470
471 self.abort(cx, &global, reason, realm, can_gc)
473 }
474
475 fn Close(&self, in_realm: InRealm, can_gc: CanGc) -> Rc<Promise> {
477 let cx = GlobalScope::get_cx();
478 let global = GlobalScope::from_safe_context(cx, in_realm);
479 let promise = Promise::new(&global, can_gc);
480
481 let Some(stream) = self.stream.get() else {
483 promise.reject_error(Error::Type("Stream is undefined".to_string()), can_gc);
486 return promise;
487 };
488
489 if stream.close_queued_or_in_flight() {
491 promise.reject_error(
493 Error::Type("Stream has closed queued or in-flight".to_string()),
494 can_gc,
495 );
496 return promise;
497 }
498
499 self.close(cx, &global, can_gc)
500 }
501
502 fn ReleaseLock(&self, can_gc: CanGc) {
504 let Some(stream) = self.stream.get() else {
506 return;
508 };
509
510 assert!(stream.get_writer().is_some());
512
513 let global = self.global();
514 let cx = GlobalScope::get_cx();
515
516 self.release(cx, &global, can_gc);
518 }
519
520 fn Write(
522 &self,
523 cx: SafeJSContext,
524 chunk: SafeHandleValue,
525 realm: InRealm,
526 can_gc: CanGc,
527 ) -> Rc<Promise> {
528 let global = GlobalScope::from_safe_context(cx, realm);
529
530 if self.stream.get().is_none() {
532 let global = GlobalScope::from_safe_context(cx, realm);
534 let promise = Promise::new(&global, can_gc);
535 promise.reject_error(Error::Type("Stream is undefined".to_string()), can_gc);
536 return promise;
537 }
538
539 self.write(cx, &global, chunk, can_gc)
541 }
542
543 fn Constructor(
545 global: &GlobalScope,
546 proto: Option<SafeHandleObject>,
547 can_gc: CanGc,
548 stream: &WritableStream,
549 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
550 let writer = WritableStreamDefaultWriter::new(global, proto, can_gc);
551
552 let cx = GlobalScope::get_cx();
553
554 writer.setup(cx, stream, can_gc)?;
556
557 Ok(writer)
558 }
559}