script/
body.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::rc::Rc;
6use std::{ptr, slice, str};
7
8use constellation_traits::BlobImpl;
9use encoding_rs::{Encoding, UTF_8};
10use ipc_channel::ipc::{self, IpcReceiver, IpcSender, IpcSharedMemory};
11use ipc_channel::router::ROUTER;
12use js::jsapi::{Heap, JS_ClearPendingException, JSObject, Value as JSValue};
13use js::jsval::{JSVal, UndefinedValue};
14use js::rust::HandleValue;
15use js::rust::wrappers::{JS_GetPendingException, JS_ParseJSON};
16use js::typedarray::{ArrayBufferU8, Uint8};
17use mime::{self, Mime};
18use net_traits::request::{
19    BodyChunkRequest, BodyChunkResponse, BodySource as NetBodySource, RequestBody,
20};
21use url::form_urlencoded;
22
23use crate::dom::bindings::buffer_source::create_buffer_source;
24use crate::dom::bindings::codegen::Bindings::BlobBinding::Blob_Binding::BlobMethods;
25use crate::dom::bindings::codegen::Bindings::FormDataBinding::FormDataMethods;
26use crate::dom::bindings::codegen::Bindings::XMLHttpRequestBinding::BodyInit;
27use crate::dom::bindings::error::{Error, Fallible};
28use crate::dom::bindings::refcounted::Trusted;
29use crate::dom::bindings::reflector::{DomGlobal, DomObject};
30use crate::dom::bindings::root::{Dom, DomRoot};
31use crate::dom::bindings::str::{DOMString, USVString};
32use crate::dom::bindings::trace::RootedTraceableBox;
33use crate::dom::blob::{Blob, normalize_type_string};
34use crate::dom::formdata::FormData;
35use crate::dom::globalscope::GlobalScope;
36use crate::dom::html::htmlformelement::{encode_multipart_form_data, generate_boundary};
37use crate::dom::promise::Promise;
38use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
39use crate::dom::readablestream::{ReadableStream, get_read_promise_bytes, get_read_promise_done};
40use crate::dom::urlsearchparams::URLSearchParams;
41use crate::realms::{AlreadyInRealm, InRealm, enter_realm};
42use crate::script_runtime::{CanGc, JSContext};
43use crate::task_source::SendableTaskSource;
44
/// The source of a body: either nothing (the body is a bare `ReadableStream`),
/// or some other Dom object the stream was extracted from.
/// <https://fetch.spec.whatwg.org/#concept-body-source>
///
/// `Debug` and `Eq` are derived in addition to `Clone` and `PartialEq`
/// so the value can be logged and used in `assert_eq!`-style checks.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum BodySource {
    /// A ReadableStream comes with a null-source.
    Null,
    /// Another Dom object as source,
    /// TODO: store the actual object
    /// and re-extract a stream on re-direct.
    Object,
}
56
/// The reason to stop reading from the body.
///
/// The usual traits for a fieldless enum are derived so the reason can be
/// copied, compared and printed in diagnostics.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum StopReading {
    /// The stream has errored.
    Error,
    /// The stream is done.
    Done,
}
64
/// The IPC route handler
/// for <https://fetch.spec.whatwg.org/#concept-request-transmit-body>.
/// This route runs in the script process,
/// and will queue tasks to perform operations
/// on the stream and transmit body chunks over IPC.
#[derive(Clone)]
struct TransmitBodyConnectHandler {
    /// The body stream; it is rooted inside the tasks queued on `task_source`.
    stream: Trusted<ReadableStream>,
    /// Task source on which the stream operations are queued.
    task_source: SendableTaskSource,
    /// Sender for body chunks: set by `start_reading`, taken by `stop_reading`.
    bytes_sender: Option<IpcSender<BodyChunkResponse>>,
    /// Sender handed to the read-promise handlers,
    /// which use it to send `Done`/`Error` requests.
    control_sender: IpcSender<BodyChunkRequest>,
    /// The bytes of the body, when they are available in memory;
    /// in that case they are sent as a single chunk.
    in_memory: Option<IpcSharedMemory>,
    /// Whether the in-memory bytes have already been sent.
    in_memory_done: bool,
    /// The source of the body, see `BodySource`.
    source: BodySource,
}
80
81impl TransmitBodyConnectHandler {
82    pub(crate) fn new(
83        stream: Trusted<ReadableStream>,
84        task_source: SendableTaskSource,
85        control_sender: IpcSender<BodyChunkRequest>,
86        in_memory: Option<IpcSharedMemory>,
87        source: BodySource,
88    ) -> TransmitBodyConnectHandler {
89        TransmitBodyConnectHandler {
90            stream,
91            task_source,
92            bytes_sender: None,
93            control_sender,
94            in_memory,
95            in_memory_done: false,
96            source,
97        }
98    }
99
100    /// Reset `in_memory_done`, called when a stream is
101    /// re-extracted from the source to support a re-direct.
102    pub(crate) fn reset_in_memory_done(&mut self) {
103        self.in_memory_done = false;
104    }
105
106    /// Re-extract the source to support streaming it again for a re-direct.
107    /// TODO: actually re-extract the source, instead of just cloning data, to support Blob.
108    fn re_extract(&mut self, chunk_request_receiver: IpcReceiver<BodyChunkRequest>) {
109        let mut body_handler = self.clone();
110        body_handler.reset_in_memory_done();
111
112        ROUTER.add_typed_route(
113            chunk_request_receiver,
114            Box::new(move |message| {
115                let request = message.unwrap();
116                match request {
117                    BodyChunkRequest::Connect(sender) => {
118                        body_handler.start_reading(sender);
119                    },
120                    BodyChunkRequest::Extract(receiver) => {
121                        body_handler.re_extract(receiver);
122                    },
123                    BodyChunkRequest::Chunk => body_handler.transmit_source(),
124                    // Note: this is actually sent from this process
125                    // by the TransmitBodyPromiseHandler when reading stops.
126                    BodyChunkRequest::Done => {
127                        body_handler.stop_reading(StopReading::Done);
128                    },
129                    // Note: this is actually sent from this process
130                    // by the TransmitBodyPromiseHandler when the stream errors.
131                    BodyChunkRequest::Error => {
132                        body_handler.stop_reading(StopReading::Error);
133                    },
134                }
135            }),
136        );
137    }
138
139    /// In case of re-direct, and of a source available in memory,
140    /// send it all in one chunk.
141    ///
142    /// TODO: this method should be deprecated
143    /// in favor of making `re_extract` actually re-extract a stream from the source.
144    /// See #26686
145    fn transmit_source(&mut self) {
146        if self.in_memory_done {
147            // Step 5.1.3
148            self.stop_reading(StopReading::Done);
149            return;
150        }
151
152        if let BodySource::Null = self.source {
153            panic!("ReadableStream(Null) sources should not re-direct.");
154        }
155
156        if let Some(bytes) = self.in_memory.clone() {
157            // The memoized bytes are sent so we mark it as done again
158            self.in_memory_done = true;
159            let _ = self
160                .bytes_sender
161                .as_ref()
162                .expect("No bytes sender to transmit source.")
163                .send(BodyChunkResponse::Chunk(bytes));
164            return;
165        }
166        warn!("Re-directs for file-based Blobs not supported yet.");
167    }
168
169    /// Take the IPC sender sent by `net`, so we can send body chunks with it.
170    /// Also the entry point to <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
171    fn start_reading(&mut self, sender: IpcSender<BodyChunkResponse>) {
172        self.bytes_sender = Some(sender);
173
174        // If we're using an actual ReadableStream, acquire a reader for it.
175        if self.source == BodySource::Null {
176            let stream = self.stream.clone();
177            self.task_source
178                .queue(task!(start_reading_request_body_stream: move || {
179                    // Step 1, Let body be request’s body.
180                    let rooted_stream = stream.root();
181
182                    // TODO: Step 2, If body is null.
183
184                    // Step 3, get a reader for stream.
185                    rooted_stream.acquire_default_reader(CanGc::note())
186                        .expect("Couldn't acquire a reader for the body stream.");
187
188                    // Note: this algorithm continues when the first chunk is requested by `net`.
189                }));
190        }
191    }
192
193    /// Drop the IPC sender sent by `net`
194    fn stop_reading(&mut self, reason: StopReading) {
195        let bytes_sender = self
196            .bytes_sender
197            .take()
198            .expect("Stop reading called multiple times on TransmitBodyConnectHandler.");
199        match reason {
200            StopReading::Error => {
201                let _ = bytes_sender.send(BodyChunkResponse::Error);
202            },
203            StopReading::Done => {
204                let _ = bytes_sender.send(BodyChunkResponse::Done);
205            },
206        }
207    }
208
209    /// Step 4 and following of <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
210    fn transmit_body_chunk(&mut self) {
211        if self.in_memory_done {
212            // Step 5.1.3
213            self.stop_reading(StopReading::Done);
214            return;
215        }
216
217        let stream = self.stream.clone();
218        let control_sender = self.control_sender.clone();
219        let bytes_sender = self
220            .bytes_sender
221            .clone()
222            .expect("No bytes sender to transmit chunk.");
223
224        // In case of the data being in-memory, send everything in one chunk, by-passing SpiderMonkey.
225        if let Some(bytes) = self.in_memory.clone() {
226            let _ = bytes_sender.send(BodyChunkResponse::Chunk(bytes));
227            // Mark this body as `done` so that we can stop reading in the next tick,
228            // matching the behavior of the promise-based flow
229            self.in_memory_done = true;
230            return;
231        }
232
233        self.task_source.queue(
234            task!(setup_native_body_promise_handler: move || {
235                let rooted_stream = stream.root();
236                let global = rooted_stream.global();
237
238                // Step 4, the result of reading a chunk from body’s stream with reader.
239                let promise = rooted_stream.read_a_chunk(CanGc::note());
240
241                // Step 5, the parallel steps waiting for and handling the result of the read promise,
242                // are a combination of the promise native handler here,
243                // and the corresponding IPC route in `component::net::http_loader`.
244                let promise_handler = Box::new(TransmitBodyPromiseHandler {
245                    bytes_sender: bytes_sender.clone(),
246                    stream: Dom::from_ref(&rooted_stream.clone()),
247                    control_sender: control_sender.clone(),
248                });
249
250                let rejection_handler = Box::new(TransmitBodyPromiseRejectionHandler {
251                    bytes_sender,
252                    stream: Dom::from_ref(&rooted_stream.clone()),
253                    control_sender,
254                });
255
256                let handler =
257                    PromiseNativeHandler::new(&global, Some(promise_handler), Some(rejection_handler), CanGc::note());
258
259                let realm = enter_realm(&*global);
260                let comp = InRealm::Entered(&realm);
261                promise.append_native_handler(&handler, comp, CanGc::note());
262            })
263        );
264    }
265}
266
/// The handler of read promises of body streams used in
/// <https://fetch.spec.whatwg.org/#concept-request-transmit-body>.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct TransmitBodyPromiseHandler {
    /// Sender used to transmit each chunk that was read.
    #[ignore_malloc_size_of = "Channels are hard"]
    #[no_trace]
    bytes_sender: IpcSender<BodyChunkResponse>,
    /// The stream being read; reading is stopped on it when the body is done or fails.
    stream: Dom<ReadableStream>,
    /// Sender used to send `Done`/`Error` requests when reading ends.
    #[ignore_malloc_size_of = "Channels are hard"]
    #[no_trace]
    control_sender: IpcSender<BodyChunkRequest>,
}
280
281impl Callback for TransmitBodyPromiseHandler {
282    /// Step 5 of <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
283    fn callback(&self, cx: JSContext, v: HandleValue, _realm: InRealm, can_gc: CanGc) {
284        let is_done = match get_read_promise_done(cx, &v, can_gc) {
285            Ok(is_done) => is_done,
286            Err(_) => {
287                // Step 5.5, the "otherwise" steps.
288                // TODO: terminate fetch.
289                let _ = self.control_sender.send(BodyChunkRequest::Done);
290                return self.stream.stop_reading(can_gc);
291            },
292        };
293
294        if is_done {
295            // Step 5.3, the "done" steps.
296            // TODO: queue a fetch task on request to process request end-of-body.
297            let _ = self.control_sender.send(BodyChunkRequest::Done);
298            return self.stream.stop_reading(can_gc);
299        }
300
301        let chunk = match get_read_promise_bytes(cx, &v, can_gc) {
302            Ok(chunk) => chunk,
303            Err(_) => {
304                // Step 5.5, the "otherwise" steps.
305                let _ = self.control_sender.send(BodyChunkRequest::Error);
306                return self.stream.stop_reading(can_gc);
307            },
308        };
309
310        // Step 5.1 and 5.2, transmit chunk.
311        // Send the chunk to the body transmitter in net::http_loader::obtain_response.
312        // TODO: queue a fetch task on request to process request body for request.
313        let _ = self
314            .bytes_sender
315            .send(BodyChunkResponse::Chunk(IpcSharedMemory::from_bytes(
316                &chunk,
317            )));
318    }
319}
320
/// The handler of read promises rejection of body streams used in
/// <https://fetch.spec.whatwg.org/#concept-request-transmit-body>.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct TransmitBodyPromiseRejectionHandler {
    /// Sender of body chunks.
    /// NOTE(review): the rejection callback does not appear to use it — confirm.
    #[ignore_malloc_size_of = "Channels are hard"]
    #[no_trace]
    bytes_sender: IpcSender<BodyChunkResponse>,
    /// The stream being read; reading is stopped on it when the read promise rejects.
    stream: Dom<ReadableStream>,
    /// Sender used to send the `Error` request when the read promise rejects.
    #[ignore_malloc_size_of = "Channels are hard"]
    #[no_trace]
    control_sender: IpcSender<BodyChunkRequest>,
}
334
335impl Callback for TransmitBodyPromiseRejectionHandler {
336    /// <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
337    fn callback(&self, _cx: JSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
338        // Step 5.4, the "rejection" steps.
339        let _ = self.control_sender.send(BodyChunkRequest::Error);
340        self.stream.stop_reading(can_gc);
341    }
342}
343
/// The result of <https://fetch.spec.whatwg.org/#concept-bodyinit-extract>
pub(crate) struct ExtractedBody {
    /// The stream from which the body is read.
    pub(crate) stream: DomRoot<ReadableStream>,
    /// The source of the body, see `BodySource`.
    pub(crate) source: BodySource,
    /// The length of the body in bytes, when it is known;
    /// `None` for a body extracted from a `ReadableStream`.
    pub(crate) total_bytes: Option<usize>,
    /// The content type implied by the extracted object, if any.
    pub(crate) content_type: Option<DOMString>,
}
351
352impl ExtractedBody {
353    /// Build a request body from the extracted body,
354    /// to be sent over IPC to net to use with `concept-request-transmit-body`,
355    /// see <https://fetch.spec.whatwg.org/#concept-request-transmit-body>.
356    ///
357    /// Also returning the corresponding readable stream,
358    /// to be stored on the request in script,
359    /// and potentially used as part of `consume_body`,
360    /// see <https://fetch.spec.whatwg.org/#concept-body-consume-body>
361    ///
362    /// Transmitting a body over fetch, and consuming it in script,
363    /// are mutually exclusive operations, since each will lock the stream to a reader.
364    pub(crate) fn into_net_request_body(self) -> (RequestBody, DomRoot<ReadableStream>) {
365        let ExtractedBody {
366            stream,
367            total_bytes,
368            content_type: _,
369            source,
370        } = self;
371
372        // First, setup some infra to be used to transmit body
373        //  from `components::script` to `components::net`.
374        let (chunk_request_sender, chunk_request_receiver) = ipc::channel().unwrap();
375
376        let trusted_stream = Trusted::new(&*stream);
377
378        let global = stream.global();
379        let task_source = global.task_manager().networking_task_source();
380
381        // In case of the data being in-memory, send everything in one chunk, by-passing SM.
382        let in_memory = stream.get_in_memory_bytes();
383
384        let net_source = match source {
385            BodySource::Null => NetBodySource::Null,
386            _ => NetBodySource::Object,
387        };
388
389        let mut body_handler = TransmitBodyConnectHandler::new(
390            trusted_stream,
391            task_source.into(),
392            chunk_request_sender.clone(),
393            in_memory,
394            source,
395        );
396
397        ROUTER.add_typed_route(
398            chunk_request_receiver,
399            Box::new(move |message| {
400                match message.unwrap() {
401                    BodyChunkRequest::Connect(sender) => {
402                        body_handler.start_reading(sender);
403                    },
404                    BodyChunkRequest::Extract(receiver) => {
405                        body_handler.re_extract(receiver);
406                    },
407                    BodyChunkRequest::Chunk => body_handler.transmit_body_chunk(),
408                    // Note: this is actually sent from this process
409                    // by the TransmitBodyPromiseHandler when reading stops.
410                    BodyChunkRequest::Done => {
411                        body_handler.stop_reading(StopReading::Done);
412                    },
413                    // Note: this is actually sent from this process
414                    // by the TransmitBodyPromiseHandler when the stream errors.
415                    BodyChunkRequest::Error => {
416                        body_handler.stop_reading(StopReading::Error);
417                    },
418                }
419            }),
420        );
421
422        // Return `components::net` view into this request body,
423        // which can be used by `net` to transmit it over the network.
424        let request_body = RequestBody::new(chunk_request_sender, net_source, total_bytes);
425
426        // Also return the stream for this body, which can be used by script to consume it.
427        (request_body, stream)
428    }
429
430    /// Is the data of the stream of this extracted body available in memory?
431    pub(crate) fn in_memory(&self) -> bool {
432        self.stream.in_memory()
433    }
434}
435
/// Types from which a body can be extracted, see
/// <https://fetch.spec.whatwg.org/#concept-bodyinit-extract>
pub(crate) trait Extractable {
    /// Extract a body, made of a stream and its metadata, from `self`.
    ///
    /// An error is returned when the extraction fails, for example when the
    /// stream of the body cannot be created.
    fn extract(&self, global: &GlobalScope, can_gc: CanGc) -> Fallible<ExtractedBody>;
}
440
441impl Extractable for BodyInit {
442    // https://fetch.spec.whatwg.org/#concept-bodyinit-extract
443    fn extract(&self, global: &GlobalScope, can_gc: CanGc) -> Fallible<ExtractedBody> {
444        match self {
445            BodyInit::String(s) => s.extract(global, can_gc),
446            BodyInit::URLSearchParams(usp) => usp.extract(global, can_gc),
447            BodyInit::Blob(b) => b.extract(global, can_gc),
448            BodyInit::FormData(formdata) => formdata.extract(global, can_gc),
449            BodyInit::ArrayBuffer(typedarray) => {
450                let bytes = typedarray.to_vec();
451                let total_bytes = bytes.len();
452                let stream = ReadableStream::new_from_bytes(global, bytes, can_gc)?;
453                Ok(ExtractedBody {
454                    stream,
455                    total_bytes: Some(total_bytes),
456                    content_type: None,
457                    source: BodySource::Object,
458                })
459            },
460            BodyInit::ArrayBufferView(typedarray) => {
461                let bytes = typedarray.to_vec();
462                let total_bytes = bytes.len();
463                let stream = ReadableStream::new_from_bytes(global, bytes, can_gc)?;
464                Ok(ExtractedBody {
465                    stream,
466                    total_bytes: Some(total_bytes),
467                    content_type: None,
468                    source: BodySource::Object,
469                })
470            },
471            BodyInit::ReadableStream(stream) => {
472                // TODO:
473                // 1. If the keepalive flag is set, then throw a TypeError.
474
475                if stream.is_locked() || stream.is_disturbed() {
476                    return Err(Error::Type(
477                        "The body's stream is disturbed or locked".to_string(),
478                    ));
479                }
480
481                Ok(ExtractedBody {
482                    stream: stream.clone(),
483                    total_bytes: None,
484                    content_type: None,
485                    source: BodySource::Null,
486                })
487            },
488        }
489    }
490}
491
492impl Extractable for Vec<u8> {
493    fn extract(&self, global: &GlobalScope, can_gc: CanGc) -> Fallible<ExtractedBody> {
494        let bytes = self.clone();
495        let total_bytes = self.len();
496        let stream = ReadableStream::new_from_bytes(global, bytes, can_gc)?;
497        Ok(ExtractedBody {
498            stream,
499            total_bytes: Some(total_bytes),
500            content_type: None,
501            // A vec is used only in `submit_entity_body`.
502            source: BodySource::Object,
503        })
504    }
505}
506
507impl Extractable for Blob {
508    fn extract(&self, _global: &GlobalScope, can_gc: CanGc) -> Fallible<ExtractedBody> {
509        let blob_type = self.Type();
510        let content_type = if blob_type.as_ref().is_empty() {
511            None
512        } else {
513            Some(blob_type)
514        };
515        let total_bytes = self.Size() as usize;
516        let stream = self.get_stream(can_gc)?;
517        Ok(ExtractedBody {
518            stream,
519            total_bytes: Some(total_bytes),
520            content_type,
521            source: BodySource::Object,
522        })
523    }
524}
525
526impl Extractable for DOMString {
527    fn extract(&self, global: &GlobalScope, can_gc: CanGc) -> Fallible<ExtractedBody> {
528        let bytes = self.as_bytes().to_owned();
529        let total_bytes = bytes.len();
530        let content_type = Some(DOMString::from("text/plain;charset=UTF-8"));
531        let stream = ReadableStream::new_from_bytes(global, bytes, can_gc)?;
532        Ok(ExtractedBody {
533            stream,
534            total_bytes: Some(total_bytes),
535            content_type,
536            source: BodySource::Object,
537        })
538    }
539}
540
541impl Extractable for FormData {
542    fn extract(&self, global: &GlobalScope, can_gc: CanGc) -> Fallible<ExtractedBody> {
543        let boundary = generate_boundary();
544        let bytes = encode_multipart_form_data(&mut self.datums(), boundary.clone(), UTF_8);
545        let total_bytes = bytes.len();
546        let content_type = Some(DOMString::from(format!(
547            "multipart/form-data; boundary={}",
548            boundary
549        )));
550        let stream = ReadableStream::new_from_bytes(global, bytes, can_gc)?;
551        Ok(ExtractedBody {
552            stream,
553            total_bytes: Some(total_bytes),
554            content_type,
555            source: BodySource::Object,
556        })
557    }
558}
559
560impl Extractable for URLSearchParams {
561    fn extract(&self, global: &GlobalScope, can_gc: CanGc) -> Fallible<ExtractedBody> {
562        let bytes = self.serialize_utf8().into_bytes();
563        let total_bytes = bytes.len();
564        let content_type = Some(DOMString::from(
565            "application/x-www-form-urlencoded;charset=UTF-8",
566        ));
567        let stream = ReadableStream::new_from_bytes(global, bytes, can_gc)?;
568        Ok(ExtractedBody {
569            stream,
570            total_bytes: Some(total_bytes),
571            content_type,
572            source: BodySource::Object,
573        })
574    }
575}
576
/// The kind of value a consumed body is packaged into;
/// it selects the algorithm that `run_package_data_algorithm` runs on the bytes.
#[derive(Clone, Copy, JSTraceable, MallocSizeOf)]
pub(crate) enum BodyType {
    /// Package the bytes as a `Blob`.
    Blob,
    /// Package the bytes with the "bytes" algorithm.
    Bytes,
    /// Parse the bytes into a `FormData`.
    FormData,
    /// Parse the bytes as JSON.
    Json,
    /// Decode the bytes as text.
    Text,
    /// Package the bytes with the "array buffer" algorithm.
    ArrayBuffer,
}
586
/// The result of packaging the bytes of a body,
/// as consumed by `resolve_result_promise`.
pub(crate) enum FetchedData {
    /// The decoded text of the body.
    Text(String),
    /// The value obtained by parsing the body as JSON.
    Json(RootedTraceableBox<Heap<JSValue>>),
    /// A blob made from the bytes of the body.
    BlobData(DomRoot<Blob>),
    /// A JS object holding the bytes of the body
    /// (presumably a `Uint8Array` — confirm against `run_bytes_data_algorithm`).
    Bytes(RootedTraceableBox<Heap<*mut JSObject>>),
    /// The form data parsed from the body.
    FormData(DomRoot<FormData>),
    /// A JS object holding the bytes of the body
    /// (presumably an `ArrayBuffer` — confirm against `run_array_buffer_data_algorithm`).
    ArrayBuffer(RootedTraceableBox<Heap<*mut JSObject>>),
    /// An exception raised while packaging the data;
    /// the promise is rejected with it.
    JSException(RootedTraceableBox<Heap<JSVal>>),
}
596
597/// <https://fetch.spec.whatwg.org/#concept-body-consume-body>
598/// <https://fetch.spec.whatwg.org/#body-fully-read>
599/// A combination of parts of both algorithms,
600/// `body-fully-read` can be fully implemented, and separated, later,
601/// see #36049.
602#[cfg_attr(crown, allow(crown::unrooted_must_root))]
603pub(crate) fn consume_body<T: BodyMixin + DomObject>(
604    object: &T,
605    body_type: BodyType,
606    can_gc: CanGc,
607) -> Rc<Promise> {
608    let global = object.global();
609    let cx = GlobalScope::get_cx();
610
611    // Enter the realm of the object whose body is being consumed.
612    let realm = enter_realm(&*global);
613    let comp = InRealm::Entered(&realm);
614
615    // Let promise be a new promise.
616    // Note: re-ordered so we can return the promise below.
617    let promise = Promise::new_in_current_realm(comp, can_gc);
618
619    // If object is unusable, then return a promise rejected with a TypeError.
620    if object.is_disturbed() || object.is_locked() {
621        promise.reject_error(
622            Error::Type("The body's stream is disturbed or locked".to_string()),
623            can_gc,
624        );
625        return promise;
626    }
627
628    let stream = match object.body() {
629        Some(stream) => stream,
630        None => {
631            // If object’s body is null, then run successSteps with an empty byte sequence.
632            resolve_result_promise(
633                body_type,
634                &promise,
635                object.get_mime_type(can_gc),
636                Vec::with_capacity(0),
637                cx,
638                can_gc,
639            );
640            return promise;
641        },
642    };
643
644    // Note: from `fully_read`.
645    // Let reader be the result of getting a reader for body’s stream.
646    // If that threw an exception,
647    // then run errorSteps with that exception and return.
648    let reader = match stream.acquire_default_reader(can_gc) {
649        Ok(r) => r,
650        Err(e) => {
651            promise.reject_error(e, can_gc);
652            return promise;
653        },
654    };
655
656    // Let errorSteps given error be to reject promise with error.
657    let error_promise = promise.clone();
658
659    // Let successSteps given a byte sequence data be to resolve promise
660    // with the result of running convertBytesToJSValue with data.
661    // If that threw an exception, then run errorSteps with that exception.
662    let mime_type = object.get_mime_type(can_gc);
663    let success_promise = promise.clone();
664
665    // Read all bytes from reader, given successSteps and errorSteps.
666    // Note: spec uses an intermediary concept of `fully_read`,
667    // which seems useful when invoking fetch from other places.
668    // TODO: #36049
669    reader.read_all_bytes(
670        cx,
671        &global,
672        Rc::new(move |bytes: &[u8]| {
673            resolve_result_promise(
674                body_type,
675                &success_promise,
676                mime_type.clone(),
677                bytes.to_vec(),
678                cx,
679                can_gc,
680            );
681        }),
682        Rc::new(move |cx, v| {
683            error_promise.reject(cx, v, can_gc);
684        }),
685        comp,
686        can_gc,
687    );
688
689    promise
690}
691
692/// The success steps of
693/// <https://fetch.spec.whatwg.org/#concept-body-consume-body>.
694fn resolve_result_promise(
695    body_type: BodyType,
696    promise: &Promise,
697    mime_type: Vec<u8>,
698    body: Vec<u8>,
699    cx: JSContext,
700    can_gc: CanGc,
701) {
702    let pkg_data_results = run_package_data_algorithm(cx, body, body_type, mime_type, can_gc);
703
704    match pkg_data_results {
705        Ok(results) => {
706            match results {
707                FetchedData::Text(s) => promise.resolve_native(&USVString(s), can_gc),
708                FetchedData::Json(j) => promise.resolve_native(&j, can_gc),
709                FetchedData::BlobData(b) => promise.resolve_native(&b, can_gc),
710                FetchedData::FormData(f) => promise.resolve_native(&f, can_gc),
711                FetchedData::Bytes(b) => promise.resolve_native(&b, can_gc),
712                FetchedData::ArrayBuffer(a) => promise.resolve_native(&a, can_gc),
713                FetchedData::JSException(e) => promise.reject_native(&e.handle(), can_gc),
714            };
715        },
716        Err(err) => promise.reject_error(err, can_gc),
717    }
718}
719
720/// The algorithm that takes a byte sequence
721/// and returns a JavaScript value or throws an exception of
722/// <https://fetch.spec.whatwg.org/#concept-body-consume-body>.
723fn run_package_data_algorithm(
724    cx: JSContext,
725    bytes: Vec<u8>,
726    body_type: BodyType,
727    mime_type: Vec<u8>,
728    can_gc: CanGc,
729) -> Fallible<FetchedData> {
730    let mime = &*mime_type;
731    let in_realm_proof = AlreadyInRealm::assert_for_cx(cx);
732    let global = GlobalScope::from_safe_context(cx, InRealm::Already(&in_realm_proof));
733    match body_type {
734        BodyType::Text => run_text_data_algorithm(bytes),
735        BodyType::Json => run_json_data_algorithm(cx, bytes),
736        BodyType::Blob => run_blob_data_algorithm(&global, bytes, mime, can_gc),
737        BodyType::FormData => run_form_data_algorithm(&global, bytes, mime, can_gc),
738        BodyType::ArrayBuffer => run_array_buffer_data_algorithm(cx, bytes, can_gc),
739        BodyType::Bytes => run_bytes_data_algorithm(cx, bytes, can_gc),
740    }
741}
742
743/// <https://fetch.spec.whatwg.org/#ref-for-concept-body-consume-body%E2%91%A4>
744fn run_text_data_algorithm(bytes: Vec<u8>) -> Fallible<FetchedData> {
745    // This implements the Encoding standard's "decode UTF-8", which removes the
746    // BOM if present.
747    let no_bom_bytes = if bytes.starts_with(b"\xEF\xBB\xBF") {
748        &bytes[3..]
749    } else {
750        &bytes
751    };
752    Ok(FetchedData::Text(
753        String::from_utf8_lossy(no_bom_bytes).into_owned(),
754    ))
755}
756
757#[allow(unsafe_code)]
758/// <https://fetch.spec.whatwg.org/#ref-for-concept-body-consume-body%E2%91%A3>
759fn run_json_data_algorithm(cx: JSContext, bytes: Vec<u8>) -> Fallible<FetchedData> {
760    // The JSON spec allows implementations to either ignore UTF-8 BOM or treat it as an error.
761    // `JS_ParseJSON` treats this as an error, so it is necessary for us to strip it if present.
762    //
763    // https://datatracker.ietf.org/doc/html/rfc8259#section-8.1
764    let json_text = decode_to_utf16_with_bom_removal(&bytes, UTF_8);
765    rooted!(in(*cx) let mut rval = UndefinedValue());
766    unsafe {
767        if !JS_ParseJSON(
768            *cx,
769            json_text.as_ptr(),
770            json_text.len() as u32,
771            rval.handle_mut(),
772        ) {
773            rooted!(in(*cx) let mut exception = UndefinedValue());
774            assert!(JS_GetPendingException(*cx, exception.handle_mut()));
775            JS_ClearPendingException(*cx);
776            return Ok(FetchedData::JSException(RootedTraceableBox::from_box(
777                Heap::boxed(exception.get()),
778            )));
779        }
780        let rooted_heap = RootedTraceableBox::from_box(Heap::boxed(rval.get()));
781        Ok(FetchedData::Json(rooted_heap))
782    }
783}
784
785/// <https://fetch.spec.whatwg.org/#ref-for-concept-body-consume-body%E2%91%A0>
786fn run_blob_data_algorithm(
787    root: &GlobalScope,
788    bytes: Vec<u8>,
789    mime: &[u8],
790    can_gc: CanGc,
791) -> Fallible<FetchedData> {
792    let mime_string = if let Ok(s) = String::from_utf8(mime.to_vec()) {
793        s
794    } else {
795        "".to_string()
796    };
797    let blob = Blob::new(
798        root,
799        BlobImpl::new_from_bytes(bytes, normalize_type_string(&mime_string)),
800        can_gc,
801    );
802    Ok(FetchedData::BlobData(blob))
803}
804
805/// <https://fetch.spec.whatwg.org/#ref-for-concept-body-consume-body%E2%91%A2>
806fn run_form_data_algorithm(
807    root: &GlobalScope,
808    bytes: Vec<u8>,
809    mime: &[u8],
810    can_gc: CanGc,
811) -> Fallible<FetchedData> {
812    let mime_str = str::from_utf8(mime).unwrap_or_default();
813    let mime: Mime = mime_str
814        .parse()
815        .map_err(|_| Error::Type("Inappropriate MIME-type for Body".to_string()))?;
816
817    // TODO
818    // ... Parser for Mime(TopLevel::Multipart, SubLevel::FormData, _)
819    // ... is not fully determined yet.
820    if mime.type_() == mime::APPLICATION && mime.subtype() == mime::WWW_FORM_URLENCODED {
821        let entries = form_urlencoded::parse(&bytes);
822        let formdata = FormData::new(None, root, can_gc);
823        for (k, e) in entries {
824            formdata.Append(USVString(k.into_owned()), USVString(e.into_owned()));
825        }
826        return Ok(FetchedData::FormData(formdata));
827    }
828
829    Err(Error::Type("Inappropriate MIME-type for Body".to_string()))
830}
831
832/// <https://fetch.spec.whatwg.org/#ref-for-concept-body-consume-body%E2%91%A1>
833fn run_bytes_data_algorithm(cx: JSContext, bytes: Vec<u8>, can_gc: CanGc) -> Fallible<FetchedData> {
834    rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
835
836    create_buffer_source::<Uint8>(cx, &bytes, array_buffer_ptr.handle_mut(), can_gc)
837        .map_err(|_| Error::JSFailed)?;
838
839    let rooted_heap = RootedTraceableBox::from_box(Heap::boxed(array_buffer_ptr.get()));
840    Ok(FetchedData::Bytes(rooted_heap))
841}
842
843/// <https://fetch.spec.whatwg.org/#ref-for-concept-body-consume-body>
844pub(crate) fn run_array_buffer_data_algorithm(
845    cx: JSContext,
846    bytes: Vec<u8>,
847    can_gc: CanGc,
848) -> Fallible<FetchedData> {
849    rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
850
851    create_buffer_source::<ArrayBufferU8>(cx, &bytes, array_buffer_ptr.handle_mut(), can_gc)
852        .map_err(|_| Error::JSFailed)?;
853
854    let rooted_heap = RootedTraceableBox::from_box(Heap::boxed(array_buffer_ptr.get()));
855    Ok(FetchedData::ArrayBuffer(rooted_heap))
856}
857
858#[allow(unsafe_code)]
859pub(crate) fn decode_to_utf16_with_bom_removal(
860    bytes: &[u8],
861    encoding: &'static Encoding,
862) -> Vec<u16> {
863    let mut decoder = encoding.new_decoder_with_bom_removal();
864    let capacity = decoder
865        .max_utf16_buffer_length(bytes.len())
866        .expect("Overflow");
867    let mut utf16 = Vec::with_capacity(capacity);
868    let extra = unsafe { slice::from_raw_parts_mut(utf16.as_mut_ptr(), capacity) };
869    let (_, read, written, _) = decoder.decode_to_utf16(bytes, extra, true);
870    assert_eq!(read, bytes.len());
871    unsafe { utf16.set_len(written) }
872    utf16
873}
874
/// <https://fetch.spec.whatwg.org/#body>
///
/// Queries that a type exposing a Fetch body must be able to answer. Each
/// method corresponds to the Fetch concept linked on it.
pub(crate) trait BodyMixin {
    /// <https://fetch.spec.whatwg.org/#concept-body-disturbed>
    ///
    /// Returns `true` if the body is disturbed, in the sense of the linked
    /// definition.
    fn is_disturbed(&self) -> bool;
    /// <https://fetch.spec.whatwg.org/#dom-body-body>
    ///
    /// Returns the body's `ReadableStream`, or `None` when there is none to
    /// return.
    fn body(&self) -> Option<DomRoot<ReadableStream>>;
    /// <https://fetch.spec.whatwg.org/#concept-body-locked>
    ///
    /// Returns `true` if the body is locked, in the sense of the linked
    /// definition.
    fn is_locked(&self) -> bool;
    /// <https://fetch.spec.whatwg.org/#concept-body-mime-type>
    ///
    /// Returns the body's MIME type as raw bytes.
    // NOTE(review): the byte encoding is not fixed by this signature; the
    // consumers in this file interpret such bytes as UTF-8 — confirm that
    // implementors agree.
    fn get_mime_type(&self, can_gc: CanGc) -> Vec<u8>;
}