script/dom/readablebytestreamcontroller.rs
1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::cmp::min;
7use std::collections::VecDeque;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::{Heap, Type};
12use js::jsval::UndefinedValue;
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue};
15use js::typedarray::{ArrayBufferU8, ArrayBufferViewU8};
16
17use super::bindings::buffer_source::HeapBufferSource;
18use super::bindings::cell::DomRefCell;
19use super::bindings::reflector::reflect_dom_object;
20use super::bindings::root::Dom;
21use super::readablestreambyobreader::ReadIntoRequest;
22use super::readablestreamdefaultreader::ReadRequest;
23use super::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType};
24use crate::dom::bindings::buffer_source::{
25 Constructor, byte_size, create_array_buffer_with_size, create_buffer_source_with_constructor,
26};
27use crate::dom::bindings::codegen::Bindings::ReadableByteStreamControllerBinding::ReadableByteStreamControllerMethods;
28use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
29use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
30use crate::dom::bindings::reflector::{DomGlobal, Reflector};
31use crate::dom::bindings::root::{DomRoot, MutNullableDom};
32use crate::dom::bindings::trace::RootedTraceableBox;
33use crate::dom::globalscope::GlobalScope;
34use crate::dom::promise::Promise;
35use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
36use crate::dom::readablestream::ReadableStream;
37use crate::dom::readablestreambyobrequest::ReadableStreamBYOBRequest;
38use crate::realms::{InRealm, enter_realm};
39use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
40
41/// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry>
42#[derive(JSTraceable, MallocSizeOf)]
43pub(crate) struct QueueEntry {
44 /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-buffer>
45 #[ignore_malloc_size_of = "HeapBufferSource"]
46 buffer: HeapBufferSource<ArrayBufferU8>,
47 /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-offset>
48 byte_offset: usize,
49 /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-length>
50 byte_length: usize,
51}
52
53impl QueueEntry {
54 pub(crate) fn new(
55 buffer: HeapBufferSource<ArrayBufferU8>,
56 byte_offset: usize,
57 byte_length: usize,
58 ) -> QueueEntry {
59 QueueEntry {
60 buffer,
61 byte_offset,
62 byte_length,
63 }
64 }
65}
66
67#[derive(Debug, Eq, JSTraceable, MallocSizeOf, PartialEq)]
68pub(crate) enum ReaderType {
69 /// <https://streams.spec.whatwg.org/#readablestreambyobreader>
70 Byob,
71 /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
72 Default,
73}
74
75/// <https://streams.spec.whatwg.org/#pull-into-descriptor>
76#[derive(Eq, JSTraceable, MallocSizeOf, PartialEq)]
77pub(crate) struct PullIntoDescriptor {
78 #[ignore_malloc_size_of = "HeapBufferSource"]
79 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer>
80 buffer: HeapBufferSource<ArrayBufferU8>,
81 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer-byte-length>
82 buffer_byte_length: u64,
83 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-offset>
84 byte_offset: u64,
85 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-length>
86 byte_length: u64,
87 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-bytes-filled>
88 bytes_filled: Cell<u64>,
89 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-minimum-fill>
90 minimum_fill: u64,
91 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-element-size>
92 element_size: u64,
93 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-view-constructor>
94 view_constructor: Constructor,
95 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-reader-type>
96 reader_type: Option<ReaderType>,
97}
98
99/// The fulfillment handler for
100/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
101#[derive(Clone, JSTraceable, MallocSizeOf)]
102#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
103struct StartAlgorithmFulfillmentHandler {
104 controller: Dom<ReadableByteStreamController>,
105}
106
107impl Callback for StartAlgorithmFulfillmentHandler {
108 /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
109 /// Upon fulfillment of startPromise,
110 fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
111 let can_gc = CanGc::from_cx(cx);
112 // Set controller.[[started]] to true.
113 self.controller.started.set(true);
114
115 // Assert: controller.[[pulling]] is false.
116 assert!(!self.controller.pulling.get());
117
118 // Assert: controller.[[pullAgain]] is false.
119 assert!(!self.controller.pull_again.get());
120
121 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
122 self.controller.call_pull_if_needed(can_gc);
123 }
124}
125
126/// The rejection handler for
127/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
128#[derive(Clone, JSTraceable, MallocSizeOf)]
129#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
130struct StartAlgorithmRejectionHandler {
131 controller: Dom<ReadableByteStreamController>,
132}
133
134impl Callback for StartAlgorithmRejectionHandler {
135 /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
136 /// Upon rejection of startPromise with reason r,
137 fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
138 let can_gc = CanGc::from_cx(cx);
139 // Perform ! ReadableByteStreamControllerError(controller, r).
140 self.controller.error(v, can_gc);
141 }
142}
143
144/// The fulfillment handler for
145/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
146#[derive(Clone, JSTraceable, MallocSizeOf)]
147#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
148struct PullAlgorithmFulfillmentHandler {
149 controller: Dom<ReadableByteStreamController>,
150}
151
152impl Callback for PullAlgorithmFulfillmentHandler {
153 /// Continuation of <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
154 /// Upon fulfillment of pullPromise
155 fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
156 let can_gc = CanGc::from_cx(cx);
157 // Set controller.[[pulling]] to false.
158 self.controller.pulling.set(false);
159
160 // If controller.[[pullAgain]] is true,
161 if self.controller.pull_again.get() {
162 // Set controller.[[pullAgain]] to false.
163 self.controller.pull_again.set(false);
164
165 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
166 self.controller.call_pull_if_needed(can_gc);
167 }
168 }
169}
170
171/// The rejection handler for
172/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
173#[derive(Clone, JSTraceable, MallocSizeOf)]
174#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
175struct PullAlgorithmRejectionHandler {
176 controller: Dom<ReadableByteStreamController>,
177}
178
179impl Callback for PullAlgorithmRejectionHandler {
180 /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-byte-controller-call-pull-if-needed>
181 /// Upon rejection of pullPromise with reason e.
182 fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
183 let can_gc = CanGc::from_cx(cx);
184 // Perform ! ReadableByteStreamControllerError(controller, e).
185 self.controller.error(v, can_gc);
186 }
187}
188
189/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
190#[dom_struct]
191pub(crate) struct ReadableByteStreamController {
192 reflector_: Reflector,
193 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-autoallocatechunksize>
194 auto_allocate_chunk_size: Option<u64>,
195 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-stream>
196 stream: MutNullableDom<ReadableStream>,
197 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-strategyhwm>
198 strategy_hwm: f64,
199 /// A mutable reference to the underlying source is used to implement these two
200 /// internal slots:
201 ///
202 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullalgorithm>
203 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-cancelalgorithm>
204 underlying_source: MutNullableDom<UnderlyingSourceContainer>,
205 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queue>
206 queue: DomRefCell<VecDeque<QueueEntry>>,
207 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queuetotalsize>
208 queue_total_size: Cell<f64>,
209 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-byobrequest>
210 byob_request: MutNullableDom<ReadableStreamBYOBRequest>,
211 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pendingpullintos>
212 pending_pull_intos: DomRefCell<Vec<PullIntoDescriptor>>,
213 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-closerequested>
214 close_requested: Cell<bool>,
215 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-started>
216 started: Cell<bool>,
217 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pulling>
218 pulling: Cell<bool>,
219 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullalgorithm>
220 pull_again: Cell<bool>,
221}
222
223impl ReadableByteStreamController {
224 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
225 fn new_inherited(
226 underlying_source_type: UnderlyingSourceType,
227 strategy_hwm: f64,
228 global: &GlobalScope,
229 can_gc: CanGc,
230 ) -> ReadableByteStreamController {
231 let underlying_source_container =
232 UnderlyingSourceContainer::new(global, underlying_source_type, can_gc);
233 let auto_allocate_chunk_size = underlying_source_container.auto_allocate_chunk_size();
234 ReadableByteStreamController {
235 reflector_: Reflector::new(),
236 byob_request: MutNullableDom::new(None),
237 stream: MutNullableDom::new(None),
238 underlying_source: MutNullableDom::new(Some(&*underlying_source_container)),
239 auto_allocate_chunk_size,
240 pending_pull_intos: DomRefCell::new(Vec::new()),
241 strategy_hwm,
242 close_requested: Default::default(),
243 queue: DomRefCell::new(Default::default()),
244 queue_total_size: Default::default(),
245 started: Default::default(),
246 pulling: Default::default(),
247 pull_again: Default::default(),
248 }
249 }
250
251 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
252 pub(crate) fn new(
253 underlying_source_type: UnderlyingSourceType,
254 strategy_hwm: f64,
255 global: &GlobalScope,
256 can_gc: CanGc,
257 ) -> DomRoot<ReadableByteStreamController> {
258 reflect_dom_object(
259 Box::new(ReadableByteStreamController::new_inherited(
260 underlying_source_type,
261 strategy_hwm,
262 global,
263 can_gc,
264 )),
265 global,
266 can_gc,
267 )
268 }
269
270 pub(crate) fn set_stream(&self, stream: &ReadableStream) {
271 self.stream.set(Some(stream))
272 }
273
274 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-pull-into>
275 pub(crate) fn perform_pull_into(
276 &self,
277 cx: SafeJSContext,
278 read_into_request: &ReadIntoRequest,
279 view: HeapBufferSource<ArrayBufferViewU8>,
280 min: u64,
281 can_gc: CanGc,
282 ) {
283 // Let stream be controller.[[stream]].
284 let stream = self.stream.get().unwrap();
285
286 // Let elementSize be 1.
287 let mut element_size = 1;
288
289 // Let ctor be %DataView%.
290 let mut ctor = Constructor::DataView;
291
292 // If view has a [[TypedArrayName]] internal slot (i.e., it is not a DataView),
293 if view.has_typed_array_name() {
294 // Set elementSize to the element size specified in the
295 // typed array constructors table for view.[[TypedArrayName]].
296 let view_typw = view.get_array_buffer_view_type();
297 element_size = byte_size(view_typw);
298
299 // Set ctor to the constructor specified in the typed array constructors table for view.[[TypedArrayName]].
300 ctor = Constructor::Name(view_typw);
301 }
302
303 // Let minimumFill be min × elementSize.
304 let minimum_fill = min * element_size;
305
306 // Assert: minimumFill ≥ 0 and minimumFill ≤ view.[[ByteLength]].
307 assert!(minimum_fill <= (view.byte_length() as u64));
308
309 // Assert: the remainder after dividing minimumFill by elementSize is 0.
310 assert_eq!(minimum_fill % element_size, 0);
311
312 // Let byteOffset be view.[[ByteOffset]].
313 let byte_offset = view.get_byte_offset();
314
315 // Let byteLength be view.[[ByteLength]].
316 let byte_length = view.byte_length();
317
318 // Let bufferResult be TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
319 match view
320 .get_array_buffer_view_buffer(cx)
321 .transfer_array_buffer(cx)
322 {
323 Ok(buffer) => {
324 // Let buffer be bufferResult.[[Value]].
325 // Let pullIntoDescriptor be a new pull-into descriptor with
326 // buffer buffer
327 // buffer byte length buffer.[[ArrayBufferByteLength]]
328 // byte offset byteOffset
329 // byte length byteLength
330 // bytes filled 0
331 // minimum fill minimumFill
332 // element size elementSize
333 // view constructor ctor
334 // reader type "byob"
335 let buffer_byte_length = buffer.byte_length();
336 let pull_into_descriptor = PullIntoDescriptor {
337 buffer,
338 buffer_byte_length: buffer_byte_length as u64,
339 byte_offset: byte_offset as u64,
340 byte_length: byte_length as u64,
341 bytes_filled: Cell::new(0),
342 minimum_fill,
343 element_size,
344 view_constructor: ctor.clone(),
345 reader_type: Some(ReaderType::Byob),
346 };
347
348 // If controller.[[pendingPullIntos]] is not empty,
349 {
350 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
351 if !pending_pull_intos.is_empty() {
352 // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
353 pending_pull_intos.push(pull_into_descriptor);
354
355 // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
356 stream.add_read_into_request(read_into_request);
357
358 // Return.
359 return;
360 }
361 }
362
363 // If stream.[[state]] is "closed",
364 if stream.is_closed() {
365 // Let emptyView be ! Construct(ctor, « pullIntoDescriptor’s buffer,
366 // pullIntoDescriptor’s byte offset, 0 »).
367 if let Ok(empty_view) = create_buffer_source_with_constructor(
368 cx,
369 &ctor,
370 &pull_into_descriptor.buffer,
371 pull_into_descriptor.byte_offset as usize,
372 0,
373 ) {
374 // Perform readIntoRequest’s close steps, given emptyView.
375 let result = RootedTraceableBox::new(Heap::default());
376 rooted!(in(*cx) let mut view_value = UndefinedValue());
377 empty_view.get_buffer_view_value(cx, view_value.handle_mut());
378 result.set(*view_value);
379
380 read_into_request.close_steps(Some(result), can_gc);
381
382 // Return.
383 return;
384 } else {
385 return;
386 }
387 }
388
389 // If controller.[[queueTotalSize]] > 0,
390 if self.queue_total_size.get() > 0.0 {
391 // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(
392 // controller, pullIntoDescriptor) is true,
393 if self.fill_pull_into_descriptor_from_queue(cx, &pull_into_descriptor) {
394 // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(
395 // pullIntoDescriptor).
396 if let Ok(filled_view) =
397 self.convert_pull_into_descriptor(cx, &pull_into_descriptor)
398 {
399 // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
400 self.handle_queue_drain(can_gc);
401
402 // Perform readIntoRequest’s chunk steps, given filledView.
403 let result = RootedTraceableBox::new(Heap::default());
404 rooted!(in(*cx) let mut view_value = UndefinedValue());
405 filled_view.get_buffer_view_value(cx, view_value.handle_mut());
406 result.set(*view_value);
407 read_into_request.chunk_steps(result, can_gc);
408
409 // Return.
410 return;
411 } else {
412 return;
413 }
414 }
415
416 // If controller.[[closeRequested]] is true,
417 if self.close_requested.get() {
418 // Let e be a new TypeError exception.
419 rooted!(in(*cx) let mut error = UndefinedValue());
420 Error::Type("close requested".to_owned()).to_jsval(
421 cx,
422 &self.global(),
423 error.handle_mut(),
424 can_gc,
425 );
426
427 // Perform ! ReadableByteStreamControllerError(controller, e).
428 self.error(error.handle(), can_gc);
429
430 // Perform readIntoRequest’s error steps, given e.
431 read_into_request.error_steps(error.handle(), can_gc);
432
433 // Return.
434 return;
435 }
436 }
437
438 // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
439 {
440 self.pending_pull_intos
441 .borrow_mut()
442 .push(pull_into_descriptor);
443 }
444 // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
445 stream.add_read_into_request(read_into_request);
446
447 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
448 self.call_pull_if_needed(can_gc);
449 },
450 Err(error) => {
451 // If bufferResult is an abrupt completion,
452
453 // Perform readIntoRequest’s error steps, given bufferResult.[[Value]].
454 rooted!(in(*cx) let mut rval = UndefinedValue());
455 error
456 .clone()
457 .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
458 read_into_request.error_steps(rval.handle(), can_gc);
459
460 // Return.
461 },
462 }
463 }
464
465 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond>
466 pub(crate) fn respond(
467 &self,
468 cx: SafeJSContext,
469 bytes_written: u64,
470 can_gc: CanGc,
471 ) -> Fallible<()> {
472 {
473 // Assert: controller.[[pendingPullIntos]] is not empty.
474 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
475 assert!(!pending_pull_intos.is_empty());
476
477 // Let firstDescriptor be controller.[[pendingPullIntos]][0].
478 let first_descriptor = pending_pull_intos.first_mut().unwrap();
479
480 // Let state be controller.[[stream]].[[state]].
481 let stream = self.stream.get().unwrap();
482
483 // If state is "closed",
484 if stream.is_closed() {
485 // If bytesWritten is not 0, throw a TypeError exception.
486 if bytes_written != 0 {
487 return Err(Error::Type(
488 "bytesWritten not zero on closed stream".to_owned(),
489 ));
490 }
491 } else {
492 // Assert: state is "readable".
493 assert!(stream.is_readable());
494
495 // If bytesWritten is 0, throw a TypeError exception.
496 if bytes_written == 0 {
497 return Err(Error::Type("bytesWritten is 0".to_owned()));
498 }
499
500 // If firstDescriptor’s bytes filled + bytesWritten > firstDescriptor’s byte length,
501 // throw a RangeError exception.
502 if first_descriptor.bytes_filled.get() + bytes_written >
503 first_descriptor.byte_length
504 {
505 return Err(Error::Range(
506 "bytes filled + bytesWritten > byte length".to_owned(),
507 ));
508 }
509 }
510
511 // Set firstDescriptor’s buffer to ! TransferArrayBuffer(firstDescriptor’s buffer).
512 first_descriptor.buffer = first_descriptor
513 .buffer
514 .transfer_array_buffer(cx)
515 .expect("TransferArrayBuffer failed");
516 }
517
518 // Perform ? ReadableByteStreamControllerRespondInternal(controller, bytesWritten).
519 self.respond_internal(cx, bytes_written, can_gc)
520 }
521
522 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-internal>
523 pub(crate) fn respond_internal(
524 &self,
525 cx: SafeJSContext,
526 bytes_written: u64,
527 can_gc: CanGc,
528 ) -> Fallible<()> {
529 {
530 // Let firstDescriptor be controller.[[pendingPullIntos]][0].
531 let pending_pull_intos = self.pending_pull_intos.borrow();
532 let first_descriptor = pending_pull_intos.first().unwrap();
533
534 // Assert: ! CanTransferArrayBuffer(firstDescriptor’s buffer) is true
535 assert!(first_descriptor.buffer.can_transfer_array_buffer(cx));
536 }
537
538 // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
539 self.invalidate_byob_request();
540
541 // Let state be controller.[[stream]].[[state]].
542 let stream = self.stream.get().unwrap();
543
544 // If state is "closed",
545 if stream.is_closed() {
546 // Assert: bytesWritten is 0.
547 assert_eq!(bytes_written, 0);
548
549 // Perform ! ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor).
550 self.respond_in_closed_state(cx, can_gc)
551 .expect("respond_in_closed_state failed");
552 } else {
553 // Assert: state is "readable".
554 assert!(stream.is_readable());
555
556 // Assert: bytesWritten > 0.
557 assert!(bytes_written > 0);
558
559 // Perform ? ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor).
560 self.respond_in_readable_state(cx, bytes_written, can_gc)?;
561 }
562
563 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
564 self.call_pull_if_needed(can_gc);
565
566 Ok(())
567 }
568
569 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-closed-state>
570 pub(crate) fn respond_in_closed_state(&self, cx: SafeJSContext, can_gc: CanGc) -> Fallible<()> {
571 let pending_pull_intos = self.pending_pull_intos.borrow();
572 let first_descriptor = pending_pull_intos.first().unwrap();
573
574 // Assert: the remainder after dividing firstDescriptor’s bytes filled
575 // by firstDescriptor’s element size is 0.
576 assert_eq!(
577 first_descriptor.bytes_filled.get() % first_descriptor.element_size,
578 0
579 );
580
581 // If firstDescriptor’s reader type is "none",
582 // perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
583 let reader_type = first_descriptor.reader_type.is_none();
584
585 // needed to drop the borrow and avoid BorrowMutError
586 drop(pending_pull_intos);
587
588 if reader_type {
589 self.shift_pending_pull_into();
590 }
591
592 // Let stream be controller.[[stream]].
593 let stream = self.stream.get().unwrap();
594
595 // If ! ReadableStreamHasBYOBReader(stream) is true,
596 if stream.has_byob_reader() {
597 // Let filledPullIntos be a new empty list.
598 let mut filled_pull_intos = Vec::new();
599
600 // While filledPullIntos’s size < ! ReadableStreamGetNumReadIntoRequests(stream),
601 while filled_pull_intos.len() < stream.get_num_read_into_requests() {
602 // Let pullIntoDescriptor be ! ReadableByteStreamControllerShiftPendingPullInto(controller).
603 let pull_into_descriptor = self.shift_pending_pull_into();
604
605 // Append pullIntoDescriptor to filledPullIntos.
606 filled_pull_intos.push(pull_into_descriptor);
607 }
608
609 // For each filledPullInto of filledPullIntos,
610 for filled_pull_into in filled_pull_intos {
611 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
612 self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)
613 .expect("commit_pull_into_descriptor failed");
614 }
615 }
616
617 Ok(())
618 }
619
620 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-readable-state>
621 pub(crate) fn respond_in_readable_state(
622 &self,
623 cx: SafeJSContext,
624 bytes_written: u64,
625 can_gc: CanGc,
626 ) -> Fallible<()> {
627 let pending_pull_intos = self.pending_pull_intos.borrow();
628 let first_descriptor = pending_pull_intos.first().unwrap();
629
630 // Assert: pullIntoDescriptor’s bytes filled + bytesWritten ≤ pullIntoDescriptor’s byte length.
631 assert!(
632 first_descriptor.bytes_filled.get() + bytes_written <= first_descriptor.byte_length
633 );
634
635 // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
636 // controller, bytesWritten, pullIntoDescriptor).
637 self.fill_head_pull_into_descriptor(bytes_written, first_descriptor);
638
639 // If pullIntoDescriptor’s reader type is "none",
640 if first_descriptor.reader_type.is_none() {
641 // needed to drop the borrow and avoid BorrowMutError
642 drop(pending_pull_intos);
643
644 // Perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, pullIntoDescriptor).
645 self.enqueue_detached_pull_into_to_queue(cx, can_gc)?;
646
647 // Let filledPullIntos be the result of performing
648 // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
649 let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
650
651 // For each filledPullInto of filledPullIntos,
652 for filled_pull_into in filled_pull_intos {
653 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]]
654 // , filledPullInto).
655 self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)
656 .expect("commit_pull_into_descriptor failed");
657 }
658
659 // Return.
660 return Ok(());
661 }
662
663 // If pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill, return.
664 if first_descriptor.bytes_filled.get() < first_descriptor.minimum_fill {
665 return Ok(());
666 }
667
668 // needed to drop the borrow and avoid BorrowMutError
669 drop(pending_pull_intos);
670
671 // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
672 let pull_into_descriptor = self.shift_pending_pull_into();
673
674 // Let remainderSize be the remainder after dividing pullIntoDescriptor’s bytes
675 // filled by pullIntoDescriptor’s element size.
676 let remainder_size =
677 pull_into_descriptor.bytes_filled.get() % pull_into_descriptor.element_size;
678
679 // If remainderSize > 0,
680 if remainder_size > 0 {
681 // Let end be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
682 let end = pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();
683
684 // Perform ? ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
685 // pullIntoDescriptor’s buffer, end − remainderSize, remainderSize).
686 self.enqueue_cloned_chunk_to_queue(
687 cx,
688 &pull_into_descriptor.buffer,
689 end - remainder_size,
690 remainder_size,
691 can_gc,
692 )?;
693 }
694
695 // Set pullIntoDescriptor’s bytes filled to pullIntoDescriptor’s bytes filled − remainderSize.
696 pull_into_descriptor
697 .bytes_filled
698 .set(pull_into_descriptor.bytes_filled.get() - remainder_size);
699
700 // Let filledPullIntos be the result of performing
701 // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
702 let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
703
704 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], pullIntoDescriptor).
705 self.commit_pull_into_descriptor(cx, &pull_into_descriptor, can_gc)
706 .expect("commit_pull_into_descriptor failed");
707
708 // For each filledPullInto of filledPullIntos,
709 for filled_pull_into in filled_pull_intos {
710 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], filledPullInto).
711 self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)
712 .expect("commit_pull_into_descriptor failed");
713 }
714
715 Ok(())
716 }
717
718 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-with-new-view>
719 pub(crate) fn respond_with_new_view(
720 &self,
721 cx: SafeJSContext,
722 view: HeapBufferSource<ArrayBufferViewU8>,
723 can_gc: CanGc,
724 ) -> Fallible<()> {
725 let view_byte_length;
726 {
727 // Assert: controller.[[pendingPullIntos]] is not empty.
728 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
729 assert!(!pending_pull_intos.is_empty());
730
731 // Assert: ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is false.
732 assert!(!view.is_detached_buffer(cx));
733
734 // Let firstDescriptor be controller.[[pendingPullIntos]][0].
735 let first_descriptor = pending_pull_intos.first_mut().unwrap();
736
737 // Let state be controller.[[stream]].[[state]].
738 let stream = self.stream.get().unwrap();
739
740 // If state is "closed",
741 if stream.is_closed() {
742 // If view.[[ByteLength]] is not 0, throw a TypeError exception.
743 if view.byte_length() != 0 {
744 return Err(Error::Type("view byte length is not 0".to_owned()));
745 }
746 } else {
747 // Assert: state is "readable".
748 assert!(stream.is_readable());
749
750 // If view.[[ByteLength]] is 0, throw a TypeError exception.
751 if view.byte_length() == 0 {
752 return Err(Error::Type("view byte length is 0".to_owned()));
753 }
754 }
755
756 // If firstDescriptor’s byte offset + firstDescriptor’ bytes filled is not view.[[ByteOffset]],
757 // throw a RangeError exception.
758 if first_descriptor.byte_offset + first_descriptor.bytes_filled.get() !=
759 (view.get_byte_offset() as u64)
760 {
761 return Err(Error::Range(
762 "firstDescriptor's byte offset + bytes filled is not view byte offset"
763 .to_owned(),
764 ));
765 }
766
767 // If firstDescriptor’s buffer byte length is not view.[[ViewedArrayBuffer]].[[ByteLength]],
768 // throw a RangeError exception.
769 if first_descriptor.buffer_byte_length !=
770 (view.viewed_buffer_array_byte_length(cx) as u64)
771 {
772 return Err(Error::Range(
773 "firstDescriptor's buffer byte length is not view viewed buffer array byte length"
774 .to_owned(),
775 ));
776 }
777
778 // If firstDescriptor’s bytes filled + view.[[ByteLength]] > firstDescriptor’s byte length,
779 // throw a RangeError exception.
780 if first_descriptor.bytes_filled.get() + (view.byte_length()) as u64 >
781 first_descriptor.byte_length
782 {
783 return Err(Error::Range(
784 "bytes filled + view byte length > byte length".to_owned(),
785 ));
786 }
787
788 // Let viewByteLength be view.[[ByteLength]].
789 view_byte_length = view.byte_length();
790
791 // Set firstDescriptor’s buffer to ? TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
792 first_descriptor.buffer = view
793 .get_array_buffer_view_buffer(cx)
794 .transfer_array_buffer(cx)?;
795 }
796
797 // Perform ? ReadableByteStreamControllerRespondInternal(controller, viewByteLength).
798 self.respond_internal(cx, view_byte_length as u64, can_gc)
799 }
800
801 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-get-desired-size>
802 pub(crate) fn get_desired_size(&self) -> Option<f64> {
803 // Let state be controller.[[stream]].[[state]].
804 let stream = self.stream.get()?;
805
806 // If state is "errored", return null.
807 if stream.is_errored() {
808 return None;
809 }
810
811 // If state is "closed", return 0.
812 if stream.is_closed() {
813 return Some(0.0);
814 }
815
816 // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
817 Some(self.strategy_hwm - self.queue_total_size.get())
818 }
819
820 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollergetbyobrequest>
821 pub(crate) fn get_byob_request(
822 &self,
823 cx: SafeJSContext,
824 can_gc: CanGc,
825 ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
826 // If controller.[[byobRequest]] is null and controller.[[pendingPullIntos]] is not empty,
827 let pending_pull_intos = self.pending_pull_intos.borrow();
828 if self.byob_request.get().is_none() && !pending_pull_intos.is_empty() {
829 // Let firstDescriptor be controller.[[pendingPullIntos]][0].
830 let first_descriptor = pending_pull_intos.first().unwrap();
831 // Let view be ! Construct(%Uint8Array%, « firstDescriptor’s buffer,
832 // firstDescriptor’s byte offset + firstDescriptor’s bytes filled,
833 // firstDescriptor’s byte length − firstDescriptor’s bytes filled »).
834
835 let byte_offset = first_descriptor.byte_offset + first_descriptor.bytes_filled.get();
836 let byte_length = first_descriptor.byte_length - first_descriptor.bytes_filled.get();
837
838 let view = create_buffer_source_with_constructor(
839 cx,
840 &Constructor::Name(Type::Uint8),
841 &first_descriptor.buffer,
842 byte_offset as usize,
843 byte_length as usize,
844 )
845 .expect("Construct Uint8Array failed");
846
847 // Let byobRequest be a new ReadableStreamBYOBRequest.
848 let byob_request = ReadableStreamBYOBRequest::new(&self.global(), can_gc);
849
850 // Set byobRequest.[[controller]] to controller.
851 byob_request.set_controller(Some(&DomRoot::from_ref(self)));
852
853 // Set byobRequest.[[view]] to view.
854 byob_request.set_view(Some(view));
855
856 // Set controller.[[byobRequest]] to byobRequest.
857 self.byob_request.set(Some(&byob_request));
858 }
859
860 // Return controller.[[byobRequest]].
861 Ok(self.byob_request.get())
862 }
863
864 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-close>
865 pub(crate) fn close(&self, cx: SafeJSContext, can_gc: CanGc) -> Fallible<()> {
866 // Let stream be controller.[[stream]].
867 let stream = self.stream.get().unwrap();
868
869 // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
870 if self.close_requested.get() || !stream.is_readable() {
871 return Ok(());
872 }
873
874 // If controller.[[queueTotalSize]] > 0,
875 if self.queue_total_size.get() > 0.0 {
876 // Set controller.[[closeRequested]] to true.
877 self.close_requested.set(true);
878 // Return.
879 return Ok(());
880 }
881
882 // If controller.[[pendingPullIntos]] is not empty,
883 let pending_pull_intos = self.pending_pull_intos.borrow();
884 if !pending_pull_intos.is_empty() {
885 // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
886 let first_pending_pull_into = pending_pull_intos.first().unwrap();
887
888 // If the remainder after dividing firstPendingPullInto’s bytes filled by
889 // firstPendingPullInto’s element size is not 0,
890 if first_pending_pull_into.bytes_filled.get() % first_pending_pull_into.element_size !=
891 0
892 {
893 // needed to drop the borrow and avoid BorrowMutError
894 drop(pending_pull_intos);
895
896 // Let e be a new TypeError exception.
897 let e = Error::Type(
898 "remainder after dividing firstPendingPullInto's bytes
899 filled by firstPendingPullInto's element size is not 0"
900 .to_owned(),
901 );
902
903 // Perform ! ReadableByteStreamControllerError(controller, e).
904 rooted!(in(*cx) let mut error = UndefinedValue());
905 e.clone()
906 .to_jsval(cx, &self.global(), error.handle_mut(), can_gc);
907 self.error(error.handle(), can_gc);
908
909 // Throw e.
910 return Err(e);
911 }
912 }
913
914 // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
915 self.clear_algorithms();
916
917 // Perform ! ReadableStreamClose(stream).
918 stream.close(can_gc);
919 Ok(())
920 }
921
922 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-error>
923 pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
924 // Let stream be controller.[[stream]].
925 let stream = self.stream.get().unwrap();
926
927 // If stream.[[state]] is not "readable", return.
928 if !stream.is_readable() {
929 return;
930 }
931
932 // Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller).
933 self.clear_pending_pull_intos();
934
935 // Perform ! ResetQueue(controller).
936 self.reset_queue();
937
938 // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
939 self.clear_algorithms();
940
941 // Perform ! ReadableStreamError(stream, e).
942 stream.error(e, can_gc);
943 }
944
945 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-algorithms>
946 fn clear_algorithms(&self) {
947 // Set controller.[[pullAlgorithm]] to undefined.
948 // Set controller.[[cancelAlgorithm]] to undefined.
949 self.underlying_source.set(None);
950 }
951
952 /// <https://streams.spec.whatwg.org/#reset-queue>
953 pub(crate) fn reset_queue(&self) {
954 // Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
955
956 // Set container.[[queue]] to a new empty list.
957 self.queue.borrow_mut().clear();
958
959 // Set container.[[queueTotalSize]] to 0.
960 self.queue_total_size.set(0.0);
961 }
962
963 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-pending-pull-intos>
964 pub(crate) fn clear_pending_pull_intos(&self) {
965 // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
966 self.invalidate_byob_request();
967
968 // Set controller.[[pendingPullIntos]] to a new empty list.
969 self.pending_pull_intos.borrow_mut().clear();
970 }
971
972 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-invalidate-byob-request>
973 pub(crate) fn invalidate_byob_request(&self) {
974 if let Some(byob_request) = self.byob_request.get() {
975 // Set controller.[[byobRequest]].[[controller]] to undefined.
976 byob_request.set_controller(None);
977
978 // Set controller.[[byobRequest]].[[view]] to null.
979 byob_request.set_view(None);
980
981 // Set controller.[[byobRequest]] to null.
982 self.byob_request.set(None);
983 }
984 // If controller.[[byobRequest]] is null, return.
985 }
986
987 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue>
988 pub(crate) fn enqueue(
989 &self,
990 cx: SafeJSContext,
991 chunk: HeapBufferSource<ArrayBufferViewU8>,
992 can_gc: CanGc,
993 ) -> Fallible<()> {
994 // Let stream be controller.[[stream]].
995 let stream = self.stream.get().unwrap();
996
997 // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
998 if self.close_requested.get() || !stream.is_readable() {
999 return Ok(());
1000 }
1001
1002 // Let buffer be chunk.[[ViewedArrayBuffer]].
1003 let buffer = chunk.get_array_buffer_view_buffer(cx);
1004
1005 // Let byteOffset be chunk.[[ByteOffset]].
1006 let byte_offset = chunk.get_byte_offset();
1007
1008 // Let byteLength be chunk.[[ByteLength]].
1009 let byte_length = chunk.byte_length();
1010
1011 // If ! IsDetachedBuffer(buffer) is true, throw a TypeError exception.
1012 if buffer.is_detached_buffer(cx) {
1013 return Err(Error::Type("buffer is detached".to_owned()));
1014 }
1015
1016 // Let transferredBuffer be ? TransferArrayBuffer(buffer).
1017 let transferred_buffer = buffer.transfer_array_buffer(cx)?;
1018
1019 // If controller.[[pendingPullIntos]] is not empty,
1020 {
1021 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
1022 if !pending_pull_intos.is_empty() {
1023 // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
1024 let first_descriptor = pending_pull_intos.first_mut().unwrap();
1025 // If ! IsDetachedBuffer(firstPendingPullInto’s buffer) is true, throw a TypeError exception.
1026 if first_descriptor.buffer.is_detached_buffer(cx) {
1027 return Err(Error::Type("buffer is detached".to_owned()));
1028 }
1029
1030 // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
1031 self.invalidate_byob_request();
1032
1033 // Set firstPendingPullInto’s buffer to ! TransferArrayBuffer(firstPendingPullInto’s buffer).
1034 first_descriptor.buffer = first_descriptor
1035 .buffer
1036 .transfer_array_buffer(cx)
1037 .expect("TransferArrayBuffer failed");
1038
1039 // If firstPendingPullInto’s reader type is "none",
1040 if first_descriptor.reader_type.is_none() {
1041 // needed to drop the borrow and avoid BorrowMutError
1042 drop(pending_pull_intos);
1043
1044 // perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(
1045 // controller, firstPendingPullInto).
1046 self.enqueue_detached_pull_into_to_queue(cx, can_gc)?;
1047 }
1048 }
1049 }
1050
1051 // If ! ReadableStreamHasDefaultReader(stream) is true,
1052 if stream.has_default_reader() {
1053 // Perform ! ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller).
1054 self.process_read_requests_using_queue(cx, can_gc)
1055 .expect("process_read_requests_using_queue failed");
1056
1057 // If ! ReadableStreamGetNumReadRequests(stream) is 0,
1058 if stream.get_num_read_requests() == 0 {
1059 // Assert: controller.[[pendingPullIntos]] is empty.
1060 {
1061 assert!(self.pending_pull_intos.borrow().is_empty());
1062 }
1063
1064 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1065 // controller, transferredBuffer, byteOffset, byteLength).
1066 self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1067 } else {
1068 // Assert: controller.[[queue]] is empty.
1069 assert!(self.queue.borrow().is_empty());
1070
1071 // If controller.[[pendingPullIntos]] is not empty,
1072
1073 let pending_pull_intos = self.pending_pull_intos.borrow();
1074 if !pending_pull_intos.is_empty() {
1075 // Assert: controller.[[pendingPullIntos]][0]'s reader type is "default".
1076 assert!(matches!(
1077 pending_pull_intos.first().unwrap().reader_type,
1078 Some(ReaderType::Default)
1079 ));
1080
1081 // needed to drop the borrow and avoid BorrowMutError
1082 drop(pending_pull_intos);
1083
1084 // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1085 self.shift_pending_pull_into();
1086 }
1087
1088 // Let transferredView be ! Construct(%Uint8Array%, « transferredBuffer, byteOffset, byteLength »).
1089 let transferred_view = create_buffer_source_with_constructor(
1090 cx,
1091 &Constructor::Name(Type::Uint8),
1092 &transferred_buffer,
1093 byte_offset,
1094 byte_length,
1095 )
1096 .expect("Construct Uint8Array failed");
1097
1098 // Perform ! ReadableStreamFulfillReadRequest(stream, transferredView, false).
1099 rooted!(in(*cx) let mut view_value = UndefinedValue());
1100 transferred_view.get_buffer_view_value(cx, view_value.handle_mut());
1101 stream.fulfill_read_request(view_value.handle(), false, can_gc);
1102 }
1103 // Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true,
1104 } else if stream.has_byob_reader() {
1105 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1106 // controller, transferredBuffer, byteOffset, byteLength).
1107 self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1108
1109 // Let filledPullIntos be the result of performing !
1110 // ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
1111 let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
1112
1113 // For each filledPullInto of filledPullIntos,
1114 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
1115 for filled_pull_into in filled_pull_intos {
1116 self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)
1117 .expect("commit_pull_into_descriptor failed");
1118 }
1119 } else {
1120 // Assert: ! IsReadableStreamLocked(stream) is false.
1121 assert!(!stream.is_locked());
1122
1123 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1124 // (controller, transferredBuffer, byteOffset, byteLength).
1125 self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1126 }
1127
1128 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1129 self.call_pull_if_needed(can_gc);
1130
1131 Ok(())
1132 }
1133
1134 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-commit-pull-into-descriptor>
1135 pub(crate) fn commit_pull_into_descriptor(
1136 &self,
1137 cx: SafeJSContext,
1138 pull_into_descriptor: &PullIntoDescriptor,
1139 can_gc: CanGc,
1140 ) -> Fallible<()> {
1141 // Assert: stream.[[state]] is not "errored".
1142 let stream = self.stream.get().unwrap();
1143 assert!(!stream.is_errored());
1144
1145 // Assert: pullIntoDescriptor.reader type is not "none".
1146 assert!(pull_into_descriptor.reader_type.is_some());
1147
1148 // Let done be false.
1149 let mut done = false;
1150
1151 // If stream.[[state]] is "closed",
1152 if stream.is_closed() {
1153 // Assert: the remainder after dividing pullIntoDescriptor’s bytes filled
1154 // by pullIntoDescriptor’s element size is 0.
1155 assert!(
1156 pull_into_descriptor.bytes_filled.get() % pull_into_descriptor.element_size == 0
1157 );
1158
1159 // Set done to true.
1160 done = true;
1161 }
1162
1163 // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor).
1164 let filled_view = self
1165 .convert_pull_into_descriptor(cx, pull_into_descriptor)
1166 .expect("convert_pull_into_descriptor failed");
1167
1168 rooted!(in(*cx) let mut view_value = UndefinedValue());
1169 filled_view.get_buffer_view_value(cx, view_value.handle_mut());
1170
1171 // If pullIntoDescriptor’s reader type is "default",
1172 if matches!(pull_into_descriptor.reader_type, Some(ReaderType::Default)) {
1173 // Perform ! ReadableStreamFulfillReadRequest(stream, filledView, done).
1174
1175 stream.fulfill_read_request(view_value.handle(), done, can_gc);
1176 } else {
1177 // Assert: pullIntoDescriptor’s reader type is "byob".
1178 assert!(matches!(
1179 pull_into_descriptor.reader_type,
1180 Some(ReaderType::Byob)
1181 ));
1182
1183 // Perform ! ReadableStreamFulfillReadIntoRequest(stream, filledView, done).
1184 stream.fulfill_read_into_request(view_value.handle(), done, can_gc);
1185 }
1186 Ok(())
1187 }
1188
1189 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-convert-pull-into-descriptor>
1190 pub(crate) fn convert_pull_into_descriptor(
1191 &self,
1192 cx: SafeJSContext,
1193 pull_into_descriptor: &PullIntoDescriptor,
1194 ) -> Fallible<HeapBufferSource<ArrayBufferViewU8>> {
1195 // Let bytesFilled be pullIntoDescriptor’s bytes filled.
1196 let bytes_filled = pull_into_descriptor.bytes_filled.get();
1197
1198 // Let elementSize be pullIntoDescriptor’s element size.
1199 let element_size = pull_into_descriptor.element_size;
1200
1201 // Assert: bytesFilled ≤ pullIntoDescriptor’s byte length.
1202 assert!(bytes_filled <= pull_into_descriptor.byte_length);
1203
1204 // Assert: the remainder after dividing bytesFilled by elementSize is 0.
1205 assert!(bytes_filled % element_size == 0);
1206
1207 // Let buffer be ! TransferArrayBuffer(pullIntoDescriptor’s buffer).
1208 let buffer = pull_into_descriptor
1209 .buffer
1210 .transfer_array_buffer(cx)
1211 .expect("TransferArrayBuffer failed");
1212
1213 // Return ! Construct(pullIntoDescriptor’s view constructor,
1214 // « buffer, pullIntoDescriptor’s byte offset, bytesFilled ÷ elementSize »).
1215 Ok(create_buffer_source_with_constructor(
1216 cx,
1217 &pull_into_descriptor.view_constructor,
1218 &buffer,
1219 pull_into_descriptor.byte_offset as usize,
1220 (bytes_filled / element_size) as usize,
1221 )
1222 .expect("Construct view failed"))
1223 }
1224
1225 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-process-pull-into-descriptors-using-queue>
1226 pub(crate) fn process_pull_into_descriptors_using_queue(
1227 &self,
1228 cx: SafeJSContext,
1229 ) -> Vec<PullIntoDescriptor> {
1230 // Assert: controller.[[closeRequested]] is false.
1231 assert!(!self.close_requested.get());
1232
1233 // Let filledPullIntos be a new empty list.
1234 let mut filled_pull_intos = Vec::new();
1235
1236 // While controller.[[pendingPullIntos]] is not empty,
1237 loop {
1238 // If controller.[[queueTotalSize]] is 0, then break.
1239 if self.queue_total_size.get() == 0.0 {
1240 break;
1241 }
1242
1243 // Let pullIntoDescriptor be controller.[[pendingPullIntos]][0].
1244 let fill_pull_result = {
1245 let pending_pull_intos = self.pending_pull_intos.borrow();
1246 let Some(pull_into_descriptor) = pending_pull_intos.first() else {
1247 break;
1248 };
1249 self.fill_pull_into_descriptor_from_queue(cx, pull_into_descriptor)
1250 };
1251
1252 // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) is true,
1253 if fill_pull_result {
1254 // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1255 let pull_into_descriptor = self.shift_pending_pull_into();
1256
1257 // Append pullIntoDescriptor to filledPullIntos.
1258 filled_pull_intos.push(pull_into_descriptor);
1259 }
1260 }
1261
1262 // Return filledPullIntos.
1263 filled_pull_intos
1264 }
1265
1266 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-pull-into-descriptor-from-queue>
1267 pub(crate) fn fill_pull_into_descriptor_from_queue(
1268 &self,
1269 cx: SafeJSContext,
1270 pull_into_descriptor: &PullIntoDescriptor,
1271 ) -> bool {
1272 // Let maxBytesToCopy be min(controller.[[queueTotalSize]],
1273 // pullIntoDescriptor’s byte length − pullIntoDescriptor’s bytes filled).
1274 let max_bytes_to_copy = min(
1275 self.queue_total_size.get() as usize,
1276 (pull_into_descriptor.byte_length - pull_into_descriptor.bytes_filled.get()) as usize,
1277 );
1278
1279 // Let maxBytesFilled be pullIntoDescriptor’s bytes filled + maxBytesToCopy.
1280 let max_bytes_filled = pull_into_descriptor.bytes_filled.get() as usize + max_bytes_to_copy;
1281
1282 // Let totalBytesToCopyRemaining be maxBytesToCopy.
1283 let mut total_bytes_to_copy_remaining = max_bytes_to_copy;
1284
1285 // Let ready be false.
1286 let mut ready = false;
1287
1288 // Assert: ! IsDetachedBuffer(pullIntoDescriptor’s buffer) is false.
1289 assert!(!pull_into_descriptor.buffer.is_detached_buffer(cx));
1290
1291 // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
1292 assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
1293
1294 // Let remainderBytes be the remainder after dividing maxBytesFilled by pullIntoDescriptor’s element size.
1295 let remainder_bytes = max_bytes_filled % pull_into_descriptor.element_size as usize;
1296
1297 // Let maxAlignedBytes be maxBytesFilled − remainderBytes.
1298 let max_aligned_bytes = max_bytes_filled - remainder_bytes;
1299
1300 // If maxAlignedBytes ≥ pullIntoDescriptor’s minimum fill,
1301 if max_aligned_bytes >= pull_into_descriptor.minimum_fill as usize {
1302 // Set totalBytesToCopyRemaining to maxAlignedBytes − pullIntoDescriptor’s bytes filled.
1303 total_bytes_to_copy_remaining =
1304 max_aligned_bytes - (pull_into_descriptor.bytes_filled.get() as usize);
1305
1306 // Set ready to true.
1307 ready = true;
1308 }
1309
1310 // Let queue be controller.[[queue]].
1311 let mut queue = self.queue.borrow_mut();
1312
1313 // While totalBytesToCopyRemaining > 0,
1314 while total_bytes_to_copy_remaining > 0 {
1315 // Let headOfQueue be queue[0].
1316 let head_of_queue = queue.front_mut().unwrap();
1317
1318 // Let bytesToCopy be min(totalBytesToCopyRemaining, headOfQueue’s byte length).
1319 let bytes_to_copy = total_bytes_to_copy_remaining.min(head_of_queue.byte_length);
1320
1321 // Let destStart be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
1322 let dest_start =
1323 pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();
1324
1325 // Let descriptorBuffer be pullIntoDescriptor’s buffer.
1326 let descriptor_buffer = &pull_into_descriptor.buffer;
1327
1328 // Let queueBuffer be headOfQueue’s buffer.
1329 let queue_buffer = &head_of_queue.buffer;
1330
1331 // Let queueByteOffset be headOfQueue’s byte offset.
1332 let queue_byte_offset = head_of_queue.byte_offset;
1333
1334 // Assert: ! CanCopyDataBlockBytes(descriptorBuffer, destStart,
1335 // queueBuffer, queueByteOffset, bytesToCopy) is true.
1336 assert!(descriptor_buffer.can_copy_data_block_bytes(
1337 cx,
1338 dest_start as usize,
1339 queue_buffer,
1340 queue_byte_offset,
1341 bytes_to_copy
1342 ));
1343
1344 // Perform ! CopyDataBlockBytes(descriptorBuffer.[[ArrayBufferData]], destStart,
1345 // queueBuffer.[[ArrayBufferData]], queueByteOffset, bytesToCopy).
1346 descriptor_buffer.copy_data_block_bytes(
1347 cx,
1348 dest_start as usize,
1349 queue_buffer,
1350 queue_byte_offset,
1351 bytes_to_copy,
1352 );
1353
1354 // If headOfQueue’s byte length is bytesToCopy,
1355 if head_of_queue.byte_length == bytes_to_copy {
1356 // Remove queue[0].
1357 queue.pop_front().unwrap();
1358 } else {
1359 // Set headOfQueue’s byte offset to headOfQueue’s byte offset + bytesToCopy.
1360 head_of_queue.byte_offset += bytes_to_copy;
1361
1362 // Set headOfQueue’s byte length to headOfQueue’s byte length − bytesToCopy.
1363 head_of_queue.byte_length -= bytes_to_copy;
1364 }
1365
1366 // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − bytesToCopy.
1367 self.queue_total_size
1368 .set(self.queue_total_size.get() - (bytes_to_copy as f64));
1369
1370 // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
1371 // controller, bytesToCopy, pullIntoDescriptor).
1372 self.fill_head_pull_into_descriptor(bytes_to_copy as u64, pull_into_descriptor);
1373
1374 // Set totalBytesToCopyRemaining to totalBytesToCopyRemaining − bytesToCopy.
1375 total_bytes_to_copy_remaining -= bytes_to_copy;
1376 }
1377
1378 // If ready is false,
1379 if !ready {
1380 // Assert: controller.[[queueTotalSize]] is 0.
1381 assert!(self.queue_total_size.get() == 0.0);
1382
1383 // Assert: pullIntoDescriptor’s bytes filled > 0.
1384 assert!(pull_into_descriptor.bytes_filled.get() > 0);
1385
1386 // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
1387 assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
1388 }
1389
1390 // Return ready.
1391 ready
1392 }
1393
1394 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-head-pull-into-descriptor>
1395 pub(crate) fn fill_head_pull_into_descriptor(
1396 &self,
1397 bytes_copied: u64,
1398 pull_into_descriptor: &PullIntoDescriptor,
1399 ) {
1400 // Assert: either controller.[[pendingPullIntos]] is empty,
1401 // or controller.[[pendingPullIntos]][0] is pullIntoDescriptor.
1402 {
1403 let pending_pull_intos = self.pending_pull_intos.borrow();
1404 assert!(
1405 pending_pull_intos.is_empty() ||
1406 pending_pull_intos.first().unwrap() == pull_into_descriptor
1407 );
1408 }
1409
1410 // Assert: controller.[[byobRequest]] is null.
1411 assert!(self.byob_request.get().is_none());
1412
1413 // Set pullIntoDescriptor’s bytes filled to bytes filled + size.
1414 pull_into_descriptor
1415 .bytes_filled
1416 .set(pull_into_descriptor.bytes_filled.get() + bytes_copied);
1417 }
1418
1419 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueuedetachedpullintotoqueue>
1420 pub(crate) fn enqueue_detached_pull_into_to_queue(
1421 &self,
1422 cx: SafeJSContext,
1423 can_gc: CanGc,
1424 ) -> Fallible<()> {
1425 // first_descriptor: &PullIntoDescriptor,
1426 let pending_pull_intos = self.pending_pull_intos.borrow();
1427 let first_descriptor = pending_pull_intos.first().unwrap();
1428
1429 // Assert: pullIntoDescriptor’s reader type is "none".
1430 assert!(first_descriptor.reader_type.is_none());
1431
1432 // If pullIntoDescriptor’s bytes filled > 0, perform ?
1433 // ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
1434 // pullIntoDescriptor’s buffer, pullIntoDescriptor’s byte offset, pullIntoDescriptor’s bytes filled).
1435
1436 if first_descriptor.bytes_filled.get() > 0 {
1437 self.enqueue_cloned_chunk_to_queue(
1438 cx,
1439 &first_descriptor.buffer,
1440 first_descriptor.byte_offset,
1441 first_descriptor.bytes_filled.get(),
1442 can_gc,
1443 )?;
1444 }
1445
1446 // needed to drop the borrow and avoid BorrowMutError
1447 drop(pending_pull_intos);
1448
1449 // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1450 self.shift_pending_pull_into();
1451
1452 Ok(())
1453 }
1454
1455 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueueclonedchunktoqueue>
1456 pub(crate) fn enqueue_cloned_chunk_to_queue(
1457 &self,
1458 cx: SafeJSContext,
1459 buffer: &HeapBufferSource<ArrayBufferU8>,
1460 byte_offset: u64,
1461 byte_length: u64,
1462 can_gc: CanGc,
1463 ) -> Fallible<()> {
1464 // Let cloneResult be CloneArrayBuffer(buffer, byteOffset, byteLength, %ArrayBuffer%).
1465 if let Ok(clone_result) =
1466 buffer.clone_array_buffer(cx, byte_offset as usize, byte_length as usize)
1467 {
1468 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1469 // (controller, cloneResult.[[Value]], 0, byteLength).
1470 self.enqueue_chunk_to_queue(clone_result, 0, byte_length as usize);
1471
1472 Ok(())
1473 } else {
1474 // If cloneResult is an abrupt completion,
1475
1476 // Perform ! ReadableByteStreamControllerError(controller, cloneResult.[[Value]]).
1477 rooted!(in(*cx) let mut rval = UndefinedValue());
1478 let error = Error::Type("can not clone array buffer".to_owned());
1479 error
1480 .clone()
1481 .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1482 self.error(rval.handle(), can_gc);
1483
1484 // Return cloneResult.
1485 Err(error)
1486 }
1487 }
1488
1489 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue-chunk-to-queue>
1490 pub(crate) fn enqueue_chunk_to_queue(
1491 &self,
1492 buffer: HeapBufferSource<ArrayBufferU8>,
1493 byte_offset: usize,
1494 byte_length: usize,
1495 ) {
1496 // Let entry be a new ReadableByteStreamQueueEntry object.
1497 let entry = QueueEntry::new(buffer, byte_offset, byte_length);
1498
1499 // Append entry to controller.[[queue]].
1500 self.queue.borrow_mut().push_back(entry);
1501
1502 // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] + byteLength.
1503 self.queue_total_size
1504 .set(self.queue_total_size.get() + byte_length as f64);
1505 }
1506
1507 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-shift-pending-pull-into>
1508 pub(crate) fn shift_pending_pull_into(&self) -> PullIntoDescriptor {
1509 // Assert: controller.[[byobRequest]] is null.
1510 assert!(self.byob_request.get().is_none());
1511
1512 // Let descriptor be controller.[[pendingPullIntos]][0].
1513 // Remove descriptor from controller.[[pendingPullIntos]].
1514 // Return descriptor.
1515 self.pending_pull_intos.borrow_mut().remove(0)
1516 }
1517
1518 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerprocessreadrequestsusingqueue>
1519 pub(crate) fn process_read_requests_using_queue(
1520 &self,
1521 cx: SafeJSContext,
1522 can_gc: CanGc,
1523 ) -> Fallible<()> {
1524 // Let reader be controller.[[stream]].[[reader]].
1525 // Assert: reader implements ReadableStreamDefaultReader.
1526 let reader = self.stream.get().unwrap().get_default_reader();
1527
1528 // Step 3
1529 reader.process_read_requests(cx, DomRoot::from_ref(self), can_gc)
1530 }
1531
1532 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerfillreadrequestfromqueue>
1533 pub(crate) fn fill_read_request_from_queue(
1534 &self,
1535 cx: SafeJSContext,
1536 read_request: &ReadRequest,
1537 can_gc: CanGc,
1538 ) -> Fallible<()> {
1539 // Assert: controller.[[queueTotalSize]] > 0.
1540 assert!(self.queue_total_size.get() > 0.0);
1541 // Also assert that the queue has a non-zero length;
1542 assert!(!self.queue.borrow().is_empty());
1543
1544 // Let entry be controller.[[queue]][0].
1545 // Remove entry from controller.[[queue]].
1546 let entry = self.remove_entry();
1547
1548 // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − entry’s byte length.
1549 self.queue_total_size
1550 .set(self.queue_total_size.get() - entry.byte_length as f64);
1551
1552 // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
1553 self.handle_queue_drain(can_gc);
1554
1555 // Let view be ! Construct(%Uint8Array%, « entry’s buffer, entry’s byte offset, entry’s byte length »).
1556 let view = create_buffer_source_with_constructor(
1557 cx,
1558 &Constructor::Name(Type::Uint8),
1559 &entry.buffer,
1560 entry.byte_offset,
1561 entry.byte_length,
1562 )
1563 .expect("Construct Uint8Array failed");
1564
1565 // Perform readRequest’s chunk steps, given view.
1566 let result = RootedTraceableBox::new(Heap::default());
1567 rooted!(in(*cx) let mut view_value = UndefinedValue());
1568 view.get_buffer_view_value(cx, view_value.handle_mut());
1569 result.set(*view_value);
1570
1571 read_request.chunk_steps(result, &self.global(), can_gc);
1572
1573 Ok(())
1574 }
1575
1576 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-handle-queue-drain>
1577 pub(crate) fn handle_queue_drain(&self, can_gc: CanGc) {
1578 // Assert: controller.[[stream]].[[state]] is "readable".
1579 assert!(self.stream.get().unwrap().is_readable());
1580
1581 // If controller.[[queueTotalSize]] is 0 and controller.[[closeRequested]] is true,
1582 if self.queue_total_size.get() == 0.0 && self.close_requested.get() {
1583 // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
1584 self.clear_algorithms();
1585
1586 // Perform ! ReadableStreamClose(controller.[[stream]]).
1587 self.stream.get().unwrap().close(can_gc);
1588 } else {
1589 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1590 self.call_pull_if_needed(can_gc);
1591 }
1592 }
1593
1594 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
1595 pub(crate) fn call_pull_if_needed(&self, can_gc: CanGc) {
1596 // Let shouldPull be ! ReadableByteStreamControllerShouldCallPull(controller).
1597 let should_pull = self.should_call_pull();
1598 // If shouldPull is false, return.
1599 if !should_pull {
1600 return;
1601 }
1602
1603 // If controller.[[pulling]] is true,
1604 if self.pulling.get() {
1605 // Set controller.[[pullAgain]] to true.
1606 self.pull_again.set(true);
1607
1608 // Return.
1609 return;
1610 }
1611
1612 // Assert: controller.[[pullAgain]] is false.
1613 assert!(!self.pull_again.get());
1614
1615 // Set controller.[[pulling]] to true.
1616 self.pulling.set(true);
1617
1618 // Let pullPromise be the result of performing controller.[[pullAlgorithm]].
1619 // Continues into the resolve and reject handling of the native handler.
1620 let global = self.global();
1621 let rooted_controller = DomRoot::from_ref(self);
1622 let controller = Controller::ReadableByteStreamController(rooted_controller.clone());
1623
1624 if let Some(underlying_source) = self.underlying_source.get() {
1625 let handler = PromiseNativeHandler::new(
1626 &global,
1627 Some(Box::new(PullAlgorithmFulfillmentHandler {
1628 controller: Dom::from_ref(&rooted_controller),
1629 })),
1630 Some(Box::new(PullAlgorithmRejectionHandler {
1631 controller: Dom::from_ref(&rooted_controller),
1632 })),
1633 can_gc,
1634 );
1635
1636 let realm = enter_realm(&*global);
1637 let comp = InRealm::Entered(&realm);
1638 let result = underlying_source
1639 .call_pull_algorithm(controller, &global, can_gc)
1640 .unwrap_or_else(|| {
1641 let promise = Promise::new(&global, can_gc);
1642 promise.resolve_native(&(), can_gc);
1643 Ok(promise)
1644 });
1645 let promise = result.unwrap_or_else(|error| {
1646 let cx = GlobalScope::get_cx();
1647 rooted!(in(*cx) let mut rval = UndefinedValue());
1648 // TODO: check if `self.global()` is the right globalscope.
1649 error
1650 .clone()
1651 .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1652 let promise = Promise::new(&global, can_gc);
1653 promise.reject_native(&rval.handle(), can_gc);
1654 promise
1655 });
1656 promise.append_native_handler(&handler, comp, can_gc);
1657 }
1658 }
1659
1660 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-should-call-pull>
1661 fn should_call_pull(&self) -> bool {
1662 // Let stream be controller.[[stream]].
1663 // Note: the spec does not assert that stream is not undefined here,
1664 // so we return false if it is.
1665 let stream = self.stream.get().unwrap();
1666
1667 // If stream.[[state]] is not "readable", return false.
1668 if !stream.is_readable() {
1669 return false;
1670 }
1671
1672 // If controller.[[closeRequested]] is true, return false.
1673 if self.close_requested.get() {
1674 return false;
1675 }
1676
1677 // If controller.[[started]] is false, return false.
1678 if !self.started.get() {
1679 return false;
1680 }
1681
1682 // If ! ReadableStreamHasDefaultReader(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0
1683 // , return true.
1684 if stream.has_default_reader() && stream.get_num_read_requests() > 0 {
1685 return true;
1686 }
1687
1688 // If ! ReadableStreamHasBYOBReader(stream) is true and ! ReadableStreamGetNumReadIntoRequests(stream) > 0
1689 // , return true.
1690 if stream.has_byob_reader() && stream.get_num_read_into_requests() > 0 {
1691 return true;
1692 }
1693
1694 // Let desiredSize be ! ReadableByteStreamControllerGetDesiredSize(controller).
1695 let desired_size = self.get_desired_size();
1696
1697 // Assert: desiredSize is not null.
1698 assert!(desired_size.is_some());
1699
1700 // If desiredSize > 0, return true.
1701 if desired_size.unwrap() > 0. {
1702 return true;
1703 }
1704
1705 // Return false.
1706 false
1707 }
1708 /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
1709 pub(crate) fn setup(
1710 &self,
1711 global: &GlobalScope,
1712 stream: DomRoot<ReadableStream>,
1713 can_gc: CanGc,
1714 ) -> Fallible<()> {
1715 // Assert: stream.[[controller]] is undefined.
1716 stream.assert_no_controller();
1717
1718 // If autoAllocateChunkSize is not undefined,
1719 if self.auto_allocate_chunk_size.is_some() {
1720 // Assert: ! IsInteger(autoAllocateChunkSize) is true. Implicit
1721 // Assert: autoAllocateChunkSize is positive. (Implicit by type.)
1722 }
1723
1724 // Set controller.[[stream]] to stream.
1725 self.stream.set(Some(&stream));
1726
1727 // Set controller.[[pullAgain]] and controller.[[pulling]] to false.
1728 self.pull_again.set(false);
1729 self.pulling.set(false);
1730
1731 // Set controller.[[byobRequest]] to null.
1732 self.byob_request.set(None);
1733
1734 // Perform ! ResetQueue(controller).
1735 self.reset_queue();
1736
1737 // Set controller.[[closeRequested]] and controller.[[started]] to false.
1738 self.close_requested.set(false);
1739 self.started.set(false);
1740
1741 // Set controller.[[strategyHWM]] to highWaterMark.
1742 // Set controller.[[pullAlgorithm]] to pullAlgorithm.
1743 // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
1744 // Set controller.[[autoAllocateChunkSize]] to autoAllocateChunkSize.
1745 // Set controller.[[pendingPullIntos]] to a new empty list.
1746 // Note: the above steps are done in `new`.
1747
1748 // Set stream.[[controller]] to controller.
1749 let rooted_byte_controller = DomRoot::from_ref(self);
1750 stream.set_byte_controller(&rooted_byte_controller);
1751
1752 if let Some(underlying_source) = rooted_byte_controller.underlying_source.get() {
1753 // Let startResult be the result of performing startAlgorithm. (This might throw an exception.)
1754 let start_result = underlying_source
1755 .call_start_algorithm(
1756 Controller::ReadableByteStreamController(rooted_byte_controller.clone()),
1757 can_gc,
1758 )
1759 .unwrap_or_else(|| {
1760 let promise = Promise::new(global, can_gc);
1761 promise.resolve_native(&(), can_gc);
1762 Ok(promise)
1763 });
1764
1765 // Let startPromise be a promise resolved with startResult.
1766 let start_promise = start_result?;
1767
1768 // Upon fulfillment of startPromise, Upon rejection of startPromise with reason r,
1769 let handler = PromiseNativeHandler::new(
1770 global,
1771 Some(Box::new(StartAlgorithmFulfillmentHandler {
1772 controller: Dom::from_ref(&rooted_byte_controller),
1773 })),
1774 Some(Box::new(StartAlgorithmRejectionHandler {
1775 controller: Dom::from_ref(&rooted_byte_controller),
1776 })),
1777 can_gc,
1778 );
1779 let realm = enter_realm(global);
1780 let comp = InRealm::Entered(&realm);
1781 start_promise.append_native_handler(&handler, comp, can_gc);
1782 };
1783
1784 Ok(())
1785 }
1786
1787 // <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontroller-releasesteps
1788 pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1789 // If this.[[pendingPullIntos]] is not empty,
1790 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
1791 if !pending_pull_intos.is_empty() {
1792 // Let firstPendingPullInto be this.[[pendingPullIntos]][0].
1793 let mut first_pending_pull_into = pending_pull_intos.remove(0);
1794
1795 // Set firstPendingPullInto’s reader type to "none".
1796 first_pending_pull_into.reader_type = None;
1797
1798 // Set this.[[pendingPullIntos]] to the list « firstPendingPullInto »
1799 pending_pull_intos.clear();
1800 pending_pull_intos.push(first_pending_pull_into);
1801 }
1802 Ok(())
1803 }
1804
1805 /// <https://streams.spec.whatwg.org/#rbs-controller-private-cancel>
1806 pub(crate) fn perform_cancel_steps(
1807 &self,
1808 cx: SafeJSContext,
1809 global: &GlobalScope,
1810 reason: SafeHandleValue,
1811 can_gc: CanGc,
1812 ) -> Rc<Promise> {
1813 // Perform ! ReadableByteStreamControllerClearPendingPullIntos(this).
1814 self.clear_pending_pull_intos();
1815
1816 // Perform ! ResetQueue(this).
1817 self.reset_queue();
1818
1819 let underlying_source = self
1820 .underlying_source
1821 .get()
1822 .expect("Controller should have a source when the cancel steps are called into.");
1823
1824 // Let result be the result of performing this.[[cancelAlgorithm]], passing in reason.
1825 let result = underlying_source
1826 .call_cancel_algorithm(cx, global, reason, can_gc)
1827 .unwrap_or_else(|| {
1828 let promise = Promise::new(global, can_gc);
1829 promise.resolve_native(&(), can_gc);
1830 Ok(promise)
1831 });
1832
1833 let promise = result.unwrap_or_else(|error| {
1834 let cx = GlobalScope::get_cx();
1835 rooted!(in(*cx) let mut rval = UndefinedValue());
1836 error
1837 .clone()
1838 .to_jsval(cx, global, rval.handle_mut(), can_gc);
1839 let promise = Promise::new(global, can_gc);
1840 promise.reject_native(&rval.handle(), can_gc);
1841 promise
1842 });
1843
1844 // Perform ! ReadableByteStreamControllerClearAlgorithms(this).
1845 self.clear_algorithms();
1846
1847 // Return result(the promise).
1848 promise
1849 }
1850
1851 /// <https://streams.spec.whatwg.org/#rbs-controller-private-pull>
1852 pub(crate) fn perform_pull_steps(
1853 &self,
1854 cx: SafeJSContext,
1855 read_request: &ReadRequest,
1856 can_gc: CanGc,
1857 ) {
1858 // Let stream be this.[[stream]].
1859 let stream = self.stream.get().unwrap();
1860
1861 // Assert: ! ReadableStreamHasDefaultReader(stream) is true.
1862 assert!(stream.has_default_reader());
1863
1864 // If this.[[queueTotalSize]] > 0,
1865 if self.queue_total_size.get() > 0.0 {
1866 // Assert: ! ReadableStreamGetNumReadRequests(stream) is 0.
1867 assert_eq!(stream.get_num_read_requests(), 0);
1868
1869 // Perform ! ReadableByteStreamControllerFillReadRequestFromQueue(this, readRequest).
1870 let _ = self.fill_read_request_from_queue(cx, read_request, can_gc);
1871
1872 // Return.
1873 return;
1874 }
1875
1876 // Let autoAllocateChunkSize be this.[[autoAllocateChunkSize]].
1877 let auto_allocate_chunk_size = self.auto_allocate_chunk_size;
1878
1879 // If autoAllocateChunkSize is not undefined,
1880 if let Some(auto_allocate_chunk_size) = auto_allocate_chunk_size {
1881 // create_array_buffer_with_size
1882 // Let buffer be Construct(%ArrayBuffer%, « autoAllocateChunkSize »).
1883 match create_array_buffer_with_size(cx, auto_allocate_chunk_size as usize) {
1884 Ok(buffer) => {
1885 // Let pullIntoDescriptor be a new pull-into descriptor with
1886 // buffer buffer.[[Value]]
1887 // buffer byte length autoAllocateChunkSize
1888 // byte offset 0
1889 // byte length autoAllocateChunkSize
1890 // bytes filled 0
1891 // minimum fill 1
1892 // element size 1
1893 // view constructor %Uint8Array%
1894 // reader type "default"
1895 let pull_into_descriptor = PullIntoDescriptor {
1896 buffer,
1897 buffer_byte_length: auto_allocate_chunk_size,
1898 byte_length: auto_allocate_chunk_size,
1899 byte_offset: 0,
1900 bytes_filled: Cell::new(0),
1901 minimum_fill: 1,
1902 element_size: 1,
1903 view_constructor: Constructor::Name(Type::Uint8),
1904 reader_type: Some(ReaderType::Default),
1905 };
1906
1907 // Append pullIntoDescriptor to this.[[pendingPullIntos]].
1908 self.pending_pull_intos
1909 .borrow_mut()
1910 .push(pull_into_descriptor);
1911 },
1912 Err(error) => {
1913 // If buffer is an abrupt completion,
1914 // Perform readRequest’s error steps, given buffer.[[Value]].
1915
1916 rooted!(in(*cx) let mut rval = UndefinedValue());
1917 error
1918 .clone()
1919 .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1920 read_request.error_steps(rval.handle(), can_gc);
1921
1922 // Return.
1923 return;
1924 },
1925 }
1926 }
1927
1928 // Perform ! ReadableStreamAddReadRequest(stream, readRequest).
1929 stream.add_read_request(read_request);
1930
1931 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(this).
1932 self.call_pull_if_needed(can_gc);
1933 }
1934
1935 /// Setting the JS object after the heap has settled down.
1936 pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
1937 if let Some(underlying_source) = self.underlying_source.get() {
1938 underlying_source.set_underlying_source_this_object(this_object);
1939 }
1940 }
1941
1942 pub(crate) fn remove_entry(&self) -> QueueEntry {
1943 self.queue
1944 .borrow_mut()
1945 .pop_front()
1946 .expect("Reader must have read request when remove is called into.")
1947 }
1948
1949 pub(crate) fn get_queue_total_size(&self) -> f64 {
1950 self.queue_total_size.get()
1951 }
1952
1953 pub(crate) fn get_pending_pull_intos_size(&self) -> usize {
1954 self.pending_pull_intos.borrow().len()
1955 }
1956}
1957
1958impl ReadableByteStreamControllerMethods<crate::DomTypeHolder> for ReadableByteStreamController {
1959 /// <https://streams.spec.whatwg.org/#rbs-controller-byob-request>
1960 fn GetByobRequest(
1961 &self,
1962 can_gc: CanGc,
1963 ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
1964 let cx = GlobalScope::get_cx();
1965 // Return ! ReadableByteStreamControllerGetBYOBRequest(this).
1966 self.get_byob_request(cx, can_gc)
1967 }
1968
1969 /// <https://streams.spec.whatwg.org/#rbs-controller-desired-size>
1970 fn GetDesiredSize(&self) -> Option<f64> {
1971 // Return ! ReadableByteStreamControllerGetDesiredSize(this).
1972 self.get_desired_size()
1973 }
1974
1975 /// <https://streams.spec.whatwg.org/#rbs-controller-close>
1976 fn Close(&self, can_gc: CanGc) -> Fallible<()> {
1977 let cx = GlobalScope::get_cx();
1978 // If this.[[closeRequested]] is true, throw a TypeError exception.
1979 if self.close_requested.get() {
1980 return Err(Error::Type("closeRequested is true".to_owned()));
1981 }
1982
1983 // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
1984 if !self.stream.get().unwrap().is_readable() {
1985 return Err(Error::Type("stream is not readable".to_owned()));
1986 }
1987
1988 // Perform ? ReadableByteStreamControllerClose(this).
1989 self.close(cx, can_gc)
1990 }
1991
1992 /// <https://streams.spec.whatwg.org/#rbs-controller-enqueue>
1993 fn Enqueue(
1994 &self,
1995 chunk: js::gc::CustomAutoRooterGuard<js::typedarray::ArrayBufferView>,
1996 can_gc: CanGc,
1997 ) -> Fallible<()> {
1998 let cx = GlobalScope::get_cx();
1999
2000 let chunk = HeapBufferSource::<ArrayBufferViewU8>::from_view(chunk);
2001
2002 // If chunk.[[ByteLength]] is 0, throw a TypeError exception.
2003 if chunk.byte_length() == 0 {
2004 return Err(Error::Type("chunk.ByteLength is 0".to_owned()));
2005 }
2006
2007 // If chunk.[[ViewedArrayBuffer]].[[ByteLength]] is 0, throw a TypeError exception.
2008 if chunk.viewed_buffer_array_byte_length(cx) == 0 {
2009 return Err(Error::Type(
2010 "chunk.ViewedArrayBuffer.ByteLength is 0".to_owned(),
2011 ));
2012 }
2013
2014 // If this.[[closeRequested]] is true, throw a TypeError exception.
2015 if self.close_requested.get() {
2016 return Err(Error::Type("closeRequested is true".to_owned()));
2017 }
2018
2019 // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
2020 if !self.stream.get().unwrap().is_readable() {
2021 return Err(Error::Type("stream is not readable".to_owned()));
2022 }
2023
2024 // Return ? ReadableByteStreamControllerEnqueue(this, chunk).
2025 self.enqueue(cx, chunk, can_gc)
2026 }
2027
2028 /// <https://streams.spec.whatwg.org/#rbs-controller-error>
2029 fn Error(&self, _cx: SafeJSContext, e: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
2030 // Perform ! ReadableByteStreamControllerError(this, e).
2031 self.error(e, can_gc);
2032 Ok(())
2033 }
2034}