script/dom/stream/transformstreamdefaultcontroller.rs
1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsapi::{
10 ExceptionStackBehavior, Heap, JS_IsExceptionPending, JS_SetPendingException, JSObject,
11};
12use js::jsval::UndefinedValue;
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
15
16use crate::dom::bindings::callback::ExceptionHandling;
17use crate::dom::bindings::cell::DomRefCell;
18use crate::dom::bindings::codegen::Bindings::TransformStreamDefaultControllerBinding::TransformStreamDefaultControllerMethods;
19use crate::dom::bindings::codegen::Bindings::TransformerBinding::{
20 Transformer, TransformerCancelCallback, TransformerFlushCallback, TransformerTransformCallback,
21};
22use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
23use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
24use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
25use crate::dom::compressionstream::{
26 CompressionStream, compress_and_enqueue_a_chunk, compress_flush_and_enqueue,
27};
28use crate::dom::decompressionstream::{
29 decompress_and_enqueue_a_chunk, decompress_flush_and_enqueue,
30};
31use crate::dom::encoding::textdecodercommon::TextDecoderCommon;
32use crate::dom::encoding::textdecoderstream::{decode_and_enqueue_a_chunk, flush_and_enqueue};
33use crate::dom::encoding::textencoderstream::{
34 Encoder, encode_and_enqueue_a_chunk, encode_and_flush,
35};
36use crate::dom::globalscope::GlobalScope;
37use crate::dom::promise::Promise;
38use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
39use crate::dom::types::{DecompressionStream, TransformStream};
40use crate::realms::{InRealm, enter_auto_realm};
41use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
42
43impl js::gc::Rootable for TransformTransformPromiseRejection {}
44
45/// Reacting to transformPromise as part of
46/// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
47#[derive(JSTraceable, MallocSizeOf)]
48#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
49struct TransformTransformPromiseRejection {
50 controller: Dom<TransformStreamDefaultController>,
51}
52
53impl Callback for TransformTransformPromiseRejection {
54 /// Reacting to transformPromise with the following fulfillment steps:
55 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
56 // Perform ! TransformStreamError(controller.[[stream]], r).
57 self.controller.error(cx, &self.controller.global(), v);
58
59 // Throw r.
60 // Note: this is done part of perform_transform().
61 }
62}
63
64/// The type of transformer algorithms we are using
65#[derive(JSTraceable)]
66pub(crate) enum TransformerType {
67 /// Algorithms provided by Js callbacks
68 Js {
69 /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-cancelalgorithm>
70 cancel: RefCell<Option<Rc<TransformerCancelCallback>>>,
71
72 /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-flushalgorithm>
73 flush: RefCell<Option<Rc<TransformerFlushCallback>>>,
74
75 /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-transformalgorithm>
76 transform: RefCell<Option<Rc<TransformerTransformCallback>>>,
77
78 /// The JS object used as `this` when invoking sink algorithms.
79 transform_obj: Heap<*mut JSObject>,
80 },
81 /// Algorithms supporting `TextDecoderStream` are implemented in Rust
82 ///
83 /// <https://encoding.spec.whatwg.org/#textdecodercommon>
84 Decoder(Rc<TextDecoderCommon>),
85 /// Algorithms supporting `TextEncoderStream` are implemented in Rust
86 ///
87 /// <https://encoding.spec.whatwg.org/#textencoderstream-encoder>
88 Encoder(Encoder),
89 /// Algorithms supporting `CompressionStream` are implemented in Rust
90 ///
91 /// <https://compression.spec.whatwg.org/#compressionstream>
92 Compressor(DomRoot<CompressionStream>),
93 /// Algorithms supporting `DecompressionStream` are implemented in Rust
94 ///
95 /// <https://compression.spec.whatwg.org/#decompressionstream>
96 Decompressor(DomRoot<DecompressionStream>),
97}
98
99impl TransformerType {
100 pub(crate) fn new_from_js_transformer(transformer: &Transformer) -> TransformerType {
101 TransformerType::Js {
102 cancel: RefCell::new(transformer.cancel.clone()),
103 flush: RefCell::new(transformer.flush.clone()),
104 transform: RefCell::new(transformer.transform.clone()),
105 transform_obj: Default::default(),
106 }
107 }
108}
109
110/// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller>
111#[dom_struct]
112pub struct TransformStreamDefaultController {
113 reflector_: Reflector,
114
115 /// The type of the underlying transformer used. Besides the JS variant,
116 /// there will be other variant(s) for `TextDecoderStream`
117 #[ignore_malloc_size_of = "transformer_type"]
118 transformer_type: TransformerType,
119
120 /// <https://streams.spec.whatwg.org/#TransformStreamDefaultController-stream>
121 stream: MutNullableDom<TransformStream>,
122
123 /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-finishpromise>
124 #[conditional_malloc_size_of]
125 finish_promise: DomRefCell<Option<Rc<Promise>>>,
126}
127
128impl TransformStreamDefaultController {
129 fn new_inherited(transformer_type: TransformerType) -> TransformStreamDefaultController {
130 TransformStreamDefaultController {
131 reflector_: Reflector::new(),
132 transformer_type,
133 stream: MutNullableDom::new(None),
134 finish_promise: DomRefCell::new(None),
135 }
136 }
137
138 pub(crate) fn new(
139 global: &GlobalScope,
140 transformer_type: TransformerType,
141 can_gc: CanGc,
142 ) -> DomRoot<TransformStreamDefaultController> {
143 reflect_dom_object(
144 Box::new(TransformStreamDefaultController::new_inherited(
145 transformer_type,
146 )),
147 global,
148 can_gc,
149 )
150 }
151
152 /// Setting the JS object after the heap has settled down.
153 ///
154 /// Note that this has no effect if the transformer type is not `TransformerType::Js`
155 pub(crate) fn set_transform_obj(&self, this_object: SafeHandleObject) {
156 if let TransformerType::Js { transform_obj, .. } = &self.transformer_type {
157 transform_obj.set(*this_object)
158 } else {
159 unreachable!("Non-Js transformer type should not set transform_obj")
160 }
161 }
162
163 pub(crate) fn set_stream(&self, stream: &TransformStream) {
164 self.stream.set(Some(stream));
165 }
166
167 pub(crate) fn get_finish_promise(&self) -> Option<Rc<Promise>> {
168 self.finish_promise.borrow().clone()
169 }
170
171 pub(crate) fn set_finish_promise(&self, promise: Rc<Promise>) {
172 *self.finish_promise.borrow_mut() = Some(promise);
173 }
174
175 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
176 pub(crate) fn transform_stream_default_controller_perform_transform(
177 &self,
178 cx: &mut js::context::JSContext,
179 global: &GlobalScope,
180 chunk: SafeHandleValue,
181 ) -> Fallible<Rc<Promise>> {
182 // Let transformPromise be the result of performing controller.[[transformAlgorithm]], passing chunk.
183 let transform_promise = self.perform_transform(cx, global, chunk)?;
184
185 // Return the result of reacting to transformPromise with the following rejection steps given the argument r:
186 rooted!(&in(cx) let mut reject_handler = Some(TransformTransformPromiseRejection {
187 controller: Dom::from_ref(self),
188 }));
189
190 let handler = PromiseNativeHandler::new(
191 global,
192 None,
193 reject_handler.take().map(|h| Box::new(h) as Box<_>),
194 CanGc::from_cx(cx),
195 );
196 let mut realm = enter_auto_realm(cx, global);
197 let realm = &mut realm.current_realm();
198 let in_realm_proof = realm.into();
199 let comp = InRealm::Already(&in_realm_proof);
200 transform_promise.append_native_handler(&handler, comp, CanGc::from_cx(realm));
201
202 Ok(transform_promise)
203 }
204
205 pub(crate) fn perform_transform(
206 &self,
207 cx: &mut js::context::JSContext,
208 global: &GlobalScope,
209 chunk: SafeHandleValue,
210 ) -> Fallible<Rc<Promise>> {
211 let result = match &self.transformer_type {
212 // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
213 TransformerType::Js {
214 transform,
215 transform_obj,
216 ..
217 } => {
218 // Step 5. If transformerDict["transform"] exists, set
219 // transformAlgorithm to an algorithm which takes an argument
220 // chunk and returns the result of invoking
221 // transformerDict["transform"] with argument list « chunk,
222 // controller » and callback this value transformer.
223 let algo = transform.borrow().clone();
224 if let Some(transform) = algo {
225 rooted!(&in(cx) let this_object = transform_obj.get());
226 transform
227 .Call_(
228 &this_object.handle(),
229 chunk,
230 self,
231 ExceptionHandling::Rethrow,
232 CanGc::from_cx(cx),
233 )
234 .unwrap_or_else(|e| {
235 let p = Promise::new2(cx, global);
236 p.reject_error(e, CanGc::from_cx(cx));
237 p
238 })
239 } else {
240 // Step 2. Let transformAlgorithm be the following steps, taking a chunk argument:
241 // Let result be TransformStreamDefaultControllerEnqueue(controller, chunk).
242 // If result is an abrupt completion, return a promise rejected with result.[[Value]].
243 if let Err(error) = self.enqueue(cx, global, chunk) {
244 rooted!(&in(cx) let mut error_val = UndefinedValue());
245 error.to_jsval(
246 cx.into(),
247 global,
248 error_val.handle_mut(),
249 CanGc::from_cx(cx),
250 );
251 Promise::new_rejected(
252 global,
253 cx.into(),
254 error_val.handle(),
255 CanGc::from_cx(cx),
256 )
257 } else {
258 // Otherwise, return a promise resolved with undefined.
259 Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
260 }
261 }
262 },
263 TransformerType::Decoder(decoder) => {
264 // <https://encoding.spec.whatwg.org/#dom-textdecoderstream>
265 // Step 7. Let transformAlgorithm be an algorithm which takes a
266 // chunk argument and runs the decode and enqueue a chunk
267 // algorithm with this and chunk.
268 decode_and_enqueue_a_chunk(cx, global, chunk, decoder, self)
269 // <https://streams.spec.whatwg.org/#transformstream-set-up>
270 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
271 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
272 // Step 5.2 If result is a Promise, then return result.
273 // Note: not applicable, the spec does NOT require deode_and_enqueue_a_chunk() to return a Promise
274 // Step 5.3 Return a promise resolved with undefined.
275 .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
276 .unwrap_or_else(|e| {
277 // <https://streams.spec.whatwg.org/#transformstream-set-up>
278 // Step 5.1 If this throws an exception e,
279 let mut realm = enter_auto_realm(cx, self);
280 let realm = &mut realm.current_realm();
281 let p = Promise::new_in_realm(realm);
282 // return a promise rejected with e.
283 p.reject_error(e, CanGc::from_cx(realm));
284 p
285 })
286 },
287 TransformerType::Encoder(encoder) => {
288 // <https://encoding.spec.whatwg.org/#dom-textencoderstream>
289 // Step 2. Let transformAlgorithm be an algorithm which takes a chunk argument and runs the encode
290 // and enqueue a chunk algorithm with this and chunk.
291 encode_and_enqueue_a_chunk(cx, global, chunk, encoder, self)
292 // <https://streams.spec.whatwg.org/#transformstream-set-up>
293 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
294 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
295 // Step 5.2 If result is a Promise, then return result.
296 // Note: not applicable, the spec does NOT require encode_and_enqueue_a_chunk() to return a Promise
297 // Step 5.3 Return a promise resolved with undefined.
298 .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
299 .unwrap_or_else(|e| {
300 // <https://streams.spec.whatwg.org/#transformstream-set-up>
301 // Step 5.1 If this throws an exception e,
302 let mut realm = enter_auto_realm(cx, self);
303 let realm = &mut realm.current_realm();
304 let p = Promise::new_in_realm(realm);
305 // return a promise rejected with e.
306 p.reject_error(e, CanGc::from_cx(realm));
307 p
308 })
309 },
310 TransformerType::Compressor(cs) => {
311 // <https://compression.spec.whatwg.org/#dom-compressionstream-compressionstream>
312 // Step 3. Let transformAlgorithm be an algorithm which takes a chunk argument and
313 // runs the compress and enqueue a chunk algorithm with this and chunk.
314 compress_and_enqueue_a_chunk(cx, global, cs, chunk, self)
315 // <https://streams.spec.whatwg.org/#transformstream-set-up>
316 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
317 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
318 // Step 5.2 If result is a Promise, then return result.
319 // Note: not applicable, the spec does NOT require
320 // compress_and_enqueue_a_chunk() to return a Promise.
321 // Step 5.3 Return a promise resolved with undefined.
322 .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
323 .unwrap_or_else(|e| {
324 // <https://streams.spec.whatwg.org/#transformstream-set-up>
325 // Step 5.1 If this throws an exception e,
326 let mut realm = enter_auto_realm(cx, self);
327 let realm = &mut realm.current_realm();
328 let p = Promise::new_in_realm(realm);
329 // return a promise rejected with e.
330 p.reject_error(e, CanGc::from_cx(realm));
331 p
332 })
333 },
334 TransformerType::Decompressor(ds) => {
335 // <https://compression.spec.whatwg.org/#dom-decompressionstream-decompressionstream>
336 // Step 3. Let transformAlgorithm be an algorithm which takes a chunk argument and
337 // runs the decompress and enqueue a chunk algorithm with this and chunk.
338 decompress_and_enqueue_a_chunk(cx, global, ds, chunk, self)
339 // <https://streams.spec.whatwg.org/#transformstream-set-up>
340 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
341 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
342 // Step 5.2 If result is a Promise, then return result.
343 // Note: not applicable, the spec does NOT require
344 // decompress_and_enqueue_a_chunk() to return a Promise
345 // Step 5.3 Return a promise resolved with undefined.
346 .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
347 .unwrap_or_else(|e| {
348 // <https://streams.spec.whatwg.org/#transformstream-set-up>
349 // Step 5.1 If this throws an exception e,
350 let mut realm = enter_auto_realm(cx, self);
351 let realm = &mut realm.current_realm();
352 let p = Promise::new_in_realm(realm);
353 // return a promise rejected with e.
354 p.reject_error(e, CanGc::from_cx(realm));
355 p
356 })
357 },
358 };
359
360 Ok(result)
361 }
362
363 pub(crate) fn perform_cancel(
364 &self,
365 cx: SafeJSContext,
366 global: &GlobalScope,
367 chunk: SafeHandleValue,
368 can_gc: CanGc,
369 ) -> Fallible<Rc<Promise>> {
370 let result = match &self.transformer_type {
371 // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
372 TransformerType::Js {
373 cancel,
374 transform_obj,
375 ..
376 } => {
377 // Step 7. If transformerDict["cancel"] exists, set
378 // cancelAlgorithm to an algorithm which takes an argument
379 // reason and returns the result of invoking
380 // transformerDict["cancel"] with argument list « reason » and
381 // callback this value transformer.
382 let algo = cancel.borrow().clone();
383 if let Some(cancel) = algo {
384 rooted!(in(*cx) let this_object = transform_obj.get());
385 cancel
386 .Call_(
387 &this_object.handle(),
388 chunk,
389 ExceptionHandling::Rethrow,
390 can_gc,
391 )
392 .unwrap_or_else(|e| {
393 let p = Promise::new(global, can_gc);
394 p.reject_error(e, can_gc);
395 p
396 })
397 } else {
398 // Step 4. Let cancelAlgorithm be an algorithm which returns a promise resolved with undefined.
399 Promise::new_resolved(global, cx, (), can_gc)
400 }
401 },
402 TransformerType::Decoder(_) => {
403 // <https://streams.spec.whatwg.org/#transformstream-set-up>
404 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
405 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
406 // if cancelAlgorithm was given, or null otherwise
407 // Note: `TextDecoderStream` does NOT specify a cancel algorithm.
408 // Step 7.2 If result is a Promise, then return result.
409 // Note: Not applicable.
410 // Step 7.3 Return a promise resolved with undefined.
411 Promise::new_resolved(global, cx, (), can_gc)
412 },
413 TransformerType::Encoder(_) => {
414 // <https://streams.spec.whatwg.org/#transformstream-set-up>
415 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
416 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
417 // if cancelAlgorithm was given, or null otherwise
418 // Note: `TextDecoderStream` does NOT specify a cancel algorithm.
419 // Step 7.2 If result is a Promise, then return result.
420 // Note: Not applicable.
421 // Step 7.3 Return a promise resolved with undefined.
422 Promise::new_resolved(global, cx, (), can_gc)
423 },
424 TransformerType::Compressor(_) => {
425 // <https://streams.spec.whatwg.org/#transformstream-set-up>
426 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
427 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
428 // if cancelAlgorithm was given, or null otherwise
429 // Note: `CompressionStream` does NOT specify a cancel algorithm.
430 // Step 7.2 If result is a Promise, then return result.
431 // Note: Not applicable.
432 // Step 7.3 Return a promise resolved with undefined.
433 Promise::new_resolved(global, cx, (), can_gc)
434 },
435 TransformerType::Decompressor(_) => {
436 // <https://streams.spec.whatwg.org/#transformstream-set-up>
437 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
438 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
439 // if cancelAlgorithm was given, or null otherwise
440 // Note: `DecompressionStream` does NOT specify a cancel algorithm.
441 // Step 7.2 If result is a Promise, then return result.
442 // Note: Not applicable.
443 // Step 7.3 Return a promise resolved with undefined.
444 Promise::new_resolved(global, cx, (), can_gc)
445 },
446 };
447
448 Ok(result)
449 }
450
451 pub(crate) fn perform_flush(
452 &self,
453 cx: &mut js::context::JSContext,
454 global: &GlobalScope,
455 ) -> Fallible<Rc<Promise>> {
456 let result = match &self.transformer_type {
457 // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
458 TransformerType::Js {
459 flush,
460 transform_obj,
461 ..
462 } => {
463 // Step 6. If transformerDict["flush"] exists, set flushAlgorithm to an
464 // algorithm which returns the result of invoking
465 // transformerDict["flush"] with argument list « controller »
466 // and callback this value transformer.
467 let algo = flush.borrow().clone();
468 if let Some(flush) = algo {
469 rooted!(&in(cx) let this_object = transform_obj.get());
470 flush
471 .Call_(
472 &this_object.handle(),
473 self,
474 ExceptionHandling::Rethrow,
475 CanGc::from_cx(cx),
476 )
477 .unwrap_or_else(|e| {
478 let p = Promise::new2(cx, global);
479 p.reject_error(e, CanGc::from_cx(cx));
480 p
481 })
482 } else {
483 // Step 3. Let flushAlgorithm be an algorithm which returns a promise resolved with undefined.
484 Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
485 }
486 },
487 TransformerType::Decoder(decoder) => {
488 // <https://encoding.spec.whatwg.org/#dom-textdecoderstream>
489 // Step 8. Let flushAlgorithm be an algorithm which takes no
490 // arguments and runs the flush and enqueue algorithm with this.
491 flush_and_enqueue(cx, global, decoder, self)
492 // <https://streams.spec.whatwg.org/#transformstream-set-up>
493 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
494 // Step 6.1 Let result be the result of running flushAlgorithm,
495 // if flushAlgorithm was given, or null otherwise.
496 // Step 6.2 If result is a Promise, then return result.
497 // Note: Not applicable. The spec does NOT require flush_and_enqueue algo to return a Promise
498 // Step 6.3 Return a promise resolved with undefined.
499 .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
500 .unwrap_or_else(|e| {
501 // <https://streams.spec.whatwg.org/#transformstream-set-up>
502 // Step 6.1 If this throws an exception e,
503 let mut realm = enter_auto_realm(cx, self);
504 let realm = &mut realm.current_realm();
505 let p = Promise::new_in_realm(realm);
506 // return a promise rejected with e.
507 p.reject_error(e, CanGc::from_cx(realm));
508 p
509 })
510 },
511 TransformerType::Encoder(encoder) => {
512 // <https://encoding.spec.whatwg.org/#textencoderstream-encoder>
513 // Step 3. Let flushAlgorithm be an algorithm which runs the encode and flush algorithm with this.
514 encode_and_flush(cx, global, encoder, self)
515 // <https://streams.spec.whatwg.org/#transformstream-set-up>
516 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
517 // Step 6.1 Let result be the result of running flushAlgorithm,
518 // if flushAlgorithm was given, or null otherwise.
519 // Step 6.2 If result is a Promise, then return result.
520 // Note: Not applicable. The spec does NOT require encode_and_flush algo to return a Promise
521 // Step 6.3 Return a promise resolved with undefined.
522 .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
523 .unwrap_or_else(|e| {
524 // <https://streams.spec.whatwg.org/#transformstream-set-up>
525 // Step 6.1 If this throws an exception e,
526 let mut realm = enter_auto_realm(cx, self);
527 let realm = &mut realm.current_realm();
528 let p = Promise::new_in_realm(realm);
529 // return a promise rejected with e.
530 p.reject_error(e, CanGc::from_cx(realm));
531 p
532 })
533 },
534 TransformerType::Compressor(cs) => {
535 // <https://compression.spec.whatwg.org/#dom-compressionstream-compressionstream>
536 // Step 4. Let flushAlgorithm be an algorithm which takes no argument and runs the
537 // compress flush and enqueue algorithm with this.
538 compress_flush_and_enqueue(cx, global, cs, self)
539 // <https://streams.spec.whatwg.org/#transformstream-set-up>
540 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
541 // Step 6.1 Let result be the result of running flushAlgorithm,
542 // if flushAlgorithm was given, or null otherwise.
543 // Step 6.2 If result is a Promise, then return result.
544 // Note: Not applicable. The spec does NOT require compress_flush_and_enqueue
545 // algo to return a Promise.
546 // Step 6.3 Return a promise resolved with undefined.
547 .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
548 .unwrap_or_else(|e| {
549 // <https://streams.spec.whatwg.org/#transformstream-set-up>
550 // Step 6.1 If this throws an exception e,
551 let mut realm = enter_auto_realm(cx, self);
552 let realm = &mut realm.current_realm();
553 let p = Promise::new_in_realm(realm);
554 // return a promise rejected with e.
555 p.reject_error(e, CanGc::from_cx(realm));
556 p
557 })
558 },
559 TransformerType::Decompressor(ds) => {
560 // <https://compression.spec.whatwg.org/#dom-decompressionstream-decompressionstream>
561 // Step 4. Let flushAlgorithm be an algorithm which takes no argument and runs the
562 // decompress flush and enqueue algorithm with this.
563 decompress_flush_and_enqueue(cx, global, ds, self)
564 // <https://streams.spec.whatwg.org/#transformstream-set-up>
565 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
566 // Step 6.1 Let result be the result of running flushAlgorithm,
567 // if flushAlgorithm was given, or null otherwise.
568 // Step 6.2 If result is a Promise, then return result.
569 // Note: Not applicable. The spec does NOT require decompress_flush_and_enqueue
570 // algo to return a Promise.
571 // Step 6.3 Return a promise resolved with undefined.
572 .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
573 .unwrap_or_else(|e| {
574 // <https://streams.spec.whatwg.org/#transformstream-set-up>
575 // Step 6.1 If this throws an exception e,
576 let mut realm = enter_auto_realm(cx, self);
577 let realm = &mut realm.current_realm();
578 let p = Promise::new_in_realm(realm);
579 // return a promise rejected with e.
580 p.reject_error(e, CanGc::from_cx(realm));
581 p
582 })
583 },
584 };
585
586 Ok(result)
587 }
588
589 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-enqueue>
590 #[expect(unsafe_code)]
591 pub(crate) fn enqueue(
592 &self,
593 cx: &mut js::context::JSContext,
594 global: &GlobalScope,
595 chunk: SafeHandleValue,
596 ) -> Fallible<()> {
597 // Let stream be controller.[[stream]].
598 let stream = self.stream.get().expect("stream is null");
599
600 // Let readableController be stream.[[readable]].[[controller]].
601 let readable = stream.get_readable();
602 let readable_controller = readable.get_default_controller();
603
604 // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController)
605 // is false, throw a TypeError exception.
606 if !readable_controller.can_close_or_enqueue() {
607 return Err(Error::Type(
608 c"ReadableStreamDefaultControllerCanCloseOrEnqueue is false".to_owned(),
609 ));
610 }
611
612 // Let enqueueResult be ReadableStreamDefaultControllerEnqueue(readableController, chunk).
613 // If enqueueResult is an abrupt completion,
614 if let Err(error) = readable_controller.enqueue(cx, chunk) {
615 // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, enqueueResult.[[Value]]).
616 rooted!(&in(cx) let mut rooted_error = UndefinedValue());
617 error.clone().to_jsval(
618 cx.into(),
619 global,
620 rooted_error.handle_mut(),
621 CanGc::from_cx(cx),
622 );
623 stream.error_writable_and_unblock_write(cx, global, rooted_error.handle());
624
625 // Throw stream.[[readable]].[[storedError]].
626 unsafe {
627 if !JS_IsExceptionPending(cx.raw_cx()) {
628 rooted!(&in(cx) let mut stored_error = UndefinedValue());
629 readable.get_stored_error(stored_error.handle_mut());
630
631 JS_SetPendingException(
632 cx.raw_cx(),
633 stored_error.handle().into(),
634 ExceptionStackBehavior::Capture,
635 );
636 }
637 }
638 return Err(error);
639 }
640
641 // Let backpressure be ! ReadableStreamDefaultControllerHasBackpressure(readableController).
642 let backpressure = readable_controller.has_backpressure();
643
644 // If backpressure is not stream.[[backpressure]],
645 if backpressure != stream.get_backpressure() {
646 // Assert: backpressure is true.
647 assert!(backpressure);
648
649 // Perform ! TransformStreamSetBackpressure(stream, true).
650 stream.set_backpressure(global, true, CanGc::from_cx(cx));
651 }
652 Ok(())
653 }
654
655 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-error>
656 pub(crate) fn error(
657 &self,
658 cx: &mut js::context::JSContext,
659 global: &GlobalScope,
660 reason: SafeHandleValue,
661 ) {
662 // Perform ! TransformStreamError(controller.[[stream]], e).
663 self.stream
664 .get()
665 .expect("stream is undefined")
666 .error(cx, global, reason);
667 }
668
669 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-clear-algorithms>
670 pub(crate) fn clear_algorithms(&self) {
671 if let TransformerType::Js {
672 cancel,
673 flush,
674 transform,
675 ..
676 } = &self.transformer_type
677 {
678 // Set controller.[[transformAlgorithm]] to undefined.
679 transform.replace(None);
680
681 // Set controller.[[flushAlgorithm]] to undefined.
682 flush.replace(None);
683
684 // Set controller.[[cancelAlgorithm]] to undefined.
685 cancel.replace(None);
686 }
687 }
688
689 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-terminate>
690 fn terminate(&self, cx: &mut js::context::JSContext, global: &GlobalScope) {
691 // Let stream be controller.[[stream]].
692 let stream = self.stream.get().expect("stream is null");
693
694 // Let readableController be stream.[[readable]].[[controller]].
695 let readable = stream.get_readable();
696 let readable_controller = readable.get_default_controller();
697
698 // Perform ! ReadableStreamDefaultControllerClose(readableController).
699 readable_controller.close(CanGc::from_cx(cx));
700
701 // Let error be a TypeError exception indicating that the stream has been terminated.
702 let error = Error::Type(c"stream has been terminated".to_owned());
703
704 // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, error).
705 rooted!(&in(cx) let mut rooted_error = UndefinedValue());
706 error.to_jsval(
707 cx.into(),
708 global,
709 rooted_error.handle_mut(),
710 CanGc::from_cx(cx),
711 );
712 stream.error_writable_and_unblock_write(cx, global, rooted_error.handle());
713 }
714}
715
716impl TransformStreamDefaultControllerMethods<crate::DomTypeHolder>
717 for TransformStreamDefaultController
718{
719 /// <https://streams.spec.whatwg.org/#ts-default-controller-desired-size>
720 fn GetDesiredSize(&self) -> Option<f64> {
721 // Let readableController be this.[[stream]].[[readable]].[[controller]].
722 let readable_controller = self
723 .stream
724 .get()
725 .expect("stream is null")
726 .get_readable()
727 .get_default_controller();
728
729 // Return ! ReadableStreamDefaultControllerGetDesiredSize(readableController).
730 readable_controller.get_desired_size()
731 }
732
733 /// <https://streams.spec.whatwg.org/#ts-default-controller-enqueue>
734 fn Enqueue(&self, cx: &mut js::context::JSContext, chunk: SafeHandleValue) -> Fallible<()> {
735 // Perform ? TransformStreamDefaultControllerEnqueue(this, chunk).
736 self.enqueue(cx, &self.global(), chunk)
737 }
738
739 /// <https://streams.spec.whatwg.org/#ts-default-controller-error>
740 fn Error(&self, cx: &mut js::context::JSContext, reason: SafeHandleValue) -> Fallible<()> {
741 // Perform ? TransformStreamDefaultControllerError(this, e).
742 self.error(cx, &self.global(), reason);
743 Ok(())
744 }
745
746 /// <https://streams.spec.whatwg.org/#ts-default-controller-terminate>
747 fn Terminate(&self, cx: &mut js::context::JSContext) -> Fallible<()> {
748 // Perform ? TransformStreamDefaultControllerTerminate(this).
749 self.terminate(cx, &self.global());
750 Ok(())
751 }
752}