Skip to main content

script/dom/stream/
writablestreamdefaultwriter.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::context::JSContext;
10use js::jsval::UndefinedValue;
11use js::realm::CurrentRealm;
12use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
13use script_bindings::reflector::{Reflector, reflect_dom_object_with_proto_and_cx};
14
15use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriterMethods;
16use crate::dom::bindings::error::{Error, ErrorToJsval};
17use crate::dom::bindings::reflector::DomGlobal;
18use crate::dom::bindings::root::{DomRoot, MutNullableDom};
19use crate::dom::globalscope::GlobalScope;
20use crate::dom::promise::Promise;
21use crate::dom::stream::writablestream::WritableStream;
22
23/// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter>
24#[dom_struct]
25pub struct WritableStreamDefaultWriter {
26    reflector_: Reflector,
27
28    #[conditional_malloc_size_of]
29    ready_promise: RefCell<Rc<Promise>>,
30
31    /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-closedpromise>
32    #[conditional_malloc_size_of]
33    closed_promise: RefCell<Rc<Promise>>,
34
35    /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-stream>
36    stream: MutNullableDom<WritableStream>,
37}
38
39impl WritableStreamDefaultWriter {
40    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-writer>
41    /// The parts that create a new promise.
42    fn new_inherited(cx: &mut CurrentRealm) -> WritableStreamDefaultWriter {
43        WritableStreamDefaultWriter {
44            reflector_: Reflector::new(),
45            stream: Default::default(),
46            closed_promise: RefCell::new(Promise::new_in_realm(cx)),
47            ready_promise: RefCell::new(Promise::new_in_realm(cx)),
48        }
49    }
50
51    pub(crate) fn new(
52        cx: &mut CurrentRealm,
53        global: &GlobalScope,
54        proto: Option<SafeHandleObject>,
55    ) -> DomRoot<WritableStreamDefaultWriter> {
56        reflect_dom_object_with_proto_and_cx(
57            Box::new(WritableStreamDefaultWriter::new_inherited(cx)),
58            global,
59            proto,
60            cx,
61        )
62    }
63
64    /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-writer>
65    /// Continuing from `new_inherited`, the rest.
66    pub(crate) fn setup(&self, cx: &mut JSContext, stream: &WritableStream) -> Result<(), Error> {
67        // If ! IsWritableStreamLocked(stream) is true, throw a TypeError exception.
68        if stream.is_locked() {
69            return Err(Error::Type(c"Stream is locked".to_owned()));
70        }
71
72        // Set writer.[[stream]] to stream.
73        self.stream.set(Some(stream));
74
75        // Set stream.[[writer]] to writer.
76        stream.set_writer(Some(self));
77
78        // Let state be stream.[[state]].
79
80        // If state is "writable",
81        if stream.is_writable() {
82            // If ! WritableStreamCloseQueuedOrInFlight(stream) is false
83            // and stream.[[backpressure]] is true,
84            if !stream.close_queued_or_in_flight() && stream.get_backpressure() {
85                // set writer.[[readyPromise]] to a new promise.
86                // Done in `new_inherited`.
87            } else {
88                // Otherwise, set writer.[[readyPromise]] to a promise resolved with undefined.
89                // Note: new promise created in `new_inherited`.
90                self.ready_promise.borrow().resolve_native(cx, &());
91            }
92
93            // Set writer.[[closedPromise]] to a new promise.
94            // Done in `new_inherited`.
95            return Ok(());
96        }
97
98        // Otherwise, if state is "erroring",
99        if stream.is_erroring() {
100            rooted!(&in(cx) let mut error = UndefinedValue());
101            stream.get_stored_error(error.handle_mut());
102
103            // Set writer.[[readyPromise]] to a promise rejected with stream.[[storedError]].
104            // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
105            // Note: new promise created in `new_inherited`.
106            let ready_promise = self.ready_promise.borrow();
107            ready_promise.reject_native(cx, &error.handle());
108            ready_promise.set_promise_is_handled(cx);
109
110            // Set writer.[[closedPromise]] to a new promise.
111            // Done in `new_inherited`.
112            return Ok(());
113        }
114
115        // Otherwise, if state is "closed",
116        if stream.is_closed() {
117            // Set writer.[[readyPromise]] to a promise resolved with undefined.
118            // Note: new promise created in `new_inherited`.
119            self.ready_promise.borrow().resolve_native(cx, &());
120
121            // Set writer.[[closedPromise]] to a promise resolved with undefined.
122            // Note: new promise created in `new_inherited`.
123            self.closed_promise.borrow().resolve_native(cx, &());
124            return Ok(());
125        }
126
127        // Otherwise,
128        // Assert: state is "errored".
129        assert!(stream.is_errored());
130
131        // Let storedError be stream.[[storedError]].
132        rooted!(&in(cx) let mut error = UndefinedValue());
133        stream.get_stored_error(error.handle_mut());
134
135        // Set writer.[[readyPromise]] to a promise rejected with stream.[[storedError]].
136        // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
137        // Note: new promise created in `new_inherited`.
138        let ready_promise = self.ready_promise.borrow();
139        ready_promise.reject_native(cx, &error.handle());
140        ready_promise.set_promise_is_handled(cx);
141
142        // Set writer.[[closedPromise]] to a promise rejected with storedError.
143        // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
144        // Note: new promise created in `new_inherited`.
145        let ready_promise = self.closed_promise.borrow();
146        ready_promise.reject_native(cx, &error.handle());
147        ready_promise.set_promise_is_handled(cx);
148
149        Ok(())
150    }
151
152    pub(crate) fn reject_closed_promise_with_stored_error(
153        &self,
154        cx: &mut JSContext,
155        error: &SafeHandleValue,
156    ) {
157        self.closed_promise.borrow().reject_native(cx, error);
158    }
159
160    pub(crate) fn set_close_promise_is_handled(&self, cx: &mut JSContext) {
161        self.closed_promise.borrow().set_promise_is_handled(cx);
162    }
163
164    pub(crate) fn set_ready_promise(&self, promise: Rc<Promise>) {
165        *self.ready_promise.borrow_mut() = promise;
166    }
167
168    pub(crate) fn resolve_ready_promise_with_undefined(&self, cx: &mut JSContext) {
169        self.ready_promise.borrow().resolve_native(cx, &());
170    }
171
172    pub(crate) fn resolve_closed_promise_with_undefined(&self, cx: &mut JSContext) {
173        self.closed_promise.borrow().resolve_native(cx, &());
174    }
175
176    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-ready-promise-rejected>
177    pub(crate) fn ensure_ready_promise_rejected(
178        &self,
179        cx: &mut JSContext,
180        global: &GlobalScope,
181        error: SafeHandleValue,
182    ) {
183        let ready_promise = self.ready_promise.borrow().clone();
184
185        // If writer.[[readyPromise]].[[PromiseState]] is "pending",
186        if ready_promise.is_pending() {
187            // reject writer.[[readyPromise]] with error.
188            ready_promise.reject_native(cx, &error);
189
190            // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
191            ready_promise.set_promise_is_handled(cx);
192        } else {
193            // Otherwise, set writer.[[readyPromise]] to a promise rejected with error.
194            let promise = Promise::new_rejected(cx, global, error);
195
196            // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
197            promise.set_promise_is_handled(cx);
198            *self.ready_promise.borrow_mut() = promise;
199        }
200    }
201
202    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-closed-promise-rejected>
203    fn ensure_closed_promise_rejected(
204        &self,
205        cx: &mut JSContext,
206        global: &GlobalScope,
207        error: SafeHandleValue,
208    ) {
209        let closed_promise = self.closed_promise.borrow().clone();
210
211        // If writer.[[closedPromise]].[[PromiseState]] is "pending",
212        if closed_promise.is_pending() {
213            // reject writer.[[closedPromise]] with error.
214            closed_promise.reject_native(cx, &error);
215
216            // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
217            closed_promise.set_promise_is_handled(cx);
218        } else {
219            // Otherwise, set writer.[[closedPromise]] to a promise rejected with error.
220            let promise = Promise::new_rejected(cx, global, error);
221
222            // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
223            promise.set_promise_is_handled(cx);
224            *self.closed_promise.borrow_mut() = promise;
225        }
226    }
227
228    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-abort>
229    fn abort(
230        &self,
231        cx: &mut CurrentRealm,
232        global: &GlobalScope,
233        reason: SafeHandleValue,
234    ) -> Rc<Promise> {
235        // Let stream be writer.[[stream]].
236        let Some(stream) = self.stream.get() else {
237            // Assert: stream is not undefined.
238            unreachable!("Stream should be set.");
239        };
240
241        // Return ! WritableStreamAbort(stream, reason).
242        stream.abort(cx, global, reason)
243    }
244
245    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close>
246    fn close(&self, cx: &mut JSContext, global: &GlobalScope) -> Rc<Promise> {
247        // Let stream be writer.[[stream]].
248        let Some(stream) = self.stream.get() else {
249            // Assert: stream is not undefined.
250            unreachable!("Stream should be set.");
251        };
252
253        // Return ! WritableStreamClose(stream).
254        stream.close(cx, global)
255    }
256
257    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-write>
258    pub(crate) fn write(
259        &self,
260        cx: &mut JSContext,
261        global: &GlobalScope,
262        chunk: SafeHandleValue,
263    ) -> Rc<Promise> {
264        // Let stream be writer.[[stream]].
265        let Some(stream) = self.stream.get() else {
266            // Assert: stream is not undefined.
267            unreachable!("Stream should be set.");
268        };
269
270        // Let controller be stream.[[controller]].
271        // Note: asserting controller is some.
272        let Some(controller) = stream.get_controller() else {
273            unreachable!("Controller should be set.");
274        };
275
276        // Let chunkSize be ! WritableStreamDefaultControllerGetChunkSize(controller, chunk).
277        let chunk_size = controller.get_chunk_size(cx, global, chunk);
278
279        // If stream is not equal to writer.[[stream]],
280        // return a promise rejected with a TypeError exception.
281        if !self
282            .stream
283            .get()
284            .is_some_and(|current_stream| current_stream == stream)
285        {
286            let promise = Promise::new(cx, global);
287            promise.reject_error(
288                cx,
289                Error::Type(c"Stream is not equal to writer stream".to_owned()),
290            );
291            return promise;
292        }
293
294        // Let state be stream.[[state]].
295        // If state is "errored",
296        if stream.is_errored() {
297            // return a promise rejected with stream.[[storedError]].
298            rooted!(&in(cx) let mut error = UndefinedValue());
299            stream.get_stored_error(error.handle_mut());
300            let promise = Promise::new(cx, global);
301            promise.reject_native(cx, &error.handle());
302            return promise;
303        }
304
305        // If ! WritableStreamCloseQueuedOrInFlight(stream) is true
306        // or state is "closed",
307        if stream.close_queued_or_in_flight() || stream.is_closed() {
308            // return a promise rejected with a TypeError exception
309            // indicating that the stream is closing or closed
310            let promise = Promise::new(cx, global);
311            promise.reject_error(
312                cx,
313                Error::Type(c"Stream has been closed, or has close queued or in-flight".to_owned()),
314            );
315            return promise;
316        }
317
318        // If state is "erroring",
319        if stream.is_erroring() {
320            // return a promise rejected with stream.[[storedError]].
321            rooted!(&in(cx) let mut error = UndefinedValue());
322            stream.get_stored_error(error.handle_mut());
323            let promise = Promise::new(cx, global);
324            promise.reject_native(cx, &error.handle());
325            return promise;
326        }
327
328        // Assert: state is "writable".
329        assert!(stream.is_writable());
330
331        // Let promise be ! WritableStreamAddWriteRequest(stream).
332        let promise = stream.add_write_request(cx, global);
333
334        // Perform ! WritableStreamDefaultControllerWrite(controller, chunk, chunkSize).
335        controller.write(cx, global, chunk, chunk_size);
336
337        // Return promise.
338        promise
339    }
340
341    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-release>
342    pub(crate) fn release(&self, cx: &mut JSContext, global: &GlobalScope) {
343        // Let stream be this.[[stream]].
344        let Some(stream) = self.stream.get() else {
345            // Assert: stream is not undefined.
346            unreachable!("Stream should be set.");
347        };
348
349        // Assert: stream.[[writer]] is writer.
350        assert!(stream.get_writer().is_some_and(|writer| &*writer == self));
351
352        // Let releasedError be a new TypeError.
353        let released_error = Error::Type(c"Writer has been released".to_owned());
354
355        // Root the js val of the error.
356        rooted!(&in(cx) let mut error = UndefinedValue());
357        released_error.to_jsval(cx, global, error.handle_mut());
358
359        // Perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError).
360        self.ensure_ready_promise_rejected(cx, global, error.handle());
361
362        // Perform ! WritableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError).
363        self.ensure_closed_promise_rejected(cx, global, error.handle());
364
365        // Set stream.[[writer]] to undefined.
366        stream.set_writer(None);
367
368        // Set this.[[stream]] to undefined.
369        self.stream.set(None);
370    }
371
372    /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close-with-error-propagation>
373    pub(crate) fn close_with_error_propagation(
374        &self,
375        cx: &mut JSContext,
376        global: &GlobalScope,
377    ) -> Rc<Promise> {
378        // Let stream be writer.[[stream]].
379        let Some(stream) = self.stream.get() else {
380            // Assert: stream is not undefined.
381            unreachable!("Stream should be set.");
382        };
383
384        // Let state be stream.[[state]].
385        // Used via stream method calls.
386
387        // If ! WritableStreamCloseQueuedOrInFlight(stream) is true
388        // or state is "closed",
389        if stream.close_queued_or_in_flight() || stream.is_closed() {
390            // return a promise resolved with undefined.
391            let promise = Promise::new(cx, global);
392            promise.resolve_native(cx, &());
393            return promise;
394        }
395
396        // If state is "errored",
397        if stream.is_errored() {
398            // return a promise rejected with stream.[[storedError]].
399            rooted!(&in(cx) let mut error = UndefinedValue());
400            stream.get_stored_error(error.handle_mut());
401            let promise = Promise::new(cx, global);
402            promise.reject_native(cx, &error.handle());
403            return promise;
404        }
405
406        // Assert: state is "writable" or "erroring".
407        assert!(stream.is_writable() || stream.is_erroring());
408
409        // Return ! WritableStreamDefaultWriterClose(writer).
410        self.close(cx, global)
411    }
412
413    pub(crate) fn get_stream(&self) -> Option<DomRoot<WritableStream>> {
414        self.stream.get()
415    }
416}
417
418impl WritableStreamDefaultWriterMethods<crate::DomTypeHolder> for WritableStreamDefaultWriter {
419    /// <https://streams.spec.whatwg.org/#default-writer-closed>
420    fn Closed(&self) -> Rc<Promise> {
421        // Return this.[[closedPromise]].
422        return self.closed_promise.borrow().clone();
423    }
424
425    /// <https://streams.spec.whatwg.org/#default-writer-desired-size>
426    fn GetDesiredSize(&self) -> Result<Option<f64>, Error> {
427        // If this.[[stream]] is undefined, throw a TypeError exception.
428        let Some(stream) = self.stream.get() else {
429            return Err(Error::Type(c"Stream is undefined".to_owned()));
430        };
431
432        // Return ! WritableStreamDefaultWriterGetDesiredSize(this).
433        Ok(stream.get_desired_size())
434    }
435
436    /// <https://streams.spec.whatwg.org/#default-writer-ready>
437    fn Ready(&self) -> Rc<Promise> {
438        // Return this.[[readyPromise]].
439        return self.ready_promise.borrow().clone();
440    }
441
442    /// <https://streams.spec.whatwg.org/#default-writer-abort>
443    fn Abort(&self, cx: &mut CurrentRealm, reason: SafeHandleValue) -> Rc<Promise> {
444        let global = GlobalScope::from_current_realm(cx);
445
446        // If this.[[stream]] is undefined,
447        if self.stream.get().is_none() {
448            // return a promise rejected with a TypeError exception.
449            let promise = Promise::new(cx, &global);
450            promise.reject_error(cx, Error::Type(c"Stream is undefined".to_owned()));
451            return promise;
452        }
453
454        // Return ! WritableStreamDefaultWriterAbort(this, reason).
455        self.abort(cx, &global, reason)
456    }
457
458    /// <https://streams.spec.whatwg.org/#default-writer-close>
459    fn Close(&self, cx: &mut CurrentRealm) -> Rc<Promise> {
460        let global = GlobalScope::from_current_realm(cx);
461        let promise = Promise::new(cx, &global);
462
463        // Let stream be this.[[stream]].
464        let Some(stream) = self.stream.get() else {
465            // If stream is undefined,
466            // return a promise rejected with a TypeError exception.
467            promise.reject_error(cx, Error::Type(c"Stream is undefined".to_owned()));
468            return promise;
469        };
470
471        // If ! WritableStreamCloseQueuedOrInFlight(stream) is true
472        if stream.close_queued_or_in_flight() {
473            // return a promise rejected with a TypeError exception.
474            promise.reject_error(
475                cx,
476                Error::Type(c"Stream has closed queued or in-flight".to_owned()),
477            );
478            return promise;
479        }
480
481        self.close(cx, &global)
482    }
483
484    /// <https://streams.spec.whatwg.org/#default-writer-release-lock>
485    fn ReleaseLock(&self, cx: &mut JSContext) {
486        // Let stream be this.[[stream]].
487        let Some(stream) = self.stream.get() else {
488            // If stream is undefined, return.
489            return;
490        };
491
492        // Assert: stream.[[writer]] is not undefined.
493        assert!(stream.get_writer().is_some());
494
495        let global = self.global();
496
497        // Perform ! WritableStreamDefaultWriterRelease(this).
498        self.release(cx, &global);
499    }
500
501    /// <https://streams.spec.whatwg.org/#default-writer-write>
502    fn Write(&self, cx: &mut CurrentRealm, chunk: SafeHandleValue) -> Rc<Promise> {
503        let global = GlobalScope::from_current_realm(cx);
504
505        // If this.[[stream]] is undefined,
506        if self.stream.get().is_none() {
507            // return a promise rejected with a TypeError exception.
508            let promise = Promise::new(cx, &global);
509            promise.reject_error(cx, Error::Type(c"Stream is undefined".to_owned()));
510            return promise;
511        }
512
513        // Return ! WritableStreamDefaultWriterWrite(this, chunk).
514        self.write(cx, &global, chunk)
515    }
516
517    /// <https://streams.spec.whatwg.org/#default-writer-constructor>
518    fn Constructor(
519        cx: &mut CurrentRealm,
520        global: &GlobalScope,
521        proto: Option<SafeHandleObject>,
522        stream: &WritableStream,
523    ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> {
524        let writer = WritableStreamDefaultWriter::new(cx, global, proto);
525
526        // Perform ? SetUpWritableStreamDefaultWriter(this, stream).
527        writer.setup(cx, stream)?;
528
529        Ok(writer)
530    }
531}