Skip to main content

script/dom/stream/
readablebytestreamcontroller.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::cmp::min;
7use std::collections::VecDeque;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::context::JSContext;
12use js::jsapi::{Heap, Type};
13use js::jsval::UndefinedValue;
14use js::realm::CurrentRealm;
15use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue};
16use js::typedarray::{ArrayBufferU8, ArrayBufferViewU8};
17use script_bindings::cell::DomRefCell;
18use script_bindings::reflector::{Reflector, reflect_dom_object};
19
20use super::readablestreambyobreader::ReadIntoRequest;
21use super::readablestreamdefaultreader::ReadRequest;
22use super::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType};
23use crate::dom::bindings::buffer_source::{
24    Constructor, HeapBufferSource, byte_size, create_array_buffer_with_size,
25    create_buffer_source_with_constructor,
26};
27use crate::dom::bindings::codegen::Bindings::ReadableByteStreamControllerBinding::ReadableByteStreamControllerMethods;
28use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
29use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
30use crate::dom::bindings::reflector::DomGlobal;
31use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
32use crate::dom::bindings::trace::RootedTraceableBox;
33use crate::dom::globalscope::GlobalScope;
34use crate::dom::promise::Promise;
35use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
36use crate::dom::stream::readablestream::ReadableStream;
37use crate::dom::stream::readablestreambyobrequest::ReadableStreamBYOBRequest;
38use crate::realms::enter_auto_realm;
39use crate::script_runtime::CanGc;
40
/// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry>
///
/// One element of the controller's byte queue: an array buffer together with
/// the byte offset and byte length recorded alongside it. See the linked spec
/// definitions for the exact meaning of each field.
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct QueueEntry {
    /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-buffer>
    ///
    /// Skipped by the `MallocSizeOf` derive (see the attribute below).
    #[ignore_malloc_size_of = "HeapBufferSource"]
    buffer: HeapBufferSource<ArrayBufferU8>,
    /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-offset>
    byte_offset: usize,
    /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-length>
    byte_length: usize,
}
53
// Empty marker impl: opts `QueueEntry` into `js::gc::Rootable`.
// NOTE(review): presumably required so entries can be held by `rooted!` guards — confirm.
impl js::gc::Rootable for QueueEntry {}
55
56impl QueueEntry {
57    pub(crate) fn new(
58        buffer: RootedTraceableBox<HeapBufferSource<ArrayBufferU8>>,
59        byte_offset: usize,
60        byte_length: usize,
61    ) -> QueueEntry {
62        QueueEntry {
63            buffer: *buffer.into_box(),
64            byte_offset,
65            byte_length,
66        }
67    }
68}
69
/// The kind of reader recorded in a [`PullIntoDescriptor`]'s `reader_type`.
///
/// The spec's third value, "none", is not a variant here; the descriptor stores
/// an `Option<ReaderType>` and uses `None` for it.
#[derive(Debug, Eq, JSTraceable, MallocSizeOf, PartialEq)]
pub(crate) enum ReaderType {
    /// <https://streams.spec.whatwg.org/#readablestreambyobreader>
    Byob,
    /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
    Default,
}
77
/// <https://streams.spec.whatwg.org/#pull-into-descriptor>
///
/// Bookkeeping for one pending pull-into operation. The controller keeps these
/// in its `pending_pull_intos` list.
#[derive(Eq, JSTraceable, MallocSizeOf, PartialEq)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct PullIntoDescriptor {
    #[ignore_malloc_size_of = "HeapBufferSource"]
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer>
    buffer: HeapBufferSource<ArrayBufferU8>,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer-byte-length>
    buffer_byte_length: u64,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-offset>
    byte_offset: u64,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-length>
    byte_length: u64,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-bytes-filled>
    ///
    /// Stored in a `Cell` so it can be updated through a shared reference to
    /// the descriptor.
    bytes_filled: Cell<u64>,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-minimum-fill>
    minimum_fill: u64,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-element-size>
    element_size: u64,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-view-constructor>
    view_constructor: Constructor,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-reader-type>
    ///
    /// `None` represents the spec's reader type "none".
    reader_type: Option<ReaderType>,
}
102
// Empty marker impl: opts `PullIntoDescriptor` into `js::gc::Rootable`.
// NOTE(review): presumably what lets descriptors be held by `rooted!` guards — confirm.
impl js::gc::Rootable for PullIntoDescriptor {}
104
/// The fulfillment handler for
/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
///
/// Its `Callback` impl marks the controller as started and then asks it to
/// pull if needed.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct StartAlgorithmFulfillmentHandler {
    /// The controller whose start promise this handler reacts to.
    controller: Dom<ReadableByteStreamController>,
}
112
113impl Callback for StartAlgorithmFulfillmentHandler {
114    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
115    /// Upon fulfillment of startPromise,
116    fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
117        // Set controller.[[started]] to true.
118        self.controller.started.set(true);
119
120        // Assert: controller.[[pulling]] is false.
121        assert!(!self.controller.pulling.get());
122
123        // Assert: controller.[[pullAgain]] is false.
124        assert!(!self.controller.pull_again.get());
125
126        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
127        self.controller.call_pull_if_needed(cx);
128    }
129}
130
/// The rejection handler for
/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
///
/// Its `Callback` impl errors the controller with the rejection reason.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct StartAlgorithmRejectionHandler {
    /// The controller that is errored when the start promise rejects.
    controller: Dom<ReadableByteStreamController>,
}
138
139impl Callback for StartAlgorithmRejectionHandler {
140    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
141    /// Upon rejection of startPromise with reason r,
142    fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
143        // Perform ! ReadableByteStreamControllerError(controller, r).
144        self.controller.error(cx, v);
145    }
146}
147
/// The fulfillment handler for
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
///
/// Its `Callback` impl clears the controller's `pulling` flag and, if a
/// further pull was requested in the meantime, asks it to pull again.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct PullAlgorithmFulfillmentHandler {
    /// The controller whose pull promise this handler reacts to.
    controller: Dom<ReadableByteStreamController>,
}
155
156impl Callback for PullAlgorithmFulfillmentHandler {
157    /// Continuation of <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
158    /// Upon fulfillment of pullPromise
159    fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
160        // Set controller.[[pulling]] to false.
161        self.controller.pulling.set(false);
162
163        // If controller.[[pullAgain]] is true,
164        if self.controller.pull_again.get() {
165            // Set controller.[[pullAgain]] to false.
166            self.controller.pull_again.set(false);
167
168            // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
169            self.controller.call_pull_if_needed(cx);
170        }
171    }
172}
173
/// The rejection handler for
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
///
/// Its `Callback` impl errors the controller with the rejection reason.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct PullAlgorithmRejectionHandler {
    /// The controller that is errored when the pull promise rejects.
    controller: Dom<ReadableByteStreamController>,
}
181
182impl Callback for PullAlgorithmRejectionHandler {
183    /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-byte-controller-call-pull-if-needed>
184    /// Upon rejection of pullPromise with reason e.
185    fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
186        // Perform ! ReadableByteStreamControllerError(controller, e).
187        self.controller.error(cx, v);
188    }
189}
190
/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
///
/// Each field below mirrors the internal slot named in its linked spec
/// definition. Mutable state lives in `Cell`/`DomRefCell`/`MutNullableDom`
/// wrappers so it can be updated through `&self`.
#[dom_struct]
pub(crate) struct ReadableByteStreamController {
    reflector_: Reflector,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-autoallocatechunksize>
    ///
    /// Read once from the underlying source container at construction.
    auto_allocate_chunk_size: Option<u64>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-stream>
    ///
    /// Starts out empty; assigned through `set_stream`.
    stream: MutNullableDom<ReadableStream>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-strategyhwm>
    strategy_hwm: f64,
    /// A mutable reference to the underlying source is used to implement these two
    /// internal slots:
    ///
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullalgorithm>
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-cancelalgorithm>
    underlying_source: MutNullableDom<UnderlyingSourceContainer>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queue>
    queue: DomRefCell<VecDeque<QueueEntry>>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queuetotalsize>
    queue_total_size: Cell<f64>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-byobrequest>
    byob_request: MutNullableDom<ReadableStreamBYOBRequest>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pendingpullintos>
    ///
    /// The first element is the "first descriptor" that the respond
    /// operations act on.
    pending_pull_intos: DomRefCell<Vec<PullIntoDescriptor>>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-closerequested>
    close_requested: Cell<bool>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-started>
    started: Cell<bool>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pulling>
    pulling: Cell<bool>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullagain>
    pull_again: Cell<bool>,
}
224
225impl ReadableByteStreamController {
226    fn new_inherited(
227        underlying_source_type: UnderlyingSourceType,
228        strategy_hwm: f64,
229        global: &GlobalScope,
230        can_gc: CanGc,
231    ) -> ReadableByteStreamController {
232        let underlying_source_container =
233            UnderlyingSourceContainer::new(global, underlying_source_type, can_gc);
234        let auto_allocate_chunk_size = underlying_source_container.auto_allocate_chunk_size();
235        ReadableByteStreamController {
236            reflector_: Reflector::new(),
237            byob_request: MutNullableDom::new(None),
238            stream: MutNullableDom::new(None),
239            underlying_source: MutNullableDom::new(Some(&*underlying_source_container)),
240            auto_allocate_chunk_size,
241            pending_pull_intos: DomRefCell::new(Vec::new()),
242            strategy_hwm,
243            close_requested: Default::default(),
244            queue: DomRefCell::new(Default::default()),
245            queue_total_size: Default::default(),
246            started: Default::default(),
247            pulling: Default::default(),
248            pull_again: Default::default(),
249        }
250    }
251
252    pub(crate) fn new(
253        underlying_source_type: UnderlyingSourceType,
254        strategy_hwm: f64,
255        global: &GlobalScope,
256        can_gc: CanGc,
257    ) -> DomRoot<ReadableByteStreamController> {
258        reflect_dom_object(
259            Box::new(ReadableByteStreamController::new_inherited(
260                underlying_source_type,
261                strategy_hwm,
262                global,
263                can_gc,
264            )),
265            global,
266            can_gc,
267        )
268    }
269
270    #[allow(dead_code)]
271    pub(crate) fn set_stream(&self, stream: &ReadableStream) {
272        self.stream.set(Some(stream))
273    }
274
275    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-pull-into>
276    pub(crate) fn perform_pull_into(
277        &self,
278        cx: &mut JSContext,
279        read_into_request: &ReadIntoRequest,
280        view: &HeapBufferSource<ArrayBufferViewU8>,
281        min: u64,
282    ) {
283        // Let stream be controller.[[stream]].
284        let stream = self.stream.get().unwrap();
285
286        // Let elementSize be 1.
287        let mut element_size = 1;
288
289        // Let ctor be %DataView%.
290        let mut ctor = Constructor::DataView;
291
292        // If view has a [[TypedArrayName]] internal slot (i.e., it is not a DataView),
293        if view.has_typed_array_name() {
294            // Set elementSize to the element size specified in the
295            // typed array constructors table for view.[[TypedArrayName]].
296            let view_typw = view.get_array_buffer_view_type();
297            element_size = byte_size(view_typw);
298
299            // Set ctor to the constructor specified in the typed array constructors table for view.[[TypedArrayName]].
300            ctor = Constructor::Name(view_typw);
301        }
302
303        // Let minimumFill be min × elementSize.
304        let minimum_fill = min * element_size;
305
306        // Assert: minimumFill ≥ 0 and minimumFill ≤ view.[[ByteLength]].
307        assert!(minimum_fill <= (view.byte_length() as u64));
308
309        // Assert: the remainder after dividing minimumFill by elementSize is 0.
310        assert_eq!(minimum_fill % element_size, 0);
311
312        // Let byteOffset be view.[[ByteOffset]].
313        let byte_offset = view.get_byte_offset();
314
315        // Let byteLength be view.[[ByteLength]].
316        let byte_length = view.byte_length();
317
318        // Let bufferResult be TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
319        match view
320            .get_array_buffer_view_buffer(cx.into())
321            .transfer_array_buffer(cx.into())
322        {
323            Ok(buffer) => {
324                // Let buffer be bufferResult.[[Value]].
325                // Let pullIntoDescriptor be a new pull-into descriptor with
326                // buffer   buffer
327                // buffer byte length   buffer.[[ArrayBufferByteLength]]
328                // byte offset  byteOffset
329                // byte length  byteLength
330                // bytes filled  0
331                // minimum fill minimumFill
332                // element size elementSize
333                // view constructor ctor
334                // reader type  "byob"
335                let buffer_byte_length = buffer.byte_length();
336                let pull_into_descriptor = RootedTraceableBox::new(PullIntoDescriptor {
337                    buffer: *buffer.into_box(),
338                    buffer_byte_length: buffer_byte_length as u64,
339                    byte_offset: byte_offset as u64,
340                    byte_length: byte_length as u64,
341                    bytes_filled: Cell::new(0),
342                    minimum_fill,
343                    element_size,
344                    view_constructor: ctor.clone(),
345                    reader_type: Some(ReaderType::Byob),
346                });
347
348                // If controller.[[pendingPullIntos]] is not empty,
349                {
350                    let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
351                    if !pending_pull_intos.is_empty() {
352                        // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
353                        pending_pull_intos.push(*pull_into_descriptor.into_box());
354
355                        // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
356                        stream.add_read_into_request(read_into_request);
357
358                        // Return.
359                        return;
360                    }
361                }
362
363                // If stream.[[state]] is "closed",
364                if stream.is_closed() {
365                    // Let emptyView be ! Construct(ctor, « pullIntoDescriptor’s buffer,
366                    // pullIntoDescriptor’s byte offset, 0 »).
367                    if let Ok(empty_view) = create_buffer_source_with_constructor(
368                        cx,
369                        &ctor,
370                        &pull_into_descriptor.buffer,
371                        pull_into_descriptor.byte_offset as usize,
372                        0,
373                    ) {
374                        // Perform readIntoRequest’s close steps, given emptyView.
375                        let result = RootedTraceableBox::new(Heap::default());
376                        rooted!(&in(cx) let mut view_value = UndefinedValue());
377                        empty_view.get_buffer_view_value(cx.into(), view_value.handle_mut());
378                        result.set(*view_value);
379
380                        read_into_request.close_steps(cx, Some(result));
381
382                        // Return.
383                        return;
384                    } else {
385                        return;
386                    }
387                }
388
389                // If controller.[[queueTotalSize]] > 0,
390                if self.queue_total_size.get() > 0.0 {
391                    // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(
392                    // controller, pullIntoDescriptor) is true,
393                    if self.fill_pull_into_descriptor_from_queue(cx, &pull_into_descriptor) {
394                        // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(
395                        // pullIntoDescriptor).
396                        if let Ok(filled_view) =
397                            self.convert_pull_into_descriptor(cx, &pull_into_descriptor)
398                        {
399                            // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
400                            self.handle_queue_drain(cx);
401
402                            // Perform readIntoRequest’s chunk steps, given filledView.
403                            let result = RootedTraceableBox::new(Heap::default());
404                            rooted!(&in(cx) let mut view_value = UndefinedValue());
405                            filled_view.get_buffer_view_value(cx.into(), view_value.handle_mut());
406                            result.set(*view_value);
407                            read_into_request.chunk_steps(result, CanGc::from_cx(cx));
408
409                            // Return.
410                            return;
411                        } else {
412                            return;
413                        }
414                    }
415
416                    // If controller.[[closeRequested]] is true,
417                    if self.close_requested.get() {
418                        // Let e be a new TypeError exception.
419                        rooted!(&in(cx) let mut error = UndefinedValue());
420                        Error::Type(c"close requested".to_owned()).to_jsval(
421                            cx.into(),
422                            &self.global(),
423                            error.handle_mut(),
424                            CanGc::from_cx(cx),
425                        );
426
427                        // Perform ! ReadableByteStreamControllerError(controller, e).
428                        self.error(cx, error.handle());
429
430                        // Perform readIntoRequest’s error steps, given e.
431                        read_into_request.error_steps(error.handle(), CanGc::from_cx(cx));
432
433                        // Return.
434                        return;
435                    }
436                }
437
438                // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
439                {
440                    self.pending_pull_intos
441                        .borrow_mut()
442                        .push(*pull_into_descriptor.into_box());
443                }
444                // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
445                stream.add_read_into_request(read_into_request);
446
447                // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
448                self.call_pull_if_needed(cx);
449            },
450            Err(error) => {
451                // If bufferResult is an abrupt completion,
452
453                // Perform readIntoRequest’s error steps, given bufferResult.[[Value]].
454                rooted!(&in(cx) let mut rval = UndefinedValue());
455                error.to_jsval(
456                    cx.into(),
457                    &self.global(),
458                    rval.handle_mut(),
459                    CanGc::from_cx(cx),
460                );
461                read_into_request.error_steps(rval.handle(), CanGc::from_cx(cx));
462
463                // Return.
464            },
465        }
466    }
467
468    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond>
469    pub(crate) fn respond(&self, cx: &mut JSContext, bytes_written: u64) -> Fallible<()> {
470        {
471            // Assert: controller.[[pendingPullIntos]] is not empty.
472            let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
473            assert!(!pending_pull_intos.is_empty());
474
475            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
476            let first_descriptor = pending_pull_intos.first_mut().unwrap();
477
478            // Let state be controller.[[stream]].[[state]].
479            let stream = self.stream.get().unwrap();
480
481            // If state is "closed",
482            if stream.is_closed() {
483                // If bytesWritten is not 0, throw a TypeError exception.
484                if bytes_written != 0 {
485                    return Err(Error::Type(
486                        c"bytesWritten not zero on closed stream".to_owned(),
487                    ));
488                }
489            } else {
490                // Assert: state is "readable".
491                assert!(stream.is_readable());
492
493                // If bytesWritten is 0, throw a TypeError exception.
494                if bytes_written == 0 {
495                    return Err(Error::Type(c"bytesWritten is 0".to_owned()));
496                }
497
498                // If firstDescriptor’s bytes filled + bytesWritten > firstDescriptor’s byte length,
499                // throw a RangeError exception.
500                if first_descriptor.bytes_filled.get() + bytes_written >
501                    first_descriptor.byte_length
502                {
503                    return Err(Error::Range(
504                        c"bytes filled + bytesWritten > byte length".to_owned(),
505                    ));
506                }
507            }
508
509            // Set firstDescriptor’s buffer to ! TransferArrayBuffer(firstDescriptor’s buffer).
510            first_descriptor.buffer = *first_descriptor
511                .buffer
512                .transfer_array_buffer(cx.into())
513                .expect("TransferArrayBuffer failed")
514                .into_box();
515        }
516
517        // Perform ? ReadableByteStreamControllerRespondInternal(controller, bytesWritten).
518        self.respond_internal(cx, bytes_written)
519    }
520
521    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-internal>
522    fn respond_internal(&self, cx: &mut JSContext, bytes_written: u64) -> Fallible<()> {
523        {
524            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
525            let pending_pull_intos = self.pending_pull_intos.borrow();
526            let first_descriptor = pending_pull_intos.first().unwrap();
527
528            // Assert: ! CanTransferArrayBuffer(firstDescriptor’s buffer) is true
529            assert!(first_descriptor.buffer.can_transfer_array_buffer(cx.into()));
530        }
531
532        // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
533        self.invalidate_byob_request();
534
535        // Let state be controller.[[stream]].[[state]].
536        let stream = self.stream.get().unwrap();
537
538        // If state is "closed",
539        if stream.is_closed() {
540            // Assert: bytesWritten is 0.
541            assert_eq!(bytes_written, 0);
542
543            // Perform ! ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor).
544            self.respond_in_closed_state(cx)
545                .expect("respond_in_closed_state failed");
546        } else {
547            // Assert: state is "readable".
548            assert!(stream.is_readable());
549
550            // Assert: bytesWritten > 0.
551            assert!(bytes_written > 0);
552
553            // Perform ? ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor).
554            self.respond_in_readable_state(cx, bytes_written)?;
555        }
556
557        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
558        self.call_pull_if_needed(cx);
559
560        Ok(())
561    }
562
563    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-closed-state>
564    fn respond_in_closed_state(&self, cx: &mut JSContext) -> Fallible<()> {
565        let pending_pull_intos = self.pending_pull_intos.borrow();
566        let first_descriptor = pending_pull_intos.first().unwrap();
567
568        // Assert: the remainder after dividing firstDescriptor’s bytes filled
569        // by firstDescriptor’s element size is 0.
570        assert_eq!(
571            first_descriptor.bytes_filled.get() % first_descriptor.element_size,
572            0
573        );
574
575        // If firstDescriptor’s reader type is "none",
576        // perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
577        let reader_type = first_descriptor.reader_type.is_none();
578
579        // needed to drop the borrow and avoid BorrowMutError
580        drop(pending_pull_intos);
581
582        if reader_type {
583            self.shift_pending_pull_into();
584        }
585
586        // Let stream be controller.[[stream]].
587        let stream = self.stream.get().unwrap();
588
589        // If ! ReadableStreamHasBYOBReader(stream) is true,
590        if stream.has_byob_reader() {
591            // Let filledPullIntos be a new empty list.
592            rooted!(&in(cx) let mut filled_pull_intos = Vec::new());
593
594            // While filledPullIntos’s size < ! ReadableStreamGetNumReadIntoRequests(stream),
595            while filled_pull_intos.len() < stream.get_num_read_into_requests() {
596                // Let pullIntoDescriptor be ! ReadableByteStreamControllerShiftPendingPullInto(controller).
597                // Append pullIntoDescriptor to filledPullIntos.
598                filled_pull_intos.push(self.shift_pending_pull_into());
599            }
600
601            // For each filledPullInto of filledPullIntos,
602            for filled_pull_into in &*filled_pull_intos {
603                // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
604                self.commit_pull_into_descriptor(cx, filled_pull_into)
605                    .expect("commit_pull_into_descriptor failed");
606            }
607        }
608
609        Ok(())
610    }
611
612    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-readable-state>
613    fn respond_in_readable_state(&self, cx: &mut JSContext, bytes_written: u64) -> Fallible<()> {
614        let pending_pull_intos = self.pending_pull_intos.borrow();
615        let first_descriptor = pending_pull_intos.first().unwrap();
616
617        // Assert: pullIntoDescriptor’s bytes filled + bytesWritten ≤ pullIntoDescriptor’s byte length.
618        assert!(
619            first_descriptor.bytes_filled.get() + bytes_written <= first_descriptor.byte_length
620        );
621
622        // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
623        // controller, bytesWritten, pullIntoDescriptor).
624        self.fill_head_pull_into_descriptor(bytes_written, first_descriptor);
625
626        // If pullIntoDescriptor’s reader type is "none",
627        if first_descriptor.reader_type.is_none() {
628            // needed to drop the borrow and avoid BorrowMutError
629            drop(pending_pull_intos);
630
631            // Perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, pullIntoDescriptor).
632            self.enqueue_detached_pull_into_to_queue(cx)?;
633
634            // Let filledPullIntos be the result of performing
635            // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
636            rooted!(&in(cx) let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx));
637
638            // For each filledPullInto of filledPullIntos,
639            for filled_pull_into in &*filled_pull_intos {
640                // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]]
641                // , filledPullInto).
642                self.commit_pull_into_descriptor(cx, filled_pull_into)
643                    .expect("commit_pull_into_descriptor failed");
644            }
645
646            // Return.
647            return Ok(());
648        }
649
650        // If pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill, return.
651        if first_descriptor.bytes_filled.get() < first_descriptor.minimum_fill {
652            return Ok(());
653        }
654
655        // needed to drop the borrow and avoid BorrowMutError
656        drop(pending_pull_intos);
657
658        // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
659        rooted!(&in(cx) let pull_into_descriptor = self.shift_pending_pull_into());
660
661        // Let remainderSize be the remainder after dividing pullIntoDescriptor’s bytes
662        // filled by pullIntoDescriptor’s element size.
663        let remainder_size =
664            pull_into_descriptor.bytes_filled.get() % pull_into_descriptor.element_size;
665
666        // If remainderSize > 0,
667        if remainder_size > 0 {
668            // Let end be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
669            let end = pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();
670
671            // Perform ? ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
672            // pullIntoDescriptor’s buffer, end − remainderSize, remainderSize).
673            self.enqueue_cloned_chunk_to_queue(
674                cx,
675                &pull_into_descriptor.buffer,
676                end - remainder_size,
677                remainder_size,
678            )?;
679        }
680
681        // Set pullIntoDescriptor’s bytes filled to pullIntoDescriptor’s bytes filled − remainderSize.
682        pull_into_descriptor
683            .bytes_filled
684            .set(pull_into_descriptor.bytes_filled.get() - remainder_size);
685
686        // Let filledPullIntos be the result of performing
687        // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
688        rooted!(&in(cx) let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx));
689
690        // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], pullIntoDescriptor).
691        self.commit_pull_into_descriptor(cx, &pull_into_descriptor)
692            .expect("commit_pull_into_descriptor failed");
693
694        // For each filledPullInto of filledPullIntos,
695        for filled_pull_into in &*filled_pull_intos {
696            // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], filledPullInto).
697            self.commit_pull_into_descriptor(cx, filled_pull_into)
698                .expect("commit_pull_into_descriptor failed");
699        }
700
701        Ok(())
702    }
703
704    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-with-new-view>
705    pub(crate) fn respond_with_new_view(
706        &self,
707        cx: &mut JSContext,
708        view: &HeapBufferSource<ArrayBufferViewU8>,
709    ) -> Fallible<()> {
710        let view_byte_length;
711        {
712            // Assert: controller.[[pendingPullIntos]] is not empty.
713            let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
714            assert!(!pending_pull_intos.is_empty());
715
716            // Assert: ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is false.
717            assert!(!view.is_detached_buffer(cx.into()));
718
719            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
720            let first_descriptor = pending_pull_intos.first_mut().unwrap();
721
722            // Let state be controller.[[stream]].[[state]].
723            let stream = self.stream.get().unwrap();
724
725            // If state is "closed",
726            if stream.is_closed() {
727                // If view.[[ByteLength]] is not 0, throw a TypeError exception.
728                if view.byte_length() != 0 {
729                    return Err(Error::Type(c"view byte length is not 0".to_owned()));
730                }
731            } else {
732                // Assert: state is "readable".
733                assert!(stream.is_readable());
734
735                // If view.[[ByteLength]] is 0, throw a TypeError exception.
736                if view.byte_length() == 0 {
737                    return Err(Error::Type(c"view byte length is 0".to_owned()));
738                }
739            }
740
741            // If firstDescriptor’s byte offset + firstDescriptor’ bytes filled is not view.[[ByteOffset]],
742            // throw a RangeError exception.
743            if first_descriptor.byte_offset + first_descriptor.bytes_filled.get() !=
744                (view.get_byte_offset() as u64)
745            {
746                return Err(Error::Range(
747                    c"firstDescriptor's byte offset + bytes filled is not view byte offset"
748                        .to_owned(),
749                ));
750            }
751
752            // If firstDescriptor’s buffer byte length is not view.[[ViewedArrayBuffer]].[[ByteLength]],
753            // throw a RangeError exception.
754            if first_descriptor.buffer_byte_length !=
755                (view.viewed_buffer_array_byte_length(cx.into()) as u64)
756            {
757                return Err(Error::Range(
758                c"firstDescriptor's buffer byte length is not view viewed buffer array byte length"
759                    .to_owned(),
760            ));
761            }
762
763            // If firstDescriptor’s bytes filled + view.[[ByteLength]] > firstDescriptor’s byte length,
764            // throw a RangeError exception.
765            if first_descriptor.bytes_filled.get() + (view.byte_length()) as u64 >
766                first_descriptor.byte_length
767            {
768                return Err(Error::Range(
769                    c"bytes filled + view byte length > byte length".to_owned(),
770                ));
771            }
772
773            // Let viewByteLength be view.[[ByteLength]].
774            view_byte_length = view.byte_length();
775
776            // Set firstDescriptor’s buffer to ? TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
777            first_descriptor.buffer = *view
778                .get_array_buffer_view_buffer(cx.into())
779                .transfer_array_buffer(cx.into())?
780                .into_box();
781        }
782
783        // Perform ? ReadableByteStreamControllerRespondInternal(controller, viewByteLength).
784        self.respond_internal(cx, view_byte_length as u64)
785    }
786
787    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-get-desired-size>
788    pub(crate) fn get_desired_size(&self) -> Option<f64> {
789        // Let state be controller.[[stream]].[[state]].
790        let stream = self.stream.get()?;
791
792        // If state is "errored", return null.
793        if stream.is_errored() {
794            return None;
795        }
796
797        // If state is "closed", return 0.
798        if stream.is_closed() {
799            return Some(0.0);
800        }
801
802        // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
803        Some(self.strategy_hwm - self.queue_total_size.get())
804    }
805
806    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollergetbyobrequest>
807    pub(crate) fn get_byob_request(
808        &self,
809        cx: &mut js::context::JSContext,
810    ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
811        // If controller.[[byobRequest]] is null and controller.[[pendingPullIntos]] is not empty,
812        let pending_pull_intos = self.pending_pull_intos.borrow();
813        if self.byob_request.get().is_none() && !pending_pull_intos.is_empty() {
814            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
815            let first_descriptor = pending_pull_intos.first().unwrap();
816            // Let view be ! Construct(%Uint8Array%, « firstDescriptor’s buffer,
817            // firstDescriptor’s byte offset + firstDescriptor’s bytes filled,
818            // firstDescriptor’s byte length − firstDescriptor’s bytes filled »).
819
820            let byte_offset = first_descriptor.byte_offset + first_descriptor.bytes_filled.get();
821            let byte_length = first_descriptor.byte_length - first_descriptor.bytes_filled.get();
822
823            let view = create_buffer_source_with_constructor(
824                cx,
825                &Constructor::Name(Type::Uint8),
826                &first_descriptor.buffer,
827                byte_offset as usize,
828                byte_length as usize,
829            )
830            .expect("Construct Uint8Array failed");
831
832            // Let byobRequest be a new ReadableStreamBYOBRequest.
833            let byob_request = ReadableStreamBYOBRequest::new(&self.global(), CanGc::from_cx(cx));
834
835            // Set byobRequest.[[controller]] to controller.
836            byob_request.set_controller(Some(&DomRoot::from_ref(self)));
837
838            // Set byobRequest.[[view]] to view.
839            byob_request.set_view(Some(view));
840
841            // Set controller.[[byobRequest]] to byobRequest.
842            self.byob_request.set(Some(&byob_request));
843        }
844
845        // Return controller.[[byobRequest]].
846        Ok(self.byob_request.get())
847    }
848
849    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-close>
850    pub(crate) fn close(&self, cx: &mut JSContext) -> Fallible<()> {
851        // Let stream be controller.[[stream]].
852        let stream = self.stream.get().unwrap();
853
854        // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
855        if self.close_requested.get() || !stream.is_readable() {
856            return Ok(());
857        }
858
859        // If controller.[[queueTotalSize]] > 0,
860        if self.queue_total_size.get() > 0.0 {
861            // Set controller.[[closeRequested]] to true.
862            self.close_requested.set(true);
863            // Return.
864            return Ok(());
865        }
866
867        {
868            // If controller.[[pendingPullIntos]] is not empty,
869            let pending_pull_intos = self.pending_pull_intos.borrow();
870            if !pending_pull_intos.is_empty() {
871                // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
872                let first_pending_pull_into = pending_pull_intos.first().unwrap();
873
874                // If the remainder after dividing firstPendingPullInto’s bytes filled by
875                // firstPendingPullInto’s element size is not 0,
876                if !first_pending_pull_into
877                    .bytes_filled
878                    .get()
879                    .is_multiple_of(first_pending_pull_into.element_size)
880                {
881                    // needed to drop the borrow and avoid BorrowMutError
882                    drop(pending_pull_intos);
883
884                    // Let e be a new TypeError exception.
885                    let e = Error::Type(
886                        c"remainder after dividing firstPendingPullInto's bytes
887                    filled by firstPendingPullInto's element size is not 0"
888                            .to_owned(),
889                    );
890
891                    // Perform ! ReadableByteStreamControllerError(controller, e).
892                    rooted!(&in(cx) let mut error = UndefinedValue());
893                    e.clone().to_jsval(
894                        cx.into(),
895                        &self.global(),
896                        error.handle_mut(),
897                        CanGc::from_cx(cx),
898                    );
899                    self.error(cx, error.handle());
900
901                    // Throw e.
902                    return Err(e);
903                }
904            }
905        }
906
907        // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
908        self.clear_algorithms();
909
910        // Perform ! ReadableStreamClose(stream).
911        stream.close(cx);
912        Ok(())
913    }
914
915    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-error>
916    pub(crate) fn error(&self, cx: &mut JSContext, e: SafeHandleValue) {
917        // Let stream be controller.[[stream]].
918        let stream = self.stream.get().unwrap();
919
920        // If stream.[[state]] is not "readable", return.
921        if !stream.is_readable() {
922            return;
923        }
924
925        // Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller).
926        self.clear_pending_pull_intos();
927
928        // Perform ! ResetQueue(controller).
929        self.reset_queue();
930
931        // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
932        self.clear_algorithms();
933
934        // Perform ! ReadableStreamError(stream, e).
935        stream.error(cx, e);
936    }
937
938    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-algorithms>
939    fn clear_algorithms(&self) {
940        // Set controller.[[pullAlgorithm]] to undefined.
941        // Set controller.[[cancelAlgorithm]] to undefined.
942        self.underlying_source.set(None);
943    }
944
945    /// <https://streams.spec.whatwg.org/#reset-queue>
946    pub(crate) fn reset_queue(&self) {
947        // Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
948
949        // Set container.[[queue]] to a new empty list.
950        self.queue.borrow_mut().clear();
951
952        // Set container.[[queueTotalSize]] to 0.
953        self.queue_total_size.set(0.0);
954    }
955
956    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-pending-pull-intos>
957    pub(crate) fn clear_pending_pull_intos(&self) {
958        // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
959        self.invalidate_byob_request();
960
961        // Set controller.[[pendingPullIntos]] to a new empty list.
962        self.pending_pull_intos.borrow_mut().clear();
963    }
964
965    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-invalidate-byob-request>
966    pub(crate) fn invalidate_byob_request(&self) {
967        if let Some(byob_request) = self.byob_request.get() {
968            // Set controller.[[byobRequest]].[[controller]] to undefined.
969            byob_request.set_controller(None);
970
971            // Set controller.[[byobRequest]].[[view]] to null.
972            byob_request.set_view(None);
973
974            // Set controller.[[byobRequest]] to null.
975            self.byob_request.set(None);
976        }
977        // If controller.[[byobRequest]] is null, return.
978    }
979
980    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue>
981    pub(crate) fn enqueue(
982        &self,
983        cx: &mut JSContext,
984        chunk: RootedTraceableBox<HeapBufferSource<ArrayBufferViewU8>>,
985    ) -> Fallible<()> {
986        // Let stream be controller.[[stream]].
987        let stream = self.stream.get().unwrap();
988
989        // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
990        if self.close_requested.get() || !stream.is_readable() {
991            return Ok(());
992        }
993
994        // Let buffer be chunk.[[ViewedArrayBuffer]].
995        let buffer = chunk.get_array_buffer_view_buffer(cx.into());
996
997        // Let byteOffset be chunk.[[ByteOffset]].
998        let byte_offset = chunk.get_byte_offset();
999
1000        // Let byteLength be chunk.[[ByteLength]].
1001        let byte_length = chunk.byte_length();
1002
1003        // If ! IsDetachedBuffer(buffer) is true, throw a TypeError exception.
1004        if buffer.is_detached_buffer(cx.into()) {
1005            return Err(Error::Type(c"buffer is detached".to_owned()));
1006        }
1007
1008        // Let transferredBuffer be ? TransferArrayBuffer(buffer).
1009        let transferred_buffer = buffer.transfer_array_buffer(cx.into())?;
1010
1011        // If controller.[[pendingPullIntos]] is not empty,
1012        {
1013            let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
1014            if !pending_pull_intos.is_empty() {
1015                // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
1016                let first_descriptor = pending_pull_intos.first_mut().unwrap();
1017                // If ! IsDetachedBuffer(firstPendingPullInto’s buffer) is true, throw a TypeError exception.
1018                if first_descriptor.buffer.is_detached_buffer(cx.into()) {
1019                    return Err(Error::Type(c"buffer is detached".to_owned()));
1020                }
1021
1022                // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
1023                self.invalidate_byob_request();
1024
1025                // Set firstPendingPullInto’s buffer to ! TransferArrayBuffer(firstPendingPullInto’s buffer).
1026                first_descriptor.buffer = *first_descriptor
1027                    .buffer
1028                    .transfer_array_buffer(cx.into())
1029                    .expect("TransferArrayBuffer failed")
1030                    .into_box();
1031
1032                // If firstPendingPullInto’s reader type is "none",
1033                if first_descriptor.reader_type.is_none() {
1034                    // needed to drop the borrow and avoid BorrowMutError
1035                    drop(pending_pull_intos);
1036
1037                    // perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(
1038                    // controller, firstPendingPullInto).
1039                    self.enqueue_detached_pull_into_to_queue(cx)?;
1040                }
1041            }
1042        }
1043
1044        // If ! ReadableStreamHasDefaultReader(stream) is true,
1045        if stream.has_default_reader() {
1046            // Perform ! ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller).
1047            self.process_read_requests_using_queue(cx)
1048                .expect("process_read_requests_using_queue failed");
1049
1050            // If ! ReadableStreamGetNumReadRequests(stream) is 0,
1051            if stream.get_num_read_requests() == 0 {
1052                // Assert: controller.[[pendingPullIntos]] is empty.
1053                {
1054                    assert!(self.pending_pull_intos.borrow().is_empty());
1055                }
1056
1057                // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1058                // controller, transferredBuffer, byteOffset, byteLength).
1059                self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1060            } else {
1061                // Assert: controller.[[queue]] is empty.
1062                assert!(self.queue.borrow().is_empty());
1063
1064                // If controller.[[pendingPullIntos]] is not empty,
1065
1066                let pending_pull_intos = self.pending_pull_intos.borrow();
1067                if !pending_pull_intos.is_empty() {
1068                    // Assert: controller.[[pendingPullIntos]][0]'s reader type is "default".
1069                    assert!(matches!(
1070                        pending_pull_intos.first().unwrap().reader_type,
1071                        Some(ReaderType::Default)
1072                    ));
1073
1074                    // needed to drop the borrow and avoid BorrowMutError
1075                    drop(pending_pull_intos);
1076
1077                    // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1078                    self.shift_pending_pull_into();
1079                }
1080
1081                // Let transferredView be ! Construct(%Uint8Array%, « transferredBuffer, byteOffset, byteLength »).
1082                let transferred_view = create_buffer_source_with_constructor(
1083                    cx,
1084                    &Constructor::Name(Type::Uint8),
1085                    &transferred_buffer,
1086                    byte_offset,
1087                    byte_length,
1088                )
1089                .expect("Construct Uint8Array failed");
1090
1091                // Perform ! ReadableStreamFulfillReadRequest(stream, transferredView, false).
1092                rooted!(&in(cx) let mut view_value = UndefinedValue());
1093                transferred_view.get_buffer_view_value(cx.into(), view_value.handle_mut());
1094                stream.fulfill_read_request(cx, view_value.handle(), false);
1095            }
1096            // Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true,
1097        } else if stream.has_byob_reader() {
1098            // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1099            // controller, transferredBuffer, byteOffset, byteLength).
1100            self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1101
1102            // Let filledPullIntos be the result of performing !
1103            // ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
1104            rooted!(&in(cx) let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx));
1105
1106            // For each filledPullInto of filledPullIntos,
1107            // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
1108            for filled_pull_into in &*filled_pull_intos {
1109                self.commit_pull_into_descriptor(cx, filled_pull_into)
1110                    .expect("commit_pull_into_descriptor failed");
1111            }
1112        } else {
1113            // Assert: ! IsReadableStreamLocked(stream) is false.
1114            assert!(!stream.is_locked());
1115
1116            // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1117            // (controller, transferredBuffer, byteOffset, byteLength).
1118            self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1119        }
1120
1121        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1122        self.call_pull_if_needed(cx);
1123
1124        Ok(())
1125    }
1126
1127    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-commit-pull-into-descriptor>
1128    fn commit_pull_into_descriptor(
1129        &self,
1130        cx: &mut JSContext,
1131        pull_into_descriptor: &PullIntoDescriptor,
1132    ) -> Fallible<()> {
1133        // Assert: stream.[[state]] is not "errored".
1134        let stream = self.stream.get().unwrap();
1135        assert!(!stream.is_errored());
1136
1137        // Assert: pullIntoDescriptor.reader type is not "none".
1138        assert!(pull_into_descriptor.reader_type.is_some());
1139
1140        // Let done be false.
1141        let mut done = false;
1142
1143        // If stream.[[state]] is "closed",
1144        if stream.is_closed() {
1145            // Assert: the remainder after dividing pullIntoDescriptor’s bytes filled
1146            // by pullIntoDescriptor’s element size is 0.
1147            assert!(
1148                pull_into_descriptor
1149                    .bytes_filled
1150                    .get()
1151                    .is_multiple_of(pull_into_descriptor.element_size)
1152            );
1153
1154            // Set done to true.
1155            done = true;
1156        }
1157
1158        // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor).
1159        let filled_view = self
1160            .convert_pull_into_descriptor(cx, pull_into_descriptor)
1161            .expect("convert_pull_into_descriptor failed");
1162
1163        rooted!(&in(cx) let mut view_value = UndefinedValue());
1164        filled_view.get_buffer_view_value(cx.into(), view_value.handle_mut());
1165
1166        // If pullIntoDescriptor’s reader type is "default",
1167        if matches!(pull_into_descriptor.reader_type, Some(ReaderType::Default)) {
1168            // Perform ! ReadableStreamFulfillReadRequest(stream, filledView, done).
1169
1170            stream.fulfill_read_request(cx, view_value.handle(), done);
1171        } else {
1172            // Assert: pullIntoDescriptor’s reader type is "byob".
1173            assert!(matches!(
1174                pull_into_descriptor.reader_type,
1175                Some(ReaderType::Byob)
1176            ));
1177
1178            // Perform ! ReadableStreamFulfillReadIntoRequest(stream, filledView, done).
1179            stream.fulfill_read_into_request(cx, view_value.handle(), done);
1180        }
1181        Ok(())
1182    }
1183
1184    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-convert-pull-into-descriptor>
1185    pub(crate) fn convert_pull_into_descriptor(
1186        &self,
1187        cx: &mut js::context::JSContext,
1188        pull_into_descriptor: &PullIntoDescriptor,
1189    ) -> Fallible<RootedTraceableBox<HeapBufferSource<ArrayBufferViewU8>>> {
1190        // Let bytesFilled be pullIntoDescriptor’s bytes filled.
1191        let bytes_filled = pull_into_descriptor.bytes_filled.get();
1192
1193        // Let elementSize be pullIntoDescriptor’s element size.
1194        let element_size = pull_into_descriptor.element_size;
1195
1196        // Assert: bytesFilled ≤ pullIntoDescriptor’s byte length.
1197        assert!(bytes_filled <= pull_into_descriptor.byte_length);
1198
1199        // Assert: the remainder after dividing bytesFilled by elementSize is 0.
1200        assert!(bytes_filled.is_multiple_of(element_size));
1201
1202        // Let buffer be ! TransferArrayBuffer(pullIntoDescriptor’s buffer).
1203        let buffer = pull_into_descriptor
1204            .buffer
1205            .transfer_array_buffer(cx.into())
1206            .expect("TransferArrayBuffer failed");
1207
1208        // Return ! Construct(pullIntoDescriptor’s view constructor,
1209        // « buffer, pullIntoDescriptor’s byte offset, bytesFilled ÷ elementSize »).
1210        Ok(create_buffer_source_with_constructor(
1211            cx,
1212            &pull_into_descriptor.view_constructor,
1213            &buffer,
1214            pull_into_descriptor.byte_offset as usize,
1215            (bytes_filled / element_size) as usize,
1216        )
1217        .expect("Construct view failed"))
1218    }
1219
1220    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-process-pull-into-descriptors-using-queue>
1221    pub(crate) fn process_pull_into_descriptors_using_queue(
1222        &self,
1223        cx: &mut js::context::JSContext,
1224    ) -> Vec<PullIntoDescriptor> {
1225        // Assert: controller.[[closeRequested]] is false.
1226        assert!(!self.close_requested.get());
1227
1228        // Let filledPullIntos be a new empty list.
1229        rooted!(&in(cx) let mut filled_pull_intos = Vec::new());
1230
1231        // While controller.[[pendingPullIntos]] is not empty,
1232        loop {
1233            // If controller.[[queueTotalSize]] is 0, then break.
1234            if self.queue_total_size.get() == 0.0 {
1235                break;
1236            }
1237
1238            // Let pullIntoDescriptor be controller.[[pendingPullIntos]][0].
1239            let fill_pull_result = {
1240                let pending_pull_intos = self.pending_pull_intos.borrow();
1241                let Some(pull_into_descriptor) = pending_pull_intos.first() else {
1242                    break;
1243                };
1244                self.fill_pull_into_descriptor_from_queue(cx, pull_into_descriptor)
1245            };
1246
1247            // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) is true,
1248            if fill_pull_result {
1249                // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1250                // Append pullIntoDescriptor to filledPullIntos.
1251                filled_pull_intos.push(self.shift_pending_pull_into());
1252            }
1253        }
1254
1255        // Return filledPullIntos.
1256        filled_pull_intos.take()
1257    }
1258
1259    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-pull-into-descriptor-from-queue>
1260    pub(crate) fn fill_pull_into_descriptor_from_queue(
1261        &self,
1262        cx: &mut js::context::JSContext,
1263        pull_into_descriptor: &PullIntoDescriptor,
1264    ) -> bool {
1265        // Let maxBytesToCopy be min(controller.[[queueTotalSize]],
1266        // pullIntoDescriptor’s byte length − pullIntoDescriptor’s bytes filled).
1267        let max_bytes_to_copy = min(
1268            self.queue_total_size.get() as usize,
1269            (pull_into_descriptor.byte_length - pull_into_descriptor.bytes_filled.get()) as usize,
1270        );
1271
1272        // Let maxBytesFilled be pullIntoDescriptor’s bytes filled + maxBytesToCopy.
1273        let max_bytes_filled = pull_into_descriptor.bytes_filled.get() as usize + max_bytes_to_copy;
1274
1275        // Let totalBytesToCopyRemaining be maxBytesToCopy.
1276        let mut total_bytes_to_copy_remaining = max_bytes_to_copy;
1277
1278        // Let ready be false.
1279        let mut ready = false;
1280
1281        // Assert: ! IsDetachedBuffer(pullIntoDescriptor’s buffer) is false.
1282        assert!(!pull_into_descriptor.buffer.is_detached_buffer(cx.into()));
1283
1284        // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
1285        assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
1286
1287        // Let remainderBytes be the remainder after dividing maxBytesFilled by pullIntoDescriptor’s element size.
1288        let remainder_bytes = max_bytes_filled % pull_into_descriptor.element_size as usize;
1289
1290        // Let maxAlignedBytes be maxBytesFilled − remainderBytes.
1291        let max_aligned_bytes = max_bytes_filled - remainder_bytes;
1292
1293        // If maxAlignedBytes ≥ pullIntoDescriptor’s minimum fill,
1294        if max_aligned_bytes >= pull_into_descriptor.minimum_fill as usize {
1295            // Set totalBytesToCopyRemaining to maxAlignedBytes − pullIntoDescriptor’s bytes filled.
1296            total_bytes_to_copy_remaining =
1297                max_aligned_bytes - (pull_into_descriptor.bytes_filled.get() as usize);
1298
1299            // Set ready to true.
1300            ready = true;
1301        }
1302
1303        // Let queue be controller.[[queue]].
1304        let mut queue = self.queue.borrow_mut();
1305
1306        // While totalBytesToCopyRemaining > 0,
1307        while total_bytes_to_copy_remaining > 0 {
1308            // Let headOfQueue be queue[0].
1309            let head_of_queue = queue.front_mut().unwrap();
1310
1311            // Let bytesToCopy be min(totalBytesToCopyRemaining, headOfQueue’s byte length).
1312            let bytes_to_copy = total_bytes_to_copy_remaining.min(head_of_queue.byte_length);
1313
1314            // Let destStart be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
1315            let dest_start =
1316                pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();
1317
1318            // Let descriptorBuffer be pullIntoDescriptor’s buffer.
1319            let descriptor_buffer = &pull_into_descriptor.buffer;
1320
1321            // Let queueBuffer be headOfQueue’s buffer.
1322            let queue_buffer = &head_of_queue.buffer;
1323
1324            // Let queueByteOffset be headOfQueue’s byte offset.
1325            let queue_byte_offset = head_of_queue.byte_offset;
1326
1327            // Assert: ! CanCopyDataBlockBytes(descriptorBuffer, destStart,
1328            // queueBuffer, queueByteOffset, bytesToCopy) is true.
1329            assert!(descriptor_buffer.can_copy_data_block_bytes(
1330                cx.into(),
1331                dest_start as usize,
1332                queue_buffer,
1333                queue_byte_offset,
1334                bytes_to_copy
1335            ));
1336
1337            // Perform ! CopyDataBlockBytes(descriptorBuffer.[[ArrayBufferData]], destStart,
1338            // queueBuffer.[[ArrayBufferData]], queueByteOffset, bytesToCopy).
1339            descriptor_buffer.copy_data_block_bytes(
1340                cx.into(),
1341                dest_start as usize,
1342                queue_buffer,
1343                queue_byte_offset,
1344                bytes_to_copy,
1345            );
1346
1347            // If headOfQueue’s byte length is bytesToCopy,
1348            if head_of_queue.byte_length == bytes_to_copy {
1349                // Remove queue[0].
1350                queue.pop_front().unwrap();
1351            } else {
1352                // Set headOfQueue’s byte offset to headOfQueue’s byte offset + bytesToCopy.
1353                head_of_queue.byte_offset += bytes_to_copy;
1354
1355                // Set headOfQueue’s byte length to headOfQueue’s byte length − bytesToCopy.
1356                head_of_queue.byte_length -= bytes_to_copy;
1357            }
1358
1359            // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − bytesToCopy.
1360            self.queue_total_size
1361                .set(self.queue_total_size.get() - (bytes_to_copy as f64));
1362
1363            // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
1364            // controller, bytesToCopy, pullIntoDescriptor).
1365            self.fill_head_pull_into_descriptor(bytes_to_copy as u64, pull_into_descriptor);
1366
1367            // Set totalBytesToCopyRemaining to totalBytesToCopyRemaining − bytesToCopy.
1368            total_bytes_to_copy_remaining -= bytes_to_copy;
1369        }
1370
1371        // If ready is false,
1372        if !ready {
1373            // Assert: controller.[[queueTotalSize]] is 0.
1374            assert!(self.queue_total_size.get() == 0.0);
1375
1376            // Assert: pullIntoDescriptor’s bytes filled > 0.
1377            assert!(pull_into_descriptor.bytes_filled.get() > 0);
1378
1379            // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
1380            assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
1381        }
1382
1383        // Return ready.
1384        ready
1385    }
1386
1387    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-head-pull-into-descriptor>
1388    pub(crate) fn fill_head_pull_into_descriptor(
1389        &self,
1390        bytes_copied: u64,
1391        pull_into_descriptor: &PullIntoDescriptor,
1392    ) {
1393        // Assert: either controller.[[pendingPullIntos]] is empty,
1394        // or controller.[[pendingPullIntos]][0] is pullIntoDescriptor.
1395        {
1396            let pending_pull_intos = self.pending_pull_intos.borrow();
1397            assert!(
1398                pending_pull_intos.is_empty() ||
1399                    pending_pull_intos.first().unwrap() == pull_into_descriptor
1400            );
1401        }
1402
1403        // Assert: controller.[[byobRequest]] is null.
1404        assert!(self.byob_request.get().is_none());
1405
1406        // Set pullIntoDescriptor’s bytes filled to bytes filled + size.
1407        pull_into_descriptor
1408            .bytes_filled
1409            .set(pull_into_descriptor.bytes_filled.get() + bytes_copied);
1410    }
1411
1412    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueuedetachedpullintotoqueue>
1413    pub(crate) fn enqueue_detached_pull_into_to_queue(&self, cx: &mut JSContext) -> Fallible<()> {
1414        // first_descriptor: &PullIntoDescriptor,
1415        let pending_pull_intos = self.pending_pull_intos.borrow();
1416        let first_descriptor = pending_pull_intos.first().unwrap();
1417
1418        // Assert: pullIntoDescriptor’s reader type is "none".
1419        assert!(first_descriptor.reader_type.is_none());
1420
1421        // If pullIntoDescriptor’s bytes filled > 0, perform ?
1422        // ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
1423        // pullIntoDescriptor’s buffer, pullIntoDescriptor’s byte offset, pullIntoDescriptor’s bytes filled).
1424
1425        if first_descriptor.bytes_filled.get() > 0 {
1426            self.enqueue_cloned_chunk_to_queue(
1427                cx,
1428                &first_descriptor.buffer,
1429                first_descriptor.byte_offset,
1430                first_descriptor.bytes_filled.get(),
1431            )?;
1432        }
1433
1434        // needed to drop the borrow and avoid BorrowMutError
1435        drop(pending_pull_intos);
1436
1437        // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1438        self.shift_pending_pull_into();
1439
1440        Ok(())
1441    }
1442
1443    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueueclonedchunktoqueue>
1444    pub(crate) fn enqueue_cloned_chunk_to_queue(
1445        &self,
1446        cx: &mut JSContext,
1447        buffer: &HeapBufferSource<ArrayBufferU8>,
1448        byte_offset: u64,
1449        byte_length: u64,
1450    ) -> Fallible<()> {
1451        // Let cloneResult be CloneArrayBuffer(buffer, byteOffset, byteLength, %ArrayBuffer%).
1452        if let Ok(clone_result) =
1453            buffer.clone_array_buffer(cx, byte_offset as usize, byte_length as usize)
1454        {
1455            // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1456            // (controller, cloneResult.[[Value]], 0, byteLength).
1457            self.enqueue_chunk_to_queue(clone_result, 0, byte_length as usize);
1458
1459            Ok(())
1460        } else {
1461            // If cloneResult is an abrupt completion,
1462
1463            // Perform ! ReadableByteStreamControllerError(controller, cloneResult.[[Value]]).
1464            rooted!(&in(cx) let mut rval = UndefinedValue());
1465            let error = Error::Type(c"can not clone array buffer".to_owned());
1466            error.clone().to_jsval(
1467                cx.into(),
1468                &self.global(),
1469                rval.handle_mut(),
1470                CanGc::from_cx(cx),
1471            );
1472            self.error(cx, rval.handle());
1473
1474            // Return cloneResult.
1475            Err(error)
1476        }
1477    }
1478
1479    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue-chunk-to-queue>
1480    pub(crate) fn enqueue_chunk_to_queue(
1481        &self,
1482        buffer: RootedTraceableBox<HeapBufferSource<ArrayBufferU8>>,
1483        byte_offset: usize,
1484        byte_length: usize,
1485    ) {
1486        // Let entry be a new ReadableByteStreamQueueEntry object.
1487        // Append entry to controller.[[queue]].
1488        self.queue
1489            .borrow_mut()
1490            .push_back(QueueEntry::new(buffer, byte_offset, byte_length));
1491
1492        // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] + byteLength.
1493        self.queue_total_size
1494            .set(self.queue_total_size.get() + byte_length as f64);
1495    }
1496
1497    pub(crate) fn in_memory(&self) -> bool {
1498        let Some(underlying_source) = self.underlying_source.get() else {
1499            return false;
1500        };
1501        underlying_source.in_memory()
1502    }
1503
1504    pub(crate) fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
1505        let underlying_source = self.underlying_source.get()?;
1506        if !underlying_source.in_memory() {
1507            return None;
1508        }
1509
1510        let cx = GlobalScope::get_cx();
1511        self.queue.borrow().iter().try_fold(
1512            Vec::with_capacity(self.queue_total_size.get() as usize),
1513            |mut bytes, entry| {
1514                let mut chunk = vec![0; entry.byte_length];
1515                entry
1516                    .buffer
1517                    .copy_data_to(
1518                        cx,
1519                        &mut chunk,
1520                        entry.byte_offset,
1521                        entry.byte_offset + entry.byte_length,
1522                    )
1523                    .ok()?;
1524                bytes.extend(chunk);
1525                Some(bytes)
1526            },
1527        )
1528    }
1529
1530    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-shift-pending-pull-into>
1531    pub(crate) fn shift_pending_pull_into(&self) -> PullIntoDescriptor {
1532        // Assert: controller.[[byobRequest]] is null.
1533        assert!(self.byob_request.get().is_none());
1534
1535        // Let descriptor be controller.[[pendingPullIntos]][0].
1536        // Remove descriptor from controller.[[pendingPullIntos]].
1537        // Return descriptor.
1538        self.pending_pull_intos.borrow_mut().remove(0)
1539    }
1540
1541    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerprocessreadrequestsusingqueue>
1542    pub(crate) fn process_read_requests_using_queue(&self, cx: &mut JSContext) -> Fallible<()> {
1543        // Let reader be controller.[[stream]].[[reader]].
1544        // Assert: reader implements ReadableStreamDefaultReader.
1545        let reader = self.stream.get().unwrap().get_default_reader();
1546
1547        // Step 3
1548        reader.process_read_requests(cx, DomRoot::from_ref(self))
1549    }
1550
1551    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerfillreadrequestfromqueue>
1552    pub(crate) fn fill_read_request_from_queue(
1553        &self,
1554        cx: &mut JSContext,
1555        read_request: &ReadRequest,
1556    ) -> Fallible<()> {
1557        // Assert: controller.[[queueTotalSize]] > 0.
1558        assert!(self.queue_total_size.get() > 0.0);
1559        // Also assert that the queue has a non-zero length;
1560        assert!(!self.queue.borrow().is_empty());
1561
1562        // Let entry be controller.[[queue]][0].
1563        // Remove entry from controller.[[queue]].
1564        rooted!(&in(cx) let entry = self.remove_entry());
1565
1566        // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − entry’s byte length.
1567        self.queue_total_size
1568            .set(self.queue_total_size.get() - entry.byte_length as f64);
1569
1570        // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
1571        self.handle_queue_drain(cx);
1572
1573        // Let view be ! Construct(%Uint8Array%, « entry’s buffer, entry’s byte offset, entry’s byte length »).
1574        let view = create_buffer_source_with_constructor(
1575            cx,
1576            &Constructor::Name(Type::Uint8),
1577            &entry.buffer,
1578            entry.byte_offset,
1579            entry.byte_length,
1580        )
1581        .expect("Construct Uint8Array failed");
1582
1583        // Perform readRequest’s chunk steps, given view.
1584        let result = RootedTraceableBox::new(Heap::default());
1585        rooted!(&in(cx) let mut view_value = UndefinedValue());
1586        view.get_buffer_view_value(cx.into(), view_value.handle_mut());
1587        result.set(*view_value);
1588
1589        read_request.chunk_steps(cx, result, &self.global());
1590
1591        Ok(())
1592    }
1593
1594    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-handle-queue-drain>
1595    pub(crate) fn handle_queue_drain(&self, cx: &mut JSContext) {
1596        // Assert: controller.[[stream]].[[state]] is "readable".
1597        assert!(self.stream.get().unwrap().is_readable());
1598
1599        // If controller.[[queueTotalSize]] is 0 and controller.[[closeRequested]] is true,
1600        if self.queue_total_size.get() == 0.0 && self.close_requested.get() {
1601            // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
1602            self.clear_algorithms();
1603
1604            // Perform ! ReadableStreamClose(controller.[[stream]]).
1605            self.stream.get().unwrap().close(cx);
1606        } else {
1607            // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1608            self.call_pull_if_needed(cx);
1609        }
1610    }
1611
1612    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
1613    fn call_pull_if_needed(&self, cx: &mut JSContext) {
1614        // Let shouldPull be ! ReadableByteStreamControllerShouldCallPull(controller).
1615        let should_pull = self.should_call_pull();
1616        // If shouldPull is false, return.
1617        if !should_pull {
1618            return;
1619        }
1620
1621        // If controller.[[pulling]] is true,
1622        if self.pulling.get() {
1623            // Set controller.[[pullAgain]] to true.
1624            self.pull_again.set(true);
1625
1626            // Return.
1627            return;
1628        }
1629
1630        // Assert: controller.[[pullAgain]] is false.
1631        assert!(!self.pull_again.get());
1632
1633        // Set controller.[[pulling]] to true.
1634        self.pulling.set(true);
1635
1636        // Let pullPromise be the result of performing controller.[[pullAlgorithm]].
1637        // Continues into the resolve and reject handling of the native handler.
1638        let global = self.global();
1639        let rooted_controller = DomRoot::from_ref(self);
1640        let controller = Controller::ReadableByteStreamController(rooted_controller.clone());
1641
1642        if let Some(underlying_source) = self.underlying_source.get() {
1643            let handler = PromiseNativeHandler::new(
1644                cx,
1645                &global,
1646                Some(Box::new(PullAlgorithmFulfillmentHandler {
1647                    controller: Dom::from_ref(&rooted_controller),
1648                })),
1649                Some(Box::new(PullAlgorithmRejectionHandler {
1650                    controller: Dom::from_ref(&rooted_controller),
1651                })),
1652            );
1653
1654            let mut realm = enter_auto_realm(cx, &*global);
1655            let cx = &mut realm.current_realm();
1656
1657            let result = underlying_source
1658                .call_pull_algorithm(cx, controller)
1659                .unwrap_or_else(|| {
1660                    let promise = Promise::new_resolved(&global, cx.into(), (), CanGc::from_cx(cx));
1661                    Ok(promise)
1662                });
1663            let promise = result.unwrap_or_else(|error| {
1664                rooted!(&in(cx) let mut rval = UndefinedValue());
1665                // TODO: check if `self.global()` is the right globalscope.
1666                error.to_jsval(cx.into(), &global, rval.handle_mut(), CanGc::from_cx(cx));
1667                Promise::new_rejected(&global, cx.into(), rval.handle(), CanGc::from_cx(cx))
1668            });
1669            promise.append_native_handler(cx, &handler);
1670        }
1671    }
1672
1673    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-should-call-pull>
1674    fn should_call_pull(&self) -> bool {
1675        // Let stream be controller.[[stream]].
1676        // Note: the spec does not assert that stream is not undefined here,
1677        // so we return false if it is.
1678        let stream = self.stream.get().unwrap();
1679
1680        // If stream.[[state]] is not "readable", return false.
1681        if !stream.is_readable() {
1682            return false;
1683        }
1684
1685        // If controller.[[closeRequested]] is true, return false.
1686        if self.close_requested.get() {
1687            return false;
1688        }
1689
1690        // If controller.[[started]] is false, return false.
1691        if !self.started.get() {
1692            return false;
1693        }
1694
1695        // If ! ReadableStreamHasDefaultReader(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0
1696        // , return true.
1697        if stream.has_default_reader() && stream.get_num_read_requests() > 0 {
1698            return true;
1699        }
1700
1701        // If ! ReadableStreamHasBYOBReader(stream) is true and ! ReadableStreamGetNumReadIntoRequests(stream) > 0
1702        // , return true.
1703        if stream.has_byob_reader() && stream.get_num_read_into_requests() > 0 {
1704            return true;
1705        }
1706
1707        // Let desiredSize be ! ReadableByteStreamControllerGetDesiredSize(controller).
1708        let desired_size = self.get_desired_size();
1709
1710        // Assert: desiredSize is not null.
1711        assert!(desired_size.is_some());
1712
1713        // If desiredSize > 0, return true.
1714        if desired_size.unwrap() > 0. {
1715            return true;
1716        }
1717
1718        // Return false.
1719        false
1720    }
1721    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
1722    pub(crate) fn setup(
1723        &self,
1724        cx: &mut JSContext,
1725        global: &GlobalScope,
1726        stream: DomRoot<ReadableStream>,
1727    ) -> Fallible<()> {
1728        // Assert: stream.[[controller]] is undefined.
1729        stream.assert_no_controller();
1730
1731        // If autoAllocateChunkSize is not undefined,
1732        if self.auto_allocate_chunk_size.is_some() {
1733            // Assert: ! IsInteger(autoAllocateChunkSize) is true. Implicit
1734            // Assert: autoAllocateChunkSize is positive. (Implicit by type.)
1735        }
1736
1737        // Set controller.[[stream]] to stream.
1738        self.stream.set(Some(&stream));
1739
1740        // Set controller.[[pullAgain]] and controller.[[pulling]] to false.
1741        self.pull_again.set(false);
1742        self.pulling.set(false);
1743
1744        // Set controller.[[byobRequest]] to null.
1745        self.byob_request.set(None);
1746
1747        // Perform ! ResetQueue(controller).
1748        self.reset_queue();
1749
1750        // Set controller.[[closeRequested]] and controller.[[started]] to false.
1751        self.close_requested.set(false);
1752        self.started.set(false);
1753
1754        // Set controller.[[strategyHWM]] to highWaterMark.
1755        // Set controller.[[pullAlgorithm]] to pullAlgorithm.
1756        // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
1757        // Set controller.[[autoAllocateChunkSize]] to autoAllocateChunkSize.
1758        // Set controller.[[pendingPullIntos]] to a new empty list.
1759        // Note: the above steps are done in `new`.
1760
1761        // Set stream.[[controller]] to controller.
1762        let rooted_byte_controller = DomRoot::from_ref(self);
1763        stream.set_byte_controller(&rooted_byte_controller);
1764
1765        if let Some(underlying_source) = rooted_byte_controller.underlying_source.get() {
1766            // Let startResult be the result of performing startAlgorithm. (This might throw an exception.)
1767            let start_result = underlying_source
1768                .call_start_algorithm(
1769                    cx,
1770                    Controller::ReadableByteStreamController(rooted_byte_controller.clone()),
1771                )
1772                .unwrap_or_else(|| {
1773                    Ok(Promise::new_resolved(
1774                        global,
1775                        cx.into(),
1776                        (),
1777                        CanGc::from_cx(cx),
1778                    ))
1779                });
1780
1781            // Let startPromise be a promise resolved with startResult.
1782            let start_promise = start_result?;
1783
1784            // Upon fulfillment of startPromise, Upon rejection of startPromise with reason r,
1785            let handler = PromiseNativeHandler::new(
1786                cx,
1787                global,
1788                Some(Box::new(StartAlgorithmFulfillmentHandler {
1789                    controller: Dom::from_ref(&rooted_byte_controller),
1790                })),
1791                Some(Box::new(StartAlgorithmRejectionHandler {
1792                    controller: Dom::from_ref(&rooted_byte_controller),
1793                })),
1794            );
1795            let mut realm = enter_auto_realm(cx, global);
1796            let cx = &mut realm.current_realm();
1797            start_promise.append_native_handler(cx, &handler);
1798        };
1799
1800        Ok(())
1801    }
1802
1803    // <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontroller-releasesteps
1804    pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1805        // If this.[[pendingPullIntos]] is not empty,
1806        let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
1807        if !pending_pull_intos.is_empty() {
1808            // Let firstPendingPullInto be this.[[pendingPullIntos]][0].
1809            let mut first_pending_pull_into = RootedTraceableBox::new(pending_pull_intos.remove(0));
1810
1811            // Set firstPendingPullInto’s reader type to "none".
1812            first_pending_pull_into.reader_type = None;
1813
1814            // Set this.[[pendingPullIntos]] to the list « firstPendingPullInto »
1815            pending_pull_intos.clear();
1816            pending_pull_intos.push(*first_pending_pull_into.into_box());
1817        }
1818        Ok(())
1819    }
1820
1821    /// <https://streams.spec.whatwg.org/#rbs-controller-private-cancel>
1822    pub(crate) fn perform_cancel_steps(
1823        &self,
1824        cx: &mut JSContext,
1825        global: &GlobalScope,
1826        reason: SafeHandleValue,
1827    ) -> Rc<Promise> {
1828        // Perform ! ReadableByteStreamControllerClearPendingPullIntos(this).
1829        self.clear_pending_pull_intos();
1830
1831        // Perform ! ResetQueue(this).
1832        self.reset_queue();
1833
1834        let underlying_source = self
1835            .underlying_source
1836            .get()
1837            .expect("Controller should have a source when the cancel steps are called into.");
1838
1839        // Let result be the result of performing this.[[cancelAlgorithm]], passing in reason.
1840        let result = underlying_source
1841            .call_cancel_algorithm(cx, global, reason)
1842            .unwrap_or_else(|| {
1843                let promise = Promise::new2(cx, global);
1844                promise.resolve_native_with_cx(cx, &());
1845                Ok(promise)
1846            });
1847
1848        let promise = result.unwrap_or_else(|error| {
1849            rooted!(&in(cx) let mut rval = UndefinedValue());
1850            error.to_jsval(cx.into(), global, rval.handle_mut(), CanGc::from_cx(cx));
1851            let promise = Promise::new2(cx, global);
1852            promise.reject_native_with_cx(cx, &rval.handle());
1853            promise
1854        });
1855
1856        // Perform ! ReadableByteStreamControllerClearAlgorithms(this).
1857        self.clear_algorithms();
1858
1859        // Return result(the promise).
1860        promise
1861    }
1862
1863    /// <https://streams.spec.whatwg.org/#rbs-controller-private-pull>
1864    pub(crate) fn perform_pull_steps(&self, cx: &mut JSContext, read_request: &ReadRequest) {
1865        // Let stream be this.[[stream]].
1866        let stream = self.stream.get().unwrap();
1867
1868        // Assert: ! ReadableStreamHasDefaultReader(stream) is true.
1869        assert!(stream.has_default_reader());
1870
1871        // If this.[[queueTotalSize]] > 0,
1872        if self.queue_total_size.get() > 0.0 {
1873            // Assert: ! ReadableStreamGetNumReadRequests(stream) is 0.
1874            assert_eq!(stream.get_num_read_requests(), 0);
1875
1876            // Perform ! ReadableByteStreamControllerFillReadRequestFromQueue(this, readRequest).
1877            let _ = self.fill_read_request_from_queue(cx, read_request);
1878
1879            // Return.
1880            return;
1881        }
1882
1883        // Let autoAllocateChunkSize be this.[[autoAllocateChunkSize]].
1884        let auto_allocate_chunk_size = self.auto_allocate_chunk_size;
1885
1886        // If autoAllocateChunkSize is not undefined,
1887        if let Some(auto_allocate_chunk_size) = auto_allocate_chunk_size {
1888            // create_array_buffer_with_size
1889            // Let buffer be Construct(%ArrayBuffer%, « autoAllocateChunkSize »).
1890            match create_array_buffer_with_size(cx, auto_allocate_chunk_size as usize) {
1891                Ok(buffer) => {
1892                    // Let pullIntoDescriptor be a new pull-into descriptor with
1893                    // buffer buffer.[[Value]]
1894                    // buffer byte length autoAllocateChunkSize
1895                    // byte offset  0
1896                    // byte length  autoAllocateChunkSize
1897                    // bytes filled  0
1898                    // minimum fill 1
1899                    // element size 1
1900                    // view constructor %Uint8Array%
1901                    // reader type  "default"
1902
1903                    // Append pullIntoDescriptor to this.[[pendingPullIntos]].
1904                    self.pending_pull_intos
1905                        .borrow_mut()
1906                        .push(PullIntoDescriptor {
1907                            buffer: *buffer.into_box(),
1908                            buffer_byte_length: auto_allocate_chunk_size,
1909                            byte_length: auto_allocate_chunk_size,
1910                            byte_offset: 0,
1911                            bytes_filled: Cell::new(0),
1912                            minimum_fill: 1,
1913                            element_size: 1,
1914                            view_constructor: Constructor::Name(Type::Uint8),
1915                            reader_type: Some(ReaderType::Default),
1916                        });
1917                },
1918                Err(error) => {
1919                    // If buffer is an abrupt completion,
1920                    // Perform readRequest’s error steps, given buffer.[[Value]].
1921
1922                    rooted!(&in(cx) let mut rval = UndefinedValue());
1923                    error.to_jsval(
1924                        cx.into(),
1925                        &self.global(),
1926                        rval.handle_mut(),
1927                        CanGc::from_cx(cx),
1928                    );
1929                    read_request.error_steps(cx, rval.handle());
1930
1931                    // Return.
1932                    return;
1933                },
1934            }
1935        }
1936
1937        // Perform ! ReadableStreamAddReadRequest(stream, readRequest).
1938        stream.add_read_request(read_request);
1939
1940        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(this).
1941        self.call_pull_if_needed(cx);
1942    }
1943
1944    /// Setting the JS object after the heap has settled down.
1945    pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
1946        if let Some(underlying_source) = self.underlying_source.get() {
1947            underlying_source.set_underlying_source_this_object(this_object);
1948        }
1949    }
1950
1951    pub(crate) fn remove_entry(&self) -> QueueEntry {
1952        self.queue
1953            .borrow_mut()
1954            .pop_front()
1955            .expect("Reader must have read request when remove is called into.")
1956    }
1957
1958    pub(crate) fn get_queue_total_size(&self) -> f64 {
1959        self.queue_total_size.get()
1960    }
1961
1962    pub(crate) fn get_pending_pull_intos_size(&self) -> usize {
1963        self.pending_pull_intos.borrow().len()
1964    }
1965}
1966
1967impl ReadableByteStreamControllerMethods<crate::DomTypeHolder> for ReadableByteStreamController {
1968    /// <https://streams.spec.whatwg.org/#rbs-controller-byob-request>
1969    fn GetByobRequest(
1970        &self,
1971        cx: &mut js::context::JSContext,
1972    ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
1973        // Return ! ReadableByteStreamControllerGetBYOBRequest(this).
1974        self.get_byob_request(cx)
1975    }
1976
1977    /// <https://streams.spec.whatwg.org/#rbs-controller-desired-size>
1978    fn GetDesiredSize(&self) -> Option<f64> {
1979        // Return ! ReadableByteStreamControllerGetDesiredSize(this).
1980        self.get_desired_size()
1981    }
1982
1983    /// <https://streams.spec.whatwg.org/#rbs-controller-close>
1984    fn Close(&self, cx: &mut JSContext) -> Fallible<()> {
1985        // If this.[[closeRequested]] is true, throw a TypeError exception.
1986        if self.close_requested.get() {
1987            return Err(Error::Type(c"closeRequested is true".to_owned()));
1988        }
1989
1990        // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
1991        if !self.stream.get().unwrap().is_readable() {
1992            return Err(Error::Type(c"stream is not readable".to_owned()));
1993        }
1994
1995        // Perform ? ReadableByteStreamControllerClose(this).
1996        self.close(cx)
1997    }
1998
1999    /// <https://streams.spec.whatwg.org/#rbs-controller-enqueue>
2000    fn Enqueue(
2001        &self,
2002        cx: &mut JSContext,
2003        chunk: js::gc::CustomAutoRooterGuard<js::typedarray::ArrayBufferView>,
2004    ) -> Fallible<()> {
2005        let chunk = HeapBufferSource::<ArrayBufferViewU8>::from_view(chunk);
2006
2007        // If chunk.[[ByteLength]] is 0, throw a TypeError exception.
2008        if chunk.byte_length() == 0 {
2009            return Err(Error::Type(c"chunk.ByteLength is 0".to_owned()));
2010        }
2011
2012        // If chunk.[[ViewedArrayBuffer]].[[ByteLength]] is 0, throw a TypeError exception.
2013        if chunk.viewed_buffer_array_byte_length(cx.into()) == 0 {
2014            return Err(Error::Type(
2015                c"chunk.ViewedArrayBuffer.ByteLength is 0".to_owned(),
2016            ));
2017        }
2018
2019        // If this.[[closeRequested]] is true, throw a TypeError exception.
2020        if self.close_requested.get() {
2021            return Err(Error::Type(c"closeRequested is true".to_owned()));
2022        }
2023
2024        // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
2025        if !self.stream.get().unwrap().is_readable() {
2026            return Err(Error::Type(c"stream is not readable".to_owned()));
2027        }
2028
2029        // Return ? ReadableByteStreamControllerEnqueue(this, chunk).
2030        self.enqueue(cx, chunk)
2031    }
2032
2033    /// <https://streams.spec.whatwg.org/#rbs-controller-error>
2034    fn Error(&self, cx: &mut JSContext, e: SafeHandleValue) -> Fallible<()> {
2035        // Perform ! ReadableByteStreamControllerError(this, e).
2036        self.error(cx, e);
2037        Ok(())
2038    }
2039}