script/dom/readablebytestreamcontroller.rs
1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::cmp::min;
7use std::collections::VecDeque;
8use std::rc::Rc;
9
10use dom_struct::dom_struct;
11use js::jsapi::{Heap, Type};
12use js::jsval::UndefinedValue;
13use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue};
14use js::typedarray::{ArrayBufferU8, ArrayBufferViewU8};
15
16use super::bindings::buffer_source::HeapBufferSource;
17use super::bindings::cell::DomRefCell;
18use super::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderReadOptions;
19use super::bindings::reflector::reflect_dom_object;
20use super::bindings::root::Dom;
21use super::readablestreambyobreader::ReadIntoRequest;
22use super::readablestreamdefaultreader::ReadRequest;
23use super::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType};
24use crate::dom::bindings::buffer_source::{
25 Constructor, byte_size, create_array_buffer_with_size, create_buffer_source_with_constructor,
26};
27use crate::dom::bindings::codegen::Bindings::ReadableByteStreamControllerBinding::ReadableByteStreamControllerMethods;
28use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
29use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
30use crate::dom::bindings::reflector::{DomGlobal, Reflector};
31use crate::dom::bindings::root::{DomRoot, MutNullableDom};
32use crate::dom::bindings::trace::RootedTraceableBox;
33use crate::dom::globalscope::GlobalScope;
34use crate::dom::promise::Promise;
35use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
36use crate::dom::readablestream::ReadableStream;
37use crate::dom::readablestreambyobrequest::ReadableStreamBYOBRequest;
38use crate::realms::{InRealm, enter_realm};
39use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
40
41/// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry>
42#[derive(JSTraceable, MallocSizeOf)]
43pub(crate) struct QueueEntry {
44 /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-buffer>
45 #[ignore_malloc_size_of = "HeapBufferSource"]
46 buffer: HeapBufferSource<ArrayBufferU8>,
47 /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-offset>
48 byte_offset: usize,
49 /// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-length>
50 byte_length: usize,
51}
52
53impl QueueEntry {
54 pub(crate) fn new(
55 buffer: HeapBufferSource<ArrayBufferU8>,
56 byte_offset: usize,
57 byte_length: usize,
58 ) -> QueueEntry {
59 QueueEntry {
60 buffer,
61 byte_offset,
62 byte_length,
63 }
64 }
65}
66
67#[derive(Debug, Eq, JSTraceable, MallocSizeOf, PartialEq)]
68pub(crate) enum ReaderType {
69 /// <https://streams.spec.whatwg.org/#readablestreambyobreader>
70 Byob,
71 /// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
72 Default,
73}
74
75/// <https://streams.spec.whatwg.org/#pull-into-descriptor>
76#[derive(Eq, JSTraceable, MallocSizeOf, PartialEq)]
77pub(crate) struct PullIntoDescriptor {
78 #[ignore_malloc_size_of = "HeapBufferSource"]
79 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer>
80 buffer: HeapBufferSource<ArrayBufferU8>,
81 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer-byte-length>
82 buffer_byte_length: u64,
83 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-offset>
84 byte_offset: u64,
85 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-length>
86 byte_length: u64,
87 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-bytes-filled>
88 bytes_filled: Cell<u64>,
89 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-minimum-fill>
90 minimum_fill: u64,
91 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-element-size>
92 element_size: u64,
93 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-view-constructor>
94 view_constructor: Constructor,
95 /// <https://streams.spec.whatwg.org/#pull-into-descriptor-reader-type>
96 reader_type: Option<ReaderType>,
97}
98
99/// The fulfillment handler for
100/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
101#[derive(Clone, JSTraceable, MallocSizeOf)]
102#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
103struct StartAlgorithmFulfillmentHandler {
104 controller: Dom<ReadableByteStreamController>,
105}
106
107impl Callback for StartAlgorithmFulfillmentHandler {
108 /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
109 /// Upon fulfillment of startPromise,
110 fn callback(&self, _cx: SafeJSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
111 // Set controller.[[started]] to true.
112 self.controller.started.set(true);
113
114 // Assert: controller.[[pulling]] is false.
115 assert!(!self.controller.pulling.get());
116
117 // Assert: controller.[[pullAgain]] is false.
118 assert!(!self.controller.pull_again.get());
119
120 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
121 self.controller.call_pull_if_needed(can_gc);
122 }
123}
124
125/// The rejection handler for
126/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
127#[derive(Clone, JSTraceable, MallocSizeOf)]
128#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
129struct StartAlgorithmRejectionHandler {
130 controller: Dom<ReadableByteStreamController>,
131}
132
133impl Callback for StartAlgorithmRejectionHandler {
134 /// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
135 /// Upon rejection of startPromise with reason r,
136 fn callback(&self, _cx: SafeJSContext, v: HandleValue, _realm: InRealm, can_gc: CanGc) {
137 // Perform ! ReadableByteStreamControllerError(controller, r).
138 self.controller.error(v, can_gc);
139 }
140}
141
142/// The fulfillment handler for
143/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
144#[derive(Clone, JSTraceable, MallocSizeOf)]
145#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
146struct PullAlgorithmFulfillmentHandler {
147 controller: Dom<ReadableByteStreamController>,
148}
149
150impl Callback for PullAlgorithmFulfillmentHandler {
151 /// Continuation of <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
152 /// Upon fulfillment of pullPromise
153 fn callback(&self, _cx: SafeJSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
154 // Set controller.[[pulling]] to false.
155 self.controller.pulling.set(false);
156
157 // If controller.[[pullAgain]] is true,
158 if self.controller.pull_again.get() {
159 // Set controller.[[pullAgain]] to false.
160 self.controller.pull_again.set(false);
161
162 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
163 self.controller.call_pull_if_needed(can_gc);
164 }
165 }
166}
167
168/// The rejection handler for
169/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
170#[derive(Clone, JSTraceable, MallocSizeOf)]
171#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
172struct PullAlgorithmRejectionHandler {
173 controller: Dom<ReadableByteStreamController>,
174}
175
176impl Callback for PullAlgorithmRejectionHandler {
177 /// Continuation of <https://streams.spec.whatwg.org/#readable-stream-byte-controller-call-pull-if-needed>
178 /// Upon rejection of pullPromise with reason e.
179 fn callback(&self, _cx: SafeJSContext, v: HandleValue, _realm: InRealm, can_gc: CanGc) {
180 // Perform ! ReadableByteStreamControllerError(controller, e).
181 self.controller.error(v, can_gc);
182 }
183}
184
185/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
186#[dom_struct]
187pub(crate) struct ReadableByteStreamController {
188 reflector_: Reflector,
189 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-autoallocatechunksize>
190 auto_allocate_chunk_size: Option<u64>,
191 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-stream>
192 stream: MutNullableDom<ReadableStream>,
193 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-strategyhwm>
194 strategy_hwm: f64,
195 /// A mutable reference to the underlying source is used to implement these two
196 /// internal slots:
197 ///
198 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullalgorithm>
199 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-cancelalgorithm>
200 underlying_source: MutNullableDom<UnderlyingSourceContainer>,
201 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queue>
202 queue: DomRefCell<VecDeque<QueueEntry>>,
203 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queuetotalsize>
204 queue_total_size: Cell<f64>,
205 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-byobrequest>
206 byob_request: MutNullableDom<ReadableStreamBYOBRequest>,
207 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pendingpullintos>
208 pending_pull_intos: DomRefCell<Vec<PullIntoDescriptor>>,
209 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-closerequested>
210 close_requested: Cell<bool>,
211 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-started>
212 started: Cell<bool>,
213 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pulling>
214 pulling: Cell<bool>,
215 /// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullalgorithm>
216 pull_again: Cell<bool>,
217}
218
219impl ReadableByteStreamController {
220 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
221 fn new_inherited(
222 underlying_source_type: UnderlyingSourceType,
223 strategy_hwm: f64,
224 global: &GlobalScope,
225 can_gc: CanGc,
226 ) -> ReadableByteStreamController {
227 let underlying_source_container =
228 UnderlyingSourceContainer::new(global, underlying_source_type, can_gc);
229 let auto_allocate_chunk_size = underlying_source_container.auto_allocate_chunk_size();
230 ReadableByteStreamController {
231 reflector_: Reflector::new(),
232 byob_request: MutNullableDom::new(None),
233 stream: MutNullableDom::new(None),
234 underlying_source: MutNullableDom::new(Some(&*underlying_source_container)),
235 auto_allocate_chunk_size,
236 pending_pull_intos: DomRefCell::new(Vec::new()),
237 strategy_hwm,
238 close_requested: Default::default(),
239 queue: DomRefCell::new(Default::default()),
240 queue_total_size: Default::default(),
241 started: Default::default(),
242 pulling: Default::default(),
243 pull_again: Default::default(),
244 }
245 }
246
247 #[cfg_attr(crown, allow(crown::unrooted_must_root))]
248 pub(crate) fn new(
249 underlying_source_type: UnderlyingSourceType,
250 strategy_hwm: f64,
251 global: &GlobalScope,
252 can_gc: CanGc,
253 ) -> DomRoot<ReadableByteStreamController> {
254 reflect_dom_object(
255 Box::new(ReadableByteStreamController::new_inherited(
256 underlying_source_type,
257 strategy_hwm,
258 global,
259 can_gc,
260 )),
261 global,
262 can_gc,
263 )
264 }
265
266 pub(crate) fn set_stream(&self, stream: &ReadableStream) {
267 self.stream.set(Some(stream))
268 }
269
270 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-pull-into>
271 pub(crate) fn perform_pull_into(
272 &self,
273 cx: SafeJSContext,
274 read_into_request: &ReadIntoRequest,
275 view: HeapBufferSource<ArrayBufferViewU8>,
276 options: &ReadableStreamBYOBReaderReadOptions,
277 can_gc: CanGc,
278 ) {
279 // Let stream be controller.[[stream]].
280 let stream = self.stream.get().unwrap();
281
282 // Let elementSize be 1.
283 let mut element_size = 1;
284
285 // Let ctor be %DataView%.
286 let mut ctor = Constructor::DataView;
287
288 // If view has a [[TypedArrayName]] internal slot (i.e., it is not a DataView),
289 if view.has_typed_array_name() {
290 // Set elementSize to the element size specified in the
291 // typed array constructors table for view.[[TypedArrayName]].
292 let view_typw = view.get_array_buffer_view_type();
293 element_size = byte_size(view_typw);
294
295 // Set ctor to the constructor specified in the typed array constructors table for view.[[TypedArrayName]].
296 ctor = Constructor::Name(view_typw);
297 }
298
299 // Let minimumFill be min × elementSize.
300 let minimum_fill = options.min * element_size;
301
302 // Assert: minimumFill ≥ 0 and minimumFill ≤ view.[[ByteLength]].
303 assert!(minimum_fill <= (view.byte_length() as u64));
304
305 // Assert: the remainder after dividing minimumFill by elementSize is 0.
306 assert_eq!(minimum_fill % element_size, 0);
307
308 // Let byteOffset be view.[[ByteOffset]].
309 let byte_offset = view.get_byte_offset();
310
311 // Let byteLength be view.[[ByteLength]].
312 let byte_length = view.byte_length();
313
314 // Let bufferResult be TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
315 match view
316 .get_array_buffer_view_buffer(cx)
317 .transfer_array_buffer(cx)
318 {
319 Ok(buffer) => {
320 // Let buffer be bufferResult.[[Value]].
321 // Let pullIntoDescriptor be a new pull-into descriptor with
322 // buffer buffer
323 // buffer byte length buffer.[[ArrayBufferByteLength]]
324 // byte offset byteOffset
325 // byte length byteLength
326 // bytes filled 0
327 // minimum fill minimumFill
328 // element size elementSize
329 // view constructor ctor
330 // reader type "byob"
331 let buffer_byte_length = buffer.byte_length();
332 let pull_into_descriptor = PullIntoDescriptor {
333 buffer,
334 buffer_byte_length: buffer_byte_length as u64,
335 byte_offset: byte_offset as u64,
336 byte_length: byte_length as u64,
337 bytes_filled: Cell::new(0),
338 minimum_fill,
339 element_size,
340 view_constructor: ctor.clone(),
341 reader_type: Some(ReaderType::Byob),
342 };
343
344 // If controller.[[pendingPullIntos]] is not empty,
345 {
346 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
347 if !pending_pull_intos.is_empty() {
348 // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
349 pending_pull_intos.push(pull_into_descriptor);
350
351 // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
352 stream.add_read_into_request(read_into_request);
353
354 // Return.
355 return;
356 }
357 }
358
359 // If stream.[[state]] is "closed",
360 if stream.is_closed() {
361 // Let emptyView be ! Construct(ctor, « pullIntoDescriptor’s buffer,
362 // pullIntoDescriptor’s byte offset, 0 »).
363 if let Ok(empty_view) = create_buffer_source_with_constructor(
364 cx,
365 &ctor,
366 &pull_into_descriptor.buffer,
367 pull_into_descriptor.byte_offset as usize,
368 0,
369 ) {
370 // Perform readIntoRequest’s close steps, given emptyView.
371 let result = RootedTraceableBox::new(Heap::default());
372 rooted!(in(*cx) let mut view_value = UndefinedValue());
373 empty_view.get_buffer_view_value(cx, view_value.handle_mut());
374 result.set(*view_value);
375
376 read_into_request.close_steps(Some(result), can_gc);
377
378 // Return.
379 return;
380 } else {
381 return;
382 }
383 }
384
385 // If controller.[[queueTotalSize]] > 0,
386 if self.queue_total_size.get() > 0.0 {
387 // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(
388 // controller, pullIntoDescriptor) is true,
389 if self.fill_pull_into_descriptor_from_queue(cx, &pull_into_descriptor) {
390 // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(
391 // pullIntoDescriptor).
392 if let Ok(filled_view) =
393 self.convert_pull_into_descriptor(cx, &pull_into_descriptor)
394 {
395 // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
396 self.handle_queue_drain(can_gc);
397
398 // Perform readIntoRequest’s chunk steps, given filledView.
399 let result = RootedTraceableBox::new(Heap::default());
400 rooted!(in(*cx) let mut view_value = UndefinedValue());
401 filled_view.get_buffer_view_value(cx, view_value.handle_mut());
402 result.set(*view_value);
403 read_into_request.chunk_steps(result, can_gc);
404
405 // Return.
406 return;
407 } else {
408 return;
409 }
410 }
411
412 // If controller.[[closeRequested]] is true,
413 if self.close_requested.get() {
414 // Let e be a new TypeError exception.
415 rooted!(in(*cx) let mut error = UndefinedValue());
416 Error::Type("close requested".to_owned()).to_jsval(
417 cx,
418 &self.global(),
419 error.handle_mut(),
420 can_gc,
421 );
422
423 // Perform ! ReadableByteStreamControllerError(controller, e).
424 self.error(error.handle(), can_gc);
425
426 // Perform readIntoRequest’s error steps, given e.
427 read_into_request.error_steps(error.handle(), can_gc);
428
429 // Return.
430 return;
431 }
432 }
433
434 // Append pullIntoDescriptor to controller.[[pendingPullIntos]].
435 {
436 self.pending_pull_intos
437 .borrow_mut()
438 .push(pull_into_descriptor);
439 }
440 // Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
441 stream.add_read_into_request(read_into_request);
442
443 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
444 self.call_pull_if_needed(can_gc);
445 },
446 Err(error) => {
447 // If bufferResult is an abrupt completion,
448
449 // Perform readIntoRequest’s error steps, given bufferResult.[[Value]].
450 rooted!(in(*cx) let mut rval = UndefinedValue());
451 error
452 .clone()
453 .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
454 read_into_request.error_steps(rval.handle(), can_gc);
455
456 // Return.
457 },
458 }
459 }
460
461 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond>
462 pub(crate) fn respond(
463 &self,
464 cx: SafeJSContext,
465 bytes_written: u64,
466 can_gc: CanGc,
467 ) -> Fallible<()> {
468 {
469 // Assert: controller.[[pendingPullIntos]] is not empty.
470 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
471 assert!(!pending_pull_intos.is_empty());
472
473 // Let firstDescriptor be controller.[[pendingPullIntos]][0].
474 let first_descriptor = pending_pull_intos.first_mut().unwrap();
475
476 // Let state be controller.[[stream]].[[state]].
477 let stream = self.stream.get().unwrap();
478
479 // If state is "closed",
480 if stream.is_closed() {
481 // If bytesWritten is not 0, throw a TypeError exception.
482 if bytes_written != 0 {
483 return Err(Error::Type(
484 "bytesWritten not zero on closed stream".to_owned(),
485 ));
486 }
487 } else {
488 // Assert: state is "readable".
489 assert!(stream.is_readable());
490
491 // If bytesWritten is 0, throw a TypeError exception.
492 if bytes_written == 0 {
493 return Err(Error::Type("bytesWritten is 0".to_owned()));
494 }
495
496 // If firstDescriptor’s bytes filled + bytesWritten > firstDescriptor’s byte length,
497 // throw a RangeError exception.
498 if first_descriptor.bytes_filled.get() + bytes_written >
499 first_descriptor.byte_length
500 {
501 return Err(Error::Range(
502 "bytes filled + bytesWritten > byte length".to_owned(),
503 ));
504 }
505 }
506
507 // Set firstDescriptor’s buffer to ! TransferArrayBuffer(firstDescriptor’s buffer).
508 first_descriptor.buffer = first_descriptor.buffer.transfer_array_buffer(cx)?;
509 }
510
511 // Perform ? ReadableByteStreamControllerRespondInternal(controller, bytesWritten).
512 self.respond_internal(cx, bytes_written, can_gc)
513 }
514
515 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-internal>
516 pub(crate) fn respond_internal(
517 &self,
518 cx: SafeJSContext,
519 bytes_written: u64,
520 can_gc: CanGc,
521 ) -> Fallible<()> {
522 {
523 // Let firstDescriptor be controller.[[pendingPullIntos]][0].
524 let pending_pull_intos = self.pending_pull_intos.borrow();
525 let first_descriptor = pending_pull_intos.first().unwrap();
526
527 // Assert: ! CanTransferArrayBuffer(firstDescriptor’s buffer) is true
528 assert!(first_descriptor.buffer.can_transfer_array_buffer(cx));
529 }
530
531 // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
532 self.invalidate_byob_request();
533
534 // Let state be controller.[[stream]].[[state]].
535 let stream = self.stream.get().unwrap();
536
537 // If state is "closed",
538 if stream.is_closed() {
539 // Assert: bytesWritten is 0.
540 assert_eq!(bytes_written, 0);
541
542 // Perform ! ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor).
543 self.respond_in_closed_state(cx, can_gc)?;
544 } else {
545 // Assert: state is "readable".
546 assert!(stream.is_readable());
547
548 // Assert: bytesWritten > 0.
549 assert!(bytes_written > 0);
550
551 // Perform ? ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor).
552 self.respond_in_readable_state(cx, bytes_written, can_gc)?;
553 }
554
555 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
556 self.call_pull_if_needed(can_gc);
557
558 Ok(())
559 }
560
561 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-closed-state>
562 pub(crate) fn respond_in_closed_state(&self, cx: SafeJSContext, can_gc: CanGc) -> Fallible<()> {
563 let pending_pull_intos = self.pending_pull_intos.borrow();
564 let first_descriptor = pending_pull_intos.first().unwrap();
565
566 // Assert: the remainder after dividing firstDescriptor’s bytes filled
567 // by firstDescriptor’s element size is 0.
568 assert_eq!(
569 first_descriptor.bytes_filled.get() % first_descriptor.element_size,
570 0
571 );
572
573 // If firstDescriptor’s reader type is "none",
574 // perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
575 let reader_type = first_descriptor.reader_type.is_none();
576
577 // needed to drop the borrow and avoid BorrowMutError
578 drop(pending_pull_intos);
579
580 if reader_type {
581 self.shift_pending_pull_into();
582 }
583
584 // Let stream be controller.[[stream]].
585 let stream = self.stream.get().unwrap();
586
587 // If ! ReadableStreamHasBYOBReader(stream) is true,
588 if stream.has_byob_reader() {
589 // Let filledPullIntos be a new empty list.
590 let mut filled_pull_intos = Vec::new();
591
592 // While filledPullIntos’s size < ! ReadableStreamGetNumReadIntoRequests(stream),
593 while filled_pull_intos.len() < stream.get_num_read_into_requests() {
594 // Let pullIntoDescriptor be ! ReadableByteStreamControllerShiftPendingPullInto(controller).
595 let pull_into_descriptor = self.shift_pending_pull_into();
596
597 // Append pullIntoDescriptor to filledPullIntos.
598 filled_pull_intos.push(pull_into_descriptor);
599 }
600
601 // For each filledPullInto of filledPullIntos,
602 for filled_pull_into in filled_pull_intos {
603 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
604 self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)?;
605 }
606 }
607
608 Ok(())
609 }
610
611 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-readable-state>
612 pub(crate) fn respond_in_readable_state(
613 &self,
614 cx: SafeJSContext,
615 bytes_written: u64,
616 can_gc: CanGc,
617 ) -> Fallible<()> {
618 let pending_pull_intos = self.pending_pull_intos.borrow();
619 let first_descriptor = pending_pull_intos.first().unwrap();
620
621 // Assert: pullIntoDescriptor’s bytes filled + bytesWritten ≤ pullIntoDescriptor’s byte length.
622 assert!(
623 first_descriptor.bytes_filled.get() + bytes_written <= first_descriptor.byte_length
624 );
625
626 // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
627 // controller, bytesWritten, pullIntoDescriptor).
628 self.fill_head_pull_into_descriptor(bytes_written, first_descriptor);
629
630 // If pullIntoDescriptor’s reader type is "none",
631 if first_descriptor.reader_type.is_none() {
632 // needed to drop the borrow and avoid BorrowMutError
633 drop(pending_pull_intos);
634
635 // Perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, pullIntoDescriptor).
636 self.enqueue_detached_pull_into_to_queue(cx, can_gc)?;
637
638 // Let filledPullIntos be the result of performing
639 // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
640 let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
641
642 // For each filledPullInto of filledPullIntos,
643 for filled_pull_into in filled_pull_intos {
644 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]]
645 // , filledPullInto).
646 self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)?;
647 }
648
649 // Return.
650 return Ok(());
651 }
652
653 // If pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill, return.
654 if first_descriptor.bytes_filled.get() < first_descriptor.minimum_fill {
655 return Ok(());
656 }
657
658 // needed to drop the borrow and avoid BorrowMutError
659 drop(pending_pull_intos);
660
661 // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
662 let pull_into_descriptor = self.shift_pending_pull_into();
663
664 // Let remainderSize be the remainder after dividing pullIntoDescriptor’s bytes
665 // filled by pullIntoDescriptor’s element size.
666 let remainder_size =
667 pull_into_descriptor.bytes_filled.get() % pull_into_descriptor.element_size;
668
669 // If remainderSize > 0,
670 if remainder_size > 0 {
671 // Let end be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
672 let end = pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();
673
674 // Perform ? ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
675 // pullIntoDescriptor’s buffer, end − remainderSize, remainderSize).
676 self.enqueue_cloned_chunk_to_queue(
677 cx,
678 &pull_into_descriptor.buffer,
679 end - remainder_size,
680 remainder_size,
681 can_gc,
682 )?;
683 }
684
685 // Set pullIntoDescriptor’s bytes filled to pullIntoDescriptor’s bytes filled − remainderSize.
686 pull_into_descriptor
687 .bytes_filled
688 .set(pull_into_descriptor.bytes_filled.get() - remainder_size);
689
690 // Let filledPullIntos be the result of performing
691 // ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
692 let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
693
694 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], pullIntoDescriptor).
695 self.commit_pull_into_descriptor(cx, &pull_into_descriptor, can_gc)?;
696
697 // For each filledPullInto of filledPullIntos,
698 for filled_pull_into in filled_pull_intos {
699 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], filledPullInto).
700 self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)?;
701 }
702
703 Ok(())
704 }
705
706 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-with-new-view>
707 pub(crate) fn respond_with_new_view(
708 &self,
709 cx: SafeJSContext,
710 view: HeapBufferSource<ArrayBufferViewU8>,
711 can_gc: CanGc,
712 ) -> Fallible<()> {
713 let view_byte_length;
714 {
715 // Assert: controller.[[pendingPullIntos]] is not empty.
716 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
717 assert!(!pending_pull_intos.is_empty());
718
719 // Assert: ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is false.
720 assert!(!view.is_detached_buffer(cx));
721
722 // Let firstDescriptor be controller.[[pendingPullIntos]][0].
723 let first_descriptor = pending_pull_intos.first_mut().unwrap();
724
725 // Let state be controller.[[stream]].[[state]].
726 let stream = self.stream.get().unwrap();
727
728 // If state is "closed",
729 if stream.is_closed() {
730 // If view.[[ByteLength]] is not 0, throw a TypeError exception.
731 if view.byte_length() != 0 {
732 return Err(Error::Type("view byte length is not 0".to_owned()));
733 }
734 } else {
735 // Assert: state is "readable".
736 assert!(stream.is_readable());
737
738 // If view.[[ByteLength]] is 0, throw a TypeError exception.
739 if view.byte_length() == 0 {
740 return Err(Error::Type("view byte length is 0".to_owned()));
741 }
742 }
743
744 // If firstDescriptor’s byte offset + firstDescriptor’ bytes filled is not view.[[ByteOffset]],
745 // throw a RangeError exception.
746 if first_descriptor.byte_offset + first_descriptor.bytes_filled.get() !=
747 (view.get_byte_offset() as u64)
748 {
749 return Err(Error::Range(
750 "firstDescriptor's byte offset + bytes filled is not view byte offset"
751 .to_owned(),
752 ));
753 }
754
755 // If firstDescriptor’s buffer byte length is not view.[[ViewedArrayBuffer]].[[ByteLength]],
756 // throw a RangeError exception.
757 if first_descriptor.buffer_byte_length !=
758 (view.viewed_buffer_array_byte_length(cx) as u64)
759 {
760 return Err(Error::Range(
761 "firstDescriptor's buffer byte length is not view viewed buffer array byte length"
762 .to_owned(),
763 ));
764 }
765
766 // If firstDescriptor’s bytes filled + view.[[ByteLength]] > firstDescriptor’s byte length,
767 // throw a RangeError exception.
768 if first_descriptor.bytes_filled.get() + (view.byte_length()) as u64 >
769 first_descriptor.byte_length
770 {
771 return Err(Error::Range(
772 "bytes filled + view byte length > byte length".to_owned(),
773 ));
774 }
775
776 // Let viewByteLength be view.[[ByteLength]].
777 view_byte_length = view.byte_length();
778
779 // Set firstDescriptor’s buffer to ? TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
780 first_descriptor.buffer = view
781 .get_array_buffer_view_buffer(cx)
782 .transfer_array_buffer(cx)?;
783 }
784
785 // Perform ? ReadableByteStreamControllerRespondInternal(controller, viewByteLength).
786 self.respond_internal(cx, view_byte_length as u64, can_gc)
787 }
788
789 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-get-desired-size>
790 pub(crate) fn get_desired_size(&self) -> Option<f64> {
791 // Let state be controller.[[stream]].[[state]].
792 let stream = self.stream.get()?;
793
794 // If state is "errored", return null.
795 if stream.is_errored() {
796 return None;
797 }
798
799 // If state is "closed", return 0.
800 if stream.is_closed() {
801 return Some(0.0);
802 }
803
804 // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
805 Some(self.strategy_hwm - self.queue_total_size.get())
806 }
807
808 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollergetbyobrequest>
809 pub(crate) fn get_byob_request(
810 &self,
811 cx: SafeJSContext,
812 can_gc: CanGc,
813 ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
814 // If controller.[[byobRequest]] is null and controller.[[pendingPullIntos]] is not empty,
815 let pending_pull_intos = self.pending_pull_intos.borrow();
816 if self.byob_request.get().is_none() && !pending_pull_intos.is_empty() {
817 // Let firstDescriptor be controller.[[pendingPullIntos]][0].
818 let first_descriptor = pending_pull_intos.first().unwrap();
819 // Let view be ! Construct(%Uint8Array%, « firstDescriptor’s buffer,
820 // firstDescriptor’s byte offset + firstDescriptor’s bytes filled,
821 // firstDescriptor’s byte length − firstDescriptor’s bytes filled »).
822
823 let byte_offset = first_descriptor.byte_offset + first_descriptor.bytes_filled.get();
824 let byte_length = first_descriptor.byte_length - first_descriptor.bytes_filled.get();
825
826 let view = create_buffer_source_with_constructor(
827 cx,
828 &Constructor::Name(Type::Uint8),
829 &first_descriptor.buffer,
830 byte_offset as usize,
831 byte_length as usize,
832 )?;
833
834 // Let byobRequest be a new ReadableStreamBYOBRequest.
835 let byob_request = ReadableStreamBYOBRequest::new(&self.global(), can_gc);
836
837 // Set byobRequest.[[controller]] to controller.
838 byob_request.set_controller(Some(&DomRoot::from_ref(self)));
839
840 // Set byobRequest.[[view]] to view.
841 byob_request.set_view(Some(view));
842
843 // Set controller.[[byobRequest]] to byobRequest.
844 self.byob_request.set(Some(&byob_request));
845 }
846
847 // Return controller.[[byobRequest]].
848 Ok(self.byob_request.get())
849 }
850
851 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-close>
852 pub(crate) fn close(&self, cx: SafeJSContext, can_gc: CanGc) -> Fallible<()> {
853 // Let stream be controller.[[stream]].
854 let stream = self.stream.get().unwrap();
855
856 // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
857 if self.close_requested.get() || !stream.is_readable() {
858 return Ok(());
859 }
860
861 // If controller.[[queueTotalSize]] > 0,
862 if self.queue_total_size.get() > 0.0 {
863 // Set controller.[[closeRequested]] to true.
864 self.close_requested.set(true);
865 // Return.
866 return Ok(());
867 }
868
869 // If controller.[[pendingPullIntos]] is not empty,
870 let pending_pull_intos = self.pending_pull_intos.borrow();
871 if !pending_pull_intos.is_empty() {
872 // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
873 let first_pending_pull_into = pending_pull_intos.first().unwrap();
874
875 // If the remainder after dividing firstPendingPullInto’s bytes filled by
876 // firstPendingPullInto’s element size is not 0,
877 if first_pending_pull_into.bytes_filled.get() % first_pending_pull_into.element_size !=
878 0
879 {
880 // needed to drop the borrow and avoid BorrowMutError
881 drop(pending_pull_intos);
882
883 // Let e be a new TypeError exception.
884 let e = Error::Type(
885 "remainder after dividing firstPendingPullInto's bytes
886 filled by firstPendingPullInto's element size is not 0"
887 .to_owned(),
888 );
889
890 // Perform ! ReadableByteStreamControllerError(controller, e).
891 rooted!(in(*cx) let mut error = UndefinedValue());
892 e.clone()
893 .to_jsval(cx, &self.global(), error.handle_mut(), can_gc);
894 self.error(error.handle(), can_gc);
895
896 // Throw e.
897 return Err(e);
898 }
899 }
900
901 // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
902 self.clear_algorithms();
903
904 // Perform ! ReadableStreamClose(stream).
905 stream.close(can_gc);
906 Ok(())
907 }
908
909 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-error>
910 pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
911 // Let stream be controller.[[stream]].
912 let stream = self.stream.get().unwrap();
913
914 // If stream.[[state]] is not "readable", return.
915 if !stream.is_readable() {
916 return;
917 }
918
919 // Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller).
920 self.clear_pending_pull_intos();
921
922 // Perform ! ResetQueue(controller).
923 self.reset_queue();
924
925 // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
926 self.clear_algorithms();
927
928 // Perform ! ReadableStreamError(stream, e).
929 stream.error(e, can_gc);
930 }
931
932 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-algorithms>
933 fn clear_algorithms(&self) {
934 // Set controller.[[pullAlgorithm]] to undefined.
935 // Set controller.[[cancelAlgorithm]] to undefined.
936 self.underlying_source.set(None);
937 }
938
939 /// <https://streams.spec.whatwg.org/#reset-queue>
940 pub(crate) fn reset_queue(&self) {
941 // Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
942
943 // Set container.[[queue]] to a new empty list.
944 self.queue.borrow_mut().clear();
945
946 // Set container.[[queueTotalSize]] to 0.
947 self.queue_total_size.set(0.0);
948 }
949
950 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-pending-pull-intos>
951 pub(crate) fn clear_pending_pull_intos(&self) {
952 // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
953 self.invalidate_byob_request();
954
955 // Set controller.[[pendingPullIntos]] to a new empty list.
956 self.pending_pull_intos.borrow_mut().clear();
957 }
958
959 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-invalidate-byob-request>
960 pub(crate) fn invalidate_byob_request(&self) {
961 if let Some(byob_request) = self.byob_request.get() {
962 // Set controller.[[byobRequest]].[[controller]] to undefined.
963 byob_request.set_controller(None);
964
965 // Set controller.[[byobRequest]].[[view]] to null.
966 byob_request.set_view(None);
967
968 // Set controller.[[byobRequest]] to null.
969 self.byob_request.set(None);
970 }
971 // If controller.[[byobRequest]] is null, return.
972 }
973
974 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue>
975 pub(crate) fn enqueue(
976 &self,
977 cx: SafeJSContext,
978 chunk: HeapBufferSource<ArrayBufferViewU8>,
979 can_gc: CanGc,
980 ) -> Fallible<()> {
981 // Let stream be controller.[[stream]].
982 let stream = self.stream.get().unwrap();
983
984 // If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
985 if self.close_requested.get() || !stream.is_readable() {
986 return Ok(());
987 }
988
989 // Let buffer be chunk.[[ViewedArrayBuffer]].
990 let buffer = chunk.get_array_buffer_view_buffer(cx);
991
992 // Let byteOffset be chunk.[[ByteOffset]].
993 let byte_offset = chunk.get_byte_offset();
994
995 // Let byteLength be chunk.[[ByteLength]].
996 let byte_length = chunk.byte_length();
997
998 // If ! IsDetachedBuffer(buffer) is true, throw a TypeError exception.
999 if buffer.is_detached_buffer(cx) {
1000 return Err(Error::Type("buffer is detached".to_owned()));
1001 }
1002
1003 // Let transferredBuffer be ? TransferArrayBuffer(buffer).
1004 let transferred_buffer = buffer.transfer_array_buffer(cx)?;
1005
1006 // If controller.[[pendingPullIntos]] is not empty,
1007 {
1008 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
1009 if !pending_pull_intos.is_empty() {
1010 // Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
1011 let first_descriptor = pending_pull_intos.first_mut().unwrap();
1012 // If ! IsDetachedBuffer(firstPendingPullInto’s buffer) is true, throw a TypeError exception.
1013 if first_descriptor.buffer.is_detached_buffer(cx) {
1014 return Err(Error::Type("buffer is detached".to_owned()));
1015 }
1016
1017 // Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
1018 self.invalidate_byob_request();
1019
1020 // Set firstPendingPullInto’s buffer to ! TransferArrayBuffer(firstPendingPullInto’s buffer).
1021 first_descriptor.buffer = first_descriptor.buffer.transfer_array_buffer(cx)?;
1022
1023 // If firstPendingPullInto’s reader type is "none",
1024 if first_descriptor.reader_type.is_none() {
1025 // needed to drop the borrow and avoid BorrowMutError
1026 drop(pending_pull_intos);
1027
1028 // perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(
1029 // controller, firstPendingPullInto).
1030 self.enqueue_detached_pull_into_to_queue(cx, can_gc)?;
1031 }
1032 }
1033 }
1034
1035 // If ! ReadableStreamHasDefaultReader(stream) is true,
1036 if stream.has_default_reader() {
1037 // Perform ! ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller).
1038 self.process_read_requests_using_queue(cx, can_gc)?;
1039
1040 // If ! ReadableStreamGetNumReadRequests(stream) is 0,
1041 if stream.get_num_read_requests() == 0 {
1042 // Assert: controller.[[pendingPullIntos]] is empty.
1043 {
1044 assert!(self.pending_pull_intos.borrow().is_empty());
1045 }
1046
1047 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1048 // controller, transferredBuffer, byteOffset, byteLength).
1049 self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1050 } else {
1051 // Assert: controller.[[queue]] is empty.
1052 assert!(self.queue.borrow().is_empty());
1053
1054 // If controller.[[pendingPullIntos]] is not empty,
1055
1056 let pending_pull_intos = self.pending_pull_intos.borrow();
1057 if !pending_pull_intos.is_empty() {
1058 // Assert: controller.[[pendingPullIntos]][0]'s reader type is "default".
1059 assert!(matches!(
1060 pending_pull_intos.first().unwrap().reader_type,
1061 Some(ReaderType::Default)
1062 ));
1063
1064 // needed to drop the borrow and avoid BorrowMutError
1065 drop(pending_pull_intos);
1066
1067 // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1068 self.shift_pending_pull_into();
1069 }
1070
1071 // Let transferredView be ! Construct(%Uint8Array%, « transferredBuffer, byteOffset, byteLength »).
1072 let transferred_view = create_buffer_source_with_constructor(
1073 cx,
1074 &Constructor::Name(Type::Uint8),
1075 &transferred_buffer,
1076 byte_offset,
1077 byte_length,
1078 )?;
1079
1080 // Perform ! ReadableStreamFulfillReadRequest(stream, transferredView, false).
1081 rooted!(in(*cx) let mut view_value = UndefinedValue());
1082 transferred_view.get_buffer_view_value(cx, view_value.handle_mut());
1083 stream.fulfill_read_request(view_value.handle(), false, can_gc);
1084 }
1085 // Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true,
1086 } else if stream.has_byob_reader() {
1087 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
1088 // controller, transferredBuffer, byteOffset, byteLength).
1089 self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1090
1091 // Let filledPullIntos be the result of performing !
1092 // ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
1093 let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
1094
1095 // For each filledPullInto of filledPullIntos,
1096 // Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
1097 for filled_pull_into in filled_pull_intos {
1098 self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)?;
1099 }
1100 } else {
1101 // Assert: ! IsReadableStreamLocked(stream) is false.
1102 assert!(!stream.is_locked());
1103
1104 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1105 // (controller, transferredBuffer, byteOffset, byteLength).
1106 self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
1107 }
1108
1109 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1110 self.call_pull_if_needed(can_gc);
1111
1112 Ok(())
1113 }
1114
1115 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-commit-pull-into-descriptor>
1116 pub(crate) fn commit_pull_into_descriptor(
1117 &self,
1118 cx: SafeJSContext,
1119 pull_into_descriptor: &PullIntoDescriptor,
1120 can_gc: CanGc,
1121 ) -> Fallible<()> {
1122 // Assert: stream.[[state]] is not "errored".
1123 let stream = self.stream.get().unwrap();
1124 assert!(!stream.is_errored());
1125
1126 // Assert: pullIntoDescriptor.reader type is not "none".
1127 assert!(pull_into_descriptor.reader_type.is_some());
1128
1129 // Let done be false.
1130 let mut done = false;
1131
1132 // If stream.[[state]] is "closed",
1133 if stream.is_closed() {
1134 // Assert: the remainder after dividing pullIntoDescriptor’s bytes filled
1135 // by pullIntoDescriptor’s element size is 0.
1136 assert!(
1137 pull_into_descriptor.bytes_filled.get() % pull_into_descriptor.element_size == 0
1138 );
1139
1140 // Set done to true.
1141 done = true;
1142 }
1143
1144 // Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor).
1145 let filled_view = self.convert_pull_into_descriptor(cx, pull_into_descriptor)?;
1146
1147 rooted!(in(*cx) let mut view_value = UndefinedValue());
1148 filled_view.get_buffer_view_value(cx, view_value.handle_mut());
1149
1150 // If pullIntoDescriptor’s reader type is "default",
1151 if matches!(pull_into_descriptor.reader_type, Some(ReaderType::Default)) {
1152 // Perform ! ReadableStreamFulfillReadRequest(stream, filledView, done).
1153
1154 stream.fulfill_read_request(view_value.handle(), done, can_gc);
1155 } else {
1156 // Assert: pullIntoDescriptor’s reader type is "byob".
1157 assert!(matches!(
1158 pull_into_descriptor.reader_type,
1159 Some(ReaderType::Byob)
1160 ));
1161
1162 // Perform ! ReadableStreamFulfillReadIntoRequest(stream, filledView, done).
1163 stream.fulfill_read_into_request(view_value.handle(), done, can_gc);
1164 }
1165 Ok(())
1166 }
1167
1168 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-convert-pull-into-descriptor>
1169 pub(crate) fn convert_pull_into_descriptor(
1170 &self,
1171 cx: SafeJSContext,
1172 pull_into_descriptor: &PullIntoDescriptor,
1173 ) -> Fallible<HeapBufferSource<ArrayBufferViewU8>> {
1174 // Let bytesFilled be pullIntoDescriptor’s bytes filled.
1175 let bytes_filled = pull_into_descriptor.bytes_filled.get();
1176
1177 // Let elementSize be pullIntoDescriptor’s element size.
1178 let element_size = pull_into_descriptor.element_size;
1179
1180 // Assert: bytesFilled ≤ pullIntoDescriptor’s byte length.
1181 assert!(bytes_filled <= pull_into_descriptor.byte_length);
1182
1183 // Assert: the remainder after dividing bytesFilled by elementSize is 0.
1184 assert!(bytes_filled % element_size == 0);
1185
1186 // Let buffer be ! TransferArrayBuffer(pullIntoDescriptor’s buffer).
1187 let buffer = pull_into_descriptor.buffer.transfer_array_buffer(cx)?;
1188
1189 // Return ! Construct(pullIntoDescriptor’s view constructor,
1190 // « buffer, pullIntoDescriptor’s byte offset, bytesFilled ÷ elementSize »).
1191 create_buffer_source_with_constructor(
1192 cx,
1193 &pull_into_descriptor.view_constructor,
1194 &buffer,
1195 pull_into_descriptor.byte_offset as usize,
1196 (bytes_filled / element_size) as usize,
1197 )
1198 }
1199
1200 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-process-pull-into-descriptors-using-queue>
1201 pub(crate) fn process_pull_into_descriptors_using_queue(
1202 &self,
1203 cx: SafeJSContext,
1204 ) -> Vec<PullIntoDescriptor> {
1205 // Assert: controller.[[closeRequested]] is false.
1206 assert!(!self.close_requested.get());
1207
1208 // Let filledPullIntos be a new empty list.
1209 let mut filled_pull_intos = Vec::new();
1210
1211 // While controller.[[pendingPullIntos]] is not empty,
1212 loop {
1213 // If controller.[[queueTotalSize]] is 0, then break.
1214 if self.queue_total_size.get() == 0.0 {
1215 break;
1216 }
1217
1218 // Let pullIntoDescriptor be controller.[[pendingPullIntos]][0].
1219 let fill_pull_result = {
1220 let pending_pull_intos = self.pending_pull_intos.borrow();
1221 let Some(pull_into_descriptor) = pending_pull_intos.first() else {
1222 break;
1223 };
1224 self.fill_pull_into_descriptor_from_queue(cx, pull_into_descriptor)
1225 };
1226
1227 // If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) is true,
1228 if fill_pull_result {
1229 // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1230 let pull_into_descriptor = self.shift_pending_pull_into();
1231
1232 // Append pullIntoDescriptor to filledPullIntos.
1233 filled_pull_intos.push(pull_into_descriptor);
1234 }
1235 }
1236
1237 // Return filledPullIntos.
1238 filled_pull_intos
1239 }
1240
1241 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-pull-into-descriptor-from-queue>
1242 pub(crate) fn fill_pull_into_descriptor_from_queue(
1243 &self,
1244 cx: SafeJSContext,
1245 pull_into_descriptor: &PullIntoDescriptor,
1246 ) -> bool {
1247 // Let maxBytesToCopy be min(controller.[[queueTotalSize]],
1248 // pullIntoDescriptor’s byte length − pullIntoDescriptor’s bytes filled).
1249 let max_bytes_to_copy = min(
1250 self.queue_total_size.get() as usize,
1251 (pull_into_descriptor.byte_length - pull_into_descriptor.bytes_filled.get()) as usize,
1252 );
1253
1254 // Let maxBytesFilled be pullIntoDescriptor’s bytes filled + maxBytesToCopy.
1255 let max_bytes_filled = pull_into_descriptor.bytes_filled.get() as usize + max_bytes_to_copy;
1256
1257 // Let totalBytesToCopyRemaining be maxBytesToCopy.
1258 let mut total_bytes_to_copy_remaining = max_bytes_to_copy;
1259
1260 // Let ready be false.
1261 let mut ready = false;
1262
1263 // Assert: ! IsDetachedBuffer(pullIntoDescriptor’s buffer) is false.
1264 assert!(!pull_into_descriptor.buffer.is_detached_buffer(cx));
1265
1266 // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
1267 assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
1268
1269 // Let remainderBytes be the remainder after dividing maxBytesFilled by pullIntoDescriptor’s element size.
1270 let remainder_bytes = max_bytes_filled % pull_into_descriptor.element_size as usize;
1271
1272 // Let maxAlignedBytes be maxBytesFilled − remainderBytes.
1273 let max_aligned_bytes = max_bytes_filled - remainder_bytes;
1274
1275 // If maxAlignedBytes ≥ pullIntoDescriptor’s minimum fill,
1276 if max_aligned_bytes >= pull_into_descriptor.minimum_fill as usize {
1277 // Set totalBytesToCopyRemaining to maxAlignedBytes − pullIntoDescriptor’s bytes filled.
1278 total_bytes_to_copy_remaining =
1279 max_aligned_bytes - (pull_into_descriptor.bytes_filled.get() as usize);
1280
1281 // Set ready to true.
1282 ready = true;
1283 }
1284
1285 // Let queue be controller.[[queue]].
1286 let mut queue = self.queue.borrow_mut();
1287
1288 // While totalBytesToCopyRemaining > 0,
1289 while total_bytes_to_copy_remaining > 0 {
1290 // Let headOfQueue be queue[0].
1291 let head_of_queue = queue.front_mut().unwrap();
1292
1293 // Let bytesToCopy be min(totalBytesToCopyRemaining, headOfQueue’s byte length).
1294 let bytes_to_copy = total_bytes_to_copy_remaining.min(head_of_queue.byte_length);
1295
1296 // Let destStart be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
1297 let dest_start =
1298 pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();
1299
1300 // Let descriptorBuffer be pullIntoDescriptor’s buffer.
1301 let descriptor_buffer = &pull_into_descriptor.buffer;
1302
1303 // Let queueBuffer be headOfQueue’s buffer.
1304 let queue_buffer = &head_of_queue.buffer;
1305
1306 // Let queueByteOffset be headOfQueue’s byte offset.
1307 let queue_byte_offset = head_of_queue.byte_offset;
1308
1309 // Assert: ! CanCopyDataBlockBytes(descriptorBuffer, destStart,
1310 // queueBuffer, queueByteOffset, bytesToCopy) is true.
1311 assert!(descriptor_buffer.can_copy_data_block_bytes(
1312 cx,
1313 dest_start as usize,
1314 queue_buffer,
1315 queue_byte_offset,
1316 bytes_to_copy
1317 ));
1318
1319 // Perform ! CopyDataBlockBytes(descriptorBuffer.[[ArrayBufferData]], destStart,
1320 // queueBuffer.[[ArrayBufferData]], queueByteOffset, bytesToCopy).
1321 descriptor_buffer.copy_data_block_bytes(
1322 cx,
1323 dest_start as usize,
1324 queue_buffer,
1325 queue_byte_offset,
1326 bytes_to_copy,
1327 );
1328
1329 // If headOfQueue’s byte length is bytesToCopy,
1330 if head_of_queue.byte_length == bytes_to_copy {
1331 // Remove queue[0].
1332 queue.pop_front().unwrap();
1333 } else {
1334 // Set headOfQueue’s byte offset to headOfQueue’s byte offset + bytesToCopy.
1335 head_of_queue.byte_offset += bytes_to_copy;
1336
1337 // Set headOfQueue’s byte length to headOfQueue’s byte length − bytesToCopy.
1338 head_of_queue.byte_length -= bytes_to_copy;
1339 }
1340
1341 // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − bytesToCopy.
1342 self.queue_total_size
1343 .set(self.queue_total_size.get() - (bytes_to_copy as f64));
1344
1345 // Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
1346 // controller, bytesToCopy, pullIntoDescriptor).
1347 self.fill_head_pull_into_descriptor(bytes_to_copy as u64, pull_into_descriptor);
1348
1349 // Set totalBytesToCopyRemaining to totalBytesToCopyRemaining − bytesToCopy.
1350 total_bytes_to_copy_remaining -= bytes_to_copy;
1351 }
1352
1353 // If ready is false,
1354 if !ready {
1355 // Assert: controller.[[queueTotalSize]] is 0.
1356 assert!(self.queue_total_size.get() == 0.0);
1357
1358 // Assert: pullIntoDescriptor’s bytes filled > 0.
1359 assert!(pull_into_descriptor.bytes_filled.get() > 0);
1360
1361 // Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
1362 assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
1363 }
1364
1365 // Return ready.
1366 ready
1367 }
1368
1369 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-head-pull-into-descriptor>
1370 pub(crate) fn fill_head_pull_into_descriptor(
1371 &self,
1372 bytes_copied: u64,
1373 pull_into_descriptor: &PullIntoDescriptor,
1374 ) {
1375 // Assert: either controller.[[pendingPullIntos]] is empty,
1376 // or controller.[[pendingPullIntos]][0] is pullIntoDescriptor.
1377 {
1378 let pending_pull_intos = self.pending_pull_intos.borrow();
1379 assert!(
1380 pending_pull_intos.is_empty() ||
1381 pending_pull_intos.first().unwrap() == pull_into_descriptor
1382 );
1383 }
1384
1385 // Assert: controller.[[byobRequest]] is null.
1386 assert!(self.byob_request.get().is_none());
1387
1388 // Set pullIntoDescriptor’s bytes filled to bytes filled + size.
1389 pull_into_descriptor
1390 .bytes_filled
1391 .set(pull_into_descriptor.bytes_filled.get() + bytes_copied);
1392 }
1393
1394 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueuedetachedpullintotoqueue>
1395 pub(crate) fn enqueue_detached_pull_into_to_queue(
1396 &self,
1397 cx: SafeJSContext,
1398 can_gc: CanGc,
1399 ) -> Fallible<()> {
1400 // first_descriptor: &PullIntoDescriptor,
1401 let pending_pull_intos = self.pending_pull_intos.borrow();
1402 let first_descriptor = pending_pull_intos.first().unwrap();
1403
1404 // Assert: pullIntoDescriptor’s reader type is "none".
1405 assert!(first_descriptor.reader_type.is_none());
1406
1407 // If pullIntoDescriptor’s bytes filled > 0, perform ?
1408 // ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
1409 // pullIntoDescriptor’s buffer, pullIntoDescriptor’s byte offset, pullIntoDescriptor’s bytes filled).
1410
1411 if first_descriptor.bytes_filled.get() > 0 {
1412 self.enqueue_cloned_chunk_to_queue(
1413 cx,
1414 &first_descriptor.buffer,
1415 first_descriptor.byte_offset,
1416 first_descriptor.bytes_filled.get(),
1417 can_gc,
1418 )?;
1419 }
1420
1421 // needed to drop the borrow and avoid BorrowMutError
1422 drop(pending_pull_intos);
1423
1424 // Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
1425 self.shift_pending_pull_into();
1426
1427 Ok(())
1428 }
1429
1430 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueueclonedchunktoqueue>
1431 pub(crate) fn enqueue_cloned_chunk_to_queue(
1432 &self,
1433 cx: SafeJSContext,
1434 buffer: &HeapBufferSource<ArrayBufferU8>,
1435 byte_offset: u64,
1436 byte_length: u64,
1437 can_gc: CanGc,
1438 ) -> Fallible<()> {
1439 // Let cloneResult be CloneArrayBuffer(buffer, byteOffset, byteLength, %ArrayBuffer%).
1440 if let Some(clone_result) =
1441 buffer.clone_array_buffer(cx, byte_offset as usize, byte_length as usize)
1442 {
1443 // Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
1444 // (controller, cloneResult.[[Value]], 0, byteLength).
1445 self.enqueue_chunk_to_queue(clone_result, 0, byte_length as usize);
1446
1447 Ok(())
1448 } else {
1449 // If cloneResult is an abrupt completion,
1450
1451 // Perform ! ReadableByteStreamControllerError(controller, cloneResult.[[Value]]).
1452 rooted!(in(*cx) let mut rval = UndefinedValue());
1453 let error = Error::Type("can not clone array buffer".to_owned());
1454 error
1455 .clone()
1456 .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1457 self.error(rval.handle(), can_gc);
1458
1459 // Return cloneResult.
1460 Err(error)
1461 }
1462 }
1463
1464 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue-chunk-to-queue>
1465 pub(crate) fn enqueue_chunk_to_queue(
1466 &self,
1467 buffer: HeapBufferSource<ArrayBufferU8>,
1468 byte_offset: usize,
1469 byte_length: usize,
1470 ) {
1471 // Let entry be a new ReadableByteStreamQueueEntry object.
1472 let entry = QueueEntry::new(buffer, byte_offset, byte_length);
1473
1474 // Append entry to controller.[[queue]].
1475 self.queue.borrow_mut().push_back(entry);
1476
1477 // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] + byteLength.
1478 self.queue_total_size
1479 .set(self.queue_total_size.get() + byte_length as f64);
1480 }
1481
1482 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-shift-pending-pull-into>
1483 pub(crate) fn shift_pending_pull_into(&self) -> PullIntoDescriptor {
1484 // Assert: controller.[[byobRequest]] is null.
1485 assert!(self.byob_request.get().is_none());
1486
1487 // Let descriptor be controller.[[pendingPullIntos]][0].
1488 // Remove descriptor from controller.[[pendingPullIntos]].
1489 // Return descriptor.
1490 self.pending_pull_intos.borrow_mut().remove(0)
1491 }
1492
1493 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerprocessreadrequestsusingqueue>
1494 pub(crate) fn process_read_requests_using_queue(
1495 &self,
1496 cx: SafeJSContext,
1497 can_gc: CanGc,
1498 ) -> Fallible<()> {
1499 // Let reader be controller.[[stream]].[[reader]].
1500 // Assert: reader implements ReadableStreamDefaultReader.
1501 let reader = self.stream.get().unwrap().get_default_reader();
1502
1503 // Step 3
1504 reader.process_read_requests(cx, DomRoot::from_ref(self), can_gc)
1505 }
1506
1507 /// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerfillreadrequestfromqueue>
1508 pub(crate) fn fill_read_request_from_queue(
1509 &self,
1510 cx: SafeJSContext,
1511 read_request: &ReadRequest,
1512 can_gc: CanGc,
1513 ) -> Fallible<()> {
1514 // Assert: controller.[[queueTotalSize]] > 0.
1515 assert!(self.queue_total_size.get() > 0.0);
1516 // Also assert that the queue has a non-zero length;
1517 assert!(!self.queue.borrow().is_empty());
1518
1519 // Let entry be controller.[[queue]][0].
1520 // Remove entry from controller.[[queue]].
1521 let entry = self.remove_entry();
1522
1523 // Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − entry’s byte length.
1524 self.queue_total_size
1525 .set(self.queue_total_size.get() - entry.byte_length as f64);
1526
1527 // Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
1528 self.handle_queue_drain(can_gc);
1529
1530 // Let view be ! Construct(%Uint8Array%, « entry’s buffer, entry’s byte offset, entry’s byte length »).
1531 let view = create_buffer_source_with_constructor(
1532 cx,
1533 &Constructor::Name(Type::Uint8),
1534 &entry.buffer,
1535 entry.byte_offset,
1536 entry.byte_length,
1537 )?;
1538
1539 // Perform readRequest’s chunk steps, given view.
1540 let result = RootedTraceableBox::new(Heap::default());
1541 rooted!(in(*cx) let mut view_value = UndefinedValue());
1542 view.get_buffer_view_value(cx, view_value.handle_mut());
1543 result.set(*view_value);
1544
1545 read_request.chunk_steps(result, can_gc);
1546
1547 Ok(())
1548 }
1549
1550 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-handle-queue-drain>
1551 pub(crate) fn handle_queue_drain(&self, can_gc: CanGc) {
1552 // Assert: controller.[[stream]].[[state]] is "readable".
1553 assert!(self.stream.get().unwrap().is_readable());
1554
1555 // If controller.[[queueTotalSize]] is 0 and controller.[[closeRequested]] is true,
1556 if self.queue_total_size.get() == 0.0 && self.close_requested.get() {
1557 // Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
1558 self.clear_algorithms();
1559
1560 // Perform ! ReadableStreamClose(controller.[[stream]]).
1561 self.stream.get().unwrap().close(can_gc);
1562 } else {
1563 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
1564 self.call_pull_if_needed(can_gc);
1565 }
1566 }
1567
1568 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
1569 pub(crate) fn call_pull_if_needed(&self, can_gc: CanGc) {
1570 // Let shouldPull be ! ReadableByteStreamControllerShouldCallPull(controller).
1571 let should_pull = self.should_call_pull();
1572 // If shouldPull is false, return.
1573 if !should_pull {
1574 return;
1575 }
1576
1577 // If controller.[[pulling]] is true,
1578 if self.pulling.get() {
1579 // Set controller.[[pullAgain]] to true.
1580 self.pull_again.set(true);
1581
1582 // Return.
1583 return;
1584 }
1585
1586 // Assert: controller.[[pullAgain]] is false.
1587 assert!(!self.pull_again.get());
1588
1589 // Set controller.[[pulling]] to true.
1590 self.pulling.set(true);
1591
1592 // Let pullPromise be the result of performing controller.[[pullAlgorithm]].
1593 // Continues into the resolve and reject handling of the native handler.
1594 let global = self.global();
1595 let rooted_controller = DomRoot::from_ref(self);
1596 let controller = Controller::ReadableByteStreamController(rooted_controller.clone());
1597
1598 if let Some(underlying_source) = self.underlying_source.get() {
1599 let handler = PromiseNativeHandler::new(
1600 &global,
1601 Some(Box::new(PullAlgorithmFulfillmentHandler {
1602 controller: Dom::from_ref(&rooted_controller),
1603 })),
1604 Some(Box::new(PullAlgorithmRejectionHandler {
1605 controller: Dom::from_ref(&rooted_controller),
1606 })),
1607 can_gc,
1608 );
1609
1610 let realm = enter_realm(&*global);
1611 let comp = InRealm::Entered(&realm);
1612 let result = underlying_source
1613 .call_pull_algorithm(controller, &global, can_gc)
1614 .unwrap_or_else(|| {
1615 let promise = Promise::new(&global, can_gc);
1616 promise.resolve_native(&(), can_gc);
1617 Ok(promise)
1618 });
1619 let promise = result.unwrap_or_else(|error| {
1620 let cx = GlobalScope::get_cx();
1621 rooted!(in(*cx) let mut rval = UndefinedValue());
1622 // TODO: check if `self.global()` is the right globalscope.
1623 error
1624 .clone()
1625 .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1626 let promise = Promise::new(&global, can_gc);
1627 promise.reject_native(&rval.handle(), can_gc);
1628 promise
1629 });
1630 promise.append_native_handler(&handler, comp, can_gc);
1631 }
1632 }
1633
1634 /// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-should-call-pull>
1635 fn should_call_pull(&self) -> bool {
1636 // Let stream be controller.[[stream]].
1637 // Note: the spec does not assert that stream is not undefined here,
1638 // so we return false if it is.
1639 let stream = self.stream.get().unwrap();
1640
1641 // If stream.[[state]] is not "readable", return false.
1642 if !stream.is_readable() {
1643 return false;
1644 }
1645
1646 // If controller.[[closeRequested]] is true, return false.
1647 if self.close_requested.get() {
1648 return false;
1649 }
1650
1651 // If controller.[[started]] is false, return false.
1652 if !self.started.get() {
1653 return false;
1654 }
1655
1656 // If ! ReadableStreamHasDefaultReader(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0
1657 // , return true.
1658 if stream.has_default_reader() && stream.get_num_read_requests() > 0 {
1659 return true;
1660 }
1661
1662 // If ! ReadableStreamHasBYOBReader(stream) is true and ! ReadableStreamGetNumReadIntoRequests(stream) > 0
1663 // , return true.
1664 if stream.has_byob_reader() && stream.get_num_read_into_requests() > 0 {
1665 return true;
1666 }
1667
1668 // Let desiredSize be ! ReadableByteStreamControllerGetDesiredSize(controller).
1669 let desired_size = self.get_desired_size();
1670
1671 // Assert: desiredSize is not null.
1672 assert!(desired_size.is_some());
1673
1674 // If desiredSize > 0, return true.
1675 if desired_size.unwrap() > 0. {
1676 return true;
1677 }
1678
1679 // Return false.
1680 false
1681 }
1682 /// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
1683 pub(crate) fn setup(
1684 &self,
1685 global: &GlobalScope,
1686 stream: DomRoot<ReadableStream>,
1687 can_gc: CanGc,
1688 ) -> Fallible<()> {
1689 // Assert: stream.[[controller]] is undefined.
1690 stream.assert_no_controller();
1691
1692 // If autoAllocateChunkSize is not undefined,
1693 if self.auto_allocate_chunk_size.is_some() {
1694 // Assert: ! IsInteger(autoAllocateChunkSize) is true. Implicit
1695 // Assert: autoAllocateChunkSize is positive. (Implicit by type.)
1696 }
1697
1698 // Set controller.[[stream]] to stream.
1699 self.stream.set(Some(&stream));
1700
1701 // Set controller.[[pullAgain]] and controller.[[pulling]] to false.
1702 self.pull_again.set(false);
1703 self.pulling.set(false);
1704
1705 // Set controller.[[byobRequest]] to null.
1706 self.byob_request.set(None);
1707
1708 // Perform ! ResetQueue(controller).
1709 self.reset_queue();
1710
1711 // Set controller.[[closeRequested]] and controller.[[started]] to false.
1712 self.close_requested.set(false);
1713 self.started.set(false);
1714
1715 // Set controller.[[strategyHWM]] to highWaterMark.
1716 // Set controller.[[pullAlgorithm]] to pullAlgorithm.
1717 // Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
1718 // Set controller.[[autoAllocateChunkSize]] to autoAllocateChunkSize.
1719 // Set controller.[[pendingPullIntos]] to a new empty list.
1720 // Note: the above steps are done in `new`.
1721
1722 // Set stream.[[controller]] to controller.
1723 let rooted_byte_controller = DomRoot::from_ref(self);
1724 stream.set_byte_controller(&rooted_byte_controller);
1725
1726 if let Some(underlying_source) = rooted_byte_controller.underlying_source.get() {
1727 // Let startResult be the result of performing startAlgorithm. (This might throw an exception.)
1728 let start_result = underlying_source
1729 .call_start_algorithm(
1730 Controller::ReadableByteStreamController(rooted_byte_controller.clone()),
1731 can_gc,
1732 )
1733 .unwrap_or_else(|| {
1734 let promise = Promise::new(global, can_gc);
1735 promise.resolve_native(&(), can_gc);
1736 Ok(promise)
1737 });
1738
1739 // Let startPromise be a promise resolved with startResult.
1740 let start_promise = start_result?;
1741
1742 // Upon fulfillment of startPromise, Upon rejection of startPromise with reason r,
1743 let handler = PromiseNativeHandler::new(
1744 global,
1745 Some(Box::new(StartAlgorithmFulfillmentHandler {
1746 controller: Dom::from_ref(&rooted_byte_controller),
1747 })),
1748 Some(Box::new(StartAlgorithmRejectionHandler {
1749 controller: Dom::from_ref(&rooted_byte_controller),
1750 })),
1751 can_gc,
1752 );
1753 let realm = enter_realm(global);
1754 let comp = InRealm::Entered(&realm);
1755 start_promise.append_native_handler(&handler, comp, can_gc);
1756 };
1757
1758 Ok(())
1759 }
1760
1761 // <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontroller-releasesteps
1762 pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
1763 // If this.[[pendingPullIntos]] is not empty,
1764 let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
1765 if !pending_pull_intos.is_empty() {
1766 // Let firstPendingPullInto be this.[[pendingPullIntos]][0].
1767 let mut first_pending_pull_into = pending_pull_intos.remove(0);
1768
1769 // Set firstPendingPullInto’s reader type to "none".
1770 first_pending_pull_into.reader_type = None;
1771
1772 // Set this.[[pendingPullIntos]] to the list « firstPendingPullInto »
1773 pending_pull_intos.clear();
1774 pending_pull_intos.push(first_pending_pull_into);
1775 }
1776 Ok(())
1777 }
1778
1779 /// <https://streams.spec.whatwg.org/#rbs-controller-private-cancel>
1780 pub(crate) fn perform_cancel_steps(
1781 &self,
1782 cx: SafeJSContext,
1783 global: &GlobalScope,
1784 reason: SafeHandleValue,
1785 can_gc: CanGc,
1786 ) -> Rc<Promise> {
1787 // Perform ! ReadableByteStreamControllerClearPendingPullIntos(this).
1788 self.clear_pending_pull_intos();
1789
1790 // Perform ! ResetQueue(this).
1791 self.reset_queue();
1792
1793 let underlying_source = self
1794 .underlying_source
1795 .get()
1796 .expect("Controller should have a source when the cancel steps are called into.");
1797
1798 // Let result be the result of performing this.[[cancelAlgorithm]], passing in reason.
1799 let result = underlying_source
1800 .call_cancel_algorithm(cx, global, reason, can_gc)
1801 .unwrap_or_else(|| {
1802 let promise = Promise::new(global, can_gc);
1803 promise.resolve_native(&(), can_gc);
1804 Ok(promise)
1805 });
1806
1807 let promise = result.unwrap_or_else(|error| {
1808 let cx = GlobalScope::get_cx();
1809 rooted!(in(*cx) let mut rval = UndefinedValue());
1810 error
1811 .clone()
1812 .to_jsval(cx, global, rval.handle_mut(), can_gc);
1813 let promise = Promise::new(global, can_gc);
1814 promise.reject_native(&rval.handle(), can_gc);
1815 promise
1816 });
1817
1818 // Perform ! ReadableByteStreamControllerClearAlgorithms(this).
1819 self.clear_algorithms();
1820
1821 // Return result(the promise).
1822 promise
1823 }
1824
1825 /// <https://streams.spec.whatwg.org/#rbs-controller-private-pull>
1826 pub(crate) fn perform_pull_steps(
1827 &self,
1828 cx: SafeJSContext,
1829 read_request: &ReadRequest,
1830 can_gc: CanGc,
1831 ) {
1832 // Let stream be this.[[stream]].
1833 let stream = self.stream.get().unwrap();
1834
1835 // Assert: ! ReadableStreamHasDefaultReader(stream) is true.
1836 assert!(stream.has_default_reader());
1837
1838 // If this.[[queueTotalSize]] > 0,
1839 if self.queue_total_size.get() > 0.0 {
1840 // Assert: ! ReadableStreamGetNumReadRequests(stream) is 0.
1841 assert_eq!(stream.get_num_read_requests(), 0);
1842
1843 // Perform ! ReadableByteStreamControllerFillReadRequestFromQueue(this, readRequest).
1844 let _ = self.fill_read_request_from_queue(cx, read_request, can_gc);
1845
1846 // Return.
1847 return;
1848 }
1849
1850 // Let autoAllocateChunkSize be this.[[autoAllocateChunkSize]].
1851 let auto_allocate_chunk_size = self.auto_allocate_chunk_size;
1852
1853 // If autoAllocateChunkSize is not undefined,
1854 if let Some(auto_allocate_chunk_size) = auto_allocate_chunk_size {
1855 // create_array_buffer_with_size
1856 // Let buffer be Construct(%ArrayBuffer%, « autoAllocateChunkSize »).
1857 match create_array_buffer_with_size(cx, auto_allocate_chunk_size as usize) {
1858 Ok(buffer) => {
1859 // Let pullIntoDescriptor be a new pull-into descriptor with
1860 // buffer buffer.[[Value]]
1861 // buffer byte length autoAllocateChunkSize
1862 // byte offset 0
1863 // byte length autoAllocateChunkSize
1864 // bytes filled 0
1865 // minimum fill 1
1866 // element size 1
1867 // view constructor %Uint8Array%
1868 // reader type "default"
1869 let pull_into_descriptor = PullIntoDescriptor {
1870 buffer,
1871 buffer_byte_length: auto_allocate_chunk_size,
1872 byte_length: auto_allocate_chunk_size,
1873 byte_offset: 0,
1874 bytes_filled: Cell::new(0),
1875 minimum_fill: 1,
1876 element_size: 1,
1877 view_constructor: Constructor::Name(Type::Uint8),
1878 reader_type: Some(ReaderType::Default),
1879 };
1880
1881 // Append pullIntoDescriptor to this.[[pendingPullIntos]].
1882 self.pending_pull_intos
1883 .borrow_mut()
1884 .push(pull_into_descriptor);
1885 },
1886 Err(error) => {
1887 // If buffer is an abrupt completion,
1888 // Perform readRequest’s error steps, given buffer.[[Value]].
1889
1890 rooted!(in(*cx) let mut rval = UndefinedValue());
1891 error
1892 .clone()
1893 .to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
1894 read_request.error_steps(rval.handle(), can_gc);
1895
1896 // Return.
1897 return;
1898 },
1899 }
1900 }
1901
1902 // Perform ! ReadableStreamAddReadRequest(stream, readRequest).
1903 stream.add_read_request(read_request);
1904
1905 // Perform ! ReadableByteStreamControllerCallPullIfNeeded(this).
1906 self.call_pull_if_needed(can_gc);
1907 }
1908
1909 /// Setting the JS object after the heap has settled down.
1910 pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
1911 if let Some(underlying_source) = self.underlying_source.get() {
1912 underlying_source.set_underlying_source_this_object(this_object);
1913 }
1914 }
1915
1916 pub(crate) fn remove_entry(&self) -> QueueEntry {
1917 self.queue
1918 .borrow_mut()
1919 .pop_front()
1920 .expect("Reader must have read request when remove is called into.")
1921 }
1922
1923 pub(crate) fn get_queue_total_size(&self) -> f64 {
1924 self.queue_total_size.get()
1925 }
1926}
1927
1928impl ReadableByteStreamControllerMethods<crate::DomTypeHolder> for ReadableByteStreamController {
1929 /// <https://streams.spec.whatwg.org/#rbs-controller-byob-request>
1930 fn GetByobRequest(
1931 &self,
1932 can_gc: CanGc,
1933 ) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
1934 let cx = GlobalScope::get_cx();
1935 // Return ! ReadableByteStreamControllerGetBYOBRequest(this).
1936 self.get_byob_request(cx, can_gc)
1937 }
1938
1939 /// <https://streams.spec.whatwg.org/#rbs-controller-desired-size>
1940 fn GetDesiredSize(&self) -> Option<f64> {
1941 // Return ! ReadableByteStreamControllerGetDesiredSize(this).
1942 self.get_desired_size()
1943 }
1944
1945 /// <https://streams.spec.whatwg.org/#rbs-controller-close>
1946 fn Close(&self, can_gc: CanGc) -> Fallible<()> {
1947 let cx = GlobalScope::get_cx();
1948 // If this.[[closeRequested]] is true, throw a TypeError exception.
1949 if self.close_requested.get() {
1950 return Err(Error::Type("closeRequested is true".to_owned()));
1951 }
1952
1953 // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
1954 if !self.stream.get().unwrap().is_readable() {
1955 return Err(Error::Type("stream is not readable".to_owned()));
1956 }
1957
1958 // Perform ? ReadableByteStreamControllerClose(this).
1959 self.close(cx, can_gc)
1960 }
1961
1962 /// <https://streams.spec.whatwg.org/#rbs-controller-enqueue>
1963 fn Enqueue(
1964 &self,
1965 chunk: js::gc::CustomAutoRooterGuard<js::typedarray::ArrayBufferView>,
1966 can_gc: CanGc,
1967 ) -> Fallible<()> {
1968 let cx = GlobalScope::get_cx();
1969
1970 let chunk = HeapBufferSource::<ArrayBufferViewU8>::from_view(chunk);
1971
1972 // If chunk.[[ByteLength]] is 0, throw a TypeError exception.
1973 if chunk.byte_length() == 0 {
1974 return Err(Error::Type("chunk.ByteLength is 0".to_owned()));
1975 }
1976
1977 // If chunk.[[ViewedArrayBuffer]].[[ByteLength]] is 0, throw a TypeError exception.
1978 if chunk.viewed_buffer_array_byte_length(cx) == 0 {
1979 return Err(Error::Type(
1980 "chunk.ViewedArrayBuffer.ByteLength is 0".to_owned(),
1981 ));
1982 }
1983
1984 // If this.[[closeRequested]] is true, throw a TypeError exception.
1985 if self.close_requested.get() {
1986 return Err(Error::Type("closeRequested is true".to_owned()));
1987 }
1988
1989 // If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
1990 if !self.stream.get().unwrap().is_readable() {
1991 return Err(Error::Type("stream is not readable".to_owned()));
1992 }
1993
1994 // Return ? ReadableByteStreamControllerEnqueue(this, chunk).
1995 self.enqueue(cx, chunk, can_gc)
1996 }
1997
1998 /// <https://streams.spec.whatwg.org/#rbs-controller-error>
1999 fn Error(&self, _cx: SafeJSContext, e: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
2000 // Perform ! ReadableByteStreamControllerError(this, e).
2001 self.error(e, can_gc);
2002 Ok(())
2003 }
2004}