Skip to main content

script/dom/
eventsource.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::mem;
7use std::str::{Chars, FromStr};
8use std::time::Duration;
9
10use dom_struct::dom_struct;
11use encoding_rs::{Decoder, UTF_8};
12use headers::ContentType;
13use http::StatusCode;
14use http::header::{self, HeaderName, HeaderValue};
15use js::jsval::UndefinedValue;
16use js::rust::HandleObject;
17use mime::{self, Mime};
18use net_traits::request::{CacheMode, CorsSettings, Destination, RequestBuilder, RequestId};
19use net_traits::{FetchMetadata, FilteredMetadata, NetworkError, ResourceFetchTiming};
20use script_bindings::cell::DomRefCell;
21use script_bindings::conversions::SafeToJSValConvertible;
22use script_bindings::reflector::reflect_dom_object_with_proto;
23use servo_url::ServoUrl;
24use stylo_atoms::Atom;
25
26use crate::dom::bindings::codegen::Bindings::EventSourceBinding::{
27    EventSourceInit, EventSourceMethods,
28};
29use crate::dom::bindings::error::{Error, Fallible};
30use crate::dom::bindings::inheritance::Castable;
31use crate::dom::bindings::refcounted::Trusted;
32use crate::dom::bindings::reflector::DomGlobal;
33use crate::dom::bindings::root::DomRoot;
34use crate::dom::bindings::str::DOMString;
35use crate::dom::csp::{GlobalCspReporting, Violation};
36use crate::dom::event::Event;
37use crate::dom::eventtarget::EventTarget;
38use crate::dom::globalscope::GlobalScope;
39use crate::dom::messageevent::MessageEvent;
40use crate::dom::performance::performanceresourcetiming::InitiatorType;
41use crate::fetch::{FetchCanceller, RequestWithGlobalScope, create_a_potential_cors_request};
42use crate::network_listener::{self, FetchResponseListener, ResourceTimingListener};
43use crate::realms::enter_realm;
44use crate::script_runtime::CanGc;
45use crate::timers::OneshotTimerCallback;
46
47const DEFAULT_RECONNECTION_TIME: Duration = Duration::from_millis(5000);
48
49#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
50struct GenerationId(u32);
51
52#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
53/// <https://html.spec.whatwg.org/multipage/#dom-eventsource-readystate>
54enum ReadyState {
55    Connecting = 0,
56    Open = 1,
57    Closed = 2,
58}
59
60#[derive(JSTraceable, MallocSizeOf)]
61struct DroppableEventSource {
62    canceller: DomRefCell<FetchCanceller>,
63}
64
65impl DroppableEventSource {
66    pub(crate) fn new(canceller: DomRefCell<FetchCanceller>) -> Self {
67        DroppableEventSource { canceller }
68    }
69
70    pub(crate) fn cancel(&self) {
71        self.canceller.borrow_mut().abort();
72    }
73
74    pub(crate) fn set_canceller(&self, data: FetchCanceller) {
75        *self.canceller.borrow_mut() = data;
76    }
77}
78
79// https://html.spec.whatwg.org/multipage/#garbage-collection-2
80impl Drop for DroppableEventSource {
81    fn drop(&mut self) {
82        // If an EventSource object is garbage collected while its connection is still open,
83        // the user agent must abort any instance of the fetch algorithm opened by this EventSource.
84        self.cancel();
85    }
86}
87
88#[dom_struct]
89pub(crate) struct EventSource {
90    eventtarget: EventTarget,
91    #[no_trace]
92    url: ServoUrl,
93    #[no_trace]
94    request: DomRefCell<Option<RequestBuilder>>,
95    last_event_id: DomRefCell<DOMString>,
96    reconnection_time: Cell<Duration>,
97    generation_id: Cell<GenerationId>,
98
99    ready_state: Cell<ReadyState>,
100    with_credentials: bool,
101    droppable: DroppableEventSource,
102}
103
104#[derive(Clone, MallocSizeOf)]
105enum ParserState {
106    Field,
107    Comment,
108    Value,
109    Eol,
110}
111
112#[derive(MallocSizeOf)]
113struct EventSourceContext {
114    decoder: Decoder,
115    event_source: Trusted<EventSource>,
116    gen_id: GenerationId,
117    parser_state: ParserState,
118    field: String,
119    value: String,
120    origin: String,
121    event_type: String,
122    data: String,
123    last_event_id: String,
124}
125
126impl Clone for EventSourceContext {
127    fn clone(&self) -> Self {
128        EventSourceContext {
129            decoder: UTF_8.new_decoder_with_bom_removal(),
130            event_source: self.event_source.clone(),
131            gen_id: self.gen_id,
132            parser_state: self.parser_state.clone(),
133            field: self.field.clone(),
134            value: self.value.clone(),
135            origin: self.origin.clone(),
136            event_type: self.event_type.clone(),
137            data: self.data.clone(),
138            last_event_id: self.last_event_id.clone(),
139        }
140    }
141}
142
143impl EventSourceContext {
144    /// <https://html.spec.whatwg.org/multipage/#announce-the-connection>
145    fn announce_the_connection(&self) {
146        let event_source = self.event_source.root();
147        if self.gen_id != event_source.generation_id.get() {
148            return;
149        }
150        let global = event_source.global();
151        let event_source = self.event_source.clone();
152        global.task_manager().remote_event_task_source().queue(
153            task!(announce_the_event_source_connection: move |cx| {
154                let event_source = event_source.root();
155                if event_source.ready_state.get() != ReadyState::Closed {
156                    event_source.ready_state.set(ReadyState::Open);
157                    event_source.upcast::<EventTarget>().fire_event(cx, atom!("open"));
158                }
159            }),
160        );
161    }
162
163    /// <https://html.spec.whatwg.org/multipage/#fail-the-connection>
164    fn fail_the_connection(&self) {
165        let event_source = self.event_source.root();
166        if self.gen_id != event_source.generation_id.get() {
167            return;
168        }
169        event_source.fail_the_connection();
170    }
171
172    /// <https://html.spec.whatwg.org/multipage/#reestablish-the-connection>
173    fn reestablish_the_connection(&self) {
174        let event_source = self.event_source.root();
175
176        if self.gen_id != event_source.generation_id.get() {
177            return;
178        }
179
180        let trusted_event_source = self.event_source.clone();
181        let global = event_source.global();
182        let event_source_context = EventSourceContext {
183            decoder: UTF_8.new_decoder_with_bom_removal(),
184            event_source: self.event_source.clone(),
185            gen_id: self.gen_id,
186            parser_state: ParserState::Eol,
187            field: String::new(),
188            value: String::new(),
189            origin: self.origin.clone(),
190            event_type: String::new(),
191            data: String::new(),
192            last_event_id: String::from(event_source.last_event_id.borrow().clone()),
193        };
194        global.task_manager().remote_event_task_source().queue(
195            task!(reestablish_the_event_source_onnection: move |cx| {
196                let event_source = trusted_event_source.root();
197
198                // Step 1.1.
199                if event_source.ready_state.get() == ReadyState::Closed {
200                    return;
201                }
202
203                // Step 1.2.
204                event_source.ready_state.set(ReadyState::Connecting);
205
206                // Step 1.3.
207                event_source.upcast::<EventTarget>().fire_event(cx, atom!("error"));
208
209                // Step 2.
210                let duration = event_source.reconnection_time.get();
211
212                // Step 3.
213                // TODO: Optionally wait some more.
214
215                // Steps 4-5.
216                let callback = OneshotTimerCallback::EventSourceTimeout(
217                    EventSourceTimeoutCallback {
218                        event_source: trusted_event_source,
219                        event_source_context,
220                    }
221                );
222                event_source.global().schedule_callback(callback, duration);
223            }),
224        );
225    }
226
227    /// <https://html.spec.whatwg.org/multipage/#processField>
228    fn process_field(&mut self) {
229        match &*self.field {
230            "event" => mem::swap(&mut self.event_type, &mut self.value),
231            "data" => {
232                self.data.push_str(&self.value);
233                self.data.push('\n');
234            },
235            "id" if !self.value.contains('\0') => {
236                mem::swap(&mut self.last_event_id, &mut self.value);
237            },
238            "retry" => {
239                if let Ok(time) = u64::from_str(&self.value) {
240                    self.event_source
241                        .root()
242                        .reconnection_time
243                        .set(Duration::from_millis(time));
244                }
245            },
246            _ => (),
247        }
248
249        self.field.clear();
250        self.value.clear();
251    }
252
253    /// <https://html.spec.whatwg.org/multipage/#dispatchMessage>
254    fn dispatch_event(&mut self, cx: &mut js::context::JSContext) {
255        let event_source = self.event_source.root();
256        // Step 1
257        *event_source.last_event_id.borrow_mut() = DOMString::from(self.last_event_id.clone());
258        // Step 2
259        if self.data.is_empty() {
260            self.data.clear();
261            self.event_type.clear();
262            return;
263        }
264        // Step 3
265        if let Some(last) = self.data.pop() &&
266            last != '\n'
267        {
268            self.data.push(last);
269        }
270        // Step 6
271        let type_ = if !self.event_type.is_empty() {
272            Atom::from(self.event_type.clone())
273        } else {
274            atom!("message")
275        };
276        // Steps 4-5
277        let event = {
278            let _ac = enter_realm(&*event_source);
279            rooted!(&in(cx) let mut data = UndefinedValue());
280            self.data
281                .safe_to_jsval(cx.into(), data.handle_mut(), CanGc::from_cx(cx));
282            MessageEvent::new(
283                &event_source.global(),
284                type_,
285                false,
286                false,
287                data.handle(),
288                DOMString::from(self.origin.clone()),
289                None,
290                event_source.last_event_id.borrow().clone(),
291                Vec::with_capacity(0),
292                CanGc::from_cx(cx),
293            )
294        };
295        // Step 7
296        self.event_type.clear();
297        self.data.clear();
298
299        // Step 8.
300        let global = event_source.global();
301        let event_source = self.event_source.clone();
302        let event = Trusted::new(&*event);
303        global.task_manager().remote_event_task_source().queue(
304            task!(dispatch_the_event_source_event: move |cx| {
305                let event_source = event_source.root();
306                if event_source.ready_state.get() != ReadyState::Closed {
307                    event.root().upcast::<Event>().fire(event_source.upcast(), CanGc::from_cx(cx));
308                }
309            }),
310        );
311    }
312
313    /// <https://html.spec.whatwg.org/multipage/#event-stream-interpretation>
314    fn parse(&mut self, cx: &mut js::context::JSContext, stream: Chars) {
315        let mut stream = stream.peekable();
316
317        while let Some(ch) = stream.next() {
318            match (ch, &self.parser_state) {
319                (':', &ParserState::Eol) => self.parser_state = ParserState::Comment,
320                (':', &ParserState::Field) => {
321                    self.parser_state = ParserState::Value;
322                    if let Some(&' ') = stream.peek() {
323                        stream.next();
324                    }
325                },
326
327                ('\n', &ParserState::Value) => {
328                    self.parser_state = ParserState::Eol;
329                    self.process_field();
330                },
331                ('\r', &ParserState::Value) => {
332                    if let Some(&'\n') = stream.peek() {
333                        continue;
334                    }
335                    self.parser_state = ParserState::Eol;
336                    self.process_field();
337                },
338
339                ('\n', &ParserState::Field) => {
340                    self.parser_state = ParserState::Eol;
341                    self.process_field();
342                },
343                ('\r', &ParserState::Field) => {
344                    if let Some(&'\n') = stream.peek() {
345                        continue;
346                    }
347                    self.parser_state = ParserState::Eol;
348                    self.process_field();
349                },
350
351                ('\n', &ParserState::Eol) => self.dispatch_event(cx),
352                ('\r', &ParserState::Eol) => {
353                    if let Some(&'\n') = stream.peek() {
354                        continue;
355                    }
356                    self.dispatch_event(cx);
357                },
358
359                ('\n', &ParserState::Comment) => self.parser_state = ParserState::Eol,
360                ('\r', &ParserState::Comment) => {
361                    if let Some(&'\n') = stream.peek() {
362                        continue;
363                    }
364                    self.parser_state = ParserState::Eol;
365                },
366
367                (_, &ParserState::Field) => self.field.push(ch),
368                (_, &ParserState::Value) => self.value.push(ch),
369                (_, &ParserState::Eol) => {
370                    self.parser_state = ParserState::Field;
371                    self.field.push(ch);
372                },
373                (_, &ParserState::Comment) => (),
374            }
375        }
376    }
377}
378
379impl FetchResponseListener for EventSourceContext {
380    fn should_invoke(&self) -> bool {
381        self.event_source.root().generation_id.get() == self.gen_id
382    }
383
384    fn process_request_body(&mut self, _: RequestId) {
385        // TODO
386    }
387
388    fn process_response(
389        &mut self,
390        _: &mut js::context::JSContext,
391        _: RequestId,
392        metadata: Result<FetchMetadata, NetworkError>,
393    ) {
394        match metadata {
395            Ok(fm) => {
396                let meta = match fm {
397                    FetchMetadata::Unfiltered(m) => m,
398                    FetchMetadata::Filtered { unsafe_, filtered } => match filtered {
399                        FilteredMetadata::Opaque | FilteredMetadata::OpaqueRedirect(_) => {
400                            return self.fail_the_connection();
401                        },
402                        _ => unsafe_,
403                    },
404                };
405                // Step 15.3 if res's status is not 200, or if res's `Content-Type` is not
406                // `text/event-stream`, then fail the connection.
407                if meta.status.code() != StatusCode::OK {
408                    return self.fail_the_connection();
409                }
410                let mime = match meta.content_type {
411                    None => return self.fail_the_connection(),
412                    Some(ct) => <ContentType as Into<Mime>>::into(ct.into_inner()),
413                };
414                if (mime.type_(), mime.subtype()) != (mime::TEXT, mime::EVENT_STREAM) {
415                    return self.fail_the_connection();
416                }
417                self.origin = meta.final_url.origin().ascii_serialization();
418                // Step 15.4 announce the connection and interpret res's body line by line.
419                self.announce_the_connection();
420            },
421            Err(error) => {
422                // Step 15.2 if res is a network error, then reestablish the connection, unless
423                // the user agent knows that to be futile, in which case the user agent may
424                // fail the connection.
425                if error.is_permanent_failure() {
426                    self.fail_the_connection()
427                } else {
428                    self.reestablish_the_connection()
429                }
430            },
431        }
432    }
433
434    fn process_response_chunk(
435        &mut self,
436        cx: &mut js::context::JSContext,
437        _: RequestId,
438        chunk: Vec<u8>,
439    ) {
440        let mut output = String::with_capacity(chunk.len());
441        let mut input = &chunk[..];
442
443        loop {
444            if input.is_empty() {
445                return;
446            }
447            let (result, bytes_read) =
448                self.decoder
449                    .decode_to_string_without_replacement(input, &mut output, false);
450            match result {
451                encoding_rs::DecoderResult::InputEmpty => {
452                    self.parse(cx, output.chars());
453                    return;
454                },
455                encoding_rs::DecoderResult::Malformed(_, _) => {
456                    self.parse(cx, output.chars());
457                    self.parse(cx, "\u{FFFD}".chars());
458                    output.clear();
459                    input = &input[bytes_read..];
460                },
461                encoding_rs::DecoderResult::OutputFull => {
462                    self.parse(cx, output.chars());
463                    output.clear();
464                    input = &input[bytes_read..];
465                },
466            }
467        }
468    }
469
470    fn process_response_eof(
471        mut self,
472        cx: &mut js::context::JSContext,
473        _: RequestId,
474        response: Result<(), NetworkError>,
475        timing: ResourceFetchTiming,
476    ) {
477        let mut output = String::new();
478        let (result, _) = self
479            .decoder
480            .decode_to_string_without_replacement(&[], &mut output, true);
481        if !output.is_empty() {
482            self.parse(cx, "\u{FFFD}".chars());
483        }
484        if matches!(result, encoding_rs::DecoderResult::Malformed(_, _)) {
485            self.parse(cx, "\u{FFFD}".chars());
486        }
487        if response.is_ok() {
488            self.reestablish_the_connection();
489        }
490
491        network_listener::submit_timing(cx, &self, &response, &timing);
492    }
493
494    fn process_csp_violations(&mut self, _request_id: RequestId, violations: Vec<Violation>) {
495        let global = &self.resource_timing_global();
496        global.report_csp_violations(violations, None, None);
497    }
498}
499
500impl ResourceTimingListener for EventSourceContext {
501    fn resource_timing_information(&self) -> (InitiatorType, ServoUrl) {
502        (InitiatorType::Other, self.event_source.root().url().clone())
503    }
504
505    fn resource_timing_global(&self) -> DomRoot<GlobalScope> {
506        self.event_source.root().global()
507    }
508}
509
510impl EventSource {
511    fn new_inherited(url: ServoUrl, with_credentials: bool) -> EventSource {
512        EventSource {
513            eventtarget: EventTarget::new_inherited(),
514            url,
515            request: DomRefCell::new(None),
516            last_event_id: DomRefCell::new(DOMString::from("")),
517            reconnection_time: Cell::new(DEFAULT_RECONNECTION_TIME),
518            generation_id: Cell::new(GenerationId(0)),
519
520            ready_state: Cell::new(ReadyState::Connecting),
521            with_credentials,
522            droppable: DroppableEventSource::new(DomRefCell::new(Default::default())),
523        }
524    }
525
526    fn new(
527        global: &GlobalScope,
528        proto: Option<HandleObject>,
529        url: ServoUrl,
530        with_credentials: bool,
531        can_gc: CanGc,
532    ) -> DomRoot<EventSource> {
533        reflect_dom_object_with_proto(
534            Box::new(EventSource::new_inherited(url, with_credentials)),
535            global,
536            proto,
537            can_gc,
538        )
539    }
540
541    // https://html.spec.whatwg.org/multipage/#sse-processing-model:fail-the-connection-3
542    pub(crate) fn cancel(&self) {
543        self.droppable.cancel();
544        self.fail_the_connection();
545    }
546
547    /// <https://html.spec.whatwg.org/multipage/#fail-the-connection>
548    pub(crate) fn fail_the_connection(&self) {
549        let global = self.global();
550        let event_source = Trusted::new(self);
551        global.task_manager().remote_event_task_source().queue(
552            task!(fail_the_event_source_connection: move |cx| {
553                let event_source = event_source.root();
554                if event_source.ready_state.get() != ReadyState::Closed {
555                    event_source.ready_state.set(ReadyState::Closed);
556                    event_source.upcast::<EventTarget>().fire_event(cx, atom!("error"));
557                }
558            }),
559        );
560    }
561
562    pub(crate) fn request(&self) -> RequestBuilder {
563        self.request.borrow().clone().unwrap()
564    }
565
566    pub(crate) fn url(&self) -> &ServoUrl {
567        &self.url
568    }
569}
570
571impl EventSourceMethods<crate::DomTypeHolder> for EventSource {
572    /// <https://html.spec.whatwg.org/multipage/#dom-eventsource>
573    fn Constructor(
574        global: &GlobalScope,
575        proto: Option<HandleObject>,
576        can_gc: CanGc,
577        url: DOMString,
578        event_source_init: &EventSourceInit,
579    ) -> Fallible<DomRoot<EventSource>> {
580        // Step 2. Let settings be the relevant settings object for the `EventSource` constructor.
581        // Bindings pass that environment as `global`.
582        // Step 3. Let urlRecord be the result of encoding-parsing a URL given url, relative to settings.
583        let url_record = match global.encoding_parse_a_url(&url.str()) {
584            Ok(u) => u,
585            // Step 4 If urlRecord is failure, then throw a "SyntaxError" DOMException.
586            Err(_) => return Err(Error::Syntax(None)),
587        };
588        // Step 1 Let ev be a new EventSource object.
589        let event_source = EventSource::new(
590            global,
591            proto,
592            // Step 5 Set ev's url to urlRecord.
593            url_record.clone(),
594            event_source_init.withCredentials,
595            can_gc,
596        );
597        global.track_event_source(&event_source);
598        let cors_attribute_state = if event_source_init.withCredentials {
599            // Step 7 If the value of eventSourceInitDict's withCredentials member is true,
600            // then set corsAttributeState to Use Credentials and set ev's withCredentials
601            // attribute to true.
602            CorsSettings::UseCredentials
603        } else {
604            // Step 6 Let corsAttributeState be Anonymous.
605            CorsSettings::Anonymous
606        };
607        // Step 8 Let request be the result of creating a potential-CORS request
608        // given urlRecord, the empty string, and corsAttributeState.
609        // Step 9. Set request's client to the environment settings object and other state;
610        // `with_global_scope` supplies client, origin, policy container, etc.
611        let mut request = create_a_potential_cors_request(
612            global.webview_id(),
613            url_record,
614            Destination::None,
615            Some(cors_attribute_state),
616            Some(true),
617            global.get_referrer(),
618        )
619        .with_global_scope(global);
620
621        // Step 10 User agents may set (`Accept`, `text/event-stream`) in request's header list.
622        // TODO(eijebong): Replace once typed headers allow it
623        request.headers.insert(
624            header::ACCEPT,
625            HeaderValue::from_static("text/event-stream"),
626        );
627        // Step 11 Set request's cache mode to "no-store".
628        request.cache_mode = CacheMode::NoStore;
629        // Step 13 Set ev's request to request.
630        *event_source.request.borrow_mut() = Some(request.clone());
631        // Step 14 Let processEventSourceEndOfBody given response res be the following step:
632        // if res is not a network error, then reestablish the connection.
633
634        event_source.droppable.set_canceller(FetchCanceller::new(
635            request.id,
636            false,
637            global.core_resource_thread(),
638        ));
639
640        let context = EventSourceContext {
641            decoder: UTF_8.new_decoder_with_bom_removal(),
642            event_source: Trusted::new(&event_source),
643            gen_id: event_source.generation_id.get(),
644            parser_state: ParserState::Eol,
645            field: String::new(),
646            value: String::new(),
647            origin: String::new(),
648
649            event_type: String::new(),
650            data: String::new(),
651            last_event_id: String::new(),
652        };
653
654        let task_source = global.task_manager().networking_task_source().into();
655        global.fetch(request, context, task_source);
656
657        // Step 16 Return ev.
658        Ok(event_source)
659    }
660
661    // https://html.spec.whatwg.org/multipage/#handler-eventsource-onopen
662    event_handler!(open, GetOnopen, SetOnopen);
663
664    // https://html.spec.whatwg.org/multipage/#handler-eventsource-onmessage
665    event_handler!(message, GetOnmessage, SetOnmessage);
666
667    // https://html.spec.whatwg.org/multipage/#handler-eventsource-onerror
668    event_handler!(error, GetOnerror, SetOnerror);
669
670    /// <https://html.spec.whatwg.org/multipage/#dom-eventsource-url>
671    fn Url(&self) -> DOMString {
672        DOMString::from(self.url.as_str())
673    }
674
675    /// <https://html.spec.whatwg.org/multipage/#dom-eventsource-withcredentials>
676    fn WithCredentials(&self) -> bool {
677        self.with_credentials
678    }
679
680    /// <https://html.spec.whatwg.org/multipage/#dom-eventsource-readystate>
681    fn ReadyState(&self) -> u16 {
682        self.ready_state.get() as u16
683    }
684
685    /// <https://html.spec.whatwg.org/multipage/#dom-eventsource-close>
686    fn Close(&self) {
687        let GenerationId(prev_id) = self.generation_id.get();
688        self.generation_id.set(GenerationId(prev_id + 1));
689        self.droppable.cancel();
690        self.ready_state.set(ReadyState::Closed);
691    }
692}
693
694#[derive(JSTraceable, MallocSizeOf)]
695pub(crate) struct EventSourceTimeoutCallback {
696    #[ignore_malloc_size_of = "Because it is non-owning"]
697    event_source: Trusted<EventSource>,
698    #[no_trace]
699    event_source_context: EventSourceContext,
700}
701
702impl EventSourceTimeoutCallback {
703    /// <https://html.spec.whatwg.org/multipage/#reestablish-the-connection>
704    pub(crate) fn invoke(self) {
705        let event_source = self.event_source.root();
706        let global = event_source.global();
707
708        // Step 5.1: If the EventSource object's readyState attribute is not set to CONNECTING, then return.
709        if event_source.ready_state.get() != ReadyState::Connecting {
710            return;
711        }
712
713        // Step 5.2: Let request be the EventSource object's request.
714        let mut request = event_source.request();
715
716        // Step 5.3: If the EventSource object's last event ID string is not the empty string, then:
717        //  - Let lastEventIDValue be the EventSource object's last event ID string, encoded as UTF-8.
718        //  - Set (`Last-Event-ID`, lastEventIDValue) in request's header list.
719        if !event_source.last_event_id.borrow().is_empty() {
720            // TODO(eijebong): Change this once typed header support custom values
721            request.headers.insert(
722                HeaderName::from_static("last-event-id"),
723                HeaderValue::from_str(&String::from(event_source.last_event_id.borrow().clone()))
724                    .unwrap(),
725            );
726        }
727
728        // Step 5.4: Fetch request and process the response obtained in this fashion, if
729        // any, as described earlier in this section.
730        let task_source = global.task_manager().networking_task_source().into();
731        global.fetch(request, self.event_source_context, task_source);
732    }
733}