script/dom/
writablestreamdefaultwriter.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsval::UndefinedValue;
10use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
11
12use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriterMethods;
13use crate::dom::bindings::error::{Error, ErrorToJsval};
14use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
15use crate::dom::bindings::root::{DomRoot, MutNullableDom};
16use crate::dom::globalscope::GlobalScope;
17use crate::dom::promise::Promise;
18use crate::dom::writablestream::WritableStream;
19use crate::realms::InRealm;
20use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
21
/// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter>
///
/// DOM object for a writable stream's default writer. It keeps the two
/// promises exposed through `Ready()` and `Closed()` plus the stream the
/// writer is currently attached to, if any.
#[dom_struct]
pub struct WritableStreamDefaultWriter {
    /// Reflector linking this object to its JS wrapper.
    reflector_: Reflector,

    /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-readypromise>
    ///
    /// Wrapped in a `RefCell` because the whole promise can be swapped for a
    /// new one (see `set_ready_promise` and `ensure_ready_promise_rejected`).
    #[ignore_malloc_size_of = "Rc is hard"]
    ready_promise: RefCell<Rc<Promise>>,

    /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-closedpromise>
    ///
    /// Wrapped in a `RefCell` because `ensure_closed_promise_rejected` may
    /// replace it with a new, already rejected promise.
    #[ignore_malloc_size_of = "Rc is hard"]
    closed_promise: RefCell<Rc<Promise>>,

    /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-stream>
    ///
    /// `None` until `setup` attaches a stream, and again after `release`.
    stream: MutNullableDom<WritableStream>,
}
37
38impl WritableStreamDefaultWriter {
39    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
40    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-writer>
41    /// The parts that create a new promise.
42    fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> WritableStreamDefaultWriter {
43        WritableStreamDefaultWriter {
44            reflector_: Reflector::new(),
45            stream: Default::default(),
46            closed_promise: RefCell::new(Promise::new(global, can_gc)),
47            ready_promise: RefCell::new(Promise::new(global, can_gc)),
48        }
49    }
50
51    pub(crate) fn new(
52        global: &GlobalScope,
53        proto: Option<SafeHandleObject>,
54        can_gc: CanGc,
55    ) -> DomRoot<WritableStreamDefaultWriter> {
56        reflect_dom_object_with_proto(
57            Box::new(WritableStreamDefaultWriter::new_inherited(global, can_gc)),
58            global,
59            proto,
60            can_gc,
61        )
62    }
63
64    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-writer>
65    /// Continuing from `new_inherited`, the rest.
66    pub(crate) fn setup(
67        &self,
68        cx: SafeJSContext,
69        stream: &WritableStream,
70        can_gc: CanGc,
71    ) -> Result<(), Error> {
72        // If ! IsWritableStreamLocked(stream) is true, throw a TypeError exception.
73        if stream.is_locked() {
74            return Err(Error::Type("Stream is locked".to_string()));
75        }
76
77        // Set writer.[[stream]] to stream.
78        self.stream.set(Some(stream));
79
80        // Set stream.[[writer]] to writer.
81        stream.set_writer(Some(self));
82
83        // Let state be stream.[[state]].
84
85        // If state is "writable",
86        if stream.is_writable() {
87            // If ! WritableStreamCloseQueuedOrInFlight(stream) is false
88            // and stream.[[backpressure]] is true,
89            if !stream.close_queued_or_in_flight() && stream.get_backpressure() {
90                // set writer.[[readyPromise]] to a new promise.
91                // Done in `new_inherited`.
92            } else {
93                // Otherwise, set writer.[[readyPromise]] to a promise resolved with undefined.
94                // Note: new promise created in `new_inherited`.
95                self.ready_promise.borrow().resolve_native(&(), can_gc);
96            }
97
98            // Set writer.[[closedPromise]] to a new promise.
99            // Done in `new_inherited`.
100            return Ok(());
101        }
102
103        // Otherwise, if state is "erroring",
104        if stream.is_erroring() {
105            rooted!(in(*cx) let mut error = UndefinedValue());
106            stream.get_stored_error(error.handle_mut());
107
108            // Set writer.[[readyPromise]] to a promise rejected with stream.[[storedError]].
109            // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
110            // Note: new promise created in `new_inherited`.
111            let ready_promise = self.ready_promise.borrow();
112            ready_promise.reject_native(&error.handle(), can_gc);
113            ready_promise.set_promise_is_handled();
114
115            // Set writer.[[closedPromise]] to a new promise.
116            // Done in `new_inherited`.
117            return Ok(());
118        }
119
120        // Otherwise, if state is "closed",
121        if stream.is_closed() {
122            // Set writer.[[readyPromise]] to a promise resolved with undefined.
123            // Note: new promise created in `new_inherited`.
124            self.ready_promise.borrow().resolve_native(&(), can_gc);
125
126            // Set writer.[[closedPromise]] to a promise resolved with undefined.
127            // Note: new promise created in `new_inherited`.
128            self.closed_promise.borrow().resolve_native(&(), can_gc);
129            return Ok(());
130        }
131
132        // Otherwise,
133        // Assert: state is "errored".
134        assert!(stream.is_errored());
135
136        // Let storedError be stream.[[storedError]].
137        rooted!(in(*cx) let mut error = UndefinedValue());
138        stream.get_stored_error(error.handle_mut());
139
140        // Set writer.[[readyPromise]] to a promise rejected with stream.[[storedError]].
141        // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
142        // Note: new promise created in `new_inherited`.
143        let ready_promise = self.ready_promise.borrow();
144        ready_promise.reject_native(&error.handle(), can_gc);
145        ready_promise.set_promise_is_handled();
146
147        // Set writer.[[closedPromise]] to a promise rejected with storedError.
148        // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
149        // Note: new promise created in `new_inherited`.
150        let ready_promise = self.closed_promise.borrow();
151        ready_promise.reject_native(&error.handle(), can_gc);
152        ready_promise.set_promise_is_handled();
153
154        Ok(())
155    }
156
157    pub(crate) fn reject_closed_promise_with_stored_error(
158        &self,
159        error: &SafeHandleValue,
160        can_gc: CanGc,
161    ) {
162        self.closed_promise.borrow().reject_native(error, can_gc);
163    }
164
165    pub(crate) fn set_close_promise_is_handled(&self) {
166        self.closed_promise.borrow().set_promise_is_handled();
167    }
168
169    pub(crate) fn set_ready_promise(&self, promise: Rc<Promise>) {
170        *self.ready_promise.borrow_mut() = promise;
171    }
172
173    pub(crate) fn resolve_ready_promise_with_undefined(&self, can_gc: CanGc) {
174        self.ready_promise.borrow().resolve_native(&(), can_gc);
175    }
176
177    pub(crate) fn resolve_closed_promise_with_undefined(&self, can_gc: CanGc) {
178        self.closed_promise.borrow().resolve_native(&(), can_gc);
179    }
180
181    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-ready-promise-rejected>
182    pub(crate) fn ensure_ready_promise_rejected(
183        &self,
184        global: &GlobalScope,
185        error: SafeHandleValue,
186        can_gc: CanGc,
187    ) {
188        let ready_promise = self.ready_promise.borrow().clone();
189
190        // If writer.[[readyPromise]].[[PromiseState]] is "pending",
191        if ready_promise.is_pending() {
192            // reject writer.[[readyPromise]] with error.
193            ready_promise.reject_native(&error, can_gc);
194
195            // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
196            ready_promise.set_promise_is_handled();
197        } else {
198            // Otherwise, set writer.[[readyPromise]] to a promise rejected with error.
199            let promise = Promise::new(global, can_gc);
200            promise.reject_native(&error, can_gc);
201
202            // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
203            promise.set_promise_is_handled();
204            *self.ready_promise.borrow_mut() = promise;
205        }
206    }
207
208    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-closed-promise-rejected>
209    pub(crate) fn ensure_closed_promise_rejected(
210        &self,
211        global: &GlobalScope,
212        error: SafeHandleValue,
213        can_gc: CanGc,
214    ) {
215        let closed_promise = self.closed_promise.borrow().clone();
216
217        // If writer.[[closedPromise]].[[PromiseState]] is "pending",
218        if closed_promise.is_pending() {
219            // reject writer.[[closedPromise]] with error.
220            closed_promise.reject_native(&error, can_gc);
221
222            // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
223            closed_promise.set_promise_is_handled();
224        } else {
225            // Otherwise, set writer.[[closedPromise]] to a promise rejected with error.
226            let promise = Promise::new(global, can_gc);
227            promise.reject_native(&error, can_gc);
228
229            // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
230            promise.set_promise_is_handled();
231            *self.closed_promise.borrow_mut() = promise;
232        }
233    }
234
235    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-abort>
236    fn abort(
237        &self,
238        cx: SafeJSContext,
239        global: &GlobalScope,
240        reason: SafeHandleValue,
241        realm: InRealm,
242        can_gc: CanGc,
243    ) -> Rc<Promise> {
244        // Let stream be writer.[[stream]].
245        let Some(stream) = self.stream.get() else {
246            // Assert: stream is not undefined.
247            unreachable!("Stream should be set.");
248        };
249
250        // Return ! WritableStreamAbort(stream, reason).
251        stream.abort(cx, global, reason, realm, can_gc)
252    }
253
254    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close>
255    fn close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
256        // Let stream be writer.[[stream]].
257        let Some(stream) = self.stream.get() else {
258            // Assert: stream is not undefined.
259            unreachable!("Stream should be set.");
260        };
261
262        // Return ! WritableStreamClose(stream).
263        stream.close(cx, global, can_gc)
264    }
265
266    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-write>
267    pub(crate) fn write(
268        &self,
269        cx: SafeJSContext,
270        global: &GlobalScope,
271        chunk: SafeHandleValue,
272        can_gc: CanGc,
273    ) -> Rc<Promise> {
274        // Let stream be writer.[[stream]].
275        let Some(stream) = self.stream.get() else {
276            // Assert: stream is not undefined.
277            unreachable!("Stream should be set.");
278        };
279
280        // Let controller be stream.[[controller]].
281        // Note: asserting controller is some.
282        let Some(controller) = stream.get_controller() else {
283            unreachable!("Controller should be set.");
284        };
285
286        // Let chunkSize be ! WritableStreamDefaultControllerGetChunkSize(controller, chunk).
287        let chunk_size = controller.get_chunk_size(cx, global, chunk, can_gc);
288
289        // If stream is not equal to writer.[[stream]],
290        // return a promise rejected with a TypeError exception.
291        if !self
292            .stream
293            .get()
294            .is_some_and(|current_stream| current_stream == stream)
295        {
296            let promise = Promise::new(global, can_gc);
297            promise.reject_error(
298                Error::Type("Stream is not equal to writer stream".to_string()),
299                can_gc,
300            );
301            return promise;
302        }
303
304        // Let state be stream.[[state]].
305        // If state is "errored",
306        if stream.is_errored() {
307            // return a promise rejected with stream.[[storedError]].
308            rooted!(in(*cx) let mut error = UndefinedValue());
309            stream.get_stored_error(error.handle_mut());
310            let promise = Promise::new(global, can_gc);
311            promise.reject_native(&error.handle(), can_gc);
312            return promise;
313        }
314
315        // If ! WritableStreamCloseQueuedOrInFlight(stream) is true
316        // or state is "closed",
317        if stream.close_queued_or_in_flight() || stream.is_closed() {
318            // return a promise rejected with a TypeError exception
319            // indicating that the stream is closing or closed
320            let promise = Promise::new(global, can_gc);
321            promise.reject_error(
322                Error::Type("Stream has been closed, or has close queued or in-flight".to_string()),
323                can_gc,
324            );
325            return promise;
326        }
327
328        // If state is "erroring",
329        if stream.is_erroring() {
330            // return a promise rejected with stream.[[storedError]].
331            rooted!(in(*cx) let mut error = UndefinedValue());
332            stream.get_stored_error(error.handle_mut());
333            let promise = Promise::new(global, can_gc);
334            promise.reject_native(&error.handle(), can_gc);
335            return promise;
336        }
337
338        // Assert: state is "writable".
339        assert!(stream.is_writable());
340
341        // Let promise be ! WritableStreamAddWriteRequest(stream).
342        let promise = stream.add_write_request(global, can_gc);
343
344        // Perform ! WritableStreamDefaultControllerWrite(controller, chunk, chunkSize).
345        controller.write(cx, global, chunk, chunk_size, can_gc);
346
347        // Return promise.
348        promise
349    }
350
351    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-release>
352    pub(crate) fn release(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
353        // Let stream be this.[[stream]].
354        let Some(stream) = self.stream.get() else {
355            // Assert: stream is not undefined.
356            unreachable!("Stream should be set.");
357        };
358
359        // Assert: stream.[[writer]] is writer.
360        assert!(stream.get_writer().is_some_and(|writer| &*writer == self));
361
362        // Let releasedError be a new TypeError.
363        let released_error = Error::Type("Writer has been released".to_string());
364
365        // Root the js val of the error.
366        rooted!(in(*cx) let mut error = UndefinedValue());
367        released_error.to_jsval(cx, global, error.handle_mut(), can_gc);
368
369        // Perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError).
370        self.ensure_ready_promise_rejected(global, error.handle(), can_gc);
371
372        // Perform ! WritableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError).
373        self.ensure_closed_promise_rejected(global, error.handle(), can_gc);
374
375        // Set stream.[[writer]] to undefined.
376        stream.set_writer(None);
377
378        // Set this.[[stream]] to undefined.
379        self.stream.set(None);
380    }
381
382    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close-with-error-propagation>
383    pub(crate) fn close_with_error_propagation(
384        &self,
385        cx: SafeJSContext,
386        global: &GlobalScope,
387        can_gc: CanGc,
388    ) -> Rc<Promise> {
389        // Let stream be writer.[[stream]].
390        let Some(stream) = self.stream.get() else {
391            // Assert: stream is not undefined.
392            unreachable!("Stream should be set.");
393        };
394
395        // Let state be stream.[[state]].
396        // Used via stream method calls.
397
398        // If ! WritableStreamCloseQueuedOrInFlight(stream) is true
399        // or state is "closed",
400        if stream.close_queued_or_in_flight() || stream.is_closed() {
401            // return a promise resolved with undefined.
402            let promise = Promise::new(global, can_gc);
403            promise.resolve_native(&(), can_gc);
404            return promise;
405        }
406
407        // If state is "errored",
408        if stream.is_errored() {
409            // return a promise rejected with stream.[[storedError]].
410            rooted!(in(*cx) let mut error = UndefinedValue());
411            stream.get_stored_error(error.handle_mut());
412            let promise = Promise::new(global, can_gc);
413            promise.reject_native(&error.handle(), can_gc);
414            return promise;
415        }
416
417        // Assert: state is "writable" or "erroring".
418        assert!(stream.is_writable() || stream.is_erroring());
419
420        // Return ! WritableStreamDefaultWriterClose(writer).
421        self.close(cx, global, can_gc)
422    }
423
    /// Returns the stream this writer is currently attached to
    /// (`[[stream]]`), or `None` when no stream is set.
    pub(crate) fn get_stream(&self) -> Option<DomRoot<WritableStream>> {
        self.stream.get()
    }
427}
428
429impl WritableStreamDefaultWriterMethods<crate::DomTypeHolder> for WritableStreamDefaultWriter {
430    /// <https://streams.spec.whatwg.org/#default-writer-closed>
431    fn Closed(&self) -> Rc<Promise> {
432        // Return this.[[closedPromise]].
433        return self.closed_promise.borrow().clone();
434    }
435
436    /// <https://streams.spec.whatwg.org/#default-writer-desired-size>
437    fn GetDesiredSize(&self) -> Result<Option<f64>, Error> {
438        // If this.[[stream]] is undefined, throw a TypeError exception.
439        let Some(stream) = self.stream.get() else {
440            return Err(Error::Type("Stream is undefined".to_string()));
441        };
442
443        // Return ! WritableStreamDefaultWriterGetDesiredSize(this).
444        Ok(stream.get_desired_size())
445    }
446
447    /// <https://streams.spec.whatwg.org/#default-writer-ready>
448    fn Ready(&self) -> Rc<Promise> {
449        // Return this.[[readyPromise]].
450        return self.ready_promise.borrow().clone();
451    }
452
453    /// <https://streams.spec.whatwg.org/#default-writer-abort>
454    fn Abort(
455        &self,
456        cx: SafeJSContext,
457        reason: SafeHandleValue,
458        realm: InRealm,
459        can_gc: CanGc,
460    ) -> Rc<Promise> {
461        let global = GlobalScope::from_safe_context(cx, realm);
462
463        // If this.[[stream]] is undefined,
464        if self.stream.get().is_none() {
465            // return a promise rejected with a TypeError exception.
466            let promise = Promise::new(&global, can_gc);
467            promise.reject_error(Error::Type("Stream is undefined".to_string()), can_gc);
468            return promise;
469        }
470
471        // Return ! WritableStreamDefaultWriterAbort(this, reason).
472        self.abort(cx, &global, reason, realm, can_gc)
473    }
474
475    /// <https://streams.spec.whatwg.org/#default-writer-close>
476    fn Close(&self, in_realm: InRealm, can_gc: CanGc) -> Rc<Promise> {
477        let cx = GlobalScope::get_cx();
478        let global = GlobalScope::from_safe_context(cx, in_realm);
479        let promise = Promise::new(&global, can_gc);
480
481        // Let stream be this.[[stream]].
482        let Some(stream) = self.stream.get() else {
483            // If stream is undefined,
484            // return a promise rejected with a TypeError exception.
485            promise.reject_error(Error::Type("Stream is undefined".to_string()), can_gc);
486            return promise;
487        };
488
489        // If ! WritableStreamCloseQueuedOrInFlight(stream) is true
490        if stream.close_queued_or_in_flight() {
491            // return a promise rejected with a TypeError exception.
492            promise.reject_error(
493                Error::Type("Stream has closed queued or in-flight".to_string()),
494                can_gc,
495            );
496            return promise;
497        }
498
499        self.close(cx, &global, can_gc)
500    }
501
502    /// <https://streams.spec.whatwg.org/#default-writer-release-lock>
503    fn ReleaseLock(&self, can_gc: CanGc) {
504        // Let stream be this.[[stream]].
505        let Some(stream) = self.stream.get() else {
506            // If stream is undefined, return.
507            return;
508        };
509
510        // Assert: stream.[[writer]] is not undefined.
511        assert!(stream.get_writer().is_some());
512
513        let global = self.global();
514        let cx = GlobalScope::get_cx();
515
516        // Perform ! WritableStreamDefaultWriterRelease(this).
517        self.release(cx, &global, can_gc);
518    }
519
520    /// <https://streams.spec.whatwg.org/#default-writer-write>
521    fn Write(
522        &self,
523        cx: SafeJSContext,
524        chunk: SafeHandleValue,
525        realm: InRealm,
526        can_gc: CanGc,
527    ) -> Rc<Promise> {
528        let global = GlobalScope::from_safe_context(cx, realm);
529
530        // If this.[[stream]] is undefined,
531        if self.stream.get().is_none() {
532            // return a promise rejected with a TypeError exception.
533            let global = GlobalScope::from_safe_context(cx, realm);
534            let promise = Promise::new(&global, can_gc);
535            promise.reject_error(Error::Type("Stream is undefined".to_string()), can_gc);
536            return promise;
537        }
538
539        // Return ! WritableStreamDefaultWriterWrite(this, chunk).
540        self.write(cx, &global, chunk, can_gc)
541    }
542
543    /// <https://streams.spec.whatwg.org/#default-writer-constructor>
544    fn Constructor(
545        global: &GlobalScope,
546        proto: Option<SafeHandleObject>,
547        can_gc: CanGc,
548        stream: &WritableStream,
549    ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
550        let writer = WritableStreamDefaultWriter::new(global, proto, can_gc);
551
552        let cx = GlobalScope::get_cx();
553
554        // Perform ? SetUpWritableStreamDefaultWriter(this, stream).
555        writer.setup(cx, stream, can_gc)?;
556
557        Ok(writer)
558    }
559}