script/dom/stream/
readablebytestreamcontroller.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::cmp::min;
7use std::collections::VecDeque;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::{Heap, Type};
12use js::jsval::UndefinedValue;
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue};
15use js::typedarray::{ArrayBufferU8, ArrayBufferViewU8};
16
17use super::readablestreambyobreader::ReadIntoRequest;
18use super::readablestreamdefaultreader::ReadRequest;
19use super::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType};
20use crate::dom::bindings::buffer_source::{
21    Constructor, HeapBufferSource, byte_size, create_array_buffer_with_size,
22    create_buffer_source_with_constructor,
23};
24use crate::dom::bindings::cell::DomRefCell;
25use crate::dom::bindings::codegen::Bindings::ReadableByteStreamControllerBinding::ReadableByteStreamControllerMethods;
26use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
27use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
28use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
29use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
30use crate::dom::bindings::trace::RootedTraceableBox;
31use crate::dom::globalscope::GlobalScope;
32use crate::dom::promise::Promise;
33use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
34use crate::dom::stream::readablestream::ReadableStream;
35use crate::dom::stream::readablestreambyobrequest::ReadableStreamBYOBRequest;
36use crate::realms::{InRealm, enter_realm};
37use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
38
39/// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry>
40#[derive(JSTraceable, MallocSizeOf)]
41pub(crate) struct QueueEntry {
42    /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-buffer>
43    #[ignore_malloc_size_of = "HeapBufferSource"]
44    buffer: HeapBufferSource<ArrayBufferU8>,
45    /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-offset>
46    byte_offset: usize,
47    /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-length>
48    byte_length: usize,
49}
50
51impl QueueEntry {
52    pub(crate) fn new(
53        buffer: HeapBufferSource<ArrayBufferU8>,
54        byte_offset: usize,
55        byte_length: usize,
56    ) -> QueueEntry {
57        QueueEntry {
58            buffer,
59            byte_offset,
60            byte_length,
61        }
62    }
63}
64
/// The kind of reader a [`PullIntoDescriptor`] was created for.
///
/// The spec's third value, "none", is not a variant here: descriptors store
/// an `Option<ReaderType>` and use `None` for it.
#[derive(Debug, Eq, JSTraceable, MallocSizeOf, PartialEq)]
pub(crate) enum ReaderType {
    /// <https://streams.spec.whatwg.org/#readablestreambyobreader>
    Byob,
    /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
    Default,
}
72
/// Bookkeeping for one pending "pull into" operation of the controller.
///
/// <https://streams.spec.whatwg.org/#pull-into-descriptor>
#[derive(Eq, JSTraceable, MallocSizeOf, PartialEq)]
pub(crate) struct PullIntoDescriptor {
    #[ignore_malloc_size_of = "HeapBufferSource"]
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer>
    buffer: HeapBufferSource<ArrayBufferU8>,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer-byte-length>
    buffer_byte_length: u64,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-offset>
    byte_offset: u64,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-length>
    byte_length: u64,
    /// Kept in a `Cell` because it is updated through shared references to
    /// the descriptor.
    ///
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-bytes-filled>
    bytes_filled: Cell<u64>,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-minimum-fill>
    minimum_fill: u64,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-element-size>
    element_size: u64,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-view-constructor>
    view_constructor: Constructor,
    /// `None` stands for the spec's reader type "none".
    ///
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-reader-type>
    reader_type: Option<ReaderType>,
}
96
97/// The fulfillment handler for
98/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
99#[derive(Clone, JSTraceable, MallocSizeOf)]
100#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
101struct StartAlgorithmFulfillmentHandler {
102    controller: Dom<ReadableByteStreamController>,
103}
104
105impl Callback for StartAlgorithmFulfillmentHandler {
106    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
107    /// Upon fulfillment of startPromise,
108    fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
109        let can_gc = CanGc::from_cx(cx);
110        // Set controller.[[started]] to true.
111        self.controller.started.set(true);
112
113        // Assert: controller.[[pulling]] is false.
114        assert!(!self.controller.pulling.get());
115
116        // Assert: controller.[[pullAgain]] is false.
117        assert!(!self.controller.pull_again.get());
118
119        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
120        self.controller.call_pull_if_needed(can_gc);
121    }
122}
123
124/// The rejection handler for
125/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
126#[derive(Clone, JSTraceable, MallocSizeOf)]
127#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
128struct StartAlgorithmRejectionHandler {
129    controller: Dom<ReadableByteStreamController>,
130}
131
132impl Callback for StartAlgorithmRejectionHandler {
133    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
134    /// Upon rejection of startPromise with reason r,
135    fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
136        let can_gc = CanGc::from_cx(cx);
137        // Perform ! ReadableByteStreamControllerError(controller, r).
138        self.controller.error(v, can_gc);
139    }
140}
141
142/// The fulfillment handler for
143/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
144#[derive(Clone, JSTraceable, MallocSizeOf)]
145#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
146struct PullAlgorithmFulfillmentHandler {
147    controller: Dom<ReadableByteStreamController>,
148}
149
150impl Callback for PullAlgorithmFulfillmentHandler {
151    /// Continuation of <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
152    /// Upon fulfillment of pullPromise
153    fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
154        let can_gc = CanGc::from_cx(cx);
155        // Set controller.[[pulling]] to false.
156        self.controller.pulling.set(false);
157
158        // If controller.[[pullAgain]] is true,
159        if self.controller.pull_again.get() {
160            // Set controller.[[pullAgain]] to false.
161            self.controller.pull_again.set(false);
162
163            // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
164            self.controller.call_pull_if_needed(can_gc);
165        }
166    }
167}
168
169/// The rejection handler for
170/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
171#[derive(Clone, JSTraceable, MallocSizeOf)]
172#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
173struct PullAlgorithmRejectionHandler {
174    controller: Dom<ReadableByteStreamController>,
175}
176
177impl Callback for PullAlgorithmRejectionHandler {
178    /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-byte-controller-call-pull-if-needed>
179    /// Upon rejection of pullPromise with reason e.
180    fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
181        let can_gc = CanGc::from_cx(cx);
182        // Perform ! ReadableByteStreamControllerError(controller, e).
183        self.controller.error(v, can_gc);
184    }
185}
186
/// DOM object holding the internal slots of a readable byte stream
/// controller.
///
/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
#[dom_struct]
pub(crate) struct ReadableByteStreamController {
    reflector_: Reflector,
    /// Read from the underlying source container when the controller is
    /// constructed.
    ///
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-autoallocatechunksize>
    auto_allocate_chunk_size: Option<u64>,
    /// Starts out empty and is filled in by `set_stream`.
    ///
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-stream>
    stream: MutNullableDom<ReadableStream>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-strategyhwm>
    strategy_hwm: f64,
    /// A mutable reference to the underlying source is used to implement these two
    /// internal slots:
    ///
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullalgorithm>
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-cancelalgorithm>
    underlying_source: MutNullableDom<UnderlyingSourceContainer>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queue>
    queue: DomRefCell<VecDeque<QueueEntry>>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queuetotalsize>
    queue_total_size: Cell<f64>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-byobrequest>
    byob_request: MutNullableDom<ReadableStreamBYOBRequest>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pendingpullintos>
    pending_pull_intos: DomRefCell<Vec<PullIntoDescriptor>>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-closerequested>
    close_requested: Cell<bool>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-started>
    started: Cell<bool>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pulling>
    pulling: Cell<bool>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullagain>
    pull_again: Cell<bool>,
}
220
221impl ReadableByteStreamController {
222    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
223    fn new_inherited(
224        underlying_source_type: UnderlyingSourceType,
225        strategy_hwm: f64,
226        global: &GlobalScope,
227        can_gc: CanGc,
228    ) -> ReadableByteStreamController {
229        let underlying_source_container =
230            UnderlyingSourceContainer::new(global, underlying_source_type, can_gc);
231        let auto_allocate_chunk_size = underlying_source_container.auto_allocate_chunk_size();
232        ReadableByteStreamController {
233            reflector_: Reflector::new(),
234            byob_request: MutNullableDom::new(None),
235            stream: MutNullableDom::new(None),
236            underlying_source: MutNullableDom::new(Some(&*underlying_source_container)),
237            auto_allocate_chunk_size,
238            pending_pull_intos: DomRefCell::new(Vec::new()),
239            strategy_hwm,
240            close_requested: Default::default(),
241            queue: DomRefCell::new(Default::default()),
242            queue_total_size: Default::default(),
243            started: Default::default(),
244            pulling: Default::default(),
245            pull_again: Default::default(),
246        }
247    }
248
249    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
250    pub(crate) fn new(
251        underlying_source_type: UnderlyingSourceType,
252        strategy_hwm: f64,
253        global: &GlobalScope,
254        can_gc: CanGc,
255    ) -> DomRoot<ReadableByteStreamController> {
256        reflect_dom_object(
257            Box::new(ReadableByteStreamController::new_inherited(
258                underlying_source_type,
259                strategy_hwm,
260                global,
261                can_gc,
262            )),
263            global,
264            can_gc,
265        )
266    }
267
268    #[allow(dead_code)]
269    pub(crate) fn set_stream(&self, stream: &ReadableStream) {
270        self.stream.set(Some(stream))
271    }
272
273    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-pull-into>
274    pub(crate) fn perform_pull_into(
275        &self,
276        cx: SafeJSContext,
277        read_into_request: &ReadIntoRequest,
278        view: HeapBufferSource<ArrayBufferViewU8>,
279        min: u64,
280        can_gc: CanGc,
281    ) {
282        // Let stream be controller.[[stream]].
283        let stream = self.stream.get().unwrap();
284
285        // Let elementSize be 1.
286        let mut element_size = 1;
287
288        // Let ctor be %DataView%.
289        let mut ctor = Constructor::DataView;
290
291        // If view has a [[TypedArrayName]] internal slot (i.e., it is not a DataView),
292        if view.has_typed_array_name() {
293            // Set elementSize to the element size specified in the
294            // typed array constructors table for view.[[TypedArrayName]].
295            let view_typw = view.get_array_buffer_view_type();
296            element_size = byte_size(view_typw);
297
298            // Set ctor to the constructor specified in the typed array constructors table for view.[[TypedArrayName]].
299            ctor = Constructor::Name(view_typw);
300        }
301
302        // Let minimumFill be min × elementSize.
303        let minimum_fill = min * element_size;
304
305        // Assert: minimumFill ≥ 0 and minimumFill ≤ view.[[ByteLength]].
306        assert!(minimum_fill <= (view.byte_length() as u64));
307
308        // Assert: the remainder after dividing minimumFill by elementSize is 0.
309        assert_eq!(minimum_fill % element_size, 0);
310
311        // Let byteOffset be view.[[ByteOffset]].
312        let byte_offset = view.get_byte_offset();
313
314        // Let byteLength be view.[[ByteLength]].
315        let byte_length = view.byte_length();
316
317        // Let bufferResult be TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
318        match view
319            .get_array_buffer_view_buffer(cx)
320            .transfer_array_buffer(cx)
321        {
322            Ok(buffer) => {
323                // Let buffer be bufferResult.[[Value]].
324                // Let pullIntoDescriptor be a new pull-into descriptor with
325                // buffer   buffer
326                // buffer byte length   buffer.[[ArrayBufferByteLength]]
327                // byte offset  byteOffset
328                // byte length  byteLength
329                // bytes filled  0
330                // minimum fill minimumFill
331                // element size elementSize
332                // view constructor ctor
333                // reader type  "byob"
334                let buffer_byte_length = buffer.byte_length();
335                let pull_into_descriptor = PullIntoDescriptor {
336                    buffer,
337                    buffer_byte_length: buffer_byte_length as u64,
338                    byte_offset: byte_offset as u64,
339                    byte_length: byte_length as u64,
340                    bytes_filled: Cell::new(0),
341                    minimum_fill,
342                    element_size,
343                    view_constructor: ctor.clone(),
344                    reader_type: Some(ReaderType::Byob),
345                };
346
347                // If controller.[[pendingPullIntos]] is not empty,
348                {
349                    let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
350                    if !pending_pull_intos.is_empty() {
351                        // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
352                        pending_pull_intos.push(pull_into_descriptor);
353
354                        // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
355                        stream.add_read_into_request(read_into_request);
356
357                        // Return.
358                        return;
359                    }
360                }
361
362                // If stream.[[state]] is "closed",
363                if stream.is_closed() {
364                    // Let emptyView be ! Construct(ctor, « pullIntoDescriptor’s buffer,
365                    // pullIntoDescriptor’s byte offset, 0 »).
366                    if let Ok(empty_view) = create_buffer_source_with_constructor(
367                        cx,
368                        &ctor,
369                        &pull_into_descriptor.buffer,
370                        pull_into_descriptor.byte_offset as usize,
371                        0,
372                    ) {
373                        // Perform readIntoRequest’s close steps, given emptyView.
374                        let result = RootedTraceableBox::new(Heap::default());
375                        rooted!(in(*cx) let mut view_value = UndefinedValue());
376                        empty_view.get_buffer_view_value(cx, view_value.handle_mut());
377                        result.set(*view_value);
378
379                        read_into_request.close_steps(Some(result), can_gc);
380
381                        // Return.
382                        return;
383                    } else {
384                        return;
385                    }
386                }
387
388                // If controller.[[queueTotalSize]] > 0,
389                if self.queue_total_size.get() > 0.0 {
390                    // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(
391                    // controller, pullIntoDescriptor) is true,
392                    if self.fill_pull_into_descriptor_from_queue(cx, &pull_into_descriptor) {
393                        // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(
394                        // pullIntoDescriptor).
395                        if let Ok(filled_view) =
396                            self.convert_pull_into_descriptor(cx, &pull_into_descriptor)
397                        {
398                            // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
399                            self.handle_queue_drain(can_gc);
400
401                            // Perform readIntoRequest’s chunk steps, given filledView.
402                            let result = RootedTraceableBox::new(Heap::default());
403                            rooted!(in(*cx) let mut view_value = UndefinedValue());
404                            filled_view.get_buffer_view_value(cx, view_value.handle_mut());
405                            result.set(*view_value);
406                            read_into_request.chunk_steps(result, can_gc);
407
408                            // Return.
409                            return;
410                        } else {
411                            return;
412                        }
413                    }
414
415                    // If controller.[[closeRequested]] is true,
416                    if self.close_requested.get() {
417                        // Let e be a new TypeError exception.
418                        rooted!(in(*cx) let mut error = UndefinedValue());
419                        Error::Type("close requested".to_owned()).to_jsval(
420                            cx,
421                            &self.global(),
422                            error.handle_mut(),
423                            can_gc,
424                        );
425
426                        // Perform ! ReadableByteStreamControllerError(controller, e).
427                        self.error(error.handle(), can_gc);
428
429                        // Perform readIntoRequest’s error steps, given e.
430                        read_into_request.error_steps(error.handle(), can_gc);
431
432                        // Return.
433                        return;
434                    }
435                }
436
437                // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
438                {
439                    self.pending_pull_intos
440                        .borrow_mut()
441                        .push(pull_into_descriptor);
442                }
443                // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
444                stream.add_read_into_request(read_into_request);
445
446                // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
447                self.call_pull_if_needed(can_gc);
448            },
449            Err(error) => {
450                // If bufferResult is an abrupt completion,
451
452                // Perform readIntoRequest’s error steps, given bufferResult.[[Value]].
453                rooted!(in(*cx) let mut rval = UndefinedValue());
454                error
455                    .clone()
456                    .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
457                read_into_request.error_steps(rval.handle(), can_gc);
458
459                // Return.
460            },
461        }
462    }
463
464    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond>
465    pub(crate) fn respond(
466        &self,
467        cx: SafeJSContext,
468        bytes_written: u64,
469        can_gc: CanGc,
470    ) -> Fallible<()> {
471        {
472            // Assert: controller.[[pendingPullIntos]] is not empty.
473            let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
474            assert!(!pending_pull_intos.is_empty());
475
476            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
477            let first_descriptor = pending_pull_intos.first_mut().unwrap();
478
479            // Let state be controller.[[stream]].[[state]].
480            let stream = self.stream.get().unwrap();
481
482            // If state is "closed",
483            if stream.is_closed() {
484                // If bytesWritten is not 0, throw a TypeError exception.
485                if bytes_written != 0 {
486                    return Err(Error::Type(
487                        "bytesWritten not zero on closed stream".to_owned(),
488                    ));
489                }
490            } else {
491                // Assert: state is "readable".
492                assert!(stream.is_readable());
493
494                // If bytesWritten is 0, throw a TypeError exception.
495                if bytes_written == 0 {
496                    return Err(Error::Type("bytesWritten is 0".to_owned()));
497                }
498
499                // If firstDescriptor’s bytes filled + bytesWritten > firstDescriptor’s byte length,
500                // throw a RangeError exception.
501                if first_descriptor.bytes_filled.get() + bytes_written >
502                    first_descriptor.byte_length
503                {
504                    return Err(Error::Range(
505                        "bytes filled + bytesWritten > byte length".to_owned(),
506                    ));
507                }
508            }
509
510            // Set firstDescriptor’s buffer to ! TransferArrayBuffer(firstDescriptor’s buffer).
511            first_descriptor.buffer = first_descriptor
512                .buffer
513                .transfer_array_buffer(cx)
514                .expect("TransferArrayBuffer failed");
515        }
516
517        // Perform ? ReadableByteStreamControllerRespondInternal(controller, bytesWritten).
518        self.respond_internal(cx, bytes_written, can_gc)
519    }
520
521    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-internal>
522    pub(crate) fn respond_internal(
523        &self,
524        cx: SafeJSContext,
525        bytes_written: u64,
526        can_gc: CanGc,
527    ) -> Fallible<()> {
528        {
529            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
530            let pending_pull_intos = self.pending_pull_intos.borrow();
531            let first_descriptor = pending_pull_intos.first().unwrap();
532
533            // Assert: ! CanTransferArrayBuffer(firstDescriptor’s buffer) is true
534            assert!(first_descriptor.buffer.can_transfer_array_buffer(cx));
535        }
536
537        // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
538        self.invalidate_byob_request();
539
540        // Let state be controller.[[stream]].[[state]].
541        let stream = self.stream.get().unwrap();
542
543        // If state is "closed",
544        if stream.is_closed() {
545            // Assert: bytesWritten is 0.
546            assert_eq!(bytes_written, 0);
547
548            // Perform ! ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor).
549            self.respond_in_closed_state(cx, can_gc)
550                .expect("respond_in_closed_state failed");
551        } else {
552            // Assert: state is "readable".
553            assert!(stream.is_readable());
554
555            // Assert: bytesWritten > 0.
556            assert!(bytes_written > 0);
557
558            // Perform ? ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor).
559            self.respond_in_readable_state(cx, bytes_written, can_gc)?;
560        }
561
562        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
563        self.call_pull_if_needed(can_gc);
564
565        Ok(())
566    }
567
568    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-closed-state>
569    pub(crate) fn respond_in_closed_state(&self, cx: SafeJSContext, can_gc: CanGc) -> Fallible<()> {
570        let pending_pull_intos = self.pending_pull_intos.borrow();
571        let first_descriptor = pending_pull_intos.first().unwrap();
572
573        // Assert: the remainder after dividing firstDescriptor’s bytes filled
574        // by firstDescriptor’s element size is 0.
575        assert_eq!(
576            first_descriptor.bytes_filled.get() % first_descriptor.element_size,
577            0
578        );
579
580        // If firstDescriptor’s reader type is "none",
581        // perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
582        let reader_type = first_descriptor.reader_type.is_none();
583
584        // needed to drop the borrow and avoid BorrowMutError
585        drop(pending_pull_intos);
586
587        if reader_type {
588            self.shift_pending_pull_into();
589        }
590
591        // Let stream be controller.[[stream]].
592        let stream = self.stream.get().unwrap();
593
594        // If ! ReadableStreamHasBYOBReader(stream) is true,
595        if stream.has_byob_reader() {
596            // Let filledPullIntos be a new empty list.
597            let mut filled_pull_intos = Vec::new();
598
599            // While filledPullIntos’s size < ! ReadableStreamGetNumReadIntoRequests(stream),
600            while filled_pull_intos.len() < stream.get_num_read_into_requests() {
601                // Let pullIntoDescriptor be ! ReadableByteStreamControllerShiftPendingPullInto(controller).
602                let pull_into_descriptor = self.shift_pending_pull_into();
603
604                // Append pullIntoDescriptor to filledPullIntos.
605                filled_pull_intos.push(pull_into_descriptor);
606            }
607
608            // For each filledPullInto of filledPullIntos,
609            for filled_pull_into in filled_pull_intos {
610                // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
611                self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)
612                    .expect("commit_pull_into_descriptor failed");
613            }
614        }
615
616        Ok(())
617    }
618
619    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-readable-state>
620    pub(crate) fn respond_in_readable_state(
621        &self,
622        cx: SafeJSContext,
623        bytes_written: u64,
624        can_gc: CanGc,
625    ) -> Fallible<()> {
626        let pending_pull_intos = self.pending_pull_intos.borrow();
627        let first_descriptor = pending_pull_intos.first().unwrap();
628
629        // Assert: pullIntoDescriptor’s bytes filled + bytesWritten ≤ pullIntoDescriptor’s byte length.
630        assert!(
631            first_descriptor.bytes_filled.get() + bytes_written <= first_descriptor.byte_length
632        );
633
634        // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
635        // controller, bytesWritten, pullIntoDescriptor).
636        self.fill_head_pull_into_descriptor(bytes_written, first_descriptor);
637
638        // If pullIntoDescriptor’s reader type is "none",
639        if first_descriptor.reader_type.is_none() {
640            // needed to drop the borrow and avoid BorrowMutError
641            drop(pending_pull_intos);
642
643            // Perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, pullIntoDescriptor).
644            self.enqueue_detached_pull_into_to_queue(cx, can_gc)?;
645
646            // Let filledPullIntos be the result of performing
647            // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
648            let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
649
650            // For each filledPullInto of filledPullIntos,
651            for filled_pull_into in filled_pull_intos {
652                // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]]
653                // , filledPullInto).
654                self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)
655                    .expect("commit_pull_into_descriptor failed");
656            }
657
658            // Return.
659            return Ok(());
660        }
661
662        // If pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill, return.
663        if first_descriptor.bytes_filled.get() < first_descriptor.minimum_fill {
664            return Ok(());
665        }
666
667        // needed to drop the borrow and avoid BorrowMutError
668        drop(pending_pull_intos);
669
670        // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
671        let pull_into_descriptor = self.shift_pending_pull_into();
672
673        // Let remainderSize be the remainder after dividing pullIntoDescriptor’s bytes
674        // filled by pullIntoDescriptor’s element size.
675        let remainder_size =
676            pull_into_descriptor.bytes_filled.get() % pull_into_descriptor.element_size;
677
678        // If remainderSize > 0,
679        if remainder_size > 0 {
680            // Let end be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
681            let end = pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();
682
683            // Perform ? ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
684            // pullIntoDescriptor’s buffer, end − remainderSize, remainderSize).
685            self.enqueue_cloned_chunk_to_queue(
686                cx,
687                &pull_into_descriptor.buffer,
688                end - remainder_size,
689                remainder_size,
690                can_gc,
691            )?;
692        }
693
694        // Set pullIntoDescriptor’s bytes filled to pullIntoDescriptor’s bytes filled − remainderSize.
695        pull_into_descriptor
696            .bytes_filled
697            .set(pull_into_descriptor.bytes_filled.get() - remainder_size);
698
699        // Let filledPullIntos be the result of performing
700        // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
701        let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
702
703        // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], pullIntoDescriptor).
704        self.commit_pull_into_descriptor(cx, &pull_into_descriptor, can_gc)
705            .expect("commit_pull_into_descriptor failed");
706
707        // For each filledPullInto of filledPullIntos,
708        for filled_pull_into in filled_pull_intos {
709            // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], filledPullInto).
710            self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)
711                .expect("commit_pull_into_descriptor failed");
712        }
713
714        Ok(())
715    }
716
717    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-with-new-view>
718    pub(crate) fn respond_with_new_view(
719        &self,
720        cx: SafeJSContext,
721        view: HeapBufferSource<ArrayBufferViewU8>,
722        can_gc: CanGc,
723    ) -> Fallible<()> {
724        let view_byte_length;
725        {
726            // Assert: controller.[[pendingPullIntos]] is not empty.
727            let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
728            assert!(!pending_pull_intos.is_empty());
729
730            // Assert: ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is false.
731            assert!(!view.is_detached_buffer(cx));
732
733            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
734            let first_descriptor = pending_pull_intos.first_mut().unwrap();
735
736            // Let state be controller.[[stream]].[[state]].
737            let stream = self.stream.get().unwrap();
738
739            // If state is "closed",
740            if stream.is_closed() {
741                // If view.[[ByteLength]] is not 0, throw a TypeError exception.
742                if view.byte_length() != 0 {
743                    return Err(Error::Type("view byte length is not 0".to_owned()));
744                }
745            } else {
746                // Assert: state is "readable".
747                assert!(stream.is_readable());
748
749                // If view.[[ByteLength]] is 0, throw a TypeError exception.
750                if view.byte_length() == 0 {
751                    return Err(Error::Type("view byte length is 0".to_owned()));
752                }
753            }
754
755            // If firstDescriptor’s byte offset + firstDescriptor’ bytes filled is not view.[[ByteOffset]],
756            // throw a RangeError exception.
757            if first_descriptor.byte_offset + first_descriptor.bytes_filled.get() !=
758                (view.get_byte_offset() as u64)
759            {
760                return Err(Error::Range(
761                    "firstDescriptor's byte offset + bytes filled is not view byte offset"
762                        .to_owned(),
763                ));
764            }
765
766            // If firstDescriptor’s buffer byte length is not view.[[ViewedArrayBuffer]].[[ByteLength]],
767            // throw a RangeError exception.
768            if first_descriptor.buffer_byte_length !=
769                (view.viewed_buffer_array_byte_length(cx) as u64)
770            {
771                return Err(Error::Range(
772                "firstDescriptor's buffer byte length is not view viewed buffer array byte length"
773                    .to_owned(),
774            ));
775            }
776
777            // If firstDescriptor’s bytes filled + view.[[ByteLength]] > firstDescriptor’s byte length,
778            // throw a RangeError exception.
779            if first_descriptor.bytes_filled.get() + (view.byte_length()) as u64 >
780                first_descriptor.byte_length
781            {
782                return Err(Error::Range(
783                    "bytes filled + view byte length > byte length".to_owned(),
784                ));
785            }
786
787            // Let viewByteLength be view.[[ByteLength]].
788            view_byte_length = view.byte_length();
789
790            // Set firstDescriptor’s buffer to ? TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
791            first_descriptor.buffer = view
792                .get_array_buffer_view_buffer(cx)
793                .transfer_array_buffer(cx)?;
794        }
795
796        // Perform ? ReadableByteStreamControllerRespondInternal(controller, viewByteLength).
797        self.respond_internal(cx, view_byte_length as u64, can_gc)
798    }
799
800    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-get-desired-size>
801    pub(crate) fn get_desired_size(&self) -> Option<f64> {
802        // Let state be controller.[[stream]].[[state]].
803        let stream = self.stream.get()?;
804
805        // If state is "errored", return null.
806        if stream.is_errored() {
807            return None;
808        }
809
810        // If state is "closed", return 0.
811        if stream.is_closed() {
812            return Some(0.0);
813        }
814
815        // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
816        Some(self.strategy_hwm - self.queue_total_size.get())
817    }
818
819    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollergetbyobrequest>
820    pub(crate) fn get_byob_request(
821        &self,
822        cx: SafeJSContext,
823        can_gc: CanGc,
824    ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
825        // If controller.[[byobRequest]] is null and controller.[[pendingPullIntos]] is not empty,
826        let pending_pull_intos = self.pending_pull_intos.borrow();
827        if self.byob_request.get().is_none() && !pending_pull_intos.is_empty() {
828            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
829            let first_descriptor = pending_pull_intos.first().unwrap();
830            // Let view be ! Construct(%Uint8Array%, « firstDescriptor’s buffer,
831            // firstDescriptor’s byte offset + firstDescriptor’s bytes filled,
832            // firstDescriptor’s byte length − firstDescriptor’s bytes filled »).
833
834            let byte_offset = first_descriptor.byte_offset + first_descriptor.bytes_filled.get();
835            let byte_length = first_descriptor.byte_length - first_descriptor.bytes_filled.get();
836
837            let view = create_buffer_source_with_constructor(
838                cx,
839                &Constructor::Name(Type::Uint8),
840                &first_descriptor.buffer,
841                byte_offset as usize,
842                byte_length as usize,
843            )
844            .expect("Construct Uint8Array failed");
845
846            // Let byobRequest be a new ReadableStreamBYOBRequest.
847            let byob_request = ReadableStreamBYOBRequest::new(&self.global(), can_gc);
848
849            // Set byobRequest.[[controller]] to controller.
850            byob_request.set_controller(Some(&DomRoot::from_ref(self)));
851
852            // Set byobRequest.[[view]] to view.
853            byob_request.set_view(Some(view));
854
855            // Set controller.[[byobRequest]] to byobRequest.
856            self.byob_request.set(Some(&byob_request));
857        }
858
859        // Return controller.[[byobRequest]].
860        Ok(self.byob_request.get())
861    }
862
863    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-close>
864    pub(crate) fn close(&self, cx: SafeJSContext, can_gc: CanGc) -> Fallible<()> {
865        // Let stream be controller.[[stream]].
866        let stream = self.stream.get().unwrap();
867
868        // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
869        if self.close_requested.get() || !stream.is_readable() {
870            return Ok(());
871        }
872
873        // If controller.[[queueTotalSize]] > 0,
874        if self.queue_total_size.get() > 0.0 {
875            // Set controller.[[closeRequested]] to true.
876            self.close_requested.set(true);
877            // Return.
878            return Ok(());
879        }
880
881        // If controller.[[pendingPullIntos]] is not empty,
882        let pending_pull_intos = self.pending_pull_intos.borrow();
883        if !pending_pull_intos.is_empty() {
884            // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
885            let first_pending_pull_into = pending_pull_intos.first().unwrap();
886
887            // If the remainder after dividing firstPendingPullInto’s bytes filled by
888            // firstPendingPullInto’s element size is not 0,
889            if first_pending_pull_into.bytes_filled.get() % first_pending_pull_into.element_size !=
890                0
891            {
892                // needed to drop the borrow and avoid BorrowMutError
893                drop(pending_pull_intos);
894
895                // Let e be a new TypeError exception.
896                let e = Error::Type(
897                    "remainder after dividing firstPendingPullInto's bytes
898                    filled by firstPendingPullInto's element size is not 0"
899                        .to_owned(),
900                );
901
902                // Perform ! ReadableByteStreamControllerError(controller, e).
903                rooted!(in(*cx) let mut error = UndefinedValue());
904                e.clone()
905                    .to_jsval(cx, &self.global(), error.handle_mut(), can_gc);
906                self.error(error.handle(), can_gc);
907
908                // Throw e.
909                return Err(e);
910            }
911        }
912
913        // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
914        self.clear_algorithms();
915
916        // Perform ! ReadableStreamClose(stream).
917        stream.close(can_gc);
918        Ok(())
919    }
920
921    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-error>
922    pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
923        // Let stream be controller.[[stream]].
924        let stream = self.stream.get().unwrap();
925
926        // If stream.[[state]] is not "readable", return.
927        if !stream.is_readable() {
928            return;
929        }
930
931        // Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller).
932        self.clear_pending_pull_intos();
933
934        // Perform ! ResetQueue(controller).
935        self.reset_queue();
936
937        // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
938        self.clear_algorithms();
939
940        // Perform ! ReadableStreamError(stream, e).
941        stream.error(e, can_gc);
942    }
943
944    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-algorithms>
945    fn clear_algorithms(&self) {
946        // Set controller.[[pullAlgorithm]] to undefined.
947        // Set controller.[[cancelAlgorithm]] to undefined.
948        self.underlying_source.set(None);
949    }
950
951    /// <https://streams.spec.whatwg.org/#reset-queue>
952    pub(crate) fn reset_queue(&self) {
953        // Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
954
955        // Set container.[[queue]] to a new empty list.
956        self.queue.borrow_mut().clear();
957
958        // Set container.[[queueTotalSize]] to 0.
959        self.queue_total_size.set(0.0);
960    }
961
962    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-pending-pull-intos>
963    pub(crate) fn clear_pending_pull_intos(&self) {
964        // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
965        self.invalidate_byob_request();
966
967        // Set controller.[[pendingPullIntos]] to a new empty list.
968        self.pending_pull_intos.borrow_mut().clear();
969    }
970
971    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-invalidate-byob-request>
972    pub(crate) fn invalidate_byob_request(&self) {
973        if let Some(byob_request) = self.byob_request.get() {
974            // Set controller.[[byobRequest]].[[controller]] to undefined.
975            byob_request.set_controller(None);
976
977            // Set controller.[[byobRequest]].[[view]] to null.
978            byob_request.set_view(None);
979
980            // Set controller.[[byobRequest]] to null.
981            self.byob_request.set(None);
982        }
983        // If controller.[[byobRequest]] is null, return.
984    }
985
986    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue>
987    pub(crate) fn enqueue(
988        &self,
989        cx: SafeJSContext,
990        chunk: HeapBufferSource<ArrayBufferViewU8>,
991        can_gc: CanGc,
992    ) -> Fallible<()> {
993        // Let stream be controller.[[stream]].
994        let stream = self.stream.get().unwrap();
995
996        // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
997        if self.close_requested.get() || !stream.is_readable() {
998            return Ok(());
999        }
1000
1001        // Let buffer be chunk.[[ViewedArrayBuffer]].
1002        let buffer = chunk.get_array_buffer_view_buffer(cx);
1003
1004        // Let byteOffset be chunk.[[ByteOffset]].
1005        let byte_offset = chunk.get_byte_offset();
1006
1007        // Let byteLength be chunk.[[ByteLength]].
1008        let byte_length = chunk.byte_length();
1009
1010        // If ! IsDetachedBuffer(buffer) is true, throw a TypeError exception.
1011        if buffer.is_detached_buffer(cx) {
1012            return Err(Error::Type("buffer is detached".to_owned()));
1013        }
1014
1015        // Let transferredBuffer be ? TransferArrayBuffer(buffer).
1016        let transferred_buffer = buffer.transfer_array_buffer(cx)?;
1017
1018        // If controller.[[pendingPullIntos]] is not empty,
1019        {
1020            let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
1021            if !pending_pull_intos.is_empty() {
1022                // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
1023                let first_descriptor = pending_pull_intos.first_mut().unwrap();
1024                // If ! IsDetachedBuffer(firstPendingPullInto’s buffer) is true, throw a TypeError exception.
1025                if first_descriptor.buffer.is_detached_buffer(cx) {
1026                    return Err(Error::Type("buffer is detached".to_owned()));
1027                }
1028
1029                // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
1030                self.invalidate_byob_request();
1031
1032                // Set firstPendingPullInto’s buffer to ! TransferArrayBuffer(firstPendingPullInto’s buffer).
1033                first_descriptor.buffer = first_descriptor
1034                    .buffer
1035                    .transfer_array_buffer(cx)
1036                    .expect("TransferArrayBuffer failed");
1037
1038                // If firstPendingPullInto’s reader type is "none",
1039                if first_descriptor.reader_type.is_none() {
1040                    // needed to drop the borrow and avoid BorrowMutError
1041                    drop(pending_pull_intos);
1042
1043                    // perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(
1044                    // controller, firstPendingPullInto).
1045                    self.enqueue_detached_pull_into_to_queue(cx, can_gc)?;
1046                }
1047            }
1048        }
1049
1050        // If ! ReadableStreamHasDefaultReader(stream) is true,
1051        if stream.has_default_reader() {
1052            // Perform ! ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller).
1053            self.process_read_requests_using_queue(cx, can_gc)
1054                .expect("process_read_requests_using_queue failed");
1055
1056            // If ! ReadableStreamGetNumReadRequests(stream) is 0,
1057            if stream.get_num_read_requests() == 0 {
1058                // Assert: controller.[[pendingPullIntos]] is empty.
1059                {
1060                    assert!(self.pending_pull_intos.borrow().is_empty());
1061                }
1062
1063                // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1064                // controller, transferredBuffer, byteOffset, byteLength).
1065                self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1066            } else {
1067                // Assert: controller.[[queue]] is empty.
1068                assert!(self.queue.borrow().is_empty());
1069
1070                // If controller.[[pendingPullIntos]] is not empty,
1071
1072                let pending_pull_intos = self.pending_pull_intos.borrow();
1073                if !pending_pull_intos.is_empty() {
1074                    // Assert: controller.[[pendingPullIntos]][0]'s reader type is "default".
1075                    assert!(matches!(
1076                        pending_pull_intos.first().unwrap().reader_type,
1077                        Some(ReaderType::Default)
1078                    ));
1079
1080                    // needed to drop the borrow and avoid BorrowMutError
1081                    drop(pending_pull_intos);
1082
1083                    // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1084                    self.shift_pending_pull_into();
1085                }
1086
1087                // Let transferredView be ! Construct(%Uint8Array%, « transferredBuffer, byteOffset, byteLength »).
1088                let transferred_view = create_buffer_source_with_constructor(
1089                    cx,
1090                    &Constructor::Name(Type::Uint8),
1091                    &transferred_buffer,
1092                    byte_offset,
1093                    byte_length,
1094                )
1095                .expect("Construct Uint8Array failed");
1096
1097                // Perform ! ReadableStreamFulfillReadRequest(stream, transferredView, false).
1098                rooted!(in(*cx) let mut view_value = UndefinedValue());
1099                transferred_view.get_buffer_view_value(cx, view_value.handle_mut());
1100                stream.fulfill_read_request(view_value.handle(), false, can_gc);
1101            }
1102            // Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true,
1103        } else if stream.has_byob_reader() {
1104            // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1105            // controller, transferredBuffer, byteOffset, byteLength).
1106            self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1107
1108            // Let filledPullIntos be the result of performing !
1109            // ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
1110            let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
1111
1112            // For each filledPullInto of filledPullIntos,
1113            // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
1114            for filled_pull_into in filled_pull_intos {
1115                self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)
1116                    .expect("commit_pull_into_descriptor failed");
1117            }
1118        } else {
1119            // Assert: ! IsReadableStreamLocked(stream) is false.
1120            assert!(!stream.is_locked());
1121
1122            // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1123            // (controller, transferredBuffer, byteOffset, byteLength).
1124            self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1125        }
1126
1127        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1128        self.call_pull_if_needed(can_gc);
1129
1130        Ok(())
1131    }
1132
1133    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-commit-pull-into-descriptor>
1134    pub(crate) fn commit_pull_into_descriptor(
1135        &self,
1136        cx: SafeJSContext,
1137        pull_into_descriptor: &PullIntoDescriptor,
1138        can_gc: CanGc,
1139    ) -> Fallible<()> {
1140        // Assert: stream.[[state]] is not "errored".
1141        let stream = self.stream.get().unwrap();
1142        assert!(!stream.is_errored());
1143
1144        // Assert: pullIntoDescriptor.reader type is not "none".
1145        assert!(pull_into_descriptor.reader_type.is_some());
1146
1147        // Let done be false.
1148        let mut done = false;
1149
1150        // If stream.[[state]] is "closed",
1151        if stream.is_closed() {
1152            // Assert: the remainder after dividing pullIntoDescriptor’s bytes filled
1153            // by pullIntoDescriptor’s element size is 0.
1154            assert!(
1155                pull_into_descriptor.bytes_filled.get() % pull_into_descriptor.element_size == 0
1156            );
1157
1158            // Set done to true.
1159            done = true;
1160        }
1161
1162        // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor).
1163        let filled_view = self
1164            .convert_pull_into_descriptor(cx, pull_into_descriptor)
1165            .expect("convert_pull_into_descriptor failed");
1166
1167        rooted!(in(*cx) let mut view_value = UndefinedValue());
1168        filled_view.get_buffer_view_value(cx, view_value.handle_mut());
1169
1170        // If pullIntoDescriptor’s reader type is "default",
1171        if matches!(pull_into_descriptor.reader_type, Some(ReaderType::Default)) {
1172            // Perform ! ReadableStreamFulfillReadRequest(stream, filledView, done).
1173
1174            stream.fulfill_read_request(view_value.handle(), done, can_gc);
1175        } else {
1176            // Assert: pullIntoDescriptor’s reader type is "byob".
1177            assert!(matches!(
1178                pull_into_descriptor.reader_type,
1179                Some(ReaderType::Byob)
1180            ));
1181
1182            // Perform ! ReadableStreamFulfillReadIntoRequest(stream, filledView, done).
1183            stream.fulfill_read_into_request(view_value.handle(), done, can_gc);
1184        }
1185        Ok(())
1186    }
1187
1188    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-convert-pull-into-descriptor>
1189    pub(crate) fn convert_pull_into_descriptor(
1190        &self,
1191        cx: SafeJSContext,
1192        pull_into_descriptor: &PullIntoDescriptor,
1193    ) -> Fallible<HeapBufferSource<ArrayBufferViewU8>> {
1194        // Let bytesFilled be pullIntoDescriptor’s bytes filled.
1195        let bytes_filled = pull_into_descriptor.bytes_filled.get();
1196
1197        // Let elementSize be pullIntoDescriptor’s element size.
1198        let element_size = pull_into_descriptor.element_size;
1199
1200        // Assert: bytesFilled ≤ pullIntoDescriptor’s byte length.
1201        assert!(bytes_filled <= pull_into_descriptor.byte_length);
1202
1203        // Assert: the remainder after dividing bytesFilled by elementSize is 0.
1204        assert!(bytes_filled % element_size == 0);
1205
1206        // Let buffer be ! TransferArrayBuffer(pullIntoDescriptor’s buffer).
1207        let buffer = pull_into_descriptor
1208            .buffer
1209            .transfer_array_buffer(cx)
1210            .expect("TransferArrayBuffer failed");
1211
1212        // Return ! Construct(pullIntoDescriptor’s view constructor,
1213        // « buffer, pullIntoDescriptor’s byte offset, bytesFilled ÷ elementSize »).
1214        Ok(create_buffer_source_with_constructor(
1215            cx,
1216            &pull_into_descriptor.view_constructor,
1217            &buffer,
1218            pull_into_descriptor.byte_offset as usize,
1219            (bytes_filled / element_size) as usize,
1220        )
1221        .expect("Construct view failed"))
1222    }
1223
1224    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-process-pull-into-descriptors-using-queue>
1225    pub(crate) fn process_pull_into_descriptors_using_queue(
1226        &self,
1227        cx: SafeJSContext,
1228    ) -> Vec<PullIntoDescriptor> {
1229        // Assert: controller.[[closeRequested]] is false.
1230        assert!(!self.close_requested.get());
1231
1232        // Let filledPullIntos be a new empty list.
1233        let mut filled_pull_intos = Vec::new();
1234
1235        // While controller.[[pendingPullIntos]] is not empty,
1236        loop {
1237            // If controller.[[queueTotalSize]] is 0, then break.
1238            if self.queue_total_size.get() == 0.0 {
1239                break;
1240            }
1241
1242            // Let pullIntoDescriptor be controller.[[pendingPullIntos]][0].
1243            let fill_pull_result = {
1244                let pending_pull_intos = self.pending_pull_intos.borrow();
1245                let Some(pull_into_descriptor) = pending_pull_intos.first() else {
1246                    break;
1247                };
1248                self.fill_pull_into_descriptor_from_queue(cx, pull_into_descriptor)
1249            };
1250
1251            // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) is true,
1252            if fill_pull_result {
1253                // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1254                let pull_into_descriptor = self.shift_pending_pull_into();
1255
1256                // Append pullIntoDescriptor to filledPullIntos.
1257                filled_pull_intos.push(pull_into_descriptor);
1258            }
1259        }
1260
1261        // Return filledPullIntos.
1262        filled_pull_intos
1263    }
1264
1265    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-pull-into-descriptor-from-queue>
1266    pub(crate) fn fill_pull_into_descriptor_from_queue(
1267        &self,
1268        cx: SafeJSContext,
1269        pull_into_descriptor: &PullIntoDescriptor,
1270    ) -> bool {
1271        // Let maxBytesToCopy be min(controller.[[queueTotalSize]],
1272        // pullIntoDescriptor’s byte length − pullIntoDescriptor’s bytes filled).
1273        let max_bytes_to_copy = min(
1274            self.queue_total_size.get() as usize,
1275            (pull_into_descriptor.byte_length - pull_into_descriptor.bytes_filled.get()) as usize,
1276        );
1277
1278        // Let maxBytesFilled be pullIntoDescriptor’s bytes filled + maxBytesToCopy.
1279        let max_bytes_filled = pull_into_descriptor.bytes_filled.get() as usize + max_bytes_to_copy;
1280
1281        // Let totalBytesToCopyRemaining be maxBytesToCopy.
1282        let mut total_bytes_to_copy_remaining = max_bytes_to_copy;
1283
1284        // Let ready be false.
1285        let mut ready = false;
1286
1287        // Assert: ! IsDetachedBuffer(pullIntoDescriptor’s buffer) is false.
1288        assert!(!pull_into_descriptor.buffer.is_detached_buffer(cx));
1289
1290        // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
1291        assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
1292
1293        // Let remainderBytes be the remainder after dividing maxBytesFilled by pullIntoDescriptor’s element size.
1294        let remainder_bytes = max_bytes_filled % pull_into_descriptor.element_size as usize;
1295
1296        // Let maxAlignedBytes be maxBytesFilled − remainderBytes.
1297        let max_aligned_bytes = max_bytes_filled - remainder_bytes;
1298
1299        // If maxAlignedBytes ≥ pullIntoDescriptor’s minimum fill,
1300        if max_aligned_bytes >= pull_into_descriptor.minimum_fill as usize {
1301            // Set totalBytesToCopyRemaining to maxAlignedBytes − pullIntoDescriptor’s bytes filled.
1302            total_bytes_to_copy_remaining =
1303                max_aligned_bytes - (pull_into_descriptor.bytes_filled.get() as usize);
1304
1305            // Set ready to true.
1306            ready = true;
1307        }
1308
1309        // Let queue be controller.[[queue]].
1310        let mut queue = self.queue.borrow_mut();
1311
1312        // While totalBytesToCopyRemaining > 0,
1313        while total_bytes_to_copy_remaining > 0 {
1314            // Let headOfQueue be queue[0].
1315            let head_of_queue = queue.front_mut().unwrap();
1316
1317            // Let bytesToCopy be min(totalBytesToCopyRemaining, headOfQueue’s byte length).
1318            let bytes_to_copy = total_bytes_to_copy_remaining.min(head_of_queue.byte_length);
1319
1320            // Let destStart be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
1321            let dest_start =
1322                pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();
1323
1324            // Let descriptorBuffer be pullIntoDescriptor’s buffer.
1325            let descriptor_buffer = &pull_into_descriptor.buffer;
1326
1327            // Let queueBuffer be headOfQueue’s buffer.
1328            let queue_buffer = &head_of_queue.buffer;
1329
1330            // Let queueByteOffset be headOfQueue’s byte offset.
1331            let queue_byte_offset = head_of_queue.byte_offset;
1332
1333            // Assert: ! CanCopyDataBlockBytes(descriptorBuffer, destStart,
1334            // queueBuffer, queueByteOffset, bytesToCopy) is true.
1335            assert!(descriptor_buffer.can_copy_data_block_bytes(
1336                cx,
1337                dest_start as usize,
1338                queue_buffer,
1339                queue_byte_offset,
1340                bytes_to_copy
1341            ));
1342
1343            // Perform ! CopyDataBlockBytes(descriptorBuffer.[[ArrayBufferData]], destStart,
1344            // queueBuffer.[[ArrayBufferData]], queueByteOffset, bytesToCopy).
1345            descriptor_buffer.copy_data_block_bytes(
1346                cx,
1347                dest_start as usize,
1348                queue_buffer,
1349                queue_byte_offset,
1350                bytes_to_copy,
1351            );
1352
1353            // If headOfQueue’s byte length is bytesToCopy,
1354            if head_of_queue.byte_length == bytes_to_copy {
1355                // Remove queue[0].
1356                queue.pop_front().unwrap();
1357            } else {
1358                // Set headOfQueue’s byte offset to headOfQueue’s byte offset + bytesToCopy.
1359                head_of_queue.byte_offset += bytes_to_copy;
1360
1361                // Set headOfQueue’s byte length to headOfQueue’s byte length − bytesToCopy.
1362                head_of_queue.byte_length -= bytes_to_copy;
1363            }
1364
1365            // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − bytesToCopy.
1366            self.queue_total_size
1367                .set(self.queue_total_size.get() - (bytes_to_copy as f64));
1368
1369            // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
1370            // controller, bytesToCopy, pullIntoDescriptor).
1371            self.fill_head_pull_into_descriptor(bytes_to_copy as u64, pull_into_descriptor);
1372
1373            // Set totalBytesToCopyRemaining to totalBytesToCopyRemaining − bytesToCopy.
1374            total_bytes_to_copy_remaining -= bytes_to_copy;
1375        }
1376
1377        // If ready is false,
1378        if !ready {
1379            // Assert: controller.[[queueTotalSize]] is 0.
1380            assert!(self.queue_total_size.get() == 0.0);
1381
1382            // Assert: pullIntoDescriptor’s bytes filled > 0.
1383            assert!(pull_into_descriptor.bytes_filled.get() > 0);
1384
1385            // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
1386            assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
1387        }
1388
1389        // Return ready.
1390        ready
1391    }
1392
1393    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-head-pull-into-descriptor>
1394    pub(crate) fn fill_head_pull_into_descriptor(
1395        &self,
1396        bytes_copied: u64,
1397        pull_into_descriptor: &PullIntoDescriptor,
1398    ) {
1399        // Assert: either controller.[[pendingPullIntos]] is empty,
1400        // or controller.[[pendingPullIntos]][0] is pullIntoDescriptor.
1401        {
1402            let pending_pull_intos = self.pending_pull_intos.borrow();
1403            assert!(
1404                pending_pull_intos.is_empty() ||
1405                    pending_pull_intos.first().unwrap() == pull_into_descriptor
1406            );
1407        }
1408
1409        // Assert: controller.[[byobRequest]] is null.
1410        assert!(self.byob_request.get().is_none());
1411
1412        // Set pullIntoDescriptor’s bytes filled to bytes filled + size.
1413        pull_into_descriptor
1414            .bytes_filled
1415            .set(pull_into_descriptor.bytes_filled.get() + bytes_copied);
1416    }
1417
1418    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueuedetachedpullintotoqueue>
1419    pub(crate) fn enqueue_detached_pull_into_to_queue(
1420        &self,
1421        cx: SafeJSContext,
1422        can_gc: CanGc,
1423    ) -> Fallible<()> {
1424        // first_descriptor: &PullIntoDescriptor,
1425        let pending_pull_intos = self.pending_pull_intos.borrow();
1426        let first_descriptor = pending_pull_intos.first().unwrap();
1427
1428        // Assert: pullIntoDescriptor’s reader type is "none".
1429        assert!(first_descriptor.reader_type.is_none());
1430
1431        // If pullIntoDescriptor’s bytes filled > 0, perform ?
1432        // ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
1433        // pullIntoDescriptor’s buffer, pullIntoDescriptor’s byte offset, pullIntoDescriptor’s bytes filled).
1434
1435        if first_descriptor.bytes_filled.get() > 0 {
1436            self.enqueue_cloned_chunk_to_queue(
1437                cx,
1438                &first_descriptor.buffer,
1439                first_descriptor.byte_offset,
1440                first_descriptor.bytes_filled.get(),
1441                can_gc,
1442            )?;
1443        }
1444
1445        // needed to drop the borrow and avoid BorrowMutError
1446        drop(pending_pull_intos);
1447
1448        // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1449        self.shift_pending_pull_into();
1450
1451        Ok(())
1452    }
1453
1454    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueueclonedchunktoqueue>
1455    pub(crate) fn enqueue_cloned_chunk_to_queue(
1456        &self,
1457        cx: SafeJSContext,
1458        buffer: &HeapBufferSource<ArrayBufferU8>,
1459        byte_offset: u64,
1460        byte_length: u64,
1461        can_gc: CanGc,
1462    ) -> Fallible<()> {
1463        // Let cloneResult be CloneArrayBuffer(buffer, byteOffset, byteLength, %ArrayBuffer%).
1464        if let Ok(clone_result) =
1465            buffer.clone_array_buffer(cx, byte_offset as usize, byte_length as usize)
1466        {
1467            // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1468            // (controller, cloneResult.[[Value]], 0, byteLength).
1469            self.enqueue_chunk_to_queue(clone_result, 0, byte_length as usize);
1470
1471            Ok(())
1472        } else {
1473            // If cloneResult is an abrupt completion,
1474
1475            // Perform ! ReadableByteStreamControllerError(controller, cloneResult.[[Value]]).
1476            rooted!(in(*cx) let mut rval = UndefinedValue());
1477            let error = Error::Type("can not clone array buffer".to_owned());
1478            error
1479                .clone()
1480                .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1481            self.error(rval.handle(), can_gc);
1482
1483            // Return cloneResult.
1484            Err(error)
1485        }
1486    }
1487
1488    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue-chunk-to-queue>
1489    pub(crate) fn enqueue_chunk_to_queue(
1490        &self,
1491        buffer: HeapBufferSource<ArrayBufferU8>,
1492        byte_offset: usize,
1493        byte_length: usize,
1494    ) {
1495        // Let entry be a new ReadableByteStreamQueueEntry object.
1496        let entry = QueueEntry::new(buffer, byte_offset, byte_length);
1497
1498        // Append entry to controller.[[queue]].
1499        self.queue.borrow_mut().push_back(entry);
1500
1501        // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] + byteLength.
1502        self.queue_total_size
1503            .set(self.queue_total_size.get() + byte_length as f64);
1504    }
1505
1506    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-shift-pending-pull-into>
1507    pub(crate) fn shift_pending_pull_into(&self) -> PullIntoDescriptor {
1508        // Assert: controller.[[byobRequest]] is null.
1509        assert!(self.byob_request.get().is_none());
1510
1511        // Let descriptor be controller.[[pendingPullIntos]][0].
1512        // Remove descriptor from controller.[[pendingPullIntos]].
1513        // Return descriptor.
1514        self.pending_pull_intos.borrow_mut().remove(0)
1515    }
1516
1517    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerprocessreadrequestsusingqueue>
1518    pub(crate) fn process_read_requests_using_queue(
1519        &self,
1520        cx: SafeJSContext,
1521        can_gc: CanGc,
1522    ) -> Fallible<()> {
1523        // Let reader be controller.[[stream]].[[reader]].
1524        // Assert: reader implements ReadableStreamDefaultReader.
1525        let reader = self.stream.get().unwrap().get_default_reader();
1526
1527        // Step 3
1528        reader.process_read_requests(cx, DomRoot::from_ref(self), can_gc)
1529    }
1530
1531    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerfillreadrequestfromqueue>
1532    pub(crate) fn fill_read_request_from_queue(
1533        &self,
1534        cx: SafeJSContext,
1535        read_request: &ReadRequest,
1536        can_gc: CanGc,
1537    ) -> Fallible<()> {
1538        // Assert: controller.[[queueTotalSize]] > 0.
1539        assert!(self.queue_total_size.get() > 0.0);
1540        // Also assert that the queue has a non-zero length;
1541        assert!(!self.queue.borrow().is_empty());
1542
1543        // Let entry be controller.[[queue]][0].
1544        // Remove entry from controller.[[queue]].
1545        let entry = self.remove_entry();
1546
1547        // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − entry’s byte length.
1548        self.queue_total_size
1549            .set(self.queue_total_size.get() - entry.byte_length as f64);
1550
1551        // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
1552        self.handle_queue_drain(can_gc);
1553
1554        // Let view be ! Construct(%Uint8Array%, « entry’s buffer, entry’s byte offset, entry’s byte length »).
1555        let view = create_buffer_source_with_constructor(
1556            cx,
1557            &Constructor::Name(Type::Uint8),
1558            &entry.buffer,
1559            entry.byte_offset,
1560            entry.byte_length,
1561        )
1562        .expect("Construct Uint8Array failed");
1563
1564        // Perform readRequest’s chunk steps, given view.
1565        let result = RootedTraceableBox::new(Heap::default());
1566        rooted!(in(*cx) let mut view_value = UndefinedValue());
1567        view.get_buffer_view_value(cx, view_value.handle_mut());
1568        result.set(*view_value);
1569
1570        read_request.chunk_steps(result, &self.global(), can_gc);
1571
1572        Ok(())
1573    }
1574
1575    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-handle-queue-drain>
1576    pub(crate) fn handle_queue_drain(&self, can_gc: CanGc) {
1577        // Assert: controller.[[stream]].[[state]] is "readable".
1578        assert!(self.stream.get().unwrap().is_readable());
1579
1580        // If controller.[[queueTotalSize]] is 0 and controller.[[closeRequested]] is true,
1581        if self.queue_total_size.get() == 0.0 && self.close_requested.get() {
1582            // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
1583            self.clear_algorithms();
1584
1585            // Perform ! ReadableStreamClose(controller.[[stream]]).
1586            self.stream.get().unwrap().close(can_gc);
1587        } else {
1588            // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1589            self.call_pull_if_needed(can_gc);
1590        }
1591    }
1592
1593    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
1594    pub(crate) fn call_pull_if_needed(&self, can_gc: CanGc) {
1595        // Let shouldPull be ! ReadableByteStreamControllerShouldCallPull(controller).
1596        let should_pull = self.should_call_pull();
1597        // If shouldPull is false, return.
1598        if !should_pull {
1599            return;
1600        }
1601
1602        // If controller.[[pulling]] is true,
1603        if self.pulling.get() {
1604            // Set controller.[[pullAgain]] to true.
1605            self.pull_again.set(true);
1606
1607            // Return.
1608            return;
1609        }
1610
1611        // Assert: controller.[[pullAgain]] is false.
1612        assert!(!self.pull_again.get());
1613
1614        // Set controller.[[pulling]] to true.
1615        self.pulling.set(true);
1616
1617        // Let pullPromise be the result of performing controller.[[pullAlgorithm]].
1618        // Continues into the resolve and reject handling of the native handler.
1619        let global = self.global();
1620        let rooted_controller = DomRoot::from_ref(self);
1621        let controller = Controller::ReadableByteStreamController(rooted_controller.clone());
1622
1623        if let Some(underlying_source) = self.underlying_source.get() {
1624            let handler = PromiseNativeHandler::new(
1625                &global,
1626                Some(Box::new(PullAlgorithmFulfillmentHandler {
1627                    controller: Dom::from_ref(&rooted_controller),
1628                })),
1629                Some(Box::new(PullAlgorithmRejectionHandler {
1630                    controller: Dom::from_ref(&rooted_controller),
1631                })),
1632                can_gc,
1633            );
1634
1635            let realm = enter_realm(&*global);
1636            let comp = InRealm::Entered(&realm);
1637            let result = underlying_source
1638                .call_pull_algorithm(controller, &global, can_gc)
1639                .unwrap_or_else(|| {
1640                    let promise = Promise::new(&global, can_gc);
1641                    promise.resolve_native(&(), can_gc);
1642                    Ok(promise)
1643                });
1644            let promise = result.unwrap_or_else(|error| {
1645                let cx = GlobalScope::get_cx();
1646                rooted!(in(*cx) let mut rval = UndefinedValue());
1647                // TODO: check if `self.global()` is the right globalscope.
1648                error
1649                    .clone()
1650                    .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1651                let promise = Promise::new(&global, can_gc);
1652                promise.reject_native(&rval.handle(), can_gc);
1653                promise
1654            });
1655            promise.append_native_handler(&handler, comp, can_gc);
1656        }
1657    }
1658
1659    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-should-call-pull>
1660    fn should_call_pull(&self) -> bool {
1661        // Let stream be controller.[[stream]].
1662        // Note: the spec does not assert that stream is not undefined here,
1663        // so we return false if it is.
1664        let stream = self.stream.get().unwrap();
1665
1666        // If stream.[[state]] is not "readable", return false.
1667        if !stream.is_readable() {
1668            return false;
1669        }
1670
1671        // If controller.[[closeRequested]] is true, return false.
1672        if self.close_requested.get() {
1673            return false;
1674        }
1675
1676        // If controller.[[started]] is false, return false.
1677        if !self.started.get() {
1678            return false;
1679        }
1680
1681        // If ! ReadableStreamHasDefaultReader(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0
1682        // , return true.
1683        if stream.has_default_reader() && stream.get_num_read_requests() > 0 {
1684            return true;
1685        }
1686
1687        // If ! ReadableStreamHasBYOBReader(stream) is true and ! ReadableStreamGetNumReadIntoRequests(stream) > 0
1688        // , return true.
1689        if stream.has_byob_reader() && stream.get_num_read_into_requests() > 0 {
1690            return true;
1691        }
1692
1693        // Let desiredSize be ! ReadableByteStreamControllerGetDesiredSize(controller).
1694        let desired_size = self.get_desired_size();
1695
1696        // Assert: desiredSize is not null.
1697        assert!(desired_size.is_some());
1698
1699        // If desiredSize > 0, return true.
1700        if desired_size.unwrap() > 0. {
1701            return true;
1702        }
1703
1704        // Return false.
1705        false
1706    }
1707    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
1708    pub(crate) fn setup(
1709        &self,
1710        global: &GlobalScope,
1711        stream: DomRoot<ReadableStream>,
1712        can_gc: CanGc,
1713    ) -> Fallible<()> {
1714        // Assert: stream.[[controller]] is undefined.
1715        stream.assert_no_controller();
1716
1717        // If autoAllocateChunkSize is not undefined,
1718        if self.auto_allocate_chunk_size.is_some() {
1719            // Assert: ! IsInteger(autoAllocateChunkSize) is true. Implicit
1720            // Assert: autoAllocateChunkSize is positive. (Implicit by type.)
1721        }
1722
1723        // Set controller.[[stream]] to stream.
1724        self.stream.set(Some(&stream));
1725
1726        // Set controller.[[pullAgain]] and controller.[[pulling]] to false.
1727        self.pull_again.set(false);
1728        self.pulling.set(false);
1729
1730        // Set controller.[[byobRequest]] to null.
1731        self.byob_request.set(None);
1732
1733        // Perform ! ResetQueue(controller).
1734        self.reset_queue();
1735
1736        // Set controller.[[closeRequested]] and controller.[[started]] to false.
1737        self.close_requested.set(false);
1738        self.started.set(false);
1739
1740        // Set controller.[[strategyHWM]] to highWaterMark.
1741        // Set controller.[[pullAlgorithm]] to pullAlgorithm.
1742        // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
1743        // Set controller.[[autoAllocateChunkSize]] to autoAllocateChunkSize.
1744        // Set controller.[[pendingPullIntos]] to a new empty list.
1745        // Note: the above steps are done in `new`.
1746
1747        // Set stream.[[controller]] to controller.
1748        let rooted_byte_controller = DomRoot::from_ref(self);
1749        stream.set_byte_controller(&rooted_byte_controller);
1750
1751        if let Some(underlying_source) = rooted_byte_controller.underlying_source.get() {
1752            // Let startResult be the result of performing startAlgorithm. (This might throw an exception.)
1753            let start_result = underlying_source
1754                .call_start_algorithm(
1755                    Controller::ReadableByteStreamController(rooted_byte_controller.clone()),
1756                    can_gc,
1757                )
1758                .unwrap_or_else(|| {
1759                    let promise = Promise::new(global, can_gc);
1760                    promise.resolve_native(&(), can_gc);
1761                    Ok(promise)
1762                });
1763
1764            // Let startPromise be a promise resolved with startResult.
1765            let start_promise = start_result?;
1766
1767            // Upon fulfillment of startPromise, Upon rejection of startPromise with reason r,
1768            let handler = PromiseNativeHandler::new(
1769                global,
1770                Some(Box::new(StartAlgorithmFulfillmentHandler {
1771                    controller: Dom::from_ref(&rooted_byte_controller),
1772                })),
1773                Some(Box::new(StartAlgorithmRejectionHandler {
1774                    controller: Dom::from_ref(&rooted_byte_controller),
1775                })),
1776                can_gc,
1777            );
1778            let realm = enter_realm(global);
1779            let comp = InRealm::Entered(&realm);
1780            start_promise.append_native_handler(&handler, comp, can_gc);
1781        };
1782
1783        Ok(())
1784    }
1785
1786    // <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontroller-releasesteps
1787    pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1788        // If this.[[pendingPullIntos]] is not empty,
1789        let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
1790        if !pending_pull_intos.is_empty() {
1791            // Let firstPendingPullInto be this.[[pendingPullIntos]][0].
1792            let mut first_pending_pull_into = pending_pull_intos.remove(0);
1793
1794            // Set firstPendingPullInto’s reader type to "none".
1795            first_pending_pull_into.reader_type = None;
1796
1797            // Set this.[[pendingPullIntos]] to the list « firstPendingPullInto »
1798            pending_pull_intos.clear();
1799            pending_pull_intos.push(first_pending_pull_into);
1800        }
1801        Ok(())
1802    }
1803
1804    /// <https://streams.spec.whatwg.org/#rbs-controller-private-cancel>
1805    pub(crate) fn perform_cancel_steps(
1806        &self,
1807        cx: SafeJSContext,
1808        global: &GlobalScope,
1809        reason: SafeHandleValue,
1810        can_gc: CanGc,
1811    ) -> Rc<Promise> {
1812        // Perform ! ReadableByteStreamControllerClearPendingPullIntos(this).
1813        self.clear_pending_pull_intos();
1814
1815        // Perform ! ResetQueue(this).
1816        self.reset_queue();
1817
1818        let underlying_source = self
1819            .underlying_source
1820            .get()
1821            .expect("Controller should have a source when the cancel steps are called into.");
1822
1823        // Let result be the result of performing this.[[cancelAlgorithm]], passing in reason.
1824        let result = underlying_source
1825            .call_cancel_algorithm(cx, global, reason, can_gc)
1826            .unwrap_or_else(|| {
1827                let promise = Promise::new(global, can_gc);
1828                promise.resolve_native(&(), can_gc);
1829                Ok(promise)
1830            });
1831
1832        let promise = result.unwrap_or_else(|error| {
1833            let cx = GlobalScope::get_cx();
1834            rooted!(in(*cx) let mut rval = UndefinedValue());
1835            error
1836                .clone()
1837                .to_jsval(cx, global, rval.handle_mut(), can_gc);
1838            let promise = Promise::new(global, can_gc);
1839            promise.reject_native(&rval.handle(), can_gc);
1840            promise
1841        });
1842
1843        // Perform ! ReadableByteStreamControllerClearAlgorithms(this).
1844        self.clear_algorithms();
1845
1846        // Return result(the promise).
1847        promise
1848    }
1849
1850    /// <https://streams.spec.whatwg.org/#rbs-controller-private-pull>
1851    pub(crate) fn perform_pull_steps(
1852        &self,
1853        cx: SafeJSContext,
1854        read_request: &ReadRequest,
1855        can_gc: CanGc,
1856    ) {
1857        // Let stream be this.[[stream]].
1858        let stream = self.stream.get().unwrap();
1859
1860        // Assert: ! ReadableStreamHasDefaultReader(stream) is true.
1861        assert!(stream.has_default_reader());
1862
1863        // If this.[[queueTotalSize]] > 0,
1864        if self.queue_total_size.get() > 0.0 {
1865            // Assert: ! ReadableStreamGetNumReadRequests(stream) is 0.
1866            assert_eq!(stream.get_num_read_requests(), 0);
1867
1868            // Perform ! ReadableByteStreamControllerFillReadRequestFromQueue(this, readRequest).
1869            let _ = self.fill_read_request_from_queue(cx, read_request, can_gc);
1870
1871            // Return.
1872            return;
1873        }
1874
1875        // Let autoAllocateChunkSize be this.[[autoAllocateChunkSize]].
1876        let auto_allocate_chunk_size = self.auto_allocate_chunk_size;
1877
1878        // If autoAllocateChunkSize is not undefined,
1879        if let Some(auto_allocate_chunk_size) = auto_allocate_chunk_size {
1880            // create_array_buffer_with_size
1881            // Let buffer be Construct(%ArrayBuffer%, « autoAllocateChunkSize »).
1882            match create_array_buffer_with_size(cx, auto_allocate_chunk_size as usize) {
1883                Ok(buffer) => {
1884                    // Let pullIntoDescriptor be a new pull-into descriptor with
1885                    // buffer buffer.[[Value]]
1886                    // buffer byte length autoAllocateChunkSize
1887                    // byte offset  0
1888                    // byte length  autoAllocateChunkSize
1889                    // bytes filled  0
1890                    // minimum fill 1
1891                    // element size 1
1892                    // view constructor %Uint8Array%
1893                    // reader type  "default"
1894                    let pull_into_descriptor = PullIntoDescriptor {
1895                        buffer,
1896                        buffer_byte_length: auto_allocate_chunk_size,
1897                        byte_length: auto_allocate_chunk_size,
1898                        byte_offset: 0,
1899                        bytes_filled: Cell::new(0),
1900                        minimum_fill: 1,
1901                        element_size: 1,
1902                        view_constructor: Constructor::Name(Type::Uint8),
1903                        reader_type: Some(ReaderType::Default),
1904                    };
1905
1906                    // Append pullIntoDescriptor to this.[[pendingPullIntos]].
1907                    self.pending_pull_intos
1908                        .borrow_mut()
1909                        .push(pull_into_descriptor);
1910                },
1911                Err(error) => {
1912                    // If buffer is an abrupt completion,
1913                    // Perform readRequest’s error steps, given buffer.[[Value]].
1914
1915                    rooted!(in(*cx) let mut rval = UndefinedValue());
1916                    error
1917                        .clone()
1918                        .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1919                    read_request.error_steps(rval.handle(), can_gc);
1920
1921                    // Return.
1922                    return;
1923                },
1924            }
1925        }
1926
1927        // Perform ! ReadableStreamAddReadRequest(stream, readRequest).
1928        stream.add_read_request(read_request);
1929
1930        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(this).
1931        self.call_pull_if_needed(can_gc);
1932    }
1933
1934    /// Setting the JS object after the heap has settled down.
1935    pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
1936        if let Some(underlying_source) = self.underlying_source.get() {
1937            underlying_source.set_underlying_source_this_object(this_object);
1938        }
1939    }
1940
1941    pub(crate) fn remove_entry(&self) -> QueueEntry {
1942        self.queue
1943            .borrow_mut()
1944            .pop_front()
1945            .expect("Reader must have read request when remove is called into.")
1946    }
1947
1948    pub(crate) fn get_queue_total_size(&self) -> f64 {
1949        self.queue_total_size.get()
1950    }
1951
1952    pub(crate) fn get_pending_pull_intos_size(&self) -> usize {
1953        self.pending_pull_intos.borrow().len()
1954    }
1955}
1956
1957impl ReadableByteStreamControllerMethods<crate::DomTypeHolder> for ReadableByteStreamController {
1958    /// <https://streams.spec.whatwg.org/#rbs-controller-byob-request>
1959    fn GetByobRequest(
1960        &self,
1961        can_gc: CanGc,
1962    ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
1963        let cx = GlobalScope::get_cx();
1964        // Return ! ReadableByteStreamControllerGetBYOBRequest(this).
1965        self.get_byob_request(cx, can_gc)
1966    }
1967
1968    /// <https://streams.spec.whatwg.org/#rbs-controller-desired-size>
1969    fn GetDesiredSize(&self) -> Option<f64> {
1970        // Return ! ReadableByteStreamControllerGetDesiredSize(this).
1971        self.get_desired_size()
1972    }
1973
1974    /// <https://streams.spec.whatwg.org/#rbs-controller-close>
1975    fn Close(&self, can_gc: CanGc) -> Fallible<()> {
1976        let cx = GlobalScope::get_cx();
1977        // If this.[[closeRequested]] is true, throw a TypeError exception.
1978        if self.close_requested.get() {
1979            return Err(Error::Type("closeRequested is true".to_owned()));
1980        }
1981
1982        // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
1983        if !self.stream.get().unwrap().is_readable() {
1984            return Err(Error::Type("stream is not readable".to_owned()));
1985        }
1986
1987        // Perform ? ReadableByteStreamControllerClose(this).
1988        self.close(cx, can_gc)
1989    }
1990
1991    /// <https://streams.spec.whatwg.org/#rbs-controller-enqueue>
1992    fn Enqueue(
1993        &self,
1994        chunk: js::gc::CustomAutoRooterGuard<js::typedarray::ArrayBufferView>,
1995        can_gc: CanGc,
1996    ) -> Fallible<()> {
1997        let cx = GlobalScope::get_cx();
1998
1999        let chunk = HeapBufferSource::<ArrayBufferViewU8>::from_view(chunk);
2000
2001        // If chunk.[[ByteLength]] is 0, throw a TypeError exception.
2002        if chunk.byte_length() == 0 {
2003            return Err(Error::Type("chunk.ByteLength is 0".to_owned()));
2004        }
2005
2006        // If chunk.[[ViewedArrayBuffer]].[[ByteLength]] is 0, throw a TypeError exception.
2007        if chunk.viewed_buffer_array_byte_length(cx) == 0 {
2008            return Err(Error::Type(
2009                "chunk.ViewedArrayBuffer.ByteLength is 0".to_owned(),
2010            ));
2011        }
2012
2013        // If this.[[closeRequested]] is true, throw a TypeError exception.
2014        if self.close_requested.get() {
2015            return Err(Error::Type("closeRequested is true".to_owned()));
2016        }
2017
2018        // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
2019        if !self.stream.get().unwrap().is_readable() {
2020            return Err(Error::Type("stream is not readable".to_owned()));
2021        }
2022
2023        // Return ? ReadableByteStreamControllerEnqueue(this, chunk).
2024        self.enqueue(cx, chunk, can_gc)
2025    }
2026
2027    /// <https://streams.spec.whatwg.org/#rbs-controller-error>
2028    fn Error(&self, _cx: SafeJSContext, e: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
2029        // Perform ! ReadableByteStreamControllerError(this, e).
2030        self.error(e, can_gc);
2031        Ok(())
2032    }
2033}