script/dom/transformstreamdefaultcontroller.rs
1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsapi::{
10 ExceptionStackBehavior, Heap, JS_IsExceptionPending, JS_SetPendingException, JSObject,
11};
12use js::jsval::UndefinedValue;
13use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
14
15use super::bindings::cell::DomRefCell;
16use super::bindings::codegen::Bindings::TransformerBinding::{
17 TransformerCancelCallback, TransformerFlushCallback, TransformerTransformCallback,
18};
19use super::types::TransformStream;
20use crate::dom::bindings::callback::ExceptionHandling;
21use crate::dom::bindings::codegen::Bindings::TransformStreamDefaultControllerBinding::TransformStreamDefaultControllerMethods;
22use crate::dom::bindings::codegen::Bindings::TransformerBinding::Transformer;
23use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
24use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
25use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
26use crate::dom::compressionstream::{
27 CompressionStream, compress_and_enqueue_a_chunk, compress_flush_and_enqueue,
28};
29use crate::dom::decompressionstream::{
30 decompress_and_enqueue_a_chunk, decompress_flush_and_enqueue,
31};
32use crate::dom::globalscope::GlobalScope;
33use crate::dom::promise::Promise;
34use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
35use crate::dom::textdecodercommon::TextDecoderCommon;
36use crate::dom::textdecoderstream::{decode_and_enqueue_a_chunk, flush_and_enqueue};
37use crate::dom::textencoderstream::{Encoder, encode_and_enqueue_a_chunk, encode_and_flush};
38use crate::dom::types::DecompressionStream;
39use crate::realms::{InRealm, enter_realm};
40use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
41
// Marker impl for the rejection handler defined below. The handler is stored
// in a `rooted!` slot when it is built in
// `transform_stream_default_controller_perform_transform`; presumably this
// impl is what allows that (TODO confirm against the `js` crate).
impl js::gc::Rootable for TransformTransformPromiseRejection {}
43
/// Rejection handler used when reacting to transformPromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
///
/// Its callback errors the transform stream owned by `controller` with the
/// value it is invoked with.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct TransformTransformPromiseRejection {
    /// The controller whose stream gets errored by the callback.
    controller: Dom<TransformStreamDefaultController>,
}
51
52impl Callback for TransformTransformPromiseRejection {
53 /// Reacting to transformPromise with the following fulfillment steps:
54 fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
55 // Perform ! TransformStreamError(controller.[[stream]], r).
56 self.controller
57 .error(cx, &self.controller.global(), v, can_gc);
58
59 // Throw r.
60 // Note: this is done part of perform_transform().
61 }
62}
63
/// The type of transformer algorithms we are using: either callbacks supplied
/// from JS, or one of the Rust-implemented transformers.
#[derive(JSTraceable)]
pub(crate) enum TransformerType {
    /// Algorithms provided by Js callbacks
    Js {
        /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-cancelalgorithm>
        cancel: RefCell<Option<Rc<TransformerCancelCallback>>>,

        /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-flushalgorithm>
        flush: RefCell<Option<Rc<TransformerFlushCallback>>>,

        /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-transformalgorithm>
        transform: RefCell<Option<Rc<TransformerTransformCallback>>>,

        /// The JS object used as `this` when invoking the transformer
        /// callbacks above (transform, flush and cancel).
        transform_obj: Heap<*mut JSObject>,
    },
    /// Algorithms supporting `TextDecoderStream` are implemented in Rust
    ///
    /// <https://encoding.spec.whatwg.org/#textdecodercommon>
    Decoder(Rc<TextDecoderCommon>),
    /// Algorithms supporting `TextEncoderStream` are implemented in Rust
    ///
    /// <https://encoding.spec.whatwg.org/#textencoderstream-encoder>
    Encoder(Encoder),
    /// Algorithms supporting `CompressionStream` are implemented in Rust
    ///
    /// <https://compression.spec.whatwg.org/#compressionstream>
    Compressor(DomRoot<CompressionStream>),
    /// Algorithms supporting `DecompressionStream` are implemented in Rust
    ///
    /// <https://compression.spec.whatwg.org/#decompressionstream>
    Decompressor(DomRoot<DecompressionStream>),
}
98
99impl TransformerType {
100 pub(crate) fn new_from_js_transformer(transformer: &Transformer) -> TransformerType {
101 TransformerType::Js {
102 cancel: RefCell::new(transformer.cancel.clone()),
103 flush: RefCell::new(transformer.flush.clone()),
104 transform: RefCell::new(transformer.transform.clone()),
105 transform_obj: Default::default(),
106 }
107 }
108}
109
/// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller>
#[dom_struct]
pub struct TransformStreamDefaultController {
    reflector_: Reflector,

    /// The type of the underlying transformer used. Besides the JS variant,
    /// there are Rust-implemented variants for `TextDecoderStream`,
    /// `TextEncoderStream`, `CompressionStream` and `DecompressionStream`.
    #[ignore_malloc_size_of = "transformer_type"]
    transformer_type: TransformerType,

    /// <https://streams.spec.whatwg.org/#TransformStreamDefaultController-stream>
    ///
    /// Starts out empty and is assigned through `set_stream`.
    stream: MutNullableDom<TransformStream>,

    /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-finishpromise>
    #[conditional_malloc_size_of]
    finish_promise: DomRefCell<Option<Rc<Promise>>>,
}
127
128impl TransformStreamDefaultController {
129 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
130 fn new_inherited(transformer_type: TransformerType) -> TransformStreamDefaultController {
131 TransformStreamDefaultController {
132 reflector_: Reflector::new(),
133 transformer_type,
134 stream: MutNullableDom::new(None),
135 finish_promise: DomRefCell::new(None),
136 }
137 }
138
139 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
140 pub(crate) fn new(
141 global: &GlobalScope,
142 transformer_type: TransformerType,
143 can_gc: CanGc,
144 ) -> DomRoot<TransformStreamDefaultController> {
145 reflect_dom_object(
146 Box::new(TransformStreamDefaultController::new_inherited(
147 transformer_type,
148 )),
149 global,
150 can_gc,
151 )
152 }
153
154 /// Setting the JS object after the heap has settled down.
155 ///
156 /// Note that this has no effect if the transformer type is not `TransformerType::Js`
157 pub(crate) fn set_transform_obj(&self, this_object: SafeHandleObject) {
158 if let TransformerType::Js { transform_obj, .. } = &self.transformer_type {
159 transform_obj.set(*this_object)
160 } else {
161 unreachable!("Non-Js transformer type should not set transform_obj")
162 }
163 }
164
165 pub(crate) fn set_stream(&self, stream: &TransformStream) {
166 self.stream.set(Some(stream));
167 }
168
169 pub(crate) fn get_finish_promise(&self) -> Option<Rc<Promise>> {
170 self.finish_promise.borrow().clone()
171 }
172
173 pub(crate) fn set_finish_promise(&self, promise: Rc<Promise>) {
174 *self.finish_promise.borrow_mut() = Some(promise);
175 }
176
/// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
///
/// Runs the transform algorithm on `chunk` and attaches a rejection handler
/// that errors the stream. The promise returned is transformPromise itself,
/// not a new promise derived from the reaction.
pub(crate) fn transform_stream_default_controller_perform_transform(
    &self,
    cx: SafeJSContext,
    global: &GlobalScope,
    chunk: SafeHandleValue,
    can_gc: CanGc,
) -> Fallible<Rc<Promise>> {
    // Let transformPromise be the result of performing controller.[[transformAlgorithm]], passing chunk.
    let transform_promise = self.perform_transform(cx, global, chunk, can_gc)?;

    // Return the result of reacting to transformPromise with the following rejection steps given the argument r:
    // The handler is held in a rooted slot until it is moved into the
    // native handler below.
    rooted!(in(*cx) let mut reject_handler = Some(TransformTransformPromiseRejection {
        controller: Dom::from_ref(self),
    }));

    // Only rejection steps are supplied; the fulfillment slot is `None`.
    let handler = PromiseNativeHandler::new(
        global,
        None,
        reject_handler.take().map(|h| Box::new(h) as Box<_>),
        can_gc,
    );
    // The handler is appended while `global`'s realm is entered.
    let realm = enter_realm(global);
    let comp = InRealm::Entered(&realm);
    transform_promise.append_native_handler(&handler, comp, can_gc);

    Ok(transform_promise)
}
205
206 pub(crate) fn perform_transform(
207 &self,
208 cx: SafeJSContext,
209 global: &GlobalScope,
210 chunk: SafeHandleValue,
211 can_gc: CanGc,
212 ) -> Fallible<Rc<Promise>> {
213 let result = match &self.transformer_type {
214 // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
215 TransformerType::Js {
216 transform,
217 transform_obj,
218 ..
219 } => {
220 // Step 5. If transformerDict["transform"] exists, set
221 // transformAlgorithm to an algorithm which takes an argument
222 // chunk and returns the result of invoking
223 // transformerDict["transform"] with argument list « chunk,
224 // controller » and callback this value transformer.
225 let algo = transform.borrow().clone();
226 if let Some(transform) = algo {
227 rooted!(in(*cx) let this_object = transform_obj.get());
228 transform
229 .Call_(
230 &this_object.handle(),
231 chunk,
232 self,
233 ExceptionHandling::Rethrow,
234 can_gc,
235 )
236 .unwrap_or_else(|e| {
237 let p = Promise::new(global, can_gc);
238 p.reject_error(e, can_gc);
239 p
240 })
241 } else {
242 // Step 2. Let transformAlgorithm be the following steps, taking a chunk argument:
243 // Let result be TransformStreamDefaultControllerEnqueue(controller, chunk).
244 // If result is an abrupt completion, return a promise rejected with result.[[Value]].
245 if let Err(error) = self.enqueue(cx, global, chunk, can_gc) {
246 rooted!(in(*cx) let mut error_val = UndefinedValue());
247 error.to_jsval(cx, global, error_val.handle_mut(), can_gc);
248 Promise::new_rejected(global, cx, error_val.handle(), can_gc)
249 } else {
250 // Otherwise, return a promise resolved with undefined.
251 Promise::new_resolved(global, cx, (), can_gc)
252 }
253 }
254 },
255 TransformerType::Decoder(decoder) => {
256 // <https://encoding.spec.whatwg.org/#dom-textdecoderstream>
257 // Step 7. Let transformAlgorithm be an algorithm which takes a
258 // chunk argument and runs the decode and enqueue a chunk
259 // algorithm with this and chunk.
260 decode_and_enqueue_a_chunk(cx, global, chunk, decoder, self, can_gc)
261 // <https://streams.spec.whatwg.org/#transformstream-set-up>
262 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
263 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
264 // Step 5.2 If result is a Promise, then return result.
265 // Note: not applicable, the spec does NOT require deode_and_enqueue_a_chunk() to return a Promise
266 // Step 5.3 Return a promise resolved with undefined.
267 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
268 .unwrap_or_else(|e| {
269 // <https://streams.spec.whatwg.org/#transformstream-set-up>
270 // Step 5.1 If this throws an exception e,
271 let realm = enter_realm(self);
272 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
273 // return a promise rejected with e.
274 p.reject_error(e, can_gc);
275 p
276 })
277 },
278 TransformerType::Encoder(encoder) => {
279 // <https://encoding.spec.whatwg.org/#dom-textencoderstream>
280 // Step 2. Let transformAlgorithm be an algorithm which takes a chunk argument and runs the encode
281 // and enqueue a chunk algorithm with this and chunk.
282 encode_and_enqueue_a_chunk(cx, global, chunk, encoder, self, can_gc)
283 // <https://streams.spec.whatwg.org/#transformstream-set-up>
284 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
285 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
286 // Step 5.2 If result is a Promise, then return result.
287 // Note: not applicable, the spec does NOT require encode_and_enqueue_a_chunk() to return a Promise
288 // Step 5.3 Return a promise resolved with undefined.
289 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
290 .unwrap_or_else(|e| {
291 // <https://streams.spec.whatwg.org/#transformstream-set-up>
292 // Step 5.1 If this throws an exception e,
293 let realm = enter_realm(self);
294 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
295 // return a promise rejected with e.
296 p.reject_error(e, can_gc);
297 p
298 })
299 },
300 TransformerType::Compressor(cs) => {
301 // <https://compression.spec.whatwg.org/#dom-compressionstream-compressionstream>
302 // Step 3. Let transformAlgorithm be an algorithm which takes a chunk argument and
303 // runs the compress and enqueue a chunk algorithm with this and chunk.
304 compress_and_enqueue_a_chunk(cx, global, cs, chunk, self, can_gc)
305 // <https://streams.spec.whatwg.org/#transformstream-set-up>
306 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
307 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
308 // Step 5.2 If result is a Promise, then return result.
309 // Note: not applicable, the spec does NOT require
310 // compress_and_enqueue_a_chunk() to return a Promise.
311 // Step 5.3 Return a promise resolved with undefined.
312 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
313 .unwrap_or_else(|e| {
314 // <https://streams.spec.whatwg.org/#transformstream-set-up>
315 // Step 5.1 If this throws an exception e,
316 let realm = enter_realm(self);
317 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
318 // return a promise rejected with e.
319 p.reject_error(e, can_gc);
320 p
321 })
322 },
323 TransformerType::Decompressor(ds) => {
324 // <https://compression.spec.whatwg.org/#dom-decompressionstream-decompressionstream>
325 // Step 3. Let transformAlgorithm be an algorithm which takes a chunk argument and
326 // runs the decompress and enqueue a chunk algorithm with this and chunk.
327 decompress_and_enqueue_a_chunk(cx, global, ds, chunk, self, can_gc)
328 // <https://streams.spec.whatwg.org/#transformstream-set-up>
329 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
330 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
331 // Step 5.2 If result is a Promise, then return result.
332 // Note: not applicable, the spec does NOT require
333 // decompress_and_enqueue_a_chunk() to return a Promise
334 // Step 5.3 Return a promise resolved with undefined.
335 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
336 .unwrap_or_else(|e| {
337 // <https://streams.spec.whatwg.org/#transformstream-set-up>
338 // Step 5.1 If this throws an exception e,
339 let realm = enter_realm(self);
340 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
341 // return a promise rejected with e.
342 p.reject_error(e, can_gc);
343 p
344 })
345 },
346 };
347
348 Ok(result)
349 }
350
351 pub(crate) fn perform_cancel(
352 &self,
353 cx: SafeJSContext,
354 global: &GlobalScope,
355 chunk: SafeHandleValue,
356 can_gc: CanGc,
357 ) -> Fallible<Rc<Promise>> {
358 let result = match &self.transformer_type {
359 // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
360 TransformerType::Js {
361 cancel,
362 transform_obj,
363 ..
364 } => {
365 // Step 7. If transformerDict["cancel"] exists, set
366 // cancelAlgorithm to an algorithm which takes an argument
367 // reason and returns the result of invoking
368 // transformerDict["cancel"] with argument list « reason » and
369 // callback this value transformer.
370 let algo = cancel.borrow().clone();
371 if let Some(cancel) = algo {
372 rooted!(in(*cx) let this_object = transform_obj.get());
373 cancel
374 .Call_(
375 &this_object.handle(),
376 chunk,
377 ExceptionHandling::Rethrow,
378 can_gc,
379 )
380 .unwrap_or_else(|e| {
381 let p = Promise::new(global, can_gc);
382 p.reject_error(e, can_gc);
383 p
384 })
385 } else {
386 // Step 4. Let cancelAlgorithm be an algorithm which returns a promise resolved with undefined.
387 Promise::new_resolved(global, cx, (), can_gc)
388 }
389 },
390 TransformerType::Decoder(_) => {
391 // <https://streams.spec.whatwg.org/#transformstream-set-up>
392 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
393 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
394 // if cancelAlgorithm was given, or null otherwise
395 // Note: `TextDecoderStream` does NOT specify a cancel algorithm.
396 // Step 7.2 If result is a Promise, then return result.
397 // Note: Not applicable.
398 // Step 7.3 Return a promise resolved with undefined.
399 Promise::new_resolved(global, cx, (), can_gc)
400 },
401 TransformerType::Encoder(_) => {
402 // <https://streams.spec.whatwg.org/#transformstream-set-up>
403 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
404 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
405 // if cancelAlgorithm was given, or null otherwise
406 // Note: `TextDecoderStream` does NOT specify a cancel algorithm.
407 // Step 7.2 If result is a Promise, then return result.
408 // Note: Not applicable.
409 // Step 7.3 Return a promise resolved with undefined.
410 Promise::new_resolved(global, cx, (), can_gc)
411 },
412 TransformerType::Compressor(_) => {
413 // <https://streams.spec.whatwg.org/#transformstream-set-up>
414 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
415 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
416 // if cancelAlgorithm was given, or null otherwise
417 // Note: `CompressionStream` does NOT specify a cancel algorithm.
418 // Step 7.2 If result is a Promise, then return result.
419 // Note: Not applicable.
420 // Step 7.3 Return a promise resolved with undefined.
421 Promise::new_resolved(global, cx, (), can_gc)
422 },
423 TransformerType::Decompressor(_) => {
424 // <https://streams.spec.whatwg.org/#transformstream-set-up>
425 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
426 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
427 // if cancelAlgorithm was given, or null otherwise
428 // Note: `DecompressionStream` does NOT specify a cancel algorithm.
429 // Step 7.2 If result is a Promise, then return result.
430 // Note: Not applicable.
431 // Step 7.3 Return a promise resolved with undefined.
432 Promise::new_resolved(global, cx, (), can_gc)
433 },
434 };
435
436 Ok(result)
437 }
438
439 pub(crate) fn perform_flush(
440 &self,
441 cx: SafeJSContext,
442 global: &GlobalScope,
443 can_gc: CanGc,
444 ) -> Fallible<Rc<Promise>> {
445 let result = match &self.transformer_type {
446 // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
447 TransformerType::Js {
448 flush,
449 transform_obj,
450 ..
451 } => {
452 // Step 6. If transformerDict["flush"] exists, set flushAlgorithm to an
453 // algorithm which returns the result of invoking
454 // transformerDict["flush"] with argument list « controller »
455 // and callback this value transformer.
456 let algo = flush.borrow().clone();
457 if let Some(flush) = algo {
458 rooted!(in(*cx) let this_object = transform_obj.get());
459 flush
460 .Call_(
461 &this_object.handle(),
462 self,
463 ExceptionHandling::Rethrow,
464 can_gc,
465 )
466 .unwrap_or_else(|e| {
467 let p = Promise::new(global, can_gc);
468 p.reject_error(e, can_gc);
469 p
470 })
471 } else {
472 // Step 3. Let flushAlgorithm be an algorithm which returns a promise resolved with undefined.
473 Promise::new_resolved(global, cx, (), can_gc)
474 }
475 },
476 TransformerType::Decoder(decoder) => {
477 // <https://encoding.spec.whatwg.org/#dom-textdecoderstream>
478 // Step 8. Let flushAlgorithm be an algorithm which takes no
479 // arguments and runs the flush and enqueue algorithm with this.
480 flush_and_enqueue(cx, global, decoder, self, can_gc)
481 // <https://streams.spec.whatwg.org/#transformstream-set-up>
482 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
483 // Step 6.1 Let result be the result of running flushAlgorithm,
484 // if flushAlgorithm was given, or null otherwise.
485 // Step 6.2 If result is a Promise, then return result.
486 // Note: Not applicable. The spec does NOT require flush_and_enqueue algo to return a Promise
487 // Step 6.3 Return a promise resolved with undefined.
488 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
489 .unwrap_or_else(|e| {
490 // <https://streams.spec.whatwg.org/#transformstream-set-up>
491 // Step 6.1 If this throws an exception e,
492 let realm = enter_realm(self);
493 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
494 // return a promise rejected with e.
495 p.reject_error(e, can_gc);
496 p
497 })
498 },
499 TransformerType::Encoder(encoder) => {
500 // <https://encoding.spec.whatwg.org/#textencoderstream-encoder>
501 // Step 3. Let flushAlgorithm be an algorithm which runs the encode and flush algorithm with this.
502 encode_and_flush(cx, global, encoder, self, can_gc)
503 // <https://streams.spec.whatwg.org/#transformstream-set-up>
504 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
505 // Step 6.1 Let result be the result of running flushAlgorithm,
506 // if flushAlgorithm was given, or null otherwise.
507 // Step 6.2 If result is a Promise, then return result.
508 // Note: Not applicable. The spec does NOT require encode_and_flush algo to return a Promise
509 // Step 6.3 Return a promise resolved with undefined.
510 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
511 .unwrap_or_else(|e| {
512 // <https://streams.spec.whatwg.org/#transformstream-set-up>
513 // Step 6.1 If this throws an exception e,
514 let realm = enter_realm(self);
515 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
516 // return a promise rejected with e.
517 p.reject_error(e, can_gc);
518 p
519 })
520 },
521 TransformerType::Compressor(cs) => {
522 // <https://compression.spec.whatwg.org/#dom-compressionstream-compressionstream>
523 // Step 4. Let flushAlgorithm be an algorithm which takes no argument and runs the
524 // compress flush and enqueue algorithm with this.
525 compress_flush_and_enqueue(cx, global, cs, self, can_gc)
526 // <https://streams.spec.whatwg.org/#transformstream-set-up>
527 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
528 // Step 6.1 Let result be the result of running flushAlgorithm,
529 // if flushAlgorithm was given, or null otherwise.
530 // Step 6.2 If result is a Promise, then return result.
531 // Note: Not applicable. The spec does NOT require compress_flush_and_enqueue
532 // algo to return a Promise.
533 // Step 6.3 Return a promise resolved with undefined.
534 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
535 .unwrap_or_else(|e| {
536 // <https://streams.spec.whatwg.org/#transformstream-set-up>
537 // Step 6.1 If this throws an exception e,
538 let realm = enter_realm(self);
539 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
540 // return a promise rejected with e.
541 p.reject_error(e, can_gc);
542 p
543 })
544 },
545 TransformerType::Decompressor(ds) => {
546 // <https://compression.spec.whatwg.org/#dom-decompressionstream-decompressionstream>
547 // Step 4. Let flushAlgorithm be an algorithm which takes no argument and runs the
548 // decompress flush and enqueue algorithm with this.
549 decompress_flush_and_enqueue(cx, global, ds, self, can_gc)
550 // <https://streams.spec.whatwg.org/#transformstream-set-up>
551 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
552 // Step 6.1 Let result be the result of running flushAlgorithm,
553 // if flushAlgorithm was given, or null otherwise.
554 // Step 6.2 If result is a Promise, then return result.
555 // Note: Not applicable. The spec does NOT require decompress_flush_and_enqueue
556 // algo to return a Promise.
557 // Step 6.3 Return a promise resolved with undefined.
558 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
559 .unwrap_or_else(|e| {
560 // <https://streams.spec.whatwg.org/#transformstream-set-up>
561 // Step 6.1 If this throws an exception e,
562 let realm = enter_realm(self);
563 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
564 // return a promise rejected with e.
565 p.reject_error(e, can_gc);
566 p
567 })
568 },
569 };
570
571 Ok(result)
572 }
573
/// <https://streams.spec.whatwg.org/#transform-stream-default-controller-enqueue>
///
/// Enqueues `chunk` into the readable side of the controller's stream and
/// updates the stream's backpressure flag if it changed.
///
/// # Errors
///
/// Returns `Error::Type` when the readable controller can neither close nor
/// enqueue. When the readable controller's own enqueue fails, the writable
/// side is errored and that error is returned.
///
/// # Panics
///
/// Panics if no stream has been set on this controller, or if backpressure
/// changed to `false` as a result of the enqueue.
#[expect(unsafe_code)]
pub(crate) fn enqueue(
    &self,
    cx: SafeJSContext,
    global: &GlobalScope,
    chunk: SafeHandleValue,
    can_gc: CanGc,
) -> Fallible<()> {
    // Let stream be controller.[[stream]].
    let stream = self.stream.get().expect("stream is null");

    // Let readableController be stream.[[readable]].[[controller]].
    let readable = stream.get_readable();
    let readable_controller = readable.get_default_controller();

    // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController)
    // is false, throw a TypeError exception.
    if !readable_controller.can_close_or_enqueue() {
        return Err(Error::Type(
            "ReadableStreamDefaultControllerCanCloseOrEnqueue is false".to_owned(),
        ));
    }

    // Let enqueueResult be ReadableStreamDefaultControllerEnqueue(readableController, chunk).
    // If enqueueResult is an abrupt completion,
    if let Err(error) = readable_controller.enqueue(cx, chunk, can_gc) {
        // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, enqueueResult.[[Value]]).
        // The error is cloned because converting it to a JS value consumes
        // it, and the original is still returned to the caller below.
        rooted!(in(*cx) let mut rooted_error = UndefinedValue());
        error
            .clone()
            .to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
        stream.error_writable_and_unblock_write(cx, global, rooted_error.handle(), can_gc);

        // Throw stream.[[readable]].[[storedError]].
        // The stored error is only installed as the pending exception when
        // no exception is pending already.
        // NOTE(review): presumably sound because `cx` is a live context and
        // `stored_error` is rooted for the duration of the call — confirm.
        unsafe {
            if !JS_IsExceptionPending(*cx) {
                rooted!(in(*cx) let mut stored_error = UndefinedValue());
                readable.get_stored_error(stored_error.handle_mut());

                JS_SetPendingException(
                    *cx,
                    stored_error.handle().into(),
                    ExceptionStackBehavior::Capture,
                );
            }
        }
        return Err(error);
    }

    // Let backpressure be ! ReadableStreamDefaultControllerHasBackpressure(readableController).
    let backpressure = readable_controller.has_backpressure();

    // If backpressure is not stream.[[backpressure]],
    if backpressure != stream.get_backpressure() {
        // Assert: backpressure is true.
        assert!(backpressure);

        // Perform ! TransformStreamSetBackpressure(stream, true).
        stream.set_backpressure(global, true, can_gc);
    }
    Ok(())
}
637
638 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-error>
639 pub(crate) fn error(
640 &self,
641 cx: SafeJSContext,
642 global: &GlobalScope,
643 reason: SafeHandleValue,
644 can_gc: CanGc,
645 ) {
646 // Perform ! TransformStreamError(controller.[[stream]], e).
647 self.stream
648 .get()
649 .expect("stream is undefined")
650 .error(cx, global, reason, can_gc);
651 }
652
653 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-clear-algorithms>
654 pub(crate) fn clear_algorithms(&self) {
655 if let TransformerType::Js {
656 cancel,
657 flush,
658 transform,
659 ..
660 } = &self.transformer_type
661 {
662 // Set controller.[[transformAlgorithm]] to undefined.
663 transform.replace(None);
664
665 // Set controller.[[flushAlgorithm]] to undefined.
666 flush.replace(None);
667
668 // Set controller.[[cancelAlgorithm]] to undefined.
669 cancel.replace(None);
670 }
671 }
672
673 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-terminate>
674 pub(crate) fn terminate(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
675 // Let stream be controller.[[stream]].
676 let stream = self.stream.get().expect("stream is null");
677
678 // Let readableController be stream.[[readable]].[[controller]].
679 let readable = stream.get_readable();
680 let readable_controller = readable.get_default_controller();
681
682 // Perform ! ReadableStreamDefaultControllerClose(readableController).
683 readable_controller.close(can_gc);
684
685 // Let error be a TypeError exception indicating that the stream has been terminated.
686 let error = Error::Type("stream has been terminated".to_owned());
687
688 // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, error).
689 rooted!(in(*cx) let mut rooted_error = UndefinedValue());
690 error.to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
691 stream.error_writable_and_unblock_write(cx, global, rooted_error.handle(), can_gc);
692 }
693}
694
695#[allow(non_snake_case)]
696impl TransformStreamDefaultControllerMethods<crate::DomTypeHolder>
697 for TransformStreamDefaultController
698{
699 /// <https://streams.spec.whatwg.org/#ts-default-controller-desired-size>
700 fn GetDesiredSize(&self) -> Option<f64> {
701 // Let readableController be this.[[stream]].[[readable]].[[controller]].
702 let readable_controller = self
703 .stream
704 .get()
705 .expect("stream is null")
706 .get_readable()
707 .get_default_controller();
708
709 // Return ! ReadableStreamDefaultControllerGetDesiredSize(readableController).
710 readable_controller.get_desired_size()
711 }
712
713 /// <https://streams.spec.whatwg.org/#ts-default-controller-enqueue>
714 fn Enqueue(&self, cx: SafeJSContext, chunk: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
715 // Perform ? TransformStreamDefaultControllerEnqueue(this, chunk).
716 self.enqueue(cx, &self.global(), chunk, can_gc)
717 }
718
719 /// <https://streams.spec.whatwg.org/#ts-default-controller-error>
720 fn Error(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
721 // Perform ? TransformStreamDefaultControllerError(this, e).
722 self.error(cx, &self.global(), reason, can_gc);
723 Ok(())
724 }
725
726 /// <https://streams.spec.whatwg.org/#ts-default-controller-terminate>
727 fn Terminate(&self, can_gc: CanGc) -> Fallible<()> {
728 // Perform ? TransformStreamDefaultControllerTerminate(this).
729 self.terminate(GlobalScope::get_cx(), &self.global(), can_gc);
730 Ok(())
731 }
732}