script/dom/stream/readablebytestreamcontroller.rs
1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::cmp::min;
7use std::collections::VecDeque;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::{Heap, Type};
12use js::jsval::UndefinedValue;
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue};
15use js::typedarray::{ArrayBufferU8, ArrayBufferViewU8};
16
17use super::readablestreambyobreader::ReadIntoRequest;
18use super::readablestreamdefaultreader::ReadRequest;
19use super::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType};
20use crate::dom::bindings::buffer_source::{
21 Constructor, HeapBufferSource, byte_size, create_array_buffer_with_size,
22 create_buffer_source_with_constructor,
23};
24use crate::dom::bindings::cell::DomRefCell;
25use crate::dom::bindings::codegen::Bindings::ReadableByteStreamControllerBinding::ReadableByteStreamControllerMethods;
26use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
27use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
28use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
29use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
30use crate::dom::bindings::trace::RootedTraceableBox;
31use crate::dom::globalscope::GlobalScope;
32use crate::dom::promise::Promise;
33use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
34use crate::dom::stream::readablestream::ReadableStream;
35use crate::dom::stream::readablestreambyobrequest::ReadableStreamBYOBRequest;
36use crate::realms::{InRealm, enter_realm};
37use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
38
39/// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry>
40#[derive(JSTraceable, MallocSizeOf)]
41pub(crate) struct QueueEntry {
42 /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-buffer>
43 #[ignore_malloc_size_of = "HeapBufferSource"]
44 buffer: HeapBufferSource<ArrayBufferU8>,
45 /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-offset>
46 byte_offset: usize,
47 /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-length>
48 byte_length: usize,
49}
50
51impl QueueEntry {
52 pub(crate) fn new(
53 buffer: HeapBufferSource<ArrayBufferU8>,
54 byte_offset: usize,
55 byte_length: usize,
56 ) -> QueueEntry {
57 QueueEntry {
58 buffer,
59 byte_offset,
60 byte_length,
61 }
62 }
63}
64
65#[derive(Debug, Eq, JSTraceable, MallocSizeOf, PartialEq)]
66pub(crate) enum ReaderType {
67 /// <https://streams.spec.whatwg.org/#readablestreambyobreader>
68 Byob,
69 /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
70 Default,
71}
72
73/// <https://streams.spec.whatwg.org/#pull-into-descriptor>
74#[derive(Eq, JSTraceable, MallocSizeOf, PartialEq)]
75pub(crate) struct PullIntoDescriptor {
76 #[ignore_malloc_size_of = "HeapBufferSource"]
77 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer>
78 buffer: HeapBufferSource<ArrayBufferU8>,
79 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer-byte-length>
80 buffer_byte_length: u64,
81 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-offset>
82 byte_offset: u64,
83 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-length>
84 byte_length: u64,
85 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-bytes-filled>
86 bytes_filled: Cell<u64>,
87 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-minimum-fill>
88 minimum_fill: u64,
89 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-element-size>
90 element_size: u64,
91 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-view-constructor>
92 view_constructor: Constructor,
93 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-reader-type>
94 reader_type: Option<ReaderType>,
95}
96
97/// The fulfillment handler for
98/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
99#[derive(Clone, JSTraceable, MallocSizeOf)]
100#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
101struct StartAlgorithmFulfillmentHandler {
102 controller: Dom<ReadableByteStreamController>,
103}
104
105impl Callback for StartAlgorithmFulfillmentHandler {
106 /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
107 /// Upon fulfillment of startPromise,
108 fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
109 let can_gc = CanGc::from_cx(cx);
110 // Set controller.[[started]] to true.
111 self.controller.started.set(true);
112
113 // Assert: controller.[[pulling]] is false.
114 assert!(!self.controller.pulling.get());
115
116 // Assert: controller.[[pullAgain]] is false.
117 assert!(!self.controller.pull_again.get());
118
119 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
120 self.controller.call_pull_if_needed(can_gc);
121 }
122}
123
124/// The rejection handler for
125/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
126#[derive(Clone, JSTraceable, MallocSizeOf)]
127#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
128struct StartAlgorithmRejectionHandler {
129 controller: Dom<ReadableByteStreamController>,
130}
131
132impl Callback for StartAlgorithmRejectionHandler {
133 /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
134 /// Upon rejection of startPromise with reason r,
135 fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
136 let can_gc = CanGc::from_cx(cx);
137 // Perform ! ReadableByteStreamControllerError(controller, r).
138 self.controller.error(v, can_gc);
139 }
140}
141
142/// The fulfillment handler for
143/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
144#[derive(Clone, JSTraceable, MallocSizeOf)]
145#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
146struct PullAlgorithmFulfillmentHandler {
147 controller: Dom<ReadableByteStreamController>,
148}
149
150impl Callback for PullAlgorithmFulfillmentHandler {
151 /// Continuation of <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
152 /// Upon fulfillment of pullPromise
153 fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
154 let can_gc = CanGc::from_cx(cx);
155 // Set controller.[[pulling]] to false.
156 self.controller.pulling.set(false);
157
158 // If controller.[[pullAgain]] is true,
159 if self.controller.pull_again.get() {
160 // Set controller.[[pullAgain]] to false.
161 self.controller.pull_again.set(false);
162
163 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
164 self.controller.call_pull_if_needed(can_gc);
165 }
166 }
167}
168
169/// The rejection handler for
170/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
171#[derive(Clone, JSTraceable, MallocSizeOf)]
172#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
173struct PullAlgorithmRejectionHandler {
174 controller: Dom<ReadableByteStreamController>,
175}
176
177impl Callback for PullAlgorithmRejectionHandler {
178 /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-byte-controller-call-pull-if-needed>
179 /// Upon rejection of pullPromise with reason e.
180 fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
181 let can_gc = CanGc::from_cx(cx);
182 // Perform ! ReadableByteStreamControllerError(controller, e).
183 self.controller.error(v, can_gc);
184 }
185}
186
187/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
188#[dom_struct]
189pub(crate) struct ReadableByteStreamController {
190 reflector_: Reflector,
191 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-autoallocatechunksize>
192 auto_allocate_chunk_size: Option<u64>,
193 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-stream>
194 stream: MutNullableDom<ReadableStream>,
195 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-strategyhwm>
196 strategy_hwm: f64,
197 /// A mutable reference to the underlying source is used to implement these two
198 /// internal slots:
199 ///
200 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullalgorithm>
201 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-cancelalgorithm>
202 underlying_source: MutNullableDom<UnderlyingSourceContainer>,
203 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queue>
204 queue: DomRefCell<VecDeque<QueueEntry>>,
205 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queuetotalsize>
206 queue_total_size: Cell<f64>,
207 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-byobrequest>
208 byob_request: MutNullableDom<ReadableStreamBYOBRequest>,
209 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pendingpullintos>
210 pending_pull_intos: DomRefCell<Vec<PullIntoDescriptor>>,
211 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-closerequested>
212 close_requested: Cell<bool>,
213 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-started>
214 started: Cell<bool>,
215 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pulling>
216 pulling: Cell<bool>,
217 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullalgorithm>
218 pull_again: Cell<bool>,
219}
220
221impl ReadableByteStreamController {
222 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
223 fn new_inherited(
224 underlying_source_type: UnderlyingSourceType,
225 strategy_hwm: f64,
226 global: &GlobalScope,
227 can_gc: CanGc,
228 ) -> ReadableByteStreamController {
229 let underlying_source_container =
230 UnderlyingSourceContainer::new(global, underlying_source_type, can_gc);
231 let auto_allocate_chunk_size = underlying_source_container.auto_allocate_chunk_size();
232 ReadableByteStreamController {
233 reflector_: Reflector::new(),
234 byob_request: MutNullableDom::new(None),
235 stream: MutNullableDom::new(None),
236 underlying_source: MutNullableDom::new(Some(&*underlying_source_container)),
237 auto_allocate_chunk_size,
238 pending_pull_intos: DomRefCell::new(Vec::new()),
239 strategy_hwm,
240 close_requested: Default::default(),
241 queue: DomRefCell::new(Default::default()),
242 queue_total_size: Default::default(),
243 started: Default::default(),
244 pulling: Default::default(),
245 pull_again: Default::default(),
246 }
247 }
248
249 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
250 pub(crate) fn new(
251 underlying_source_type: UnderlyingSourceType,
252 strategy_hwm: f64,
253 global: &GlobalScope,
254 can_gc: CanGc,
255 ) -> DomRoot<ReadableByteStreamController> {
256 reflect_dom_object(
257 Box::new(ReadableByteStreamController::new_inherited(
258 underlying_source_type,
259 strategy_hwm,
260 global,
261 can_gc,
262 )),
263 global,
264 can_gc,
265 )
266 }
267
268 #[allow(dead_code)]
269 pub(crate) fn set_stream(&self, stream: &ReadableStream) {
270 self.stream.set(Some(stream))
271 }
272
273 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-pull-into>
274 pub(crate) fn perform_pull_into(
275 &self,
276 cx: SafeJSContext,
277 read_into_request: &ReadIntoRequest,
278 view: HeapBufferSource<ArrayBufferViewU8>,
279 min: u64,
280 can_gc: CanGc,
281 ) {
282 // Let stream be controller.[[stream]].
283 let stream = self.stream.get().unwrap();
284
285 // Let elementSize be 1.
286 let mut element_size = 1;
287
288 // Let ctor be %DataView%.
289 let mut ctor = Constructor::DataView;
290
291 // If view has a [[TypedArrayName]] internal slot (i.e., it is not a DataView),
292 if view.has_typed_array_name() {
293 // Set elementSize to the element size specified in the
294 // typed array constructors table for view.[[TypedArrayName]].
295 let view_typw = view.get_array_buffer_view_type();
296 element_size = byte_size(view_typw);
297
298 // Set ctor to the constructor specified in the typed array constructors table for view.[[TypedArrayName]].
299 ctor = Constructor::Name(view_typw);
300 }
301
302 // Let minimumFill be min × elementSize.
303 let minimum_fill = min * element_size;
304
305 // Assert: minimumFill ≥ 0 and minimumFill ≤ view.[[ByteLength]].
306 assert!(minimum_fill <= (view.byte_length() as u64));
307
308 // Assert: the remainder after dividing minimumFill by elementSize is 0.
309 assert_eq!(minimum_fill % element_size, 0);
310
311 // Let byteOffset be view.[[ByteOffset]].
312 let byte_offset = view.get_byte_offset();
313
314 // Let byteLength be view.[[ByteLength]].
315 let byte_length = view.byte_length();
316
317 // Let bufferResult be TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
318 match view
319 .get_array_buffer_view_buffer(cx)
320 .transfer_array_buffer(cx)
321 {
322 Ok(buffer) => {
323 // Let buffer be bufferResult.[[Value]].
324 // Let pullIntoDescriptor be a new pull-into descriptor with
325 // buffer buffer
326 // buffer byte length buffer.[[ArrayBufferByteLength]]
327 // byte offset byteOffset
328 // byte length byteLength
329 // bytes filled 0
330 // minimum fill minimumFill
331 // element size elementSize
332 // view constructor ctor
333 // reader type "byob"
334 let buffer_byte_length = buffer.byte_length();
335 let pull_into_descriptor = PullIntoDescriptor {
336 buffer,
337 buffer_byte_length: buffer_byte_length as u64,
338 byte_offset: byte_offset as u64,
339 byte_length: byte_length as u64,
340 bytes_filled: Cell::new(0),
341 minimum_fill,
342 element_size,
343 view_constructor: ctor.clone(),
344 reader_type: Some(ReaderType::Byob),
345 };
346
347 // If controller.[[pendingPullIntos]] is not empty,
348 {
349 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
350 if !pending_pull_intos.is_empty() {
351 // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
352 pending_pull_intos.push(pull_into_descriptor);
353
354 // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
355 stream.add_read_into_request(read_into_request);
356
357 // Return.
358 return;
359 }
360 }
361
362 // If stream.[[state]] is "closed",
363 if stream.is_closed() {
364 // Let emptyView be ! Construct(ctor, « pullIntoDescriptor’s buffer,
365 // pullIntoDescriptor’s byte offset, 0 »).
366 if let Ok(empty_view) = create_buffer_source_with_constructor(
367 cx,
368 &ctor,
369 &pull_into_descriptor.buffer,
370 pull_into_descriptor.byte_offset as usize,
371 0,
372 ) {
373 // Perform readIntoRequest’s close steps, given emptyView.
374 let result = RootedTraceableBox::new(Heap::default());
375 rooted!(in(*cx) let mut view_value = UndefinedValue());
376 empty_view.get_buffer_view_value(cx, view_value.handle_mut());
377 result.set(*view_value);
378
379 read_into_request.close_steps(Some(result), can_gc);
380
381 // Return.
382 return;
383 } else {
384 return;
385 }
386 }
387
388 // If controller.[[queueTotalSize]] > 0,
389 if self.queue_total_size.get() > 0.0 {
390 // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(
391 // controller, pullIntoDescriptor) is true,
392 if self.fill_pull_into_descriptor_from_queue(cx, &pull_into_descriptor) {
393 // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(
394 // pullIntoDescriptor).
395 if let Ok(filled_view) =
396 self.convert_pull_into_descriptor(cx, &pull_into_descriptor)
397 {
398 // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
399 self.handle_queue_drain(can_gc);
400
401 // Perform readIntoRequest’s chunk steps, given filledView.
402 let result = RootedTraceableBox::new(Heap::default());
403 rooted!(in(*cx) let mut view_value = UndefinedValue());
404 filled_view.get_buffer_view_value(cx, view_value.handle_mut());
405 result.set(*view_value);
406 read_into_request.chunk_steps(result, can_gc);
407
408 // Return.
409 return;
410 } else {
411 return;
412 }
413 }
414
415 // If controller.[[closeRequested]] is true,
416 if self.close_requested.get() {
417 // Let e be a new TypeError exception.
418 rooted!(in(*cx) let mut error = UndefinedValue());
419 Error::Type("close requested".to_owned()).to_jsval(
420 cx,
421 &self.global(),
422 error.handle_mut(),
423 can_gc,
424 );
425
426 // Perform ! ReadableByteStreamControllerError(controller, e).
427 self.error(error.handle(), can_gc);
428
429 // Perform readIntoRequest’s error steps, given e.
430 read_into_request.error_steps(error.handle(), can_gc);
431
432 // Return.
433 return;
434 }
435 }
436
437 // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
438 {
439 self.pending_pull_intos
440 .borrow_mut()
441 .push(pull_into_descriptor);
442 }
443 // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
444 stream.add_read_into_request(read_into_request);
445
446 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
447 self.call_pull_if_needed(can_gc);
448 },
449 Err(error) => {
450 // If bufferResult is an abrupt completion,
451
452 // Perform readIntoRequest’s error steps, given bufferResult.[[Value]].
453 rooted!(in(*cx) let mut rval = UndefinedValue());
454 error
455 .clone()
456 .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
457 read_into_request.error_steps(rval.handle(), can_gc);
458
459 // Return.
460 },
461 }
462 }
463
464 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond>
465 pub(crate) fn respond(
466 &self,
467 cx: SafeJSContext,
468 bytes_written: u64,
469 can_gc: CanGc,
470 ) -> Fallible<()> {
471 {
472 // Assert: controller.[[pendingPullIntos]] is not empty.
473 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
474 assert!(!pending_pull_intos.is_empty());
475
476 // Let firstDescriptor be controller.[[pendingPullIntos]][0].
477 let first_descriptor = pending_pull_intos.first_mut().unwrap();
478
479 // Let state be controller.[[stream]].[[state]].
480 let stream = self.stream.get().unwrap();
481
482 // If state is "closed",
483 if stream.is_closed() {
484 // If bytesWritten is not 0, throw a TypeError exception.
485 if bytes_written != 0 {
486 return Err(Error::Type(
487 "bytesWritten not zero on closed stream".to_owned(),
488 ));
489 }
490 } else {
491 // Assert: state is "readable".
492 assert!(stream.is_readable());
493
494 // If bytesWritten is 0, throw a TypeError exception.
495 if bytes_written == 0 {
496 return Err(Error::Type("bytesWritten is 0".to_owned()));
497 }
498
499 // If firstDescriptor’s bytes filled + bytesWritten > firstDescriptor’s byte length,
500 // throw a RangeError exception.
501 if first_descriptor.bytes_filled.get() + bytes_written >
502 first_descriptor.byte_length
503 {
504 return Err(Error::Range(
505 "bytes filled + bytesWritten > byte length".to_owned(),
506 ));
507 }
508 }
509
510 // Set firstDescriptor’s buffer to ! TransferArrayBuffer(firstDescriptor’s buffer).
511 first_descriptor.buffer = first_descriptor
512 .buffer
513 .transfer_array_buffer(cx)
514 .expect("TransferArrayBuffer failed");
515 }
516
517 // Perform ? ReadableByteStreamControllerRespondInternal(controller, bytesWritten).
518 self.respond_internal(cx, bytes_written, can_gc)
519 }
520
521 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-internal>
522 pub(crate) fn respond_internal(
523 &self,
524 cx: SafeJSContext,
525 bytes_written: u64,
526 can_gc: CanGc,
527 ) -> Fallible<()> {
528 {
529 // Let firstDescriptor be controller.[[pendingPullIntos]][0].
530 let pending_pull_intos = self.pending_pull_intos.borrow();
531 let first_descriptor = pending_pull_intos.first().unwrap();
532
533 // Assert: ! CanTransferArrayBuffer(firstDescriptor’s buffer) is true
534 assert!(first_descriptor.buffer.can_transfer_array_buffer(cx));
535 }
536
537 // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
538 self.invalidate_byob_request();
539
540 // Let state be controller.[[stream]].[[state]].
541 let stream = self.stream.get().unwrap();
542
543 // If state is "closed",
544 if stream.is_closed() {
545 // Assert: bytesWritten is 0.
546 assert_eq!(bytes_written, 0);
547
548 // Perform ! ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor).
549 self.respond_in_closed_state(cx, can_gc)
550 .expect("respond_in_closed_state failed");
551 } else {
552 // Assert: state is "readable".
553 assert!(stream.is_readable());
554
555 // Assert: bytesWritten > 0.
556 assert!(bytes_written > 0);
557
558 // Perform ? ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor).
559 self.respond_in_readable_state(cx, bytes_written, can_gc)?;
560 }
561
562 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
563 self.call_pull_if_needed(can_gc);
564
565 Ok(())
566 }
567
568 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-closed-state>
569 pub(crate) fn respond_in_closed_state(&self, cx: SafeJSContext, can_gc: CanGc) -> Fallible<()> {
570 let pending_pull_intos = self.pending_pull_intos.borrow();
571 let first_descriptor = pending_pull_intos.first().unwrap();
572
573 // Assert: the remainder after dividing firstDescriptor’s bytes filled
574 // by firstDescriptor’s element size is 0.
575 assert_eq!(
576 first_descriptor.bytes_filled.get() % first_descriptor.element_size,
577 0
578 );
579
580 // If firstDescriptor’s reader type is "none",
581 // perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
582 let reader_type = first_descriptor.reader_type.is_none();
583
584 // needed to drop the borrow and avoid BorrowMutError
585 drop(pending_pull_intos);
586
587 if reader_type {
588 self.shift_pending_pull_into();
589 }
590
591 // Let stream be controller.[[stream]].
592 let stream = self.stream.get().unwrap();
593
594 // If ! ReadableStreamHasBYOBReader(stream) is true,
595 if stream.has_byob_reader() {
596 // Let filledPullIntos be a new empty list.
597 let mut filled_pull_intos = Vec::new();
598
599 // While filledPullIntos’s size < ! ReadableStreamGetNumReadIntoRequests(stream),
600 while filled_pull_intos.len() < stream.get_num_read_into_requests() {
601 // Let pullIntoDescriptor be ! ReadableByteStreamControllerShiftPendingPullInto(controller).
602 let pull_into_descriptor = self.shift_pending_pull_into();
603
604 // Append pullIntoDescriptor to filledPullIntos.
605 filled_pull_intos.push(pull_into_descriptor);
606 }
607
608 // For each filledPullInto of filledPullIntos,
609 for filled_pull_into in filled_pull_intos {
610 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
611 self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)
612 .expect("commit_pull_into_descriptor failed");
613 }
614 }
615
616 Ok(())
617 }
618
619 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-readable-state>
620 pub(crate) fn respond_in_readable_state(
621 &self,
622 cx: SafeJSContext,
623 bytes_written: u64,
624 can_gc: CanGc,
625 ) -> Fallible<()> {
626 let pending_pull_intos = self.pending_pull_intos.borrow();
627 let first_descriptor = pending_pull_intos.first().unwrap();
628
629 // Assert: pullIntoDescriptor’s bytes filled + bytesWritten ≤ pullIntoDescriptor’s byte length.
630 assert!(
631 first_descriptor.bytes_filled.get() + bytes_written <= first_descriptor.byte_length
632 );
633
634 // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
635 // controller, bytesWritten, pullIntoDescriptor).
636 self.fill_head_pull_into_descriptor(bytes_written, first_descriptor);
637
638 // If pullIntoDescriptor’s reader type is "none",
639 if first_descriptor.reader_type.is_none() {
640 // needed to drop the borrow and avoid BorrowMutError
641 drop(pending_pull_intos);
642
643 // Perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, pullIntoDescriptor).
644 self.enqueue_detached_pull_into_to_queue(cx, can_gc)?;
645
646 // Let filledPullIntos be the result of performing
647 // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
648 let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
649
650 // For each filledPullInto of filledPullIntos,
651 for filled_pull_into in filled_pull_intos {
652 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]]
653 // , filledPullInto).
654 self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)
655 .expect("commit_pull_into_descriptor failed");
656 }
657
658 // Return.
659 return Ok(());
660 }
661
662 // If pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill, return.
663 if first_descriptor.bytes_filled.get() < first_descriptor.minimum_fill {
664 return Ok(());
665 }
666
667 // needed to drop the borrow and avoid BorrowMutError
668 drop(pending_pull_intos);
669
670 // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
671 let pull_into_descriptor = self.shift_pending_pull_into();
672
673 // Let remainderSize be the remainder after dividing pullIntoDescriptor’s bytes
674 // filled by pullIntoDescriptor’s element size.
675 let remainder_size =
676 pull_into_descriptor.bytes_filled.get() % pull_into_descriptor.element_size;
677
678 // If remainderSize > 0,
679 if remainder_size > 0 {
680 // Let end be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
681 let end = pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();
682
683 // Perform ? ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
684 // pullIntoDescriptor’s buffer, end − remainderSize, remainderSize).
685 self.enqueue_cloned_chunk_to_queue(
686 cx,
687 &pull_into_descriptor.buffer,
688 end - remainder_size,
689 remainder_size,
690 can_gc,
691 )?;
692 }
693
694 // Set pullIntoDescriptor’s bytes filled to pullIntoDescriptor’s bytes filled − remainderSize.
695 pull_into_descriptor
696 .bytes_filled
697 .set(pull_into_descriptor.bytes_filled.get() - remainder_size);
698
699 // Let filledPullIntos be the result of performing
700 // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
701 let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
702
703 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], pullIntoDescriptor).
704 self.commit_pull_into_descriptor(cx, &pull_into_descriptor, can_gc)
705 .expect("commit_pull_into_descriptor failed");
706
707 // For each filledPullInto of filledPullIntos,
708 for filled_pull_into in filled_pull_intos {
709 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], filledPullInto).
710 self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)
711 .expect("commit_pull_into_descriptor failed");
712 }
713
714 Ok(())
715 }
716
717 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-with-new-view>
718 pub(crate) fn respond_with_new_view(
719 &self,
720 cx: SafeJSContext,
721 view: HeapBufferSource<ArrayBufferViewU8>,
722 can_gc: CanGc,
723 ) -> Fallible<()> {
724 let view_byte_length;
725 {
726 // Assert: controller.[[pendingPullIntos]] is not empty.
727 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
728 assert!(!pending_pull_intos.is_empty());
729
730 // Assert: ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is false.
731 assert!(!view.is_detached_buffer(cx));
732
733 // Let firstDescriptor be controller.[[pendingPullIntos]][0].
734 let first_descriptor = pending_pull_intos.first_mut().unwrap();
735
736 // Let state be controller.[[stream]].[[state]].
737 let stream = self.stream.get().unwrap();
738
739 // If state is "closed",
740 if stream.is_closed() {
741 // If view.[[ByteLength]] is not 0, throw a TypeError exception.
742 if view.byte_length() != 0 {
743 return Err(Error::Type("view byte length is not 0".to_owned()));
744 }
745 } else {
746 // Assert: state is "readable".
747 assert!(stream.is_readable());
748
749 // If view.[[ByteLength]] is 0, throw a TypeError exception.
750 if view.byte_length() == 0 {
751 return Err(Error::Type("view byte length is 0".to_owned()));
752 }
753 }
754
755 // If firstDescriptor’s byte offset + firstDescriptor’ bytes filled is not view.[[ByteOffset]],
756 // throw a RangeError exception.
757 if first_descriptor.byte_offset + first_descriptor.bytes_filled.get() !=
758 (view.get_byte_offset() as u64)
759 {
760 return Err(Error::Range(
761 "firstDescriptor's byte offset + bytes filled is not view byte offset"
762 .to_owned(),
763 ));
764 }
765
766 // If firstDescriptor’s buffer byte length is not view.[[ViewedArrayBuffer]].[[ByteLength]],
767 // throw a RangeError exception.
768 if first_descriptor.buffer_byte_length !=
769 (view.viewed_buffer_array_byte_length(cx) as u64)
770 {
771 return Err(Error::Range(
772 "firstDescriptor's buffer byte length is not view viewed buffer array byte length"
773 .to_owned(),
774 ));
775 }
776
777 // If firstDescriptor’s bytes filled + view.[[ByteLength]] > firstDescriptor’s byte length,
778 // throw a RangeError exception.
779 if first_descriptor.bytes_filled.get() + (view.byte_length()) as u64 >
780 first_descriptor.byte_length
781 {
782 return Err(Error::Range(
783 "bytes filled + view byte length > byte length".to_owned(),
784 ));
785 }
786
787 // Let viewByteLength be view.[[ByteLength]].
788 view_byte_length = view.byte_length();
789
790 // Set firstDescriptor’s buffer to ? TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
791 first_descriptor.buffer = view
792 .get_array_buffer_view_buffer(cx)
793 .transfer_array_buffer(cx)?;
794 }
795
796 // Perform ? ReadableByteStreamControllerRespondInternal(controller, viewByteLength).
797 self.respond_internal(cx, view_byte_length as u64, can_gc)
798 }
799
800 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-get-desired-size>
801 pub(crate) fn get_desired_size(&self) -> Option<f64> {
802 // Let state be controller.[[stream]].[[state]].
803 let stream = self.stream.get()?;
804
805 // If state is "errored", return null.
806 if stream.is_errored() {
807 return None;
808 }
809
810 // If state is "closed", return 0.
811 if stream.is_closed() {
812 return Some(0.0);
813 }
814
815 // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
816 Some(self.strategy_hwm - self.queue_total_size.get())
817 }
818
819 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollergetbyobrequest>
820 pub(crate) fn get_byob_request(
821 &self,
822 cx: SafeJSContext,
823 can_gc: CanGc,
824 ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
825 // If controller.[[byobRequest]] is null and controller.[[pendingPullIntos]] is not empty,
826 let pending_pull_intos = self.pending_pull_intos.borrow();
827 if self.byob_request.get().is_none() && !pending_pull_intos.is_empty() {
828 // Let firstDescriptor be controller.[[pendingPullIntos]][0].
829 let first_descriptor = pending_pull_intos.first().unwrap();
830 // Let view be ! Construct(%Uint8Array%, « firstDescriptor’s buffer,
831 // firstDescriptor’s byte offset + firstDescriptor’s bytes filled,
832 // firstDescriptor’s byte length − firstDescriptor’s bytes filled »).
833
834 let byte_offset = first_descriptor.byte_offset + first_descriptor.bytes_filled.get();
835 let byte_length = first_descriptor.byte_length - first_descriptor.bytes_filled.get();
836
837 let view = create_buffer_source_with_constructor(
838 cx,
839 &Constructor::Name(Type::Uint8),
840 &first_descriptor.buffer,
841 byte_offset as usize,
842 byte_length as usize,
843 )
844 .expect("Construct Uint8Array failed");
845
846 // Let byobRequest be a new ReadableStreamBYOBRequest.
847 let byob_request = ReadableStreamBYOBRequest::new(&self.global(), can_gc);
848
849 // Set byobRequest.[[controller]] to controller.
850 byob_request.set_controller(Some(&DomRoot::from_ref(self)));
851
852 // Set byobRequest.[[view]] to view.
853 byob_request.set_view(Some(view));
854
855 // Set controller.[[byobRequest]] to byobRequest.
856 self.byob_request.set(Some(&byob_request));
857 }
858
859 // Return controller.[[byobRequest]].
860 Ok(self.byob_request.get())
861 }
862
863 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-close>
864 pub(crate) fn close(&self, cx: SafeJSContext, can_gc: CanGc) -> Fallible<()> {
865 // Let stream be controller.[[stream]].
866 let stream = self.stream.get().unwrap();
867
868 // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
869 if self.close_requested.get() || !stream.is_readable() {
870 return Ok(());
871 }
872
873 // If controller.[[queueTotalSize]] > 0,
874 if self.queue_total_size.get() > 0.0 {
875 // Set controller.[[closeRequested]] to true.
876 self.close_requested.set(true);
877 // Return.
878 return Ok(());
879 }
880
881 // If controller.[[pendingPullIntos]] is not empty,
882 let pending_pull_intos = self.pending_pull_intos.borrow();
883 if !pending_pull_intos.is_empty() {
884 // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
885 let first_pending_pull_into = pending_pull_intos.first().unwrap();
886
887 // If the remainder after dividing firstPendingPullInto’s bytes filled by
888 // firstPendingPullInto’s element size is not 0,
889 if first_pending_pull_into.bytes_filled.get() % first_pending_pull_into.element_size !=
890 0
891 {
892 // needed to drop the borrow and avoid BorrowMutError
893 drop(pending_pull_intos);
894
895 // Let e be a new TypeError exception.
896 let e = Error::Type(
897 "remainder after dividing firstPendingPullInto's bytes
898 filled by firstPendingPullInto's element size is not 0"
899 .to_owned(),
900 );
901
902 // Perform ! ReadableByteStreamControllerError(controller, e).
903 rooted!(in(*cx) let mut error = UndefinedValue());
904 e.clone()
905 .to_jsval(cx, &self.global(), error.handle_mut(), can_gc);
906 self.error(error.handle(), can_gc);
907
908 // Throw e.
909 return Err(e);
910 }
911 }
912
913 // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
914 self.clear_algorithms();
915
916 // Perform ! ReadableStreamClose(stream).
917 stream.close(can_gc);
918 Ok(())
919 }
920
921 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-error>
922 pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
923 // Let stream be controller.[[stream]].
924 let stream = self.stream.get().unwrap();
925
926 // If stream.[[state]] is not "readable", return.
927 if !stream.is_readable() {
928 return;
929 }
930
931 // Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller).
932 self.clear_pending_pull_intos();
933
934 // Perform ! ResetQueue(controller).
935 self.reset_queue();
936
937 // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
938 self.clear_algorithms();
939
940 // Perform ! ReadableStreamError(stream, e).
941 stream.error(e, can_gc);
942 }
943
944 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-algorithms>
945 fn clear_algorithms(&self) {
946 // Set controller.[[pullAlgorithm]] to undefined.
947 // Set controller.[[cancelAlgorithm]] to undefined.
948 self.underlying_source.set(None);
949 }
950
951 /// <https://streams.spec.whatwg.org/#reset-queue>
952 pub(crate) fn reset_queue(&self) {
953 // Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
954
955 // Set container.[[queue]] to a new empty list.
956 self.queue.borrow_mut().clear();
957
958 // Set container.[[queueTotalSize]] to 0.
959 self.queue_total_size.set(0.0);
960 }
961
962 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-pending-pull-intos>
963 pub(crate) fn clear_pending_pull_intos(&self) {
964 // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
965 self.invalidate_byob_request();
966
967 // Set controller.[[pendingPullIntos]] to a new empty list.
968 self.pending_pull_intos.borrow_mut().clear();
969 }
970
971 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-invalidate-byob-request>
972 pub(crate) fn invalidate_byob_request(&self) {
973 if let Some(byob_request) = self.byob_request.get() {
974 // Set controller.[[byobRequest]].[[controller]] to undefined.
975 byob_request.set_controller(None);
976
977 // Set controller.[[byobRequest]].[[view]] to null.
978 byob_request.set_view(None);
979
980 // Set controller.[[byobRequest]] to null.
981 self.byob_request.set(None);
982 }
983 // If controller.[[byobRequest]] is null, return.
984 }
985
986 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue>
987 pub(crate) fn enqueue(
988 &self,
989 cx: SafeJSContext,
990 chunk: HeapBufferSource<ArrayBufferViewU8>,
991 can_gc: CanGc,
992 ) -> Fallible<()> {
993 // Let stream be controller.[[stream]].
994 let stream = self.stream.get().unwrap();
995
996 // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
997 if self.close_requested.get() || !stream.is_readable() {
998 return Ok(());
999 }
1000
1001 // Let buffer be chunk.[[ViewedArrayBuffer]].
1002 let buffer = chunk.get_array_buffer_view_buffer(cx);
1003
1004 // Let byteOffset be chunk.[[ByteOffset]].
1005 let byte_offset = chunk.get_byte_offset();
1006
1007 // Let byteLength be chunk.[[ByteLength]].
1008 let byte_length = chunk.byte_length();
1009
1010 // If ! IsDetachedBuffer(buffer) is true, throw a TypeError exception.
1011 if buffer.is_detached_buffer(cx) {
1012 return Err(Error::Type("buffer is detached".to_owned()));
1013 }
1014
1015 // Let transferredBuffer be ? TransferArrayBuffer(buffer).
1016 let transferred_buffer = buffer.transfer_array_buffer(cx)?;
1017
1018 // If controller.[[pendingPullIntos]] is not empty,
1019 {
1020 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
1021 if !pending_pull_intos.is_empty() {
1022 // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
1023 let first_descriptor = pending_pull_intos.first_mut().unwrap();
1024 // If ! IsDetachedBuffer(firstPendingPullInto’s buffer) is true, throw a TypeError exception.
1025 if first_descriptor.buffer.is_detached_buffer(cx) {
1026 return Err(Error::Type("buffer is detached".to_owned()));
1027 }
1028
1029 // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
1030 self.invalidate_byob_request();
1031
1032 // Set firstPendingPullInto’s buffer to ! TransferArrayBuffer(firstPendingPullInto’s buffer).
1033 first_descriptor.buffer = first_descriptor
1034 .buffer
1035 .transfer_array_buffer(cx)
1036 .expect("TransferArrayBuffer failed");
1037
1038 // If firstPendingPullInto’s reader type is "none",
1039 if first_descriptor.reader_type.is_none() {
1040 // needed to drop the borrow and avoid BorrowMutError
1041 drop(pending_pull_intos);
1042
1043 // perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(
1044 // controller, firstPendingPullInto).
1045 self.enqueue_detached_pull_into_to_queue(cx, can_gc)?;
1046 }
1047 }
1048 }
1049
1050 // If ! ReadableStreamHasDefaultReader(stream) is true,
1051 if stream.has_default_reader() {
1052 // Perform ! ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller).
1053 self.process_read_requests_using_queue(cx, can_gc)
1054 .expect("process_read_requests_using_queue failed");
1055
1056 // If ! ReadableStreamGetNumReadRequests(stream) is 0,
1057 if stream.get_num_read_requests() == 0 {
1058 // Assert: controller.[[pendingPullIntos]] is empty.
1059 {
1060 assert!(self.pending_pull_intos.borrow().is_empty());
1061 }
1062
1063 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1064 // controller, transferredBuffer, byteOffset, byteLength).
1065 self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1066 } else {
1067 // Assert: controller.[[queue]] is empty.
1068 assert!(self.queue.borrow().is_empty());
1069
1070 // If controller.[[pendingPullIntos]] is not empty,
1071
1072 let pending_pull_intos = self.pending_pull_intos.borrow();
1073 if !pending_pull_intos.is_empty() {
1074 // Assert: controller.[[pendingPullIntos]][0]'s reader type is "default".
1075 assert!(matches!(
1076 pending_pull_intos.first().unwrap().reader_type,
1077 Some(ReaderType::Default)
1078 ));
1079
1080 // needed to drop the borrow and avoid BorrowMutError
1081 drop(pending_pull_intos);
1082
1083 // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1084 self.shift_pending_pull_into();
1085 }
1086
1087 // Let transferredView be ! Construct(%Uint8Array%, « transferredBuffer, byteOffset, byteLength »).
1088 let transferred_view = create_buffer_source_with_constructor(
1089 cx,
1090 &Constructor::Name(Type::Uint8),
1091 &transferred_buffer,
1092 byte_offset,
1093 byte_length,
1094 )
1095 .expect("Construct Uint8Array failed");
1096
1097 // Perform ! ReadableStreamFulfillReadRequest(stream, transferredView, false).
1098 rooted!(in(*cx) let mut view_value = UndefinedValue());
1099 transferred_view.get_buffer_view_value(cx, view_value.handle_mut());
1100 stream.fulfill_read_request(view_value.handle(), false, can_gc);
1101 }
1102 // Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true,
1103 } else if stream.has_byob_reader() {
1104 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1105 // controller, transferredBuffer, byteOffset, byteLength).
1106 self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1107
1108 // Let filledPullIntos be the result of performing !
1109 // ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
1110 let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
1111
1112 // For each filledPullInto of filledPullIntos,
1113 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
1114 for filled_pull_into in filled_pull_intos {
1115 self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)
1116 .expect("commit_pull_into_descriptor failed");
1117 }
1118 } else {
1119 // Assert: ! IsReadableStreamLocked(stream) is false.
1120 assert!(!stream.is_locked());
1121
1122 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1123 // (controller, transferredBuffer, byteOffset, byteLength).
1124 self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1125 }
1126
1127 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1128 self.call_pull_if_needed(can_gc);
1129
1130 Ok(())
1131 }
1132
1133 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-commit-pull-into-descriptor>
1134 pub(crate) fn commit_pull_into_descriptor(
1135 &self,
1136 cx: SafeJSContext,
1137 pull_into_descriptor: &PullIntoDescriptor,
1138 can_gc: CanGc,
1139 ) -> Fallible<()> {
1140 // Assert: stream.[[state]] is not "errored".
1141 let stream = self.stream.get().unwrap();
1142 assert!(!stream.is_errored());
1143
1144 // Assert: pullIntoDescriptor.reader type is not "none".
1145 assert!(pull_into_descriptor.reader_type.is_some());
1146
1147 // Let done be false.
1148 let mut done = false;
1149
1150 // If stream.[[state]] is "closed",
1151 if stream.is_closed() {
1152 // Assert: the remainder after dividing pullIntoDescriptor’s bytes filled
1153 // by pullIntoDescriptor’s element size is 0.
1154 assert!(
1155 pull_into_descriptor.bytes_filled.get() % pull_into_descriptor.element_size == 0
1156 );
1157
1158 // Set done to true.
1159 done = true;
1160 }
1161
1162 // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor).
1163 let filled_view = self
1164 .convert_pull_into_descriptor(cx, pull_into_descriptor)
1165 .expect("convert_pull_into_descriptor failed");
1166
1167 rooted!(in(*cx) let mut view_value = UndefinedValue());
1168 filled_view.get_buffer_view_value(cx, view_value.handle_mut());
1169
1170 // If pullIntoDescriptor’s reader type is "default",
1171 if matches!(pull_into_descriptor.reader_type, Some(ReaderType::Default)) {
1172 // Perform ! ReadableStreamFulfillReadRequest(stream, filledView, done).
1173
1174 stream.fulfill_read_request(view_value.handle(), done, can_gc);
1175 } else {
1176 // Assert: pullIntoDescriptor’s reader type is "byob".
1177 assert!(matches!(
1178 pull_into_descriptor.reader_type,
1179 Some(ReaderType::Byob)
1180 ));
1181
1182 // Perform ! ReadableStreamFulfillReadIntoRequest(stream, filledView, done).
1183 stream.fulfill_read_into_request(view_value.handle(), done, can_gc);
1184 }
1185 Ok(())
1186 }
1187
1188 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-convert-pull-into-descriptor>
1189 pub(crate) fn convert_pull_into_descriptor(
1190 &self,
1191 cx: SafeJSContext,
1192 pull_into_descriptor: &PullIntoDescriptor,
1193 ) -> Fallible<HeapBufferSource<ArrayBufferViewU8>> {
1194 // Let bytesFilled be pullIntoDescriptor’s bytes filled.
1195 let bytes_filled = pull_into_descriptor.bytes_filled.get();
1196
1197 // Let elementSize be pullIntoDescriptor’s element size.
1198 let element_size = pull_into_descriptor.element_size;
1199
1200 // Assert: bytesFilled ≤ pullIntoDescriptor’s byte length.
1201 assert!(bytes_filled <= pull_into_descriptor.byte_length);
1202
1203 // Assert: the remainder after dividing bytesFilled by elementSize is 0.
1204 assert!(bytes_filled % element_size == 0);
1205
1206 // Let buffer be ! TransferArrayBuffer(pullIntoDescriptor’s buffer).
1207 let buffer = pull_into_descriptor
1208 .buffer
1209 .transfer_array_buffer(cx)
1210 .expect("TransferArrayBuffer failed");
1211
1212 // Return ! Construct(pullIntoDescriptor’s view constructor,
1213 // « buffer, pullIntoDescriptor’s byte offset, bytesFilled ÷ elementSize »).
1214 Ok(create_buffer_source_with_constructor(
1215 cx,
1216 &pull_into_descriptor.view_constructor,
1217 &buffer,
1218 pull_into_descriptor.byte_offset as usize,
1219 (bytes_filled / element_size) as usize,
1220 )
1221 .expect("Construct view failed"))
1222 }
1223
1224 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-process-pull-into-descriptors-using-queue>
1225 pub(crate) fn process_pull_into_descriptors_using_queue(
1226 &self,
1227 cx: SafeJSContext,
1228 ) -> Vec<PullIntoDescriptor> {
1229 // Assert: controller.[[closeRequested]] is false.
1230 assert!(!self.close_requested.get());
1231
1232 // Let filledPullIntos be a new empty list.
1233 let mut filled_pull_intos = Vec::new();
1234
1235 // While controller.[[pendingPullIntos]] is not empty,
1236 loop {
1237 // If controller.[[queueTotalSize]] is 0, then break.
1238 if self.queue_total_size.get() == 0.0 {
1239 break;
1240 }
1241
1242 // Let pullIntoDescriptor be controller.[[pendingPullIntos]][0].
1243 let fill_pull_result = {
1244 let pending_pull_intos = self.pending_pull_intos.borrow();
1245 let Some(pull_into_descriptor) = pending_pull_intos.first() else {
1246 break;
1247 };
1248 self.fill_pull_into_descriptor_from_queue(cx, pull_into_descriptor)
1249 };
1250
1251 // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) is true,
1252 if fill_pull_result {
1253 // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1254 let pull_into_descriptor = self.shift_pending_pull_into();
1255
1256 // Append pullIntoDescriptor to filledPullIntos.
1257 filled_pull_intos.push(pull_into_descriptor);
1258 }
1259 }
1260
1261 // Return filledPullIntos.
1262 filled_pull_intos
1263 }
1264
1265 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-pull-into-descriptor-from-queue>
1266 pub(crate) fn fill_pull_into_descriptor_from_queue(
1267 &self,
1268 cx: SafeJSContext,
1269 pull_into_descriptor: &PullIntoDescriptor,
1270 ) -> bool {
1271 // Let maxBytesToCopy be min(controller.[[queueTotalSize]],
1272 // pullIntoDescriptor’s byte length − pullIntoDescriptor’s bytes filled).
1273 let max_bytes_to_copy = min(
1274 self.queue_total_size.get() as usize,
1275 (pull_into_descriptor.byte_length - pull_into_descriptor.bytes_filled.get()) as usize,
1276 );
1277
1278 // Let maxBytesFilled be pullIntoDescriptor’s bytes filled + maxBytesToCopy.
1279 let max_bytes_filled = pull_into_descriptor.bytes_filled.get() as usize + max_bytes_to_copy;
1280
1281 // Let totalBytesToCopyRemaining be maxBytesToCopy.
1282 let mut total_bytes_to_copy_remaining = max_bytes_to_copy;
1283
1284 // Let ready be false.
1285 let mut ready = false;
1286
1287 // Assert: ! IsDetachedBuffer(pullIntoDescriptor’s buffer) is false.
1288 assert!(!pull_into_descriptor.buffer.is_detached_buffer(cx));
1289
1290 // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
1291 assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
1292
1293 // Let remainderBytes be the remainder after dividing maxBytesFilled by pullIntoDescriptor’s element size.
1294 let remainder_bytes = max_bytes_filled % pull_into_descriptor.element_size as usize;
1295
1296 // Let maxAlignedBytes be maxBytesFilled − remainderBytes.
1297 let max_aligned_bytes = max_bytes_filled - remainder_bytes;
1298
1299 // If maxAlignedBytes ≥ pullIntoDescriptor’s minimum fill,
1300 if max_aligned_bytes >= pull_into_descriptor.minimum_fill as usize {
1301 // Set totalBytesToCopyRemaining to maxAlignedBytes − pullIntoDescriptor’s bytes filled.
1302 total_bytes_to_copy_remaining =
1303 max_aligned_bytes - (pull_into_descriptor.bytes_filled.get() as usize);
1304
1305 // Set ready to true.
1306 ready = true;
1307 }
1308
1309 // Let queue be controller.[[queue]].
1310 let mut queue = self.queue.borrow_mut();
1311
1312 // While totalBytesToCopyRemaining > 0,
1313 while total_bytes_to_copy_remaining > 0 {
1314 // Let headOfQueue be queue[0].
1315 let head_of_queue = queue.front_mut().unwrap();
1316
1317 // Let bytesToCopy be min(totalBytesToCopyRemaining, headOfQueue’s byte length).
1318 let bytes_to_copy = total_bytes_to_copy_remaining.min(head_of_queue.byte_length);
1319
1320 // Let destStart be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
1321 let dest_start =
1322 pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();
1323
1324 // Let descriptorBuffer be pullIntoDescriptor’s buffer.
1325 let descriptor_buffer = &pull_into_descriptor.buffer;
1326
1327 // Let queueBuffer be headOfQueue’s buffer.
1328 let queue_buffer = &head_of_queue.buffer;
1329
1330 // Let queueByteOffset be headOfQueue’s byte offset.
1331 let queue_byte_offset = head_of_queue.byte_offset;
1332
1333 // Assert: ! CanCopyDataBlockBytes(descriptorBuffer, destStart,
1334 // queueBuffer, queueByteOffset, bytesToCopy) is true.
1335 assert!(descriptor_buffer.can_copy_data_block_bytes(
1336 cx,
1337 dest_start as usize,
1338 queue_buffer,
1339 queue_byte_offset,
1340 bytes_to_copy
1341 ));
1342
1343 // Perform ! CopyDataBlockBytes(descriptorBuffer.[[ArrayBufferData]], destStart,
1344 // queueBuffer.[[ArrayBufferData]], queueByteOffset, bytesToCopy).
1345 descriptor_buffer.copy_data_block_bytes(
1346 cx,
1347 dest_start as usize,
1348 queue_buffer,
1349 queue_byte_offset,
1350 bytes_to_copy,
1351 );
1352
1353 // If headOfQueue’s byte length is bytesToCopy,
1354 if head_of_queue.byte_length == bytes_to_copy {
1355 // Remove queue[0].
1356 queue.pop_front().unwrap();
1357 } else {
1358 // Set headOfQueue’s byte offset to headOfQueue’s byte offset + bytesToCopy.
1359 head_of_queue.byte_offset += bytes_to_copy;
1360
1361 // Set headOfQueue’s byte length to headOfQueue’s byte length − bytesToCopy.
1362 head_of_queue.byte_length -= bytes_to_copy;
1363 }
1364
1365 // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − bytesToCopy.
1366 self.queue_total_size
1367 .set(self.queue_total_size.get() - (bytes_to_copy as f64));
1368
1369 // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
1370 // controller, bytesToCopy, pullIntoDescriptor).
1371 self.fill_head_pull_into_descriptor(bytes_to_copy as u64, pull_into_descriptor);
1372
1373 // Set totalBytesToCopyRemaining to totalBytesToCopyRemaining − bytesToCopy.
1374 total_bytes_to_copy_remaining -= bytes_to_copy;
1375 }
1376
1377 // If ready is false,
1378 if !ready {
1379 // Assert: controller.[[queueTotalSize]] is 0.
1380 assert!(self.queue_total_size.get() == 0.0);
1381
1382 // Assert: pullIntoDescriptor’s bytes filled > 0.
1383 assert!(pull_into_descriptor.bytes_filled.get() > 0);
1384
1385 // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
1386 assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
1387 }
1388
1389 // Return ready.
1390 ready
1391 }
1392
1393 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-head-pull-into-descriptor>
1394 pub(crate) fn fill_head_pull_into_descriptor(
1395 &self,
1396 bytes_copied: u64,
1397 pull_into_descriptor: &PullIntoDescriptor,
1398 ) {
1399 // Assert: either controller.[[pendingPullIntos]] is empty,
1400 // or controller.[[pendingPullIntos]][0] is pullIntoDescriptor.
1401 {
1402 let pending_pull_intos = self.pending_pull_intos.borrow();
1403 assert!(
1404 pending_pull_intos.is_empty() ||
1405 pending_pull_intos.first().unwrap() == pull_into_descriptor
1406 );
1407 }
1408
1409 // Assert: controller.[[byobRequest]] is null.
1410 assert!(self.byob_request.get().is_none());
1411
1412 // Set pullIntoDescriptor’s bytes filled to bytes filled + size.
1413 pull_into_descriptor
1414 .bytes_filled
1415 .set(pull_into_descriptor.bytes_filled.get() + bytes_copied);
1416 }
1417
1418 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueuedetachedpullintotoqueue>
1419 pub(crate) fn enqueue_detached_pull_into_to_queue(
1420 &self,
1421 cx: SafeJSContext,
1422 can_gc: CanGc,
1423 ) -> Fallible<()> {
1424 // first_descriptor: &PullIntoDescriptor,
1425 let pending_pull_intos = self.pending_pull_intos.borrow();
1426 let first_descriptor = pending_pull_intos.first().unwrap();
1427
1428 // Assert: pullIntoDescriptor’s reader type is "none".
1429 assert!(first_descriptor.reader_type.is_none());
1430
1431 // If pullIntoDescriptor’s bytes filled > 0, perform ?
1432 // ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
1433 // pullIntoDescriptor’s buffer, pullIntoDescriptor’s byte offset, pullIntoDescriptor’s bytes filled).
1434
1435 if first_descriptor.bytes_filled.get() > 0 {
1436 self.enqueue_cloned_chunk_to_queue(
1437 cx,
1438 &first_descriptor.buffer,
1439 first_descriptor.byte_offset,
1440 first_descriptor.bytes_filled.get(),
1441 can_gc,
1442 )?;
1443 }
1444
1445 // needed to drop the borrow and avoid BorrowMutError
1446 drop(pending_pull_intos);
1447
1448 // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1449 self.shift_pending_pull_into();
1450
1451 Ok(())
1452 }
1453
1454 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueueclonedchunktoqueue>
1455 pub(crate) fn enqueue_cloned_chunk_to_queue(
1456 &self,
1457 cx: SafeJSContext,
1458 buffer: &HeapBufferSource<ArrayBufferU8>,
1459 byte_offset: u64,
1460 byte_length: u64,
1461 can_gc: CanGc,
1462 ) -> Fallible<()> {
1463 // Let cloneResult be CloneArrayBuffer(buffer, byteOffset, byteLength, %ArrayBuffer%).
1464 if let Ok(clone_result) =
1465 buffer.clone_array_buffer(cx, byte_offset as usize, byte_length as usize)
1466 {
1467 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1468 // (controller, cloneResult.[[Value]], 0, byteLength).
1469 self.enqueue_chunk_to_queue(clone_result, 0, byte_length as usize);
1470
1471 Ok(())
1472 } else {
1473 // If cloneResult is an abrupt completion,
1474
1475 // Perform ! ReadableByteStreamControllerError(controller, cloneResult.[[Value]]).
1476 rooted!(in(*cx) let mut rval = UndefinedValue());
1477 let error = Error::Type("can not clone array buffer".to_owned());
1478 error
1479 .clone()
1480 .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1481 self.error(rval.handle(), can_gc);
1482
1483 // Return cloneResult.
1484 Err(error)
1485 }
1486 }
1487
1488 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue-chunk-to-queue>
1489 pub(crate) fn enqueue_chunk_to_queue(
1490 &self,
1491 buffer: HeapBufferSource<ArrayBufferU8>,
1492 byte_offset: usize,
1493 byte_length: usize,
1494 ) {
1495 // Let entry be a new ReadableByteStreamQueueEntry object.
1496 let entry = QueueEntry::new(buffer, byte_offset, byte_length);
1497
1498 // Append entry to controller.[[queue]].
1499 self.queue.borrow_mut().push_back(entry);
1500
1501 // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] + byteLength.
1502 self.queue_total_size
1503 .set(self.queue_total_size.get() + byte_length as f64);
1504 }
1505
1506 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-shift-pending-pull-into>
1507 pub(crate) fn shift_pending_pull_into(&self) -> PullIntoDescriptor {
1508 // Assert: controller.[[byobRequest]] is null.
1509 assert!(self.byob_request.get().is_none());
1510
1511 // Let descriptor be controller.[[pendingPullIntos]][0].
1512 // Remove descriptor from controller.[[pendingPullIntos]].
1513 // Return descriptor.
1514 self.pending_pull_intos.borrow_mut().remove(0)
1515 }
1516
1517 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerprocessreadrequestsusingqueue>
1518 pub(crate) fn process_read_requests_using_queue(
1519 &self,
1520 cx: SafeJSContext,
1521 can_gc: CanGc,
1522 ) -> Fallible<()> {
1523 // Let reader be controller.[[stream]].[[reader]].
1524 // Assert: reader implements ReadableStreamDefaultReader.
1525 let reader = self.stream.get().unwrap().get_default_reader();
1526
1527 // Step 3
1528 reader.process_read_requests(cx, DomRoot::from_ref(self), can_gc)
1529 }
1530
1531 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerfillreadrequestfromqueue>
1532 pub(crate) fn fill_read_request_from_queue(
1533 &self,
1534 cx: SafeJSContext,
1535 read_request: &ReadRequest,
1536 can_gc: CanGc,
1537 ) -> Fallible<()> {
1538 // Assert: controller.[[queueTotalSize]] > 0.
1539 assert!(self.queue_total_size.get() > 0.0);
1540 // Also assert that the queue has a non-zero length;
1541 assert!(!self.queue.borrow().is_empty());
1542
1543 // Let entry be controller.[[queue]][0].
1544 // Remove entry from controller.[[queue]].
1545 let entry = self.remove_entry();
1546
1547 // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − entry’s byte length.
1548 self.queue_total_size
1549 .set(self.queue_total_size.get() - entry.byte_length as f64);
1550
1551 // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
1552 self.handle_queue_drain(can_gc);
1553
1554 // Let view be ! Construct(%Uint8Array%, « entry’s buffer, entry’s byte offset, entry’s byte length »).
1555 let view = create_buffer_source_with_constructor(
1556 cx,
1557 &Constructor::Name(Type::Uint8),
1558 &entry.buffer,
1559 entry.byte_offset,
1560 entry.byte_length,
1561 )
1562 .expect("Construct Uint8Array failed");
1563
1564 // Perform readRequest’s chunk steps, given view.
1565 let result = RootedTraceableBox::new(Heap::default());
1566 rooted!(in(*cx) let mut view_value = UndefinedValue());
1567 view.get_buffer_view_value(cx, view_value.handle_mut());
1568 result.set(*view_value);
1569
1570 read_request.chunk_steps(result, &self.global(), can_gc);
1571
1572 Ok(())
1573 }
1574
1575 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-handle-queue-drain>
1576 pub(crate) fn handle_queue_drain(&self, can_gc: CanGc) {
1577 // Assert: controller.[[stream]].[[state]] is "readable".
1578 assert!(self.stream.get().unwrap().is_readable());
1579
1580 // If controller.[[queueTotalSize]] is 0 and controller.[[closeRequested]] is true,
1581 if self.queue_total_size.get() == 0.0 && self.close_requested.get() {
1582 // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
1583 self.clear_algorithms();
1584
1585 // Perform ! ReadableStreamClose(controller.[[stream]]).
1586 self.stream.get().unwrap().close(can_gc);
1587 } else {
1588 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1589 self.call_pull_if_needed(can_gc);
1590 }
1591 }
1592
1593 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
1594 pub(crate) fn call_pull_if_needed(&self, can_gc: CanGc) {
1595 // Let shouldPull be ! ReadableByteStreamControllerShouldCallPull(controller).
1596 let should_pull = self.should_call_pull();
1597 // If shouldPull is false, return.
1598 if !should_pull {
1599 return;
1600 }
1601
1602 // If controller.[[pulling]] is true,
1603 if self.pulling.get() {
1604 // Set controller.[[pullAgain]] to true.
1605 self.pull_again.set(true);
1606
1607 // Return.
1608 return;
1609 }
1610
1611 // Assert: controller.[[pullAgain]] is false.
1612 assert!(!self.pull_again.get());
1613
1614 // Set controller.[[pulling]] to true.
1615 self.pulling.set(true);
1616
1617 // Let pullPromise be the result of performing controller.[[pullAlgorithm]].
1618 // Continues into the resolve and reject handling of the native handler.
1619 let global = self.global();
1620 let rooted_controller = DomRoot::from_ref(self);
1621 let controller = Controller::ReadableByteStreamController(rooted_controller.clone());
1622
1623 if let Some(underlying_source) = self.underlying_source.get() {
1624 let handler = PromiseNativeHandler::new(
1625 &global,
1626 Some(Box::new(PullAlgorithmFulfillmentHandler {
1627 controller: Dom::from_ref(&rooted_controller),
1628 })),
1629 Some(Box::new(PullAlgorithmRejectionHandler {
1630 controller: Dom::from_ref(&rooted_controller),
1631 })),
1632 can_gc,
1633 );
1634
1635 let realm = enter_realm(&*global);
1636 let comp = InRealm::Entered(&realm);
1637 let result = underlying_source
1638 .call_pull_algorithm(controller, &global, can_gc)
1639 .unwrap_or_else(|| {
1640 let promise = Promise::new(&global, can_gc);
1641 promise.resolve_native(&(), can_gc);
1642 Ok(promise)
1643 });
1644 let promise = result.unwrap_or_else(|error| {
1645 let cx = GlobalScope::get_cx();
1646 rooted!(in(*cx) let mut rval = UndefinedValue());
1647 // TODO: check if `self.global()` is the right globalscope.
1648 error
1649 .clone()
1650 .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1651 let promise = Promise::new(&global, can_gc);
1652 promise.reject_native(&rval.handle(), can_gc);
1653 promise
1654 });
1655 promise.append_native_handler(&handler, comp, can_gc);
1656 }
1657 }
1658
1659 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-should-call-pull>
1660 fn should_call_pull(&self) -> bool {
1661 // Let stream be controller.[[stream]].
1662 // Note: the spec does not assert that stream is not undefined here,
1663 // so we return false if it is.
1664 let stream = self.stream.get().unwrap();
1665
1666 // If stream.[[state]] is not "readable", return false.
1667 if !stream.is_readable() {
1668 return false;
1669 }
1670
1671 // If controller.[[closeRequested]] is true, return false.
1672 if self.close_requested.get() {
1673 return false;
1674 }
1675
1676 // If controller.[[started]] is false, return false.
1677 if !self.started.get() {
1678 return false;
1679 }
1680
1681 // If ! ReadableStreamHasDefaultReader(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0
1682 // , return true.
1683 if stream.has_default_reader() && stream.get_num_read_requests() > 0 {
1684 return true;
1685 }
1686
1687 // If ! ReadableStreamHasBYOBReader(stream) is true and ! ReadableStreamGetNumReadIntoRequests(stream) > 0
1688 // , return true.
1689 if stream.has_byob_reader() && stream.get_num_read_into_requests() > 0 {
1690 return true;
1691 }
1692
1693 // Let desiredSize be ! ReadableByteStreamControllerGetDesiredSize(controller).
1694 let desired_size = self.get_desired_size();
1695
1696 // Assert: desiredSize is not null.
1697 assert!(desired_size.is_some());
1698
1699 // If desiredSize > 0, return true.
1700 if desired_size.unwrap() > 0. {
1701 return true;
1702 }
1703
1704 // Return false.
1705 false
1706 }
1707 /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
1708 pub(crate) fn setup(
1709 &self,
1710 global: &GlobalScope,
1711 stream: DomRoot<ReadableStream>,
1712 can_gc: CanGc,
1713 ) -> Fallible<()> {
1714 // Assert: stream.[[controller]] is undefined.
1715 stream.assert_no_controller();
1716
1717 // If autoAllocateChunkSize is not undefined,
1718 if self.auto_allocate_chunk_size.is_some() {
1719 // Assert: ! IsInteger(autoAllocateChunkSize) is true. Implicit
1720 // Assert: autoAllocateChunkSize is positive. (Implicit by type.)
1721 }
1722
1723 // Set controller.[[stream]] to stream.
1724 self.stream.set(Some(&stream));
1725
1726 // Set controller.[[pullAgain]] and controller.[[pulling]] to false.
1727 self.pull_again.set(false);
1728 self.pulling.set(false);
1729
1730 // Set controller.[[byobRequest]] to null.
1731 self.byob_request.set(None);
1732
1733 // Perform ! ResetQueue(controller).
1734 self.reset_queue();
1735
1736 // Set controller.[[closeRequested]] and controller.[[started]] to false.
1737 self.close_requested.set(false);
1738 self.started.set(false);
1739
1740 // Set controller.[[strategyHWM]] to highWaterMark.
1741 // Set controller.[[pullAlgorithm]] to pullAlgorithm.
1742 // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
1743 // Set controller.[[autoAllocateChunkSize]] to autoAllocateChunkSize.
1744 // Set controller.[[pendingPullIntos]] to a new empty list.
1745 // Note: the above steps are done in `new`.
1746
1747 // Set stream.[[controller]] to controller.
1748 let rooted_byte_controller = DomRoot::from_ref(self);
1749 stream.set_byte_controller(&rooted_byte_controller);
1750
1751 if let Some(underlying_source) = rooted_byte_controller.underlying_source.get() {
1752 // Let startResult be the result of performing startAlgorithm. (This might throw an exception.)
1753 let start_result = underlying_source
1754 .call_start_algorithm(
1755 Controller::ReadableByteStreamController(rooted_byte_controller.clone()),
1756 can_gc,
1757 )
1758 .unwrap_or_else(|| {
1759 let promise = Promise::new(global, can_gc);
1760 promise.resolve_native(&(), can_gc);
1761 Ok(promise)
1762 });
1763
1764 // Let startPromise be a promise resolved with startResult.
1765 let start_promise = start_result?;
1766
1767 // Upon fulfillment of startPromise, Upon rejection of startPromise with reason r,
1768 let handler = PromiseNativeHandler::new(
1769 global,
1770 Some(Box::new(StartAlgorithmFulfillmentHandler {
1771 controller: Dom::from_ref(&rooted_byte_controller),
1772 })),
1773 Some(Box::new(StartAlgorithmRejectionHandler {
1774 controller: Dom::from_ref(&rooted_byte_controller),
1775 })),
1776 can_gc,
1777 );
1778 let realm = enter_realm(global);
1779 let comp = InRealm::Entered(&realm);
1780 start_promise.append_native_handler(&handler, comp, can_gc);
1781 };
1782
1783 Ok(())
1784 }
1785
1786 // <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontroller-releasesteps
1787 pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1788 // If this.[[pendingPullIntos]] is not empty,
1789 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
1790 if !pending_pull_intos.is_empty() {
1791 // Let firstPendingPullInto be this.[[pendingPullIntos]][0].
1792 let mut first_pending_pull_into = pending_pull_intos.remove(0);
1793
1794 // Set firstPendingPullInto’s reader type to "none".
1795 first_pending_pull_into.reader_type = None;
1796
1797 // Set this.[[pendingPullIntos]] to the list « firstPendingPullInto »
1798 pending_pull_intos.clear();
1799 pending_pull_intos.push(first_pending_pull_into);
1800 }
1801 Ok(())
1802 }
1803
1804 /// <https://streams.spec.whatwg.org/#rbs-controller-private-cancel>
1805 pub(crate) fn perform_cancel_steps(
1806 &self,
1807 cx: SafeJSContext,
1808 global: &GlobalScope,
1809 reason: SafeHandleValue,
1810 can_gc: CanGc,
1811 ) -> Rc<Promise> {
1812 // Perform ! ReadableByteStreamControllerClearPendingPullIntos(this).
1813 self.clear_pending_pull_intos();
1814
1815 // Perform ! ResetQueue(this).
1816 self.reset_queue();
1817
1818 let underlying_source = self
1819 .underlying_source
1820 .get()
1821 .expect("Controller should have a source when the cancel steps are called into.");
1822
1823 // Let result be the result of performing this.[[cancelAlgorithm]], passing in reason.
1824 let result = underlying_source
1825 .call_cancel_algorithm(cx, global, reason, can_gc)
1826 .unwrap_or_else(|| {
1827 let promise = Promise::new(global, can_gc);
1828 promise.resolve_native(&(), can_gc);
1829 Ok(promise)
1830 });
1831
1832 let promise = result.unwrap_or_else(|error| {
1833 let cx = GlobalScope::get_cx();
1834 rooted!(in(*cx) let mut rval = UndefinedValue());
1835 error
1836 .clone()
1837 .to_jsval(cx, global, rval.handle_mut(), can_gc);
1838 let promise = Promise::new(global, can_gc);
1839 promise.reject_native(&rval.handle(), can_gc);
1840 promise
1841 });
1842
1843 // Perform ! ReadableByteStreamControllerClearAlgorithms(this).
1844 self.clear_algorithms();
1845
1846 // Return result(the promise).
1847 promise
1848 }
1849
1850 /// <https://streams.spec.whatwg.org/#rbs-controller-private-pull>
1851 pub(crate) fn perform_pull_steps(
1852 &self,
1853 cx: SafeJSContext,
1854 read_request: &ReadRequest,
1855 can_gc: CanGc,
1856 ) {
1857 // Let stream be this.[[stream]].
1858 let stream = self.stream.get().unwrap();
1859
1860 // Assert: ! ReadableStreamHasDefaultReader(stream) is true.
1861 assert!(stream.has_default_reader());
1862
1863 // If this.[[queueTotalSize]] > 0,
1864 if self.queue_total_size.get() > 0.0 {
1865 // Assert: ! ReadableStreamGetNumReadRequests(stream) is 0.
1866 assert_eq!(stream.get_num_read_requests(), 0);
1867
1868 // Perform ! ReadableByteStreamControllerFillReadRequestFromQueue(this, readRequest).
1869 let _ = self.fill_read_request_from_queue(cx, read_request, can_gc);
1870
1871 // Return.
1872 return;
1873 }
1874
1875 // Let autoAllocateChunkSize be this.[[autoAllocateChunkSize]].
1876 let auto_allocate_chunk_size = self.auto_allocate_chunk_size;
1877
1878 // If autoAllocateChunkSize is not undefined,
1879 if let Some(auto_allocate_chunk_size) = auto_allocate_chunk_size {
1880 // create_array_buffer_with_size
1881 // Let buffer be Construct(%ArrayBuffer%, « autoAllocateChunkSize »).
1882 match create_array_buffer_with_size(cx, auto_allocate_chunk_size as usize) {
1883 Ok(buffer) => {
1884 // Let pullIntoDescriptor be a new pull-into descriptor with
1885 // buffer buffer.[[Value]]
1886 // buffer byte length autoAllocateChunkSize
1887 // byte offset 0
1888 // byte length autoAllocateChunkSize
1889 // bytes filled 0
1890 // minimum fill 1
1891 // element size 1
1892 // view constructor %Uint8Array%
1893 // reader type "default"
1894 let pull_into_descriptor = PullIntoDescriptor {
1895 buffer,
1896 buffer_byte_length: auto_allocate_chunk_size,
1897 byte_length: auto_allocate_chunk_size,
1898 byte_offset: 0,
1899 bytes_filled: Cell::new(0),
1900 minimum_fill: 1,
1901 element_size: 1,
1902 view_constructor: Constructor::Name(Type::Uint8),
1903 reader_type: Some(ReaderType::Default),
1904 };
1905
1906 // Append pullIntoDescriptor to this.[[pendingPullIntos]].
1907 self.pending_pull_intos
1908 .borrow_mut()
1909 .push(pull_into_descriptor);
1910 },
1911 Err(error) => {
1912 // If buffer is an abrupt completion,
1913 // Perform readRequest’s error steps, given buffer.[[Value]].
1914
1915 rooted!(in(*cx) let mut rval = UndefinedValue());
1916 error
1917 .clone()
1918 .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1919 read_request.error_steps(rval.handle(), can_gc);
1920
1921 // Return.
1922 return;
1923 },
1924 }
1925 }
1926
1927 // Perform ! ReadableStreamAddReadRequest(stream, readRequest).
1928 stream.add_read_request(read_request);
1929
1930 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(this).
1931 self.call_pull_if_needed(can_gc);
1932 }
1933
1934 /// Setting the JS object after the heap has settled down.
1935 pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
1936 if let Some(underlying_source) = self.underlying_source.get() {
1937 underlying_source.set_underlying_source_this_object(this_object);
1938 }
1939 }
1940
1941 pub(crate) fn remove_entry(&self) -> QueueEntry {
1942 self.queue
1943 .borrow_mut()
1944 .pop_front()
1945 .expect("Reader must have read request when remove is called into.")
1946 }
1947
1948 pub(crate) fn get_queue_total_size(&self) -> f64 {
1949 self.queue_total_size.get()
1950 }
1951
1952 pub(crate) fn get_pending_pull_intos_size(&self) -> usize {
1953 self.pending_pull_intos.borrow().len()
1954 }
1955}
1956
1957impl ReadableByteStreamControllerMethods<crate::DomTypeHolder> for ReadableByteStreamController {
1958 /// <https://streams.spec.whatwg.org/#rbs-controller-byob-request>
1959 fn GetByobRequest(
1960 &self,
1961 can_gc: CanGc,
1962 ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
1963 let cx = GlobalScope::get_cx();
1964 // Return ! ReadableByteStreamControllerGetBYOBRequest(this).
1965 self.get_byob_request(cx, can_gc)
1966 }
1967
1968 /// <https://streams.spec.whatwg.org/#rbs-controller-desired-size>
1969 fn GetDesiredSize(&self) -> Option<f64> {
1970 // Return ! ReadableByteStreamControllerGetDesiredSize(this).
1971 self.get_desired_size()
1972 }
1973
1974 /// <https://streams.spec.whatwg.org/#rbs-controller-close>
1975 fn Close(&self, can_gc: CanGc) -> Fallible<()> {
1976 let cx = GlobalScope::get_cx();
1977 // If this.[[closeRequested]] is true, throw a TypeError exception.
1978 if self.close_requested.get() {
1979 return Err(Error::Type("closeRequested is true".to_owned()));
1980 }
1981
1982 // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
1983 if !self.stream.get().unwrap().is_readable() {
1984 return Err(Error::Type("stream is not readable".to_owned()));
1985 }
1986
1987 // Perform ? ReadableByteStreamControllerClose(this).
1988 self.close(cx, can_gc)
1989 }
1990
1991 /// <https://streams.spec.whatwg.org/#rbs-controller-enqueue>
1992 fn Enqueue(
1993 &self,
1994 chunk: js::gc::CustomAutoRooterGuard<js::typedarray::ArrayBufferView>,
1995 can_gc: CanGc,
1996 ) -> Fallible<()> {
1997 let cx = GlobalScope::get_cx();
1998
1999 let chunk = HeapBufferSource::<ArrayBufferViewU8>::from_view(chunk);
2000
2001 // If chunk.[[ByteLength]] is 0, throw a TypeError exception.
2002 if chunk.byte_length() == 0 {
2003 return Err(Error::Type("chunk.ByteLength is 0".to_owned()));
2004 }
2005
2006 // If chunk.[[ViewedArrayBuffer]].[[ByteLength]] is 0, throw a TypeError exception.
2007 if chunk.viewed_buffer_array_byte_length(cx) == 0 {
2008 return Err(Error::Type(
2009 "chunk.ViewedArrayBuffer.ByteLength is 0".to_owned(),
2010 ));
2011 }
2012
2013 // If this.[[closeRequested]] is true, throw a TypeError exception.
2014 if self.close_requested.get() {
2015 return Err(Error::Type("closeRequested is true".to_owned()));
2016 }
2017
2018 // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
2019 if !self.stream.get().unwrap().is_readable() {
2020 return Err(Error::Type("stream is not readable".to_owned()));
2021 }
2022
2023 // Return ? ReadableByteStreamControllerEnqueue(this, chunk).
2024 self.enqueue(cx, chunk, can_gc)
2025 }
2026
2027 /// <https://streams.spec.whatwg.org/#rbs-controller-error>
2028 fn Error(&self, _cx: SafeJSContext, e: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
2029 // Perform ! ReadableByteStreamControllerError(this, e).
2030 self.error(e, can_gc);
2031 Ok(())
2032 }
2033}