1use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsval::UndefinedValue;
10use js::realm::CurrentRealm;
11use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
12
13use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriterMethods;
14use crate::dom::bindings::error::{Error, ErrorToJsval};
15use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
16use crate::dom::bindings::root::{DomRoot, MutNullableDom};
17use crate::dom::globalscope::GlobalScope;
18use crate::dom::promise::Promise;
19use crate::dom::stream::writablestream::WritableStream;
20use crate::realms::InRealm;
21use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
22
/// Writer object attached to a [`WritableStream`].
///
/// Holds the stream it is currently attached to (if any) together with the
/// two promises handed out by the `Ready()` and `Closed()` getters.
#[dom_struct]
pub struct WritableStreamDefaultWriter {
    /// Reflector for this DOM object.
    reflector_: Reflector,

    /// Promise returned by `Ready()`. Kept in a `RefCell` because
    /// `set_ready_promise` and `ensure_ready_promise_rejected` may swap it
    /// for a different promise.
    #[conditional_malloc_size_of]
    ready_promise: RefCell<Rc<Promise>>,

    /// Promise returned by `Closed()`. Kept in a `RefCell` because
    /// `ensure_closed_promise_rejected` may swap it for a different promise.
    #[conditional_malloc_size_of]
    closed_promise: RefCell<Rc<Promise>>,

    /// Stream this writer is attached to. Set by `setup` and cleared again by
    /// `release`; empty on a freshly constructed writer.
    stream: MutNullableDom<WritableStream>,
}
38
39impl WritableStreamDefaultWriter {
40 fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> WritableStreamDefaultWriter {
43 WritableStreamDefaultWriter {
44 reflector_: Reflector::new(),
45 stream: Default::default(),
46 closed_promise: RefCell::new(Promise::new(global, can_gc)),
47 ready_promise: RefCell::new(Promise::new(global, can_gc)),
48 }
49 }
50
51 pub(crate) fn new(
52 global: &GlobalScope,
53 proto: Option<SafeHandleObject>,
54 can_gc: CanGc,
55 ) -> DomRoot<WritableStreamDefaultWriter> {
56 reflect_dom_object_with_proto(
57 Box::new(WritableStreamDefaultWriter::new_inherited(global, can_gc)),
58 global,
59 proto,
60 can_gc,
61 )
62 }
63
64 pub(crate) fn setup(
67 &self,
68 cx: SafeJSContext,
69 stream: &WritableStream,
70 can_gc: CanGc,
71 ) -> Result<(), Error> {
72 if stream.is_locked() {
74 return Err(Error::Type(c"Stream is locked".to_owned()));
75 }
76
77 self.stream.set(Some(stream));
79
80 stream.set_writer(Some(self));
82
83 if stream.is_writable() {
87 if !stream.close_queued_or_in_flight() && stream.get_backpressure() {
90 } else {
93 self.ready_promise.borrow().resolve_native(&(), can_gc);
96 }
97
98 return Ok(());
101 }
102
103 if stream.is_erroring() {
105 rooted!(in(*cx) let mut error = UndefinedValue());
106 stream.get_stored_error(error.handle_mut());
107
108 let ready_promise = self.ready_promise.borrow();
112 ready_promise.reject_native(&error.handle(), can_gc);
113 ready_promise.set_promise_is_handled();
114
115 return Ok(());
118 }
119
120 if stream.is_closed() {
122 self.ready_promise.borrow().resolve_native(&(), can_gc);
125
126 self.closed_promise.borrow().resolve_native(&(), can_gc);
129 return Ok(());
130 }
131
132 assert!(stream.is_errored());
135
136 rooted!(in(*cx) let mut error = UndefinedValue());
138 stream.get_stored_error(error.handle_mut());
139
140 let ready_promise = self.ready_promise.borrow();
144 ready_promise.reject_native(&error.handle(), can_gc);
145 ready_promise.set_promise_is_handled();
146
147 let ready_promise = self.closed_promise.borrow();
151 ready_promise.reject_native(&error.handle(), can_gc);
152 ready_promise.set_promise_is_handled();
153
154 Ok(())
155 }
156
157 pub(crate) fn reject_closed_promise_with_stored_error(
158 &self,
159 error: &SafeHandleValue,
160 can_gc: CanGc,
161 ) {
162 self.closed_promise.borrow().reject_native(error, can_gc);
163 }
164
165 pub(crate) fn set_close_promise_is_handled(&self) {
166 self.closed_promise.borrow().set_promise_is_handled();
167 }
168
169 pub(crate) fn set_ready_promise(&self, promise: Rc<Promise>) {
170 *self.ready_promise.borrow_mut() = promise;
171 }
172
173 pub(crate) fn resolve_ready_promise_with_undefined(&self, can_gc: CanGc) {
174 self.ready_promise.borrow().resolve_native(&(), can_gc);
175 }
176
177 pub(crate) fn resolve_closed_promise_with_undefined(&self, can_gc: CanGc) {
178 self.closed_promise.borrow().resolve_native(&(), can_gc);
179 }
180
181 pub(crate) fn ensure_ready_promise_rejected(
183 &self,
184 global: &GlobalScope,
185 error: SafeHandleValue,
186 can_gc: CanGc,
187 ) {
188 let ready_promise = self.ready_promise.borrow().clone();
189
190 if ready_promise.is_pending() {
192 ready_promise.reject_native(&error, can_gc);
194
195 ready_promise.set_promise_is_handled();
197 } else {
198 let promise = Promise::new(global, can_gc);
200 promise.reject_native(&error, can_gc);
201
202 promise.set_promise_is_handled();
204 *self.ready_promise.borrow_mut() = promise;
205 }
206 }
207
208 pub(crate) fn ensure_closed_promise_rejected(
210 &self,
211 global: &GlobalScope,
212 error: SafeHandleValue,
213 can_gc: CanGc,
214 ) {
215 let closed_promise = self.closed_promise.borrow().clone();
216
217 if closed_promise.is_pending() {
219 closed_promise.reject_native(&error, can_gc);
221
222 closed_promise.set_promise_is_handled();
224 } else {
225 let promise = Promise::new(global, can_gc);
227 promise.reject_native(&error, can_gc);
228
229 promise.set_promise_is_handled();
231 *self.closed_promise.borrow_mut() = promise;
232 }
233 }
234
235 fn abort(
237 &self,
238 cx: &mut CurrentRealm,
239 global: &GlobalScope,
240 reason: SafeHandleValue,
241 ) -> Rc<Promise> {
242 let Some(stream) = self.stream.get() else {
244 unreachable!("Stream should be set.");
246 };
247
248 stream.abort(cx, global, reason)
250 }
251
252 fn close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
254 let Some(stream) = self.stream.get() else {
256 unreachable!("Stream should be set.");
258 };
259
260 stream.close(cx, global, can_gc)
262 }
263
264 pub(crate) fn write(
266 &self,
267 cx: SafeJSContext,
268 global: &GlobalScope,
269 chunk: SafeHandleValue,
270 can_gc: CanGc,
271 ) -> Rc<Promise> {
272 let Some(stream) = self.stream.get() else {
274 unreachable!("Stream should be set.");
276 };
277
278 let Some(controller) = stream.get_controller() else {
281 unreachable!("Controller should be set.");
282 };
283
284 let chunk_size = controller.get_chunk_size(cx, global, chunk, can_gc);
286
287 if !self
290 .stream
291 .get()
292 .is_some_and(|current_stream| current_stream == stream)
293 {
294 let promise = Promise::new(global, can_gc);
295 promise.reject_error(
296 Error::Type(c"Stream is not equal to writer stream".to_owned()),
297 can_gc,
298 );
299 return promise;
300 }
301
302 if stream.is_errored() {
305 rooted!(in(*cx) let mut error = UndefinedValue());
307 stream.get_stored_error(error.handle_mut());
308 let promise = Promise::new(global, can_gc);
309 promise.reject_native(&error.handle(), can_gc);
310 return promise;
311 }
312
313 if stream.close_queued_or_in_flight() || stream.is_closed() {
316 let promise = Promise::new(global, can_gc);
319 promise.reject_error(
320 Error::Type(c"Stream has been closed, or has close queued or in-flight".to_owned()),
321 can_gc,
322 );
323 return promise;
324 }
325
326 if stream.is_erroring() {
328 rooted!(in(*cx) let mut error = UndefinedValue());
330 stream.get_stored_error(error.handle_mut());
331 let promise = Promise::new(global, can_gc);
332 promise.reject_native(&error.handle(), can_gc);
333 return promise;
334 }
335
336 assert!(stream.is_writable());
338
339 let promise = stream.add_write_request(global, can_gc);
341
342 controller.write(cx, global, chunk, chunk_size, can_gc);
344
345 promise
347 }
348
349 pub(crate) fn release(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
351 let Some(stream) = self.stream.get() else {
353 unreachable!("Stream should be set.");
355 };
356
357 assert!(stream.get_writer().is_some_and(|writer| &*writer == self));
359
360 let released_error = Error::Type(c"Writer has been released".to_owned());
362
363 rooted!(in(*cx) let mut error = UndefinedValue());
365 released_error.to_jsval(cx, global, error.handle_mut(), can_gc);
366
367 self.ensure_ready_promise_rejected(global, error.handle(), can_gc);
369
370 self.ensure_closed_promise_rejected(global, error.handle(), can_gc);
372
373 stream.set_writer(None);
375
376 self.stream.set(None);
378 }
379
380 pub(crate) fn close_with_error_propagation(
382 &self,
383 cx: SafeJSContext,
384 global: &GlobalScope,
385 can_gc: CanGc,
386 ) -> Rc<Promise> {
387 let Some(stream) = self.stream.get() else {
389 unreachable!("Stream should be set.");
391 };
392
393 if stream.close_queued_or_in_flight() || stream.is_closed() {
399 let promise = Promise::new(global, can_gc);
401 promise.resolve_native(&(), can_gc);
402 return promise;
403 }
404
405 if stream.is_errored() {
407 rooted!(in(*cx) let mut error = UndefinedValue());
409 stream.get_stored_error(error.handle_mut());
410 let promise = Promise::new(global, can_gc);
411 promise.reject_native(&error.handle(), can_gc);
412 return promise;
413 }
414
415 assert!(stream.is_writable() || stream.is_erroring());
417
418 self.close(cx, global, can_gc)
420 }
421
422 pub(crate) fn get_stream(&self) -> Option<DomRoot<WritableStream>> {
423 self.stream.get()
424 }
425}
426
427impl WritableStreamDefaultWriterMethods<crate::DomTypeHolder> for WritableStreamDefaultWriter {
428 fn Closed(&self) -> Rc<Promise> {
430 return self.closed_promise.borrow().clone();
432 }
433
434 fn GetDesiredSize(&self) -> Result<Option<f64>, Error> {
436 let Some(stream) = self.stream.get() else {
438 return Err(Error::Type(c"Stream is undefined".to_owned()));
439 };
440
441 Ok(stream.get_desired_size())
443 }
444
445 fn Ready(&self) -> Rc<Promise> {
447 return self.ready_promise.borrow().clone();
449 }
450
451 fn Abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) -> Rc<Promise> {
453 let global = GlobalScope::from_current_realm(cx);
454
455 if self.stream.get().is_none() {
457 let promise = Promise::new2(cx, &global);
459 promise.reject_error(
460 Error::Type(c"Stream is undefined".to_owned()),
461 CanGc::from_cx(cx),
462 );
463 return promise;
464 }
465
466 self.abort(cx, &global, reason)
468 }
469
470 fn Close(&self, in_realm: InRealm, can_gc: CanGc) -> Rc<Promise> {
472 let cx = GlobalScope::get_cx();
473 let global = GlobalScope::from_safe_context(cx, in_realm);
474 let promise = Promise::new(&global, can_gc);
475
476 let Some(stream) = self.stream.get() else {
478 promise.reject_error(Error::Type(c"Stream is undefined".to_owned()), can_gc);
481 return promise;
482 };
483
484 if stream.close_queued_or_in_flight() {
486 promise.reject_error(
488 Error::Type(c"Stream has closed queued or in-flight".to_owned()),
489 can_gc,
490 );
491 return promise;
492 }
493
494 self.close(cx, &global, can_gc)
495 }
496
497 fn ReleaseLock(&self, can_gc: CanGc) {
499 let Some(stream) = self.stream.get() else {
501 return;
503 };
504
505 assert!(stream.get_writer().is_some());
507
508 let global = self.global();
509 let cx = GlobalScope::get_cx();
510
511 self.release(cx, &global, can_gc);
513 }
514
515 fn Write(
517 &self,
518 cx: SafeJSContext,
519 chunk: SafeHandleValue,
520 realm: InRealm,
521 can_gc: CanGc,
522 ) -> Rc<Promise> {
523 let global = GlobalScope::from_safe_context(cx, realm);
524
525 if self.stream.get().is_none() {
527 let global = GlobalScope::from_safe_context(cx, realm);
529 let promise = Promise::new(&global, can_gc);
530 promise.reject_error(Error::Type(c"Stream is undefined".to_owned()), can_gc);
531 return promise;
532 }
533
534 self.write(cx, &global, chunk, can_gc)
536 }
537
538 fn Constructor(
540 global: &GlobalScope,
541 proto: Option<SafeHandleObject>,
542 can_gc: CanGc,
543 stream: &WritableStream,
544 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
545 let writer = WritableStreamDefaultWriter::new(global, proto, can_gc);
546
547 let cx = GlobalScope::get_cx();
548
549 writer.setup(cx, stream, can_gc)?;
551
552 Ok(writer)
553 }
554}