script/dom/stream/transformstreamdefaultcontroller.rs
1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::context::JSContext;
10use js::jsapi::{
11 ExceptionStackBehavior, Heap, JS_IsExceptionPending, JS_SetPendingException, JSObject,
12};
13use js::jsval::UndefinedValue;
14use js::realm::CurrentRealm;
15use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
16use script_bindings::cell::DomRefCell;
17use script_bindings::reflector::{Reflector, reflect_dom_object};
18
19use crate::dom::bindings::callback::ExceptionHandling;
20use crate::dom::bindings::codegen::Bindings::TransformStreamDefaultControllerBinding::TransformStreamDefaultControllerMethods;
21use crate::dom::bindings::codegen::Bindings::TransformerBinding::{
22 Transformer, TransformerCancelCallback, TransformerFlushCallback, TransformerTransformCallback,
23};
24use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
25use crate::dom::bindings::reflector::DomGlobal;
26use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
27use crate::dom::compressionstream::{
28 CompressionStream, compress_and_enqueue_a_chunk, compress_flush_and_enqueue,
29};
30use crate::dom::decompressionstream::{
31 decompress_and_enqueue_a_chunk, decompress_flush_and_enqueue,
32};
33use crate::dom::encoding::textdecodercommon::TextDecoderCommon;
34use crate::dom::encoding::textdecoderstream::{decode_and_enqueue_a_chunk, flush_and_enqueue};
35use crate::dom::encoding::textencoderstream::{
36 Encoder, encode_and_enqueue_a_chunk, encode_and_flush,
37};
38use crate::dom::globalscope::GlobalScope;
39use crate::dom::promise::Promise;
40use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
41use crate::dom::types::{DecompressionStream, TransformStream};
42use crate::realms::enter_auto_realm;
43use crate::script_runtime::CanGc;
44
45impl js::gc::Rootable for TransformTransformPromiseRejection {}
46
47/// Reacting to transformPromise as part of
48/// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
49#[derive(JSTraceable, MallocSizeOf)]
50#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
51struct TransformTransformPromiseRejection {
52 controller: Dom<TransformStreamDefaultController>,
53}
54
55impl Callback for TransformTransformPromiseRejection {
56 /// Reacting to transformPromise with the following fulfillment steps:
57 fn callback(&self, cx: &mut CurrentRealm, v: SafeHandleValue) {
58 // Perform ! TransformStreamError(controller.[[stream]], r).
59 self.controller.error(cx, &self.controller.global(), v);
60
61 // Throw r.
62 // Note: this is done part of perform_transform().
63 }
64}
65
66/// The type of transformer algorithms we are using
67#[derive(JSTraceable)]
68pub(crate) enum TransformerType {
69 /// Algorithms provided by Js callbacks
70 Js {
71 /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-cancelalgorithm>
72 cancel: RefCell<Option<Rc<TransformerCancelCallback>>>,
73
74 /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-flushalgorithm>
75 flush: RefCell<Option<Rc<TransformerFlushCallback>>>,
76
77 /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-transformalgorithm>
78 transform: RefCell<Option<Rc<TransformerTransformCallback>>>,
79
80 /// The JS object used as `this` when invoking sink algorithms.
81 transform_obj: Heap<*mut JSObject>,
82 },
83 /// Algorithms supporting `TextDecoderStream` are implemented in Rust
84 ///
85 /// <https://encoding.spec.whatwg.org/#textdecodercommon>
86 Decoder(Rc<TextDecoderCommon>),
87 /// Algorithms supporting `TextEncoderStream` are implemented in Rust
88 ///
89 /// <https://encoding.spec.whatwg.org/#textencoderstream-encoder>
90 Encoder(Encoder),
91 /// Algorithms supporting `CompressionStream` are implemented in Rust
92 ///
93 /// <https://compression.spec.whatwg.org/#compressionstream>
94 Compressor(DomRoot<CompressionStream>),
95 /// Algorithms supporting `DecompressionStream` are implemented in Rust
96 ///
97 /// <https://compression.spec.whatwg.org/#decompressionstream>
98 Decompressor(DomRoot<DecompressionStream>),
99}
100
101impl TransformerType {
102 pub(crate) fn new_from_js_transformer(transformer: &Transformer) -> TransformerType {
103 TransformerType::Js {
104 cancel: RefCell::new(transformer.cancel.clone()),
105 flush: RefCell::new(transformer.flush.clone()),
106 transform: RefCell::new(transformer.transform.clone()),
107 transform_obj: Default::default(),
108 }
109 }
110}
111
112/// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller>
113#[dom_struct]
114pub struct TransformStreamDefaultController {
115 reflector_: Reflector,
116
117 /// The type of the underlying transformer used. Besides the JS variant,
118 /// there will be other variant(s) for `TextDecoderStream`
119 #[ignore_malloc_size_of = "transformer_type"]
120 transformer_type: TransformerType,
121
122 /// <https://streams.spec.whatwg.org/#TransformStreamDefaultController-stream>
123 stream: MutNullableDom<TransformStream>,
124
125 /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-finishpromise>
126 #[conditional_malloc_size_of]
127 finish_promise: DomRefCell<Option<Rc<Promise>>>,
128}
129
130impl TransformStreamDefaultController {
131 fn new_inherited(transformer_type: TransformerType) -> TransformStreamDefaultController {
132 TransformStreamDefaultController {
133 reflector_: Reflector::new(),
134 transformer_type,
135 stream: MutNullableDom::new(None),
136 finish_promise: DomRefCell::new(None),
137 }
138 }
139
140 pub(crate) fn new(
141 global: &GlobalScope,
142 transformer_type: TransformerType,
143 can_gc: CanGc,
144 ) -> DomRoot<TransformStreamDefaultController> {
145 reflect_dom_object(
146 Box::new(TransformStreamDefaultController::new_inherited(
147 transformer_type,
148 )),
149 global,
150 can_gc,
151 )
152 }
153
154 /// Setting the JS object after the heap has settled down.
155 ///
156 /// Note that this has no effect if the transformer type is not `TransformerType::Js`
157 pub(crate) fn set_transform_obj(&self, this_object: SafeHandleObject) {
158 if let TransformerType::Js { transform_obj, .. } = &self.transformer_type {
159 transform_obj.set(*this_object)
160 } else {
161 unreachable!("Non-Js transformer type should not set transform_obj")
162 }
163 }
164
165 pub(crate) fn set_stream(&self, stream: &TransformStream) {
166 self.stream.set(Some(stream));
167 }
168
169 pub(crate) fn get_finish_promise(&self) -> Option<Rc<Promise>> {
170 self.finish_promise.borrow().clone()
171 }
172
173 pub(crate) fn set_finish_promise(&self, promise: Rc<Promise>) {
174 *self.finish_promise.borrow_mut() = Some(promise);
175 }
176
177 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
178 pub(crate) fn transform_stream_default_controller_perform_transform(
179 &self,
180 cx: &mut JSContext,
181 global: &GlobalScope,
182 chunk: SafeHandleValue,
183 ) -> Fallible<Rc<Promise>> {
184 // Let transformPromise be the result of performing controller.[[transformAlgorithm]], passing chunk.
185 let transform_promise = self.perform_transform(cx, global, chunk)?;
186
187 // Return the result of reacting to transformPromise with the following rejection steps given the argument r:
188 rooted!(&in(cx) let mut reject_handler = Some(TransformTransformPromiseRejection {
189 controller: Dom::from_ref(self),
190 }));
191
192 let handler = PromiseNativeHandler::new(
193 global,
194 None,
195 reject_handler.take().map(|h| Box::new(h) as Box<_>),
196 CanGc::from_cx(cx),
197 );
198 let mut realm = enter_auto_realm(cx, global);
199 let realm = &mut realm.current_realm();
200 transform_promise.append_native_handler(realm, &handler);
201
202 Ok(transform_promise)
203 }
204
205 pub(crate) fn perform_transform(
206 &self,
207 cx: &mut JSContext,
208 global: &GlobalScope,
209 chunk: SafeHandleValue,
210 ) -> Fallible<Rc<Promise>> {
211 let result = match &self.transformer_type {
212 // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
213 TransformerType::Js {
214 transform,
215 transform_obj,
216 ..
217 } => {
218 // Step 5. If transformerDict["transform"] exists, set
219 // transformAlgorithm to an algorithm which takes an argument
220 // chunk and returns the result of invoking
221 // transformerDict["transform"] with argument list « chunk,
222 // controller » and callback this value transformer.
223 let algo = transform.borrow().clone();
224 if let Some(transform) = algo {
225 rooted!(&in(cx) let this_object = transform_obj.get());
226 transform
227 .Call_(
228 cx,
229 &this_object.handle(),
230 chunk,
231 self,
232 ExceptionHandling::Rethrow,
233 )
234 .unwrap_or_else(|e| {
235 let p = Promise::new2(cx, global);
236 p.reject_error(e, CanGc::from_cx(cx));
237 p
238 })
239 } else {
240 // Step 2. Let transformAlgorithm be the following steps, taking a chunk argument:
241 // Let result be TransformStreamDefaultControllerEnqueue(controller, chunk).
242 // If result is an abrupt completion, return a promise rejected with result.[[Value]].
243 if let Err(error) = self.enqueue(cx, global, chunk) {
244 rooted!(&in(cx) let mut error_val = UndefinedValue());
245 error.to_jsval(
246 cx.into(),
247 global,
248 error_val.handle_mut(),
249 CanGc::from_cx(cx),
250 );
251 Promise::new_rejected(
252 global,
253 cx.into(),
254 error_val.handle(),
255 CanGc::from_cx(cx),
256 )
257 } else {
258 // Otherwise, return a promise resolved with undefined.
259 Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
260 }
261 }
262 },
263 TransformerType::Decoder(decoder) => {
264 // <https://encoding.spec.whatwg.org/#dom-textdecoderstream>
265 // Step 7. Let transformAlgorithm be an algorithm which takes a
266 // chunk argument and runs the decode and enqueue a chunk
267 // algorithm with this and chunk.
268 decode_and_enqueue_a_chunk(cx, global, chunk, decoder, self)
269 // <https://streams.spec.whatwg.org/#transformstream-set-up>
270 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
271 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
272 // Step 5.2 If result is a Promise, then return result.
273 // Note: not applicable, the spec does NOT require deode_and_enqueue_a_chunk() to return a Promise
274 // Step 5.3 Return a promise resolved with undefined.
275 .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
276 .unwrap_or_else(|e| {
277 // <https://streams.spec.whatwg.org/#transformstream-set-up>
278 // Step 5.1 If this throws an exception e,
279 let mut realm = enter_auto_realm(cx, self);
280 let realm = &mut realm.current_realm();
281 let p = Promise::new_in_realm(realm);
282 // return a promise rejected with e.
283 p.reject_error(e, CanGc::from_cx(realm));
284 p
285 })
286 },
287 TransformerType::Encoder(encoder) => {
288 // <https://encoding.spec.whatwg.org/#dom-textencoderstream>
289 // Step 2. Let transformAlgorithm be an algorithm which takes a chunk argument and runs the encode
290 // and enqueue a chunk algorithm with this and chunk.
291 encode_and_enqueue_a_chunk(cx, global, chunk, encoder, self)
292 // <https://streams.spec.whatwg.org/#transformstream-set-up>
293 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
294 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
295 // Step 5.2 If result is a Promise, then return result.
296 // Note: not applicable, the spec does NOT require encode_and_enqueue_a_chunk() to return a Promise
297 // Step 5.3 Return a promise resolved with undefined.
298 .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
299 .unwrap_or_else(|e| {
300 // <https://streams.spec.whatwg.org/#transformstream-set-up>
301 // Step 5.1 If this throws an exception e,
302 let mut realm = enter_auto_realm(cx, self);
303 let realm = &mut realm.current_realm();
304 let p = Promise::new_in_realm(realm);
305 // return a promise rejected with e.
306 p.reject_error(e, CanGc::from_cx(realm));
307 p
308 })
309 },
310 TransformerType::Compressor(cs) => {
311 // <https://compression.spec.whatwg.org/#dom-compressionstream-compressionstream>
312 // Step 3. Let transformAlgorithm be an algorithm which takes a chunk argument and
313 // runs the compress and enqueue a chunk algorithm with this and chunk.
314 compress_and_enqueue_a_chunk(cx, global, cs, chunk, self)
315 // <https://streams.spec.whatwg.org/#transformstream-set-up>
316 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
317 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
318 // Step 5.2 If result is a Promise, then return result.
319 // Note: not applicable, the spec does NOT require
320 // compress_and_enqueue_a_chunk() to return a Promise.
321 // Step 5.3 Return a promise resolved with undefined.
322 .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
323 .unwrap_or_else(|e| {
324 // <https://streams.spec.whatwg.org/#transformstream-set-up>
325 // Step 5.1 If this throws an exception e,
326 let mut realm = enter_auto_realm(cx, self);
327 let realm = &mut realm.current_realm();
328 let p = Promise::new_in_realm(realm);
329 // return a promise rejected with e.
330 p.reject_error(e, CanGc::from_cx(realm));
331 p
332 })
333 },
334 TransformerType::Decompressor(ds) => {
335 // <https://compression.spec.whatwg.org/#dom-decompressionstream-decompressionstream>
336 // Step 3. Let transformAlgorithm be an algorithm which takes a chunk argument and
337 // runs the decompress and enqueue a chunk algorithm with this and chunk.
338 decompress_and_enqueue_a_chunk(cx, global, ds, chunk, self)
339 // <https://streams.spec.whatwg.org/#transformstream-set-up>
340 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
341 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
342 // Step 5.2 If result is a Promise, then return result.
343 // Note: not applicable, the spec does NOT require
344 // decompress_and_enqueue_a_chunk() to return a Promise
345 // Step 5.3 Return a promise resolved with undefined.
346 .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
347 .unwrap_or_else(|e| {
348 // <https://streams.spec.whatwg.org/#transformstream-set-up>
349 // Step 5.1 If this throws an exception e,
350 let mut realm = enter_auto_realm(cx, self);
351 let realm = &mut realm.current_realm();
352 let p = Promise::new_in_realm(realm);
353 // return a promise rejected with e.
354 p.reject_error(e, CanGc::from_cx(realm));
355 p
356 })
357 },
358 };
359
360 Ok(result)
361 }
362
363 pub(crate) fn perform_cancel(
364 &self,
365 cx: &mut JSContext,
366 global: &GlobalScope,
367 chunk: SafeHandleValue,
368 ) -> Fallible<Rc<Promise>> {
369 let result = match &self.transformer_type {
370 // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
371 TransformerType::Js {
372 cancel,
373 transform_obj,
374 ..
375 } => {
376 // Step 7. If transformerDict["cancel"] exists, set
377 // cancelAlgorithm to an algorithm which takes an argument
378 // reason and returns the result of invoking
379 // transformerDict["cancel"] with argument list « reason » and
380 // callback this value transformer.
381 let algo = cancel.borrow().clone();
382 if let Some(cancel) = algo {
383 rooted!(&in(cx) let this_object = transform_obj.get());
384 cancel
385 .Call_(cx, &this_object.handle(), chunk, ExceptionHandling::Rethrow)
386 .unwrap_or_else(|e| {
387 let p = Promise::new2(cx, global);
388 p.reject_error(e, CanGc::from_cx(cx));
389 p
390 })
391 } else {
392 // Step 4. Let cancelAlgorithm be an algorithm which returns a promise resolved with undefined.
393 Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
394 }
395 },
396 TransformerType::Decoder(_) => {
397 // <https://streams.spec.whatwg.org/#transformstream-set-up>
398 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
399 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
400 // if cancelAlgorithm was given, or null otherwise
401 // Note: `TextDecoderStream` does NOT specify a cancel algorithm.
402 // Step 7.2 If result is a Promise, then return result.
403 // Note: Not applicable.
404 // Step 7.3 Return a promise resolved with undefined.
405 Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
406 },
407 TransformerType::Encoder(_) => {
408 // <https://streams.spec.whatwg.org/#transformstream-set-up>
409 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
410 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
411 // if cancelAlgorithm was given, or null otherwise
412 // Note: `TextDecoderStream` does NOT specify a cancel algorithm.
413 // Step 7.2 If result is a Promise, then return result.
414 // Note: Not applicable.
415 // Step 7.3 Return a promise resolved with undefined.
416 Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
417 },
418 TransformerType::Compressor(_) => {
419 // <https://streams.spec.whatwg.org/#transformstream-set-up>
420 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
421 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
422 // if cancelAlgorithm was given, or null otherwise
423 // Note: `CompressionStream` does NOT specify a cancel algorithm.
424 // Step 7.2 If result is a Promise, then return result.
425 // Note: Not applicable.
426 // Step 7.3 Return a promise resolved with undefined.
427 Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
428 },
429 TransformerType::Decompressor(_) => {
430 // <https://streams.spec.whatwg.org/#transformstream-set-up>
431 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
432 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
433 // if cancelAlgorithm was given, or null otherwise
434 // Note: `DecompressionStream` does NOT specify a cancel algorithm.
435 // Step 7.2 If result is a Promise, then return result.
436 // Note: Not applicable.
437 // Step 7.3 Return a promise resolved with undefined.
438 Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
439 },
440 };
441
442 Ok(result)
443 }
444
445 pub(crate) fn perform_flush(
446 &self,
447 cx: &mut JSContext,
448 global: &GlobalScope,
449 ) -> Fallible<Rc<Promise>> {
450 let result = match &self.transformer_type {
451 // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
452 TransformerType::Js {
453 flush,
454 transform_obj,
455 ..
456 } => {
457 // Step 6. If transformerDict["flush"] exists, set flushAlgorithm to an
458 // algorithm which returns the result of invoking
459 // transformerDict["flush"] with argument list « controller »
460 // and callback this value transformer.
461 let algo = flush.borrow().clone();
462 if let Some(flush) = algo {
463 rooted!(&in(cx) let this_object = transform_obj.get());
464 flush
465 .Call_(cx, &this_object.handle(), self, ExceptionHandling::Rethrow)
466 .unwrap_or_else(|e| {
467 let p = Promise::new2(cx, global);
468 p.reject_error(e, CanGc::from_cx(cx));
469 p
470 })
471 } else {
472 // Step 3. Let flushAlgorithm be an algorithm which returns a promise resolved with undefined.
473 Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx))
474 }
475 },
476 TransformerType::Decoder(decoder) => {
477 // <https://encoding.spec.whatwg.org/#dom-textdecoderstream>
478 // Step 8. Let flushAlgorithm be an algorithm which takes no
479 // arguments and runs the flush and enqueue algorithm with this.
480 flush_and_enqueue(cx, global, decoder, self)
481 // <https://streams.spec.whatwg.org/#transformstream-set-up>
482 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
483 // Step 6.1 Let result be the result of running flushAlgorithm,
484 // if flushAlgorithm was given, or null otherwise.
485 // Step 6.2 If result is a Promise, then return result.
486 // Note: Not applicable. The spec does NOT require flush_and_enqueue algo to return a Promise
487 // Step 6.3 Return a promise resolved with undefined.
488 .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
489 .unwrap_or_else(|e| {
490 // <https://streams.spec.whatwg.org/#transformstream-set-up>
491 // Step 6.1 If this throws an exception e,
492 let mut realm = enter_auto_realm(cx, self);
493 let realm = &mut realm.current_realm();
494 let p = Promise::new_in_realm(realm);
495 // return a promise rejected with e.
496 p.reject_error(e, CanGc::from_cx(realm));
497 p
498 })
499 },
500 TransformerType::Encoder(encoder) => {
501 // <https://encoding.spec.whatwg.org/#textencoderstream-encoder>
502 // Step 3. Let flushAlgorithm be an algorithm which runs the encode and flush algorithm with this.
503 encode_and_flush(cx, global, encoder, self)
504 // <https://streams.spec.whatwg.org/#transformstream-set-up>
505 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
506 // Step 6.1 Let result be the result of running flushAlgorithm,
507 // if flushAlgorithm was given, or null otherwise.
508 // Step 6.2 If result is a Promise, then return result.
509 // Note: Not applicable. The spec does NOT require encode_and_flush algo to return a Promise
510 // Step 6.3 Return a promise resolved with undefined.
511 .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
512 .unwrap_or_else(|e| {
513 // <https://streams.spec.whatwg.org/#transformstream-set-up>
514 // Step 6.1 If this throws an exception e,
515 let mut realm = enter_auto_realm(cx, self);
516 let realm = &mut realm.current_realm();
517 let p = Promise::new_in_realm(realm);
518 // return a promise rejected with e.
519 p.reject_error(e, CanGc::from_cx(realm));
520 p
521 })
522 },
523 TransformerType::Compressor(cs) => {
524 // <https://compression.spec.whatwg.org/#dom-compressionstream-compressionstream>
525 // Step 4. Let flushAlgorithm be an algorithm which takes no argument and runs the
526 // compress flush and enqueue algorithm with this.
527 compress_flush_and_enqueue(cx, global, cs, self)
528 // <https://streams.spec.whatwg.org/#transformstream-set-up>
529 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
530 // Step 6.1 Let result be the result of running flushAlgorithm,
531 // if flushAlgorithm was given, or null otherwise.
532 // Step 6.2 If result is a Promise, then return result.
533 // Note: Not applicable. The spec does NOT require compress_flush_and_enqueue
534 // algo to return a Promise.
535 // Step 6.3 Return a promise resolved with undefined.
536 .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
537 .unwrap_or_else(|e| {
538 // <https://streams.spec.whatwg.org/#transformstream-set-up>
539 // Step 6.1 If this throws an exception e,
540 let mut realm = enter_auto_realm(cx, self);
541 let realm = &mut realm.current_realm();
542 let p = Promise::new_in_realm(realm);
543 // return a promise rejected with e.
544 p.reject_error(e, CanGc::from_cx(realm));
545 p
546 })
547 },
548 TransformerType::Decompressor(ds) => {
549 // <https://compression.spec.whatwg.org/#dom-decompressionstream-decompressionstream>
550 // Step 4. Let flushAlgorithm be an algorithm which takes no argument and runs the
551 // decompress flush and enqueue algorithm with this.
552 decompress_flush_and_enqueue(cx, global, ds, self)
553 // <https://streams.spec.whatwg.org/#transformstream-set-up>
554 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
555 // Step 6.1 Let result be the result of running flushAlgorithm,
556 // if flushAlgorithm was given, or null otherwise.
557 // Step 6.2 If result is a Promise, then return result.
558 // Note: Not applicable. The spec does NOT require decompress_flush_and_enqueue
559 // algo to return a Promise.
560 // Step 6.3 Return a promise resolved with undefined.
561 .map(|_| Promise::new_resolved(global, cx.into(), (), CanGc::from_cx(cx)))
562 .unwrap_or_else(|e| {
563 // <https://streams.spec.whatwg.org/#transformstream-set-up>
564 // Step 6.1 If this throws an exception e,
565 let mut realm = enter_auto_realm(cx, self);
566 let realm = &mut realm.current_realm();
567 let p = Promise::new_in_realm(realm);
568 // return a promise rejected with e.
569 p.reject_error(e, CanGc::from_cx(realm));
570 p
571 })
572 },
573 };
574
575 Ok(result)
576 }
577
578 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-enqueue>
579 #[expect(unsafe_code)]
580 pub(crate) fn enqueue(
581 &self,
582 cx: &mut JSContext,
583 global: &GlobalScope,
584 chunk: SafeHandleValue,
585 ) -> Fallible<()> {
586 // Let stream be controller.[[stream]].
587 let stream = self.stream.get().expect("stream is null");
588
589 // Let readableController be stream.[[readable]].[[controller]].
590 let readable = stream.get_readable();
591 let readable_controller = readable.get_default_controller();
592
593 // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController)
594 // is false, throw a TypeError exception.
595 if !readable_controller.can_close_or_enqueue() {
596 return Err(Error::Type(
597 c"ReadableStreamDefaultControllerCanCloseOrEnqueue is false".to_owned(),
598 ));
599 }
600
601 // Let enqueueResult be ReadableStreamDefaultControllerEnqueue(readableController, chunk).
602 // If enqueueResult is an abrupt completion,
603 if let Err(error) = readable_controller.enqueue(cx, chunk) {
604 // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, enqueueResult.[[Value]]).
605 rooted!(&in(cx) let mut rooted_error = UndefinedValue());
606 error.clone().to_jsval(
607 cx.into(),
608 global,
609 rooted_error.handle_mut(),
610 CanGc::from_cx(cx),
611 );
612 stream.error_writable_and_unblock_write(cx, global, rooted_error.handle());
613
614 // Throw stream.[[readable]].[[storedError]].
615 unsafe {
616 if !JS_IsExceptionPending(cx.raw_cx()) {
617 rooted!(&in(cx) let mut stored_error = UndefinedValue());
618 readable.get_stored_error(stored_error.handle_mut());
619
620 JS_SetPendingException(
621 cx.raw_cx(),
622 stored_error.handle().into(),
623 ExceptionStackBehavior::Capture,
624 );
625 }
626 }
627 return Err(error);
628 }
629
630 // Let backpressure be ! ReadableStreamDefaultControllerHasBackpressure(readableController).
631 let backpressure = readable_controller.has_backpressure();
632
633 // If backpressure is not stream.[[backpressure]],
634 if backpressure != stream.get_backpressure() {
635 // Assert: backpressure is true.
636 assert!(backpressure);
637
638 // Perform ! TransformStreamSetBackpressure(stream, true).
639 stream.set_backpressure(global, true, CanGc::from_cx(cx));
640 }
641 Ok(())
642 }
643
644 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-error>
645 pub(crate) fn error(&self, cx: &mut JSContext, global: &GlobalScope, reason: SafeHandleValue) {
646 // Perform ! TransformStreamError(controller.[[stream]], e).
647 self.stream
648 .get()
649 .expect("stream is undefined")
650 .error(cx, global, reason);
651 }
652
653 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-clear-algorithms>
654 pub(crate) fn clear_algorithms(&self) {
655 if let TransformerType::Js {
656 cancel,
657 flush,
658 transform,
659 ..
660 } = &self.transformer_type
661 {
662 // Set controller.[[transformAlgorithm]] to undefined.
663 transform.replace(None);
664
665 // Set controller.[[flushAlgorithm]] to undefined.
666 flush.replace(None);
667
668 // Set controller.[[cancelAlgorithm]] to undefined.
669 cancel.replace(None);
670 }
671 }
672
673 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-terminate>
674 fn terminate(&self, cx: &mut JSContext, global: &GlobalScope) {
675 // Let stream be controller.[[stream]].
676 let stream = self.stream.get().expect("stream is null");
677
678 // Let readableController be stream.[[readable]].[[controller]].
679 let readable = stream.get_readable();
680 let readable_controller = readable.get_default_controller();
681
682 // Perform ! ReadableStreamDefaultControllerClose(readableController).
683 readable_controller.close(cx);
684
685 // Let error be a TypeError exception indicating that the stream has been terminated.
686 let error = Error::Type(c"stream has been terminated".to_owned());
687
688 // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, error).
689 rooted!(&in(cx) let mut rooted_error = UndefinedValue());
690 error.to_jsval(
691 cx.into(),
692 global,
693 rooted_error.handle_mut(),
694 CanGc::from_cx(cx),
695 );
696 stream.error_writable_and_unblock_write(cx, global, rooted_error.handle());
697 }
698}
699
700impl TransformStreamDefaultControllerMethods<crate::DomTypeHolder>
701 for TransformStreamDefaultController
702{
703 /// <https://streams.spec.whatwg.org/#ts-default-controller-desired-size>
704 fn GetDesiredSize(&self) -> Option<f64> {
705 // Let readableController be this.[[stream]].[[readable]].[[controller]].
706 let readable_controller = self
707 .stream
708 .get()
709 .expect("stream is null")
710 .get_readable()
711 .get_default_controller();
712
713 // Return ! ReadableStreamDefaultControllerGetDesiredSize(readableController).
714 readable_controller.get_desired_size()
715 }
716
717 /// <https://streams.spec.whatwg.org/#ts-default-controller-enqueue>
718 fn Enqueue(&self, cx: &mut JSContext, chunk: SafeHandleValue) -> Fallible<()> {
719 // Perform ? TransformStreamDefaultControllerEnqueue(this, chunk).
720 self.enqueue(cx, &self.global(), chunk)
721 }
722
723 /// <https://streams.spec.whatwg.org/#ts-default-controller-error>
724 fn Error(&self, cx: &mut JSContext, reason: SafeHandleValue) -> Fallible<()> {
725 // Perform ? TransformStreamDefaultControllerError(this, e).
726 self.error(cx, &self.global(), reason);
727 Ok(())
728 }
729
730 /// <https://streams.spec.whatwg.org/#ts-default-controller-terminate>
731 fn Terminate(&self, cx: &mut JSContext) -> Fallible<()> {
732 // Perform ? TransformStreamDefaultControllerTerminate(this).
733 self.terminate(cx, &self.global());
734 Ok(())
735 }
736}