Skip to main content

script/dom/stream/
writablestreamdefaultwriter.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::context::JSContext;
10use js::jsval::UndefinedValue;
11use js::realm::CurrentRealm;
12use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
13use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto};
14
15use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriterMethods;
16use crate::dom::bindings::error::{Error, ErrorToJsval};
17use crate::dom::bindings::reflector::DomGlobal;
18use crate::dom::bindings::root::{DomRoot, MutNullableDom};
19use crate::dom::globalscope::GlobalScope;
20use crate::dom::promise::Promise;
21use crate::dom::stream::writablestream::WritableStream;
22use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
23
24/// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter>
25#[dom_struct]
26pub struct WritableStreamDefaultWriter {
27    reflector_: Reflector,
28
29    #[conditional_malloc_size_of]
30    ready_promise: RefCell<Rc<Promise>>,
31
32    /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-closedpromise>
33    #[conditional_malloc_size_of]
34    closed_promise: RefCell<Rc<Promise>>,
35
36    /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-stream>
37    stream: MutNullableDom<WritableStream>,
38}
39
40impl WritableStreamDefaultWriter {
41    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-writer>
42    /// The parts that create a new promise.
43    fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> WritableStreamDefaultWriter {
44        WritableStreamDefaultWriter {
45            reflector_: Reflector::new(),
46            stream: Default::default(),
47            closed_promise: RefCell::new(Promise::new(global, can_gc)),
48            ready_promise: RefCell::new(Promise::new(global, can_gc)),
49        }
50    }
51
52    pub(crate) fn new(
53        global: &GlobalScope,
54        proto: Option<SafeHandleObject>,
55        can_gc: CanGc,
56    ) -> DomRoot<WritableStreamDefaultWriter> {
57        reflect_dom_object_with_proto(
58            Box::new(WritableStreamDefaultWriter::new_inherited(global, can_gc)),
59            global,
60            proto,
61            can_gc,
62        )
63    }
64
65    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-writer>
66    /// Continuing from `new_inherited`, the rest.
67    pub(crate) fn setup(
68        &self,
69        cx: SafeJSContext,
70        stream: &WritableStream,
71        can_gc: CanGc,
72    ) -> Result<(), Error> {
73        // If ! IsWritableStreamLocked(stream) is true, throw a TypeError exception.
74        if stream.is_locked() {
75            return Err(Error::Type(c"Stream is locked".to_owned()));
76        }
77
78        // Set writer.[[stream]] to stream.
79        self.stream.set(Some(stream));
80
81        // Set stream.[[writer]] to writer.
82        stream.set_writer(Some(self));
83
84        // Let state be stream.[[state]].
85
86        // If state is "writable",
87        if stream.is_writable() {
88            // If ! WritableStreamCloseQueuedOrInFlight(stream) is false
89            // and stream.[[backpressure]] is true,
90            if !stream.close_queued_or_in_flight() && stream.get_backpressure() {
91                // set writer.[[readyPromise]] to a new promise.
92                // Done in `new_inherited`.
93            } else {
94                // Otherwise, set writer.[[readyPromise]] to a promise resolved with undefined.
95                // Note: new promise created in `new_inherited`.
96                self.ready_promise.borrow().resolve_native(&(), can_gc);
97            }
98
99            // Set writer.[[closedPromise]] to a new promise.
100            // Done in `new_inherited`.
101            return Ok(());
102        }
103
104        // Otherwise, if state is "erroring",
105        if stream.is_erroring() {
106            rooted!(in(*cx) let mut error = UndefinedValue());
107            stream.get_stored_error(error.handle_mut());
108
109            // Set writer.[[readyPromise]] to a promise rejected with stream.[[storedError]].
110            // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
111            // Note: new promise created in `new_inherited`.
112            let ready_promise = self.ready_promise.borrow();
113            ready_promise.reject_native(&error.handle(), can_gc);
114            ready_promise.set_promise_is_handled();
115
116            // Set writer.[[closedPromise]] to a new promise.
117            // Done in `new_inherited`.
118            return Ok(());
119        }
120
121        // Otherwise, if state is "closed",
122        if stream.is_closed() {
123            // Set writer.[[readyPromise]] to a promise resolved with undefined.
124            // Note: new promise created in `new_inherited`.
125            self.ready_promise.borrow().resolve_native(&(), can_gc);
126
127            // Set writer.[[closedPromise]] to a promise resolved with undefined.
128            // Note: new promise created in `new_inherited`.
129            self.closed_promise.borrow().resolve_native(&(), can_gc);
130            return Ok(());
131        }
132
133        // Otherwise,
134        // Assert: state is "errored".
135        assert!(stream.is_errored());
136
137        // Let storedError be stream.[[storedError]].
138        rooted!(in(*cx) let mut error = UndefinedValue());
139        stream.get_stored_error(error.handle_mut());
140
141        // Set writer.[[readyPromise]] to a promise rejected with stream.[[storedError]].
142        // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
143        // Note: new promise created in `new_inherited`.
144        let ready_promise = self.ready_promise.borrow();
145        ready_promise.reject_native(&error.handle(), can_gc);
146        ready_promise.set_promise_is_handled();
147
148        // Set writer.[[closedPromise]] to a promise rejected with storedError.
149        // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
150        // Note: new promise created in `new_inherited`.
151        let ready_promise = self.closed_promise.borrow();
152        ready_promise.reject_native(&error.handle(), can_gc);
153        ready_promise.set_promise_is_handled();
154
155        Ok(())
156    }
157
158    pub(crate) fn reject_closed_promise_with_stored_error(
159        &self,
160        cx: &mut JSContext,
161        error: &SafeHandleValue,
162    ) {
163        self.closed_promise
164            .borrow()
165            .reject_native_with_cx(cx, error);
166    }
167
168    pub(crate) fn set_close_promise_is_handled(&self) {
169        self.closed_promise.borrow().set_promise_is_handled();
170    }
171
172    pub(crate) fn set_ready_promise(&self, promise: Rc<Promise>) {
173        *self.ready_promise.borrow_mut() = promise;
174    }
175
176    pub(crate) fn resolve_ready_promise_with_undefined(&self, can_gc: CanGc) {
177        self.ready_promise.borrow().resolve_native(&(), can_gc);
178    }
179
180    pub(crate) fn resolve_closed_promise_with_undefined(&self, can_gc: CanGc) {
181        self.closed_promise.borrow().resolve_native(&(), can_gc);
182    }
183
184    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-ready-promise-rejected>
185    pub(crate) fn ensure_ready_promise_rejected(
186        &self,
187        cx: &mut JSContext,
188        global: &GlobalScope,
189        error: SafeHandleValue,
190    ) {
191        let ready_promise = self.ready_promise.borrow().clone();
192
193        // If writer.[[readyPromise]].[[PromiseState]] is "pending",
194        if ready_promise.is_pending() {
195            // reject writer.[[readyPromise]] with error.
196            ready_promise.reject_native_with_cx(cx, &error);
197
198            // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
199            ready_promise.set_promise_is_handled();
200        } else {
201            // Otherwise, set writer.[[readyPromise]] to a promise rejected with error.
202            let promise = Promise::new_rejected(cx, global, error);
203
204            // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
205            promise.set_promise_is_handled();
206            *self.ready_promise.borrow_mut() = promise;
207        }
208    }
209
210    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-closed-promise-rejected>
211    fn ensure_closed_promise_rejected(
212        &self,
213        cx: &mut JSContext,
214        global: &GlobalScope,
215        error: SafeHandleValue,
216    ) {
217        let closed_promise = self.closed_promise.borrow().clone();
218
219        // If writer.[[closedPromise]].[[PromiseState]] is "pending",
220        if closed_promise.is_pending() {
221            // reject writer.[[closedPromise]] with error.
222            closed_promise.reject_native_with_cx(cx, &error);
223
224            // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
225            closed_promise.set_promise_is_handled();
226        } else {
227            // Otherwise, set writer.[[closedPromise]] to a promise rejected with error.
228            let promise = Promise::new_rejected(cx, global, error);
229
230            // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
231            promise.set_promise_is_handled();
232            *self.closed_promise.borrow_mut() = promise;
233        }
234    }
235
236    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-abort>
237    fn abort(
238        &self,
239        cx: &mut CurrentRealm,
240        global: &GlobalScope,
241        reason: SafeHandleValue,
242    ) -> Rc<Promise> {
243        // Let stream be writer.[[stream]].
244        let Some(stream) = self.stream.get() else {
245            // Assert: stream is not undefined.
246            unreachable!("Stream should be set.");
247        };
248
249        // Return ! WritableStreamAbort(stream, reason).
250        stream.abort(cx, global, reason)
251    }
252
253    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close>
254    fn close(&self, cx: &mut JSContext, global: &GlobalScope) -> Rc<Promise> {
255        // Let stream be writer.[[stream]].
256        let Some(stream) = self.stream.get() else {
257            // Assert: stream is not undefined.
258            unreachable!("Stream should be set.");
259        };
260
261        // Return ! WritableStreamClose(stream).
262        stream.close(cx, global)
263    }
264
265    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-write>
266    pub(crate) fn write(
267        &self,
268        cx: &mut JSContext,
269        global: &GlobalScope,
270        chunk: SafeHandleValue,
271    ) -> Rc<Promise> {
272        // Let stream be writer.[[stream]].
273        let Some(stream) = self.stream.get() else {
274            // Assert: stream is not undefined.
275            unreachable!("Stream should be set.");
276        };
277
278        // Let controller be stream.[[controller]].
279        // Note: asserting controller is some.
280        let Some(controller) = stream.get_controller() else {
281            unreachable!("Controller should be set.");
282        };
283
284        // Let chunkSize be ! WritableStreamDefaultControllerGetChunkSize(controller, chunk).
285        let chunk_size = controller.get_chunk_size(cx, global, chunk);
286
287        // If stream is not equal to writer.[[stream]],
288        // return a promise rejected with a TypeError exception.
289        if !self
290            .stream
291            .get()
292            .is_some_and(|current_stream| current_stream == stream)
293        {
294            let promise = Promise::new2(cx, global);
295            promise.reject_error_with_cx(
296                cx,
297                Error::Type(c"Stream is not equal to writer stream".to_owned()),
298            );
299            return promise;
300        }
301
302        // Let state be stream.[[state]].
303        // If state is "errored",
304        if stream.is_errored() {
305            // return a promise rejected with stream.[[storedError]].
306            rooted!(&in(cx) let mut error = UndefinedValue());
307            stream.get_stored_error(error.handle_mut());
308            let promise = Promise::new2(cx, global);
309            promise.reject_native_with_cx(cx, &error.handle());
310            return promise;
311        }
312
313        // If ! WritableStreamCloseQueuedOrInFlight(stream) is true
314        // or state is "closed",
315        if stream.close_queued_or_in_flight() || stream.is_closed() {
316            // return a promise rejected with a TypeError exception
317            // indicating that the stream is closing or closed
318            let promise = Promise::new2(cx, global);
319            promise.reject_error_with_cx(
320                cx,
321                Error::Type(c"Stream has been closed, or has close queued or in-flight".to_owned()),
322            );
323            return promise;
324        }
325
326        // If state is "erroring",
327        if stream.is_erroring() {
328            // return a promise rejected with stream.[[storedError]].
329            rooted!(&in(cx) let mut error = UndefinedValue());
330            stream.get_stored_error(error.handle_mut());
331            let promise = Promise::new2(cx, global);
332            promise.reject_native_with_cx(cx, &error.handle());
333            return promise;
334        }
335
336        // Assert: state is "writable".
337        assert!(stream.is_writable());
338
339        // Let promise be ! WritableStreamAddWriteRequest(stream).
340        let promise = stream.add_write_request(global, CanGc::from_cx(cx));
341
342        // Perform ! WritableStreamDefaultControllerWrite(controller, chunk, chunkSize).
343        controller.write(cx, global, chunk, chunk_size);
344
345        // Return promise.
346        promise
347    }
348
349    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-release>
350    pub(crate) fn release(&self, cx: &mut JSContext, global: &GlobalScope) {
351        // Let stream be this.[[stream]].
352        let Some(stream) = self.stream.get() else {
353            // Assert: stream is not undefined.
354            unreachable!("Stream should be set.");
355        };
356
357        // Assert: stream.[[writer]] is writer.
358        assert!(stream.get_writer().is_some_and(|writer| &*writer == self));
359
360        // Let releasedError be a new TypeError.
361        let released_error = Error::Type(c"Writer has been released".to_owned());
362
363        // Root the js val of the error.
364        rooted!(&in(cx) let mut error = UndefinedValue());
365        released_error.to_jsval(cx.into(), global, error.handle_mut(), CanGc::from_cx(cx));
366
367        // Perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError).
368        self.ensure_ready_promise_rejected(cx, global, error.handle());
369
370        // Perform ! WritableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError).
371        self.ensure_closed_promise_rejected(cx, global, error.handle());
372
373        // Set stream.[[writer]] to undefined.
374        stream.set_writer(None);
375
376        // Set this.[[stream]] to undefined.
377        self.stream.set(None);
378    }
379
380    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close-with-error-propagation>
381    pub(crate) fn close_with_error_propagation(
382        &self,
383        cx: &mut JSContext,
384        global: &GlobalScope,
385    ) -> Rc<Promise> {
386        // Let stream be writer.[[stream]].
387        let Some(stream) = self.stream.get() else {
388            // Assert: stream is not undefined.
389            unreachable!("Stream should be set.");
390        };
391
392        // Let state be stream.[[state]].
393        // Used via stream method calls.
394
395        // If ! WritableStreamCloseQueuedOrInFlight(stream) is true
396        // or state is "closed",
397        if stream.close_queued_or_in_flight() || stream.is_closed() {
398            // return a promise resolved with undefined.
399            let promise = Promise::new2(cx, global);
400            promise.resolve_native_with_cx(cx, &());
401            return promise;
402        }
403
404        // If state is "errored",
405        if stream.is_errored() {
406            // return a promise rejected with stream.[[storedError]].
407            rooted!(&in(cx) let mut error = UndefinedValue());
408            stream.get_stored_error(error.handle_mut());
409            let promise = Promise::new2(cx, global);
410            promise.reject_native_with_cx(cx, &error.handle());
411            return promise;
412        }
413
414        // Assert: state is "writable" or "erroring".
415        assert!(stream.is_writable() || stream.is_erroring());
416
417        // Return ! WritableStreamDefaultWriterClose(writer).
418        self.close(cx, global)
419    }
420
421    pub(crate) fn get_stream(&self) -> Option<DomRoot<WritableStream>> {
422        self.stream.get()
423    }
424}
425
426impl WritableStreamDefaultWriterMethods<crate::DomTypeHolder> for WritableStreamDefaultWriter {
427    /// <https://streams.spec.whatwg.org/#default-writer-closed>
428    fn Closed(&self) -> Rc<Promise> {
429        // Return this.[[closedPromise]].
430        return self.closed_promise.borrow().clone();
431    }
432
433    /// <https://streams.spec.whatwg.org/#default-writer-desired-size>
434    fn GetDesiredSize(&self) -> Result<Option<f64>, Error> {
435        // If this.[[stream]] is undefined, throw a TypeError exception.
436        let Some(stream) = self.stream.get() else {
437            return Err(Error::Type(c"Stream is undefined".to_owned()));
438        };
439
440        // Return ! WritableStreamDefaultWriterGetDesiredSize(this).
441        Ok(stream.get_desired_size())
442    }
443
444    /// <https://streams.spec.whatwg.org/#default-writer-ready>
445    fn Ready(&self) -> Rc<Promise> {
446        // Return this.[[readyPromise]].
447        return self.ready_promise.borrow().clone();
448    }
449
450    /// <https://streams.spec.whatwg.org/#default-writer-abort>
451    fn Abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) -> Rc<Promise> {
452        let global = GlobalScope::from_current_realm(cx);
453
454        // If this.[[stream]] is undefined,
455        if self.stream.get().is_none() {
456            // return a promise rejected with a TypeError exception.
457            let promise = Promise::new2(cx, &global);
458            promise.reject_error_with_cx(cx, Error::Type(c"Stream is undefined".to_owned()));
459            return promise;
460        }
461
462        // Return ! WritableStreamDefaultWriterAbort(this, reason).
463        self.abort(cx, &global, reason)
464    }
465
466    /// <https://streams.spec.whatwg.org/#default-writer-close>
467    fn Close(&self, cx: &mut CurrentRealm) -> Rc<Promise> {
468        let global = GlobalScope::from_current_realm(cx);
469        let promise = Promise::new2(cx, &global);
470
471        // Let stream be this.[[stream]].
472        let Some(stream) = self.stream.get() else {
473            // If stream is undefined,
474            // return a promise rejected with a TypeError exception.
475            promise.reject_error_with_cx(cx, Error::Type(c"Stream is undefined".to_owned()));
476            return promise;
477        };
478
479        // If ! WritableStreamCloseQueuedOrInFlight(stream) is true
480        if stream.close_queued_or_in_flight() {
481            // return a promise rejected with a TypeError exception.
482            promise.reject_error_with_cx(
483                cx,
484                Error::Type(c"Stream has closed queued or in-flight".to_owned()),
485            );
486            return promise;
487        }
488
489        self.close(cx, &global)
490    }
491
492    /// <https://streams.spec.whatwg.org/#default-writer-release-lock>
493    fn ReleaseLock(&self, cx: &mut JSContext) {
494        // Let stream be this.[[stream]].
495        let Some(stream) = self.stream.get() else {
496            // If stream is undefined, return.
497            return;
498        };
499
500        // Assert: stream.[[writer]] is not undefined.
501        assert!(stream.get_writer().is_some());
502
503        let global = self.global();
504
505        // Perform ! WritableStreamDefaultWriterRelease(this).
506        self.release(cx, &global);
507    }
508
509    /// <https://streams.spec.whatwg.org/#default-writer-write>
510    fn Write(&self, cx: &mut CurrentRealm, chunk: SafeHandleValue) -> Rc<Promise> {
511        let global = GlobalScope::from_current_realm(cx);
512
513        // If this.[[stream]] is undefined,
514        if self.stream.get().is_none() {
515            // return a promise rejected with a TypeError exception.
516            let promise = Promise::new2(cx, &global);
517            promise.reject_error_with_cx(cx, Error::Type(c"Stream is undefined".to_owned()));
518            return promise;
519        }
520
521        // Return ! WritableStreamDefaultWriterWrite(this, chunk).
522        self.write(cx, &global, chunk)
523    }
524
525    /// <https://streams.spec.whatwg.org/#default-writer-constructor>
526    fn Constructor(
527        global: &GlobalScope,
528        proto: Option<SafeHandleObject>,
529        can_gc: CanGc,
530        stream: &WritableStream,
531    ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
532        let writer = WritableStreamDefaultWriter::new(global, proto, can_gc);
533
534        let cx = GlobalScope::get_cx();
535
536        // Perform ? SetUpWritableStreamDefaultWriter(this, stream).
537        writer.setup(cx, stream, can_gc)?;
538
539        Ok(writer)
540    }
541}