script/dom/stream/readablebytestreamcontroller.rs
1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::cmp::min;
7use std::collections::VecDeque;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::context::JSContext;
12use js::jsapi::{Heap, Type};
13use js::jsval::UndefinedValue;
14use js::realm::CurrentRealm;
15use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue};
16use js::typedarray::{ArrayBufferU8, ArrayBufferViewU8};
17use script_bindings::cell::DomRefCell;
18use script_bindings::reflector::{Reflector, reflect_dom_object};
19
20use super::readablestreambyobreader::ReadIntoRequest;
21use super::readablestreamdefaultreader::ReadRequest;
22use super::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType};
23use crate::dom::bindings::buffer_source::{
24 Constructor, HeapBufferSource, byte_size, create_array_buffer_with_size,
25 create_buffer_source_with_constructor,
26};
27use crate::dom::bindings::codegen::Bindings::ReadableByteStreamControllerBinding::ReadableByteStreamControllerMethods;
28use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
29use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
30use crate::dom::bindings::reflector::DomGlobal;
31use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
32use crate::dom::bindings::trace::RootedTraceableBox;
33use crate::dom::globalscope::GlobalScope;
34use crate::dom::promise::Promise;
35use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
36use crate::dom::stream::readablestream::ReadableStream;
37use crate::dom::stream::readablestreambyobrequest::ReadableStreamBYOBRequest;
38use crate::realms::enter_auto_realm;
39use crate::script_runtime::CanGc;
40
/// One entry of the controller's byte queue: an array buffer paired with the
/// byte offset and byte length that describe the entry within it.
///
/// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry>
#[derive(JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct QueueEntry {
    /// The buffer holding the entry's bytes.
    ///
    /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-buffer>
    #[ignore_malloc_size_of = "HeapBufferSource"]
    buffer: HeapBufferSource<ArrayBufferU8>,
    /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-offset>
    byte_offset: usize,
    /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-length>
    byte_length: usize,
}
53
// Empty marker impl. Presumably this is what lets a `QueueEntry` be stored in
// a `rooted!` slot — TODO confirm against the `js::gc::Rootable` documentation.
impl js::gc::Rootable for QueueEntry {}
55
56impl QueueEntry {
57 pub(crate) fn new(
58 buffer: RootedTraceableBox<HeapBufferSource<ArrayBufferU8>>,
59 byte_offset: usize,
60 byte_length: usize,
61 ) -> QueueEntry {
62 QueueEntry {
63 buffer: *buffer.into_box(),
64 byte_offset,
65 byte_length,
66 }
67 }
68}
69
/// The kind of reader recorded on a [`PullIntoDescriptor`].
///
/// The descriptor stores this as `Option<ReaderType>`; the spec's "none"
/// reader type is represented by `None` rather than by a variant here.
#[derive(Debug, Eq, JSTraceable, MallocSizeOf, PartialEq)]
pub(crate) enum ReaderType {
    /// <https://streams.spec.whatwg.org/#readablestreambyobreader>
    Byob,
    /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
    Default,
}
77
/// Bookkeeping for one pending "pull into" operation: the destination buffer,
/// the window of it being filled, and how the result should be delivered.
///
/// <https://streams.spec.whatwg.org/#pull-into-descriptor>
#[derive(Eq, JSTraceable, MallocSizeOf, PartialEq)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct PullIntoDescriptor {
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer>
    #[ignore_malloc_size_of = "HeapBufferSource"]
    buffer: HeapBufferSource<ArrayBufferU8>,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer-byte-length>
    buffer_byte_length: u64,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-offset>
    byte_offset: u64,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-length>
    byte_length: u64,
    /// Kept in a `Cell` so it can be updated through a shared reference.
    ///
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-bytes-filled>
    bytes_filled: Cell<u64>,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-minimum-fill>
    minimum_fill: u64,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-element-size>
    element_size: u64,
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-view-constructor>
    view_constructor: Constructor,
    /// `None` stands for the spec's "none" reader type.
    ///
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-reader-type>
    reader_type: Option<ReaderType>,
}
102
// Empty marker impl. Presumably this is what lets descriptors (and `Vec`s of
// them) be stored in `rooted!` slots, as the controller methods do — TODO
// confirm against the `js::gc::Rootable` documentation.
impl js::gc::Rootable for PullIntoDescriptor {}
104
/// The fulfillment handler for
/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
///
/// Holds the controller that is marked as started once the start promise
/// fulfills.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct StartAlgorithmFulfillmentHandler {
    /// The controller this handler acts on.
    controller: Dom<ReadableByteStreamController>,
}
112
113impl Callback for StartAlgorithmFulfillmentHandler {
114 /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
115 /// Upon fulfillment of startPromise,
116 fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
117 // Set controller.[[started]] to true.
118 self.controller.started.set(true);
119
120 // Assert: controller.[[pulling]] is false.
121 assert!(!self.controller.pulling.get());
122
123 // Assert: controller.[[pullAgain]] is false.
124 assert!(!self.controller.pull_again.get());
125
126 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
127 self.controller.call_pull_if_needed(cx);
128 }
129}
130
/// The rejection handler for
/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
///
/// Holds the controller that is errored when the start promise rejects.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct StartAlgorithmRejectionHandler {
    /// The controller this handler acts on.
    controller: Dom<ReadableByteStreamController>,
}
138
139impl Callback for StartAlgorithmRejectionHandler {
140 /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
141 /// Upon rejection of startPromise with reason r,
142 fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
143 // Perform ! ReadableByteStreamControllerError(controller, r).
144 self.controller.error(cx, v);
145 }
146}
147
/// The fulfillment handler for
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
///
/// Holds the controller whose `pulling` / `pull_again` flags are updated once
/// the pull promise fulfills.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct PullAlgorithmFulfillmentHandler {
    /// The controller this handler acts on.
    controller: Dom<ReadableByteStreamController>,
}
155
156impl Callback for PullAlgorithmFulfillmentHandler {
157 /// Continuation of <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
158 /// Upon fulfillment of pullPromise
159 fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
160 // Set controller.[[pulling]] to false.
161 self.controller.pulling.set(false);
162
163 // If controller.[[pullAgain]] is true,
164 if self.controller.pull_again.get() {
165 // Set controller.[[pullAgain]] to false.
166 self.controller.pull_again.set(false);
167
168 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
169 self.controller.call_pull_if_needed(cx);
170 }
171 }
172}
173
/// The rejection handler for
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
///
/// Holds the controller that is errored when the pull promise rejects.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct PullAlgorithmRejectionHandler {
    /// The controller this handler acts on.
    controller: Dom<ReadableByteStreamController>,
}
181
182impl Callback for PullAlgorithmRejectionHandler {
183 /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-byte-controller-call-pull-if-needed>
184 /// Upon rejection of pullPromise with reason e.
185 fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
186 // Perform ! ReadableByteStreamControllerError(controller, e).
187 self.controller.error(cx, v);
188 }
189}
190
/// DOM object for the spec's `ReadableByteStreamController`. Each field
/// corresponds to one of the controller's internal slots, linked below.
///
/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
#[dom_struct]
pub(crate) struct ReadableByteStreamController {
    reflector_: Reflector,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-autoallocatechunksize>
    auto_allocate_chunk_size: Option<u64>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-stream>
    stream: MutNullableDom<ReadableStream>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-strategyhwm>
    strategy_hwm: f64,
    /// A mutable reference to the underlying source is used to implement these two
    /// internal slots:
    ///
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullalgorithm>
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-cancelalgorithm>
    underlying_source: MutNullableDom<UnderlyingSourceContainer>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queue>
    queue: DomRefCell<VecDeque<QueueEntry>>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queuetotalsize>
    queue_total_size: Cell<f64>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-byobrequest>
    byob_request: MutNullableDom<ReadableStreamBYOBRequest>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pendingpullintos>
    pending_pull_intos: DomRefCell<Vec<PullIntoDescriptor>>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-closerequested>
    close_requested: Cell<bool>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-started>
    started: Cell<bool>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pulling>
    pulling: Cell<bool>,
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullagain>
    pull_again: Cell<bool>,
}
224
225impl ReadableByteStreamController {
226 fn new_inherited(
227 underlying_source_type: UnderlyingSourceType,
228 strategy_hwm: f64,
229 global: &GlobalScope,
230 can_gc: CanGc,
231 ) -> ReadableByteStreamController {
232 let underlying_source_container =
233 UnderlyingSourceContainer::new(global, underlying_source_type, can_gc);
234 let auto_allocate_chunk_size = underlying_source_container.auto_allocate_chunk_size();
235 ReadableByteStreamController {
236 reflector_: Reflector::new(),
237 byob_request: MutNullableDom::new(None),
238 stream: MutNullableDom::new(None),
239 underlying_source: MutNullableDom::new(Some(&*underlying_source_container)),
240 auto_allocate_chunk_size,
241 pending_pull_intos: DomRefCell::new(Vec::new()),
242 strategy_hwm,
243 close_requested: Default::default(),
244 queue: DomRefCell::new(Default::default()),
245 queue_total_size: Default::default(),
246 started: Default::default(),
247 pulling: Default::default(),
248 pull_again: Default::default(),
249 }
250 }
251
252 pub(crate) fn new(
253 underlying_source_type: UnderlyingSourceType,
254 strategy_hwm: f64,
255 global: &GlobalScope,
256 can_gc: CanGc,
257 ) -> DomRoot<ReadableByteStreamController> {
258 reflect_dom_object(
259 Box::new(ReadableByteStreamController::new_inherited(
260 underlying_source_type,
261 strategy_hwm,
262 global,
263 can_gc,
264 )),
265 global,
266 can_gc,
267 )
268 }
269
270 #[allow(dead_code)]
271 pub(crate) fn set_stream(&self, stream: &ReadableStream) {
272 self.stream.set(Some(stream))
273 }
274
    /// Services a BYOB read into `view`.
    ///
    /// The view's backing buffer is transferred into a new pull-into
    /// descriptor. Depending on controller and stream state, the request is
    /// then closed, fulfilled from the queue, errored, or parked in
    /// `pending_pull_intos` (followed by a pull from the underlying source).
    ///
    /// * `read_into_request` - receives the close / chunk / error steps.
    /// * `view` - destination view; its buffer is transferred.
    /// * `min` - minimum fill in *elements*; it is multiplied by the view's
    ///   element size to get the minimum fill in bytes.
    ///
    /// Panics if the controller has no stream, or if the computed minimum
    /// fill exceeds the view's byte length.
    ///
    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-pull-into>
    pub(crate) fn perform_pull_into(
        &self,
        cx: &mut JSContext,
        read_into_request: &ReadIntoRequest,
        view: &HeapBufferSource<ArrayBufferViewU8>,
        min: u64,
    ) {
        // Let stream be controller.[[stream]].
        let stream = self.stream.get().unwrap();

        // Let elementSize be 1.
        let mut element_size = 1;

        // Let ctor be %DataView%.
        let mut ctor = Constructor::DataView;

        // If view has a [[TypedArrayName]] internal slot (i.e., it is not a DataView),
        if view.has_typed_array_name() {
            // Set elementSize to the element size specified in the
            // typed array constructors table for view.[[TypedArrayName]].
            let view_typw = view.get_array_buffer_view_type();
            element_size = byte_size(view_typw);

            // Set ctor to the constructor specified in the typed array constructors table for view.[[TypedArrayName]].
            ctor = Constructor::Name(view_typw);
        }

        // Let minimumFill be min × elementSize.
        let minimum_fill = min * element_size;

        // Assert: minimumFill ≥ 0 and minimumFill ≤ view.[[ByteLength]].
        // (The lower bound holds by construction: `minimum_fill` is unsigned.)
        assert!(minimum_fill <= (view.byte_length() as u64));

        // Assert: the remainder after dividing minimumFill by elementSize is 0.
        assert_eq!(minimum_fill % element_size, 0);

        // Let byteOffset be view.[[ByteOffset]].
        let byte_offset = view.get_byte_offset();

        // Let byteLength be view.[[ByteLength]].
        let byte_length = view.byte_length();

        // Let bufferResult be TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
        match view
            .get_array_buffer_view_buffer(cx.into())
            .transfer_array_buffer(cx.into())
        {
            Ok(buffer) => {
                // Let buffer be bufferResult.[[Value]].
                // Let pullIntoDescriptor be a new pull-into descriptor with
                // buffer   buffer
                // buffer byte length   buffer.[[ArrayBufferByteLength]]
                // byte offset  byteOffset
                // byte length  byteLength
                // bytes filled  0
                // minimum fill minimumFill
                // element size elementSize
                // view constructor ctor
                // reader type  "byob"
                let buffer_byte_length = buffer.byte_length();
                let pull_into_descriptor = RootedTraceableBox::new(PullIntoDescriptor {
                    buffer: *buffer.into_box(),
                    buffer_byte_length: buffer_byte_length as u64,
                    byte_offset: byte_offset as u64,
                    byte_length: byte_length as u64,
                    bytes_filled: Cell::new(0),
                    minimum_fill,
                    element_size,
                    // Cloned because `ctor` is used again below to build the empty view.
                    view_constructor: ctor.clone(),
                    reader_type: Some(ReaderType::Byob),
                });

                // If controller.[[pendingPullIntos]] is not empty,
                // (Scoped so the mutable borrow ends before the calls further down.)
                {
                    let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
                    if !pending_pull_intos.is_empty() {
                        // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
                        pending_pull_intos.push(*pull_into_descriptor.into_box());

                        // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
                        stream.add_read_into_request(read_into_request);

                        // Return.
                        return;
                    }
                }

                // If stream.[[state]] is "closed",
                if stream.is_closed() {
                    // Let emptyView be ! Construct(ctor, « pullIntoDescriptor’s buffer,
                    // pullIntoDescriptor’s byte offset, 0 »).
                    if let Ok(empty_view) = create_buffer_source_with_constructor(
                        cx,
                        &ctor,
                        &pull_into_descriptor.buffer,
                        pull_into_descriptor.byte_offset as usize,
                        0,
                    ) {
                        // Perform readIntoRequest’s close steps, given emptyView.
                        let result = RootedTraceableBox::new(Heap::default());
                        rooted!(&in(cx) let mut view_value = UndefinedValue());
                        empty_view.get_buffer_view_value(cx.into(), view_value.handle_mut());
                        result.set(*view_value);

                        read_into_request.close_steps(cx, Some(result));

                        // Return.
                        return;
                    } else {
                        // NOTE(review): the spec treats this construction as
                        // infallible; on failure the request is left without
                        // any of its steps being run.
                        return;
                    }
                }

                // If controller.[[queueTotalSize]] > 0,
                if self.queue_total_size.get() > 0.0 {
                    // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(
                    // controller, pullIntoDescriptor) is true,
                    if self.fill_pull_into_descriptor_from_queue(cx, &pull_into_descriptor) {
                        // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(
                        // pullIntoDescriptor).
                        if let Ok(filled_view) =
                            self.convert_pull_into_descriptor(cx, &pull_into_descriptor)
                        {
                            // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
                            self.handle_queue_drain(cx);

                            // Perform readIntoRequest’s chunk steps, given filledView.
                            let result = RootedTraceableBox::new(Heap::default());
                            rooted!(&in(cx) let mut view_value = UndefinedValue());
                            filled_view.get_buffer_view_value(cx.into(), view_value.handle_mut());
                            result.set(*view_value);
                            read_into_request.chunk_steps(result, CanGc::from_cx(cx));

                            // Return.
                            return;
                        } else {
                            // NOTE(review): the spec treats this conversion as
                            // infallible; on failure the request is left
                            // without any of its steps being run.
                            return;
                        }
                    }

                    // If controller.[[closeRequested]] is true,
                    if self.close_requested.get() {
                        // Let e be a new TypeError exception.
                        rooted!(&in(cx) let mut error = UndefinedValue());
                        Error::Type(c"close requested".to_owned()).to_jsval(
                            cx.into(),
                            &self.global(),
                            error.handle_mut(),
                            CanGc::from_cx(cx),
                        );

                        // Perform ! ReadableByteStreamControllerError(controller, e).
                        self.error(cx, error.handle());

                        // Perform readIntoRequest’s error steps, given e.
                        read_into_request.error_steps(error.handle(), CanGc::from_cx(cx));

                        // Return.
                        return;
                    }
                }

                // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
                // (Scoped so the mutable borrow ends before the calls below.)
                {
                    self.pending_pull_intos
                        .borrow_mut()
                        .push(*pull_into_descriptor.into_box());
                }
                // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
                stream.add_read_into_request(read_into_request);

                // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
                self.call_pull_if_needed(cx);
            },
            Err(error) => {
                // If bufferResult is an abrupt completion,

                // Perform readIntoRequest’s error steps, given bufferResult.[[Value]].
                rooted!(&in(cx) let mut rval = UndefinedValue());
                error.to_jsval(
                    cx.into(),
                    &self.global(),
                    rval.handle_mut(),
                    CanGc::from_cx(cx),
                );
                read_into_request.error_steps(rval.handle(), CanGc::from_cx(cx));

                // Return.
            },
        }
    }
467
468 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond>
469 pub(crate) fn respond(&self, cx: &mut JSContext, bytes_written: u64) -> Fallible<()> {
470 {
471 // Assert: controller.[[pendingPullIntos]] is not empty.
472 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
473 assert!(!pending_pull_intos.is_empty());
474
475 // Let firstDescriptor be controller.[[pendingPullIntos]][0].
476 let first_descriptor = pending_pull_intos.first_mut().unwrap();
477
478 // Let state be controller.[[stream]].[[state]].
479 let stream = self.stream.get().unwrap();
480
481 // If state is "closed",
482 if stream.is_closed() {
483 // If bytesWritten is not 0, throw a TypeError exception.
484 if bytes_written != 0 {
485 return Err(Error::Type(
486 c"bytesWritten not zero on closed stream".to_owned(),
487 ));
488 }
489 } else {
490 // Assert: state is "readable".
491 assert!(stream.is_readable());
492
493 // If bytesWritten is 0, throw a TypeError exception.
494 if bytes_written == 0 {
495 return Err(Error::Type(c"bytesWritten is 0".to_owned()));
496 }
497
498 // If firstDescriptor’s bytes filled + bytesWritten > firstDescriptor’s byte length,
499 // throw a RangeError exception.
500 if first_descriptor.bytes_filled.get() + bytes_written >
501 first_descriptor.byte_length
502 {
503 return Err(Error::Range(
504 c"bytes filled + bytesWritten > byte length".to_owned(),
505 ));
506 }
507 }
508
509 // Set firstDescriptor’s buffer to ! TransferArrayBuffer(firstDescriptor’s buffer).
510 first_descriptor.buffer = *first_descriptor
511 .buffer
512 .transfer_array_buffer(cx.into())
513 .expect("TransferArrayBuffer failed")
514 .into_box();
515 }
516
517 // Perform ? ReadableByteStreamControllerRespondInternal(controller, bytesWritten).
518 self.respond_internal(cx, bytes_written)
519 }
520
521 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-internal>
522 fn respond_internal(&self, cx: &mut JSContext, bytes_written: u64) -> Fallible<()> {
523 {
524 // Let firstDescriptor be controller.[[pendingPullIntos]][0].
525 let pending_pull_intos = self.pending_pull_intos.borrow();
526 let first_descriptor = pending_pull_intos.first().unwrap();
527
528 // Assert: ! CanTransferArrayBuffer(firstDescriptor’s buffer) is true
529 assert!(first_descriptor.buffer.can_transfer_array_buffer(cx.into()));
530 }
531
532 // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
533 self.invalidate_byob_request();
534
535 // Let state be controller.[[stream]].[[state]].
536 let stream = self.stream.get().unwrap();
537
538 // If state is "closed",
539 if stream.is_closed() {
540 // Assert: bytesWritten is 0.
541 assert_eq!(bytes_written, 0);
542
543 // Perform ! ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor).
544 self.respond_in_closed_state(cx)
545 .expect("respond_in_closed_state failed");
546 } else {
547 // Assert: state is "readable".
548 assert!(stream.is_readable());
549
550 // Assert: bytesWritten > 0.
551 assert!(bytes_written > 0);
552
553 // Perform ? ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor).
554 self.respond_in_readable_state(cx, bytes_written)?;
555 }
556
557 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
558 self.call_pull_if_needed(cx);
559
560 Ok(())
561 }
562
563 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-closed-state>
564 fn respond_in_closed_state(&self, cx: &mut JSContext) -> Fallible<()> {
565 let pending_pull_intos = self.pending_pull_intos.borrow();
566 let first_descriptor = pending_pull_intos.first().unwrap();
567
568 // Assert: the remainder after dividing firstDescriptor’s bytes filled
569 // by firstDescriptor’s element size is 0.
570 assert_eq!(
571 first_descriptor.bytes_filled.get() % first_descriptor.element_size,
572 0
573 );
574
575 // If firstDescriptor’s reader type is "none",
576 // perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
577 let reader_type = first_descriptor.reader_type.is_none();
578
579 // needed to drop the borrow and avoid BorrowMutError
580 drop(pending_pull_intos);
581
582 if reader_type {
583 self.shift_pending_pull_into();
584 }
585
586 // Let stream be controller.[[stream]].
587 let stream = self.stream.get().unwrap();
588
589 // If ! ReadableStreamHasBYOBReader(stream) is true,
590 if stream.has_byob_reader() {
591 // Let filledPullIntos be a new empty list.
592 rooted!(&in(cx) let mut filled_pull_intos = Vec::new());
593
594 // While filledPullIntos’s size < ! ReadableStreamGetNumReadIntoRequests(stream),
595 while filled_pull_intos.len() < stream.get_num_read_into_requests() {
596 // Let pullIntoDescriptor be ! ReadableByteStreamControllerShiftPendingPullInto(controller).
597 // Append pullIntoDescriptor to filledPullIntos.
598 filled_pull_intos.push(self.shift_pending_pull_into());
599 }
600
601 // For each filledPullInto of filledPullIntos,
602 for filled_pull_into in &*filled_pull_intos {
603 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
604 self.commit_pull_into_descriptor(cx, filled_pull_into)
605 .expect("commit_pull_into_descriptor failed");
606 }
607 }
608
609 Ok(())
610 }
611
    /// Finishes a respond on a readable stream, after `bytes_written` bytes
    /// were written into the first pending pull-into descriptor.
    ///
    /// The head descriptor is credited with the written bytes. Then:
    /// * if it has no reader, its data is moved to the queue and any
    ///   descriptors that the queue can now fill are committed;
    /// * if it is still below its minimum fill, nothing more happens;
    /// * otherwise it is shifted off the pending list, any trailing partial
    ///   element is cloned back into the queue, and the descriptor is
    ///   committed together with any others the queue can now fill.
    ///
    /// # Errors
    ///
    /// Propagates errors from `enqueue_detached_pull_into_to_queue` and
    /// `enqueue_cloned_chunk_to_queue`.
    ///
    /// # Panics
    ///
    /// Panics if there is no pending pull-into, if the write would overrun
    /// the descriptor's byte length, or if committing a descriptor fails.
    ///
    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-readable-state>
    fn respond_in_readable_state(&self, cx: &mut JSContext, bytes_written: u64) -> Fallible<()> {
        let pending_pull_intos = self.pending_pull_intos.borrow();
        let first_descriptor = pending_pull_intos.first().unwrap();

        // Assert: pullIntoDescriptor’s bytes filled + bytesWritten ≤ pullIntoDescriptor’s byte length.
        assert!(
            first_descriptor.bytes_filled.get() + bytes_written <= first_descriptor.byte_length
        );

        // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
        // controller, bytesWritten, pullIntoDescriptor).
        self.fill_head_pull_into_descriptor(bytes_written, first_descriptor);

        // If pullIntoDescriptor’s reader type is "none",
        if first_descriptor.reader_type.is_none() {
            // needed to drop the borrow and avoid BorrowMutError
            drop(pending_pull_intos);

            // Perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, pullIntoDescriptor).
            self.enqueue_detached_pull_into_to_queue(cx)?;

            // Let filledPullIntos be the result of performing
            // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
            rooted!(&in(cx) let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx));

            // For each filledPullInto of filledPullIntos,
            for filled_pull_into in &*filled_pull_intos {
                // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]]
                // , filledPullInto).
                self.commit_pull_into_descriptor(cx, filled_pull_into)
                    .expect("commit_pull_into_descriptor failed");
            }

            // Return.
            return Ok(());
        }

        // If pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill, return.
        // (The borrow is released by the return itself.)
        if first_descriptor.bytes_filled.get() < first_descriptor.minimum_fill {
            return Ok(());
        }

        // needed to drop the borrow and avoid BorrowMutError
        drop(pending_pull_intos);

        // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
        rooted!(&in(cx) let pull_into_descriptor = self.shift_pending_pull_into());

        // Let remainderSize be the remainder after dividing pullIntoDescriptor’s bytes
        // filled by pullIntoDescriptor’s element size.
        let remainder_size =
            pull_into_descriptor.bytes_filled.get() % pull_into_descriptor.element_size;

        // If remainderSize > 0,
        if remainder_size > 0 {
            // Let end be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
            let end = pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();

            // Perform ? ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
            // pullIntoDescriptor’s buffer, end − remainderSize, remainderSize).
            self.enqueue_cloned_chunk_to_queue(
                cx,
                &pull_into_descriptor.buffer,
                end - remainder_size,
                remainder_size,
            )?;
        }

        // Set pullIntoDescriptor’s bytes filled to pullIntoDescriptor’s bytes filled − remainderSize.
        pull_into_descriptor
            .bytes_filled
            .set(pull_into_descriptor.bytes_filled.get() - remainder_size);

        // Let filledPullIntos be the result of performing
        // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
        rooted!(&in(cx) let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx));

        // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], pullIntoDescriptor).
        self.commit_pull_into_descriptor(cx, &pull_into_descriptor)
            .expect("commit_pull_into_descriptor failed");

        // For each filledPullInto of filledPullIntos,
        for filled_pull_into in &*filled_pull_intos {
            // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], filledPullInto).
            self.commit_pull_into_descriptor(cx, filled_pull_into)
                .expect("commit_pull_into_descriptor failed");
        }

        Ok(())
    }
703
    /// Handles a response that supplies a new `view` over the bytes written
    /// for the first pending pull-into descriptor.
    ///
    /// After validating `view` against the stream state and the descriptor,
    /// the view's backing buffer is transferred into the descriptor and the
    /// view's byte length is passed to `respond_internal` as the number of
    /// bytes written.
    ///
    /// # Errors
    ///
    /// * `Error::Type` if the stream is closed and the view is not empty, or
    ///   the stream is not closed and the view is empty.
    /// * `Error::Range` if the view's byte offset, its buffer's byte length,
    ///   or its byte length is inconsistent with the descriptor.
    /// * Any error from transferring the view's buffer or from
    ///   `respond_internal`.
    ///
    /// # Panics
    ///
    /// Panics if there is no pending pull-into, if the view's buffer is
    /// detached, if the controller has no stream, or if the stream is neither
    /// closed nor readable.
    ///
    /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-with-new-view>
    pub(crate) fn respond_with_new_view(
        &self,
        cx: &mut JSContext,
        view: &HeapBufferSource<ArrayBufferViewU8>,
    ) -> Fallible<()> {
        // Assigned inside the block below and read after it.
        let view_byte_length;
        // The mutable borrow is confined to this block so that it is released
        // before `respond_internal` touches `pending_pull_intos` again.
        {
            // Assert: controller.[[pendingPullIntos]] is not empty.
            let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
            assert!(!pending_pull_intos.is_empty());

            // Assert: ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is false.
            assert!(!view.is_detached_buffer(cx.into()));

            // Let firstDescriptor be controller.[[pendingPullIntos]][0].
            let first_descriptor = pending_pull_intos.first_mut().unwrap();

            // Let state be controller.[[stream]].[[state]].
            let stream = self.stream.get().unwrap();

            // If state is "closed",
            if stream.is_closed() {
                // If view.[[ByteLength]] is not 0, throw a TypeError exception.
                if view.byte_length() != 0 {
                    return Err(Error::Type(c"view byte length is not 0".to_owned()));
                }
            } else {
                // Assert: state is "readable".
                assert!(stream.is_readable());

                // If view.[[ByteLength]] is 0, throw a TypeError exception.
                if view.byte_length() == 0 {
                    return Err(Error::Type(c"view byte length is 0".to_owned()));
                }
            }

            // If firstDescriptor’s byte offset + firstDescriptor’s bytes filled is not view.[[ByteOffset]],
            // throw a RangeError exception.
            if first_descriptor.byte_offset + first_descriptor.bytes_filled.get() !=
                (view.get_byte_offset() as u64)
            {
                return Err(Error::Range(
                    c"firstDescriptor's byte offset + bytes filled is not view byte offset"
                        .to_owned(),
                ));
            }

            // If firstDescriptor’s buffer byte length is not view.[[ViewedArrayBuffer]].[[ByteLength]],
            // throw a RangeError exception.
            if first_descriptor.buffer_byte_length !=
                (view.viewed_buffer_array_byte_length(cx.into()) as u64)
            {
                return Err(Error::Range(
                    c"firstDescriptor's buffer byte length is not view viewed buffer array byte length"
                        .to_owned(),
                ));
            }

            // If firstDescriptor’s bytes filled + view.[[ByteLength]] > firstDescriptor’s byte length,
            // throw a RangeError exception.
            if first_descriptor.bytes_filled.get() + (view.byte_length()) as u64 >
                first_descriptor.byte_length
            {
                return Err(Error::Range(
                    c"bytes filled + view byte length > byte length".to_owned(),
                ));
            }

            // Let viewByteLength be view.[[ByteLength]].
            view_byte_length = view.byte_length();

            // Set firstDescriptor’s buffer to ? TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
            first_descriptor.buffer = *view
                .get_array_buffer_view_buffer(cx.into())
                .transfer_array_buffer(cx.into())?
                .into_box();
        }

        // Perform ? ReadableByteStreamControllerRespondInternal(controller, viewByteLength).
        self.respond_internal(cx, view_byte_length as u64)
    }
786
787 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-get-desired-size>
788 pub(crate) fn get_desired_size(&self) -> Option<f64> {
789 // Let state be controller.[[stream]].[[state]].
790 let stream = self.stream.get()?;
791
792 // If state is "errored", return null.
793 if stream.is_errored() {
794 return None;
795 }
796
797 // If state is "closed", return 0.
798 if stream.is_closed() {
799 return Some(0.0);
800 }
801
802 // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
803 Some(self.strategy_hwm - self.queue_total_size.get())
804 }
805
806 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollergetbyobrequest>
807 pub(crate) fn get_byob_request(
808 &self,
809 cx: &mut js::context::JSContext,
810 ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
811 // If controller.[[byobRequest]] is null and controller.[[pendingPullIntos]] is not empty,
812 let pending_pull_intos = self.pending_pull_intos.borrow();
813 if self.byob_request.get().is_none() && !pending_pull_intos.is_empty() {
814 // Let firstDescriptor be controller.[[pendingPullIntos]][0].
815 let first_descriptor = pending_pull_intos.first().unwrap();
816 // Let view be ! Construct(%Uint8Array%, « firstDescriptor’s buffer,
817 // firstDescriptor’s byte offset + firstDescriptor’s bytes filled,
818 // firstDescriptor’s byte length − firstDescriptor’s bytes filled »).
819
820 let byte_offset = first_descriptor.byte_offset + first_descriptor.bytes_filled.get();
821 let byte_length = first_descriptor.byte_length - first_descriptor.bytes_filled.get();
822
823 let view = create_buffer_source_with_constructor(
824 cx,
825 &Constructor::Name(Type::Uint8),
826 &first_descriptor.buffer,
827 byte_offset as usize,
828 byte_length as usize,
829 )
830 .expect("Construct Uint8Array failed");
831
832 // Let byobRequest be a new ReadableStreamBYOBRequest.
833 let byob_request = ReadableStreamBYOBRequest::new(&self.global(), CanGc::from_cx(cx));
834
835 // Set byobRequest.[[controller]] to controller.
836 byob_request.set_controller(Some(&DomRoot::from_ref(self)));
837
838 // Set byobRequest.[[view]] to view.
839 byob_request.set_view(Some(view));
840
841 // Set controller.[[byobRequest]] to byobRequest.
842 self.byob_request.set(Some(&byob_request));
843 }
844
845 // Return controller.[[byobRequest]].
846 Ok(self.byob_request.get())
847 }
848
849 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-close>
850 pub(crate) fn close(&self, cx: &mut JSContext) -> Fallible<()> {
851 // Let stream be controller.[[stream]].
852 let stream = self.stream.get().unwrap();
853
854 // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
855 if self.close_requested.get() || !stream.is_readable() {
856 return Ok(());
857 }
858
859 // If controller.[[queueTotalSize]] > 0,
860 if self.queue_total_size.get() > 0.0 {
861 // Set controller.[[closeRequested]] to true.
862 self.close_requested.set(true);
863 // Return.
864 return Ok(());
865 }
866
867 {
868 // If controller.[[pendingPullIntos]] is not empty,
869 let pending_pull_intos = self.pending_pull_intos.borrow();
870 if !pending_pull_intos.is_empty() {
871 // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
872 let first_pending_pull_into = pending_pull_intos.first().unwrap();
873
874 // If the remainder after dividing firstPendingPullInto’s bytes filled by
875 // firstPendingPullInto’s element size is not 0,
876 if !first_pending_pull_into
877 .bytes_filled
878 .get()
879 .is_multiple_of(first_pending_pull_into.element_size)
880 {
881 // needed to drop the borrow and avoid BorrowMutError
882 drop(pending_pull_intos);
883
884 // Let e be a new TypeError exception.
885 let e = Error::Type(
886 c"remainder after dividing firstPendingPullInto's bytes
887 filled by firstPendingPullInto's element size is not 0"
888 .to_owned(),
889 );
890
891 // Perform ! ReadableByteStreamControllerError(controller, e).
892 rooted!(&in(cx) let mut error = UndefinedValue());
893 e.clone().to_jsval(
894 cx.into(),
895 &self.global(),
896 error.handle_mut(),
897 CanGc::from_cx(cx),
898 );
899 self.error(cx, error.handle());
900
901 // Throw e.
902 return Err(e);
903 }
904 }
905 }
906
907 // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
908 self.clear_algorithms();
909
910 // Perform ! ReadableStreamClose(stream).
911 stream.close(cx);
912 Ok(())
913 }
914
915 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-error>
916 pub(crate) fn error(&self, cx: &mut JSContext, e: SafeHandleValue) {
917 // Let stream be controller.[[stream]].
918 let stream = self.stream.get().unwrap();
919
920 // If stream.[[state]] is not "readable", return.
921 if !stream.is_readable() {
922 return;
923 }
924
925 // Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller).
926 self.clear_pending_pull_intos();
927
928 // Perform ! ResetQueue(controller).
929 self.reset_queue();
930
931 // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
932 self.clear_algorithms();
933
934 // Perform ! ReadableStreamError(stream, e).
935 stream.error(cx, e);
936 }
937
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-algorithms>
///
/// Drops the controller's reference to its underlying source. Both spec steps collapse
/// into this single assignment.
fn clear_algorithms(&self) {
    // Set controller.[[pullAlgorithm]] to undefined.
    // Set controller.[[cancelAlgorithm]] to undefined.
    self.underlying_source.set(None);
}
944
/// <https://streams.spec.whatwg.org/#reset-queue>
///
/// Empties the chunk queue and resets the running byte total to zero.
/// Takes a mutable borrow of `queue`, so it must not be called while `queue` is borrowed.
pub(crate) fn reset_queue(&self) {
    // Assert: container has [[queue]] and [[queueTotalSize]] internal slots.

    // Set container.[[queue]] to a new empty list.
    self.queue.borrow_mut().clear();

    // Set container.[[queueTotalSize]] to 0.
    self.queue_total_size.set(0.0);
}
955
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-pending-pull-intos>
///
/// Invalidates any outstanding BYOB request and discards every pending pull-into
/// descriptor. Takes a mutable borrow of `pending_pull_intos`, so it must not be called
/// while that list is borrowed.
pub(crate) fn clear_pending_pull_intos(&self) {
    // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
    self.invalidate_byob_request();

    // Set controller.[[pendingPullIntos]] to a new empty list.
    self.pending_pull_intos.borrow_mut().clear();
}
964
965 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-invalidate-byob-request>
966 pub(crate) fn invalidate_byob_request(&self) {
967 if let Some(byob_request) = self.byob_request.get() {
968 // Set controller.[[byobRequest]].[[controller]] to undefined.
969 byob_request.set_controller(None);
970
971 // Set controller.[[byobRequest]].[[view]] to null.
972 byob_request.set_view(None);
973
974 // Set controller.[[byobRequest]] to null.
975 self.byob_request.set(None);
976 }
977 // If controller.[[byobRequest]] is null, return.
978 }
979
980 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue>
981 pub(crate) fn enqueue(
982 &self,
983 cx: &mut JSContext,
984 chunk: RootedTraceableBox<HeapBufferSource<ArrayBufferViewU8>>,
985 ) -> Fallible<()> {
986 // Let stream be controller.[[stream]].
987 let stream = self.stream.get().unwrap();
988
989 // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
990 if self.close_requested.get() || !stream.is_readable() {
991 return Ok(());
992 }
993
994 // Let buffer be chunk.[[ViewedArrayBuffer]].
995 let buffer = chunk.get_array_buffer_view_buffer(cx.into());
996
997 // Let byteOffset be chunk.[[ByteOffset]].
998 let byte_offset = chunk.get_byte_offset();
999
1000 // Let byteLength be chunk.[[ByteLength]].
1001 let byte_length = chunk.byte_length();
1002
1003 // If ! IsDetachedBuffer(buffer) is true, throw a TypeError exception.
1004 if buffer.is_detached_buffer(cx.into()) {
1005 return Err(Error::Type(c"buffer is detached".to_owned()));
1006 }
1007
1008 // Let transferredBuffer be ? TransferArrayBuffer(buffer).
1009 let transferred_buffer = buffer.transfer_array_buffer(cx.into())?;
1010
1011 // If controller.[[pendingPullIntos]] is not empty,
1012 {
1013 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
1014 if !pending_pull_intos.is_empty() {
1015 // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
1016 let first_descriptor = pending_pull_intos.first_mut().unwrap();
1017 // If ! IsDetachedBuffer(firstPendingPullInto’s buffer) is true, throw a TypeError exception.
1018 if first_descriptor.buffer.is_detached_buffer(cx.into()) {
1019 return Err(Error::Type(c"buffer is detached".to_owned()));
1020 }
1021
1022 // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
1023 self.invalidate_byob_request();
1024
1025 // Set firstPendingPullInto’s buffer to ! TransferArrayBuffer(firstPendingPullInto’s buffer).
1026 first_descriptor.buffer = *first_descriptor
1027 .buffer
1028 .transfer_array_buffer(cx.into())
1029 .expect("TransferArrayBuffer failed")
1030 .into_box();
1031
1032 // If firstPendingPullInto’s reader type is "none",
1033 if first_descriptor.reader_type.is_none() {
1034 // needed to drop the borrow and avoid BorrowMutError
1035 drop(pending_pull_intos);
1036
1037 // perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(
1038 // controller, firstPendingPullInto).
1039 self.enqueue_detached_pull_into_to_queue(cx)?;
1040 }
1041 }
1042 }
1043
1044 // If ! ReadableStreamHasDefaultReader(stream) is true,
1045 if stream.has_default_reader() {
1046 // Perform ! ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller).
1047 self.process_read_requests_using_queue(cx)
1048 .expect("process_read_requests_using_queue failed");
1049
1050 // If ! ReadableStreamGetNumReadRequests(stream) is 0,
1051 if stream.get_num_read_requests() == 0 {
1052 // Assert: controller.[[pendingPullIntos]] is empty.
1053 {
1054 assert!(self.pending_pull_intos.borrow().is_empty());
1055 }
1056
1057 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1058 // controller, transferredBuffer, byteOffset, byteLength).
1059 self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1060 } else {
1061 // Assert: controller.[[queue]] is empty.
1062 assert!(self.queue.borrow().is_empty());
1063
1064 // If controller.[[pendingPullIntos]] is not empty,
1065
1066 let pending_pull_intos = self.pending_pull_intos.borrow();
1067 if !pending_pull_intos.is_empty() {
1068 // Assert: controller.[[pendingPullIntos]][0]'s reader type is "default".
1069 assert!(matches!(
1070 pending_pull_intos.first().unwrap().reader_type,
1071 Some(ReaderType::Default)
1072 ));
1073
1074 // needed to drop the borrow and avoid BorrowMutError
1075 drop(pending_pull_intos);
1076
1077 // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1078 self.shift_pending_pull_into();
1079 }
1080
1081 // Let transferredView be ! Construct(%Uint8Array%, « transferredBuffer, byteOffset, byteLength »).
1082 let transferred_view = create_buffer_source_with_constructor(
1083 cx,
1084 &Constructor::Name(Type::Uint8),
1085 &transferred_buffer,
1086 byte_offset,
1087 byte_length,
1088 )
1089 .expect("Construct Uint8Array failed");
1090
1091 // Perform ! ReadableStreamFulfillReadRequest(stream, transferredView, false).
1092 rooted!(&in(cx) let mut view_value = UndefinedValue());
1093 transferred_view.get_buffer_view_value(cx.into(), view_value.handle_mut());
1094 stream.fulfill_read_request(cx, view_value.handle(), false);
1095 }
1096 // Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true,
1097 } else if stream.has_byob_reader() {
1098 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1099 // controller, transferredBuffer, byteOffset, byteLength).
1100 self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1101
1102 // Let filledPullIntos be the result of performing !
1103 // ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
1104 rooted!(&in(cx) let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx));
1105
1106 // For each filledPullInto of filledPullIntos,
1107 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
1108 for filled_pull_into in &*filled_pull_intos {
1109 self.commit_pull_into_descriptor(cx, filled_pull_into)
1110 .expect("commit_pull_into_descriptor failed");
1111 }
1112 } else {
1113 // Assert: ! IsReadableStreamLocked(stream) is false.
1114 assert!(!stream.is_locked());
1115
1116 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1117 // (controller, transferredBuffer, byteOffset, byteLength).
1118 self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1119 }
1120
1121 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1122 self.call_pull_if_needed(cx);
1123
1124 Ok(())
1125 }
1126
1127 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-commit-pull-into-descriptor>
1128 fn commit_pull_into_descriptor(
1129 &self,
1130 cx: &mut JSContext,
1131 pull_into_descriptor: &PullIntoDescriptor,
1132 ) -> Fallible<()> {
1133 // Assert: stream.[[state]] is not "errored".
1134 let stream = self.stream.get().unwrap();
1135 assert!(!stream.is_errored());
1136
1137 // Assert: pullIntoDescriptor.reader type is not "none".
1138 assert!(pull_into_descriptor.reader_type.is_some());
1139
1140 // Let done be false.
1141 let mut done = false;
1142
1143 // If stream.[[state]] is "closed",
1144 if stream.is_closed() {
1145 // Assert: the remainder after dividing pullIntoDescriptor’s bytes filled
1146 // by pullIntoDescriptor’s element size is 0.
1147 assert!(
1148 pull_into_descriptor
1149 .bytes_filled
1150 .get()
1151 .is_multiple_of(pull_into_descriptor.element_size)
1152 );
1153
1154 // Set done to true.
1155 done = true;
1156 }
1157
1158 // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor).
1159 let filled_view = self
1160 .convert_pull_into_descriptor(cx, pull_into_descriptor)
1161 .expect("convert_pull_into_descriptor failed");
1162
1163 rooted!(&in(cx) let mut view_value = UndefinedValue());
1164 filled_view.get_buffer_view_value(cx.into(), view_value.handle_mut());
1165
1166 // If pullIntoDescriptor’s reader type is "default",
1167 if matches!(pull_into_descriptor.reader_type, Some(ReaderType::Default)) {
1168 // Perform ! ReadableStreamFulfillReadRequest(stream, filledView, done).
1169
1170 stream.fulfill_read_request(cx, view_value.handle(), done);
1171 } else {
1172 // Assert: pullIntoDescriptor’s reader type is "byob".
1173 assert!(matches!(
1174 pull_into_descriptor.reader_type,
1175 Some(ReaderType::Byob)
1176 ));
1177
1178 // Perform ! ReadableStreamFulfillReadIntoRequest(stream, filledView, done).
1179 stream.fulfill_read_into_request(cx, view_value.handle(), done);
1180 }
1181 Ok(())
1182 }
1183
1184 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-convert-pull-into-descriptor>
1185 pub(crate) fn convert_pull_into_descriptor(
1186 &self,
1187 cx: &mut js::context::JSContext,
1188 pull_into_descriptor: &PullIntoDescriptor,
1189 ) -> Fallible<RootedTraceableBox<HeapBufferSource<ArrayBufferViewU8>>> {
1190 // Let bytesFilled be pullIntoDescriptor’s bytes filled.
1191 let bytes_filled = pull_into_descriptor.bytes_filled.get();
1192
1193 // Let elementSize be pullIntoDescriptor’s element size.
1194 let element_size = pull_into_descriptor.element_size;
1195
1196 // Assert: bytesFilled ≤ pullIntoDescriptor’s byte length.
1197 assert!(bytes_filled <= pull_into_descriptor.byte_length);
1198
1199 // Assert: the remainder after dividing bytesFilled by elementSize is 0.
1200 assert!(bytes_filled.is_multiple_of(element_size));
1201
1202 // Let buffer be ! TransferArrayBuffer(pullIntoDescriptor’s buffer).
1203 let buffer = pull_into_descriptor
1204 .buffer
1205 .transfer_array_buffer(cx.into())
1206 .expect("TransferArrayBuffer failed");
1207
1208 // Return ! Construct(pullIntoDescriptor’s view constructor,
1209 // « buffer, pullIntoDescriptor’s byte offset, bytesFilled ÷ elementSize »).
1210 Ok(create_buffer_source_with_constructor(
1211 cx,
1212 &pull_into_descriptor.view_constructor,
1213 &buffer,
1214 pull_into_descriptor.byte_offset as usize,
1215 (bytes_filled / element_size) as usize,
1216 )
1217 .expect("Construct view failed"))
1218 }
1219
1220 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-process-pull-into-descriptors-using-queue>
1221 pub(crate) fn process_pull_into_descriptors_using_queue(
1222 &self,
1223 cx: &mut js::context::JSContext,
1224 ) -> Vec<PullIntoDescriptor> {
1225 // Assert: controller.[[closeRequested]] is false.
1226 assert!(!self.close_requested.get());
1227
1228 // Let filledPullIntos be a new empty list.
1229 rooted!(&in(cx) let mut filled_pull_intos = Vec::new());
1230
1231 // While controller.[[pendingPullIntos]] is not empty,
1232 loop {
1233 // If controller.[[queueTotalSize]] is 0, then break.
1234 if self.queue_total_size.get() == 0.0 {
1235 break;
1236 }
1237
1238 // Let pullIntoDescriptor be controller.[[pendingPullIntos]][0].
1239 let fill_pull_result = {
1240 let pending_pull_intos = self.pending_pull_intos.borrow();
1241 let Some(pull_into_descriptor) = pending_pull_intos.first() else {
1242 break;
1243 };
1244 self.fill_pull_into_descriptor_from_queue(cx, pull_into_descriptor)
1245 };
1246
1247 // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) is true,
1248 if fill_pull_result {
1249 // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1250 // Append pullIntoDescriptor to filledPullIntos.
1251 filled_pull_intos.push(self.shift_pending_pull_into());
1252 }
1253 }
1254
1255 // Return filledPullIntos.
1256 filled_pull_intos.take()
1257 }
1258
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-pull-into-descriptor-from-queue>
///
/// Copies bytes from the front of the queue into `pull_into_descriptor`'s buffer,
/// consuming queue entries (fully or partially) as it goes and updating both
/// `queue_total_size` and the descriptor's `bytes_filled`.
///
/// Returns `true` ("ready") when the element-aligned number of filled bytes reaches the
/// descriptor's minimum fill; in that case only the aligned amount is copied. Otherwise
/// everything available (up to the descriptor's remaining capacity) is copied and
/// `false` is returned.
///
/// Holds a mutable borrow of `queue` for the duration of the copy loop, and panics if
/// any of the spec assertions fails.
pub(crate) fn fill_pull_into_descriptor_from_queue(
    &self,
    cx: &mut js::context::JSContext,
    pull_into_descriptor: &PullIntoDescriptor,
) -> bool {
    // Let maxBytesToCopy be min(controller.[[queueTotalSize]],
    // pullIntoDescriptor’s byte length − pullIntoDescriptor’s bytes filled).
    let max_bytes_to_copy = min(
        self.queue_total_size.get() as usize,
        (pull_into_descriptor.byte_length - pull_into_descriptor.bytes_filled.get()) as usize,
    );

    // Let maxBytesFilled be pullIntoDescriptor’s bytes filled + maxBytesToCopy.
    let max_bytes_filled = pull_into_descriptor.bytes_filled.get() as usize + max_bytes_to_copy;

    // Let totalBytesToCopyRemaining be maxBytesToCopy.
    let mut total_bytes_to_copy_remaining = max_bytes_to_copy;

    // Let ready be false.
    let mut ready = false;

    // Assert: ! IsDetachedBuffer(pullIntoDescriptor’s buffer) is false.
    assert!(!pull_into_descriptor.buffer.is_detached_buffer(cx.into()));

    // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
    assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);

    // Let remainderBytes be the remainder after dividing maxBytesFilled by pullIntoDescriptor’s element size.
    let remainder_bytes = max_bytes_filled % pull_into_descriptor.element_size as usize;

    // Let maxAlignedBytes be maxBytesFilled − remainderBytes.
    let max_aligned_bytes = max_bytes_filled - remainder_bytes;

    // If maxAlignedBytes ≥ pullIntoDescriptor’s minimum fill,
    if max_aligned_bytes >= pull_into_descriptor.minimum_fill as usize {
        // Set totalBytesToCopyRemaining to maxAlignedBytes − pullIntoDescriptor’s bytes filled.
        // Only whole elements are copied once the descriptor can be completed.
        total_bytes_to_copy_remaining =
            max_aligned_bytes - (pull_into_descriptor.bytes_filled.get() as usize);

        // Set ready to true.
        ready = true;
    }

    // Let queue be controller.[[queue]].
    let mut queue = self.queue.borrow_mut();

    // While totalBytesToCopyRemaining > 0,
    while total_bytes_to_copy_remaining > 0 {
        // Let headOfQueue be queue[0].
        let head_of_queue = queue.front_mut().unwrap();

        // Let bytesToCopy be min(totalBytesToCopyRemaining, headOfQueue’s byte length).
        let bytes_to_copy = total_bytes_to_copy_remaining.min(head_of_queue.byte_length);

        // Let destStart be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
        // `bytes_filled` is re-read each iteration because it is advanced at the end of
        // the loop body.
        let dest_start =
            pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();

        // Let descriptorBuffer be pullIntoDescriptor’s buffer.
        let descriptor_buffer = &pull_into_descriptor.buffer;

        // Let queueBuffer be headOfQueue’s buffer.
        let queue_buffer = &head_of_queue.buffer;

        // Let queueByteOffset be headOfQueue’s byte offset.
        let queue_byte_offset = head_of_queue.byte_offset;

        // Assert: ! CanCopyDataBlockBytes(descriptorBuffer, destStart,
        // queueBuffer, queueByteOffset, bytesToCopy) is true.
        assert!(descriptor_buffer.can_copy_data_block_bytes(
            cx.into(),
            dest_start as usize,
            queue_buffer,
            queue_byte_offset,
            bytes_to_copy
        ));

        // Perform ! CopyDataBlockBytes(descriptorBuffer.[[ArrayBufferData]], destStart,
        // queueBuffer.[[ArrayBufferData]], queueByteOffset, bytesToCopy).
        descriptor_buffer.copy_data_block_bytes(
            cx.into(),
            dest_start as usize,
            queue_buffer,
            queue_byte_offset,
            bytes_to_copy,
        );

        // If headOfQueue’s byte length is bytesToCopy,
        if head_of_queue.byte_length == bytes_to_copy {
            // Remove queue[0].
            queue.pop_front().unwrap();
        } else {
            // Set headOfQueue’s byte offset to headOfQueue’s byte offset + bytesToCopy.
            head_of_queue.byte_offset += bytes_to_copy;

            // Set headOfQueue’s byte length to headOfQueue’s byte length − bytesToCopy.
            head_of_queue.byte_length -= bytes_to_copy;
        }

        // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − bytesToCopy.
        self.queue_total_size
            .set(self.queue_total_size.get() - (bytes_to_copy as f64));

        // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
        // controller, bytesToCopy, pullIntoDescriptor).
        self.fill_head_pull_into_descriptor(bytes_to_copy as u64, pull_into_descriptor);

        // Set totalBytesToCopyRemaining to totalBytesToCopyRemaining − bytesToCopy.
        total_bytes_to_copy_remaining -= bytes_to_copy;
    }

    // If ready is false,
    if !ready {
        // Assert: controller.[[queueTotalSize]] is 0.
        assert!(self.queue_total_size.get() == 0.0);

        // Assert: pullIntoDescriptor’s bytes filled > 0.
        assert!(pull_into_descriptor.bytes_filled.get() > 0);

        // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
        assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
    }

    // Return ready.
    ready
}
1386
1387 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-head-pull-into-descriptor>
1388 pub(crate) fn fill_head_pull_into_descriptor(
1389 &self,
1390 bytes_copied: u64,
1391 pull_into_descriptor: &PullIntoDescriptor,
1392 ) {
1393 // Assert: either controller.[[pendingPullIntos]] is empty,
1394 // or controller.[[pendingPullIntos]][0] is pullIntoDescriptor.
1395 {
1396 let pending_pull_intos = self.pending_pull_intos.borrow();
1397 assert!(
1398 pending_pull_intos.is_empty() ||
1399 pending_pull_intos.first().unwrap() == pull_into_descriptor
1400 );
1401 }
1402
1403 // Assert: controller.[[byobRequest]] is null.
1404 assert!(self.byob_request.get().is_none());
1405
1406 // Set pullIntoDescriptor’s bytes filled to bytes filled + size.
1407 pull_into_descriptor
1408 .bytes_filled
1409 .set(pull_into_descriptor.bytes_filled.get() + bytes_copied);
1410 }
1411
1412 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueuedetachedpullintotoqueue>
1413 pub(crate) fn enqueue_detached_pull_into_to_queue(&self, cx: &mut JSContext) -> Fallible<()> {
1414 // first_descriptor: &PullIntoDescriptor,
1415 let pending_pull_intos = self.pending_pull_intos.borrow();
1416 let first_descriptor = pending_pull_intos.first().unwrap();
1417
1418 // Assert: pullIntoDescriptor’s reader type is "none".
1419 assert!(first_descriptor.reader_type.is_none());
1420
1421 // If pullIntoDescriptor’s bytes filled > 0, perform ?
1422 // ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
1423 // pullIntoDescriptor’s buffer, pullIntoDescriptor’s byte offset, pullIntoDescriptor’s bytes filled).
1424
1425 if first_descriptor.bytes_filled.get() > 0 {
1426 self.enqueue_cloned_chunk_to_queue(
1427 cx,
1428 &first_descriptor.buffer,
1429 first_descriptor.byte_offset,
1430 first_descriptor.bytes_filled.get(),
1431 )?;
1432 }
1433
1434 // needed to drop the borrow and avoid BorrowMutError
1435 drop(pending_pull_intos);
1436
1437 // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1438 self.shift_pending_pull_into();
1439
1440 Ok(())
1441 }
1442
1443 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueueclonedchunktoqueue>
1444 pub(crate) fn enqueue_cloned_chunk_to_queue(
1445 &self,
1446 cx: &mut JSContext,
1447 buffer: &HeapBufferSource<ArrayBufferU8>,
1448 byte_offset: u64,
1449 byte_length: u64,
1450 ) -> Fallible<()> {
1451 // Let cloneResult be CloneArrayBuffer(buffer, byteOffset, byteLength, %ArrayBuffer%).
1452 if let Ok(clone_result) =
1453 buffer.clone_array_buffer(cx, byte_offset as usize, byte_length as usize)
1454 {
1455 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1456 // (controller, cloneResult.[[Value]], 0, byteLength).
1457 self.enqueue_chunk_to_queue(clone_result, 0, byte_length as usize);
1458
1459 Ok(())
1460 } else {
1461 // If cloneResult is an abrupt completion,
1462
1463 // Perform ! ReadableByteStreamControllerError(controller, cloneResult.[[Value]]).
1464 rooted!(&in(cx) let mut rval = UndefinedValue());
1465 let error = Error::Type(c"can not clone array buffer".to_owned());
1466 error.clone().to_jsval(
1467 cx.into(),
1468 &self.global(),
1469 rval.handle_mut(),
1470 CanGc::from_cx(cx),
1471 );
1472 self.error(cx, rval.handle());
1473
1474 // Return cloneResult.
1475 Err(error)
1476 }
1477 }
1478
1479 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue-chunk-to-queue>
1480 pub(crate) fn enqueue_chunk_to_queue(
1481 &self,
1482 buffer: RootedTraceableBox<HeapBufferSource<ArrayBufferU8>>,
1483 byte_offset: usize,
1484 byte_length: usize,
1485 ) {
1486 // Let entry be a new ReadableByteStreamQueueEntry object.
1487 // Append entry to controller.[[queue]].
1488 self.queue
1489 .borrow_mut()
1490 .push_back(QueueEntry::new(buffer, byte_offset, byte_length));
1491
1492 // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] + byteLength.
1493 self.queue_total_size
1494 .set(self.queue_total_size.get() + byte_length as f64);
1495 }
1496
1497 pub(crate) fn in_memory(&self) -> bool {
1498 let Some(underlying_source) = self.underlying_source.get() else {
1499 return false;
1500 };
1501 underlying_source.in_memory()
1502 }
1503
1504 pub(crate) fn get_in_memory_bytes(&self) -> Option<Vec<u8>> {
1505 let underlying_source = self.underlying_source.get()?;
1506 if !underlying_source.in_memory() {
1507 return None;
1508 }
1509
1510 let cx = GlobalScope::get_cx();
1511 self.queue.borrow().iter().try_fold(
1512 Vec::with_capacity(self.queue_total_size.get() as usize),
1513 |mut bytes, entry| {
1514 let mut chunk = vec![0; entry.byte_length];
1515 entry
1516 .buffer
1517 .copy_data_to(
1518 cx,
1519 &mut chunk,
1520 entry.byte_offset,
1521 entry.byte_offset + entry.byte_length,
1522 )
1523 .ok()?;
1524 bytes.extend(chunk);
1525 Some(bytes)
1526 },
1527 )
1528 }
1529
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-shift-pending-pull-into>
///
/// Removes and returns the first pending pull-into descriptor.
/// Panics if a BYOB request is outstanding. Takes a mutable borrow of
/// `pending_pull_intos`, so it must not be called while that list is borrowed.
// NOTE(review): `remove(0)` presumably panics on an empty list — callers appear to
// guarantee a pending descriptor exists; confirm.
pub(crate) fn shift_pending_pull_into(&self) -> PullIntoDescriptor {
    // Assert: controller.[[byobRequest]] is null.
    assert!(self.byob_request.get().is_none());

    // Let descriptor be controller.[[pendingPullIntos]][0].
    // Remove descriptor from controller.[[pendingPullIntos]].
    // Return descriptor.
    self.pending_pull_intos.borrow_mut().remove(0)
}
1540
1541 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerprocessreadrequestsusingqueue>
1542 pub(crate) fn process_read_requests_using_queue(&self, cx: &mut JSContext) -> Fallible<()> {
1543 // Let reader be controller.[[stream]].[[reader]].
1544 // Assert: reader implements ReadableStreamDefaultReader.
1545 let reader = self.stream.get().unwrap().get_default_reader();
1546
1547 // Step 3
1548 reader.process_read_requests(cx, DomRoot::from_ref(self))
1549 }
1550
1551 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerfillreadrequestfromqueue>
1552 pub(crate) fn fill_read_request_from_queue(
1553 &self,
1554 cx: &mut JSContext,
1555 read_request: &ReadRequest,
1556 ) -> Fallible<()> {
1557 // Assert: controller.[[queueTotalSize]] > 0.
1558 assert!(self.queue_total_size.get() > 0.0);
1559 // Also assert that the queue has a non-zero length;
1560 assert!(!self.queue.borrow().is_empty());
1561
1562 // Let entry be controller.[[queue]][0].
1563 // Remove entry from controller.[[queue]].
1564 rooted!(&in(cx) let entry = self.remove_entry());
1565
1566 // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − entry’s byte length.
1567 self.queue_total_size
1568 .set(self.queue_total_size.get() - entry.byte_length as f64);
1569
1570 // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
1571 self.handle_queue_drain(cx);
1572
1573 // Let view be ! Construct(%Uint8Array%, « entry’s buffer, entry’s byte offset, entry’s byte length »).
1574 let view = create_buffer_source_with_constructor(
1575 cx,
1576 &Constructor::Name(Type::Uint8),
1577 &entry.buffer,
1578 entry.byte_offset,
1579 entry.byte_length,
1580 )
1581 .expect("Construct Uint8Array failed");
1582
1583 // Perform readRequest’s chunk steps, given view.
1584 let result = RootedTraceableBox::new(Heap::default());
1585 rooted!(&in(cx) let mut view_value = UndefinedValue());
1586 view.get_buffer_view_value(cx.into(), view_value.handle_mut());
1587 result.set(*view_value);
1588
1589 read_request.chunk_steps(cx, result, &self.global());
1590
1591 Ok(())
1592 }
1593
1594 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-handle-queue-drain>
1595 pub(crate) fn handle_queue_drain(&self, cx: &mut JSContext) {
1596 // Assert: controller.[[stream]].[[state]] is "readable".
1597 assert!(self.stream.get().unwrap().is_readable());
1598
1599 // If controller.[[queueTotalSize]] is 0 and controller.[[closeRequested]] is true,
1600 if self.queue_total_size.get() == 0.0 && self.close_requested.get() {
1601 // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
1602 self.clear_algorithms();
1603
1604 // Perform ! ReadableStreamClose(controller.[[stream]]).
1605 self.stream.get().unwrap().close(cx);
1606 } else {
1607 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1608 self.call_pull_if_needed(cx);
1609 }
1610 }
1611
1612 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
1613 fn call_pull_if_needed(&self, cx: &mut JSContext) {
1614 // Let shouldPull be ! ReadableByteStreamControllerShouldCallPull(controller).
1615 let should_pull = self.should_call_pull();
1616 // If shouldPull is false, return.
1617 if !should_pull {
1618 return;
1619 }
1620
1621 // If controller.[[pulling]] is true,
1622 if self.pulling.get() {
1623 // Set controller.[[pullAgain]] to true.
1624 self.pull_again.set(true);
1625
1626 // Return.
1627 return;
1628 }
1629
1630 // Assert: controller.[[pullAgain]] is false.
1631 assert!(!self.pull_again.get());
1632
1633 // Set controller.[[pulling]] to true.
1634 self.pulling.set(true);
1635
1636 // Let pullPromise be the result of performing controller.[[pullAlgorithm]].
1637 // Continues into the resolve and reject handling of the native handler.
1638 let global = self.global();
1639 let rooted_controller = DomRoot::from_ref(self);
1640 let controller = Controller::ReadableByteStreamController(rooted_controller.clone());
1641
1642 if let Some(underlying_source) = self.underlying_source.get() {
1643 let handler = PromiseNativeHandler::new(
1644 cx,
1645 &global,
1646 Some(Box::new(PullAlgorithmFulfillmentHandler {
1647 controller: Dom::from_ref(&rooted_controller),
1648 })),
1649 Some(Box::new(PullAlgorithmRejectionHandler {
1650 controller: Dom::from_ref(&rooted_controller),
1651 })),
1652 );
1653
1654 let mut realm = enter_auto_realm(cx, &*global);
1655 let cx = &mut realm.current_realm();
1656
1657 let result = underlying_source
1658 .call_pull_algorithm(cx, controller)
1659 .unwrap_or_else(|| {
1660 let promise = Promise::new_resolved(&global, cx.into(), (), CanGc::from_cx(cx));
1661 Ok(promise)
1662 });
1663 let promise = result.unwrap_or_else(|error| {
1664 rooted!(&in(cx) let mut rval = UndefinedValue());
1665 // TODO: check if `self.global()` is the right globalscope.
1666 error.to_jsval(cx.into(), &global, rval.handle_mut(), CanGc::from_cx(cx));
1667 Promise::new_rejected(&global, cx.into(), rval.handle(), CanGc::from_cx(cx))
1668 });
1669 promise.append_native_handler(cx, &handler);
1670 }
1671 }
1672
1673 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-should-call-pull>
1674 fn should_call_pull(&self) -> bool {
1675 // Let stream be controller.[[stream]].
1676 // Note: the spec does not assert that stream is not undefined here,
1677 // so we return false if it is.
1678 let stream = self.stream.get().unwrap();
1679
1680 // If stream.[[state]] is not "readable", return false.
1681 if !stream.is_readable() {
1682 return false;
1683 }
1684
1685 // If controller.[[closeRequested]] is true, return false.
1686 if self.close_requested.get() {
1687 return false;
1688 }
1689
1690 // If controller.[[started]] is false, return false.
1691 if !self.started.get() {
1692 return false;
1693 }
1694
1695 // If ! ReadableStreamHasDefaultReader(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0
1696 // , return true.
1697 if stream.has_default_reader() && stream.get_num_read_requests() > 0 {
1698 return true;
1699 }
1700
1701 // If ! ReadableStreamHasBYOBReader(stream) is true and ! ReadableStreamGetNumReadIntoRequests(stream) > 0
1702 // , return true.
1703 if stream.has_byob_reader() && stream.get_num_read_into_requests() > 0 {
1704 return true;
1705 }
1706
1707 // Let desiredSize be ! ReadableByteStreamControllerGetDesiredSize(controller).
1708 let desired_size = self.get_desired_size();
1709
1710 // Assert: desiredSize is not null.
1711 assert!(desired_size.is_some());
1712
1713 // If desiredSize > 0, return true.
1714 if desired_size.unwrap() > 0. {
1715 return true;
1716 }
1717
1718 // Return false.
1719 false
1720 }
1721 /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
1722 pub(crate) fn setup(
1723 &self,
1724 cx: &mut JSContext,
1725 global: &GlobalScope,
1726 stream: DomRoot<ReadableStream>,
1727 ) -> Fallible<()> {
1728 // Assert: stream.[[controller]] is undefined.
1729 stream.assert_no_controller();
1730
1731 // If autoAllocateChunkSize is not undefined,
1732 if self.auto_allocate_chunk_size.is_some() {
1733 // Assert: ! IsInteger(autoAllocateChunkSize) is true. Implicit
1734 // Assert: autoAllocateChunkSize is positive. (Implicit by type.)
1735 }
1736
1737 // Set controller.[[stream]] to stream.
1738 self.stream.set(Some(&stream));
1739
1740 // Set controller.[[pullAgain]] and controller.[[pulling]] to false.
1741 self.pull_again.set(false);
1742 self.pulling.set(false);
1743
1744 // Set controller.[[byobRequest]] to null.
1745 self.byob_request.set(None);
1746
1747 // Perform ! ResetQueue(controller).
1748 self.reset_queue();
1749
1750 // Set controller.[[closeRequested]] and controller.[[started]] to false.
1751 self.close_requested.set(false);
1752 self.started.set(false);
1753
1754 // Set controller.[[strategyHWM]] to highWaterMark.
1755 // Set controller.[[pullAlgorithm]] to pullAlgorithm.
1756 // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
1757 // Set controller.[[autoAllocateChunkSize]] to autoAllocateChunkSize.
1758 // Set controller.[[pendingPullIntos]] to a new empty list.
1759 // Note: the above steps are done in `new`.
1760
1761 // Set stream.[[controller]] to controller.
1762 let rooted_byte_controller = DomRoot::from_ref(self);
1763 stream.set_byte_controller(&rooted_byte_controller);
1764
1765 if let Some(underlying_source) = rooted_byte_controller.underlying_source.get() {
1766 // Let startResult be the result of performing startAlgorithm. (This might throw an exception.)
1767 let start_result = underlying_source
1768 .call_start_algorithm(
1769 cx,
1770 Controller::ReadableByteStreamController(rooted_byte_controller.clone()),
1771 )
1772 .unwrap_or_else(|| {
1773 Ok(Promise::new_resolved(
1774 global,
1775 cx.into(),
1776 (),
1777 CanGc::from_cx(cx),
1778 ))
1779 });
1780
1781 // Let startPromise be a promise resolved with startResult.
1782 let start_promise = start_result?;
1783
1784 // Upon fulfillment of startPromise, Upon rejection of startPromise with reason r,
1785 let handler = PromiseNativeHandler::new(
1786 cx,
1787 global,
1788 Some(Box::new(StartAlgorithmFulfillmentHandler {
1789 controller: Dom::from_ref(&rooted_byte_controller),
1790 })),
1791 Some(Box::new(StartAlgorithmRejectionHandler {
1792 controller: Dom::from_ref(&rooted_byte_controller),
1793 })),
1794 );
1795 let mut realm = enter_auto_realm(cx, global);
1796 let cx = &mut realm.current_realm();
1797 start_promise.append_native_handler(cx, &handler);
1798 };
1799
1800 Ok(())
1801 }
1802
1803 // <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontroller-releasesteps
1804 pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1805 // If this.[[pendingPullIntos]] is not empty,
1806 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
1807 if !pending_pull_intos.is_empty() {
1808 // Let firstPendingPullInto be this.[[pendingPullIntos]][0].
1809 let mut first_pending_pull_into = RootedTraceableBox::new(pending_pull_intos.remove(0));
1810
1811 // Set firstPendingPullInto’s reader type to "none".
1812 first_pending_pull_into.reader_type = None;
1813
1814 // Set this.[[pendingPullIntos]] to the list « firstPendingPullInto »
1815 pending_pull_intos.clear();
1816 pending_pull_intos.push(*first_pending_pull_into.into_box());
1817 }
1818 Ok(())
1819 }
1820
1821 /// <https://streams.spec.whatwg.org/#rbs-controller-private-cancel>
1822 pub(crate) fn perform_cancel_steps(
1823 &self,
1824 cx: &mut JSContext,
1825 global: &GlobalScope,
1826 reason: SafeHandleValue,
1827 ) -> Rc<Promise> {
1828 // Perform ! ReadableByteStreamControllerClearPendingPullIntos(this).
1829 self.clear_pending_pull_intos();
1830
1831 // Perform ! ResetQueue(this).
1832 self.reset_queue();
1833
1834 let underlying_source = self
1835 .underlying_source
1836 .get()
1837 .expect("Controller should have a source when the cancel steps are called into.");
1838
1839 // Let result be the result of performing this.[[cancelAlgorithm]], passing in reason.
1840 let result = underlying_source
1841 .call_cancel_algorithm(cx, global, reason)
1842 .unwrap_or_else(|| {
1843 let promise = Promise::new2(cx, global);
1844 promise.resolve_native_with_cx(cx, &());
1845 Ok(promise)
1846 });
1847
1848 let promise = result.unwrap_or_else(|error| {
1849 rooted!(&in(cx) let mut rval = UndefinedValue());
1850 error.to_jsval(cx.into(), global, rval.handle_mut(), CanGc::from_cx(cx));
1851 let promise = Promise::new2(cx, global);
1852 promise.reject_native_with_cx(cx, &rval.handle());
1853 promise
1854 });
1855
1856 // Perform ! ReadableByteStreamControllerClearAlgorithms(this).
1857 self.clear_algorithms();
1858
1859 // Return result(the promise).
1860 promise
1861 }
1862
1863 /// <https://streams.spec.whatwg.org/#rbs-controller-private-pull>
1864 pub(crate) fn perform_pull_steps(&self, cx: &mut JSContext, read_request: &ReadRequest) {
1865 // Let stream be this.[[stream]].
1866 let stream = self.stream.get().unwrap();
1867
1868 // Assert: ! ReadableStreamHasDefaultReader(stream) is true.
1869 assert!(stream.has_default_reader());
1870
1871 // If this.[[queueTotalSize]] > 0,
1872 if self.queue_total_size.get() > 0.0 {
1873 // Assert: ! ReadableStreamGetNumReadRequests(stream) is 0.
1874 assert_eq!(stream.get_num_read_requests(), 0);
1875
1876 // Perform ! ReadableByteStreamControllerFillReadRequestFromQueue(this, readRequest).
1877 let _ = self.fill_read_request_from_queue(cx, read_request);
1878
1879 // Return.
1880 return;
1881 }
1882
1883 // Let autoAllocateChunkSize be this.[[autoAllocateChunkSize]].
1884 let auto_allocate_chunk_size = self.auto_allocate_chunk_size;
1885
1886 // If autoAllocateChunkSize is not undefined,
1887 if let Some(auto_allocate_chunk_size) = auto_allocate_chunk_size {
1888 // create_array_buffer_with_size
1889 // Let buffer be Construct(%ArrayBuffer%, « autoAllocateChunkSize »).
1890 match create_array_buffer_with_size(cx, auto_allocate_chunk_size as usize) {
1891 Ok(buffer) => {
1892 // Let pullIntoDescriptor be a new pull-into descriptor with
1893 // buffer buffer.[[Value]]
1894 // buffer byte length autoAllocateChunkSize
1895 // byte offset 0
1896 // byte length autoAllocateChunkSize
1897 // bytes filled 0
1898 // minimum fill 1
1899 // element size 1
1900 // view constructor %Uint8Array%
1901 // reader type "default"
1902
1903 // Append pullIntoDescriptor to this.[[pendingPullIntos]].
1904 self.pending_pull_intos
1905 .borrow_mut()
1906 .push(PullIntoDescriptor {
1907 buffer: *buffer.into_box(),
1908 buffer_byte_length: auto_allocate_chunk_size,
1909 byte_length: auto_allocate_chunk_size,
1910 byte_offset: 0,
1911 bytes_filled: Cell::new(0),
1912 minimum_fill: 1,
1913 element_size: 1,
1914 view_constructor: Constructor::Name(Type::Uint8),
1915 reader_type: Some(ReaderType::Default),
1916 });
1917 },
1918 Err(error) => {
1919 // If buffer is an abrupt completion,
1920 // Perform readRequest’s error steps, given buffer.[[Value]].
1921
1922 rooted!(&in(cx) let mut rval = UndefinedValue());
1923 error.to_jsval(
1924 cx.into(),
1925 &self.global(),
1926 rval.handle_mut(),
1927 CanGc::from_cx(cx),
1928 );
1929 read_request.error_steps(cx, rval.handle());
1930
1931 // Return.
1932 return;
1933 },
1934 }
1935 }
1936
1937 // Perform ! ReadableStreamAddReadRequest(stream, readRequest).
1938 stream.add_read_request(read_request);
1939
1940 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(this).
1941 self.call_pull_if_needed(cx);
1942 }
1943
1944 /// Setting the JS object after the heap has settled down.
1945 pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
1946 if let Some(underlying_source) = self.underlying_source.get() {
1947 underlying_source.set_underlying_source_this_object(this_object);
1948 }
1949 }
1950
1951 pub(crate) fn remove_entry(&self) -> QueueEntry {
1952 self.queue
1953 .borrow_mut()
1954 .pop_front()
1955 .expect("Reader must have read request when remove is called into.")
1956 }
1957
1958 pub(crate) fn get_queue_total_size(&self) -> f64 {
1959 self.queue_total_size.get()
1960 }
1961
1962 pub(crate) fn get_pending_pull_intos_size(&self) -> usize {
1963 self.pending_pull_intos.borrow().len()
1964 }
1965}
1966
1967impl ReadableByteStreamControllerMethods<crate::DomTypeHolder> for ReadableByteStreamController {
1968 /// <https://streams.spec.whatwg.org/#rbs-controller-byob-request>
1969 fn GetByobRequest(
1970 &self,
1971 cx: &mut js::context::JSContext,
1972 ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
1973 // Return ! ReadableByteStreamControllerGetBYOBRequest(this).
1974 self.get_byob_request(cx)
1975 }
1976
1977 /// <https://streams.spec.whatwg.org/#rbs-controller-desired-size>
1978 fn GetDesiredSize(&self) -> Option<f64> {
1979 // Return ! ReadableByteStreamControllerGetDesiredSize(this).
1980 self.get_desired_size()
1981 }
1982
1983 /// <https://streams.spec.whatwg.org/#rbs-controller-close>
1984 fn Close(&self, cx: &mut JSContext) -> Fallible<()> {
1985 // If this.[[closeRequested]] is true, throw a TypeError exception.
1986 if self.close_requested.get() {
1987 return Err(Error::Type(c"closeRequested is true".to_owned()));
1988 }
1989
1990 // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
1991 if !self.stream.get().unwrap().is_readable() {
1992 return Err(Error::Type(c"stream is not readable".to_owned()));
1993 }
1994
1995 // Perform ? ReadableByteStreamControllerClose(this).
1996 self.close(cx)
1997 }
1998
1999 /// <https://streams.spec.whatwg.org/#rbs-controller-enqueue>
2000 fn Enqueue(
2001 &self,
2002 cx: &mut JSContext,
2003 chunk: js::gc::CustomAutoRooterGuard<js::typedarray::ArrayBufferView>,
2004 ) -> Fallible<()> {
2005 let chunk = HeapBufferSource::<ArrayBufferViewU8>::from_view(chunk);
2006
2007 // If chunk.[[ByteLength]] is 0, throw a TypeError exception.
2008 if chunk.byte_length() == 0 {
2009 return Err(Error::Type(c"chunk.ByteLength is 0".to_owned()));
2010 }
2011
2012 // If chunk.[[ViewedArrayBuffer]].[[ByteLength]] is 0, throw a TypeError exception.
2013 if chunk.viewed_buffer_array_byte_length(cx.into()) == 0 {
2014 return Err(Error::Type(
2015 c"chunk.ViewedArrayBuffer.ByteLength is 0".to_owned(),
2016 ));
2017 }
2018
2019 // If this.[[closeRequested]] is true, throw a TypeError exception.
2020 if self.close_requested.get() {
2021 return Err(Error::Type(c"closeRequested is true".to_owned()));
2022 }
2023
2024 // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
2025 if !self.stream.get().unwrap().is_readable() {
2026 return Err(Error::Type(c"stream is not readable".to_owned()));
2027 }
2028
2029 // Return ? ReadableByteStreamControllerEnqueue(this, chunk).
2030 self.enqueue(cx, chunk)
2031 }
2032
2033 /// <https://streams.spec.whatwg.org/#rbs-controller-error>
2034 fn Error(&self, cx: &mut JSContext, e: SafeHandleValue) -> Fallible<()> {
2035 // Perform ! ReadableByteStreamControllerError(this, e).
2036 self.error(cx, e);
2037 Ok(())
2038 }
2039}