1use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsval::UndefinedValue;
10use js::realm::CurrentRealm;
11use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
12
13use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriterMethods;
14use crate::dom::bindings::error::{Error, ErrorToJsval};
15use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
16use crate::dom::bindings::root::{DomRoot, MutNullableDom};
17use crate::dom::globalscope::GlobalScope;
18use crate::dom::promise::Promise;
19use crate::dom::stream::writablestream::WritableStream;
20use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
21
22#[dom_struct]
24pub struct WritableStreamDefaultWriter {
25 reflector_: Reflector,
26
27 #[conditional_malloc_size_of]
28 ready_promise: RefCell<Rc<Promise>>,
29
30 #[conditional_malloc_size_of]
32 closed_promise: RefCell<Rc<Promise>>,
33
34 stream: MutNullableDom<WritableStream>,
36}
37
38impl WritableStreamDefaultWriter {
39 fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> WritableStreamDefaultWriter {
42 WritableStreamDefaultWriter {
43 reflector_: Reflector::new(),
44 stream: Default::default(),
45 closed_promise: RefCell::new(Promise::new(global, can_gc)),
46 ready_promise: RefCell::new(Promise::new(global, can_gc)),
47 }
48 }
49
50 pub(crate) fn new(
51 global: &GlobalScope,
52 proto: Option<SafeHandleObject>,
53 can_gc: CanGc,
54 ) -> DomRoot<WritableStreamDefaultWriter> {
55 reflect_dom_object_with_proto(
56 Box::new(WritableStreamDefaultWriter::new_inherited(global, can_gc)),
57 global,
58 proto,
59 can_gc,
60 )
61 }
62
63 pub(crate) fn setup(
66 &self,
67 cx: SafeJSContext,
68 stream: &WritableStream,
69 can_gc: CanGc,
70 ) -> Result<(), Error> {
71 if stream.is_locked() {
73 return Err(Error::Type(c"Stream is locked".to_owned()));
74 }
75
76 self.stream.set(Some(stream));
78
79 stream.set_writer(Some(self));
81
82 if stream.is_writable() {
86 if !stream.close_queued_or_in_flight() && stream.get_backpressure() {
89 } else {
92 self.ready_promise.borrow().resolve_native(&(), can_gc);
95 }
96
97 return Ok(());
100 }
101
102 if stream.is_erroring() {
104 rooted!(in(*cx) let mut error = UndefinedValue());
105 stream.get_stored_error(error.handle_mut());
106
107 let ready_promise = self.ready_promise.borrow();
111 ready_promise.reject_native(&error.handle(), can_gc);
112 ready_promise.set_promise_is_handled();
113
114 return Ok(());
117 }
118
119 if stream.is_closed() {
121 self.ready_promise.borrow().resolve_native(&(), can_gc);
124
125 self.closed_promise.borrow().resolve_native(&(), can_gc);
128 return Ok(());
129 }
130
131 assert!(stream.is_errored());
134
135 rooted!(in(*cx) let mut error = UndefinedValue());
137 stream.get_stored_error(error.handle_mut());
138
139 let ready_promise = self.ready_promise.borrow();
143 ready_promise.reject_native(&error.handle(), can_gc);
144 ready_promise.set_promise_is_handled();
145
146 let ready_promise = self.closed_promise.borrow();
150 ready_promise.reject_native(&error.handle(), can_gc);
151 ready_promise.set_promise_is_handled();
152
153 Ok(())
154 }
155
156 pub(crate) fn reject_closed_promise_with_stored_error(
157 &self,
158 cx: &mut js::context::JSContext,
159 error: &SafeHandleValue,
160 ) {
161 self.closed_promise
162 .borrow()
163 .reject_native(error, CanGc::from_cx(cx));
164 }
165
166 pub(crate) fn set_close_promise_is_handled(&self) {
167 self.closed_promise.borrow().set_promise_is_handled();
168 }
169
170 pub(crate) fn set_ready_promise(&self, promise: Rc<Promise>) {
171 *self.ready_promise.borrow_mut() = promise;
172 }
173
174 pub(crate) fn resolve_ready_promise_with_undefined(&self, can_gc: CanGc) {
175 self.ready_promise.borrow().resolve_native(&(), can_gc);
176 }
177
178 pub(crate) fn resolve_closed_promise_with_undefined(&self, can_gc: CanGc) {
179 self.closed_promise.borrow().resolve_native(&(), can_gc);
180 }
181
182 pub(crate) fn ensure_ready_promise_rejected(
184 &self,
185 global: &GlobalScope,
186 error: SafeHandleValue,
187 can_gc: CanGc,
188 ) {
189 let ready_promise = self.ready_promise.borrow().clone();
190
191 if ready_promise.is_pending() {
193 ready_promise.reject_native(&error, can_gc);
195
196 ready_promise.set_promise_is_handled();
198 } else {
199 let promise = Promise::new(global, can_gc);
201 promise.reject_native(&error, can_gc);
202
203 promise.set_promise_is_handled();
205 *self.ready_promise.borrow_mut() = promise;
206 }
207 }
208
209 pub(crate) fn ensure_closed_promise_rejected(
211 &self,
212 global: &GlobalScope,
213 error: SafeHandleValue,
214 can_gc: CanGc,
215 ) {
216 let closed_promise = self.closed_promise.borrow().clone();
217
218 if closed_promise.is_pending() {
220 closed_promise.reject_native(&error, can_gc);
222
223 closed_promise.set_promise_is_handled();
225 } else {
226 let promise = Promise::new(global, can_gc);
228 promise.reject_native(&error, can_gc);
229
230 promise.set_promise_is_handled();
232 *self.closed_promise.borrow_mut() = promise;
233 }
234 }
235
236 fn abort(
238 &self,
239 cx: &mut CurrentRealm,
240 global: &GlobalScope,
241 reason: SafeHandleValue,
242 ) -> Rc<Promise> {
243 let Some(stream) = self.stream.get() else {
245 unreachable!("Stream should be set.");
247 };
248
249 stream.abort(cx, global, reason)
251 }
252
253 fn close(&self, cx: &mut js::context::JSContext, global: &GlobalScope) -> Rc<Promise> {
255 let Some(stream) = self.stream.get() else {
257 unreachable!("Stream should be set.");
259 };
260
261 stream.close(cx, global)
263 }
264
265 pub(crate) fn write(
267 &self,
268 cx: &mut js::context::JSContext,
269 global: &GlobalScope,
270 chunk: SafeHandleValue,
271 ) -> Rc<Promise> {
272 let Some(stream) = self.stream.get() else {
274 unreachable!("Stream should be set.");
276 };
277
278 let Some(controller) = stream.get_controller() else {
281 unreachable!("Controller should be set.");
282 };
283
284 let chunk_size = controller.get_chunk_size(cx, global, chunk);
286
287 if !self
290 .stream
291 .get()
292 .is_some_and(|current_stream| current_stream == stream)
293 {
294 let promise = Promise::new2(cx, global);
295 promise.reject_error(
296 Error::Type(c"Stream is not equal to writer stream".to_owned()),
297 CanGc::from_cx(cx),
298 );
299 return promise;
300 }
301
302 if stream.is_errored() {
305 rooted!(&in(cx) let mut error = UndefinedValue());
307 stream.get_stored_error(error.handle_mut());
308 let promise = Promise::new2(cx, global);
309 promise.reject_native(&error.handle(), CanGc::from_cx(cx));
310 return promise;
311 }
312
313 if stream.close_queued_or_in_flight() || stream.is_closed() {
316 let promise = Promise::new2(cx, global);
319 promise.reject_error(
320 Error::Type(c"Stream has been closed, or has close queued or in-flight".to_owned()),
321 CanGc::from_cx(cx),
322 );
323 return promise;
324 }
325
326 if stream.is_erroring() {
328 rooted!(&in(cx) let mut error = UndefinedValue());
330 stream.get_stored_error(error.handle_mut());
331 let promise = Promise::new2(cx, global);
332 promise.reject_native(&error.handle(), CanGc::from_cx(cx));
333 return promise;
334 }
335
336 assert!(stream.is_writable());
338
339 let promise = stream.add_write_request(global, CanGc::from_cx(cx));
341
342 controller.write(cx, global, chunk, chunk_size);
344
345 promise
347 }
348
349 pub(crate) fn release(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
351 let Some(stream) = self.stream.get() else {
353 unreachable!("Stream should be set.");
355 };
356
357 assert!(stream.get_writer().is_some_and(|writer| &*writer == self));
359
360 let released_error = Error::Type(c"Writer has been released".to_owned());
362
363 rooted!(in(*cx) let mut error = UndefinedValue());
365 released_error.to_jsval(cx, global, error.handle_mut(), can_gc);
366
367 self.ensure_ready_promise_rejected(global, error.handle(), can_gc);
369
370 self.ensure_closed_promise_rejected(global, error.handle(), can_gc);
372
373 stream.set_writer(None);
375
376 self.stream.set(None);
378 }
379
380 pub(crate) fn close_with_error_propagation(
382 &self,
383 cx: &mut js::context::JSContext,
384 global: &GlobalScope,
385 ) -> Rc<Promise> {
386 let Some(stream) = self.stream.get() else {
388 unreachable!("Stream should be set.");
390 };
391
392 if stream.close_queued_or_in_flight() || stream.is_closed() {
398 let promise = Promise::new2(cx, global);
400 promise.resolve_native(&(), CanGc::from_cx(cx));
401 return promise;
402 }
403
404 if stream.is_errored() {
406 rooted!(&in(cx) let mut error = UndefinedValue());
408 stream.get_stored_error(error.handle_mut());
409 let promise = Promise::new2(cx, global);
410 promise.reject_native(&error.handle(), CanGc::from_cx(cx));
411 return promise;
412 }
413
414 assert!(stream.is_writable() || stream.is_erroring());
416
417 self.close(cx, global)
419 }
420
421 pub(crate) fn get_stream(&self) -> Option<DomRoot<WritableStream>> {
422 self.stream.get()
423 }
424}
425
426impl WritableStreamDefaultWriterMethods<crate::DomTypeHolder> for WritableStreamDefaultWriter {
427 fn Closed(&self) -> Rc<Promise> {
429 return self.closed_promise.borrow().clone();
431 }
432
433 fn GetDesiredSize(&self) -> Result<Option<f64>, Error> {
435 let Some(stream) = self.stream.get() else {
437 return Err(Error::Type(c"Stream is undefined".to_owned()));
438 };
439
440 Ok(stream.get_desired_size())
442 }
443
444 fn Ready(&self) -> Rc<Promise> {
446 return self.ready_promise.borrow().clone();
448 }
449
450 fn Abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) -> Rc<Promise> {
452 let global = GlobalScope::from_current_realm(cx);
453
454 if self.stream.get().is_none() {
456 let promise = Promise::new2(cx, &global);
458 promise.reject_error(
459 Error::Type(c"Stream is undefined".to_owned()),
460 CanGc::from_cx(cx),
461 );
462 return promise;
463 }
464
465 self.abort(cx, &global, reason)
467 }
468
469 fn Close(&self, cx: &mut CurrentRealm) -> Rc<Promise> {
471 let global = GlobalScope::from_current_realm(cx);
472 let promise = Promise::new2(cx, &global);
473
474 let Some(stream) = self.stream.get() else {
476 promise.reject_error(
479 Error::Type(c"Stream is undefined".to_owned()),
480 CanGc::from_cx(cx),
481 );
482 return promise;
483 };
484
485 if stream.close_queued_or_in_flight() {
487 promise.reject_error(
489 Error::Type(c"Stream has closed queued or in-flight".to_owned()),
490 CanGc::from_cx(cx),
491 );
492 return promise;
493 }
494
495 self.close(cx, &global)
496 }
497
498 fn ReleaseLock(&self, can_gc: CanGc) {
500 let Some(stream) = self.stream.get() else {
502 return;
504 };
505
506 assert!(stream.get_writer().is_some());
508
509 let global = self.global();
510 let cx = GlobalScope::get_cx();
511
512 self.release(cx, &global, can_gc);
514 }
515
516 fn Write(&self, cx: &mut CurrentRealm, chunk: SafeHandleValue) -> Rc<Promise> {
518 let global = GlobalScope::from_current_realm(cx);
519
520 if self.stream.get().is_none() {
522 let promise = Promise::new2(cx, &global);
524 promise.reject_error(
525 Error::Type(c"Stream is undefined".to_owned()),
526 CanGc::from_cx(cx),
527 );
528 return promise;
529 }
530
531 self.write(cx, &global, chunk)
533 }
534
535 fn Constructor(
537 global: &GlobalScope,
538 proto: Option<SafeHandleObject>,
539 can_gc: CanGc,
540 stream: &WritableStream,
541 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
542 let writer = WritableStreamDefaultWriter::new(global, proto, can_gc);
543
544 let cx = GlobalScope::get_cx();
545
546 writer.setup(cx, stream, can_gc)?;
548
549 Ok(writer)
550 }
551}