script/
body.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::rc::Rc;
6use std::{ptr, slice, str};
7
8use base::generic_channel::GenericSharedMemory;
9use constellation_traits::BlobImpl;
10use encoding_rs::{Encoding, UTF_8};
11use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
12use ipc_channel::router::ROUTER;
13use js::jsapi::{Heap, JS_ClearPendingException, JSObject, Value as JSValue};
14use js::jsval::{JSVal, UndefinedValue};
15use js::realm::CurrentRealm;
16use js::rust::HandleValue;
17use js::rust::wrappers::{JS_GetPendingException, JS_ParseJSON};
18use js::typedarray::{ArrayBufferU8, Uint8};
19use mime::{self, Mime};
20use net_traits::request::{
21    BodyChunkRequest, BodyChunkResponse, BodySource as NetBodySource, RequestBody,
22};
23use url::form_urlencoded;
24
25use crate::dom::bindings::buffer_source::create_buffer_source;
26use crate::dom::bindings::codegen::Bindings::BlobBinding::Blob_Binding::BlobMethods;
27use crate::dom::bindings::codegen::Bindings::FormDataBinding::FormDataMethods;
28use crate::dom::bindings::codegen::Bindings::XMLHttpRequestBinding::BodyInit;
29use crate::dom::bindings::error::{Error, Fallible};
30use crate::dom::bindings::refcounted::Trusted;
31use crate::dom::bindings::reflector::{DomGlobal, DomObject};
32use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom};
33use crate::dom::bindings::str::{DOMString, USVString};
34use crate::dom::bindings::trace::RootedTraceableBox;
35use crate::dom::blob::{Blob, normalize_type_string};
36use crate::dom::formdata::FormData;
37use crate::dom::globalscope::GlobalScope;
38use crate::dom::html::htmlformelement::{encode_multipart_form_data, generate_boundary};
39use crate::dom::promise::Promise;
40use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
41use crate::dom::readablestream::{ReadableStream, get_read_promise_bytes, get_read_promise_done};
42use crate::dom::urlsearchparams::URLSearchParams;
43use crate::realms::{AlreadyInRealm, InRealm, enter_realm};
44use crate::script_runtime::{CanGc, JSContext};
45use crate::task_source::SendableTaskSource;
46
47/// <https://fetch.spec.whatwg.org/#concept-body-clone>
48pub(crate) fn clone_body_stream_for_dom_body(
49    original_body_stream: &MutNullableDom<ReadableStream>,
50    cloned_body_stream: &MutNullableDom<ReadableStream>,
51    can_gc: CanGc,
52) -> Fallible<()> {
53    // To clone a body *body*, run these steps:
54
55    let Some(stream) = original_body_stream.get() else {
56        return Ok(());
57    };
58
59    // step 1. Let « out1, out2 » be the result of teeing body’s stream.
60    let branches = stream.tee(true, can_gc)?;
61    let out1 = &*branches[0];
62    let out2 = &*branches[1];
63
64    // step 2. Set body’s stream to out1.
65    // step 3. Return a body whose stream is out2 and other members are copied from body.
66    original_body_stream.set(Some(out1));
67    cloned_body_stream.set(Some(out2));
68
69    Ok(())
70}
71
/// Describes where the data of a body comes from:
/// either another Dom object, or nothing when the body is a ReadableStream.
/// <https://fetch.spec.whatwg.org/#concept-body-source>
#[derive(Clone, PartialEq)]
pub(crate) enum BodySource {
    /// A body provided directly as a ReadableStream comes with a null-source.
    Null,
    /// Another Dom object is the source of the body.
    /// TODO: store the actual object
    /// and re-extract a stream on re-direct.
    Object,
}
83
/// The reason to stop reading from the body,
/// which decides the final message sent on the bytes channel
/// by `TransmitBodyConnectHandler::stop_reading`.
enum StopReading {
    /// The stream has errored; reported as `BodyChunkResponse::Error`.
    Error,
    /// The stream is done; reported as `BodyChunkResponse::Done`.
    Done,
}
91
/// The IPC route handler
/// for <https://fetch.spec.whatwg.org/#concept-request-transmit-body>.
/// This route runs in the script process,
/// and will queue tasks to perform operations
/// on the stream and transmit body chunks over IPC.
#[derive(Clone)]
struct TransmitBodyConnectHandler {
    /// The body stream; it is rooted inside the tasks queued by this handler.
    stream: Trusted<ReadableStream>,
    /// The task source used to queue the tasks that operate on the stream.
    task_source: SendableTaskSource,
    /// The sender for body chunks, set by `start_reading` and taken by `stop_reading`.
    bytes_sender: Option<IpcSender<BodyChunkResponse>>,
    /// A sender for the control channel, handed to the promise handlers
    /// so they can report `Done` or `Error`; dropped by `stop_reading`.
    control_sender: Option<IpcSender<BodyChunkRequest>>,
    /// The bytes of the body when they are available in memory,
    /// in which case they are sent in a single chunk.
    in_memory: Option<GenericSharedMemory>,
    /// Set once the in-memory bytes have been sent,
    /// so that the next chunk request stops the reading.
    in_memory_done: bool,
    /// The source of the body being transmitted.
    source: BodySource,
}
107
108impl TransmitBodyConnectHandler {
109    pub(crate) fn new(
110        stream: Trusted<ReadableStream>,
111        task_source: SendableTaskSource,
112        control_sender: IpcSender<BodyChunkRequest>,
113        in_memory: Option<GenericSharedMemory>,
114        source: BodySource,
115    ) -> TransmitBodyConnectHandler {
116        TransmitBodyConnectHandler {
117            stream,
118            task_source,
119            bytes_sender: None,
120            control_sender: Some(control_sender),
121            in_memory,
122            in_memory_done: false,
123            source,
124        }
125    }
126
    /// Reset `in_memory_done` to `false`, called when a stream is
    /// re-extracted from the source to support a re-direct,
    /// so that the in-memory bytes can be transmitted again.
    pub(crate) fn reset_in_memory_done(&mut self) {
        self.in_memory_done = false;
    }
132
133    /// Re-extract the source to support streaming it again for a re-direct.
134    /// TODO: actually re-extract the source, instead of just cloning data, to support Blob.
135    fn re_extract(&mut self, chunk_request_receiver: IpcReceiver<BodyChunkRequest>) {
136        let mut body_handler = self.clone();
137        body_handler.reset_in_memory_done();
138
139        ROUTER.add_typed_route(
140            chunk_request_receiver,
141            Box::new(move |message| {
142                let request = message.unwrap();
143                match request {
144                    BodyChunkRequest::Connect(sender) => {
145                        body_handler.start_reading(sender);
146                    },
147                    BodyChunkRequest::Extract(receiver) => {
148                        body_handler.re_extract(receiver);
149                    },
150                    BodyChunkRequest::Chunk => body_handler.transmit_source(),
151                    // Note: this is actually sent from this process
152                    // by the TransmitBodyPromiseHandler when reading stops.
153                    BodyChunkRequest::Done => {
154                        body_handler.stop_reading(StopReading::Done);
155                    },
156                    // Note: this is actually sent from this process
157                    // by the TransmitBodyPromiseHandler when the stream errors.
158                    BodyChunkRequest::Error => {
159                        body_handler.stop_reading(StopReading::Error);
160                    },
161                }
162            }),
163        );
164    }
165
166    /// In case of re-direct, and of a source available in memory,
167    /// send it all in one chunk.
168    ///
169    /// TODO: this method should be deprecated
170    /// in favor of making `re_extract` actually re-extract a stream from the source.
171    /// See #26686
172    fn transmit_source(&mut self) {
173        if self.in_memory_done {
174            // Step 5.1.3
175            self.stop_reading(StopReading::Done);
176            return;
177        }
178
179        if let BodySource::Null = self.source {
180            panic!("ReadableStream(Null) sources should not re-direct.");
181        }
182
183        if let Some(bytes) = self.in_memory.clone() {
184            // The memoized bytes are sent so we mark it as done again
185            self.in_memory_done = true;
186            let _ = self
187                .bytes_sender
188                .as_ref()
189                .expect("No bytes sender to transmit source.")
190                .send(BodyChunkResponse::Chunk(bytes));
191            return;
192        }
193        warn!("Re-directs for file-based Blobs not supported yet.");
194    }
195
196    /// Take the IPC sender sent by `net`, so we can send body chunks with it.
197    /// Also the entry point to <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
198    fn start_reading(&mut self, sender: IpcSender<BodyChunkResponse>) {
199        self.bytes_sender = Some(sender);
200
201        // If we're using an actual ReadableStream, acquire a reader for it.
202        if self.source == BodySource::Null {
203            let stream = self.stream.clone();
204            self.task_source
205                .queue(task!(start_reading_request_body_stream: move || {
206                    // Step 1, Let body be request’s body.
207                    let rooted_stream = stream.root();
208
209                    // TODO: Step 2, If body is null.
210
211                    // Step 3, get a reader for stream.
212                    rooted_stream.acquire_default_reader(CanGc::note())
213                        .expect("Couldn't acquire a reader for the body stream.");
214
215                    // Note: this algorithm continues when the first chunk is requested by `net`.
216                }));
217        }
218    }
219
220    /// Drop the IPC sender sent by `net`
221    /// It is important to drop the control_sender as this will allow us to clean ourselves up.
222    /// Otherwise, the following cycle will happen: The control sender is owned by us which keeps the control receiver
223    /// alive in the router which keeps us alive.
224    fn stop_reading(&mut self, reason: StopReading) {
225        let bytes_sender = self
226            .bytes_sender
227            .take()
228            .expect("Stop reading called multiple times on TransmitBodyConnectHandler.");
229        match reason {
230            StopReading::Error => {
231                let _ = bytes_sender.send(BodyChunkResponse::Error);
232            },
233            StopReading::Done => {
234                let _ = bytes_sender.send(BodyChunkResponse::Done);
235            },
236        }
237        let _ = self.control_sender.take();
238    }
239
    /// Step 4 and following of <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
    ///
    /// Serves one chunk request: in-memory bytes are sent directly in a single chunk,
    /// otherwise a task is queued that reads a chunk from the stream
    /// and attaches native handlers to the resulting promise.
    ///
    /// Panics if no bytes sender is set; the queued task panics if the control sender was dropped.
    fn transmit_body_chunk(&mut self) {
        if self.in_memory_done {
            // Step 5.1.3
            // The in-memory bytes went out with the previous request, so this one ends the body.
            self.stop_reading(StopReading::Done);
            return;
        }

        // Clones that are moved into the task queued below.
        let stream = self.stream.clone();
        let control_sender = self.control_sender.clone();
        let bytes_sender = self
            .bytes_sender
            .clone()
            .expect("No bytes sender to transmit chunk.");

        // In case of the data being in-memory, send everything in one chunk, by-passing SpiderMonkey.
        if let Some(bytes) = self.in_memory.clone() {
            let _ = bytes_sender.send(BodyChunkResponse::Chunk(bytes));
            // Mark this body as `done` so that we can stop reading in the next tick,
            // matching the behavior of the promise-based flow
            self.in_memory_done = true;
            return;
        }

        self.task_source.queue(
            task!(setup_native_body_promise_handler: move || {
                let rooted_stream = stream.root();
                let global = rooted_stream.global();
                let cx = GlobalScope::get_cx();

                // Step 4, the result of reading a chunk from body’s stream with reader.
                let promise = rooted_stream.read_a_chunk(CanGc::note());

                // Step 5, the parallel steps waiting for and handling the result of the read promise,
                // are a combination of the promise native handler here,
                // and the corresponding IPC route in `component::net::http_loader`.
                rooted!(in(*cx) let mut promise_handler = Some(TransmitBodyPromiseHandler {
                    bytes_sender: bytes_sender.clone(),
                    stream: Dom::from_ref(&rooted_stream.clone()),
                    control_sender: control_sender.clone().unwrap(),
                }));

                // The handler for a rejected read promise; it takes ownership of the remaining senders.
                rooted!(in(*cx) let mut rejection_handler = Some(TransmitBodyPromiseRejectionHandler {
                    bytes_sender,
                    stream: Dom::from_ref(&rooted_stream.clone()),
                    control_sender: control_sender.unwrap(),
                }));

                // Both handlers are moved out of their rooted slots and boxed for the native handler.
                let handler =
                    PromiseNativeHandler::new(&global, promise_handler.take().map(|h| Box::new(h) as Box<_>), rejection_handler.take().map(|h| Box::new(h) as Box<_>), CanGc::note());

                // The handler is appended to the promise while in the realm of the stream's global.
                let realm = enter_realm(&*global);
                let comp = InRealm::Entered(&realm);
                promise.append_native_handler(&handler, comp, CanGc::note());
            })
        );
    }
297}
298
/// The handler of read promises of body streams used in
/// <https://fetch.spec.whatwg.org/#concept-request-transmit-body>.
/// It runs when a read promise is fulfilled.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct TransmitBodyPromiseHandler {
    /// The sender on which chunks that were read are sent.
    #[ignore_malloc_size_of = "Channels are hard"]
    #[no_trace]
    bytes_sender: IpcSender<BodyChunkResponse>,
    /// The stream being read, on which reading is stopped when it is done or has failed.
    stream: Dom<ReadableStream>,
    /// The sender used to report `Done` or `Error` to the IPC route handler.
    #[ignore_malloc_size_of = "Channels are hard"]
    #[no_trace]
    control_sender: IpcSender<BodyChunkRequest>,
}
312
// Marker impl with no methods; presumably what allows this handler
// to be stored in a `rooted!` slot before it is boxed, TODO confirm.
impl js::gc::Rootable for TransmitBodyPromiseHandler {}
314
315impl Callback for TransmitBodyPromiseHandler {
316    /// Step 5 of <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
317    fn callback(&self, cx: &mut CurrentRealm, v: HandleValue) {
318        let can_gc = CanGc::from_cx(cx);
319        let _realm = InRealm::Already(&cx.into());
320        let cx = cx.into();
321        let is_done = match get_read_promise_done(cx, &v, can_gc) {
322            Ok(is_done) => is_done,
323            Err(_) => {
324                // Step 5.5, the "otherwise" steps.
325                // TODO: terminate fetch.
326                let _ = self.control_sender.send(BodyChunkRequest::Done);
327                return self.stream.stop_reading(can_gc);
328            },
329        };
330
331        if is_done {
332            // Step 5.3, the "done" steps.
333            // TODO: queue a fetch task on request to process request end-of-body.
334            let _ = self.control_sender.send(BodyChunkRequest::Done);
335            return self.stream.stop_reading(can_gc);
336        }
337
338        let chunk = match get_read_promise_bytes(cx, &v, can_gc) {
339            Ok(chunk) => chunk,
340            Err(_) => {
341                // Step 5.5, the "otherwise" steps.
342                let _ = self.control_sender.send(BodyChunkRequest::Error);
343                return self.stream.stop_reading(can_gc);
344            },
345        };
346
347        // Step 5.1 and 5.2, transmit chunk.
348        // Send the chunk to the body transmitter in net::http_loader::obtain_response.
349        // TODO: queue a fetch task on request to process request body for request.
350        let _ = self
351            .bytes_sender
352            .send(BodyChunkResponse::Chunk(GenericSharedMemory::from_bytes(
353                &chunk,
354            )));
355    }
356}
357
/// The handler of read promises rejection of body streams used in
/// <https://fetch.spec.whatwg.org/#concept-request-transmit-body>.
/// It runs when a read promise is rejected.
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct TransmitBodyPromiseRejectionHandler {
    /// The sender for body chunks; the rejection callback does not send on it.
    #[ignore_malloc_size_of = "Channels are hard"]
    #[no_trace]
    bytes_sender: IpcSender<BodyChunkResponse>,
    /// The stream being read, on which reading is stopped upon rejection.
    stream: Dom<ReadableStream>,
    /// The sender used to report `Error` to the IPC route handler.
    #[ignore_malloc_size_of = "Channels are hard"]
    #[no_trace]
    control_sender: IpcSender<BodyChunkRequest>,
}
371
// Marker impl with no methods; presumably what allows this handler
// to be stored in a `rooted!` slot before it is boxed, TODO confirm.
impl js::gc::Rootable for TransmitBodyPromiseRejectionHandler {}
373
374impl Callback for TransmitBodyPromiseRejectionHandler {
375    /// <https://fetch.spec.whatwg.org/#concept-request-transmit-body>
376    fn callback(&self, cx: &mut CurrentRealm, _v: HandleValue) {
377        // Step 5.4, the "rejection" steps.
378        let _ = self.control_sender.send(BodyChunkRequest::Error);
379        self.stream.stop_reading(CanGc::from_cx(cx));
380    }
381}
382
/// The result of extracting a body from an object:
/// a body together with its type.
/// <https://fetch.spec.whatwg.org/#body-with-type>
pub(crate) struct ExtractedBody {
    /// The stream from which the bytes of the body are read.
    /// <https://fetch.spec.whatwg.org/#concept-body-stream>
    pub(crate) stream: DomRoot<ReadableStream>,
    /// Where the data of the body comes from.
    /// <https://fetch.spec.whatwg.org/#concept-body-source>
    pub(crate) source: BodySource,
    /// The length of the body in bytes, or `None` when it is not known.
    /// <https://fetch.spec.whatwg.org/#concept-body-total-bytes>
    pub(crate) total_bytes: Option<usize>,
    /// The content type that goes with the body, if any.
    /// <https://fetch.spec.whatwg.org/#body-with-type-type>
    pub(crate) content_type: Option<DOMString>,
}
394
395impl ExtractedBody {
396    /// Build a request body from the extracted body,
397    /// to be sent over IPC to net to use with `concept-request-transmit-body`,
398    /// see <https://fetch.spec.whatwg.org/#concept-request-transmit-body>.
399    ///
400    /// Also returning the corresponding readable stream,
401    /// to be stored on the request in script,
402    /// and potentially used as part of `consume_body`,
403    /// see <https://fetch.spec.whatwg.org/#concept-body-consume-body>
404    ///
405    /// Transmitting a body over fetch, and consuming it in script,
406    /// are mutually exclusive operations, since each will lock the stream to a reader.
407    pub(crate) fn into_net_request_body(self) -> (RequestBody, DomRoot<ReadableStream>) {
408        let ExtractedBody {
409            stream,
410            total_bytes,
411            content_type: _,
412            source,
413        } = self;
414
415        // First, setup some infra to be used to transmit body
416        //  from `components::script` to `components::net`.
417        let (chunk_request_sender, chunk_request_receiver) = ipc::channel().unwrap();
418
419        let trusted_stream = Trusted::new(&*stream);
420
421        let global = stream.global();
422        let task_source = global.task_manager().networking_task_source();
423
424        // In case of the data being in-memory, send everything in one chunk, by-passing SM.
425        let in_memory = stream.get_in_memory_bytes();
426
427        let net_source = match source {
428            BodySource::Null => NetBodySource::Null,
429            _ => NetBodySource::Object,
430        };
431
432        let mut body_handler = TransmitBodyConnectHandler::new(
433            trusted_stream,
434            task_source.into(),
435            chunk_request_sender.clone(),
436            in_memory,
437            source,
438        );
439
440        ROUTER.add_typed_route(
441            chunk_request_receiver,
442            Box::new(move |message| {
443                match message.unwrap() {
444                    BodyChunkRequest::Connect(sender) => {
445                        body_handler.start_reading(sender);
446                    },
447                    BodyChunkRequest::Extract(receiver) => {
448                        body_handler.re_extract(receiver);
449                    },
450                    BodyChunkRequest::Chunk => body_handler.transmit_body_chunk(),
451                    // Note: this is actually sent from this process
452                    // by the TransmitBodyPromiseHandler when reading stops.
453                    BodyChunkRequest::Done => {
454                        body_handler.stop_reading(StopReading::Done);
455                    },
456                    // Note: this is actually sent from this process
457                    // by the TransmitBodyPromiseHandler when the stream errors.
458                    BodyChunkRequest::Error => {
459                        body_handler.stop_reading(StopReading::Error);
460                    },
461                }
462            }),
463        );
464
465        // Return `components::net` view into this request body,
466        // which can be used by `net` to transmit it over the network.
467        let request_body = RequestBody::new(chunk_request_sender, net_source, total_bytes);
468
469        // Also return the stream for this body, which can be used by script to consume it.
470        (request_body, stream)
471    }
472
    /// Is the data of the stream of this extracted body available in memory?
    /// The answer is the one given by the stream itself.
    pub(crate) fn in_memory(&self) -> bool {
        self.stream.in_memory()
    }
477}
478
/// Implemented by the objects from which a body can be extracted.
/// <https://fetch.spec.whatwg.org/#concept-bodyinit-extract>
pub(crate) trait Extractable {
    /// Extract a body, with its type, from `self`.
    ///
    /// `global` is the global used by implementations that create a new stream,
    /// and `keep_alive` is the keepalive flag of the extract algorithm.
    /// Returns an error when the body cannot be extracted.
    fn extract(
        &self,
        global: &GlobalScope,
        keep_alive: bool,
        can_gc: CanGc,
    ) -> Fallible<ExtractedBody>;
}
488
489impl Extractable for BodyInit {
490    /// <https://fetch.spec.whatwg.org/#concept-bodyinit-extract>
491    fn extract(
492        &self,
493        global: &GlobalScope,
494        keep_alive: bool,
495        can_gc: CanGc,
496    ) -> Fallible<ExtractedBody> {
497        match self {
498            BodyInit::String(s) => s.extract(global, keep_alive, can_gc),
499            BodyInit::URLSearchParams(usp) => usp.extract(global, keep_alive, can_gc),
500            BodyInit::Blob(b) => b.extract(global, keep_alive, can_gc),
501            BodyInit::FormData(formdata) => formdata.extract(global, keep_alive, can_gc),
502            BodyInit::ArrayBuffer(typedarray) => {
503                let bytes = typedarray.to_vec();
504                let total_bytes = bytes.len();
505                let stream = ReadableStream::new_from_bytes(global, bytes, can_gc)?;
506                Ok(ExtractedBody {
507                    stream,
508                    total_bytes: Some(total_bytes),
509                    content_type: None,
510                    source: BodySource::Object,
511                })
512            },
513            BodyInit::ArrayBufferView(typedarray) => {
514                let bytes = typedarray.to_vec();
515                let total_bytes = bytes.len();
516                let stream = ReadableStream::new_from_bytes(global, bytes, can_gc)?;
517                Ok(ExtractedBody {
518                    stream,
519                    total_bytes: Some(total_bytes),
520                    content_type: None,
521                    source: BodySource::Object,
522                })
523            },
524            BodyInit::ReadableStream(stream) => {
525                // If keepalive is true, then throw a TypeError.
526                if keep_alive {
527                    return Err(Error::Type(
528                        "The body's stream is for a keepalive request".to_string(),
529                    ));
530                }
531                // If object is disturbed or locked, then throw a TypeError.
532                if stream.is_locked() || stream.is_disturbed() {
533                    return Err(Error::Type(
534                        "The body's stream is disturbed or locked".to_string(),
535                    ));
536                }
537
538                Ok(ExtractedBody {
539                    stream: stream.clone(),
540                    total_bytes: None,
541                    content_type: None,
542                    source: BodySource::Null,
543                })
544            },
545        }
546    }
547}
548
549impl Extractable for Vec<u8> {
550    fn extract(
551        &self,
552        global: &GlobalScope,
553        _keep_alive: bool,
554        can_gc: CanGc,
555    ) -> Fallible<ExtractedBody> {
556        let bytes = self.clone();
557        let total_bytes = self.len();
558        let stream = ReadableStream::new_from_bytes(global, bytes, can_gc)?;
559        Ok(ExtractedBody {
560            stream,
561            total_bytes: Some(total_bytes),
562            content_type: None,
563            // A vec is used only in `submit_entity_body`.
564            source: BodySource::Object,
565        })
566    }
567}
568
569impl Extractable for Blob {
570    fn extract(
571        &self,
572        _global: &GlobalScope,
573        _keep_alive: bool,
574        can_gc: CanGc,
575    ) -> Fallible<ExtractedBody> {
576        let blob_type = self.Type();
577        let content_type = if blob_type.is_empty() {
578            None
579        } else {
580            Some(blob_type)
581        };
582        let total_bytes = self.Size() as usize;
583        let stream = self.get_stream(can_gc)?;
584        Ok(ExtractedBody {
585            stream,
586            total_bytes: Some(total_bytes),
587            content_type,
588            source: BodySource::Object,
589        })
590    }
591}
592
593impl Extractable for DOMString {
594    fn extract(
595        &self,
596        global: &GlobalScope,
597        _keep_alive: bool,
598        can_gc: CanGc,
599    ) -> Fallible<ExtractedBody> {
600        let bytes = self.as_bytes().to_owned();
601        let total_bytes = bytes.len();
602        let content_type = Some(DOMString::from("text/plain;charset=UTF-8"));
603        let stream = ReadableStream::new_from_bytes(global, bytes, can_gc)?;
604        Ok(ExtractedBody {
605            stream,
606            total_bytes: Some(total_bytes),
607            content_type,
608            source: BodySource::Object,
609        })
610    }
611}
612
613impl Extractable for FormData {
614    fn extract(
615        &self,
616        global: &GlobalScope,
617        _keep_alive: bool,
618        can_gc: CanGc,
619    ) -> Fallible<ExtractedBody> {
620        let boundary = generate_boundary();
621        let bytes = encode_multipart_form_data(&mut self.datums(), boundary.clone(), UTF_8);
622        let total_bytes = bytes.len();
623        let content_type = Some(DOMString::from(format!(
624            "multipart/form-data; boundary={}",
625            boundary
626        )));
627        let stream = ReadableStream::new_from_bytes(global, bytes, can_gc)?;
628        Ok(ExtractedBody {
629            stream,
630            total_bytes: Some(total_bytes),
631            content_type,
632            source: BodySource::Object,
633        })
634    }
635}
636
637impl Extractable for URLSearchParams {
638    fn extract(
639        &self,
640        global: &GlobalScope,
641        _keep_alive: bool,
642        can_gc: CanGc,
643    ) -> Fallible<ExtractedBody> {
644        let bytes = self.serialize_utf8().into_bytes();
645        let total_bytes = bytes.len();
646        let content_type = Some(DOMString::from(
647            "application/x-www-form-urlencoded;charset=UTF-8",
648        ));
649        let stream = ReadableStream::new_from_bytes(global, bytes, can_gc)?;
650        Ok(ExtractedBody {
651            stream,
652            total_bytes: Some(total_bytes),
653            content_type,
654            source: BodySource::Object,
655        })
656    }
657}
658
/// The kind of value that the bytes of a consumed body are packaged into;
/// it selects the algorithm run by `run_package_data_algorithm`.
#[derive(Clone, Copy, JSTraceable, MallocSizeOf)]
pub(crate) enum BodyType {
    Blob,
    Bytes,
    FormData,
    Json,
    Text,
    ArrayBuffer,
}
668
/// The result of packaging the bytes of a body.
/// `resolve_result_promise` resolves the promise with the value carried by each variant,
/// except for `JSException`, with which the promise is rejected.
pub(crate) enum FetchedData {
    Text(String),
    Json(RootedTraceableBox<Heap<JSValue>>),
    BlobData(DomRoot<Blob>),
    Bytes(RootedTraceableBox<Heap<*mut JSObject>>),
    FormData(DomRoot<FormData>),
    ArrayBuffer(RootedTraceableBox<Heap<*mut JSObject>>),
    /// An exception value, used to reject the promise.
    JSException(RootedTraceableBox<Heap<JSVal>>),
}
678
679/// <https://fetch.spec.whatwg.org/#concept-body-consume-body>
680/// <https://fetch.spec.whatwg.org/#body-fully-read>
681/// A combination of parts of both algorithms,
682/// `body-fully-read` can be fully implemented, and separated, later,
683/// see #36049.
684pub(crate) fn consume_body<T: BodyMixin + DomObject>(
685    object: &T,
686    body_type: BodyType,
687    can_gc: CanGc,
688) -> Rc<Promise> {
689    let global = object.global();
690    let cx = GlobalScope::get_cx();
691
692    // Enter the realm of the object whose body is being consumed.
693    let realm = enter_realm(&*global);
694    let comp = InRealm::Entered(&realm);
695
696    // Let promise be a new promise.
697    // Note: re-ordered so we can return the promise below.
698    let promise = Promise::new_in_current_realm(comp, can_gc);
699
700    // If object is unusable, then return a promise rejected with a TypeError.
701    if object.is_unusable() {
702        promise.reject_error(
703            Error::Type("The body's stream is disturbed or locked".to_string()),
704            can_gc,
705        );
706        return promise;
707    }
708
709    let stream = match object.body() {
710        Some(stream) => stream,
711        None => {
712            // If object’s body is null, then run successSteps with an empty byte sequence.
713            resolve_result_promise(
714                body_type,
715                &promise,
716                object.get_mime_type(can_gc),
717                Vec::with_capacity(0),
718                cx,
719                can_gc,
720            );
721            return promise;
722        },
723    };
724
725    // <https://fetch.spec.whatwg.org/#concept-body-consume-body>
726    // Otherwise, fully read object’s body given successSteps, errorSteps, and object’s relevant global object.
727    //
728    // <https://fetch.spec.whatwg.org/#body-fully-read>
729    // Let reader be the result of getting a reader for body’s stream.
730    // Read all bytes from reader, given successSteps and errorSteps.
731    //
732    // <https://streams.spec.whatwg.org/#readable-stream-default-reader-read>
733    // Set stream.[[disturbed]] to true.
734    // Otherwise, if stream.[[state]] is "errored", perform readRequest’s error steps given stream.[[storedError]].
735    //
736    // If the body stream is already errored (for example, the fetch was aborted after the Response exists),
737    // the normal fully read path would reject with [[storedError]] but would also mark the stream disturbed.
738    // Once the stream is disturbed, later calls reject with TypeError ("disturbed or locked") instead of the
739    // original AbortError. This early return rejects with the same [[storedError]] without disturbing the
740    // stream, so repeated calls (for example, calling text() twice) keep rejecting with AbortError.
741    if stream.is_errored() {
742        rooted!(in(*cx) let mut stored_error = UndefinedValue());
743        stream.get_stored_error(stored_error.handle_mut());
744        promise.reject(cx, stored_error.handle(), can_gc);
745        return promise;
746    }
747
748    // Note: from `fully_read`.
749    // Let reader be the result of getting a reader for body’s stream.
750    // If that threw an exception,
751    // then run errorSteps with that exception and return.
752    let reader = match stream.acquire_default_reader(can_gc) {
753        Ok(r) => r,
754        Err(e) => {
755            promise.reject_error(e, can_gc);
756            return promise;
757        },
758    };
759
760    // Let errorSteps given error be to reject promise with error.
761    let error_promise = promise.clone();
762
763    // Let successSteps given a byte sequence data be to resolve promise
764    // with the result of running convertBytesToJSValue with data.
765    // If that threw an exception, then run errorSteps with that exception.
766    let mime_type = object.get_mime_type(can_gc);
767    let success_promise = promise.clone();
768
769    // Read all bytes from reader, given successSteps and errorSteps.
770    // Note: spec uses an intermediary concept of `fully_read`,
771    // which seems useful when invoking fetch from other places.
772    // TODO: #36049
773    reader.read_all_bytes(
774        cx,
775        Rc::new(move |bytes: &[u8]| {
776            resolve_result_promise(
777                body_type,
778                &success_promise,
779                mime_type.clone(),
780                bytes.to_vec(),
781                cx,
782                can_gc,
783            );
784        }),
785        Rc::new(move |cx, v| {
786            error_promise.reject(cx, v, can_gc);
787        }),
788        can_gc,
789    );
790
791    promise
792}
793
794/// The success steps of
795/// <https://fetch.spec.whatwg.org/#concept-body-consume-body>.
796fn resolve_result_promise(
797    body_type: BodyType,
798    promise: &Promise,
799    mime_type: Vec<u8>,
800    body: Vec<u8>,
801    cx: JSContext,
802    can_gc: CanGc,
803) {
804    let pkg_data_results = run_package_data_algorithm(cx, body, body_type, mime_type, can_gc);
805
806    match pkg_data_results {
807        Ok(results) => {
808            match results {
809                FetchedData::Text(s) => promise.resolve_native(&USVString(s), can_gc),
810                FetchedData::Json(j) => promise.resolve_native(&j, can_gc),
811                FetchedData::BlobData(b) => promise.resolve_native(&b, can_gc),
812                FetchedData::FormData(f) => promise.resolve_native(&f, can_gc),
813                FetchedData::Bytes(b) => promise.resolve_native(&b, can_gc),
814                FetchedData::ArrayBuffer(a) => promise.resolve_native(&a, can_gc),
815                FetchedData::JSException(e) => promise.reject_native(&e.handle(), can_gc),
816            };
817        },
818        Err(err) => promise.reject_error(err, can_gc),
819    }
820}
821
822/// The algorithm that takes a byte sequence
823/// and returns a JavaScript value or throws an exception of
824/// <https://fetch.spec.whatwg.org/#concept-body-consume-body>.
825fn run_package_data_algorithm(
826    cx: JSContext,
827    bytes: Vec<u8>,
828    body_type: BodyType,
829    mime_type: Vec<u8>,
830    can_gc: CanGc,
831) -> Fallible<FetchedData> {
832    let mime = &*mime_type;
833    let in_realm_proof = AlreadyInRealm::assert_for_cx(cx);
834    let global = GlobalScope::from_safe_context(cx, InRealm::Already(&in_realm_proof));
835    match body_type {
836        BodyType::Text => run_text_data_algorithm(bytes),
837        BodyType::Json => run_json_data_algorithm(cx, bytes),
838        BodyType::Blob => run_blob_data_algorithm(&global, bytes, mime, can_gc),
839        BodyType::FormData => run_form_data_algorithm(&global, bytes, mime, can_gc),
840        BodyType::ArrayBuffer => run_array_buffer_data_algorithm(cx, bytes, can_gc),
841        BodyType::Bytes => run_bytes_data_algorithm(cx, bytes, can_gc),
842    }
843}
844
845/// <https://fetch.spec.whatwg.org/#ref-for-concept-body-consume-body%E2%91%A4>
846fn run_text_data_algorithm(bytes: Vec<u8>) -> Fallible<FetchedData> {
847    // This implements the Encoding standard's "decode UTF-8", which removes the
848    // BOM if present.
849    let no_bom_bytes = if bytes.starts_with(b"\xEF\xBB\xBF") {
850        &bytes[3..]
851    } else {
852        &bytes
853    };
854    Ok(FetchedData::Text(
855        String::from_utf8_lossy(no_bom_bytes).into_owned(),
856    ))
857}
858
859#[expect(unsafe_code)]
860/// <https://fetch.spec.whatwg.org/#ref-for-concept-body-consume-body%E2%91%A3>
861fn run_json_data_algorithm(cx: JSContext, bytes: Vec<u8>) -> Fallible<FetchedData> {
862    // The JSON spec allows implementations to either ignore UTF-8 BOM or treat it as an error.
863    // `JS_ParseJSON` treats this as an error, so it is necessary for us to strip it if present.
864    //
865    // https://datatracker.ietf.org/doc/html/rfc8259#section-8.1
866    let json_text = decode_to_utf16_with_bom_removal(&bytes, UTF_8);
867    rooted!(in(*cx) let mut rval = UndefinedValue());
868    unsafe {
869        if !JS_ParseJSON(
870            *cx,
871            json_text.as_ptr(),
872            json_text.len() as u32,
873            rval.handle_mut(),
874        ) {
875            rooted!(in(*cx) let mut exception = UndefinedValue());
876            assert!(JS_GetPendingException(*cx, exception.handle_mut()));
877            JS_ClearPendingException(*cx);
878            return Ok(FetchedData::JSException(RootedTraceableBox::from_box(
879                Heap::boxed(exception.get()),
880            )));
881        }
882        let rooted_heap = RootedTraceableBox::from_box(Heap::boxed(rval.get()));
883        Ok(FetchedData::Json(rooted_heap))
884    }
885}
886
887/// <https://fetch.spec.whatwg.org/#ref-for-concept-body-consume-body%E2%91%A0>
888fn run_blob_data_algorithm(
889    root: &GlobalScope,
890    bytes: Vec<u8>,
891    mime: &[u8],
892    can_gc: CanGc,
893) -> Fallible<FetchedData> {
894    let mime_string = if let Ok(s) = String::from_utf8(mime.to_vec()) {
895        s
896    } else {
897        "".to_string()
898    };
899    let blob = Blob::new(
900        root,
901        BlobImpl::new_from_bytes(bytes, normalize_type_string(&mime_string)),
902        can_gc,
903    );
904    Ok(FetchedData::BlobData(blob))
905}
906
907/// <https://fetch.spec.whatwg.org/#ref-for-concept-body-consume-body%E2%91%A2>
908fn run_form_data_algorithm(
909    root: &GlobalScope,
910    bytes: Vec<u8>,
911    mime: &[u8],
912    can_gc: CanGc,
913) -> Fallible<FetchedData> {
914    let mime_str = str::from_utf8(mime).unwrap_or_default();
915    let mime: Mime = mime_str
916        .parse()
917        .map_err(|_| Error::Type("Inappropriate MIME-type for Body".to_string()))?;
918
919    // TODO
920    // ... Parser for Mime(TopLevel::Multipart, SubLevel::FormData, _)
921    // ... is not fully determined yet.
922    if mime.type_() == mime::APPLICATION && mime.subtype() == mime::WWW_FORM_URLENCODED {
923        let entries = form_urlencoded::parse(&bytes);
924        let formdata = FormData::new(None, root, can_gc);
925        for (k, e) in entries {
926            formdata.Append(USVString(k.into_owned()), USVString(e.into_owned()));
927        }
928        return Ok(FetchedData::FormData(formdata));
929    }
930
931    Err(Error::Type("Inappropriate MIME-type for Body".to_string()))
932}
933
934/// <https://fetch.spec.whatwg.org/#ref-for-concept-body-consume-body%E2%91%A1>
935fn run_bytes_data_algorithm(cx: JSContext, bytes: Vec<u8>, can_gc: CanGc) -> Fallible<FetchedData> {
936    rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
937
938    create_buffer_source::<Uint8>(cx, &bytes, array_buffer_ptr.handle_mut(), can_gc)
939        .map_err(|_| Error::JSFailed)?;
940
941    let rooted_heap = RootedTraceableBox::from_box(Heap::boxed(array_buffer_ptr.get()));
942    Ok(FetchedData::Bytes(rooted_heap))
943}
944
945/// <https://fetch.spec.whatwg.org/#ref-for-concept-body-consume-body>
946pub(crate) fn run_array_buffer_data_algorithm(
947    cx: JSContext,
948    bytes: Vec<u8>,
949    can_gc: CanGc,
950) -> Fallible<FetchedData> {
951    rooted!(in(*cx) let mut array_buffer_ptr = ptr::null_mut::<JSObject>());
952
953    create_buffer_source::<ArrayBufferU8>(cx, &bytes, array_buffer_ptr.handle_mut(), can_gc)
954        .map_err(|_| Error::JSFailed)?;
955
956    let rooted_heap = RootedTraceableBox::from_box(Heap::boxed(array_buffer_ptr.get()));
957    Ok(FetchedData::ArrayBuffer(rooted_heap))
958}
959
960#[expect(unsafe_code)]
961pub(crate) fn decode_to_utf16_with_bom_removal(
962    bytes: &[u8],
963    encoding: &'static Encoding,
964) -> Vec<u16> {
965    let mut decoder = encoding.new_decoder_with_bom_removal();
966    let capacity = decoder
967        .max_utf16_buffer_length(bytes.len())
968        .expect("Overflow");
969    let mut utf16 = Vec::with_capacity(capacity);
970    let extra = unsafe { slice::from_raw_parts_mut(utf16.as_mut_ptr(), capacity) };
971    let (_, read, written, _) = decoder.decode_to_utf16(bytes, extra, true);
972    assert_eq!(read, bytes.len());
973    unsafe { utf16.set_len(written) }
974    utf16
975}
976
/// Operations shared by objects that carry a Fetch body.
///
/// <https://fetch.spec.whatwg.org/#body>
pub(crate) trait BodyMixin {
    /// Whether the body has been used, as defined for the `bodyUsed` attribute.
    ///
    /// <https://fetch.spec.whatwg.org/#dom-body-bodyused>
    fn is_body_used(&self) -> bool;
    /// Whether the body is unusable in the sense of the linked definition.
    ///
    /// <https://fetch.spec.whatwg.org/#body-unusable>
    fn is_unusable(&self) -> bool;
    /// The stream backing the body, or `None` when there is none.
    ///
    /// <https://fetch.spec.whatwg.org/#dom-body-body>
    fn body(&self) -> Option<DomRoot<ReadableStream>>;
    /// The body's MIME type as raw bytes. Callers in this file interpret the
    /// bytes as UTF-8 text when packaging blob and form-data bodies.
    ///
    /// <https://fetch.spec.whatwg.org/#concept-body-mime-type>
    fn get_mime_type(&self, can_gc: CanGc) -> Vec<u8>;
}