script/dom/stream/
writablestreamdefaultwriter.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsval::UndefinedValue;
10use js::realm::CurrentRealm;
11use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
12
13use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriterMethods;
14use crate::dom::bindings::error::{Error, ErrorToJsval};
15use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto};
16use crate::dom::bindings::root::{DomRoot, MutNullableDom};
17use crate::dom::globalscope::GlobalScope;
18use crate::dom::promise::Promise;
19use crate::dom::stream::writablestream::WritableStream;
20use crate::realms::InRealm;
21use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
22
23/// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter>
24#[dom_struct]
25pub struct WritableStreamDefaultWriter {
26    reflector_: Reflector,
27
28    #[conditional_malloc_size_of]
29    ready_promise: RefCell<Rc<Promise>>,
30
31    /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-closedpromise>
32    #[conditional_malloc_size_of]
33    closed_promise: RefCell<Rc<Promise>>,
34
35    /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-stream>
36    stream: MutNullableDom<WritableStream>,
37}
38
39impl WritableStreamDefaultWriter {
40    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-writer>
41    /// The parts that create a new promise.
42    fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> WritableStreamDefaultWriter {
43        WritableStreamDefaultWriter {
44            reflector_: Reflector::new(),
45            stream: Default::default(),
46            closed_promise: RefCell::new(Promise::new(global, can_gc)),
47            ready_promise: RefCell::new(Promise::new(global, can_gc)),
48        }
49    }
50
51    pub(crate) fn new(
52        global: &GlobalScope,
53        proto: Option<SafeHandleObject>,
54        can_gc: CanGc,
55    ) -> DomRoot<WritableStreamDefaultWriter> {
56        reflect_dom_object_with_proto(
57            Box::new(WritableStreamDefaultWriter::new_inherited(global, can_gc)),
58            global,
59            proto,
60            can_gc,
61        )
62    }
63
64    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-writer>
65    /// Continuing from `new_inherited`, the rest.
66    pub(crate) fn setup(
67        &self,
68        cx: SafeJSContext,
69        stream: &WritableStream,
70        can_gc: CanGc,
71    ) -> Result<(), Error> {
72        // If ! IsWritableStreamLocked(stream) is true, throw a TypeError exception.
73        if stream.is_locked() {
74            return Err(Error::Type(c"Stream is locked".to_owned()));
75        }
76
77        // Set writer.[[stream]] to stream.
78        self.stream.set(Some(stream));
79
80        // Set stream.[[writer]] to writer.
81        stream.set_writer(Some(self));
82
83        // Let state be stream.[[state]].
84
85        // If state is "writable",
86        if stream.is_writable() {
87            // If ! WritableStreamCloseQueuedOrInFlight(stream) is false
88            // and stream.[[backpressure]] is true,
89            if !stream.close_queued_or_in_flight() && stream.get_backpressure() {
90                // set writer.[[readyPromise]] to a new promise.
91                // Done in `new_inherited`.
92            } else {
93                // Otherwise, set writer.[[readyPromise]] to a promise resolved with undefined.
94                // Note: new promise created in `new_inherited`.
95                self.ready_promise.borrow().resolve_native(&(), can_gc);
96            }
97
98            // Set writer.[[closedPromise]] to a new promise.
99            // Done in `new_inherited`.
100            return Ok(());
101        }
102
103        // Otherwise, if state is "erroring",
104        if stream.is_erroring() {
105            rooted!(in(*cx) let mut error = UndefinedValue());
106            stream.get_stored_error(error.handle_mut());
107
108            // Set writer.[[readyPromise]] to a promise rejected with stream.[[storedError]].
109            // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
110            // Note: new promise created in `new_inherited`.
111            let ready_promise = self.ready_promise.borrow();
112            ready_promise.reject_native(&error.handle(), can_gc);
113            ready_promise.set_promise_is_handled();
114
115            // Set writer.[[closedPromise]] to a new promise.
116            // Done in `new_inherited`.
117            return Ok(());
118        }
119
120        // Otherwise, if state is "closed",
121        if stream.is_closed() {
122            // Set writer.[[readyPromise]] to a promise resolved with undefined.
123            // Note: new promise created in `new_inherited`.
124            self.ready_promise.borrow().resolve_native(&(), can_gc);
125
126            // Set writer.[[closedPromise]] to a promise resolved with undefined.
127            // Note: new promise created in `new_inherited`.
128            self.closed_promise.borrow().resolve_native(&(), can_gc);
129            return Ok(());
130        }
131
132        // Otherwise,
133        // Assert: state is "errored".
134        assert!(stream.is_errored());
135
136        // Let storedError be stream.[[storedError]].
137        rooted!(in(*cx) let mut error = UndefinedValue());
138        stream.get_stored_error(error.handle_mut());
139
140        // Set writer.[[readyPromise]] to a promise rejected with stream.[[storedError]].
141        // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
142        // Note: new promise created in `new_inherited`.
143        let ready_promise = self.ready_promise.borrow();
144        ready_promise.reject_native(&error.handle(), can_gc);
145        ready_promise.set_promise_is_handled();
146
147        // Set writer.[[closedPromise]] to a promise rejected with storedError.
148        // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
149        // Note: new promise created in `new_inherited`.
150        let ready_promise = self.closed_promise.borrow();
151        ready_promise.reject_native(&error.handle(), can_gc);
152        ready_promise.set_promise_is_handled();
153
154        Ok(())
155    }
156
157    pub(crate) fn reject_closed_promise_with_stored_error(
158        &self,
159        error: &SafeHandleValue,
160        can_gc: CanGc,
161    ) {
162        self.closed_promise.borrow().reject_native(error, can_gc);
163    }
164
165    pub(crate) fn set_close_promise_is_handled(&self) {
166        self.closed_promise.borrow().set_promise_is_handled();
167    }
168
169    pub(crate) fn set_ready_promise(&self, promise: Rc<Promise>) {
170        *self.ready_promise.borrow_mut() = promise;
171    }
172
173    pub(crate) fn resolve_ready_promise_with_undefined(&self, can_gc: CanGc) {
174        self.ready_promise.borrow().resolve_native(&(), can_gc);
175    }
176
177    pub(crate) fn resolve_closed_promise_with_undefined(&self, can_gc: CanGc) {
178        self.closed_promise.borrow().resolve_native(&(), can_gc);
179    }
180
181    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-ready-promise-rejected>
182    pub(crate) fn ensure_ready_promise_rejected(
183        &self,
184        global: &GlobalScope,
185        error: SafeHandleValue,
186        can_gc: CanGc,
187    ) {
188        let ready_promise = self.ready_promise.borrow().clone();
189
190        // If writer.[[readyPromise]].[[PromiseState]] is "pending",
191        if ready_promise.is_pending() {
192            // reject writer.[[readyPromise]] with error.
193            ready_promise.reject_native(&error, can_gc);
194
195            // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
196            ready_promise.set_promise_is_handled();
197        } else {
198            // Otherwise, set writer.[[readyPromise]] to a promise rejected with error.
199            let promise = Promise::new(global, can_gc);
200            promise.reject_native(&error, can_gc);
201
202            // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
203            promise.set_promise_is_handled();
204            *self.ready_promise.borrow_mut() = promise;
205        }
206    }
207
208    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-closed-promise-rejected>
209    pub(crate) fn ensure_closed_promise_rejected(
210        &self,
211        global: &GlobalScope,
212        error: SafeHandleValue,
213        can_gc: CanGc,
214    ) {
215        let closed_promise = self.closed_promise.borrow().clone();
216
217        // If writer.[[closedPromise]].[[PromiseState]] is "pending",
218        if closed_promise.is_pending() {
219            // reject writer.[[closedPromise]] with error.
220            closed_promise.reject_native(&error, can_gc);
221
222            // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
223            closed_promise.set_promise_is_handled();
224        } else {
225            // Otherwise, set writer.[[closedPromise]] to a promise rejected with error.
226            let promise = Promise::new(global, can_gc);
227            promise.reject_native(&error, can_gc);
228
229            // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
230            promise.set_promise_is_handled();
231            *self.closed_promise.borrow_mut() = promise;
232        }
233    }
234
235    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-abort>
236    fn abort(
237        &self,
238        cx: &mut CurrentRealm,
239        global: &GlobalScope,
240        reason: SafeHandleValue,
241    ) -> Rc<Promise> {
242        // Let stream be writer.[[stream]].
243        let Some(stream) = self.stream.get() else {
244            // Assert: stream is not undefined.
245            unreachable!("Stream should be set.");
246        };
247
248        // Return ! WritableStreamAbort(stream, reason).
249        stream.abort(cx, global, reason)
250    }
251
252    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close>
253    fn close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> {
254        // Let stream be writer.[[stream]].
255        let Some(stream) = self.stream.get() else {
256            // Assert: stream is not undefined.
257            unreachable!("Stream should be set.");
258        };
259
260        // Return ! WritableStreamClose(stream).
261        stream.close(cx, global, can_gc)
262    }
263
264    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-write>
265    pub(crate) fn write(
266        &self,
267        cx: SafeJSContext,
268        global: &GlobalScope,
269        chunk: SafeHandleValue,
270        can_gc: CanGc,
271    ) -> Rc<Promise> {
272        // Let stream be writer.[[stream]].
273        let Some(stream) = self.stream.get() else {
274            // Assert: stream is not undefined.
275            unreachable!("Stream should be set.");
276        };
277
278        // Let controller be stream.[[controller]].
279        // Note: asserting controller is some.
280        let Some(controller) = stream.get_controller() else {
281            unreachable!("Controller should be set.");
282        };
283
284        // Let chunkSize be ! WritableStreamDefaultControllerGetChunkSize(controller, chunk).
285        let chunk_size = controller.get_chunk_size(cx, global, chunk, can_gc);
286
287        // If stream is not equal to writer.[[stream]],
288        // return a promise rejected with a TypeError exception.
289        if !self
290            .stream
291            .get()
292            .is_some_and(|current_stream| current_stream == stream)
293        {
294            let promise = Promise::new(global, can_gc);
295            promise.reject_error(
296                Error::Type(c"Stream is not equal to writer stream".to_owned()),
297                can_gc,
298            );
299            return promise;
300        }
301
302        // Let state be stream.[[state]].
303        // If state is "errored",
304        if stream.is_errored() {
305            // return a promise rejected with stream.[[storedError]].
306            rooted!(in(*cx) let mut error = UndefinedValue());
307            stream.get_stored_error(error.handle_mut());
308            let promise = Promise::new(global, can_gc);
309            promise.reject_native(&error.handle(), can_gc);
310            return promise;
311        }
312
313        // If ! WritableStreamCloseQueuedOrInFlight(stream) is true
314        // or state is "closed",
315        if stream.close_queued_or_in_flight() || stream.is_closed() {
316            // return a promise rejected with a TypeError exception
317            // indicating that the stream is closing or closed
318            let promise = Promise::new(global, can_gc);
319            promise.reject_error(
320                Error::Type(c"Stream has been closed, or has close queued or in-flight".to_owned()),
321                can_gc,
322            );
323            return promise;
324        }
325
326        // If state is "erroring",
327        if stream.is_erroring() {
328            // return a promise rejected with stream.[[storedError]].
329            rooted!(in(*cx) let mut error = UndefinedValue());
330            stream.get_stored_error(error.handle_mut());
331            let promise = Promise::new(global, can_gc);
332            promise.reject_native(&error.handle(), can_gc);
333            return promise;
334        }
335
336        // Assert: state is "writable".
337        assert!(stream.is_writable());
338
339        // Let promise be ! WritableStreamAddWriteRequest(stream).
340        let promise = stream.add_write_request(global, can_gc);
341
342        // Perform ! WritableStreamDefaultControllerWrite(controller, chunk, chunkSize).
343        controller.write(cx, global, chunk, chunk_size, can_gc);
344
345        // Return promise.
346        promise
347    }
348
349    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-release>
350    pub(crate) fn release(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
351        // Let stream be this.[[stream]].
352        let Some(stream) = self.stream.get() else {
353            // Assert: stream is not undefined.
354            unreachable!("Stream should be set.");
355        };
356
357        // Assert: stream.[[writer]] is writer.
358        assert!(stream.get_writer().is_some_and(|writer| &*writer == self));
359
360        // Let releasedError be a new TypeError.
361        let released_error = Error::Type(c"Writer has been released".to_owned());
362
363        // Root the js val of the error.
364        rooted!(in(*cx) let mut error = UndefinedValue());
365        released_error.to_jsval(cx, global, error.handle_mut(), can_gc);
366
367        // Perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError).
368        self.ensure_ready_promise_rejected(global, error.handle(), can_gc);
369
370        // Perform ! WritableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError).
371        self.ensure_closed_promise_rejected(global, error.handle(), can_gc);
372
373        // Set stream.[[writer]] to undefined.
374        stream.set_writer(None);
375
376        // Set this.[[stream]] to undefined.
377        self.stream.set(None);
378    }
379
380    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close-with-error-propagation>
381    pub(crate) fn close_with_error_propagation(
382        &self,
383        cx: SafeJSContext,
384        global: &GlobalScope,
385        can_gc: CanGc,
386    ) -> Rc<Promise> {
387        // Let stream be writer.[[stream]].
388        let Some(stream) = self.stream.get() else {
389            // Assert: stream is not undefined.
390            unreachable!("Stream should be set.");
391        };
392
393        // Let state be stream.[[state]].
394        // Used via stream method calls.
395
396        // If ! WritableStreamCloseQueuedOrInFlight(stream) is true
397        // or state is "closed",
398        if stream.close_queued_or_in_flight() || stream.is_closed() {
399            // return a promise resolved with undefined.
400            let promise = Promise::new(global, can_gc);
401            promise.resolve_native(&(), can_gc);
402            return promise;
403        }
404
405        // If state is "errored",
406        if stream.is_errored() {
407            // return a promise rejected with stream.[[storedError]].
408            rooted!(in(*cx) let mut error = UndefinedValue());
409            stream.get_stored_error(error.handle_mut());
410            let promise = Promise::new(global, can_gc);
411            promise.reject_native(&error.handle(), can_gc);
412            return promise;
413        }
414
415        // Assert: state is "writable" or "erroring".
416        assert!(stream.is_writable() || stream.is_erroring());
417
418        // Return ! WritableStreamDefaultWriterClose(writer).
419        self.close(cx, global, can_gc)
420    }
421
422    pub(crate) fn get_stream(&self) -> Option<DomRoot<WritableStream>> {
423        self.stream.get()
424    }
425}
426
427impl WritableStreamDefaultWriterMethods<crate::DomTypeHolder> for WritableStreamDefaultWriter {
428    /// <https://streams.spec.whatwg.org/#default-writer-closed>
429    fn Closed(&self) -> Rc<Promise> {
430        // Return this.[[closedPromise]].
431        return self.closed_promise.borrow().clone();
432    }
433
434    /// <https://streams.spec.whatwg.org/#default-writer-desired-size>
435    fn GetDesiredSize(&self) -> Result<Option<f64>, Error> {
436        // If this.[[stream]] is undefined, throw a TypeError exception.
437        let Some(stream) = self.stream.get() else {
438            return Err(Error::Type(c"Stream is undefined".to_owned()));
439        };
440
441        // Return ! WritableStreamDefaultWriterGetDesiredSize(this).
442        Ok(stream.get_desired_size())
443    }
444
445    /// <https://streams.spec.whatwg.org/#default-writer-ready>
446    fn Ready(&self) -> Rc<Promise> {
447        // Return this.[[readyPromise]].
448        return self.ready_promise.borrow().clone();
449    }
450
451    /// <https://streams.spec.whatwg.org/#default-writer-abort>
452    fn Abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) -> Rc<Promise> {
453        let global = GlobalScope::from_current_realm(cx);
454
455        // If this.[[stream]] is undefined,
456        if self.stream.get().is_none() {
457            // return a promise rejected with a TypeError exception.
458            let promise = Promise::new2(cx, &global);
459            promise.reject_error(
460                Error::Type(c"Stream is undefined".to_owned()),
461                CanGc::from_cx(cx),
462            );
463            return promise;
464        }
465
466        // Return ! WritableStreamDefaultWriterAbort(this, reason).
467        self.abort(cx, &global, reason)
468    }
469
470    /// <https://streams.spec.whatwg.org/#default-writer-close>
471    fn Close(&self, in_realm: InRealm, can_gc: CanGc) -> Rc<Promise> {
472        let cx = GlobalScope::get_cx();
473        let global = GlobalScope::from_safe_context(cx, in_realm);
474        let promise = Promise::new(&global, can_gc);
475
476        // Let stream be this.[[stream]].
477        let Some(stream) = self.stream.get() else {
478            // If stream is undefined,
479            // return a promise rejected with a TypeError exception.
480            promise.reject_error(Error::Type(c"Stream is undefined".to_owned()), can_gc);
481            return promise;
482        };
483
484        // If ! WritableStreamCloseQueuedOrInFlight(stream) is true
485        if stream.close_queued_or_in_flight() {
486            // return a promise rejected with a TypeError exception.
487            promise.reject_error(
488                Error::Type(c"Stream has closed queued or in-flight".to_owned()),
489                can_gc,
490            );
491            return promise;
492        }
493
494        self.close(cx, &global, can_gc)
495    }
496
497    /// <https://streams.spec.whatwg.org/#default-writer-release-lock>
498    fn ReleaseLock(&self, can_gc: CanGc) {
499        // Let stream be this.[[stream]].
500        let Some(stream) = self.stream.get() else {
501            // If stream is undefined, return.
502            return;
503        };
504
505        // Assert: stream.[[writer]] is not undefined.
506        assert!(stream.get_writer().is_some());
507
508        let global = self.global();
509        let cx = GlobalScope::get_cx();
510
511        // Perform ! WritableStreamDefaultWriterRelease(this).
512        self.release(cx, &global, can_gc);
513    }
514
515    /// <https://streams.spec.whatwg.org/#default-writer-write>
516    fn Write(
517        &self,
518        cx: SafeJSContext,
519        chunk: SafeHandleValue,
520        realm: InRealm,
521        can_gc: CanGc,
522    ) -> Rc<Promise> {
523        let global = GlobalScope::from_safe_context(cx, realm);
524
525        // If this.[[stream]] is undefined,
526        if self.stream.get().is_none() {
527            // return a promise rejected with a TypeError exception.
528            let global = GlobalScope::from_safe_context(cx, realm);
529            let promise = Promise::new(&global, can_gc);
530            promise.reject_error(Error::Type(c"Stream is undefined".to_owned()), can_gc);
531            return promise;
532        }
533
534        // Return ! WritableStreamDefaultWriterWrite(this, chunk).
535        self.write(cx, &global, chunk, can_gc)
536    }
537
538    /// <https://streams.spec.whatwg.org/#default-writer-constructor>
539    fn Constructor(
540        global: &GlobalScope,
541        proto: Option<SafeHandleObject>,
542        can_gc: CanGc,
543        stream: &WritableStream,
544    ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
545        let writer = WritableStreamDefaultWriter::new(global, proto, can_gc);
546
547        let cx = GlobalScope::get_cx();
548
549        // Perform ? SetUpWritableStreamDefaultWriter(this, stream).
550        writer.setup(cx, stream, can_gc)?;
551
552        Ok(writer)
553    }
554}