script/dom/
readablebytestreamcontroller.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::cmp::min;
7use std::collections::VecDeque;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::{Heap, Type};
12use js::jsval::UndefinedValue;
13use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue};
14use js::typedarray::{ArrayBufferU8, ArrayBufferViewU8};
15
16use super::bindings::buffer_source::HeapBufferSource;
17use super::bindings::cell::DomRefCell;
18use super::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderReadOptions;
19use super::bindings::reflector::reflect_dom_object;
20use super::bindings::root::Dom;
21use super::readablestreambyobreader::ReadIntoRequest;
22use super::readablestreamdefaultreader::ReadRequest;
23use super::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType};
24use crate::dom::bindings::buffer_source::{
25    Constructor, byte_size, create_array_buffer_with_size, create_buffer_source_with_constructor,
26};
27use crate::dom::bindings::codegen::Bindings::ReadableByteStreamControllerBinding::ReadableByteStreamControllerMethods;
28use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
29use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
30use crate::dom::bindings::reflector::{DomGlobal, Reflector};
31use crate::dom::bindings::root::{DomRoot, MutNullableDom};
32use crate::dom::bindings::trace::RootedTraceableBox;
33use crate::dom::globalscope::GlobalScope;
34use crate::dom::promise::Promise;
35use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
36use crate::dom::readablestream::ReadableStream;
37use crate::dom::readablestreambyobrequest::ReadableStreamBYOBRequest;
38use crate::realms::{InRealm, enter_realm};
39use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
40
41/// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry>
42#[derive(JSTraceable, MallocSizeOf)]
43pub(crate) struct QueueEntry {
44    /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-buffer>
45    #[ignore_malloc_size_of = "HeapBufferSource"]
46    buffer: HeapBufferSource<ArrayBufferU8>,
47    /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-offset>
48    byte_offset: usize,
49    /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-length>
50    byte_length: usize,
51}
52
53impl QueueEntry {
54    pub(crate) fn new(
55        buffer: HeapBufferSource<ArrayBufferU8>,
56        byte_offset: usize,
57        byte_length: usize,
58    ) -> QueueEntry {
59        QueueEntry {
60            buffer,
61            byte_offset,
62            byte_length,
63        }
64    }
65}
66
67#[derive(Debug, Eq, JSTraceable, MallocSizeOf, PartialEq)]
68pub(crate) enum ReaderType {
69    /// <https://streams.spec.whatwg.org/#readablestreambyobreader>
70    Byob,
71    /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
72    Default,
73}
74
75/// <https://streams.spec.whatwg.org/#pull-into-descriptor>
76#[derive(Eq, JSTraceable, MallocSizeOf, PartialEq)]
77pub(crate) struct PullIntoDescriptor {
78    #[ignore_malloc_size_of = "HeapBufferSource"]
79    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer>
80    buffer: HeapBufferSource<ArrayBufferU8>,
81    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer-byte-length>
82    buffer_byte_length: u64,
83    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-offset>
84    byte_offset: u64,
85    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-length>
86    byte_length: u64,
87    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-bytes-filled>
88    bytes_filled: Cell<u64>,
89    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-minimum-fill>
90    minimum_fill: u64,
91    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-element-size>
92    element_size: u64,
93    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-view-constructor>
94    view_constructor: Constructor,
95    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-reader-type>
96    reader_type: Option<ReaderType>,
97}
98
99/// The fulfillment handler for
100/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
101#[derive(Clone, JSTraceable, MallocSizeOf)]
102#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
103struct StartAlgorithmFulfillmentHandler {
104    controller: Dom<ReadableByteStreamController>,
105}
106
107impl Callback for StartAlgorithmFulfillmentHandler {
108    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
109    /// Upon fulfillment of startPromise,
110    fn callback(&self, _cx: SafeJSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
111        // Set controller.[[started]] to true.
112        self.controller.started.set(true);
113
114        // Assert: controller.[[pulling]] is false.
115        assert!(!self.controller.pulling.get());
116
117        // Assert: controller.[[pullAgain]] is false.
118        assert!(!self.controller.pull_again.get());
119
120        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
121        self.controller.call_pull_if_needed(can_gc);
122    }
123}
124
125/// The rejection handler for
126/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
127#[derive(Clone, JSTraceable, MallocSizeOf)]
128#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
129struct StartAlgorithmRejectionHandler {
130    controller: Dom<ReadableByteStreamController>,
131}
132
133impl Callback for StartAlgorithmRejectionHandler {
134    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
135    /// Upon rejection of startPromise with reason r,
136    fn callback(&self, _cx: SafeJSContext, v: HandleValue, _realm: InRealm, can_gc: CanGc) {
137        // Perform ! ReadableByteStreamControllerError(controller, r).
138        self.controller.error(v, can_gc);
139    }
140}
141
142/// The fulfillment handler for
143/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
144#[derive(Clone, JSTraceable, MallocSizeOf)]
145#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
146struct PullAlgorithmFulfillmentHandler {
147    controller: Dom<ReadableByteStreamController>,
148}
149
150impl Callback for PullAlgorithmFulfillmentHandler {
151    /// Continuation of <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
152    /// Upon fulfillment of pullPromise
153    fn callback(&self, _cx: SafeJSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
154        // Set controller.[[pulling]] to false.
155        self.controller.pulling.set(false);
156
157        // If controller.[[pullAgain]] is true,
158        if self.controller.pull_again.get() {
159            // Set controller.[[pullAgain]] to false.
160            self.controller.pull_again.set(false);
161
162            // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
163            self.controller.call_pull_if_needed(can_gc);
164        }
165    }
166}
167
168/// The rejection handler for
169/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
170#[derive(Clone, JSTraceable, MallocSizeOf)]
171#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
172struct PullAlgorithmRejectionHandler {
173    controller: Dom<ReadableByteStreamController>,
174}
175
176impl Callback for PullAlgorithmRejectionHandler {
177    /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-byte-controller-call-pull-if-needed>
178    /// Upon rejection of pullPromise with reason e.
179    fn callback(&self, _cx: SafeJSContext, v: HandleValue, _realm: InRealm, can_gc: CanGc) {
180        // Perform ! ReadableByteStreamControllerError(controller, e).
181        self.controller.error(v, can_gc);
182    }
183}
184
185/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
186#[dom_struct]
187pub(crate) struct ReadableByteStreamController {
188    reflector_: Reflector,
189    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-autoallocatechunksize>
190    auto_allocate_chunk_size: Option<u64>,
191    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-stream>
192    stream: MutNullableDom<ReadableStream>,
193    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-strategyhwm>
194    strategy_hwm: f64,
195    /// A mutable reference to the underlying source is used to implement these two
196    /// internal slots:
197    ///
198    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullalgorithm>
199    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-cancelalgorithm>
200    underlying_source: MutNullableDom<UnderlyingSourceContainer>,
201    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queue>
202    queue: DomRefCell<VecDeque<QueueEntry>>,
203    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queuetotalsize>
204    queue_total_size: Cell<f64>,
205    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-byobrequest>
206    byob_request: MutNullableDom<ReadableStreamBYOBRequest>,
207    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pendingpullintos>
208    pending_pull_intos: DomRefCell<Vec<PullIntoDescriptor>>,
209    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-closerequested>
210    close_requested: Cell<bool>,
211    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-started>
212    started: Cell<bool>,
213    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pulling>
214    pulling: Cell<bool>,
215    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullalgorithm>
216    pull_again: Cell<bool>,
217}
218
219impl ReadableByteStreamController {
220    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
221    fn new_inherited(
222        underlying_source_type: UnderlyingSourceType,
223        strategy_hwm: f64,
224        global: &GlobalScope,
225        can_gc: CanGc,
226    ) -> ReadableByteStreamController {
227        let underlying_source_container =
228            UnderlyingSourceContainer::new(global, underlying_source_type, can_gc);
229        let auto_allocate_chunk_size = underlying_source_container.auto_allocate_chunk_size();
230        ReadableByteStreamController {
231            reflector_: Reflector::new(),
232            byob_request: MutNullableDom::new(None),
233            stream: MutNullableDom::new(None),
234            underlying_source: MutNullableDom::new(Some(&*underlying_source_container)),
235            auto_allocate_chunk_size,
236            pending_pull_intos: DomRefCell::new(Vec::new()),
237            strategy_hwm,
238            close_requested: Default::default(),
239            queue: DomRefCell::new(Default::default()),
240            queue_total_size: Default::default(),
241            started: Default::default(),
242            pulling: Default::default(),
243            pull_again: Default::default(),
244        }
245    }
246
247    #[cfg_attr(crown, allow(crown::unrooted_must_root))]
248    pub(crate) fn new(
249        underlying_source_type: UnderlyingSourceType,
250        strategy_hwm: f64,
251        global: &GlobalScope,
252        can_gc: CanGc,
253    ) -> DomRoot<ReadableByteStreamController> {
254        reflect_dom_object(
255            Box::new(ReadableByteStreamController::new_inherited(
256                underlying_source_type,
257                strategy_hwm,
258                global,
259                can_gc,
260            )),
261            global,
262            can_gc,
263        )
264    }
265
266    pub(crate) fn set_stream(&self, stream: &ReadableStream) {
267        self.stream.set(Some(stream))
268    }
269
270    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-pull-into>
271    pub(crate) fn perform_pull_into(
272        &self,
273        cx: SafeJSContext,
274        read_into_request: &ReadIntoRequest,
275        view: HeapBufferSource<ArrayBufferViewU8>,
276        options: &ReadableStreamBYOBReaderReadOptions,
277        can_gc: CanGc,
278    ) {
279        // Let stream be controller.[[stream]].
280        let stream = self.stream.get().unwrap();
281
282        // Let elementSize be 1.
283        let mut element_size = 1;
284
285        // Let ctor be %DataView%.
286        let mut ctor = Constructor::DataView;
287
288        // If view has a [[TypedArrayName]] internal slot (i.e., it is not a DataView),
289        if view.has_typed_array_name() {
290            // Set elementSize to the element size specified in the
291            // typed array constructors table for view.[[TypedArrayName]].
292            let view_typw = view.get_array_buffer_view_type();
293            element_size = byte_size(view_typw);
294
295            // Set ctor to the constructor specified in the typed array constructors table for view.[[TypedArrayName]].
296            ctor = Constructor::Name(view_typw);
297        }
298
299        // Let minimumFill be min × elementSize.
300        let minimum_fill = options.min * element_size;
301
302        // Assert: minimumFill ≥ 0 and minimumFill ≤ view.[[ByteLength]].
303        assert!(minimum_fill <= (view.byte_length() as u64));
304
305        // Assert: the remainder after dividing minimumFill by elementSize is 0.
306        assert_eq!(minimum_fill % element_size, 0);
307
308        // Let byteOffset be view.[[ByteOffset]].
309        let byte_offset = view.get_byte_offset();
310
311        // Let byteLength be view.[[ByteLength]].
312        let byte_length = view.byte_length();
313
314        // Let bufferResult be TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
315        match view
316            .get_array_buffer_view_buffer(cx)
317            .transfer_array_buffer(cx)
318        {
319            Ok(buffer) => {
320                // Let buffer be bufferResult.[[Value]].
321                // Let pullIntoDescriptor be a new pull-into descriptor with
322                // buffer   buffer
323                // buffer byte length   buffer.[[ArrayBufferByteLength]]
324                // byte offset  byteOffset
325                // byte length  byteLength
326                // bytes filled  0
327                // minimum fill minimumFill
328                // element size elementSize
329                // view constructor ctor
330                // reader type  "byob"
331                let buffer_byte_length = buffer.byte_length();
332                let pull_into_descriptor = PullIntoDescriptor {
333                    buffer,
334                    buffer_byte_length: buffer_byte_length as u64,
335                    byte_offset: byte_offset as u64,
336                    byte_length: byte_length as u64,
337                    bytes_filled: Cell::new(0),
338                    minimum_fill,
339                    element_size,
340                    view_constructor: ctor.clone(),
341                    reader_type: Some(ReaderType::Byob),
342                };
343
344                // If controller.[[pendingPullIntos]] is not empty,
345                {
346                    let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
347                    if !pending_pull_intos.is_empty() {
348                        // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
349                        pending_pull_intos.push(pull_into_descriptor);
350
351                        // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
352                        stream.add_read_into_request(read_into_request);
353
354                        // Return.
355                        return;
356                    }
357                }
358
359                // If stream.[[state]] is "closed",
360                if stream.is_closed() {
361                    // Let emptyView be ! Construct(ctor, « pullIntoDescriptor’s buffer,
362                    // pullIntoDescriptor’s byte offset, 0 »).
363                    if let Ok(empty_view) = create_buffer_source_with_constructor(
364                        cx,
365                        &ctor,
366                        &pull_into_descriptor.buffer,
367                        pull_into_descriptor.byte_offset as usize,
368                        0,
369                    ) {
370                        // Perform readIntoRequest’s close steps, given emptyView.
371                        let result = RootedTraceableBox::new(Heap::default());
372                        rooted!(in(*cx) let mut view_value = UndefinedValue());
373                        empty_view.get_buffer_view_value(cx, view_value.handle_mut());
374                        result.set(*view_value);
375
376                        read_into_request.close_steps(Some(result), can_gc);
377
378                        // Return.
379                        return;
380                    } else {
381                        return;
382                    }
383                }
384
385                // If controller.[[queueTotalSize]] > 0,
386                if self.queue_total_size.get() > 0.0 {
387                    // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(
388                    // controller, pullIntoDescriptor) is true,
389                    if self.fill_pull_into_descriptor_from_queue(cx, &pull_into_descriptor) {
390                        // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(
391                        // pullIntoDescriptor).
392                        if let Ok(filled_view) =
393                            self.convert_pull_into_descriptor(cx, &pull_into_descriptor)
394                        {
395                            // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
396                            self.handle_queue_drain(can_gc);
397
398                            // Perform readIntoRequest’s chunk steps, given filledView.
399                            let result = RootedTraceableBox::new(Heap::default());
400                            rooted!(in(*cx) let mut view_value = UndefinedValue());
401                            filled_view.get_buffer_view_value(cx, view_value.handle_mut());
402                            result.set(*view_value);
403                            read_into_request.chunk_steps(result, can_gc);
404
405                            // Return.
406                            return;
407                        } else {
408                            return;
409                        }
410                    }
411
412                    // If controller.[[closeRequested]] is true,
413                    if self.close_requested.get() {
414                        // Let e be a new TypeError exception.
415                        rooted!(in(*cx) let mut error = UndefinedValue());
416                        Error::Type("close requested".to_owned()).to_jsval(
417                            cx,
418                            &self.global(),
419                            error.handle_mut(),
420                            can_gc,
421                        );
422
423                        // Perform ! ReadableByteStreamControllerError(controller, e).
424                        self.error(error.handle(), can_gc);
425
426                        // Perform readIntoRequest’s error steps, given e.
427                        read_into_request.error_steps(error.handle(), can_gc);
428
429                        // Return.
430                        return;
431                    }
432                }
433
434                // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
435                {
436                    self.pending_pull_intos
437                        .borrow_mut()
438                        .push(pull_into_descriptor);
439                }
440                // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
441                stream.add_read_into_request(read_into_request);
442
443                // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
444                self.call_pull_if_needed(can_gc);
445            },
446            Err(error) => {
447                // If bufferResult is an abrupt completion,
448
449                // Perform readIntoRequest’s error steps, given bufferResult.[[Value]].
450                rooted!(in(*cx) let mut rval = UndefinedValue());
451                error
452                    .clone()
453                    .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
454                read_into_request.error_steps(rval.handle(), can_gc);
455
456                // Return.
457            },
458        }
459    }
460
461    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond>
462    pub(crate) fn respond(
463        &self,
464        cx: SafeJSContext,
465        bytes_written: u64,
466        can_gc: CanGc,
467    ) -> Fallible<()> {
468        {
469            // Assert: controller.[[pendingPullIntos]] is not empty.
470            let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
471            assert!(!pending_pull_intos.is_empty());
472
473            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
474            let first_descriptor = pending_pull_intos.first_mut().unwrap();
475
476            // Let state be controller.[[stream]].[[state]].
477            let stream = self.stream.get().unwrap();
478
479            // If state is "closed",
480            if stream.is_closed() {
481                // If bytesWritten is not 0, throw a TypeError exception.
482                if bytes_written != 0 {
483                    return Err(Error::Type(
484                        "bytesWritten not zero on closed stream".to_owned(),
485                    ));
486                }
487            } else {
488                // Assert: state is "readable".
489                assert!(stream.is_readable());
490
491                // If bytesWritten is 0, throw a TypeError exception.
492                if bytes_written == 0 {
493                    return Err(Error::Type("bytesWritten is 0".to_owned()));
494                }
495
496                // If firstDescriptor’s bytes filled + bytesWritten > firstDescriptor’s byte length,
497                // throw a RangeError exception.
498                if first_descriptor.bytes_filled.get() + bytes_written >
499                    first_descriptor.byte_length
500                {
501                    return Err(Error::Range(
502                        "bytes filled + bytesWritten > byte length".to_owned(),
503                    ));
504                }
505            }
506
507            // Set firstDescriptor’s buffer to ! TransferArrayBuffer(firstDescriptor’s buffer).
508            first_descriptor.buffer = first_descriptor
509                .buffer
510                .transfer_array_buffer(cx)
511                .expect("TransferArrayBuffer failed");
512        }
513
514        // Perform ? ReadableByteStreamControllerRespondInternal(controller, bytesWritten).
515        self.respond_internal(cx, bytes_written, can_gc)
516    }
517
518    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-internal>
519    pub(crate) fn respond_internal(
520        &self,
521        cx: SafeJSContext,
522        bytes_written: u64,
523        can_gc: CanGc,
524    ) -> Fallible<()> {
525        {
526            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
527            let pending_pull_intos = self.pending_pull_intos.borrow();
528            let first_descriptor = pending_pull_intos.first().unwrap();
529
530            // Assert: ! CanTransferArrayBuffer(firstDescriptor’s buffer) is true
531            assert!(first_descriptor.buffer.can_transfer_array_buffer(cx));
532        }
533
534        // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
535        self.invalidate_byob_request();
536
537        // Let state be controller.[[stream]].[[state]].
538        let stream = self.stream.get().unwrap();
539
540        // If state is "closed",
541        if stream.is_closed() {
542            // Assert: bytesWritten is 0.
543            assert_eq!(bytes_written, 0);
544
545            // Perform ! ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor).
546            self.respond_in_closed_state(cx, can_gc)
547                .expect("respond_in_closed_state failed");
548        } else {
549            // Assert: state is "readable".
550            assert!(stream.is_readable());
551
552            // Assert: bytesWritten > 0.
553            assert!(bytes_written > 0);
554
555            // Perform ? ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor).
556            self.respond_in_readable_state(cx, bytes_written, can_gc)?;
557        }
558
559        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
560        self.call_pull_if_needed(can_gc);
561
562        Ok(())
563    }
564
565    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-closed-state>
566    pub(crate) fn respond_in_closed_state(&self, cx: SafeJSContext, can_gc: CanGc) -> Fallible<()> {
567        let pending_pull_intos = self.pending_pull_intos.borrow();
568        let first_descriptor = pending_pull_intos.first().unwrap();
569
570        // Assert: the remainder after dividing firstDescriptor’s bytes filled
571        // by firstDescriptor’s element size is 0.
572        assert_eq!(
573            first_descriptor.bytes_filled.get() % first_descriptor.element_size,
574            0
575        );
576
577        // If firstDescriptor’s reader type is "none",
578        // perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
579        let reader_type = first_descriptor.reader_type.is_none();
580
581        // needed to drop the borrow and avoid BorrowMutError
582        drop(pending_pull_intos);
583
584        if reader_type {
585            self.shift_pending_pull_into();
586        }
587
588        // Let stream be controller.[[stream]].
589        let stream = self.stream.get().unwrap();
590
591        // If ! ReadableStreamHasBYOBReader(stream) is true,
592        if stream.has_byob_reader() {
593            // Let filledPullIntos be a new empty list.
594            let mut filled_pull_intos = Vec::new();
595
596            // While filledPullIntos’s size < ! ReadableStreamGetNumReadIntoRequests(stream),
597            while filled_pull_intos.len() < stream.get_num_read_into_requests() {
598                // Let pullIntoDescriptor be ! ReadableByteStreamControllerShiftPendingPullInto(controller).
599                let pull_into_descriptor = self.shift_pending_pull_into();
600
601                // Append pullIntoDescriptor to filledPullIntos.
602                filled_pull_intos.push(pull_into_descriptor);
603            }
604
605            // For each filledPullInto of filledPullIntos,
606            for filled_pull_into in filled_pull_intos {
607                // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
608                self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)
609                    .expect("commit_pull_into_descriptor failed");
610            }
611        }
612
613        Ok(())
614    }
615
616    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-readable-state>
617    pub(crate) fn respond_in_readable_state(
618        &self,
619        cx: SafeJSContext,
620        bytes_written: u64,
621        can_gc: CanGc,
622    ) -> Fallible<()> {
623        let pending_pull_intos = self.pending_pull_intos.borrow();
624        let first_descriptor = pending_pull_intos.first().unwrap();
625
626        // Assert: pullIntoDescriptor’s bytes filled + bytesWritten ≤ pullIntoDescriptor’s byte length.
627        assert!(
628            first_descriptor.bytes_filled.get() + bytes_written <= first_descriptor.byte_length
629        );
630
631        // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
632        // controller, bytesWritten, pullIntoDescriptor).
633        self.fill_head_pull_into_descriptor(bytes_written, first_descriptor);
634
635        // If pullIntoDescriptor’s reader type is "none",
636        if first_descriptor.reader_type.is_none() {
637            // needed to drop the borrow and avoid BorrowMutError
638            drop(pending_pull_intos);
639
640            // Perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, pullIntoDescriptor).
641            self.enqueue_detached_pull_into_to_queue(cx, can_gc)?;
642
643            // Let filledPullIntos be the result of performing
644            // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
645            let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
646
647            // For each filledPullInto of filledPullIntos,
648            for filled_pull_into in filled_pull_intos {
649                // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]]
650                // , filledPullInto).
651                self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)
652                    .expect("commit_pull_into_descriptor failed");
653            }
654
655            // Return.
656            return Ok(());
657        }
658
659        // If pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill, return.
660        if first_descriptor.bytes_filled.get() < first_descriptor.minimum_fill {
661            return Ok(());
662        }
663
664        // needed to drop the borrow and avoid BorrowMutError
665        drop(pending_pull_intos);
666
667        // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
668        let pull_into_descriptor = self.shift_pending_pull_into();
669
670        // Let remainderSize be the remainder after dividing pullIntoDescriptor’s bytes
671        // filled by pullIntoDescriptor’s element size.
672        let remainder_size =
673            pull_into_descriptor.bytes_filled.get() % pull_into_descriptor.element_size;
674
675        // If remainderSize > 0,
676        if remainder_size > 0 {
677            // Let end be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
678            let end = pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();
679
680            // Perform ? ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
681            // pullIntoDescriptor’s buffer, end − remainderSize, remainderSize).
682            self.enqueue_cloned_chunk_to_queue(
683                cx,
684                &pull_into_descriptor.buffer,
685                end - remainder_size,
686                remainder_size,
687                can_gc,
688            )?;
689        }
690
691        // Set pullIntoDescriptor’s bytes filled to pullIntoDescriptor’s bytes filled − remainderSize.
692        pull_into_descriptor
693            .bytes_filled
694            .set(pull_into_descriptor.bytes_filled.get() - remainder_size);
695
696        // Let filledPullIntos be the result of performing
697        // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
698        let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
699
700        // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], pullIntoDescriptor).
701        self.commit_pull_into_descriptor(cx, &pull_into_descriptor, can_gc)
702            .expect("commit_pull_into_descriptor failed");
703
704        // For each filledPullInto of filledPullIntos,
705        for filled_pull_into in filled_pull_intos {
706            // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], filledPullInto).
707            self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)
708                .expect("commit_pull_into_descriptor failed");
709        }
710
711        Ok(())
712    }
713
714    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-with-new-view>
715    pub(crate) fn respond_with_new_view(
716        &self,
717        cx: SafeJSContext,
718        view: HeapBufferSource<ArrayBufferViewU8>,
719        can_gc: CanGc,
720    ) -> Fallible<()> {
721        let view_byte_length;
722        {
723            // Assert: controller.[[pendingPullIntos]] is not empty.
724            let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
725            assert!(!pending_pull_intos.is_empty());
726
727            // Assert: ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is false.
728            assert!(!view.is_detached_buffer(cx));
729
730            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
731            let first_descriptor = pending_pull_intos.first_mut().unwrap();
732
733            // Let state be controller.[[stream]].[[state]].
734            let stream = self.stream.get().unwrap();
735
736            // If state is "closed",
737            if stream.is_closed() {
738                // If view.[[ByteLength]] is not 0, throw a TypeError exception.
739                if view.byte_length() != 0 {
740                    return Err(Error::Type("view byte length is not 0".to_owned()));
741                }
742            } else {
743                // Assert: state is "readable".
744                assert!(stream.is_readable());
745
746                // If view.[[ByteLength]] is 0, throw a TypeError exception.
747                if view.byte_length() == 0 {
748                    return Err(Error::Type("view byte length is 0".to_owned()));
749                }
750            }
751
752            // If firstDescriptor’s byte offset + firstDescriptor’ bytes filled is not view.[[ByteOffset]],
753            // throw a RangeError exception.
754            if first_descriptor.byte_offset + first_descriptor.bytes_filled.get() !=
755                (view.get_byte_offset() as u64)
756            {
757                return Err(Error::Range(
758                    "firstDescriptor's byte offset + bytes filled is not view byte offset"
759                        .to_owned(),
760                ));
761            }
762
763            // If firstDescriptor’s buffer byte length is not view.[[ViewedArrayBuffer]].[[ByteLength]],
764            // throw a RangeError exception.
765            if first_descriptor.buffer_byte_length !=
766                (view.viewed_buffer_array_byte_length(cx) as u64)
767            {
768                return Err(Error::Range(
769                "firstDescriptor's buffer byte length is not view viewed buffer array byte length"
770                    .to_owned(),
771            ));
772            }
773
774            // If firstDescriptor’s bytes filled + view.[[ByteLength]] > firstDescriptor’s byte length,
775            // throw a RangeError exception.
776            if first_descriptor.bytes_filled.get() + (view.byte_length()) as u64 >
777                first_descriptor.byte_length
778            {
779                return Err(Error::Range(
780                    "bytes filled + view byte length > byte length".to_owned(),
781                ));
782            }
783
784            // Let viewByteLength be view.[[ByteLength]].
785            view_byte_length = view.byte_length();
786
787            // Set firstDescriptor’s buffer to ? TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
788            first_descriptor.buffer = view
789                .get_array_buffer_view_buffer(cx)
790                .transfer_array_buffer(cx)?;
791        }
792
793        // Perform ? ReadableByteStreamControllerRespondInternal(controller, viewByteLength).
794        self.respond_internal(cx, view_byte_length as u64, can_gc)
795    }
796
797    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-get-desired-size>
798    pub(crate) fn get_desired_size(&self) -> Option<f64> {
799        // Let state be controller.[[stream]].[[state]].
800        let stream = self.stream.get()?;
801
802        // If state is "errored", return null.
803        if stream.is_errored() {
804            return None;
805        }
806
807        // If state is "closed", return 0.
808        if stream.is_closed() {
809            return Some(0.0);
810        }
811
812        // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
813        Some(self.strategy_hwm - self.queue_total_size.get())
814    }
815
816    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollergetbyobrequest>
817    pub(crate) fn get_byob_request(
818        &self,
819        cx: SafeJSContext,
820        can_gc: CanGc,
821    ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
822        // If controller.[[byobRequest]] is null and controller.[[pendingPullIntos]] is not empty,
823        let pending_pull_intos = self.pending_pull_intos.borrow();
824        if self.byob_request.get().is_none() && !pending_pull_intos.is_empty() {
825            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
826            let first_descriptor = pending_pull_intos.first().unwrap();
827            // Let view be ! Construct(%Uint8Array%, « firstDescriptor’s buffer,
828            // firstDescriptor’s byte offset + firstDescriptor’s bytes filled,
829            // firstDescriptor’s byte length − firstDescriptor’s bytes filled »).
830
831            let byte_offset = first_descriptor.byte_offset + first_descriptor.bytes_filled.get();
832            let byte_length = first_descriptor.byte_length - first_descriptor.bytes_filled.get();
833
834            let view = create_buffer_source_with_constructor(
835                cx,
836                &Constructor::Name(Type::Uint8),
837                &first_descriptor.buffer,
838                byte_offset as usize,
839                byte_length as usize,
840            )
841            .expect("Construct Uint8Array failed");
842
843            // Let byobRequest be a new ReadableStreamBYOBRequest.
844            let byob_request = ReadableStreamBYOBRequest::new(&self.global(), can_gc);
845
846            // Set byobRequest.[[controller]] to controller.
847            byob_request.set_controller(Some(&DomRoot::from_ref(self)));
848
849            // Set byobRequest.[[view]] to view.
850            byob_request.set_view(Some(view));
851
852            // Set controller.[[byobRequest]] to byobRequest.
853            self.byob_request.set(Some(&byob_request));
854        }
855
856        // Return controller.[[byobRequest]].
857        Ok(self.byob_request.get())
858    }
859
860    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-close>
861    pub(crate) fn close(&self, cx: SafeJSContext, can_gc: CanGc) -> Fallible<()> {
862        // Let stream be controller.[[stream]].
863        let stream = self.stream.get().unwrap();
864
865        // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
866        if self.close_requested.get() || !stream.is_readable() {
867            return Ok(());
868        }
869
870        // If controller.[[queueTotalSize]] > 0,
871        if self.queue_total_size.get() > 0.0 {
872            // Set controller.[[closeRequested]] to true.
873            self.close_requested.set(true);
874            // Return.
875            return Ok(());
876        }
877
878        // If controller.[[pendingPullIntos]] is not empty,
879        let pending_pull_intos = self.pending_pull_intos.borrow();
880        if !pending_pull_intos.is_empty() {
881            // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
882            let first_pending_pull_into = pending_pull_intos.first().unwrap();
883
884            // If the remainder after dividing firstPendingPullInto’s bytes filled by
885            // firstPendingPullInto’s element size is not 0,
886            if first_pending_pull_into.bytes_filled.get() % first_pending_pull_into.element_size !=
887                0
888            {
889                // needed to drop the borrow and avoid BorrowMutError
890                drop(pending_pull_intos);
891
892                // Let e be a new TypeError exception.
893                let e = Error::Type(
894                    "remainder after dividing firstPendingPullInto's bytes
895                    filled by firstPendingPullInto's element size is not 0"
896                        .to_owned(),
897                );
898
899                // Perform ! ReadableByteStreamControllerError(controller, e).
900                rooted!(in(*cx) let mut error = UndefinedValue());
901                e.clone()
902                    .to_jsval(cx, &self.global(), error.handle_mut(), can_gc);
903                self.error(error.handle(), can_gc);
904
905                // Throw e.
906                return Err(e);
907            }
908        }
909
910        // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
911        self.clear_algorithms();
912
913        // Perform ! ReadableStreamClose(stream).
914        stream.close(can_gc);
915        Ok(())
916    }
917
918    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-error>
919    pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
920        // Let stream be controller.[[stream]].
921        let stream = self.stream.get().unwrap();
922
923        // If stream.[[state]] is not "readable", return.
924        if !stream.is_readable() {
925            return;
926        }
927
928        // Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller).
929        self.clear_pending_pull_intos();
930
931        // Perform ! ResetQueue(controller).
932        self.reset_queue();
933
934        // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
935        self.clear_algorithms();
936
937        // Perform ! ReadableStreamError(stream, e).
938        stream.error(e, can_gc);
939    }
940
941    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-algorithms>
942    fn clear_algorithms(&self) {
943        // Set controller.[[pullAlgorithm]] to undefined.
944        // Set controller.[[cancelAlgorithm]] to undefined.
945        self.underlying_source.set(None);
946    }
947
948    /// <https://streams.spec.whatwg.org/#reset-queue>
949    pub(crate) fn reset_queue(&self) {
950        // Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
951
952        // Set container.[[queue]] to a new empty list.
953        self.queue.borrow_mut().clear();
954
955        // Set container.[[queueTotalSize]] to 0.
956        self.queue_total_size.set(0.0);
957    }
958
959    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-pending-pull-intos>
960    pub(crate) fn clear_pending_pull_intos(&self) {
961        // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
962        self.invalidate_byob_request();
963
964        // Set controller.[[pendingPullIntos]] to a new empty list.
965        self.pending_pull_intos.borrow_mut().clear();
966    }
967
968    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-invalidate-byob-request>
969    pub(crate) fn invalidate_byob_request(&self) {
970        if let Some(byob_request) = self.byob_request.get() {
971            // Set controller.[[byobRequest]].[[controller]] to undefined.
972            byob_request.set_controller(None);
973
974            // Set controller.[[byobRequest]].[[view]] to null.
975            byob_request.set_view(None);
976
977            // Set controller.[[byobRequest]] to null.
978            self.byob_request.set(None);
979        }
980        // If controller.[[byobRequest]] is null, return.
981    }
982
983    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue>
984    pub(crate) fn enqueue(
985        &self,
986        cx: SafeJSContext,
987        chunk: HeapBufferSource<ArrayBufferViewU8>,
988        can_gc: CanGc,
989    ) -> Fallible<()> {
990        // Let stream be controller.[[stream]].
991        let stream = self.stream.get().unwrap();
992
993        // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
994        if self.close_requested.get() || !stream.is_readable() {
995            return Ok(());
996        }
997
998        // Let buffer be chunk.[[ViewedArrayBuffer]].
999        let buffer = chunk.get_array_buffer_view_buffer(cx);
1000
1001        // Let byteOffset be chunk.[[ByteOffset]].
1002        let byte_offset = chunk.get_byte_offset();
1003
1004        // Let byteLength be chunk.[[ByteLength]].
1005        let byte_length = chunk.byte_length();
1006
1007        // If ! IsDetachedBuffer(buffer) is true, throw a TypeError exception.
1008        if buffer.is_detached_buffer(cx) {
1009            return Err(Error::Type("buffer is detached".to_owned()));
1010        }
1011
1012        // Let transferredBuffer be ? TransferArrayBuffer(buffer).
1013        let transferred_buffer = buffer.transfer_array_buffer(cx)?;
1014
1015        // If controller.[[pendingPullIntos]] is not empty,
1016        {
1017            let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
1018            if !pending_pull_intos.is_empty() {
1019                // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
1020                let first_descriptor = pending_pull_intos.first_mut().unwrap();
1021                // If ! IsDetachedBuffer(firstPendingPullInto’s buffer) is true, throw a TypeError exception.
1022                if first_descriptor.buffer.is_detached_buffer(cx) {
1023                    return Err(Error::Type("buffer is detached".to_owned()));
1024                }
1025
1026                // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
1027                self.invalidate_byob_request();
1028
1029                // Set firstPendingPullInto’s buffer to ! TransferArrayBuffer(firstPendingPullInto’s buffer).
1030                first_descriptor.buffer = first_descriptor
1031                    .buffer
1032                    .transfer_array_buffer(cx)
1033                    .expect("TransferArrayBuffer failed");
1034
1035                // If firstPendingPullInto’s reader type is "none",
1036                if first_descriptor.reader_type.is_none() {
1037                    // needed to drop the borrow and avoid BorrowMutError
1038                    drop(pending_pull_intos);
1039
1040                    // perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(
1041                    // controller, firstPendingPullInto).
1042                    self.enqueue_detached_pull_into_to_queue(cx, can_gc)?;
1043                }
1044            }
1045        }
1046
1047        // If ! ReadableStreamHasDefaultReader(stream) is true,
1048        if stream.has_default_reader() {
1049            // Perform ! ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller).
1050            self.process_read_requests_using_queue(cx, can_gc)
1051                .expect("process_read_requests_using_queue failed");
1052
1053            // If ! ReadableStreamGetNumReadRequests(stream) is 0,
1054            if stream.get_num_read_requests() == 0 {
1055                // Assert: controller.[[pendingPullIntos]] is empty.
1056                {
1057                    assert!(self.pending_pull_intos.borrow().is_empty());
1058                }
1059
1060                // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1061                // controller, transferredBuffer, byteOffset, byteLength).
1062                self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1063            } else {
1064                // Assert: controller.[[queue]] is empty.
1065                assert!(self.queue.borrow().is_empty());
1066
1067                // If controller.[[pendingPullIntos]] is not empty,
1068
1069                let pending_pull_intos = self.pending_pull_intos.borrow();
1070                if !pending_pull_intos.is_empty() {
1071                    // Assert: controller.[[pendingPullIntos]][0]'s reader type is "default".
1072                    assert!(matches!(
1073                        pending_pull_intos.first().unwrap().reader_type,
1074                        Some(ReaderType::Default)
1075                    ));
1076
1077                    // needed to drop the borrow and avoid BorrowMutError
1078                    drop(pending_pull_intos);
1079
1080                    // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1081                    self.shift_pending_pull_into();
1082                }
1083
1084                // Let transferredView be ! Construct(%Uint8Array%, « transferredBuffer, byteOffset, byteLength »).
1085                let transferred_view = create_buffer_source_with_constructor(
1086                    cx,
1087                    &Constructor::Name(Type::Uint8),
1088                    &transferred_buffer,
1089                    byte_offset,
1090                    byte_length,
1091                )
1092                .expect("Construct Uint8Array failed");
1093
1094                // Perform ! ReadableStreamFulfillReadRequest(stream, transferredView, false).
1095                rooted!(in(*cx) let mut view_value = UndefinedValue());
1096                transferred_view.get_buffer_view_value(cx, view_value.handle_mut());
1097                stream.fulfill_read_request(view_value.handle(), false, can_gc);
1098            }
1099            // Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true,
1100        } else if stream.has_byob_reader() {
1101            // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1102            // controller, transferredBuffer, byteOffset, byteLength).
1103            self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1104
1105            // Let filledPullIntos be the result of performing !
1106            // ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
1107            let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
1108
1109            // For each filledPullInto of filledPullIntos,
1110            // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
1111            for filled_pull_into in filled_pull_intos {
1112                self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)
1113                    .expect("commit_pull_into_descriptor failed");
1114            }
1115        } else {
1116            // Assert: ! IsReadableStreamLocked(stream) is false.
1117            assert!(!stream.is_locked());
1118
1119            // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1120            // (controller, transferredBuffer, byteOffset, byteLength).
1121            self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1122        }
1123
1124        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1125        self.call_pull_if_needed(can_gc);
1126
1127        Ok(())
1128    }
1129
1130    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-commit-pull-into-descriptor>
1131    pub(crate) fn commit_pull_into_descriptor(
1132        &self,
1133        cx: SafeJSContext,
1134        pull_into_descriptor: &PullIntoDescriptor,
1135        can_gc: CanGc,
1136    ) -> Fallible<()> {
1137        // Assert: stream.[[state]] is not "errored".
1138        let stream = self.stream.get().unwrap();
1139        assert!(!stream.is_errored());
1140
1141        // Assert: pullIntoDescriptor.reader type is not "none".
1142        assert!(pull_into_descriptor.reader_type.is_some());
1143
1144        // Let done be false.
1145        let mut done = false;
1146
1147        // If stream.[[state]] is "closed",
1148        if stream.is_closed() {
1149            // Assert: the remainder after dividing pullIntoDescriptor’s bytes filled
1150            // by pullIntoDescriptor’s element size is 0.
1151            assert!(
1152                pull_into_descriptor.bytes_filled.get() % pull_into_descriptor.element_size == 0
1153            );
1154
1155            // Set done to true.
1156            done = true;
1157        }
1158
1159        // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor).
1160        let filled_view = self
1161            .convert_pull_into_descriptor(cx, pull_into_descriptor)
1162            .expect("convert_pull_into_descriptor failed");
1163
1164        rooted!(in(*cx) let mut view_value = UndefinedValue());
1165        filled_view.get_buffer_view_value(cx, view_value.handle_mut());
1166
1167        // If pullIntoDescriptor’s reader type is "default",
1168        if matches!(pull_into_descriptor.reader_type, Some(ReaderType::Default)) {
1169            // Perform ! ReadableStreamFulfillReadRequest(stream, filledView, done).
1170
1171            stream.fulfill_read_request(view_value.handle(), done, can_gc);
1172        } else {
1173            // Assert: pullIntoDescriptor’s reader type is "byob".
1174            assert!(matches!(
1175                pull_into_descriptor.reader_type,
1176                Some(ReaderType::Byob)
1177            ));
1178
1179            // Perform ! ReadableStreamFulfillReadIntoRequest(stream, filledView, done).
1180            stream.fulfill_read_into_request(view_value.handle(), done, can_gc);
1181        }
1182        Ok(())
1183    }
1184
1185    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-convert-pull-into-descriptor>
1186    pub(crate) fn convert_pull_into_descriptor(
1187        &self,
1188        cx: SafeJSContext,
1189        pull_into_descriptor: &PullIntoDescriptor,
1190    ) -> Fallible<HeapBufferSource<ArrayBufferViewU8>> {
1191        // Let bytesFilled be pullIntoDescriptor’s bytes filled.
1192        let bytes_filled = pull_into_descriptor.bytes_filled.get();
1193
1194        // Let elementSize be pullIntoDescriptor’s element size.
1195        let element_size = pull_into_descriptor.element_size;
1196
1197        // Assert: bytesFilled ≤ pullIntoDescriptor’s byte length.
1198        assert!(bytes_filled <= pull_into_descriptor.byte_length);
1199
1200        // Assert: the remainder after dividing bytesFilled by elementSize is 0.
1201        assert!(bytes_filled % element_size == 0);
1202
1203        // Let buffer be ! TransferArrayBuffer(pullIntoDescriptor’s buffer).
1204        let buffer = pull_into_descriptor
1205            .buffer
1206            .transfer_array_buffer(cx)
1207            .expect("TransferArrayBuffer failed");
1208
1209        // Return ! Construct(pullIntoDescriptor’s view constructor,
1210        // « buffer, pullIntoDescriptor’s byte offset, bytesFilled ÷ elementSize »).
1211        Ok(create_buffer_source_with_constructor(
1212            cx,
1213            &pull_into_descriptor.view_constructor,
1214            &buffer,
1215            pull_into_descriptor.byte_offset as usize,
1216            (bytes_filled / element_size) as usize,
1217        )
1218        .expect("Construct view failed"))
1219    }
1220
1221    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-process-pull-into-descriptors-using-queue>
1222    pub(crate) fn process_pull_into_descriptors_using_queue(
1223        &self,
1224        cx: SafeJSContext,
1225    ) -> Vec<PullIntoDescriptor> {
1226        // Assert: controller.[[closeRequested]] is false.
1227        assert!(!self.close_requested.get());
1228
1229        // Let filledPullIntos be a new empty list.
1230        let mut filled_pull_intos = Vec::new();
1231
1232        // While controller.[[pendingPullIntos]] is not empty,
1233        loop {
1234            // If controller.[[queueTotalSize]] is 0, then break.
1235            if self.queue_total_size.get() == 0.0 {
1236                break;
1237            }
1238
1239            // Let pullIntoDescriptor be controller.[[pendingPullIntos]][0].
1240            let fill_pull_result = {
1241                let pending_pull_intos = self.pending_pull_intos.borrow();
1242                let Some(pull_into_descriptor) = pending_pull_intos.first() else {
1243                    break;
1244                };
1245                self.fill_pull_into_descriptor_from_queue(cx, pull_into_descriptor)
1246            };
1247
1248            // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) is true,
1249            if fill_pull_result {
1250                // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1251                let pull_into_descriptor = self.shift_pending_pull_into();
1252
1253                // Append pullIntoDescriptor to filledPullIntos.
1254                filled_pull_intos.push(pull_into_descriptor);
1255            }
1256        }
1257
1258        // Return filledPullIntos.
1259        filled_pull_intos
1260    }
1261
1262    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-pull-into-descriptor-from-queue>
1263    pub(crate) fn fill_pull_into_descriptor_from_queue(
1264        &self,
1265        cx: SafeJSContext,
1266        pull_into_descriptor: &PullIntoDescriptor,
1267    ) -> bool {
1268        // Let maxBytesToCopy be min(controller.[[queueTotalSize]],
1269        // pullIntoDescriptor’s byte length − pullIntoDescriptor’s bytes filled).
1270        let max_bytes_to_copy = min(
1271            self.queue_total_size.get() as usize,
1272            (pull_into_descriptor.byte_length - pull_into_descriptor.bytes_filled.get()) as usize,
1273        );
1274
1275        // Let maxBytesFilled be pullIntoDescriptor’s bytes filled + maxBytesToCopy.
1276        let max_bytes_filled = pull_into_descriptor.bytes_filled.get() as usize + max_bytes_to_copy;
1277
1278        // Let totalBytesToCopyRemaining be maxBytesToCopy.
1279        let mut total_bytes_to_copy_remaining = max_bytes_to_copy;
1280
1281        // Let ready be false.
1282        let mut ready = false;
1283
1284        // Assert: ! IsDetachedBuffer(pullIntoDescriptor’s buffer) is false.
1285        assert!(!pull_into_descriptor.buffer.is_detached_buffer(cx));
1286
1287        // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
1288        assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
1289
1290        // Let remainderBytes be the remainder after dividing maxBytesFilled by pullIntoDescriptor’s element size.
1291        let remainder_bytes = max_bytes_filled % pull_into_descriptor.element_size as usize;
1292
1293        // Let maxAlignedBytes be maxBytesFilled − remainderBytes.
1294        let max_aligned_bytes = max_bytes_filled - remainder_bytes;
1295
1296        // If maxAlignedBytes ≥ pullIntoDescriptor’s minimum fill,
1297        if max_aligned_bytes >= pull_into_descriptor.minimum_fill as usize {
1298            // Set totalBytesToCopyRemaining to maxAlignedBytes − pullIntoDescriptor’s bytes filled.
1299            total_bytes_to_copy_remaining =
1300                max_aligned_bytes - (pull_into_descriptor.bytes_filled.get() as usize);
1301
1302            // Set ready to true.
1303            ready = true;
1304        }
1305
1306        // Let queue be controller.[[queue]].
1307        let mut queue = self.queue.borrow_mut();
1308
1309        // While totalBytesToCopyRemaining > 0,
1310        while total_bytes_to_copy_remaining > 0 {
1311            // Let headOfQueue be queue[0].
1312            let head_of_queue = queue.front_mut().unwrap();
1313
1314            // Let bytesToCopy be min(totalBytesToCopyRemaining, headOfQueue’s byte length).
1315            let bytes_to_copy = total_bytes_to_copy_remaining.min(head_of_queue.byte_length);
1316
1317            // Let destStart be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
1318            let dest_start =
1319                pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();
1320
1321            // Let descriptorBuffer be pullIntoDescriptor’s buffer.
1322            let descriptor_buffer = &pull_into_descriptor.buffer;
1323
1324            // Let queueBuffer be headOfQueue’s buffer.
1325            let queue_buffer = &head_of_queue.buffer;
1326
1327            // Let queueByteOffset be headOfQueue’s byte offset.
1328            let queue_byte_offset = head_of_queue.byte_offset;
1329
1330            // Assert: ! CanCopyDataBlockBytes(descriptorBuffer, destStart,
1331            // queueBuffer, queueByteOffset, bytesToCopy) is true.
1332            assert!(descriptor_buffer.can_copy_data_block_bytes(
1333                cx,
1334                dest_start as usize,
1335                queue_buffer,
1336                queue_byte_offset,
1337                bytes_to_copy
1338            ));
1339
1340            // Perform ! CopyDataBlockBytes(descriptorBuffer.[[ArrayBufferData]], destStart,
1341            // queueBuffer.[[ArrayBufferData]], queueByteOffset, bytesToCopy).
1342            descriptor_buffer.copy_data_block_bytes(
1343                cx,
1344                dest_start as usize,
1345                queue_buffer,
1346                queue_byte_offset,
1347                bytes_to_copy,
1348            );
1349
1350            // If headOfQueue’s byte length is bytesToCopy,
1351            if head_of_queue.byte_length == bytes_to_copy {
1352                // Remove queue[0].
1353                queue.pop_front().unwrap();
1354            } else {
1355                // Set headOfQueue’s byte offset to headOfQueue’s byte offset + bytesToCopy.
1356                head_of_queue.byte_offset += bytes_to_copy;
1357
1358                // Set headOfQueue’s byte length to headOfQueue’s byte length − bytesToCopy.
1359                head_of_queue.byte_length -= bytes_to_copy;
1360            }
1361
1362            // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − bytesToCopy.
1363            self.queue_total_size
1364                .set(self.queue_total_size.get() - (bytes_to_copy as f64));
1365
1366            // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
1367            // controller, bytesToCopy, pullIntoDescriptor).
1368            self.fill_head_pull_into_descriptor(bytes_to_copy as u64, pull_into_descriptor);
1369
1370            // Set totalBytesToCopyRemaining to totalBytesToCopyRemaining − bytesToCopy.
1371            total_bytes_to_copy_remaining -= bytes_to_copy;
1372        }
1373
1374        // If ready is false,
1375        if !ready {
1376            // Assert: controller.[[queueTotalSize]] is 0.
1377            assert!(self.queue_total_size.get() == 0.0);
1378
1379            // Assert: pullIntoDescriptor’s bytes filled > 0.
1380            assert!(pull_into_descriptor.bytes_filled.get() > 0);
1381
1382            // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
1383            assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
1384        }
1385
1386        // Return ready.
1387        ready
1388    }
1389
1390    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-head-pull-into-descriptor>
1391    pub(crate) fn fill_head_pull_into_descriptor(
1392        &self,
1393        bytes_copied: u64,
1394        pull_into_descriptor: &PullIntoDescriptor,
1395    ) {
1396        // Assert: either controller.[[pendingPullIntos]] is empty,
1397        // or controller.[[pendingPullIntos]][0] is pullIntoDescriptor.
1398        {
1399            let pending_pull_intos = self.pending_pull_intos.borrow();
1400            assert!(
1401                pending_pull_intos.is_empty() ||
1402                    pending_pull_intos.first().unwrap() == pull_into_descriptor
1403            );
1404        }
1405
1406        // Assert: controller.[[byobRequest]] is null.
1407        assert!(self.byob_request.get().is_none());
1408
1409        // Set pullIntoDescriptor’s bytes filled to bytes filled + size.
1410        pull_into_descriptor
1411            .bytes_filled
1412            .set(pull_into_descriptor.bytes_filled.get() + bytes_copied);
1413    }
1414
1415    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueuedetachedpullintotoqueue>
1416    pub(crate) fn enqueue_detached_pull_into_to_queue(
1417        &self,
1418        cx: SafeJSContext,
1419        can_gc: CanGc,
1420    ) -> Fallible<()> {
1421        // first_descriptor: &PullIntoDescriptor,
1422        let pending_pull_intos = self.pending_pull_intos.borrow();
1423        let first_descriptor = pending_pull_intos.first().unwrap();
1424
1425        // Assert: pullIntoDescriptor’s reader type is "none".
1426        assert!(first_descriptor.reader_type.is_none());
1427
1428        // If pullIntoDescriptor’s bytes filled > 0, perform ?
1429        // ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
1430        // pullIntoDescriptor’s buffer, pullIntoDescriptor’s byte offset, pullIntoDescriptor’s bytes filled).
1431
1432        if first_descriptor.bytes_filled.get() > 0 {
1433            self.enqueue_cloned_chunk_to_queue(
1434                cx,
1435                &first_descriptor.buffer,
1436                first_descriptor.byte_offset,
1437                first_descriptor.bytes_filled.get(),
1438                can_gc,
1439            )?;
1440        }
1441
1442        // needed to drop the borrow and avoid BorrowMutError
1443        drop(pending_pull_intos);
1444
1445        // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1446        self.shift_pending_pull_into();
1447
1448        Ok(())
1449    }
1450
1451    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueueclonedchunktoqueue>
1452    pub(crate) fn enqueue_cloned_chunk_to_queue(
1453        &self,
1454        cx: SafeJSContext,
1455        buffer: &HeapBufferSource<ArrayBufferU8>,
1456        byte_offset: u64,
1457        byte_length: u64,
1458        can_gc: CanGc,
1459    ) -> Fallible<()> {
1460        // Let cloneResult be CloneArrayBuffer(buffer, byteOffset, byteLength, %ArrayBuffer%).
1461        if let Some(clone_result) =
1462            buffer.clone_array_buffer(cx, byte_offset as usize, byte_length as usize)
1463        {
1464            // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1465            // (controller, cloneResult.[[Value]], 0, byteLength).
1466            self.enqueue_chunk_to_queue(clone_result, 0, byte_length as usize);
1467
1468            Ok(())
1469        } else {
1470            // If cloneResult is an abrupt completion,
1471
1472            // Perform ! ReadableByteStreamControllerError(controller, cloneResult.[[Value]]).
1473            rooted!(in(*cx) let mut rval = UndefinedValue());
1474            let error = Error::Type("can not clone array buffer".to_owned());
1475            error
1476                .clone()
1477                .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1478            self.error(rval.handle(), can_gc);
1479
1480            // Return cloneResult.
1481            Err(error)
1482        }
1483    }
1484
1485    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue-chunk-to-queue>
1486    pub(crate) fn enqueue_chunk_to_queue(
1487        &self,
1488        buffer: HeapBufferSource<ArrayBufferU8>,
1489        byte_offset: usize,
1490        byte_length: usize,
1491    ) {
1492        // Let entry be a new ReadableByteStreamQueueEntry object.
1493        let entry = QueueEntry::new(buffer, byte_offset, byte_length);
1494
1495        // Append entry to controller.[[queue]].
1496        self.queue.borrow_mut().push_back(entry);
1497
1498        // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] + byteLength.
1499        self.queue_total_size
1500            .set(self.queue_total_size.get() + byte_length as f64);
1501    }
1502
1503    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-shift-pending-pull-into>
1504    pub(crate) fn shift_pending_pull_into(&self) -> PullIntoDescriptor {
1505        // Assert: controller.[[byobRequest]] is null.
1506        assert!(self.byob_request.get().is_none());
1507
1508        // Let descriptor be controller.[[pendingPullIntos]][0].
1509        // Remove descriptor from controller.[[pendingPullIntos]].
1510        // Return descriptor.
1511        self.pending_pull_intos.borrow_mut().remove(0)
1512    }
1513
1514    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerprocessreadrequestsusingqueue>
1515    pub(crate) fn process_read_requests_using_queue(
1516        &self,
1517        cx: SafeJSContext,
1518        can_gc: CanGc,
1519    ) -> Fallible<()> {
1520        // Let reader be controller.[[stream]].[[reader]].
1521        // Assert: reader implements ReadableStreamDefaultReader.
1522        let reader = self.stream.get().unwrap().get_default_reader();
1523
1524        // Step 3
1525        reader.process_read_requests(cx, DomRoot::from_ref(self), can_gc)
1526    }
1527
1528    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerfillreadrequestfromqueue>
1529    pub(crate) fn fill_read_request_from_queue(
1530        &self,
1531        cx: SafeJSContext,
1532        read_request: &ReadRequest,
1533        can_gc: CanGc,
1534    ) -> Fallible<()> {
1535        // Assert: controller.[[queueTotalSize]] > 0.
1536        assert!(self.queue_total_size.get() > 0.0);
1537        // Also assert that the queue has a non-zero length;
1538        assert!(!self.queue.borrow().is_empty());
1539
1540        // Let entry be controller.[[queue]][0].
1541        // Remove entry from controller.[[queue]].
1542        let entry = self.remove_entry();
1543
1544        // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − entry’s byte length.
1545        self.queue_total_size
1546            .set(self.queue_total_size.get() - entry.byte_length as f64);
1547
1548        // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
1549        self.handle_queue_drain(can_gc);
1550
1551        // Let view be ! Construct(%Uint8Array%, « entry’s buffer, entry’s byte offset, entry’s byte length »).
1552        let view = create_buffer_source_with_constructor(
1553            cx,
1554            &Constructor::Name(Type::Uint8),
1555            &entry.buffer,
1556            entry.byte_offset,
1557            entry.byte_length,
1558        )
1559        .expect("Construct Uint8Array failed");
1560
1561        // Perform readRequest’s chunk steps, given view.
1562        let result = RootedTraceableBox::new(Heap::default());
1563        rooted!(in(*cx) let mut view_value = UndefinedValue());
1564        view.get_buffer_view_value(cx, view_value.handle_mut());
1565        result.set(*view_value);
1566
1567        read_request.chunk_steps(result, can_gc);
1568
1569        Ok(())
1570    }
1571
1572    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-handle-queue-drain>
1573    pub(crate) fn handle_queue_drain(&self, can_gc: CanGc) {
1574        // Assert: controller.[[stream]].[[state]] is "readable".
1575        assert!(self.stream.get().unwrap().is_readable());
1576
1577        // If controller.[[queueTotalSize]] is 0 and controller.[[closeRequested]] is true,
1578        if self.queue_total_size.get() == 0.0 && self.close_requested.get() {
1579            // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
1580            self.clear_algorithms();
1581
1582            // Perform ! ReadableStreamClose(controller.[[stream]]).
1583            self.stream.get().unwrap().close(can_gc);
1584        } else {
1585            // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1586            self.call_pull_if_needed(can_gc);
1587        }
1588    }
1589
1590    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
1591    pub(crate) fn call_pull_if_needed(&self, can_gc: CanGc) {
1592        // Let shouldPull be ! ReadableByteStreamControllerShouldCallPull(controller).
1593        let should_pull = self.should_call_pull();
1594        // If shouldPull is false, return.
1595        if !should_pull {
1596            return;
1597        }
1598
1599        // If controller.[[pulling]] is true,
1600        if self.pulling.get() {
1601            // Set controller.[[pullAgain]] to true.
1602            self.pull_again.set(true);
1603
1604            // Return.
1605            return;
1606        }
1607
1608        // Assert: controller.[[pullAgain]] is false.
1609        assert!(!self.pull_again.get());
1610
1611        // Set controller.[[pulling]] to true.
1612        self.pulling.set(true);
1613
1614        // Let pullPromise be the result of performing controller.[[pullAlgorithm]].
1615        // Continues into the resolve and reject handling of the native handler.
1616        let global = self.global();
1617        let rooted_controller = DomRoot::from_ref(self);
1618        let controller = Controller::ReadableByteStreamController(rooted_controller.clone());
1619
1620        if let Some(underlying_source) = self.underlying_source.get() {
1621            let handler = PromiseNativeHandler::new(
1622                &global,
1623                Some(Box::new(PullAlgorithmFulfillmentHandler {
1624                    controller: Dom::from_ref(&rooted_controller),
1625                })),
1626                Some(Box::new(PullAlgorithmRejectionHandler {
1627                    controller: Dom::from_ref(&rooted_controller),
1628                })),
1629                can_gc,
1630            );
1631
1632            let realm = enter_realm(&*global);
1633            let comp = InRealm::Entered(&realm);
1634            let result = underlying_source
1635                .call_pull_algorithm(controller, &global, can_gc)
1636                .unwrap_or_else(|| {
1637                    let promise = Promise::new(&global, can_gc);
1638                    promise.resolve_native(&(), can_gc);
1639                    Ok(promise)
1640                });
1641            let promise = result.unwrap_or_else(|error| {
1642                let cx = GlobalScope::get_cx();
1643                rooted!(in(*cx) let mut rval = UndefinedValue());
1644                // TODO: check if `self.global()` is the right globalscope.
1645                error
1646                    .clone()
1647                    .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1648                let promise = Promise::new(&global, can_gc);
1649                promise.reject_native(&rval.handle(), can_gc);
1650                promise
1651            });
1652            promise.append_native_handler(&handler, comp, can_gc);
1653        }
1654    }
1655
1656    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-should-call-pull>
1657    fn should_call_pull(&self) -> bool {
1658        // Let stream be controller.[[stream]].
1659        // Note: the spec does not assert that stream is not undefined here,
1660        // so we return false if it is.
1661        let stream = self.stream.get().unwrap();
1662
1663        // If stream.[[state]] is not "readable", return false.
1664        if !stream.is_readable() {
1665            return false;
1666        }
1667
1668        // If controller.[[closeRequested]] is true, return false.
1669        if self.close_requested.get() {
1670            return false;
1671        }
1672
1673        // If controller.[[started]] is false, return false.
1674        if !self.started.get() {
1675            return false;
1676        }
1677
1678        // If ! ReadableStreamHasDefaultReader(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0
1679        // , return true.
1680        if stream.has_default_reader() && stream.get_num_read_requests() > 0 {
1681            return true;
1682        }
1683
1684        // If ! ReadableStreamHasBYOBReader(stream) is true and ! ReadableStreamGetNumReadIntoRequests(stream) > 0
1685        // , return true.
1686        if stream.has_byob_reader() && stream.get_num_read_into_requests() > 0 {
1687            return true;
1688        }
1689
1690        // Let desiredSize be ! ReadableByteStreamControllerGetDesiredSize(controller).
1691        let desired_size = self.get_desired_size();
1692
1693        // Assert: desiredSize is not null.
1694        assert!(desired_size.is_some());
1695
1696        // If desiredSize > 0, return true.
1697        if desired_size.unwrap() > 0. {
1698            return true;
1699        }
1700
1701        // Return false.
1702        false
1703    }
1704    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
1705    pub(crate) fn setup(
1706        &self,
1707        global: &GlobalScope,
1708        stream: DomRoot<ReadableStream>,
1709        can_gc: CanGc,
1710    ) -> Fallible<()> {
1711        // Assert: stream.[[controller]] is undefined.
1712        stream.assert_no_controller();
1713
1714        // If autoAllocateChunkSize is not undefined,
1715        if self.auto_allocate_chunk_size.is_some() {
1716            // Assert: ! IsInteger(autoAllocateChunkSize) is true. Implicit
1717            // Assert: autoAllocateChunkSize is positive. (Implicit by type.)
1718        }
1719
1720        // Set controller.[[stream]] to stream.
1721        self.stream.set(Some(&stream));
1722
1723        // Set controller.[[pullAgain]] and controller.[[pulling]] to false.
1724        self.pull_again.set(false);
1725        self.pulling.set(false);
1726
1727        // Set controller.[[byobRequest]] to null.
1728        self.byob_request.set(None);
1729
1730        // Perform ! ResetQueue(controller).
1731        self.reset_queue();
1732
1733        // Set controller.[[closeRequested]] and controller.[[started]] to false.
1734        self.close_requested.set(false);
1735        self.started.set(false);
1736
1737        // Set controller.[[strategyHWM]] to highWaterMark.
1738        // Set controller.[[pullAlgorithm]] to pullAlgorithm.
1739        // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
1740        // Set controller.[[autoAllocateChunkSize]] to autoAllocateChunkSize.
1741        // Set controller.[[pendingPullIntos]] to a new empty list.
1742        // Note: the above steps are done in `new`.
1743
1744        // Set stream.[[controller]] to controller.
1745        let rooted_byte_controller = DomRoot::from_ref(self);
1746        stream.set_byte_controller(&rooted_byte_controller);
1747
1748        if let Some(underlying_source) = rooted_byte_controller.underlying_source.get() {
1749            // Let startResult be the result of performing startAlgorithm. (This might throw an exception.)
1750            let start_result = underlying_source
1751                .call_start_algorithm(
1752                    Controller::ReadableByteStreamController(rooted_byte_controller.clone()),
1753                    can_gc,
1754                )
1755                .unwrap_or_else(|| {
1756                    let promise = Promise::new(global, can_gc);
1757                    promise.resolve_native(&(), can_gc);
1758                    Ok(promise)
1759                });
1760
1761            // Let startPromise be a promise resolved with startResult.
1762            let start_promise = start_result?;
1763
1764            // Upon fulfillment of startPromise, Upon rejection of startPromise with reason r,
1765            let handler = PromiseNativeHandler::new(
1766                global,
1767                Some(Box::new(StartAlgorithmFulfillmentHandler {
1768                    controller: Dom::from_ref(&rooted_byte_controller),
1769                })),
1770                Some(Box::new(StartAlgorithmRejectionHandler {
1771                    controller: Dom::from_ref(&rooted_byte_controller),
1772                })),
1773                can_gc,
1774            );
1775            let realm = enter_realm(global);
1776            let comp = InRealm::Entered(&realm);
1777            start_promise.append_native_handler(&handler, comp, can_gc);
1778        };
1779
1780        Ok(())
1781    }
1782
1783    // <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontroller-releasesteps
1784    pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1785        // If this.[[pendingPullIntos]] is not empty,
1786        let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
1787        if !pending_pull_intos.is_empty() {
1788            // Let firstPendingPullInto be this.[[pendingPullIntos]][0].
1789            let mut first_pending_pull_into = pending_pull_intos.remove(0);
1790
1791            // Set firstPendingPullInto’s reader type to "none".
1792            first_pending_pull_into.reader_type = None;
1793
1794            // Set this.[[pendingPullIntos]] to the list « firstPendingPullInto »
1795            pending_pull_intos.clear();
1796            pending_pull_intos.push(first_pending_pull_into);
1797        }
1798        Ok(())
1799    }
1800
1801    /// <https://streams.spec.whatwg.org/#rbs-controller-private-cancel>
1802    pub(crate) fn perform_cancel_steps(
1803        &self,
1804        cx: SafeJSContext,
1805        global: &GlobalScope,
1806        reason: SafeHandleValue,
1807        can_gc: CanGc,
1808    ) -> Rc<Promise> {
1809        // Perform ! ReadableByteStreamControllerClearPendingPullIntos(this).
1810        self.clear_pending_pull_intos();
1811
1812        // Perform ! ResetQueue(this).
1813        self.reset_queue();
1814
1815        let underlying_source = self
1816            .underlying_source
1817            .get()
1818            .expect("Controller should have a source when the cancel steps are called into.");
1819
1820        // Let result be the result of performing this.[[cancelAlgorithm]], passing in reason.
1821        let result = underlying_source
1822            .call_cancel_algorithm(cx, global, reason, can_gc)
1823            .unwrap_or_else(|| {
1824                let promise = Promise::new(global, can_gc);
1825                promise.resolve_native(&(), can_gc);
1826                Ok(promise)
1827            });
1828
1829        let promise = result.unwrap_or_else(|error| {
1830            let cx = GlobalScope::get_cx();
1831            rooted!(in(*cx) let mut rval = UndefinedValue());
1832            error
1833                .clone()
1834                .to_jsval(cx, global, rval.handle_mut(), can_gc);
1835            let promise = Promise::new(global, can_gc);
1836            promise.reject_native(&rval.handle(), can_gc);
1837            promise
1838        });
1839
1840        // Perform ! ReadableByteStreamControllerClearAlgorithms(this).
1841        self.clear_algorithms();
1842
1843        // Return result(the promise).
1844        promise
1845    }
1846
1847    /// <https://streams.spec.whatwg.org/#rbs-controller-private-pull>
1848    pub(crate) fn perform_pull_steps(
1849        &self,
1850        cx: SafeJSContext,
1851        read_request: &ReadRequest,
1852        can_gc: CanGc,
1853    ) {
1854        // Let stream be this.[[stream]].
1855        let stream = self.stream.get().unwrap();
1856
1857        // Assert: ! ReadableStreamHasDefaultReader(stream) is true.
1858        assert!(stream.has_default_reader());
1859
1860        // If this.[[queueTotalSize]] > 0,
1861        if self.queue_total_size.get() > 0.0 {
1862            // Assert: ! ReadableStreamGetNumReadRequests(stream) is 0.
1863            assert_eq!(stream.get_num_read_requests(), 0);
1864
1865            // Perform ! ReadableByteStreamControllerFillReadRequestFromQueue(this, readRequest).
1866            let _ = self.fill_read_request_from_queue(cx, read_request, can_gc);
1867
1868            // Return.
1869            return;
1870        }
1871
1872        // Let autoAllocateChunkSize be this.[[autoAllocateChunkSize]].
1873        let auto_allocate_chunk_size = self.auto_allocate_chunk_size;
1874
1875        // If autoAllocateChunkSize is not undefined,
1876        if let Some(auto_allocate_chunk_size) = auto_allocate_chunk_size {
1877            // create_array_buffer_with_size
1878            // Let buffer be Construct(%ArrayBuffer%, « autoAllocateChunkSize »).
1879            match create_array_buffer_with_size(cx, auto_allocate_chunk_size as usize) {
1880                Ok(buffer) => {
1881                    // Let pullIntoDescriptor be a new pull-into descriptor with
1882                    // buffer buffer.[[Value]]
1883                    // buffer byte length autoAllocateChunkSize
1884                    // byte offset  0
1885                    // byte length  autoAllocateChunkSize
1886                    // bytes filled  0
1887                    // minimum fill 1
1888                    // element size 1
1889                    // view constructor %Uint8Array%
1890                    // reader type  "default"
1891                    let pull_into_descriptor = PullIntoDescriptor {
1892                        buffer,
1893                        buffer_byte_length: auto_allocate_chunk_size,
1894                        byte_length: auto_allocate_chunk_size,
1895                        byte_offset: 0,
1896                        bytes_filled: Cell::new(0),
1897                        minimum_fill: 1,
1898                        element_size: 1,
1899                        view_constructor: Constructor::Name(Type::Uint8),
1900                        reader_type: Some(ReaderType::Default),
1901                    };
1902
1903                    // Append pullIntoDescriptor to this.[[pendingPullIntos]].
1904                    self.pending_pull_intos
1905                        .borrow_mut()
1906                        .push(pull_into_descriptor);
1907                },
1908                Err(error) => {
1909                    // If buffer is an abrupt completion,
1910                    // Perform readRequest’s error steps, given buffer.[[Value]].
1911
1912                    rooted!(in(*cx) let mut rval = UndefinedValue());
1913                    error
1914                        .clone()
1915                        .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1916                    read_request.error_steps(rval.handle(), can_gc);
1917
1918                    // Return.
1919                    return;
1920                },
1921            }
1922        }
1923
1924        // Perform ! ReadableStreamAddReadRequest(stream, readRequest).
1925        stream.add_read_request(read_request);
1926
1927        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(this).
1928        self.call_pull_if_needed(can_gc);
1929    }
1930
1931    /// Setting the JS object after the heap has settled down.
1932    pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
1933        if let Some(underlying_source) = self.underlying_source.get() {
1934            underlying_source.set_underlying_source_this_object(this_object);
1935        }
1936    }
1937
1938    pub(crate) fn remove_entry(&self) -> QueueEntry {
1939        self.queue
1940            .borrow_mut()
1941            .pop_front()
1942            .expect("Reader must have read request when remove is called into.")
1943    }
1944
1945    pub(crate) fn get_queue_total_size(&self) -> f64 {
1946        self.queue_total_size.get()
1947    }
1948}
1949
1950impl ReadableByteStreamControllerMethods<crate::DomTypeHolder> for ReadableByteStreamController {
1951    /// <https://streams.spec.whatwg.org/#rbs-controller-byob-request>
1952    fn GetByobRequest(
1953        &self,
1954        can_gc: CanGc,
1955    ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
1956        let cx = GlobalScope::get_cx();
1957        // Return ! ReadableByteStreamControllerGetBYOBRequest(this).
1958        self.get_byob_request(cx, can_gc)
1959    }
1960
1961    /// <https://streams.spec.whatwg.org/#rbs-controller-desired-size>
1962    fn GetDesiredSize(&self) -> Option<f64> {
1963        // Return ! ReadableByteStreamControllerGetDesiredSize(this).
1964        self.get_desired_size()
1965    }
1966
1967    /// <https://streams.spec.whatwg.org/#rbs-controller-close>
1968    fn Close(&self, can_gc: CanGc) -> Fallible<()> {
1969        let cx = GlobalScope::get_cx();
1970        // If this.[[closeRequested]] is true, throw a TypeError exception.
1971        if self.close_requested.get() {
1972            return Err(Error::Type("closeRequested is true".to_owned()));
1973        }
1974
1975        // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
1976        if !self.stream.get().unwrap().is_readable() {
1977            return Err(Error::Type("stream is not readable".to_owned()));
1978        }
1979
1980        // Perform ? ReadableByteStreamControllerClose(this).
1981        self.close(cx, can_gc)
1982    }
1983
1984    /// <https://streams.spec.whatwg.org/#rbs-controller-enqueue>
1985    fn Enqueue(
1986        &self,
1987        chunk: js::gc::CustomAutoRooterGuard<js::typedarray::ArrayBufferView>,
1988        can_gc: CanGc,
1989    ) -> Fallible<()> {
1990        let cx = GlobalScope::get_cx();
1991
1992        let chunk = HeapBufferSource::<ArrayBufferViewU8>::from_view(chunk);
1993
1994        // If chunk.[[ByteLength]] is 0, throw a TypeError exception.
1995        if chunk.byte_length() == 0 {
1996            return Err(Error::Type("chunk.ByteLength is 0".to_owned()));
1997        }
1998
1999        // If chunk.[[ViewedArrayBuffer]].[[ByteLength]] is 0, throw a TypeError exception.
2000        if chunk.viewed_buffer_array_byte_length(cx) == 0 {
2001            return Err(Error::Type(
2002                "chunk.ViewedArrayBuffer.ByteLength is 0".to_owned(),
2003            ));
2004        }
2005
2006        // If this.[[closeRequested]] is true, throw a TypeError exception.
2007        if self.close_requested.get() {
2008            return Err(Error::Type("closeRequested is true".to_owned()));
2009        }
2010
2011        // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
2012        if !self.stream.get().unwrap().is_readable() {
2013            return Err(Error::Type("stream is not readable".to_owned()));
2014        }
2015
2016        // Return ? ReadableByteStreamControllerEnqueue(this, chunk).
2017        self.enqueue(cx, chunk, can_gc)
2018    }
2019
2020    /// <https://streams.spec.whatwg.org/#rbs-controller-error>
2021    fn Error(&self, _cx: SafeJSContext, e: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
2022        // Perform ! ReadableByteStreamControllerError(this, e).
2023        self.error(e, can_gc);
2024        Ok(())
2025    }
2026}