script/dom/
eventsource.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::mem;
7use std::str::{Chars, FromStr};
8use std::time::Duration;
9
10use dom_struct::dom_struct;
11use headers::ContentType;
12use http::StatusCode;
13use http::header::{self, HeaderName, HeaderValue};
14use js::jsval::UndefinedValue;
15use js::rust::HandleObject;
16use mime::{self, Mime};
17use net_traits::request::{CacheMode, CorsSettings, Destination, RequestBuilder, RequestId};
18use net_traits::{FetchMetadata, FilteredMetadata, NetworkError, ResourceFetchTiming};
19use script_bindings::conversions::SafeToJSValConvertible;
20use servo_url::ServoUrl;
21use stylo_atoms::Atom;
22
23use crate::dom::bindings::cell::DomRefCell;
24use crate::dom::bindings::codegen::Bindings::EventSourceBinding::{
25    EventSourceInit, EventSourceMethods,
26};
27use crate::dom::bindings::error::{Error, Fallible};
28use crate::dom::bindings::inheritance::Castable;
29use crate::dom::bindings::refcounted::Trusted;
30use crate::dom::bindings::reflector::{DomGlobal, reflect_dom_object_with_proto};
31use crate::dom::bindings::root::DomRoot;
32use crate::dom::bindings::str::DOMString;
33use crate::dom::csp::{GlobalCspReporting, Violation};
34use crate::dom::event::Event;
35use crate::dom::eventtarget::EventTarget;
36use crate::dom::globalscope::GlobalScope;
37use crate::dom::messageevent::MessageEvent;
38use crate::dom::performance::performanceresourcetiming::InitiatorType;
39use crate::fetch::{FetchCanceller, RequestWithGlobalScope, create_a_potential_cors_request};
40use crate::network_listener::{self, FetchResponseListener, ResourceTimingListener};
41use crate::realms::enter_realm;
42use crate::script_runtime::CanGc;
43use crate::timers::OneshotTimerCallback;
44
45const DEFAULT_RECONNECTION_TIME: Duration = Duration::from_millis(5000);
46
47#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
48struct GenerationId(u32);
49
50#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
51/// <https://html.spec.whatwg.org/multipage/#dom-eventsource-readystate>
52enum ReadyState {
53    Connecting = 0,
54    Open = 1,
55    Closed = 2,
56}
57
58#[derive(JSTraceable, MallocSizeOf)]
59struct DroppableEventSource {
60    canceller: DomRefCell<FetchCanceller>,
61}
62
63impl DroppableEventSource {
64    pub(crate) fn new(canceller: DomRefCell<FetchCanceller>) -> Self {
65        DroppableEventSource { canceller }
66    }
67
68    pub(crate) fn cancel(&self) {
69        self.canceller.borrow_mut().abort();
70    }
71
72    pub(crate) fn set_canceller(&self, data: FetchCanceller) {
73        *self.canceller.borrow_mut() = data;
74    }
75}
76
77// https://html.spec.whatwg.org/multipage/#garbage-collection-2
78impl Drop for DroppableEventSource {
79    fn drop(&mut self) {
80        // If an EventSource object is garbage collected while its connection is still open,
81        // the user agent must abort any instance of the fetch algorithm opened by this EventSource.
82        self.cancel();
83    }
84}
85
86#[dom_struct]
87pub(crate) struct EventSource {
88    eventtarget: EventTarget,
89    #[no_trace]
90    url: ServoUrl,
91    #[no_trace]
92    request: DomRefCell<Option<RequestBuilder>>,
93    last_event_id: DomRefCell<DOMString>,
94    reconnection_time: Cell<Duration>,
95    generation_id: Cell<GenerationId>,
96
97    ready_state: Cell<ReadyState>,
98    with_credentials: bool,
99    droppable: DroppableEventSource,
100}
101
102#[derive(Clone, MallocSizeOf)]
103enum ParserState {
104    Field,
105    Comment,
106    Value,
107    Eol,
108}
109
110#[derive(Clone, MallocSizeOf)]
111struct EventSourceContext {
112    incomplete_utf8: Option<utf8::Incomplete>,
113    event_source: Trusted<EventSource>,
114    gen_id: GenerationId,
115    parser_state: ParserState,
116    field: String,
117    value: String,
118    origin: String,
119    event_type: String,
120    data: String,
121    last_event_id: String,
122}
123
124impl EventSourceContext {
125    /// <https://html.spec.whatwg.org/multipage/#announce-the-connection>
126    fn announce_the_connection(&self) {
127        let event_source = self.event_source.root();
128        if self.gen_id != event_source.generation_id.get() {
129            return;
130        }
131        let global = event_source.global();
132        let event_source = self.event_source.clone();
133        global.task_manager().remote_event_task_source().queue(
134            task!(announce_the_event_source_connection: move || {
135                let event_source = event_source.root();
136                if event_source.ready_state.get() != ReadyState::Closed {
137                    event_source.ready_state.set(ReadyState::Open);
138                    event_source.upcast::<EventTarget>().fire_event(atom!("open"), CanGc::note());
139                }
140            }),
141        );
142    }
143
144    /// <https://html.spec.whatwg.org/multipage/#fail-the-connection>
145    fn fail_the_connection(&self) {
146        let event_source = self.event_source.root();
147        if self.gen_id != event_source.generation_id.get() {
148            return;
149        }
150        event_source.fail_the_connection();
151    }
152
153    /// <https://html.spec.whatwg.org/multipage/#reestablish-the-connection>
154    fn reestablish_the_connection(&self) {
155        let event_source = self.event_source.root();
156
157        if self.gen_id != event_source.generation_id.get() {
158            return;
159        }
160
161        let trusted_event_source = self.event_source.clone();
162        let global = event_source.global();
163        let event_source_context = EventSourceContext {
164            incomplete_utf8: None,
165            event_source: self.event_source.clone(),
166            gen_id: self.gen_id,
167            parser_state: ParserState::Eol,
168            field: String::new(),
169            value: String::new(),
170            origin: self.origin.clone(),
171            event_type: String::new(),
172            data: String::new(),
173            last_event_id: String::from(event_source.last_event_id.borrow().clone()),
174        };
175        global.task_manager().remote_event_task_source().queue(
176            task!(reestablish_the_event_source_onnection: move || {
177                let event_source = trusted_event_source.root();
178
179                // Step 1.1.
180                if event_source.ready_state.get() == ReadyState::Closed {
181                    return;
182                }
183
184                // Step 1.2.
185                event_source.ready_state.set(ReadyState::Connecting);
186
187                // Step 1.3.
188                event_source.upcast::<EventTarget>().fire_event(atom!("error"), CanGc::note());
189
190                // Step 2.
191                let duration = event_source.reconnection_time.get();
192
193                // Step 3.
194                // TODO: Optionally wait some more.
195
196                // Steps 4-5.
197                let callback = OneshotTimerCallback::EventSourceTimeout(
198                    EventSourceTimeoutCallback {
199                        event_source: trusted_event_source,
200                        event_source_context,
201                    }
202                );
203                event_source.global().schedule_callback(callback, duration);
204            }),
205        );
206    }
207
208    /// <https://html.spec.whatwg.org/multipage/#processField>
209    fn process_field(&mut self) {
210        match &*self.field {
211            "event" => mem::swap(&mut self.event_type, &mut self.value),
212            "data" => {
213                self.data.push_str(&self.value);
214                self.data.push('\n');
215            },
216            "id" if !self.value.contains('\0') => {
217                mem::swap(&mut self.last_event_id, &mut self.value);
218            },
219            "retry" => {
220                if let Ok(time) = u64::from_str(&self.value) {
221                    self.event_source
222                        .root()
223                        .reconnection_time
224                        .set(Duration::from_millis(time));
225                }
226            },
227            _ => (),
228        }
229
230        self.field.clear();
231        self.value.clear();
232    }
233
234    /// <https://html.spec.whatwg.org/multipage/#dispatchMessage>
235    fn dispatch_event(&mut self, can_gc: CanGc) {
236        let event_source = self.event_source.root();
237        // Step 1
238        *event_source.last_event_id.borrow_mut() = DOMString::from(self.last_event_id.clone());
239        // Step 2
240        if self.data.is_empty() {
241            self.data.clear();
242            self.event_type.clear();
243            return;
244        }
245        // Step 3
246        if let Some(last) = self.data.pop() {
247            if last != '\n' {
248                self.data.push(last);
249            }
250        }
251        // Step 6
252        let type_ = if !self.event_type.is_empty() {
253            Atom::from(self.event_type.clone())
254        } else {
255            atom!("message")
256        };
257        // Steps 4-5
258        let event = {
259            let _ac = enter_realm(&*event_source);
260            rooted!(in(*GlobalScope::get_cx()) let mut data = UndefinedValue());
261            self.data
262                .safe_to_jsval(GlobalScope::get_cx(), data.handle_mut(), can_gc);
263            MessageEvent::new(
264                &event_source.global(),
265                type_,
266                false,
267                false,
268                data.handle(),
269                DOMString::from(self.origin.clone()),
270                None,
271                event_source.last_event_id.borrow().clone(),
272                Vec::with_capacity(0),
273                can_gc,
274            )
275        };
276        // Step 7
277        self.event_type.clear();
278        self.data.clear();
279
280        // Step 8.
281        let global = event_source.global();
282        let event_source = self.event_source.clone();
283        let event = Trusted::new(&*event);
284        global.task_manager().remote_event_task_source().queue(
285            task!(dispatch_the_event_source_event: move || {
286                let event_source = event_source.root();
287                if event_source.ready_state.get() != ReadyState::Closed {
288                    event.root().upcast::<Event>().fire(event_source.upcast(), CanGc::note());
289                }
290            }),
291        );
292    }
293
294    /// <https://html.spec.whatwg.org/multipage/#event-stream-interpretation>
295    fn parse(&mut self, stream: Chars, can_gc: CanGc) {
296        let mut stream = stream.peekable();
297
298        while let Some(ch) = stream.next() {
299            match (ch, &self.parser_state) {
300                (':', &ParserState::Eol) => self.parser_state = ParserState::Comment,
301                (':', &ParserState::Field) => {
302                    self.parser_state = ParserState::Value;
303                    if let Some(&' ') = stream.peek() {
304                        stream.next();
305                    }
306                },
307
308                ('\n', &ParserState::Value) => {
309                    self.parser_state = ParserState::Eol;
310                    self.process_field();
311                },
312                ('\r', &ParserState::Value) => {
313                    if let Some(&'\n') = stream.peek() {
314                        continue;
315                    }
316                    self.parser_state = ParserState::Eol;
317                    self.process_field();
318                },
319
320                ('\n', &ParserState::Field) => {
321                    self.parser_state = ParserState::Eol;
322                    self.process_field();
323                },
324                ('\r', &ParserState::Field) => {
325                    if let Some(&'\n') = stream.peek() {
326                        continue;
327                    }
328                    self.parser_state = ParserState::Eol;
329                    self.process_field();
330                },
331
332                ('\n', &ParserState::Eol) => self.dispatch_event(can_gc),
333                ('\r', &ParserState::Eol) => {
334                    if let Some(&'\n') = stream.peek() {
335                        continue;
336                    }
337                    self.dispatch_event(can_gc);
338                },
339
340                ('\n', &ParserState::Comment) => self.parser_state = ParserState::Eol,
341                ('\r', &ParserState::Comment) => {
342                    if let Some(&'\n') = stream.peek() {
343                        continue;
344                    }
345                    self.parser_state = ParserState::Eol;
346                },
347
348                (_, &ParserState::Field) => self.field.push(ch),
349                (_, &ParserState::Value) => self.value.push(ch),
350                (_, &ParserState::Eol) => {
351                    self.parser_state = ParserState::Field;
352                    self.field.push(ch);
353                },
354                (_, &ParserState::Comment) => (),
355            }
356        }
357    }
358}
359
360impl FetchResponseListener for EventSourceContext {
361    fn should_invoke(&self) -> bool {
362        self.event_source.root().generation_id.get() == self.gen_id
363    }
364
365    fn process_request_body(&mut self, _: RequestId) {
366        // TODO
367    }
368
369    fn process_request_eof(&mut self, _: RequestId) {
370        // TODO
371    }
372
373    fn process_response(&mut self, _: RequestId, metadata: Result<FetchMetadata, NetworkError>) {
374        match metadata {
375            Ok(fm) => {
376                let meta = match fm {
377                    FetchMetadata::Unfiltered(m) => m,
378                    FetchMetadata::Filtered { unsafe_, filtered } => match filtered {
379                        FilteredMetadata::Opaque | FilteredMetadata::OpaqueRedirect(_) => {
380                            return self.fail_the_connection();
381                        },
382                        _ => unsafe_,
383                    },
384                };
385                // Step 15.3 if res's status is not 200, or if res's `Content-Type` is not
386                // `text/event-stream`, then fail the connection.
387                if meta.status.code() != StatusCode::OK {
388                    return self.fail_the_connection();
389                }
390                let mime = match meta.content_type {
391                    None => return self.fail_the_connection(),
392                    Some(ct) => <ContentType as Into<Mime>>::into(ct.into_inner()),
393                };
394                if (mime.type_(), mime.subtype()) != (mime::TEXT, mime::EVENT_STREAM) {
395                    return self.fail_the_connection();
396                }
397                self.origin = meta.final_url.origin().ascii_serialization();
398                // Step 15.4 announce the connection and interpret res's body line by line.
399                self.announce_the_connection();
400            },
401            Err(error) => {
402                // Step 15.2 if res is a network error, then reestablish the connection, unless
403                // the user agent knows that to be futile, in which case the user agent may
404                // fail the connection.
405                if error.is_permanent_failure() {
406                    self.fail_the_connection()
407                } else {
408                    self.reestablish_the_connection()
409                }
410            },
411        }
412    }
413
414    fn process_response_chunk(&mut self, _: RequestId, chunk: Vec<u8>) {
415        let mut input = &*chunk;
416        if let Some(mut incomplete) = self.incomplete_utf8.take() {
417            match incomplete.try_complete(input) {
418                None => return,
419                Some((result, remaining_input)) => {
420                    self.parse(result.unwrap_or("\u{FFFD}").chars(), CanGc::note());
421                    input = remaining_input;
422                },
423            }
424        }
425
426        while !input.is_empty() {
427            match utf8::decode(input) {
428                Ok(s) => {
429                    self.parse(s.chars(), CanGc::note());
430                    return;
431                },
432                Err(utf8::DecodeError::Invalid {
433                    valid_prefix,
434                    remaining_input,
435                    ..
436                }) => {
437                    self.parse(valid_prefix.chars(), CanGc::note());
438                    self.parse("\u{FFFD}".chars(), CanGc::note());
439                    input = remaining_input;
440                },
441                Err(utf8::DecodeError::Incomplete {
442                    valid_prefix,
443                    incomplete_suffix,
444                }) => {
445                    self.parse(valid_prefix.chars(), CanGc::note());
446                    self.incomplete_utf8 = Some(incomplete_suffix);
447                    return;
448                },
449            }
450        }
451    }
452
453    fn process_response_eof(
454        mut self,
455        _: RequestId,
456        response: Result<(), NetworkError>,
457        timing: ResourceFetchTiming,
458    ) {
459        if self.incomplete_utf8.take().is_some() {
460            self.parse("\u{FFFD}".chars(), CanGc::note());
461        }
462        if response.is_ok() {
463            self.reestablish_the_connection();
464        }
465
466        network_listener::submit_timing(&self, &response, &timing, CanGc::note());
467    }
468
469    fn process_csp_violations(&mut self, _request_id: RequestId, violations: Vec<Violation>) {
470        let global = &self.resource_timing_global();
471        global.report_csp_violations(violations, None, None);
472    }
473}
474
475impl ResourceTimingListener for EventSourceContext {
476    fn resource_timing_information(&self) -> (InitiatorType, ServoUrl) {
477        (InitiatorType::Other, self.event_source.root().url().clone())
478    }
479
480    fn resource_timing_global(&self) -> DomRoot<GlobalScope> {
481        self.event_source.root().global()
482    }
483}
484
485impl EventSource {
486    fn new_inherited(url: ServoUrl, with_credentials: bool) -> EventSource {
487        EventSource {
488            eventtarget: EventTarget::new_inherited(),
489            url,
490            request: DomRefCell::new(None),
491            last_event_id: DomRefCell::new(DOMString::from("")),
492            reconnection_time: Cell::new(DEFAULT_RECONNECTION_TIME),
493            generation_id: Cell::new(GenerationId(0)),
494
495            ready_state: Cell::new(ReadyState::Connecting),
496            with_credentials,
497            droppable: DroppableEventSource::new(DomRefCell::new(Default::default())),
498        }
499    }
500
501    fn new(
502        global: &GlobalScope,
503        proto: Option<HandleObject>,
504        url: ServoUrl,
505        with_credentials: bool,
506        can_gc: CanGc,
507    ) -> DomRoot<EventSource> {
508        reflect_dom_object_with_proto(
509            Box::new(EventSource::new_inherited(url, with_credentials)),
510            global,
511            proto,
512            can_gc,
513        )
514    }
515
516    // https://html.spec.whatwg.org/multipage/#sse-processing-model:fail-the-connection-3
517    pub(crate) fn cancel(&self) {
518        self.droppable.cancel();
519        self.fail_the_connection();
520    }
521
522    /// <https://html.spec.whatwg.org/multipage/#fail-the-connection>
523    pub(crate) fn fail_the_connection(&self) {
524        let global = self.global();
525        let event_source = Trusted::new(self);
526        global.task_manager().remote_event_task_source().queue(
527            task!(fail_the_event_source_connection: move || {
528                let event_source = event_source.root();
529                if event_source.ready_state.get() != ReadyState::Closed {
530                    event_source.ready_state.set(ReadyState::Closed);
531                    event_source.upcast::<EventTarget>().fire_event(atom!("error"), CanGc::note());
532                }
533            }),
534        );
535    }
536
537    pub(crate) fn request(&self) -> RequestBuilder {
538        self.request.borrow().clone().unwrap()
539    }
540
541    pub(crate) fn url(&self) -> &ServoUrl {
542        &self.url
543    }
544}
545
546impl EventSourceMethods<crate::DomTypeHolder> for EventSource {
547    /// <https://html.spec.whatwg.org/multipage/#dom-eventsource>
548    fn Constructor(
549        global: &GlobalScope,
550        proto: Option<HandleObject>,
551        can_gc: CanGc,
552        url: DOMString,
553        event_source_init: &EventSourceInit,
554    ) -> Fallible<DomRoot<EventSource>> {
555        // TODO: Step 2 relevant settings object
556        // Step 3 Let urlRecord be the result of encoding-parsing a URL given url,
557        // relative to settings.
558        let base_url = global.api_base_url();
559        let url_record = match base_url.join(&url.str()) {
560            Ok(u) => u,
561            // Step 4 If urlRecord is failure, then throw a "SyntaxError" DOMException.
562            Err(_) => return Err(Error::Syntax(None)),
563        };
564        // Step 1 Let ev be a new EventSource object.
565        let event_source = EventSource::new(
566            global,
567            proto,
568            // Step 5 Set ev's url to urlRecord.
569            url_record.clone(),
570            event_source_init.withCredentials,
571            can_gc,
572        );
573        global.track_event_source(&event_source);
574        let cors_attribute_state = if event_source_init.withCredentials {
575            // Step 7 If the value of eventSourceInitDict's withCredentials member is true,
576            // then set corsAttributeState to Use Credentials and set ev's withCredentials
577            // attribute to true.
578            CorsSettings::UseCredentials
579        } else {
580            // Step 6 Let corsAttributeState be Anonymous.
581            CorsSettings::Anonymous
582        };
583        // Step 8 Let request be the result of creating a potential-CORS request
584        // given urlRecord, the empty string, and corsAttributeState.
585        // TODO: Step 9 set request's client settings
586        let mut request = create_a_potential_cors_request(
587            global.webview_id(),
588            url_record,
589            Destination::None,
590            Some(cors_attribute_state),
591            Some(true),
592            global.get_referrer(),
593        )
594        .with_global_scope(global);
595
596        // Step 10 User agents may set (`Accept`, `text/event-stream`) in request's header list.
597        // TODO(eijebong): Replace once typed headers allow it
598        request.headers.insert(
599            header::ACCEPT,
600            HeaderValue::from_static("text/event-stream"),
601        );
602        // Step 11 Set request's cache mode to "no-store".
603        request.cache_mode = CacheMode::NoStore;
604        // Step 13 Set ev's request to request.
605        *event_source.request.borrow_mut() = Some(request.clone());
606        // Step 14 Let processEventSourceEndOfBody given response res be the following step:
607        // if res is not a network error, then reestablish the connection.
608
609        event_source.droppable.set_canceller(FetchCanceller::new(
610            request.id,
611            false,
612            global.core_resource_thread(),
613        ));
614
615        let context = EventSourceContext {
616            incomplete_utf8: None,
617            event_source: Trusted::new(&event_source),
618            gen_id: event_source.generation_id.get(),
619            parser_state: ParserState::Eol,
620            field: String::new(),
621            value: String::new(),
622            origin: String::new(),
623
624            event_type: String::new(),
625            data: String::new(),
626            last_event_id: String::new(),
627        };
628
629        let task_source = global.task_manager().networking_task_source().into();
630        global.fetch(request, context, task_source);
631
632        // Step 16 Return ev.
633        Ok(event_source)
634    }
635
636    // https://html.spec.whatwg.org/multipage/#handler-eventsource-onopen
637    event_handler!(open, GetOnopen, SetOnopen);
638
639    // https://html.spec.whatwg.org/multipage/#handler-eventsource-onmessage
640    event_handler!(message, GetOnmessage, SetOnmessage);
641
642    // https://html.spec.whatwg.org/multipage/#handler-eventsource-onerror
643    event_handler!(error, GetOnerror, SetOnerror);
644
645    /// <https://html.spec.whatwg.org/multipage/#dom-eventsource-url>
646    fn Url(&self) -> DOMString {
647        DOMString::from(self.url.as_str())
648    }
649
650    /// <https://html.spec.whatwg.org/multipage/#dom-eventsource-withcredentials>
651    fn WithCredentials(&self) -> bool {
652        self.with_credentials
653    }
654
655    /// <https://html.spec.whatwg.org/multipage/#dom-eventsource-readystate>
656    fn ReadyState(&self) -> u16 {
657        self.ready_state.get() as u16
658    }
659
660    /// <https://html.spec.whatwg.org/multipage/#dom-eventsource-close>
661    fn Close(&self) {
662        let GenerationId(prev_id) = self.generation_id.get();
663        self.generation_id.set(GenerationId(prev_id + 1));
664        self.droppable.cancel();
665        self.ready_state.set(ReadyState::Closed);
666    }
667}
668
669#[derive(JSTraceable, MallocSizeOf)]
670pub(crate) struct EventSourceTimeoutCallback {
671    #[ignore_malloc_size_of = "Because it is non-owning"]
672    event_source: Trusted<EventSource>,
673    #[no_trace]
674    event_source_context: EventSourceContext,
675}
676
677impl EventSourceTimeoutCallback {
678    /// <https://html.spec.whatwg.org/multipage/#reestablish-the-connection>
679    pub(crate) fn invoke(self) {
680        let event_source = self.event_source.root();
681        let global = event_source.global();
682
683        // Step 5.1: If the EventSource object's readyState attribute is not set to CONNECTING, then return.
684        if event_source.ready_state.get() != ReadyState::Connecting {
685            return;
686        }
687
688        // Step 5.2: Let request be the EventSource object's request.
689        let mut request = event_source.request();
690
691        // Step 5.3: If the EventSource object's last event ID string is not the empty string, then:
692        //  - Let lastEventIDValue be the EventSource object's last event ID string, encoded as UTF-8.
693        //  - Set (`Last-Event-ID`, lastEventIDValue) in request's header list.
694        if !event_source.last_event_id.borrow().is_empty() {
695            // TODO(eijebong): Change this once typed header support custom values
696            request.headers.insert(
697                HeaderName::from_static("last-event-id"),
698                HeaderValue::from_str(&String::from(event_source.last_event_id.borrow().clone()))
699                    .unwrap(),
700            );
701        }
702
703        // Step 5.4: Fetch request and process the response obtained in this fashion, if
704        // any, as described earlier in this section.
705        let task_source = global.task_manager().networking_task_source().into();
706        global.fetch(request, self.event_source_context, task_source);
707    }
708}