1use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsval::UndefinedValue;
10use js::realm::CurrentRealm;
11use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
12use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto};
13
14use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriterMethods;
15use crate::dom::bindings::error::{Error, ErrorToJsval};
16use crate::dom::bindings::reflector::DomGlobal;
17use crate::dom::bindings::root::{DomRoot, MutNullableDom};
18use crate::dom::globalscope::GlobalScope;
19use crate::dom::promise::Promise;
20use crate::dom::stream::writablestream::WritableStream;
21use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
22
/// DOM object for a writer attached to a [`WritableStream`].
///
/// It owns the two promises exposed through the `Ready` and `Closed` binding
/// methods, plus a nullable link to the stream it is currently attached to.
#[dom_struct]
pub struct WritableStreamDefaultWriter {
    /// Reflector linking this Rust object to its JS wrapper.
    reflector_: Reflector,

    /// Promise returned by `Ready`. The cell lets it be swapped out by
    /// `set_ready_promise` and `ensure_ready_promise_rejected`.
    #[conditional_malloc_size_of]
    ready_promise: RefCell<Rc<Promise>>,

    /// Promise returned by `Closed`. The cell lets it be swapped out by
    /// `ensure_closed_promise_rejected`.
    #[conditional_malloc_size_of]
    closed_promise: RefCell<Rc<Promise>>,

    /// The stream this writer is attached to: set by `setup`, cleared by
    /// `release`, and `None` otherwise.
    stream: MutNullableDom<WritableStream>,
}
38
39impl WritableStreamDefaultWriter {
40 fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> WritableStreamDefaultWriter {
43 WritableStreamDefaultWriter {
44 reflector_: Reflector::new(),
45 stream: Default::default(),
46 closed_promise: RefCell::new(Promise::new(global, can_gc)),
47 ready_promise: RefCell::new(Promise::new(global, can_gc)),
48 }
49 }
50
51 pub(crate) fn new(
52 global: &GlobalScope,
53 proto: Option<SafeHandleObject>,
54 can_gc: CanGc,
55 ) -> DomRoot<WritableStreamDefaultWriter> {
56 reflect_dom_object_with_proto(
57 Box::new(WritableStreamDefaultWriter::new_inherited(global, can_gc)),
58 global,
59 proto,
60 can_gc,
61 )
62 }
63
64 pub(crate) fn setup(
67 &self,
68 cx: SafeJSContext,
69 stream: &WritableStream,
70 can_gc: CanGc,
71 ) -> Result<(), Error> {
72 if stream.is_locked() {
74 return Err(Error::Type(c"Stream is locked".to_owned()));
75 }
76
77 self.stream.set(Some(stream));
79
80 stream.set_writer(Some(self));
82
83 if stream.is_writable() {
87 if !stream.close_queued_or_in_flight() && stream.get_backpressure() {
90 } else {
93 self.ready_promise.borrow().resolve_native(&(), can_gc);
96 }
97
98 return Ok(());
101 }
102
103 if stream.is_erroring() {
105 rooted!(in(*cx) let mut error = UndefinedValue());
106 stream.get_stored_error(error.handle_mut());
107
108 let ready_promise = self.ready_promise.borrow();
112 ready_promise.reject_native(&error.handle(), can_gc);
113 ready_promise.set_promise_is_handled();
114
115 return Ok(());
118 }
119
120 if stream.is_closed() {
122 self.ready_promise.borrow().resolve_native(&(), can_gc);
125
126 self.closed_promise.borrow().resolve_native(&(), can_gc);
129 return Ok(());
130 }
131
132 assert!(stream.is_errored());
135
136 rooted!(in(*cx) let mut error = UndefinedValue());
138 stream.get_stored_error(error.handle_mut());
139
140 let ready_promise = self.ready_promise.borrow();
144 ready_promise.reject_native(&error.handle(), can_gc);
145 ready_promise.set_promise_is_handled();
146
147 let ready_promise = self.closed_promise.borrow();
151 ready_promise.reject_native(&error.handle(), can_gc);
152 ready_promise.set_promise_is_handled();
153
154 Ok(())
155 }
156
157 pub(crate) fn reject_closed_promise_with_stored_error(
158 &self,
159 cx: &mut js::context::JSContext,
160 error: &SafeHandleValue,
161 ) {
162 self.closed_promise
163 .borrow()
164 .reject_native_with_cx(cx, error);
165 }
166
167 pub(crate) fn set_close_promise_is_handled(&self) {
168 self.closed_promise.borrow().set_promise_is_handled();
169 }
170
171 pub(crate) fn set_ready_promise(&self, promise: Rc<Promise>) {
172 *self.ready_promise.borrow_mut() = promise;
173 }
174
175 pub(crate) fn resolve_ready_promise_with_undefined(&self, can_gc: CanGc) {
176 self.ready_promise.borrow().resolve_native(&(), can_gc);
177 }
178
179 pub(crate) fn resolve_closed_promise_with_undefined(&self, can_gc: CanGc) {
180 self.closed_promise.borrow().resolve_native(&(), can_gc);
181 }
182
183 pub(crate) fn ensure_ready_promise_rejected(
185 &self,
186 global: &GlobalScope,
187 error: SafeHandleValue,
188 can_gc: CanGc,
189 ) {
190 let ready_promise = self.ready_promise.borrow().clone();
191
192 if ready_promise.is_pending() {
194 ready_promise.reject_native(&error, can_gc);
196
197 ready_promise.set_promise_is_handled();
199 } else {
200 let promise = Promise::new(global, can_gc);
202 promise.reject_native(&error, can_gc);
203
204 promise.set_promise_is_handled();
206 *self.ready_promise.borrow_mut() = promise;
207 }
208 }
209
210 pub(crate) fn ensure_closed_promise_rejected(
212 &self,
213 global: &GlobalScope,
214 error: SafeHandleValue,
215 can_gc: CanGc,
216 ) {
217 let closed_promise = self.closed_promise.borrow().clone();
218
219 if closed_promise.is_pending() {
221 closed_promise.reject_native(&error, can_gc);
223
224 closed_promise.set_promise_is_handled();
226 } else {
227 let promise = Promise::new(global, can_gc);
229 promise.reject_native(&error, can_gc);
230
231 promise.set_promise_is_handled();
233 *self.closed_promise.borrow_mut() = promise;
234 }
235 }
236
237 fn abort(
239 &self,
240 cx: &mut CurrentRealm,
241 global: &GlobalScope,
242 reason: SafeHandleValue,
243 ) -> Rc<Promise> {
244 let Some(stream) = self.stream.get() else {
246 unreachable!("Stream should be set.");
248 };
249
250 stream.abort(cx, global, reason)
252 }
253
254 fn close(&self, cx: &mut js::context::JSContext, global: &GlobalScope) -> Rc<Promise> {
256 let Some(stream) = self.stream.get() else {
258 unreachable!("Stream should be set.");
260 };
261
262 stream.close(cx, global)
264 }
265
266 pub(crate) fn write(
268 &self,
269 cx: &mut js::context::JSContext,
270 global: &GlobalScope,
271 chunk: SafeHandleValue,
272 ) -> Rc<Promise> {
273 let Some(stream) = self.stream.get() else {
275 unreachable!("Stream should be set.");
277 };
278
279 let Some(controller) = stream.get_controller() else {
282 unreachable!("Controller should be set.");
283 };
284
285 let chunk_size = controller.get_chunk_size(cx, global, chunk);
287
288 if !self
291 .stream
292 .get()
293 .is_some_and(|current_stream| current_stream == stream)
294 {
295 let promise = Promise::new2(cx, global);
296 promise.reject_error_with_cx(
297 cx,
298 Error::Type(c"Stream is not equal to writer stream".to_owned()),
299 );
300 return promise;
301 }
302
303 if stream.is_errored() {
306 rooted!(&in(cx) let mut error = UndefinedValue());
308 stream.get_stored_error(error.handle_mut());
309 let promise = Promise::new2(cx, global);
310 promise.reject_native_with_cx(cx, &error.handle());
311 return promise;
312 }
313
314 if stream.close_queued_or_in_flight() || stream.is_closed() {
317 let promise = Promise::new2(cx, global);
320 promise.reject_error_with_cx(
321 cx,
322 Error::Type(c"Stream has been closed, or has close queued or in-flight".to_owned()),
323 );
324 return promise;
325 }
326
327 if stream.is_erroring() {
329 rooted!(&in(cx) let mut error = UndefinedValue());
331 stream.get_stored_error(error.handle_mut());
332 let promise = Promise::new2(cx, global);
333 promise.reject_native_with_cx(cx, &error.handle());
334 return promise;
335 }
336
337 assert!(stream.is_writable());
339
340 let promise = stream.add_write_request(global, CanGc::from_cx(cx));
342
343 controller.write(cx, global, chunk, chunk_size);
345
346 promise
348 }
349
350 pub(crate) fn release(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
352 let Some(stream) = self.stream.get() else {
354 unreachable!("Stream should be set.");
356 };
357
358 assert!(stream.get_writer().is_some_and(|writer| &*writer == self));
360
361 let released_error = Error::Type(c"Writer has been released".to_owned());
363
364 rooted!(in(*cx) let mut error = UndefinedValue());
366 released_error.to_jsval(cx, global, error.handle_mut(), can_gc);
367
368 self.ensure_ready_promise_rejected(global, error.handle(), can_gc);
370
371 self.ensure_closed_promise_rejected(global, error.handle(), can_gc);
373
374 stream.set_writer(None);
376
377 self.stream.set(None);
379 }
380
381 pub(crate) fn close_with_error_propagation(
383 &self,
384 cx: &mut js::context::JSContext,
385 global: &GlobalScope,
386 ) -> Rc<Promise> {
387 let Some(stream) = self.stream.get() else {
389 unreachable!("Stream should be set.");
391 };
392
393 if stream.close_queued_or_in_flight() || stream.is_closed() {
399 let promise = Promise::new2(cx, global);
401 promise.resolve_native_with_cx(cx, &());
402 return promise;
403 }
404
405 if stream.is_errored() {
407 rooted!(&in(cx) let mut error = UndefinedValue());
409 stream.get_stored_error(error.handle_mut());
410 let promise = Promise::new2(cx, global);
411 promise.reject_native_with_cx(cx, &error.handle());
412 return promise;
413 }
414
415 assert!(stream.is_writable() || stream.is_erroring());
417
418 self.close(cx, global)
420 }
421
422 pub(crate) fn get_stream(&self) -> Option<DomRoot<WritableStream>> {
423 self.stream.get()
424 }
425}
426
427impl WritableStreamDefaultWriterMethods<crate::DomTypeHolder> for WritableStreamDefaultWriter {
428 fn Closed(&self) -> Rc<Promise> {
430 return self.closed_promise.borrow().clone();
432 }
433
434 fn GetDesiredSize(&self) -> Result<Option<f64>, Error> {
436 let Some(stream) = self.stream.get() else {
438 return Err(Error::Type(c"Stream is undefined".to_owned()));
439 };
440
441 Ok(stream.get_desired_size())
443 }
444
445 fn Ready(&self) -> Rc<Promise> {
447 return self.ready_promise.borrow().clone();
449 }
450
451 fn Abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) -> Rc<Promise> {
453 let global = GlobalScope::from_current_realm(cx);
454
455 if self.stream.get().is_none() {
457 let promise = Promise::new2(cx, &global);
459 promise.reject_error_with_cx(cx, Error::Type(c"Stream is undefined".to_owned()));
460 return promise;
461 }
462
463 self.abort(cx, &global, reason)
465 }
466
467 fn Close(&self, cx: &mut CurrentRealm) -> Rc<Promise> {
469 let global = GlobalScope::from_current_realm(cx);
470 let promise = Promise::new2(cx, &global);
471
472 let Some(stream) = self.stream.get() else {
474 promise.reject_error_with_cx(cx, Error::Type(c"Stream is undefined".to_owned()));
477 return promise;
478 };
479
480 if stream.close_queued_or_in_flight() {
482 promise.reject_error_with_cx(
484 cx,
485 Error::Type(c"Stream has closed queued or in-flight".to_owned()),
486 );
487 return promise;
488 }
489
490 self.close(cx, &global)
491 }
492
493 fn ReleaseLock(&self, can_gc: CanGc) {
495 let Some(stream) = self.stream.get() else {
497 return;
499 };
500
501 assert!(stream.get_writer().is_some());
503
504 let global = self.global();
505 let cx = GlobalScope::get_cx();
506
507 self.release(cx, &global, can_gc);
509 }
510
511 fn Write(&self, cx: &mut CurrentRealm, chunk: SafeHandleValue) -> Rc<Promise> {
513 let global = GlobalScope::from_current_realm(cx);
514
515 if self.stream.get().is_none() {
517 let promise = Promise::new2(cx, &global);
519 promise.reject_error_with_cx(cx, Error::Type(c"Stream is undefined".to_owned()));
520 return promise;
521 }
522
523 self.write(cx, &global, chunk)
525 }
526
527 fn Constructor(
529 global: &GlobalScope,
530 proto: Option<SafeHandleObject>,
531 can_gc: CanGc,
532 stream: &WritableStream,
533 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
534 let writer = WritableStreamDefaultWriter::new(global, proto, can_gc);
535
536 let cx = GlobalScope::get_cx();
537
538 writer.setup(cx, stream, can_gc)?;
540
541 Ok(writer)
542 }
543}