script/dom/
readablebytestreamcontroller.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::cmp::min;
7use std::collections::VecDeque;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::{Heap, Type};
12use js::jsval::UndefinedValue;
13use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue};
14use js::typedarray::{ArrayBufferU8, ArrayBufferViewU8};
15
16use super::bindings::buffer_source::HeapBufferSource;
17use super::bindings::cell::DomRefCell;
18use super::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderReadOptions;
19use super::bindings::reflector::reflect_dom_object;
20use super::bindings::root::Dom;
21use super::readablestreambyobreader::ReadIntoRequest;
22use super::readablestreamdefaultreader::ReadRequest;
23use super::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType};
24use crate::dom::bindings::buffer_source::{
25    Constructor, byte_size, create_array_buffer_with_size, create_buffer_source_with_constructor,
26};
27use crate::dom::bindings::codegen::Bindings::ReadableByteStreamControllerBinding::ReadableByteStreamControllerMethods;
28use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
29use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
30use crate::dom::bindings::reflector::{DomGlobal, Reflector};
31use crate::dom::bindings::root::{DomRoot, MutNullableDom};
32use crate::dom::bindings::trace::RootedTraceableBox;
33use crate::dom::globalscope::GlobalScope;
34use crate::dom::promise::Promise;
35use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
36use crate::dom::readablestream::ReadableStream;
37use crate::dom::readablestreambyobrequest::ReadableStreamBYOBRequest;
38use crate::realms::{InRealm, enter_realm};
39use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
40
41/// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry>
42#[derive(JSTraceable, MallocSizeOf)]
43pub(crate) struct QueueEntry {
44    /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-buffer>
45    #[ignore_malloc_size_of = "HeapBufferSource"]
46    buffer: HeapBufferSource<ArrayBufferU8>,
47    /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-offset>
48    byte_offset: usize,
49    /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-length>
50    byte_length: usize,
51}
52
53impl QueueEntry {
54    pub(crate) fn new(
55        buffer: HeapBufferSource<ArrayBufferU8>,
56        byte_offset: usize,
57        byte_length: usize,
58    ) -> QueueEntry {
59        QueueEntry {
60            buffer,
61            byte_offset,
62            byte_length,
63        }
64    }
65}
66
67#[derive(Debug, Eq, JSTraceable, MallocSizeOf, PartialEq)]
68pub(crate) enum ReaderType {
69    /// <https://streams.spec.whatwg.org/#readablestreambyobreader>
70    Byob,
71    /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
72    Default,
73}
74
75/// <https://streams.spec.whatwg.org/#pull-into-descriptor>
76#[derive(Eq, JSTraceable, MallocSizeOf, PartialEq)]
77pub(crate) struct PullIntoDescriptor {
78    #[ignore_malloc_size_of = "HeapBufferSource"]
79    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer>
80    buffer: HeapBufferSource<ArrayBufferU8>,
81    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer-byte-length>
82    buffer_byte_length: u64,
83    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-offset>
84    byte_offset: u64,
85    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-length>
86    byte_length: u64,
87    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-bytes-filled>
88    bytes_filled: Cell<u64>,
89    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-minimum-fill>
90    minimum_fill: u64,
91    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-element-size>
92    element_size: u64,
93    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-view-constructor>
94    view_constructor: Constructor,
95    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-reader-type>
96    reader_type: Option<ReaderType>,
97}
98
99/// The fulfillment handler for
100/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
101#[derive(Clone, JSTraceable, MallocSizeOf)]
102#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
103struct StartAlgorithmFulfillmentHandler {
104    controller: Dom<ReadableByteStreamController>,
105}
106
107impl Callback for StartAlgorithmFulfillmentHandler {
108    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
109    /// Upon fulfillment of startPromise,
110    fn callback(&self, _cx: SafeJSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
111        // Set controller.[[started]] to true.
112        self.controller.started.set(true);
113
114        // Assert: controller.[[pulling]] is false.
115        assert!(!self.controller.pulling.get());
116
117        // Assert: controller.[[pullAgain]] is false.
118        assert!(!self.controller.pull_again.get());
119
120        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
121        self.controller.call_pull_if_needed(can_gc);
122    }
123}
124
125/// The rejection handler for
126/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
127#[derive(Clone, JSTraceable, MallocSizeOf)]
128#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
129struct StartAlgorithmRejectionHandler {
130    controller: Dom<ReadableByteStreamController>,
131}
132
133impl Callback for StartAlgorithmRejectionHandler {
134    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
135    /// Upon rejection of startPromise with reason r,
136    fn callback(&self, _cx: SafeJSContext, v: HandleValue, _realm: InRealm, can_gc: CanGc) {
137        // Perform ! ReadableByteStreamControllerError(controller, r).
138        self.controller.error(v, can_gc);
139    }
140}
141
142/// The fulfillment handler for
143/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
144#[derive(Clone, JSTraceable, MallocSizeOf)]
145#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
146struct PullAlgorithmFulfillmentHandler {
147    controller: Dom<ReadableByteStreamController>,
148}
149
150impl Callback for PullAlgorithmFulfillmentHandler {
151    /// Continuation of <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
152    /// Upon fulfillment of pullPromise
153    fn callback(&self, _cx: SafeJSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
154        // Set controller.[[pulling]] to false.
155        self.controller.pulling.set(false);
156
157        // If controller.[[pullAgain]] is true,
158        if self.controller.pull_again.get() {
159            // Set controller.[[pullAgain]] to false.
160            self.controller.pull_again.set(false);
161
162            // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
163            self.controller.call_pull_if_needed(can_gc);
164        }
165    }
166}
167
168/// The rejection handler for
169/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
170#[derive(Clone, JSTraceable, MallocSizeOf)]
171#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
172struct PullAlgorithmRejectionHandler {
173    controller: Dom<ReadableByteStreamController>,
174}
175
176impl Callback for PullAlgorithmRejectionHandler {
177    /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-byte-controller-call-pull-if-needed>
178    /// Upon rejection of pullPromise with reason e.
179    fn callback(&self, _cx: SafeJSContext, v: HandleValue, _realm: InRealm, can_gc: CanGc) {
180        // Perform ! ReadableByteStreamControllerError(controller, e).
181        self.controller.error(v, can_gc);
182    }
183}
184
185/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
186#[dom_struct]
187pub(crate) struct ReadableByteStreamController {
188    reflector_: Reflector,
189    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-autoallocatechunksize>
190    auto_allocate_chunk_size: Option<u64>,
191    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-stream>
192    stream: MutNullableDom<ReadableStream>,
193    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-strategyhwm>
194    strategy_hwm: f64,
195    /// A mutable reference to the underlying source is used to implement these two
196    /// internal slots:
197    ///
198    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullalgorithm>
199    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-cancelalgorithm>
200    underlying_source: MutNullableDom<UnderlyingSourceContainer>,
201    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queue>
202    queue: DomRefCell<VecDeque<QueueEntry>>,
203    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queuetotalsize>
204    queue_total_size: Cell<f64>,
205    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-byobrequest>
206    byob_request: MutNullableDom<ReadableStreamBYOBRequest>,
207    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pendingpullintos>
208    pending_pull_intos: DomRefCell<Vec<PullIntoDescriptor>>,
209    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-closerequested>
210    close_requested: Cell<bool>,
211    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-started>
212    started: Cell<bool>,
213    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pulling>
214    pulling: Cell<bool>,
215    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullalgorithm>
216    pull_again: Cell<bool>,
217}
218
219impl ReadableByteStreamController {
220    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
221    fn new_inherited(
222        underlying_source_type: UnderlyingSourceType,
223        strategy_hwm: f64,
224        global: &GlobalScope,
225        can_gc: CanGc,
226    ) -> ReadableByteStreamController {
227        let underlying_source_container =
228            UnderlyingSourceContainer::new(global, underlying_source_type, can_gc);
229        let auto_allocate_chunk_size = underlying_source_container.auto_allocate_chunk_size();
230        ReadableByteStreamController {
231            reflector_: Reflector::new(),
232            byob_request: MutNullableDom::new(None),
233            stream: MutNullableDom::new(None),
234            underlying_source: MutNullableDom::new(Some(&*underlying_source_container)),
235            auto_allocate_chunk_size,
236            pending_pull_intos: DomRefCell::new(Vec::new()),
237            strategy_hwm,
238            close_requested: Default::default(),
239            queue: DomRefCell::new(Default::default()),
240            queue_total_size: Default::default(),
241            started: Default::default(),
242            pulling: Default::default(),
243            pull_again: Default::default(),
244        }
245    }
246
247    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
248    pub(crate) fn new(
249        underlying_source_type: UnderlyingSourceType,
250        strategy_hwm: f64,
251        global: &GlobalScope,
252        can_gc: CanGc,
253    ) -> DomRoot<ReadableByteStreamController> {
254        reflect_dom_object(
255            Box::new(ReadableByteStreamController::new_inherited(
256                underlying_source_type,
257                strategy_hwm,
258                global,
259                can_gc,
260            )),
261            global,
262            can_gc,
263        )
264    }
265
266    pub(crate) fn set_stream(&self, stream: &ReadableStream) {
267        self.stream.set(Some(stream))
268    }
269
270    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-pull-into>
271    pub(crate) fn perform_pull_into(
272        &self,
273        cx: SafeJSContext,
274        read_into_request: &ReadIntoRequest,
275        view: HeapBufferSource<ArrayBufferViewU8>,
276        options: &ReadableStreamBYOBReaderReadOptions,
277        can_gc: CanGc,
278    ) {
279        // Let stream be controller.[[stream]].
280        let stream = self.stream.get().unwrap();
281
282        // Let elementSize be 1.
283        let mut element_size = 1;
284
285        // Let ctor be %DataView%.
286        let mut ctor = Constructor::DataView;
287
288        // If view has a [[TypedArrayName]] internal slot (i.e., it is not a DataView),
289        if view.has_typed_array_name() {
290            // Set elementSize to the element size specified in the
291            // typed array constructors table for view.[[TypedArrayName]].
292            let view_typw = view.get_array_buffer_view_type();
293            element_size = byte_size(view_typw);
294
295            // Set ctor to the constructor specified in the typed array constructors table for view.[[TypedArrayName]].
296            ctor = Constructor::Name(view_typw);
297        }
298
299        // Let minimumFill be min × elementSize.
300        let minimum_fill = options.min * element_size;
301
302        // Assert: minimumFill ≥ 0 and minimumFill ≤ view.[[ByteLength]].
303        assert!(minimum_fill <= (view.byte_length() as u64));
304
305        // Assert: the remainder after dividing minimumFill by elementSize is 0.
306        assert_eq!(minimum_fill % element_size, 0);
307
308        // Let byteOffset be view.[[ByteOffset]].
309        let byte_offset = view.get_byte_offset();
310
311        // Let byteLength be view.[[ByteLength]].
312        let byte_length = view.byte_length();
313
314        // Let bufferResult be TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
315        match view
316            .get_array_buffer_view_buffer(cx)
317            .transfer_array_buffer(cx)
318        {
319            Ok(buffer) => {
320                // Let buffer be bufferResult.[[Value]].
321                // Let pullIntoDescriptor be a new pull-into descriptor with
322                // buffer   buffer
323                // buffer byte length   buffer.[[ArrayBufferByteLength]]
324                // byte offset  byteOffset
325                // byte length  byteLength
326                // bytes filled  0
327                // minimum fill minimumFill
328                // element size elementSize
329                // view constructor ctor
330                // reader type  "byob"
331                let buffer_byte_length = buffer.byte_length();
332                let pull_into_descriptor = PullIntoDescriptor {
333                    buffer,
334                    buffer_byte_length: buffer_byte_length as u64,
335                    byte_offset: byte_offset as u64,
336                    byte_length: byte_length as u64,
337                    bytes_filled: Cell::new(0),
338                    minimum_fill,
339                    element_size,
340                    view_constructor: ctor.clone(),
341                    reader_type: Some(ReaderType::Byob),
342                };
343
344                // If controller.[[pendingPullIntos]] is not empty,
345                {
346                    let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
347                    if !pending_pull_intos.is_empty() {
348                        // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
349                        pending_pull_intos.push(pull_into_descriptor);
350
351                        // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
352                        stream.add_read_into_request(read_into_request);
353
354                        // Return.
355                        return;
356                    }
357                }
358
359                // If stream.[[state]] is "closed",
360                if stream.is_closed() {
361                    // Let emptyView be ! Construct(ctor, « pullIntoDescriptor’s buffer,
362                    // pullIntoDescriptor’s byte offset, 0 »).
363                    if let Ok(empty_view) = create_buffer_source_with_constructor(
364                        cx,
365                        &ctor,
366                        &pull_into_descriptor.buffer,
367                        pull_into_descriptor.byte_offset as usize,
368                        0,
369                    ) {
370                        // Perform readIntoRequest’s close steps, given emptyView.
371                        let result = RootedTraceableBox::new(Heap::default());
372                        rooted!(in(*cx) let mut view_value = UndefinedValue());
373                        empty_view.get_buffer_view_value(cx, view_value.handle_mut());
374                        result.set(*view_value);
375
376                        read_into_request.close_steps(Some(result), can_gc);
377
378                        // Return.
379                        return;
380                    } else {
381                        return;
382                    }
383                }
384
385                // If controller.[[queueTotalSize]] > 0,
386                if self.queue_total_size.get() > 0.0 {
387                    // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(
388                    // controller, pullIntoDescriptor) is true,
389                    if self.fill_pull_into_descriptor_from_queue(cx, &pull_into_descriptor) {
390                        // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(
391                        // pullIntoDescriptor).
392                        if let Ok(filled_view) =
393                            self.convert_pull_into_descriptor(cx, &pull_into_descriptor)
394                        {
395                            // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
396                            self.handle_queue_drain(can_gc);
397
398                            // Perform readIntoRequest’s chunk steps, given filledView.
399                            let result = RootedTraceableBox::new(Heap::default());
400                            rooted!(in(*cx) let mut view_value = UndefinedValue());
401                            filled_view.get_buffer_view_value(cx, view_value.handle_mut());
402                            result.set(*view_value);
403                            read_into_request.chunk_steps(result, can_gc);
404
405                            // Return.
406                            return;
407                        } else {
408                            return;
409                        }
410                    }
411
412                    // If controller.[[closeRequested]] is true,
413                    if self.close_requested.get() {
414                        // Let e be a new TypeError exception.
415                        rooted!(in(*cx) let mut error = UndefinedValue());
416                        Error::Type("close requested".to_owned()).to_jsval(
417                            cx,
418                            &self.global(),
419                            error.handle_mut(),
420                            can_gc,
421                        );
422
423                        // Perform ! ReadableByteStreamControllerError(controller, e).
424                        self.error(error.handle(), can_gc);
425
426                        // Perform readIntoRequest’s error steps, given e.
427                        read_into_request.error_steps(error.handle(), can_gc);
428
429                        // Return.
430                        return;
431                    }
432                }
433
434                // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
435                {
436                    self.pending_pull_intos
437                        .borrow_mut()
438                        .push(pull_into_descriptor);
439                }
440                // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
441                stream.add_read_into_request(read_into_request);
442
443                // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
444                self.call_pull_if_needed(can_gc);
445            },
446            Err(error) => {
447                // If bufferResult is an abrupt completion,
448
449                // Perform readIntoRequest’s error steps, given bufferResult.[[Value]].
450                rooted!(in(*cx) let mut rval = UndefinedValue());
451                error
452                    .clone()
453                    .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
454                read_into_request.error_steps(rval.handle(), can_gc);
455
456                // Return.
457            },
458        }
459    }
460
461    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond>
462    pub(crate) fn respond(
463        &self,
464        cx: SafeJSContext,
465        bytes_written: u64,
466        can_gc: CanGc,
467    ) -> Fallible<()> {
468        {
469            // Assert: controller.[[pendingPullIntos]] is not empty.
470            let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
471            assert!(!pending_pull_intos.is_empty());
472
473            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
474            let first_descriptor = pending_pull_intos.first_mut().unwrap();
475
476            // Let state be controller.[[stream]].[[state]].
477            let stream = self.stream.get().unwrap();
478
479            // If state is "closed",
480            if stream.is_closed() {
481                // If bytesWritten is not 0, throw a TypeError exception.
482                if bytes_written != 0 {
483                    return Err(Error::Type(
484                        "bytesWritten not zero on closed stream".to_owned(),
485                    ));
486                }
487            } else {
488                // Assert: state is "readable".
489                assert!(stream.is_readable());
490
491                // If bytesWritten is 0, throw a TypeError exception.
492                if bytes_written == 0 {
493                    return Err(Error::Type("bytesWritten is 0".to_owned()));
494                }
495
496                // If firstDescriptor’s bytes filled + bytesWritten > firstDescriptor’s byte length,
497                // throw a RangeError exception.
498                if first_descriptor.bytes_filled.get() + bytes_written >
499                    first_descriptor.byte_length
500                {
501                    return Err(Error::Range(
502                        "bytes filled + bytesWritten > byte length".to_owned(),
503                    ));
504                }
505            }
506
507            // Set firstDescriptor’s buffer to ! TransferArrayBuffer(firstDescriptor’s buffer).
508            first_descriptor.buffer = first_descriptor.buffer.transfer_array_buffer(cx)?;
509        }
510
511        // Perform ? ReadableByteStreamControllerRespondInternal(controller, bytesWritten).
512        self.respond_internal(cx, bytes_written, can_gc)
513    }
514
515    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-internal>
516    pub(crate) fn respond_internal(
517        &self,
518        cx: SafeJSContext,
519        bytes_written: u64,
520        can_gc: CanGc,
521    ) -> Fallible<()> {
522        {
523            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
524            let pending_pull_intos = self.pending_pull_intos.borrow();
525            let first_descriptor = pending_pull_intos.first().unwrap();
526
527            // Assert: ! CanTransferArrayBuffer(firstDescriptor’s buffer) is true
528            assert!(first_descriptor.buffer.can_transfer_array_buffer(cx));
529        }
530
531        // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
532        self.invalidate_byob_request();
533
534        // Let state be controller.[[stream]].[[state]].
535        let stream = self.stream.get().unwrap();
536
537        // If state is "closed",
538        if stream.is_closed() {
539            // Assert: bytesWritten is 0.
540            assert_eq!(bytes_written, 0);
541
542            // Perform ! ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor).
543            self.respond_in_closed_state(cx, can_gc)?;
544        } else {
545            // Assert: state is "readable".
546            assert!(stream.is_readable());
547
548            // Assert: bytesWritten > 0.
549            assert!(bytes_written > 0);
550
551            // Perform ? ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor).
552            self.respond_in_readable_state(cx, bytes_written, can_gc)?;
553        }
554
555        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
556        self.call_pull_if_needed(can_gc);
557
558        Ok(())
559    }
560
561    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-closed-state>
562    pub(crate) fn respond_in_closed_state(&self, cx: SafeJSContext, can_gc: CanGc) -> Fallible<()> {
563        let pending_pull_intos = self.pending_pull_intos.borrow();
564        let first_descriptor = pending_pull_intos.first().unwrap();
565
566        // Assert: the remainder after dividing firstDescriptor’s bytes filled
567        // by firstDescriptor’s element size is 0.
568        assert_eq!(
569            first_descriptor.bytes_filled.get() % first_descriptor.element_size,
570            0
571        );
572
573        // If firstDescriptor’s reader type is "none",
574        // perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
575        let reader_type = first_descriptor.reader_type.is_none();
576
577        // needed to drop the borrow and avoid BorrowMutError
578        drop(pending_pull_intos);
579
580        if reader_type {
581            self.shift_pending_pull_into();
582        }
583
584        // Let stream be controller.[[stream]].
585        let stream = self.stream.get().unwrap();
586
587        // If ! ReadableStreamHasBYOBReader(stream) is true,
588        if stream.has_byob_reader() {
589            // Let filledPullIntos be a new empty list.
590            let mut filled_pull_intos = Vec::new();
591
592            // While filledPullIntos’s size < ! ReadableStreamGetNumReadIntoRequests(stream),
593            while filled_pull_intos.len() < stream.get_num_read_into_requests() {
594                // Let pullIntoDescriptor be ! ReadableByteStreamControllerShiftPendingPullInto(controller).
595                let pull_into_descriptor = self.shift_pending_pull_into();
596
597                // Append pullIntoDescriptor to filledPullIntos.
598                filled_pull_intos.push(pull_into_descriptor);
599            }
600
601            // For each filledPullInto of filledPullIntos,
602            for filled_pull_into in filled_pull_intos {
603                // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
604                self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)?;
605            }
606        }
607
608        Ok(())
609    }
610
611    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-readable-state>
612    pub(crate) fn respond_in_readable_state(
613        &self,
614        cx: SafeJSContext,
615        bytes_written: u64,
616        can_gc: CanGc,
617    ) -> Fallible<()> {
618        let pending_pull_intos = self.pending_pull_intos.borrow();
619        let first_descriptor = pending_pull_intos.first().unwrap();
620
621        // Assert: pullIntoDescriptor’s bytes filled + bytesWritten ≤ pullIntoDescriptor’s byte length.
622        assert!(
623            first_descriptor.bytes_filled.get() + bytes_written <= first_descriptor.byte_length
624        );
625
626        // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
627        // controller, bytesWritten, pullIntoDescriptor).
628        self.fill_head_pull_into_descriptor(bytes_written, first_descriptor);
629
630        // If pullIntoDescriptor’s reader type is "none",
631        if first_descriptor.reader_type.is_none() {
632            // needed to drop the borrow and avoid BorrowMutError
633            drop(pending_pull_intos);
634
635            // Perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, pullIntoDescriptor).
636            self.enqueue_detached_pull_into_to_queue(cx, can_gc)?;
637
638            // Let filledPullIntos be the result of performing
639            // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
640            let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
641
642            // For each filledPullInto of filledPullIntos,
643            for filled_pull_into in filled_pull_intos {
644                // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]]
645                // , filledPullInto).
646                self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)?;
647            }
648
649            // Return.
650            return Ok(());
651        }
652
653        // If pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill, return.
654        if first_descriptor.bytes_filled.get() < first_descriptor.minimum_fill {
655            return Ok(());
656        }
657
658        // needed to drop the borrow and avoid BorrowMutError
659        drop(pending_pull_intos);
660
661        // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
662        let pull_into_descriptor = self.shift_pending_pull_into();
663
664        // Let remainderSize be the remainder after dividing pullIntoDescriptor’s bytes
665        // filled by pullIntoDescriptor’s element size.
666        let remainder_size =
667            pull_into_descriptor.bytes_filled.get() % pull_into_descriptor.element_size;
668
669        // If remainderSize > 0,
670        if remainder_size > 0 {
671            // Let end be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
672            let end = pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();
673
674            // Perform ? ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
675            // pullIntoDescriptor’s buffer, end − remainderSize, remainderSize).
676            self.enqueue_cloned_chunk_to_queue(
677                cx,
678                &pull_into_descriptor.buffer,
679                end - remainder_size,
680                remainder_size,
681                can_gc,
682            )?;
683        }
684
685        // Set pullIntoDescriptor’s bytes filled to pullIntoDescriptor’s bytes filled − remainderSize.
686        pull_into_descriptor
687            .bytes_filled
688            .set(pull_into_descriptor.bytes_filled.get() - remainder_size);
689
690        // Let filledPullIntos be the result of performing
691        // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
692        let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
693
694        // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], pullIntoDescriptor).
695        self.commit_pull_into_descriptor(cx, &pull_into_descriptor, can_gc)?;
696
697        // For each filledPullInto of filledPullIntos,
698        for filled_pull_into in filled_pull_intos {
699            // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], filledPullInto).
700            self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)?;
701        }
702
703        Ok(())
704    }
705
706    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-with-new-view>
707    pub(crate) fn respond_with_new_view(
708        &self,
709        cx: SafeJSContext,
710        view: HeapBufferSource<ArrayBufferViewU8>,
711        can_gc: CanGc,
712    ) -> Fallible<()> {
713        let view_byte_length;
714        {
715            // Assert: controller.[[pendingPullIntos]] is not empty.
716            let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
717            assert!(!pending_pull_intos.is_empty());
718
719            // Assert: ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is false.
720            assert!(!view.is_detached_buffer(cx));
721
722            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
723            let first_descriptor = pending_pull_intos.first_mut().unwrap();
724
725            // Let state be controller.[[stream]].[[state]].
726            let stream = self.stream.get().unwrap();
727
728            // If state is "closed",
729            if stream.is_closed() {
730                // If view.[[ByteLength]] is not 0, throw a TypeError exception.
731                if view.byte_length() != 0 {
732                    return Err(Error::Type("view byte length is not 0".to_owned()));
733                }
734            } else {
735                // Assert: state is "readable".
736                assert!(stream.is_readable());
737
738                // If view.[[ByteLength]] is 0, throw a TypeError exception.
739                if view.byte_length() == 0 {
740                    return Err(Error::Type("view byte length is 0".to_owned()));
741                }
742            }
743
744            // If firstDescriptor’s byte offset + firstDescriptor’ bytes filled is not view.[[ByteOffset]],
745            // throw a RangeError exception.
746            if first_descriptor.byte_offset + first_descriptor.bytes_filled.get() !=
747                (view.get_byte_offset() as u64)
748            {
749                return Err(Error::Range(
750                    "firstDescriptor's byte offset + bytes filled is not view byte offset"
751                        .to_owned(),
752                ));
753            }
754
755            // If firstDescriptor’s buffer byte length is not view.[[ViewedArrayBuffer]].[[ByteLength]],
756            // throw a RangeError exception.
757            if first_descriptor.buffer_byte_length !=
758                (view.viewed_buffer_array_byte_length(cx) as u64)
759            {
760                return Err(Error::Range(
761                "firstDescriptor's buffer byte length is not view viewed buffer array byte length"
762                    .to_owned(),
763            ));
764            }
765
766            // If firstDescriptor’s bytes filled + view.[[ByteLength]] > firstDescriptor’s byte length,
767            // throw a RangeError exception.
768            if first_descriptor.bytes_filled.get() + (view.byte_length()) as u64 >
769                first_descriptor.byte_length
770            {
771                return Err(Error::Range(
772                    "bytes filled + view byte length > byte length".to_owned(),
773                ));
774            }
775
776            // Let viewByteLength be view.[[ByteLength]].
777            view_byte_length = view.byte_length();
778
779            // Set firstDescriptor’s buffer to ? TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
780            first_descriptor.buffer = view
781                .get_array_buffer_view_buffer(cx)
782                .transfer_array_buffer(cx)?;
783        }
784
785        // Perform ? ReadableByteStreamControllerRespondInternal(controller, viewByteLength).
786        self.respond_internal(cx, view_byte_length as u64, can_gc)
787    }
788
789    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-get-desired-size>
790    pub(crate) fn get_desired_size(&self) -> Option<f64> {
791        // Let state be controller.[[stream]].[[state]].
792        let stream = self.stream.get()?;
793
794        // If state is "errored", return null.
795        if stream.is_errored() {
796            return None;
797        }
798
799        // If state is "closed", return 0.
800        if stream.is_closed() {
801            return Some(0.0);
802        }
803
804        // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
805        Some(self.strategy_hwm - self.queue_total_size.get())
806    }
807
808    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollergetbyobrequest>
809    pub(crate) fn get_byob_request(
810        &self,
811        cx: SafeJSContext,
812        can_gc: CanGc,
813    ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
814        // If controller.[[byobRequest]] is null and controller.[[pendingPullIntos]] is not empty,
815        let pending_pull_intos = self.pending_pull_intos.borrow();
816        if self.byob_request.get().is_none() && !pending_pull_intos.is_empty() {
817            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
818            let first_descriptor = pending_pull_intos.first().unwrap();
819            // Let view be ! Construct(%Uint8Array%, « firstDescriptor’s buffer,
820            // firstDescriptor’s byte offset + firstDescriptor’s bytes filled,
821            // firstDescriptor’s byte length − firstDescriptor’s bytes filled »).
822
823            let byte_offset = first_descriptor.byte_offset + first_descriptor.bytes_filled.get();
824            let byte_length = first_descriptor.byte_length - first_descriptor.bytes_filled.get();
825
826            let view = create_buffer_source_with_constructor(
827                cx,
828                &Constructor::Name(Type::Uint8),
829                &first_descriptor.buffer,
830                byte_offset as usize,
831                byte_length as usize,
832            )?;
833
834            // Let byobRequest be a new ReadableStreamBYOBRequest.
835            let byob_request = ReadableStreamBYOBRequest::new(&self.global(), can_gc);
836
837            // Set byobRequest.[[controller]] to controller.
838            byob_request.set_controller(Some(&DomRoot::from_ref(self)));
839
840            // Set byobRequest.[[view]] to view.
841            byob_request.set_view(Some(view));
842
843            // Set controller.[[byobRequest]] to byobRequest.
844            self.byob_request.set(Some(&byob_request));
845        }
846
847        // Return controller.[[byobRequest]].
848        Ok(self.byob_request.get())
849    }
850
851    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-close>
852    pub(crate) fn close(&self, cx: SafeJSContext, can_gc: CanGc) -> Fallible<()> {
853        // Let stream be controller.[[stream]].
854        let stream = self.stream.get().unwrap();
855
856        // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
857        if self.close_requested.get() || !stream.is_readable() {
858            return Ok(());
859        }
860
861        // If controller.[[queueTotalSize]] > 0,
862        if self.queue_total_size.get() > 0.0 {
863            // Set controller.[[closeRequested]] to true.
864            self.close_requested.set(true);
865            // Return.
866            return Ok(());
867        }
868
869        // If controller.[[pendingPullIntos]] is not empty,
870        let pending_pull_intos = self.pending_pull_intos.borrow();
871        if !pending_pull_intos.is_empty() {
872            // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
873            let first_pending_pull_into = pending_pull_intos.first().unwrap();
874
875            // If the remainder after dividing firstPendingPullInto’s bytes filled by
876            // firstPendingPullInto’s element size is not 0,
877            if first_pending_pull_into.bytes_filled.get() % first_pending_pull_into.element_size !=
878                0
879            {
880                // needed to drop the borrow and avoid BorrowMutError
881                drop(pending_pull_intos);
882
883                // Let e be a new TypeError exception.
884                let e = Error::Type(
885                    "remainder after dividing firstPendingPullInto's bytes
886                    filled by firstPendingPullInto's element size is not 0"
887                        .to_owned(),
888                );
889
890                // Perform ! ReadableByteStreamControllerError(controller, e).
891                rooted!(in(*cx) let mut error = UndefinedValue());
892                e.clone()
893                    .to_jsval(cx, &self.global(), error.handle_mut(), can_gc);
894                self.error(error.handle(), can_gc);
895
896                // Throw e.
897                return Err(e);
898            }
899        }
900
901        // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
902        self.clear_algorithms();
903
904        // Perform ! ReadableStreamClose(stream).
905        stream.close(can_gc);
906        Ok(())
907    }
908
909    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-error>
910    pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
911        // Let stream be controller.[[stream]].
912        let stream = self.stream.get().unwrap();
913
914        // If stream.[[state]] is not "readable", return.
915        if !stream.is_readable() {
916            return;
917        }
918
919        // Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller).
920        self.clear_pending_pull_intos();
921
922        // Perform ! ResetQueue(controller).
923        self.reset_queue();
924
925        // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
926        self.clear_algorithms();
927
928        // Perform ! ReadableStreamError(stream, e).
929        stream.error(e, can_gc);
930    }
931
932    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-algorithms>
933    fn clear_algorithms(&self) {
934        // Set controller.[[pullAlgorithm]] to undefined.
935        // Set controller.[[cancelAlgorithm]] to undefined.
936        self.underlying_source.set(None);
937    }
938
939    /// <https://streams.spec.whatwg.org/#reset-queue>
940    pub(crate) fn reset_queue(&self) {
941        // Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
942
943        // Set container.[[queue]] to a new empty list.
944        self.queue.borrow_mut().clear();
945
946        // Set container.[[queueTotalSize]] to 0.
947        self.queue_total_size.set(0.0);
948    }
949
950    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-pending-pull-intos>
951    pub(crate) fn clear_pending_pull_intos(&self) {
952        // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
953        self.invalidate_byob_request();
954
955        // Set controller.[[pendingPullIntos]] to a new empty list.
956        self.pending_pull_intos.borrow_mut().clear();
957    }
958
959    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-invalidate-byob-request>
960    pub(crate) fn invalidate_byob_request(&self) {
961        if let Some(byob_request) = self.byob_request.get() {
962            // Set controller.[[byobRequest]].[[controller]] to undefined.
963            byob_request.set_controller(None);
964
965            // Set controller.[[byobRequest]].[[view]] to null.
966            byob_request.set_view(None);
967
968            // Set controller.[[byobRequest]] to null.
969            self.byob_request.set(None);
970        }
971        // If controller.[[byobRequest]] is null, return.
972    }
973
974    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue>
975    pub(crate) fn enqueue(
976        &self,
977        cx: SafeJSContext,
978        chunk: HeapBufferSource<ArrayBufferViewU8>,
979        can_gc: CanGc,
980    ) -> Fallible<()> {
981        // Let stream be controller.[[stream]].
982        let stream = self.stream.get().unwrap();
983
984        // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
985        if self.close_requested.get() || !stream.is_readable() {
986            return Ok(());
987        }
988
989        // Let buffer be chunk.[[ViewedArrayBuffer]].
990        let buffer = chunk.get_array_buffer_view_buffer(cx);
991
992        // Let byteOffset be chunk.[[ByteOffset]].
993        let byte_offset = chunk.get_byte_offset();
994
995        // Let byteLength be chunk.[[ByteLength]].
996        let byte_length = chunk.byte_length();
997
998        // If ! IsDetachedBuffer(buffer) is true, throw a TypeError exception.
999        if buffer.is_detached_buffer(cx) {
1000            return Err(Error::Type("buffer is detached".to_owned()));
1001        }
1002
1003        // Let transferredBuffer be ? TransferArrayBuffer(buffer).
1004        let transferred_buffer = buffer.transfer_array_buffer(cx)?;
1005
1006        // If controller.[[pendingPullIntos]] is not empty,
1007        {
1008            let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
1009            if !pending_pull_intos.is_empty() {
1010                // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
1011                let first_descriptor = pending_pull_intos.first_mut().unwrap();
1012                // If ! IsDetachedBuffer(firstPendingPullInto’s buffer) is true, throw a TypeError exception.
1013                if first_descriptor.buffer.is_detached_buffer(cx) {
1014                    return Err(Error::Type("buffer is detached".to_owned()));
1015                }
1016
1017                // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
1018                self.invalidate_byob_request();
1019
1020                // Set firstPendingPullInto’s buffer to ! TransferArrayBuffer(firstPendingPullInto’s buffer).
1021                first_descriptor.buffer = first_descriptor.buffer.transfer_array_buffer(cx)?;
1022
1023                // If firstPendingPullInto’s reader type is "none",
1024                if first_descriptor.reader_type.is_none() {
1025                    // needed to drop the borrow and avoid BorrowMutError
1026                    drop(pending_pull_intos);
1027
1028                    // perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(
1029                    // controller, firstPendingPullInto).
1030                    self.enqueue_detached_pull_into_to_queue(cx, can_gc)?;
1031                }
1032            }
1033        }
1034
1035        // If ! ReadableStreamHasDefaultReader(stream) is true,
1036        if stream.has_default_reader() {
1037            // Perform ! ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller).
1038            self.process_read_requests_using_queue(cx, can_gc)?;
1039
1040            // If ! ReadableStreamGetNumReadRequests(stream) is 0,
1041            if stream.get_num_read_requests() == 0 {
1042                // Assert: controller.[[pendingPullIntos]] is empty.
1043                {
1044                    assert!(self.pending_pull_intos.borrow().is_empty());
1045                }
1046
1047                // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1048                // controller, transferredBuffer, byteOffset, byteLength).
1049                self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1050            } else {
1051                // Assert: controller.[[queue]] is empty.
1052                assert!(self.queue.borrow().is_empty());
1053
1054                // If controller.[[pendingPullIntos]] is not empty,
1055
1056                let pending_pull_intos = self.pending_pull_intos.borrow();
1057                if !pending_pull_intos.is_empty() {
1058                    // Assert: controller.[[pendingPullIntos]][0]'s reader type is "default".
1059                    assert!(matches!(
1060                        pending_pull_intos.first().unwrap().reader_type,
1061                        Some(ReaderType::Default)
1062                    ));
1063
1064                    // needed to drop the borrow and avoid BorrowMutError
1065                    drop(pending_pull_intos);
1066
1067                    // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1068                    self.shift_pending_pull_into();
1069                }
1070
1071                // Let transferredView be ! Construct(%Uint8Array%, « transferredBuffer, byteOffset, byteLength »).
1072                let transferred_view = create_buffer_source_with_constructor(
1073                    cx,
1074                    &Constructor::Name(Type::Uint8),
1075                    &transferred_buffer,
1076                    byte_offset,
1077                    byte_length,
1078                )?;
1079
1080                // Perform ! ReadableStreamFulfillReadRequest(stream, transferredView, false).
1081                rooted!(in(*cx) let mut view_value = UndefinedValue());
1082                transferred_view.get_buffer_view_value(cx, view_value.handle_mut());
1083                stream.fulfill_read_request(view_value.handle(), false, can_gc);
1084            }
1085            // Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true,
1086        } else if stream.has_byob_reader() {
1087            // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1088            // controller, transferredBuffer, byteOffset, byteLength).
1089            self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1090
1091            // Let filledPullIntos be the result of performing !
1092            // ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
1093            let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
1094
1095            // For each filledPullInto of filledPullIntos,
1096            // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
1097            for filled_pull_into in filled_pull_intos {
1098                self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)?;
1099            }
1100        } else {
1101            // Assert: ! IsReadableStreamLocked(stream) is false.
1102            assert!(!stream.is_locked());
1103
1104            // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1105            // (controller, transferredBuffer, byteOffset, byteLength).
1106            self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1107        }
1108
1109        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1110        self.call_pull_if_needed(can_gc);
1111
1112        Ok(())
1113    }
1114
1115    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-commit-pull-into-descriptor>
1116    pub(crate) fn commit_pull_into_descriptor(
1117        &self,
1118        cx: SafeJSContext,
1119        pull_into_descriptor: &PullIntoDescriptor,
1120        can_gc: CanGc,
1121    ) -> Fallible<()> {
1122        // Assert: stream.[[state]] is not "errored".
1123        let stream = self.stream.get().unwrap();
1124        assert!(!stream.is_errored());
1125
1126        // Assert: pullIntoDescriptor.reader type is not "none".
1127        assert!(pull_into_descriptor.reader_type.is_some());
1128
1129        // Let done be false.
1130        let mut done = false;
1131
1132        // If stream.[[state]] is "closed",
1133        if stream.is_closed() {
1134            // Assert: the remainder after dividing pullIntoDescriptor’s bytes filled
1135            // by pullIntoDescriptor’s element size is 0.
1136            assert!(
1137                pull_into_descriptor.bytes_filled.get() % pull_into_descriptor.element_size == 0
1138            );
1139
1140            // Set done to true.
1141            done = true;
1142        }
1143
1144        // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor).
1145        let filled_view = self.convert_pull_into_descriptor(cx, pull_into_descriptor)?;
1146
1147        rooted!(in(*cx) let mut view_value = UndefinedValue());
1148        filled_view.get_buffer_view_value(cx, view_value.handle_mut());
1149
1150        // If pullIntoDescriptor’s reader type is "default",
1151        if matches!(pull_into_descriptor.reader_type, Some(ReaderType::Default)) {
1152            // Perform ! ReadableStreamFulfillReadRequest(stream, filledView, done).
1153
1154            stream.fulfill_read_request(view_value.handle(), done, can_gc);
1155        } else {
1156            // Assert: pullIntoDescriptor’s reader type is "byob".
1157            assert!(matches!(
1158                pull_into_descriptor.reader_type,
1159                Some(ReaderType::Byob)
1160            ));
1161
1162            // Perform ! ReadableStreamFulfillReadIntoRequest(stream, filledView, done).
1163            stream.fulfill_read_into_request(view_value.handle(), done, can_gc);
1164        }
1165        Ok(())
1166    }
1167
1168    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-convert-pull-into-descriptor>
1169    pub(crate) fn convert_pull_into_descriptor(
1170        &self,
1171        cx: SafeJSContext,
1172        pull_into_descriptor: &PullIntoDescriptor,
1173    ) -> Fallible<HeapBufferSource<ArrayBufferViewU8>> {
1174        // Let bytesFilled be pullIntoDescriptor’s bytes filled.
1175        let bytes_filled = pull_into_descriptor.bytes_filled.get();
1176
1177        // Let elementSize be pullIntoDescriptor’s element size.
1178        let element_size = pull_into_descriptor.element_size;
1179
1180        // Assert: bytesFilled ≤ pullIntoDescriptor’s byte length.
1181        assert!(bytes_filled <= pull_into_descriptor.byte_length);
1182
1183        // Assert: the remainder after dividing bytesFilled by elementSize is 0.
1184        assert!(bytes_filled % element_size == 0);
1185
1186        // Let buffer be ! TransferArrayBuffer(pullIntoDescriptor’s buffer).
1187        let buffer = pull_into_descriptor.buffer.transfer_array_buffer(cx)?;
1188
1189        // Return ! Construct(pullIntoDescriptor’s view constructor,
1190        // « buffer, pullIntoDescriptor’s byte offset, bytesFilled ÷ elementSize »).
1191        create_buffer_source_with_constructor(
1192            cx,
1193            &pull_into_descriptor.view_constructor,
1194            &buffer,
1195            pull_into_descriptor.byte_offset as usize,
1196            (bytes_filled / element_size) as usize,
1197        )
1198    }
1199
1200    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-process-pull-into-descriptors-using-queue>
1201    pub(crate) fn process_pull_into_descriptors_using_queue(
1202        &self,
1203        cx: SafeJSContext,
1204    ) -> Vec<PullIntoDescriptor> {
1205        // Assert: controller.[[closeRequested]] is false.
1206        assert!(!self.close_requested.get());
1207
1208        // Let filledPullIntos be a new empty list.
1209        let mut filled_pull_intos = Vec::new();
1210
1211        // While controller.[[pendingPullIntos]] is not empty,
1212        loop {
1213            // If controller.[[queueTotalSize]] is 0, then break.
1214            if self.queue_total_size.get() == 0.0 {
1215                break;
1216            }
1217
1218            // Let pullIntoDescriptor be controller.[[pendingPullIntos]][0].
1219            let fill_pull_result = {
1220                let pending_pull_intos = self.pending_pull_intos.borrow();
1221                let Some(pull_into_descriptor) = pending_pull_intos.first() else {
1222                    break;
1223                };
1224                self.fill_pull_into_descriptor_from_queue(cx, pull_into_descriptor)
1225            };
1226
1227            // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) is true,
1228            if fill_pull_result {
1229                // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1230                let pull_into_descriptor = self.shift_pending_pull_into();
1231
1232                // Append pullIntoDescriptor to filledPullIntos.
1233                filled_pull_intos.push(pull_into_descriptor);
1234            }
1235        }
1236
1237        // Return filledPullIntos.
1238        filled_pull_intos
1239    }
1240
1241    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-pull-into-descriptor-from-queue>
1242    pub(crate) fn fill_pull_into_descriptor_from_queue(
1243        &self,
1244        cx: SafeJSContext,
1245        pull_into_descriptor: &PullIntoDescriptor,
1246    ) -> bool {
1247        // Let maxBytesToCopy be min(controller.[[queueTotalSize]],
1248        // pullIntoDescriptor’s byte length − pullIntoDescriptor’s bytes filled).
1249        let max_bytes_to_copy = min(
1250            self.queue_total_size.get() as usize,
1251            (pull_into_descriptor.byte_length - pull_into_descriptor.bytes_filled.get()) as usize,
1252        );
1253
1254        // Let maxBytesFilled be pullIntoDescriptor’s bytes filled + maxBytesToCopy.
1255        let max_bytes_filled = pull_into_descriptor.bytes_filled.get() as usize + max_bytes_to_copy;
1256
1257        // Let totalBytesToCopyRemaining be maxBytesToCopy.
1258        let mut total_bytes_to_copy_remaining = max_bytes_to_copy;
1259
1260        // Let ready be false.
1261        let mut ready = false;
1262
1263        // Assert: ! IsDetachedBuffer(pullIntoDescriptor’s buffer) is false.
1264        assert!(!pull_into_descriptor.buffer.is_detached_buffer(cx));
1265
1266        // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
1267        assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
1268
1269        // Let remainderBytes be the remainder after dividing maxBytesFilled by pullIntoDescriptor’s element size.
1270        let remainder_bytes = max_bytes_filled % pull_into_descriptor.element_size as usize;
1271
1272        // Let maxAlignedBytes be maxBytesFilled − remainderBytes.
1273        let max_aligned_bytes = max_bytes_filled - remainder_bytes;
1274
1275        // If maxAlignedBytes ≥ pullIntoDescriptor’s minimum fill,
1276        if max_aligned_bytes >= pull_into_descriptor.minimum_fill as usize {
1277            // Set totalBytesToCopyRemaining to maxAlignedBytes − pullIntoDescriptor’s bytes filled.
1278            total_bytes_to_copy_remaining =
1279                max_aligned_bytes - (pull_into_descriptor.bytes_filled.get() as usize);
1280
1281            // Set ready to true.
1282            ready = true;
1283        }
1284
1285        // Let queue be controller.[[queue]].
1286        let mut queue = self.queue.borrow_mut();
1287
1288        // While totalBytesToCopyRemaining > 0,
1289        while total_bytes_to_copy_remaining > 0 {
1290            // Let headOfQueue be queue[0].
1291            let head_of_queue = queue.front_mut().unwrap();
1292
1293            // Let bytesToCopy be min(totalBytesToCopyRemaining, headOfQueue’s byte length).
1294            let bytes_to_copy = total_bytes_to_copy_remaining.min(head_of_queue.byte_length);
1295
1296            // Let destStart be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
1297            let dest_start =
1298                pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();
1299
1300            // Let descriptorBuffer be pullIntoDescriptor’s buffer.
1301            let descriptor_buffer = &pull_into_descriptor.buffer;
1302
1303            // Let queueBuffer be headOfQueue’s buffer.
1304            let queue_buffer = &head_of_queue.buffer;
1305
1306            // Let queueByteOffset be headOfQueue’s byte offset.
1307            let queue_byte_offset = head_of_queue.byte_offset;
1308
1309            // Assert: ! CanCopyDataBlockBytes(descriptorBuffer, destStart,
1310            // queueBuffer, queueByteOffset, bytesToCopy) is true.
1311            assert!(descriptor_buffer.can_copy_data_block_bytes(
1312                cx,
1313                dest_start as usize,
1314                queue_buffer,
1315                queue_byte_offset,
1316                bytes_to_copy
1317            ));
1318
1319            // Perform ! CopyDataBlockBytes(descriptorBuffer.[[ArrayBufferData]], destStart,
1320            // queueBuffer.[[ArrayBufferData]], queueByteOffset, bytesToCopy).
1321            descriptor_buffer.copy_data_block_bytes(
1322                cx,
1323                dest_start as usize,
1324                queue_buffer,
1325                queue_byte_offset,
1326                bytes_to_copy,
1327            );
1328
1329            // If headOfQueue’s byte length is bytesToCopy,
1330            if head_of_queue.byte_length == bytes_to_copy {
1331                // Remove queue[0].
1332                queue.pop_front().unwrap();
1333            } else {
1334                // Set headOfQueue’s byte offset to headOfQueue’s byte offset + bytesToCopy.
1335                head_of_queue.byte_offset += bytes_to_copy;
1336
1337                // Set headOfQueue’s byte length to headOfQueue’s byte length − bytesToCopy.
1338                head_of_queue.byte_length -= bytes_to_copy;
1339            }
1340
1341            // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − bytesToCopy.
1342            self.queue_total_size
1343                .set(self.queue_total_size.get() - (bytes_to_copy as f64));
1344
1345            // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
1346            // controller, bytesToCopy, pullIntoDescriptor).
1347            self.fill_head_pull_into_descriptor(bytes_to_copy as u64, pull_into_descriptor);
1348
1349            // Set totalBytesToCopyRemaining to totalBytesToCopyRemaining − bytesToCopy.
1350            total_bytes_to_copy_remaining -= bytes_to_copy;
1351        }
1352
1353        // If ready is false,
1354        if !ready {
1355            // Assert: controller.[[queueTotalSize]] is 0.
1356            assert!(self.queue_total_size.get() == 0.0);
1357
1358            // Assert: pullIntoDescriptor’s bytes filled > 0.
1359            assert!(pull_into_descriptor.bytes_filled.get() > 0);
1360
1361            // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
1362            assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
1363        }
1364
1365        // Return ready.
1366        ready
1367    }
1368
1369    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-head-pull-into-descriptor>
1370    pub(crate) fn fill_head_pull_into_descriptor(
1371        &self,
1372        bytes_copied: u64,
1373        pull_into_descriptor: &PullIntoDescriptor,
1374    ) {
1375        // Assert: either controller.[[pendingPullIntos]] is empty,
1376        // or controller.[[pendingPullIntos]][0] is pullIntoDescriptor.
1377        {
1378            let pending_pull_intos = self.pending_pull_intos.borrow();
1379            assert!(
1380                pending_pull_intos.is_empty() ||
1381                    pending_pull_intos.first().unwrap() == pull_into_descriptor
1382            );
1383        }
1384
1385        // Assert: controller.[[byobRequest]] is null.
1386        assert!(self.byob_request.get().is_none());
1387
1388        // Set pullIntoDescriptor’s bytes filled to bytes filled + size.
1389        pull_into_descriptor
1390            .bytes_filled
1391            .set(pull_into_descriptor.bytes_filled.get() + bytes_copied);
1392    }
1393
1394    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueuedetachedpullintotoqueue>
1395    pub(crate) fn enqueue_detached_pull_into_to_queue(
1396        &self,
1397        cx: SafeJSContext,
1398        can_gc: CanGc,
1399    ) -> Fallible<()> {
1400        // first_descriptor: &PullIntoDescriptor,
1401        let pending_pull_intos = self.pending_pull_intos.borrow();
1402        let first_descriptor = pending_pull_intos.first().unwrap();
1403
1404        // Assert: pullIntoDescriptor’s reader type is "none".
1405        assert!(first_descriptor.reader_type.is_none());
1406
1407        // If pullIntoDescriptor’s bytes filled > 0, perform ?
1408        // ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
1409        // pullIntoDescriptor’s buffer, pullIntoDescriptor’s byte offset, pullIntoDescriptor’s bytes filled).
1410
1411        if first_descriptor.bytes_filled.get() > 0 {
1412            self.enqueue_cloned_chunk_to_queue(
1413                cx,
1414                &first_descriptor.buffer,
1415                first_descriptor.byte_offset,
1416                first_descriptor.bytes_filled.get(),
1417                can_gc,
1418            )?;
1419        }
1420
1421        // needed to drop the borrow and avoid BorrowMutError
1422        drop(pending_pull_intos);
1423
1424        // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1425        self.shift_pending_pull_into();
1426
1427        Ok(())
1428    }
1429
1430    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueueclonedchunktoqueue>
1431    pub(crate) fn enqueue_cloned_chunk_to_queue(
1432        &self,
1433        cx: SafeJSContext,
1434        buffer: &HeapBufferSource<ArrayBufferU8>,
1435        byte_offset: u64,
1436        byte_length: u64,
1437        can_gc: CanGc,
1438    ) -> Fallible<()> {
1439        // Let cloneResult be CloneArrayBuffer(buffer, byteOffset, byteLength, %ArrayBuffer%).
1440        if let Some(clone_result) =
1441            buffer.clone_array_buffer(cx, byte_offset as usize, byte_length as usize)
1442        {
1443            // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1444            // (controller, cloneResult.[[Value]], 0, byteLength).
1445            self.enqueue_chunk_to_queue(clone_result, 0, byte_length as usize);
1446
1447            Ok(())
1448        } else {
1449            // If cloneResult is an abrupt completion,
1450
1451            // Perform ! ReadableByteStreamControllerError(controller, cloneResult.[[Value]]).
1452            rooted!(in(*cx) let mut rval = UndefinedValue());
1453            let error = Error::Type("can not clone array buffer".to_owned());
1454            error
1455                .clone()
1456                .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1457            self.error(rval.handle(), can_gc);
1458
1459            // Return cloneResult.
1460            Err(error)
1461        }
1462    }
1463
1464    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue-chunk-to-queue>
1465    pub(crate) fn enqueue_chunk_to_queue(
1466        &self,
1467        buffer: HeapBufferSource<ArrayBufferU8>,
1468        byte_offset: usize,
1469        byte_length: usize,
1470    ) {
1471        // Let entry be a new ReadableByteStreamQueueEntry object.
1472        let entry = QueueEntry::new(buffer, byte_offset, byte_length);
1473
1474        // Append entry to controller.[[queue]].
1475        self.queue.borrow_mut().push_back(entry);
1476
1477        // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] + byteLength.
1478        self.queue_total_size
1479            .set(self.queue_total_size.get() + byte_length as f64);
1480    }
1481
1482    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-shift-pending-pull-into>
1483    pub(crate) fn shift_pending_pull_into(&self) -> PullIntoDescriptor {
1484        // Assert: controller.[[byobRequest]] is null.
1485        assert!(self.byob_request.get().is_none());
1486
1487        // Let descriptor be controller.[[pendingPullIntos]][0].
1488        // Remove descriptor from controller.[[pendingPullIntos]].
1489        // Return descriptor.
1490        self.pending_pull_intos.borrow_mut().remove(0)
1491    }
1492
1493    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerprocessreadrequestsusingqueue>
1494    pub(crate) fn process_read_requests_using_queue(
1495        &self,
1496        cx: SafeJSContext,
1497        can_gc: CanGc,
1498    ) -> Fallible<()> {
1499        // Let reader be controller.[[stream]].[[reader]].
1500        // Assert: reader implements ReadableStreamDefaultReader.
1501        let reader = self.stream.get().unwrap().get_default_reader();
1502
1503        // Step 3
1504        reader.process_read_requests(cx, DomRoot::from_ref(self), can_gc)
1505    }
1506
1507    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerfillreadrequestfromqueue>
1508    pub(crate) fn fill_read_request_from_queue(
1509        &self,
1510        cx: SafeJSContext,
1511        read_request: &ReadRequest,
1512        can_gc: CanGc,
1513    ) -> Fallible<()> {
1514        // Assert: controller.[[queueTotalSize]] > 0.
1515        assert!(self.queue_total_size.get() > 0.0);
1516        // Also assert that the queue has a non-zero length;
1517        assert!(!self.queue.borrow().is_empty());
1518
1519        // Let entry be controller.[[queue]][0].
1520        // Remove entry from controller.[[queue]].
1521        let entry = self.remove_entry();
1522
1523        // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − entry’s byte length.
1524        self.queue_total_size
1525            .set(self.queue_total_size.get() - entry.byte_length as f64);
1526
1527        // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
1528        self.handle_queue_drain(can_gc);
1529
1530        // Let view be ! Construct(%Uint8Array%, « entry’s buffer, entry’s byte offset, entry’s byte length »).
1531        let view = create_buffer_source_with_constructor(
1532            cx,
1533            &Constructor::Name(Type::Uint8),
1534            &entry.buffer,
1535            entry.byte_offset,
1536            entry.byte_length,
1537        )?;
1538
1539        // Perform readRequest’s chunk steps, given view.
1540        let result = RootedTraceableBox::new(Heap::default());
1541        rooted!(in(*cx) let mut view_value = UndefinedValue());
1542        view.get_buffer_view_value(cx, view_value.handle_mut());
1543        result.set(*view_value);
1544
1545        read_request.chunk_steps(result, can_gc);
1546
1547        Ok(())
1548    }
1549
1550    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-handle-queue-drain>
1551    pub(crate) fn handle_queue_drain(&self, can_gc: CanGc) {
1552        // Assert: controller.[[stream]].[[state]] is "readable".
1553        assert!(self.stream.get().unwrap().is_readable());
1554
1555        // If controller.[[queueTotalSize]] is 0 and controller.[[closeRequested]] is true,
1556        if self.queue_total_size.get() == 0.0 && self.close_requested.get() {
1557            // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
1558            self.clear_algorithms();
1559
1560            // Perform ! ReadableStreamClose(controller.[[stream]]).
1561            self.stream.get().unwrap().close(can_gc);
1562        } else {
1563            // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1564            self.call_pull_if_needed(can_gc);
1565        }
1566    }
1567
1568    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
1569    pub(crate) fn call_pull_if_needed(&self, can_gc: CanGc) {
1570        // Let shouldPull be ! ReadableByteStreamControllerShouldCallPull(controller).
1571        let should_pull = self.should_call_pull();
1572        // If shouldPull is false, return.
1573        if !should_pull {
1574            return;
1575        }
1576
1577        // If controller.[[pulling]] is true,
1578        if self.pulling.get() {
1579            // Set controller.[[pullAgain]] to true.
1580            self.pull_again.set(true);
1581
1582            // Return.
1583            return;
1584        }
1585
1586        // Assert: controller.[[pullAgain]] is false.
1587        assert!(!self.pull_again.get());
1588
1589        // Set controller.[[pulling]] to true.
1590        self.pulling.set(true);
1591
1592        // Let pullPromise be the result of performing controller.[[pullAlgorithm]].
1593        // Continues into the resolve and reject handling of the native handler.
1594        let global = self.global();
1595        let rooted_controller = DomRoot::from_ref(self);
1596        let controller = Controller::ReadableByteStreamController(rooted_controller.clone());
1597
1598        if let Some(underlying_source) = self.underlying_source.get() {
1599            let handler = PromiseNativeHandler::new(
1600                &global,
1601                Some(Box::new(PullAlgorithmFulfillmentHandler {
1602                    controller: Dom::from_ref(&rooted_controller),
1603                })),
1604                Some(Box::new(PullAlgorithmRejectionHandler {
1605                    controller: Dom::from_ref(&rooted_controller),
1606                })),
1607                can_gc,
1608            );
1609
1610            let realm = enter_realm(&*global);
1611            let comp = InRealm::Entered(&realm);
1612            let result = underlying_source
1613                .call_pull_algorithm(controller, &global, can_gc)
1614                .unwrap_or_else(|| {
1615                    let promise = Promise::new(&global, can_gc);
1616                    promise.resolve_native(&(), can_gc);
1617                    Ok(promise)
1618                });
1619            let promise = result.unwrap_or_else(|error| {
1620                let cx = GlobalScope::get_cx();
1621                rooted!(in(*cx) let mut rval = UndefinedValue());
1622                // TODO: check if `self.global()` is the right globalscope.
1623                error
1624                    .clone()
1625                    .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1626                let promise = Promise::new(&global, can_gc);
1627                promise.reject_native(&rval.handle(), can_gc);
1628                promise
1629            });
1630            promise.append_native_handler(&handler, comp, can_gc);
1631        }
1632    }
1633
1634    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-should-call-pull>
1635    fn should_call_pull(&self) -> bool {
1636        // Let stream be controller.[[stream]].
1637        // Note: the spec does not assert that stream is not undefined here,
1638        // so we return false if it is.
1639        let stream = self.stream.get().unwrap();
1640
1641        // If stream.[[state]] is not "readable", return false.
1642        if !stream.is_readable() {
1643            return false;
1644        }
1645
1646        // If controller.[[closeRequested]] is true, return false.
1647        if self.close_requested.get() {
1648            return false;
1649        }
1650
1651        // If controller.[[started]] is false, return false.
1652        if !self.started.get() {
1653            return false;
1654        }
1655
1656        // If ! ReadableStreamHasDefaultReader(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0
1657        // , return true.
1658        if stream.has_default_reader() && stream.get_num_read_requests() > 0 {
1659            return true;
1660        }
1661
1662        // If ! ReadableStreamHasBYOBReader(stream) is true and ! ReadableStreamGetNumReadIntoRequests(stream) > 0
1663        // , return true.
1664        if stream.has_byob_reader() && stream.get_num_read_into_requests() > 0 {
1665            return true;
1666        }
1667
1668        // Let desiredSize be ! ReadableByteStreamControllerGetDesiredSize(controller).
1669        let desired_size = self.get_desired_size();
1670
1671        // Assert: desiredSize is not null.
1672        assert!(desired_size.is_some());
1673
1674        // If desiredSize > 0, return true.
1675        if desired_size.unwrap() > 0. {
1676            return true;
1677        }
1678
1679        // Return false.
1680        false
1681    }
1682    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
1683    pub(crate) fn setup(
1684        &self,
1685        global: &GlobalScope,
1686        stream: DomRoot<ReadableStream>,
1687        can_gc: CanGc,
1688    ) -> Fallible<()> {
1689        // Assert: stream.[[controller]] is undefined.
1690        stream.assert_no_controller();
1691
1692        // If autoAllocateChunkSize is not undefined,
1693        if self.auto_allocate_chunk_size.is_some() {
1694            // Assert: ! IsInteger(autoAllocateChunkSize) is true. Implicit
1695            // Assert: autoAllocateChunkSize is positive. (Implicit by type.)
1696        }
1697
1698        // Set controller.[[stream]] to stream.
1699        self.stream.set(Some(&stream));
1700
1701        // Set controller.[[pullAgain]] and controller.[[pulling]] to false.
1702        self.pull_again.set(false);
1703        self.pulling.set(false);
1704
1705        // Set controller.[[byobRequest]] to null.
1706        self.byob_request.set(None);
1707
1708        // Perform ! ResetQueue(controller).
1709        self.reset_queue();
1710
1711        // Set controller.[[closeRequested]] and controller.[[started]] to false.
1712        self.close_requested.set(false);
1713        self.started.set(false);
1714
1715        // Set controller.[[strategyHWM]] to highWaterMark.
1716        // Set controller.[[pullAlgorithm]] to pullAlgorithm.
1717        // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
1718        // Set controller.[[autoAllocateChunkSize]] to autoAllocateChunkSize.
1719        // Set controller.[[pendingPullIntos]] to a new empty list.
1720        // Note: the above steps are done in `new`.
1721
1722        // Set stream.[[controller]] to controller.
1723        let rooted_byte_controller = DomRoot::from_ref(self);
1724        stream.set_byte_controller(&rooted_byte_controller);
1725
1726        if let Some(underlying_source) = rooted_byte_controller.underlying_source.get() {
1727            // Let startResult be the result of performing startAlgorithm. (This might throw an exception.)
1728            let start_result = underlying_source
1729                .call_start_algorithm(
1730                    Controller::ReadableByteStreamController(rooted_byte_controller.clone()),
1731                    can_gc,
1732                )
1733                .unwrap_or_else(|| {
1734                    let promise = Promise::new(global, can_gc);
1735                    promise.resolve_native(&(), can_gc);
1736                    Ok(promise)
1737                });
1738
1739            // Let startPromise be a promise resolved with startResult.
1740            let start_promise = start_result?;
1741
1742            // Upon fulfillment of startPromise, Upon rejection of startPromise with reason r,
1743            let handler = PromiseNativeHandler::new(
1744                global,
1745                Some(Box::new(StartAlgorithmFulfillmentHandler {
1746                    controller: Dom::from_ref(&rooted_byte_controller),
1747                })),
1748                Some(Box::new(StartAlgorithmRejectionHandler {
1749                    controller: Dom::from_ref(&rooted_byte_controller),
1750                })),
1751                can_gc,
1752            );
1753            let realm = enter_realm(global);
1754            let comp = InRealm::Entered(&realm);
1755            start_promise.append_native_handler(&handler, comp, can_gc);
1756        };
1757
1758        Ok(())
1759    }
1760
1761    // <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontroller-releasesteps
1762    pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1763        // If this.[[pendingPullIntos]] is not empty,
1764        let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
1765        if !pending_pull_intos.is_empty() {
1766            // Let firstPendingPullInto be this.[[pendingPullIntos]][0].
1767            let mut first_pending_pull_into = pending_pull_intos.remove(0);
1768
1769            // Set firstPendingPullInto’s reader type to "none".
1770            first_pending_pull_into.reader_type = None;
1771
1772            // Set this.[[pendingPullIntos]] to the list « firstPendingPullInto »
1773            pending_pull_intos.clear();
1774            pending_pull_intos.push(first_pending_pull_into);
1775        }
1776        Ok(())
1777    }
1778
1779    /// <https://streams.spec.whatwg.org/#rbs-controller-private-cancel>
1780    pub(crate) fn perform_cancel_steps(
1781        &self,
1782        cx: SafeJSContext,
1783        global: &GlobalScope,
1784        reason: SafeHandleValue,
1785        can_gc: CanGc,
1786    ) -> Rc<Promise> {
1787        // Perform ! ReadableByteStreamControllerClearPendingPullIntos(this).
1788        self.clear_pending_pull_intos();
1789
1790        // Perform ! ResetQueue(this).
1791        self.reset_queue();
1792
1793        let underlying_source = self
1794            .underlying_source
1795            .get()
1796            .expect("Controller should have a source when the cancel steps are called into.");
1797
1798        // Let result be the result of performing this.[[cancelAlgorithm]], passing in reason.
1799        let result = underlying_source
1800            .call_cancel_algorithm(cx, global, reason, can_gc)
1801            .unwrap_or_else(|| {
1802                let promise = Promise::new(global, can_gc);
1803                promise.resolve_native(&(), can_gc);
1804                Ok(promise)
1805            });
1806
1807        let promise = result.unwrap_or_else(|error| {
1808            let cx = GlobalScope::get_cx();
1809            rooted!(in(*cx) let mut rval = UndefinedValue());
1810            error
1811                .clone()
1812                .to_jsval(cx, global, rval.handle_mut(), can_gc);
1813            let promise = Promise::new(global, can_gc);
1814            promise.reject_native(&rval.handle(), can_gc);
1815            promise
1816        });
1817
1818        // Perform ! ReadableByteStreamControllerClearAlgorithms(this).
1819        self.clear_algorithms();
1820
1821        // Return result(the promise).
1822        promise
1823    }
1824
1825    /// <https://streams.spec.whatwg.org/#rbs-controller-private-pull>
1826    pub(crate) fn perform_pull_steps(
1827        &self,
1828        cx: SafeJSContext,
1829        read_request: &ReadRequest,
1830        can_gc: CanGc,
1831    ) {
1832        // Let stream be this.[[stream]].
1833        let stream = self.stream.get().unwrap();
1834
1835        // Assert: ! ReadableStreamHasDefaultReader(stream) is true.
1836        assert!(stream.has_default_reader());
1837
1838        // If this.[[queueTotalSize]] > 0,
1839        if self.queue_total_size.get() > 0.0 {
1840            // Assert: ! ReadableStreamGetNumReadRequests(stream) is 0.
1841            assert_eq!(stream.get_num_read_requests(), 0);
1842
1843            // Perform ! ReadableByteStreamControllerFillReadRequestFromQueue(this, readRequest).
1844            let _ = self.fill_read_request_from_queue(cx, read_request, can_gc);
1845
1846            // Return.
1847            return;
1848        }
1849
1850        // Let autoAllocateChunkSize be this.[[autoAllocateChunkSize]].
1851        let auto_allocate_chunk_size = self.auto_allocate_chunk_size;
1852
1853        // If autoAllocateChunkSize is not undefined,
1854        if let Some(auto_allocate_chunk_size) = auto_allocate_chunk_size {
1855            // create_array_buffer_with_size
1856            // Let buffer be Construct(%ArrayBuffer%, « autoAllocateChunkSize »).
1857            match create_array_buffer_with_size(cx, auto_allocate_chunk_size as usize) {
1858                Ok(buffer) => {
1859                    // Let pullIntoDescriptor be a new pull-into descriptor with
1860                    // buffer buffer.[[Value]]
1861                    // buffer byte length autoAllocateChunkSize
1862                    // byte offset  0
1863                    // byte length  autoAllocateChunkSize
1864                    // bytes filled  0
1865                    // minimum fill 1
1866                    // element size 1
1867                    // view constructor %Uint8Array%
1868                    // reader type  "default"
1869                    let pull_into_descriptor = PullIntoDescriptor {
1870                        buffer,
1871                        buffer_byte_length: auto_allocate_chunk_size,
1872                        byte_length: auto_allocate_chunk_size,
1873                        byte_offset: 0,
1874                        bytes_filled: Cell::new(0),
1875                        minimum_fill: 1,
1876                        element_size: 1,
1877                        view_constructor: Constructor::Name(Type::Uint8),
1878                        reader_type: Some(ReaderType::Default),
1879                    };
1880
1881                    // Append pullIntoDescriptor to this.[[pendingPullIntos]].
1882                    self.pending_pull_intos
1883                        .borrow_mut()
1884                        .push(pull_into_descriptor);
1885                },
1886                Err(error) => {
1887                    // If buffer is an abrupt completion,
1888                    // Perform readRequest’s error steps, given buffer.[[Value]].
1889
1890                    rooted!(in(*cx) let mut rval = UndefinedValue());
1891                    error
1892                        .clone()
1893                        .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1894                    read_request.error_steps(rval.handle(), can_gc);
1895
1896                    // Return.
1897                    return;
1898                },
1899            }
1900        }
1901
1902        // Perform ! ReadableStreamAddReadRequest(stream, readRequest).
1903        stream.add_read_request(read_request);
1904
1905        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(this).
1906        self.call_pull_if_needed(can_gc);
1907    }
1908
1909    /// Setting the JS object after the heap has settled down.
1910    pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
1911        if let Some(underlying_source) = self.underlying_source.get() {
1912            underlying_source.set_underlying_source_this_object(this_object);
1913        }
1914    }
1915
1916    pub(crate) fn remove_entry(&self) -> QueueEntry {
1917        self.queue
1918            .borrow_mut()
1919            .pop_front()
1920            .expect("Reader must have read request when remove is called into.")
1921    }
1922
1923    pub(crate) fn get_queue_total_size(&self) -> f64 {
1924        self.queue_total_size.get()
1925    }
1926}
1927
1928impl ReadableByteStreamControllerMethods<crate::DomTypeHolder> for ReadableByteStreamController {
1929    /// <https://streams.spec.whatwg.org/#rbs-controller-byob-request>
1930    fn GetByobRequest(
1931        &self,
1932        can_gc: CanGc,
1933    ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
1934        let cx = GlobalScope::get_cx();
1935        // Return ! ReadableByteStreamControllerGetBYOBRequest(this).
1936        self.get_byob_request(cx, can_gc)
1937    }
1938
1939    /// <https://streams.spec.whatwg.org/#rbs-controller-desired-size>
1940    fn GetDesiredSize(&self) -> Option<f64> {
1941        // Return ! ReadableByteStreamControllerGetDesiredSize(this).
1942        self.get_desired_size()
1943    }
1944
1945    /// <https://streams.spec.whatwg.org/#rbs-controller-close>
1946    fn Close(&self, can_gc: CanGc) -> Fallible<()> {
1947        let cx = GlobalScope::get_cx();
1948        // If this.[[closeRequested]] is true, throw a TypeError exception.
1949        if self.close_requested.get() {
1950            return Err(Error::Type("closeRequested is true".to_owned()));
1951        }
1952
1953        // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
1954        if !self.stream.get().unwrap().is_readable() {
1955            return Err(Error::Type("stream is not readable".to_owned()));
1956        }
1957
1958        // Perform ? ReadableByteStreamControllerClose(this).
1959        self.close(cx, can_gc)
1960    }
1961
1962    /// <https://streams.spec.whatwg.org/#rbs-controller-enqueue>
1963    fn Enqueue(
1964        &self,
1965        chunk: js::gc::CustomAutoRooterGuard<js::typedarray::ArrayBufferView>,
1966        can_gc: CanGc,
1967    ) -> Fallible<()> {
1968        let cx = GlobalScope::get_cx();
1969
1970        let chunk = HeapBufferSource::<ArrayBufferViewU8>::from_view(chunk);
1971
1972        // If chunk.[[ByteLength]] is 0, throw a TypeError exception.
1973        if chunk.byte_length() == 0 {
1974            return Err(Error::Type("chunk.ByteLength is 0".to_owned()));
1975        }
1976
1977        // If chunk.[[ViewedArrayBuffer]].[[ByteLength]] is 0, throw a TypeError exception.
1978        if chunk.viewed_buffer_array_byte_length(cx) == 0 {
1979            return Err(Error::Type(
1980                "chunk.ViewedArrayBuffer.ByteLength is 0".to_owned(),
1981            ));
1982        }
1983
1984        // If this.[[closeRequested]] is true, throw a TypeError exception.
1985        if self.close_requested.get() {
1986            return Err(Error::Type("closeRequested is true".to_owned()));
1987        }
1988
1989        // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
1990        if !self.stream.get().unwrap().is_readable() {
1991            return Err(Error::Type("stream is not readable".to_owned()));
1992        }
1993
1994        // Return ? ReadableByteStreamControllerEnqueue(this, chunk).
1995        self.enqueue(cx, chunk, can_gc)
1996    }
1997
1998    /// <https://streams.spec.whatwg.org/#rbs-controller-error>
1999    fn Error(&self, _cx: SafeJSContext, e: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
2000        // Perform ! ReadableByteStreamControllerError(this, e).
2001        self.error(e, can_gc);
2002        Ok(())
2003    }
2004}