Skip to main content

script/dom/stream/
transformstreamdefaultcontroller.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::context::JSContext;
10use js::jsapi::{
11    ExceptionStackBehavior, Heap, JS_IsExceptionPending, JS_SetPendingException, JSObject,
12};
13use js::jsval::UndefinedValue;
14use js::realm::CurrentRealm;
15use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
16use script_bindings::cell::DomRefCell;
17use script_bindings::reflector::{Reflector, reflect_dom_object};
18
19use crate::dom::bindings::callback::ExceptionHandling;
20use crate::dom::bindings::codegen::Bindings::TransformStreamDefaultControllerBinding::TransformStreamDefaultControllerMethods;
21use crate::dom::bindings::codegen::Bindings::TransformerBinding::{
22    Transformer, TransformerCancelCallback, TransformerFlushCallback, TransformerTransformCallback,
23};
24use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
25use crate::dom::bindings::reflector::DomGlobal;
26use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
27use crate::dom::compressionstream::{
28    CompressionStream, compress_and_enqueue_a_chunk, compress_flush_and_enqueue,
29};
30use crate::dom::decompressionstream::{
31    decompress_and_enqueue_a_chunk, decompress_flush_and_enqueue,
32};
33use crate::dom::encoding::textdecodercommon::TextDecoderCommon;
34use crate::dom::encoding::textdecoderstream::{decode_and_enqueue_a_chunk, flush_and_enqueue};
35use crate::dom::encoding::textencoderstream::{
36    Encoder, encode_and_enqueue_a_chunk, encode_and_flush,
37};
38use crate::dom::globalscope::GlobalScope;
39use crate::dom::promise::Promise;
40use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
41use crate::dom::types::{DecompressionStream, TransformStream};
42use crate::realms::enter_auto_realm;
43use crate::script_runtime::CanGc;
44
// Marker impl: lets a `TransformTransformPromiseRejection` be stored in a
// `rooted!` slot, which is how the perform-transform code in this file holds
// it before handing it to the promise native handler.
impl js::gc::Rootable for TransformTransformPromiseRejection {}
46
/// Reacting to transformPromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
///
/// Its `Callback` implementation forwards the value it receives to the
/// `error()` method of the controller stored here.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct TransformTransformPromiseRejection {
    /// The controller whose `error()` is invoked by the callback.
    controller: Dom<TransformStreamDefaultController>,
}
54
55impl Callback for TransformTransformPromiseRejection {
56    /// Reacting to transformPromise with the following fulfillment steps:
57    fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
58        // Perform ! TransformStreamError(controller.[[stream]], r).
59        self.controller.error(cx, &self.controller.global(), v);
60
61        // Throw r.
62        // Note: this is done part of perform_transform().
63    }
64}
65
/// The type of transformer algorithms we are using
#[derive(JSTraceable)]
pub(crate) enum TransformerType {
    /// Algorithms provided by Js callbacks
    Js {
        /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-cancelalgorithm>
        cancel: RefCell<Option<Rc<TransformerCancelCallback>>>,

        /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-flushalgorithm>
        flush: RefCell<Option<Rc<TransformerFlushCallback>>>,

        /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-transformalgorithm>
        transform: RefCell<Option<Rc<TransformerTransformCallback>>>,

        /// The JS object used as `this` when invoking the transformer
        /// callbacks (`transform`, `flush` and `cancel`).
        transform_obj: Heap<*mut JSObject>,
    },
    /// Algorithms supporting `TextDecoderStream` are implemented in Rust
    ///
    /// <https://encoding.spec.whatwg.org/#textdecodercommon>
    Decoder(Rc<TextDecoderCommon>),
    /// Algorithms supporting `TextEncoderStream` are implemented in Rust
    ///
    /// <https://encoding.spec.whatwg.org/#textencoderstream-encoder>
    Encoder(Encoder),
    /// Algorithms supporting `CompressionStream` are implemented in Rust
    ///
    /// <https://compression.spec.whatwg.org/#compressionstream>
    Compressor(DomRoot<CompressionStream>),
    /// Algorithms supporting `DecompressionStream` are implemented in Rust
    ///
    /// <https://compression.spec.whatwg.org/#decompressionstream>
    Decompressor(DomRoot<DecompressionStream>),
}
100
101impl TransformerType {
102    pub(crate) fn new_from_js_transformer(transformer: &Transformer) -> TransformerType {
103        TransformerType::Js {
104            cancel: RefCell::new(transformer.cancel.clone()),
105            flush: RefCell::new(transformer.flush.clone()),
106            transform: RefCell::new(transformer.transform.clone()),
107            transform_obj: Default::default(),
108        }
109    }
110}
111
/// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller>
#[dom_struct]
pub struct TransformStreamDefaultController {
    reflector_: Reflector,

    /// The type of the underlying transformer used: either JS callbacks or
    /// one of the Rust-implemented variants of `TransformerType` (text
    /// decoding/encoding, compression/decompression).
    #[ignore_malloc_size_of = "transformer_type"]
    transformer_type: TransformerType,

    /// <https://streams.spec.whatwg.org/#TransformStreamDefaultController-stream>
    stream: MutNullableDom<TransformStream>,

    /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-finishpromise>
    #[conditional_malloc_size_of]
    finish_promise: DomRefCell<Option<Rc<Promise>>>,
}
129
130impl TransformStreamDefaultController {
131    fn new_inherited(transformer_type: TransformerType) -> TransformStreamDefaultController {
132        TransformStreamDefaultController {
133            reflector_: Reflector::new(),
134            transformer_type,
135            stream: MutNullableDom::new(None),
136            finish_promise: DomRefCell::new(None),
137        }
138    }
139
140    pub(crate) fn new(
141        global: &GlobalScope,
142        transformer_type: TransformerType,
143        can_gc: CanGc,
144    ) -> DomRoot<TransformStreamDefaultController> {
145        reflect_dom_object(
146            Box::new(TransformStreamDefaultController::new_inherited(
147                transformer_type,
148            )),
149            global,
150            can_gc,
151        )
152    }
153
154    /// Setting the JS object after the heap has settled down.
155    ///
156    /// Note that this has no effect if the transformer type is not `TransformerType::Js`
157    pub(crate) fn set_transform_obj(&self, this_object: SafeHandleObject) {
158        if let TransformerType::Js { transform_obj, .. } = &self.transformer_type {
159            transform_obj.set(*this_object)
160        } else {
161            unreachable!("Non-Js transformer type should not set transform_obj")
162        }
163    }
164
    /// Sets controller.[[stream]] to `stream`.
    pub(crate) fn set_stream(&self, stream: &TransformStream) {
        self.stream.set(Some(stream));
    }
168
169    pub(crate) fn get_finish_promise(&self) -> Option<Rc<Promise>> {
170        self.finish_promise.borrow().clone()
171    }
172
173    pub(crate) fn set_finish_promise(&self, promise: Rc<Promise>) {
174        *self.finish_promise.borrow_mut() = Some(promise);
175    }
176
    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
    ///
    /// Runs `perform_transform` for `chunk` and attaches a native handler to
    /// the resulting promise whose only callback is a
    /// `TransformTransformPromiseRejection` bound to this controller.
    ///
    /// Returns the transform promise itself (not a derived promise); an `Err`
    /// from `perform_transform` is propagated unchanged.
    pub(crate) fn transform_stream_default_controller_perform_transform(
        &self,
        cx: &mut JSContext,
        global: &GlobalScope,
        chunk: SafeHandleValue,
    ) -> Fallible<Rc<Promise>> {
        // Let transformPromise be the result of performing controller.[[transformAlgorithm]], passing chunk.
        let transform_promise = self.perform_transform(cx, global, chunk)?;

        // Return the result of reacting to transformPromise with the following rejection steps given the argument r:
        // The handler is kept in a rooted slot until it is moved into the
        // native handler below.
        rooted!(&in(cx) let mut reject_handler = Some(TransformTransformPromiseRejection {
            controller: Dom::from_ref(self),
        }));

        // Only one of the two callback slots is filled; the other is `None`.
        // NOTE(review): presumably the `None` slot is the fulfillment
        // handler — confirm against `PromiseNativeHandler::new`.
        let handler = PromiseNativeHandler::new(
            global,
            None,
            reject_handler.take().map(|h| Box::new(h) as Box<_>),
            CanGc::from_cx(cx),
        );
        // The handler is appended while inside the realm of `global`.
        let mut realm = enter_auto_realm(cx, global);
        let realm = &mut realm.current_realm();
        transform_promise.append_native_handler(realm, &handler);

        Ok(transform_promise)
    }
204
205    pub(crate) fn perform_transform(
206        &self,
207        cx: &mut JSContext,
208        global: &GlobalScope,
209        chunk: SafeHandleValue,
210    ) -> Fallible<Rc<Promise>> {
211        let result = match &self.transformer_type {
212            // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
213            TransformerType::Js {
214                transform,
215                transform_obj,
216                ..
217            } => {
218                // Step 5. If transformerDict["transform"] exists, set
219                // transformAlgorithm to an algorithm which takes an argument
220                // chunk and returns the result of invoking
221                // transformerDict["transform"] with argument list « chunk,
222                // controller » and callback this value transformer.
223                let algo = transform.borrow().clone();
224                if let Some(transform) = algo {
225                    rooted!(&in(cx) let this_object = transform_obj.get());
226                    transform
227                        .Call_(
228                            cx,
229                            &this_object.handle(),
230                            chunk,
231                            self,
232                            ExceptionHandling::Rethrow,
233                        )
234                        .unwrap_or_else(|e| {
235                            let p = Promise::new2(cx, global);
236                            p.reject_error(e, CanGc::from_cx(cx));
237                            p
238                        })
239                } else {
240                    // Step 2. Let transformAlgorithm be the following steps, taking a chunk argument:
241                    // Let result be TransformStreamDefaultControllerEnqueue(controller, chunk).
242                    // If result is an abrupt completion, return a promise rejected with result.[[Value]].
243                    if let Err(error) = self.enqueue(cx, global, chunk) {
244                        rooted!(&in(cx) let mut error_val = UndefinedValue());
245                        error.to_jsval(
246                            cx.into(),
247                            global,
248                            error_val.handle_mut(),
249                            CanGc::from_cx(cx),
250                        );
251                        Promise::new_rejected(
252                            global,
253                            cx.into(),
254                            error_val.handle(),
255                            CanGc::from_cx(cx),
256                        )
257                    } else {
258                        // Otherwise, return a promise resolved with undefined.
259                        Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
260                    }
261                }
262            },
263            TransformerType::Decoder(decoder) => {
264                // <https://encoding.spec.whatwg.org/#dom-textdecoderstream>
265                // Step 7. Let transformAlgorithm be an algorithm which takes a
266                // chunk argument and runs the decode and enqueue a chunk
267                // algorithm with this and chunk.
268                decode_and_enqueue_a_chunk(cx, global, chunk, decoder, self)
269                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
270                    // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
271                    // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
272                    // Step 5.2 If result is a Promise, then return result.
273                    // Note: not applicable, the spec does NOT require deode_and_enqueue_a_chunk() to return a Promise
274                    // Step 5.3 Return a promise resolved with undefined.
275                    .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
276                    .unwrap_or_else(|e| {
277                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
278                        // Step 5.1 If this throws an exception e,
279                        let mut realm = enter_auto_realm(cx, self);
280                        let realm = &mut realm.current_realm();
281                        let p = Promise::new_in_realm(realm);
282                        // return a promise rejected with e.
283                        p.reject_error(e, CanGc::from_cx(realm));
284                        p
285                    })
286            },
287            TransformerType::Encoder(encoder) => {
288                // <https://encoding.spec.whatwg.org/#dom-textencoderstream>
289                // Step 2. Let transformAlgorithm be an algorithm which takes a chunk argument and runs the encode
290                //      and enqueue a chunk algorithm with this and chunk.
291                encode_and_enqueue_a_chunk(cx, global, chunk, encoder, self)
292                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
293                    // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
294                    // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
295                    // Step 5.2 If result is a Promise, then return result.
296                    // Note: not applicable, the spec does NOT require encode_and_enqueue_a_chunk() to return a Promise
297                    // Step 5.3 Return a promise resolved with undefined.
298                    .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
299                    .unwrap_or_else(|e| {
300                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
301                        // Step 5.1 If this throws an exception e,
302                        let mut realm = enter_auto_realm(cx, self);
303                        let realm = &mut realm.current_realm();
304                        let p = Promise::new_in_realm(realm);
305                        // return a promise rejected with e.
306                        p.reject_error(e, CanGc::from_cx(realm));
307                        p
308                    })
309            },
310            TransformerType::Compressor(cs) => {
311                // <https://compression.spec.whatwg.org/#dom-compressionstream-compressionstream>
312                // Step 3. Let transformAlgorithm be an algorithm which takes a chunk argument and
313                // runs the compress and enqueue a chunk algorithm with this and chunk.
314                compress_and_enqueue_a_chunk(cx, global, cs, chunk, self)
315                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
316                    // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
317                    // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
318                    // Step 5.2 If result is a Promise, then return result.
319                    // Note: not applicable, the spec does NOT require
320                    // compress_and_enqueue_a_chunk() to return a Promise.
321                    // Step 5.3 Return a promise resolved with undefined.
322                    .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
323                    .unwrap_or_else(|e| {
324                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
325                        // Step 5.1 If this throws an exception e,
326                        let mut realm = enter_auto_realm(cx, self);
327                        let realm = &mut realm.current_realm();
328                        let p = Promise::new_in_realm(realm);
329                        // return a promise rejected with e.
330                        p.reject_error(e, CanGc::from_cx(realm));
331                        p
332                    })
333            },
334            TransformerType::Decompressor(ds) => {
335                // <https://compression.spec.whatwg.org/#dom-decompressionstream-decompressionstream>
336                // Step 3. Let transformAlgorithm be an algorithm which takes a chunk argument and
337                // runs the decompress and enqueue a chunk algorithm with this and chunk.
338                decompress_and_enqueue_a_chunk(cx, global, ds, chunk, self)
339                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
340                    // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
341                    // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
342                    // Step 5.2 If result is a Promise, then return result.
343                    // Note: not applicable, the spec does NOT require
344                    // decompress_and_enqueue_a_chunk() to return a Promise
345                    // Step 5.3 Return a promise resolved with undefined.
346                    .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
347                    .unwrap_or_else(|e| {
348                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
349                        // Step 5.1 If this throws an exception e,
350                        let mut realm = enter_auto_realm(cx, self);
351                        let realm = &mut realm.current_realm();
352                        let p = Promise::new_in_realm(realm);
353                        // return a promise rejected with e.
354                        p.reject_error(e, CanGc::from_cx(realm));
355                        p
356                    })
357            },
358        };
359
360        Ok(result)
361    }
362
363    pub(crate) fn perform_cancel(
364        &self,
365        cx: &mut JSContext,
366        global: &GlobalScope,
367        chunk: SafeHandleValue,
368    ) -> Fallible<Rc<Promise>> {
369        let result = match &self.transformer_type {
370            // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
371            TransformerType::Js {
372                cancel,
373                transform_obj,
374                ..
375            } => {
376                // Step 7. If transformerDict["cancel"] exists, set
377                // cancelAlgorithm to an algorithm which takes an argument
378                // reason and returns the result of invoking
379                // transformerDict["cancel"] with argument list « reason » and
380                // callback this value transformer.
381                let algo = cancel.borrow().clone();
382                if let Some(cancel) = algo {
383                    rooted!(&in(cx) let this_object = transform_obj.get());
384                    cancel
385                        .Call_(cx, &this_object.handle(), chunk, ExceptionHandling::Rethrow)
386                        .unwrap_or_else(|e| {
387                            let p = Promise::new2(cx, global);
388                            p.reject_error(e, CanGc::from_cx(cx));
389                            p
390                        })
391                } else {
392                    // Step 4. Let cancelAlgorithm be an algorithm which returns a promise resolved with undefined.
393                    Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
394                }
395            },
396            TransformerType::Decoder(_) => {
397                // <https://streams.spec.whatwg.org/#transformstream-set-up>
398                // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
399                // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
400                //      if cancelAlgorithm was given, or null otherwise
401                // Note: `TextDecoderStream` does NOT specify a cancel algorithm.
402                // Step 7.2 If result is a Promise, then return result.
403                // Note: Not applicable.
404                // Step 7.3 Return a promise resolved with undefined.
405                Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
406            },
407            TransformerType::Encoder(_) => {
408                // <https://streams.spec.whatwg.org/#transformstream-set-up>
409                // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
410                // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
411                //      if cancelAlgorithm was given, or null otherwise
412                // Note: `TextDecoderStream` does NOT specify a cancel algorithm.
413                // Step 7.2 If result is a Promise, then return result.
414                // Note: Not applicable.
415                // Step 7.3 Return a promise resolved with undefined.
416                Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
417            },
418            TransformerType::Compressor(_) => {
419                // <https://streams.spec.whatwg.org/#transformstream-set-up>
420                // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
421                // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
422                //      if cancelAlgorithm was given, or null otherwise
423                // Note: `CompressionStream` does NOT specify a cancel algorithm.
424                // Step 7.2 If result is a Promise, then return result.
425                // Note: Not applicable.
426                // Step 7.3 Return a promise resolved with undefined.
427                Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
428            },
429            TransformerType::Decompressor(_) => {
430                // <https://streams.spec.whatwg.org/#transformstream-set-up>
431                // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
432                // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
433                //      if cancelAlgorithm was given, or null otherwise
434                // Note: `DecompressionStream` does NOT specify a cancel algorithm.
435                // Step 7.2 If result is a Promise, then return result.
436                // Note: Not applicable.
437                // Step 7.3 Return a promise resolved with undefined.
438                Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
439            },
440        };
441
442        Ok(result)
443    }
444
445    pub(crate) fn perform_flush(
446        &self,
447        cx: &mut JSContext,
448        global: &GlobalScope,
449    ) -> Fallible<Rc<Promise>> {
450        let result = match &self.transformer_type {
451            // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
452            TransformerType::Js {
453                flush,
454                transform_obj,
455                ..
456            } => {
457                // Step 6. If transformerDict["flush"] exists, set flushAlgorithm to an
458                // algorithm which returns the result of invoking
459                // transformerDict["flush"] with argument list « controller »
460                // and callback this value transformer.
461                let algo = flush.borrow().clone();
462                if let Some(flush) = algo {
463                    rooted!(&in(cx) let this_object = transform_obj.get());
464                    flush
465                        .Call_(cx, &this_object.handle(), self, ExceptionHandling::Rethrow)
466                        .unwrap_or_else(|e| {
467                            let p = Promise::new2(cx, global);
468                            p.reject_error(e, CanGc::from_cx(cx));
469                            p
470                        })
471                } else {
472                    // Step 3. Let flushAlgorithm be an algorithm which returns a promise resolved with undefined.
473                    Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
474                }
475            },
476            TransformerType::Decoder(decoder) => {
477                // <https://encoding.spec.whatwg.org/#dom-textdecoderstream>
478                // Step 8. Let flushAlgorithm be an algorithm which takes no
479                // arguments and runs the flush and enqueue algorithm with this.
480                flush_and_enqueue(cx, global, decoder, self)
481                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
482                    // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
483                    // Step 6.1 Let result be the result of running flushAlgorithm,
484                    //      if flushAlgorithm was given, or null otherwise.
485                    // Step 6.2 If result is a Promise, then return result.
486                    // Note: Not applicable. The spec does NOT require flush_and_enqueue algo to return a Promise
487                    // Step 6.3 Return a promise resolved with undefined.
488                    .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
489                    .unwrap_or_else(|e| {
490                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
491                        // Step 6.1 If this throws an exception e,
492                        let mut realm = enter_auto_realm(cx, self);
493                        let realm = &mut realm.current_realm();
494                        let p = Promise::new_in_realm(realm);
495                        // return a promise rejected with e.
496                        p.reject_error(e, CanGc::from_cx(realm));
497                        p
498                    })
499            },
500            TransformerType::Encoder(encoder) => {
501                // <https://encoding.spec.whatwg.org/#textencoderstream-encoder>
502                // Step 3. Let flushAlgorithm be an algorithm which runs the encode and flush algorithm with this.
503                encode_and_flush(cx, global, encoder, self)
504                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
505                    // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
506                    // Step 6.1 Let result be the result of running flushAlgorithm,
507                    //      if flushAlgorithm was given, or null otherwise.
508                    // Step 6.2 If result is a Promise, then return result.
509                    // Note: Not applicable. The spec does NOT require encode_and_flush algo to return a Promise
510                    // Step 6.3 Return a promise resolved with undefined.
511                    .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
512                    .unwrap_or_else(|e| {
513                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
514                        // Step 6.1 If this throws an exception e,
515                        let mut realm = enter_auto_realm(cx, self);
516                        let realm = &mut realm.current_realm();
517                        let p = Promise::new_in_realm(realm);
518                        // return a promise rejected with e.
519                        p.reject_error(e, CanGc::from_cx(realm));
520                        p
521                    })
522            },
523            TransformerType::Compressor(cs) => {
524                // <https://compression.spec.whatwg.org/#dom-compressionstream-compressionstream>
525                // Step 4. Let flushAlgorithm be an algorithm which takes no argument and runs the
526                // compress flush and enqueue algorithm with this.
527                compress_flush_and_enqueue(cx, global, cs, self)
528                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
529                    // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
530                    // Step 6.1 Let result be the result of running flushAlgorithm,
531                    //      if flushAlgorithm was given, or null otherwise.
532                    // Step 6.2 If result is a Promise, then return result.
533                    // Note: Not applicable. The spec does NOT require compress_flush_and_enqueue
534                    // algo to return a Promise.
535                    // Step 6.3 Return a promise resolved with undefined.
536                    .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
537                    .unwrap_or_else(|e| {
538                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
539                        // Step 6.1 If this throws an exception e,
540                        let mut realm = enter_auto_realm(cx, self);
541                        let realm = &mut realm.current_realm();
542                        let p = Promise::new_in_realm(realm);
543                        // return a promise rejected with e.
544                        p.reject_error(e, CanGc::from_cx(realm));
545                        p
546                    })
547            },
548            TransformerType::Decompressor(ds) => {
549                // <https://compression.spec.whatwg.org/#dom-decompressionstream-decompressionstream>
550                // Step 4. Let flushAlgorithm be an algorithm which takes no argument and runs the
551                // decompress flush and enqueue algorithm with this.
552                decompress_flush_and_enqueue(cx, global, ds, self)
553                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
554                    // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
555                    // Step 6.1 Let result be the result of running flushAlgorithm,
556                    //      if flushAlgorithm was given, or null otherwise.
557                    // Step 6.2 If result is a Promise, then return result.
558                    // Note: Not applicable. The spec does NOT require decompress_flush_and_enqueue
559                    // algo to return a Promise.
560                    // Step 6.3 Return a promise resolved with undefined.
561                    .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
562                    .unwrap_or_else(|e| {
563                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
564                        // Step 6.1 If this throws an exception e,
565                        let mut realm = enter_auto_realm(cx, self);
566                        let realm = &mut realm.current_realm();
567                        let p = Promise::new_in_realm(realm);
568                        // return a promise rejected with e.
569                        p.reject_error(e, CanGc::from_cx(realm));
570                        p
571                    })
572            },
573        };
574
575        Ok(result)
576    }
577
578    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-enqueue>
579    #[expect(unsafe_code)]
580    pub(crate) fn enqueue(
581        &self,
582        cx: &mut JSContext,
583        global: &GlobalScope,
584        chunk: SafeHandleValue,
585    ) -> Fallible<()> {
586        // Let stream be controller.[[stream]].
587        let stream = self.stream.get().expect("stream is null");
588
589        // Let readableController be stream.[[readable]].[[controller]].
590        let readable = stream.get_readable();
591        let readable_controller = readable.get_default_controller();
592
593        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController)
594        // is false, throw a TypeError exception.
595        if !readable_controller.can_close_or_enqueue() {
596            return Err(Error::Type(
597                c"ReadableStreamDefaultControllerCanCloseOrEnqueue is false".to_owned(),
598            ));
599        }
600
601        // Let enqueueResult be ReadableStreamDefaultControllerEnqueue(readableController, chunk).
602        // If enqueueResult is an abrupt completion,
603        if let Err(error) = readable_controller.enqueue(cx, chunk) {
604            // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, enqueueResult.[[Value]]).
605            rooted!(&in(cx) let mut rooted_error = UndefinedValue());
606            error.clone().to_jsval(
607                cx.into(),
608                global,
609                rooted_error.handle_mut(),
610                CanGc::from_cx(cx),
611            );
612            stream.error_writable_and_unblock_write(cx, global, rooted_error.handle());
613
614            // Throw stream.[[readable]].[[storedError]].
615            unsafe {
616                if !JS_IsExceptionPending(cx.raw_cx()) {
617                    rooted!(&in(cx) let mut stored_error = UndefinedValue());
618                    readable.get_stored_error(stored_error.handle_mut());
619
620                    JS_SetPendingException(
621                        cx.raw_cx(),
622                        stored_error.handle().into(),
623                        ExceptionStackBehavior::Capture,
624                    );
625                }
626            }
627            return Err(error);
628        }
629
630        // Let backpressure be ! ReadableStreamDefaultControllerHasBackpressure(readableController).
631        let backpressure = readable_controller.has_backpressure();
632
633        // If backpressure is not stream.[[backpressure]],
634        if backpressure != stream.get_backpressure() {
635            // Assert: backpressure is true.
636            assert!(backpressure);
637
638            // Perform ! TransformStreamSetBackpressure(stream, true).
639            stream.set_backpressure(global, true, CanGc::from_cx(cx));
640        }
641        Ok(())
642    }
643
644    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-error>
645    pub(crate) fn error(&self, cx: &mut JSContext, global: &GlobalScope, reason: SafeHandleValue) {
646        // Perform ! TransformStreamError(controller.[[stream]], e).
647        self.stream
648            .get()
649            .expect("stream is undefined")
650            .error(cx, global, reason);
651    }
652
653    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-clear-algorithms>
654    pub(crate) fn clear_algorithms(&self) {
655        if let TransformerType::Js {
656            cancel,
657            flush,
658            transform,
659            ..
660        } = &self.transformer_type
661        {
662            // Set controller.[[transformAlgorithm]] to undefined.
663            transform.replace(None);
664
665            // Set controller.[[flushAlgorithm]] to undefined.
666            flush.replace(None);
667
668            // Set controller.[[cancelAlgorithm]] to undefined.
669            cancel.replace(None);
670        }
671    }
672
673    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-terminate>
674    fn terminate(&self, cx: &mut JSContext, global: &GlobalScope) {
675        // Let stream be controller.[[stream]].
676        let stream = self.stream.get().expect("stream is null");
677
678        // Let readableController be stream.[[readable]].[[controller]].
679        let readable = stream.get_readable();
680        let readable_controller = readable.get_default_controller();
681
682        // Perform ! ReadableStreamDefaultControllerClose(readableController).
683        readable_controller.close(cx);
684
685        // Let error be a TypeError exception indicating that the stream has been terminated.
686        let error = Error::Type(c"stream has been terminated".to_owned());
687
688        // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, error).
689        rooted!(&in(cx) let mut rooted_error = UndefinedValue());
690        error.to_jsval(
691            cx.into(),
692            global,
693            rooted_error.handle_mut(),
694            CanGc::from_cx(cx),
695        );
696        stream.error_writable_and_unblock_write(cx, global, rooted_error.handle());
697    }
698}
699
700impl TransformStreamDefaultControllerMethods<crate::DomTypeHolder>
701    for TransformStreamDefaultController
702{
703    /// <https://streams.spec.whatwg.org/#ts-default-controller-desired-size>
704    fn GetDesiredSize(&self) -> Option<f64> {
705        // Let readableController be this.[[stream]].[[readable]].[[controller]].
706        let readable_controller = self
707            .stream
708            .get()
709            .expect("stream is null")
710            .get_readable()
711            .get_default_controller();
712
713        // Return ! ReadableStreamDefaultControllerGetDesiredSize(readableController).
714        readable_controller.get_desired_size()
715    }
716
717    /// <https://streams.spec.whatwg.org/#ts-default-controller-enqueue>
718    fn Enqueue(&self, cx: &mut JSContext, chunk: SafeHandleValue) -> Fallible<()> {
719        // Perform ? TransformStreamDefaultControllerEnqueue(this, chunk).
720        self.enqueue(cx, &self.global(), chunk)
721    }
722
723    /// <https://streams.spec.whatwg.org/#ts-default-controller-error>
724    fn Error(&self, cx: &mut JSContext, reason: SafeHandleValue) -> Fallible<()> {
725        // Perform ? TransformStreamDefaultControllerError(this, e).
726        self.error(cx, &self.global(), reason);
727        Ok(())
728    }
729
730    /// <https://streams.spec.whatwg.org/#ts-default-controller-terminate>
731    fn Terminate(&self, cx: &mut JSContext) -> Fallible<()> {
732        // Perform ? TransformStreamDefaultControllerTerminate(this).
733        self.terminate(cx, &self.global());
734        Ok(())
735    }
736}