script/dom/stream/
writablestreamdefaultwriter.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsval::UndefinedValue;
10use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
11
12use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriterMethods;
13use crate::dom::bindings::error::{Error, ErrorToJsval};
14use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
15use crate::dom::bindings::root::{DomRoot, MutNullableDom};
16use crate::dom::globalscope::GlobalScope;
17use crate::dom::promise::Promise;
18use crate::dom::stream::writablestream::WritableStream;
19use crate::realms::InRealm;
20use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
21
/// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter>
///
/// DOM object backing the `WritableStreamDefaultWriter` interface. Both
/// promises are created pending in `new_inherited` and later settled or
/// replaced by `setup` and the helper methods on this type.
#[dom_struct]
pub struct WritableStreamDefaultWriter {
    // Reflector for this DOM object, initialized with `Reflector::new()`.
    reflector_: Reflector,

    /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-readypromise>
    // Kept in a `RefCell` because the promise itself can be swapped out
    // (see `set_ready_promise` and `ensure_ready_promise_rejected`).
    #[conditional_malloc_size_of]
    ready_promise: RefCell<Rc<Promise>>,

    /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-closedpromise>
    // Kept in a `RefCell` because `ensure_closed_promise_rejected` may
    // replace it with a fresh, already-rejected promise.
    #[conditional_malloc_size_of]
    closed_promise: RefCell<Rc<Promise>>,

    /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-stream>
    // `None` until `setup` runs, and again after `release`.
    stream: MutNullableDom<WritableStream>,
}
37
38impl WritableStreamDefaultWriter {
39    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-writer>
40    /// The parts that create a new promise.
41    fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> WritableStreamDefaultWriter {
42        WritableStreamDefaultWriter {
43            reflector_: Reflector::new(),
44            stream: Default::default(),
45            closed_promise: RefCell::new(Promise::new(global, can_gc)),
46            ready_promise: RefCell::new(Promise::new(global, can_gc)),
47        }
48    }
49
50    pub(crate) fn new(
51        global: &GlobalScope,
52        proto: Option<SafeHandleObject>,
53        can_gc: CanGc,
54    ) -> DomRoot<WritableStreamDefaultWriter> {
55        reflect_dom_object_with_proto(
56            Box::new(WritableStreamDefaultWriter::new_inherited(global, can_gc)),
57            global,
58            proto,
59            can_gc,
60        )
61    }
62
63    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-writer>
64    /// Continuing from `new_inherited`, the rest.
65    pub(crate) fn setup(
66        &self,
67        cx: SafeJSContext,
68        stream: &WritableStream,
69        can_gc: CanGc,
70    ) -> Result<(), Error> {
71        // If ! IsWritableStreamLocked(stream) is true, throw a TypeError exception.
72        if stream.is_locked() {
73            return Err(Error::Type("Stream is locked".to_string()));
74        }
75
76        // Set writer.[[stream]] to stream.
77        self.stream.set(Some(stream));
78
79        // Set stream.[[writer]] to writer.
80        stream.set_writer(Some(self));
81
82        // Let state be stream.[[state]].
83
84        // If state is "writable",
85        if stream.is_writable() {
86            // If ! WritableStreamCloseQueuedOrInFlight(stream) is false
87            // and stream.[[backpressure]] is true,
88            if !stream.close_queued_or_in_flight() && stream.get_backpressure() {
89                // set writer.[[readyPromise]] to a new promise.
90                // Done in `new_inherited`.
91            } else {
92                // Otherwise, set writer.[[readyPromise]] to a promise resolved with undefined.
93                // Note: new promise created in `new_inherited`.
94                self.ready_promise.borrow().resolve_native(&(), can_gc);
95            }
96
97            // Set writer.[[closedPromise]] to a new promise.
98            // Done in `new_inherited`.
99            return Ok(());
100        }
101
102        // Otherwise, if state is "erroring",
103        if stream.is_erroring() {
104            rooted!(in(*cx) let mut error = UndefinedValue());
105            stream.get_stored_error(error.handle_mut());
106
107            // Set writer.[[readyPromise]] to a promise rejected with stream.[[storedError]].
108            // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
109            // Note: new promise created in `new_inherited`.
110            let ready_promise = self.ready_promise.borrow();
111            ready_promise.reject_native(&error.handle(), can_gc);
112            ready_promise.set_promise_is_handled();
113
114            // Set writer.[[closedPromise]] to a new promise.
115            // Done in `new_inherited`.
116            return Ok(());
117        }
118
119        // Otherwise, if state is "closed",
120        if stream.is_closed() {
121            // Set writer.[[readyPromise]] to a promise resolved with undefined.
122            // Note: new promise created in `new_inherited`.
123            self.ready_promise.borrow().resolve_native(&(), can_gc);
124
125            // Set writer.[[closedPromise]] to a promise resolved with undefined.
126            // Note: new promise created in `new_inherited`.
127            self.closed_promise.borrow().resolve_native(&(), can_gc);
128            return Ok(());
129        }
130
131        // Otherwise,
132        // Assert: state is "errored".
133        assert!(stream.is_errored());
134
135        // Let storedError be stream.[[storedError]].
136        rooted!(in(*cx) let mut error = UndefinedValue());
137        stream.get_stored_error(error.handle_mut());
138
139        // Set writer.[[readyPromise]] to a promise rejected with stream.[[storedError]].
140        // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
141        // Note: new promise created in `new_inherited`.
142        let ready_promise = self.ready_promise.borrow();
143        ready_promise.reject_native(&error.handle(), can_gc);
144        ready_promise.set_promise_is_handled();
145
146        // Set writer.[[closedPromise]] to a promise rejected with storedError.
147        // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
148        // Note: new promise created in `new_inherited`.
149        let ready_promise = self.closed_promise.borrow();
150        ready_promise.reject_native(&error.handle(), can_gc);
151        ready_promise.set_promise_is_handled();
152
153        Ok(())
154    }
155
156    pub(crate) fn reject_closed_promise_with_stored_error(
157        &self,
158        error: &SafeHandleValue,
159        can_gc: CanGc,
160    ) {
161        self.closed_promise.borrow().reject_native(error, can_gc);
162    }
163
164    pub(crate) fn set_close_promise_is_handled(&self) {
165        self.closed_promise.borrow().set_promise_is_handled();
166    }
167
168    pub(crate) fn set_ready_promise(&self, promise: Rc<Promise>) {
169        *self.ready_promise.borrow_mut() = promise;
170    }
171
172    pub(crate) fn resolve_ready_promise_with_undefined(&self, can_gc: CanGc) {
173        self.ready_promise.borrow().resolve_native(&(), can_gc);
174    }
175
176    pub(crate) fn resolve_closed_promise_with_undefined(&self, can_gc: CanGc) {
177        self.closed_promise.borrow().resolve_native(&(), can_gc);
178    }
179
180    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-ready-promise-rejected>
181    pub(crate) fn ensure_ready_promise_rejected(
182        &self,
183        global: &GlobalScope,
184        error: SafeHandleValue,
185        can_gc: CanGc,
186    ) {
187        let ready_promise = self.ready_promise.borrow().clone();
188
189        // If writer.[[readyPromise]].[[PromiseState]] is "pending",
190        if ready_promise.is_pending() {
191            // reject writer.[[readyPromise]] with error.
192            ready_promise.reject_native(&error, can_gc);
193
194            // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
195            ready_promise.set_promise_is_handled();
196        } else {
197            // Otherwise, set writer.[[readyPromise]] to a promise rejected with error.
198            let promise = Promise::new(global, can_gc);
199            promise.reject_native(&error, can_gc);
200
201            // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
202            promise.set_promise_is_handled();
203            *self.ready_promise.borrow_mut() = promise;
204        }
205    }
206
207    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-closed-promise-rejected>
208    pub(crate) fn ensure_closed_promise_rejected(
209        &self,
210        global: &GlobalScope,
211        error: SafeHandleValue,
212        can_gc: CanGc,
213    ) {
214        let closed_promise = self.closed_promise.borrow().clone();
215
216        // If writer.[[closedPromise]].[[PromiseState]] is "pending",
217        if closed_promise.is_pending() {
218            // reject writer.[[closedPromise]] with error.
219            closed_promise.reject_native(&error, can_gc);
220
221            // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
222            closed_promise.set_promise_is_handled();
223        } else {
224            // Otherwise, set writer.[[closedPromise]] to a promise rejected with error.
225            let promise = Promise::new(global, can_gc);
226            promise.reject_native(&error, can_gc);
227
228            // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
229            promise.set_promise_is_handled();
230            *self.closed_promise.borrow_mut() = promise;
231        }
232    }
233
234    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-abort>
235    fn abort(
236        &self,
237        cx: SafeJSContext,
238        global: &GlobalScope,
239        reason: SafeHandleValue,
240        realm: InRealm,
241        can_gc: CanGc,
242    ) -> Rc<Promise> {
243        // Let stream be writer.[[stream]].
244        let Some(stream) = self.stream.get() else {
245            // Assert: stream is not undefined.
246            unreachable!("Stream should be set.");
247        };
248
249        // Return ! WritableStreamAbort(stream, reason).
250        stream.abort(cx, global, reason, realm, can_gc)
251    }
252
253    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close>
254    fn close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
255        // Let stream be writer.[[stream]].
256        let Some(stream) = self.stream.get() else {
257            // Assert: stream is not undefined.
258            unreachable!("Stream should be set.");
259        };
260
261        // Return ! WritableStreamClose(stream).
262        stream.close(cx, global, can_gc)
263    }
264
265    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-write>
266    pub(crate) fn write(
267        &self,
268        cx: SafeJSContext,
269        global: &GlobalScope,
270        chunk: SafeHandleValue,
271        can_gc: CanGc,
272    ) -> Rc<Promise> {
273        // Let stream be writer.[[stream]].
274        let Some(stream) = self.stream.get() else {
275            // Assert: stream is not undefined.
276            unreachable!("Stream should be set.");
277        };
278
279        // Let controller be stream.[[controller]].
280        // Note: asserting controller is some.
281        let Some(controller) = stream.get_controller() else {
282            unreachable!("Controller should be set.");
283        };
284
285        // Let chunkSize be ! WritableStreamDefaultControllerGetChunkSize(controller, chunk).
286        let chunk_size = controller.get_chunk_size(cx, global, chunk, can_gc);
287
288        // If stream is not equal to writer.[[stream]],
289        // return a promise rejected with a TypeError exception.
290        if !self
291            .stream
292            .get()
293            .is_some_and(|current_stream| current_stream == stream)
294        {
295            let promise = Promise::new(global, can_gc);
296            promise.reject_error(
297                Error::Type("Stream is not equal to writer stream".to_string()),
298                can_gc,
299            );
300            return promise;
301        }
302
303        // Let state be stream.[[state]].
304        // If state is "errored",
305        if stream.is_errored() {
306            // return a promise rejected with stream.[[storedError]].
307            rooted!(in(*cx) let mut error = UndefinedValue());
308            stream.get_stored_error(error.handle_mut());
309            let promise = Promise::new(global, can_gc);
310            promise.reject_native(&error.handle(), can_gc);
311            return promise;
312        }
313
314        // If ! WritableStreamCloseQueuedOrInFlight(stream) is true
315        // or state is "closed",
316        if stream.close_queued_or_in_flight() || stream.is_closed() {
317            // return a promise rejected with a TypeError exception
318            // indicating that the stream is closing or closed
319            let promise = Promise::new(global, can_gc);
320            promise.reject_error(
321                Error::Type("Stream has been closed, or has close queued or in-flight".to_string()),
322                can_gc,
323            );
324            return promise;
325        }
326
327        // If state is "erroring",
328        if stream.is_erroring() {
329            // return a promise rejected with stream.[[storedError]].
330            rooted!(in(*cx) let mut error = UndefinedValue());
331            stream.get_stored_error(error.handle_mut());
332            let promise = Promise::new(global, can_gc);
333            promise.reject_native(&error.handle(), can_gc);
334            return promise;
335        }
336
337        // Assert: state is "writable".
338        assert!(stream.is_writable());
339
340        // Let promise be ! WritableStreamAddWriteRequest(stream).
341        let promise = stream.add_write_request(global, can_gc);
342
343        // Perform ! WritableStreamDefaultControllerWrite(controller, chunk, chunkSize).
344        controller.write(cx, global, chunk, chunk_size, can_gc);
345
346        // Return promise.
347        promise
348    }
349
350    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-release>
351    pub(crate) fn release(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
352        // Let stream be this.[[stream]].
353        let Some(stream) = self.stream.get() else {
354            // Assert: stream is not undefined.
355            unreachable!("Stream should be set.");
356        };
357
358        // Assert: stream.[[writer]] is writer.
359        assert!(stream.get_writer().is_some_and(|writer| &*writer == self));
360
361        // Let releasedError be a new TypeError.
362        let released_error = Error::Type("Writer has been released".to_string());
363
364        // Root the js val of the error.
365        rooted!(in(*cx) let mut error = UndefinedValue());
366        released_error.to_jsval(cx, global, error.handle_mut(), can_gc);
367
368        // Perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError).
369        self.ensure_ready_promise_rejected(global, error.handle(), can_gc);
370
371        // Perform ! WritableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError).
372        self.ensure_closed_promise_rejected(global, error.handle(), can_gc);
373
374        // Set stream.[[writer]] to undefined.
375        stream.set_writer(None);
376
377        // Set this.[[stream]] to undefined.
378        self.stream.set(None);
379    }
380
381    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close-with-error-propagation>
382    pub(crate) fn close_with_error_propagation(
383        &self,
384        cx: SafeJSContext,
385        global: &GlobalScope,
386        can_gc: CanGc,
387    ) -> Rc<Promise> {
388        // Let stream be writer.[[stream]].
389        let Some(stream) = self.stream.get() else {
390            // Assert: stream is not undefined.
391            unreachable!("Stream should be set.");
392        };
393
394        // Let state be stream.[[state]].
395        // Used via stream method calls.
396
397        // If ! WritableStreamCloseQueuedOrInFlight(stream) is true
398        // or state is "closed",
399        if stream.close_queued_or_in_flight() || stream.is_closed() {
400            // return a promise resolved with undefined.
401            let promise = Promise::new(global, can_gc);
402            promise.resolve_native(&(), can_gc);
403            return promise;
404        }
405
406        // If state is "errored",
407        if stream.is_errored() {
408            // return a promise rejected with stream.[[storedError]].
409            rooted!(in(*cx) let mut error = UndefinedValue());
410            stream.get_stored_error(error.handle_mut());
411            let promise = Promise::new(global, can_gc);
412            promise.reject_native(&error.handle(), can_gc);
413            return promise;
414        }
415
416        // Assert: state is "writable" or "erroring".
417        assert!(stream.is_writable() || stream.is_erroring());
418
419        // Return ! WritableStreamDefaultWriterClose(writer).
420        self.close(cx, global, can_gc)
421    }
422
423    pub(crate) fn get_stream(&self) -> Option<DomRoot<WritableStream>> {
424        self.stream.get()
425    }
426}
427
428impl WritableStreamDefaultWriterMethods<crate::DomTypeHolder> for WritableStreamDefaultWriter {
429    /// <https://streams.spec.whatwg.org/#default-writer-closed>
430    fn Closed(&self) -> Rc<Promise> {
431        // Return this.[[closedPromise]].
432        return self.closed_promise.borrow().clone();
433    }
434
435    /// <https://streams.spec.whatwg.org/#default-writer-desired-size>
436    fn GetDesiredSize(&self) -> Result<Option<f64>, Error> {
437        // If this.[[stream]] is undefined, throw a TypeError exception.
438        let Some(stream) = self.stream.get() else {
439            return Err(Error::Type("Stream is undefined".to_string()));
440        };
441
442        // Return ! WritableStreamDefaultWriterGetDesiredSize(this).
443        Ok(stream.get_desired_size())
444    }
445
446    /// <https://streams.spec.whatwg.org/#default-writer-ready>
447    fn Ready(&self) -> Rc<Promise> {
448        // Return this.[[readyPromise]].
449        return self.ready_promise.borrow().clone();
450    }
451
452    /// <https://streams.spec.whatwg.org/#default-writer-abort>
453    fn Abort(
454        &self,
455        cx: SafeJSContext,
456        reason: SafeHandleValue,
457        realm: InRealm,
458        can_gc: CanGc,
459    ) -> Rc<Promise> {
460        let global = GlobalScope::from_safe_context(cx, realm);
461
462        // If this.[[stream]] is undefined,
463        if self.stream.get().is_none() {
464            // return a promise rejected with a TypeError exception.
465            let promise = Promise::new(&global, can_gc);
466            promise.reject_error(Error::Type("Stream is undefined".to_string()), can_gc);
467            return promise;
468        }
469
470        // Return ! WritableStreamDefaultWriterAbort(this, reason).
471        self.abort(cx, &global, reason, realm, can_gc)
472    }
473
474    /// <https://streams.spec.whatwg.org/#default-writer-close>
475    fn Close(&self, in_realm: InRealm, can_gc: CanGc) -> Rc<Promise> {
476        let cx = GlobalScope::get_cx();
477        let global = GlobalScope::from_safe_context(cx, in_realm);
478        let promise = Promise::new(&global, can_gc);
479
480        // Let stream be this.[[stream]].
481        let Some(stream) = self.stream.get() else {
482            // If stream is undefined,
483            // return a promise rejected with a TypeError exception.
484            promise.reject_error(Error::Type("Stream is undefined".to_string()), can_gc);
485            return promise;
486        };
487
488        // If ! WritableStreamCloseQueuedOrInFlight(stream) is true
489        if stream.close_queued_or_in_flight() {
490            // return a promise rejected with a TypeError exception.
491            promise.reject_error(
492                Error::Type("Stream has closed queued or in-flight".to_string()),
493                can_gc,
494            );
495            return promise;
496        }
497
498        self.close(cx, &global, can_gc)
499    }
500
501    /// <https://streams.spec.whatwg.org/#default-writer-release-lock>
502    fn ReleaseLock(&self, can_gc: CanGc) {
503        // Let stream be this.[[stream]].
504        let Some(stream) = self.stream.get() else {
505            // If stream is undefined, return.
506            return;
507        };
508
509        // Assert: stream.[[writer]] is not undefined.
510        assert!(stream.get_writer().is_some());
511
512        let global = self.global();
513        let cx = GlobalScope::get_cx();
514
515        // Perform ! WritableStreamDefaultWriterRelease(this).
516        self.release(cx, &global, can_gc);
517    }
518
519    /// <https://streams.spec.whatwg.org/#default-writer-write>
520    fn Write(
521        &self,
522        cx: SafeJSContext,
523        chunk: SafeHandleValue,
524        realm: InRealm,
525        can_gc: CanGc,
526    ) -> Rc<Promise> {
527        let global = GlobalScope::from_safe_context(cx, realm);
528
529        // If this.[[stream]] is undefined,
530        if self.stream.get().is_none() {
531            // return a promise rejected with a TypeError exception.
532            let global = GlobalScope::from_safe_context(cx, realm);
533            let promise = Promise::new(&global, can_gc);
534            promise.reject_error(Error::Type("Stream is undefined".to_string()), can_gc);
535            return promise;
536        }
537
538        // Return ! WritableStreamDefaultWriterWrite(this, chunk).
539        self.write(cx, &global, chunk, can_gc)
540    }
541
542    /// <https://streams.spec.whatwg.org/#default-writer-constructor>
543    fn Constructor(
544        global: &GlobalScope,
545        proto: Option<SafeHandleObject>,
546        can_gc: CanGc,
547        stream: &WritableStream,
548    ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
549        let writer = WritableStreamDefaultWriter::new(global, proto, can_gc);
550
551        let cx = GlobalScope::get_cx();
552
553        // Perform ? SetUpWritableStreamDefaultWriter(this, stream).
554        writer.setup(cx, stream, can_gc)?;
555
556        Ok(writer)
557    }
558}