script/dom/
transformstreamdefaultcontroller.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsapi::{
10    ExceptionStackBehavior, Heap, JS_IsExceptionPending, JS_SetPendingException, JSObject,
11};
12use js::jsval::UndefinedValue;
13use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
14
15use super::bindings::cell::DomRefCell;
16use super::bindings::codegen::Bindings::TransformerBinding::{
17    TransformerCancelCallback, TransformerFlushCallback, TransformerTransformCallback,
18};
19use super::types::TransformStream;
20use crate::dom::bindings::callback::ExceptionHandling;
21use crate::dom::bindings::codegen::Bindings::TransformStreamDefaultControllerBinding::TransformStreamDefaultControllerMethods;
22use crate::dom::bindings::codegen::Bindings::TransformerBinding::Transformer;
23use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
24use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
25use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
26use crate::dom::compressionstream::{
27    CompressionStream, compress_and_enqueue_a_chunk, compress_flush_and_enqueue,
28};
29use crate::dom::decompressionstream::{
30    decompress_and_enqueue_a_chunk, decompress_flush_and_enqueue,
31};
32use crate::dom::globalscope::GlobalScope;
33use crate::dom::promise::Promise;
34use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
35use crate::dom::textdecodercommon::TextDecoderCommon;
36use crate::dom::textdecoderstream::{decode_and_enqueue_a_chunk, flush_and_enqueue};
37use crate::dom::textencoderstream::{Encoder, encode_and_enqueue_a_chunk, encode_and_flush};
38use crate::dom::types::DecompressionStream;
39use crate::realms::{InRealm, enter_realm};
40use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
41
42impl js::gc::Rootable for TransformTransformPromiseRejection {}
43
44/// Reacting to transformPromise as part of
45/// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
46#[derive(JSTraceable, MallocSizeOf)]
47#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
48struct TransformTransformPromiseRejection {
49    controller: Dom<TransformStreamDefaultController>,
50}
51
52impl Callback for TransformTransformPromiseRejection {
53    /// Reacting to transformPromise with the following fulfillment steps:
54    #[allow(unsafe_code)]
55    fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
56        // Perform ! TransformStreamError(controller.[[stream]], r).
57        self.controller
58            .error(cx, &self.controller.global(), v, can_gc);
59
60        // Throw r.
61        // Note: this is done part of perform_transform().
62    }
63}
64
65/// The type of transformer algorithms we are using
66#[derive(JSTraceable)]
67pub(crate) enum TransformerType {
68    /// Algorithms provided by Js callbacks
69    Js {
70        /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-cancelalgorithm>
71        cancel: RefCell<Option<Rc<TransformerCancelCallback>>>,
72
73        /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-flushalgorithm>
74        flush: RefCell<Option<Rc<TransformerFlushCallback>>>,
75
76        /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-transformalgorithm>
77        transform: RefCell<Option<Rc<TransformerTransformCallback>>>,
78
79        /// The JS object used as `this` when invoking sink algorithms.
80        transform_obj: Heap<*mut JSObject>,
81    },
82    /// Algorithms supporting `TextDecoderStream` are implemented in Rust
83    ///
84    /// <https://encoding.spec.whatwg.org/#textdecodercommon>
85    Decoder(Rc<TextDecoderCommon>),
86    /// Algorithms supporting `TextEncoderStream` are implemented in Rust
87    ///
88    /// <https://encoding.spec.whatwg.org/#textencoderstream-encoder>
89    Encoder(Encoder),
90    /// Algorithms supporting `CompressionStream` are implemented in Rust
91    ///
92    /// <https://compression.spec.whatwg.org/#compressionstream>
93    Compressor(DomRoot<CompressionStream>),
94    /// Algorithms supporting `DecompressionStream` are implemented in Rust
95    ///
96    /// <https://compression.spec.whatwg.org/#decompressionstream>
97    Decompressor(DomRoot<DecompressionStream>),
98}
99
100impl TransformerType {
101    pub(crate) fn new_from_js_transformer(transformer: &Transformer) -> TransformerType {
102        TransformerType::Js {
103            cancel: RefCell::new(transformer.cancel.clone()),
104            flush: RefCell::new(transformer.flush.clone()),
105            transform: RefCell::new(transformer.transform.clone()),
106            transform_obj: Default::default(),
107        }
108    }
109}
110
111/// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller>
112#[dom_struct]
113pub struct TransformStreamDefaultController {
114    reflector_: Reflector,
115
116    /// The type of the underlying transformer used. Besides the JS variant,
117    /// there will be other variant(s) for `TextDecoderStream`
118    #[ignore_malloc_size_of = "transformer_type"]
119    transformer_type: TransformerType,
120
121    /// <https://streams.spec.whatwg.org/#TransformStreamDefaultController-stream>
122    stream: MutNullableDom<TransformStream>,
123
124    /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-finishpromise>
125    #[conditional_malloc_size_of]
126    finish_promise: DomRefCell<Option<Rc<Promise>>>,
127}
128
129impl TransformStreamDefaultController {
130    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
131    fn new_inherited(transformer_type: TransformerType) -> TransformStreamDefaultController {
132        TransformStreamDefaultController {
133            reflector_: Reflector::new(),
134            transformer_type,
135            stream: MutNullableDom::new(None),
136            finish_promise: DomRefCell::new(None),
137        }
138    }
139
140    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
141    pub(crate) fn new(
142        global: &GlobalScope,
143        transformer_type: TransformerType,
144        can_gc: CanGc,
145    ) -> DomRoot<TransformStreamDefaultController> {
146        reflect_dom_object(
147            Box::new(TransformStreamDefaultController::new_inherited(
148                transformer_type,
149            )),
150            global,
151            can_gc,
152        )
153    }
154
155    /// Setting the JS object after the heap has settled down.
156    ///
157    /// Note that this has no effect if the transformer type is not `TransformerType::Js`
158    pub(crate) fn set_transform_obj(&self, this_object: SafeHandleObject) {
159        if let TransformerType::Js { transform_obj, .. } = &self.transformer_type {
160            transform_obj.set(*this_object)
161        } else {
162            unreachable!("Non-Js transformer type should not set transform_obj")
163        }
164    }
165
166    pub(crate) fn set_stream(&self, stream: &TransformStream) {
167        self.stream.set(Some(stream));
168    }
169
170    pub(crate) fn get_finish_promise(&self) -> Option<Rc<Promise>> {
171        self.finish_promise.borrow().clone()
172    }
173
174    pub(crate) fn set_finish_promise(&self, promise: Rc<Promise>) {
175        *self.finish_promise.borrow_mut() = Some(promise);
176    }
177
178    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
179    pub(crate) fn transform_stream_default_controller_perform_transform(
180        &self,
181        cx: SafeJSContext,
182        global: &GlobalScope,
183        chunk: SafeHandleValue,
184        can_gc: CanGc,
185    ) -> Fallible<Rc<Promise>> {
186        // Let transformPromise be the result of performing controller.[[transformAlgorithm]], passing chunk.
187        let transform_promise = self.perform_transform(cx, global, chunk, can_gc)?;
188
189        // Return the result of reacting to transformPromise with the following rejection steps given the argument r:
190        rooted!(in(*cx) let mut reject_handler = Some(TransformTransformPromiseRejection {
191            controller: Dom::from_ref(self),
192        }));
193
194        let handler = PromiseNativeHandler::new(
195            global,
196            None,
197            reject_handler.take().map(|h| Box::new(h) as Box<_>),
198            can_gc,
199        );
200        let realm = enter_realm(global);
201        let comp = InRealm::Entered(&realm);
202        transform_promise.append_native_handler(&handler, comp, can_gc);
203
204        Ok(transform_promise)
205    }
206
207    pub(crate) fn perform_transform(
208        &self,
209        cx: SafeJSContext,
210        global: &GlobalScope,
211        chunk: SafeHandleValue,
212        can_gc: CanGc,
213    ) -> Fallible<Rc<Promise>> {
214        let result = match &self.transformer_type {
215            // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
216            TransformerType::Js {
217                transform,
218                transform_obj,
219                ..
220            } => {
221                // Step 5. If transformerDict["transform"] exists, set
222                // transformAlgorithm to an algorithm which takes an argument
223                // chunk and returns the result of invoking
224                // transformerDict["transform"] with argument list « chunk,
225                // controller » and callback this value transformer.
226                let algo = transform.borrow().clone();
227                if let Some(transform) = algo {
228                    rooted!(in(*cx) let this_object = transform_obj.get());
229                    transform
230                        .Call_(
231                            &this_object.handle(),
232                            chunk,
233                            self,
234                            ExceptionHandling::Rethrow,
235                            can_gc,
236                        )
237                        .unwrap_or_else(|e| {
238                            let p = Promise::new(global, can_gc);
239                            p.reject_error(e, can_gc);
240                            p
241                        })
242                } else {
243                    // Step 2. Let transformAlgorithm be the following steps, taking a chunk argument:
244                    // Let result be TransformStreamDefaultControllerEnqueue(controller, chunk).
245                    // If result is an abrupt completion, return a promise rejected with result.[[Value]].
246                    if let Err(error) = self.enqueue(cx, global, chunk, can_gc) {
247                        rooted!(in(*cx) let mut error_val = UndefinedValue());
248                        error.to_jsval(cx, global, error_val.handle_mut(), can_gc);
249                        Promise::new_rejected(global, cx, error_val.handle(), can_gc)
250                    } else {
251                        // Otherwise, return a promise resolved with undefined.
252                        Promise::new_resolved(global, cx, (), can_gc)
253                    }
254                }
255            },
256            TransformerType::Decoder(decoder) => {
257                // <https://encoding.spec.whatwg.org/#dom-textdecoderstream>
258                // Step 7. Let transformAlgorithm be an algorithm which takes a
259                // chunk argument and runs the decode and enqueue a chunk
260                // algorithm with this and chunk.
261                decode_and_enqueue_a_chunk(cx, global, chunk, decoder, self, can_gc)
262                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
263                    // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
264                    // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
265                    // Step 5.2 If result is a Promise, then return result.
266                    // Note: not applicable, the spec does NOT require deode_and_enqueue_a_chunk() to return a Promise
267                    // Step 5.3 Return a promise resolved with undefined.
268                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
269                    .unwrap_or_else(|e| {
270                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
271                        // Step 5.1 If this throws an exception e,
272                        let realm = enter_realm(self);
273                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
274                        // return a promise rejected with e.
275                        p.reject_error(e, can_gc);
276                        p
277                    })
278            },
279            TransformerType::Encoder(encoder) => {
280                // <https://encoding.spec.whatwg.org/#dom-textencoderstream>
281                // Step 2. Let transformAlgorithm be an algorithm which takes a chunk argument and runs the encode
282                //      and enqueue a chunk algorithm with this and chunk.
283                encode_and_enqueue_a_chunk(cx, global, chunk, encoder, self, can_gc)
284                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
285                    // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
286                    // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
287                    // Step 5.2 If result is a Promise, then return result.
288                    // Note: not applicable, the spec does NOT require encode_and_enqueue_a_chunk() to return a Promise
289                    // Step 5.3 Return a promise resolved with undefined.
290                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
291                    .unwrap_or_else(|e| {
292                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
293                        // Step 5.1 If this throws an exception e,
294                        let realm = enter_realm(self);
295                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
296                        // return a promise rejected with e.
297                        p.reject_error(e, can_gc);
298                        p
299                    })
300            },
301            TransformerType::Compressor(cs) => {
302                // <https://compression.spec.whatwg.org/#dom-compressionstream-compressionstream>
303                // Step 3. Let transformAlgorithm be an algorithm which takes a chunk argument and
304                // runs the compress and enqueue a chunk algorithm with this and chunk.
305                compress_and_enqueue_a_chunk(cx, global, cs, chunk, self, can_gc)
306                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
307                    // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
308                    // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
309                    // Step 5.2 If result is a Promise, then return result.
310                    // Note: not applicable, the spec does NOT require
311                    // compress_and_enqueue_a_chunk() to return a Promise.
312                    // Step 5.3 Return a promise resolved with undefined.
313                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
314                    .unwrap_or_else(|e| {
315                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
316                        // Step 5.1 If this throws an exception e,
317                        let realm = enter_realm(self);
318                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
319                        // return a promise rejected with e.
320                        p.reject_error(e, can_gc);
321                        p
322                    })
323            },
324            TransformerType::Decompressor(ds) => {
325                // <https://compression.spec.whatwg.org/#dom-decompressionstream-decompressionstream>
326                // Step 3. Let transformAlgorithm be an algorithm which takes a chunk argument and
327                // runs the decompress and enqueue a chunk algorithm with this and chunk.
328                decompress_and_enqueue_a_chunk(cx, global, ds, chunk, self, can_gc)
329                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
330                    // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
331                    // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
332                    // Step 5.2 If result is a Promise, then return result.
333                    // Note: not applicable, the spec does NOT require
334                    // decompress_and_enqueue_a_chunk() to return a Promise
335                    // Step 5.3 Return a promise resolved with undefined.
336                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
337                    .unwrap_or_else(|e| {
338                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
339                        // Step 5.1 If this throws an exception e,
340                        let realm = enter_realm(self);
341                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
342                        // return a promise rejected with e.
343                        p.reject_error(e, can_gc);
344                        p
345                    })
346            },
347        };
348
349        Ok(result)
350    }
351
352    pub(crate) fn perform_cancel(
353        &self,
354        cx: SafeJSContext,
355        global: &GlobalScope,
356        chunk: SafeHandleValue,
357        can_gc: CanGc,
358    ) -> Fallible<Rc<Promise>> {
359        let result = match &self.transformer_type {
360            // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
361            TransformerType::Js {
362                cancel,
363                transform_obj,
364                ..
365            } => {
366                // Step 7. If transformerDict["cancel"] exists, set
367                // cancelAlgorithm to an algorithm which takes an argument
368                // reason and returns the result of invoking
369                // transformerDict["cancel"] with argument list « reason » and
370                // callback this value transformer.
371                let algo = cancel.borrow().clone();
372                if let Some(cancel) = algo {
373                    rooted!(in(*cx) let this_object = transform_obj.get());
374                    cancel
375                        .Call_(
376                            &this_object.handle(),
377                            chunk,
378                            ExceptionHandling::Rethrow,
379                            can_gc,
380                        )
381                        .unwrap_or_else(|e| {
382                            let p = Promise::new(global, can_gc);
383                            p.reject_error(e, can_gc);
384                            p
385                        })
386                } else {
387                    // Step 4. Let cancelAlgorithm be an algorithm which returns a promise resolved with undefined.
388                    Promise::new_resolved(global, cx, (), can_gc)
389                }
390            },
391            TransformerType::Decoder(_) => {
392                // <https://streams.spec.whatwg.org/#transformstream-set-up>
393                // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
394                // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
395                //      if cancelAlgorithm was given, or null otherwise
396                // Note: `TextDecoderStream` does NOT specify a cancel algorithm.
397                // Step 7.2 If result is a Promise, then return result.
398                // Note: Not applicable.
399                // Step 7.3 Return a promise resolved with undefined.
400                Promise::new_resolved(global, cx, (), can_gc)
401            },
402            TransformerType::Encoder(_) => {
403                // <https://streams.spec.whatwg.org/#transformstream-set-up>
404                // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
405                // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
406                //      if cancelAlgorithm was given, or null otherwise
407                // Note: `TextDecoderStream` does NOT specify a cancel algorithm.
408                // Step 7.2 If result is a Promise, then return result.
409                // Note: Not applicable.
410                // Step 7.3 Return a promise resolved with undefined.
411                Promise::new_resolved(global, cx, (), can_gc)
412            },
413            TransformerType::Compressor(_) => {
414                // <https://streams.spec.whatwg.org/#transformstream-set-up>
415                // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
416                // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
417                //      if cancelAlgorithm was given, or null otherwise
418                // Note: `CompressionStream` does NOT specify a cancel algorithm.
419                // Step 7.2 If result is a Promise, then return result.
420                // Note: Not applicable.
421                // Step 7.3 Return a promise resolved with undefined.
422                Promise::new_resolved(global, cx, (), can_gc)
423            },
424            TransformerType::Decompressor(_) => {
425                // <https://streams.spec.whatwg.org/#transformstream-set-up>
426                // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
427                // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
428                //      if cancelAlgorithm was given, or null otherwise
429                // Note: `DecompressionStream` does NOT specify a cancel algorithm.
430                // Step 7.2 If result is a Promise, then return result.
431                // Note: Not applicable.
432                // Step 7.3 Return a promise resolved with undefined.
433                Promise::new_resolved(global, cx, (), can_gc)
434            },
435        };
436
437        Ok(result)
438    }
439
440    pub(crate) fn perform_flush(
441        &self,
442        cx: SafeJSContext,
443        global: &GlobalScope,
444        can_gc: CanGc,
445    ) -> Fallible<Rc<Promise>> {
446        let result = match &self.transformer_type {
447            // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
448            TransformerType::Js {
449                flush,
450                transform_obj,
451                ..
452            } => {
453                // Step 6. If transformerDict["flush"] exists, set flushAlgorithm to an
454                // algorithm which returns the result of invoking
455                // transformerDict["flush"] with argument list « controller »
456                // and callback this value transformer.
457                let algo = flush.borrow().clone();
458                if let Some(flush) = algo {
459                    rooted!(in(*cx) let this_object = transform_obj.get());
460                    flush
461                        .Call_(
462                            &this_object.handle(),
463                            self,
464                            ExceptionHandling::Rethrow,
465                            can_gc,
466                        )
467                        .unwrap_or_else(|e| {
468                            let p = Promise::new(global, can_gc);
469                            p.reject_error(e, can_gc);
470                            p
471                        })
472                } else {
473                    // Step 3. Let flushAlgorithm be an algorithm which returns a promise resolved with undefined.
474                    Promise::new_resolved(global, cx, (), can_gc)
475                }
476            },
477            TransformerType::Decoder(decoder) => {
478                // <https://encoding.spec.whatwg.org/#dom-textdecoderstream>
479                // Step 8. Let flushAlgorithm be an algorithm which takes no
480                // arguments and runs the flush and enqueue algorithm with this.
481                flush_and_enqueue(cx, global, decoder, self, can_gc)
482                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
483                    // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
484                    // Step 6.1 Let result be the result of running flushAlgorithm,
485                    //      if flushAlgorithm was given, or null otherwise.
486                    // Step 6.2 If result is a Promise, then return result.
487                    // Note: Not applicable. The spec does NOT require flush_and_enqueue algo to return a Promise
488                    // Step 6.3 Return a promise resolved with undefined.
489                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
490                    .unwrap_or_else(|e| {
491                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
492                        // Step 6.1 If this throws an exception e,
493                        let realm = enter_realm(self);
494                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
495                        // return a promise rejected with e.
496                        p.reject_error(e, can_gc);
497                        p
498                    })
499            },
500            TransformerType::Encoder(encoder) => {
501                // <https://encoding.spec.whatwg.org/#textencoderstream-encoder>
502                // Step 3. Let flushAlgorithm be an algorithm which runs the encode and flush algorithm with this.
503                encode_and_flush(cx, global, encoder, self, can_gc)
504                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
505                    // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
506                    // Step 6.1 Let result be the result of running flushAlgorithm,
507                    //      if flushAlgorithm was given, or null otherwise.
508                    // Step 6.2 If result is a Promise, then return result.
509                    // Note: Not applicable. The spec does NOT require encode_and_flush algo to return a Promise
510                    // Step 6.3 Return a promise resolved with undefined.
511                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
512                    .unwrap_or_else(|e| {
513                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
514                        // Step 6.1 If this throws an exception e,
515                        let realm = enter_realm(self);
516                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
517                        // return a promise rejected with e.
518                        p.reject_error(e, can_gc);
519                        p
520                    })
521            },
522            TransformerType::Compressor(cs) => {
523                // <https://compression.spec.whatwg.org/#dom-compressionstream-compressionstream>
524                // Step 4. Let flushAlgorithm be an algorithm which takes no argument and runs the
525                // compress flush and enqueue algorithm with this.
526                compress_flush_and_enqueue(cx, global, cs, self, can_gc)
527                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
528                    // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
529                    // Step 6.1 Let result be the result of running flushAlgorithm,
530                    //      if flushAlgorithm was given, or null otherwise.
531                    // Step 6.2 If result is a Promise, then return result.
532                    // Note: Not applicable. The spec does NOT require compress_flush_and_enqueue
533                    // algo to return a Promise.
534                    // Step 6.3 Return a promise resolved with undefined.
535                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
536                    .unwrap_or_else(|e| {
537                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
538                        // Step 6.1 If this throws an exception e,
539                        let realm = enter_realm(self);
540                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
541                        // return a promise rejected with e.
542                        p.reject_error(e, can_gc);
543                        p
544                    })
545            },
546            TransformerType::Decompressor(ds) => {
547                // <https://compression.spec.whatwg.org/#dom-decompressionstream-decompressionstream>
548                // Step 4. Let flushAlgorithm be an algorithm which takes no argument and runs the
549                // decompress flush and enqueue algorithm with this.
550                decompress_flush_and_enqueue(cx, global, ds, self, can_gc)
551                    // <https://streams.spec.whatwg.org/#transformstream-set-up>
552                    // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
553                    // Step 6.1 Let result be the result of running flushAlgorithm,
554                    //      if flushAlgorithm was given, or null otherwise.
555                    // Step 6.2 If result is a Promise, then return result.
556                    // Note: Not applicable. The spec does NOT require decompress_flush_and_enqueue
557                    // algo to return a Promise.
558                    // Step 6.3 Return a promise resolved with undefined.
559                    .map(|_| Promise::new_resolved(global, cx, (), can_gc))
560                    .unwrap_or_else(|e| {
561                        // <https://streams.spec.whatwg.org/#transformstream-set-up>
562                        // Step 6.1 If this throws an exception e,
563                        let realm = enter_realm(self);
564                        let p = Promise::new_in_current_realm((&realm).into(), can_gc);
565                        // return a promise rejected with e.
566                        p.reject_error(e, can_gc);
567                        p
568                    })
569            },
570        };
571
572        Ok(result)
573    }
574
575    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-enqueue>
576    #[allow(unsafe_code)]
577    pub(crate) fn enqueue(
578        &self,
579        cx: SafeJSContext,
580        global: &GlobalScope,
581        chunk: SafeHandleValue,
582        can_gc: CanGc,
583    ) -> Fallible<()> {
584        // Let stream be controller.[[stream]].
585        let stream = self.stream.get().expect("stream is null");
586
587        // Let readableController be stream.[[readable]].[[controller]].
588        let readable = stream.get_readable();
589        let readable_controller = readable.get_default_controller();
590
591        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController)
592        // is false, throw a TypeError exception.
593        if !readable_controller.can_close_or_enqueue() {
594            return Err(Error::Type(
595                "ReadableStreamDefaultControllerCanCloseOrEnqueue is false".to_owned(),
596            ));
597        }
598
599        // Let enqueueResult be ReadableStreamDefaultControllerEnqueue(readableController, chunk).
600        // If enqueueResult is an abrupt completion,
601        if let Err(error) = readable_controller.enqueue(cx, chunk, can_gc) {
602            // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, enqueueResult.[[Value]]).
603            rooted!(in(*cx) let mut rooted_error = UndefinedValue());
604            error
605                .clone()
606                .to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
607            stream.error_writable_and_unblock_write(cx, global, rooted_error.handle(), can_gc);
608
609            // Throw stream.[[readable]].[[storedError]].
610            unsafe {
611                if !JS_IsExceptionPending(*cx) {
612                    rooted!(in(*cx) let mut stored_error = UndefinedValue());
613                    readable.get_stored_error(stored_error.handle_mut());
614
615                    JS_SetPendingException(
616                        *cx,
617                        stored_error.handle().into(),
618                        ExceptionStackBehavior::Capture,
619                    );
620                }
621            }
622            return Err(error);
623        }
624
625        // Let backpressure be ! ReadableStreamDefaultControllerHasBackpressure(readableController).
626        let backpressure = readable_controller.has_backpressure();
627
628        // If backpressure is not stream.[[backpressure]],
629        if backpressure != stream.get_backpressure() {
630            // Assert: backpressure is true.
631            assert!(backpressure);
632
633            // Perform ! TransformStreamSetBackpressure(stream, true).
634            stream.set_backpressure(global, true, can_gc);
635        }
636        Ok(())
637    }
638
639    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-error>
640    pub(crate) fn error(
641        &self,
642        cx: SafeJSContext,
643        global: &GlobalScope,
644        reason: SafeHandleValue,
645        can_gc: CanGc,
646    ) {
647        // Perform ! TransformStreamError(controller.[[stream]], e).
648        self.stream
649            .get()
650            .expect("stream is undefined")
651            .error(cx, global, reason, can_gc);
652    }
653
654    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-clear-algorithms>
655    pub(crate) fn clear_algorithms(&self) {
656        if let TransformerType::Js {
657            cancel,
658            flush,
659            transform,
660            ..
661        } = &self.transformer_type
662        {
663            // Set controller.[[transformAlgorithm]] to undefined.
664            transform.replace(None);
665
666            // Set controller.[[flushAlgorithm]] to undefined.
667            flush.replace(None);
668
669            // Set controller.[[cancelAlgorithm]] to undefined.
670            cancel.replace(None);
671        }
672    }
673
674    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-terminate>
675    pub(crate) fn terminate(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
676        // Let stream be controller.[[stream]].
677        let stream = self.stream.get().expect("stream is null");
678
679        // Let readableController be stream.[[readable]].[[controller]].
680        let readable = stream.get_readable();
681        let readable_controller = readable.get_default_controller();
682
683        // Perform ! ReadableStreamDefaultControllerClose(readableController).
684        readable_controller.close(can_gc);
685
686        // Let error be a TypeError exception indicating that the stream has been terminated.
687        let error = Error::Type("stream has been terminated".to_owned());
688
689        // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, error).
690        rooted!(in(*cx) let mut rooted_error = UndefinedValue());
691        error.to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
692        stream.error_writable_and_unblock_write(cx, global, rooted_error.handle(), can_gc);
693    }
694}
695
696#[allow(non_snake_case)]
697impl TransformStreamDefaultControllerMethods<crate::DomTypeHolder>
698    for TransformStreamDefaultController
699{
700    /// <https://streams.spec.whatwg.org/#ts-default-controller-desired-size>
701    fn GetDesiredSize(&self) -> Option<f64> {
702        // Let readableController be this.[[stream]].[[readable]].[[controller]].
703        let readable_controller = self
704            .stream
705            .get()
706            .expect("stream is null")
707            .get_readable()
708            .get_default_controller();
709
710        // Return ! ReadableStreamDefaultControllerGetDesiredSize(readableController).
711        readable_controller.get_desired_size()
712    }
713
714    /// <https://streams.spec.whatwg.org/#ts-default-controller-enqueue>
715    fn Enqueue(&self, cx: SafeJSContext, chunk: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
716        // Perform ? TransformStreamDefaultControllerEnqueue(this, chunk).
717        self.enqueue(cx, &self.global(), chunk, can_gc)
718    }
719
720    /// <https://streams.spec.whatwg.org/#ts-default-controller-error>
721    fn Error(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
722        // Perform ? TransformStreamDefaultControllerError(this, e).
723        self.error(cx, &self.global(), reason, can_gc);
724        Ok(())
725    }
726
727    /// <https://streams.spec.whatwg.org/#ts-default-controller-terminate>
728    fn Terminate(&self, can_gc: CanGc) -> Fallible<()> {
729        // Perform ? TransformStreamDefaultControllerTerminate(this).
730        self.terminate(GlobalScope::get_cx(), &self.global(), can_gc);
731        Ok(())
732    }
733}