script/dom/stream/
transformstreamdefaultcontroller.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsapi::{
10    ExceptionStackBehavior, Heap, JS_IsExceptionPending, JS_SetPendingException, JSObject,
11};
12use js::jsval::UndefinedValue;
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
15
16use crate::dom::bindings::callback::ExceptionHandling;
17use crate::dom::bindings::cell::DomRefCell;
18use crate::dom::bindings::codegen::Bindings::TransformStreamDefaultControllerBinding::TransformStreamDefaultControllerMethods;
19use crate::dom::bindings::codegen::Bindings::TransformerBinding::{
20    Transformer, TransformerCancelCallback, TransformerFlushCallback, TransformerTransformCallback,
21};
22use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
23use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
24use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
25use crate::dom::compressionstream::{
26    CompressionStream, compress_and_enqueue_a_chunk, compress_flush_and_enqueue,
27};
28use crate::dom::decompressionstream::{
29    decompress_and_enqueue_a_chunk, decompress_flush_and_enqueue,
30};
31use crate::dom::globalscope::GlobalScope;
32use crate::dom::promise::Promise;
33use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
34use crate::dom::textdecodercommon::TextDecoderCommon;
35use crate::dom::textdecoderstream::{decode_and_enqueue_a_chunk, flush_and_enqueue};
36use crate::dom::textencoderstream::{Encoder, encode_and_enqueue_a_chunk, encode_and_flush};
37use crate::dom::types::{DecompressionStream, TransformStream};
38use crate::realms::{InRealm, enter_realm};
39use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
40
41impl js::gc::Rootable for TransformTransformPromiseRejection {}
42
43/// Reacting to transformPromise as part of
44/// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
45#[derive(JSTraceable, MallocSizeOf)]
46#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
47struct TransformTransformPromiseRejection {
48    controller: Dom<TransformStreamDefaultController>,
49}
50
51impl Callback for TransformTransformPromiseRejection {
52    /// Reacting to transformPromise with the following fulfillment steps:
53    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
54        let can_gc = CanGc::from_cx(cx);
55        let cx: SafeJSContext = cx.into();
56        // Perform ! TransformStreamError(controller.[[stream]], r).
57        self.controller
58            .error(cx, &self.controller.global(), v, can_gc);
59
60        // Throw r.
61        // Note: this is done part of perform_transform().
62    }
63}
64
65/// The type of transformer algorithms we are using
66#[derive(JSTraceable)]
67pub(crate) enum TransformerType {
68    /// Algorithms provided by Js callbacks
69    Js {
70        /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-cancelalgorithm>
71        cancel: RefCell<Option<Rc<TransformerCancelCallback>>>,
72
73        /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-flushalgorithm>
74        flush: RefCell<Option<Rc<TransformerFlushCallback>>>,
75
76        /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-transformalgorithm>
77        transform: RefCell<Option<Rc<TransformerTransformCallback>>>,
78
79        /// The JS object used as `this` when invoking sink algorithms.
80        transform_obj: Heap<*mut JSObject>,
81    },
82    /// Algorithms supporting `TextDecoderStream` are implemented in Rust
83    ///
84    /// <https://encoding.spec.whatwg.org/#textdecodercommon>
85    Decoder(Rc<TextDecoderCommon>),
86    /// Algorithms supporting `TextEncoderStream` are implemented in Rust
87    ///
88    /// <https://encoding.spec.whatwg.org/#textencoderstream-encoder>
89    Encoder(Encoder),
90    /// Algorithms supporting `CompressionStream` are implemented in Rust
91    ///
92    /// <https://compression.spec.whatwg.org/#compressionstream>
93    Compressor(DomRoot<CompressionStream>),
94    /// Algorithms supporting `DecompressionStream` are implemented in Rust
95    ///
96    /// <https://compression.spec.whatwg.org/#decompressionstream>
97    Decompressor(DomRoot<DecompressionStream>),
98}
99
100impl TransformerType {
101    pub(crate) fn new_from_js_transformer(transformer: &Transformer) -> TransformerType {
102        TransformerType::Js {
103            cancel: RefCell::new(transformer.cancel.clone()),
104            flush: RefCell::new(transformer.flush.clone()),
105            transform: RefCell::new(transformer.transform.clone()),
106            transform_obj: Default::default(),
107        }
108    }
109}
110
111/// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller>
112#[dom_struct]
113pub struct TransformStreamDefaultController {
114    reflector_: Reflector,
115
116    /// The type of the underlying transformer used. Besides the JS variant,
117    /// there will be other variant(s) for `TextDecoderStream`
118    #[ignore_malloc_size_of = "transformer_type"]
119    transformer_type: TransformerType,
120
121    /// <https://streams.spec.whatwg.org/#TransformStreamDefaultController-stream>
122    stream: MutNullableDom<TransformStream>,
123
124    /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-finishpromise>
125    #[conditional_malloc_size_of]
126    finish_promise: DomRefCell<Option<Rc<Promise>>>,
127}
128
129impl TransformStreamDefaultController {
130    fn new_inherited(transformer_type: TransformerType) -> TransformStreamDefaultController {
131        TransformStreamDefaultController {
132            reflector_: Reflector::new(),
133            transformer_type,
134            stream: MutNullableDom::new(None),
135            finish_promise: DomRefCell::new(None),
136        }
137    }
138
139    pub(crate) fn new(
140        global: &GlobalScope,
141        transformer_type: TransformerType,
142        can_gc: CanGc,
143    ) -> DomRoot<TransformStreamDefaultController> {
144        reflect_dom_object(
145            Box::new(TransformStreamDefaultController::new_inherited(
146                transformer_type,
147            )),
148            global,
149            can_gc,
150        )
151    }
152
153    /// Setting the JS object after the heap has settled down.
154    ///
155    /// Note that this has no effect if the transformer type is not `TransformerType::Js`
156    pub(crate) fn set_transform_obj(&self, this_object: SafeHandleObject) {
157        if let TransformerType::Js { transform_obj, .. } = &self.transformer_type {
158            transform_obj.set(*this_object)
159        } else {
160            unreachable!("Non-Js transformer type should not set transform_obj")
161        }
162    }
163
164    pub(crate) fn set_stream(&self, stream: &TransformStream) {
165        self.stream.set(Some(stream));
166    }
167
168    pub(crate) fn get_finish_promise(&self) -> Option<Rc<Promise>> {
169        self.finish_promise.borrow().clone()
170    }
171
172    pub(crate) fn set_finish_promise(&self, promise: Rc<Promise>) {
173        *self.finish_promise.borrow_mut() = Some(promise);
174    }
175
176    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
177    pub(crate) fn transform_stream_default_controller_perform_transform(
178        &self,
179        cx: SafeJSContext,
180        global: &GlobalScope,
181        chunk: SafeHandleValue,
182        can_gc: CanGc,
183    ) -> Fallible<Rc<Promise>> {
184        // Let transformPromise be the result of performing controller.[[transformAlgorithm]], passing chunk.
185        let transform_promise = self.perform_transform(cx, global, chunk, can_gc)?;
186
187        // Return the result of reacting to transformPromise with the following rejection steps given the argument r:
188        rooted!(in(*cx) let mut reject_handler = Some(TransformTransformPromiseRejection {
189            controller: Dom::from_ref(self),
190        }));
191
192        let handler = PromiseNativeHandler::new(
193            global,
194            None,
195            reject_handler.take().map(|h| Box::new(h) as Box<_>),
196            can_gc,
197        );
198        let realm = enter_realm(global);
199        let comp = InRealm::Entered(&realm);
200        transform_promise.append_native_handler(&handler, comp, can_gc);
201
202        Ok(transform_promise)
203    }
204
205    pub(crate) fn perform_transform(
206        &self,
207        cx: SafeJSContext,
208        global: &GlobalScope,
209        chunk: SafeHandleValue,
210        can_gc: CanGc,
211    ) -> Fallible<Rc<Promise>> {
212        let result = match &self.transformer_type {
213            // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
214            TransformerType::Js {
215                transform,
216                transform_obj,
217                ..
218            } => {
219                // Step 5. If transformerDict["transform"] exists, set
220                // transformAlgorithm to an algorithm which takes an argument
221                // chunk and returns the result of invoking
222                // transformerDict["transform"] with argument list « chunk,
223                // controller » and callback this value transformer.
224                let algo = transform.borrow().clone();
225                if let Some(transform) = algo {
226                    rooted!(in(*cx) let this_object = transform_obj.get());
227                    transform
228                        .Call_(
229                            &this_object.handle(),
230                            chunk,
231                            self,
232                            ExceptionHandling::Rethrow,
233                            can_gc,
234                        )
235                        .unwrap_or_else(|e| {
236                            let p = Promise::new(global, can_gc);
237                            p.reject_error(e, can_gc);
238                            p
239                        })
240                } else {
241                    // Step 2. Let transformAlgorithm be the following steps, taking a chunk argument:
242                    // Let result be TransformStreamDefaultControllerEnqueue(controller, chunk).
243                    // If result is an abrupt completion, return a promise rejected with result.[[Value]].
244                    if let Err(error) = self.enqueue(cx, global, chunk, can_gc) {
245                        rooted!(in(*cx) let mut error_val = UndefinedValue());
246                        error.to_jsval(cx, global, error_val.handle_mut(), can_gc);
247                        Promise::new_rejected(global, cx, error_val.handle(), can_gc)
248                    } else {
249                        // Otherwise, return a promise resolved with undefined.
250                        Promise::new_resolved(global, cx, (), can_gc)
251                    }
252                }
253            },
254            TransformerType::Decoder(decoder) => {
255                // <https://encoding.spec.whatwg.org/#dom-textdecoderstream>
256                // Step 7. Let transformAlgorithm be an algorithm which takes a
257                // chunk argument and runs the decode and enqueue a chunk
258                // algorithm with this and chunk.
259                decode_and_enqueue_a_chunk(cx, global, chunk, decoder, self, can_gc)
260                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
261                    // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
262                    // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
263                    // Step 5.2 If result is a Promise, then return result.
264                    // Note: not applicable, the spec does NOT require deode_and_enqueue_a_chunk() to return a Promise
265                    // Step 5.3 Return a promise resolved with undefined.
266                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
267                    .unwrap_or_else(|e| {
268                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
269                        // Step 5.1 If this throws an exception e,
270                        let realm = enter_realm(self);
271                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
272                        // return a promise rejected with e.
273                        p.reject_error(e, can_gc);
274                        p
275                    })
276            },
277            TransformerType::Encoder(encoder) => {
278                // <https://encoding.spec.whatwg.org/#dom-textencoderstream>
279                // Step 2. Let transformAlgorithm be an algorithm which takes a chunk argument and runs the encode
280                //      and enqueue a chunk algorithm with this and chunk.
281                encode_and_enqueue_a_chunk(cx, global, chunk, encoder, self, can_gc)
282                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
283                    // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
284                    // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
285                    // Step 5.2 If result is a Promise, then return result.
286                    // Note: not applicable, the spec does NOT require encode_and_enqueue_a_chunk() to return a Promise
287                    // Step 5.3 Return a promise resolved with undefined.
288                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
289                    .unwrap_or_else(|e| {
290                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
291                        // Step 5.1 If this throws an exception e,
292                        let realm = enter_realm(self);
293                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
294                        // return a promise rejected with e.
295                        p.reject_error(e, can_gc);
296                        p
297                    })
298            },
299            TransformerType::Compressor(cs) => {
300                // <https://compression.spec.whatwg.org/#dom-compressionstream-compressionstream>
301                // Step 3. Let transformAlgorithm be an algorithm which takes a chunk argument and
302                // runs the compress and enqueue a chunk algorithm with this and chunk.
303                compress_and_enqueue_a_chunk(cx, global, cs, chunk, self, can_gc)
304                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
305                    // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
306                    // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
307                    // Step 5.2 If result is a Promise, then return result.
308                    // Note: not applicable, the spec does NOT require
309                    // compress_and_enqueue_a_chunk() to return a Promise.
310                    // Step 5.3 Return a promise resolved with undefined.
311                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
312                    .unwrap_or_else(|e| {
313                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
314                        // Step 5.1 If this throws an exception e,
315                        let realm = enter_realm(self);
316                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
317                        // return a promise rejected with e.
318                        p.reject_error(e, can_gc);
319                        p
320                    })
321            },
322            TransformerType::Decompressor(ds) => {
323                // <https://compression.spec.whatwg.org/#dom-decompressionstream-decompressionstream>
324                // Step 3. Let transformAlgorithm be an algorithm which takes a chunk argument and
325                // runs the decompress and enqueue a chunk algorithm with this and chunk.
326                decompress_and_enqueue_a_chunk(cx, global, ds, chunk, self, can_gc)
327                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
328                    // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
329                    // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
330                    // Step 5.2 If result is a Promise, then return result.
331                    // Note: not applicable, the spec does NOT require
332                    // decompress_and_enqueue_a_chunk() to return a Promise
333                    // Step 5.3 Return a promise resolved with undefined.
334                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
335                    .unwrap_or_else(|e| {
336                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
337                        // Step 5.1 If this throws an exception e,
338                        let realm = enter_realm(self);
339                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
340                        // return a promise rejected with e.
341                        p.reject_error(e, can_gc);
342                        p
343                    })
344            },
345        };
346
347        Ok(result)
348    }
349
350    pub(crate) fn perform_cancel(
351        &self,
352        cx: SafeJSContext,
353        global: &GlobalScope,
354        chunk: SafeHandleValue,
355        can_gc: CanGc,
356    ) -> Fallible<Rc<Promise>> {
357        let result = match &self.transformer_type {
358            // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
359            TransformerType::Js {
360                cancel,
361                transform_obj,
362                ..
363            } => {
364                // Step 7. If transformerDict["cancel"] exists, set
365                // cancelAlgorithm to an algorithm which takes an argument
366                // reason and returns the result of invoking
367                // transformerDict["cancel"] with argument list « reason » and
368                // callback this value transformer.
369                let algo = cancel.borrow().clone();
370                if let Some(cancel) = algo {
371                    rooted!(in(*cx) let this_object = transform_obj.get());
372                    cancel
373                        .Call_(
374                            &this_object.handle(),
375                            chunk,
376                            ExceptionHandling::Rethrow,
377                            can_gc,
378                        )
379                        .unwrap_or_else(|e| {
380                            let p = Promise::new(global, can_gc);
381                            p.reject_error(e, can_gc);
382                            p
383                        })
384                } else {
385                    // Step 4. Let cancelAlgorithm be an algorithm which returns a promise resolved with undefined.
386                    Promise::new_resolved(global, cx, (), can_gc)
387                }
388            },
389            TransformerType::Decoder(_) => {
390                // <https://streams.spec.whatwg.org/#transformstream-set-up>
391                // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
392                // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
393                //      if cancelAlgorithm was given, or null otherwise
394                // Note: `TextDecoderStream` does NOT specify a cancel algorithm.
395                // Step 7.2 If result is a Promise, then return result.
396                // Note: Not applicable.
397                // Step 7.3 Return a promise resolved with undefined.
398                Promise::new_resolved(global, cx, (), can_gc)
399            },
400            TransformerType::Encoder(_) => {
401                // <https://streams.spec.whatwg.org/#transformstream-set-up>
402                // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
403                // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
404                //      if cancelAlgorithm was given, or null otherwise
405                // Note: `TextDecoderStream` does NOT specify a cancel algorithm.
406                // Step 7.2 If result is a Promise, then return result.
407                // Note: Not applicable.
408                // Step 7.3 Return a promise resolved with undefined.
409                Promise::new_resolved(global, cx, (), can_gc)
410            },
411            TransformerType::Compressor(_) => {
412                // <https://streams.spec.whatwg.org/#transformstream-set-up>
413                // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
414                // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
415                //      if cancelAlgorithm was given, or null otherwise
416                // Note: `CompressionStream` does NOT specify a cancel algorithm.
417                // Step 7.2 If result is a Promise, then return result.
418                // Note: Not applicable.
419                // Step 7.3 Return a promise resolved with undefined.
420                Promise::new_resolved(global, cx, (), can_gc)
421            },
422            TransformerType::Decompressor(_) => {
423                // <https://streams.spec.whatwg.org/#transformstream-set-up>
424                // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
425                // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
426                //      if cancelAlgorithm was given, or null otherwise
427                // Note: `DecompressionStream` does NOT specify a cancel algorithm.
428                // Step 7.2 If result is a Promise, then return result.
429                // Note: Not applicable.
430                // Step 7.3 Return a promise resolved with undefined.
431                Promise::new_resolved(global, cx, (), can_gc)
432            },
433        };
434
435        Ok(result)
436    }
437
438    pub(crate) fn perform_flush(
439        &self,
440        cx: SafeJSContext,
441        global: &GlobalScope,
442        can_gc: CanGc,
443    ) -> Fallible<Rc<Promise>> {
444        let result = match &self.transformer_type {
445            // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
446            TransformerType::Js {
447                flush,
448                transform_obj,
449                ..
450            } => {
451                // Step 6. If transformerDict["flush"] exists, set flushAlgorithm to an
452                // algorithm which returns the result of invoking
453                // transformerDict["flush"] with argument list « controller »
454                // and callback this value transformer.
455                let algo = flush.borrow().clone();
456                if let Some(flush) = algo {
457                    rooted!(in(*cx) let this_object = transform_obj.get());
458                    flush
459                        .Call_(
460                            &this_object.handle(),
461                            self,
462                            ExceptionHandling::Rethrow,
463                            can_gc,
464                        )
465                        .unwrap_or_else(|e| {
466                            let p = Promise::new(global, can_gc);
467                            p.reject_error(e, can_gc);
468                            p
469                        })
470                } else {
471                    // Step 3. Let flushAlgorithm be an algorithm which returns a promise resolved with undefined.
472                    Promise::new_resolved(global, cx, (), can_gc)
473                }
474            },
475            TransformerType::Decoder(decoder) => {
476                // <https://encoding.spec.whatwg.org/#dom-textdecoderstream>
477                // Step 8. Let flushAlgorithm be an algorithm which takes no
478                // arguments and runs the flush and enqueue algorithm with this.
479                flush_and_enqueue(cx, global, decoder, self, can_gc)
480                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
481                    // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
482                    // Step 6.1 Let result be the result of running flushAlgorithm,
483                    //      if flushAlgorithm was given, or null otherwise.
484                    // Step 6.2 If result is a Promise, then return result.
485                    // Note: Not applicable. The spec does NOT require flush_and_enqueue algo to return a Promise
486                    // Step 6.3 Return a promise resolved with undefined.
487                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
488                    .unwrap_or_else(|e| {
489                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
490                        // Step 6.1 If this throws an exception e,
491                        let realm = enter_realm(self);
492                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
493                        // return a promise rejected with e.
494                        p.reject_error(e, can_gc);
495                        p
496                    })
497            },
498            TransformerType::Encoder(encoder) => {
499                // <https://encoding.spec.whatwg.org/#textencoderstream-encoder>
500                // Step 3. Let flushAlgorithm be an algorithm which runs the encode and flush algorithm with this.
501                encode_and_flush(cx, global, encoder, self, can_gc)
502                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
503                    // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
504                    // Step 6.1 Let result be the result of running flushAlgorithm,
505                    //      if flushAlgorithm was given, or null otherwise.
506                    // Step 6.2 If result is a Promise, then return result.
507                    // Note: Not applicable. The spec does NOT require encode_and_flush algo to return a Promise
508                    // Step 6.3 Return a promise resolved with undefined.
509                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
510                    .unwrap_or_else(|e| {
511                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
512                        // Step 6.1 If this throws an exception e,
513                        let realm = enter_realm(self);
514                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
515                        // return a promise rejected with e.
516                        p.reject_error(e, can_gc);
517                        p
518                    })
519            },
520            TransformerType::Compressor(cs) => {
521                // <https://compression.spec.whatwg.org/#dom-compressionstream-compressionstream>
522                // Step 4. Let flushAlgorithm be an algorithm which takes no argument and runs the
523                // compress flush and enqueue algorithm with this.
524                compress_flush_and_enqueue(cx, global, cs, self, can_gc)
525                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
526                    // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
527                    // Step 6.1 Let result be the result of running flushAlgorithm,
528                    //      if flushAlgorithm was given, or null otherwise.
529                    // Step 6.2 If result is a Promise, then return result.
530                    // Note: Not applicable. The spec does NOT require compress_flush_and_enqueue
531                    // algo to return a Promise.
532                    // Step 6.3 Return a promise resolved with undefined.
533                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
534                    .unwrap_or_else(|e| {
535                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
536                        // Step 6.1 If this throws an exception e,
537                        let realm = enter_realm(self);
538                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
539                        // return a promise rejected with e.
540                        p.reject_error(e, can_gc);
541                        p
542                    })
543            },
544            TransformerType::Decompressor(ds) => {
545                // <https://compression.spec.whatwg.org/#dom-decompressionstream-decompressionstream>
546                // Step 4. Let flushAlgorithm be an algorithm which takes no argument and runs the
547                // decompress flush and enqueue algorithm with this.
548                decompress_flush_and_enqueue(cx, global, ds, self, can_gc)
549                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
550                    // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
551                    // Step 6.1 Let result be the result of running flushAlgorithm,
552                    //      if flushAlgorithm was given, or null otherwise.
553                    // Step 6.2 If result is a Promise, then return result.
554                    // Note: Not applicable. The spec does NOT require decompress_flush_and_enqueue
555                    // algo to return a Promise.
556                    // Step 6.3 Return a promise resolved with undefined.
557                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
558                    .unwrap_or_else(|e| {
559                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
560                        // Step 6.1 If this throws an exception e,
561                        let realm = enter_realm(self);
562                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
563                        // return a promise rejected with e.
564                        p.reject_error(e, can_gc);
565                        p
566                    })
567            },
568        };
569
570        Ok(result)
571    }
572
573    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-enqueue>
574    #[expect(unsafe_code)]
575    pub(crate) fn enqueue(
576        &self,
577        cx: SafeJSContext,
578        global: &GlobalScope,
579        chunk: SafeHandleValue,
580        can_gc: CanGc,
581    ) -> Fallible<()> {
582        // Let stream be controller.[[stream]].
583        let stream = self.stream.get().expect("stream is null");
584
585        // Let readableController be stream.[[readable]].[[controller]].
586        let readable = stream.get_readable();
587        let readable_controller = readable.get_default_controller();
588
589        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController)
590        // is false, throw a TypeError exception.
591        if !readable_controller.can_close_or_enqueue() {
592            return Err(Error::Type(
593                "ReadableStreamDefaultControllerCanCloseOrEnqueue is false".to_owned(),
594            ));
595        }
596
597        // Let enqueueResult be ReadableStreamDefaultControllerEnqueue(readableController, chunk).
598        // If enqueueResult is an abrupt completion,
599        if let Err(error) = readable_controller.enqueue(cx, chunk, can_gc) {
600            // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, enqueueResult.[[Value]]).
601            rooted!(in(*cx) let mut rooted_error = UndefinedValue());
602            error
603                .clone()
604                .to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
605            stream.error_writable_and_unblock_write(cx, global, rooted_error.handle(), can_gc);
606
607            // Throw stream.[[readable]].[[storedError]].
608            unsafe {
609                if !JS_IsExceptionPending(*cx) {
610                    rooted!(in(*cx) let mut stored_error = UndefinedValue());
611                    readable.get_stored_error(stored_error.handle_mut());
612
613                    JS_SetPendingException(
614                        *cx,
615                        stored_error.handle().into(),
616                        ExceptionStackBehavior::Capture,
617                    );
618                }
619            }
620            return Err(error);
621        }
622
623        // Let backpressure be ! ReadableStreamDefaultControllerHasBackpressure(readableController).
624        let backpressure = readable_controller.has_backpressure();
625
626        // If backpressure is not stream.[[backpressure]],
627        if backpressure != stream.get_backpressure() {
628            // Assert: backpressure is true.
629            assert!(backpressure);
630
631            // Perform ! TransformStreamSetBackpressure(stream, true).
632            stream.set_backpressure(global, true, can_gc);
633        }
634        Ok(())
635    }
636
637    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-error>
638    pub(crate) fn error(
639        &self,
640        cx: SafeJSContext,
641        global: &GlobalScope,
642        reason: SafeHandleValue,
643        can_gc: CanGc,
644    ) {
645        // Perform ! TransformStreamError(controller.[[stream]], e).
646        self.stream
647            .get()
648            .expect("stream is undefined")
649            .error(cx, global, reason, can_gc);
650    }
651
652    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-clear-algorithms>
653    pub(crate) fn clear_algorithms(&self) {
654        if let TransformerType::Js {
655            cancel,
656            flush,
657            transform,
658            ..
659        } = &self.transformer_type
660        {
661            // Set controller.[[transformAlgorithm]] to undefined.
662            transform.replace(None);
663
664            // Set controller.[[flushAlgorithm]] to undefined.
665            flush.replace(None);
666
667            // Set controller.[[cancelAlgorithm]] to undefined.
668            cancel.replace(None);
669        }
670    }
671
672    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-terminate>
673    pub(crate) fn terminate(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
674        // Let stream be controller.[[stream]].
675        let stream = self.stream.get().expect("stream is null");
676
677        // Let readableController be stream.[[readable]].[[controller]].
678        let readable = stream.get_readable();
679        let readable_controller = readable.get_default_controller();
680
681        // Perform ! ReadableStreamDefaultControllerClose(readableController).
682        readable_controller.close(can_gc);
683
684        // Let error be a TypeError exception indicating that the stream has been terminated.
685        let error = Error::Type("stream has been terminated".to_owned());
686
687        // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, error).
688        rooted!(in(*cx) let mut rooted_error = UndefinedValue());
689        error.to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
690        stream.error_writable_and_unblock_write(cx, global, rooted_error.handle(), can_gc);
691    }
692}
693
694impl TransformStreamDefaultControllerMethods<crate::DomTypeHolder>
695    for TransformStreamDefaultController
696{
697    /// <https://streams.spec.whatwg.org/#ts-default-controller-desired-size>
698    fn GetDesiredSize(&self) -> Option<f64> {
699        // Let readableController be this.[[stream]].[[readable]].[[controller]].
700        let readable_controller = self
701            .stream
702            .get()
703            .expect("stream is null")
704            .get_readable()
705            .get_default_controller();
706
707        // Return ! ReadableStreamDefaultControllerGetDesiredSize(readableController).
708        readable_controller.get_desired_size()
709    }
710
711    /// <https://streams.spec.whatwg.org/#ts-default-controller-enqueue>
712    fn Enqueue(&self, cx: SafeJSContext, chunk: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
713        // Perform ? TransformStreamDefaultControllerEnqueue(this, chunk).
714        self.enqueue(cx, &self.global(), chunk, can_gc)
715    }
716
717    /// <https://streams.spec.whatwg.org/#ts-default-controller-error>
718    fn Error(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
719        // Perform ? TransformStreamDefaultControllerError(this, e).
720        self.error(cx, &self.global(), reason, can_gc);
721        Ok(())
722    }
723
724    /// <https://streams.spec.whatwg.org/#ts-default-controller-terminate>
725    fn Terminate(&self, can_gc: CanGc) -> Fallible<()> {
726        // Perform ? TransformStreamDefaultControllerTerminate(this).
727        self.terminate(GlobalScope::get_cx(), &self.global(), can_gc);
728        Ok(())
729    }
730}