script/dom/stream/transformstreamdefaultcontroller.rs
1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::context::JSContext;
10use js::jsapi::{
11 ExceptionStackBehavior, Heap, JS_IsExceptionPending, JS_SetPendingException, JSObject,
12};
13use js::jsval::UndefinedValue;
14use js::realm::CurrentRealm;
15use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
16use script_bindings::cell::DomRefCell;
17use script_bindings::reflector::{Reflector, reflect_dom_object_with_cx};
18
19use crate::dom::bindings::callback::ExceptionHandling;
20use crate::dom::bindings::codegen::Bindings::TransformStreamDefaultControllerBinding::TransformStreamDefaultControllerMethods;
21use crate::dom::bindings::codegen::Bindings::TransformerBinding::{
22 Transformer, TransformerCancelCallback, TransformerFlushCallback, TransformerTransformCallback,
23};
24use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
25use crate::dom::bindings::reflector::DomGlobal;
26use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
27use crate::dom::compressionstream::{
28 CompressionStream, compress_and_enqueue_a_chunk, compress_flush_and_enqueue,
29};
30use crate::dom::decompressionstream::{
31 decompress_and_enqueue_a_chunk, decompress_flush_and_enqueue,
32};
33use crate::dom::encoding::textdecodercommon::TextDecoderCommon;
34use crate::dom::encoding::textdecoderstream::{decode_and_enqueue_a_chunk, flush_and_enqueue};
35use crate::dom::encoding::textencoderstream::{
36 Encoder, encode_and_enqueue_a_chunk, encode_and_flush,
37};
38use crate::dom::globalscope::GlobalScope;
39use crate::dom::promise::Promise;
40use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
41use crate::dom::types::{DecompressionStream, TransformStream};
42use crate::realms::enter_auto_realm;
43
44impl js::gc::Rootable for TransformTransformPromiseRejection {}
45
46/// Reacting to transformPromise as part of
47/// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
48#[derive(JSTraceable, MallocSizeOf)]
49#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
50struct TransformTransformPromiseRejection {
51 controller: Dom<TransformStreamDefaultController>,
52}
53
54impl Callback for TransformTransformPromiseRejection {
55 /// Reacting to transformPromise with the following fulfillment steps:
56 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
57 // Perform ! TransformStreamError(controller.[[stream]], r).
58 self.controller.error(cx, &self.controller.global(), v);
59
60 // Throw r.
61 // Note: this is done part of perform_transform().
62 }
63}
64
65/// The type of transformer algorithms we are using
66#[derive(JSTraceable)]
67pub(crate) enum TransformerType {
68 /// Algorithms provided by Js callbacks
69 Js {
70 /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-cancelalgorithm>
71 cancel: RefCell<Option<Rc<TransformerCancelCallback>>>,
72
73 /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-flushalgorithm>
74 flush: RefCell<Option<Rc<TransformerFlushCallback>>>,
75
76 /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-transformalgorithm>
77 transform: RefCell<Option<Rc<TransformerTransformCallback>>>,
78
79 /// The JS object used as `this` when invoking sink algorithms.
80 transform_obj: Heap<*mut JSObject>,
81 },
82 /// Algorithms supporting `TextDecoderStream` are implemented in Rust
83 ///
84 /// <https://encoding.spec.whatwg.org/#textdecodercommon>
85 Decoder(Rc<TextDecoderCommon>),
86 /// Algorithms supporting `TextEncoderStream` are implemented in Rust
87 ///
88 /// <https://encoding.spec.whatwg.org/#textencoderstream-encoder>
89 Encoder(Encoder),
90 /// Algorithms supporting `CompressionStream` are implemented in Rust
91 ///
92 /// <https://compression.spec.whatwg.org/#compressionstream>
93 Compressor(DomRoot<CompressionStream>),
94 /// Algorithms supporting `DecompressionStream` are implemented in Rust
95 ///
96 /// <https://compression.spec.whatwg.org/#decompressionstream>
97 Decompressor(DomRoot<DecompressionStream>),
98}
99
100impl TransformerType {
101 pub(crate) fn new_from_js_transformer(transformer: &Transformer) -> TransformerType {
102 TransformerType::Js {
103 cancel: RefCell::new(transformer.cancel.clone()),
104 flush: RefCell::new(transformer.flush.clone()),
105 transform: RefCell::new(transformer.transform.clone()),
106 transform_obj: Default::default(),
107 }
108 }
109}
110
111/// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller>
112#[dom_struct]
113pub struct TransformStreamDefaultController {
114 reflector_: Reflector,
115
116 /// The type of the underlying transformer used. Besides the JS variant,
117 /// there will be other variant(s) for `TextDecoderStream`
118 #[ignore_malloc_size_of = "transformer_type"]
119 transformer_type: TransformerType,
120
121 /// <https://streams.spec.whatwg.org/#TransformStreamDefaultController-stream>
122 stream: MutNullableDom<TransformStream>,
123
124 /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-finishpromise>
125 #[conditional_malloc_size_of]
126 finish_promise: DomRefCell<Option<Rc<Promise>>>,
127}
128
129impl TransformStreamDefaultController {
130 fn new_inherited(transformer_type: TransformerType) -> TransformStreamDefaultController {
131 TransformStreamDefaultController {
132 reflector_: Reflector::new(),
133 transformer_type,
134 stream: MutNullableDom::new(None),
135 finish_promise: DomRefCell::new(None),
136 }
137 }
138
139 pub(crate) fn new(
140 cx: &mut JSContext,
141 global: &GlobalScope,
142 transformer_type: TransformerType,
143 ) -> DomRoot<TransformStreamDefaultController> {
144 reflect_dom_object_with_cx(
145 Box::new(TransformStreamDefaultController::new_inherited(
146 transformer_type,
147 )),
148 global,
149 cx,
150 )
151 }
152
153 /// Setting the JS object after the heap has settled down.
154 ///
155 /// Note that this has no effect if the transformer type is not `TransformerType::Js`
156 pub(crate) fn set_transform_obj(&self, this_object: SafeHandleObject) {
157 if let TransformerType::Js { transform_obj, .. } = &self.transformer_type {
158 transform_obj.set(*this_object)
159 } else {
160 unreachable!("Non-Js transformer type should not set transform_obj")
161 }
162 }
163
164 pub(crate) fn set_stream(&self, stream: &TransformStream) {
165 self.stream.set(Some(stream));
166 }
167
168 pub(crate) fn get_finish_promise(&self) -> Option<Rc<Promise>> {
169 self.finish_promise.borrow().clone()
170 }
171
172 pub(crate) fn set_finish_promise(&self, promise: Rc<Promise>) {
173 *self.finish_promise.borrow_mut() = Some(promise);
174 }
175
176 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
177 pub(crate) fn transform_stream_default_controller_perform_transform(
178 &self,
179 cx: &mut JSContext,
180 global: &GlobalScope,
181 chunk: SafeHandleValue,
182 ) -> Fallible<Rc<Promise>> {
183 // Let transformPromise be the result of performing controller.[[transformAlgorithm]], passing chunk.
184 let transform_promise = self.perform_transform(cx, global, chunk)?;
185
186 // Return the result of reacting to transformPromise with the following rejection steps given the argument r:
187 rooted!(&in(cx) let mut reject_handler = Some(TransformTransformPromiseRejection {
188 controller: Dom::from_ref(self),
189 }));
190
191 let handler = PromiseNativeHandler::new(
192 cx,
193 global,
194 None,
195 reject_handler.take().map(|h| Box::new(h) as Box<_>),
196 );
197 let mut realm = enter_auto_realm(cx, global);
198 let realm = &mut realm.current_realm();
199 transform_promise.append_native_handler(realm, &handler);
200
201 Ok(transform_promise)
202 }
203
204 pub(crate) fn perform_transform(
205 &self,
206 cx: &mut JSContext,
207 global: &GlobalScope,
208 chunk: SafeHandleValue,
209 ) -> Fallible<Rc<Promise>> {
210 let result = match &self.transformer_type {
211 // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
212 TransformerType::Js {
213 transform,
214 transform_obj,
215 ..
216 } => {
217 // Step 5. If transformerDict["transform"] exists, set
218 // transformAlgorithm to an algorithm which takes an argument
219 // chunk and returns the result of invoking
220 // transformerDict["transform"] with argument list « chunk,
221 // controller » and callback this value transformer.
222 let algo = transform.borrow().clone();
223 if let Some(transform) = algo {
224 rooted!(&in(cx) let this_object = transform_obj.get());
225 transform
226 .Call_(
227 cx,
228 &this_object.handle(),
229 chunk,
230 self,
231 ExceptionHandling::Rethrow,
232 )
233 .unwrap_or_else(|e| {
234 let p = Promise::new(cx, global);
235 p.reject_error(cx, e);
236 p
237 })
238 } else {
239 // Step 2. Let transformAlgorithm be the following steps, taking a chunk argument:
240 // Let result be TransformStreamDefaultControllerEnqueue(controller, chunk).
241 // If result is an abrupt completion, return a promise rejected with result.[[Value]].
242 if let Err(error) = self.enqueue(cx, global, chunk) {
243 rooted!(&in(cx) let mut error_val = UndefinedValue());
244 error.to_jsval(cx, global, error_val.handle_mut());
245 Promise::new_rejected(cx, global, error_val.handle())
246 } else {
247 // Otherwise, return a promise resolved with undefined.
248 Promise::new_resolved(cx, global, ())
249 }
250 }
251 },
252 TransformerType::Decoder(decoder) => {
253 // <https://encoding.spec.whatwg.org/#dom-textdecoderstream>
254 // Step 7. Let transformAlgorithm be an algorithm which takes a
255 // chunk argument and runs the decode and enqueue a chunk
256 // algorithm with this and chunk.
257 decode_and_enqueue_a_chunk(cx, global, chunk, decoder, self)
258 // <https://streams.spec.whatwg.org/#transformstream-set-up>
259 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
260 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
261 // Step 5.2 If result is a Promise, then return result.
262 // Note: not applicable, the spec does NOT require deode_and_enqueue_a_chunk() to return a Promise
263 // Step 5.3 Return a promise resolved with undefined.
264 .map(|_| Promise::new_resolved(cx, global, ()))
265 .unwrap_or_else(|e| {
266 // <https://streams.spec.whatwg.org/#transformstream-set-up>
267 // Step 5.1 If this throws an exception e,
268 let mut realm = enter_auto_realm(cx, self);
269 let realm = &mut realm.current_realm();
270 let p = Promise::new_in_realm(realm);
271 // return a promise rejected with e.
272 p.reject_error(realm, e);
273 p
274 })
275 },
276 TransformerType::Encoder(encoder) => {
277 // <https://encoding.spec.whatwg.org/#dom-textencoderstream>
278 // Step 2. Let transformAlgorithm be an algorithm which takes a chunk argument and runs the encode
279 // and enqueue a chunk algorithm with this and chunk.
280 encode_and_enqueue_a_chunk(cx, global, chunk, encoder, self)
281 // <https://streams.spec.whatwg.org/#transformstream-set-up>
282 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
283 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
284 // Step 5.2 If result is a Promise, then return result.
285 // Note: not applicable, the spec does NOT require encode_and_enqueue_a_chunk() to return a Promise
286 // Step 5.3 Return a promise resolved with undefined.
287 .map(|_| Promise::new_resolved(cx, global, ()))
288 .unwrap_or_else(|e| {
289 // <https://streams.spec.whatwg.org/#transformstream-set-up>
290 // Step 5.1 If this throws an exception e,
291 let mut realm = enter_auto_realm(cx, self);
292 let realm = &mut realm.current_realm();
293 let p = Promise::new_in_realm(realm);
294 // return a promise rejected with e.
295 p.reject_error(realm, e);
296 p
297 })
298 },
299 TransformerType::Compressor(cs) => {
300 // <https://compression.spec.whatwg.org/#dom-compressionstream-compressionstream>
301 // Step 3. Let transformAlgorithm be an algorithm which takes a chunk argument and
302 // runs the compress and enqueue a chunk algorithm with this and chunk.
303 compress_and_enqueue_a_chunk(cx, global, cs, chunk, self)
304 // <https://streams.spec.whatwg.org/#transformstream-set-up>
305 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
306 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
307 // Step 5.2 If result is a Promise, then return result.
308 // Note: not applicable, the spec does NOT require
309 // compress_and_enqueue_a_chunk() to return a Promise.
310 // Step 5.3 Return a promise resolved with undefined.
311 .map(|_| Promise::new_resolved(cx, global, ()))
312 .unwrap_or_else(|e| {
313 // <https://streams.spec.whatwg.org/#transformstream-set-up>
314 // Step 5.1 If this throws an exception e,
315 let mut realm = enter_auto_realm(cx, self);
316 let realm = &mut realm.current_realm();
317 let p = Promise::new_in_realm(realm);
318 // return a promise rejected with e.
319 p.reject_error(realm, e);
320 p
321 })
322 },
323 TransformerType::Decompressor(ds) => {
324 // <https://compression.spec.whatwg.org/#dom-decompressionstream-decompressionstream>
325 // Step 3. Let transformAlgorithm be an algorithm which takes a chunk argument and
326 // runs the decompress and enqueue a chunk algorithm with this and chunk.
327 decompress_and_enqueue_a_chunk(cx, global, ds, chunk, self)
328 // <https://streams.spec.whatwg.org/#transformstream-set-up>
329 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
330 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
331 // Step 5.2 If result is a Promise, then return result.
332 // Note: not applicable, the spec does NOT require
333 // decompress_and_enqueue_a_chunk() to return a Promise
334 // Step 5.3 Return a promise resolved with undefined.
335 .map(|_| Promise::new_resolved(cx, global, ()))
336 .unwrap_or_else(|e| {
337 // <https://streams.spec.whatwg.org/#transformstream-set-up>
338 // Step 5.1 If this throws an exception e,
339 let mut realm = enter_auto_realm(cx, self);
340 let realm = &mut realm.current_realm();
341 let p = Promise::new_in_realm(realm);
342 // return a promise rejected with e.
343 p.reject_error(realm, e);
344 p
345 })
346 },
347 };
348
349 Ok(result)
350 }
351
352 pub(crate) fn perform_cancel(
353 &self,
354 cx: &mut JSContext,
355 global: &GlobalScope,
356 chunk: SafeHandleValue,
357 ) -> Fallible<Rc<Promise>> {
358 let result = match &self.transformer_type {
359 // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
360 TransformerType::Js {
361 cancel,
362 transform_obj,
363 ..
364 } => {
365 // Step 7. If transformerDict["cancel"] exists, set
366 // cancelAlgorithm to an algorithm which takes an argument
367 // reason and returns the result of invoking
368 // transformerDict["cancel"] with argument list « reason » and
369 // callback this value transformer.
370 let algo = cancel.borrow().clone();
371 if let Some(cancel) = algo {
372 rooted!(&in(cx) let this_object = transform_obj.get());
373 cancel
374 .Call_(cx, &this_object.handle(), chunk, ExceptionHandling::Rethrow)
375 .unwrap_or_else(|e| {
376 let p = Promise::new(cx, global);
377 p.reject_error(cx, e);
378 p
379 })
380 } else {
381 // Step 4. Let cancelAlgorithm be an algorithm which returns a promise resolved with undefined.
382 Promise::new_resolved(cx, global, ())
383 }
384 },
385 TransformerType::Decoder(_) => {
386 // <https://streams.spec.whatwg.org/#transformstream-set-up>
387 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
388 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
389 // if cancelAlgorithm was given, or null otherwise
390 // Note: `TextDecoderStream` does NOT specify a cancel algorithm.
391 // Step 7.2 If result is a Promise, then return result.
392 // Note: Not applicable.
393 // Step 7.3 Return a promise resolved with undefined.
394 Promise::new_resolved(cx, global, ())
395 },
396 TransformerType::Encoder(_) => {
397 // <https://streams.spec.whatwg.org/#transformstream-set-up>
398 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
399 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
400 // if cancelAlgorithm was given, or null otherwise
401 // Note: `TextDecoderStream` does NOT specify a cancel algorithm.
402 // Step 7.2 If result is a Promise, then return result.
403 // Note: Not applicable.
404 // Step 7.3 Return a promise resolved with undefined.
405 Promise::new_resolved(cx, global, ())
406 },
407 TransformerType::Compressor(_) => {
408 // <https://streams.spec.whatwg.org/#transformstream-set-up>
409 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
410 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
411 // if cancelAlgorithm was given, or null otherwise
412 // Note: `CompressionStream` does NOT specify a cancel algorithm.
413 // Step 7.2 If result is a Promise, then return result.
414 // Note: Not applicable.
415 // Step 7.3 Return a promise resolved with undefined.
416 Promise::new_resolved(cx, global, ())
417 },
418 TransformerType::Decompressor(_) => {
419 // <https://streams.spec.whatwg.org/#transformstream-set-up>
420 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
421 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
422 // if cancelAlgorithm was given, or null otherwise
423 // Note: `DecompressionStream` does NOT specify a cancel algorithm.
424 // Step 7.2 If result is a Promise, then return result.
425 // Note: Not applicable.
426 // Step 7.3 Return a promise resolved with undefined.
427 Promise::new_resolved(cx, global, ())
428 },
429 };
430
431 Ok(result)
432 }
433
434 pub(crate) fn perform_flush(
435 &self,
436 cx: &mut JSContext,
437 global: &GlobalScope,
438 ) -> Fallible<Rc<Promise>> {
439 let result = match &self.transformer_type {
440 // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
441 TransformerType::Js {
442 flush,
443 transform_obj,
444 ..
445 } => {
446 // Step 6. If transformerDict["flush"] exists, set flushAlgorithm to an
447 // algorithm which returns the result of invoking
448 // transformerDict["flush"] with argument list « controller »
449 // and callback this value transformer.
450 let algo = flush.borrow().clone();
451 if let Some(flush) = algo {
452 rooted!(&in(cx) let this_object = transform_obj.get());
453 flush
454 .Call_(cx, &this_object.handle(), self, ExceptionHandling::Rethrow)
455 .unwrap_or_else(|e| {
456 let p = Promise::new(cx, global);
457 p.reject_error(cx, e);
458 p
459 })
460 } else {
461 // Step 3. Let flushAlgorithm be an algorithm which returns a promise resolved with undefined.
462 Promise::new_resolved(cx, global, ())
463 }
464 },
465 TransformerType::Decoder(decoder) => {
466 // <https://encoding.spec.whatwg.org/#dom-textdecoderstream>
467 // Step 8. Let flushAlgorithm be an algorithm which takes no
468 // arguments and runs the flush and enqueue algorithm with this.
469 flush_and_enqueue(cx, global, decoder, self)
470 // <https://streams.spec.whatwg.org/#transformstream-set-up>
471 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
472 // Step 6.1 Let result be the result of running flushAlgorithm,
473 // if flushAlgorithm was given, or null otherwise.
474 // Step 6.2 If result is a Promise, then return result.
475 // Note: Not applicable. The spec does NOT require flush_and_enqueue algo to return a Promise
476 // Step 6.3 Return a promise resolved with undefined.
477 .map(|_| Promise::new_resolved(cx, global, ()))
478 .unwrap_or_else(|e| {
479 // <https://streams.spec.whatwg.org/#transformstream-set-up>
480 // Step 6.1 If this throws an exception e,
481 let mut realm = enter_auto_realm(cx, self);
482 let realm = &mut realm.current_realm();
483 let p = Promise::new_in_realm(realm);
484 // return a promise rejected with e.
485 p.reject_error(realm, e);
486 p
487 })
488 },
489 TransformerType::Encoder(encoder) => {
490 // <https://encoding.spec.whatwg.org/#textencoderstream-encoder>
491 // Step 3. Let flushAlgorithm be an algorithm which runs the encode and flush algorithm with this.
492 encode_and_flush(cx, global, encoder, self)
493 // <https://streams.spec.whatwg.org/#transformstream-set-up>
494 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
495 // Step 6.1 Let result be the result of running flushAlgorithm,
496 // if flushAlgorithm was given, or null otherwise.
497 // Step 6.2 If result is a Promise, then return result.
498 // Note: Not applicable. The spec does NOT require encode_and_flush algo to return a Promise
499 // Step 6.3 Return a promise resolved with undefined.
500 .map(|_| Promise::new_resolved(cx, global, ()))
501 .unwrap_or_else(|e| {
502 // <https://streams.spec.whatwg.org/#transformstream-set-up>
503 // Step 6.1 If this throws an exception e,
504 let mut realm = enter_auto_realm(cx, self);
505 let realm = &mut realm.current_realm();
506 let p = Promise::new_in_realm(realm);
507 // return a promise rejected with e.
508 p.reject_error(realm, e);
509 p
510 })
511 },
512 TransformerType::Compressor(cs) => {
513 // <https://compression.spec.whatwg.org/#dom-compressionstream-compressionstream>
514 // Step 4. Let flushAlgorithm be an algorithm which takes no argument and runs the
515 // compress flush and enqueue algorithm with this.
516 compress_flush_and_enqueue(cx, global, cs, self)
517 // <https://streams.spec.whatwg.org/#transformstream-set-up>
518 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
519 // Step 6.1 Let result be the result of running flushAlgorithm,
520 // if flushAlgorithm was given, or null otherwise.
521 // Step 6.2 If result is a Promise, then return result.
522 // Note: Not applicable. The spec does NOT require compress_flush_and_enqueue
523 // algo to return a Promise.
524 // Step 6.3 Return a promise resolved with undefined.
525 .map(|_| Promise::new_resolved(cx, global, ()))
526 .unwrap_or_else(|e| {
527 // <https://streams.spec.whatwg.org/#transformstream-set-up>
528 // Step 6.1 If this throws an exception e,
529 let mut realm = enter_auto_realm(cx, self);
530 let realm = &mut realm.current_realm();
531 let p = Promise::new_in_realm(realm);
532 // return a promise rejected with e.
533 p.reject_error(realm, e);
534 p
535 })
536 },
537 TransformerType::Decompressor(ds) => {
538 // <https://compression.spec.whatwg.org/#dom-decompressionstream-decompressionstream>
539 // Step 4. Let flushAlgorithm be an algorithm which takes no argument and runs the
540 // decompress flush and enqueue algorithm with this.
541 decompress_flush_and_enqueue(cx, global, ds, self)
542 // <https://streams.spec.whatwg.org/#transformstream-set-up>
543 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
544 // Step 6.1 Let result be the result of running flushAlgorithm,
545 // if flushAlgorithm was given, or null otherwise.
546 // Step 6.2 If result is a Promise, then return result.
547 // Note: Not applicable. The spec does NOT require decompress_flush_and_enqueue
548 // algo to return a Promise.
549 // Step 6.3 Return a promise resolved with undefined.
550 .map(|_| Promise::new_resolved(cx, global, ()))
551 .unwrap_or_else(|e| {
552 // <https://streams.spec.whatwg.org/#transformstream-set-up>
553 // Step 6.1 If this throws an exception e,
554 let mut realm = enter_auto_realm(cx, self);
555 let realm = &mut realm.current_realm();
556 let p = Promise::new_in_realm(realm);
557 // return a promise rejected with e.
558 p.reject_error(realm, e);
559 p
560 })
561 },
562 };
563
564 Ok(result)
565 }
566
567 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-enqueue>
568 #[expect(unsafe_code)]
569 pub(crate) fn enqueue(
570 &self,
571 cx: &mut JSContext,
572 global: &GlobalScope,
573 chunk: SafeHandleValue,
574 ) -> Fallible<()> {
575 // Let stream be controller.[[stream]].
576 let stream = self.stream.get().expect("stream is null");
577
578 // Let readableController be stream.[[readable]].[[controller]].
579 let readable = stream.get_readable();
580 let readable_controller = readable.get_default_controller();
581
582 // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController)
583 // is false, throw a TypeError exception.
584 if !readable_controller.can_close_or_enqueue() {
585 return Err(Error::Type(
586 c"ReadableStreamDefaultControllerCanCloseOrEnqueue is false".to_owned(),
587 ));
588 }
589
590 // Let enqueueResult be ReadableStreamDefaultControllerEnqueue(readableController, chunk).
591 // If enqueueResult is an abrupt completion,
592 if let Err(error) = readable_controller.enqueue(cx, chunk) {
593 // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, enqueueResult.[[Value]]).
594 rooted!(&in(cx) let mut rooted_error = UndefinedValue());
595 error
596 .clone()
597 .to_jsval(cx, global, rooted_error.handle_mut());
598 stream.error_writable_and_unblock_write(cx, global, rooted_error.handle());
599
600 // Throw stream.[[readable]].[[storedError]].
601 unsafe {
602 if !JS_IsExceptionPending(cx.raw_cx()) {
603 rooted!(&in(cx) let mut stored_error = UndefinedValue());
604 readable.get_stored_error(stored_error.handle_mut());
605
606 JS_SetPendingException(
607 cx.raw_cx(),
608 stored_error.handle().into(),
609 ExceptionStackBehavior::Capture,
610 );
611 }
612 }
613 return Err(error);
614 }
615
616 // Let backpressure be ! ReadableStreamDefaultControllerHasBackpressure(readableController).
617 let backpressure = readable_controller.has_backpressure();
618
619 // If backpressure is not stream.[[backpressure]],
620 if backpressure != stream.get_backpressure() {
621 // Assert: backpressure is true.
622 assert!(backpressure);
623
624 // Perform ! TransformStreamSetBackpressure(stream, true).
625 stream.set_backpressure(cx, global, true);
626 }
627 Ok(())
628 }
629
630 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-error>
631 pub(crate) fn error(&self, cx: &mut JSContext, global: &GlobalScope, reason: SafeHandleValue) {
632 // Perform ! TransformStreamError(controller.[[stream]], e).
633 self.stream
634 .get()
635 .expect("stream is undefined")
636 .error(cx, global, reason);
637 }
638
639 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-clear-algorithms>
640 pub(crate) fn clear_algorithms(&self) {
641 if let TransformerType::Js {
642 cancel,
643 flush,
644 transform,
645 ..
646 } = &self.transformer_type
647 {
648 // Set controller.[[transformAlgorithm]] to undefined.
649 transform.replace(None);
650
651 // Set controller.[[flushAlgorithm]] to undefined.
652 flush.replace(None);
653
654 // Set controller.[[cancelAlgorithm]] to undefined.
655 cancel.replace(None);
656 }
657 }
658
659 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-terminate>
660 fn terminate(&self, cx: &mut JSContext, global: &GlobalScope) {
661 // Let stream be controller.[[stream]].
662 let stream = self.stream.get().expect("stream is null");
663
664 // Let readableController be stream.[[readable]].[[controller]].
665 let readable = stream.get_readable();
666 let readable_controller = readable.get_default_controller();
667
668 // Perform ! ReadableStreamDefaultControllerClose(readableController).
669 readable_controller.close(cx);
670
671 // Let error be a TypeError exception indicating that the stream has been terminated.
672 let error = Error::Type(c"stream has been terminated".to_owned());
673
674 // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, error).
675 rooted!(&in(cx) let mut rooted_error = UndefinedValue());
676 error.to_jsval(cx, global, rooted_error.handle_mut());
677 stream.error_writable_and_unblock_write(cx, global, rooted_error.handle());
678 }
679}
680
681impl TransformStreamDefaultControllerMethods<crate::DomTypeHolder>
682 for TransformStreamDefaultController
683{
684 /// <https://streams.spec.whatwg.org/#ts-default-controller-desired-size>
685 fn GetDesiredSize(&self) -> Option<f64> {
686 // Let readableController be this.[[stream]].[[readable]].[[controller]].
687 let readable_controller = self
688 .stream
689 .get()
690 .expect("stream is null")
691 .get_readable()
692 .get_default_controller();
693
694 // Return ! ReadableStreamDefaultControllerGetDesiredSize(readableController).
695 readable_controller.get_desired_size()
696 }
697
698 /// <https://streams.spec.whatwg.org/#ts-default-controller-enqueue>
699 fn Enqueue(&self, cx: &mut JSContext, chunk: SafeHandleValue) -> Fallible<()> {
700 // Perform ? TransformStreamDefaultControllerEnqueue(this, chunk).
701 self.enqueue(cx, &self.global(), chunk)
702 }
703
704 /// <https://streams.spec.whatwg.org/#ts-default-controller-error>
705 fn Error(&self, cx: &mut JSContext, reason: SafeHandleValue) -> Fallible<()> {
706 // Perform ? TransformStreamDefaultControllerError(this, e).
707 self.error(cx, &self.global(), reason);
708 Ok(())
709 }
710
711 /// <https://streams.spec.whatwg.org/#ts-default-controller-terminate>
712 fn Terminate(&self, cx: &mut JSContext) -> Fallible<()> {
713 // Perform ? TransformStreamDefaultControllerTerminate(this).
714 self.terminate(cx, &self.global());
715 Ok(())
716 }
717}