script/dom/
eventsource.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::mem;
7use std::str::{Chars, FromStr};
8use std::time::Duration;
9
10use dom_struct::dom_struct;
11use headers::ContentType;
12use http::StatusCode;
13use http::header::{self, HeaderName, HeaderValue};
14use js::jsval::UndefinedValue;
15use js::rust::HandleObject;
16use mime::{self, Mime};
17use net_traits::request::{CacheMode, CorsSettings, Destination, RequestBuilder, RequestId};
18use net_traits::{FetchMetadata, FilteredMetadata, NetworkError, ResourceFetchTiming};
19use script_bindings::conversions::SafeToJSValConvertible;
20use servo_url::ServoUrl;
21use stylo_atoms::Atom;
22
23use crate::dom::bindings::cell::DomRefCell;
24use crate::dom::bindings::codegen::Bindings::EventSourceBinding::{
25    EventSourceInit, EventSourceMethods,
26};
27use crate::dom::bindings::error::{Error, Fallible};
28use crate::dom::bindings::inheritance::Castable;
29use crate::dom::bindings::refcounted::Trusted;
30use crate::dom::bindings::reflector::{DomGlobal, reflect_dom_object_with_proto};
31use crate::dom::bindings::root::DomRoot;
32use crate::dom::bindings::str::DOMString;
33use crate::dom::csp::{GlobalCspReporting, Violation};
34use crate::dom::event::Event;
35use crate::dom::eventtarget::EventTarget;
36use crate::dom::globalscope::GlobalScope;
37use crate::dom::messageevent::MessageEvent;
38use crate::dom::performance::performanceresourcetiming::InitiatorType;
39use crate::fetch::{FetchCanceller, RequestWithGlobalScope, create_a_potential_cors_request};
40use crate::network_listener::{self, FetchResponseListener, ResourceTimingListener};
41use crate::realms::enter_realm;
42use crate::script_runtime::CanGc;
43use crate::timers::OneshotTimerCallback;
44
45const DEFAULT_RECONNECTION_TIME: Duration = Duration::from_millis(5000);
46
47#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
48struct GenerationId(u32);
49
50#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
51/// <https://html.spec.whatwg.org/multipage/#dom-eventsource-readystate>
52enum ReadyState {
53    Connecting = 0,
54    Open = 1,
55    Closed = 2,
56}
57
58#[derive(JSTraceable, MallocSizeOf)]
59struct DroppableEventSource {
60    canceller: DomRefCell<FetchCanceller>,
61}
62
63impl DroppableEventSource {
64    pub(crate) fn new(canceller: DomRefCell<FetchCanceller>) -> Self {
65        DroppableEventSource { canceller }
66    }
67
68    pub(crate) fn cancel(&self) {
69        self.canceller.borrow_mut().abort();
70    }
71
72    pub(crate) fn set_canceller(&self, data: FetchCanceller) {
73        *self.canceller.borrow_mut() = data;
74    }
75}
76
77// https://html.spec.whatwg.org/multipage/#garbage-collection-2
78impl Drop for DroppableEventSource {
79    fn drop(&mut self) {
80        // If an EventSource object is garbage collected while its connection is still open,
81        // the user agent must abort any instance of the fetch algorithm opened by this EventSource.
82        self.cancel();
83    }
84}
85
86#[dom_struct]
87pub(crate) struct EventSource {
88    eventtarget: EventTarget,
89    #[no_trace]
90    url: ServoUrl,
91    #[no_trace]
92    request: DomRefCell<Option<RequestBuilder>>,
93    last_event_id: DomRefCell<DOMString>,
94    reconnection_time: Cell<Duration>,
95    generation_id: Cell<GenerationId>,
96
97    ready_state: Cell<ReadyState>,
98    with_credentials: bool,
99    droppable: DroppableEventSource,
100}
101
102#[derive(Clone, MallocSizeOf)]
103enum ParserState {
104    Field,
105    Comment,
106    Value,
107    Eol,
108}
109
110#[derive(Clone, MallocSizeOf)]
111struct EventSourceContext {
112    incomplete_utf8: Option<utf8::Incomplete>,
113    event_source: Trusted<EventSource>,
114    gen_id: GenerationId,
115    parser_state: ParserState,
116    field: String,
117    value: String,
118    origin: String,
119    event_type: String,
120    data: String,
121    last_event_id: String,
122}
123
124impl EventSourceContext {
125    /// <https://html.spec.whatwg.org/multipage/#announce-the-connection>
126    fn announce_the_connection(&self) {
127        let event_source = self.event_source.root();
128        if self.gen_id != event_source.generation_id.get() {
129            return;
130        }
131        let global = event_source.global();
132        let event_source = self.event_source.clone();
133        global.task_manager().remote_event_task_source().queue(
134            task!(announce_the_event_source_connection: move || {
135                let event_source = event_source.root();
136                if event_source.ready_state.get() != ReadyState::Closed {
137                    event_source.ready_state.set(ReadyState::Open);
138                    event_source.upcast::<EventTarget>().fire_event(atom!("open"), CanGc::note());
139                }
140            }),
141        );
142    }
143
144    /// <https://html.spec.whatwg.org/multipage/#fail-the-connection>
145    fn fail_the_connection(&self) {
146        let event_source = self.event_source.root();
147        if self.gen_id != event_source.generation_id.get() {
148            return;
149        }
150        event_source.fail_the_connection();
151    }
152
153    /// <https://html.spec.whatwg.org/multipage/#reestablish-the-connection>
154    fn reestablish_the_connection(&self) {
155        let event_source = self.event_source.root();
156
157        if self.gen_id != event_source.generation_id.get() {
158            return;
159        }
160
161        let trusted_event_source = self.event_source.clone();
162        let global = event_source.global();
163        let event_source_context = EventSourceContext {
164            incomplete_utf8: None,
165            event_source: self.event_source.clone(),
166            gen_id: self.gen_id,
167            parser_state: ParserState::Eol,
168            field: String::new(),
169            value: String::new(),
170            origin: self.origin.clone(),
171            event_type: String::new(),
172            data: String::new(),
173            last_event_id: String::from(event_source.last_event_id.borrow().clone()),
174        };
175        global.task_manager().remote_event_task_source().queue(
176            task!(reestablish_the_event_source_onnection: move || {
177                let event_source = trusted_event_source.root();
178
179                // Step 1.1.
180                if event_source.ready_state.get() == ReadyState::Closed {
181                    return;
182                }
183
184                // Step 1.2.
185                event_source.ready_state.set(ReadyState::Connecting);
186
187                // Step 1.3.
188                event_source.upcast::<EventTarget>().fire_event(atom!("error"), CanGc::note());
189
190                // Step 2.
191                let duration = event_source.reconnection_time.get();
192
193                // Step 3.
194                // TODO: Optionally wait some more.
195
196                // Steps 4-5.
197                let callback = OneshotTimerCallback::EventSourceTimeout(
198                    EventSourceTimeoutCallback {
199                        event_source: trusted_event_source,
200                        event_source_context,
201                    }
202                );
203                event_source.global().schedule_callback(callback, duration);
204            }),
205        );
206    }
207
208    /// <https://html.spec.whatwg.org/multipage/#processField>
209    fn process_field(&mut self) {
210        match &*self.field {
211            "event" => mem::swap(&mut self.event_type, &mut self.value),
212            "data" => {
213                self.data.push_str(&self.value);
214                self.data.push('\n');
215            },
216            "id" if !self.value.contains('\0') => {
217                mem::swap(&mut self.last_event_id, &mut self.value);
218            },
219            "retry" => {
220                if let Ok(time) = u64::from_str(&self.value) {
221                    self.event_source
222                        .root()
223                        .reconnection_time
224                        .set(Duration::from_millis(time));
225                }
226            },
227            _ => (),
228        }
229
230        self.field.clear();
231        self.value.clear();
232    }
233
234    /// <https://html.spec.whatwg.org/multipage/#dispatchMessage>
235    fn dispatch_event(&mut self, can_gc: CanGc) {
236        let event_source = self.event_source.root();
237        // Step 1
238        *event_source.last_event_id.borrow_mut() = DOMString::from(self.last_event_id.clone());
239        // Step 2
240        if self.data.is_empty() {
241            self.data.clear();
242            self.event_type.clear();
243            return;
244        }
245        // Step 3
246        if let Some(last) = self.data.pop() {
247            if last != '\n' {
248                self.data.push(last);
249            }
250        }
251        // Step 6
252        let type_ = if !self.event_type.is_empty() {
253            Atom::from(self.event_type.clone())
254        } else {
255            atom!("message")
256        };
257        // Steps 4-5
258        let event = {
259            let _ac = enter_realm(&*event_source);
260            rooted!(in(*GlobalScope::get_cx()) let mut data = UndefinedValue());
261            self.data
262                .safe_to_jsval(GlobalScope::get_cx(), data.handle_mut(), can_gc);
263            MessageEvent::new(
264                &event_source.global(),
265                type_,
266                false,
267                false,
268                data.handle(),
269                DOMString::from(self.origin.clone()),
270                None,
271                event_source.last_event_id.borrow().clone(),
272                Vec::with_capacity(0),
273                can_gc,
274            )
275        };
276        // Step 7
277        self.event_type.clear();
278        self.data.clear();
279
280        // Step 8.
281        let global = event_source.global();
282        let event_source = self.event_source.clone();
283        let event = Trusted::new(&*event);
284        global.task_manager().remote_event_task_source().queue(
285            task!(dispatch_the_event_source_event: move || {
286                let event_source = event_source.root();
287                if event_source.ready_state.get() != ReadyState::Closed {
288                    event.root().upcast::<Event>().fire(event_source.upcast(), CanGc::note());
289                }
290            }),
291        );
292    }
293
294    /// <https://html.spec.whatwg.org/multipage/#event-stream-interpretation>
295    fn parse(&mut self, stream: Chars, can_gc: CanGc) {
296        let mut stream = stream.peekable();
297
298        while let Some(ch) = stream.next() {
299            match (ch, &self.parser_state) {
300                (':', &ParserState::Eol) => self.parser_state = ParserState::Comment,
301                (':', &ParserState::Field) => {
302                    self.parser_state = ParserState::Value;
303                    if let Some(&' ') = stream.peek() {
304                        stream.next();
305                    }
306                },
307
308                ('\n', &ParserState::Value) => {
309                    self.parser_state = ParserState::Eol;
310                    self.process_field();
311                },
312                ('\r', &ParserState::Value) => {
313                    if let Some(&'\n') = stream.peek() {
314                        continue;
315                    }
316                    self.parser_state = ParserState::Eol;
317                    self.process_field();
318                },
319
320                ('\n', &ParserState::Field) => {
321                    self.parser_state = ParserState::Eol;
322                    self.process_field();
323                },
324                ('\r', &ParserState::Field) => {
325                    if let Some(&'\n') = stream.peek() {
326                        continue;
327                    }
328                    self.parser_state = ParserState::Eol;
329                    self.process_field();
330                },
331
332                ('\n', &ParserState::Eol) => self.dispatch_event(can_gc),
333                ('\r', &ParserState::Eol) => {
334                    if let Some(&'\n') = stream.peek() {
335                        continue;
336                    }
337                    self.dispatch_event(can_gc);
338                },
339
340                ('\n', &ParserState::Comment) => self.parser_state = ParserState::Eol,
341                ('\r', &ParserState::Comment) => {
342                    if let Some(&'\n') = stream.peek() {
343                        continue;
344                    }
345                    self.parser_state = ParserState::Eol;
346                },
347
348                (_, &ParserState::Field) => self.field.push(ch),
349                (_, &ParserState::Value) => self.value.push(ch),
350                (_, &ParserState::Eol) => {
351                    self.parser_state = ParserState::Field;
352                    self.field.push(ch);
353                },
354                (_, &ParserState::Comment) => (),
355            }
356        }
357    }
358}
359
360impl FetchResponseListener for EventSourceContext {
361    fn should_invoke(&self) -> bool {
362        self.event_source.root().generation_id.get() == self.gen_id
363    }
364
365    fn process_request_body(&mut self, _: RequestId) {
366        // TODO
367    }
368
369    fn process_request_eof(&mut self, _: RequestId) {
370        // TODO
371    }
372
373    fn process_response(&mut self, _: RequestId, metadata: Result<FetchMetadata, NetworkError>) {
374        match metadata {
375            Ok(fm) => {
376                let meta = match fm {
377                    FetchMetadata::Unfiltered(m) => m,
378                    FetchMetadata::Filtered { unsafe_, filtered } => match filtered {
379                        FilteredMetadata::Opaque | FilteredMetadata::OpaqueRedirect(_) => {
380                            return self.fail_the_connection();
381                        },
382                        _ => unsafe_,
383                    },
384                };
385                // Step 15.3 if res's status is not 200, or if res's `Content-Type` is not
386                // `text/event-stream`, then fail the connection.
387                if meta.status.code() != StatusCode::OK {
388                    return self.fail_the_connection();
389                }
390                let mime = match meta.content_type {
391                    None => return self.fail_the_connection(),
392                    Some(ct) => <ContentType as Into<Mime>>::into(ct.into_inner()),
393                };
394                if (mime.type_(), mime.subtype()) != (mime::TEXT, mime::EVENT_STREAM) {
395                    return self.fail_the_connection();
396                }
397                self.origin = meta.final_url.origin().ascii_serialization();
398                // Step 15.4 announce the connection and interpret res's body line by line.
399                self.announce_the_connection();
400            },
401            Err(error) => {
402                // Step 15.2 if res is a network error, then reestablish the connection, unless
403                // the user agent knows that to be futile, in which case the user agent may
404                // fail the connection.
405                if error.is_permanent_failure() {
406                    self.fail_the_connection()
407                } else {
408                    self.reestablish_the_connection()
409                }
410            },
411        }
412    }
413
414    fn process_response_chunk(&mut self, _: RequestId, chunk: Vec<u8>) {
415        let mut input = &*chunk;
416        if let Some(mut incomplete) = self.incomplete_utf8.take() {
417            match incomplete.try_complete(input) {
418                None => return,
419                Some((result, remaining_input)) => {
420                    self.parse(result.unwrap_or("\u{FFFD}").chars(), CanGc::note());
421                    input = remaining_input;
422                },
423            }
424        }
425
426        while !input.is_empty() {
427            match utf8::decode(input) {
428                Ok(s) => {
429                    self.parse(s.chars(), CanGc::note());
430                    return;
431                },
432                Err(utf8::DecodeError::Invalid {
433                    valid_prefix,
434                    remaining_input,
435                    ..
436                }) => {
437                    self.parse(valid_prefix.chars(), CanGc::note());
438                    self.parse("\u{FFFD}".chars(), CanGc::note());
439                    input = remaining_input;
440                },
441                Err(utf8::DecodeError::Incomplete {
442                    valid_prefix,
443                    incomplete_suffix,
444                }) => {
445                    self.parse(valid_prefix.chars(), CanGc::note());
446                    self.incomplete_utf8 = Some(incomplete_suffix);
447                    return;
448                },
449            }
450        }
451    }
452
453    fn process_response_eof(
454        mut self,
455        cx: &mut js::context::JSContext,
456        _: RequestId,
457        response: Result<(), NetworkError>,
458        timing: ResourceFetchTiming,
459    ) {
460        if self.incomplete_utf8.take().is_some() {
461            self.parse("\u{FFFD}".chars(), CanGc::from_cx(cx));
462        }
463        if response.is_ok() {
464            self.reestablish_the_connection();
465        }
466
467        network_listener::submit_timing(&self, &response, &timing, CanGc::from_cx(cx));
468    }
469
470    fn process_csp_violations(&mut self, _request_id: RequestId, violations: Vec<Violation>) {
471        let global = &self.resource_timing_global();
472        global.report_csp_violations(violations, None, None);
473    }
474}
475
476impl ResourceTimingListener for EventSourceContext {
477    fn resource_timing_information(&self) -> (InitiatorType, ServoUrl) {
478        (InitiatorType::Other, self.event_source.root().url().clone())
479    }
480
481    fn resource_timing_global(&self) -> DomRoot<GlobalScope> {
482        self.event_source.root().global()
483    }
484}
485
486impl EventSource {
487    fn new_inherited(url: ServoUrl, with_credentials: bool) -> EventSource {
488        EventSource {
489            eventtarget: EventTarget::new_inherited(),
490            url,
491            request: DomRefCell::new(None),
492            last_event_id: DomRefCell::new(DOMString::from("")),
493            reconnection_time: Cell::new(DEFAULT_RECONNECTION_TIME),
494            generation_id: Cell::new(GenerationId(0)),
495
496            ready_state: Cell::new(ReadyState::Connecting),
497            with_credentials,
498            droppable: DroppableEventSource::new(DomRefCell::new(Default::default())),
499        }
500    }
501
502    fn new(
503        global: &GlobalScope,
504        proto: Option<HandleObject>,
505        url: ServoUrl,
506        with_credentials: bool,
507        can_gc: CanGc,
508    ) -> DomRoot<EventSource> {
509        reflect_dom_object_with_proto(
510            Box::new(EventSource::new_inherited(url, with_credentials)),
511            global,
512            proto,
513            can_gc,
514        )
515    }
516
517    // https://html.spec.whatwg.org/multipage/#sse-processing-model:fail-the-connection-3
518    pub(crate) fn cancel(&self) {
519        self.droppable.cancel();
520        self.fail_the_connection();
521    }
522
523    /// <https://html.spec.whatwg.org/multipage/#fail-the-connection>
524    pub(crate) fn fail_the_connection(&self) {
525        let global = self.global();
526        let event_source = Trusted::new(self);
527        global.task_manager().remote_event_task_source().queue(
528            task!(fail_the_event_source_connection: move || {
529                let event_source = event_source.root();
530                if event_source.ready_state.get() != ReadyState::Closed {
531                    event_source.ready_state.set(ReadyState::Closed);
532                    event_source.upcast::<EventTarget>().fire_event(atom!("error"), CanGc::note());
533                }
534            }),
535        );
536    }
537
538    pub(crate) fn request(&self) -> RequestBuilder {
539        self.request.borrow().clone().unwrap()
540    }
541
542    pub(crate) fn url(&self) -> &ServoUrl {
543        &self.url
544    }
545}
546
547impl EventSourceMethods<crate::DomTypeHolder> for EventSource {
548    /// <https://html.spec.whatwg.org/multipage/#dom-eventsource>
549    fn Constructor(
550        global: &GlobalScope,
551        proto: Option<HandleObject>,
552        can_gc: CanGc,
553        url: DOMString,
554        event_source_init: &EventSourceInit,
555    ) -> Fallible<DomRoot<EventSource>> {
556        // TODO: Step 2 relevant settings object
557        // Step 3 Let urlRecord be the result of encoding-parsing a URL given url,
558        // relative to settings.
559        let base_url = global.api_base_url();
560        let url_record = match base_url.join(&url.str()) {
561            Ok(u) => u,
562            // Step 4 If urlRecord is failure, then throw a "SyntaxError" DOMException.
563            Err(_) => return Err(Error::Syntax(None)),
564        };
565        // Step 1 Let ev be a new EventSource object.
566        let event_source = EventSource::new(
567            global,
568            proto,
569            // Step 5 Set ev's url to urlRecord.
570            url_record.clone(),
571            event_source_init.withCredentials,
572            can_gc,
573        );
574        global.track_event_source(&event_source);
575        let cors_attribute_state = if event_source_init.withCredentials {
576            // Step 7 If the value of eventSourceInitDict's withCredentials member is true,
577            // then set corsAttributeState to Use Credentials and set ev's withCredentials
578            // attribute to true.
579            CorsSettings::UseCredentials
580        } else {
581            // Step 6 Let corsAttributeState be Anonymous.
582            CorsSettings::Anonymous
583        };
584        // Step 8 Let request be the result of creating a potential-CORS request
585        // given urlRecord, the empty string, and corsAttributeState.
586        // TODO: Step 9 set request's client settings
587        let mut request = create_a_potential_cors_request(
588            global.webview_id(),
589            url_record,
590            Destination::None,
591            Some(cors_attribute_state),
592            Some(true),
593            global.get_referrer(),
594        )
595        .with_global_scope(global);
596
597        // Step 10 User agents may set (`Accept`, `text/event-stream`) in request's header list.
598        // TODO(eijebong): Replace once typed headers allow it
599        request.headers.insert(
600            header::ACCEPT,
601            HeaderValue::from_static("text/event-stream"),
602        );
603        // Step 11 Set request's cache mode to "no-store".
604        request.cache_mode = CacheMode::NoStore;
605        // Step 13 Set ev's request to request.
606        *event_source.request.borrow_mut() = Some(request.clone());
607        // Step 14 Let processEventSourceEndOfBody given response res be the following step:
608        // if res is not a network error, then reestablish the connection.
609
610        event_source.droppable.set_canceller(FetchCanceller::new(
611            request.id,
612            false,
613            global.core_resource_thread(),
614        ));
615
616        let context = EventSourceContext {
617            incomplete_utf8: None,
618            event_source: Trusted::new(&event_source),
619            gen_id: event_source.generation_id.get(),
620            parser_state: ParserState::Eol,
621            field: String::new(),
622            value: String::new(),
623            origin: String::new(),
624
625            event_type: String::new(),
626            data: String::new(),
627            last_event_id: String::new(),
628        };
629
630        let task_source = global.task_manager().networking_task_source().into();
631        global.fetch(request, context, task_source);
632
633        // Step 16 Return ev.
634        Ok(event_source)
635    }
636
637    // https://html.spec.whatwg.org/multipage/#handler-eventsource-onopen
638    event_handler!(open, GetOnopen, SetOnopen);
639
640    // https://html.spec.whatwg.org/multipage/#handler-eventsource-onmessage
641    event_handler!(message, GetOnmessage, SetOnmessage);
642
643    // https://html.spec.whatwg.org/multipage/#handler-eventsource-onerror
644    event_handler!(error, GetOnerror, SetOnerror);
645
646    /// <https://html.spec.whatwg.org/multipage/#dom-eventsource-url>
647    fn Url(&self) -> DOMString {
648        DOMString::from(self.url.as_str())
649    }
650
651    /// <https://html.spec.whatwg.org/multipage/#dom-eventsource-withcredentials>
652    fn WithCredentials(&self) -> bool {
653        self.with_credentials
654    }
655
656    /// <https://html.spec.whatwg.org/multipage/#dom-eventsource-readystate>
657    fn ReadyState(&self) -> u16 {
658        self.ready_state.get() as u16
659    }
660
661    /// <https://html.spec.whatwg.org/multipage/#dom-eventsource-close>
662    fn Close(&self) {
663        let GenerationId(prev_id) = self.generation_id.get();
664        self.generation_id.set(GenerationId(prev_id + 1));
665        self.droppable.cancel();
666        self.ready_state.set(ReadyState::Closed);
667    }
668}
669
670#[derive(JSTraceable, MallocSizeOf)]
671pub(crate) struct EventSourceTimeoutCallback {
672    #[ignore_malloc_size_of = "Because it is non-owning"]
673    event_source: Trusted<EventSource>,
674    #[no_trace]
675    event_source_context: EventSourceContext,
676}
677
678impl EventSourceTimeoutCallback {
679    /// <https://html.spec.whatwg.org/multipage/#reestablish-the-connection>
680    pub(crate) fn invoke(self) {
681        let event_source = self.event_source.root();
682        let global = event_source.global();
683
684        // Step 5.1: If the EventSource object's readyState attribute is not set to CONNECTING, then return.
685        if event_source.ready_state.get() != ReadyState::Connecting {
686            return;
687        }
688
689        // Step 5.2: Let request be the EventSource object's request.
690        let mut request = event_source.request();
691
692        // Step 5.3: If the EventSource object's last event ID string is not the empty string, then:
693        //  - Let lastEventIDValue be the EventSource object's last event ID string, encoded as UTF-8.
694        //  - Set (`Last-Event-ID`, lastEventIDValue) in request's header list.
695        if !event_source.last_event_id.borrow().is_empty() {
696            // TODO(eijebong): Change this once typed header support custom values
697            request.headers.insert(
698                HeaderName::from_static("last-event-id"),
699                HeaderValue::from_str(&String::from(event_source.last_event_id.borrow().clone()))
700                    .unwrap(),
701            );
702        }
703
704        // Step 5.4: Fetch request and process the response obtained in this fashion, if
705        // any, as described earlier in this section.
706        let task_source = global.task_manager().networking_task_source().into();
707        global.fetch(request, self.event_source_context, task_source);
708    }
709}