script/
body.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::rc::Rc;
6use std::{ptr, slice, str};
7
8use constellation_traits::BlobImpl;
9use encoding_rs::{Encoding, UTF_8};
10use ipc_channel::ipc::{self, IpcReceiver, IpcSender, IpcSharedMemory};
11use ipc_channel::router::ROUTER;
12use js::jsapi::{Heap, JS_ClearPendingException, JSObject, Value as JSValue};
13use js::jsval::{JSVal, UndefinedValue};
14use js::rust::HandleValue;
15use js::rust::wrappers::{JS_GetPendingException, JS_ParseJSON};
16use js::typedarray::{ArrayBufferU8, Uint8};
17use mime::{self, Mime};
18use net_traits::request::{
19    BodyChunkRequest, BodyChunkResponse, BodySource as NetBodySource, RequestBody,
20};
21use url::form_urlencoded;
22
23use crate::dom::bindings::buffer_source::create_buffer_source;
24use crate::dom::bindings::codegen::Bindings::BlobBinding::Blob_Binding::BlobMethods;
25use crate::dom::bindings::codegen::Bindings::FormDataBinding::FormDataMethods;
26use crate::dom::bindings::codegen::Bindings::XMLHttpRequestBinding::BodyInit;
27use crate::dom::bindings::error::{Error, Fallible};
28use crate::dom::bindings::refcounted::Trusted;
29use crate::dom::bindings::reflector::{DomGlobal, DomObject};
30use crate::dom::bindings::root::{Dom, DomRoot};
31use crate::dom::bindings::str::{DOMString, USVString};
32use crate::dom::bindings::trace::RootedTraceableBox;
33use crate::dom::blob::{Blob, normalize_type_string};
34use crate::dom::formdata::FormData;
35use crate::dom::globalscope::GlobalScope;
36use crate::dom::html::htmlformelement::{encode_multipart_form_data, generate_boundary};
37use crate::dom::promise::Promise;
38use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
39use crate::dom::readablestream::{ReadableStream, get_read_promise_bytes, get_read_promise_done};
40use crate::dom::urlsearchparams::URLSearchParams;
41use crate::realms::{AlreadyInRealm, InRealm, enter_realm};
42use crate::script_runtime::{CanGc, JSContext};
43use crate::task_source::SendableTaskSource;
44
45/// The Dom object, or ReadableStream, that is the source of a body.
46/// <https://fetch.spec.whatwg.org/#concept-body-source>
47#[derive(Clone, PartialEq)]
48pub(crate) enum BodySource {
49    /// A ReadableStream comes with a null-source.
50    Null,
51    /// Another Dom object as source,
52    /// TODO: store the actual object
53    /// and re-extract a stream on re-direct.
54    Object,
55}
56
57/// The reason to stop reading from the body.
58enum StopReading {
59    /// The stream has errored.
60    Error,
61    /// The stream is done.
62    Done,
63}
64
65/// The IPC route handler
66/// for <https://fetch.spec.whatwg.org/#concept-request-transmit-body>.
67/// This route runs in the script process,
68/// and will queue tasks to perform operations
69/// on the stream and transmit body chunks over IPC.
70#[derive(Clone)]
71struct TransmitBodyConnectHandler {
72    stream: Trusted<ReadableStream>,
73    task_source: SendableTaskSource,
74    bytes_sender: Option<IpcSender<BodyChunkResponse>>,
75    control_sender: IpcSender<BodyChunkRequest>,
76    in_memory: Option<IpcSharedMemory>,
77    in_memory_done: bool,
78    source: BodySource,
79}
80
81impl TransmitBodyConnectHandler {
82    pub(crate) fn new(
83        stream: Trusted<ReadableStream>,
84        task_source: SendableTaskSource,
85        control_sender: IpcSender<BodyChunkRequest>,
86        in_memory: Option<IpcSharedMemory>,
87        source: BodySource,
88    ) -> TransmitBodyConnectHandler {
89        TransmitBodyConnectHandler {
90            stream,
91            task_source,
92            bytes_sender: None,
93            control_sender,
94            in_memory,
95            in_memory_done: false,
96            source,
97        }
98    }
99
100    /// Reset `in_memory_done`, called when a stream is
101    /// re-extracted from the source to support a re-direct.
102    pub(crate) fn reset_in_memory_done(&mut self) {
103        self.in_memory_done = false;
104    }
105
106    /// Re-extract the source to support streaming it again for a re-direct.
107    /// TODO: actually re-extract the source, instead of just cloning data, to support Blob.
108    fn re_extract(&mut self, chunk_request_receiver: IpcReceiver<BodyChunkRequest>) {
109        let mut body_handler = self.clone();
110        body_handler.reset_in_memory_done();
111
112        ROUTER.add_typed_route(
113            chunk_request_receiver,
114            Box::new(move |message| {
115                let request = message.unwrap();
116                match request {
117                    BodyChunkRequest::Connect(sender) => {
118                        body_handler.start_reading(sender);
119                    },
120                    BodyChunkRequest::Extract(receiver) => {
121                        body_handler.re_extract(receiver);
122                    },
123                    BodyChunkRequest::Chunk => body_handler.transmit_source(),
124                    // Note: this is actually sent from this process
125                    // by the TransmitBodyPromiseHandler when reading stops.
126                    BodyChunkRequest::Done => {
127                        body_handler.stop_reading(StopReading::Done);
128                    },
129                    // Note: this is actually sent from this process
130                    // by the TransmitBodyPromiseHandler when the stream errors.
131                    BodyChunkRequest::Error => {
132                        body_handler.stop_reading(StopReading::Error);
133                    },
134                }
135            }),
136        );
137    }
138
139    /// In case of re-direct, and of a source available in memory,
140    /// send it all in one chunk.
141    ///
142    /// TODO: this method should be deprecated
143    /// in favor of making `re_extract` actually re-extract a stream from the source.
144    /// See #26686
145    fn transmit_source(&mut self) {
146        if self.in_memory_done {
147            // Step 5.1.3
148            self.stop_reading(StopReading::Done);
149            return;
150        }
151
152        if let BodySource::Null = self.source {
153            panic!("ReadableStream(Null) sources should not re-direct.");
154        }
155
156        if let Some(bytes) = self.in_memory.clone() {
157            // The memoized bytes are sent so we mark it as done again
158            self.in_memory_done = true;
159            let _ = self
160                .bytes_sender
161                .as_ref()
162                .expect("No bytes sender to transmit source.")
163                .send(BodyChunkResponse::Chunk(bytes));
164            return;
165        }
166        warn!("Re-directs for file-based Blobs not supported yet.");
167    }
168
169    /// Take the IPC sender sent by `net`, so we can send body chunks with it.
170    /// Also the entry point to <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
171    fn start_reading(&mut self, sender: IpcSender<BodyChunkResponse>) {
172        self.bytes_sender = Some(sender);
173
174        // If we're using an actual ReadableStream, acquire a reader for it.
175        if self.source == BodySource::Null {
176            let stream = self.stream.clone();
177            self.task_source
178                .queue(task!(start_reading_request_body_stream: move || {
179                    // Step 1, Let body be request’s body.
180                    let rooted_stream = stream.root();
181
182                    // TODO: Step 2, If body is null.
183
184                    // Step 3, get a reader for stream.
185                    rooted_stream.acquire_default_reader(CanGc::note())
186                        .expect("Couldn't acquire a reader for the body stream.");
187
188                    // Note: this algorithm continues when the first chunk is requested by `net`.
189                }));
190        }
191    }
192
193    /// Drop the IPC sender sent by `net`
194    fn stop_reading(&mut self, reason: StopReading) {
195        let bytes_sender = self
196            .bytes_sender
197            .take()
198            .expect("Stop reading called multiple times on TransmitBodyConnectHandler.");
199        match reason {
200            StopReading::Error => {
201                let _ = bytes_sender.send(BodyChunkResponse::Error);
202            },
203            StopReading::Done => {
204                let _ = bytes_sender.send(BodyChunkResponse::Done);
205            },
206        }
207    }
208
209    /// Step 4 and following of <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
210    fn transmit_body_chunk(&mut self) {
211        if self.in_memory_done {
212            // Step 5.1.3
213            self.stop_reading(StopReading::Done);
214            return;
215        }
216
217        let stream = self.stream.clone();
218        let control_sender = self.control_sender.clone();
219        let bytes_sender = self
220            .bytes_sender
221            .clone()
222            .expect("No bytes sender to transmit chunk.");
223
224        // In case of the data being in-memory, send everything in one chunk, by-passing SpiderMonkey.
225        if let Some(bytes) = self.in_memory.clone() {
226            let _ = bytes_sender.send(BodyChunkResponse::Chunk(bytes));
227            // Mark this body as `done` so that we can stop reading in the next tick,
228            // matching the behavior of the promise-based flow
229            self.in_memory_done = true;
230            return;
231        }
232
233        self.task_source.queue(
234            task!(setup_native_body_promise_handler: move || {
235                let rooted_stream = stream.root();
236                let global = rooted_stream.global();
237                let cx = GlobalScope::get_cx();
238
239                // Step 4, the result of reading a chunk from body’s stream with reader.
240                let promise = rooted_stream.read_a_chunk(CanGc::note());
241
242                // Step 5, the parallel steps waiting for and handling the result of the read promise,
243                // are a combination of the promise native handler here,
244                // and the corresponding IPC route in `component::net::http_loader`.
245                rooted!(in(*cx) let mut promise_handler = Some(TransmitBodyPromiseHandler {
246                    bytes_sender: bytes_sender.clone(),
247                    stream: Dom::from_ref(&rooted_stream.clone()),
248                    control_sender: control_sender.clone(),
249                }));
250
251                rooted!(in(*cx) let mut rejection_handler = Some(TransmitBodyPromiseRejectionHandler {
252                    bytes_sender,
253                    stream: Dom::from_ref(&rooted_stream.clone()),
254                    control_sender,
255                }));
256
257                let handler =
258                    PromiseNativeHandler::new(&global, promise_handler.take().map(|h| Box::new(h) as Box<_>), rejection_handler.take().map(|h| Box::new(h) as Box<_>), CanGc::note());
259
260                let realm = enter_realm(&*global);
261                let comp = InRealm::Entered(&realm);
262                promise.append_native_handler(&handler, comp, CanGc::note());
263            })
264        );
265    }
266}
267
268/// The handler of read promises of body streams used in
269/// <https://fetch.spec.whatwg.org/#concept-request-transmit-body>.
270#[derive(Clone, JSTraceable, MallocSizeOf)]
271#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
272struct TransmitBodyPromiseHandler {
273    #[ignore_malloc_size_of = "Channels are hard"]
274    #[no_trace]
275    bytes_sender: IpcSender<BodyChunkResponse>,
276    stream: Dom<ReadableStream>,
277    #[ignore_malloc_size_of = "Channels are hard"]
278    #[no_trace]
279    control_sender: IpcSender<BodyChunkRequest>,
280}
281
282impl js::gc::Rootable for TransmitBodyPromiseHandler {}
283
284impl Callback for TransmitBodyPromiseHandler {
285    /// Step 5 of <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
286    fn callback(&self, cx: JSContext, v: HandleValue, _realm: InRealm, can_gc: CanGc) {
287        let is_done = match get_read_promise_done(cx, &v, can_gc) {
288            Ok(is_done) => is_done,
289            Err(_) => {
290                // Step 5.5, the "otherwise" steps.
291                // TODO: terminate fetch.
292                let _ = self.control_sender.send(BodyChunkRequest::Done);
293                return self.stream.stop_reading(can_gc);
294            },
295        };
296
297        if is_done {
298            // Step 5.3, the "done" steps.
299            // TODO: queue a fetch task on request to process request end-of-body.
300            let _ = self.control_sender.send(BodyChunkRequest::Done);
301            return self.stream.stop_reading(can_gc);
302        }
303
304        let chunk = match get_read_promise_bytes(cx, &v, can_gc) {
305            Ok(chunk) => chunk,
306            Err(_) => {
307                // Step 5.5, the "otherwise" steps.
308                let _ = self.control_sender.send(BodyChunkRequest::Error);
309                return self.stream.stop_reading(can_gc);
310            },
311        };
312
313        // Step 5.1 and 5.2, transmit chunk.
314        // Send the chunk to the body transmitter in net::http_loader::obtain_response.
315        // TODO: queue a fetch task on request to process request body for request.
316        let _ = self
317            .bytes_sender
318            .send(BodyChunkResponse::Chunk(IpcSharedMemory::from_bytes(
319                &chunk,
320            )));
321    }
322}
323
324/// The handler of read promises rejection of body streams used in
325/// <https://fetch.spec.whatwg.org/#concept-request-transmit-body>.
326#[derive(Clone, JSTraceable, MallocSizeOf)]
327#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
328struct TransmitBodyPromiseRejectionHandler {
329    #[ignore_malloc_size_of = "Channels are hard"]
330    #[no_trace]
331    bytes_sender: IpcSender<BodyChunkResponse>,
332    stream: Dom<ReadableStream>,
333    #[ignore_malloc_size_of = "Channels are hard"]
334    #[no_trace]
335    control_sender: IpcSender<BodyChunkRequest>,
336}
337
338impl js::gc::Rootable for TransmitBodyPromiseRejectionHandler {}
339
340impl Callback for TransmitBodyPromiseRejectionHandler {
341    /// <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
342    fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
343        // Step 5.4, the "rejection" steps.
344        let _ = self.control_sender.send(BodyChunkRequest::Error);
345        self.stream.stop_reading(can_gc);
346    }
347}
348
349/// <https://fetch.spec.whatwg.org/#body-with-type>
350pub(crate) struct ExtractedBody {
351    /// <https://fetch.spec.whatwg.org/#concept-body-stream>
352    pub(crate) stream: DomRoot<ReadableStream>,
353    /// <https://fetch.spec.whatwg.org/#concept-body-source>
354    pub(crate) source: BodySource,
355    /// <https://fetch.spec.whatwg.org/#concept-body-total-bytes>
356    pub(crate) total_bytes: Option<usize>,
357    /// <https://fetch.spec.whatwg.org/#body-with-type-type>
358    pub(crate) content_type: Option<DOMString>,
359}
360
361impl ExtractedBody {
362    /// Build a request body from the extracted body,
363    /// to be sent over IPC to net to use with `concept-request-transmit-body`,
364    /// see <https://fetch.spec.whatwg.org/#concept-request-transmit-body>.
365    ///
366    /// Also returning the corresponding readable stream,
367    /// to be stored on the request in script,
368    /// and potentially used as part of `consume_body`,
369    /// see <https://fetch.spec.whatwg.org/#concept-body-consume-body>
370    ///
371    /// Transmitting a body over fetch, and consuming it in script,
372    /// are mutually exclusive operations, since each will lock the stream to a reader.
373    pub(crate) fn into_net_request_body(self) -> (RequestBody, DomRoot<ReadableStream>) {
374        let ExtractedBody {
375            stream,
376            total_bytes,
377            content_type: _,
378            source,
379        } = self;
380
381        // First, setup some infra to be used to transmit body
382        //  from `components::script` to `components::net`.
383        let (chunk_request_sender, chunk_request_receiver) = ipc::channel().unwrap();
384
385        let trusted_stream = Trusted::new(&*stream);
386
387        let global = stream.global();
388        let task_source = global.task_manager().networking_task_source();
389
390        // In case of the data being in-memory, send everything in one chunk, by-passing SM.
391        let in_memory = stream.get_in_memory_bytes();
392
393        let net_source = match source {
394            BodySource::Null => NetBodySource::Null,
395            _ => NetBodySource::Object,
396        };
397
398        let mut body_handler = TransmitBodyConnectHandler::new(
399            trusted_stream,
400            task_source.into(),
401            chunk_request_sender.clone(),
402            in_memory,
403            source,
404        );
405
406        ROUTER.add_typed_route(
407            chunk_request_receiver,
408            Box::new(move |message| {
409                match message.unwrap() {
410                    BodyChunkRequest::Connect(sender) => {
411                        body_handler.start_reading(sender);
412                    },
413                    BodyChunkRequest::Extract(receiver) => {
414                        body_handler.re_extract(receiver);
415                    },
416                    BodyChunkRequest::Chunk => body_handler.transmit_body_chunk(),
417                    // Note: this is actually sent from this process
418                    // by the TransmitBodyPromiseHandler when reading stops.
419                    BodyChunkRequest::Done => {
420                        body_handler.stop_reading(StopReading::Done);
421                    },
422                    // Note: this is actually sent from this process
423                    // by the TransmitBodyPromiseHandler when the stream errors.
424                    BodyChunkRequest::Error => {
425                        body_handler.stop_reading(StopReading::Error);
426                    },
427                }
428            }),
429        );
430
431        // Return `components::net` view into this request body,
432        // which can be used by `net` to transmit it over the network.
433        let request_body = RequestBody::new(chunk_request_sender, net_source, total_bytes);
434
435        // Also return the stream for this body, which can be used by script to consume it.
436        (request_body, stream)
437    }
438
439    /// Is the data of the stream of this extracted body available in memory?
440    pub(crate) fn in_memory(&self) -> bool {
441        self.stream.in_memory()
442    }
443}
444
445/// <https://fetch.spec.whatwg.org/#concept-bodyinit-extract>
446pub(crate) trait Extractable {
447    fn extract(&self, global: &GlobalScope, can_gc: CanGc) -> Fallible<ExtractedBody>;
448}
449
450impl Extractable for BodyInit {
451    // https://fetch.spec.whatwg.org/#concept-bodyinit-extract
452    fn extract(&self, global: &GlobalScope, can_gc: CanGc) -> Fallible<ExtractedBody> {
453        match self {
454            BodyInit::String(s) => s.extract(global, can_gc),
455            BodyInit::URLSearchParams(usp) => usp.extract(global, can_gc),
456            BodyInit::Blob(b) => b.extract(global, can_gc),
457            BodyInit::FormData(formdata) => formdata.extract(global, can_gc),
458            BodyInit::ArrayBuffer(typedarray) => {
459                let bytes = typedarray.to_vec();
460                let total_bytes = bytes.len();
461                let stream = ReadableStream::new_from_bytes(global, bytes, can_gc)?;
462                Ok(ExtractedBody {
463                    stream,
464                    total_bytes: Some(total_bytes),
465                    content_type: None,
466                    source: BodySource::Object,
467                })
468            },
469            BodyInit::ArrayBufferView(typedarray) => {
470                let bytes = typedarray.to_vec();
471                let total_bytes = bytes.len();
472                let stream = ReadableStream::new_from_bytes(global, bytes, can_gc)?;
473                Ok(ExtractedBody {
474                    stream,
475                    total_bytes: Some(total_bytes),
476                    content_type: None,
477                    source: BodySource::Object,
478                })
479            },
480            BodyInit::ReadableStream(stream) => {
481                // TODO:
482                // 1. If the keepalive flag is set, then throw a TypeError.
483
484                if stream.is_locked() || stream.is_disturbed() {
485                    return Err(Error::Type(
486                        "The body's stream is disturbed or locked".to_string(),
487                    ));
488                }
489
490                Ok(ExtractedBody {
491                    stream: stream.clone(),
492                    total_bytes: None,
493                    content_type: None,
494                    source: BodySource::Null,
495                })
496            },
497        }
498    }
499}
500
501impl Extractable for Vec<u8> {
502    fn extract(&self, global: &GlobalScope, can_gc: CanGc) -> Fallible<ExtractedBody> {
503        let bytes = self.clone();
504        let total_bytes = self.len();
505        let stream = ReadableStream::new_from_bytes(global, bytes, can_gc)?;
506        Ok(ExtractedBody {
507            stream,
508            total_bytes: Some(total_bytes),
509            content_type: None,
510            // A vec is used only in `submit_entity_body`.
511            source: BodySource::Object,
512        })
513    }
514}
515
516impl Extractable for Blob {
517    fn extract(&self, _global: &GlobalScope, can_gc: CanGc) -> Fallible<ExtractedBody> {
518        let blob_type = self.Type();
519        let content_type = if blob_type.is_empty() {
520            None
521        } else {
522            Some(blob_type)
523        };
524        let total_bytes = self.Size() as usize;
525        let stream = self.get_stream(can_gc)?;
526        Ok(ExtractedBody {
527            stream,
528            total_bytes: Some(total_bytes),
529            content_type,
530            source: BodySource::Object,
531        })
532    }
533}
534
535impl Extractable for DOMString {
536    fn extract(&self, global: &GlobalScope, can_gc: CanGc) -> Fallible<ExtractedBody> {
537        let bytes = self.as_bytes().to_owned();
538        let total_bytes = bytes.len();
539        let content_type = Some(DOMString::from("text/plain;charset=UTF-8"));
540        let stream = ReadableStream::new_from_bytes(global, bytes, can_gc)?;
541        Ok(ExtractedBody {
542            stream,
543            total_bytes: Some(total_bytes),
544            content_type,
545            source: BodySource::Object,
546        })
547    }
548}
549
550impl Extractable for FormData {
551    fn extract(&self, global: &GlobalScope, can_gc: CanGc) -> Fallible<ExtractedBody> {
552        let boundary = generate_boundary();
553        let bytes = encode_multipart_form_data(&mut self.datums(), boundary.clone(), UTF_8);
554        let total_bytes = bytes.len();
555        let content_type = Some(DOMString::from(format!(
556            "multipart/form-data; boundary={}",
557            boundary
558        )));
559        let stream = ReadableStream::new_from_bytes(global, bytes, can_gc)?;
560        Ok(ExtractedBody {
561            stream,
562            total_bytes: Some(total_bytes),
563            content_type,
564            source: BodySource::Object,
565        })
566    }
567}
568
569impl Extractable for URLSearchParams {
570    fn extract(&self, global: &GlobalScope, can_gc: CanGc) -> Fallible<ExtractedBody> {
571        let bytes = self.serialize_utf8().into_bytes();
572        let total_bytes = bytes.len();
573        let content_type = Some(DOMString::from(
574            "application/x-www-form-urlencoded;charset=UTF-8",
575        ));
576        let stream = ReadableStream::new_from_bytes(global, bytes, can_gc)?;
577        Ok(ExtractedBody {
578            stream,
579            total_bytes: Some(total_bytes),
580            content_type,
581            source: BodySource::Object,
582        })
583    }
584}
585
586#[derive(Clone, Copy, JSTraceable, MallocSizeOf)]
587pub(crate) enum BodyType {
588    Blob,
589    Bytes,
590    FormData,
591    Json,
592    Text,
593    ArrayBuffer,
594}
595
596pub(crate) enum FetchedData {
597    Text(String),
598    Json(RootedTraceableBox<Heap<JSValue>>),
599    BlobData(DomRoot<Blob>),
600    Bytes(RootedTraceableBox<Heap<*mut JSObject>>),
601    FormData(DomRoot<FormData>),
602    ArrayBuffer(RootedTraceableBox<Heap<*mut JSObject>>),
603    JSException(RootedTraceableBox<Heap<JSVal>>),
604}
605
606/// <https://fetch.spec.whatwg.org/#concept-body-consume-body>
607/// <https://fetch.spec.whatwg.org/#body-fully-read>
608/// A combination of parts of both algorithms,
609/// `body-fully-read` can be fully implemented, and separated, later,
610/// see #36049.
611#[cfg_attr(crown, allow(crown::unrooted_must_root))]
612pub(crate) fn consume_body<T: BodyMixin + DomObject>(
613    object: &T,
614    body_type: BodyType,
615    can_gc: CanGc,
616) -> Rc<Promise> {
617    let global = object.global();
618    let cx = GlobalScope::get_cx();
619
620    // Enter the realm of the object whose body is being consumed.
621    let realm = enter_realm(&*global);
622    let comp = InRealm::Entered(&realm);
623
624    // Let promise be a new promise.
625    // Note: re-ordered so we can return the promise below.
626    let promise = Promise::new_in_current_realm(comp, can_gc);
627
628    // If object is unusable, then return a promise rejected with a TypeError.
629    if object.is_unusable() {
630        promise.reject_error(
631            Error::Type("The body's stream is disturbed or locked".to_string()),
632            can_gc,
633        );
634        return promise;
635    }
636
637    let stream = match object.body() {
638        Some(stream) => stream,
639        None => {
640            // If object’s body is null, then run successSteps with an empty byte sequence.
641            resolve_result_promise(
642                body_type,
643                &promise,
644                object.get_mime_type(can_gc),
645                Vec::with_capacity(0),
646                cx,
647                can_gc,
648            );
649            return promise;
650        },
651    };
652
653    // Note: from `fully_read`.
654    // Let reader be the result of getting a reader for body’s stream.
655    // If that threw an exception,
656    // then run errorSteps with that exception and return.
657    let reader = match stream.acquire_default_reader(can_gc) {
658        Ok(r) => r,
659        Err(e) => {
660            promise.reject_error(e, can_gc);
661            return promise;
662        },
663    };
664
665    // Let errorSteps given error be to reject promise with error.
666    let error_promise = promise.clone();
667
668    // Let successSteps given a byte sequence data be to resolve promise
669    // with the result of running convertBytesToJSValue with data.
670    // If that threw an exception, then run errorSteps with that exception.
671    let mime_type = object.get_mime_type(can_gc);
672    let success_promise = promise.clone();
673
674    // Read all bytes from reader, given successSteps and errorSteps.
675    // Note: spec uses an intermediary concept of `fully_read`,
676    // which seems useful when invoking fetch from other places.
677    // TODO: #36049
678    reader.read_all_bytes(
679        cx,
680        &global,
681        Rc::new(move |bytes: &[u8]| {
682            resolve_result_promise(
683                body_type,
684                &success_promise,
685                mime_type.clone(),
686                bytes.to_vec(),
687                cx,
688                can_gc,
689            );
690        }),
691        Rc::new(move |cx, v| {
692            error_promise.reject(cx, v, can_gc);
693        }),
694        comp,
695        can_gc,
696    );
697
698    promise
699}
700
701/// The success steps of
702/// <https://fetch.spec.whatwg.org/#concept-body-consume-body>.
703fn resolve_result_promise(
704    body_type: BodyType,
705    promise: &Promise,
706    mime_type: Vec<u8>,
707    body: Vec<u8>,
708    cx: JSContext,
709    can_gc: CanGc,
710) {
711    let pkg_data_results = run_package_data_algorithm(cx, body, body_type, mime_type, can_gc);
712
713    match pkg_data_results {
714        Ok(results) => {
715            match results {
716                FetchedData::Text(s) => promise.resolve_native(&USVString(s), can_gc),
717                FetchedData::Json(j) => promise.resolve_native(&j, can_gc),
718                FetchedData::BlobData(b) => promise.resolve_native(&b, can_gc),
719                FetchedData::FormData(f) => promise.resolve_native(&f, can_gc),
720                FetchedData::Bytes(b) => promise.resolve_native(&b, can_gc),
721                FetchedData::ArrayBuffer(a) => promise.resolve_native(&a, can_gc),
722                FetchedData::JSException(e) => promise.reject_native(&e.handle(), can_gc),
723            };
724        },
725        Err(err) => promise.reject_error(err, can_gc),
726    }
727}
728
729/// The algorithm that takes a byte sequence
730/// and returns a JavaScript value or throws an exception of
731/// <https://fetch.spec.whatwg.org/#concept-body-consume-body>.
732fn run_package_data_algorithm(
733    cx: JSContext,
734    bytes: Vec<u8>,
735    body_type: BodyType,
736    mime_type: Vec<u8>,
737    can_gc: CanGc,
738) -> Fallible<FetchedData> {
739    let mime = &*mime_type;
740    let in_realm_proof = AlreadyInRealm::assert_for_cx(cx);
741    let global = GlobalScope::from_safe_context(cx, InRealm::Already(&in_realm_proof));
742    match body_type {
743        BodyType::Text => run_text_data_algorithm(bytes),
744        BodyType::Json => run_json_data_algorithm(cx, bytes),
745        BodyType::Blob => run_blob_data_algorithm(&global, bytes, mime, can_gc),
746        BodyType::FormData => run_form_data_algorithm(&global, bytes, mime, can_gc),
747        BodyType::ArrayBuffer => run_array_buffer_data_algorithm(cx, bytes, can_gc),
748        BodyType::Bytes => run_bytes_data_algorithm(cx, bytes, can_gc),
749    }
750}
751
752/// <https://fetch.spec.whatwg.org/#ref-for-concept-body-consume-body%E2%91%A4>
753fn run_text_data_algorithm(bytes: Vec<u8>) -> Fallible<FetchedData> {
754    // This implements the Encoding standard's "decode UTF-8", which removes the
755    // BOM if present.
756    let no_bom_bytes = if bytes.starts_with(b"\xEF\xBB\xBF") {
757        &bytes[3..]
758    } else {
759        &bytes
760    };
761    Ok(FetchedData::Text(
762        String::from_utf8_lossy(no_bom_bytes).into_owned(),
763    ))
764}
765
766#[allow(unsafe_code)]
767/// <https://fetch.spec.whatwg.org/#ref-for-concept-body-consume-body%E2%91%A3>
768fn run_json_data_algorithm(cx: JSContext, bytes: Vec<u8>) -> Fallible<FetchedData> {
769    // The JSON spec allows implementations to either ignore UTF-8 BOM or treat it as an error.
770    // `JS_ParseJSON` treats this as an error, so it is necessary for us to strip it if present.
771    //
772    // https://datatracker.ietf.org/doc/html/rfc8259#section-8.1
773    let json_text = decode_to_utf16_with_bom_removal(&bytes, UTF_8);
774    rooted!(in(*cx) let mut rval = UndefinedValue());
775    unsafe {
776        if !JS_ParseJSON(
777            *cx,
778            json_text.as_ptr(),
779            json_text.len() as u32,
780            rval.handle_mut(),
781        ) {
782            rooted!(in(*cx) let mut exception = UndefinedValue());
783            assert!(JS_GetPendingException(*cx, exception.handle_mut()));
784            JS_ClearPendingException(*cx);
785            return Ok(FetchedData::JSException(RootedTraceableBox::from_box(
786                Heap::boxed(exception.get()),
787            )));
788        }
789        let rooted_heap = RootedTraceableBox::from_box(Heap::boxed(rval.get()));
790        Ok(FetchedData::Json(rooted_heap))
791    }
792}
793
794/// <https://fetch.spec.whatwg.org/#ref-for-concept-body-consume-body%E2%91%A0>
795fn run_blob_data_algorithm(
796    root: &GlobalScope,
797    bytes: Vec<u8>,
798    mime: &[u8],
799    can_gc: CanGc,
800) -> Fallible<FetchedData> {
801    let mime_string = if let Ok(s) = String::from_utf8(mime.to_vec()) {
802        s
803    } else {
804        "".to_string()
805    };
806    let blob = Blob::new(
807        root,
808        BlobImpl::new_from_bytes(bytes, normalize_type_string(&mime_string)),
809        can_gc,
810    );
811    Ok(FetchedData::BlobData(blob))
812}
813
814/// <https://fetch.spec.whatwg.org/#ref-for-concept-body-consume-body%E2%91%A2>
815fn run_form_data_algorithm(
816    root: &GlobalScope,
817    bytes: Vec<u8>,
818    mime: &[u8],
819    can_gc: CanGc,
820) -> Fallible<FetchedData> {
821    let mime_str = str::from_utf8(mime).unwrap_or_default();
822    let mime: Mime = mime_str
823        .parse()
824        .map_err(|_| Error::Type("Inappropriate MIME-type for Body".to_string()))?;
825
826    // TODO
827    // ... Parser for Mime(TopLevel::Multipart, SubLevel::FormData, _)
828    // ... is not fully determined yet.
829    if mime.type_() == mime::APPLICATION && mime.subtype() == mime::WWW_FORM_URLENCODED {
830        let entries = form_urlencoded::parse(&bytes);
831        let formdata = FormData::new(None, root, can_gc);
832        for (k, e) in entries {
833            formdata.Append(USVString(k.into_owned()), USVString(e.into_owned()));
834        }
835        return Ok(FetchedData::FormData(formdata));
836    }
837
838    Err(Error::Type("Inappropriate MIME-type for Body".to_string()))
839}
840
841/// <https://fetch.spec.whatwg.org/#ref-for-concept-body-consume-body%E2%91%A1>
842fn run_bytes_data_algorithm(cx: JSContext, bytes: Vec<u8>, can_gc: CanGc) -> Fallible<FetchedData> {
843    rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
844
845    create_buffer_source::<Uint8>(cx, &bytes, array_buffer_ptr.handle_mut(), can_gc)
846        .map_err(|_| Error::JSFailed)?;
847
848    let rooted_heap = RootedTraceableBox::from_box(Heap::boxed(array_buffer_ptr.get()));
849    Ok(FetchedData::Bytes(rooted_heap))
850}
851
852/// <https://fetch.spec.whatwg.org/#ref-for-concept-body-consume-body>
853pub(crate) fn run_array_buffer_data_algorithm(
854    cx: JSContext,
855    bytes: Vec<u8>,
856    can_gc: CanGc,
857) -> Fallible<FetchedData> {
858    rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
859
860    create_buffer_source::<ArrayBufferU8>(cx, &bytes, array_buffer_ptr.handle_mut(), can_gc)
861        .map_err(|_| Error::JSFailed)?;
862
863    let rooted_heap = RootedTraceableBox::from_box(Heap::boxed(array_buffer_ptr.get()));
864    Ok(FetchedData::ArrayBuffer(rooted_heap))
865}
866
867#[allow(unsafe_code)]
868pub(crate) fn decode_to_utf16_with_bom_removal(
869    bytes: &[u8],
870    encoding: &'static Encoding,
871) -> Vec<u16> {
872    let mut decoder = encoding.new_decoder_with_bom_removal();
873    let capacity = decoder
874        .max_utf16_buffer_length(bytes.len())
875        .expect("Overflow");
876    let mut utf16 = Vec::with_capacity(capacity);
877    let extra = unsafe { slice::from_raw_parts_mut(utf16.as_mut_ptr(), capacity) };
878    let (_, read, written, _) = decoder.decode_to_utf16(bytes, extra, true);
879    assert_eq!(read, bytes.len());
880    unsafe { utf16.set_len(written) }
881    utf16
882}
883
884/// <https://fetch.spec.whatwg.org/#body>
885pub(crate) trait BodyMixin {
886    /// <https://fetch.spec.whatwg.org/#dom-body-bodyused>
887    fn is_body_used(&self) -> bool;
888    /// <https://fetch.spec.whatwg.org/#body-unusable>
889    fn is_unusable(&self) -> bool;
890    /// <https://fetch.spec.whatwg.org/#dom-body-body>
891    fn body(&self) -> Option<DomRoot<ReadableStream>>;
892    /// <https://fetch.spec.whatwg.org/#concept-body-mime-type>
893    fn get_mime_type(&self, can_gc: CanGc) -> Vec<u8>;
894}