Skip to main content

script/dom/stream/
readablebytestreamcontroller.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::cmp::min;
7use std::collections::VecDeque;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::context::JSContext;
12use js::jsapi::{Heap, Type};
13use js::jsval::UndefinedValue;
14use js::realm::CurrentRealm;
15use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue};
16use js::typedarray::{ArrayBufferU8, ArrayBufferViewU8};
17use script_bindings::cell::DomRefCell;
18use script_bindings::reflector::{Reflector, reflect_dom_object_with_cx};
19
20use super::readablestreambyobreader::ReadIntoRequest;
21use super::readablestreamdefaultreader::ReadRequest;
22use super::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType};
23use crate::dom::bindings::buffer_source::{
24    Constructor, HeapBufferSource, byte_size, create_array_buffer_with_size,
25    create_buffer_source_with_constructor,
26};
27use crate::dom::bindings::codegen::Bindings::ReadableByteStreamControllerBinding::ReadableByteStreamControllerMethods;
28use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
29use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
30use crate::dom::bindings::reflector::DomGlobal;
31use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
32use crate::dom::bindings::trace::RootedTraceableBox;
33use crate::dom::globalscope::GlobalScope;
34use crate::dom::promise::Promise;
35use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
36use crate::dom::stream::readablestream::ReadableStream;
37use crate::dom::stream::readablestreambyobrequest::ReadableStreamBYOBRequest;
38use crate::realms::enter_auto_realm;
39
/// One entry of the byte stream controller's queue: a buffer plus the byte
/// offset and byte length passed to [`QueueEntry::new`].
///
/// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct QueueEntry {
    /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-buffer>
    #[ignore_malloc_size_of = "HeapBufferSource"]
    buffer: HeapBufferSource<ArrayBufferU8>,
    /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-offset>
    byte_offset: usize,
    /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-length>
    byte_length: usize,
}
52
// Marker impl with no items. Presumably this is what allows `QueueEntry`
// values to be rooted (e.g. through `rooted!`) — confirm against
// `js::gc::Rootable`.
impl js::gc::Rootable for QueueEntry {}
54
55impl QueueEntry {
56    pub(crate) fn new(
57        buffer: RootedTraceableBox<HeapBufferSource<ArrayBufferU8>>,
58        byte_offset: usize,
59        byte_length: usize,
60    ) -> QueueEntry {
61        QueueEntry {
62            buffer: *buffer.into_box(),
63            byte_offset,
64            byte_length,
65        }
66    }
67}
68
/// The kind of reader a pull-into descriptor belongs to.
///
/// `PullIntoDescriptor::reader_type` stores this as an `Option`; the `None`
/// case is what the spec steps in this file refer to as reader type "none".
#[derive(Debug, Eq, JSTraceable, MallocSizeOf, PartialEq)]
pub(crate) enum ReaderType {
    /// <https://streams.spec.whatwg.org/#readablestreambyobreader>
    Byob,
    /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
    Default,
}
76
/// Bookkeeping for one pending "pull into" operation: the destination buffer,
/// the byte range inside it, how much has been filled so far, and how to
/// rebuild the view handed back to the reader.
///
/// <https://streams.spec.whatwg.org/#pull-into-descriptor>
#[derive(Eq, JSTraceable, MallocSizeOf, PartialEq)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct PullIntoDescriptor {
    #[ignore_malloc_size_of = "HeapBufferSource"]
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer>
    buffer: HeapBufferSource<ArrayBufferU8>,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer-byte-length>
    buffer_byte_length: u64,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-offset>
    byte_offset: u64,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-length>
    byte_length: u64,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-bytes-filled>
    ///
    /// Kept in a `Cell` because it is updated through shared references to
    /// the descriptor.
    bytes_filled: Cell<u64>,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-minimum-fill>
    minimum_fill: u64,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-element-size>
    element_size: u64,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-view-constructor>
    view_constructor: Constructor,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-reader-type>
    ///
    /// `None` stands for the spec's reader type "none".
    reader_type: Option<ReaderType>,
}
101
// Marker impl with no items. Descriptors taken out of the pending list are
// held in `rooted!` locals elsewhere in this file; presumably this impl is
// what permits that — confirm against `js::gc::Rootable`.
impl js::gc::Rootable for PullIntoDescriptor {}
103
/// The fulfillment handler for
/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct StartAlgorithmFulfillmentHandler {
    /// The controller whose `started` flag is set by the callback.
    controller: Dom<ReadableByteStreamController>,
}

impl Callback for StartAlgorithmFulfillmentHandler {
    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
    /// Upon fulfillment of startPromise,
    ///
    /// The fulfillment value is ignored. Panics if the controller is already
    /// pulling or has a repeat pull scheduled, since both would violate the
    /// spec's assertions.
    fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
        // Set controller.[[started]] to true.
        self.controller.started.set(true);

        // Assert: controller.[[pulling]] is false.
        assert!(!self.controller.pulling.get());

        // Assert: controller.[[pullAgain]] is false.
        assert!(!self.controller.pull_again.get());

        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
        self.controller.call_pull_if_needed(cx);
    }
}
129
130/// The rejection handler for
131/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
132#[derive(Clone, JSTraceable, MallocSizeOf)]
133#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
134struct StartAlgorithmRejectionHandler {
135    controller: Dom<ReadableByteStreamController>,
136}
137
138impl Callback for StartAlgorithmRejectionHandler {
139    /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
140    /// Upon rejection of startPromise with reason r,
141    fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
142        // Perform ! ReadableByteStreamControllerError(controller, r).
143        self.controller.error(cx, v);
144    }
145}
146
147/// The fulfillment handler for
148/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
149#[derive(Clone, JSTraceable, MallocSizeOf)]
150#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
151struct PullAlgorithmFulfillmentHandler {
152    controller: Dom<ReadableByteStreamController>,
153}
154
155impl Callback for PullAlgorithmFulfillmentHandler {
156    /// Continuation of <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
157    /// Upon fulfillment of pullPromise
158    fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
159        // Set controller.[[pulling]] to false.
160        self.controller.pulling.set(false);
161
162        // If controller.[[pullAgain]] is true,
163        if self.controller.pull_again.get() {
164            // Set controller.[[pullAgain]] to false.
165            self.controller.pull_again.set(false);
166
167            // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
168            self.controller.call_pull_if_needed(cx);
169        }
170    }
171}
172
173/// The rejection handler for
174/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
175#[derive(Clone, JSTraceable, MallocSizeOf)]
176#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
177struct PullAlgorithmRejectionHandler {
178    controller: Dom<ReadableByteStreamController>,
179}
180
181impl Callback for PullAlgorithmRejectionHandler {
182    /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-byte-controller-call-pull-if-needed>
183    /// Upon rejection of pullPromise with reason e.
184    fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
185        // Perform ! ReadableByteStreamControllerError(controller, e).
186        self.controller.error(cx, v);
187    }
188}
189
/// DOM object holding the internal slots of a readable byte stream
/// controller.
///
/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
#[dom_struct]
pub(crate) struct ReadableByteStreamController {
    reflector_: Reflector,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-autoallocatechunksize>
    auto_allocate_chunk_size: Option<u64>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-stream>
    stream: MutNullableDom<ReadableStream>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-strategyhwm>
    strategy_hwm: f64,
    /// A mutable reference to the underlying source is used to implement these two
    /// internal slots:
    ///
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullalgorithm>
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-cancelalgorithm>
    underlying_source: MutNullableDom<UnderlyingSourceContainer>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queue>
    queue: DomRefCell<VecDeque<QueueEntry>>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queuetotalsize>
    queue_total_size: Cell<f64>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-byobrequest>
    byob_request: MutNullableDom<ReadableStreamBYOBRequest>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pendingpullintos>
    pending_pull_intos: DomRefCell<Vec<PullIntoDescriptor>>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-closerequested>
    close_requested: Cell<bool>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-started>
    started: Cell<bool>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pulling>
    pulling: Cell<bool>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullagain>
    pull_again: Cell<bool>,
}
223
224impl ReadableByteStreamController {
225    fn new_inherited(
226        underlying_source_container: &UnderlyingSourceContainer,
227        strategy_hwm: f64,
228    ) -> ReadableByteStreamController {
229        let auto_allocate_chunk_size = underlying_source_container.auto_allocate_chunk_size();
230        ReadableByteStreamController {
231            reflector_: Reflector::new(),
232            byob_request: MutNullableDom::new(None),
233            stream: MutNullableDom::new(None),
234            underlying_source: MutNullableDom::new(Some(underlying_source_container)),
235            auto_allocate_chunk_size,
236            pending_pull_intos: DomRefCell::new(Vec::new()),
237            strategy_hwm,
238            close_requested: Default::default(),
239            queue: DomRefCell::new(Default::default()),
240            queue_total_size: Default::default(),
241            started: Default::default(),
242            pulling: Default::default(),
243            pull_again: Default::default(),
244        }
245    }
246
247    pub(crate) fn new(
248        cx: &mut JSContext,
249        underlying_source_type: UnderlyingSourceType,
250        strategy_hwm: f64,
251        global: &GlobalScope,
252    ) -> DomRoot<ReadableByteStreamController> {
253        let underlying_source_container =
254            UnderlyingSourceContainer::new(cx, global, underlying_source_type);
255        reflect_dom_object_with_cx(
256            Box::new(ReadableByteStreamController::new_inherited(
257                &underlying_source_container,
258                strategy_hwm,
259            )),
260            global,
261            cx,
262        )
263    }
264
265    #[allow(dead_code)]
266    pub(crate) fn set_stream(&self, stream: &ReadableStream) {
267        self.stream.set(Some(stream))
268    }
269
    /// Handles a BYOB read of `view` on behalf of `read_into_request`.
    ///
    /// The view's backing buffer is transferred and wrapped in a new
    /// pull-into descriptor. The request is then either settled immediately
    /// (close steps on a closed stream, chunk steps when the queue can fill
    /// the descriptor, error steps when the transfer fails or a close was
    /// already requested) or the descriptor is appended to
    /// `pending_pull_intos`, the request is registered on the stream and
    /// `call_pull_if_needed` is invoked.
    ///
    /// `min` is a count of elements, not bytes: it is multiplied by the
    /// view's element size to obtain the descriptor's minimum fill. Inside
    /// this function the parameter shadows the imported `std::cmp::min`.
    ///
    /// # Panics
    ///
    /// Panics if the controller has no stream, or if `min × elementSize`
    /// exceeds the byte length of `view`.
    ///
    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-pull-into>
    pub(crate) fn perform_pull_into(
        &self,
        cx: &mut JSContext,
        read_into_request: &ReadIntoRequest,
        view: &HeapBufferSource<ArrayBufferViewU8>,
        min: u64,
    ) {
        // Let stream be controller.[[stream]].
        let stream = self.stream.get().unwrap();

        // Let elementSize be 1.
        let mut element_size = 1;

        // Let ctor be %DataView%.
        let mut ctor = Constructor::DataView;

        // If view has a [[TypedArrayName]] internal slot (i.e., it is not a DataView),
        if view.has_typed_array_name() {
            // Set elementSize to the element size specified in the
            // typed array constructors table for view.[[TypedArrayName]].
            let view_typw = view.get_array_buffer_view_type();
            element_size = byte_size(view_typw);

            // Set ctor to the constructor specified in the typed array constructors table for view.[[TypedArrayName]].
            ctor = Constructor::Name(view_typw);
        }

        // Let minimumFill be min × elementSize.
        let minimum_fill = min * element_size;

        // Assert: minimumFill ≥ 0 and minimumFill ≤ view.[[ByteLength]].
        // (The lower bound holds by construction: the value is unsigned.)
        assert!(minimum_fill <= (view.byte_length() as u64));

        // Assert: the remainder after dividing minimumFill by elementSize is 0.
        assert_eq!(minimum_fill % element_size, 0);

        // Let byteOffset be view.[[ByteOffset]].
        let byte_offset = view.get_byte_offset();

        // Let byteLength be view.[[ByteLength]].
        let byte_length = view.byte_length();

        // Let bufferResult be TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
        match view
            .get_array_buffer_view_buffer(cx)
            .transfer_array_buffer(cx)
        {
            Ok(buffer) => {
                // Let buffer be bufferResult.[[Value]].
                // Let pullIntoDescriptor be a new pull-into descriptor with
                // buffer   buffer
                // buffer byte length   buffer.[[ArrayBufferByteLength]]
                // byte offset  byteOffset
                // byte length  byteLength
                // bytes filled  0
                // minimum fill minimumFill
                // element size elementSize
                // view constructor ctor
                // reader type  "byob"
                let buffer_byte_length = buffer.byte_length();
                let pull_into_descriptor = RootedTraceableBox::new(PullIntoDescriptor {
                    buffer: *buffer.into_box(),
                    buffer_byte_length: buffer_byte_length as u64,
                    byte_offset: byte_offset as u64,
                    byte_length: byte_length as u64,
                    bytes_filled: Cell::new(0),
                    minimum_fill,
                    element_size,
                    view_constructor: ctor.clone(),
                    reader_type: Some(ReaderType::Byob),
                });

                // If controller.[[pendingPullIntos]] is not empty,
                // (The block scope releases the mutable borrow before any
                // later step touches `pending_pull_intos` again.)
                {
                    let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
                    if !pending_pull_intos.is_empty() {
                        // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
                        pending_pull_intos.push(*pull_into_descriptor.into_box());

                        // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
                        stream.add_read_into_request(read_into_request);

                        // Return.
                        return;
                    }
                }

                // If stream.[[state]] is "closed",
                if stream.is_closed() {
                    // Let emptyView be ! Construct(ctor, « pullIntoDescriptor’s buffer,
                    // pullIntoDescriptor’s byte offset, 0 »).
                    if let Ok(empty_view) = create_buffer_source_with_constructor(
                        cx,
                        &ctor,
                        &pull_into_descriptor.buffer,
                        pull_into_descriptor.byte_offset as usize,
                        0,
                    ) {
                        // Perform readIntoRequest’s close steps, given emptyView.
                        let result = RootedTraceableBox::new(Heap::default());
                        rooted!(&in(cx) let mut view_value = UndefinedValue());
                        empty_view.get_buffer_view_value(cx, view_value.handle_mut());
                        result.set(*view_value);

                        read_into_request.close_steps(cx, Some(result));

                        // Return.
                        return;
                    } else {
                        // NOTE(review): the spec marks this construction as
                        // infallible; if it does fail, the request is left
                        // unsettled because nothing is reported here.
                        return;
                    }
                }

                // If controller.[[queueTotalSize]] > 0,
                if self.queue_total_size.get() > 0.0 {
                    // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(
                    // controller, pullIntoDescriptor) is true,
                    if self.fill_pull_into_descriptor_from_queue(cx, &pull_into_descriptor) {
                        // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(
                        // pullIntoDescriptor).
                        if let Ok(filled_view) =
                            self.convert_pull_into_descriptor(cx, &pull_into_descriptor)
                        {
                            // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
                            self.handle_queue_drain(cx);

                            // Perform readIntoRequest’s chunk steps, given filledView.
                            let result = RootedTraceableBox::new(Heap::default());
                            rooted!(&in(cx) let mut view_value = UndefinedValue());
                            filled_view.get_buffer_view_value(cx, view_value.handle_mut());
                            result.set(*view_value);
                            read_into_request.chunk_steps(cx, result);

                            // Return.
                            return;
                        } else {
                            // NOTE(review): as above, a conversion failure is
                            // dropped silently and the request stays unsettled.
                            return;
                        }
                    }

                    // If controller.[[closeRequested]] is true,
                    if self.close_requested.get() {
                        // Let e be a new TypeError exception.
                        rooted!(&in(cx) let mut error = UndefinedValue());
                        Error::Type(c"close requested".to_owned()).to_jsval(
                            cx,
                            &self.global(),
                            error.handle_mut(),
                        );

                        // Perform ! ReadableByteStreamControllerError(controller, e).
                        self.error(cx, error.handle());

                        // Perform readIntoRequest’s error steps, given e.
                        read_into_request.error_steps(cx, error.handle());

                        // Return.
                        return;
                    }
                }

                // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
                {
                    self.pending_pull_intos
                        .borrow_mut()
                        .push(*pull_into_descriptor.into_box());
                }
                // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
                stream.add_read_into_request(read_into_request);

                // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
                self.call_pull_if_needed(cx);
            },
            Err(error) => {
                // If bufferResult is an abrupt completion,

                // Perform readIntoRequest’s error steps, given bufferResult.[[Value]].
                rooted!(&in(cx) let mut rval = UndefinedValue());
                error.to_jsval(cx, &self.global(), rval.handle_mut());
                read_into_request.error_steps(cx, rval.handle());

                // Return.
            },
        }
    }
456
457    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond>
458    pub(crate) fn respond(&self, cx: &mut JSContext, bytes_written: u64) -> Fallible<()> {
459        {
460            // Assert: controller.[[pendingPullIntos]] is not empty.
461            let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
462            assert!(!pending_pull_intos.is_empty());
463
464            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
465            let first_descriptor = pending_pull_intos.first_mut().unwrap();
466
467            // Let state be controller.[[stream]].[[state]].
468            let stream = self.stream.get().unwrap();
469
470            // If state is "closed",
471            if stream.is_closed() {
472                // If bytesWritten is not 0, throw a TypeError exception.
473                if bytes_written != 0 {
474                    return Err(Error::Type(
475                        c"bytesWritten not zero on closed stream".to_owned(),
476                    ));
477                }
478            } else {
479                // Assert: state is "readable".
480                assert!(stream.is_readable());
481
482                // If bytesWritten is 0, throw a TypeError exception.
483                if bytes_written == 0 {
484                    return Err(Error::Type(c"bytesWritten is 0".to_owned()));
485                }
486
487                // If firstDescriptor’s bytes filled + bytesWritten > firstDescriptor’s byte length,
488                // throw a RangeError exception.
489                if first_descriptor.bytes_filled.get() + bytes_written >
490                    first_descriptor.byte_length
491                {
492                    return Err(Error::Range(
493                        c"bytes filled + bytesWritten > byte length".to_owned(),
494                    ));
495                }
496            }
497
498            // Set firstDescriptor’s buffer to ! TransferArrayBuffer(firstDescriptor’s buffer).
499            first_descriptor.buffer = *first_descriptor
500                .buffer
501                .transfer_array_buffer(cx)
502                .expect("TransferArrayBuffer failed")
503                .into_box();
504        }
505
506        // Perform ? ReadableByteStreamControllerRespondInternal(controller, bytesWritten).
507        self.respond_internal(cx, bytes_written)
508    }
509
    /// Applies a response of `bytes_written` bytes to the head pull-into
    /// descriptor after the caller has validated it.
    ///
    /// Invalidates the current BYOB request, dispatches to the closed-state
    /// or readable-state handler depending on the stream state, and finally
    /// calls `call_pull_if_needed`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `respond_in_readable_state`.
    ///
    /// # Panics
    ///
    /// Panics if there is no pending pull-into, if the head descriptor's
    /// buffer cannot be transferred, if the controller has no stream, if
    /// `bytes_written` does not match the stream state (0 when closed,
    /// non-zero when readable), or if the closed-state handler fails.
    ///
    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-internal>
    fn respond_internal(&self, cx: &mut JSContext, bytes_written: u64) -> Fallible<()> {
        // Scoped so the shared borrow ends before the state handlers below
        // borrow `pending_pull_intos` themselves.
        {
            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
            let pending_pull_intos = self.pending_pull_intos.borrow();
            let first_descriptor = pending_pull_intos.first().unwrap();

            // Assert: ! CanTransferArrayBuffer(firstDescriptor’s buffer) is true
            assert!(first_descriptor.buffer.can_transfer_array_buffer(cx));
        }

        // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
        self.invalidate_byob_request();

        // Let state be controller.[[stream]].[[state]].
        let stream = self.stream.get().unwrap();

        // If state is "closed",
        if stream.is_closed() {
            // Assert: bytesWritten is 0.
            assert_eq!(bytes_written, 0);

            // Perform ! ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor).
            self.respond_in_closed_state(cx)
                .expect("respond_in_closed_state failed");
        } else {
            // Assert: state is "readable".
            assert!(stream.is_readable());

            // Assert: bytesWritten > 0.
            assert!(bytes_written > 0);

            // Perform ? ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor).
            self.respond_in_readable_state(cx, bytes_written)?;
        }

        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
        self.call_pull_if_needed(cx);

        Ok(())
    }
551
    /// Handles a response while the stream is closed.
    ///
    /// The head descriptor is read from `pending_pull_intos` rather than
    /// passed in. If its reader type is "none" it is shifted off the list.
    /// Then, when the stream has a BYOB reader, one descriptor per
    /// outstanding read-into request is shifted off and committed.
    ///
    /// In the code shown this always returns `Ok(())`; failures surface as
    /// panics instead.
    ///
    /// # Panics
    ///
    /// Panics if there is no pending pull-into, if the head descriptor's
    /// bytes filled is not a multiple of its element size, if the controller
    /// has no stream, or if committing a descriptor fails.
    ///
    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-closed-state>
    fn respond_in_closed_state(&self, cx: &mut JSContext) -> Fallible<()> {
        let pending_pull_intos = self.pending_pull_intos.borrow();
        let first_descriptor = pending_pull_intos.first().unwrap();

        // Assert: the remainder after dividing firstDescriptor’s bytes filled
        // by firstDescriptor’s element size is 0.
        assert_eq!(
            first_descriptor.bytes_filled.get() % first_descriptor.element_size,
            0
        );

        // If firstDescriptor’s reader type is "none",
        // perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
        // Despite its name, `reader_type` is a flag: true when the reader
        // type is "none".
        let reader_type = first_descriptor.reader_type.is_none();

        // needed to drop the borrow and avoid BorrowMutError
        drop(pending_pull_intos);

        if reader_type {
            self.shift_pending_pull_into();
        }

        // Let stream be controller.[[stream]].
        let stream = self.stream.get().unwrap();

        // If ! ReadableStreamHasBYOBReader(stream) is true,
        if stream.has_byob_reader() {
            // Let filledPullIntos be a new empty list.
            rooted!(&in(cx) let mut filled_pull_intos = Vec::new());

            // While filledPullIntos’s size < ! ReadableStreamGetNumReadIntoRequests(stream),
            while filled_pull_intos.len() < stream.get_num_read_into_requests() {
                // Let pullIntoDescriptor be ! ReadableByteStreamControllerShiftPendingPullInto(controller).
                // Append pullIntoDescriptor to filledPullIntos.
                filled_pull_intos.push(self.shift_pending_pull_into());
            }

            // For each filledPullInto of filledPullIntos,
            for filled_pull_into in &*filled_pull_intos {
                // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
                self.commit_pull_into_descriptor(cx, filled_pull_into)
                    .expect("commit_pull_into_descriptor failed");
            }
        }

        Ok(())
    }
600
    /// Handles a response of `bytes_written` bytes while the stream is
    /// readable.
    ///
    /// The head descriptor is read from `pending_pull_intos` rather than
    /// passed in. After its fill count is advanced there are three outcomes:
    ///
    /// * reader type "none": the detached pull-into is enqueued, and any
    ///   descriptors the queue can now satisfy are committed;
    /// * fewer bytes filled than the minimum fill: nothing more happens;
    /// * otherwise: the descriptor is shifted off the list, any trailing
    ///   partial element is cloned back into the queue, and the descriptor is
    ///   committed, followed by the other descriptors the queue can satisfy.
    ///
    /// # Errors
    ///
    /// Propagates errors from `enqueue_detached_pull_into_to_queue` and
    /// `enqueue_cloned_chunk_to_queue`.
    ///
    /// # Panics
    ///
    /// Panics if there is no pending pull-into, if the write would overrun
    /// the descriptor's byte length, or if committing a descriptor fails.
    ///
    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-readable-state>
    fn respond_in_readable_state(&self, cx: &mut JSContext, bytes_written: u64) -> Fallible<()> {
        let pending_pull_intos = self.pending_pull_intos.borrow();
        let first_descriptor = pending_pull_intos.first().unwrap();

        // Assert: pullIntoDescriptor’s bytes filled + bytesWritten ≤ pullIntoDescriptor’s byte length.
        assert!(
            first_descriptor.bytes_filled.get() + bytes_written <= first_descriptor.byte_length
        );

        // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
        // controller, bytesWritten, pullIntoDescriptor).
        self.fill_head_pull_into_descriptor(bytes_written, first_descriptor);

        // If pullIntoDescriptor’s reader type is "none",
        if first_descriptor.reader_type.is_none() {
            // needed to drop the borrow and avoid BorrowMutError
            drop(pending_pull_intos);

            // Perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, pullIntoDescriptor).
            self.enqueue_detached_pull_into_to_queue(cx)?;

            // Let filledPullIntos be the result of performing
            // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
            rooted!(&in(cx) let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx));

            // For each filledPullInto of filledPullIntos,
            for filled_pull_into in &*filled_pull_intos {
                // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]]
                // , filledPullInto).
                self.commit_pull_into_descriptor(cx, filled_pull_into)
                    .expect("commit_pull_into_descriptor failed");
            }

            // Return.
            return Ok(());
        }

        // If pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill, return.
        if first_descriptor.bytes_filled.get() < first_descriptor.minimum_fill {
            return Ok(());
        }

        // needed to drop the borrow and avoid BorrowMutError
        drop(pending_pull_intos);

        // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
        rooted!(&in(cx) let pull_into_descriptor = self.shift_pending_pull_into());

        // Let remainderSize be the remainder after dividing pullIntoDescriptor’s bytes
        // filled by pullIntoDescriptor’s element size.
        let remainder_size =
            pull_into_descriptor.bytes_filled.get() % pull_into_descriptor.element_size;

        // If remainderSize > 0,
        if remainder_size > 0 {
            // Let end be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
            let end = pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();

            // Perform ? ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
            // pullIntoDescriptor’s buffer, end − remainderSize, remainderSize).
            self.enqueue_cloned_chunk_to_queue(
                cx,
                &pull_into_descriptor.buffer,
                end - remainder_size,
                remainder_size,
            )?;
        }

        // Set pullIntoDescriptor’s bytes filled to pullIntoDescriptor’s bytes filled − remainderSize.
        pull_into_descriptor
            .bytes_filled
            .set(pull_into_descriptor.bytes_filled.get() - remainder_size);

        // Let filledPullIntos be the result of performing
        // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
        rooted!(&in(cx) let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx));

        // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], pullIntoDescriptor).
        self.commit_pull_into_descriptor(cx, &pull_into_descriptor)
            .expect("commit_pull_into_descriptor failed");

        // For each filledPullInto of filledPullIntos,
        for filled_pull_into in &*filled_pull_intos {
            // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], filledPullInto).
            self.commit_pull_into_descriptor(cx, filled_pull_into)
                .expect("commit_pull_into_descriptor failed");
        }

        Ok(())
    }
692
693    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-with-new-view>
694    pub(crate) fn respond_with_new_view(
695        &self,
696        cx: &mut JSContext,
697        view: &HeapBufferSource<ArrayBufferViewU8>,
698    ) -> Fallible<()> {
699        let view_byte_length;
700        {
701            // Assert: controller.[[pendingPullIntos]] is not empty.
702            let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
703            assert!(!pending_pull_intos.is_empty());
704
705            // Assert: ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is false.
706            assert!(!view.is_detached_buffer(cx));
707
708            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
709            let first_descriptor = pending_pull_intos.first_mut().unwrap();
710
711            // Let state be controller.[[stream]].[[state]].
712            let stream = self.stream.get().unwrap();
713
714            // If state is "closed",
715            if stream.is_closed() {
716                // If view.[[ByteLength]] is not 0, throw a TypeError exception.
717                if view.byte_length() != 0 {
718                    return Err(Error::Type(c"view byte length is not 0".to_owned()));
719                }
720            } else {
721                // Assert: state is "readable".
722                assert!(stream.is_readable());
723
724                // If view.[[ByteLength]] is 0, throw a TypeError exception.
725                if view.byte_length() == 0 {
726                    return Err(Error::Type(c"view byte length is 0".to_owned()));
727                }
728            }
729
730            // If firstDescriptor’s byte offset + firstDescriptor’ bytes filled is not view.[[ByteOffset]],
731            // throw a RangeError exception.
732            if first_descriptor.byte_offset + first_descriptor.bytes_filled.get() !=
733                (view.get_byte_offset() as u64)
734            {
735                return Err(Error::Range(
736                    c"firstDescriptor's byte offset + bytes filled is not view byte offset"
737                        .to_owned(),
738                ));
739            }
740
741            // If firstDescriptor’s buffer byte length is not view.[[ViewedArrayBuffer]].[[ByteLength]],
742            // throw a RangeError exception.
743            if first_descriptor.buffer_byte_length !=
744                (view.viewed_buffer_array_byte_length(cx) as u64)
745            {
746                return Err(Error::Range(
747                c"firstDescriptor's buffer byte length is not view viewed buffer array byte length"
748                    .to_owned(),
749            ));
750            }
751
752            // If firstDescriptor’s bytes filled + view.[[ByteLength]] > firstDescriptor’s byte length,
753            // throw a RangeError exception.
754            if first_descriptor.bytes_filled.get() + (view.byte_length()) as u64 >
755                first_descriptor.byte_length
756            {
757                return Err(Error::Range(
758                    c"bytes filled + view byte length > byte length".to_owned(),
759                ));
760            }
761
762            // Let viewByteLength be view.[[ByteLength]].
763            view_byte_length = view.byte_length();
764
765            // Set firstDescriptor’s buffer to ? TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
766            first_descriptor.buffer = *view
767                .get_array_buffer_view_buffer(cx)
768                .transfer_array_buffer(cx)?
769                .into_box();
770        }
771
772        // Perform ? ReadableByteStreamControllerRespondInternal(controller, viewByteLength).
773        self.respond_internal(cx, view_byte_length as u64)
774    }
775
776    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-get-desired-size>
777    pub(crate) fn get_desired_size(&self) -> Option<f64> {
778        // Let state be controller.[[stream]].[[state]].
779        let stream = self.stream.get()?;
780
781        // If state is "errored", return null.
782        if stream.is_errored() {
783            return None;
784        }
785
786        // If state is "closed", return 0.
787        if stream.is_closed() {
788            return Some(0.0);
789        }
790
791        // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
792        Some(self.strategy_hwm - self.queue_total_size.get())
793    }
794
795    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollergetbyobrequest>
796    pub(crate) fn get_byob_request(
797        &self,
798        cx: &mut js::context::JSContext,
799    ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
800        // If controller.[[byobRequest]] is null and controller.[[pendingPullIntos]] is not empty,
801        let pending_pull_intos = self.pending_pull_intos.borrow();
802        if self.byob_request.get().is_none() && !pending_pull_intos.is_empty() {
803            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
804            let first_descriptor = pending_pull_intos.first().unwrap();
805            // Let view be ! Construct(%Uint8Array%, « firstDescriptor’s buffer,
806            // firstDescriptor’s byte offset + firstDescriptor’s bytes filled,
807            // firstDescriptor’s byte length − firstDescriptor’s bytes filled »).
808
809            let byte_offset = first_descriptor.byte_offset + first_descriptor.bytes_filled.get();
810            let byte_length = first_descriptor.byte_length - first_descriptor.bytes_filled.get();
811
812            let view = create_buffer_source_with_constructor(
813                cx,
814                &Constructor::Name(Type::Uint8),
815                &first_descriptor.buffer,
816                byte_offset as usize,
817                byte_length as usize,
818            )
819            .expect("Construct Uint8Array failed");
820
821            // Let byobRequest be a new ReadableStreamBYOBRequest.
822            let byob_request = ReadableStreamBYOBRequest::new(cx, &self.global());
823
824            // Set byobRequest.[[controller]] to controller.
825            byob_request.set_controller(Some(&DomRoot::from_ref(self)));
826
827            // Set byobRequest.[[view]] to view.
828            byob_request.set_view(Some(view));
829
830            // Set controller.[[byobRequest]] to byobRequest.
831            self.byob_request.set(Some(&byob_request));
832        }
833
834        // Return controller.[[byobRequest]].
835        Ok(self.byob_request.get())
836    }
837
838    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-close>
839    pub(crate) fn close(&self, cx: &mut JSContext) -> Fallible<()> {
840        // Let stream be controller.[[stream]].
841        let stream = self.stream.get().unwrap();
842
843        // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
844        if self.close_requested.get() || !stream.is_readable() {
845            return Ok(());
846        }
847
848        // If controller.[[queueTotalSize]] > 0,
849        if self.queue_total_size.get() > 0.0 {
850            // Set controller.[[closeRequested]] to true.
851            self.close_requested.set(true);
852            // Return.
853            return Ok(());
854        }
855
856        {
857            // If controller.[[pendingPullIntos]] is not empty,
858            let pending_pull_intos = self.pending_pull_intos.borrow();
859            if !pending_pull_intos.is_empty() {
860                // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
861                let first_pending_pull_into = pending_pull_intos.first().unwrap();
862
863                // If the remainder after dividing firstPendingPullInto’s bytes filled by
864                // firstPendingPullInto’s element size is not 0,
865                if !first_pending_pull_into
866                    .bytes_filled
867                    .get()
868                    .is_multiple_of(first_pending_pull_into.element_size)
869                {
870                    // needed to drop the borrow and avoid BorrowMutError
871                    drop(pending_pull_intos);
872
873                    // Let e be a new TypeError exception.
874                    let e = Error::Type(
875                        c"remainder after dividing firstPendingPullInto's bytes
876                    filled by firstPendingPullInto's element size is not 0"
877                            .to_owned(),
878                    );
879
880                    // Perform ! ReadableByteStreamControllerError(controller, e).
881                    rooted!(&in(cx) let mut error = UndefinedValue());
882                    e.clone().to_jsval(cx, &self.global(), error.handle_mut());
883                    self.error(cx, error.handle());
884
885                    // Throw e.
886                    return Err(e);
887                }
888            }
889        }
890
891        // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
892        self.clear_algorithms();
893
894        // Perform ! ReadableStreamClose(stream).
895        stream.close(cx);
896        Ok(())
897    }
898
899    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-error>
900    pub(crate) fn error(&self, cx: &mut JSContext, e: SafeHandleValue) {
901        // Let stream be controller.[[stream]].
902        let stream = self.stream.get().unwrap();
903
904        // If stream.[[state]] is not "readable", return.
905        if !stream.is_readable() {
906            return;
907        }
908
909        // Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller).
910        self.clear_pending_pull_intos();
911
912        // Perform ! ResetQueue(controller).
913        self.reset_queue();
914
915        // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
916        self.clear_algorithms();
917
918        // Perform ! ReadableStreamError(stream, e).
919        stream.error(cx, e);
920    }
921
922    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-algorithms>
923    fn clear_algorithms(&self) {
924        // Set controller.[[pullAlgorithm]] to undefined.
925        // Set controller.[[cancelAlgorithm]] to undefined.
926        self.underlying_source.set(None);
927    }
928
929    /// <https://streams.spec.whatwg.org/#reset-queue>
930    pub(crate) fn reset_queue(&self) {
931        // Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
932
933        // Set container.[[queue]] to a new empty list.
934        self.queue.borrow_mut().clear();
935
936        // Set container.[[queueTotalSize]] to 0.
937        self.queue_total_size.set(0.0);
938    }
939
940    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-pending-pull-intos>
941    pub(crate) fn clear_pending_pull_intos(&self) {
942        // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
943        self.invalidate_byob_request();
944
945        // Set controller.[[pendingPullIntos]] to a new empty list.
946        self.pending_pull_intos.borrow_mut().clear();
947    }
948
949    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-invalidate-byob-request>
950    pub(crate) fn invalidate_byob_request(&self) {
951        if let Some(byob_request) = self.byob_request.get() {
952            // Set controller.[[byobRequest]].[[controller]] to undefined.
953            byob_request.set_controller(None);
954
955            // Set controller.[[byobRequest]].[[view]] to null.
956            byob_request.set_view(None);
957
958            // Set controller.[[byobRequest]] to null.
959            self.byob_request.set(None);
960        }
961        // If controller.[[byobRequest]] is null, return.
962    }
963
964    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue>
965    pub(crate) fn enqueue(
966        &self,
967        cx: &mut JSContext,
968        chunk: RootedTraceableBox<HeapBufferSource<ArrayBufferViewU8>>,
969    ) -> Fallible<()> {
970        // Let stream be controller.[[stream]].
971        let stream = self.stream.get().unwrap();
972
973        // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
974        if self.close_requested.get() || !stream.is_readable() {
975            return Ok(());
976        }
977
978        // Let buffer be chunk.[[ViewedArrayBuffer]].
979        let buffer = chunk.get_array_buffer_view_buffer(cx);
980
981        // Let byteOffset be chunk.[[ByteOffset]].
982        let byte_offset = chunk.get_byte_offset();
983
984        // Let byteLength be chunk.[[ByteLength]].
985        let byte_length = chunk.byte_length();
986
987        // If ! IsDetachedBuffer(buffer) is true, throw a TypeError exception.
988        if buffer.is_detached_buffer(cx) {
989            return Err(Error::Type(c"buffer is detached".to_owned()));
990        }
991
992        // Let transferredBuffer be ? TransferArrayBuffer(buffer).
993        let transferred_buffer = buffer.transfer_array_buffer(cx)?;
994
995        // If controller.[[pendingPullIntos]] is not empty,
996        {
997            let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
998            if !pending_pull_intos.is_empty() {
999                // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
1000                let first_descriptor = pending_pull_intos.first_mut().unwrap();
1001                // If ! IsDetachedBuffer(firstPendingPullInto’s buffer) is true, throw a TypeError exception.
1002                if first_descriptor.buffer.is_detached_buffer(cx) {
1003                    return Err(Error::Type(c"buffer is detached".to_owned()));
1004                }
1005
1006                // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
1007                self.invalidate_byob_request();
1008
1009                // Set firstPendingPullInto’s buffer to ! TransferArrayBuffer(firstPendingPullInto’s buffer).
1010                first_descriptor.buffer = *first_descriptor
1011                    .buffer
1012                    .transfer_array_buffer(cx)
1013                    .expect("TransferArrayBuffer failed")
1014                    .into_box();
1015
1016                // If firstPendingPullInto’s reader type is "none",
1017                if first_descriptor.reader_type.is_none() {
1018                    // needed to drop the borrow and avoid BorrowMutError
1019                    drop(pending_pull_intos);
1020
1021                    // perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(
1022                    // controller, firstPendingPullInto).
1023                    self.enqueue_detached_pull_into_to_queue(cx)?;
1024                }
1025            }
1026        }
1027
1028        // If ! ReadableStreamHasDefaultReader(stream) is true,
1029        if stream.has_default_reader() {
1030            // Perform ! ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller).
1031            self.process_read_requests_using_queue(cx)
1032                .expect("process_read_requests_using_queue failed");
1033
1034            // If ! ReadableStreamGetNumReadRequests(stream) is 0,
1035            if stream.get_num_read_requests() == 0 {
1036                // Assert: controller.[[pendingPullIntos]] is empty.
1037                {
1038                    assert!(self.pending_pull_intos.borrow().is_empty());
1039                }
1040
1041                // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1042                // controller, transferredBuffer, byteOffset, byteLength).
1043                self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1044            } else {
1045                // Assert: controller.[[queue]] is empty.
1046                assert!(self.queue.borrow().is_empty());
1047
1048                // If controller.[[pendingPullIntos]] is not empty,
1049
1050                let pending_pull_intos = self.pending_pull_intos.borrow();
1051                if !pending_pull_intos.is_empty() {
1052                    // Assert: controller.[[pendingPullIntos]][0]'s reader type is "default".
1053                    assert!(matches!(
1054                        pending_pull_intos.first().unwrap().reader_type,
1055                        Some(ReaderType::Default)
1056                    ));
1057
1058                    // needed to drop the borrow and avoid BorrowMutError
1059                    drop(pending_pull_intos);
1060
1061                    // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1062                    self.shift_pending_pull_into();
1063                }
1064
1065                // Let transferredView be ! Construct(%Uint8Array%, « transferredBuffer, byteOffset, byteLength »).
1066                let transferred_view = create_buffer_source_with_constructor(
1067                    cx,
1068                    &Constructor::Name(Type::Uint8),
1069                    &transferred_buffer,
1070                    byte_offset,
1071                    byte_length,
1072                )
1073                .expect("Construct Uint8Array failed");
1074
1075                // Perform ! ReadableStreamFulfillReadRequest(stream, transferredView, false).
1076                rooted!(&in(cx) let mut view_value = UndefinedValue());
1077                transferred_view.get_buffer_view_value(cx, view_value.handle_mut());
1078                stream.fulfill_read_request(cx, view_value.handle(), false);
1079            }
1080            // Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true,
1081        } else if stream.has_byob_reader() {
1082            // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1083            // controller, transferredBuffer, byteOffset, byteLength).
1084            self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1085
1086            // Let filledPullIntos be the result of performing !
1087            // ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
1088            rooted!(&in(cx) let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx));
1089
1090            // For each filledPullInto of filledPullIntos,
1091            // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
1092            for filled_pull_into in &*filled_pull_intos {
1093                self.commit_pull_into_descriptor(cx, filled_pull_into)
1094                    .expect("commit_pull_into_descriptor failed");
1095            }
1096        } else {
1097            // Assert: ! IsReadableStreamLocked(stream) is false.
1098            assert!(!stream.is_locked());
1099
1100            // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1101            // (controller, transferredBuffer, byteOffset, byteLength).
1102            self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1103        }
1104
1105        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1106        self.call_pull_if_needed(cx);
1107
1108        Ok(())
1109    }
1110
1111    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-commit-pull-into-descriptor>
1112    fn commit_pull_into_descriptor(
1113        &self,
1114        cx: &mut JSContext,
1115        pull_into_descriptor: &PullIntoDescriptor,
1116    ) -> Fallible<()> {
1117        // Assert: stream.[[state]] is not "errored".
1118        let stream = self.stream.get().unwrap();
1119        assert!(!stream.is_errored());
1120
1121        // Assert: pullIntoDescriptor.reader type is not "none".
1122        assert!(pull_into_descriptor.reader_type.is_some());
1123
1124        // Let done be false.
1125        let mut done = false;
1126
1127        // If stream.[[state]] is "closed",
1128        if stream.is_closed() {
1129            // Assert: the remainder after dividing pullIntoDescriptor’s bytes filled
1130            // by pullIntoDescriptor’s element size is 0.
1131            assert!(
1132                pull_into_descriptor
1133                    .bytes_filled
1134                    .get()
1135                    .is_multiple_of(pull_into_descriptor.element_size)
1136            );
1137
1138            // Set done to true.
1139            done = true;
1140        }
1141
1142        // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor).
1143        let filled_view = self
1144            .convert_pull_into_descriptor(cx, pull_into_descriptor)
1145            .expect("convert_pull_into_descriptor failed");
1146
1147        rooted!(&in(cx) let mut view_value = UndefinedValue());
1148        filled_view.get_buffer_view_value(cx, view_value.handle_mut());
1149
1150        // If pullIntoDescriptor’s reader type is "default",
1151        if matches!(pull_into_descriptor.reader_type, Some(ReaderType::Default)) {
1152            // Perform ! ReadableStreamFulfillReadRequest(stream, filledView, done).
1153
1154            stream.fulfill_read_request(cx, view_value.handle(), done);
1155        } else {
1156            // Assert: pullIntoDescriptor’s reader type is "byob".
1157            assert!(matches!(
1158                pull_into_descriptor.reader_type,
1159                Some(ReaderType::Byob)
1160            ));
1161
1162            // Perform ! ReadableStreamFulfillReadIntoRequest(stream, filledView, done).
1163            stream.fulfill_read_into_request(cx, view_value.handle(), done);
1164        }
1165        Ok(())
1166    }
1167
1168    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-convert-pull-into-descriptor>
1169    pub(crate) fn convert_pull_into_descriptor(
1170        &self,
1171        cx: &mut js::context::JSContext,
1172        pull_into_descriptor: &PullIntoDescriptor,
1173    ) -> Fallible<RootedTraceableBox<HeapBufferSource<ArrayBufferViewU8>>> {
1174        // Let bytesFilled be pullIntoDescriptor’s bytes filled.
1175        let bytes_filled = pull_into_descriptor.bytes_filled.get();
1176
1177        // Let elementSize be pullIntoDescriptor’s element size.
1178        let element_size = pull_into_descriptor.element_size;
1179
1180        // Assert: bytesFilled ≤ pullIntoDescriptor’s byte length.
1181        assert!(bytes_filled <= pull_into_descriptor.byte_length);
1182
1183        // Assert: the remainder after dividing bytesFilled by elementSize is 0.
1184        assert!(bytes_filled.is_multiple_of(element_size));
1185
1186        // Let buffer be ! TransferArrayBuffer(pullIntoDescriptor’s buffer).
1187        let buffer = pull_into_descriptor
1188            .buffer
1189            .transfer_array_buffer(cx)
1190            .expect("TransferArrayBuffer failed");
1191
1192        // Return ! Construct(pullIntoDescriptor’s view constructor,
1193        // « buffer, pullIntoDescriptor’s byte offset, bytesFilled ÷ elementSize »).
1194        Ok(create_buffer_source_with_constructor(
1195            cx,
1196            &pull_into_descriptor.view_constructor,
1197            &buffer,
1198            pull_into_descriptor.byte_offset as usize,
1199            (bytes_filled / element_size) as usize,
1200        )
1201        .expect("Construct view failed"))
1202    }
1203
1204    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-process-pull-into-descriptors-using-queue>
1205    pub(crate) fn process_pull_into_descriptors_using_queue(
1206        &self,
1207        cx: &mut js::context::JSContext,
1208    ) -> Vec<PullIntoDescriptor> {
1209        // Assert: controller.[[closeRequested]] is false.
1210        assert!(!self.close_requested.get());
1211
1212        // Let filledPullIntos be a new empty list.
1213        rooted!(&in(cx) let mut filled_pull_intos = Vec::new());
1214
1215        // While controller.[[pendingPullIntos]] is not empty,
1216        loop {
1217            // If controller.[[queueTotalSize]] is 0, then break.
1218            if self.queue_total_size.get() == 0.0 {
1219                break;
1220            }
1221
1222            // Let pullIntoDescriptor be controller.[[pendingPullIntos]][0].
1223            let fill_pull_result = {
1224                let pending_pull_intos = self.pending_pull_intos.borrow();
1225                let Some(pull_into_descriptor) = pending_pull_intos.first() else {
1226                    break;
1227                };
1228                self.fill_pull_into_descriptor_from_queue(cx, pull_into_descriptor)
1229            };
1230
1231            // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) is true,
1232            if fill_pull_result {
1233                // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1234                // Append pullIntoDescriptor to filledPullIntos.
1235                filled_pull_intos.push(self.shift_pending_pull_into());
1236            }
1237        }
1238
1239        // Return filledPullIntos.
1240        filled_pull_intos.take()
1241    }
1242
1243    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-pull-into-descriptor-from-queue>
1244    pub(crate) fn fill_pull_into_descriptor_from_queue(
1245        &self,
1246        cx: &mut js::context::JSContext,
1247        pull_into_descriptor: &PullIntoDescriptor,
1248    ) -> bool {
1249        // Let maxBytesToCopy be min(controller.[[queueTotalSize]],
1250        // pullIntoDescriptor’s byte length − pullIntoDescriptor’s bytes filled).
1251        let max_bytes_to_copy = min(
1252            self.queue_total_size.get() as usize,
1253            (pull_into_descriptor.byte_length - pull_into_descriptor.bytes_filled.get()) as usize,
1254        );
1255
1256        // Let maxBytesFilled be pullIntoDescriptor’s bytes filled + maxBytesToCopy.
1257        let max_bytes_filled = pull_into_descriptor.bytes_filled.get() as usize + max_bytes_to_copy;
1258
1259        // Let totalBytesToCopyRemaining be maxBytesToCopy.
1260        let mut total_bytes_to_copy_remaining = max_bytes_to_copy;
1261
1262        // Let ready be false.
1263        let mut ready = false;
1264
1265        // Assert: ! IsDetachedBuffer(pullIntoDescriptor’s buffer) is false.
1266        assert!(!pull_into_descriptor.buffer.is_detached_buffer(cx));
1267
1268        // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
1269        assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
1270
1271        // Let remainderBytes be the remainder after dividing maxBytesFilled by pullIntoDescriptor’s element size.
1272        let remainder_bytes = max_bytes_filled % pull_into_descriptor.element_size as usize;
1273
1274        // Let maxAlignedBytes be maxBytesFilled − remainderBytes.
1275        let max_aligned_bytes = max_bytes_filled - remainder_bytes;
1276
1277        // If maxAlignedBytes ≥ pullIntoDescriptor’s minimum fill,
1278        if max_aligned_bytes >= pull_into_descriptor.minimum_fill as usize {
1279            // Set totalBytesToCopyRemaining to maxAlignedBytes − pullIntoDescriptor’s bytes filled.
1280            total_bytes_to_copy_remaining =
1281                max_aligned_bytes - (pull_into_descriptor.bytes_filled.get() as usize);
1282
1283            // Set ready to true.
1284            ready = true;
1285        }
1286
1287        // Let queue be controller.[[queue]].
1288        let mut queue = self.queue.borrow_mut();
1289
1290        // While totalBytesToCopyRemaining > 0,
1291        while total_bytes_to_copy_remaining > 0 {
1292            // Let headOfQueue be queue[0].
1293            let head_of_queue = queue.front_mut().unwrap();
1294
1295            // Let bytesToCopy be min(totalBytesToCopyRemaining, headOfQueue’s byte length).
1296            let bytes_to_copy = total_bytes_to_copy_remaining.min(head_of_queue.byte_length);
1297
1298            // Let destStart be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
1299            let dest_start =
1300                pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();
1301
1302            // Let descriptorBuffer be pullIntoDescriptor’s buffer.
1303            let descriptor_buffer = &pull_into_descriptor.buffer;
1304
1305            // Let queueBuffer be headOfQueue’s buffer.
1306            let queue_buffer = &head_of_queue.buffer;
1307
1308            // Let queueByteOffset be headOfQueue’s byte offset.
1309            let queue_byte_offset = head_of_queue.byte_offset;
1310
1311            // Assert: ! CanCopyDataBlockBytes(descriptorBuffer, destStart,
1312            // queueBuffer, queueByteOffset, bytesToCopy) is true.
1313            assert!(descriptor_buffer.can_copy_data_block_bytes(
1314                cx,
1315                dest_start as usize,
1316                queue_buffer,
1317                queue_byte_offset,
1318                bytes_to_copy
1319            ));
1320
1321            // Perform ! CopyDataBlockBytes(descriptorBuffer.[[ArrayBufferData]], destStart,
1322            // queueBuffer.[[ArrayBufferData]], queueByteOffset, bytesToCopy).
1323            descriptor_buffer.copy_data_block_bytes(
1324                cx,
1325                dest_start as usize,
1326                queue_buffer,
1327                queue_byte_offset,
1328                bytes_to_copy,
1329            );
1330
1331            // If headOfQueue’s byte length is bytesToCopy,
1332            if head_of_queue.byte_length == bytes_to_copy {
1333                // Remove queue[0].
1334                queue.pop_front().unwrap();
1335            } else {
1336                // Set headOfQueue’s byte offset to headOfQueue’s byte offset + bytesToCopy.
1337                head_of_queue.byte_offset += bytes_to_copy;
1338
1339                // Set headOfQueue’s byte length to headOfQueue’s byte length − bytesToCopy.
1340                head_of_queue.byte_length -= bytes_to_copy;
1341            }
1342
1343            // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − bytesToCopy.
1344            self.queue_total_size
1345                .set(self.queue_total_size.get() - (bytes_to_copy as f64));
1346
1347            // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
1348            // controller, bytesToCopy, pullIntoDescriptor).
1349            self.fill_head_pull_into_descriptor(bytes_to_copy as u64, pull_into_descriptor);
1350
1351            // Set totalBytesToCopyRemaining to totalBytesToCopyRemaining − bytesToCopy.
1352            total_bytes_to_copy_remaining -= bytes_to_copy;
1353        }
1354
1355        // If ready is false,
1356        if !ready {
1357            // Assert: controller.[[queueTotalSize]] is 0.
1358            assert!(self.queue_total_size.get() == 0.0);
1359
1360            // Assert: pullIntoDescriptor’s bytes filled > 0.
1361            assert!(pull_into_descriptor.bytes_filled.get() > 0);
1362
1363            // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
1364            assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
1365        }
1366
1367        // Return ready.
1368        ready
1369    }
1370
1371    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-head-pull-into-descriptor>
1372    pub(crate) fn fill_head_pull_into_descriptor(
1373        &self,
1374        bytes_copied: u64,
1375        pull_into_descriptor: &PullIntoDescriptor,
1376    ) {
1377        // Assert: either controller.[[pendingPullIntos]] is empty,
1378        // or controller.[[pendingPullIntos]][0] is pullIntoDescriptor.
1379        {
1380            let pending_pull_intos = self.pending_pull_intos.borrow();
1381            assert!(
1382                pending_pull_intos.is_empty() ||
1383                    pending_pull_intos.first().unwrap() == pull_into_descriptor
1384            );
1385        }
1386
1387        // Assert: controller.[[byobRequest]] is null.
1388        assert!(self.byob_request.get().is_none());
1389
1390        // Set pullIntoDescriptor’s bytes filled to bytes filled + size.
1391        pull_into_descriptor
1392            .bytes_filled
1393            .set(pull_into_descriptor.bytes_filled.get() + bytes_copied);
1394    }
1395
1396    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueuedetachedpullintotoqueue>
1397    pub(crate) fn enqueue_detached_pull_into_to_queue(&self, cx: &mut JSContext) -> Fallible<()> {
1398        // first_descriptor: &PullIntoDescriptor,
1399        let pending_pull_intos = self.pending_pull_intos.borrow();
1400        let first_descriptor = pending_pull_intos.first().unwrap();
1401
1402        // Assert: pullIntoDescriptor’s reader type is "none".
1403        assert!(first_descriptor.reader_type.is_none());
1404
1405        // If pullIntoDescriptor’s bytes filled > 0, perform ?
1406        // ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
1407        // pullIntoDescriptor’s buffer, pullIntoDescriptor’s byte offset, pullIntoDescriptor’s bytes filled).
1408
1409        if first_descriptor.bytes_filled.get() > 0 {
1410            self.enqueue_cloned_chunk_to_queue(
1411                cx,
1412                &first_descriptor.buffer,
1413                first_descriptor.byte_offset,
1414                first_descriptor.bytes_filled.get(),
1415            )?;
1416        }
1417
1418        // needed to drop the borrow and avoid BorrowMutError
1419        drop(pending_pull_intos);
1420
1421        // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1422        self.shift_pending_pull_into();
1423
1424        Ok(())
1425    }
1426
1427    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueueclonedchunktoqueue>
1428    pub(crate) fn enqueue_cloned_chunk_to_queue(
1429        &self,
1430        cx: &mut JSContext,
1431        buffer: &HeapBufferSource<ArrayBufferU8>,
1432        byte_offset: u64,
1433        byte_length: u64,
1434    ) -> Fallible<()> {
1435        // Let cloneResult be CloneArrayBuffer(buffer, byteOffset, byteLength, %ArrayBuffer%).
1436        if let Ok(clone_result) =
1437            buffer.clone_array_buffer(cx, byte_offset as usize, byte_length as usize)
1438        {
1439            // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1440            // (controller, cloneResult.[[Value]], 0, byteLength).
1441            self.enqueue_chunk_to_queue(clone_result, 0, byte_length as usize);
1442
1443            Ok(())
1444        } else {
1445            // If cloneResult is an abrupt completion,
1446
1447            // Perform ! ReadableByteStreamControllerError(controller, cloneResult.[[Value]]).
1448            rooted!(&in(cx) let mut rval = UndefinedValue());
1449            let error = Error::Type(c"can not clone array buffer".to_owned());
1450            error
1451                .clone()
1452                .to_jsval(cx, &self.global(), rval.handle_mut());
1453            self.error(cx, rval.handle());
1454
1455            // Return cloneResult.
1456            Err(error)
1457        }
1458    }
1459
1460    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue-chunk-to-queue>
1461    pub(crate) fn enqueue_chunk_to_queue(
1462        &self,
1463        buffer: RootedTraceableBox<HeapBufferSource<ArrayBufferU8>>,
1464        byte_offset: usize,
1465        byte_length: usize,
1466    ) {
1467        // Let entry be a new ReadableByteStreamQueueEntry object.
1468        // Append entry to controller.[[queue]].
1469        self.queue
1470            .borrow_mut()
1471            .push_back(QueueEntry::new(buffer, byte_offset, byte_length));
1472
1473        // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] + byteLength.
1474        self.queue_total_size
1475            .set(self.queue_total_size.get() + byte_length as f64);
1476    }
1477
1478    pub(crate) fn in_memory(&self) -> bool {
1479        let Some(underlying_source) = self.underlying_source.get() else {
1480            return false;
1481        };
1482        underlying_source.in_memory()
1483    }
1484
1485    pub(crate) fn get_in_memory_bytes(&self, cx: &mut JSContext) -> Option<Vec<u8>> {
1486        let underlying_source = self.underlying_source.get()?;
1487        if !underlying_source.in_memory() {
1488            return None;
1489        }
1490
1491        self.queue.borrow().iter().try_fold(
1492            Vec::with_capacity(self.queue_total_size.get() as usize),
1493            |mut bytes, entry| {
1494                let mut chunk = vec![0; entry.byte_length];
1495                entry
1496                    .buffer
1497                    .copy_data_to(
1498                        cx,
1499                        &mut chunk,
1500                        entry.byte_offset,
1501                        entry.byte_offset + entry.byte_length,
1502                    )
1503                    .ok()?;
1504                bytes.extend(chunk);
1505                Some(bytes)
1506            },
1507        )
1508    }
1509
1510    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-shift-pending-pull-into>
1511    pub(crate) fn shift_pending_pull_into(&self) -> PullIntoDescriptor {
1512        // Assert: controller.[[byobRequest]] is null.
1513        assert!(self.byob_request.get().is_none());
1514
1515        // Let descriptor be controller.[[pendingPullIntos]][0].
1516        // Remove descriptor from controller.[[pendingPullIntos]].
1517        // Return descriptor.
1518        self.pending_pull_intos.borrow_mut().remove(0)
1519    }
1520
1521    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerprocessreadrequestsusingqueue>
1522    pub(crate) fn process_read_requests_using_queue(&self, cx: &mut JSContext) -> Fallible<()> {
1523        // Let reader be controller.[[stream]].[[reader]].
1524        // Assert: reader implements ReadableStreamDefaultReader.
1525        let reader = self.stream.get().unwrap().get_default_reader();
1526
1527        // Step 3
1528        reader.process_read_requests(cx, DomRoot::from_ref(self))
1529    }
1530
1531    /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerfillreadrequestfromqueue>
1532    pub(crate) fn fill_read_request_from_queue(
1533        &self,
1534        cx: &mut JSContext,
1535        read_request: &ReadRequest,
1536    ) -> Fallible<()> {
1537        // Assert: controller.[[queueTotalSize]] > 0.
1538        assert!(self.queue_total_size.get() > 0.0);
1539        // Also assert that the queue has a non-zero length;
1540        assert!(!self.queue.borrow().is_empty());
1541
1542        // Let entry be controller.[[queue]][0].
1543        // Remove entry from controller.[[queue]].
1544        rooted!(&in(cx) let entry = self.remove_entry());
1545
1546        // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − entry’s byte length.
1547        self.queue_total_size
1548            .set(self.queue_total_size.get() - entry.byte_length as f64);
1549
1550        // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
1551        self.handle_queue_drain(cx);
1552
1553        // Let view be ! Construct(%Uint8Array%, « entry’s buffer, entry’s byte offset, entry’s byte length »).
1554        let view = create_buffer_source_with_constructor(
1555            cx,
1556            &Constructor::Name(Type::Uint8),
1557            &entry.buffer,
1558            entry.byte_offset,
1559            entry.byte_length,
1560        )
1561        .expect("Construct Uint8Array failed");
1562
1563        // Perform readRequest’s chunk steps, given view.
1564        let result = RootedTraceableBox::new(Heap::default());
1565        rooted!(&in(cx) let mut view_value = UndefinedValue());
1566        view.get_buffer_view_value(cx, view_value.handle_mut());
1567        result.set(*view_value);
1568
1569        read_request.chunk_steps(cx, result, &self.global());
1570
1571        Ok(())
1572    }
1573
1574    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-handle-queue-drain>
1575    pub(crate) fn handle_queue_drain(&self, cx: &mut JSContext) {
1576        // Assert: controller.[[stream]].[[state]] is "readable".
1577        assert!(self.stream.get().unwrap().is_readable());
1578
1579        // If controller.[[queueTotalSize]] is 0 and controller.[[closeRequested]] is true,
1580        if self.queue_total_size.get() == 0.0 && self.close_requested.get() {
1581            // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
1582            self.clear_algorithms();
1583
1584            // Perform ! ReadableStreamClose(controller.[[stream]]).
1585            self.stream.get().unwrap().close(cx);
1586        } else {
1587            // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1588            self.call_pull_if_needed(cx);
1589        }
1590    }
1591
1592    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
1593    fn call_pull_if_needed(&self, cx: &mut JSContext) {
1594        // Let shouldPull be ! ReadableByteStreamControllerShouldCallPull(controller).
1595        let should_pull = self.should_call_pull();
1596        // If shouldPull is false, return.
1597        if !should_pull {
1598            return;
1599        }
1600
1601        // If controller.[[pulling]] is true,
1602        if self.pulling.get() {
1603            // Set controller.[[pullAgain]] to true.
1604            self.pull_again.set(true);
1605
1606            // Return.
1607            return;
1608        }
1609
1610        // Assert: controller.[[pullAgain]] is false.
1611        assert!(!self.pull_again.get());
1612
1613        // Set controller.[[pulling]] to true.
1614        self.pulling.set(true);
1615
1616        // Let pullPromise be the result of performing controller.[[pullAlgorithm]].
1617        // Continues into the resolve and reject handling of the native handler.
1618        let global = self.global();
1619        let rooted_controller = DomRoot::from_ref(self);
1620        let controller = Controller::ReadableByteStreamController(rooted_controller.clone());
1621
1622        if let Some(underlying_source) = self.underlying_source.get() {
1623            let handler = PromiseNativeHandler::new(
1624                cx,
1625                &global,
1626                Some(Box::new(PullAlgorithmFulfillmentHandler {
1627                    controller: Dom::from_ref(&rooted_controller),
1628                })),
1629                Some(Box::new(PullAlgorithmRejectionHandler {
1630                    controller: Dom::from_ref(&rooted_controller),
1631                })),
1632            );
1633
1634            let mut realm = enter_auto_realm(cx, &*global);
1635            let cx = &mut realm.current_realm();
1636
1637            let result = underlying_source
1638                .call_pull_algorithm(cx, controller)
1639                .unwrap_or_else(|| {
1640                    let promise = Promise::new_resolved(cx, &global, ());
1641                    Ok(promise)
1642                });
1643            let promise = result.unwrap_or_else(|error| {
1644                rooted!(&in(cx) let mut rval = UndefinedValue());
1645                // TODO: check if `self.global()` is the right globalscope.
1646                error.to_jsval(cx, &global, rval.handle_mut());
1647                Promise::new_rejected(cx, &global, rval.handle())
1648            });
1649            promise.append_native_handler(cx, &handler);
1650        }
1651    }
1652
1653    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-should-call-pull>
1654    fn should_call_pull(&self) -> bool {
1655        // Let stream be controller.[[stream]].
1656        // Note: the spec does not assert that stream is not undefined here,
1657        // so we return false if it is.
1658        let stream = self.stream.get().unwrap();
1659
1660        // If stream.[[state]] is not "readable", return false.
1661        if !stream.is_readable() {
1662            return false;
1663        }
1664
1665        // If controller.[[closeRequested]] is true, return false.
1666        if self.close_requested.get() {
1667            return false;
1668        }
1669
1670        // If controller.[[started]] is false, return false.
1671        if !self.started.get() {
1672            return false;
1673        }
1674
1675        // If ! ReadableStreamHasDefaultReader(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0
1676        // , return true.
1677        if stream.has_default_reader() && stream.get_num_read_requests() > 0 {
1678            return true;
1679        }
1680
1681        // If ! ReadableStreamHasBYOBReader(stream) is true and ! ReadableStreamGetNumReadIntoRequests(stream) > 0
1682        // , return true.
1683        if stream.has_byob_reader() && stream.get_num_read_into_requests() > 0 {
1684            return true;
1685        }
1686
1687        // Let desiredSize be ! ReadableByteStreamControllerGetDesiredSize(controller).
1688        let desired_size = self.get_desired_size();
1689
1690        // Assert: desiredSize is not null.
1691        assert!(desired_size.is_some());
1692
1693        // If desiredSize > 0, return true.
1694        if desired_size.unwrap() > 0. {
1695            return true;
1696        }
1697
1698        // Return false.
1699        false
1700    }
1701    /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
1702    pub(crate) fn setup(
1703        &self,
1704        cx: &mut JSContext,
1705        global: &GlobalScope,
1706        stream: DomRoot<ReadableStream>,
1707    ) -> Fallible<()> {
1708        // Assert: stream.[[controller]] is undefined.
1709        stream.assert_no_controller();
1710
1711        // If autoAllocateChunkSize is not undefined,
1712        if self.auto_allocate_chunk_size.is_some() {
1713            // Assert: ! IsInteger(autoAllocateChunkSize) is true. Implicit
1714            // Assert: autoAllocateChunkSize is positive. (Implicit by type.)
1715        }
1716
1717        // Set controller.[[stream]] to stream.
1718        self.stream.set(Some(&stream));
1719
1720        // Set controller.[[pullAgain]] and controller.[[pulling]] to false.
1721        self.pull_again.set(false);
1722        self.pulling.set(false);
1723
1724        // Set controller.[[byobRequest]] to null.
1725        self.byob_request.set(None);
1726
1727        // Perform ! ResetQueue(controller).
1728        self.reset_queue();
1729
1730        // Set controller.[[closeRequested]] and controller.[[started]] to false.
1731        self.close_requested.set(false);
1732        self.started.set(false);
1733
1734        // Set controller.[[strategyHWM]] to highWaterMark.
1735        // Set controller.[[pullAlgorithm]] to pullAlgorithm.
1736        // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
1737        // Set controller.[[autoAllocateChunkSize]] to autoAllocateChunkSize.
1738        // Set controller.[[pendingPullIntos]] to a new empty list.
1739        // Note: the above steps are done in `new`.
1740
1741        // Set stream.[[controller]] to controller.
1742        let rooted_byte_controller = DomRoot::from_ref(self);
1743        stream.set_byte_controller(&rooted_byte_controller);
1744
1745        if let Some(underlying_source) = rooted_byte_controller.underlying_source.get() {
1746            // Let startResult be the result of performing startAlgorithm. (This might throw an exception.)
1747            let start_result = underlying_source
1748                .call_start_algorithm(
1749                    cx,
1750                    Controller::ReadableByteStreamController(rooted_byte_controller.clone()),
1751                )
1752                .unwrap_or_else(|| Ok(Promise::new_resolved(cx, global, ())));
1753
1754            // Let startPromise be a promise resolved with startResult.
1755            let start_promise = start_result?;
1756
1757            // Upon fulfillment of startPromise, Upon rejection of startPromise with reason r,
1758            let handler = PromiseNativeHandler::new(
1759                cx,
1760                global,
1761                Some(Box::new(StartAlgorithmFulfillmentHandler {
1762                    controller: Dom::from_ref(&rooted_byte_controller),
1763                })),
1764                Some(Box::new(StartAlgorithmRejectionHandler {
1765                    controller: Dom::from_ref(&rooted_byte_controller),
1766                })),
1767            );
1768            let mut realm = enter_auto_realm(cx, global);
1769            let cx = &mut realm.current_realm();
1770            start_promise.append_native_handler(cx, &handler);
1771        };
1772
1773        Ok(())
1774    }
1775
1776    // <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontroller-releasesteps
1777    pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1778        // If this.[[pendingPullIntos]] is not empty,
1779        let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
1780        if !pending_pull_intos.is_empty() {
1781            // Let firstPendingPullInto be this.[[pendingPullIntos]][0].
1782            let mut first_pending_pull_into = RootedTraceableBox::new(pending_pull_intos.remove(0));
1783
1784            // Set firstPendingPullInto’s reader type to "none".
1785            first_pending_pull_into.reader_type = None;
1786
1787            // Set this.[[pendingPullIntos]] to the list « firstPendingPullInto »
1788            pending_pull_intos.clear();
1789            pending_pull_intos.push(*first_pending_pull_into.into_box());
1790        }
1791        Ok(())
1792    }
1793
1794    /// <https://streams.spec.whatwg.org/#rbs-controller-private-cancel>
1795    pub(crate) fn perform_cancel_steps(
1796        &self,
1797        cx: &mut JSContext,
1798        global: &GlobalScope,
1799        reason: SafeHandleValue,
1800    ) -> Rc<Promise> {
1801        // Perform ! ReadableByteStreamControllerClearPendingPullIntos(this).
1802        self.clear_pending_pull_intos();
1803
1804        // Perform ! ResetQueue(this).
1805        self.reset_queue();
1806
1807        let underlying_source = self
1808            .underlying_source
1809            .get()
1810            .expect("Controller should have a source when the cancel steps are called into.");
1811
1812        // Let result be the result of performing this.[[cancelAlgorithm]], passing in reason.
1813        let result = underlying_source
1814            .call_cancel_algorithm(cx, global, reason)
1815            .unwrap_or_else(|| {
1816                let promise = Promise::new(cx, global);
1817                promise.resolve_native(cx, &());
1818                Ok(promise)
1819            });
1820
1821        let promise = result.unwrap_or_else(|error| {
1822            rooted!(&in(cx) let mut rval = UndefinedValue());
1823            error.to_jsval(cx, global, rval.handle_mut());
1824            let promise = Promise::new(cx, global);
1825            promise.reject_native(cx, &rval.handle());
1826            promise
1827        });
1828
1829        // Perform ! ReadableByteStreamControllerClearAlgorithms(this).
1830        self.clear_algorithms();
1831
1832        // Return result(the promise).
1833        promise
1834    }
1835
1836    /// <https://streams.spec.whatwg.org/#rbs-controller-private-pull>
1837    pub(crate) fn perform_pull_steps(&self, cx: &mut JSContext, read_request: &ReadRequest) {
1838        // Let stream be this.[[stream]].
1839        let stream = self.stream.get().unwrap();
1840
1841        // Assert: ! ReadableStreamHasDefaultReader(stream) is true.
1842        assert!(stream.has_default_reader());
1843
1844        // If this.[[queueTotalSize]] > 0,
1845        if self.queue_total_size.get() > 0.0 {
1846            // Assert: ! ReadableStreamGetNumReadRequests(stream) is 0.
1847            assert_eq!(stream.get_num_read_requests(), 0);
1848
1849            // Perform ! ReadableByteStreamControllerFillReadRequestFromQueue(this, readRequest).
1850            let _ = self.fill_read_request_from_queue(cx, read_request);
1851
1852            // Return.
1853            return;
1854        }
1855
1856        // Let autoAllocateChunkSize be this.[[autoAllocateChunkSize]].
1857        let auto_allocate_chunk_size = self.auto_allocate_chunk_size;
1858
1859        // If autoAllocateChunkSize is not undefined,
1860        if let Some(auto_allocate_chunk_size) = auto_allocate_chunk_size {
1861            // create_array_buffer_with_size
1862            // Let buffer be Construct(%ArrayBuffer%, « autoAllocateChunkSize »).
1863            match create_array_buffer_with_size(cx, auto_allocate_chunk_size as usize) {
1864                Ok(buffer) => {
1865                    // Let pullIntoDescriptor be a new pull-into descriptor with
1866                    // buffer buffer.[[Value]]
1867                    // buffer byte length autoAllocateChunkSize
1868                    // byte offset  0
1869                    // byte length  autoAllocateChunkSize
1870                    // bytes filled  0
1871                    // minimum fill 1
1872                    // element size 1
1873                    // view constructor %Uint8Array%
1874                    // reader type  "default"
1875
1876                    // Append pullIntoDescriptor to this.[[pendingPullIntos]].
1877                    self.pending_pull_intos
1878                        .borrow_mut()
1879                        .push(PullIntoDescriptor {
1880                            buffer: *buffer.into_box(),
1881                            buffer_byte_length: auto_allocate_chunk_size,
1882                            byte_length: auto_allocate_chunk_size,
1883                            byte_offset: 0,
1884                            bytes_filled: Cell::new(0),
1885                            minimum_fill: 1,
1886                            element_size: 1,
1887                            view_constructor: Constructor::Name(Type::Uint8),
1888                            reader_type: Some(ReaderType::Default),
1889                        });
1890                },
1891                Err(error) => {
1892                    // If buffer is an abrupt completion,
1893                    // Perform readRequest’s error steps, given buffer.[[Value]].
1894
1895                    rooted!(&in(cx) let mut rval = UndefinedValue());
1896                    error.to_jsval(cx, &self.global(), rval.handle_mut());
1897                    read_request.error_steps(cx, rval.handle());
1898
1899                    // Return.
1900                    return;
1901                },
1902            }
1903        }
1904
1905        // Perform ! ReadableStreamAddReadRequest(stream, readRequest).
1906        stream.add_read_request(read_request);
1907
1908        // Perform ! ReadableByteStreamControllerCallPullIfNeeded(this).
1909        self.call_pull_if_needed(cx);
1910    }
1911
1912    /// Setting the JS object after the heap has settled down.
1913    pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
1914        if let Some(underlying_source) = self.underlying_source.get() {
1915            underlying_source.set_underlying_source_this_object(this_object);
1916        }
1917    }
1918
1919    pub(crate) fn remove_entry(&self) -> QueueEntry {
1920        self.queue
1921            .borrow_mut()
1922            .pop_front()
1923            .expect("Reader must have read request when remove is called into.")
1924    }
1925
1926    pub(crate) fn get_queue_total_size(&self) -> f64 {
1927        self.queue_total_size.get()
1928    }
1929
1930    pub(crate) fn get_pending_pull_intos_size(&self) -> usize {
1931        self.pending_pull_intos.borrow().len()
1932    }
1933}
1934
1935impl ReadableByteStreamControllerMethods<crate::DomTypeHolder> for ReadableByteStreamController {
1936    /// <https://streams.spec.whatwg.org/#rbs-controller-byob-request>
1937    fn GetByobRequest(
1938        &self,
1939        cx: &mut js::context::JSContext,
1940    ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
1941        // Return ! ReadableByteStreamControllerGetBYOBRequest(this).
1942        self.get_byob_request(cx)
1943    }
1944
1945    /// <https://streams.spec.whatwg.org/#rbs-controller-desired-size>
1946    fn GetDesiredSize(&self) -> Option<f64> {
1947        // Return ! ReadableByteStreamControllerGetDesiredSize(this).
1948        self.get_desired_size()
1949    }
1950
1951    /// <https://streams.spec.whatwg.org/#rbs-controller-close>
1952    fn Close(&self, cx: &mut JSContext) -> Fallible<()> {
1953        // If this.[[closeRequested]] is true, throw a TypeError exception.
1954        if self.close_requested.get() {
1955            return Err(Error::Type(c"closeRequested is true".to_owned()));
1956        }
1957
1958        // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
1959        if !self.stream.get().unwrap().is_readable() {
1960            return Err(Error::Type(c"stream is not readable".to_owned()));
1961        }
1962
1963        // Perform ? ReadableByteStreamControllerClose(this).
1964        self.close(cx)
1965    }
1966
1967    /// <https://streams.spec.whatwg.org/#rbs-controller-enqueue>
1968    fn Enqueue(
1969        &self,
1970        cx: &mut JSContext,
1971        chunk: js::gc::CustomAutoRooterGuard<js::typedarray::ArrayBufferView>,
1972    ) -> Fallible<()> {
1973        let chunk = HeapBufferSource::<ArrayBufferViewU8>::from_view(chunk);
1974
1975        // If chunk.[[ByteLength]] is 0, throw a TypeError exception.
1976        if chunk.byte_length() == 0 {
1977            return Err(Error::Type(c"chunk.ByteLength is 0".to_owned()));
1978        }
1979
1980        // If chunk.[[ViewedArrayBuffer]].[[ByteLength]] is 0, throw a TypeError exception.
1981        if chunk.viewed_buffer_array_byte_length(cx) == 0 {
1982            return Err(Error::Type(
1983                c"chunk.ViewedArrayBuffer.ByteLength is 0".to_owned(),
1984            ));
1985        }
1986
1987        // If this.[[closeRequested]] is true, throw a TypeError exception.
1988        if self.close_requested.get() {
1989            return Err(Error::Type(c"closeRequested is true".to_owned()));
1990        }
1991
1992        // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
1993        if !self.stream.get().unwrap().is_readable() {
1994            return Err(Error::Type(c"stream is not readable".to_owned()));
1995        }
1996
1997        // Return ? ReadableByteStreamControllerEnqueue(this, chunk).
1998        self.enqueue(cx, chunk)
1999    }
2000
2001    /// <https://streams.spec.whatwg.org/#rbs-controller-error>
2002    fn Error(&self, cx: &mut JSContext, e: SafeHandleValue) -> Fallible<()> {
2003        // Perform ! ReadableByteStreamControllerError(this, e).
2004        self.error(cx, e);
2005        Ok(())
2006    }
2007}