script/dom/stream/readablebytestreamcontroller.rs
1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::cmp::min;
7use std::collections::VecDeque;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::context::JSContext;
12use js::jsapi::{Heap, Type};
13use js::jsval::UndefinedValue;
14use js::realm::CurrentRealm;
15use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue};
16use js::typedarray::{ArrayBufferU8, ArrayBufferViewU8};
17use script_bindings::cell::DomRefCell;
18use script_bindings::reflector::{Reflector, reflect_dom_object_with_cx};
19
20use super::readablestreambyobreader::ReadIntoRequest;
21use super::readablestreamdefaultreader::ReadRequest;
22use super::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType};
23use crate::dom::bindings::buffer_source::{
24 Constructor, HeapBufferSource, byte_size, create_array_buffer_with_size,
25 create_buffer_source_with_constructor,
26};
27use crate::dom::bindings::codegen::Bindings::ReadableByteStreamControllerBinding::ReadableByteStreamControllerMethods;
28use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
29use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
30use crate::dom::bindings::reflector::DomGlobal;
31use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
32use crate::dom::bindings::trace::RootedTraceableBox;
33use crate::dom::globalscope::GlobalScope;
34use crate::dom::promise::Promise;
35use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
36use crate::dom::stream::readablestream::ReadableStream;
37use crate::dom::stream::readablestreambyobrequest::ReadableStreamBYOBRequest;
38use crate::realms::enter_auto_realm;
39
40/// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry>
41#[derive(JSTraceable, MallocSizeOf)]
42#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
43pub(crate) struct QueueEntry {
44 /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-buffer>
45 #[ignore_malloc_size_of = "HeapBufferSource"]
46 buffer: HeapBufferSource<ArrayBufferU8>,
47 /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-offset>
48 byte_offset: usize,
49 /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-length>
50 byte_length: usize,
51}
52
53impl js::gc::Rootable for QueueEntry {}
54
55impl QueueEntry {
56 pub(crate) fn new(
57 buffer: RootedTraceableBox<HeapBufferSource<ArrayBufferU8>>,
58 byte_offset: usize,
59 byte_length: usize,
60 ) -> QueueEntry {
61 QueueEntry {
62 buffer: *buffer.into_box(),
63 byte_offset,
64 byte_length,
65 }
66 }
67}
68
/// The kind of reader a [`PullIntoDescriptor`] was created for.
///
/// Descriptors store this as `Option<ReaderType>`; the `None` case is what
/// the code in this file checks for when the specification says the reader
/// type is "none".
#[derive(Debug, Eq, JSTraceable, MallocSizeOf, PartialEq)]
pub(crate) enum ReaderType {
    /// The descriptor belongs to a BYOB reader.
    ///
    /// <https://streams.spec.whatwg.org/#readablestreambyobreader>
    Byob,
    /// The descriptor belongs to a default reader.
    ///
    /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
    Default,
}
76
/// Bookkeeping for one pending pull-into: the destination buffer, the byte
/// window inside it, and how much of that window has been filled so far.
///
/// <https://streams.spec.whatwg.org/#pull-into-descriptor>
#[derive(Eq, JSTraceable, MallocSizeOf, PartialEq)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
pub(crate) struct PullIntoDescriptor {
    /// The array buffer that bytes are written into.
    ///
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer>
    #[ignore_malloc_size_of = "HeapBufferSource"]
    buffer: HeapBufferSource<ArrayBufferU8>,
    /// Total byte length of `buffer`, recorded when the descriptor is created.
    ///
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer-byte-length>
    buffer_byte_length: u64,
    /// Start of the writable window inside `buffer`, in bytes.
    ///
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-offset>
    byte_offset: u64,
    /// Size of the writable window, in bytes.
    ///
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-length>
    byte_length: u64,
    /// Number of bytes already written into the window. Kept in a `Cell` so
    /// it can be updated through a shared reference to the descriptor.
    ///
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-bytes-filled>
    bytes_filled: Cell<u64>,
    /// Minimum number of bytes that must be filled before the descriptor is
    /// committed.
    ///
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-minimum-fill>
    minimum_fill: u64,
    /// Size in bytes of one element of the view that will be constructed.
    ///
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-element-size>
    element_size: u64,
    /// Constructor used to build the view handed back to the reader.
    ///
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-view-constructor>
    view_constructor: Constructor,
    /// Reader kind this descriptor serves; `None` stands for the
    /// specification's "none" reader type.
    ///
    /// <https://streams.spec.whatwg.org/#pull-into-descriptor-reader-type>
    reader_type: Option<ReaderType>,
}

impl js::gc::Rootable for PullIntoDescriptor {}
103
104/// The fulfillment handler for
105/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
106#[derive(Clone, JSTraceable, MallocSizeOf)]
107#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
108struct StartAlgorithmFulfillmentHandler {
109 controller: Dom<ReadableByteStreamController>,
110}
111
112impl Callback for StartAlgorithmFulfillmentHandler {
113 /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
114 /// Upon fulfillment of startPromise,
115 fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
116 // Set controller.[[started]] to true.
117 self.controller.started.set(true);
118
119 // Assert: controller.[[pulling]] is false.
120 assert!(!self.controller.pulling.get());
121
122 // Assert: controller.[[pullAgain]] is false.
123 assert!(!self.controller.pull_again.get());
124
125 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
126 self.controller.call_pull_if_needed(cx);
127 }
128}
129
130/// The rejection handler for
131/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
132#[derive(Clone, JSTraceable, MallocSizeOf)]
133#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
134struct StartAlgorithmRejectionHandler {
135 controller: Dom<ReadableByteStreamController>,
136}
137
138impl Callback for StartAlgorithmRejectionHandler {
139 /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
140 /// Upon rejection of startPromise with reason r,
141 fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
142 // Perform ! ReadableByteStreamControllerError(controller, r).
143 self.controller.error(cx, v);
144 }
145}
146
147/// The fulfillment handler for
148/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
149#[derive(Clone, JSTraceable, MallocSizeOf)]
150#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
151struct PullAlgorithmFulfillmentHandler {
152 controller: Dom<ReadableByteStreamController>,
153}
154
155impl Callback for PullAlgorithmFulfillmentHandler {
156 /// Continuation of <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
157 /// Upon fulfillment of pullPromise
158 fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
159 // Set controller.[[pulling]] to false.
160 self.controller.pulling.set(false);
161
162 // If controller.[[pullAgain]] is true,
163 if self.controller.pull_again.get() {
164 // Set controller.[[pullAgain]] to false.
165 self.controller.pull_again.set(false);
166
167 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
168 self.controller.call_pull_if_needed(cx);
169 }
170 }
171}
172
173/// The rejection handler for
174/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
175#[derive(Clone, JSTraceable, MallocSizeOf)]
176#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
177struct PullAlgorithmRejectionHandler {
178 controller: Dom<ReadableByteStreamController>,
179}
180
181impl Callback for PullAlgorithmRejectionHandler {
182 /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-byte-controller-call-pull-if-needed>
183 /// Upon rejection of pullPromise with reason e.
184 fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
185 // Perform ! ReadableByteStreamControllerError(controller, e).
186 self.controller.error(cx, v);
187 }
188}
189
/// DOM object backing a readable byte stream's controller. Each field mirrors
/// one internal slot of the specification's `ReadableByteStreamController`.
///
/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
#[dom_struct]
pub(crate) struct ReadableByteStreamController {
    reflector_: Reflector,
    /// Copied from the underlying source container when the controller is
    /// constructed; `None` when the source provides no value.
    ///
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-autoallocatechunksize>
    auto_allocate_chunk_size: Option<u64>,
    /// The stream this controller belongs to; starts out unset and is
    /// assigned through `set_stream`.
    ///
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-stream>
    stream: MutNullableDom<ReadableStream>,
    /// High water mark used when computing the desired size.
    ///
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-strategyhwm>
    strategy_hwm: f64,
    /// A mutable reference to the underlying source is used to implement these two
    /// internal slots:
    ///
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullalgorithm>
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-cancelalgorithm>
    underlying_source: MutNullableDom<UnderlyingSourceContainer>,
    /// Queued byte chunks.
    ///
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queue>
    queue: DomRefCell<VecDeque<QueueEntry>>,
    /// Running total of the queue's size; compared against zero and
    /// subtracted from `strategy_hwm` by the methods in this file.
    ///
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queuetotalsize>
    queue_total_size: Cell<f64>,
    /// The current BYOB request, created on demand by `get_byob_request`.
    ///
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-byobrequest>
    byob_request: MutNullableDom<ReadableStreamBYOBRequest>,
    /// Pull-into descriptors awaiting data; index 0 is the head descriptor
    /// that `respond` and `respond_with_new_view` operate on.
    ///
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pendingpullintos>
    pending_pull_intos: DomRefCell<Vec<PullIntoDescriptor>>,
    /// Set by `close` when the queue still holds bytes at the time of the call.
    ///
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-closerequested>
    close_requested: Cell<bool>,
    /// Set to true once the start promise has been fulfilled.
    ///
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-started>
    started: Cell<bool>,
    /// Reset to false when the pull promise is fulfilled.
    ///
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pulling>
    pulling: Cell<bool>,
    /// When true at pull-promise fulfillment, the flag is cleared and another
    /// pull is attempted.
    ///
    /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullagain>
    pull_again: Cell<bool>,
}
223
224impl ReadableByteStreamController {
225 fn new_inherited(
226 underlying_source_container: &UnderlyingSourceContainer,
227 strategy_hwm: f64,
228 ) -> ReadableByteStreamController {
229 let auto_allocate_chunk_size = underlying_source_container.auto_allocate_chunk_size();
230 ReadableByteStreamController {
231 reflector_: Reflector::new(),
232 byob_request: MutNullableDom::new(None),
233 stream: MutNullableDom::new(None),
234 underlying_source: MutNullableDom::new(Some(underlying_source_container)),
235 auto_allocate_chunk_size,
236 pending_pull_intos: DomRefCell::new(Vec::new()),
237 strategy_hwm,
238 close_requested: Default::default(),
239 queue: DomRefCell::new(Default::default()),
240 queue_total_size: Default::default(),
241 started: Default::default(),
242 pulling: Default::default(),
243 pull_again: Default::default(),
244 }
245 }
246
247 pub(crate) fn new(
248 cx: &mut JSContext,
249 underlying_source_type: UnderlyingSourceType,
250 strategy_hwm: f64,
251 global: &GlobalScope,
252 ) -> DomRoot<ReadableByteStreamController> {
253 let underlying_source_container =
254 UnderlyingSourceContainer::new(cx, global, underlying_source_type);
255 reflect_dom_object_with_cx(
256 Box::new(ReadableByteStreamController::new_inherited(
257 &underlying_source_container,
258 strategy_hwm,
259 )),
260 global,
261 cx,
262 )
263 }
264
265 #[allow(dead_code)]
266 pub(crate) fn set_stream(&self, stream: &ReadableStream) {
267 self.stream.set(Some(stream))
268 }
269
/// Services a read-into request against `view`.
///
/// The view's backing buffer is transferred and wrapped in a new pull-into
/// descriptor with reader type "byob". Depending on the controller state the
/// request is then either settled immediately (stream closed, or enough
/// queued bytes to fill the descriptor) or the descriptor is appended to
/// `pending_pull_intos`, the request is registered on the stream, and a pull
/// is attempted.
///
/// * `read_into_request` - receives the chunk, close or error steps.
/// * `view` - destination view; its element size and constructor are derived
///   from the typed-array type, or 1 and `DataView` otherwise.
/// * `min` - minimum number of elements to fill; multiplied by the element
///   size to obtain the descriptor's minimum fill in bytes.
///
/// # Panics
///
/// Panics if the controller has no stream, or if `min × elementSize`
/// exceeds the view's byte length.
///
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-pull-into>
pub(crate) fn perform_pull_into(
    &self,
    cx: &mut JSContext,
    read_into_request: &ReadIntoRequest,
    view: &HeapBufferSource<ArrayBufferViewU8>,
    min: u64,
) {
    // Let stream be controller.[[stream]].
    let stream = self.stream.get().unwrap();

    // Let elementSize be 1.
    let mut element_size = 1;

    // Let ctor be %DataView%.
    let mut ctor = Constructor::DataView;

    // If view has a [[TypedArrayName]] internal slot (i.e., it is not a DataView),
    if view.has_typed_array_name() {
        // Set elementSize to the element size specified in the
        // typed array constructors table for view.[[TypedArrayName]].
        let view_typw = view.get_array_buffer_view_type();
        element_size = byte_size(view_typw);

        // Set ctor to the constructor specified in the typed array constructors table for view.[[TypedArrayName]].
        ctor = Constructor::Name(view_typw);
    }

    // Let minimumFill be min × elementSize.
    let minimum_fill = min * element_size;

    // Assert: minimumFill ≥ 0 and minimumFill ≤ view.[[ByteLength]].
    // (The lower bound holds by construction: the value is unsigned.)
    assert!(minimum_fill <= (view.byte_length() as u64));

    // Assert: the remainder after dividing minimumFill by elementSize is 0.
    assert_eq!(minimum_fill % element_size, 0);

    // Let byteOffset be view.[[ByteOffset]].
    let byte_offset = view.get_byte_offset();

    // Let byteLength be view.[[ByteLength]].
    let byte_length = view.byte_length();

    // Let bufferResult be TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
    match view
        .get_array_buffer_view_buffer(cx)
        .transfer_array_buffer(cx)
    {
        Ok(buffer) => {
            // Let buffer be bufferResult.[[Value]].
            // Let pullIntoDescriptor be a new pull-into descriptor with
            // buffer   buffer
            // buffer byte length   buffer.[[ArrayBufferByteLength]]
            // byte offset  byteOffset
            // byte length  byteLength
            // bytes filled  0
            // minimum fill minimumFill
            // element size elementSize
            // view constructor ctor
            // reader type  "byob"
            let buffer_byte_length = buffer.byte_length();
            let pull_into_descriptor = RootedTraceableBox::new(PullIntoDescriptor {
                buffer: *buffer.into_box(),
                buffer_byte_length: buffer_byte_length as u64,
                byte_offset: byte_offset as u64,
                byte_length: byte_length as u64,
                bytes_filled: Cell::new(0),
                minimum_fill,
                element_size,
                view_constructor: ctor.clone(),
                reader_type: Some(ReaderType::Byob),
            });

            // If controller.[[pendingPullIntos]] is not empty,
            // (scoped so the mutable borrow ends before any later step
            // touches `pending_pull_intos` again)
            {
                let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
                if !pending_pull_intos.is_empty() {
                    // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
                    pending_pull_intos.push(*pull_into_descriptor.into_box());

                    // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
                    stream.add_read_into_request(read_into_request);

                    // Return.
                    return;
                }
            }

            // If stream.[[state]] is "closed",
            if stream.is_closed() {
                // Let emptyView be ! Construct(ctor, « pullIntoDescriptor’s buffer,
                // pullIntoDescriptor’s byte offset, 0 »).
                if let Ok(empty_view) = create_buffer_source_with_constructor(
                    cx,
                    &ctor,
                    &pull_into_descriptor.buffer,
                    pull_into_descriptor.byte_offset as usize,
                    0,
                ) {
                    // Perform readIntoRequest’s close steps, given emptyView.
                    let result = RootedTraceableBox::new(Heap::default());
                    rooted!(&in(cx) let mut view_value = UndefinedValue());
                    empty_view.get_buffer_view_value(cx, view_value.handle_mut());
                    result.set(*view_value);

                    read_into_request.close_steps(cx, Some(result));

                    // Return.
                    return;
                } else {
                    // NOTE(review): a construction failure returns without
                    // running any of the request's steps — confirm intended.
                    return;
                }
            }

            // If controller.[[queueTotalSize]] > 0,
            if self.queue_total_size.get() > 0.0 {
                // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(
                // controller, pullIntoDescriptor) is true,
                if self.fill_pull_into_descriptor_from_queue(cx, &pull_into_descriptor) {
                    // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(
                    // pullIntoDescriptor).
                    if let Ok(filled_view) =
                        self.convert_pull_into_descriptor(cx, &pull_into_descriptor)
                    {
                        // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
                        self.handle_queue_drain(cx);

                        // Perform readIntoRequest’s chunk steps, given filledView.
                        let result = RootedTraceableBox::new(Heap::default());
                        rooted!(&in(cx) let mut view_value = UndefinedValue());
                        filled_view.get_buffer_view_value(cx, view_value.handle_mut());
                        result.set(*view_value);
                        read_into_request.chunk_steps(cx, result);

                        // Return.
                        return;
                    } else {
                        // NOTE(review): a conversion failure returns without
                        // running any of the request's steps — confirm intended.
                        return;
                    }
                }

                // If controller.[[closeRequested]] is true,
                if self.close_requested.get() {
                    // Let e be a new TypeError exception.
                    rooted!(&in(cx) let mut error = UndefinedValue());
                    Error::Type(c"close requested".to_owned()).to_jsval(
                        cx,
                        &self.global(),
                        error.handle_mut(),
                    );

                    // Perform ! ReadableByteStreamControllerError(controller, e).
                    self.error(cx, error.handle());

                    // Perform readIntoRequest’s error steps, given e.
                    read_into_request.error_steps(cx, error.handle());

                    // Return.
                    return;
                }
            }

            // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
            // (scoped so the mutable borrow is released before the pull below)
            {
                self.pending_pull_intos
                    .borrow_mut()
                    .push(*pull_into_descriptor.into_box());
            }
            // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
            stream.add_read_into_request(read_into_request);

            // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
            self.call_pull_if_needed(cx);
        },
        Err(error) => {
            // If bufferResult is an abrupt completion,

            // Perform readIntoRequest’s error steps, given bufferResult.[[Value]].
            rooted!(&in(cx) let mut rval = UndefinedValue());
            error.to_jsval(cx, &self.global(), rval.handle_mut());
            read_into_request.error_steps(cx, rval.handle());

            // Return.
        },
    }
}
456
457 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond>
458 pub(crate) fn respond(&self, cx: &mut JSContext, bytes_written: u64) -> Fallible<()> {
459 {
460 // Assert: controller.[[pendingPullIntos]] is not empty.
461 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
462 assert!(!pending_pull_intos.is_empty());
463
464 // Let firstDescriptor be controller.[[pendingPullIntos]][0].
465 let first_descriptor = pending_pull_intos.first_mut().unwrap();
466
467 // Let state be controller.[[stream]].[[state]].
468 let stream = self.stream.get().unwrap();
469
470 // If state is "closed",
471 if stream.is_closed() {
472 // If bytesWritten is not 0, throw a TypeError exception.
473 if bytes_written != 0 {
474 return Err(Error::Type(
475 c"bytesWritten not zero on closed stream".to_owned(),
476 ));
477 }
478 } else {
479 // Assert: state is "readable".
480 assert!(stream.is_readable());
481
482 // If bytesWritten is 0, throw a TypeError exception.
483 if bytes_written == 0 {
484 return Err(Error::Type(c"bytesWritten is 0".to_owned()));
485 }
486
487 // If firstDescriptor’s bytes filled + bytesWritten > firstDescriptor’s byte length,
488 // throw a RangeError exception.
489 if first_descriptor.bytes_filled.get() + bytes_written >
490 first_descriptor.byte_length
491 {
492 return Err(Error::Range(
493 c"bytes filled + bytesWritten > byte length".to_owned(),
494 ));
495 }
496 }
497
498 // Set firstDescriptor’s buffer to ! TransferArrayBuffer(firstDescriptor’s buffer).
499 first_descriptor.buffer = *first_descriptor
500 .buffer
501 .transfer_array_buffer(cx)
502 .expect("TransferArrayBuffer failed")
503 .into_box();
504 }
505
506 // Perform ? ReadableByteStreamControllerRespondInternal(controller, bytesWritten).
507 self.respond_internal(cx, bytes_written)
508 }
509
510 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-internal>
511 fn respond_internal(&self, cx: &mut JSContext, bytes_written: u64) -> Fallible<()> {
512 {
513 // Let firstDescriptor be controller.[[pendingPullIntos]][0].
514 let pending_pull_intos = self.pending_pull_intos.borrow();
515 let first_descriptor = pending_pull_intos.first().unwrap();
516
517 // Assert: ! CanTransferArrayBuffer(firstDescriptor’s buffer) is true
518 assert!(first_descriptor.buffer.can_transfer_array_buffer(cx));
519 }
520
521 // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
522 self.invalidate_byob_request();
523
524 // Let state be controller.[[stream]].[[state]].
525 let stream = self.stream.get().unwrap();
526
527 // If state is "closed",
528 if stream.is_closed() {
529 // Assert: bytesWritten is 0.
530 assert_eq!(bytes_written, 0);
531
532 // Perform ! ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor).
533 self.respond_in_closed_state(cx)
534 .expect("respond_in_closed_state failed");
535 } else {
536 // Assert: state is "readable".
537 assert!(stream.is_readable());
538
539 // Assert: bytesWritten > 0.
540 assert!(bytes_written > 0);
541
542 // Perform ? ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor).
543 self.respond_in_readable_state(cx, bytes_written)?;
544 }
545
546 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
547 self.call_pull_if_needed(cx);
548
549 Ok(())
550 }
551
552 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-closed-state>
553 fn respond_in_closed_state(&self, cx: &mut JSContext) -> Fallible<()> {
554 let pending_pull_intos = self.pending_pull_intos.borrow();
555 let first_descriptor = pending_pull_intos.first().unwrap();
556
557 // Assert: the remainder after dividing firstDescriptor’s bytes filled
558 // by firstDescriptor’s element size is 0.
559 assert_eq!(
560 first_descriptor.bytes_filled.get() % first_descriptor.element_size,
561 0
562 );
563
564 // If firstDescriptor’s reader type is "none",
565 // perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
566 let reader_type = first_descriptor.reader_type.is_none();
567
568 // needed to drop the borrow and avoid BorrowMutError
569 drop(pending_pull_intos);
570
571 if reader_type {
572 self.shift_pending_pull_into();
573 }
574
575 // Let stream be controller.[[stream]].
576 let stream = self.stream.get().unwrap();
577
578 // If ! ReadableStreamHasBYOBReader(stream) is true,
579 if stream.has_byob_reader() {
580 // Let filledPullIntos be a new empty list.
581 rooted!(&in(cx) let mut filled_pull_intos = Vec::new());
582
583 // While filledPullIntos’s size < ! ReadableStreamGetNumReadIntoRequests(stream),
584 while filled_pull_intos.len() < stream.get_num_read_into_requests() {
585 // Let pullIntoDescriptor be ! ReadableByteStreamControllerShiftPendingPullInto(controller).
586 // Append pullIntoDescriptor to filledPullIntos.
587 filled_pull_intos.push(self.shift_pending_pull_into());
588 }
589
590 // For each filledPullInto of filledPullIntos,
591 for filled_pull_into in &*filled_pull_intos {
592 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
593 self.commit_pull_into_descriptor(cx, filled_pull_into)
594 .expect("commit_pull_into_descriptor failed");
595 }
596 }
597
598 Ok(())
599 }
600
/// Handles a response of `bytes_written` bytes while the stream is readable.
///
/// The head descriptor's fill count is advanced first. What follows depends
/// on the descriptor:
///
/// * reader type "none": the detached pull-into is enqueued, then any
///   descriptors that can be filled from the queue are committed;
/// * filled below its minimum fill: nothing more happens yet;
/// * otherwise: the descriptor is shifted off, any trailing partial element
///   is cloned back into the queue, and the descriptor plus any descriptors
///   filled from the queue are committed.
///
/// # Errors
///
/// Propagates errors from `enqueue_detached_pull_into_to_queue` and
/// `enqueue_cloned_chunk_to_queue`.
///
/// # Panics
///
/// Panics if there is no pending pull-into, if the write would exceed the
/// descriptor's byte length, or if committing a descriptor fails.
///
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-readable-state>
fn respond_in_readable_state(&self, cx: &mut JSContext, bytes_written: u64) -> Fallible<()> {
    // Shared borrow of the list; it is dropped explicitly before every call
    // below that needs to mutate `pending_pull_intos`.
    let pending_pull_intos = self.pending_pull_intos.borrow();
    let first_descriptor = pending_pull_intos.first().unwrap();

    // Assert: pullIntoDescriptor’s bytes filled + bytesWritten ≤ pullIntoDescriptor’s byte length.
    assert!(
        first_descriptor.bytes_filled.get() + bytes_written <= first_descriptor.byte_length
    );

    // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
    // controller, bytesWritten, pullIntoDescriptor).
    self.fill_head_pull_into_descriptor(bytes_written, first_descriptor);

    // If pullIntoDescriptor’s reader type is "none",
    if first_descriptor.reader_type.is_none() {
        // needed to drop the borrow and avoid BorrowMutError
        drop(pending_pull_intos);

        // Perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, pullIntoDescriptor).
        self.enqueue_detached_pull_into_to_queue(cx)?;

        // Let filledPullIntos be the result of performing
        // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
        rooted!(&in(cx) let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx));

        // For each filledPullInto of filledPullIntos,
        for filled_pull_into in &*filled_pull_intos {
            // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]]
            // , filledPullInto).
            self.commit_pull_into_descriptor(cx, filled_pull_into)
                .expect("commit_pull_into_descriptor failed");
        }

        // Return.
        return Ok(());
    }

    // If pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill, return.
    // (The descriptor stays at the head of the list until it is filled further.)
    if first_descriptor.bytes_filled.get() < first_descriptor.minimum_fill {
        return Ok(());
    }

    // needed to drop the borrow and avoid BorrowMutError
    drop(pending_pull_intos);

    // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
    rooted!(&in(cx) let pull_into_descriptor = self.shift_pending_pull_into());

    // Let remainderSize be the remainder after dividing pullIntoDescriptor’s bytes
    // filled by pullIntoDescriptor’s element size.
    let remainder_size =
        pull_into_descriptor.bytes_filled.get() % pull_into_descriptor.element_size;

    // If remainderSize > 0,
    if remainder_size > 0 {
        // Let end be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
        let end = pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();

        // Perform ? ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
        // pullIntoDescriptor’s buffer, end − remainderSize, remainderSize).
        self.enqueue_cloned_chunk_to_queue(
            cx,
            &pull_into_descriptor.buffer,
            end - remainder_size,
            remainder_size,
        )?;
    }

    // Set pullIntoDescriptor’s bytes filled to pullIntoDescriptor’s bytes filled − remainderSize.
    pull_into_descriptor
        .bytes_filled
        .set(pull_into_descriptor.bytes_filled.get() - remainder_size);

    // Let filledPullIntos be the result of performing
    // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
    rooted!(&in(cx) let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx));

    // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], pullIntoDescriptor).
    self.commit_pull_into_descriptor(cx, &pull_into_descriptor)
        .expect("commit_pull_into_descriptor failed");

    // For each filledPullInto of filledPullIntos,
    for filled_pull_into in &*filled_pull_intos {
        // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], filledPullInto).
        self.commit_pull_into_descriptor(cx, filled_pull_into)
            .expect("commit_pull_into_descriptor failed");
    }

    Ok(())
}
692
693 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-with-new-view>
694 pub(crate) fn respond_with_new_view(
695 &self,
696 cx: &mut JSContext,
697 view: &HeapBufferSource<ArrayBufferViewU8>,
698 ) -> Fallible<()> {
699 let view_byte_length;
700 {
701 // Assert: controller.[[pendingPullIntos]] is not empty.
702 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
703 assert!(!pending_pull_intos.is_empty());
704
705 // Assert: ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is false.
706 assert!(!view.is_detached_buffer(cx));
707
708 // Let firstDescriptor be controller.[[pendingPullIntos]][0].
709 let first_descriptor = pending_pull_intos.first_mut().unwrap();
710
711 // Let state be controller.[[stream]].[[state]].
712 let stream = self.stream.get().unwrap();
713
714 // If state is "closed",
715 if stream.is_closed() {
716 // If view.[[ByteLength]] is not 0, throw a TypeError exception.
717 if view.byte_length() != 0 {
718 return Err(Error::Type(c"view byte length is not 0".to_owned()));
719 }
720 } else {
721 // Assert: state is "readable".
722 assert!(stream.is_readable());
723
724 // If view.[[ByteLength]] is 0, throw a TypeError exception.
725 if view.byte_length() == 0 {
726 return Err(Error::Type(c"view byte length is 0".to_owned()));
727 }
728 }
729
730 // If firstDescriptor’s byte offset + firstDescriptor’ bytes filled is not view.[[ByteOffset]],
731 // throw a RangeError exception.
732 if first_descriptor.byte_offset + first_descriptor.bytes_filled.get() !=
733 (view.get_byte_offset() as u64)
734 {
735 return Err(Error::Range(
736 c"firstDescriptor's byte offset + bytes filled is not view byte offset"
737 .to_owned(),
738 ));
739 }
740
741 // If firstDescriptor’s buffer byte length is not view.[[ViewedArrayBuffer]].[[ByteLength]],
742 // throw a RangeError exception.
743 if first_descriptor.buffer_byte_length !=
744 (view.viewed_buffer_array_byte_length(cx) as u64)
745 {
746 return Err(Error::Range(
747 c"firstDescriptor's buffer byte length is not view viewed buffer array byte length"
748 .to_owned(),
749 ));
750 }
751
752 // If firstDescriptor’s bytes filled + view.[[ByteLength]] > firstDescriptor’s byte length,
753 // throw a RangeError exception.
754 if first_descriptor.bytes_filled.get() + (view.byte_length()) as u64 >
755 first_descriptor.byte_length
756 {
757 return Err(Error::Range(
758 c"bytes filled + view byte length > byte length".to_owned(),
759 ));
760 }
761
762 // Let viewByteLength be view.[[ByteLength]].
763 view_byte_length = view.byte_length();
764
765 // Set firstDescriptor’s buffer to ? TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
766 first_descriptor.buffer = *view
767 .get_array_buffer_view_buffer(cx)
768 .transfer_array_buffer(cx)?
769 .into_box();
770 }
771
772 // Perform ? ReadableByteStreamControllerRespondInternal(controller, viewByteLength).
773 self.respond_internal(cx, view_byte_length as u64)
774 }
775
776 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-get-desired-size>
777 pub(crate) fn get_desired_size(&self) -> Option<f64> {
778 // Let state be controller.[[stream]].[[state]].
779 let stream = self.stream.get()?;
780
781 // If state is "errored", return null.
782 if stream.is_errored() {
783 return None;
784 }
785
786 // If state is "closed", return 0.
787 if stream.is_closed() {
788 return Some(0.0);
789 }
790
791 // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
792 Some(self.strategy_hwm - self.queue_total_size.get())
793 }
794
795 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollergetbyobrequest>
796 pub(crate) fn get_byob_request(
797 &self,
798 cx: &mut js::context::JSContext,
799 ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
800 // If controller.[[byobRequest]] is null and controller.[[pendingPullIntos]] is not empty,
801 let pending_pull_intos = self.pending_pull_intos.borrow();
802 if self.byob_request.get().is_none() && !pending_pull_intos.is_empty() {
803 // Let firstDescriptor be controller.[[pendingPullIntos]][0].
804 let first_descriptor = pending_pull_intos.first().unwrap();
805 // Let view be ! Construct(%Uint8Array%, « firstDescriptor’s buffer,
806 // firstDescriptor’s byte offset + firstDescriptor’s bytes filled,
807 // firstDescriptor’s byte length − firstDescriptor’s bytes filled »).
808
809 let byte_offset = first_descriptor.byte_offset + first_descriptor.bytes_filled.get();
810 let byte_length = first_descriptor.byte_length - first_descriptor.bytes_filled.get();
811
812 let view = create_buffer_source_with_constructor(
813 cx,
814 &Constructor::Name(Type::Uint8),
815 &first_descriptor.buffer,
816 byte_offset as usize,
817 byte_length as usize,
818 )
819 .expect("Construct Uint8Array failed");
820
821 // Let byobRequest be a new ReadableStreamBYOBRequest.
822 let byob_request = ReadableStreamBYOBRequest::new(cx, &self.global());
823
824 // Set byobRequest.[[controller]] to controller.
825 byob_request.set_controller(Some(&DomRoot::from_ref(self)));
826
827 // Set byobRequest.[[view]] to view.
828 byob_request.set_view(Some(view));
829
830 // Set controller.[[byobRequest]] to byobRequest.
831 self.byob_request.set(Some(&byob_request));
832 }
833
834 // Return controller.[[byobRequest]].
835 Ok(self.byob_request.get())
836 }
837
838 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-close>
839 pub(crate) fn close(&self, cx: &mut JSContext) -> Fallible<()> {
840 // Let stream be controller.[[stream]].
841 let stream = self.stream.get().unwrap();
842
843 // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
844 if self.close_requested.get() || !stream.is_readable() {
845 return Ok(());
846 }
847
848 // If controller.[[queueTotalSize]] > 0,
849 if self.queue_total_size.get() > 0.0 {
850 // Set controller.[[closeRequested]] to true.
851 self.close_requested.set(true);
852 // Return.
853 return Ok(());
854 }
855
856 {
857 // If controller.[[pendingPullIntos]] is not empty,
858 let pending_pull_intos = self.pending_pull_intos.borrow();
859 if !pending_pull_intos.is_empty() {
860 // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
861 let first_pending_pull_into = pending_pull_intos.first().unwrap();
862
863 // If the remainder after dividing firstPendingPullInto’s bytes filled by
864 // firstPendingPullInto’s element size is not 0,
865 if !first_pending_pull_into
866 .bytes_filled
867 .get()
868 .is_multiple_of(first_pending_pull_into.element_size)
869 {
870 // needed to drop the borrow and avoid BorrowMutError
871 drop(pending_pull_intos);
872
873 // Let e be a new TypeError exception.
874 let e = Error::Type(
875 c"remainder after dividing firstPendingPullInto's bytes
876 filled by firstPendingPullInto's element size is not 0"
877 .to_owned(),
878 );
879
880 // Perform ! ReadableByteStreamControllerError(controller, e).
881 rooted!(&in(cx) let mut error = UndefinedValue());
882 e.clone().to_jsval(cx, &self.global(), error.handle_mut());
883 self.error(cx, error.handle());
884
885 // Throw e.
886 return Err(e);
887 }
888 }
889 }
890
891 // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
892 self.clear_algorithms();
893
894 // Perform ! ReadableStreamClose(stream).
895 stream.close(cx);
896 Ok(())
897 }
898
899 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-error>
900 pub(crate) fn error(&self, cx: &mut JSContext, e: SafeHandleValue) {
901 // Let stream be controller.[[stream]].
902 let stream = self.stream.get().unwrap();
903
904 // If stream.[[state]] is not "readable", return.
905 if !stream.is_readable() {
906 return;
907 }
908
909 // Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller).
910 self.clear_pending_pull_intos();
911
912 // Perform ! ResetQueue(controller).
913 self.reset_queue();
914
915 // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
916 self.clear_algorithms();
917
918 // Perform ! ReadableStreamError(stream, e).
919 stream.error(cx, e);
920 }
921
922 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-algorithms>
923 fn clear_algorithms(&self) {
924 // Set controller.[[pullAlgorithm]] to undefined.
925 // Set controller.[[cancelAlgorithm]] to undefined.
926 self.underlying_source.set(None);
927 }
928
929 /// <https://streams.spec.whatwg.org/#reset-queue>
930 pub(crate) fn reset_queue(&self) {
931 // Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
932
933 // Set container.[[queue]] to a new empty list.
934 self.queue.borrow_mut().clear();
935
936 // Set container.[[queueTotalSize]] to 0.
937 self.queue_total_size.set(0.0);
938 }
939
940 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-pending-pull-intos>
941 pub(crate) fn clear_pending_pull_intos(&self) {
942 // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
943 self.invalidate_byob_request();
944
945 // Set controller.[[pendingPullIntos]] to a new empty list.
946 self.pending_pull_intos.borrow_mut().clear();
947 }
948
949 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-invalidate-byob-request>
950 pub(crate) fn invalidate_byob_request(&self) {
951 if let Some(byob_request) = self.byob_request.get() {
952 // Set controller.[[byobRequest]].[[controller]] to undefined.
953 byob_request.set_controller(None);
954
955 // Set controller.[[byobRequest]].[[view]] to null.
956 byob_request.set_view(None);
957
958 // Set controller.[[byobRequest]] to null.
959 self.byob_request.set(None);
960 }
961 // If controller.[[byobRequest]] is null, return.
962 }
963
964 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue>
965 pub(crate) fn enqueue(
966 &self,
967 cx: &mut JSContext,
968 chunk: RootedTraceableBox<HeapBufferSource<ArrayBufferViewU8>>,
969 ) -> Fallible<()> {
970 // Let stream be controller.[[stream]].
971 let stream = self.stream.get().unwrap();
972
973 // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
974 if self.close_requested.get() || !stream.is_readable() {
975 return Ok(());
976 }
977
978 // Let buffer be chunk.[[ViewedArrayBuffer]].
979 let buffer = chunk.get_array_buffer_view_buffer(cx);
980
981 // Let byteOffset be chunk.[[ByteOffset]].
982 let byte_offset = chunk.get_byte_offset();
983
984 // Let byteLength be chunk.[[ByteLength]].
985 let byte_length = chunk.byte_length();
986
987 // If ! IsDetachedBuffer(buffer) is true, throw a TypeError exception.
988 if buffer.is_detached_buffer(cx) {
989 return Err(Error::Type(c"buffer is detached".to_owned()));
990 }
991
992 // Let transferredBuffer be ? TransferArrayBuffer(buffer).
993 let transferred_buffer = buffer.transfer_array_buffer(cx)?;
994
995 // If controller.[[pendingPullIntos]] is not empty,
996 {
997 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
998 if !pending_pull_intos.is_empty() {
999 // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
1000 let first_descriptor = pending_pull_intos.first_mut().unwrap();
1001 // If ! IsDetachedBuffer(firstPendingPullInto’s buffer) is true, throw a TypeError exception.
1002 if first_descriptor.buffer.is_detached_buffer(cx) {
1003 return Err(Error::Type(c"buffer is detached".to_owned()));
1004 }
1005
1006 // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
1007 self.invalidate_byob_request();
1008
1009 // Set firstPendingPullInto’s buffer to ! TransferArrayBuffer(firstPendingPullInto’s buffer).
1010 first_descriptor.buffer = *first_descriptor
1011 .buffer
1012 .transfer_array_buffer(cx)
1013 .expect("TransferArrayBuffer failed")
1014 .into_box();
1015
1016 // If firstPendingPullInto’s reader type is "none",
1017 if first_descriptor.reader_type.is_none() {
1018 // needed to drop the borrow and avoid BorrowMutError
1019 drop(pending_pull_intos);
1020
1021 // perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(
1022 // controller, firstPendingPullInto).
1023 self.enqueue_detached_pull_into_to_queue(cx)?;
1024 }
1025 }
1026 }
1027
1028 // If ! ReadableStreamHasDefaultReader(stream) is true,
1029 if stream.has_default_reader() {
1030 // Perform ! ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller).
1031 self.process_read_requests_using_queue(cx)
1032 .expect("process_read_requests_using_queue failed");
1033
1034 // If ! ReadableStreamGetNumReadRequests(stream) is 0,
1035 if stream.get_num_read_requests() == 0 {
1036 // Assert: controller.[[pendingPullIntos]] is empty.
1037 {
1038 assert!(self.pending_pull_intos.borrow().is_empty());
1039 }
1040
1041 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1042 // controller, transferredBuffer, byteOffset, byteLength).
1043 self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1044 } else {
1045 // Assert: controller.[[queue]] is empty.
1046 assert!(self.queue.borrow().is_empty());
1047
1048 // If controller.[[pendingPullIntos]] is not empty,
1049
1050 let pending_pull_intos = self.pending_pull_intos.borrow();
1051 if !pending_pull_intos.is_empty() {
1052 // Assert: controller.[[pendingPullIntos]][0]'s reader type is "default".
1053 assert!(matches!(
1054 pending_pull_intos.first().unwrap().reader_type,
1055 Some(ReaderType::Default)
1056 ));
1057
1058 // needed to drop the borrow and avoid BorrowMutError
1059 drop(pending_pull_intos);
1060
1061 // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1062 self.shift_pending_pull_into();
1063 }
1064
1065 // Let transferredView be ! Construct(%Uint8Array%, « transferredBuffer, byteOffset, byteLength »).
1066 let transferred_view = create_buffer_source_with_constructor(
1067 cx,
1068 &Constructor::Name(Type::Uint8),
1069 &transferred_buffer,
1070 byte_offset,
1071 byte_length,
1072 )
1073 .expect("Construct Uint8Array failed");
1074
1075 // Perform ! ReadableStreamFulfillReadRequest(stream, transferredView, false).
1076 rooted!(&in(cx) let mut view_value = UndefinedValue());
1077 transferred_view.get_buffer_view_value(cx, view_value.handle_mut());
1078 stream.fulfill_read_request(cx, view_value.handle(), false);
1079 }
1080 // Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true,
1081 } else if stream.has_byob_reader() {
1082 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1083 // controller, transferredBuffer, byteOffset, byteLength).
1084 self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1085
1086 // Let filledPullIntos be the result of performing !
1087 // ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
1088 rooted!(&in(cx) let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx));
1089
1090 // For each filledPullInto of filledPullIntos,
1091 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
1092 for filled_pull_into in &*filled_pull_intos {
1093 self.commit_pull_into_descriptor(cx, filled_pull_into)
1094 .expect("commit_pull_into_descriptor failed");
1095 }
1096 } else {
1097 // Assert: ! IsReadableStreamLocked(stream) is false.
1098 assert!(!stream.is_locked());
1099
1100 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1101 // (controller, transferredBuffer, byteOffset, byteLength).
1102 self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1103 }
1104
1105 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1106 self.call_pull_if_needed(cx);
1107
1108 Ok(())
1109 }
1110
1111 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-commit-pull-into-descriptor>
1112 fn commit_pull_into_descriptor(
1113 &self,
1114 cx: &mut JSContext,
1115 pull_into_descriptor: &PullIntoDescriptor,
1116 ) -> Fallible<()> {
1117 // Assert: stream.[[state]] is not "errored".
1118 let stream = self.stream.get().unwrap();
1119 assert!(!stream.is_errored());
1120
1121 // Assert: pullIntoDescriptor.reader type is not "none".
1122 assert!(pull_into_descriptor.reader_type.is_some());
1123
1124 // Let done be false.
1125 let mut done = false;
1126
1127 // If stream.[[state]] is "closed",
1128 if stream.is_closed() {
1129 // Assert: the remainder after dividing pullIntoDescriptor’s bytes filled
1130 // by pullIntoDescriptor’s element size is 0.
1131 assert!(
1132 pull_into_descriptor
1133 .bytes_filled
1134 .get()
1135 .is_multiple_of(pull_into_descriptor.element_size)
1136 );
1137
1138 // Set done to true.
1139 done = true;
1140 }
1141
1142 // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor).
1143 let filled_view = self
1144 .convert_pull_into_descriptor(cx, pull_into_descriptor)
1145 .expect("convert_pull_into_descriptor failed");
1146
1147 rooted!(&in(cx) let mut view_value = UndefinedValue());
1148 filled_view.get_buffer_view_value(cx, view_value.handle_mut());
1149
1150 // If pullIntoDescriptor’s reader type is "default",
1151 if matches!(pull_into_descriptor.reader_type, Some(ReaderType::Default)) {
1152 // Perform ! ReadableStreamFulfillReadRequest(stream, filledView, done).
1153
1154 stream.fulfill_read_request(cx, view_value.handle(), done);
1155 } else {
1156 // Assert: pullIntoDescriptor’s reader type is "byob".
1157 assert!(matches!(
1158 pull_into_descriptor.reader_type,
1159 Some(ReaderType::Byob)
1160 ));
1161
1162 // Perform ! ReadableStreamFulfillReadIntoRequest(stream, filledView, done).
1163 stream.fulfill_read_into_request(cx, view_value.handle(), done);
1164 }
1165 Ok(())
1166 }
1167
1168 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-convert-pull-into-descriptor>
1169 pub(crate) fn convert_pull_into_descriptor(
1170 &self,
1171 cx: &mut js::context::JSContext,
1172 pull_into_descriptor: &PullIntoDescriptor,
1173 ) -> Fallible<RootedTraceableBox<HeapBufferSource<ArrayBufferViewU8>>> {
1174 // Let bytesFilled be pullIntoDescriptor’s bytes filled.
1175 let bytes_filled = pull_into_descriptor.bytes_filled.get();
1176
1177 // Let elementSize be pullIntoDescriptor’s element size.
1178 let element_size = pull_into_descriptor.element_size;
1179
1180 // Assert: bytesFilled ≤ pullIntoDescriptor’s byte length.
1181 assert!(bytes_filled <= pull_into_descriptor.byte_length);
1182
1183 // Assert: the remainder after dividing bytesFilled by elementSize is 0.
1184 assert!(bytes_filled.is_multiple_of(element_size));
1185
1186 // Let buffer be ! TransferArrayBuffer(pullIntoDescriptor’s buffer).
1187 let buffer = pull_into_descriptor
1188 .buffer
1189 .transfer_array_buffer(cx)
1190 .expect("TransferArrayBuffer failed");
1191
1192 // Return ! Construct(pullIntoDescriptor’s view constructor,
1193 // « buffer, pullIntoDescriptor’s byte offset, bytesFilled ÷ elementSize »).
1194 Ok(create_buffer_source_with_constructor(
1195 cx,
1196 &pull_into_descriptor.view_constructor,
1197 &buffer,
1198 pull_into_descriptor.byte_offset as usize,
1199 (bytes_filled / element_size) as usize,
1200 )
1201 .expect("Construct view failed"))
1202 }
1203
1204 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-process-pull-into-descriptors-using-queue>
1205 pub(crate) fn process_pull_into_descriptors_using_queue(
1206 &self,
1207 cx: &mut js::context::JSContext,
1208 ) -> Vec<PullIntoDescriptor> {
1209 // Assert: controller.[[closeRequested]] is false.
1210 assert!(!self.close_requested.get());
1211
1212 // Let filledPullIntos be a new empty list.
1213 rooted!(&in(cx) let mut filled_pull_intos = Vec::new());
1214
1215 // While controller.[[pendingPullIntos]] is not empty,
1216 loop {
1217 // If controller.[[queueTotalSize]] is 0, then break.
1218 if self.queue_total_size.get() == 0.0 {
1219 break;
1220 }
1221
1222 // Let pullIntoDescriptor be controller.[[pendingPullIntos]][0].
1223 let fill_pull_result = {
1224 let pending_pull_intos = self.pending_pull_intos.borrow();
1225 let Some(pull_into_descriptor) = pending_pull_intos.first() else {
1226 break;
1227 };
1228 self.fill_pull_into_descriptor_from_queue(cx, pull_into_descriptor)
1229 };
1230
1231 // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) is true,
1232 if fill_pull_result {
1233 // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1234 // Append pullIntoDescriptor to filledPullIntos.
1235 filled_pull_intos.push(self.shift_pending_pull_into());
1236 }
1237 }
1238
1239 // Return filledPullIntos.
1240 filled_pull_intos.take()
1241 }
1242
1243 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-pull-into-descriptor-from-queue>
1244 pub(crate) fn fill_pull_into_descriptor_from_queue(
1245 &self,
1246 cx: &mut js::context::JSContext,
1247 pull_into_descriptor: &PullIntoDescriptor,
1248 ) -> bool {
1249 // Let maxBytesToCopy be min(controller.[[queueTotalSize]],
1250 // pullIntoDescriptor’s byte length − pullIntoDescriptor’s bytes filled).
1251 let max_bytes_to_copy = min(
1252 self.queue_total_size.get() as usize,
1253 (pull_into_descriptor.byte_length - pull_into_descriptor.bytes_filled.get()) as usize,
1254 );
1255
1256 // Let maxBytesFilled be pullIntoDescriptor’s bytes filled + maxBytesToCopy.
1257 let max_bytes_filled = pull_into_descriptor.bytes_filled.get() as usize + max_bytes_to_copy;
1258
1259 // Let totalBytesToCopyRemaining be maxBytesToCopy.
1260 let mut total_bytes_to_copy_remaining = max_bytes_to_copy;
1261
1262 // Let ready be false.
1263 let mut ready = false;
1264
1265 // Assert: ! IsDetachedBuffer(pullIntoDescriptor’s buffer) is false.
1266 assert!(!pull_into_descriptor.buffer.is_detached_buffer(cx));
1267
1268 // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
1269 assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
1270
1271 // Let remainderBytes be the remainder after dividing maxBytesFilled by pullIntoDescriptor’s element size.
1272 let remainder_bytes = max_bytes_filled % pull_into_descriptor.element_size as usize;
1273
1274 // Let maxAlignedBytes be maxBytesFilled − remainderBytes.
1275 let max_aligned_bytes = max_bytes_filled - remainder_bytes;
1276
1277 // If maxAlignedBytes ≥ pullIntoDescriptor’s minimum fill,
1278 if max_aligned_bytes >= pull_into_descriptor.minimum_fill as usize {
1279 // Set totalBytesToCopyRemaining to maxAlignedBytes − pullIntoDescriptor’s bytes filled.
1280 total_bytes_to_copy_remaining =
1281 max_aligned_bytes - (pull_into_descriptor.bytes_filled.get() as usize);
1282
1283 // Set ready to true.
1284 ready = true;
1285 }
1286
1287 // Let queue be controller.[[queue]].
1288 let mut queue = self.queue.borrow_mut();
1289
1290 // While totalBytesToCopyRemaining > 0,
1291 while total_bytes_to_copy_remaining > 0 {
1292 // Let headOfQueue be queue[0].
1293 let head_of_queue = queue.front_mut().unwrap();
1294
1295 // Let bytesToCopy be min(totalBytesToCopyRemaining, headOfQueue’s byte length).
1296 let bytes_to_copy = total_bytes_to_copy_remaining.min(head_of_queue.byte_length);
1297
1298 // Let destStart be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
1299 let dest_start =
1300 pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();
1301
1302 // Let descriptorBuffer be pullIntoDescriptor’s buffer.
1303 let descriptor_buffer = &pull_into_descriptor.buffer;
1304
1305 // Let queueBuffer be headOfQueue’s buffer.
1306 let queue_buffer = &head_of_queue.buffer;
1307
1308 // Let queueByteOffset be headOfQueue’s byte offset.
1309 let queue_byte_offset = head_of_queue.byte_offset;
1310
1311 // Assert: ! CanCopyDataBlockBytes(descriptorBuffer, destStart,
1312 // queueBuffer, queueByteOffset, bytesToCopy) is true.
1313 assert!(descriptor_buffer.can_copy_data_block_bytes(
1314 cx,
1315 dest_start as usize,
1316 queue_buffer,
1317 queue_byte_offset,
1318 bytes_to_copy
1319 ));
1320
1321 // Perform ! CopyDataBlockBytes(descriptorBuffer.[[ArrayBufferData]], destStart,
1322 // queueBuffer.[[ArrayBufferData]], queueByteOffset, bytesToCopy).
1323 descriptor_buffer.copy_data_block_bytes(
1324 cx,
1325 dest_start as usize,
1326 queue_buffer,
1327 queue_byte_offset,
1328 bytes_to_copy,
1329 );
1330
1331 // If headOfQueue’s byte length is bytesToCopy,
1332 if head_of_queue.byte_length == bytes_to_copy {
1333 // Remove queue[0].
1334 queue.pop_front().unwrap();
1335 } else {
1336 // Set headOfQueue’s byte offset to headOfQueue’s byte offset + bytesToCopy.
1337 head_of_queue.byte_offset += bytes_to_copy;
1338
1339 // Set headOfQueue’s byte length to headOfQueue’s byte length − bytesToCopy.
1340 head_of_queue.byte_length -= bytes_to_copy;
1341 }
1342
1343 // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − bytesToCopy.
1344 self.queue_total_size
1345 .set(self.queue_total_size.get() - (bytes_to_copy as f64));
1346
1347 // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
1348 // controller, bytesToCopy, pullIntoDescriptor).
1349 self.fill_head_pull_into_descriptor(bytes_to_copy as u64, pull_into_descriptor);
1350
1351 // Set totalBytesToCopyRemaining to totalBytesToCopyRemaining − bytesToCopy.
1352 total_bytes_to_copy_remaining -= bytes_to_copy;
1353 }
1354
1355 // If ready is false,
1356 if !ready {
1357 // Assert: controller.[[queueTotalSize]] is 0.
1358 assert!(self.queue_total_size.get() == 0.0);
1359
1360 // Assert: pullIntoDescriptor’s bytes filled > 0.
1361 assert!(pull_into_descriptor.bytes_filled.get() > 0);
1362
1363 // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
1364 assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
1365 }
1366
1367 // Return ready.
1368 ready
1369 }
1370
1371 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-head-pull-into-descriptor>
1372 pub(crate) fn fill_head_pull_into_descriptor(
1373 &self,
1374 bytes_copied: u64,
1375 pull_into_descriptor: &PullIntoDescriptor,
1376 ) {
1377 // Assert: either controller.[[pendingPullIntos]] is empty,
1378 // or controller.[[pendingPullIntos]][0] is pullIntoDescriptor.
1379 {
1380 let pending_pull_intos = self.pending_pull_intos.borrow();
1381 assert!(
1382 pending_pull_intos.is_empty() ||
1383 pending_pull_intos.first().unwrap() == pull_into_descriptor
1384 );
1385 }
1386
1387 // Assert: controller.[[byobRequest]] is null.
1388 assert!(self.byob_request.get().is_none());
1389
1390 // Set pullIntoDescriptor’s bytes filled to bytes filled + size.
1391 pull_into_descriptor
1392 .bytes_filled
1393 .set(pull_into_descriptor.bytes_filled.get() + bytes_copied);
1394 }
1395
1396 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueuedetachedpullintotoqueue>
1397 pub(crate) fn enqueue_detached_pull_into_to_queue(&self, cx: &mut JSContext) -> Fallible<()> {
1398 // first_descriptor: &PullIntoDescriptor,
1399 let pending_pull_intos = self.pending_pull_intos.borrow();
1400 let first_descriptor = pending_pull_intos.first().unwrap();
1401
1402 // Assert: pullIntoDescriptor’s reader type is "none".
1403 assert!(first_descriptor.reader_type.is_none());
1404
1405 // If pullIntoDescriptor’s bytes filled > 0, perform ?
1406 // ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
1407 // pullIntoDescriptor’s buffer, pullIntoDescriptor’s byte offset, pullIntoDescriptor’s bytes filled).
1408
1409 if first_descriptor.bytes_filled.get() > 0 {
1410 self.enqueue_cloned_chunk_to_queue(
1411 cx,
1412 &first_descriptor.buffer,
1413 first_descriptor.byte_offset,
1414 first_descriptor.bytes_filled.get(),
1415 )?;
1416 }
1417
1418 // needed to drop the borrow and avoid BorrowMutError
1419 drop(pending_pull_intos);
1420
1421 // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1422 self.shift_pending_pull_into();
1423
1424 Ok(())
1425 }
1426
1427 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueueclonedchunktoqueue>
1428 pub(crate) fn enqueue_cloned_chunk_to_queue(
1429 &self,
1430 cx: &mut JSContext,
1431 buffer: &HeapBufferSource<ArrayBufferU8>,
1432 byte_offset: u64,
1433 byte_length: u64,
1434 ) -> Fallible<()> {
1435 // Let cloneResult be CloneArrayBuffer(buffer, byteOffset, byteLength, %ArrayBuffer%).
1436 if let Ok(clone_result) =
1437 buffer.clone_array_buffer(cx, byte_offset as usize, byte_length as usize)
1438 {
1439 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1440 // (controller, cloneResult.[[Value]], 0, byteLength).
1441 self.enqueue_chunk_to_queue(clone_result, 0, byte_length as usize);
1442
1443 Ok(())
1444 } else {
1445 // If cloneResult is an abrupt completion,
1446
1447 // Perform ! ReadableByteStreamControllerError(controller, cloneResult.[[Value]]).
1448 rooted!(&in(cx) let mut rval = UndefinedValue());
1449 let error = Error::Type(c"can not clone array buffer".to_owned());
1450 error
1451 .clone()
1452 .to_jsval(cx, &self.global(), rval.handle_mut());
1453 self.error(cx, rval.handle());
1454
1455 // Return cloneResult.
1456 Err(error)
1457 }
1458 }
1459
1460 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue-chunk-to-queue>
1461 pub(crate) fn enqueue_chunk_to_queue(
1462 &self,
1463 buffer: RootedTraceableBox<HeapBufferSource<ArrayBufferU8>>,
1464 byte_offset: usize,
1465 byte_length: usize,
1466 ) {
1467 // Let entry be a new ReadableByteStreamQueueEntry object.
1468 // Append entry to controller.[[queue]].
1469 self.queue
1470 .borrow_mut()
1471 .push_back(QueueEntry::new(buffer, byte_offset, byte_length));
1472
1473 // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] + byteLength.
1474 self.queue_total_size
1475 .set(self.queue_total_size.get() + byte_length as f64);
1476 }
1477
1478 pub(crate) fn in_memory(&self) -> bool {
1479 let Some(underlying_source) = self.underlying_source.get() else {
1480 return false;
1481 };
1482 underlying_source.in_memory()
1483 }
1484
1485 pub(crate) fn get_in_memory_bytes(&self, cx: &mut JSContext) -> Option<Vec<u8>> {
1486 let underlying_source = self.underlying_source.get()?;
1487 if !underlying_source.in_memory() {
1488 return None;
1489 }
1490
1491 self.queue.borrow().iter().try_fold(
1492 Vec::with_capacity(self.queue_total_size.get() as usize),
1493 |mut bytes, entry| {
1494 let mut chunk = vec![0; entry.byte_length];
1495 entry
1496 .buffer
1497 .copy_data_to(
1498 cx,
1499 &mut chunk,
1500 entry.byte_offset,
1501 entry.byte_offset + entry.byte_length,
1502 )
1503 .ok()?;
1504 bytes.extend(chunk);
1505 Some(bytes)
1506 },
1507 )
1508 }
1509
1510 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-shift-pending-pull-into>
1511 pub(crate) fn shift_pending_pull_into(&self) -> PullIntoDescriptor {
1512 // Assert: controller.[[byobRequest]] is null.
1513 assert!(self.byob_request.get().is_none());
1514
1515 // Let descriptor be controller.[[pendingPullIntos]][0].
1516 // Remove descriptor from controller.[[pendingPullIntos]].
1517 // Return descriptor.
1518 self.pending_pull_intos.borrow_mut().remove(0)
1519 }
1520
1521 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerprocessreadrequestsusingqueue>
1522 pub(crate) fn process_read_requests_using_queue(&self, cx: &mut JSContext) -> Fallible<()> {
1523 // Let reader be controller.[[stream]].[[reader]].
1524 // Assert: reader implements ReadableStreamDefaultReader.
1525 let reader = self.stream.get().unwrap().get_default_reader();
1526
1527 // Step 3
1528 reader.process_read_requests(cx, DomRoot::from_ref(self))
1529 }
1530
1531 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerfillreadrequestfromqueue>
1532 pub(crate) fn fill_read_request_from_queue(
1533 &self,
1534 cx: &mut JSContext,
1535 read_request: &ReadRequest,
1536 ) -> Fallible<()> {
1537 // Assert: controller.[[queueTotalSize]] > 0.
1538 assert!(self.queue_total_size.get() > 0.0);
1539 // Also assert that the queue has a non-zero length;
1540 assert!(!self.queue.borrow().is_empty());
1541
1542 // Let entry be controller.[[queue]][0].
1543 // Remove entry from controller.[[queue]].
1544 rooted!(&in(cx) let entry = self.remove_entry());
1545
1546 // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − entry’s byte length.
1547 self.queue_total_size
1548 .set(self.queue_total_size.get() - entry.byte_length as f64);
1549
1550 // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
1551 self.handle_queue_drain(cx);
1552
1553 // Let view be ! Construct(%Uint8Array%, « entry’s buffer, entry’s byte offset, entry’s byte length »).
1554 let view = create_buffer_source_with_constructor(
1555 cx,
1556 &Constructor::Name(Type::Uint8),
1557 &entry.buffer,
1558 entry.byte_offset,
1559 entry.byte_length,
1560 )
1561 .expect("Construct Uint8Array failed");
1562
1563 // Perform readRequest’s chunk steps, given view.
1564 let result = RootedTraceableBox::new(Heap::default());
1565 rooted!(&in(cx) let mut view_value = UndefinedValue());
1566 view.get_buffer_view_value(cx, view_value.handle_mut());
1567 result.set(*view_value);
1568
1569 read_request.chunk_steps(cx, result, &self.global());
1570
1571 Ok(())
1572 }
1573
1574 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-handle-queue-drain>
1575 pub(crate) fn handle_queue_drain(&self, cx: &mut JSContext) {
1576 // Assert: controller.[[stream]].[[state]] is "readable".
1577 assert!(self.stream.get().unwrap().is_readable());
1578
1579 // If controller.[[queueTotalSize]] is 0 and controller.[[closeRequested]] is true,
1580 if self.queue_total_size.get() == 0.0 && self.close_requested.get() {
1581 // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
1582 self.clear_algorithms();
1583
1584 // Perform ! ReadableStreamClose(controller.[[stream]]).
1585 self.stream.get().unwrap().close(cx);
1586 } else {
1587 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1588 self.call_pull_if_needed(cx);
1589 }
1590 }
1591
1592 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
1593 fn call_pull_if_needed(&self, cx: &mut JSContext) {
1594 // Let shouldPull be ! ReadableByteStreamControllerShouldCallPull(controller).
1595 let should_pull = self.should_call_pull();
1596 // If shouldPull is false, return.
1597 if !should_pull {
1598 return;
1599 }
1600
1601 // If controller.[[pulling]] is true,
1602 if self.pulling.get() {
1603 // Set controller.[[pullAgain]] to true.
1604 self.pull_again.set(true);
1605
1606 // Return.
1607 return;
1608 }
1609
1610 // Assert: controller.[[pullAgain]] is false.
1611 assert!(!self.pull_again.get());
1612
1613 // Set controller.[[pulling]] to true.
1614 self.pulling.set(true);
1615
1616 // Let pullPromise be the result of performing controller.[[pullAlgorithm]].
1617 // Continues into the resolve and reject handling of the native handler.
1618 let global = self.global();
1619 let rooted_controller = DomRoot::from_ref(self);
1620 let controller = Controller::ReadableByteStreamController(rooted_controller.clone());
1621
1622 if let Some(underlying_source) = self.underlying_source.get() {
1623 let handler = PromiseNativeHandler::new(
1624 cx,
1625 &global,
1626 Some(Box::new(PullAlgorithmFulfillmentHandler {
1627 controller: Dom::from_ref(&rooted_controller),
1628 })),
1629 Some(Box::new(PullAlgorithmRejectionHandler {
1630 controller: Dom::from_ref(&rooted_controller),
1631 })),
1632 );
1633
1634 let mut realm = enter_auto_realm(cx, &*global);
1635 let cx = &mut realm.current_realm();
1636
1637 let result = underlying_source
1638 .call_pull_algorithm(cx, controller)
1639 .unwrap_or_else(|| {
1640 let promise = Promise::new_resolved(cx, &global, ());
1641 Ok(promise)
1642 });
1643 let promise = result.unwrap_or_else(|error| {
1644 rooted!(&in(cx) let mut rval = UndefinedValue());
1645 // TODO: check if `self.global()` is the right globalscope.
1646 error.to_jsval(cx, &global, rval.handle_mut());
1647 Promise::new_rejected(cx, &global, rval.handle())
1648 });
1649 promise.append_native_handler(cx, &handler);
1650 }
1651 }
1652
1653 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-should-call-pull>
1654 fn should_call_pull(&self) -> bool {
1655 // Let stream be controller.[[stream]].
1656 // Note: the spec does not assert that stream is not undefined here,
1657 // so we return false if it is.
1658 let stream = self.stream.get().unwrap();
1659
1660 // If stream.[[state]] is not "readable", return false.
1661 if !stream.is_readable() {
1662 return false;
1663 }
1664
1665 // If controller.[[closeRequested]] is true, return false.
1666 if self.close_requested.get() {
1667 return false;
1668 }
1669
1670 // If controller.[[started]] is false, return false.
1671 if !self.started.get() {
1672 return false;
1673 }
1674
1675 // If ! ReadableStreamHasDefaultReader(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0
1676 // , return true.
1677 if stream.has_default_reader() && stream.get_num_read_requests() > 0 {
1678 return true;
1679 }
1680
1681 // If ! ReadableStreamHasBYOBReader(stream) is true and ! ReadableStreamGetNumReadIntoRequests(stream) > 0
1682 // , return true.
1683 if stream.has_byob_reader() && stream.get_num_read_into_requests() > 0 {
1684 return true;
1685 }
1686
1687 // Let desiredSize be ! ReadableByteStreamControllerGetDesiredSize(controller).
1688 let desired_size = self.get_desired_size();
1689
1690 // Assert: desiredSize is not null.
1691 assert!(desired_size.is_some());
1692
1693 // If desiredSize > 0, return true.
1694 if desired_size.unwrap() > 0. {
1695 return true;
1696 }
1697
1698 // Return false.
1699 false
1700 }
1701 /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
1702 pub(crate) fn setup(
1703 &self,
1704 cx: &mut JSContext,
1705 global: &GlobalScope,
1706 stream: DomRoot<ReadableStream>,
1707 ) -> Fallible<()> {
1708 // Assert: stream.[[controller]] is undefined.
1709 stream.assert_no_controller();
1710
1711 // If autoAllocateChunkSize is not undefined,
1712 if self.auto_allocate_chunk_size.is_some() {
1713 // Assert: ! IsInteger(autoAllocateChunkSize) is true. Implicit
1714 // Assert: autoAllocateChunkSize is positive. (Implicit by type.)
1715 }
1716
1717 // Set controller.[[stream]] to stream.
1718 self.stream.set(Some(&stream));
1719
1720 // Set controller.[[pullAgain]] and controller.[[pulling]] to false.
1721 self.pull_again.set(false);
1722 self.pulling.set(false);
1723
1724 // Set controller.[[byobRequest]] to null.
1725 self.byob_request.set(None);
1726
1727 // Perform ! ResetQueue(controller).
1728 self.reset_queue();
1729
1730 // Set controller.[[closeRequested]] and controller.[[started]] to false.
1731 self.close_requested.set(false);
1732 self.started.set(false);
1733
1734 // Set controller.[[strategyHWM]] to highWaterMark.
1735 // Set controller.[[pullAlgorithm]] to pullAlgorithm.
1736 // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
1737 // Set controller.[[autoAllocateChunkSize]] to autoAllocateChunkSize.
1738 // Set controller.[[pendingPullIntos]] to a new empty list.
1739 // Note: the above steps are done in `new`.
1740
1741 // Set stream.[[controller]] to controller.
1742 let rooted_byte_controller = DomRoot::from_ref(self);
1743 stream.set_byte_controller(&rooted_byte_controller);
1744
1745 if let Some(underlying_source) = rooted_byte_controller.underlying_source.get() {
1746 // Let startResult be the result of performing startAlgorithm. (This might throw an exception.)
1747 let start_result = underlying_source
1748 .call_start_algorithm(
1749 cx,
1750 Controller::ReadableByteStreamController(rooted_byte_controller.clone()),
1751 )
1752 .unwrap_or_else(|| Ok(Promise::new_resolved(cx, global, ())));
1753
1754 // Let startPromise be a promise resolved with startResult.
1755 let start_promise = start_result?;
1756
1757 // Upon fulfillment of startPromise, Upon rejection of startPromise with reason r,
1758 let handler = PromiseNativeHandler::new(
1759 cx,
1760 global,
1761 Some(Box::new(StartAlgorithmFulfillmentHandler {
1762 controller: Dom::from_ref(&rooted_byte_controller),
1763 })),
1764 Some(Box::new(StartAlgorithmRejectionHandler {
1765 controller: Dom::from_ref(&rooted_byte_controller),
1766 })),
1767 );
1768 let mut realm = enter_auto_realm(cx, global);
1769 let cx = &mut realm.current_realm();
1770 start_promise.append_native_handler(cx, &handler);
1771 };
1772
1773 Ok(())
1774 }
1775
1776 // <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontroller-releasesteps
1777 pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1778 // If this.[[pendingPullIntos]] is not empty,
1779 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
1780 if !pending_pull_intos.is_empty() {
1781 // Let firstPendingPullInto be this.[[pendingPullIntos]][0].
1782 let mut first_pending_pull_into = RootedTraceableBox::new(pending_pull_intos.remove(0));
1783
1784 // Set firstPendingPullInto’s reader type to "none".
1785 first_pending_pull_into.reader_type = None;
1786
1787 // Set this.[[pendingPullIntos]] to the list « firstPendingPullInto »
1788 pending_pull_intos.clear();
1789 pending_pull_intos.push(*first_pending_pull_into.into_box());
1790 }
1791 Ok(())
1792 }
1793
1794 /// <https://streams.spec.whatwg.org/#rbs-controller-private-cancel>
1795 pub(crate) fn perform_cancel_steps(
1796 &self,
1797 cx: &mut JSContext,
1798 global: &GlobalScope,
1799 reason: SafeHandleValue,
1800 ) -> Rc<Promise> {
1801 // Perform ! ReadableByteStreamControllerClearPendingPullIntos(this).
1802 self.clear_pending_pull_intos();
1803
1804 // Perform ! ResetQueue(this).
1805 self.reset_queue();
1806
1807 let underlying_source = self
1808 .underlying_source
1809 .get()
1810 .expect("Controller should have a source when the cancel steps are called into.");
1811
1812 // Let result be the result of performing this.[[cancelAlgorithm]], passing in reason.
1813 let result = underlying_source
1814 .call_cancel_algorithm(cx, global, reason)
1815 .unwrap_or_else(|| {
1816 let promise = Promise::new(cx, global);
1817 promise.resolve_native(cx, &());
1818 Ok(promise)
1819 });
1820
1821 let promise = result.unwrap_or_else(|error| {
1822 rooted!(&in(cx) let mut rval = UndefinedValue());
1823 error.to_jsval(cx, global, rval.handle_mut());
1824 let promise = Promise::new(cx, global);
1825 promise.reject_native(cx, &rval.handle());
1826 promise
1827 });
1828
1829 // Perform ! ReadableByteStreamControllerClearAlgorithms(this).
1830 self.clear_algorithms();
1831
1832 // Return result(the promise).
1833 promise
1834 }
1835
1836 /// <https://streams.spec.whatwg.org/#rbs-controller-private-pull>
1837 pub(crate) fn perform_pull_steps(&self, cx: &mut JSContext, read_request: &ReadRequest) {
1838 // Let stream be this.[[stream]].
1839 let stream = self.stream.get().unwrap();
1840
1841 // Assert: ! ReadableStreamHasDefaultReader(stream) is true.
1842 assert!(stream.has_default_reader());
1843
1844 // If this.[[queueTotalSize]] > 0,
1845 if self.queue_total_size.get() > 0.0 {
1846 // Assert: ! ReadableStreamGetNumReadRequests(stream) is 0.
1847 assert_eq!(stream.get_num_read_requests(), 0);
1848
1849 // Perform ! ReadableByteStreamControllerFillReadRequestFromQueue(this, readRequest).
1850 let _ = self.fill_read_request_from_queue(cx, read_request);
1851
1852 // Return.
1853 return;
1854 }
1855
1856 // Let autoAllocateChunkSize be this.[[autoAllocateChunkSize]].
1857 let auto_allocate_chunk_size = self.auto_allocate_chunk_size;
1858
1859 // If autoAllocateChunkSize is not undefined,
1860 if let Some(auto_allocate_chunk_size) = auto_allocate_chunk_size {
1861 // create_array_buffer_with_size
1862 // Let buffer be Construct(%ArrayBuffer%, « autoAllocateChunkSize »).
1863 match create_array_buffer_with_size(cx, auto_allocate_chunk_size as usize) {
1864 Ok(buffer) => {
1865 // Let pullIntoDescriptor be a new pull-into descriptor with
1866 // buffer buffer.[[Value]]
1867 // buffer byte length autoAllocateChunkSize
1868 // byte offset 0
1869 // byte length autoAllocateChunkSize
1870 // bytes filled 0
1871 // minimum fill 1
1872 // element size 1
1873 // view constructor %Uint8Array%
1874 // reader type "default"
1875
1876 // Append pullIntoDescriptor to this.[[pendingPullIntos]].
1877 self.pending_pull_intos
1878 .borrow_mut()
1879 .push(PullIntoDescriptor {
1880 buffer: *buffer.into_box(),
1881 buffer_byte_length: auto_allocate_chunk_size,
1882 byte_length: auto_allocate_chunk_size,
1883 byte_offset: 0,
1884 bytes_filled: Cell::new(0),
1885 minimum_fill: 1,
1886 element_size: 1,
1887 view_constructor: Constructor::Name(Type::Uint8),
1888 reader_type: Some(ReaderType::Default),
1889 });
1890 },
1891 Err(error) => {
1892 // If buffer is an abrupt completion,
1893 // Perform readRequest’s error steps, given buffer.[[Value]].
1894
1895 rooted!(&in(cx) let mut rval = UndefinedValue());
1896 error.to_jsval(cx, &self.global(), rval.handle_mut());
1897 read_request.error_steps(cx, rval.handle());
1898
1899 // Return.
1900 return;
1901 },
1902 }
1903 }
1904
1905 // Perform ! ReadableStreamAddReadRequest(stream, readRequest).
1906 stream.add_read_request(read_request);
1907
1908 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(this).
1909 self.call_pull_if_needed(cx);
1910 }
1911
1912 /// Setting the JS object after the heap has settled down.
1913 pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
1914 if let Some(underlying_source) = self.underlying_source.get() {
1915 underlying_source.set_underlying_source_this_object(this_object);
1916 }
1917 }
1918
1919 pub(crate) fn remove_entry(&self) -> QueueEntry {
1920 self.queue
1921 .borrow_mut()
1922 .pop_front()
1923 .expect("Reader must have read request when remove is called into.")
1924 }
1925
1926 pub(crate) fn get_queue_total_size(&self) -> f64 {
1927 self.queue_total_size.get()
1928 }
1929
1930 pub(crate) fn get_pending_pull_intos_size(&self) -> usize {
1931 self.pending_pull_intos.borrow().len()
1932 }
1933}
1934
1935impl ReadableByteStreamControllerMethods<crate::DomTypeHolder> for ReadableByteStreamController {
1936 /// <https://streams.spec.whatwg.org/#rbs-controller-byob-request>
1937 fn GetByobRequest(
1938 &self,
1939 cx: &mut js::context::JSContext,
1940 ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
1941 // Return ! ReadableByteStreamControllerGetBYOBRequest(this).
1942 self.get_byob_request(cx)
1943 }
1944
1945 /// <https://streams.spec.whatwg.org/#rbs-controller-desired-size>
1946 fn GetDesiredSize(&self) -> Option<f64> {
1947 // Return ! ReadableByteStreamControllerGetDesiredSize(this).
1948 self.get_desired_size()
1949 }
1950
1951 /// <https://streams.spec.whatwg.org/#rbs-controller-close>
1952 fn Close(&self, cx: &mut JSContext) -> Fallible<()> {
1953 // If this.[[closeRequested]] is true, throw a TypeError exception.
1954 if self.close_requested.get() {
1955 return Err(Error::Type(c"closeRequested is true".to_owned()));
1956 }
1957
1958 // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
1959 if !self.stream.get().unwrap().is_readable() {
1960 return Err(Error::Type(c"stream is not readable".to_owned()));
1961 }
1962
1963 // Perform ? ReadableByteStreamControllerClose(this).
1964 self.close(cx)
1965 }
1966
1967 /// <https://streams.spec.whatwg.org/#rbs-controller-enqueue>
1968 fn Enqueue(
1969 &self,
1970 cx: &mut JSContext,
1971 chunk: js::gc::CustomAutoRooterGuard<js::typedarray::ArrayBufferView>,
1972 ) -> Fallible<()> {
1973 let chunk = HeapBufferSource::<ArrayBufferViewU8>::from_view(chunk);
1974
1975 // If chunk.[[ByteLength]] is 0, throw a TypeError exception.
1976 if chunk.byte_length() == 0 {
1977 return Err(Error::Type(c"chunk.ByteLength is 0".to_owned()));
1978 }
1979
1980 // If chunk.[[ViewedArrayBuffer]].[[ByteLength]] is 0, throw a TypeError exception.
1981 if chunk.viewed_buffer_array_byte_length(cx) == 0 {
1982 return Err(Error::Type(
1983 c"chunk.ViewedArrayBuffer.ByteLength is 0".to_owned(),
1984 ));
1985 }
1986
1987 // If this.[[closeRequested]] is true, throw a TypeError exception.
1988 if self.close_requested.get() {
1989 return Err(Error::Type(c"closeRequested is true".to_owned()));
1990 }
1991
1992 // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
1993 if !self.stream.get().unwrap().is_readable() {
1994 return Err(Error::Type(c"stream is not readable".to_owned()));
1995 }
1996
1997 // Return ? ReadableByteStreamControllerEnqueue(this, chunk).
1998 self.enqueue(cx, chunk)
1999 }
2000
2001 /// <https://streams.spec.whatwg.org/#rbs-controller-error>
2002 fn Error(&self, cx: &mut JSContext, e: SafeHandleValue) -> Fallible<()> {
2003 // Perform ! ReadableByteStreamControllerError(this, e).
2004 self.error(cx, e);
2005 Ok(())
2006 }
2007}