script/dom/
transformstreamdefaultcontroller.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsapi::{
10    ExceptionStackBehavior, Heap, JS_IsExceptionPending, JS_SetPendingException, JSObject,
11};
12use js::jsval::UndefinedValue;
13use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
14
15use super::bindings::cell::DomRefCell;
16use super::bindings::codegen::Bindings::TransformerBinding::{
17    TransformerCancelCallback, TransformerFlushCallback, TransformerTransformCallback,
18};
19use super::types::TransformStream;
20use crate::dom::bindings::callback::ExceptionHandling;
21use crate::dom::bindings::codegen::Bindings::TransformStreamDefaultControllerBinding::TransformStreamDefaultControllerMethods;
22use crate::dom::bindings::codegen::Bindings::TransformerBinding::Transformer;
23use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
24use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
25use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
26use crate::dom::compressionstream::{
27    CompressionStream, compress_and_enqueue_a_chunk, compress_flush_and_enqueue,
28};
29use crate::dom::decompressionstream::{
30    decompress_and_enqueue_a_chunk, decompress_flush_and_enqueue,
31};
32use crate::dom::globalscope::GlobalScope;
33use crate::dom::promise::Promise;
34use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
35use crate::dom::textdecodercommon::TextDecoderCommon;
36use crate::dom::textdecoderstream::{decode_and_enqueue_a_chunk, flush_and_enqueue};
37use crate::dom::textencoderstream::{Encoder, encode_and_enqueue_a_chunk, encode_and_flush};
38use crate::dom::types::DecompressionStream;
39use crate::realms::{InRealm, enter_realm};
40use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
41
42impl js::gc::Rootable for TransformTransformPromiseRejection {}
43
44/// Reacting to transformPromise as part of
45/// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
46#[derive(JSTraceable, MallocSizeOf)]
47#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
48struct TransformTransformPromiseRejection {
49    controller: Dom<TransformStreamDefaultController>,
50}
51
52impl Callback for TransformTransformPromiseRejection {
53    /// Reacting to transformPromise with the following fulfillment steps:
54    fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
55        // Perform ! TransformStreamError(controller.[[stream]], r).
56        self.controller
57            .error(cx, &self.controller.global(), v, can_gc);
58
59        // Throw r.
60        // Note: this is done part of perform_transform().
61    }
62}
63
64/// The type of transformer algorithms we are using
65#[derive(JSTraceable)]
66pub(crate) enum TransformerType {
67    /// Algorithms provided by Js callbacks
68    Js {
69        /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-cancelalgorithm>
70        cancel: RefCell<Option<Rc<TransformerCancelCallback>>>,
71
72        /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-flushalgorithm>
73        flush: RefCell<Option<Rc<TransformerFlushCallback>>>,
74
75        /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-transformalgorithm>
76        transform: RefCell<Option<Rc<TransformerTransformCallback>>>,
77
78        /// The JS object used as `this` when invoking sink algorithms.
79        transform_obj: Heap<*mut JSObject>,
80    },
81    /// Algorithms supporting `TextDecoderStream` are implemented in Rust
82    ///
83    /// <https://encoding.spec.whatwg.org/#textdecodercommon>
84    Decoder(Rc<TextDecoderCommon>),
85    /// Algorithms supporting `TextEncoderStream` are implemented in Rust
86    ///
87    /// <https://encoding.spec.whatwg.org/#textencoderstream-encoder>
88    Encoder(Encoder),
89    /// Algorithms supporting `CompressionStream` are implemented in Rust
90    ///
91    /// <https://compression.spec.whatwg.org/#compressionstream>
92    Compressor(DomRoot<CompressionStream>),
93    /// Algorithms supporting `DecompressionStream` are implemented in Rust
94    ///
95    /// <https://compression.spec.whatwg.org/#decompressionstream>
96    Decompressor(DomRoot<DecompressionStream>),
97}
98
99impl TransformerType {
100    pub(crate) fn new_from_js_transformer(transformer: &Transformer) -> TransformerType {
101        TransformerType::Js {
102            cancel: RefCell::new(transformer.cancel.clone()),
103            flush: RefCell::new(transformer.flush.clone()),
104            transform: RefCell::new(transformer.transform.clone()),
105            transform_obj: Default::default(),
106        }
107    }
108}
109
110/// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller>
111#[dom_struct]
112pub struct TransformStreamDefaultController {
113    reflector_: Reflector,
114
115    /// The type of the underlying transformer used. Besides the JS variant,
116    /// there will be other variant(s) for `TextDecoderStream`
117    #[ignore_malloc_size_of = "transformer_type"]
118    transformer_type: TransformerType,
119
120    /// <https://streams.spec.whatwg.org/#TransformStreamDefaultController-stream>
121    stream: MutNullableDom<TransformStream>,
122
123    /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-finishpromise>
124    #[conditional_malloc_size_of]
125    finish_promise: DomRefCell<Option<Rc<Promise>>>,
126}
127
128impl TransformStreamDefaultController {
129    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
130    fn new_inherited(transformer_type: TransformerType) -> TransformStreamDefaultController {
131        TransformStreamDefaultController {
132            reflector_: Reflector::new(),
133            transformer_type,
134            stream: MutNullableDom::new(None),
135            finish_promise: DomRefCell::new(None),
136        }
137    }
138
139    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
140    pub(crate) fn new(
141        global: &GlobalScope,
142        transformer_type: TransformerType,
143        can_gc: CanGc,
144    ) -> DomRoot<TransformStreamDefaultController> {
145        reflect_dom_object(
146            Box::new(TransformStreamDefaultController::new_inherited(
147                transformer_type,
148            )),
149            global,
150            can_gc,
151        )
152    }
153
154    /// Setting the JS object after the heap has settled down.
155    ///
156    /// Note that this has no effect if the transformer type is not `TransformerType::Js`
157    pub(crate) fn set_transform_obj(&self, this_object: SafeHandleObject) {
158        if let TransformerType::Js { transform_obj, .. } = &self.transformer_type {
159            transform_obj.set(*this_object)
160        } else {
161            unreachable!("Non-Js transformer type should not set transform_obj")
162        }
163    }
164
165    pub(crate) fn set_stream(&self, stream: &TransformStream) {
166        self.stream.set(Some(stream));
167    }
168
169    pub(crate) fn get_finish_promise(&self) -> Option<Rc<Promise>> {
170        self.finish_promise.borrow().clone()
171    }
172
173    pub(crate) fn set_finish_promise(&self, promise: Rc<Promise>) {
174        *self.finish_promise.borrow_mut() = Some(promise);
175    }
176
177    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
178    pub(crate) fn transform_stream_default_controller_perform_transform(
179        &self,
180        cx: SafeJSContext,
181        global: &GlobalScope,
182        chunk: SafeHandleValue,
183        can_gc: CanGc,
184    ) -> Fallible<Rc<Promise>> {
185        // Let transformPromise be the result of performing controller.[[transformAlgorithm]], passing chunk.
186        let transform_promise = self.perform_transform(cx, global, chunk, can_gc)?;
187
188        // Return the result of reacting to transformPromise with the following rejection steps given the argument r:
189        rooted!(in(*cx) let mut reject_handler = Some(TransformTransformPromiseRejection {
190            controller: Dom::from_ref(self),
191        }));
192
193        let handler = PromiseNativeHandler::new(
194            global,
195            None,
196            reject_handler.take().map(|h| Box::new(h) as Box<_>),
197            can_gc,
198        );
199        let realm = enter_realm(global);
200        let comp = InRealm::Entered(&realm);
201        transform_promise.append_native_handler(&handler, comp, can_gc);
202
203        Ok(transform_promise)
204    }
205
206    pub(crate) fn perform_transform(
207        &self,
208        cx: SafeJSContext,
209        global: &GlobalScope,
210        chunk: SafeHandleValue,
211        can_gc: CanGc,
212    ) -> Fallible<Rc<Promise>> {
213        let result = match &self.transformer_type {
214            // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
215            TransformerType::Js {
216                transform,
217                transform_obj,
218                ..
219            } => {
220                // Step 5. If transformerDict["transform"] exists, set
221                // transformAlgorithm to an algorithm which takes an argument
222                // chunk and returns the result of invoking
223                // transformerDict["transform"] with argument list « chunk,
224                // controller » and callback this value transformer.
225                let algo = transform.borrow().clone();
226                if let Some(transform) = algo {
227                    rooted!(in(*cx) let this_object = transform_obj.get());
228                    transform
229                        .Call_(
230                            &this_object.handle(),
231                            chunk,
232                            self,
233                            ExceptionHandling::Rethrow,
234                            can_gc,
235                        )
236                        .unwrap_or_else(|e| {
237                            let p = Promise::new(global, can_gc);
238                            p.reject_error(e, can_gc);
239                            p
240                        })
241                } else {
242                    // Step 2. Let transformAlgorithm be the following steps, taking a chunk argument:
243                    // Let result be TransformStreamDefaultControllerEnqueue(controller, chunk).
244                    // If result is an abrupt completion, return a promise rejected with result.[[Value]].
245                    if let Err(error) = self.enqueue(cx, global, chunk, can_gc) {
246                        rooted!(in(*cx) let mut error_val = UndefinedValue());
247                        error.to_jsval(cx, global, error_val.handle_mut(), can_gc);
248                        Promise::new_rejected(global, cx, error_val.handle(), can_gc)
249                    } else {
250                        // Otherwise, return a promise resolved with undefined.
251                        Promise::new_resolved(global, cx, (), can_gc)
252                    }
253                }
254            },
255            TransformerType::Decoder(decoder) => {
256                // <https://encoding.spec.whatwg.org/#dom-textdecoderstream>
257                // Step 7. Let transformAlgorithm be an algorithm which takes a
258                // chunk argument and runs the decode and enqueue a chunk
259                // algorithm with this and chunk.
260                decode_and_enqueue_a_chunk(cx, global, chunk, decoder, self, can_gc)
261                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
262                    // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
263                    // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
264                    // Step 5.2 If result is a Promise, then return result.
265                    // Note: not applicable, the spec does NOT require deode_and_enqueue_a_chunk() to return a Promise
266                    // Step 5.3 Return a promise resolved with undefined.
267                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
268                    .unwrap_or_else(|e| {
269                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
270                        // Step 5.1 If this throws an exception e,
271                        let realm = enter_realm(self);
272                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
273                        // return a promise rejected with e.
274                        p.reject_error(e, can_gc);
275                        p
276                    })
277            },
278            TransformerType::Encoder(encoder) => {
279                // <https://encoding.spec.whatwg.org/#dom-textencoderstream>
280                // Step 2. Let transformAlgorithm be an algorithm which takes a chunk argument and runs the encode
281                //      and enqueue a chunk algorithm with this and chunk.
282                encode_and_enqueue_a_chunk(cx, global, chunk, encoder, self, can_gc)
283                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
284                    // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
285                    // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
286                    // Step 5.2 If result is a Promise, then return result.
287                    // Note: not applicable, the spec does NOT require encode_and_enqueue_a_chunk() to return a Promise
288                    // Step 5.3 Return a promise resolved with undefined.
289                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
290                    .unwrap_or_else(|e| {
291                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
292                        // Step 5.1 If this throws an exception e,
293                        let realm = enter_realm(self);
294                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
295                        // return a promise rejected with e.
296                        p.reject_error(e, can_gc);
297                        p
298                    })
299            },
300            TransformerType::Compressor(cs) => {
301                // <https://compression.spec.whatwg.org/#dom-compressionstream-compressionstream>
302                // Step 3. Let transformAlgorithm be an algorithm which takes a chunk argument and
303                // runs the compress and enqueue a chunk algorithm with this and chunk.
304                compress_and_enqueue_a_chunk(cx, global, cs, chunk, self, can_gc)
305                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
306                    // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
307                    // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
308                    // Step 5.2 If result is a Promise, then return result.
309                    // Note: not applicable, the spec does NOT require
310                    // compress_and_enqueue_a_chunk() to return a Promise.
311                    // Step 5.3 Return a promise resolved with undefined.
312                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
313                    .unwrap_or_else(|e| {
314                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
315                        // Step 5.1 If this throws an exception e,
316                        let realm = enter_realm(self);
317                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
318                        // return a promise rejected with e.
319                        p.reject_error(e, can_gc);
320                        p
321                    })
322            },
323            TransformerType::Decompressor(ds) => {
324                // <https://compression.spec.whatwg.org/#dom-decompressionstream-decompressionstream>
325                // Step 3. Let transformAlgorithm be an algorithm which takes a chunk argument and
326                // runs the decompress and enqueue a chunk algorithm with this and chunk.
327                decompress_and_enqueue_a_chunk(cx, global, ds, chunk, self, can_gc)
328                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
329                    // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
330                    // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
331                    // Step 5.2 If result is a Promise, then return result.
332                    // Note: not applicable, the spec does NOT require
333                    // decompress_and_enqueue_a_chunk() to return a Promise
334                    // Step 5.3 Return a promise resolved with undefined.
335                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
336                    .unwrap_or_else(|e| {
337                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
338                        // Step 5.1 If this throws an exception e,
339                        let realm = enter_realm(self);
340                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
341                        // return a promise rejected with e.
342                        p.reject_error(e, can_gc);
343                        p
344                    })
345            },
346        };
347
348        Ok(result)
349    }
350
351    pub(crate) fn perform_cancel(
352        &self,
353        cx: SafeJSContext,
354        global: &GlobalScope,
355        chunk: SafeHandleValue,
356        can_gc: CanGc,
357    ) -> Fallible<Rc<Promise>> {
358        let result = match &self.transformer_type {
359            // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
360            TransformerType::Js {
361                cancel,
362                transform_obj,
363                ..
364            } => {
365                // Step 7. If transformerDict["cancel"] exists, set
366                // cancelAlgorithm to an algorithm which takes an argument
367                // reason and returns the result of invoking
368                // transformerDict["cancel"] with argument list « reason » and
369                // callback this value transformer.
370                let algo = cancel.borrow().clone();
371                if let Some(cancel) = algo {
372                    rooted!(in(*cx) let this_object = transform_obj.get());
373                    cancel
374                        .Call_(
375                            &this_object.handle(),
376                            chunk,
377                            ExceptionHandling::Rethrow,
378                            can_gc,
379                        )
380                        .unwrap_or_else(|e| {
381                            let p = Promise::new(global, can_gc);
382                            p.reject_error(e, can_gc);
383                            p
384                        })
385                } else {
386                    // Step 4. Let cancelAlgorithm be an algorithm which returns a promise resolved with undefined.
387                    Promise::new_resolved(global, cx, (), can_gc)
388                }
389            },
390            TransformerType::Decoder(_) => {
391                // <https://streams.spec.whatwg.org/#transformstream-set-up>
392                // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
393                // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
394                //      if cancelAlgorithm was given, or null otherwise
395                // Note: `TextDecoderStream` does NOT specify a cancel algorithm.
396                // Step 7.2 If result is a Promise, then return result.
397                // Note: Not applicable.
398                // Step 7.3 Return a promise resolved with undefined.
399                Promise::new_resolved(global, cx, (), can_gc)
400            },
401            TransformerType::Encoder(_) => {
402                // <https://streams.spec.whatwg.org/#transformstream-set-up>
403                // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
404                // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
405                //      if cancelAlgorithm was given, or null otherwise
406                // Note: `TextDecoderStream` does NOT specify a cancel algorithm.
407                // Step 7.2 If result is a Promise, then return result.
408                // Note: Not applicable.
409                // Step 7.3 Return a promise resolved with undefined.
410                Promise::new_resolved(global, cx, (), can_gc)
411            },
412            TransformerType::Compressor(_) => {
413                // <https://streams.spec.whatwg.org/#transformstream-set-up>
414                // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
415                // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
416                //      if cancelAlgorithm was given, or null otherwise
417                // Note: `CompressionStream` does NOT specify a cancel algorithm.
418                // Step 7.2 If result is a Promise, then return result.
419                // Note: Not applicable.
420                // Step 7.3 Return a promise resolved with undefined.
421                Promise::new_resolved(global, cx, (), can_gc)
422            },
423            TransformerType::Decompressor(_) => {
424                // <https://streams.spec.whatwg.org/#transformstream-set-up>
425                // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
426                // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
427                //      if cancelAlgorithm was given, or null otherwise
428                // Note: `DecompressionStream` does NOT specify a cancel algorithm.
429                // Step 7.2 If result is a Promise, then return result.
430                // Note: Not applicable.
431                // Step 7.3 Return a promise resolved with undefined.
432                Promise::new_resolved(global, cx, (), can_gc)
433            },
434        };
435
436        Ok(result)
437    }
438
439    pub(crate) fn perform_flush(
440        &self,
441        cx: SafeJSContext,
442        global: &GlobalScope,
443        can_gc: CanGc,
444    ) -> Fallible<Rc<Promise>> {
445        let result = match &self.transformer_type {
446            // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
447            TransformerType::Js {
448                flush,
449                transform_obj,
450                ..
451            } => {
452                // Step 6. If transformerDict["flush"] exists, set flushAlgorithm to an
453                // algorithm which returns the result of invoking
454                // transformerDict["flush"] with argument list « controller »
455                // and callback this value transformer.
456                let algo = flush.borrow().clone();
457                if let Some(flush) = algo {
458                    rooted!(in(*cx) let this_object = transform_obj.get());
459                    flush
460                        .Call_(
461                            &this_object.handle(),
462                            self,
463                            ExceptionHandling::Rethrow,
464                            can_gc,
465                        )
466                        .unwrap_or_else(|e| {
467                            let p = Promise::new(global, can_gc);
468                            p.reject_error(e, can_gc);
469                            p
470                        })
471                } else {
472                    // Step 3. Let flushAlgorithm be an algorithm which returns a promise resolved with undefined.
473                    Promise::new_resolved(global, cx, (), can_gc)
474                }
475            },
476            TransformerType::Decoder(decoder) => {
477                // <https://encoding.spec.whatwg.org/#dom-textdecoderstream>
478                // Step 8. Let flushAlgorithm be an algorithm which takes no
479                // arguments and runs the flush and enqueue algorithm with this.
480                flush_and_enqueue(cx, global, decoder, self, can_gc)
481                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
482                    // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
483                    // Step 6.1 Let result be the result of running flushAlgorithm,
484                    //      if flushAlgorithm was given, or null otherwise.
485                    // Step 6.2 If result is a Promise, then return result.
486                    // Note: Not applicable. The spec does NOT require flush_and_enqueue algo to return a Promise
487                    // Step 6.3 Return a promise resolved with undefined.
488                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
489                    .unwrap_or_else(|e| {
490                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
491                        // Step 6.1 If this throws an exception e,
492                        let realm = enter_realm(self);
493                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
494                        // return a promise rejected with e.
495                        p.reject_error(e, can_gc);
496                        p
497                    })
498            },
499            TransformerType::Encoder(encoder) => {
500                // <https://encoding.spec.whatwg.org/#textencoderstream-encoder>
501                // Step 3. Let flushAlgorithm be an algorithm which runs the encode and flush algorithm with this.
502                encode_and_flush(cx, global, encoder, self, can_gc)
503                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
504                    // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
505                    // Step 6.1 Let result be the result of running flushAlgorithm,
506                    //      if flushAlgorithm was given, or null otherwise.
507                    // Step 6.2 If result is a Promise, then return result.
508                    // Note: Not applicable. The spec does NOT require encode_and_flush algo to return a Promise
509                    // Step 6.3 Return a promise resolved with undefined.
510                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
511                    .unwrap_or_else(|e| {
512                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
513                        // Step 6.1 If this throws an exception e,
514                        let realm = enter_realm(self);
515                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
516                        // return a promise rejected with e.
517                        p.reject_error(e, can_gc);
518                        p
519                    })
520            },
521            TransformerType::Compressor(cs) => {
522                // <https://compression.spec.whatwg.org/#dom-compressionstream-compressionstream>
523                // Step 4. Let flushAlgorithm be an algorithm which takes no argument and runs the
524                // compress flush and enqueue algorithm with this.
525                compress_flush_and_enqueue(cx, global, cs, self, can_gc)
526                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
527                    // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
528                    // Step 6.1 Let result be the result of running flushAlgorithm,
529                    //      if flushAlgorithm was given, or null otherwise.
530                    // Step 6.2 If result is a Promise, then return result.
531                    // Note: Not applicable. The spec does NOT require compress_flush_and_enqueue
532                    // algo to return a Promise.
533                    // Step 6.3 Return a promise resolved with undefined.
534                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
535                    .unwrap_or_else(|e| {
536                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
537                        // Step 6.1 If this throws an exception e,
538                        let realm = enter_realm(self);
539                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
540                        // return a promise rejected with e.
541                        p.reject_error(e, can_gc);
542                        p
543                    })
544            },
545            TransformerType::Decompressor(ds) => {
546                // <https://compression.spec.whatwg.org/#dom-decompressionstream-decompressionstream>
547                // Step 4. Let flushAlgorithm be an algorithm which takes no argument and runs the
548                // decompress flush and enqueue algorithm with this.
549                decompress_flush_and_enqueue(cx, global, ds, self, can_gc)
550                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
551                    // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
552                    // Step 6.1 Let result be the result of running flushAlgorithm,
553                    //      if flushAlgorithm was given, or null otherwise.
554                    // Step 6.2 If result is a Promise, then return result.
555                    // Note: Not applicable. The spec does NOT require decompress_flush_and_enqueue
556                    // algo to return a Promise.
557                    // Step 6.3 Return a promise resolved with undefined.
558                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
559                    .unwrap_or_else(|e| {
560                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
561                        // Step 6.1 If this throws an exception e,
562                        let realm = enter_realm(self);
563                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
564                        // return a promise rejected with e.
565                        p.reject_error(e, can_gc);
566                        p
567                    })
568            },
569        };
570
571        Ok(result)
572    }
573
574    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-enqueue>
575    #[expect(unsafe_code)]
576    pub(crate) fn enqueue(
577        &self,
578        cx: SafeJSContext,
579        global: &GlobalScope,
580        chunk: SafeHandleValue,
581        can_gc: CanGc,
582    ) -> Fallible<()> {
583        // Let stream be controller.[[stream]].
584        let stream = self.stream.get().expect("stream is null");
585
586        // Let readableController be stream.[[readable]].[[controller]].
587        let readable = stream.get_readable();
588        let readable_controller = readable.get_default_controller();
589
590        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController)
591        // is false, throw a TypeError exception.
592        if !readable_controller.can_close_or_enqueue() {
593            return Err(Error::Type(
594                "ReadableStreamDefaultControllerCanCloseOrEnqueue is false".to_owned(),
595            ));
596        }
597
598        // Let enqueueResult be ReadableStreamDefaultControllerEnqueue(readableController, chunk).
599        // If enqueueResult is an abrupt completion,
600        if let Err(error) = readable_controller.enqueue(cx, chunk, can_gc) {
601            // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, enqueueResult.[[Value]]).
602            rooted!(in(*cx) let mut rooted_error = UndefinedValue());
603            error
604                .clone()
605                .to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
606            stream.error_writable_and_unblock_write(cx, global, rooted_error.handle(), can_gc);
607
608            // Throw stream.[[readable]].[[storedError]].
609            unsafe {
610                if !JS_IsExceptionPending(*cx) {
611                    rooted!(in(*cx) let mut stored_error = UndefinedValue());
612                    readable.get_stored_error(stored_error.handle_mut());
613
614                    JS_SetPendingException(
615                        *cx,
616                        stored_error.handle().into(),
617                        ExceptionStackBehavior::Capture,
618                    );
619                }
620            }
621            return Err(error);
622        }
623
624        // Let backpressure be ! ReadableStreamDefaultControllerHasBackpressure(readableController).
625        let backpressure = readable_controller.has_backpressure();
626
627        // If backpressure is not stream.[[backpressure]],
628        if backpressure != stream.get_backpressure() {
629            // Assert: backpressure is true.
630            assert!(backpressure);
631
632            // Perform ! TransformStreamSetBackpressure(stream, true).
633            stream.set_backpressure(global, true, can_gc);
634        }
635        Ok(())
636    }
637
638    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-error>
639    pub(crate) fn error(
640        &self,
641        cx: SafeJSContext,
642        global: &GlobalScope,
643        reason: SafeHandleValue,
644        can_gc: CanGc,
645    ) {
646        // Perform ! TransformStreamError(controller.[[stream]], e).
647        self.stream
648            .get()
649            .expect("stream is undefined")
650            .error(cx, global, reason, can_gc);
651    }
652
653    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-clear-algorithms>
654    pub(crate) fn clear_algorithms(&self) {
655        if let TransformerType::Js {
656            cancel,
657            flush,
658            transform,
659            ..
660        } = &self.transformer_type
661        {
662            // Set controller.[[transformAlgorithm]] to undefined.
663            transform.replace(None);
664
665            // Set controller.[[flushAlgorithm]] to undefined.
666            flush.replace(None);
667
668            // Set controller.[[cancelAlgorithm]] to undefined.
669            cancel.replace(None);
670        }
671    }
672
673    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-terminate>
674    pub(crate) fn terminate(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
675        // Let stream be controller.[[stream]].
676        let stream = self.stream.get().expect("stream is null");
677
678        // Let readableController be stream.[[readable]].[[controller]].
679        let readable = stream.get_readable();
680        let readable_controller = readable.get_default_controller();
681
682        // Perform ! ReadableStreamDefaultControllerClose(readableController).
683        readable_controller.close(can_gc);
684
685        // Let error be a TypeError exception indicating that the stream has been terminated.
686        let error = Error::Type("stream has been terminated".to_owned());
687
688        // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, error).
689        rooted!(in(*cx) let mut rooted_error = UndefinedValue());
690        error.to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
691        stream.error_writable_and_unblock_write(cx, global, rooted_error.handle(), can_gc);
692    }
693}
694
695#[allow(non_snake_case)]
696impl TransformStreamDefaultControllerMethods<crate::DomTypeHolder>
697    for TransformStreamDefaultController
698{
699    /// <https://streams.spec.whatwg.org/#ts-default-controller-desired-size>
700    fn GetDesiredSize(&self) -> Option<f64> {
701        // Let readableController be this.[[stream]].[[readable]].[[controller]].
702        let readable_controller = self
703            .stream
704            .get()
705            .expect("stream is null")
706            .get_readable()
707            .get_default_controller();
708
709        // Return ! ReadableStreamDefaultControllerGetDesiredSize(readableController).
710        readable_controller.get_desired_size()
711    }
712
713    /// <https://streams.spec.whatwg.org/#ts-default-controller-enqueue>
714    fn Enqueue(&self, cx: SafeJSContext, chunk: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
715        // Perform ? TransformStreamDefaultControllerEnqueue(this, chunk).
716        self.enqueue(cx, &self.global(), chunk, can_gc)
717    }
718
719    /// <https://streams.spec.whatwg.org/#ts-default-controller-error>
720    fn Error(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
721        // Perform ? TransformStreamDefaultControllerError(this, e).
722        self.error(cx, &self.global(), reason, can_gc);
723        Ok(())
724    }
725
726    /// <https://streams.spec.whatwg.org/#ts-default-controller-terminate>
727    fn Terminate(&self, can_gc: CanGc) -> Fallible<()> {
728        // Perform ? TransformStreamDefaultControllerTerminate(this).
729        self.terminate(GlobalScope::get_cx(), &self.global(), can_gc);
730        Ok(())
731    }
732}