script/dom/transformstreamdefaultcontroller.rs
1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::RefCell;
6use std::rc::Rc;
7
8use dom_struct::dom_struct;
9use js::jsapi::{
10 ExceptionStackBehavior, Heap, JS_IsExceptionPending, JS_SetPendingException, JSObject,
11};
12use js::jsval::UndefinedValue;
13use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue};
14
15use super::bindings::cell::DomRefCell;
16use super::bindings::codegen::Bindings::TransformerBinding::{
17 TransformerCancelCallback, TransformerFlushCallback, TransformerTransformCallback,
18};
19use super::types::TransformStream;
20use crate::dom::bindings::callback::ExceptionHandling;
21use crate::dom::bindings::codegen::Bindings::TransformStreamDefaultControllerBinding::TransformStreamDefaultControllerMethods;
22use crate::dom::bindings::codegen::Bindings::TransformerBinding::Transformer;
23use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
24use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
25use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
26use crate::dom::compressionstream::{
27 CompressionStream, compress_and_enqueue_a_chunk, compress_flush_and_enqueue,
28};
29use crate::dom::decompressionstream::{
30 decompress_and_enqueue_a_chunk, decompress_flush_and_enqueue,
31};
32use crate::dom::globalscope::GlobalScope;
33use crate::dom::promise::Promise;
34use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
35use crate::dom::textdecodercommon::TextDecoderCommon;
36use crate::dom::textdecoderstream::{decode_and_enqueue_a_chunk, flush_and_enqueue};
37use crate::dom::textencoderstream::{Encoder, encode_and_enqueue_a_chunk, encode_and_flush};
38use crate::dom::types::DecompressionStream;
39use crate::realms::{InRealm, enter_realm};
40use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
41
// Marker impl so that `TransformTransformPromiseRejection` values can be placed
// in a `rooted!` slot (this file roots an `Option` of it before boxing it into
// a promise handler).
impl js::gc::Rootable for TransformTransformPromiseRejection {}
43
/// Reacting to transformPromise as part of
/// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
///
/// This is the native handler for the rejection branch: its `Callback`
/// implementation errors the stream owned by `controller`.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct TransformTransformPromiseRejection {
    /// The controller whose stream is errored when transformPromise rejects.
    controller: Dom<TransformStreamDefaultController>,
}
51
52impl Callback for TransformTransformPromiseRejection {
53 /// Reacting to transformPromise with the following fulfillment steps:
54 #[allow(unsafe_code)]
55 fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) {
56 // Perform ! TransformStreamError(controller.[[stream]], r).
57 self.controller
58 .error(cx, &self.controller.global(), v, can_gc);
59
60 // Throw r.
61 // Note: this is done part of perform_transform().
62 }
63}
64
/// The type of transformer algorithms we are using
///
/// The `Js` variant stores script-provided callbacks; every other variant
/// carries the state needed by a transformer implemented in Rust.
#[derive(JSTraceable)]
pub(crate) enum TransformerType {
    /// Algorithms provided by Js callbacks
    Js {
        /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-cancelalgorithm>
        cancel: RefCell<Option<Rc<TransformerCancelCallback>>>,

        /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-flushalgorithm>
        flush: RefCell<Option<Rc<TransformerFlushCallback>>>,

        /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-transformalgorithm>
        transform: RefCell<Option<Rc<TransformerTransformCallback>>>,

        /// The JS object used as `this` when invoking the transformer
        /// algorithms (transform, flush and cancel).
        transform_obj: Heap<*mut JSObject>,
    },
    /// Algorithms supporting `TextDecoderStream` are implemented in Rust
    ///
    /// <https://encoding.spec.whatwg.org/#textdecodercommon>
    Decoder(Rc<TextDecoderCommon>),
    /// Algorithms supporting `TextEncoderStream` are implemented in Rust
    ///
    /// <https://encoding.spec.whatwg.org/#textencoderstream-encoder>
    Encoder(Encoder),
    /// Algorithms supporting `CompressionStream` are implemented in Rust
    ///
    /// <https://compression.spec.whatwg.org/#compressionstream>
    Compressor(DomRoot<CompressionStream>),
    /// Algorithms supporting `DecompressionStream` are implemented in Rust
    ///
    /// <https://compression.spec.whatwg.org/#decompressionstream>
    Decompressor(DomRoot<DecompressionStream>),
}
99
100impl TransformerType {
101 pub(crate) fn new_from_js_transformer(transformer: &Transformer) -> TransformerType {
102 TransformerType::Js {
103 cancel: RefCell::new(transformer.cancel.clone()),
104 flush: RefCell::new(transformer.flush.clone()),
105 transform: RefCell::new(transformer.transform.clone()),
106 transform_obj: Default::default(),
107 }
108 }
109}
110
/// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller>
#[dom_struct]
pub struct TransformStreamDefaultController {
    reflector_: Reflector,

    /// The type of the underlying transformer used. Besides the JS variant,
    /// there are Rust-implemented variants for `TextDecoderStream`,
    /// `TextEncoderStream`, `CompressionStream` and `DecompressionStream`.
    #[ignore_malloc_size_of = "transformer_type"]
    transformer_type: TransformerType,

    /// <https://streams.spec.whatwg.org/#TransformStreamDefaultController-stream>
    ///
    /// Starts out unset and is assigned through `set_stream`.
    stream: MutNullableDom<TransformStream>,

    /// <https://streams.spec.whatwg.org/#transformstreamdefaultcontroller-finishpromise>
    #[conditional_malloc_size_of]
    finish_promise: DomRefCell<Option<Rc<Promise>>>,
}
128
129impl TransformStreamDefaultController {
130 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
131 fn new_inherited(transformer_type: TransformerType) -> TransformStreamDefaultController {
132 TransformStreamDefaultController {
133 reflector_: Reflector::new(),
134 transformer_type,
135 stream: MutNullableDom::new(None),
136 finish_promise: DomRefCell::new(None),
137 }
138 }
139
140 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
141 pub(crate) fn new(
142 global: &GlobalScope,
143 transformer_type: TransformerType,
144 can_gc: CanGc,
145 ) -> DomRoot<TransformStreamDefaultController> {
146 reflect_dom_object(
147 Box::new(TransformStreamDefaultController::new_inherited(
148 transformer_type,
149 )),
150 global,
151 can_gc,
152 )
153 }
154
155 /// Setting the JS object after the heap has settled down.
156 ///
157 /// Note that this has no effect if the transformer type is not `TransformerType::Js`
158 pub(crate) fn set_transform_obj(&self, this_object: SafeHandleObject) {
159 if let TransformerType::Js { transform_obj, .. } = &self.transformer_type {
160 transform_obj.set(*this_object)
161 } else {
162 unreachable!("Non-Js transformer type should not set transform_obj")
163 }
164 }
165
166 pub(crate) fn set_stream(&self, stream: &TransformStream) {
167 self.stream.set(Some(stream));
168 }
169
170 pub(crate) fn get_finish_promise(&self) -> Option<Rc<Promise>> {
171 self.finish_promise.borrow().clone()
172 }
173
174 pub(crate) fn set_finish_promise(&self, promise: Rc<Promise>) {
175 *self.finish_promise.borrow_mut() = Some(promise);
176 }
177
178 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform>
179 pub(crate) fn transform_stream_default_controller_perform_transform(
180 &self,
181 cx: SafeJSContext,
182 global: &GlobalScope,
183 chunk: SafeHandleValue,
184 can_gc: CanGc,
185 ) -> Fallible<Rc<Promise>> {
186 // Let transformPromise be the result of performing controller.[[transformAlgorithm]], passing chunk.
187 let transform_promise = self.perform_transform(cx, global, chunk, can_gc)?;
188
189 // Return the result of reacting to transformPromise with the following rejection steps given the argument r:
190 rooted!(in(*cx) let mut reject_handler = Some(TransformTransformPromiseRejection {
191 controller: Dom::from_ref(self),
192 }));
193
194 let handler = PromiseNativeHandler::new(
195 global,
196 None,
197 reject_handler.take().map(|h| Box::new(h) as Box<_>),
198 can_gc,
199 );
200 let realm = enter_realm(global);
201 let comp = InRealm::Entered(&realm);
202 transform_promise.append_native_handler(&handler, comp, can_gc);
203
204 Ok(transform_promise)
205 }
206
207 pub(crate) fn perform_transform(
208 &self,
209 cx: SafeJSContext,
210 global: &GlobalScope,
211 chunk: SafeHandleValue,
212 can_gc: CanGc,
213 ) -> Fallible<Rc<Promise>> {
214 let result = match &self.transformer_type {
215 // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
216 TransformerType::Js {
217 transform,
218 transform_obj,
219 ..
220 } => {
221 // Step 5. If transformerDict["transform"] exists, set
222 // transformAlgorithm to an algorithm which takes an argument
223 // chunk and returns the result of invoking
224 // transformerDict["transform"] with argument list « chunk,
225 // controller » and callback this value transformer.
226 let algo = transform.borrow().clone();
227 if let Some(transform) = algo {
228 rooted!(in(*cx) let this_object = transform_obj.get());
229 transform
230 .Call_(
231 &this_object.handle(),
232 chunk,
233 self,
234 ExceptionHandling::Rethrow,
235 can_gc,
236 )
237 .unwrap_or_else(|e| {
238 let p = Promise::new(global, can_gc);
239 p.reject_error(e, can_gc);
240 p
241 })
242 } else {
243 // Step 2. Let transformAlgorithm be the following steps, taking a chunk argument:
244 // Let result be TransformStreamDefaultControllerEnqueue(controller, chunk).
245 // If result is an abrupt completion, return a promise rejected with result.[[Value]].
246 if let Err(error) = self.enqueue(cx, global, chunk, can_gc) {
247 rooted!(in(*cx) let mut error_val = UndefinedValue());
248 error.to_jsval(cx, global, error_val.handle_mut(), can_gc);
249 Promise::new_rejected(global, cx, error_val.handle(), can_gc)
250 } else {
251 // Otherwise, return a promise resolved with undefined.
252 Promise::new_resolved(global, cx, (), can_gc)
253 }
254 }
255 },
256 TransformerType::Decoder(decoder) => {
257 // <https://encoding.spec.whatwg.org/#dom-textdecoderstream>
258 // Step 7. Let transformAlgorithm be an algorithm which takes a
259 // chunk argument and runs the decode and enqueue a chunk
260 // algorithm with this and chunk.
261 decode_and_enqueue_a_chunk(cx, global, chunk, decoder, self, can_gc)
262 // <https://streams.spec.whatwg.org/#transformstream-set-up>
263 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
264 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
265 // Step 5.2 If result is a Promise, then return result.
266 // Note: not applicable, the spec does NOT require deode_and_enqueue_a_chunk() to return a Promise
267 // Step 5.3 Return a promise resolved with undefined.
268 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
269 .unwrap_or_else(|e| {
270 // <https://streams.spec.whatwg.org/#transformstream-set-up>
271 // Step 5.1 If this throws an exception e,
272 let realm = enter_realm(self);
273 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
274 // return a promise rejected with e.
275 p.reject_error(e, can_gc);
276 p
277 })
278 },
279 TransformerType::Encoder(encoder) => {
280 // <https://encoding.spec.whatwg.org/#dom-textencoderstream>
281 // Step 2. Let transformAlgorithm be an algorithm which takes a chunk argument and runs the encode
282 // and enqueue a chunk algorithm with this and chunk.
283 encode_and_enqueue_a_chunk(cx, global, chunk, encoder, self, can_gc)
284 // <https://streams.spec.whatwg.org/#transformstream-set-up>
285 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
286 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
287 // Step 5.2 If result is a Promise, then return result.
288 // Note: not applicable, the spec does NOT require encode_and_enqueue_a_chunk() to return a Promise
289 // Step 5.3 Return a promise resolved with undefined.
290 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
291 .unwrap_or_else(|e| {
292 // <https://streams.spec.whatwg.org/#transformstream-set-up>
293 // Step 5.1 If this throws an exception e,
294 let realm = enter_realm(self);
295 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
296 // return a promise rejected with e.
297 p.reject_error(e, can_gc);
298 p
299 })
300 },
301 TransformerType::Compressor(cs) => {
302 // <https://compression.spec.whatwg.org/#dom-compressionstream-compressionstream>
303 // Step 3. Let transformAlgorithm be an algorithm which takes a chunk argument and
304 // runs the compress and enqueue a chunk algorithm with this and chunk.
305 compress_and_enqueue_a_chunk(cx, global, cs, chunk, self, can_gc)
306 // <https://streams.spec.whatwg.org/#transformstream-set-up>
307 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
308 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
309 // Step 5.2 If result is a Promise, then return result.
310 // Note: not applicable, the spec does NOT require
311 // compress_and_enqueue_a_chunk() to return a Promise.
312 // Step 5.3 Return a promise resolved with undefined.
313 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
314 .unwrap_or_else(|e| {
315 // <https://streams.spec.whatwg.org/#transformstream-set-up>
316 // Step 5.1 If this throws an exception e,
317 let realm = enter_realm(self);
318 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
319 // return a promise rejected with e.
320 p.reject_error(e, can_gc);
321 p
322 })
323 },
324 TransformerType::Decompressor(ds) => {
325 // <https://compression.spec.whatwg.org/#dom-decompressionstream-decompressionstream>
326 // Step 3. Let transformAlgorithm be an algorithm which takes a chunk argument and
327 // runs the decompress and enqueue a chunk algorithm with this and chunk.
328 decompress_and_enqueue_a_chunk(cx, global, ds, chunk, self, can_gc)
329 // <https://streams.spec.whatwg.org/#transformstream-set-up>
330 // Step 5. Let transformAlgorithmWrapper be an algorithm that runs these steps given a value chunk:
331 // Step 5.1 Let result be the result of running transformAlgorithm given chunk.
332 // Step 5.2 If result is a Promise, then return result.
333 // Note: not applicable, the spec does NOT require
334 // decompress_and_enqueue_a_chunk() to return a Promise
335 // Step 5.3 Return a promise resolved with undefined.
336 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
337 .unwrap_or_else(|e| {
338 // <https://streams.spec.whatwg.org/#transformstream-set-up>
339 // Step 5.1 If this throws an exception e,
340 let realm = enter_realm(self);
341 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
342 // return a promise rejected with e.
343 p.reject_error(e, can_gc);
344 p
345 })
346 },
347 };
348
349 Ok(result)
350 }
351
352 pub(crate) fn perform_cancel(
353 &self,
354 cx: SafeJSContext,
355 global: &GlobalScope,
356 chunk: SafeHandleValue,
357 can_gc: CanGc,
358 ) -> Fallible<Rc<Promise>> {
359 let result = match &self.transformer_type {
360 // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
361 TransformerType::Js {
362 cancel,
363 transform_obj,
364 ..
365 } => {
366 // Step 7. If transformerDict["cancel"] exists, set
367 // cancelAlgorithm to an algorithm which takes an argument
368 // reason and returns the result of invoking
369 // transformerDict["cancel"] with argument list « reason » and
370 // callback this value transformer.
371 let algo = cancel.borrow().clone();
372 if let Some(cancel) = algo {
373 rooted!(in(*cx) let this_object = transform_obj.get());
374 cancel
375 .Call_(
376 &this_object.handle(),
377 chunk,
378 ExceptionHandling::Rethrow,
379 can_gc,
380 )
381 .unwrap_or_else(|e| {
382 let p = Promise::new(global, can_gc);
383 p.reject_error(e, can_gc);
384 p
385 })
386 } else {
387 // Step 4. Let cancelAlgorithm be an algorithm which returns a promise resolved with undefined.
388 Promise::new_resolved(global, cx, (), can_gc)
389 }
390 },
391 TransformerType::Decoder(_) => {
392 // <https://streams.spec.whatwg.org/#transformstream-set-up>
393 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
394 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
395 // if cancelAlgorithm was given, or null otherwise
396 // Note: `TextDecoderStream` does NOT specify a cancel algorithm.
397 // Step 7.2 If result is a Promise, then return result.
398 // Note: Not applicable.
399 // Step 7.3 Return a promise resolved with undefined.
400 Promise::new_resolved(global, cx, (), can_gc)
401 },
402 TransformerType::Encoder(_) => {
403 // <https://streams.spec.whatwg.org/#transformstream-set-up>
404 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
405 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
406 // if cancelAlgorithm was given, or null otherwise
407 // Note: `TextDecoderStream` does NOT specify a cancel algorithm.
408 // Step 7.2 If result is a Promise, then return result.
409 // Note: Not applicable.
410 // Step 7.3 Return a promise resolved with undefined.
411 Promise::new_resolved(global, cx, (), can_gc)
412 },
413 TransformerType::Compressor(_) => {
414 // <https://streams.spec.whatwg.org/#transformstream-set-up>
415 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
416 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
417 // if cancelAlgorithm was given, or null otherwise
418 // Note: `CompressionStream` does NOT specify a cancel algorithm.
419 // Step 7.2 If result is a Promise, then return result.
420 // Note: Not applicable.
421 // Step 7.3 Return a promise resolved with undefined.
422 Promise::new_resolved(global, cx, (), can_gc)
423 },
424 TransformerType::Decompressor(_) => {
425 // <https://streams.spec.whatwg.org/#transformstream-set-up>
426 // Step 7. Let cancelAlgorithmWrapper be an algorithm that runs these steps given a value reason:
427 // Step 7.1 Let result be the result of running cancelAlgorithm given reason,
428 // if cancelAlgorithm was given, or null otherwise
429 // Note: `DecompressionStream` does NOT specify a cancel algorithm.
430 // Step 7.2 If result is a Promise, then return result.
431 // Note: Not applicable.
432 // Step 7.3 Return a promise resolved with undefined.
433 Promise::new_resolved(global, cx, (), can_gc)
434 },
435 };
436
437 Ok(result)
438 }
439
440 pub(crate) fn perform_flush(
441 &self,
442 cx: SafeJSContext,
443 global: &GlobalScope,
444 can_gc: CanGc,
445 ) -> Fallible<Rc<Promise>> {
446 let result = match &self.transformer_type {
447 // <https://streams.spec.whatwg.org/#set-up-transform-stream-default-controller-from-transformer>
448 TransformerType::Js {
449 flush,
450 transform_obj,
451 ..
452 } => {
453 // Step 6. If transformerDict["flush"] exists, set flushAlgorithm to an
454 // algorithm which returns the result of invoking
455 // transformerDict["flush"] with argument list « controller »
456 // and callback this value transformer.
457 let algo = flush.borrow().clone();
458 if let Some(flush) = algo {
459 rooted!(in(*cx) let this_object = transform_obj.get());
460 flush
461 .Call_(
462 &this_object.handle(),
463 self,
464 ExceptionHandling::Rethrow,
465 can_gc,
466 )
467 .unwrap_or_else(|e| {
468 let p = Promise::new(global, can_gc);
469 p.reject_error(e, can_gc);
470 p
471 })
472 } else {
473 // Step 3. Let flushAlgorithm be an algorithm which returns a promise resolved with undefined.
474 Promise::new_resolved(global, cx, (), can_gc)
475 }
476 },
477 TransformerType::Decoder(decoder) => {
478 // <https://encoding.spec.whatwg.org/#dom-textdecoderstream>
479 // Step 8. Let flushAlgorithm be an algorithm which takes no
480 // arguments and runs the flush and enqueue algorithm with this.
481 flush_and_enqueue(cx, global, decoder, self, can_gc)
482 // <https://streams.spec.whatwg.org/#transformstream-set-up>
483 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
484 // Step 6.1 Let result be the result of running flushAlgorithm,
485 // if flushAlgorithm was given, or null otherwise.
486 // Step 6.2 If result is a Promise, then return result.
487 // Note: Not applicable. The spec does NOT require flush_and_enqueue algo to return a Promise
488 // Step 6.3 Return a promise resolved with undefined.
489 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
490 .unwrap_or_else(|e| {
491 // <https://streams.spec.whatwg.org/#transformstream-set-up>
492 // Step 6.1 If this throws an exception e,
493 let realm = enter_realm(self);
494 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
495 // return a promise rejected with e.
496 p.reject_error(e, can_gc);
497 p
498 })
499 },
500 TransformerType::Encoder(encoder) => {
501 // <https://encoding.spec.whatwg.org/#textencoderstream-encoder>
502 // Step 3. Let flushAlgorithm be an algorithm which runs the encode and flush algorithm with this.
503 encode_and_flush(cx, global, encoder, self, can_gc)
504 // <https://streams.spec.whatwg.org/#transformstream-set-up>
505 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
506 // Step 6.1 Let result be the result of running flushAlgorithm,
507 // if flushAlgorithm was given, or null otherwise.
508 // Step 6.2 If result is a Promise, then return result.
509 // Note: Not applicable. The spec does NOT require encode_and_flush algo to return a Promise
510 // Step 6.3 Return a promise resolved with undefined.
511 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
512 .unwrap_or_else(|e| {
513 // <https://streams.spec.whatwg.org/#transformstream-set-up>
514 // Step 6.1 If this throws an exception e,
515 let realm = enter_realm(self);
516 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
517 // return a promise rejected with e.
518 p.reject_error(e, can_gc);
519 p
520 })
521 },
522 TransformerType::Compressor(cs) => {
523 // <https://compression.spec.whatwg.org/#dom-compressionstream-compressionstream>
524 // Step 4. Let flushAlgorithm be an algorithm which takes no argument and runs the
525 // compress flush and enqueue algorithm with this.
526 compress_flush_and_enqueue(cx, global, cs, self, can_gc)
527 // <https://streams.spec.whatwg.org/#transformstream-set-up>
528 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
529 // Step 6.1 Let result be the result of running flushAlgorithm,
530 // if flushAlgorithm was given, or null otherwise.
531 // Step 6.2 If result is a Promise, then return result.
532 // Note: Not applicable. The spec does NOT require compress_flush_and_enqueue
533 // algo to return a Promise.
534 // Step 6.3 Return a promise resolved with undefined.
535 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
536 .unwrap_or_else(|e| {
537 // <https://streams.spec.whatwg.org/#transformstream-set-up>
538 // Step 6.1 If this throws an exception e,
539 let realm = enter_realm(self);
540 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
541 // return a promise rejected with e.
542 p.reject_error(e, can_gc);
543 p
544 })
545 },
546 TransformerType::Decompressor(ds) => {
547 // <https://compression.spec.whatwg.org/#dom-decompressionstream-decompressionstream>
548 // Step 4. Let flushAlgorithm be an algorithm which takes no argument and runs the
549 // decompress flush and enqueue algorithm with this.
550 decompress_flush_and_enqueue(cx, global, ds, self, can_gc)
551 // <https://streams.spec.whatwg.org/#transformstream-set-up>
552 // Step 6. Let flushAlgorithmWrapper be an algorithm that runs these steps:
553 // Step 6.1 Let result be the result of running flushAlgorithm,
554 // if flushAlgorithm was given, or null otherwise.
555 // Step 6.2 If result is a Promise, then return result.
556 // Note: Not applicable. The spec does NOT require decompress_flush_and_enqueue
557 // algo to return a Promise.
558 // Step 6.3 Return a promise resolved with undefined.
559 .map(|_| Promise::new_resolved(global, cx, (), can_gc))
560 .unwrap_or_else(|e| {
561 // <https://streams.spec.whatwg.org/#transformstream-set-up>
562 // Step 6.1 If this throws an exception e,
563 let realm = enter_realm(self);
564 let p = Promise::new_in_current_realm((&realm).into(), can_gc);
565 // return a promise rejected with e.
566 p.reject_error(e, can_gc);
567 p
568 })
569 },
570 };
571
572 Ok(result)
573 }
574
    /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-enqueue>
    ///
    /// Enqueues `chunk` into the readable side of the controller's stream and
    /// turns transform-stream backpressure on when the readable side starts
    /// reporting it.
    ///
    /// # Errors
    ///
    /// Returns `Error::Type` when the readable controller can neither close
    /// nor enqueue, and forwards the error of the readable controller's
    /// `enqueue` when that fails (after erroring the writable side).
    ///
    /// # Panics
    ///
    /// Panics if controller.[[stream]] has not been set, or if the readable
    /// side's backpressure differs from the stream's while being false.
    #[allow(unsafe_code)]
    pub(crate) fn enqueue(
        &self,
        cx: SafeJSContext,
        global: &GlobalScope,
        chunk: SafeHandleValue,
        can_gc: CanGc,
    ) -> Fallible<()> {
        // Let stream be controller.[[stream]].
        let stream = self.stream.get().expect("stream is null");

        // Let readableController be stream.[[readable]].[[controller]].
        let readable = stream.get_readable();
        let readable_controller = readable.get_default_controller();

        // If ! ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController)
        // is false, throw a TypeError exception.
        if !readable_controller.can_close_or_enqueue() {
            return Err(Error::Type(
                "ReadableStreamDefaultControllerCanCloseOrEnqueue is false".to_owned(),
            ));
        }

        // Let enqueueResult be ReadableStreamDefaultControllerEnqueue(readableController, chunk).
        // If enqueueResult is an abrupt completion,
        if let Err(error) = readable_controller.enqueue(cx, chunk, can_gc) {
            // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, enqueueResult.[[Value]]).
            // The error is cloned for the conversion because the original is
            // still returned to the caller below.
            rooted!(in(*cx) let mut rooted_error = UndefinedValue());
            error
                .clone()
                .to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
            stream.error_writable_and_unblock_write(cx, global, rooted_error.handle(), can_gc);

            // Throw stream.[[readable]].[[storedError]].
            // The stored error is only installed as the pending exception when
            // no exception is pending already.
            // NOTE(review): assumes `cx` is a valid context for these raw JSAPI
            // calls; `stored_error` is rooted for their duration — confirm.
            // NOTE(review): the value returned is the original `error`, not the
            // stored error — confirm callers cope with an `Err` that may come
            // with a pending exception.
            unsafe {
                if !JS_IsExceptionPending(*cx) {
                    rooted!(in(*cx) let mut stored_error = UndefinedValue());
                    readable.get_stored_error(stored_error.handle_mut());

                    JS_SetPendingException(
                        *cx,
                        stored_error.handle().into(),
                        ExceptionStackBehavior::Capture,
                    );
                }
            }
            return Err(error);
        }

        // Let backpressure be ! ReadableStreamDefaultControllerHasBackpressure(readableController).
        let backpressure = readable_controller.has_backpressure();

        // If backpressure is not stream.[[backpressure]],
        if backpressure != stream.get_backpressure() {
            // Assert: backpressure is true.
            assert!(backpressure);

            // Perform ! TransformStreamSetBackpressure(stream, true).
            stream.set_backpressure(global, true, can_gc);
        }
        Ok(())
    }
638
639 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-error>
640 pub(crate) fn error(
641 &self,
642 cx: SafeJSContext,
643 global: &GlobalScope,
644 reason: SafeHandleValue,
645 can_gc: CanGc,
646 ) {
647 // Perform ! TransformStreamError(controller.[[stream]], e).
648 self.stream
649 .get()
650 .expect("stream is undefined")
651 .error(cx, global, reason, can_gc);
652 }
653
654 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-clear-algorithms>
655 pub(crate) fn clear_algorithms(&self) {
656 if let TransformerType::Js {
657 cancel,
658 flush,
659 transform,
660 ..
661 } = &self.transformer_type
662 {
663 // Set controller.[[transformAlgorithm]] to undefined.
664 transform.replace(None);
665
666 // Set controller.[[flushAlgorithm]] to undefined.
667 flush.replace(None);
668
669 // Set controller.[[cancelAlgorithm]] to undefined.
670 cancel.replace(None);
671 }
672 }
673
674 /// <https://streams.spec.whatwg.org/#transform-stream-default-controller-terminate>
675 pub(crate) fn terminate(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) {
676 // Let stream be controller.[[stream]].
677 let stream = self.stream.get().expect("stream is null");
678
679 // Let readableController be stream.[[readable]].[[controller]].
680 let readable = stream.get_readable();
681 let readable_controller = readable.get_default_controller();
682
683 // Perform ! ReadableStreamDefaultControllerClose(readableController).
684 readable_controller.close(can_gc);
685
686 // Let error be a TypeError exception indicating that the stream has been terminated.
687 let error = Error::Type("stream has been terminated".to_owned());
688
689 // Perform ! TransformStreamErrorWritableAndUnblockWrite(stream, error).
690 rooted!(in(*cx) let mut rooted_error = UndefinedValue());
691 error.to_jsval(cx, global, rooted_error.handle_mut(), can_gc);
692 stream.error_writable_and_unblock_write(cx, global, rooted_error.handle(), can_gc);
693 }
694}
695
696#[allow(non_snake_case)]
697impl TransformStreamDefaultControllerMethods<crate::DomTypeHolder>
698 for TransformStreamDefaultController
699{
700 /// <https://streams.spec.whatwg.org/#ts-default-controller-desired-size>
701 fn GetDesiredSize(&self) -> Option<f64> {
702 // Let readableController be this.[[stream]].[[readable]].[[controller]].
703 let readable_controller = self
704 .stream
705 .get()
706 .expect("stream is null")
707 .get_readable()
708 .get_default_controller();
709
710 // Return ! ReadableStreamDefaultControllerGetDesiredSize(readableController).
711 readable_controller.get_desired_size()
712 }
713
714 /// <https://streams.spec.whatwg.org/#ts-default-controller-enqueue>
715 fn Enqueue(&self, cx: SafeJSContext, chunk: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
716 // Perform ? TransformStreamDefaultControllerEnqueue(this, chunk).
717 self.enqueue(cx, &self.global(), chunk, can_gc)
718 }
719
720 /// <https://streams.spec.whatwg.org/#ts-default-controller-error>
721 fn Error(&self, cx: SafeJSContext, reason: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
722 // Perform ? TransformStreamDefaultControllerError(this, e).
723 self.error(cx, &self.global(), reason, can_gc);
724 Ok(())
725 }
726
727 /// <https://streams.spec.whatwg.org/#ts-default-controller-terminate>
728 fn Terminate(&self, can_gc: CanGc) -> Fallible<()> {
729 // Perform ? TransformStreamDefaultControllerTerminate(this).
730 self.terminate(GlobalScope::get_cx(), &self.global(), can_gc);
731 Ok(())
732 }
733}