script/dom/stream/readablebytestreamcontroller.rs
1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::cmp::min;
7use std::collections::VecDeque;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::{Heap, Type};
12use js::jsval::UndefinedValue;
13use js::realm::CurrentRealm;
14use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue};
15use js::typedarray::{ArrayBufferU8, ArrayBufferViewU8};
16
17use super::readablestreambyobreader::ReadIntoRequest;
18use super::readablestreamdefaultreader::ReadRequest;
19use super::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType};
20use crate::dom::bindings::buffer_source::{
21 Constructor, HeapBufferSource, byte_size, create_array_buffer_with_size,
22 create_buffer_source_with_constructor,
23};
24use crate::dom::bindings::cell::DomRefCell;
25use crate::dom::bindings::codegen::Bindings::ReadableByteStreamControllerBinding::ReadableByteStreamControllerMethods;
26use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
27use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
28use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object};
29use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
30use crate::dom::bindings::trace::RootedTraceableBox;
31use crate::dom::globalscope::GlobalScope;
32use crate::dom::promise::Promise;
33use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
34use crate::dom::stream::readablestream::ReadableStream;
35use crate::dom::stream::readablestreambyobrequest::ReadableStreamBYOBRequest;
36use crate::realms::{InRealm, enter_realm};
37use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
38
39/// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry>
40#[derive(JSTraceable, MallocSizeOf)]
41pub(crate) struct QueueEntry {
42 /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-buffer>
43 #[ignore_malloc_size_of = "HeapBufferSource"]
44 buffer: HeapBufferSource<ArrayBufferU8>,
45 /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-offset>
46 byte_offset: usize,
47 /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-length>
48 byte_length: usize,
49}
50
51impl QueueEntry {
52 pub(crate) fn new(
53 buffer: HeapBufferSource<ArrayBufferU8>,
54 byte_offset: usize,
55 byte_length: usize,
56 ) -> QueueEntry {
57 QueueEntry {
58 buffer,
59 byte_offset,
60 byte_length,
61 }
62 }
63}
64
65#[derive(Debug, Eq, JSTraceable, MallocSizeOf, PartialEq)]
66pub(crate) enum ReaderType {
67 /// <https://streams.spec.whatwg.org/#readablestreambyobreader>
68 Byob,
69 /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
70 Default,
71}
72
73/// <https://streams.spec.whatwg.org/#pull-into-descriptor>
74#[derive(Eq, JSTraceable, MallocSizeOf, PartialEq)]
75pub(crate) struct PullIntoDescriptor {
76 #[ignore_malloc_size_of = "HeapBufferSource"]
77 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer>
78 buffer: HeapBufferSource<ArrayBufferU8>,
79 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer-byte-length>
80 buffer_byte_length: u64,
81 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-offset>
82 byte_offset: u64,
83 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-length>
84 byte_length: u64,
85 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-bytes-filled>
86 bytes_filled: Cell<u64>,
87 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-minimum-fill>
88 minimum_fill: u64,
89 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-element-size>
90 element_size: u64,
91 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-view-constructor>
92 view_constructor: Constructor,
93 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-reader-type>
94 reader_type: Option<ReaderType>,
95}
96
97/// The fulfillment handler for
98/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
99#[derive(Clone, JSTraceable, MallocSizeOf)]
100#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
101struct StartAlgorithmFulfillmentHandler {
102 controller: Dom<ReadableByteStreamController>,
103}
104
105impl Callback for StartAlgorithmFulfillmentHandler {
106 /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
107 /// Upon fulfillment of startPromise,
108 fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
109 let can_gc = CanGc::from_cx(cx);
110 // Set controller.[[started]] to true.
111 self.controller.started.set(true);
112
113 // Assert: controller.[[pulling]] is false.
114 assert!(!self.controller.pulling.get());
115
116 // Assert: controller.[[pullAgain]] is false.
117 assert!(!self.controller.pull_again.get());
118
119 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
120 self.controller.call_pull_if_needed(can_gc);
121 }
122}
123
124/// The rejection handler for
125/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
126#[derive(Clone, JSTraceable, MallocSizeOf)]
127#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
128struct StartAlgorithmRejectionHandler {
129 controller: Dom<ReadableByteStreamController>,
130}
131
132impl Callback for StartAlgorithmRejectionHandler {
133 /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
134 /// Upon rejection of startPromise with reason r,
135 fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
136 let can_gc = CanGc::from_cx(cx);
137 // Perform ! ReadableByteStreamControllerError(controller, r).
138 self.controller.error(v, can_gc);
139 }
140}
141
142/// The fulfillment handler for
143/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
144#[derive(Clone, JSTraceable, MallocSizeOf)]
145#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
146struct PullAlgorithmFulfillmentHandler {
147 controller: Dom<ReadableByteStreamController>,
148}
149
150impl Callback for PullAlgorithmFulfillmentHandler {
151 /// Continuation of <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
152 /// Upon fulfillment of pullPromise
153 fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
154 let can_gc = CanGc::from_cx(cx);
155 // Set controller.[[pulling]] to false.
156 self.controller.pulling.set(false);
157
158 // If controller.[[pullAgain]] is true,
159 if self.controller.pull_again.get() {
160 // Set controller.[[pullAgain]] to false.
161 self.controller.pull_again.set(false);
162
163 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
164 self.controller.call_pull_if_needed(can_gc);
165 }
166 }
167}
168
169/// The rejection handler for
170/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
171#[derive(Clone, JSTraceable, MallocSizeOf)]
172#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
173struct PullAlgorithmRejectionHandler {
174 controller: Dom<ReadableByteStreamController>,
175}
176
177impl Callback for PullAlgorithmRejectionHandler {
178 /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-byte-controller-call-pull-if-needed>
179 /// Upon rejection of pullPromise with reason e.
180 fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
181 let can_gc = CanGc::from_cx(cx);
182 // Perform ! ReadableByteStreamControllerError(controller, e).
183 self.controller.error(v, can_gc);
184 }
185}
186
187/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
188#[dom_struct]
189pub(crate) struct ReadableByteStreamController {
190 reflector_: Reflector,
191 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-autoallocatechunksize>
192 auto_allocate_chunk_size: Option<u64>,
193 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-stream>
194 stream: MutNullableDom<ReadableStream>,
195 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-strategyhwm>
196 strategy_hwm: f64,
197 /// A mutable reference to the underlying source is used to implement these two
198 /// internal slots:
199 ///
200 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullalgorithm>
201 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-cancelalgorithm>
202 underlying_source: MutNullableDom<UnderlyingSourceContainer>,
203 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queue>
204 queue: DomRefCell<VecDeque<QueueEntry>>,
205 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queuetotalsize>
206 queue_total_size: Cell<f64>,
207 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-byobrequest>
208 byob_request: MutNullableDom<ReadableStreamBYOBRequest>,
209 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pendingpullintos>
210 pending_pull_intos: DomRefCell<Vec<PullIntoDescriptor>>,
211 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-closerequested>
212 close_requested: Cell<bool>,
213 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-started>
214 started: Cell<bool>,
215 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pulling>
216 pulling: Cell<bool>,
217 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullalgorithm>
218 pull_again: Cell<bool>,
219}
220
221impl ReadableByteStreamController {
222 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
223 fn new_inherited(
224 underlying_source_type: UnderlyingSourceType,
225 strategy_hwm: f64,
226 global: &GlobalScope,
227 can_gc: CanGc,
228 ) -> ReadableByteStreamController {
229 let underlying_source_container =
230 UnderlyingSourceContainer::new(global, underlying_source_type, can_gc);
231 let auto_allocate_chunk_size = underlying_source_container.auto_allocate_chunk_size();
232 ReadableByteStreamController {
233 reflector_: Reflector::new(),
234 byob_request: MutNullableDom::new(None),
235 stream: MutNullableDom::new(None),
236 underlying_source: MutNullableDom::new(Some(&*underlying_source_container)),
237 auto_allocate_chunk_size,
238 pending_pull_intos: DomRefCell::new(Vec::new()),
239 strategy_hwm,
240 close_requested: Default::default(),
241 queue: DomRefCell::new(Default::default()),
242 queue_total_size: Default::default(),
243 started: Default::default(),
244 pulling: Default::default(),
245 pull_again: Default::default(),
246 }
247 }
248
249 #[cfg_attr(crown, expect(crown::unrooted_must_root))]
250 pub(crate) fn new(
251 underlying_source_type: UnderlyingSourceType,
252 strategy_hwm: f64,
253 global: &GlobalScope,
254 can_gc: CanGc,
255 ) -> DomRoot<ReadableByteStreamController> {
256 reflect_dom_object(
257 Box::new(ReadableByteStreamController::new_inherited(
258 underlying_source_type,
259 strategy_hwm,
260 global,
261 can_gc,
262 )),
263 global,
264 can_gc,
265 )
266 }
267
268 #[allow(dead_code)]
269 pub(crate) fn set_stream(&self, stream: &ReadableStream) {
270 self.stream.set(Some(stream))
271 }
272
273 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-pull-into>
274 pub(crate) fn perform_pull_into(
275 &self,
276 cx: SafeJSContext,
277 read_into_request: &ReadIntoRequest,
278 view: HeapBufferSource<ArrayBufferViewU8>,
279 min: u64,
280 can_gc: CanGc,
281 ) {
282 // Let stream be controller.[[stream]].
283 let stream = self.stream.get().unwrap();
284
285 // Let elementSize be 1.
286 let mut element_size = 1;
287
288 // Let ctor be %DataView%.
289 let mut ctor = Constructor::DataView;
290
291 // If view has a [[TypedArrayName]] internal slot (i.e., it is not a DataView),
292 if view.has_typed_array_name() {
293 // Set elementSize to the element size specified in the
294 // typed array constructors table for view.[[TypedArrayName]].
295 let view_typw = view.get_array_buffer_view_type();
296 element_size = byte_size(view_typw);
297
298 // Set ctor to the constructor specified in the typed array constructors table for view.[[TypedArrayName]].
299 ctor = Constructor::Name(view_typw);
300 }
301
302 // Let minimumFill be min × elementSize.
303 let minimum_fill = min * element_size;
304
305 // Assert: minimumFill ≥ 0 and minimumFill ≤ view.[[ByteLength]].
306 assert!(minimum_fill <= (view.byte_length() as u64));
307
308 // Assert: the remainder after dividing minimumFill by elementSize is 0.
309 assert_eq!(minimum_fill % element_size, 0);
310
311 // Let byteOffset be view.[[ByteOffset]].
312 let byte_offset = view.get_byte_offset();
313
314 // Let byteLength be view.[[ByteLength]].
315 let byte_length = view.byte_length();
316
317 // Let bufferResult be TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
318 match view
319 .get_array_buffer_view_buffer(cx)
320 .transfer_array_buffer(cx)
321 {
322 Ok(buffer) => {
323 // Let buffer be bufferResult.[[Value]].
324 // Let pullIntoDescriptor be a new pull-into descriptor with
325 // buffer buffer
326 // buffer byte length buffer.[[ArrayBufferByteLength]]
327 // byte offset byteOffset
328 // byte length byteLength
329 // bytes filled 0
330 // minimum fill minimumFill
331 // element size elementSize
332 // view constructor ctor
333 // reader type "byob"
334 let buffer_byte_length = buffer.byte_length();
335 let pull_into_descriptor = PullIntoDescriptor {
336 buffer,
337 buffer_byte_length: buffer_byte_length as u64,
338 byte_offset: byte_offset as u64,
339 byte_length: byte_length as u64,
340 bytes_filled: Cell::new(0),
341 minimum_fill,
342 element_size,
343 view_constructor: ctor.clone(),
344 reader_type: Some(ReaderType::Byob),
345 };
346
347 // If controller.[[pendingPullIntos]] is not empty,
348 {
349 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
350 if !pending_pull_intos.is_empty() {
351 // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
352 pending_pull_intos.push(pull_into_descriptor);
353
354 // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
355 stream.add_read_into_request(read_into_request);
356
357 // Return.
358 return;
359 }
360 }
361
362 // If stream.[[state]] is "closed",
363 if stream.is_closed() {
364 // Let emptyView be ! Construct(ctor, « pullIntoDescriptor’s buffer,
365 // pullIntoDescriptor’s byte offset, 0 »).
366 if let Ok(empty_view) = create_buffer_source_with_constructor(
367 cx,
368 &ctor,
369 &pull_into_descriptor.buffer,
370 pull_into_descriptor.byte_offset as usize,
371 0,
372 ) {
373 // Perform readIntoRequest’s close steps, given emptyView.
374 let result = RootedTraceableBox::new(Heap::default());
375 rooted!(in(*cx) let mut view_value = UndefinedValue());
376 empty_view.get_buffer_view_value(cx, view_value.handle_mut());
377 result.set(*view_value);
378
379 read_into_request.close_steps(Some(result), can_gc);
380
381 // Return.
382 return;
383 } else {
384 return;
385 }
386 }
387
388 // If controller.[[queueTotalSize]] > 0,
389 if self.queue_total_size.get() > 0.0 {
390 // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(
391 // controller, pullIntoDescriptor) is true,
392 if self.fill_pull_into_descriptor_from_queue(cx, &pull_into_descriptor) {
393 // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(
394 // pullIntoDescriptor).
395 if let Ok(filled_view) =
396 self.convert_pull_into_descriptor(cx, &pull_into_descriptor)
397 {
398 // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
399 self.handle_queue_drain(can_gc);
400
401 // Perform readIntoRequest’s chunk steps, given filledView.
402 let result = RootedTraceableBox::new(Heap::default());
403 rooted!(in(*cx) let mut view_value = UndefinedValue());
404 filled_view.get_buffer_view_value(cx, view_value.handle_mut());
405 result.set(*view_value);
406 read_into_request.chunk_steps(result, can_gc);
407
408 // Return.
409 return;
410 } else {
411 return;
412 }
413 }
414
415 // If controller.[[closeRequested]] is true,
416 if self.close_requested.get() {
417 // Let e be a new TypeError exception.
418 rooted!(in(*cx) let mut error = UndefinedValue());
419 Error::Type(c"close requested".to_owned()).to_jsval(
420 cx,
421 &self.global(),
422 error.handle_mut(),
423 can_gc,
424 );
425
426 // Perform ! ReadableByteStreamControllerError(controller, e).
427 self.error(error.handle(), can_gc);
428
429 // Perform readIntoRequest’s error steps, given e.
430 read_into_request.error_steps(error.handle(), can_gc);
431
432 // Return.
433 return;
434 }
435 }
436
437 // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
438 {
439 self.pending_pull_intos
440 .borrow_mut()
441 .push(pull_into_descriptor);
442 }
443 // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
444 stream.add_read_into_request(read_into_request);
445
446 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
447 self.call_pull_if_needed(can_gc);
448 },
449 Err(error) => {
450 // If bufferResult is an abrupt completion,
451
452 // Perform readIntoRequest’s error steps, given bufferResult.[[Value]].
453 rooted!(in(*cx) let mut rval = UndefinedValue());
454 error.to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
455 read_into_request.error_steps(rval.handle(), can_gc);
456
457 // Return.
458 },
459 }
460 }
461
462 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond>
463 pub(crate) fn respond(
464 &self,
465 cx: SafeJSContext,
466 bytes_written: u64,
467 can_gc: CanGc,
468 ) -> Fallible<()> {
469 {
470 // Assert: controller.[[pendingPullIntos]] is not empty.
471 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
472 assert!(!pending_pull_intos.is_empty());
473
474 // Let firstDescriptor be controller.[[pendingPullIntos]][0].
475 let first_descriptor = pending_pull_intos.first_mut().unwrap();
476
477 // Let state be controller.[[stream]].[[state]].
478 let stream = self.stream.get().unwrap();
479
480 // If state is "closed",
481 if stream.is_closed() {
482 // If bytesWritten is not 0, throw a TypeError exception.
483 if bytes_written != 0 {
484 return Err(Error::Type(
485 c"bytesWritten not zero on closed stream".to_owned(),
486 ));
487 }
488 } else {
489 // Assert: state is "readable".
490 assert!(stream.is_readable());
491
492 // If bytesWritten is 0, throw a TypeError exception.
493 if bytes_written == 0 {
494 return Err(Error::Type(c"bytesWritten is 0".to_owned()));
495 }
496
497 // If firstDescriptor’s bytes filled + bytesWritten > firstDescriptor’s byte length,
498 // throw a RangeError exception.
499 if first_descriptor.bytes_filled.get() + bytes_written >
500 first_descriptor.byte_length
501 {
502 return Err(Error::Range(
503 c"bytes filled + bytesWritten > byte length".to_owned(),
504 ));
505 }
506 }
507
508 // Set firstDescriptor’s buffer to ! TransferArrayBuffer(firstDescriptor’s buffer).
509 first_descriptor.buffer = first_descriptor
510 .buffer
511 .transfer_array_buffer(cx)
512 .expect("TransferArrayBuffer failed");
513 }
514
515 // Perform ? ReadableByteStreamControllerRespondInternal(controller, bytesWritten).
516 self.respond_internal(cx, bytes_written, can_gc)
517 }
518
519 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-internal>
520 pub(crate) fn respond_internal(
521 &self,
522 cx: SafeJSContext,
523 bytes_written: u64,
524 can_gc: CanGc,
525 ) -> Fallible<()> {
526 {
527 // Let firstDescriptor be controller.[[pendingPullIntos]][0].
528 let pending_pull_intos = self.pending_pull_intos.borrow();
529 let first_descriptor = pending_pull_intos.first().unwrap();
530
531 // Assert: ! CanTransferArrayBuffer(firstDescriptor’s buffer) is true
532 assert!(first_descriptor.buffer.can_transfer_array_buffer(cx));
533 }
534
535 // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
536 self.invalidate_byob_request();
537
538 // Let state be controller.[[stream]].[[state]].
539 let stream = self.stream.get().unwrap();
540
541 // If state is "closed",
542 if stream.is_closed() {
543 // Assert: bytesWritten is 0.
544 assert_eq!(bytes_written, 0);
545
546 // Perform ! ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor).
547 self.respond_in_closed_state(cx, can_gc)
548 .expect("respond_in_closed_state failed");
549 } else {
550 // Assert: state is "readable".
551 assert!(stream.is_readable());
552
553 // Assert: bytesWritten > 0.
554 assert!(bytes_written > 0);
555
556 // Perform ? ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor).
557 self.respond_in_readable_state(cx, bytes_written, can_gc)?;
558 }
559
560 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
561 self.call_pull_if_needed(can_gc);
562
563 Ok(())
564 }
565
566 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-closed-state>
567 pub(crate) fn respond_in_closed_state(&self, cx: SafeJSContext, can_gc: CanGc) -> Fallible<()> {
568 let pending_pull_intos = self.pending_pull_intos.borrow();
569 let first_descriptor = pending_pull_intos.first().unwrap();
570
571 // Assert: the remainder after dividing firstDescriptor’s bytes filled
572 // by firstDescriptor’s element size is 0.
573 assert_eq!(
574 first_descriptor.bytes_filled.get() % first_descriptor.element_size,
575 0
576 );
577
578 // If firstDescriptor’s reader type is "none",
579 // perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
580 let reader_type = first_descriptor.reader_type.is_none();
581
582 // needed to drop the borrow and avoid BorrowMutError
583 drop(pending_pull_intos);
584
585 if reader_type {
586 self.shift_pending_pull_into();
587 }
588
589 // Let stream be controller.[[stream]].
590 let stream = self.stream.get().unwrap();
591
592 // If ! ReadableStreamHasBYOBReader(stream) is true,
593 if stream.has_byob_reader() {
594 // Let filledPullIntos be a new empty list.
595 let mut filled_pull_intos = Vec::new();
596
597 // While filledPullIntos’s size < ! ReadableStreamGetNumReadIntoRequests(stream),
598 while filled_pull_intos.len() < stream.get_num_read_into_requests() {
599 // Let pullIntoDescriptor be ! ReadableByteStreamControllerShiftPendingPullInto(controller).
600 let pull_into_descriptor = self.shift_pending_pull_into();
601
602 // Append pullIntoDescriptor to filledPullIntos.
603 filled_pull_intos.push(pull_into_descriptor);
604 }
605
606 // For each filledPullInto of filledPullIntos,
607 for filled_pull_into in filled_pull_intos {
608 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
609 self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)
610 .expect("commit_pull_into_descriptor failed");
611 }
612 }
613
614 Ok(())
615 }
616
617 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-readable-state>
618 pub(crate) fn respond_in_readable_state(
619 &self,
620 cx: SafeJSContext,
621 bytes_written: u64,
622 can_gc: CanGc,
623 ) -> Fallible<()> {
624 let pending_pull_intos = self.pending_pull_intos.borrow();
625 let first_descriptor = pending_pull_intos.first().unwrap();
626
627 // Assert: pullIntoDescriptor’s bytes filled + bytesWritten ≤ pullIntoDescriptor’s byte length.
628 assert!(
629 first_descriptor.bytes_filled.get() + bytes_written <= first_descriptor.byte_length
630 );
631
632 // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
633 // controller, bytesWritten, pullIntoDescriptor).
634 self.fill_head_pull_into_descriptor(bytes_written, first_descriptor);
635
636 // If pullIntoDescriptor’s reader type is "none",
637 if first_descriptor.reader_type.is_none() {
638 // needed to drop the borrow and avoid BorrowMutError
639 drop(pending_pull_intos);
640
641 // Perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, pullIntoDescriptor).
642 self.enqueue_detached_pull_into_to_queue(cx, can_gc)?;
643
644 // Let filledPullIntos be the result of performing
645 // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
646 let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
647
648 // For each filledPullInto of filledPullIntos,
649 for filled_pull_into in filled_pull_intos {
650 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]]
651 // , filledPullInto).
652 self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)
653 .expect("commit_pull_into_descriptor failed");
654 }
655
656 // Return.
657 return Ok(());
658 }
659
660 // If pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill, return.
661 if first_descriptor.bytes_filled.get() < first_descriptor.minimum_fill {
662 return Ok(());
663 }
664
665 // needed to drop the borrow and avoid BorrowMutError
666 drop(pending_pull_intos);
667
668 // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
669 let pull_into_descriptor = self.shift_pending_pull_into();
670
671 // Let remainderSize be the remainder after dividing pullIntoDescriptor’s bytes
672 // filled by pullIntoDescriptor’s element size.
673 let remainder_size =
674 pull_into_descriptor.bytes_filled.get() % pull_into_descriptor.element_size;
675
676 // If remainderSize > 0,
677 if remainder_size > 0 {
678 // Let end be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
679 let end = pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();
680
681 // Perform ? ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
682 // pullIntoDescriptor’s buffer, end − remainderSize, remainderSize).
683 self.enqueue_cloned_chunk_to_queue(
684 cx,
685 &pull_into_descriptor.buffer,
686 end - remainder_size,
687 remainder_size,
688 can_gc,
689 )?;
690 }
691
692 // Set pullIntoDescriptor’s bytes filled to pullIntoDescriptor’s bytes filled − remainderSize.
693 pull_into_descriptor
694 .bytes_filled
695 .set(pull_into_descriptor.bytes_filled.get() - remainder_size);
696
697 // Let filledPullIntos be the result of performing
698 // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
699 let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
700
701 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], pullIntoDescriptor).
702 self.commit_pull_into_descriptor(cx, &pull_into_descriptor, can_gc)
703 .expect("commit_pull_into_descriptor failed");
704
705 // For each filledPullInto of filledPullIntos,
706 for filled_pull_into in filled_pull_intos {
707 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], filledPullInto).
708 self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)
709 .expect("commit_pull_into_descriptor failed");
710 }
711
712 Ok(())
713 }
714
715 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-with-new-view>
716 pub(crate) fn respond_with_new_view(
717 &self,
718 cx: SafeJSContext,
719 view: HeapBufferSource<ArrayBufferViewU8>,
720 can_gc: CanGc,
721 ) -> Fallible<()> {
722 let view_byte_length;
723 {
724 // Assert: controller.[[pendingPullIntos]] is not empty.
725 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
726 assert!(!pending_pull_intos.is_empty());
727
728 // Assert: ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is false.
729 assert!(!view.is_detached_buffer(cx));
730
731 // Let firstDescriptor be controller.[[pendingPullIntos]][0].
732 let first_descriptor = pending_pull_intos.first_mut().unwrap();
733
734 // Let state be controller.[[stream]].[[state]].
735 let stream = self.stream.get().unwrap();
736
737 // If state is "closed",
738 if stream.is_closed() {
739 // If view.[[ByteLength]] is not 0, throw a TypeError exception.
740 if view.byte_length() != 0 {
741 return Err(Error::Type(c"view byte length is not 0".to_owned()));
742 }
743 } else {
744 // Assert: state is "readable".
745 assert!(stream.is_readable());
746
747 // If view.[[ByteLength]] is 0, throw a TypeError exception.
748 if view.byte_length() == 0 {
749 return Err(Error::Type(c"view byte length is 0".to_owned()));
750 }
751 }
752
753 // If firstDescriptor’s byte offset + firstDescriptor’ bytes filled is not view.[[ByteOffset]],
754 // throw a RangeError exception.
755 if first_descriptor.byte_offset + first_descriptor.bytes_filled.get() !=
756 (view.get_byte_offset() as u64)
757 {
758 return Err(Error::Range(
759 c"firstDescriptor's byte offset + bytes filled is not view byte offset"
760 .to_owned(),
761 ));
762 }
763
764 // If firstDescriptor’s buffer byte length is not view.[[ViewedArrayBuffer]].[[ByteLength]],
765 // throw a RangeError exception.
766 if first_descriptor.buffer_byte_length !=
767 (view.viewed_buffer_array_byte_length(cx) as u64)
768 {
769 return Err(Error::Range(
770 c"firstDescriptor's buffer byte length is not view viewed buffer array byte length"
771 .to_owned(),
772 ));
773 }
774
775 // If firstDescriptor’s bytes filled + view.[[ByteLength]] > firstDescriptor’s byte length,
776 // throw a RangeError exception.
777 if first_descriptor.bytes_filled.get() + (view.byte_length()) as u64 >
778 first_descriptor.byte_length
779 {
780 return Err(Error::Range(
781 c"bytes filled + view byte length > byte length".to_owned(),
782 ));
783 }
784
785 // Let viewByteLength be view.[[ByteLength]].
786 view_byte_length = view.byte_length();
787
788 // Set firstDescriptor’s buffer to ? TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
789 first_descriptor.buffer = view
790 .get_array_buffer_view_buffer(cx)
791 .transfer_array_buffer(cx)?;
792 }
793
794 // Perform ? ReadableByteStreamControllerRespondInternal(controller, viewByteLength).
795 self.respond_internal(cx, view_byte_length as u64, can_gc)
796 }
797
798 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-get-desired-size>
799 pub(crate) fn get_desired_size(&self) -> Option<f64> {
800 // Let state be controller.[[stream]].[[state]].
801 let stream = self.stream.get()?;
802
803 // If state is "errored", return null.
804 if stream.is_errored() {
805 return None;
806 }
807
808 // If state is "closed", return 0.
809 if stream.is_closed() {
810 return Some(0.0);
811 }
812
813 // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
814 Some(self.strategy_hwm - self.queue_total_size.get())
815 }
816
817 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollergetbyobrequest>
818 pub(crate) fn get_byob_request(
819 &self,
820 cx: SafeJSContext,
821 can_gc: CanGc,
822 ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
823 // If controller.[[byobRequest]] is null and controller.[[pendingPullIntos]] is not empty,
824 let pending_pull_intos = self.pending_pull_intos.borrow();
825 if self.byob_request.get().is_none() && !pending_pull_intos.is_empty() {
826 // Let firstDescriptor be controller.[[pendingPullIntos]][0].
827 let first_descriptor = pending_pull_intos.first().unwrap();
828 // Let view be ! Construct(%Uint8Array%, « firstDescriptor’s buffer,
829 // firstDescriptor’s byte offset + firstDescriptor’s bytes filled,
830 // firstDescriptor’s byte length − firstDescriptor’s bytes filled »).
831
832 let byte_offset = first_descriptor.byte_offset + first_descriptor.bytes_filled.get();
833 let byte_length = first_descriptor.byte_length - first_descriptor.bytes_filled.get();
834
835 let view = create_buffer_source_with_constructor(
836 cx,
837 &Constructor::Name(Type::Uint8),
838 &first_descriptor.buffer,
839 byte_offset as usize,
840 byte_length as usize,
841 )
842 .expect("Construct Uint8Array failed");
843
844 // Let byobRequest be a new ReadableStreamBYOBRequest.
845 let byob_request = ReadableStreamBYOBRequest::new(&self.global(), can_gc);
846
847 // Set byobRequest.[[controller]] to controller.
848 byob_request.set_controller(Some(&DomRoot::from_ref(self)));
849
850 // Set byobRequest.[[view]] to view.
851 byob_request.set_view(Some(view));
852
853 // Set controller.[[byobRequest]] to byobRequest.
854 self.byob_request.set(Some(&byob_request));
855 }
856
857 // Return controller.[[byobRequest]].
858 Ok(self.byob_request.get())
859 }
860
861 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-close>
862 pub(crate) fn close(&self, cx: SafeJSContext, can_gc: CanGc) -> Fallible<()> {
863 // Let stream be controller.[[stream]].
864 let stream = self.stream.get().unwrap();
865
866 // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
867 if self.close_requested.get() || !stream.is_readable() {
868 return Ok(());
869 }
870
871 // If controller.[[queueTotalSize]] > 0,
872 if self.queue_total_size.get() > 0.0 {
873 // Set controller.[[closeRequested]] to true.
874 self.close_requested.set(true);
875 // Return.
876 return Ok(());
877 }
878
879 // If controller.[[pendingPullIntos]] is not empty,
880 let pending_pull_intos = self.pending_pull_intos.borrow();
881 if !pending_pull_intos.is_empty() {
882 // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
883 let first_pending_pull_into = pending_pull_intos.first().unwrap();
884
885 // If the remainder after dividing firstPendingPullInto’s bytes filled by
886 // firstPendingPullInto’s element size is not 0,
887 if first_pending_pull_into.bytes_filled.get() % first_pending_pull_into.element_size !=
888 0
889 {
890 // needed to drop the borrow and avoid BorrowMutError
891 drop(pending_pull_intos);
892
893 // Let e be a new TypeError exception.
894 let e = Error::Type(
895 c"remainder after dividing firstPendingPullInto's bytes
896 filled by firstPendingPullInto's element size is not 0"
897 .to_owned(),
898 );
899
900 // Perform ! ReadableByteStreamControllerError(controller, e).
901 rooted!(in(*cx) let mut error = UndefinedValue());
902 e.clone()
903 .to_jsval(cx, &self.global(), error.handle_mut(), can_gc);
904 self.error(error.handle(), can_gc);
905
906 // Throw e.
907 return Err(e);
908 }
909 }
910
911 // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
912 self.clear_algorithms();
913
914 // Perform ! ReadableStreamClose(stream).
915 stream.close(can_gc);
916 Ok(())
917 }
918
919 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-error>
920 pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
921 // Let stream be controller.[[stream]].
922 let stream = self.stream.get().unwrap();
923
924 // If stream.[[state]] is not "readable", return.
925 if !stream.is_readable() {
926 return;
927 }
928
929 // Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller).
930 self.clear_pending_pull_intos();
931
932 // Perform ! ResetQueue(controller).
933 self.reset_queue();
934
935 // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
936 self.clear_algorithms();
937
938 // Perform ! ReadableStreamError(stream, e).
939 stream.error(e, can_gc);
940 }
941
942 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-algorithms>
943 fn clear_algorithms(&self) {
944 // Set controller.[[pullAlgorithm]] to undefined.
945 // Set controller.[[cancelAlgorithm]] to undefined.
946 self.underlying_source.set(None);
947 }
948
949 /// <https://streams.spec.whatwg.org/#reset-queue>
950 pub(crate) fn reset_queue(&self) {
951 // Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
952
953 // Set container.[[queue]] to a new empty list.
954 self.queue.borrow_mut().clear();
955
956 // Set container.[[queueTotalSize]] to 0.
957 self.queue_total_size.set(0.0);
958 }
959
960 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-pending-pull-intos>
961 pub(crate) fn clear_pending_pull_intos(&self) {
962 // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
963 self.invalidate_byob_request();
964
965 // Set controller.[[pendingPullIntos]] to a new empty list.
966 self.pending_pull_intos.borrow_mut().clear();
967 }
968
969 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-invalidate-byob-request>
970 pub(crate) fn invalidate_byob_request(&self) {
971 if let Some(byob_request) = self.byob_request.get() {
972 // Set controller.[[byobRequest]].[[controller]] to undefined.
973 byob_request.set_controller(None);
974
975 // Set controller.[[byobRequest]].[[view]] to null.
976 byob_request.set_view(None);
977
978 // Set controller.[[byobRequest]] to null.
979 self.byob_request.set(None);
980 }
981 // If controller.[[byobRequest]] is null, return.
982 }
983
984 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue>
985 pub(crate) fn enqueue(
986 &self,
987 cx: &mut js::context::JSContext,
988 chunk: HeapBufferSource<ArrayBufferViewU8>,
989 ) -> Fallible<()> {
990 // Let stream be controller.[[stream]].
991 let stream = self.stream.get().unwrap();
992
993 // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
994 if self.close_requested.get() || !stream.is_readable() {
995 return Ok(());
996 }
997
998 // Let buffer be chunk.[[ViewedArrayBuffer]].
999 let buffer = chunk.get_array_buffer_view_buffer(cx.into());
1000
1001 // Let byteOffset be chunk.[[ByteOffset]].
1002 let byte_offset = chunk.get_byte_offset();
1003
1004 // Let byteLength be chunk.[[ByteLength]].
1005 let byte_length = chunk.byte_length();
1006
1007 // If ! IsDetachedBuffer(buffer) is true, throw a TypeError exception.
1008 if buffer.is_detached_buffer(cx.into()) {
1009 return Err(Error::Type(c"buffer is detached".to_owned()));
1010 }
1011
1012 // Let transferredBuffer be ? TransferArrayBuffer(buffer).
1013 let transferred_buffer = buffer.transfer_array_buffer(cx.into())?;
1014
1015 // If controller.[[pendingPullIntos]] is not empty,
1016 {
1017 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
1018 if !pending_pull_intos.is_empty() {
1019 // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
1020 let first_descriptor = pending_pull_intos.first_mut().unwrap();
1021 // If ! IsDetachedBuffer(firstPendingPullInto’s buffer) is true, throw a TypeError exception.
1022 if first_descriptor.buffer.is_detached_buffer(cx.into()) {
1023 return Err(Error::Type(c"buffer is detached".to_owned()));
1024 }
1025
1026 // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
1027 self.invalidate_byob_request();
1028
1029 // Set firstPendingPullInto’s buffer to ! TransferArrayBuffer(firstPendingPullInto’s buffer).
1030 first_descriptor.buffer = first_descriptor
1031 .buffer
1032 .transfer_array_buffer(cx.into())
1033 .expect("TransferArrayBuffer failed");
1034
1035 // If firstPendingPullInto’s reader type is "none",
1036 if first_descriptor.reader_type.is_none() {
1037 // needed to drop the borrow and avoid BorrowMutError
1038 drop(pending_pull_intos);
1039
1040 // perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(
1041 // controller, firstPendingPullInto).
1042 self.enqueue_detached_pull_into_to_queue(cx.into(), CanGc::from_cx(cx))?;
1043 }
1044 }
1045 }
1046
1047 // If ! ReadableStreamHasDefaultReader(stream) is true,
1048 if stream.has_default_reader() {
1049 // Perform ! ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller).
1050 self.process_read_requests_using_queue(cx)
1051 .expect("process_read_requests_using_queue failed");
1052
1053 // If ! ReadableStreamGetNumReadRequests(stream) is 0,
1054 if stream.get_num_read_requests() == 0 {
1055 // Assert: controller.[[pendingPullIntos]] is empty.
1056 {
1057 assert!(self.pending_pull_intos.borrow().is_empty());
1058 }
1059
1060 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1061 // controller, transferredBuffer, byteOffset, byteLength).
1062 self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1063 } else {
1064 // Assert: controller.[[queue]] is empty.
1065 assert!(self.queue.borrow().is_empty());
1066
1067 // If controller.[[pendingPullIntos]] is not empty,
1068
1069 let pending_pull_intos = self.pending_pull_intos.borrow();
1070 if !pending_pull_intos.is_empty() {
1071 // Assert: controller.[[pendingPullIntos]][0]'s reader type is "default".
1072 assert!(matches!(
1073 pending_pull_intos.first().unwrap().reader_type,
1074 Some(ReaderType::Default)
1075 ));
1076
1077 // needed to drop the borrow and avoid BorrowMutError
1078 drop(pending_pull_intos);
1079
1080 // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1081 self.shift_pending_pull_into();
1082 }
1083
1084 // Let transferredView be ! Construct(%Uint8Array%, « transferredBuffer, byteOffset, byteLength »).
1085 let transferred_view = create_buffer_source_with_constructor(
1086 cx.into(),
1087 &Constructor::Name(Type::Uint8),
1088 &transferred_buffer,
1089 byte_offset,
1090 byte_length,
1091 )
1092 .expect("Construct Uint8Array failed");
1093
1094 // Perform ! ReadableStreamFulfillReadRequest(stream, transferredView, false).
1095 rooted!(&in(cx) let mut view_value = UndefinedValue());
1096 transferred_view.get_buffer_view_value(cx.into(), view_value.handle_mut());
1097 stream.fulfill_read_request(view_value.handle(), false, CanGc::from_cx(cx));
1098 }
1099 // Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true,
1100 } else if stream.has_byob_reader() {
1101 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1102 // controller, transferredBuffer, byteOffset, byteLength).
1103 self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1104
1105 // Let filledPullIntos be the result of performing !
1106 // ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
1107 let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx.into());
1108
1109 // For each filledPullInto of filledPullIntos,
1110 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
1111 for filled_pull_into in filled_pull_intos {
1112 self.commit_pull_into_descriptor(cx.into(), &filled_pull_into, CanGc::from_cx(cx))
1113 .expect("commit_pull_into_descriptor failed");
1114 }
1115 } else {
1116 // Assert: ! IsReadableStreamLocked(stream) is false.
1117 assert!(!stream.is_locked());
1118
1119 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1120 // (controller, transferredBuffer, byteOffset, byteLength).
1121 self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1122 }
1123
1124 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1125 self.call_pull_if_needed(CanGc::from_cx(cx));
1126
1127 Ok(())
1128 }
1129
1130 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-commit-pull-into-descriptor>
1131 pub(crate) fn commit_pull_into_descriptor(
1132 &self,
1133 cx: SafeJSContext,
1134 pull_into_descriptor: &PullIntoDescriptor,
1135 can_gc: CanGc,
1136 ) -> Fallible<()> {
1137 // Assert: stream.[[state]] is not "errored".
1138 let stream = self.stream.get().unwrap();
1139 assert!(!stream.is_errored());
1140
1141 // Assert: pullIntoDescriptor.reader type is not "none".
1142 assert!(pull_into_descriptor.reader_type.is_some());
1143
1144 // Let done be false.
1145 let mut done = false;
1146
1147 // If stream.[[state]] is "closed",
1148 if stream.is_closed() {
1149 // Assert: the remainder after dividing pullIntoDescriptor’s bytes filled
1150 // by pullIntoDescriptor’s element size is 0.
1151 assert!(
1152 pull_into_descriptor.bytes_filled.get() % pull_into_descriptor.element_size == 0
1153 );
1154
1155 // Set done to true.
1156 done = true;
1157 }
1158
1159 // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor).
1160 let filled_view = self
1161 .convert_pull_into_descriptor(cx, pull_into_descriptor)
1162 .expect("convert_pull_into_descriptor failed");
1163
1164 rooted!(in(*cx) let mut view_value = UndefinedValue());
1165 filled_view.get_buffer_view_value(cx, view_value.handle_mut());
1166
1167 // If pullIntoDescriptor’s reader type is "default",
1168 if matches!(pull_into_descriptor.reader_type, Some(ReaderType::Default)) {
1169 // Perform ! ReadableStreamFulfillReadRequest(stream, filledView, done).
1170
1171 stream.fulfill_read_request(view_value.handle(), done, can_gc);
1172 } else {
1173 // Assert: pullIntoDescriptor’s reader type is "byob".
1174 assert!(matches!(
1175 pull_into_descriptor.reader_type,
1176 Some(ReaderType::Byob)
1177 ));
1178
1179 // Perform ! ReadableStreamFulfillReadIntoRequest(stream, filledView, done).
1180 stream.fulfill_read_into_request(view_value.handle(), done, can_gc);
1181 }
1182 Ok(())
1183 }
1184
1185 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-convert-pull-into-descriptor>
1186 pub(crate) fn convert_pull_into_descriptor(
1187 &self,
1188 cx: SafeJSContext,
1189 pull_into_descriptor: &PullIntoDescriptor,
1190 ) -> Fallible<HeapBufferSource<ArrayBufferViewU8>> {
1191 // Let bytesFilled be pullIntoDescriptor’s bytes filled.
1192 let bytes_filled = pull_into_descriptor.bytes_filled.get();
1193
1194 // Let elementSize be pullIntoDescriptor’s element size.
1195 let element_size = pull_into_descriptor.element_size;
1196
1197 // Assert: bytesFilled ≤ pullIntoDescriptor’s byte length.
1198 assert!(bytes_filled <= pull_into_descriptor.byte_length);
1199
1200 // Assert: the remainder after dividing bytesFilled by elementSize is 0.
1201 assert!(bytes_filled % element_size == 0);
1202
1203 // Let buffer be ! TransferArrayBuffer(pullIntoDescriptor’s buffer).
1204 let buffer = pull_into_descriptor
1205 .buffer
1206 .transfer_array_buffer(cx)
1207 .expect("TransferArrayBuffer failed");
1208
1209 // Return ! Construct(pullIntoDescriptor’s view constructor,
1210 // « buffer, pullIntoDescriptor’s byte offset, bytesFilled ÷ elementSize »).
1211 Ok(create_buffer_source_with_constructor(
1212 cx,
1213 &pull_into_descriptor.view_constructor,
1214 &buffer,
1215 pull_into_descriptor.byte_offset as usize,
1216 (bytes_filled / element_size) as usize,
1217 )
1218 .expect("Construct view failed"))
1219 }
1220
1221 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-process-pull-into-descriptors-using-queue>
1222 pub(crate) fn process_pull_into_descriptors_using_queue(
1223 &self,
1224 cx: SafeJSContext,
1225 ) -> Vec<PullIntoDescriptor> {
1226 // Assert: controller.[[closeRequested]] is false.
1227 assert!(!self.close_requested.get());
1228
1229 // Let filledPullIntos be a new empty list.
1230 let mut filled_pull_intos = Vec::new();
1231
1232 // While controller.[[pendingPullIntos]] is not empty,
1233 loop {
1234 // If controller.[[queueTotalSize]] is 0, then break.
1235 if self.queue_total_size.get() == 0.0 {
1236 break;
1237 }
1238
1239 // Let pullIntoDescriptor be controller.[[pendingPullIntos]][0].
1240 let fill_pull_result = {
1241 let pending_pull_intos = self.pending_pull_intos.borrow();
1242 let Some(pull_into_descriptor) = pending_pull_intos.first() else {
1243 break;
1244 };
1245 self.fill_pull_into_descriptor_from_queue(cx, pull_into_descriptor)
1246 };
1247
1248 // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) is true,
1249 if fill_pull_result {
1250 // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1251 let pull_into_descriptor = self.shift_pending_pull_into();
1252
1253 // Append pullIntoDescriptor to filledPullIntos.
1254 filled_pull_intos.push(pull_into_descriptor);
1255 }
1256 }
1257
1258 // Return filledPullIntos.
1259 filled_pull_intos
1260 }
1261
1262 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-pull-into-descriptor-from-queue>
1263 pub(crate) fn fill_pull_into_descriptor_from_queue(
1264 &self,
1265 cx: SafeJSContext,
1266 pull_into_descriptor: &PullIntoDescriptor,
1267 ) -> bool {
1268 // Let maxBytesToCopy be min(controller.[[queueTotalSize]],
1269 // pullIntoDescriptor’s byte length − pullIntoDescriptor’s bytes filled).
1270 let max_bytes_to_copy = min(
1271 self.queue_total_size.get() as usize,
1272 (pull_into_descriptor.byte_length - pull_into_descriptor.bytes_filled.get()) as usize,
1273 );
1274
1275 // Let maxBytesFilled be pullIntoDescriptor’s bytes filled + maxBytesToCopy.
1276 let max_bytes_filled = pull_into_descriptor.bytes_filled.get() as usize + max_bytes_to_copy;
1277
1278 // Let totalBytesToCopyRemaining be maxBytesToCopy.
1279 let mut total_bytes_to_copy_remaining = max_bytes_to_copy;
1280
1281 // Let ready be false.
1282 let mut ready = false;
1283
1284 // Assert: ! IsDetachedBuffer(pullIntoDescriptor’s buffer) is false.
1285 assert!(!pull_into_descriptor.buffer.is_detached_buffer(cx));
1286
1287 // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
1288 assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
1289
1290 // Let remainderBytes be the remainder after dividing maxBytesFilled by pullIntoDescriptor’s element size.
1291 let remainder_bytes = max_bytes_filled % pull_into_descriptor.element_size as usize;
1292
1293 // Let maxAlignedBytes be maxBytesFilled − remainderBytes.
1294 let max_aligned_bytes = max_bytes_filled - remainder_bytes;
1295
1296 // If maxAlignedBytes ≥ pullIntoDescriptor’s minimum fill,
1297 if max_aligned_bytes >= pull_into_descriptor.minimum_fill as usize {
1298 // Set totalBytesToCopyRemaining to maxAlignedBytes − pullIntoDescriptor’s bytes filled.
1299 total_bytes_to_copy_remaining =
1300 max_aligned_bytes - (pull_into_descriptor.bytes_filled.get() as usize);
1301
1302 // Set ready to true.
1303 ready = true;
1304 }
1305
1306 // Let queue be controller.[[queue]].
1307 let mut queue = self.queue.borrow_mut();
1308
1309 // While totalBytesToCopyRemaining > 0,
1310 while total_bytes_to_copy_remaining > 0 {
1311 // Let headOfQueue be queue[0].
1312 let head_of_queue = queue.front_mut().unwrap();
1313
1314 // Let bytesToCopy be min(totalBytesToCopyRemaining, headOfQueue’s byte length).
1315 let bytes_to_copy = total_bytes_to_copy_remaining.min(head_of_queue.byte_length);
1316
1317 // Let destStart be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
1318 let dest_start =
1319 pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();
1320
1321 // Let descriptorBuffer be pullIntoDescriptor’s buffer.
1322 let descriptor_buffer = &pull_into_descriptor.buffer;
1323
1324 // Let queueBuffer be headOfQueue’s buffer.
1325 let queue_buffer = &head_of_queue.buffer;
1326
1327 // Let queueByteOffset be headOfQueue’s byte offset.
1328 let queue_byte_offset = head_of_queue.byte_offset;
1329
1330 // Assert: ! CanCopyDataBlockBytes(descriptorBuffer, destStart,
1331 // queueBuffer, queueByteOffset, bytesToCopy) is true.
1332 assert!(descriptor_buffer.can_copy_data_block_bytes(
1333 cx,
1334 dest_start as usize,
1335 queue_buffer,
1336 queue_byte_offset,
1337 bytes_to_copy
1338 ));
1339
1340 // Perform ! CopyDataBlockBytes(descriptorBuffer.[[ArrayBufferData]], destStart,
1341 // queueBuffer.[[ArrayBufferData]], queueByteOffset, bytesToCopy).
1342 descriptor_buffer.copy_data_block_bytes(
1343 cx,
1344 dest_start as usize,
1345 queue_buffer,
1346 queue_byte_offset,
1347 bytes_to_copy,
1348 );
1349
1350 // If headOfQueue’s byte length is bytesToCopy,
1351 if head_of_queue.byte_length == bytes_to_copy {
1352 // Remove queue[0].
1353 queue.pop_front().unwrap();
1354 } else {
1355 // Set headOfQueue’s byte offset to headOfQueue’s byte offset + bytesToCopy.
1356 head_of_queue.byte_offset += bytes_to_copy;
1357
1358 // Set headOfQueue’s byte length to headOfQueue’s byte length − bytesToCopy.
1359 head_of_queue.byte_length -= bytes_to_copy;
1360 }
1361
1362 // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − bytesToCopy.
1363 self.queue_total_size
1364 .set(self.queue_total_size.get() - (bytes_to_copy as f64));
1365
1366 // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
1367 // controller, bytesToCopy, pullIntoDescriptor).
1368 self.fill_head_pull_into_descriptor(bytes_to_copy as u64, pull_into_descriptor);
1369
1370 // Set totalBytesToCopyRemaining to totalBytesToCopyRemaining − bytesToCopy.
1371 total_bytes_to_copy_remaining -= bytes_to_copy;
1372 }
1373
1374 // If ready is false,
1375 if !ready {
1376 // Assert: controller.[[queueTotalSize]] is 0.
1377 assert!(self.queue_total_size.get() == 0.0);
1378
1379 // Assert: pullIntoDescriptor’s bytes filled > 0.
1380 assert!(pull_into_descriptor.bytes_filled.get() > 0);
1381
1382 // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
1383 assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
1384 }
1385
1386 // Return ready.
1387 ready
1388 }
1389
1390 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-head-pull-into-descriptor>
1391 pub(crate) fn fill_head_pull_into_descriptor(
1392 &self,
1393 bytes_copied: u64,
1394 pull_into_descriptor: &PullIntoDescriptor,
1395 ) {
1396 // Assert: either controller.[[pendingPullIntos]] is empty,
1397 // or controller.[[pendingPullIntos]][0] is pullIntoDescriptor.
1398 {
1399 let pending_pull_intos = self.pending_pull_intos.borrow();
1400 assert!(
1401 pending_pull_intos.is_empty() ||
1402 pending_pull_intos.first().unwrap() == pull_into_descriptor
1403 );
1404 }
1405
1406 // Assert: controller.[[byobRequest]] is null.
1407 assert!(self.byob_request.get().is_none());
1408
1409 // Set pullIntoDescriptor’s bytes filled to bytes filled + size.
1410 pull_into_descriptor
1411 .bytes_filled
1412 .set(pull_into_descriptor.bytes_filled.get() + bytes_copied);
1413 }
1414
1415 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueuedetachedpullintotoqueue>
1416 pub(crate) fn enqueue_detached_pull_into_to_queue(
1417 &self,
1418 cx: SafeJSContext,
1419 can_gc: CanGc,
1420 ) -> Fallible<()> {
1421 // first_descriptor: &PullIntoDescriptor,
1422 let pending_pull_intos = self.pending_pull_intos.borrow();
1423 let first_descriptor = pending_pull_intos.first().unwrap();
1424
1425 // Assert: pullIntoDescriptor’s reader type is "none".
1426 assert!(first_descriptor.reader_type.is_none());
1427
1428 // If pullIntoDescriptor’s bytes filled > 0, perform ?
1429 // ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
1430 // pullIntoDescriptor’s buffer, pullIntoDescriptor’s byte offset, pullIntoDescriptor’s bytes filled).
1431
1432 if first_descriptor.bytes_filled.get() > 0 {
1433 self.enqueue_cloned_chunk_to_queue(
1434 cx,
1435 &first_descriptor.buffer,
1436 first_descriptor.byte_offset,
1437 first_descriptor.bytes_filled.get(),
1438 can_gc,
1439 )?;
1440 }
1441
1442 // needed to drop the borrow and avoid BorrowMutError
1443 drop(pending_pull_intos);
1444
1445 // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1446 self.shift_pending_pull_into();
1447
1448 Ok(())
1449 }
1450
1451 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueueclonedchunktoqueue>
1452 pub(crate) fn enqueue_cloned_chunk_to_queue(
1453 &self,
1454 cx: SafeJSContext,
1455 buffer: &HeapBufferSource<ArrayBufferU8>,
1456 byte_offset: u64,
1457 byte_length: u64,
1458 can_gc: CanGc,
1459 ) -> Fallible<()> {
1460 // Let cloneResult be CloneArrayBuffer(buffer, byteOffset, byteLength, %ArrayBuffer%).
1461 if let Ok(clone_result) =
1462 buffer.clone_array_buffer(cx, byte_offset as usize, byte_length as usize)
1463 {
1464 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1465 // (controller, cloneResult.[[Value]], 0, byteLength).
1466 self.enqueue_chunk_to_queue(clone_result, 0, byte_length as usize);
1467
1468 Ok(())
1469 } else {
1470 // If cloneResult is an abrupt completion,
1471
1472 // Perform ! ReadableByteStreamControllerError(controller, cloneResult.[[Value]]).
1473 rooted!(in(*cx) let mut rval = UndefinedValue());
1474 let error = Error::Type(c"can not clone array buffer".to_owned());
1475 error
1476 .clone()
1477 .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1478 self.error(rval.handle(), can_gc);
1479
1480 // Return cloneResult.
1481 Err(error)
1482 }
1483 }
1484
1485 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue-chunk-to-queue>
1486 pub(crate) fn enqueue_chunk_to_queue(
1487 &self,
1488 buffer: HeapBufferSource<ArrayBufferU8>,
1489 byte_offset: usize,
1490 byte_length: usize,
1491 ) {
1492 // Let entry be a new ReadableByteStreamQueueEntry object.
1493 let entry = QueueEntry::new(buffer, byte_offset, byte_length);
1494
1495 // Append entry to controller.[[queue]].
1496 self.queue.borrow_mut().push_back(entry);
1497
1498 // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] + byteLength.
1499 self.queue_total_size
1500 .set(self.queue_total_size.get() + byte_length as f64);
1501 }
1502
1503 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-shift-pending-pull-into>
1504 pub(crate) fn shift_pending_pull_into(&self) -> PullIntoDescriptor {
1505 // Assert: controller.[[byobRequest]] is null.
1506 assert!(self.byob_request.get().is_none());
1507
1508 // Let descriptor be controller.[[pendingPullIntos]][0].
1509 // Remove descriptor from controller.[[pendingPullIntos]].
1510 // Return descriptor.
1511 self.pending_pull_intos.borrow_mut().remove(0)
1512 }
1513
1514 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerprocessreadrequestsusingqueue>
1515 pub(crate) fn process_read_requests_using_queue(
1516 &self,
1517 cx: &mut js::context::JSContext,
1518 ) -> Fallible<()> {
1519 // Let reader be controller.[[stream]].[[reader]].
1520 // Assert: reader implements ReadableStreamDefaultReader.
1521 let reader = self.stream.get().unwrap().get_default_reader();
1522
1523 // Step 3
1524 reader.process_read_requests(cx, DomRoot::from_ref(self))
1525 }
1526
1527 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerfillreadrequestfromqueue>
1528 pub(crate) fn fill_read_request_from_queue(
1529 &self,
1530 cx: SafeJSContext,
1531 read_request: &ReadRequest,
1532 can_gc: CanGc,
1533 ) -> Fallible<()> {
1534 // Assert: controller.[[queueTotalSize]] > 0.
1535 assert!(self.queue_total_size.get() > 0.0);
1536 // Also assert that the queue has a non-zero length;
1537 assert!(!self.queue.borrow().is_empty());
1538
1539 // Let entry be controller.[[queue]][0].
1540 // Remove entry from controller.[[queue]].
1541 let entry = self.remove_entry();
1542
1543 // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − entry’s byte length.
1544 self.queue_total_size
1545 .set(self.queue_total_size.get() - entry.byte_length as f64);
1546
1547 // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
1548 self.handle_queue_drain(can_gc);
1549
1550 // Let view be ! Construct(%Uint8Array%, « entry’s buffer, entry’s byte offset, entry’s byte length »).
1551 let view = create_buffer_source_with_constructor(
1552 cx,
1553 &Constructor::Name(Type::Uint8),
1554 &entry.buffer,
1555 entry.byte_offset,
1556 entry.byte_length,
1557 )
1558 .expect("Construct Uint8Array failed");
1559
1560 // Perform readRequest’s chunk steps, given view.
1561 let result = RootedTraceableBox::new(Heap::default());
1562 rooted!(in(*cx) let mut view_value = UndefinedValue());
1563 view.get_buffer_view_value(cx, view_value.handle_mut());
1564 result.set(*view_value);
1565
1566 read_request.chunk_steps(result, &self.global(), can_gc);
1567
1568 Ok(())
1569 }
1570
1571 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-handle-queue-drain>
1572 pub(crate) fn handle_queue_drain(&self, can_gc: CanGc) {
1573 // Assert: controller.[[stream]].[[state]] is "readable".
1574 assert!(self.stream.get().unwrap().is_readable());
1575
1576 // If controller.[[queueTotalSize]] is 0 and controller.[[closeRequested]] is true,
1577 if self.queue_total_size.get() == 0.0 && self.close_requested.get() {
1578 // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
1579 self.clear_algorithms();
1580
1581 // Perform ! ReadableStreamClose(controller.[[stream]]).
1582 self.stream.get().unwrap().close(can_gc);
1583 } else {
1584 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1585 self.call_pull_if_needed(can_gc);
1586 }
1587 }
1588
1589 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
1590 pub(crate) fn call_pull_if_needed(&self, can_gc: CanGc) {
1591 // Let shouldPull be ! ReadableByteStreamControllerShouldCallPull(controller).
1592 let should_pull = self.should_call_pull();
1593 // If shouldPull is false, return.
1594 if !should_pull {
1595 return;
1596 }
1597
1598 // If controller.[[pulling]] is true,
1599 if self.pulling.get() {
1600 // Set controller.[[pullAgain]] to true.
1601 self.pull_again.set(true);
1602
1603 // Return.
1604 return;
1605 }
1606
1607 // Assert: controller.[[pullAgain]] is false.
1608 assert!(!self.pull_again.get());
1609
1610 // Set controller.[[pulling]] to true.
1611 self.pulling.set(true);
1612
1613 // Let pullPromise be the result of performing controller.[[pullAlgorithm]].
1614 // Continues into the resolve and reject handling of the native handler.
1615 let global = self.global();
1616 let rooted_controller = DomRoot::from_ref(self);
1617 let controller = Controller::ReadableByteStreamController(rooted_controller.clone());
1618
1619 if let Some(underlying_source) = self.underlying_source.get() {
1620 let handler = PromiseNativeHandler::new(
1621 &global,
1622 Some(Box::new(PullAlgorithmFulfillmentHandler {
1623 controller: Dom::from_ref(&rooted_controller),
1624 })),
1625 Some(Box::new(PullAlgorithmRejectionHandler {
1626 controller: Dom::from_ref(&rooted_controller),
1627 })),
1628 can_gc,
1629 );
1630
1631 let realm = enter_realm(&*global);
1632 let comp = InRealm::Entered(&realm);
1633 let result = underlying_source
1634 .call_pull_algorithm(controller, can_gc)
1635 .unwrap_or_else(|| {
1636 let promise = Promise::new(&global, can_gc);
1637 promise.resolve_native(&(), can_gc);
1638 Ok(promise)
1639 });
1640 let promise = result.unwrap_or_else(|error| {
1641 let cx = GlobalScope::get_cx();
1642 rooted!(in(*cx) let mut rval = UndefinedValue());
1643 // TODO: check if `self.global()` is the right globalscope.
1644 error.to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1645 let promise = Promise::new(&global, can_gc);
1646 promise.reject_native(&rval.handle(), can_gc);
1647 promise
1648 });
1649 promise.append_native_handler(&handler, comp, can_gc);
1650 }
1651 }
1652
1653 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-should-call-pull>
1654 fn should_call_pull(&self) -> bool {
1655 // Let stream be controller.[[stream]].
1656 // Note: the spec does not assert that stream is not undefined here,
1657 // so we return false if it is.
1658 let stream = self.stream.get().unwrap();
1659
1660 // If stream.[[state]] is not "readable", return false.
1661 if !stream.is_readable() {
1662 return false;
1663 }
1664
1665 // If controller.[[closeRequested]] is true, return false.
1666 if self.close_requested.get() {
1667 return false;
1668 }
1669
1670 // If controller.[[started]] is false, return false.
1671 if !self.started.get() {
1672 return false;
1673 }
1674
1675 // If ! ReadableStreamHasDefaultReader(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0
1676 // , return true.
1677 if stream.has_default_reader() && stream.get_num_read_requests() > 0 {
1678 return true;
1679 }
1680
1681 // If ! ReadableStreamHasBYOBReader(stream) is true and ! ReadableStreamGetNumReadIntoRequests(stream) > 0
1682 // , return true.
1683 if stream.has_byob_reader() && stream.get_num_read_into_requests() > 0 {
1684 return true;
1685 }
1686
1687 // Let desiredSize be ! ReadableByteStreamControllerGetDesiredSize(controller).
1688 let desired_size = self.get_desired_size();
1689
1690 // Assert: desiredSize is not null.
1691 assert!(desired_size.is_some());
1692
1693 // If desiredSize > 0, return true.
1694 if desired_size.unwrap() > 0. {
1695 return true;
1696 }
1697
1698 // Return false.
1699 false
1700 }
1701 /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
1702 pub(crate) fn setup(
1703 &self,
1704 global: &GlobalScope,
1705 stream: DomRoot<ReadableStream>,
1706 can_gc: CanGc,
1707 ) -> Fallible<()> {
1708 // Assert: stream.[[controller]] is undefined.
1709 stream.assert_no_controller();
1710
1711 // If autoAllocateChunkSize is not undefined,
1712 if self.auto_allocate_chunk_size.is_some() {
1713 // Assert: ! IsInteger(autoAllocateChunkSize) is true. Implicit
1714 // Assert: autoAllocateChunkSize is positive. (Implicit by type.)
1715 }
1716
1717 // Set controller.[[stream]] to stream.
1718 self.stream.set(Some(&stream));
1719
1720 // Set controller.[[pullAgain]] and controller.[[pulling]] to false.
1721 self.pull_again.set(false);
1722 self.pulling.set(false);
1723
1724 // Set controller.[[byobRequest]] to null.
1725 self.byob_request.set(None);
1726
1727 // Perform ! ResetQueue(controller).
1728 self.reset_queue();
1729
1730 // Set controller.[[closeRequested]] and controller.[[started]] to false.
1731 self.close_requested.set(false);
1732 self.started.set(false);
1733
1734 // Set controller.[[strategyHWM]] to highWaterMark.
1735 // Set controller.[[pullAlgorithm]] to pullAlgorithm.
1736 // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
1737 // Set controller.[[autoAllocateChunkSize]] to autoAllocateChunkSize.
1738 // Set controller.[[pendingPullIntos]] to a new empty list.
1739 // Note: the above steps are done in `new`.
1740
1741 // Set stream.[[controller]] to controller.
1742 let rooted_byte_controller = DomRoot::from_ref(self);
1743 stream.set_byte_controller(&rooted_byte_controller);
1744
1745 if let Some(underlying_source) = rooted_byte_controller.underlying_source.get() {
1746 // Let startResult be the result of performing startAlgorithm. (This might throw an exception.)
1747 let start_result = underlying_source
1748 .call_start_algorithm(
1749 Controller::ReadableByteStreamController(rooted_byte_controller.clone()),
1750 can_gc,
1751 )
1752 .unwrap_or_else(|| {
1753 let promise = Promise::new(global, can_gc);
1754 promise.resolve_native(&(), can_gc);
1755 Ok(promise)
1756 });
1757
1758 // Let startPromise be a promise resolved with startResult.
1759 let start_promise = start_result?;
1760
1761 // Upon fulfillment of startPromise, Upon rejection of startPromise with reason r,
1762 let handler = PromiseNativeHandler::new(
1763 global,
1764 Some(Box::new(StartAlgorithmFulfillmentHandler {
1765 controller: Dom::from_ref(&rooted_byte_controller),
1766 })),
1767 Some(Box::new(StartAlgorithmRejectionHandler {
1768 controller: Dom::from_ref(&rooted_byte_controller),
1769 })),
1770 can_gc,
1771 );
1772 let realm = enter_realm(global);
1773 let comp = InRealm::Entered(&realm);
1774 start_promise.append_native_handler(&handler, comp, can_gc);
1775 };
1776
1777 Ok(())
1778 }
1779
1780 // <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontroller-releasesteps
1781 pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1782 // If this.[[pendingPullIntos]] is not empty,
1783 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
1784 if !pending_pull_intos.is_empty() {
1785 // Let firstPendingPullInto be this.[[pendingPullIntos]][0].
1786 let mut first_pending_pull_into = pending_pull_intos.remove(0);
1787
1788 // Set firstPendingPullInto’s reader type to "none".
1789 first_pending_pull_into.reader_type = None;
1790
1791 // Set this.[[pendingPullIntos]] to the list « firstPendingPullInto »
1792 pending_pull_intos.clear();
1793 pending_pull_intos.push(first_pending_pull_into);
1794 }
1795 Ok(())
1796 }
1797
1798 /// <https://streams.spec.whatwg.org/#rbs-controller-private-cancel>
1799 pub(crate) fn perform_cancel_steps(
1800 &self,
1801 cx: &mut js::context::JSContext,
1802 global: &GlobalScope,
1803 reason: SafeHandleValue,
1804 ) -> Rc<Promise> {
1805 // Perform ! ReadableByteStreamControllerClearPendingPullIntos(this).
1806 self.clear_pending_pull_intos();
1807
1808 // Perform ! ResetQueue(this).
1809 self.reset_queue();
1810
1811 let underlying_source = self
1812 .underlying_source
1813 .get()
1814 .expect("Controller should have a source when the cancel steps are called into.");
1815
1816 // Let result be the result of performing this.[[cancelAlgorithm]], passing in reason.
1817 let result = underlying_source
1818 .call_cancel_algorithm(cx, global, reason)
1819 .unwrap_or_else(|| {
1820 let promise = Promise::new2(cx, global);
1821 promise.resolve_native(&(), CanGc::from_cx(cx));
1822 Ok(promise)
1823 });
1824
1825 let promise = result.unwrap_or_else(|error| {
1826 rooted!(&in(cx) let mut rval = UndefinedValue());
1827 error.to_jsval(cx.into(), global, rval.handle_mut(), CanGc::from_cx(cx));
1828 let promise = Promise::new2(cx, global);
1829 promise.reject_native(&rval.handle(), CanGc::from_cx(cx));
1830 promise
1831 });
1832
1833 // Perform ! ReadableByteStreamControllerClearAlgorithms(this).
1834 self.clear_algorithms();
1835
1836 // Return result(the promise).
1837 promise
1838 }
1839
1840 /// <https://streams.spec.whatwg.org/#rbs-controller-private-pull>
1841 pub(crate) fn perform_pull_steps(
1842 &self,
1843 cx: SafeJSContext,
1844 read_request: &ReadRequest,
1845 can_gc: CanGc,
1846 ) {
1847 // Let stream be this.[[stream]].
1848 let stream = self.stream.get().unwrap();
1849
1850 // Assert: ! ReadableStreamHasDefaultReader(stream) is true.
1851 assert!(stream.has_default_reader());
1852
1853 // If this.[[queueTotalSize]] > 0,
1854 if self.queue_total_size.get() > 0.0 {
1855 // Assert: ! ReadableStreamGetNumReadRequests(stream) is 0.
1856 assert_eq!(stream.get_num_read_requests(), 0);
1857
1858 // Perform ! ReadableByteStreamControllerFillReadRequestFromQueue(this, readRequest).
1859 let _ = self.fill_read_request_from_queue(cx, read_request, can_gc);
1860
1861 // Return.
1862 return;
1863 }
1864
1865 // Let autoAllocateChunkSize be this.[[autoAllocateChunkSize]].
1866 let auto_allocate_chunk_size = self.auto_allocate_chunk_size;
1867
1868 // If autoAllocateChunkSize is not undefined,
1869 if let Some(auto_allocate_chunk_size) = auto_allocate_chunk_size {
1870 // create_array_buffer_with_size
1871 // Let buffer be Construct(%ArrayBuffer%, « autoAllocateChunkSize »).
1872 match create_array_buffer_with_size(cx, auto_allocate_chunk_size as usize) {
1873 Ok(buffer) => {
1874 // Let pullIntoDescriptor be a new pull-into descriptor with
1875 // buffer buffer.[[Value]]
1876 // buffer byte length autoAllocateChunkSize
1877 // byte offset 0
1878 // byte length autoAllocateChunkSize
1879 // bytes filled 0
1880 // minimum fill 1
1881 // element size 1
1882 // view constructor %Uint8Array%
1883 // reader type "default"
1884 let pull_into_descriptor = PullIntoDescriptor {
1885 buffer,
1886 buffer_byte_length: auto_allocate_chunk_size,
1887 byte_length: auto_allocate_chunk_size,
1888 byte_offset: 0,
1889 bytes_filled: Cell::new(0),
1890 minimum_fill: 1,
1891 element_size: 1,
1892 view_constructor: Constructor::Name(Type::Uint8),
1893 reader_type: Some(ReaderType::Default),
1894 };
1895
1896 // Append pullIntoDescriptor to this.[[pendingPullIntos]].
1897 self.pending_pull_intos
1898 .borrow_mut()
1899 .push(pull_into_descriptor);
1900 },
1901 Err(error) => {
1902 // If buffer is an abrupt completion,
1903 // Perform readRequest’s error steps, given buffer.[[Value]].
1904
1905 rooted!(in(*cx) let mut rval = UndefinedValue());
1906 error.to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1907 read_request.error_steps(rval.handle(), can_gc);
1908
1909 // Return.
1910 return;
1911 },
1912 }
1913 }
1914
1915 // Perform ! ReadableStreamAddReadRequest(stream, readRequest).
1916 stream.add_read_request(read_request);
1917
1918 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(this).
1919 self.call_pull_if_needed(can_gc);
1920 }
1921
1922 /// Setting the JS object after the heap has settled down.
1923 pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
1924 if let Some(underlying_source) = self.underlying_source.get() {
1925 underlying_source.set_underlying_source_this_object(this_object);
1926 }
1927 }
1928
1929 pub(crate) fn remove_entry(&self) -> QueueEntry {
1930 self.queue
1931 .borrow_mut()
1932 .pop_front()
1933 .expect("Reader must have read request when remove is called into.")
1934 }
1935
/// Returns the current value of the controller's `queue_total_size` field.
pub(crate) fn get_queue_total_size(&self) -> f64 {
    self.queue_total_size.get()
}
1939
1940 pub(crate) fn get_pending_pull_intos_size(&self) -> usize {
1941 self.pending_pull_intos.borrow().len()
1942 }
1943}
1944
1945impl ReadableByteStreamControllerMethods<crate::DomTypeHolder> for ReadableByteStreamController {
1946 /// <https://streams.spec.whatwg.org/#rbs-controller-byob-request>
1947 fn GetByobRequest(
1948 &self,
1949 can_gc: CanGc,
1950 ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
1951 let cx = GlobalScope::get_cx();
1952 // Return ! ReadableByteStreamControllerGetBYOBRequest(this).
1953 self.get_byob_request(cx, can_gc)
1954 }
1955
1956 /// <https://streams.spec.whatwg.org/#rbs-controller-desired-size>
1957 fn GetDesiredSize(&self) -> Option<f64> {
1958 // Return ! ReadableByteStreamControllerGetDesiredSize(this).
1959 self.get_desired_size()
1960 }
1961
1962 /// <https://streams.spec.whatwg.org/#rbs-controller-close>
1963 fn Close(&self, can_gc: CanGc) -> Fallible<()> {
1964 let cx = GlobalScope::get_cx();
1965 // If this.[[closeRequested]] is true, throw a TypeError exception.
1966 if self.close_requested.get() {
1967 return Err(Error::Type(c"closeRequested is true".to_owned()));
1968 }
1969
1970 // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
1971 if !self.stream.get().unwrap().is_readable() {
1972 return Err(Error::Type(c"stream is not readable".to_owned()));
1973 }
1974
1975 // Perform ? ReadableByteStreamControllerClose(this).
1976 self.close(cx, can_gc)
1977 }
1978
1979 /// <https://streams.spec.whatwg.org/#rbs-controller-enqueue>
1980 fn Enqueue(
1981 &self,
1982 cx: &mut js::context::JSContext,
1983 chunk: js::gc::CustomAutoRooterGuard<js::typedarray::ArrayBufferView>,
1984 ) -> Fallible<()> {
1985 let chunk = HeapBufferSource::<ArrayBufferViewU8>::from_view(chunk);
1986
1987 // If chunk.[[ByteLength]] is 0, throw a TypeError exception.
1988 if chunk.byte_length() == 0 {
1989 return Err(Error::Type(c"chunk.ByteLength is 0".to_owned()));
1990 }
1991
1992 // If chunk.[[ViewedArrayBuffer]].[[ByteLength]] is 0, throw a TypeError exception.
1993 if chunk.viewed_buffer_array_byte_length(cx.into()) == 0 {
1994 return Err(Error::Type(
1995 c"chunk.ViewedArrayBuffer.ByteLength is 0".to_owned(),
1996 ));
1997 }
1998
1999 // If this.[[closeRequested]] is true, throw a TypeError exception.
2000 if self.close_requested.get() {
2001 return Err(Error::Type(c"closeRequested is true".to_owned()));
2002 }
2003
2004 // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
2005 if !self.stream.get().unwrap().is_readable() {
2006 return Err(Error::Type(c"stream is not readable".to_owned()));
2007 }
2008
2009 // Return ? ReadableByteStreamControllerEnqueue(this, chunk).
2010 self.enqueue(cx, chunk)
2011 }
2012
2013 /// <https://streams.spec.whatwg.org/#rbs-controller-error>
2014 fn Error(&self, cx: &mut js::context::JSContext, e: SafeHandleValue) -> Fallible<()> {
2015 // Perform ! ReadableByteStreamControllerError(this, e).
2016 self.error(e, CanGc::from_cx(cx));
2017 Ok(())
2018 }
2019}