script/dom/stream/
transformstreamdefaultcontroller.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsapi::{
10    ExceptionStackBehavior, Heap, JS_IsExceptionPending, JS_SetPendingException, JSObject,
11};
12use js::jsval::UndefinedValue;
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
15
16use crate::dom::bindings::callback::ExceptionHandling;
17use crate::dom::bindings::cell::DomRefCell;
18use crate::dom::bindings::codegen::Bindings::TransformStreamDefaultControllerBinding::TransformStreamDefaultControllerMethods;
19use crate::dom::bindings::codegen::Bindings::TransformerBinding::{
20    Transformer, TransformerCancelCallback, TransformerFlushCallback, TransformerTransformCallback,
21};
22use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
23use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
24use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
25use crate::dom::compressionstream::{
26    CompressionStream, compress_and_enqueue_a_chunk, compress_flush_and_enqueue,
27};
28use crate::dom::decompressionstream::{
29    decompress_and_enqueue_a_chunk, decompress_flush_and_enqueue,
30};
31use crate::dom::encoding::textdecodercommon::TextDecoderCommon;
32use crate::dom::encoding::textdecoderstream::{decode_and_enqueue_a_chunk, flush_and_enqueue};
33use crate::dom::encoding::textencoderstream::{
34    Encoder, encode_and_enqueue_a_chunk, encode_and_flush,
35};
36use crate::dom::globalscope::GlobalScope;
37use crate::dom::promise::Promise;
38use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
39use crate::dom::types::{DecompressionStream, TransformStream};
40use crate::realms::{InRealm, enter_auto_realm};
41use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
42
43impl js::gc::Rootable for TransformTransformPromiseRejection {}
44
45/// Reacting to transformPromise as part of
46/// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
47#[derive(JSTraceable, MallocSizeOf)]
48#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
49struct TransformTransformPromiseRejection {
50    controller: Dom<TransformStreamDefaultController>,
51}
52
53impl Callback for TransformTransformPromiseRejection {
54    /// Reacting to transformPromise with the following fulfillment steps:
55    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
56        // Perform ! TransformStreamError(controller.[[stream]], r).
57        self.controller.error(cx, &self.controller.global(), v);
58
59        // Throw r.
60        // Note: this is done part of perform_transform().
61    }
62}
63
64/// The type of transformer algorithms we are using
65#[derive(JSTraceable)]
66pub(crate) enum TransformerType {
67    /// Algorithms provided by Js callbacks
68    Js {
69        /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-cancelalgorithm>
70        cancel: RefCell<Option<Rc<TransformerCancelCallback>>>,
71
72        /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-flushalgorithm>
73        flush: RefCell<Option<Rc<TransformerFlushCallback>>>,
74
75        /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-transformalgorithm>
76        transform: RefCell<Option<Rc<TransformerTransformCallback>>>,
77
78        /// The JS object used as `this` when invoking sink algorithms.
79        transform_obj: Heap<*mut JSObject>,
80    },
81    /// Algorithms supporting `TextDecoderStream` are implemented in Rust
82    ///
83    /// <https://encoding.spec.whatwg.org/#textdecodercommon>
84    Decoder(Rc<TextDecoderCommon>),
85    /// Algorithms supporting `TextEncoderStream` are implemented in Rust
86    ///
87    /// <https://encoding.spec.whatwg.org/#textencoderstream-encoder>
88    Encoder(Encoder),
89    /// Algorithms supporting `CompressionStream` are implemented in Rust
90    ///
91    /// <https://compression.spec.whatwg.org/#compressionstream>
92    Compressor(DomRoot<CompressionStream>),
93    /// Algorithms supporting `DecompressionStream` are implemented in Rust
94    ///
95    /// <https://compression.spec.whatwg.org/#decompressionstream>
96    Decompressor(DomRoot<DecompressionStream>),
97}
98
99impl TransformerType {
100    pub(crate) fn new_from_js_transformer(transformer: &Transformer) -> TransformerType {
101        TransformerType::Js {
102            cancel: RefCell::new(transformer.cancel.clone()),
103            flush: RefCell::new(transformer.flush.clone()),
104            transform: RefCell::new(transformer.transform.clone()),
105            transform_obj: Default::default(),
106        }
107    }
108}
109
110/// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller>
111#[dom_struct]
112pub struct TransformStreamDefaultController {
113    reflector_: Reflector,
114
115    /// The type of the underlying transformer used. Besides the JS variant,
116    /// there will be other variant(s) for `TextDecoderStream`
117    #[ignore_malloc_size_of = "transformer_type"]
118    transformer_type: TransformerType,
119
120    /// <https://streams.spec.whatwg.org/#TransformStreamDefaultController-stream>
121    stream: MutNullableDom<TransformStream>,
122
123    /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-finishpromise>
124    #[conditional_malloc_size_of]
125    finish_promise: DomRefCell<Option<Rc<Promise>>>,
126}
127
128impl TransformStreamDefaultController {
129    fn new_inherited(transformer_type: TransformerType) -> TransformStreamDefaultController {
130        TransformStreamDefaultController {
131            reflector_: Reflector::new(),
132            transformer_type,
133            stream: MutNullableDom::new(None),
134            finish_promise: DomRefCell::new(None),
135        }
136    }
137
138    pub(crate) fn new(
139        global: &GlobalScope,
140        transformer_type: TransformerType,
141        can_gc: CanGc,
142    ) -> DomRoot<TransformStreamDefaultController> {
143        reflect_dom_object(
144            Box::new(TransformStreamDefaultController::new_inherited(
145                transformer_type,
146            )),
147            global,
148            can_gc,
149        )
150    }
151
152    /// Setting the JS object after the heap has settled down.
153    ///
154    /// Note that this has no effect if the transformer type is not `TransformerType::Js`
155    pub(crate) fn set_transform_obj(&self, this_object: SafeHandleObject) {
156        if let TransformerType::Js { transform_obj, .. } = &self.transformer_type {
157            transform_obj.set(*this_object)
158        } else {
159            unreachable!("Non-Js transformer type should not set transform_obj")
160        }
161    }
162
163    pub(crate) fn set_stream(&self, stream: &TransformStream) {
164        self.stream.set(Some(stream));
165    }
166
167    pub(crate) fn get_finish_promise(&self) -> Option<Rc<Promise>> {
168        self.finish_promise.borrow().clone()
169    }
170
171    pub(crate) fn set_finish_promise(&self, promise: Rc<Promise>) {
172        *self.finish_promise.borrow_mut() = Some(promise);
173    }
174
175    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
176    pub(crate) fn transform_stream_default_controller_perform_transform(
177        &self,
178        cx: &mut js::context::JSContext,
179        global: &GlobalScope,
180        chunk: SafeHandleValue,
181    ) -> Fallible<Rc<Promise>> {
182        // Let transformPromise be the result of performing controller.[[transformAlgorithm]], passing chunk.
183        let transform_promise = self.perform_transform(cx, global, chunk)?;
184
185        // Return the result of reacting to transformPromise with the following rejection steps given the argument r:
186        rooted!(&in(cx) let mut reject_handler = Some(TransformTransformPromiseRejection {
187            controller: Dom::from_ref(self),
188        }));
189
190        let handler = PromiseNativeHandler::new(
191            global,
192            None,
193            reject_handler.take().map(|h| Box::new(h) as Box<_>),
194            CanGc::from_cx(cx),
195        );
196        let mut realm = enter_auto_realm(cx, global);
197        let realm = &mut realm.current_realm();
198        let in_realm_proof = realm.into();
199        let comp = InRealm::Already(&in_realm_proof);
200        transform_promise.append_native_handler(&handler, comp, CanGc::from_cx(realm));
201
202        Ok(transform_promise)
203    }
204
205    pub(crate) fn perform_transform(
206        &self,
207        cx: &mut js::context::JSContext,
208        global: &GlobalScope,
209        chunk: SafeHandleValue,
210    ) -> Fallible<Rc<Promise>> {
211        let result = match &self.transformer_type {
212            // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
213            TransformerType::Js {
214                transform,
215                transform_obj,
216                ..
217            } => {
218                // Step 5. If transformerDict["transform"] exists, set
219                // transformAlgorithm to an algorithm which takes an argument
220                // chunk and returns the result of invoking
221                // transformerDict["transform"] with argument list « chunk,
222                // controller » and callback this value transformer.
223                let algo = transform.borrow().clone();
224                if let Some(transform) = algo {
225                    rooted!(&in(cx) let this_object = transform_obj.get());
226                    transform
227                        .Call_(
228                            &this_object.handle(),
229                            chunk,
230                            self,
231                            ExceptionHandling::Rethrow,
232                            CanGc::from_cx(cx),
233                        )
234                        .unwrap_or_else(|e| {
235                            let p = Promise::new2(cx, global);
236                            p.reject_error(e, CanGc::from_cx(cx));
237                            p
238                        })
239                } else {
240                    // Step 2. Let transformAlgorithm be the following steps, taking a chunk argument:
241                    // Let result be TransformStreamDefaultControllerEnqueue(controller, chunk).
242                    // If result is an abrupt completion, return a promise rejected with result.[[Value]].
243                    if let Err(error) = self.enqueue(cx, global, chunk) {
244                        rooted!(&in(cx) let mut error_val = UndefinedValue());
245                        error.to_jsval(
246                            cx.into(),
247                            global,
248                            error_val.handle_mut(),
249                            CanGc::from_cx(cx),
250                        );
251                        Promise::new_rejected(
252                            global,
253                            cx.into(),
254                            error_val.handle(),
255                            CanGc::from_cx(cx),
256                        )
257                    } else {
258                        // Otherwise, return a promise resolved with undefined.
259                        Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
260                    }
261                }
262            },
263            TransformerType::Decoder(decoder) => {
264                // <https://encoding.spec.whatwg.org/#dom-textdecoderstream>
265                // Step 7. Let transformAlgorithm be an algorithm which takes a
266                // chunk argument and runs the decode and enqueue a chunk
267                // algorithm with this and chunk.
268                decode_and_enqueue_a_chunk(cx, global, chunk, decoder, self)
269                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
270                    // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
271                    // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
272                    // Step 5.2 If result is a Promise, then return result.
273                    // Note: not applicable, the spec does NOT require deode_and_enqueue_a_chunk() to return a Promise
274                    // Step 5.3 Return a promise resolved with undefined.
275                    .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
276                    .unwrap_or_else(|e| {
277                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
278                        // Step 5.1 If this throws an exception e,
279                        let mut realm = enter_auto_realm(cx, self);
280                        let realm = &mut realm.current_realm();
281                        let p = Promise::new_in_realm(realm);
282                        // return a promise rejected with e.
283                        p.reject_error(e, CanGc::from_cx(realm));
284                        p
285                    })
286            },
287            TransformerType::Encoder(encoder) => {
288                // <https://encoding.spec.whatwg.org/#dom-textencoderstream>
289                // Step 2. Let transformAlgorithm be an algorithm which takes a chunk argument and runs the encode
290                //      and enqueue a chunk algorithm with this and chunk.
291                encode_and_enqueue_a_chunk(cx, global, chunk, encoder, self)
292                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
293                    // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
294                    // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
295                    // Step 5.2 If result is a Promise, then return result.
296                    // Note: not applicable, the spec does NOT require encode_and_enqueue_a_chunk() to return a Promise
297                    // Step 5.3 Return a promise resolved with undefined.
298                    .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
299                    .unwrap_or_else(|e| {
300                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
301                        // Step 5.1 If this throws an exception e,
302                        let mut realm = enter_auto_realm(cx, self);
303                        let realm = &mut realm.current_realm();
304                        let p = Promise::new_in_realm(realm);
305                        // return a promise rejected with e.
306                        p.reject_error(e, CanGc::from_cx(realm));
307                        p
308                    })
309            },
310            TransformerType::Compressor(cs) => {
311                // <https://compression.spec.whatwg.org/#dom-compressionstream-compressionstream>
312                // Step 3. Let transformAlgorithm be an algorithm which takes a chunk argument and
313                // runs the compress and enqueue a chunk algorithm with this and chunk.
314                compress_and_enqueue_a_chunk(cx, global, cs, chunk, self)
315                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
316                    // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
317                    // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
318                    // Step 5.2 If result is a Promise, then return result.
319                    // Note: not applicable, the spec does NOT require
320                    // compress_and_enqueue_a_chunk() to return a Promise.
321                    // Step 5.3 Return a promise resolved with undefined.
322                    .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
323                    .unwrap_or_else(|e| {
324                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
325                        // Step 5.1 If this throws an exception e,
326                        let mut realm = enter_auto_realm(cx, self);
327                        let realm = &mut realm.current_realm();
328                        let p = Promise::new_in_realm(realm);
329                        // return a promise rejected with e.
330                        p.reject_error(e, CanGc::from_cx(realm));
331                        p
332                    })
333            },
334            TransformerType::Decompressor(ds) => {
335                // <https://compression.spec.whatwg.org/#dom-decompressionstream-decompressionstream>
336                // Step 3. Let transformAlgorithm be an algorithm which takes a chunk argument and
337                // runs the decompress and enqueue a chunk algorithm with this and chunk.
338                decompress_and_enqueue_a_chunk(cx, global, ds, chunk, self)
339                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
340                    // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
341                    // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
342                    // Step 5.2 If result is a Promise, then return result.
343                    // Note: not applicable, the spec does NOT require
344                    // decompress_and_enqueue_a_chunk() to return a Promise
345                    // Step 5.3 Return a promise resolved with undefined.
346                    .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
347                    .unwrap_or_else(|e| {
348                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
349                        // Step 5.1 If this throws an exception e,
350                        let mut realm = enter_auto_realm(cx, self);
351                        let realm = &mut realm.current_realm();
352                        let p = Promise::new_in_realm(realm);
353                        // return a promise rejected with e.
354                        p.reject_error(e, CanGc::from_cx(realm));
355                        p
356                    })
357            },
358        };
359
360        Ok(result)
361    }
362
363    pub(crate) fn perform_cancel(
364        &self,
365        cx: SafeJSContext,
366        global: &GlobalScope,
367        chunk: SafeHandleValue,
368        can_gc: CanGc,
369    ) -> Fallible<Rc<Promise>> {
370        let result = match &self.transformer_type {
371            // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
372            TransformerType::Js {
373                cancel,
374                transform_obj,
375                ..
376            } => {
377                // Step 7. If transformerDict["cancel"] exists, set
378                // cancelAlgorithm to an algorithm which takes an argument
379                // reason and returns the result of invoking
380                // transformerDict["cancel"] with argument list « reason » and
381                // callback this value transformer.
382                let algo = cancel.borrow().clone();
383                if let Some(cancel) = algo {
384                    rooted!(in(*cx) let this_object = transform_obj.get());
385                    cancel
386                        .Call_(
387                            &this_object.handle(),
388                            chunk,
389                            ExceptionHandling::Rethrow,
390                            can_gc,
391                        )
392                        .unwrap_or_else(|e| {
393                            let p = Promise::new(global, can_gc);
394                            p.reject_error(e, can_gc);
395                            p
396                        })
397                } else {
398                    // Step 4. Let cancelAlgorithm be an algorithm which returns a promise resolved with undefined.
399                    Promise::new_resolved(global, cx, (), can_gc)
400                }
401            },
402            TransformerType::Decoder(_) => {
403                // <https://streams.spec.whatwg.org/#transformstream-set-up>
404                // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
405                // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
406                //      if cancelAlgorithm was given, or null otherwise
407                // Note: `TextDecoderStream` does NOT specify a cancel algorithm.
408                // Step 7.2 If result is a Promise, then return result.
409                // Note: Not applicable.
410                // Step 7.3 Return a promise resolved with undefined.
411                Promise::new_resolved(global, cx, (), can_gc)
412            },
413            TransformerType::Encoder(_) => {
414                // <https://streams.spec.whatwg.org/#transformstream-set-up>
415                // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
416                // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
417                //      if cancelAlgorithm was given, or null otherwise
418                // Note: `TextDecoderStream` does NOT specify a cancel algorithm.
419                // Step 7.2 If result is a Promise, then return result.
420                // Note: Not applicable.
421                // Step 7.3 Return a promise resolved with undefined.
422                Promise::new_resolved(global, cx, (), can_gc)
423            },
424            TransformerType::Compressor(_) => {
425                // <https://streams.spec.whatwg.org/#transformstream-set-up>
426                // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
427                // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
428                //      if cancelAlgorithm was given, or null otherwise
429                // Note: `CompressionStream` does NOT specify a cancel algorithm.
430                // Step 7.2 If result is a Promise, then return result.
431                // Note: Not applicable.
432                // Step 7.3 Return a promise resolved with undefined.
433                Promise::new_resolved(global, cx, (), can_gc)
434            },
435            TransformerType::Decompressor(_) => {
436                // <https://streams.spec.whatwg.org/#transformstream-set-up>
437                // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
438                // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
439                //      if cancelAlgorithm was given, or null otherwise
440                // Note: `DecompressionStream` does NOT specify a cancel algorithm.
441                // Step 7.2 If result is a Promise, then return result.
442                // Note: Not applicable.
443                // Step 7.3 Return a promise resolved with undefined.
444                Promise::new_resolved(global, cx, (), can_gc)
445            },
446        };
447
448        Ok(result)
449    }
450
451    pub(crate) fn perform_flush(
452        &self,
453        cx: &mut js::context::JSContext,
454        global: &GlobalScope,
455    ) -> Fallible<Rc<Promise>> {
456        let result = match &self.transformer_type {
457            // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
458            TransformerType::Js {
459                flush,
460                transform_obj,
461                ..
462            } => {
463                // Step 6. If transformerDict["flush"] exists, set flushAlgorithm to an
464                // algorithm which returns the result of invoking
465                // transformerDict["flush"] with argument list « controller »
466                // and callback this value transformer.
467                let algo = flush.borrow().clone();
468                if let Some(flush) = algo {
469                    rooted!(&in(cx) let this_object = transform_obj.get());
470                    flush
471                        .Call_(
472                            &this_object.handle(),
473                            self,
474                            ExceptionHandling::Rethrow,
475                            CanGc::from_cx(cx),
476                        )
477                        .unwrap_or_else(|e| {
478                            let p = Promise::new2(cx, global);
479                            p.reject_error(e, CanGc::from_cx(cx));
480                            p
481                        })
482                } else {
483                    // Step 3. Let flushAlgorithm be an algorithm which returns a promise resolved with undefined.
484                    Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
485                }
486            },
487            TransformerType::Decoder(decoder) => {
488                // <https://encoding.spec.whatwg.org/#dom-textdecoderstream>
489                // Step 8. Let flushAlgorithm be an algorithm which takes no
490                // arguments and runs the flush and enqueue algorithm with this.
491                flush_and_enqueue(cx, global, decoder, self)
492                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
493                    // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
494                    // Step 6.1 Let result be the result of running flushAlgorithm,
495                    //      if flushAlgorithm was given, or null otherwise.
496                    // Step 6.2 If result is a Promise, then return result.
497                    // Note: Not applicable. The spec does NOT require flush_and_enqueue algo to return a Promise
498                    // Step 6.3 Return a promise resolved with undefined.
499                    .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
500                    .unwrap_or_else(|e| {
501                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
502                        // Step 6.1 If this throws an exception e,
503                        let mut realm = enter_auto_realm(cx, self);
504                        let realm = &mut realm.current_realm();
505                        let p = Promise::new_in_realm(realm);
506                        // return a promise rejected with e.
507                        p.reject_error(e, CanGc::from_cx(realm));
508                        p
509                    })
510            },
511            TransformerType::Encoder(encoder) => {
512                // <https://encoding.spec.whatwg.org/#textencoderstream-encoder>
513                // Step 3. Let flushAlgorithm be an algorithm which runs the encode and flush algorithm with this.
514                encode_and_flush(cx, global, encoder, self)
515                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
516                    // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
517                    // Step 6.1 Let result be the result of running flushAlgorithm,
518                    //      if flushAlgorithm was given, or null otherwise.
519                    // Step 6.2 If result is a Promise, then return result.
520                    // Note: Not applicable. The spec does NOT require encode_and_flush algo to return a Promise
521                    // Step 6.3 Return a promise resolved with undefined.
522                    .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
523                    .unwrap_or_else(|e| {
524                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
525                        // Step 6.1 If this throws an exception e,
526                        let mut realm = enter_auto_realm(cx, self);
527                        let realm = &mut realm.current_realm();
528                        let p = Promise::new_in_realm(realm);
529                        // return a promise rejected with e.
530                        p.reject_error(e, CanGc::from_cx(realm));
531                        p
532                    })
533            },
534            TransformerType::Compressor(cs) => {
535                // <https://compression.spec.whatwg.org/#dom-compressionstream-compressionstream>
536                // Step 4. Let flushAlgorithm be an algorithm which takes no argument and runs the
537                // compress flush and enqueue algorithm with this.
538                compress_flush_and_enqueue(cx, global, cs, self)
539                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
540                    // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
541                    // Step 6.1 Let result be the result of running flushAlgorithm,
542                    //      if flushAlgorithm was given, or null otherwise.
543                    // Step 6.2 If result is a Promise, then return result.
544                    // Note: Not applicable. The spec does NOT require compress_flush_and_enqueue
545                    // algo to return a Promise.
546                    // Step 6.3 Return a promise resolved with undefined.
547                    .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
548                    .unwrap_or_else(|e| {
549                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
550                        // Step 6.1 If this throws an exception e,
551                        let mut realm = enter_auto_realm(cx, self);
552                        let realm = &mut realm.current_realm();
553                        let p = Promise::new_in_realm(realm);
554                        // return a promise rejected with e.
555                        p.reject_error(e, CanGc::from_cx(realm));
556                        p
557                    })
558            },
559            TransformerType::Decompressor(ds) => {
560                // <https://compression.spec.whatwg.org/#dom-decompressionstream-decompressionstream>
561                // Step 4. Let flushAlgorithm be an algorithm which takes no argument and runs the
562                // decompress flush and enqueue algorithm with this.
563                decompress_flush_and_enqueue(cx, global, ds, self)
564                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
565                    // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
566                    // Step 6.1 Let result be the result of running flushAlgorithm,
567                    //      if flushAlgorithm was given, or null otherwise.
568                    // Step 6.2 If result is a Promise, then return result.
569                    // Note: Not applicable. The spec does NOT require decompress_flush_and_enqueue
570                    // algo to return a Promise.
571                    // Step 6.3 Return a promise resolved with undefined.
572                    .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
573                    .unwrap_or_else(|e| {
574                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
575                        // Step 6.1 If this throws an exception e,
576                        let mut realm = enter_auto_realm(cx, self);
577                        let realm = &mut realm.current_realm();
578                        let p = Promise::new_in_realm(realm);
579                        // return a promise rejected with e.
580                        p.reject_error(e, CanGc::from_cx(realm));
581                        p
582                    })
583            },
584        };
585
586        Ok(result)
587    }
588
589    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-enqueue>
590    #[expect(unsafe_code)]
591    pub(crate) fn enqueue(
592        &self,
593        cx: &mut js::context::JSContext,
594        global: &GlobalScope,
595        chunk: SafeHandleValue,
596    ) -> Fallible<()> {
597        // Let stream be controller.[[stream]].
598        let stream = self.stream.get().expect("stream is null");
599
600        // Let readableController be stream.[[readable]].[[controller]].
601        let readable = stream.get_readable();
602        let readable_controller = readable.get_default_controller();
603
604        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController)
605        // is false, throw a TypeError exception.
606        if !readable_controller.can_close_or_enqueue() {
607            return Err(Error::Type(
608                c"ReadableStreamDefaultControllerCanCloseOrEnqueue is false".to_owned(),
609            ));
610        }
611
612        // Let enqueueResult be ReadableStreamDefaultControllerEnqueue(readableController, chunk).
613        // If enqueueResult is an abrupt completion,
614        if let Err(error) = readable_controller.enqueue(cx, chunk) {
615            // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, enqueueResult.[[Value]]).
616            rooted!(&in(cx) let mut rooted_error = UndefinedValue());
617            error.clone().to_jsval(
618                cx.into(),
619                global,
620                rooted_error.handle_mut(),
621                CanGc::from_cx(cx),
622            );
623            stream.error_writable_and_unblock_write(cx, global, rooted_error.handle());
624
625            // Throw stream.[[readable]].[[storedError]].
626            unsafe {
627                if !JS_IsExceptionPending(cx.raw_cx()) {
628                    rooted!(&in(cx) let mut stored_error = UndefinedValue());
629                    readable.get_stored_error(stored_error.handle_mut());
630
631                    JS_SetPendingException(
632                        cx.raw_cx(),
633                        stored_error.handle().into(),
634                        ExceptionStackBehavior::Capture,
635                    );
636                }
637            }
638            return Err(error);
639        }
640
641        // Let backpressure be ! ReadableStreamDefaultControllerHasBackpressure(readableController).
642        let backpressure = readable_controller.has_backpressure();
643
644        // If backpressure is not stream.[[backpressure]],
645        if backpressure != stream.get_backpressure() {
646            // Assert: backpressure is true.
647            assert!(backpressure);
648
649            // Perform ! TransformStreamSetBackpressure(stream, true).
650            stream.set_backpressure(global, true, CanGc::from_cx(cx));
651        }
652        Ok(())
653    }
654
655    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-error>
656    pub(crate) fn error(
657        &self,
658        cx: &mut js::context::JSContext,
659        global: &GlobalScope,
660        reason: SafeHandleValue,
661    ) {
662        // Perform ! TransformStreamError(controller.[[stream]], e).
663        self.stream
664            .get()
665            .expect("stream is undefined")
666            .error(cx, global, reason);
667    }
668
669    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-clear-algorithms>
670    pub(crate) fn clear_algorithms(&self) {
671        if let TransformerType::Js {
672            cancel,
673            flush,
674            transform,
675            ..
676        } = &self.transformer_type
677        {
678            // Set controller.[[transformAlgorithm]] to undefined.
679            transform.replace(None);
680
681            // Set controller.[[flushAlgorithm]] to undefined.
682            flush.replace(None);
683
684            // Set controller.[[cancelAlgorithm]] to undefined.
685            cancel.replace(None);
686        }
687    }
688
689    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-terminate>
690    fn terminate(&self, cx: &mut js::context::JSContext, global: &GlobalScope) {
691        // Let stream be controller.[[stream]].
692        let stream = self.stream.get().expect("stream is null");
693
694        // Let readableController be stream.[[readable]].[[controller]].
695        let readable = stream.get_readable();
696        let readable_controller = readable.get_default_controller();
697
698        // Perform ! ReadableStreamDefaultControllerClose(readableController).
699        readable_controller.close(CanGc::from_cx(cx));
700
701        // Let error be a TypeError exception indicating that the stream has been terminated.
702        let error = Error::Type(c"stream has been terminated".to_owned());
703
704        // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, error).
705        rooted!(&in(cx) let mut rooted_error = UndefinedValue());
706        error.to_jsval(
707            cx.into(),
708            global,
709            rooted_error.handle_mut(),
710            CanGc::from_cx(cx),
711        );
712        stream.error_writable_and_unblock_write(cx, global, rooted_error.handle());
713    }
714}
715
716impl TransformStreamDefaultControllerMethods<crate::DomTypeHolder>
717    for TransformStreamDefaultController
718{
719    /// <https://streams.spec.whatwg.org/#ts-default-controller-desired-size>
720    fn GetDesiredSize(&self) -> Option<f64> {
721        // Let readableController be this.[[stream]].[[readable]].[[controller]].
722        let readable_controller = self
723            .stream
724            .get()
725            .expect("stream is null")
726            .get_readable()
727            .get_default_controller();
728
729        // Return ! ReadableStreamDefaultControllerGetDesiredSize(readableController).
730        readable_controller.get_desired_size()
731    }
732
733    /// <https://streams.spec.whatwg.org/#ts-default-controller-enqueue>
734    fn Enqueue(&self, cx: &mut js::context::JSContext, chunk: SafeHandleValue) -> Fallible<()> {
735        // Perform ? TransformStreamDefaultControllerEnqueue(this, chunk).
736        self.enqueue(cx, &self.global(), chunk)
737    }
738
739    /// <https://streams.spec.whatwg.org/#ts-default-controller-error>
740    fn Error(&self, cx: &mut js::context::JSContext, reason: SafeHandleValue) -> Fallible<()> {
741        // Perform ? TransformStreamDefaultControllerError(this, e).
742        self.error(cx, &self.global(), reason);
743        Ok(())
744    }
745
746    /// <https://streams.spec.whatwg.org/#ts-default-controller-terminate>
747    fn Terminate(&self, cx: &mut js::context::JSContext) -> Fallible<()> {
748        // Perform ? TransformStreamDefaultControllerTerminate(this).
749        self.terminate(cx, &self.global());
750        Ok(())
751    }
752}