1use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::context::JSContext;
10use js::jsval::UndefinedValue;
11use js::realm::CurrentRealm;
12use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
13use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto};
14
15use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriterMethods;
16use crate::dom::bindings::error::{Error, ErrorToJsval};
17use crate::dom::bindings::reflector::DomGlobal;
18use crate::dom::bindings::root::{DomRoot, MutNullableDom};
19use crate::dom::globalscope::GlobalScope;
20use crate::dom::promise::Promise;
21use crate::dom::stream::writablestream::WritableStream;
22use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
23
24#[dom_struct]
26pub struct WritableStreamDefaultWriter {
27 reflector_: Reflector,
28
29 #[conditional_malloc_size_of]
30 ready_promise: RefCell<Rc<Promise>>,
31
32 #[conditional_malloc_size_of]
34 closed_promise: RefCell<Rc<Promise>>,
35
36 stream: MutNullableDom<WritableStream>,
38}
39
40impl WritableStreamDefaultWriter {
41 fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> WritableStreamDefaultWriter {
44 WritableStreamDefaultWriter {
45 reflector_: Reflector::new(),
46 stream: Default::default(),
47 closed_promise: RefCell::new(Promise::new(global, can_gc)),
48 ready_promise: RefCell::new(Promise::new(global, can_gc)),
49 }
50 }
51
52 pub(crate) fn new(
53 global: &GlobalScope,
54 proto: Option<SafeHandleObject>,
55 can_gc: CanGc,
56 ) -> DomRoot<WritableStreamDefaultWriter> {
57 reflect_dom_object_with_proto(
58 Box::new(WritableStreamDefaultWriter::new_inherited(global, can_gc)),
59 global,
60 proto,
61 can_gc,
62 )
63 }
64
65 pub(crate) fn setup(
68 &self,
69 cx: SafeJSContext,
70 stream: &WritableStream,
71 can_gc: CanGc,
72 ) -> Result<(), Error> {
73 if stream.is_locked() {
75 return Err(Error::Type(c"Stream is locked".to_owned()));
76 }
77
78 self.stream.set(Some(stream));
80
81 stream.set_writer(Some(self));
83
84 if stream.is_writable() {
88 if !stream.close_queued_or_in_flight() && stream.get_backpressure() {
91 } else {
94 self.ready_promise.borrow().resolve_native(&(), can_gc);
97 }
98
99 return Ok(());
102 }
103
104 if stream.is_erroring() {
106 rooted!(in(*cx) let mut error = UndefinedValue());
107 stream.get_stored_error(error.handle_mut());
108
109 let ready_promise = self.ready_promise.borrow();
113 ready_promise.reject_native(&error.handle(), can_gc);
114 ready_promise.set_promise_is_handled();
115
116 return Ok(());
119 }
120
121 if stream.is_closed() {
123 self.ready_promise.borrow().resolve_native(&(), can_gc);
126
127 self.closed_promise.borrow().resolve_native(&(), can_gc);
130 return Ok(());
131 }
132
133 assert!(stream.is_errored());
136
137 rooted!(in(*cx) let mut error = UndefinedValue());
139 stream.get_stored_error(error.handle_mut());
140
141 let ready_promise = self.ready_promise.borrow();
145 ready_promise.reject_native(&error.handle(), can_gc);
146 ready_promise.set_promise_is_handled();
147
148 let ready_promise = self.closed_promise.borrow();
152 ready_promise.reject_native(&error.handle(), can_gc);
153 ready_promise.set_promise_is_handled();
154
155 Ok(())
156 }
157
158 pub(crate) fn reject_closed_promise_with_stored_error(
159 &self,
160 cx: &mut JSContext,
161 error: &SafeHandleValue,
162 ) {
163 self.closed_promise
164 .borrow()
165 .reject_native_with_cx(cx, error);
166 }
167
168 pub(crate) fn set_close_promise_is_handled(&self) {
169 self.closed_promise.borrow().set_promise_is_handled();
170 }
171
172 pub(crate) fn set_ready_promise(&self, promise: Rc<Promise>) {
173 *self.ready_promise.borrow_mut() = promise;
174 }
175
176 pub(crate) fn resolve_ready_promise_with_undefined(&self, can_gc: CanGc) {
177 self.ready_promise.borrow().resolve_native(&(), can_gc);
178 }
179
180 pub(crate) fn resolve_closed_promise_with_undefined(&self, can_gc: CanGc) {
181 self.closed_promise.borrow().resolve_native(&(), can_gc);
182 }
183
184 pub(crate) fn ensure_ready_promise_rejected(
186 &self,
187 cx: &mut JSContext,
188 global: &GlobalScope,
189 error: SafeHandleValue,
190 ) {
191 let ready_promise = self.ready_promise.borrow().clone();
192
193 if ready_promise.is_pending() {
195 ready_promise.reject_native_with_cx(cx, &error);
197
198 ready_promise.set_promise_is_handled();
200 } else {
201 let promise = Promise::new_rejected(cx, global, error);
203
204 promise.set_promise_is_handled();
206 *self.ready_promise.borrow_mut() = promise;
207 }
208 }
209
210 fn ensure_closed_promise_rejected(
212 &self,
213 cx: &mut JSContext,
214 global: &GlobalScope,
215 error: SafeHandleValue,
216 ) {
217 let closed_promise = self.closed_promise.borrow().clone();
218
219 if closed_promise.is_pending() {
221 closed_promise.reject_native_with_cx(cx, &error);
223
224 closed_promise.set_promise_is_handled();
226 } else {
227 let promise = Promise::new_rejected(cx, global, error);
229
230 promise.set_promise_is_handled();
232 *self.closed_promise.borrow_mut() = promise;
233 }
234 }
235
236 fn abort(
238 &self,
239 cx: &mut CurrentRealm,
240 global: &GlobalScope,
241 reason: SafeHandleValue,
242 ) -> Rc<Promise> {
243 let Some(stream) = self.stream.get() else {
245 unreachable!("Stream should be set.");
247 };
248
249 stream.abort(cx, global, reason)
251 }
252
253 fn close(&self, cx: &mut JSContext, global: &GlobalScope) -> Rc<Promise> {
255 let Some(stream) = self.stream.get() else {
257 unreachable!("Stream should be set.");
259 };
260
261 stream.close(cx, global)
263 }
264
265 pub(crate) fn write(
267 &self,
268 cx: &mut JSContext,
269 global: &GlobalScope,
270 chunk: SafeHandleValue,
271 ) -> Rc<Promise> {
272 let Some(stream) = self.stream.get() else {
274 unreachable!("Stream should be set.");
276 };
277
278 let Some(controller) = stream.get_controller() else {
281 unreachable!("Controller should be set.");
282 };
283
284 let chunk_size = controller.get_chunk_size(cx, global, chunk);
286
287 if !self
290 .stream
291 .get()
292 .is_some_and(|current_stream| current_stream == stream)
293 {
294 let promise = Promise::new2(cx, global);
295 promise.reject_error_with_cx(
296 cx,
297 Error::Type(c"Stream is not equal to writer stream".to_owned()),
298 );
299 return promise;
300 }
301
302 if stream.is_errored() {
305 rooted!(&in(cx) let mut error = UndefinedValue());
307 stream.get_stored_error(error.handle_mut());
308 let promise = Promise::new2(cx, global);
309 promise.reject_native_with_cx(cx, &error.handle());
310 return promise;
311 }
312
313 if stream.close_queued_or_in_flight() || stream.is_closed() {
316 let promise = Promise::new2(cx, global);
319 promise.reject_error_with_cx(
320 cx,
321 Error::Type(c"Stream has been closed, or has close queued or in-flight".to_owned()),
322 );
323 return promise;
324 }
325
326 if stream.is_erroring() {
328 rooted!(&in(cx) let mut error = UndefinedValue());
330 stream.get_stored_error(error.handle_mut());
331 let promise = Promise::new2(cx, global);
332 promise.reject_native_with_cx(cx, &error.handle());
333 return promise;
334 }
335
336 assert!(stream.is_writable());
338
339 let promise = stream.add_write_request(global, CanGc::from_cx(cx));
341
342 controller.write(cx, global, chunk, chunk_size);
344
345 promise
347 }
348
349 pub(crate) fn release(&self, cx: &mut JSContext, global: &GlobalScope) {
351 let Some(stream) = self.stream.get() else {
353 unreachable!("Stream should be set.");
355 };
356
357 assert!(stream.get_writer().is_some_and(|writer| &*writer == self));
359
360 let released_error = Error::Type(c"Writer has been released".to_owned());
362
363 rooted!(&in(cx) let mut error = UndefinedValue());
365 released_error.to_jsval(cx.into(), global, error.handle_mut(), CanGc::from_cx(cx));
366
367 self.ensure_ready_promise_rejected(cx, global, error.handle());
369
370 self.ensure_closed_promise_rejected(cx, global, error.handle());
372
373 stream.set_writer(None);
375
376 self.stream.set(None);
378 }
379
380 pub(crate) fn close_with_error_propagation(
382 &self,
383 cx: &mut JSContext,
384 global: &GlobalScope,
385 ) -> Rc<Promise> {
386 let Some(stream) = self.stream.get() else {
388 unreachable!("Stream should be set.");
390 };
391
392 if stream.close_queued_or_in_flight() || stream.is_closed() {
398 let promise = Promise::new2(cx, global);
400 promise.resolve_native_with_cx(cx, &());
401 return promise;
402 }
403
404 if stream.is_errored() {
406 rooted!(&in(cx) let mut error = UndefinedValue());
408 stream.get_stored_error(error.handle_mut());
409 let promise = Promise::new2(cx, global);
410 promise.reject_native_with_cx(cx, &error.handle());
411 return promise;
412 }
413
414 assert!(stream.is_writable() || stream.is_erroring());
416
417 self.close(cx, global)
419 }
420
421 pub(crate) fn get_stream(&self) -> Option<DomRoot<WritableStream>> {
422 self.stream.get()
423 }
424}
425
426impl WritableStreamDefaultWriterMethods<crate::DomTypeHolder> for WritableStreamDefaultWriter {
427 fn Closed(&self) -> Rc<Promise> {
429 return self.closed_promise.borrow().clone();
431 }
432
433 fn GetDesiredSize(&self) -> Result<Option<f64>, Error> {
435 let Some(stream) = self.stream.get() else {
437 return Err(Error::Type(c"Stream is undefined".to_owned()));
438 };
439
440 Ok(stream.get_desired_size())
442 }
443
444 fn Ready(&self) -> Rc<Promise> {
446 return self.ready_promise.borrow().clone();
448 }
449
450 fn Abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) -> Rc<Promise> {
452 let global = GlobalScope::from_current_realm(cx);
453
454 if self.stream.get().is_none() {
456 let promise = Promise::new2(cx, &global);
458 promise.reject_error_with_cx(cx, Error::Type(c"Stream is undefined".to_owned()));
459 return promise;
460 }
461
462 self.abort(cx, &global, reason)
464 }
465
466 fn Close(&self, cx: &mut CurrentRealm) -> Rc<Promise> {
468 let global = GlobalScope::from_current_realm(cx);
469 let promise = Promise::new2(cx, &global);
470
471 let Some(stream) = self.stream.get() else {
473 promise.reject_error_with_cx(cx, Error::Type(c"Stream is undefined".to_owned()));
476 return promise;
477 };
478
479 if stream.close_queued_or_in_flight() {
481 promise.reject_error_with_cx(
483 cx,
484 Error::Type(c"Stream has closed queued or in-flight".to_owned()),
485 );
486 return promise;
487 }
488
489 self.close(cx, &global)
490 }
491
492 fn ReleaseLock(&self, cx: &mut JSContext) {
494 let Some(stream) = self.stream.get() else {
496 return;
498 };
499
500 assert!(stream.get_writer().is_some());
502
503 let global = self.global();
504
505 self.release(cx, &global);
507 }
508
509 fn Write(&self, cx: &mut CurrentRealm, chunk: SafeHandleValue) -> Rc<Promise> {
511 let global = GlobalScope::from_current_realm(cx);
512
513 if self.stream.get().is_none() {
515 let promise = Promise::new2(cx, &global);
517 promise.reject_error_with_cx(cx, Error::Type(c"Stream is undefined".to_owned()));
518 return promise;
519 }
520
521 self.write(cx, &global, chunk)
523 }
524
525 fn Constructor(
527 global: &GlobalScope,
528 proto: Option<SafeHandleObject>,
529 can_gc: CanGc,
530 stream: &WritableStream,
531 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
532 let writer = WritableStreamDefaultWriter::new(global, proto, can_gc);
533
534 let cx = GlobalScope::get_cx();
535
536 writer.setup(cx, stream, can_gc)?;
538
539 Ok(writer)
540 }
541}