1use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsval::UndefinedValue;
10use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
11
12use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriterMethods;
13use crate::dom::bindings::error::{Error, ErrorToJsval};
14use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
15use crate::dom::bindings::root::{DomRoot, MutNullableDom};
16use crate::dom::globalscope::GlobalScope;
17use crate::dom::promise::Promise;
18use crate::dom::stream::writablestream::WritableStream;
19use crate::realms::InRealm;
20use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
21
/// DOM object backing the `WritableStreamDefaultWriter` binding: a writer
/// that gets attached to a [`WritableStream`] by `setup` and detached again
/// by `release`.
/// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter>
#[dom_struct]
pub struct WritableStreamDefaultWriter {
    reflector_: Reflector,

    /// Promise returned by the `Ready` getter. Kept in a `RefCell` because it
    /// is swapped for a new promise by `set_ready_promise` and
    /// `ensure_ready_promise_rejected`.
    /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-readypromise>
    #[conditional_malloc_size_of]
    ready_promise: RefCell<Rc<Promise>>,

    /// Promise returned by the `Closed` getter. Kept in a `RefCell` because
    /// `ensure_closed_promise_rejected` may replace it with a new promise.
    /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-closedpromise>
    #[conditional_malloc_size_of]
    closed_promise: RefCell<Rc<Promise>>,

    /// The stream this writer is attached to. Empty until `setup` stores a
    /// stream, and cleared again by `release`.
    /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-stream>
    stream: MutNullableDom<WritableStream>,
}
37
38impl WritableStreamDefaultWriter {
39 fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> WritableStreamDefaultWriter {
42 WritableStreamDefaultWriter {
43 reflector_: Reflector::new(),
44 stream: Default::default(),
45 closed_promise: RefCell::new(Promise::new(global, can_gc)),
46 ready_promise: RefCell::new(Promise::new(global, can_gc)),
47 }
48 }
49
50 pub(crate) fn new(
51 global: &GlobalScope,
52 proto: Option<SafeHandleObject>,
53 can_gc: CanGc,
54 ) -> DomRoot<WritableStreamDefaultWriter> {
55 reflect_dom_object_with_proto(
56 Box::new(WritableStreamDefaultWriter::new_inherited(global, can_gc)),
57 global,
58 proto,
59 can_gc,
60 )
61 }
62
63 pub(crate) fn setup(
66 &self,
67 cx: SafeJSContext,
68 stream: &WritableStream,
69 can_gc: CanGc,
70 ) -> Result<(), Error> {
71 if stream.is_locked() {
73 return Err(Error::Type("Stream is locked".to_string()));
74 }
75
76 self.stream.set(Some(stream));
78
79 stream.set_writer(Some(self));
81
82 if stream.is_writable() {
86 if !stream.close_queued_or_in_flight() && stream.get_backpressure() {
89 } else {
92 self.ready_promise.borrow().resolve_native(&(), can_gc);
95 }
96
97 return Ok(());
100 }
101
102 if stream.is_erroring() {
104 rooted!(in(*cx) let mut error = UndefinedValue());
105 stream.get_stored_error(error.handle_mut());
106
107 let ready_promise = self.ready_promise.borrow();
111 ready_promise.reject_native(&error.handle(), can_gc);
112 ready_promise.set_promise_is_handled();
113
114 return Ok(());
117 }
118
119 if stream.is_closed() {
121 self.ready_promise.borrow().resolve_native(&(), can_gc);
124
125 self.closed_promise.borrow().resolve_native(&(), can_gc);
128 return Ok(());
129 }
130
131 assert!(stream.is_errored());
134
135 rooted!(in(*cx) let mut error = UndefinedValue());
137 stream.get_stored_error(error.handle_mut());
138
139 let ready_promise = self.ready_promise.borrow();
143 ready_promise.reject_native(&error.handle(), can_gc);
144 ready_promise.set_promise_is_handled();
145
146 let ready_promise = self.closed_promise.borrow();
150 ready_promise.reject_native(&error.handle(), can_gc);
151 ready_promise.set_promise_is_handled();
152
153 Ok(())
154 }
155
156 pub(crate) fn reject_closed_promise_with_stored_error(
157 &self,
158 error: &SafeHandleValue,
159 can_gc: CanGc,
160 ) {
161 self.closed_promise.borrow().reject_native(error, can_gc);
162 }
163
164 pub(crate) fn set_close_promise_is_handled(&self) {
165 self.closed_promise.borrow().set_promise_is_handled();
166 }
167
168 pub(crate) fn set_ready_promise(&self, promise: Rc<Promise>) {
169 *self.ready_promise.borrow_mut() = promise;
170 }
171
172 pub(crate) fn resolve_ready_promise_with_undefined(&self, can_gc: CanGc) {
173 self.ready_promise.borrow().resolve_native(&(), can_gc);
174 }
175
176 pub(crate) fn resolve_closed_promise_with_undefined(&self, can_gc: CanGc) {
177 self.closed_promise.borrow().resolve_native(&(), can_gc);
178 }
179
180 pub(crate) fn ensure_ready_promise_rejected(
182 &self,
183 global: &GlobalScope,
184 error: SafeHandleValue,
185 can_gc: CanGc,
186 ) {
187 let ready_promise = self.ready_promise.borrow().clone();
188
189 if ready_promise.is_pending() {
191 ready_promise.reject_native(&error, can_gc);
193
194 ready_promise.set_promise_is_handled();
196 } else {
197 let promise = Promise::new(global, can_gc);
199 promise.reject_native(&error, can_gc);
200
201 promise.set_promise_is_handled();
203 *self.ready_promise.borrow_mut() = promise;
204 }
205 }
206
207 pub(crate) fn ensure_closed_promise_rejected(
209 &self,
210 global: &GlobalScope,
211 error: SafeHandleValue,
212 can_gc: CanGc,
213 ) {
214 let closed_promise = self.closed_promise.borrow().clone();
215
216 if closed_promise.is_pending() {
218 closed_promise.reject_native(&error, can_gc);
220
221 closed_promise.set_promise_is_handled();
223 } else {
224 let promise = Promise::new(global, can_gc);
226 promise.reject_native(&error, can_gc);
227
228 promise.set_promise_is_handled();
230 *self.closed_promise.borrow_mut() = promise;
231 }
232 }
233
234 fn abort(
236 &self,
237 cx: SafeJSContext,
238 global: &GlobalScope,
239 reason: SafeHandleValue,
240 realm: InRealm,
241 can_gc: CanGc,
242 ) -> Rc<Promise> {
243 let Some(stream) = self.stream.get() else {
245 unreachable!("Stream should be set.");
247 };
248
249 stream.abort(cx, global, reason, realm, can_gc)
251 }
252
253 fn close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
255 let Some(stream) = self.stream.get() else {
257 unreachable!("Stream should be set.");
259 };
260
261 stream.close(cx, global, can_gc)
263 }
264
265 pub(crate) fn write(
267 &self,
268 cx: SafeJSContext,
269 global: &GlobalScope,
270 chunk: SafeHandleValue,
271 can_gc: CanGc,
272 ) -> Rc<Promise> {
273 let Some(stream) = self.stream.get() else {
275 unreachable!("Stream should be set.");
277 };
278
279 let Some(controller) = stream.get_controller() else {
282 unreachable!("Controller should be set.");
283 };
284
285 let chunk_size = controller.get_chunk_size(cx, global, chunk, can_gc);
287
288 if !self
291 .stream
292 .get()
293 .is_some_and(|current_stream| current_stream == stream)
294 {
295 let promise = Promise::new(global, can_gc);
296 promise.reject_error(
297 Error::Type("Stream is not equal to writer stream".to_string()),
298 can_gc,
299 );
300 return promise;
301 }
302
303 if stream.is_errored() {
306 rooted!(in(*cx) let mut error = UndefinedValue());
308 stream.get_stored_error(error.handle_mut());
309 let promise = Promise::new(global, can_gc);
310 promise.reject_native(&error.handle(), can_gc);
311 return promise;
312 }
313
314 if stream.close_queued_or_in_flight() || stream.is_closed() {
317 let promise = Promise::new(global, can_gc);
320 promise.reject_error(
321 Error::Type("Stream has been closed, or has close queued or in-flight".to_string()),
322 can_gc,
323 );
324 return promise;
325 }
326
327 if stream.is_erroring() {
329 rooted!(in(*cx) let mut error = UndefinedValue());
331 stream.get_stored_error(error.handle_mut());
332 let promise = Promise::new(global, can_gc);
333 promise.reject_native(&error.handle(), can_gc);
334 return promise;
335 }
336
337 assert!(stream.is_writable());
339
340 let promise = stream.add_write_request(global, can_gc);
342
343 controller.write(cx, global, chunk, chunk_size, can_gc);
345
346 promise
348 }
349
350 pub(crate) fn release(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
352 let Some(stream) = self.stream.get() else {
354 unreachable!("Stream should be set.");
356 };
357
358 assert!(stream.get_writer().is_some_and(|writer| &*writer == self));
360
361 let released_error = Error::Type("Writer has been released".to_string());
363
364 rooted!(in(*cx) let mut error = UndefinedValue());
366 released_error.to_jsval(cx, global, error.handle_mut(), can_gc);
367
368 self.ensure_ready_promise_rejected(global, error.handle(), can_gc);
370
371 self.ensure_closed_promise_rejected(global, error.handle(), can_gc);
373
374 stream.set_writer(None);
376
377 self.stream.set(None);
379 }
380
381 pub(crate) fn close_with_error_propagation(
383 &self,
384 cx: SafeJSContext,
385 global: &GlobalScope,
386 can_gc: CanGc,
387 ) -> Rc<Promise> {
388 let Some(stream) = self.stream.get() else {
390 unreachable!("Stream should be set.");
392 };
393
394 if stream.close_queued_or_in_flight() || stream.is_closed() {
400 let promise = Promise::new(global, can_gc);
402 promise.resolve_native(&(), can_gc);
403 return promise;
404 }
405
406 if stream.is_errored() {
408 rooted!(in(*cx) let mut error = UndefinedValue());
410 stream.get_stored_error(error.handle_mut());
411 let promise = Promise::new(global, can_gc);
412 promise.reject_native(&error.handle(), can_gc);
413 return promise;
414 }
415
416 assert!(stream.is_writable() || stream.is_erroring());
418
419 self.close(cx, global, can_gc)
421 }
422
423 pub(crate) fn get_stream(&self) -> Option<DomRoot<WritableStream>> {
424 self.stream.get()
425 }
426}
427
428impl WritableStreamDefaultWriterMethods<crate::DomTypeHolder> for WritableStreamDefaultWriter {
429 fn Closed(&self) -> Rc<Promise> {
431 return self.closed_promise.borrow().clone();
433 }
434
435 fn GetDesiredSize(&self) -> Result<Option<f64>, Error> {
437 let Some(stream) = self.stream.get() else {
439 return Err(Error::Type("Stream is undefined".to_string()));
440 };
441
442 Ok(stream.get_desired_size())
444 }
445
446 fn Ready(&self) -> Rc<Promise> {
448 return self.ready_promise.borrow().clone();
450 }
451
452 fn Abort(
454 &self,
455 cx: SafeJSContext,
456 reason: SafeHandleValue,
457 realm: InRealm,
458 can_gc: CanGc,
459 ) -> Rc<Promise> {
460 let global = GlobalScope::from_safe_context(cx, realm);
461
462 if self.stream.get().is_none() {
464 let promise = Promise::new(&global, can_gc);
466 promise.reject_error(Error::Type("Stream is undefined".to_string()), can_gc);
467 return promise;
468 }
469
470 self.abort(cx, &global, reason, realm, can_gc)
472 }
473
474 fn Close(&self, in_realm: InRealm, can_gc: CanGc) -> Rc<Promise> {
476 let cx = GlobalScope::get_cx();
477 let global = GlobalScope::from_safe_context(cx, in_realm);
478 let promise = Promise::new(&global, can_gc);
479
480 let Some(stream) = self.stream.get() else {
482 promise.reject_error(Error::Type("Stream is undefined".to_string()), can_gc);
485 return promise;
486 };
487
488 if stream.close_queued_or_in_flight() {
490 promise.reject_error(
492 Error::Type("Stream has closed queued or in-flight".to_string()),
493 can_gc,
494 );
495 return promise;
496 }
497
498 self.close(cx, &global, can_gc)
499 }
500
501 fn ReleaseLock(&self, can_gc: CanGc) {
503 let Some(stream) = self.stream.get() else {
505 return;
507 };
508
509 assert!(stream.get_writer().is_some());
511
512 let global = self.global();
513 let cx = GlobalScope::get_cx();
514
515 self.release(cx, &global, can_gc);
517 }
518
519 fn Write(
521 &self,
522 cx: SafeJSContext,
523 chunk: SafeHandleValue,
524 realm: InRealm,
525 can_gc: CanGc,
526 ) -> Rc<Promise> {
527 let global = GlobalScope::from_safe_context(cx, realm);
528
529 if self.stream.get().is_none() {
531 let global = GlobalScope::from_safe_context(cx, realm);
533 let promise = Promise::new(&global, can_gc);
534 promise.reject_error(Error::Type("Stream is undefined".to_string()), can_gc);
535 return promise;
536 }
537
538 self.write(cx, &global, chunk, can_gc)
540 }
541
542 fn Constructor(
544 global: &GlobalScope,
545 proto: Option<SafeHandleObject>,
546 can_gc: CanGc,
547 stream: &WritableStream,
548 ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
549 let writer = WritableStreamDefaultWriter::new(global, proto, can_gc);
550
551 let cx = GlobalScope::get_cx();
552
553 writer.setup(cx, stream, can_gc)?;
555
556 Ok(writer)
557 }
558}