Skip to main content

script/dom/stream/
writablestreamdefaultwriter.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsval::UndefinedValue;
10use js::realm::CurrentRealm;
11use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
12use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto};
13
14use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriterMethods;
15use crate::dom::bindings::error::{Error, ErrorToJsval};
16use crate::dom::bindings::reflector::DomGlobal;
17use crate::dom::bindings::root::{DomRoot, MutNullableDom};
18use crate::dom::globalscope::GlobalScope;
19use crate::dom::promise::Promise;
20use crate::dom::stream::writablestream::WritableStream;
21use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
22
23/// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter>
24#[dom_struct]
25pub struct WritableStreamDefaultWriter {
26    reflector_: Reflector,
27
28    #[conditional_malloc_size_of]
29    ready_promise: RefCell<Rc<Promise>>,
30
31    /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-closedpromise>
32    #[conditional_malloc_size_of]
33    closed_promise: RefCell<Rc<Promise>>,
34
35    /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-stream>
36    stream: MutNullableDom<WritableStream>,
37}
38
39impl WritableStreamDefaultWriter {
40    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-writer>
41    /// The parts that create a new promise.
42    fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> WritableStreamDefaultWriter {
43        WritableStreamDefaultWriter {
44            reflector_: Reflector::new(),
45            stream: Default::default(),
46            closed_promise: RefCell::new(Promise::new(global, can_gc)),
47            ready_promise: RefCell::new(Promise::new(global, can_gc)),
48        }
49    }
50
51    pub(crate) fn new(
52        global: &GlobalScope,
53        proto: Option<SafeHandleObject>,
54        can_gc: CanGc,
55    ) -> DomRoot<WritableStreamDefaultWriter> {
56        reflect_dom_object_with_proto(
57            Box::new(WritableStreamDefaultWriter::new_inherited(global, can_gc)),
58            global,
59            proto,
60            can_gc,
61        )
62    }
63
64    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-writer>
65    /// Continuing from `new_inherited`, the rest.
66    pub(crate) fn setup(
67        &self,
68        cx: SafeJSContext,
69        stream: &WritableStream,
70        can_gc: CanGc,
71    ) -> Result<(), Error> {
72        // If ! IsWritableStreamLocked(stream) is true, throw a TypeError exception.
73        if stream.is_locked() {
74            return Err(Error::Type(c"Stream is locked".to_owned()));
75        }
76
77        // Set writer.[[stream]] to stream.
78        self.stream.set(Some(stream));
79
80        // Set stream.[[writer]] to writer.
81        stream.set_writer(Some(self));
82
83        // Let state be stream.[[state]].
84
85        // If state is "writable",
86        if stream.is_writable() {
87            // If ! WritableStreamCloseQueuedOrInFlight(stream) is false
88            // and stream.[[backpressure]] is true,
89            if !stream.close_queued_or_in_flight() && stream.get_backpressure() {
90                // set writer.[[readyPromise]] to a new promise.
91                // Done in `new_inherited`.
92            } else {
93                // Otherwise, set writer.[[readyPromise]] to a promise resolved with undefined.
94                // Note: new promise created in `new_inherited`.
95                self.ready_promise.borrow().resolve_native(&(), can_gc);
96            }
97
98            // Set writer.[[closedPromise]] to a new promise.
99            // Done in `new_inherited`.
100            return Ok(());
101        }
102
103        // Otherwise, if state is "erroring",
104        if stream.is_erroring() {
105            rooted!(in(*cx) let mut error = UndefinedValue());
106            stream.get_stored_error(error.handle_mut());
107
108            // Set writer.[[readyPromise]] to a promise rejected with stream.[[storedError]].
109            // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
110            // Note: new promise created in `new_inherited`.
111            let ready_promise = self.ready_promise.borrow();
112            ready_promise.reject_native(&error.handle(), can_gc);
113            ready_promise.set_promise_is_handled();
114
115            // Set writer.[[closedPromise]] to a new promise.
116            // Done in `new_inherited`.
117            return Ok(());
118        }
119
120        // Otherwise, if state is "closed",
121        if stream.is_closed() {
122            // Set writer.[[readyPromise]] to a promise resolved with undefined.
123            // Note: new promise created in `new_inherited`.
124            self.ready_promise.borrow().resolve_native(&(), can_gc);
125
126            // Set writer.[[closedPromise]] to a promise resolved with undefined.
127            // Note: new promise created in `new_inherited`.
128            self.closed_promise.borrow().resolve_native(&(), can_gc);
129            return Ok(());
130        }
131
132        // Otherwise,
133        // Assert: state is "errored".
134        assert!(stream.is_errored());
135
136        // Let storedError be stream.[[storedError]].
137        rooted!(in(*cx) let mut error = UndefinedValue());
138        stream.get_stored_error(error.handle_mut());
139
140        // Set writer.[[readyPromise]] to a promise rejected with stream.[[storedError]].
141        // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
142        // Note: new promise created in `new_inherited`.
143        let ready_promise = self.ready_promise.borrow();
144        ready_promise.reject_native(&error.handle(), can_gc);
145        ready_promise.set_promise_is_handled();
146
147        // Set writer.[[closedPromise]] to a promise rejected with storedError.
148        // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
149        // Note: new promise created in `new_inherited`.
150        let ready_promise = self.closed_promise.borrow();
151        ready_promise.reject_native(&error.handle(), can_gc);
152        ready_promise.set_promise_is_handled();
153
154        Ok(())
155    }
156
157    pub(crate) fn reject_closed_promise_with_stored_error(
158        &self,
159        cx: &mut js::context::JSContext,
160        error: &SafeHandleValue,
161    ) {
162        self.closed_promise
163            .borrow()
164            .reject_native(error, CanGc::from_cx(cx));
165    }
166
167    pub(crate) fn set_close_promise_is_handled(&self) {
168        self.closed_promise.borrow().set_promise_is_handled();
169    }
170
171    pub(crate) fn set_ready_promise(&self, promise: Rc<Promise>) {
172        *self.ready_promise.borrow_mut() = promise;
173    }
174
175    pub(crate) fn resolve_ready_promise_with_undefined(&self, can_gc: CanGc) {
176        self.ready_promise.borrow().resolve_native(&(), can_gc);
177    }
178
179    pub(crate) fn resolve_closed_promise_with_undefined(&self, can_gc: CanGc) {
180        self.closed_promise.borrow().resolve_native(&(), can_gc);
181    }
182
183    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-ready-promise-rejected>
184    pub(crate) fn ensure_ready_promise_rejected(
185        &self,
186        global: &GlobalScope,
187        error: SafeHandleValue,
188        can_gc: CanGc,
189    ) {
190        let ready_promise = self.ready_promise.borrow().clone();
191
192        // If writer.[[readyPromise]].[[PromiseState]] is "pending",
193        if ready_promise.is_pending() {
194            // reject writer.[[readyPromise]] with error.
195            ready_promise.reject_native(&error, can_gc);
196
197            // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
198            ready_promise.set_promise_is_handled();
199        } else {
200            // Otherwise, set writer.[[readyPromise]] to a promise rejected with error.
201            let promise = Promise::new(global, can_gc);
202            promise.reject_native(&error, can_gc);
203
204            // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
205            promise.set_promise_is_handled();
206            *self.ready_promise.borrow_mut() = promise;
207        }
208    }
209
210    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-closed-promise-rejected>
211    pub(crate) fn ensure_closed_promise_rejected(
212        &self,
213        global: &GlobalScope,
214        error: SafeHandleValue,
215        can_gc: CanGc,
216    ) {
217        let closed_promise = self.closed_promise.borrow().clone();
218
219        // If writer.[[closedPromise]].[[PromiseState]] is "pending",
220        if closed_promise.is_pending() {
221            // reject writer.[[closedPromise]] with error.
222            closed_promise.reject_native(&error, can_gc);
223
224            // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
225            closed_promise.set_promise_is_handled();
226        } else {
227            // Otherwise, set writer.[[closedPromise]] to a promise rejected with error.
228            let promise = Promise::new(global, can_gc);
229            promise.reject_native(&error, can_gc);
230
231            // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
232            promise.set_promise_is_handled();
233            *self.closed_promise.borrow_mut() = promise;
234        }
235    }
236
237    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-abort>
238    fn abort(
239        &self,
240        cx: &mut CurrentRealm,
241        global: &GlobalScope,
242        reason: SafeHandleValue,
243    ) -> Rc<Promise> {
244        // Let stream be writer.[[stream]].
245        let Some(stream) = self.stream.get() else {
246            // Assert: stream is not undefined.
247            unreachable!("Stream should be set.");
248        };
249
250        // Return ! WritableStreamAbort(stream, reason).
251        stream.abort(cx, global, reason)
252    }
253
254    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close>
255    fn close(&self, cx: &mut js::context::JSContext, global: &GlobalScope) -> Rc<Promise> {
256        // Let stream be writer.[[stream]].
257        let Some(stream) = self.stream.get() else {
258            // Assert: stream is not undefined.
259            unreachable!("Stream should be set.");
260        };
261
262        // Return ! WritableStreamClose(stream).
263        stream.close(cx, global)
264    }
265
266    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-write>
267    pub(crate) fn write(
268        &self,
269        cx: &mut js::context::JSContext,
270        global: &GlobalScope,
271        chunk: SafeHandleValue,
272    ) -> Rc<Promise> {
273        // Let stream be writer.[[stream]].
274        let Some(stream) = self.stream.get() else {
275            // Assert: stream is not undefined.
276            unreachable!("Stream should be set.");
277        };
278
279        // Let controller be stream.[[controller]].
280        // Note: asserting controller is some.
281        let Some(controller) = stream.get_controller() else {
282            unreachable!("Controller should be set.");
283        };
284
285        // Let chunkSize be ! WritableStreamDefaultControllerGetChunkSize(controller, chunk).
286        let chunk_size = controller.get_chunk_size(cx, global, chunk);
287
288        // If stream is not equal to writer.[[stream]],
289        // return a promise rejected with a TypeError exception.
290        if !self
291            .stream
292            .get()
293            .is_some_and(|current_stream| current_stream == stream)
294        {
295            let promise = Promise::new2(cx, global);
296            promise.reject_error(
297                Error::Type(c"Stream is not equal to writer stream".to_owned()),
298                CanGc::from_cx(cx),
299            );
300            return promise;
301        }
302
303        // Let state be stream.[[state]].
304        // If state is "errored",
305        if stream.is_errored() {
306            // return a promise rejected with stream.[[storedError]].
307            rooted!(&in(cx) let mut error = UndefinedValue());
308            stream.get_stored_error(error.handle_mut());
309            let promise = Promise::new2(cx, global);
310            promise.reject_native(&error.handle(), CanGc::from_cx(cx));
311            return promise;
312        }
313
314        // If ! WritableStreamCloseQueuedOrInFlight(stream) is true
315        // or state is "closed",
316        if stream.close_queued_or_in_flight() || stream.is_closed() {
317            // return a promise rejected with a TypeError exception
318            // indicating that the stream is closing or closed
319            let promise = Promise::new2(cx, global);
320            promise.reject_error(
321                Error::Type(c"Stream has been closed, or has close queued or in-flight".to_owned()),
322                CanGc::from_cx(cx),
323            );
324            return promise;
325        }
326
327        // If state is "erroring",
328        if stream.is_erroring() {
329            // return a promise rejected with stream.[[storedError]].
330            rooted!(&in(cx) let mut error = UndefinedValue());
331            stream.get_stored_error(error.handle_mut());
332            let promise = Promise::new2(cx, global);
333            promise.reject_native(&error.handle(), CanGc::from_cx(cx));
334            return promise;
335        }
336
337        // Assert: state is "writable".
338        assert!(stream.is_writable());
339
340        // Let promise be ! WritableStreamAddWriteRequest(stream).
341        let promise = stream.add_write_request(global, CanGc::from_cx(cx));
342
343        // Perform ! WritableStreamDefaultControllerWrite(controller, chunk, chunkSize).
344        controller.write(cx, global, chunk, chunk_size);
345
346        // Return promise.
347        promise
348    }
349
350    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-release>
351    pub(crate) fn release(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
352        // Let stream be this.[[stream]].
353        let Some(stream) = self.stream.get() else {
354            // Assert: stream is not undefined.
355            unreachable!("Stream should be set.");
356        };
357
358        // Assert: stream.[[writer]] is writer.
359        assert!(stream.get_writer().is_some_and(|writer| &*writer == self));
360
361        // Let releasedError be a new TypeError.
362        let released_error = Error::Type(c"Writer has been released".to_owned());
363
364        // Root the js val of the error.
365        rooted!(in(*cx) let mut error = UndefinedValue());
366        released_error.to_jsval(cx, global, error.handle_mut(), can_gc);
367
368        // Perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError).
369        self.ensure_ready_promise_rejected(global, error.handle(), can_gc);
370
371        // Perform ! WritableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError).
372        self.ensure_closed_promise_rejected(global, error.handle(), can_gc);
373
374        // Set stream.[[writer]] to undefined.
375        stream.set_writer(None);
376
377        // Set this.[[stream]] to undefined.
378        self.stream.set(None);
379    }
380
381    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close-with-error-propagation>
382    pub(crate) fn close_with_error_propagation(
383        &self,
384        cx: &mut js::context::JSContext,
385        global: &GlobalScope,
386    ) -> Rc<Promise> {
387        // Let stream be writer.[[stream]].
388        let Some(stream) = self.stream.get() else {
389            // Assert: stream is not undefined.
390            unreachable!("Stream should be set.");
391        };
392
393        // Let state be stream.[[state]].
394        // Used via stream method calls.
395
396        // If ! WritableStreamCloseQueuedOrInFlight(stream) is true
397        // or state is "closed",
398        if stream.close_queued_or_in_flight() || stream.is_closed() {
399            // return a promise resolved with undefined.
400            let promise = Promise::new2(cx, global);
401            promise.resolve_native(&(), CanGc::from_cx(cx));
402            return promise;
403        }
404
405        // If state is "errored",
406        if stream.is_errored() {
407            // return a promise rejected with stream.[[storedError]].
408            rooted!(&in(cx) let mut error = UndefinedValue());
409            stream.get_stored_error(error.handle_mut());
410            let promise = Promise::new2(cx, global);
411            promise.reject_native(&error.handle(), CanGc::from_cx(cx));
412            return promise;
413        }
414
415        // Assert: state is "writable" or "erroring".
416        assert!(stream.is_writable() || stream.is_erroring());
417
418        // Return ! WritableStreamDefaultWriterClose(writer).
419        self.close(cx, global)
420    }
421
422    pub(crate) fn get_stream(&self) -> Option<DomRoot<WritableStream>> {
423        self.stream.get()
424    }
425}
426
427impl WritableStreamDefaultWriterMethods<crate::DomTypeHolder> for WritableStreamDefaultWriter {
428    /// <https://streams.spec.whatwg.org/#default-writer-closed>
429    fn Closed(&self) -> Rc<Promise> {
430        // Return this.[[closedPromise]].
431        return self.closed_promise.borrow().clone();
432    }
433
434    /// <https://streams.spec.whatwg.org/#default-writer-desired-size>
435    fn GetDesiredSize(&self) -> Result<Option<f64>, Error> {
436        // If this.[[stream]] is undefined, throw a TypeError exception.
437        let Some(stream) = self.stream.get() else {
438            return Err(Error::Type(c"Stream is undefined".to_owned()));
439        };
440
441        // Return ! WritableStreamDefaultWriterGetDesiredSize(this).
442        Ok(stream.get_desired_size())
443    }
444
445    /// <https://streams.spec.whatwg.org/#default-writer-ready>
446    fn Ready(&self) -> Rc<Promise> {
447        // Return this.[[readyPromise]].
448        return self.ready_promise.borrow().clone();
449    }
450
451    /// <https://streams.spec.whatwg.org/#default-writer-abort>
452    fn Abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) -> Rc<Promise> {
453        let global = GlobalScope::from_current_realm(cx);
454
455        // If this.[[stream]] is undefined,
456        if self.stream.get().is_none() {
457            // return a promise rejected with a TypeError exception.
458            let promise = Promise::new2(cx, &global);
459            promise.reject_error(
460                Error::Type(c"Stream is undefined".to_owned()),
461                CanGc::from_cx(cx),
462            );
463            return promise;
464        }
465
466        // Return ! WritableStreamDefaultWriterAbort(this, reason).
467        self.abort(cx, &global, reason)
468    }
469
470    /// <https://streams.spec.whatwg.org/#default-writer-close>
471    fn Close(&self, cx: &mut CurrentRealm) -> Rc<Promise> {
472        let global = GlobalScope::from_current_realm(cx);
473        let promise = Promise::new2(cx, &global);
474
475        // Let stream be this.[[stream]].
476        let Some(stream) = self.stream.get() else {
477            // If stream is undefined,
478            // return a promise rejected with a TypeError exception.
479            promise.reject_error(
480                Error::Type(c"Stream is undefined".to_owned()),
481                CanGc::from_cx(cx),
482            );
483            return promise;
484        };
485
486        // If ! WritableStreamCloseQueuedOrInFlight(stream) is true
487        if stream.close_queued_or_in_flight() {
488            // return a promise rejected with a TypeError exception.
489            promise.reject_error(
490                Error::Type(c"Stream has closed queued or in-flight".to_owned()),
491                CanGc::from_cx(cx),
492            );
493            return promise;
494        }
495
496        self.close(cx, &global)
497    }
498
499    /// <https://streams.spec.whatwg.org/#default-writer-release-lock>
500    fn ReleaseLock(&self, can_gc: CanGc) {
501        // Let stream be this.[[stream]].
502        let Some(stream) = self.stream.get() else {
503            // If stream is undefined, return.
504            return;
505        };
506
507        // Assert: stream.[[writer]] is not undefined.
508        assert!(stream.get_writer().is_some());
509
510        let global = self.global();
511        let cx = GlobalScope::get_cx();
512
513        // Perform ! WritableStreamDefaultWriterRelease(this).
514        self.release(cx, &global, can_gc);
515    }
516
517    /// <https://streams.spec.whatwg.org/#default-writer-write>
518    fn Write(&self, cx: &mut CurrentRealm, chunk: SafeHandleValue) -> Rc<Promise> {
519        let global = GlobalScope::from_current_realm(cx);
520
521        // If this.[[stream]] is undefined,
522        if self.stream.get().is_none() {
523            // return a promise rejected with a TypeError exception.
524            let promise = Promise::new2(cx, &global);
525            promise.reject_error(
526                Error::Type(c"Stream is undefined".to_owned()),
527                CanGc::from_cx(cx),
528            );
529            return promise;
530        }
531
532        // Return ! WritableStreamDefaultWriterWrite(this, chunk).
533        self.write(cx, &global, chunk)
534    }
535
536    /// <https://streams.spec.whatwg.org/#default-writer-constructor>
537    fn Constructor(
538        global: &GlobalScope,
539        proto: Option<SafeHandleObject>,
540        can_gc: CanGc,
541        stream: &WritableStream,
542    ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
543        let writer = WritableStreamDefaultWriter::new(global, proto, can_gc);
544
545        let cx = GlobalScope::get_cx();
546
547        // Perform ? SetUpWritableStreamDefaultWriter(this, stream).
548        writer.setup(cx, stream, can_gc)?;
549
550        Ok(writer)
551    }
552}