script/dom/stream/
writablestreamdefaultwriter.rs

/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsval::UndefinedValue;
10use js::realm::CurrentRealm;
11use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
12
13use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriterMethods;
14use crate::dom::bindings::error::{Error, ErrorToJsval};
15use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
16use crate::dom::bindings::root::{DomRoot, MutNullableDom};
17use crate::dom::globalscope::GlobalScope;
18use crate::dom::promise::Promise;
19use crate::dom::stream::writablestream::WritableStream;
20use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
21
22/// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter>
23#[dom_struct]
24pub struct WritableStreamDefaultWriter {
25    reflector_: Reflector,
26
27    #[conditional_malloc_size_of]
28    ready_promise: RefCell<Rc<Promise>>,
29
30    /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-closedpromise>
31    #[conditional_malloc_size_of]
32    closed_promise: RefCell<Rc<Promise>>,
33
34    /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-stream>
35    stream: MutNullableDom<WritableStream>,
36}
37
38impl WritableStreamDefaultWriter {
39    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-writer>
40    /// The parts that create a new promise.
41    fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> WritableStreamDefaultWriter {
42        WritableStreamDefaultWriter {
43            reflector_: Reflector::new(),
44            stream: Default::default(),
45            closed_promise: RefCell::new(Promise::new(global, can_gc)),
46            ready_promise: RefCell::new(Promise::new(global, can_gc)),
47        }
48    }
49
50    pub(crate) fn new(
51        global: &GlobalScope,
52        proto: Option<SafeHandleObject>,
53        can_gc: CanGc,
54    ) -> DomRoot<WritableStreamDefaultWriter> {
55        reflect_dom_object_with_proto(
56            Box::new(WritableStreamDefaultWriter::new_inherited(global, can_gc)),
57            global,
58            proto,
59            can_gc,
60        )
61    }
62
63    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-writer>
64    /// Continuing from `new_inherited`, the rest.
65    pub(crate) fn setup(
66        &self,
67        cx: SafeJSContext,
68        stream: &WritableStream,
69        can_gc: CanGc,
70    ) -> Result<(), Error> {
71        // If ! IsWritableStreamLocked(stream) is true, throw a TypeError exception.
72        if stream.is_locked() {
73            return Err(Error::Type(c"Stream is locked".to_owned()));
74        }
75
76        // Set writer.[[stream]] to stream.
77        self.stream.set(Some(stream));
78
79        // Set stream.[[writer]] to writer.
80        stream.set_writer(Some(self));
81
82        // Let state be stream.[[state]].
83
84        // If state is "writable",
85        if stream.is_writable() {
86            // If ! WritableStreamCloseQueuedOrInFlight(stream) is false
87            // and stream.[[backpressure]] is true,
88            if !stream.close_queued_or_in_flight() && stream.get_backpressure() {
89                // set writer.[[readyPromise]] to a new promise.
90                // Done in `new_inherited`.
91            } else {
92                // Otherwise, set writer.[[readyPromise]] to a promise resolved with undefined.
93                // Note: new promise created in `new_inherited`.
94                self.ready_promise.borrow().resolve_native(&(), can_gc);
95            }
96
97            // Set writer.[[closedPromise]] to a new promise.
98            // Done in `new_inherited`.
99            return Ok(());
100        }
101
102        // Otherwise, if state is "erroring",
103        if stream.is_erroring() {
104            rooted!(in(*cx) let mut error = UndefinedValue());
105            stream.get_stored_error(error.handle_mut());
106
107            // Set writer.[[readyPromise]] to a promise rejected with stream.[[storedError]].
108            // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
109            // Note: new promise created in `new_inherited`.
110            let ready_promise = self.ready_promise.borrow();
111            ready_promise.reject_native(&error.handle(), can_gc);
112            ready_promise.set_promise_is_handled();
113
114            // Set writer.[[closedPromise]] to a new promise.
115            // Done in `new_inherited`.
116            return Ok(());
117        }
118
119        // Otherwise, if state is "closed",
120        if stream.is_closed() {
121            // Set writer.[[readyPromise]] to a promise resolved with undefined.
122            // Note: new promise created in `new_inherited`.
123            self.ready_promise.borrow().resolve_native(&(), can_gc);
124
125            // Set writer.[[closedPromise]] to a promise resolved with undefined.
126            // Note: new promise created in `new_inherited`.
127            self.closed_promise.borrow().resolve_native(&(), can_gc);
128            return Ok(());
129        }
130
131        // Otherwise,
132        // Assert: state is "errored".
133        assert!(stream.is_errored());
134
135        // Let storedError be stream.[[storedError]].
136        rooted!(in(*cx) let mut error = UndefinedValue());
137        stream.get_stored_error(error.handle_mut());
138
139        // Set writer.[[readyPromise]] to a promise rejected with stream.[[storedError]].
140        // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
141        // Note: new promise created in `new_inherited`.
142        let ready_promise = self.ready_promise.borrow();
143        ready_promise.reject_native(&error.handle(), can_gc);
144        ready_promise.set_promise_is_handled();
145
146        // Set writer.[[closedPromise]] to a promise rejected with storedError.
147        // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
148        // Note: new promise created in `new_inherited`.
149        let ready_promise = self.closed_promise.borrow();
150        ready_promise.reject_native(&error.handle(), can_gc);
151        ready_promise.set_promise_is_handled();
152
153        Ok(())
154    }
155
156    pub(crate) fn reject_closed_promise_with_stored_error(
157        &self,
158        cx: &mut js::context::JSContext,
159        error: &SafeHandleValue,
160    ) {
161        self.closed_promise
162            .borrow()
163            .reject_native(error, CanGc::from_cx(cx));
164    }
165
166    pub(crate) fn set_close_promise_is_handled(&self) {
167        self.closed_promise.borrow().set_promise_is_handled();
168    }
169
170    pub(crate) fn set_ready_promise(&self, promise: Rc<Promise>) {
171        *self.ready_promise.borrow_mut() = promise;
172    }
173
174    pub(crate) fn resolve_ready_promise_with_undefined(&self, can_gc: CanGc) {
175        self.ready_promise.borrow().resolve_native(&(), can_gc);
176    }
177
178    pub(crate) fn resolve_closed_promise_with_undefined(&self, can_gc: CanGc) {
179        self.closed_promise.borrow().resolve_native(&(), can_gc);
180    }
181
182    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-ready-promise-rejected>
183    pub(crate) fn ensure_ready_promise_rejected(
184        &self,
185        global: &GlobalScope,
186        error: SafeHandleValue,
187        can_gc: CanGc,
188    ) {
189        let ready_promise = self.ready_promise.borrow().clone();
190
191        // If writer.[[readyPromise]].[[PromiseState]] is "pending",
192        if ready_promise.is_pending() {
193            // reject writer.[[readyPromise]] with error.
194            ready_promise.reject_native(&error, can_gc);
195
196            // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
197            ready_promise.set_promise_is_handled();
198        } else {
199            // Otherwise, set writer.[[readyPromise]] to a promise rejected with error.
200            let promise = Promise::new(global, can_gc);
201            promise.reject_native(&error, can_gc);
202
203            // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
204            promise.set_promise_is_handled();
205            *self.ready_promise.borrow_mut() = promise;
206        }
207    }
208
209    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-closed-promise-rejected>
210    pub(crate) fn ensure_closed_promise_rejected(
211        &self,
212        global: &GlobalScope,
213        error: SafeHandleValue,
214        can_gc: CanGc,
215    ) {
216        let closed_promise = self.closed_promise.borrow().clone();
217
218        // If writer.[[closedPromise]].[[PromiseState]] is "pending",
219        if closed_promise.is_pending() {
220            // reject writer.[[closedPromise]] with error.
221            closed_promise.reject_native(&error, can_gc);
222
223            // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
224            closed_promise.set_promise_is_handled();
225        } else {
226            // Otherwise, set writer.[[closedPromise]] to a promise rejected with error.
227            let promise = Promise::new(global, can_gc);
228            promise.reject_native(&error, can_gc);
229
230            // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
231            promise.set_promise_is_handled();
232            *self.closed_promise.borrow_mut() = promise;
233        }
234    }
235
236    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-abort>
237    fn abort(
238        &self,
239        cx: &mut CurrentRealm,
240        global: &GlobalScope,
241        reason: SafeHandleValue,
242    ) -> Rc<Promise> {
243        // Let stream be writer.[[stream]].
244        let Some(stream) = self.stream.get() else {
245            // Assert: stream is not undefined.
246            unreachable!("Stream should be set.");
247        };
248
249        // Return ! WritableStreamAbort(stream, reason).
250        stream.abort(cx, global, reason)
251    }
252
253    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close>
254    fn close(&self, cx: &mut js::context::JSContext, global: &GlobalScope) -> Rc<Promise> {
255        // Let stream be writer.[[stream]].
256        let Some(stream) = self.stream.get() else {
257            // Assert: stream is not undefined.
258            unreachable!("Stream should be set.");
259        };
260
261        // Return ! WritableStreamClose(stream).
262        stream.close(cx, global)
263    }
264
265    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-write>
266    pub(crate) fn write(
267        &self,
268        cx: &mut js::context::JSContext,
269        global: &GlobalScope,
270        chunk: SafeHandleValue,
271    ) -> Rc<Promise> {
272        // Let stream be writer.[[stream]].
273        let Some(stream) = self.stream.get() else {
274            // Assert: stream is not undefined.
275            unreachable!("Stream should be set.");
276        };
277
278        // Let controller be stream.[[controller]].
279        // Note: asserting controller is some.
280        let Some(controller) = stream.get_controller() else {
281            unreachable!("Controller should be set.");
282        };
283
284        // Let chunkSize be ! WritableStreamDefaultControllerGetChunkSize(controller, chunk).
285        let chunk_size = controller.get_chunk_size(cx, global, chunk);
286
287        // If stream is not equal to writer.[[stream]],
288        // return a promise rejected with a TypeError exception.
289        if !self
290            .stream
291            .get()
292            .is_some_and(|current_stream| current_stream == stream)
293        {
294            let promise = Promise::new2(cx, global);
295            promise.reject_error(
296                Error::Type(c"Stream is not equal to writer stream".to_owned()),
297                CanGc::from_cx(cx),
298            );
299            return promise;
300        }
301
302        // Let state be stream.[[state]].
303        // If state is "errored",
304        if stream.is_errored() {
305            // return a promise rejected with stream.[[storedError]].
306            rooted!(&in(cx) let mut error = UndefinedValue());
307            stream.get_stored_error(error.handle_mut());
308            let promise = Promise::new2(cx, global);
309            promise.reject_native(&error.handle(), CanGc::from_cx(cx));
310            return promise;
311        }
312
313        // If ! WritableStreamCloseQueuedOrInFlight(stream) is true
314        // or state is "closed",
315        if stream.close_queued_or_in_flight() || stream.is_closed() {
316            // return a promise rejected with a TypeError exception
317            // indicating that the stream is closing or closed
318            let promise = Promise::new2(cx, global);
319            promise.reject_error(
320                Error::Type(c"Stream has been closed, or has close queued or in-flight".to_owned()),
321                CanGc::from_cx(cx),
322            );
323            return promise;
324        }
325
326        // If state is "erroring",
327        if stream.is_erroring() {
328            // return a promise rejected with stream.[[storedError]].
329            rooted!(&in(cx) let mut error = UndefinedValue());
330            stream.get_stored_error(error.handle_mut());
331            let promise = Promise::new2(cx, global);
332            promise.reject_native(&error.handle(), CanGc::from_cx(cx));
333            return promise;
334        }
335
336        // Assert: state is "writable".
337        assert!(stream.is_writable());
338
339        // Let promise be ! WritableStreamAddWriteRequest(stream).
340        let promise = stream.add_write_request(global, CanGc::from_cx(cx));
341
342        // Perform ! WritableStreamDefaultControllerWrite(controller, chunk, chunkSize).
343        controller.write(cx, global, chunk, chunk_size);
344
345        // Return promise.
346        promise
347    }
348
349    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-release>
350    pub(crate) fn release(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
351        // Let stream be this.[[stream]].
352        let Some(stream) = self.stream.get() else {
353            // Assert: stream is not undefined.
354            unreachable!("Stream should be set.");
355        };
356
357        // Assert: stream.[[writer]] is writer.
358        assert!(stream.get_writer().is_some_and(|writer| &*writer == self));
359
360        // Let releasedError be a new TypeError.
361        let released_error = Error::Type(c"Writer has been released".to_owned());
362
363        // Root the js val of the error.
364        rooted!(in(*cx) let mut error = UndefinedValue());
365        released_error.to_jsval(cx, global, error.handle_mut(), can_gc);
366
367        // Perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError).
368        self.ensure_ready_promise_rejected(global, error.handle(), can_gc);
369
370        // Perform ! WritableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError).
371        self.ensure_closed_promise_rejected(global, error.handle(), can_gc);
372
373        // Set stream.[[writer]] to undefined.
374        stream.set_writer(None);
375
376        // Set this.[[stream]] to undefined.
377        self.stream.set(None);
378    }
379
380    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close-with-error-propagation>
381    pub(crate) fn close_with_error_propagation(
382        &self,
383        cx: &mut js::context::JSContext,
384        global: &GlobalScope,
385    ) -> Rc<Promise> {
386        // Let stream be writer.[[stream]].
387        let Some(stream) = self.stream.get() else {
388            // Assert: stream is not undefined.
389            unreachable!("Stream should be set.");
390        };
391
392        // Let state be stream.[[state]].
393        // Used via stream method calls.
394
395        // If ! WritableStreamCloseQueuedOrInFlight(stream) is true
396        // or state is "closed",
397        if stream.close_queued_or_in_flight() || stream.is_closed() {
398            // return a promise resolved with undefined.
399            let promise = Promise::new2(cx, global);
400            promise.resolve_native(&(), CanGc::from_cx(cx));
401            return promise;
402        }
403
404        // If state is "errored",
405        if stream.is_errored() {
406            // return a promise rejected with stream.[[storedError]].
407            rooted!(&in(cx) let mut error = UndefinedValue());
408            stream.get_stored_error(error.handle_mut());
409            let promise = Promise::new2(cx, global);
410            promise.reject_native(&error.handle(), CanGc::from_cx(cx));
411            return promise;
412        }
413
414        // Assert: state is "writable" or "erroring".
415        assert!(stream.is_writable() || stream.is_erroring());
416
417        // Return ! WritableStreamDefaultWriterClose(writer).
418        self.close(cx, global)
419    }
420
421    pub(crate) fn get_stream(&self) -> Option<DomRoot<WritableStream>> {
422        self.stream.get()
423    }
424}
425
426impl WritableStreamDefaultWriterMethods<crate::DomTypeHolder> for WritableStreamDefaultWriter {
427    /// <https://streams.spec.whatwg.org/#default-writer-closed>
428    fn Closed(&self) -> Rc<Promise> {
429        // Return this.[[closedPromise]].
430        return self.closed_promise.borrow().clone();
431    }
432
433    /// <https://streams.spec.whatwg.org/#default-writer-desired-size>
434    fn GetDesiredSize(&self) -> Result<Option<f64>, Error> {
435        // If this.[[stream]] is undefined, throw a TypeError exception.
436        let Some(stream) = self.stream.get() else {
437            return Err(Error::Type(c"Stream is undefined".to_owned()));
438        };
439
440        // Return ! WritableStreamDefaultWriterGetDesiredSize(this).
441        Ok(stream.get_desired_size())
442    }
443
444    /// <https://streams.spec.whatwg.org/#default-writer-ready>
445    fn Ready(&self) -> Rc<Promise> {
446        // Return this.[[readyPromise]].
447        return self.ready_promise.borrow().clone();
448    }
449
450    /// <https://streams.spec.whatwg.org/#default-writer-abort>
451    fn Abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) -> Rc<Promise> {
452        let global = GlobalScope::from_current_realm(cx);
453
454        // If this.[[stream]] is undefined,
455        if self.stream.get().is_none() {
456            // return a promise rejected with a TypeError exception.
457            let promise = Promise::new2(cx, &global);
458            promise.reject_error(
459                Error::Type(c"Stream is undefined".to_owned()),
460                CanGc::from_cx(cx),
461            );
462            return promise;
463        }
464
465        // Return ! WritableStreamDefaultWriterAbort(this, reason).
466        self.abort(cx, &global, reason)
467    }
468
469    /// <https://streams.spec.whatwg.org/#default-writer-close>
470    fn Close(&self, cx: &mut CurrentRealm) -> Rc<Promise> {
471        let global = GlobalScope::from_current_realm(cx);
472        let promise = Promise::new2(cx, &global);
473
474        // Let stream be this.[[stream]].
475        let Some(stream) = self.stream.get() else {
476            // If stream is undefined,
477            // return a promise rejected with a TypeError exception.
478            promise.reject_error(
479                Error::Type(c"Stream is undefined".to_owned()),
480                CanGc::from_cx(cx),
481            );
482            return promise;
483        };
484
485        // If ! WritableStreamCloseQueuedOrInFlight(stream) is true
486        if stream.close_queued_or_in_flight() {
487            // return a promise rejected with a TypeError exception.
488            promise.reject_error(
489                Error::Type(c"Stream has closed queued or in-flight".to_owned()),
490                CanGc::from_cx(cx),
491            );
492            return promise;
493        }
494
495        self.close(cx, &global)
496    }
497
498    /// <https://streams.spec.whatwg.org/#default-writer-release-lock>
499    fn ReleaseLock(&self, can_gc: CanGc) {
500        // Let stream be this.[[stream]].
501        let Some(stream) = self.stream.get() else {
502            // If stream is undefined, return.
503            return;
504        };
505
506        // Assert: stream.[[writer]] is not undefined.
507        assert!(stream.get_writer().is_some());
508
509        let global = self.global();
510        let cx = GlobalScope::get_cx();
511
512        // Perform ! WritableStreamDefaultWriterRelease(this).
513        self.release(cx, &global, can_gc);
514    }
515
516    /// <https://streams.spec.whatwg.org/#default-writer-write>
517    fn Write(&self, cx: &mut CurrentRealm, chunk: SafeHandleValue) -> Rc<Promise> {
518        let global = GlobalScope::from_current_realm(cx);
519
520        // If this.[[stream]] is undefined,
521        if self.stream.get().is_none() {
522            // return a promise rejected with a TypeError exception.
523            let promise = Promise::new2(cx, &global);
524            promise.reject_error(
525                Error::Type(c"Stream is undefined".to_owned()),
526                CanGc::from_cx(cx),
527            );
528            return promise;
529        }
530
531        // Return ! WritableStreamDefaultWriterWrite(this, chunk).
532        self.write(cx, &global, chunk)
533    }
534
535    /// <https://streams.spec.whatwg.org/#default-writer-constructor>
536    fn Constructor(
537        global: &GlobalScope,
538        proto: Option<SafeHandleObject>,
539        can_gc: CanGc,
540        stream: &WritableStream,
541    ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
542        let writer = WritableStreamDefaultWriter::new(global, proto, can_gc);
543
544        let cx = GlobalScope::get_cx();
545
546        // Perform ? SetUpWritableStreamDefaultWriter(this, stream).
547        writer.setup(cx, stream, can_gc)?;
548
549        Ok(writer)
550    }
551}