script/dom/stream/transformstreamdefaultcontroller.rs
1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsapi::{
10 ExceptionStackBehavior, Heap, JS_IsExceptionPending, JS_SetPendingException, JSObject,
11};
12use js::jsval::UndefinedValue;
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
15
16use crate::dom::bindings::callback::ExceptionHandling;
17use crate::dom::bindings::cell::DomRefCell;
18use crate::dom::bindings::codegen::Bindings::TransformStreamDefaultControllerBinding::TransformStreamDefaultControllerMethods;
19use crate::dom::bindings::codegen::Bindings::TransformerBinding::{
20 Transformer, TransformerCancelCallback, TransformerFlushCallback, TransformerTransformCallback,
21};
22use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
23use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
24use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
25use crate::dom::compressionstream::{
26 CompressionStream, compress_and_enqueue_a_chunk, compress_flush_and_enqueue,
27};
28use crate::dom::decompressionstream::{
29 decompress_and_enqueue_a_chunk, decompress_flush_and_enqueue,
30};
31use crate::dom::globalscope::GlobalScope;
32use crate::dom::promise::Promise;
33use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
34use crate::dom::textdecodercommon::TextDecoderCommon;
35use crate::dom::textdecoderstream::{decode_and_enqueue_a_chunk, flush_and_enqueue};
36use crate::dom::textencoderstream::{Encoder, encode_and_enqueue_a_chunk, encode_and_flush};
37use crate::dom::types::{DecompressionStream, TransformStream};
38use crate::realms::{InRealm, enter_realm};
39use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
40
41impl js::gc::Rootable for TransformTransformPromiseRejection {}
42
43/// Reacting to transformPromise as part of
44/// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
45#[derive(JSTraceable, MallocSizeOf)]
46#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
47struct TransformTransformPromiseRejection {
48 controller: Dom<TransformStreamDefaultController>,
49}
50
51impl Callback for TransformTransformPromiseRejection {
52 /// Reacting to transformPromise with the following fulfillment steps:
53 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
54 let can_gc = CanGc::from_cx(cx);
55 let cx: SafeJSContext = cx.into();
56 // Perform ! TransformStreamError(controller.[[stream]], r).
57 self.controller
58 .error(cx, &self.controller.global(), v, can_gc);
59
60 // Throw r.
61 // Note: this is done part of perform_transform().
62 }
63}
64
65/// The type of transformer algorithms we are using
66#[derive(JSTraceable)]
67pub(crate) enum TransformerType {
68 /// Algorithms provided by Js callbacks
69 Js {
70 /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-cancelalgorithm>
71 cancel: RefCell<Option<Rc<TransformerCancelCallback>>>,
72
73 /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-flushalgorithm>
74 flush: RefCell<Option<Rc<TransformerFlushCallback>>>,
75
76 /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-transformalgorithm>
77 transform: RefCell<Option<Rc<TransformerTransformCallback>>>,
78
79 /// The JS object used as `this` when invoking sink algorithms.
80 transform_obj: Heap<*mut JSObject>,
81 },
82 /// Algorithms supporting `TextDecoderStream` are implemented in Rust
83 ///
84 /// <https://encoding.spec.whatwg.org/#textdecodercommon>
85 Decoder(Rc<TextDecoderCommon>),
86 /// Algorithms supporting `TextEncoderStream` are implemented in Rust
87 ///
88 /// <https://encoding.spec.whatwg.org/#textencoderstream-encoder>
89 Encoder(Encoder),
90 /// Algorithms supporting `CompressionStream` are implemented in Rust
91 ///
92 /// <https://compression.spec.whatwg.org/#compressionstream>
93 Compressor(DomRoot<CompressionStream>),
94 /// Algorithms supporting `DecompressionStream` are implemented in Rust
95 ///
96 /// <https://compression.spec.whatwg.org/#decompressionstream>
97 Decompressor(DomRoot<DecompressionStream>),
98}
99
100impl TransformerType {
101 pub(crate) fn new_from_js_transformer(transformer: &Transformer) -> TransformerType {
102 TransformerType::Js {
103 cancel: RefCell::new(transformer.cancel.clone()),
104 flush: RefCell::new(transformer.flush.clone()),
105 transform: RefCell::new(transformer.transform.clone()),
106 transform_obj: Default::default(),
107 }
108 }
109}
110
111/// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller>
112#[dom_struct]
113pub struct TransformStreamDefaultController {
114 reflector_: Reflector,
115
116 /// The type of the underlying transformer used. Besides the JS variant,
117 /// there will be other variant(s) for `TextDecoderStream`
118 #[ignore_malloc_size_of = "transformer_type"]
119 transformer_type: TransformerType,
120
121 /// <https://streams.spec.whatwg.org/#TransformStreamDefaultController-stream>
122 stream: MutNullableDom<TransformStream>,
123
124 /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-finishpromise>
125 #[conditional_malloc_size_of]
126 finish_promise: DomRefCell<Option<Rc<Promise>>>,
127}
128
129impl TransformStreamDefaultController {
130 fn new_inherited(transformer_type: TransformerType) -> TransformStreamDefaultController {
131 TransformStreamDefaultController {
132 reflector_: Reflector::new(),
133 transformer_type,
134 stream: MutNullableDom::new(None),
135 finish_promise: DomRefCell::new(None),
136 }
137 }
138
139 pub(crate) fn new(
140 global: &GlobalScope,
141 transformer_type: TransformerType,
142 can_gc: CanGc,
143 ) -> DomRoot<TransformStreamDefaultController> {
144 reflect_dom_object(
145 Box::new(TransformStreamDefaultController::new_inherited(
146 transformer_type,
147 )),
148 global,
149 can_gc,
150 )
151 }
152
153 /// Setting the JS object after the heap has settled down.
154 ///
155 /// Note that this has no effect if the transformer type is not `TransformerType::Js`
156 pub(crate) fn set_transform_obj(&self, this_object: SafeHandleObject) {
157 if let TransformerType::Js { transform_obj, .. } = &self.transformer_type {
158 transform_obj.set(*this_object)
159 } else {
160 unreachable!("Non-Js transformer type should not set transform_obj")
161 }
162 }
163
164 pub(crate) fn set_stream(&self, stream: &TransformStream) {
165 self.stream.set(Some(stream));
166 }
167
168 pub(crate) fn get_finish_promise(&self) -> Option<Rc<Promise>> {
169 self.finish_promise.borrow().clone()
170 }
171
172 pub(crate) fn set_finish_promise(&self, promise: Rc<Promise>) {
173 *self.finish_promise.borrow_mut() = Some(promise);
174 }
175
176 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
177 pub(crate) fn transform_stream_default_controller_perform_transform(
178 &self,
179 cx: SafeJSContext,
180 global: &GlobalScope,
181 chunk: SafeHandleValue,
182 can_gc: CanGc,
183 ) -> Fallible<Rc<Promise>> {
184 // Let transformPromise be the result of performing controller.[[transformAlgorithm]], passing chunk.
185 let transform_promise = self.perform_transform(cx, global, chunk, can_gc)?;
186
187 // Return the result of reacting to transformPromise with the following rejection steps given the argument r:
188 rooted!(in(*cx) let mut reject_handler = Some(TransformTransformPromiseRejection {
189 controller: Dom::from_ref(self),
190 }));
191
192 let handler = PromiseNativeHandler::new(
193 global,
194 None,
195 reject_handler.take().map(|h| Box::new(h) as Box<_>),
196 can_gc,
197 );
198 let realm = enter_realm(global);
199 let comp = InRealm::Entered(&realm);
200 transform_promise.append_native_handler(&handler, comp, can_gc);
201
202 Ok(transform_promise)
203 }
204
205 pub(crate) fn perform_transform(
206 &self,
207 cx: SafeJSContext,
208 global: &GlobalScope,
209 chunk: SafeHandleValue,
210 can_gc: CanGc,
211 ) -> Fallible<Rc<Promise>> {
212 let result = match &self.transformer_type {
213 // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
214 TransformerType::Js {
215 transform,
216 transform_obj,
217 ..
218 } => {
219 // Step 5. If transformerDict["transform"] exists, set
220 // transformAlgorithm to an algorithm which takes an argument
221 // chunk and returns the result of invoking
222 // transformerDict["transform"] with argument list « chunk,
223 // controller » and callback this value transformer.
224 let algo = transform.borrow().clone();
225 if let Some(transform) = algo {
226 rooted!(in(*cx) let this_object = transform_obj.get());
227 transform
228 .Call_(
229 &this_object.handle(),
230 chunk,
231 self,
232 ExceptionHandling::Rethrow,
233 can_gc,
234 )
235 .unwrap_or_else(|e| {
236 let p = Promise::new(global, can_gc);
237 p.reject_error(e, can_gc);
238 p
239 })
240 } else {
241 // Step 2. Let transformAlgorithm be the following steps, taking a chunk argument:
242 // Let result be TransformStreamDefaultControllerEnqueue(controller, chunk).
243 // If result is an abrupt completion, return a promise rejected with result.[[Value]].
244 if let Err(error) = self.enqueue(cx, global, chunk, can_gc) {
245 rooted!(in(*cx) let mut error_val = UndefinedValue());
246 error.to_jsval(cx, global, error_val.handle_mut(), can_gc);
247 Promise::new_rejected(global, cx, error_val.handle(), can_gc)
248 } else {
249 // Otherwise, return a promise resolved with undefined.
250 Promise::new_resolved(global, cx, (), can_gc)
251 }
252 }
253 },
254 TransformerType::Decoder(decoder) => {
255 // <https://encoding.spec.whatwg.org/#dom-textdecoderstream>
256 // Step 7. Let transformAlgorithm be an algorithm which takes a
257 // chunk argument and runs the decode and enqueue a chunk
258 // algorithm with this and chunk.
259 decode_and_enqueue_a_chunk(cx, global, chunk, decoder, self, can_gc)
260 // <https://streams.spec.whatwg.org/#transformstream-set-up>
261 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
262 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
263 // Step 5.2 If result is a Promise, then return result.
264 // Note: not applicable, the spec does NOT require deode_and_enqueue_a_chunk() to return a Promise
265 // Step 5.3 Return a promise resolved with undefined.
266 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
267 .unwrap_or_else(|e| {
268 // <https://streams.spec.whatwg.org/#transformstream-set-up>
269 // Step 5.1 If this throws an exception e,
270 let realm = enter_realm(self);
271 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
272 // return a promise rejected with e.
273 p.reject_error(e, can_gc);
274 p
275 })
276 },
277 TransformerType::Encoder(encoder) => {
278 // <https://encoding.spec.whatwg.org/#dom-textencoderstream>
279 // Step 2. Let transformAlgorithm be an algorithm which takes a chunk argument and runs the encode
280 // and enqueue a chunk algorithm with this and chunk.
281 encode_and_enqueue_a_chunk(cx, global, chunk, encoder, self, can_gc)
282 // <https://streams.spec.whatwg.org/#transformstream-set-up>
283 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
284 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
285 // Step 5.2 If result is a Promise, then return result.
286 // Note: not applicable, the spec does NOT require encode_and_enqueue_a_chunk() to return a Promise
287 // Step 5.3 Return a promise resolved with undefined.
288 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
289 .unwrap_or_else(|e| {
290 // <https://streams.spec.whatwg.org/#transformstream-set-up>
291 // Step 5.1 If this throws an exception e,
292 let realm = enter_realm(self);
293 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
294 // return a promise rejected with e.
295 p.reject_error(e, can_gc);
296 p
297 })
298 },
299 TransformerType::Compressor(cs) => {
300 // <https://compression.spec.whatwg.org/#dom-compressionstream-compressionstream>
301 // Step 3. Let transformAlgorithm be an algorithm which takes a chunk argument and
302 // runs the compress and enqueue a chunk algorithm with this and chunk.
303 compress_and_enqueue_a_chunk(cx, global, cs, chunk, self, can_gc)
304 // <https://streams.spec.whatwg.org/#transformstream-set-up>
305 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
306 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
307 // Step 5.2 If result is a Promise, then return result.
308 // Note: not applicable, the spec does NOT require
309 // compress_and_enqueue_a_chunk() to return a Promise.
310 // Step 5.3 Return a promise resolved with undefined.
311 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
312 .unwrap_or_else(|e| {
313 // <https://streams.spec.whatwg.org/#transformstream-set-up>
314 // Step 5.1 If this throws an exception e,
315 let realm = enter_realm(self);
316 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
317 // return a promise rejected with e.
318 p.reject_error(e, can_gc);
319 p
320 })
321 },
322 TransformerType::Decompressor(ds) => {
323 // <https://compression.spec.whatwg.org/#dom-decompressionstream-decompressionstream>
324 // Step 3. Let transformAlgorithm be an algorithm which takes a chunk argument and
325 // runs the decompress and enqueue a chunk algorithm with this and chunk.
326 decompress_and_enqueue_a_chunk(cx, global, ds, chunk, self, can_gc)
327 // <https://streams.spec.whatwg.org/#transformstream-set-up>
328 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
329 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
330 // Step 5.2 If result is a Promise, then return result.
331 // Note: not applicable, the spec does NOT require
332 // decompress_and_enqueue_a_chunk() to return a Promise
333 // Step 5.3 Return a promise resolved with undefined.
334 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
335 .unwrap_or_else(|e| {
336 // <https://streams.spec.whatwg.org/#transformstream-set-up>
337 // Step 5.1 If this throws an exception e,
338 let realm = enter_realm(self);
339 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
340 // return a promise rejected with e.
341 p.reject_error(e, can_gc);
342 p
343 })
344 },
345 };
346
347 Ok(result)
348 }
349
350 pub(crate) fn perform_cancel(
351 &self,
352 cx: SafeJSContext,
353 global: &GlobalScope,
354 chunk: SafeHandleValue,
355 can_gc: CanGc,
356 ) -> Fallible<Rc<Promise>> {
357 let result = match &self.transformer_type {
358 // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
359 TransformerType::Js {
360 cancel,
361 transform_obj,
362 ..
363 } => {
364 // Step 7. If transformerDict["cancel"] exists, set
365 // cancelAlgorithm to an algorithm which takes an argument
366 // reason and returns the result of invoking
367 // transformerDict["cancel"] with argument list « reason » and
368 // callback this value transformer.
369 let algo = cancel.borrow().clone();
370 if let Some(cancel) = algo {
371 rooted!(in(*cx) let this_object = transform_obj.get());
372 cancel
373 .Call_(
374 &this_object.handle(),
375 chunk,
376 ExceptionHandling::Rethrow,
377 can_gc,
378 )
379 .unwrap_or_else(|e| {
380 let p = Promise::new(global, can_gc);
381 p.reject_error(e, can_gc);
382 p
383 })
384 } else {
385 // Step 4. Let cancelAlgorithm be an algorithm which returns a promise resolved with undefined.
386 Promise::new_resolved(global, cx, (), can_gc)
387 }
388 },
389 TransformerType::Decoder(_) => {
390 // <https://streams.spec.whatwg.org/#transformstream-set-up>
391 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
392 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
393 // if cancelAlgorithm was given, or null otherwise
394 // Note: `TextDecoderStream` does NOT specify a cancel algorithm.
395 // Step 7.2 If result is a Promise, then return result.
396 // Note: Not applicable.
397 // Step 7.3 Return a promise resolved with undefined.
398 Promise::new_resolved(global, cx, (), can_gc)
399 },
400 TransformerType::Encoder(_) => {
401 // <https://streams.spec.whatwg.org/#transformstream-set-up>
402 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
403 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
404 // if cancelAlgorithm was given, or null otherwise
405 // Note: `TextDecoderStream` does NOT specify a cancel algorithm.
406 // Step 7.2 If result is a Promise, then return result.
407 // Note: Not applicable.
408 // Step 7.3 Return a promise resolved with undefined.
409 Promise::new_resolved(global, cx, (), can_gc)
410 },
411 TransformerType::Compressor(_) => {
412 // <https://streams.spec.whatwg.org/#transformstream-set-up>
413 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
414 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
415 // if cancelAlgorithm was given, or null otherwise
416 // Note: `CompressionStream` does NOT specify a cancel algorithm.
417 // Step 7.2 If result is a Promise, then return result.
418 // Note: Not applicable.
419 // Step 7.3 Return a promise resolved with undefined.
420 Promise::new_resolved(global, cx, (), can_gc)
421 },
422 TransformerType::Decompressor(_) => {
423 // <https://streams.spec.whatwg.org/#transformstream-set-up>
424 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
425 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
426 // if cancelAlgorithm was given, or null otherwise
427 // Note: `DecompressionStream` does NOT specify a cancel algorithm.
428 // Step 7.2 If result is a Promise, then return result.
429 // Note: Not applicable.
430 // Step 7.3 Return a promise resolved with undefined.
431 Promise::new_resolved(global, cx, (), can_gc)
432 },
433 };
434
435 Ok(result)
436 }
437
438 pub(crate) fn perform_flush(
439 &self,
440 cx: SafeJSContext,
441 global: &GlobalScope,
442 can_gc: CanGc,
443 ) -> Fallible<Rc<Promise>> {
444 let result = match &self.transformer_type {
445 // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
446 TransformerType::Js {
447 flush,
448 transform_obj,
449 ..
450 } => {
451 // Step 6. If transformerDict["flush"] exists, set flushAlgorithm to an
452 // algorithm which returns the result of invoking
453 // transformerDict["flush"] with argument list « controller »
454 // and callback this value transformer.
455 let algo = flush.borrow().clone();
456 if let Some(flush) = algo {
457 rooted!(in(*cx) let this_object = transform_obj.get());
458 flush
459 .Call_(
460 &this_object.handle(),
461 self,
462 ExceptionHandling::Rethrow,
463 can_gc,
464 )
465 .unwrap_or_else(|e| {
466 let p = Promise::new(global, can_gc);
467 p.reject_error(e, can_gc);
468 p
469 })
470 } else {
471 // Step 3. Let flushAlgorithm be an algorithm which returns a promise resolved with undefined.
472 Promise::new_resolved(global, cx, (), can_gc)
473 }
474 },
475 TransformerType::Decoder(decoder) => {
476 // <https://encoding.spec.whatwg.org/#dom-textdecoderstream>
477 // Step 8. Let flushAlgorithm be an algorithm which takes no
478 // arguments and runs the flush and enqueue algorithm with this.
479 flush_and_enqueue(cx, global, decoder, self, can_gc)
480 // <https://streams.spec.whatwg.org/#transformstream-set-up>
481 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
482 // Step 6.1 Let result be the result of running flushAlgorithm,
483 // if flushAlgorithm was given, or null otherwise.
484 // Step 6.2 If result is a Promise, then return result.
485 // Note: Not applicable. The spec does NOT require flush_and_enqueue algo to return a Promise
486 // Step 6.3 Return a promise resolved with undefined.
487 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
488 .unwrap_or_else(|e| {
489 // <https://streams.spec.whatwg.org/#transformstream-set-up>
490 // Step 6.1 If this throws an exception e,
491 let realm = enter_realm(self);
492 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
493 // return a promise rejected with e.
494 p.reject_error(e, can_gc);
495 p
496 })
497 },
498 TransformerType::Encoder(encoder) => {
499 // <https://encoding.spec.whatwg.org/#textencoderstream-encoder>
500 // Step 3. Let flushAlgorithm be an algorithm which runs the encode and flush algorithm with this.
501 encode_and_flush(cx, global, encoder, self, can_gc)
502 // <https://streams.spec.whatwg.org/#transformstream-set-up>
503 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
504 // Step 6.1 Let result be the result of running flushAlgorithm,
505 // if flushAlgorithm was given, or null otherwise.
506 // Step 6.2 If result is a Promise, then return result.
507 // Note: Not applicable. The spec does NOT require encode_and_flush algo to return a Promise
508 // Step 6.3 Return a promise resolved with undefined.
509 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
510 .unwrap_or_else(|e| {
511 // <https://streams.spec.whatwg.org/#transformstream-set-up>
512 // Step 6.1 If this throws an exception e,
513 let realm = enter_realm(self);
514 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
515 // return a promise rejected with e.
516 p.reject_error(e, can_gc);
517 p
518 })
519 },
520 TransformerType::Compressor(cs) => {
521 // <https://compression.spec.whatwg.org/#dom-compressionstream-compressionstream>
522 // Step 4. Let flushAlgorithm be an algorithm which takes no argument and runs the
523 // compress flush and enqueue algorithm with this.
524 compress_flush_and_enqueue(cx, global, cs, self, can_gc)
525 // <https://streams.spec.whatwg.org/#transformstream-set-up>
526 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
527 // Step 6.1 Let result be the result of running flushAlgorithm,
528 // if flushAlgorithm was given, or null otherwise.
529 // Step 6.2 If result is a Promise, then return result.
530 // Note: Not applicable. The spec does NOT require compress_flush_and_enqueue
531 // algo to return a Promise.
532 // Step 6.3 Return a promise resolved with undefined.
533 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
534 .unwrap_or_else(|e| {
535 // <https://streams.spec.whatwg.org/#transformstream-set-up>
536 // Step 6.1 If this throws an exception e,
537 let realm = enter_realm(self);
538 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
539 // return a promise rejected with e.
540 p.reject_error(e, can_gc);
541 p
542 })
543 },
544 TransformerType::Decompressor(ds) => {
545 // <https://compression.spec.whatwg.org/#dom-decompressionstream-decompressionstream>
546 // Step 4. Let flushAlgorithm be an algorithm which takes no argument and runs the
547 // decompress flush and enqueue algorithm with this.
548 decompress_flush_and_enqueue(cx, global, ds, self, can_gc)
549 // <https://streams.spec.whatwg.org/#transformstream-set-up>
550 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
551 // Step 6.1 Let result be the result of running flushAlgorithm,
552 // if flushAlgorithm was given, or null otherwise.
553 // Step 6.2 If result is a Promise, then return result.
554 // Note: Not applicable. The spec does NOT require decompress_flush_and_enqueue
555 // algo to return a Promise.
556 // Step 6.3 Return a promise resolved with undefined.
557 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
558 .unwrap_or_else(|e| {
559 // <https://streams.spec.whatwg.org/#transformstream-set-up>
560 // Step 6.1 If this throws an exception e,
561 let realm = enter_realm(self);
562 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
563 // return a promise rejected with e.
564 p.reject_error(e, can_gc);
565 p
566 })
567 },
568 };
569
570 Ok(result)
571 }
572
573 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-enqueue>
574 #[expect(unsafe_code)]
575 pub(crate) fn enqueue(
576 &self,
577 cx: SafeJSContext,
578 global: &GlobalScope,
579 chunk: SafeHandleValue,
580 can_gc: CanGc,
581 ) -> Fallible<()> {
582 // Let stream be controller.[[stream]].
583 let stream = self.stream.get().expect("stream is null");
584
585 // Let readableController be stream.[[readable]].[[controller]].
586 let readable = stream.get_readable();
587 let readable_controller = readable.get_default_controller();
588
589 // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController)
590 // is false, throw a TypeError exception.
591 if !readable_controller.can_close_or_enqueue() {
592 return Err(Error::Type(
593 "ReadableStreamDefaultControllerCanCloseOrEnqueue is false".to_owned(),
594 ));
595 }
596
597 // Let enqueueResult be ReadableStreamDefaultControllerEnqueue(readableController, chunk).
598 // If enqueueResult is an abrupt completion,
599 if let Err(error) = readable_controller.enqueue(cx, chunk, can_gc) {
600 // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, enqueueResult.[[Value]]).
601 rooted!(in(*cx) let mut rooted_error = UndefinedValue());
602 error
603 .clone()
604 .to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
605 stream.error_writable_and_unblock_write(cx, global, rooted_error.handle(), can_gc);
606
607 // Throw stream.[[readable]].[[storedError]].
608 unsafe {
609 if !JS_IsExceptionPending(*cx) {
610 rooted!(in(*cx) let mut stored_error = UndefinedValue());
611 readable.get_stored_error(stored_error.handle_mut());
612
613 JS_SetPendingException(
614 *cx,
615 stored_error.handle().into(),
616 ExceptionStackBehavior::Capture,
617 );
618 }
619 }
620 return Err(error);
621 }
622
623 // Let backpressure be ! ReadableStreamDefaultControllerHasBackpressure(readableController).
624 let backpressure = readable_controller.has_backpressure();
625
626 // If backpressure is not stream.[[backpressure]],
627 if backpressure != stream.get_backpressure() {
628 // Assert: backpressure is true.
629 assert!(backpressure);
630
631 // Perform ! TransformStreamSetBackpressure(stream, true).
632 stream.set_backpressure(global, true, can_gc);
633 }
634 Ok(())
635 }
636
637 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-error>
638 pub(crate) fn error(
639 &self,
640 cx: SafeJSContext,
641 global: &GlobalScope,
642 reason: SafeHandleValue,
643 can_gc: CanGc,
644 ) {
645 // Perform ! TransformStreamError(controller.[[stream]], e).
646 self.stream
647 .get()
648 .expect("stream is undefined")
649 .error(cx, global, reason, can_gc);
650 }
651
652 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-clear-algorithms>
653 pub(crate) fn clear_algorithms(&self) {
654 if let TransformerType::Js {
655 cancel,
656 flush,
657 transform,
658 ..
659 } = &self.transformer_type
660 {
661 // Set controller.[[transformAlgorithm]] to undefined.
662 transform.replace(None);
663
664 // Set controller.[[flushAlgorithm]] to undefined.
665 flush.replace(None);
666
667 // Set controller.[[cancelAlgorithm]] to undefined.
668 cancel.replace(None);
669 }
670 }
671
672 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-terminate>
673 pub(crate) fn terminate(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
674 // Let stream be controller.[[stream]].
675 let stream = self.stream.get().expect("stream is null");
676
677 // Let readableController be stream.[[readable]].[[controller]].
678 let readable = stream.get_readable();
679 let readable_controller = readable.get_default_controller();
680
681 // Perform ! ReadableStreamDefaultControllerClose(readableController).
682 readable_controller.close(can_gc);
683
684 // Let error be a TypeError exception indicating that the stream has been terminated.
685 let error = Error::Type("stream has been terminated".to_owned());
686
687 // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, error).
688 rooted!(in(*cx) let mut rooted_error = UndefinedValue());
689 error.to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
690 stream.error_writable_and_unblock_write(cx, global, rooted_error.handle(), can_gc);
691 }
692}
693
694impl TransformStreamDefaultControllerMethods<crate::DomTypeHolder>
695 for TransformStreamDefaultController
696{
697 /// <https://streams.spec.whatwg.org/#ts-default-controller-desired-size>
698 fn GetDesiredSize(&self) -> Option<f64> {
699 // Let readableController be this.[[stream]].[[readable]].[[controller]].
700 let readable_controller = self
701 .stream
702 .get()
703 .expect("stream is null")
704 .get_readable()
705 .get_default_controller();
706
707 // Return ! ReadableStreamDefaultControllerGetDesiredSize(readableController).
708 readable_controller.get_desired_size()
709 }
710
711 /// <https://streams.spec.whatwg.org/#ts-default-controller-enqueue>
712 fn Enqueue(&self, cx: SafeJSContext, chunk: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
713 // Perform ? TransformStreamDefaultControllerEnqueue(this, chunk).
714 self.enqueue(cx, &self.global(), chunk, can_gc)
715 }
716
717 /// <https://streams.spec.whatwg.org/#ts-default-controller-error>
718 fn Error(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
719 // Perform ? TransformStreamDefaultControllerError(this, e).
720 self.error(cx, &self.global(), reason, can_gc);
721 Ok(())
722 }
723
724 /// <https://streams.spec.whatwg.org/#ts-default-controller-terminate>
725 fn Terminate(&self, can_gc: CanGc) -> Fallible<()> {
726 // Perform ? TransformStreamDefaultControllerTerminate(this).
727 self.terminate(GlobalScope::get_cx(), &self.global(), can_gc);
728 Ok(())
729 }
730}