script/dom/
readablebytestreamcontroller.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::cmp::min;
7use std::collections::VecDeque;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::{Heap, Type};
12use js::jsval::UndefinedValue;
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue};
15use js::typedarray::{ArrayBufferU8, ArrayBufferViewU8};
16
17use super::bindings::buffer_source::HeapBufferSource;
18use super::bindings::cell::DomRefCell;
19use super::bindings::reflector::reflect_dom_object;
20use super::bindings::root::Dom;
21use super::readablestreambyobreader::ReadIntoRequest;
22use super::readablestreamdefaultreader::ReadRequest;
23use super::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType};
24use crate::dom::bindings::buffer_source::{
25    Constructor, byte_size, create_array_buffer_with_size, create_buffer_source_with_constructor,
26};
27use crate::dom::bindings::codegen::Bindings::ReadableByteStreamControllerBinding::ReadableByteStreamControllerMethods;
28use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
29use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
30use crate::dom::bindings::reflector::{DomGlobal, Reflector};
31use crate::dom::bindings::root::{DomRoot, MutNullableDom};
32use crate::dom::bindings::trace::RootedTraceableBox;
33use crate::dom::globalscope::GlobalScope;
34use crate::dom::promise::Promise;
35use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
36use crate::dom::readablestream::ReadableStream;
37use crate::dom::readablestreambyobrequest::ReadableStreamBYOBRequest;
38use crate::realms::{InRealm, enter_realm};
39use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
40
41/// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry>
42#[derive(JSTraceable, MallocSizeOf)]
43pub(crate) struct QueueEntry {
44    /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-buffer>
45    #[ignore_malloc_size_of = "HeapBufferSource"]
46    buffer: HeapBufferSource<ArrayBufferU8>,
47    /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-offset>
48    byte_offset: usize,
49    /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-length>
50    byte_length: usize,
51}
52
53impl QueueEntry {
54    pub(crate) fn new(
55        buffer: HeapBufferSource<ArrayBufferU8>,
56        byte_offset: usize,
57        byte_length: usize,
58    ) -> QueueEntry {
59        QueueEntry {
60            buffer,
61            byte_offset,
62            byte_length,
63        }
64    }
65}
66
67#[derive(Debug, Eq, JSTraceable, MallocSizeOf, PartialEq)]
68pub(crate) enum ReaderType {
69    /// <https://streams.spec.whatwg.org/#readablestreambyobreader>
70    Byob,
71    /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
72    Default,
73}
74
75/// <https://streams.spec.whatwg.org/#pull-into-descriptor>
76#[derive(Eq, JSTraceable, MallocSizeOf, PartialEq)]
77pub(crate) struct PullIntoDescriptor {
78    #[ignore_malloc_size_of = "HeapBufferSource"]
79    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer>
80    buffer: HeapBufferSource<ArrayBufferU8>,
81    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer-byte-length>
82    buffer_byte_length: u64,
83    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-offset>
84    byte_offset: u64,
85    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-length>
86    byte_length: u64,
87    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-bytes-filled>
88    bytes_filled: Cell<u64>,
89    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-minimum-fill>
90    minimum_fill: u64,
91    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-element-size>
92    element_size: u64,
93    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-view-constructor>
94    view_constructor: Constructor,
95    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-reader-type>
96    reader_type: Option<ReaderType>,
97}
98
99/// The fulfillment handler for
100/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
101#[derive(Clone, JSTraceable, MallocSizeOf)]
102#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
103struct StartAlgorithmFulfillmentHandler {
104    controller: Dom<ReadableByteStreamController>,
105}
106
107impl Callback for StartAlgorithmFulfillmentHandler {
108    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
109    /// Upon fulfillment of startPromise,
110    fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
111        let can_gc = CanGc::from_cx(cx);
112        // Set controller.[[started]] to true.
113        self.controller.started.set(true);
114
115        // Assert: controller.[[pulling]] is false.
116        assert!(!self.controller.pulling.get());
117
118        // Assert: controller.[[pullAgain]] is false.
119        assert!(!self.controller.pull_again.get());
120
121        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
122        self.controller.call_pull_if_needed(can_gc);
123    }
124}
125
126/// The rejection handler for
127/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
128#[derive(Clone, JSTraceable, MallocSizeOf)]
129#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
130struct StartAlgorithmRejectionHandler {
131    controller: Dom<ReadableByteStreamController>,
132}
133
134impl Callback for StartAlgorithmRejectionHandler {
135    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
136    /// Upon rejection of startPromise with reason r,
137    fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
138        let can_gc = CanGc::from_cx(cx);
139        // Perform ! ReadableByteStreamControllerError(controller, r).
140        self.controller.error(v, can_gc);
141    }
142}
143
144/// The fulfillment handler for
145/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
146#[derive(Clone, JSTraceable, MallocSizeOf)]
147#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
148struct PullAlgorithmFulfillmentHandler {
149    controller: Dom<ReadableByteStreamController>,
150}
151
152impl Callback for PullAlgorithmFulfillmentHandler {
153    /// Continuation of <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
154    /// Upon fulfillment of pullPromise
155    fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
156        let can_gc = CanGc::from_cx(cx);
157        // Set controller.[[pulling]] to false.
158        self.controller.pulling.set(false);
159
160        // If controller.[[pullAgain]] is true,
161        if self.controller.pull_again.get() {
162            // Set controller.[[pullAgain]] to false.
163            self.controller.pull_again.set(false);
164
165            // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
166            self.controller.call_pull_if_needed(can_gc);
167        }
168    }
169}
170
171/// The rejection handler for
172/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
173#[derive(Clone, JSTraceable, MallocSizeOf)]
174#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
175struct PullAlgorithmRejectionHandler {
176    controller: Dom<ReadableByteStreamController>,
177}
178
179impl Callback for PullAlgorithmRejectionHandler {
180    /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-byte-controller-call-pull-if-needed>
181    /// Upon rejection of pullPromise with reason e.
182    fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
183        let can_gc = CanGc::from_cx(cx);
184        // Perform ! ReadableByteStreamControllerError(controller, e).
185        self.controller.error(v, can_gc);
186    }
187}
188
189/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
190#[dom_struct]
191pub(crate) struct ReadableByteStreamController {
192    reflector_: Reflector,
193    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-autoallocatechunksize>
194    auto_allocate_chunk_size: Option<u64>,
195    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-stream>
196    stream: MutNullableDom<ReadableStream>,
197    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-strategyhwm>
198    strategy_hwm: f64,
199    /// A mutable reference to the underlying source is used to implement these two
200    /// internal slots:
201    ///
202    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullalgorithm>
203    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-cancelalgorithm>
204    underlying_source: MutNullableDom<UnderlyingSourceContainer>,
205    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queue>
206    queue: DomRefCell<VecDeque<QueueEntry>>,
207    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queuetotalsize>
208    queue_total_size: Cell<f64>,
209    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-byobrequest>
210    byob_request: MutNullableDom<ReadableStreamBYOBRequest>,
211    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pendingpullintos>
212    pending_pull_intos: DomRefCell<Vec<PullIntoDescriptor>>,
213    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-closerequested>
214    close_requested: Cell<bool>,
215    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-started>
216    started: Cell<bool>,
217    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pulling>
218    pulling: Cell<bool>,
219    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullalgorithm>
220    pull_again: Cell<bool>,
221}
222
223impl ReadableByteStreamController {
224    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
225    fn new_inherited(
226        underlying_source_type: UnderlyingSourceType,
227        strategy_hwm: f64,
228        global: &GlobalScope,
229        can_gc: CanGc,
230    ) -> ReadableByteStreamController {
231        let underlying_source_container =
232            UnderlyingSourceContainer::new(global, underlying_source_type, can_gc);
233        let auto_allocate_chunk_size = underlying_source_container.auto_allocate_chunk_size();
234        ReadableByteStreamController {
235            reflector_: Reflector::new(),
236            byob_request: MutNullableDom::new(None),
237            stream: MutNullableDom::new(None),
238            underlying_source: MutNullableDom::new(Some(&*underlying_source_container)),
239            auto_allocate_chunk_size,
240            pending_pull_intos: DomRefCell::new(Vec::new()),
241            strategy_hwm,
242            close_requested: Default::default(),
243            queue: DomRefCell::new(Default::default()),
244            queue_total_size: Default::default(),
245            started: Default::default(),
246            pulling: Default::default(),
247            pull_again: Default::default(),
248        }
249    }
250
251    #[cfg_attr(crown, expect(crown::unrooted_must_root))]
252    pub(crate) fn new(
253        underlying_source_type: UnderlyingSourceType,
254        strategy_hwm: f64,
255        global: &GlobalScope,
256        can_gc: CanGc,
257    ) -> DomRoot<ReadableByteStreamController> {
258        reflect_dom_object(
259            Box::new(ReadableByteStreamController::new_inherited(
260                underlying_source_type,
261                strategy_hwm,
262                global,
263                can_gc,
264            )),
265            global,
266            can_gc,
267        )
268    }
269
270    pub(crate) fn set_stream(&self, stream: &ReadableStream) {
271        self.stream.set(Some(stream))
272    }
273
274    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-pull-into>
275    pub(crate) fn perform_pull_into(
276        &self,
277        cx: SafeJSContext,
278        read_into_request: &ReadIntoRequest,
279        view: HeapBufferSource<ArrayBufferViewU8>,
280        min: u64,
281        can_gc: CanGc,
282    ) {
283        // Let stream be controller.[[stream]].
284        let stream = self.stream.get().unwrap();
285
286        // Let elementSize be 1.
287        let mut element_size = 1;
288
289        // Let ctor be %DataView%.
290        let mut ctor = Constructor::DataView;
291
292        // If view has a [[TypedArrayName]] internal slot (i.e., it is not a DataView),
293        if view.has_typed_array_name() {
294            // Set elementSize to the element size specified in the
295            // typed array constructors table for view.[[TypedArrayName]].
296            let view_typw = view.get_array_buffer_view_type();
297            element_size = byte_size(view_typw);
298
299            // Set ctor to the constructor specified in the typed array constructors table for view.[[TypedArrayName]].
300            ctor = Constructor::Name(view_typw);
301        }
302
303        // Let minimumFill be min × elementSize.
304        let minimum_fill = min * element_size;
305
306        // Assert: minimumFill ≥ 0 and minimumFill ≤ view.[[ByteLength]].
307        assert!(minimum_fill <= (view.byte_length() as u64));
308
309        // Assert: the remainder after dividing minimumFill by elementSize is 0.
310        assert_eq!(minimum_fill % element_size, 0);
311
312        // Let byteOffset be view.[[ByteOffset]].
313        let byte_offset = view.get_byte_offset();
314
315        // Let byteLength be view.[[ByteLength]].
316        let byte_length = view.byte_length();
317
318        // Let bufferResult be TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
319        match view
320            .get_array_buffer_view_buffer(cx)
321            .transfer_array_buffer(cx)
322        {
323            Ok(buffer) => {
324                // Let buffer be bufferResult.[[Value]].
325                // Let pullIntoDescriptor be a new pull-into descriptor with
326                // buffer   buffer
327                // buffer byte length   buffer.[[ArrayBufferByteLength]]
328                // byte offset  byteOffset
329                // byte length  byteLength
330                // bytes filled  0
331                // minimum fill minimumFill
332                // element size elementSize
333                // view constructor ctor
334                // reader type  "byob"
335                let buffer_byte_length = buffer.byte_length();
336                let pull_into_descriptor = PullIntoDescriptor {
337                    buffer,
338                    buffer_byte_length: buffer_byte_length as u64,
339                    byte_offset: byte_offset as u64,
340                    byte_length: byte_length as u64,
341                    bytes_filled: Cell::new(0),
342                    minimum_fill,
343                    element_size,
344                    view_constructor: ctor.clone(),
345                    reader_type: Some(ReaderType::Byob),
346                };
347
348                // If controller.[[pendingPullIntos]] is not empty,
349                {
350                    let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
351                    if !pending_pull_intos.is_empty() {
352                        // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
353                        pending_pull_intos.push(pull_into_descriptor);
354
355                        // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
356                        stream.add_read_into_request(read_into_request);
357
358                        // Return.
359                        return;
360                    }
361                }
362
363                // If stream.[[state]] is "closed",
364                if stream.is_closed() {
365                    // Let emptyView be ! Construct(ctor, « pullIntoDescriptor’s buffer,
366                    // pullIntoDescriptor’s byte offset, 0 »).
367                    if let Ok(empty_view) = create_buffer_source_with_constructor(
368                        cx,
369                        &ctor,
370                        &pull_into_descriptor.buffer,
371                        pull_into_descriptor.byte_offset as usize,
372                        0,
373                    ) {
374                        // Perform readIntoRequest’s close steps, given emptyView.
375                        let result = RootedTraceableBox::new(Heap::default());
376                        rooted!(in(*cx) let mut view_value = UndefinedValue());
377                        empty_view.get_buffer_view_value(cx, view_value.handle_mut());
378                        result.set(*view_value);
379
380                        read_into_request.close_steps(Some(result), can_gc);
381
382                        // Return.
383                        return;
384                    } else {
385                        return;
386                    }
387                }
388
389                // If controller.[[queueTotalSize]] > 0,
390                if self.queue_total_size.get() > 0.0 {
391                    // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(
392                    // controller, pullIntoDescriptor) is true,
393                    if self.fill_pull_into_descriptor_from_queue(cx, &pull_into_descriptor) {
394                        // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(
395                        // pullIntoDescriptor).
396                        if let Ok(filled_view) =
397                            self.convert_pull_into_descriptor(cx, &pull_into_descriptor)
398                        {
399                            // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
400                            self.handle_queue_drain(can_gc);
401
402                            // Perform readIntoRequest’s chunk steps, given filledView.
403                            let result = RootedTraceableBox::new(Heap::default());
404                            rooted!(in(*cx) let mut view_value = UndefinedValue());
405                            filled_view.get_buffer_view_value(cx, view_value.handle_mut());
406                            result.set(*view_value);
407                            read_into_request.chunk_steps(result, can_gc);
408
409                            // Return.
410                            return;
411                        } else {
412                            return;
413                        }
414                    }
415
416                    // If controller.[[closeRequested]] is true,
417                    if self.close_requested.get() {
418                        // Let e be a new TypeError exception.
419                        rooted!(in(*cx) let mut error = UndefinedValue());
420                        Error::Type("close requested".to_owned()).to_jsval(
421                            cx,
422                            &self.global(),
423                            error.handle_mut(),
424                            can_gc,
425                        );
426
427                        // Perform ! ReadableByteStreamControllerError(controller, e).
428                        self.error(error.handle(), can_gc);
429
430                        // Perform readIntoRequest’s error steps, given e.
431                        read_into_request.error_steps(error.handle(), can_gc);
432
433                        // Return.
434                        return;
435                    }
436                }
437
438                // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
439                {
440                    self.pending_pull_intos
441                        .borrow_mut()
442                        .push(pull_into_descriptor);
443                }
444                // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
445                stream.add_read_into_request(read_into_request);
446
447                // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
448                self.call_pull_if_needed(can_gc);
449            },
450            Err(error) => {
451                // If bufferResult is an abrupt completion,
452
453                // Perform readIntoRequest’s error steps, given bufferResult.[[Value]].
454                rooted!(in(*cx) let mut rval = UndefinedValue());
455                error
456                    .clone()
457                    .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
458                read_into_request.error_steps(rval.handle(), can_gc);
459
460                // Return.
461            },
462        }
463    }
464
465    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond>
466    pub(crate) fn respond(
467        &self,
468        cx: SafeJSContext,
469        bytes_written: u64,
470        can_gc: CanGc,
471    ) -> Fallible<()> {
472        {
473            // Assert: controller.[[pendingPullIntos]] is not empty.
474            let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
475            assert!(!pending_pull_intos.is_empty());
476
477            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
478            let first_descriptor = pending_pull_intos.first_mut().unwrap();
479
480            // Let state be controller.[[stream]].[[state]].
481            let stream = self.stream.get().unwrap();
482
483            // If state is "closed",
484            if stream.is_closed() {
485                // If bytesWritten is not 0, throw a TypeError exception.
486                if bytes_written != 0 {
487                    return Err(Error::Type(
488                        "bytesWritten not zero on closed stream".to_owned(),
489                    ));
490                }
491            } else {
492                // Assert: state is "readable".
493                assert!(stream.is_readable());
494
495                // If bytesWritten is 0, throw a TypeError exception.
496                if bytes_written == 0 {
497                    return Err(Error::Type("bytesWritten is 0".to_owned()));
498                }
499
500                // If firstDescriptor’s bytes filled + bytesWritten > firstDescriptor’s byte length,
501                // throw a RangeError exception.
502                if first_descriptor.bytes_filled.get() + bytes_written >
503                    first_descriptor.byte_length
504                {
505                    return Err(Error::Range(
506                        "bytes filled + bytesWritten > byte length".to_owned(),
507                    ));
508                }
509            }
510
511            // Set firstDescriptor’s buffer to ! TransferArrayBuffer(firstDescriptor’s buffer).
512            first_descriptor.buffer = first_descriptor
513                .buffer
514                .transfer_array_buffer(cx)
515                .expect("TransferArrayBuffer failed");
516        }
517
518        // Perform ? ReadableByteStreamControllerRespondInternal(controller, bytesWritten).
519        self.respond_internal(cx, bytes_written, can_gc)
520    }
521
522    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-internal>
523    pub(crate) fn respond_internal(
524        &self,
525        cx: SafeJSContext,
526        bytes_written: u64,
527        can_gc: CanGc,
528    ) -> Fallible<()> {
529        {
530            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
531            let pending_pull_intos = self.pending_pull_intos.borrow();
532            let first_descriptor = pending_pull_intos.first().unwrap();
533
534            // Assert: ! CanTransferArrayBuffer(firstDescriptor’s buffer) is true
535            assert!(first_descriptor.buffer.can_transfer_array_buffer(cx));
536        }
537
538        // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
539        self.invalidate_byob_request();
540
541        // Let state be controller.[[stream]].[[state]].
542        let stream = self.stream.get().unwrap();
543
544        // If state is "closed",
545        if stream.is_closed() {
546            // Assert: bytesWritten is 0.
547            assert_eq!(bytes_written, 0);
548
549            // Perform ! ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor).
550            self.respond_in_closed_state(cx, can_gc)
551                .expect("respond_in_closed_state failed");
552        } else {
553            // Assert: state is "readable".
554            assert!(stream.is_readable());
555
556            // Assert: bytesWritten > 0.
557            assert!(bytes_written > 0);
558
559            // Perform ? ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor).
560            self.respond_in_readable_state(cx, bytes_written, can_gc)?;
561        }
562
563        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
564        self.call_pull_if_needed(can_gc);
565
566        Ok(())
567    }
568
569    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-closed-state>
570    pub(crate) fn respond_in_closed_state(&self, cx: SafeJSContext, can_gc: CanGc) -> Fallible<()> {
571        let pending_pull_intos = self.pending_pull_intos.borrow();
572        let first_descriptor = pending_pull_intos.first().unwrap();
573
574        // Assert: the remainder after dividing firstDescriptor’s bytes filled
575        // by firstDescriptor’s element size is 0.
576        assert_eq!(
577            first_descriptor.bytes_filled.get() % first_descriptor.element_size,
578            0
579        );
580
581        // If firstDescriptor’s reader type is "none",
582        // perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
583        let reader_type = first_descriptor.reader_type.is_none();
584
585        // needed to drop the borrow and avoid BorrowMutError
586        drop(pending_pull_intos);
587
588        if reader_type {
589            self.shift_pending_pull_into();
590        }
591
592        // Let stream be controller.[[stream]].
593        let stream = self.stream.get().unwrap();
594
595        // If ! ReadableStreamHasBYOBReader(stream) is true,
596        if stream.has_byob_reader() {
597            // Let filledPullIntos be a new empty list.
598            let mut filled_pull_intos = Vec::new();
599
600            // While filledPullIntos’s size < ! ReadableStreamGetNumReadIntoRequests(stream),
601            while filled_pull_intos.len() < stream.get_num_read_into_requests() {
602                // Let pullIntoDescriptor be ! ReadableByteStreamControllerShiftPendingPullInto(controller).
603                let pull_into_descriptor = self.shift_pending_pull_into();
604
605                // Append pullIntoDescriptor to filledPullIntos.
606                filled_pull_intos.push(pull_into_descriptor);
607            }
608
609            // For each filledPullInto of filledPullIntos,
610            for filled_pull_into in filled_pull_intos {
611                // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
612                self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)
613                    .expect("commit_pull_into_descriptor failed");
614            }
615        }
616
617        Ok(())
618    }
619
620    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-readable-state>
621    pub(crate) fn respond_in_readable_state(
622        &self,
623        cx: SafeJSContext,
624        bytes_written: u64,
625        can_gc: CanGc,
626    ) -> Fallible<()> {
627        let pending_pull_intos = self.pending_pull_intos.borrow();
628        let first_descriptor = pending_pull_intos.first().unwrap();
629
630        // Assert: pullIntoDescriptor’s bytes filled + bytesWritten ≤ pullIntoDescriptor’s byte length.
631        assert!(
632            first_descriptor.bytes_filled.get() + bytes_written <= first_descriptor.byte_length
633        );
634
635        // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
636        // controller, bytesWritten, pullIntoDescriptor).
637        self.fill_head_pull_into_descriptor(bytes_written, first_descriptor);
638
639        // If pullIntoDescriptor’s reader type is "none",
640        if first_descriptor.reader_type.is_none() {
641            // needed to drop the borrow and avoid BorrowMutError
642            drop(pending_pull_intos);
643
644            // Perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, pullIntoDescriptor).
645            self.enqueue_detached_pull_into_to_queue(cx, can_gc)?;
646
647            // Let filledPullIntos be the result of performing
648            // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
649            let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
650
651            // For each filledPullInto of filledPullIntos,
652            for filled_pull_into in filled_pull_intos {
653                // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]]
654                // , filledPullInto).
655                self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)
656                    .expect("commit_pull_into_descriptor failed");
657            }
658
659            // Return.
660            return Ok(());
661        }
662
663        // If pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill, return.
664        if first_descriptor.bytes_filled.get() < first_descriptor.minimum_fill {
665            return Ok(());
666        }
667
668        // needed to drop the borrow and avoid BorrowMutError
669        drop(pending_pull_intos);
670
671        // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
672        let pull_into_descriptor = self.shift_pending_pull_into();
673
674        // Let remainderSize be the remainder after dividing pullIntoDescriptor’s bytes
675        // filled by pullIntoDescriptor’s element size.
676        let remainder_size =
677            pull_into_descriptor.bytes_filled.get() % pull_into_descriptor.element_size;
678
679        // If remainderSize > 0,
680        if remainder_size > 0 {
681            // Let end be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
682            let end = pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();
683
684            // Perform ? ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
685            // pullIntoDescriptor’s buffer, end − remainderSize, remainderSize).
686            self.enqueue_cloned_chunk_to_queue(
687                cx,
688                &pull_into_descriptor.buffer,
689                end - remainder_size,
690                remainder_size,
691                can_gc,
692            )?;
693        }
694
695        // Set pullIntoDescriptor’s bytes filled to pullIntoDescriptor’s bytes filled − remainderSize.
696        pull_into_descriptor
697            .bytes_filled
698            .set(pull_into_descriptor.bytes_filled.get() - remainder_size);
699
700        // Let filledPullIntos be the result of performing
701        // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
702        let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
703
704        // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], pullIntoDescriptor).
705        self.commit_pull_into_descriptor(cx, &pull_into_descriptor, can_gc)
706            .expect("commit_pull_into_descriptor failed");
707
708        // For each filledPullInto of filledPullIntos,
709        for filled_pull_into in filled_pull_intos {
710            // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], filledPullInto).
711            self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)
712                .expect("commit_pull_into_descriptor failed");
713        }
714
715        Ok(())
716    }
717
718    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-with-new-view>
719    pub(crate) fn respond_with_new_view(
720        &self,
721        cx: SafeJSContext,
722        view: HeapBufferSource<ArrayBufferViewU8>,
723        can_gc: CanGc,
724    ) -> Fallible<()> {
725        let view_byte_length;
726        {
727            // Assert: controller.[[pendingPullIntos]] is not empty.
728            let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
729            assert!(!pending_pull_intos.is_empty());
730
731            // Assert: ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is false.
732            assert!(!view.is_detached_buffer(cx));
733
734            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
735            let first_descriptor = pending_pull_intos.first_mut().unwrap();
736
737            // Let state be controller.[[stream]].[[state]].
738            let stream = self.stream.get().unwrap();
739
740            // If state is "closed",
741            if stream.is_closed() {
742                // If view.[[ByteLength]] is not 0, throw a TypeError exception.
743                if view.byte_length() != 0 {
744                    return Err(Error::Type("view byte length is not 0".to_owned()));
745                }
746            } else {
747                // Assert: state is "readable".
748                assert!(stream.is_readable());
749
750                // If view.[[ByteLength]] is 0, throw a TypeError exception.
751                if view.byte_length() == 0 {
752                    return Err(Error::Type("view byte length is 0".to_owned()));
753                }
754            }
755
756            // If firstDescriptor’s byte offset + firstDescriptor’ bytes filled is not view.[[ByteOffset]],
757            // throw a RangeError exception.
758            if first_descriptor.byte_offset + first_descriptor.bytes_filled.get() !=
759                (view.get_byte_offset() as u64)
760            {
761                return Err(Error::Range(
762                    "firstDescriptor's byte offset + bytes filled is not view byte offset"
763                        .to_owned(),
764                ));
765            }
766
767            // If firstDescriptor’s buffer byte length is not view.[[ViewedArrayBuffer]].[[ByteLength]],
768            // throw a RangeError exception.
769            if first_descriptor.buffer_byte_length !=
770                (view.viewed_buffer_array_byte_length(cx) as u64)
771            {
772                return Err(Error::Range(
773                "firstDescriptor's buffer byte length is not view viewed buffer array byte length"
774                    .to_owned(),
775            ));
776            }
777
778            // If firstDescriptor’s bytes filled + view.[[ByteLength]] > firstDescriptor’s byte length,
779            // throw a RangeError exception.
780            if first_descriptor.bytes_filled.get() + (view.byte_length()) as u64 >
781                first_descriptor.byte_length
782            {
783                return Err(Error::Range(
784                    "bytes filled + view byte length > byte length".to_owned(),
785                ));
786            }
787
788            // Let viewByteLength be view.[[ByteLength]].
789            view_byte_length = view.byte_length();
790
791            // Set firstDescriptor’s buffer to ? TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
792            first_descriptor.buffer = view
793                .get_array_buffer_view_buffer(cx)
794                .transfer_array_buffer(cx)?;
795        }
796
797        // Perform ? ReadableByteStreamControllerRespondInternal(controller, viewByteLength).
798        self.respond_internal(cx, view_byte_length as u64, can_gc)
799    }
800
801    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-get-desired-size>
802    pub(crate) fn get_desired_size(&self) -> Option<f64> {
803        // Let state be controller.[[stream]].[[state]].
804        let stream = self.stream.get()?;
805
806        // If state is "errored", return null.
807        if stream.is_errored() {
808            return None;
809        }
810
811        // If state is "closed", return 0.
812        if stream.is_closed() {
813            return Some(0.0);
814        }
815
816        // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
817        Some(self.strategy_hwm - self.queue_total_size.get())
818    }
819
820    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollergetbyobrequest>
821    pub(crate) fn get_byob_request(
822        &self,
823        cx: SafeJSContext,
824        can_gc: CanGc,
825    ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
826        // If controller.[[byobRequest]] is null and controller.[[pendingPullIntos]] is not empty,
827        let pending_pull_intos = self.pending_pull_intos.borrow();
828        if self.byob_request.get().is_none() && !pending_pull_intos.is_empty() {
829            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
830            let first_descriptor = pending_pull_intos.first().unwrap();
831            // Let view be ! Construct(%Uint8Array%, « firstDescriptor’s buffer,
832            // firstDescriptor’s byte offset + firstDescriptor’s bytes filled,
833            // firstDescriptor’s byte length − firstDescriptor’s bytes filled »).
834
835            let byte_offset = first_descriptor.byte_offset + first_descriptor.bytes_filled.get();
836            let byte_length = first_descriptor.byte_length - first_descriptor.bytes_filled.get();
837
838            let view = create_buffer_source_with_constructor(
839                cx,
840                &Constructor::Name(Type::Uint8),
841                &first_descriptor.buffer,
842                byte_offset as usize,
843                byte_length as usize,
844            )
845            .expect("Construct Uint8Array failed");
846
847            // Let byobRequest be a new ReadableStreamBYOBRequest.
848            let byob_request = ReadableStreamBYOBRequest::new(&self.global(), can_gc);
849
850            // Set byobRequest.[[controller]] to controller.
851            byob_request.set_controller(Some(&DomRoot::from_ref(self)));
852
853            // Set byobRequest.[[view]] to view.
854            byob_request.set_view(Some(view));
855
856            // Set controller.[[byobRequest]] to byobRequest.
857            self.byob_request.set(Some(&byob_request));
858        }
859
860        // Return controller.[[byobRequest]].
861        Ok(self.byob_request.get())
862    }
863
864    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-close>
865    pub(crate) fn close(&self, cx: SafeJSContext, can_gc: CanGc) -> Fallible<()> {
866        // Let stream be controller.[[stream]].
867        let stream = self.stream.get().unwrap();
868
869        // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
870        if self.close_requested.get() || !stream.is_readable() {
871            return Ok(());
872        }
873
874        // If controller.[[queueTotalSize]] > 0,
875        if self.queue_total_size.get() > 0.0 {
876            // Set controller.[[closeRequested]] to true.
877            self.close_requested.set(true);
878            // Return.
879            return Ok(());
880        }
881
882        // If controller.[[pendingPullIntos]] is not empty,
883        let pending_pull_intos = self.pending_pull_intos.borrow();
884        if !pending_pull_intos.is_empty() {
885            // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
886            let first_pending_pull_into = pending_pull_intos.first().unwrap();
887
888            // If the remainder after dividing firstPendingPullInto’s bytes filled by
889            // firstPendingPullInto’s element size is not 0,
890            if first_pending_pull_into.bytes_filled.get() % first_pending_pull_into.element_size !=
891                0
892            {
893                // needed to drop the borrow and avoid BorrowMutError
894                drop(pending_pull_intos);
895
896                // Let e be a new TypeError exception.
897                let e = Error::Type(
898                    "remainder after dividing firstPendingPullInto's bytes
899                    filled by firstPendingPullInto's element size is not 0"
900                        .to_owned(),
901                );
902
903                // Perform ! ReadableByteStreamControllerError(controller, e).
904                rooted!(in(*cx) let mut error = UndefinedValue());
905                e.clone()
906                    .to_jsval(cx, &self.global(), error.handle_mut(), can_gc);
907                self.error(error.handle(), can_gc);
908
909                // Throw e.
910                return Err(e);
911            }
912        }
913
914        // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
915        self.clear_algorithms();
916
917        // Perform ! ReadableStreamClose(stream).
918        stream.close(can_gc);
919        Ok(())
920    }
921
922    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-error>
923    pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
924        // Let stream be controller.[[stream]].
925        let stream = self.stream.get().unwrap();
926
927        // If stream.[[state]] is not "readable", return.
928        if !stream.is_readable() {
929            return;
930        }
931
932        // Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller).
933        self.clear_pending_pull_intos();
934
935        // Perform ! ResetQueue(controller).
936        self.reset_queue();
937
938        // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
939        self.clear_algorithms();
940
941        // Perform ! ReadableStreamError(stream, e).
942        stream.error(e, can_gc);
943    }
944
945    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-algorithms>
946    fn clear_algorithms(&self) {
947        // Set controller.[[pullAlgorithm]] to undefined.
948        // Set controller.[[cancelAlgorithm]] to undefined.
949        self.underlying_source.set(None);
950    }
951
952    /// <https://streams.spec.whatwg.org/#reset-queue>
953    pub(crate) fn reset_queue(&self) {
954        // Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
955
956        // Set container.[[queue]] to a new empty list.
957        self.queue.borrow_mut().clear();
958
959        // Set container.[[queueTotalSize]] to 0.
960        self.queue_total_size.set(0.0);
961    }
962
963    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-pending-pull-intos>
964    pub(crate) fn clear_pending_pull_intos(&self) {
965        // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
966        self.invalidate_byob_request();
967
968        // Set controller.[[pendingPullIntos]] to a new empty list.
969        self.pending_pull_intos.borrow_mut().clear();
970    }
971
972    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-invalidate-byob-request>
973    pub(crate) fn invalidate_byob_request(&self) {
974        if let Some(byob_request) = self.byob_request.get() {
975            // Set controller.[[byobRequest]].[[controller]] to undefined.
976            byob_request.set_controller(None);
977
978            // Set controller.[[byobRequest]].[[view]] to null.
979            byob_request.set_view(None);
980
981            // Set controller.[[byobRequest]] to null.
982            self.byob_request.set(None);
983        }
984        // If controller.[[byobRequest]] is null, return.
985    }
986
987    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue>
988    pub(crate) fn enqueue(
989        &self,
990        cx: SafeJSContext,
991        chunk: HeapBufferSource<ArrayBufferViewU8>,
992        can_gc: CanGc,
993    ) -> Fallible<()> {
994        // Let stream be controller.[[stream]].
995        let stream = self.stream.get().unwrap();
996
997        // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
998        if self.close_requested.get() || !stream.is_readable() {
999            return Ok(());
1000        }
1001
1002        // Let buffer be chunk.[[ViewedArrayBuffer]].
1003        let buffer = chunk.get_array_buffer_view_buffer(cx);
1004
1005        // Let byteOffset be chunk.[[ByteOffset]].
1006        let byte_offset = chunk.get_byte_offset();
1007
1008        // Let byteLength be chunk.[[ByteLength]].
1009        let byte_length = chunk.byte_length();
1010
1011        // If ! IsDetachedBuffer(buffer) is true, throw a TypeError exception.
1012        if buffer.is_detached_buffer(cx) {
1013            return Err(Error::Type("buffer is detached".to_owned()));
1014        }
1015
1016        // Let transferredBuffer be ? TransferArrayBuffer(buffer).
1017        let transferred_buffer = buffer.transfer_array_buffer(cx)?;
1018
1019        // If controller.[[pendingPullIntos]] is not empty,
1020        {
1021            let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
1022            if !pending_pull_intos.is_empty() {
1023                // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
1024                let first_descriptor = pending_pull_intos.first_mut().unwrap();
1025                // If ! IsDetachedBuffer(firstPendingPullInto’s buffer) is true, throw a TypeError exception.
1026                if first_descriptor.buffer.is_detached_buffer(cx) {
1027                    return Err(Error::Type("buffer is detached".to_owned()));
1028                }
1029
1030                // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
1031                self.invalidate_byob_request();
1032
1033                // Set firstPendingPullInto’s buffer to ! TransferArrayBuffer(firstPendingPullInto’s buffer).
1034                first_descriptor.buffer = first_descriptor
1035                    .buffer
1036                    .transfer_array_buffer(cx)
1037                    .expect("TransferArrayBuffer failed");
1038
1039                // If firstPendingPullInto’s reader type is "none",
1040                if first_descriptor.reader_type.is_none() {
1041                    // needed to drop the borrow and avoid BorrowMutError
1042                    drop(pending_pull_intos);
1043
1044                    // perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(
1045                    // controller, firstPendingPullInto).
1046                    self.enqueue_detached_pull_into_to_queue(cx, can_gc)?;
1047                }
1048            }
1049        }
1050
1051        // If ! ReadableStreamHasDefaultReader(stream) is true,
1052        if stream.has_default_reader() {
1053            // Perform ! ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller).
1054            self.process_read_requests_using_queue(cx, can_gc)
1055                .expect("process_read_requests_using_queue failed");
1056
1057            // If ! ReadableStreamGetNumReadRequests(stream) is 0,
1058            if stream.get_num_read_requests() == 0 {
1059                // Assert: controller.[[pendingPullIntos]] is empty.
1060                {
1061                    assert!(self.pending_pull_intos.borrow().is_empty());
1062                }
1063
1064                // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1065                // controller, transferredBuffer, byteOffset, byteLength).
1066                self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1067            } else {
1068                // Assert: controller.[[queue]] is empty.
1069                assert!(self.queue.borrow().is_empty());
1070
1071                // If controller.[[pendingPullIntos]] is not empty,
1072
1073                let pending_pull_intos = self.pending_pull_intos.borrow();
1074                if !pending_pull_intos.is_empty() {
1075                    // Assert: controller.[[pendingPullIntos]][0]'s reader type is "default".
1076                    assert!(matches!(
1077                        pending_pull_intos.first().unwrap().reader_type,
1078                        Some(ReaderType::Default)
1079                    ));
1080
1081                    // needed to drop the borrow and avoid BorrowMutError
1082                    drop(pending_pull_intos);
1083
1084                    // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1085                    self.shift_pending_pull_into();
1086                }
1087
1088                // Let transferredView be ! Construct(%Uint8Array%, « transferredBuffer, byteOffset, byteLength »).
1089                let transferred_view = create_buffer_source_with_constructor(
1090                    cx,
1091                    &Constructor::Name(Type::Uint8),
1092                    &transferred_buffer,
1093                    byte_offset,
1094                    byte_length,
1095                )
1096                .expect("Construct Uint8Array failed");
1097
1098                // Perform ! ReadableStreamFulfillReadRequest(stream, transferredView, false).
1099                rooted!(in(*cx) let mut view_value = UndefinedValue());
1100                transferred_view.get_buffer_view_value(cx, view_value.handle_mut());
1101                stream.fulfill_read_request(view_value.handle(), false, can_gc);
1102            }
1103            // Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true,
1104        } else if stream.has_byob_reader() {
1105            // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1106            // controller, transferredBuffer, byteOffset, byteLength).
1107            self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1108
1109            // Let filledPullIntos be the result of performing !
1110            // ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
1111            let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
1112
1113            // For each filledPullInto of filledPullIntos,
1114            // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
1115            for filled_pull_into in filled_pull_intos {
1116                self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)
1117                    .expect("commit_pull_into_descriptor failed");
1118            }
1119        } else {
1120            // Assert: ! IsReadableStreamLocked(stream) is false.
1121            assert!(!stream.is_locked());
1122
1123            // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1124            // (controller, transferredBuffer, byteOffset, byteLength).
1125            self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1126        }
1127
1128        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1129        self.call_pull_if_needed(can_gc);
1130
1131        Ok(())
1132    }
1133
1134    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-commit-pull-into-descriptor>
1135    pub(crate) fn commit_pull_into_descriptor(
1136        &self,
1137        cx: SafeJSContext,
1138        pull_into_descriptor: &PullIntoDescriptor,
1139        can_gc: CanGc,
1140    ) -> Fallible<()> {
1141        // Assert: stream.[[state]] is not "errored".
1142        let stream = self.stream.get().unwrap();
1143        assert!(!stream.is_errored());
1144
1145        // Assert: pullIntoDescriptor.reader type is not "none".
1146        assert!(pull_into_descriptor.reader_type.is_some());
1147
1148        // Let done be false.
1149        let mut done = false;
1150
1151        // If stream.[[state]] is "closed",
1152        if stream.is_closed() {
1153            // Assert: the remainder after dividing pullIntoDescriptor’s bytes filled
1154            // by pullIntoDescriptor’s element size is 0.
1155            assert!(
1156                pull_into_descriptor.bytes_filled.get() % pull_into_descriptor.element_size == 0
1157            );
1158
1159            // Set done to true.
1160            done = true;
1161        }
1162
1163        // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor).
1164        let filled_view = self
1165            .convert_pull_into_descriptor(cx, pull_into_descriptor)
1166            .expect("convert_pull_into_descriptor failed");
1167
1168        rooted!(in(*cx) let mut view_value = UndefinedValue());
1169        filled_view.get_buffer_view_value(cx, view_value.handle_mut());
1170
1171        // If pullIntoDescriptor’s reader type is "default",
1172        if matches!(pull_into_descriptor.reader_type, Some(ReaderType::Default)) {
1173            // Perform ! ReadableStreamFulfillReadRequest(stream, filledView, done).
1174
1175            stream.fulfill_read_request(view_value.handle(), done, can_gc);
1176        } else {
1177            // Assert: pullIntoDescriptor’s reader type is "byob".
1178            assert!(matches!(
1179                pull_into_descriptor.reader_type,
1180                Some(ReaderType::Byob)
1181            ));
1182
1183            // Perform ! ReadableStreamFulfillReadIntoRequest(stream, filledView, done).
1184            stream.fulfill_read_into_request(view_value.handle(), done, can_gc);
1185        }
1186        Ok(())
1187    }
1188
1189    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-convert-pull-into-descriptor>
1190    pub(crate) fn convert_pull_into_descriptor(
1191        &self,
1192        cx: SafeJSContext,
1193        pull_into_descriptor: &PullIntoDescriptor,
1194    ) -> Fallible<HeapBufferSource<ArrayBufferViewU8>> {
1195        // Let bytesFilled be pullIntoDescriptor’s bytes filled.
1196        let bytes_filled = pull_into_descriptor.bytes_filled.get();
1197
1198        // Let elementSize be pullIntoDescriptor’s element size.
1199        let element_size = pull_into_descriptor.element_size;
1200
1201        // Assert: bytesFilled ≤ pullIntoDescriptor’s byte length.
1202        assert!(bytes_filled <= pull_into_descriptor.byte_length);
1203
1204        // Assert: the remainder after dividing bytesFilled by elementSize is 0.
1205        assert!(bytes_filled % element_size == 0);
1206
1207        // Let buffer be ! TransferArrayBuffer(pullIntoDescriptor’s buffer).
1208        let buffer = pull_into_descriptor
1209            .buffer
1210            .transfer_array_buffer(cx)
1211            .expect("TransferArrayBuffer failed");
1212
1213        // Return ! Construct(pullIntoDescriptor’s view constructor,
1214        // « buffer, pullIntoDescriptor’s byte offset, bytesFilled ÷ elementSize »).
1215        Ok(create_buffer_source_with_constructor(
1216            cx,
1217            &pull_into_descriptor.view_constructor,
1218            &buffer,
1219            pull_into_descriptor.byte_offset as usize,
1220            (bytes_filled / element_size) as usize,
1221        )
1222        .expect("Construct view failed"))
1223    }
1224
1225    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-process-pull-into-descriptors-using-queue>
1226    pub(crate) fn process_pull_into_descriptors_using_queue(
1227        &self,
1228        cx: SafeJSContext,
1229    ) -> Vec<PullIntoDescriptor> {
1230        // Assert: controller.[[closeRequested]] is false.
1231        assert!(!self.close_requested.get());
1232
1233        // Let filledPullIntos be a new empty list.
1234        let mut filled_pull_intos = Vec::new();
1235
1236        // While controller.[[pendingPullIntos]] is not empty,
1237        loop {
1238            // If controller.[[queueTotalSize]] is 0, then break.
1239            if self.queue_total_size.get() == 0.0 {
1240                break;
1241            }
1242
1243            // Let pullIntoDescriptor be controller.[[pendingPullIntos]][0].
1244            let fill_pull_result = {
1245                let pending_pull_intos = self.pending_pull_intos.borrow();
1246                let Some(pull_into_descriptor) = pending_pull_intos.first() else {
1247                    break;
1248                };
1249                self.fill_pull_into_descriptor_from_queue(cx, pull_into_descriptor)
1250            };
1251
1252            // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) is true,
1253            if fill_pull_result {
1254                // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1255                let pull_into_descriptor = self.shift_pending_pull_into();
1256
1257                // Append pullIntoDescriptor to filledPullIntos.
1258                filled_pull_intos.push(pull_into_descriptor);
1259            }
1260        }
1261
1262        // Return filledPullIntos.
1263        filled_pull_intos
1264    }
1265
1266    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-pull-into-descriptor-from-queue>
1267    pub(crate) fn fill_pull_into_descriptor_from_queue(
1268        &self,
1269        cx: SafeJSContext,
1270        pull_into_descriptor: &PullIntoDescriptor,
1271    ) -> bool {
1272        // Let maxBytesToCopy be min(controller.[[queueTotalSize]],
1273        // pullIntoDescriptor’s byte length − pullIntoDescriptor’s bytes filled).
1274        let max_bytes_to_copy = min(
1275            self.queue_total_size.get() as usize,
1276            (pull_into_descriptor.byte_length - pull_into_descriptor.bytes_filled.get()) as usize,
1277        );
1278
1279        // Let maxBytesFilled be pullIntoDescriptor’s bytes filled + maxBytesToCopy.
1280        let max_bytes_filled = pull_into_descriptor.bytes_filled.get() as usize + max_bytes_to_copy;
1281
1282        // Let totalBytesToCopyRemaining be maxBytesToCopy.
1283        let mut total_bytes_to_copy_remaining = max_bytes_to_copy;
1284
1285        // Let ready be false.
1286        let mut ready = false;
1287
1288        // Assert: ! IsDetachedBuffer(pullIntoDescriptor’s buffer) is false.
1289        assert!(!pull_into_descriptor.buffer.is_detached_buffer(cx));
1290
1291        // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
1292        assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
1293
1294        // Let remainderBytes be the remainder after dividing maxBytesFilled by pullIntoDescriptor’s element size.
1295        let remainder_bytes = max_bytes_filled % pull_into_descriptor.element_size as usize;
1296
1297        // Let maxAlignedBytes be maxBytesFilled − remainderBytes.
1298        let max_aligned_bytes = max_bytes_filled - remainder_bytes;
1299
1300        // If maxAlignedBytes ≥ pullIntoDescriptor’s minimum fill,
1301        if max_aligned_bytes >= pull_into_descriptor.minimum_fill as usize {
1302            // Set totalBytesToCopyRemaining to maxAlignedBytes − pullIntoDescriptor’s bytes filled.
1303            total_bytes_to_copy_remaining =
1304                max_aligned_bytes - (pull_into_descriptor.bytes_filled.get() as usize);
1305
1306            // Set ready to true.
1307            ready = true;
1308        }
1309
1310        // Let queue be controller.[[queue]].
1311        let mut queue = self.queue.borrow_mut();
1312
1313        // While totalBytesToCopyRemaining > 0,
1314        while total_bytes_to_copy_remaining > 0 {
1315            // Let headOfQueue be queue[0].
1316            let head_of_queue = queue.front_mut().unwrap();
1317
1318            // Let bytesToCopy be min(totalBytesToCopyRemaining, headOfQueue’s byte length).
1319            let bytes_to_copy = total_bytes_to_copy_remaining.min(head_of_queue.byte_length);
1320
1321            // Let destStart be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
1322            let dest_start =
1323                pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();
1324
1325            // Let descriptorBuffer be pullIntoDescriptor’s buffer.
1326            let descriptor_buffer = &pull_into_descriptor.buffer;
1327
1328            // Let queueBuffer be headOfQueue’s buffer.
1329            let queue_buffer = &head_of_queue.buffer;
1330
1331            // Let queueByteOffset be headOfQueue’s byte offset.
1332            let queue_byte_offset = head_of_queue.byte_offset;
1333
1334            // Assert: ! CanCopyDataBlockBytes(descriptorBuffer, destStart,
1335            // queueBuffer, queueByteOffset, bytesToCopy) is true.
1336            assert!(descriptor_buffer.can_copy_data_block_bytes(
1337                cx,
1338                dest_start as usize,
1339                queue_buffer,
1340                queue_byte_offset,
1341                bytes_to_copy
1342            ));
1343
1344            // Perform ! CopyDataBlockBytes(descriptorBuffer.[[ArrayBufferData]], destStart,
1345            // queueBuffer.[[ArrayBufferData]], queueByteOffset, bytesToCopy).
1346            descriptor_buffer.copy_data_block_bytes(
1347                cx,
1348                dest_start as usize,
1349                queue_buffer,
1350                queue_byte_offset,
1351                bytes_to_copy,
1352            );
1353
1354            // If headOfQueue’s byte length is bytesToCopy,
1355            if head_of_queue.byte_length == bytes_to_copy {
1356                // Remove queue[0].
1357                queue.pop_front().unwrap();
1358            } else {
1359                // Set headOfQueue’s byte offset to headOfQueue’s byte offset + bytesToCopy.
1360                head_of_queue.byte_offset += bytes_to_copy;
1361
1362                // Set headOfQueue’s byte length to headOfQueue’s byte length − bytesToCopy.
1363                head_of_queue.byte_length -= bytes_to_copy;
1364            }
1365
1366            // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − bytesToCopy.
1367            self.queue_total_size
1368                .set(self.queue_total_size.get() - (bytes_to_copy as f64));
1369
1370            // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
1371            // controller, bytesToCopy, pullIntoDescriptor).
1372            self.fill_head_pull_into_descriptor(bytes_to_copy as u64, pull_into_descriptor);
1373
1374            // Set totalBytesToCopyRemaining to totalBytesToCopyRemaining − bytesToCopy.
1375            total_bytes_to_copy_remaining -= bytes_to_copy;
1376        }
1377
1378        // If ready is false,
1379        if !ready {
1380            // Assert: controller.[[queueTotalSize]] is 0.
1381            assert!(self.queue_total_size.get() == 0.0);
1382
1383            // Assert: pullIntoDescriptor’s bytes filled > 0.
1384            assert!(pull_into_descriptor.bytes_filled.get() > 0);
1385
1386            // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
1387            assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
1388        }
1389
1390        // Return ready.
1391        ready
1392    }
1393
1394    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-head-pull-into-descriptor>
1395    pub(crate) fn fill_head_pull_into_descriptor(
1396        &self,
1397        bytes_copied: u64,
1398        pull_into_descriptor: &PullIntoDescriptor,
1399    ) {
1400        // Assert: either controller.[[pendingPullIntos]] is empty,
1401        // or controller.[[pendingPullIntos]][0] is pullIntoDescriptor.
1402        {
1403            let pending_pull_intos = self.pending_pull_intos.borrow();
1404            assert!(
1405                pending_pull_intos.is_empty() ||
1406                    pending_pull_intos.first().unwrap() == pull_into_descriptor
1407            );
1408        }
1409
1410        // Assert: controller.[[byobRequest]] is null.
1411        assert!(self.byob_request.get().is_none());
1412
1413        // Set pullIntoDescriptor’s bytes filled to bytes filled + size.
1414        pull_into_descriptor
1415            .bytes_filled
1416            .set(pull_into_descriptor.bytes_filled.get() + bytes_copied);
1417    }
1418
1419    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueuedetachedpullintotoqueue>
1420    pub(crate) fn enqueue_detached_pull_into_to_queue(
1421        &self,
1422        cx: SafeJSContext,
1423        can_gc: CanGc,
1424    ) -> Fallible<()> {
1425        // first_descriptor: &PullIntoDescriptor,
1426        let pending_pull_intos = self.pending_pull_intos.borrow();
1427        let first_descriptor = pending_pull_intos.first().unwrap();
1428
1429        // Assert: pullIntoDescriptor’s reader type is "none".
1430        assert!(first_descriptor.reader_type.is_none());
1431
1432        // If pullIntoDescriptor’s bytes filled > 0, perform ?
1433        // ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
1434        // pullIntoDescriptor’s buffer, pullIntoDescriptor’s byte offset, pullIntoDescriptor’s bytes filled).
1435
1436        if first_descriptor.bytes_filled.get() > 0 {
1437            self.enqueue_cloned_chunk_to_queue(
1438                cx,
1439                &first_descriptor.buffer,
1440                first_descriptor.byte_offset,
1441                first_descriptor.bytes_filled.get(),
1442                can_gc,
1443            )?;
1444        }
1445
1446        // needed to drop the borrow and avoid BorrowMutError
1447        drop(pending_pull_intos);
1448
1449        // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1450        self.shift_pending_pull_into();
1451
1452        Ok(())
1453    }
1454
1455    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueueclonedchunktoqueue>
1456    pub(crate) fn enqueue_cloned_chunk_to_queue(
1457        &self,
1458        cx: SafeJSContext,
1459        buffer: &HeapBufferSource<ArrayBufferU8>,
1460        byte_offset: u64,
1461        byte_length: u64,
1462        can_gc: CanGc,
1463    ) -> Fallible<()> {
1464        // Let cloneResult be CloneArrayBuffer(buffer, byteOffset, byteLength, %ArrayBuffer%).
1465        if let Ok(clone_result) =
1466            buffer.clone_array_buffer(cx, byte_offset as usize, byte_length as usize)
1467        {
1468            // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1469            // (controller, cloneResult.[[Value]], 0, byteLength).
1470            self.enqueue_chunk_to_queue(clone_result, 0, byte_length as usize);
1471
1472            Ok(())
1473        } else {
1474            // If cloneResult is an abrupt completion,
1475
1476            // Perform ! ReadableByteStreamControllerError(controller, cloneResult.[[Value]]).
1477            rooted!(in(*cx) let mut rval = UndefinedValue());
1478            let error = Error::Type("can not clone array buffer".to_owned());
1479            error
1480                .clone()
1481                .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1482            self.error(rval.handle(), can_gc);
1483
1484            // Return cloneResult.
1485            Err(error)
1486        }
1487    }
1488
1489    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue-chunk-to-queue>
1490    pub(crate) fn enqueue_chunk_to_queue(
1491        &self,
1492        buffer: HeapBufferSource<ArrayBufferU8>,
1493        byte_offset: usize,
1494        byte_length: usize,
1495    ) {
1496        // Let entry be a new ReadableByteStreamQueueEntry object.
1497        let entry = QueueEntry::new(buffer, byte_offset, byte_length);
1498
1499        // Append entry to controller.[[queue]].
1500        self.queue.borrow_mut().push_back(entry);
1501
1502        // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] + byteLength.
1503        self.queue_total_size
1504            .set(self.queue_total_size.get() + byte_length as f64);
1505    }
1506
1507    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-shift-pending-pull-into>
1508    pub(crate) fn shift_pending_pull_into(&self) -> PullIntoDescriptor {
1509        // Assert: controller.[[byobRequest]] is null.
1510        assert!(self.byob_request.get().is_none());
1511
1512        // Let descriptor be controller.[[pendingPullIntos]][0].
1513        // Remove descriptor from controller.[[pendingPullIntos]].
1514        // Return descriptor.
1515        self.pending_pull_intos.borrow_mut().remove(0)
1516    }
1517
1518    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerprocessreadrequestsusingqueue>
1519    pub(crate) fn process_read_requests_using_queue(
1520        &self,
1521        cx: SafeJSContext,
1522        can_gc: CanGc,
1523    ) -> Fallible<()> {
1524        // Let reader be controller.[[stream]].[[reader]].
1525        // Assert: reader implements ReadableStreamDefaultReader.
1526        let reader = self.stream.get().unwrap().get_default_reader();
1527
1528        // Step 3
1529        reader.process_read_requests(cx, DomRoot::from_ref(self), can_gc)
1530    }
1531
1532    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerfillreadrequestfromqueue>
1533    pub(crate) fn fill_read_request_from_queue(
1534        &self,
1535        cx: SafeJSContext,
1536        read_request: &ReadRequest,
1537        can_gc: CanGc,
1538    ) -> Fallible<()> {
1539        // Assert: controller.[[queueTotalSize]] > 0.
1540        assert!(self.queue_total_size.get() > 0.0);
1541        // Also assert that the queue has a non-zero length;
1542        assert!(!self.queue.borrow().is_empty());
1543
1544        // Let entry be controller.[[queue]][0].
1545        // Remove entry from controller.[[queue]].
1546        let entry = self.remove_entry();
1547
1548        // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − entry’s byte length.
1549        self.queue_total_size
1550            .set(self.queue_total_size.get() - entry.byte_length as f64);
1551
1552        // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
1553        self.handle_queue_drain(can_gc);
1554
1555        // Let view be ! Construct(%Uint8Array%, « entry’s buffer, entry’s byte offset, entry’s byte length »).
1556        let view = create_buffer_source_with_constructor(
1557            cx,
1558            &Constructor::Name(Type::Uint8),
1559            &entry.buffer,
1560            entry.byte_offset,
1561            entry.byte_length,
1562        )
1563        .expect("Construct Uint8Array failed");
1564
1565        // Perform readRequest’s chunk steps, given view.
1566        let result = RootedTraceableBox::new(Heap::default());
1567        rooted!(in(*cx) let mut view_value = UndefinedValue());
1568        view.get_buffer_view_value(cx, view_value.handle_mut());
1569        result.set(*view_value);
1570
1571        read_request.chunk_steps(result, &self.global(), can_gc);
1572
1573        Ok(())
1574    }
1575
1576    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-handle-queue-drain>
1577    pub(crate) fn handle_queue_drain(&self, can_gc: CanGc) {
1578        // Assert: controller.[[stream]].[[state]] is "readable".
1579        assert!(self.stream.get().unwrap().is_readable());
1580
1581        // If controller.[[queueTotalSize]] is 0 and controller.[[closeRequested]] is true,
1582        if self.queue_total_size.get() == 0.0 && self.close_requested.get() {
1583            // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
1584            self.clear_algorithms();
1585
1586            // Perform ! ReadableStreamClose(controller.[[stream]]).
1587            self.stream.get().unwrap().close(can_gc);
1588        } else {
1589            // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1590            self.call_pull_if_needed(can_gc);
1591        }
1592    }
1593
1594    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
1595    pub(crate) fn call_pull_if_needed(&self, can_gc: CanGc) {
1596        // Let shouldPull be ! ReadableByteStreamControllerShouldCallPull(controller).
1597        let should_pull = self.should_call_pull();
1598        // If shouldPull is false, return.
1599        if !should_pull {
1600            return;
1601        }
1602
1603        // If controller.[[pulling]] is true,
1604        if self.pulling.get() {
1605            // Set controller.[[pullAgain]] to true.
1606            self.pull_again.set(true);
1607
1608            // Return.
1609            return;
1610        }
1611
1612        // Assert: controller.[[pullAgain]] is false.
1613        assert!(!self.pull_again.get());
1614
1615        // Set controller.[[pulling]] to true.
1616        self.pulling.set(true);
1617
1618        // Let pullPromise be the result of performing controller.[[pullAlgorithm]].
1619        // Continues into the resolve and reject handling of the native handler.
1620        let global = self.global();
1621        let rooted_controller = DomRoot::from_ref(self);
1622        let controller = Controller::ReadableByteStreamController(rooted_controller.clone());
1623
1624        if let Some(underlying_source) = self.underlying_source.get() {
1625            let handler = PromiseNativeHandler::new(
1626                &global,
1627                Some(Box::new(PullAlgorithmFulfillmentHandler {
1628                    controller: Dom::from_ref(&rooted_controller),
1629                })),
1630                Some(Box::new(PullAlgorithmRejectionHandler {
1631                    controller: Dom::from_ref(&rooted_controller),
1632                })),
1633                can_gc,
1634            );
1635
1636            let realm = enter_realm(&*global);
1637            let comp = InRealm::Entered(&realm);
1638            let result = underlying_source
1639                .call_pull_algorithm(controller, &global, can_gc)
1640                .unwrap_or_else(|| {
1641                    let promise = Promise::new(&global, can_gc);
1642                    promise.resolve_native(&(), can_gc);
1643                    Ok(promise)
1644                });
1645            let promise = result.unwrap_or_else(|error| {
1646                let cx = GlobalScope::get_cx();
1647                rooted!(in(*cx) let mut rval = UndefinedValue());
1648                // TODO: check if `self.global()` is the right globalscope.
1649                error
1650                    .clone()
1651                    .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1652                let promise = Promise::new(&global, can_gc);
1653                promise.reject_native(&rval.handle(), can_gc);
1654                promise
1655            });
1656            promise.append_native_handler(&handler, comp, can_gc);
1657        }
1658    }
1659
1660    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-should-call-pull>
1661    fn should_call_pull(&self) -> bool {
1662        // Let stream be controller.[[stream]].
1663        // Note: the spec does not assert that stream is not undefined here,
1664        // so we return false if it is.
1665        let stream = self.stream.get().unwrap();
1666
1667        // If stream.[[state]] is not "readable", return false.
1668        if !stream.is_readable() {
1669            return false;
1670        }
1671
1672        // If controller.[[closeRequested]] is true, return false.
1673        if self.close_requested.get() {
1674            return false;
1675        }
1676
1677        // If controller.[[started]] is false, return false.
1678        if !self.started.get() {
1679            return false;
1680        }
1681
1682        // If ! ReadableStreamHasDefaultReader(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0
1683        // , return true.
1684        if stream.has_default_reader() && stream.get_num_read_requests() > 0 {
1685            return true;
1686        }
1687
1688        // If ! ReadableStreamHasBYOBReader(stream) is true and ! ReadableStreamGetNumReadIntoRequests(stream) > 0
1689        // , return true.
1690        if stream.has_byob_reader() && stream.get_num_read_into_requests() > 0 {
1691            return true;
1692        }
1693
1694        // Let desiredSize be ! ReadableByteStreamControllerGetDesiredSize(controller).
1695        let desired_size = self.get_desired_size();
1696
1697        // Assert: desiredSize is not null.
1698        assert!(desired_size.is_some());
1699
1700        // If desiredSize > 0, return true.
1701        if desired_size.unwrap() > 0. {
1702            return true;
1703        }
1704
1705        // Return false.
1706        false
1707    }
1708    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
1709    pub(crate) fn setup(
1710        &self,
1711        global: &GlobalScope,
1712        stream: DomRoot<ReadableStream>,
1713        can_gc: CanGc,
1714    ) -> Fallible<()> {
1715        // Assert: stream.[[controller]] is undefined.
1716        stream.assert_no_controller();
1717
1718        // If autoAllocateChunkSize is not undefined,
1719        if self.auto_allocate_chunk_size.is_some() {
1720            // Assert: ! IsInteger(autoAllocateChunkSize) is true. Implicit
1721            // Assert: autoAllocateChunkSize is positive. (Implicit by type.)
1722        }
1723
1724        // Set controller.[[stream]] to stream.
1725        self.stream.set(Some(&stream));
1726
1727        // Set controller.[[pullAgain]] and controller.[[pulling]] to false.
1728        self.pull_again.set(false);
1729        self.pulling.set(false);
1730
1731        // Set controller.[[byobRequest]] to null.
1732        self.byob_request.set(None);
1733
1734        // Perform ! ResetQueue(controller).
1735        self.reset_queue();
1736
1737        // Set controller.[[closeRequested]] and controller.[[started]] to false.
1738        self.close_requested.set(false);
1739        self.started.set(false);
1740
1741        // Set controller.[[strategyHWM]] to highWaterMark.
1742        // Set controller.[[pullAlgorithm]] to pullAlgorithm.
1743        // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
1744        // Set controller.[[autoAllocateChunkSize]] to autoAllocateChunkSize.
1745        // Set controller.[[pendingPullIntos]] to a new empty list.
1746        // Note: the above steps are done in `new`.
1747
1748        // Set stream.[[controller]] to controller.
1749        let rooted_byte_controller = DomRoot::from_ref(self);
1750        stream.set_byte_controller(&rooted_byte_controller);
1751
1752        if let Some(underlying_source) = rooted_byte_controller.underlying_source.get() {
1753            // Let startResult be the result of performing startAlgorithm. (This might throw an exception.)
1754            let start_result = underlying_source
1755                .call_start_algorithm(
1756                    Controller::ReadableByteStreamController(rooted_byte_controller.clone()),
1757                    can_gc,
1758                )
1759                .unwrap_or_else(|| {
1760                    let promise = Promise::new(global, can_gc);
1761                    promise.resolve_native(&(), can_gc);
1762                    Ok(promise)
1763                });
1764
1765            // Let startPromise be a promise resolved with startResult.
1766            let start_promise = start_result?;
1767
1768            // Upon fulfillment of startPromise, Upon rejection of startPromise with reason r,
1769            let handler = PromiseNativeHandler::new(
1770                global,
1771                Some(Box::new(StartAlgorithmFulfillmentHandler {
1772                    controller: Dom::from_ref(&rooted_byte_controller),
1773                })),
1774                Some(Box::new(StartAlgorithmRejectionHandler {
1775                    controller: Dom::from_ref(&rooted_byte_controller),
1776                })),
1777                can_gc,
1778            );
1779            let realm = enter_realm(global);
1780            let comp = InRealm::Entered(&realm);
1781            start_promise.append_native_handler(&handler, comp, can_gc);
1782        };
1783
1784        Ok(())
1785    }
1786
1787    // <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontroller-releasesteps
1788    pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1789        // If this.[[pendingPullIntos]] is not empty,
1790        let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
1791        if !pending_pull_intos.is_empty() {
1792            // Let firstPendingPullInto be this.[[pendingPullIntos]][0].
1793            let mut first_pending_pull_into = pending_pull_intos.remove(0);
1794
1795            // Set firstPendingPullInto’s reader type to "none".
1796            first_pending_pull_into.reader_type = None;
1797
1798            // Set this.[[pendingPullIntos]] to the list « firstPendingPullInto »
1799            pending_pull_intos.clear();
1800            pending_pull_intos.push(first_pending_pull_into);
1801        }
1802        Ok(())
1803    }
1804
1805    /// <https://streams.spec.whatwg.org/#rbs-controller-private-cancel>
1806    pub(crate) fn perform_cancel_steps(
1807        &self,
1808        cx: SafeJSContext,
1809        global: &GlobalScope,
1810        reason: SafeHandleValue,
1811        can_gc: CanGc,
1812    ) -> Rc<Promise> {
1813        // Perform ! ReadableByteStreamControllerClearPendingPullIntos(this).
1814        self.clear_pending_pull_intos();
1815
1816        // Perform ! ResetQueue(this).
1817        self.reset_queue();
1818
1819        let underlying_source = self
1820            .underlying_source
1821            .get()
1822            .expect("Controller should have a source when the cancel steps are called into.");
1823
1824        // Let result be the result of performing this.[[cancelAlgorithm]], passing in reason.
1825        let result = underlying_source
1826            .call_cancel_algorithm(cx, global, reason, can_gc)
1827            .unwrap_or_else(|| {
1828                let promise = Promise::new(global, can_gc);
1829                promise.resolve_native(&(), can_gc);
1830                Ok(promise)
1831            });
1832
1833        let promise = result.unwrap_or_else(|error| {
1834            let cx = GlobalScope::get_cx();
1835            rooted!(in(*cx) let mut rval = UndefinedValue());
1836            error
1837                .clone()
1838                .to_jsval(cx, global, rval.handle_mut(), can_gc);
1839            let promise = Promise::new(global, can_gc);
1840            promise.reject_native(&rval.handle(), can_gc);
1841            promise
1842        });
1843
1844        // Perform ! ReadableByteStreamControllerClearAlgorithms(this).
1845        self.clear_algorithms();
1846
1847        // Return result(the promise).
1848        promise
1849    }
1850
1851    /// <https://streams.spec.whatwg.org/#rbs-controller-private-pull>
1852    pub(crate) fn perform_pull_steps(
1853        &self,
1854        cx: SafeJSContext,
1855        read_request: &ReadRequest,
1856        can_gc: CanGc,
1857    ) {
1858        // Let stream be this.[[stream]].
1859        let stream = self.stream.get().unwrap();
1860
1861        // Assert: ! ReadableStreamHasDefaultReader(stream) is true.
1862        assert!(stream.has_default_reader());
1863
1864        // If this.[[queueTotalSize]] > 0,
1865        if self.queue_total_size.get() > 0.0 {
1866            // Assert: ! ReadableStreamGetNumReadRequests(stream) is 0.
1867            assert_eq!(stream.get_num_read_requests(), 0);
1868
1869            // Perform ! ReadableByteStreamControllerFillReadRequestFromQueue(this, readRequest).
1870            let _ = self.fill_read_request_from_queue(cx, read_request, can_gc);
1871
1872            // Return.
1873            return;
1874        }
1875
1876        // Let autoAllocateChunkSize be this.[[autoAllocateChunkSize]].
1877        let auto_allocate_chunk_size = self.auto_allocate_chunk_size;
1878
1879        // If autoAllocateChunkSize is not undefined,
1880        if let Some(auto_allocate_chunk_size) = auto_allocate_chunk_size {
1881            // create_array_buffer_with_size
1882            // Let buffer be Construct(%ArrayBuffer%, « autoAllocateChunkSize »).
1883            match create_array_buffer_with_size(cx, auto_allocate_chunk_size as usize) {
1884                Ok(buffer) => {
1885                    // Let pullIntoDescriptor be a new pull-into descriptor with
1886                    // buffer buffer.[[Value]]
1887                    // buffer byte length autoAllocateChunkSize
1888                    // byte offset  0
1889                    // byte length  autoAllocateChunkSize
1890                    // bytes filled  0
1891                    // minimum fill 1
1892                    // element size 1
1893                    // view constructor %Uint8Array%
1894                    // reader type  "default"
1895                    let pull_into_descriptor = PullIntoDescriptor {
1896                        buffer,
1897                        buffer_byte_length: auto_allocate_chunk_size,
1898                        byte_length: auto_allocate_chunk_size,
1899                        byte_offset: 0,
1900                        bytes_filled: Cell::new(0),
1901                        minimum_fill: 1,
1902                        element_size: 1,
1903                        view_constructor: Constructor::Name(Type::Uint8),
1904                        reader_type: Some(ReaderType::Default),
1905                    };
1906
1907                    // Append pullIntoDescriptor to this.[[pendingPullIntos]].
1908                    self.pending_pull_intos
1909                        .borrow_mut()
1910                        .push(pull_into_descriptor);
1911                },
1912                Err(error) => {
1913                    // If buffer is an abrupt completion,
1914                    // Perform readRequest’s error steps, given buffer.[[Value]].
1915
1916                    rooted!(in(*cx) let mut rval = UndefinedValue());
1917                    error
1918                        .clone()
1919                        .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1920                    read_request.error_steps(rval.handle(), can_gc);
1921
1922                    // Return.
1923                    return;
1924                },
1925            }
1926        }
1927
1928        // Perform ! ReadableStreamAddReadRequest(stream, readRequest).
1929        stream.add_read_request(read_request);
1930
1931        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(this).
1932        self.call_pull_if_needed(can_gc);
1933    }
1934
1935    /// Setting the JS object after the heap has settled down.
1936    pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
1937        if let Some(underlying_source) = self.underlying_source.get() {
1938            underlying_source.set_underlying_source_this_object(this_object);
1939        }
1940    }
1941
1942    pub(crate) fn remove_entry(&self) -> QueueEntry {
1943        self.queue
1944            .borrow_mut()
1945            .pop_front()
1946            .expect("Reader must have read request when remove is called into.")
1947    }
1948
1949    pub(crate) fn get_queue_total_size(&self) -> f64 {
1950        self.queue_total_size.get()
1951    }
1952
1953    pub(crate) fn get_pending_pull_intos_size(&self) -> usize {
1954        self.pending_pull_intos.borrow().len()
1955    }
1956}
1957
1958impl ReadableByteStreamControllerMethods<crate::DomTypeHolder> for ReadableByteStreamController {
1959    /// <https://streams.spec.whatwg.org/#rbs-controller-byob-request>
1960    fn GetByobRequest(
1961        &self,
1962        can_gc: CanGc,
1963    ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
1964        let cx = GlobalScope::get_cx();
1965        // Return ! ReadableByteStreamControllerGetBYOBRequest(this).
1966        self.get_byob_request(cx, can_gc)
1967    }
1968
1969    /// <https://streams.spec.whatwg.org/#rbs-controller-desired-size>
1970    fn GetDesiredSize(&self) -> Option<f64> {
1971        // Return ! ReadableByteStreamControllerGetDesiredSize(this).
1972        self.get_desired_size()
1973    }
1974
1975    /// <https://streams.spec.whatwg.org/#rbs-controller-close>
1976    fn Close(&self, can_gc: CanGc) -> Fallible<()> {
1977        let cx = GlobalScope::get_cx();
1978        // If this.[[closeRequested]] is true, throw a TypeError exception.
1979        if self.close_requested.get() {
1980            return Err(Error::Type("closeRequested is true".to_owned()));
1981        }
1982
1983        // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
1984        if !self.stream.get().unwrap().is_readable() {
1985            return Err(Error::Type("stream is not readable".to_owned()));
1986        }
1987
1988        // Perform ? ReadableByteStreamControllerClose(this).
1989        self.close(cx, can_gc)
1990    }
1991
1992    /// <https://streams.spec.whatwg.org/#rbs-controller-enqueue>
1993    fn Enqueue(
1994        &self,
1995        chunk: js::gc::CustomAutoRooterGuard<js::typedarray::ArrayBufferView>,
1996        can_gc: CanGc,
1997    ) -> Fallible<()> {
1998        let cx = GlobalScope::get_cx();
1999
2000        let chunk = HeapBufferSource::<ArrayBufferViewU8>::from_view(chunk);
2001
2002        // If chunk.[[ByteLength]] is 0, throw a TypeError exception.
2003        if chunk.byte_length() == 0 {
2004            return Err(Error::Type("chunk.ByteLength is 0".to_owned()));
2005        }
2006
2007        // If chunk.[[ViewedArrayBuffer]].[[ByteLength]] is 0, throw a TypeError exception.
2008        if chunk.viewed_buffer_array_byte_length(cx) == 0 {
2009            return Err(Error::Type(
2010                "chunk.ViewedArrayBuffer.ByteLength is 0".to_owned(),
2011            ));
2012        }
2013
2014        // If this.[[closeRequested]] is true, throw a TypeError exception.
2015        if self.close_requested.get() {
2016            return Err(Error::Type("closeRequested is true".to_owned()));
2017        }
2018
2019        // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
2020        if !self.stream.get().unwrap().is_readable() {
2021            return Err(Error::Type("stream is not readable".to_owned()));
2022        }
2023
2024        // Return ? ReadableByteStreamControllerEnqueue(this, chunk).
2025        self.enqueue(cx, chunk, can_gc)
2026    }
2027
2028    /// <https://streams.spec.whatwg.org/#rbs-controller-error>
2029    fn Error(&self, _cx: SafeJSContext, e: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
2030        // Perform ! ReadableByteStreamControllerError(this, e).
2031        self.error(e, can_gc);
2032        Ok(())
2033    }
2034}