script/dom/
eventsource.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::Cell;
6use std::mem;
7use std::str::{Chars, FromStr};
8use std::time::Duration;
9
10use dom_struct::dom_struct;
11use headers::ContentType;
12use http::StatusCode;
13use http::header::{self, HeaderName, HeaderValue};
14use js::jsval::UndefinedValue;
15use js::rust::HandleObject;
16use mime::{self, Mime};
17use net_traits::request::{CacheMode, CorsSettings, Destination, RequestBuilder, RequestId};
18use net_traits::{FetchMetadata, FilteredMetadata, NetworkError, ResourceFetchTiming};
19use script_bindings::conversions::SafeToJSValConvertible;
20use servo_url::ServoUrl;
21use stylo_atoms::Atom;
22
23use crate::dom::bindings::cell::DomRefCell;
24use crate::dom::bindings::codegen::Bindings::EventSourceBinding::{
25    EventSourceInit, EventSourceMethods,
26};
27use crate::dom::bindings::error::{Error, Fallible};
28use crate::dom::bindings::inheritance::Castable;
29use crate::dom::bindings::refcounted::Trusted;
30use crate::dom::bindings::reflector::{DomGlobal, reflect_dom_object_with_proto};
31use crate::dom::bindings::root::DomRoot;
32use crate::dom::bindings::str::DOMString;
33use crate::dom::csp::{GlobalCspReporting, Violation};
34use crate::dom::event::Event;
35use crate::dom::eventtarget::EventTarget;
36use crate::dom::globalscope::GlobalScope;
37use crate::dom::messageevent::MessageEvent;
38use crate::dom::performance::performanceresourcetiming::InitiatorType;
39use crate::fetch::{FetchCanceller, create_a_potential_cors_request};
40use crate::network_listener::{self, FetchResponseListener, ResourceTimingListener};
41use crate::realms::enter_realm;
42use crate::script_runtime::CanGc;
43use crate::timers::OneshotTimerCallback;
44
/// Initial value of an `EventSource`'s reconnection time; it stays in effect until a
/// `retry` field in the event stream replaces it.
const DEFAULT_RECONNECTION_TIME: Duration = Duration::from_millis(5000);
46
/// Identifies one "generation" of an `EventSource` connection.
///
/// An `EventSourceContext` remembers the id it was created with and compares it against
/// the `EventSource`'s current id before acting; `Close` changes the current id.
#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
struct GenerationId(u32);
49
#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
/// <https://html.spec.whatwg.org/multipage/#dom-eventsource-readystate>
///
/// The discriminants are the numeric values returned by the `readyState` getter.
enum ReadyState {
    // Initial state, and the state set again before a reconnection attempt.
    Connecting = 0,
    // Set when the connection is announced.
    Open = 1,
    // Set by `Close` and when the connection is failed.
    Closed = 2,
}
57
/// Holds the part of an `EventSource` that needs a `Drop` implementation: the canceller
/// for its fetch.
#[derive(JSTraceable, MallocSizeOf)]
struct DroppableEventSource {
    // Canceller for the current fetch; replaced through `set_canceller`.
    canceller: DomRefCell<FetchCanceller>,
}
62
63impl DroppableEventSource {
64    pub(crate) fn new(canceller: DomRefCell<FetchCanceller>) -> Self {
65        DroppableEventSource { canceller }
66    }
67
68    pub(crate) fn cancel(&self) {
69        self.canceller.borrow_mut().cancel();
70    }
71
72    pub(crate) fn set_canceller(&self, data: FetchCanceller) {
73        *self.canceller.borrow_mut() = data;
74    }
75}
76
77// https://html.spec.whatwg.org/multipage/#garbage-collection-2
78impl Drop for DroppableEventSource {
79    fn drop(&mut self) {
80        // If an EventSource object is garbage collected while its connection is still open,
81        // the user agent must abort any instance of the fetch algorithm opened by this EventSource.
82        self.cancel();
83    }
84}
85
/// DOM object backing the `EventSource` interface.
#[dom_struct]
pub(crate) struct EventSource {
    // Parent `EventTarget`; events are fired through it.
    eventtarget: EventTarget,
    // URL given at construction, returned by the `url` getter.
    #[no_trace]
    url: ServoUrl,
    // Request built by the constructor; cloned again for each reconnection attempt.
    #[no_trace]
    request: DomRefCell<Option<RequestBuilder>>,
    // Last event ID string; sent as `Last-Event-ID` on reconnection when non-empty.
    last_event_id: DomRefCell<DOMString>,
    // Delay before a reconnection attempt; updated by `retry` fields in the stream.
    reconnection_time: Cell<Duration>,
    // Current connection generation; contexts with a different id ignore their callbacks.
    generation_id: Cell<GenerationId>,

    // Value behind the `readyState` getter.
    ready_state: Cell<ReadyState>,
    // Value behind the `withCredentials` getter.
    with_credentials: bool,
    // Owns the fetch canceller and cancels it on drop.
    droppable: DroppableEventSource,
}
101
/// State of the character-by-character event stream parser in `EventSourceContext::parse`.
#[derive(Clone, MallocSizeOf)]
enum ParserState {
    // Characters are being appended to the field name.
    Field,
    // Inside a comment line; characters are discarded until the line ends.
    Comment,
    // Characters are being appended to the field value.
    Value,
    // At the start of a line (initial state, and the state after every line ending).
    Eol,
}
109
/// Fetch listener and event stream parser state for one `EventSource`.
#[derive(Clone, MallocSizeOf)]
struct EventSourceContext {
    // Trailing bytes of a UTF-8 sequence that was cut off at the end of a chunk.
    incomplete_utf8: Option<utf8::Incomplete>,
    // The `EventSource` this context reports to.
    event_source: Trusted<EventSource>,
    // Generation of the `EventSource` at the time this context was created.
    gen_id: GenerationId,
    // Current state of the line parser.
    parser_state: ParserState,
    // Name of the field currently being parsed.
    field: String,
    // Value of the field currently being parsed.
    value: String,
    // Serialized origin of the response's final URL; used as the origin of dispatched events.
    origin: String,
    // Event type buffer, filled by `event` fields.
    event_type: String,
    // Data buffer, filled by `data` fields.
    data: String,
    // Last event ID buffer, filled by `id` fields.
    last_event_id: String,
}
123
124impl EventSourceContext {
125    /// <https://html.spec.whatwg.org/multipage/#announce-the-connection>
126    fn announce_the_connection(&self) {
127        let event_source = self.event_source.root();
128        if self.gen_id != event_source.generation_id.get() {
129            return;
130        }
131        let global = event_source.global();
132        let event_source = self.event_source.clone();
133        global.task_manager().remote_event_task_source().queue(
134            task!(announce_the_event_source_connection: move || {
135                let event_source = event_source.root();
136                if event_source.ready_state.get() != ReadyState::Closed {
137                    event_source.ready_state.set(ReadyState::Open);
138                    event_source.upcast::<EventTarget>().fire_event(atom!("open"), CanGc::note());
139                }
140            }),
141        );
142    }
143
144    /// <https://html.spec.whatwg.org/multipage/#fail-the-connection>
145    fn fail_the_connection(&self) {
146        let event_source = self.event_source.root();
147        if self.gen_id != event_source.generation_id.get() {
148            return;
149        }
150        event_source.fail_the_connection();
151    }
152
153    /// <https://html.spec.whatwg.org/multipage/#reestablish-the-connection>
154    fn reestablish_the_connection(&self) {
155        let event_source = self.event_source.root();
156
157        if self.gen_id != event_source.generation_id.get() {
158            return;
159        }
160
161        let trusted_event_source = self.event_source.clone();
162        let global = event_source.global();
163        let event_source_context = self.clone();
164        global.task_manager().remote_event_task_source().queue(
165            task!(reestablish_the_event_source_onnection: move || {
166                let event_source = trusted_event_source.root();
167
168                // Step 1.1.
169                if event_source.ready_state.get() == ReadyState::Closed {
170                    return;
171                }
172
173                // Step 1.2.
174                event_source.ready_state.set(ReadyState::Connecting);
175
176                // Step 1.3.
177                event_source.upcast::<EventTarget>().fire_event(atom!("error"), CanGc::note());
178
179                // Step 2.
180                let duration = event_source.reconnection_time.get();
181
182                // Step 3.
183                // TODO: Optionally wait some more.
184
185                // Steps 4-5.
186                let callback = OneshotTimerCallback::EventSourceTimeout(
187                    EventSourceTimeoutCallback {
188                        event_source: trusted_event_source,
189                        event_source_context,
190                    }
191                );
192                event_source.global().schedule_callback(callback, duration);
193            }),
194        );
195    }
196
197    /// <https://html.spec.whatwg.org/multipage/#processField>
198    fn process_field(&mut self) {
199        match &*self.field {
200            "event" => mem::swap(&mut self.event_type, &mut self.value),
201            "data" => {
202                self.data.push_str(&self.value);
203                self.data.push('\n');
204            },
205            "id" if !self.value.contains('\0') => {
206                mem::swap(&mut self.last_event_id, &mut self.value);
207            },
208            "retry" => {
209                if let Ok(time) = u64::from_str(&self.value) {
210                    self.event_source
211                        .root()
212                        .reconnection_time
213                        .set(Duration::from_millis(time));
214                }
215            },
216            _ => (),
217        }
218
219        self.field.clear();
220        self.value.clear();
221    }
222
223    /// <https://html.spec.whatwg.org/multipage/#dispatchMessage>
224    fn dispatch_event(&mut self, can_gc: CanGc) {
225        let event_source = self.event_source.root();
226        // Step 1
227        *event_source.last_event_id.borrow_mut() = DOMString::from(self.last_event_id.clone());
228        // Step 2
229        if self.data.is_empty() {
230            self.data.clear();
231            self.event_type.clear();
232            return;
233        }
234        // Step 3
235        if let Some(last) = self.data.pop() {
236            if last != '\n' {
237                self.data.push(last);
238            }
239        }
240        // Step 6
241        let type_ = if !self.event_type.is_empty() {
242            Atom::from(self.event_type.clone())
243        } else {
244            atom!("message")
245        };
246        // Steps 4-5
247        let event = {
248            let _ac = enter_realm(&*event_source);
249            rooted!(in(*GlobalScope::get_cx()) let mut data = UndefinedValue());
250            self.data
251                .safe_to_jsval(GlobalScope::get_cx(), data.handle_mut(), can_gc);
252            MessageEvent::new(
253                &event_source.global(),
254                type_,
255                false,
256                false,
257                data.handle(),
258                DOMString::from(self.origin.clone()),
259                None,
260                event_source.last_event_id.borrow().clone(),
261                Vec::with_capacity(0),
262                can_gc,
263            )
264        };
265        // Step 7
266        self.event_type.clear();
267        self.data.clear();
268
269        // Step 8.
270        let global = event_source.global();
271        let event_source = self.event_source.clone();
272        let event = Trusted::new(&*event);
273        global.task_manager().remote_event_task_source().queue(
274            task!(dispatch_the_event_source_event: move || {
275                let event_source = event_source.root();
276                if event_source.ready_state.get() != ReadyState::Closed {
277                    event.root().upcast::<Event>().fire(event_source.upcast(), CanGc::note());
278                }
279            }),
280        );
281    }
282
283    /// <https://html.spec.whatwg.org/multipage/#event-stream-interpretation>
284    fn parse(&mut self, stream: Chars, can_gc: CanGc) {
285        let mut stream = stream.peekable();
286
287        while let Some(ch) = stream.next() {
288            match (ch, &self.parser_state) {
289                (':', &ParserState::Eol) => self.parser_state = ParserState::Comment,
290                (':', &ParserState::Field) => {
291                    self.parser_state = ParserState::Value;
292                    if let Some(&' ') = stream.peek() {
293                        stream.next();
294                    }
295                },
296
297                ('\n', &ParserState::Value) => {
298                    self.parser_state = ParserState::Eol;
299                    self.process_field();
300                },
301                ('\r', &ParserState::Value) => {
302                    if let Some(&'\n') = stream.peek() {
303                        continue;
304                    }
305                    self.parser_state = ParserState::Eol;
306                    self.process_field();
307                },
308
309                ('\n', &ParserState::Field) => {
310                    self.parser_state = ParserState::Eol;
311                    self.process_field();
312                },
313                ('\r', &ParserState::Field) => {
314                    if let Some(&'\n') = stream.peek() {
315                        continue;
316                    }
317                    self.parser_state = ParserState::Eol;
318                    self.process_field();
319                },
320
321                ('\n', &ParserState::Eol) => self.dispatch_event(can_gc),
322                ('\r', &ParserState::Eol) => {
323                    if let Some(&'\n') = stream.peek() {
324                        continue;
325                    }
326                    self.dispatch_event(can_gc);
327                },
328
329                ('\n', &ParserState::Comment) => self.parser_state = ParserState::Eol,
330                ('\r', &ParserState::Comment) => {
331                    if let Some(&'\n') = stream.peek() {
332                        continue;
333                    }
334                    self.parser_state = ParserState::Eol;
335                },
336
337                (_, &ParserState::Field) => self.field.push(ch),
338                (_, &ParserState::Value) => self.value.push(ch),
339                (_, &ParserState::Eol) => {
340                    self.parser_state = ParserState::Field;
341                    self.field.push(ch);
342                },
343                (_, &ParserState::Comment) => (),
344            }
345        }
346    }
347}
348
349impl FetchResponseListener for EventSourceContext {
350    fn should_invoke(&self) -> bool {
351        self.event_source.root().generation_id.get() == self.gen_id
352    }
353
    /// Request-body notification; currently a no-op.
    fn process_request_body(&mut self, _: RequestId) {
        // TODO
    }
357
    /// End-of-request-body notification; currently a no-op.
    fn process_request_eof(&mut self, _: RequestId) {
        // TODO
    }
361
362    fn process_response(&mut self, _: RequestId, metadata: Result<FetchMetadata, NetworkError>) {
363        match metadata {
364            Ok(fm) => {
365                let meta = match fm {
366                    FetchMetadata::Unfiltered(m) => m,
367                    FetchMetadata::Filtered { unsafe_, filtered } => match filtered {
368                        FilteredMetadata::Opaque | FilteredMetadata::OpaqueRedirect(_) => {
369                            return self.fail_the_connection();
370                        },
371                        _ => unsafe_,
372                    },
373                };
374                // Step 15.3 if res's status is not 200, or if res's `Content-Type` is not
375                // `text/event-stream`, then fail the connection.
376                if meta.status.code() != StatusCode::OK {
377                    return self.fail_the_connection();
378                }
379                let mime = match meta.content_type {
380                    None => return self.fail_the_connection(),
381                    Some(ct) => <ContentType as Into<Mime>>::into(ct.into_inner()),
382                };
383                if (mime.type_(), mime.subtype()) != (mime::TEXT, mime::EVENT_STREAM) {
384                    return self.fail_the_connection();
385                }
386                self.origin = meta.final_url.origin().ascii_serialization();
387                // Step 15.4 announce the connection and interpret res's body line by line.
388                self.announce_the_connection();
389            },
390            Err(_) => {
391                // Step 15.2 if res is a network error, then reestablish the connection, unless
392                // the user agent knows that to be futile, in which case the user agent may
393                // fail the connection.
394
395                // WPT tests consider a non-http(s) scheme to be futile.
396                match self.event_source.root().url.scheme() {
397                    "http" | "https" => self.reestablish_the_connection(),
398                    _ => self.fail_the_connection(),
399                }
400            },
401        }
402    }
403
404    fn process_response_chunk(&mut self, _: RequestId, chunk: Vec<u8>) {
405        let mut input = &*chunk;
406        if let Some(mut incomplete) = self.incomplete_utf8.take() {
407            match incomplete.try_complete(input) {
408                None => return,
409                Some((result, remaining_input)) => {
410                    self.parse(result.unwrap_or("\u{FFFD}").chars(), CanGc::note());
411                    input = remaining_input;
412                },
413            }
414        }
415
416        while !input.is_empty() {
417            match utf8::decode(input) {
418                Ok(s) => {
419                    self.parse(s.chars(), CanGc::note());
420                    return;
421                },
422                Err(utf8::DecodeError::Invalid {
423                    valid_prefix,
424                    remaining_input,
425                    ..
426                }) => {
427                    self.parse(valid_prefix.chars(), CanGc::note());
428                    self.parse("\u{FFFD}".chars(), CanGc::note());
429                    input = remaining_input;
430                },
431                Err(utf8::DecodeError::Incomplete {
432                    valid_prefix,
433                    incomplete_suffix,
434                }) => {
435                    self.parse(valid_prefix.chars(), CanGc::note());
436                    self.incomplete_utf8 = Some(incomplete_suffix);
437                    return;
438                },
439            }
440        }
441    }
442
443    fn process_response_eof(
444        mut self,
445        _: RequestId,
446        response: Result<ResourceFetchTiming, NetworkError>,
447    ) {
448        if self.incomplete_utf8.take().is_some() {
449            self.parse("\u{FFFD}".chars(), CanGc::note());
450        }
451        if let Ok(response) = response {
452            self.reestablish_the_connection();
453            network_listener::submit_timing(&self, &response, CanGc::note());
454        }
455    }
456
457    fn process_csp_violations(&mut self, _request_id: RequestId, violations: Vec<Violation>) {
458        let global = &self.resource_timing_global();
459        global.report_csp_violations(violations, None, None);
460    }
461}
462
463impl ResourceTimingListener for EventSourceContext {
464    fn resource_timing_information(&self) -> (InitiatorType, ServoUrl) {
465        (InitiatorType::Other, self.event_source.root().url().clone())
466    }
467
468    fn resource_timing_global(&self) -> DomRoot<GlobalScope> {
469        self.event_source.root().global()
470    }
471}
472
473impl EventSource {
474    fn new_inherited(url: ServoUrl, with_credentials: bool) -> EventSource {
475        EventSource {
476            eventtarget: EventTarget::new_inherited(),
477            url,
478            request: DomRefCell::new(None),
479            last_event_id: DomRefCell::new(DOMString::from("")),
480            reconnection_time: Cell::new(DEFAULT_RECONNECTION_TIME),
481            generation_id: Cell::new(GenerationId(0)),
482
483            ready_state: Cell::new(ReadyState::Connecting),
484            with_credentials,
485            droppable: DroppableEventSource::new(DomRefCell::new(Default::default())),
486        }
487    }
488
489    fn new(
490        global: &GlobalScope,
491        proto: Option<HandleObject>,
492        url: ServoUrl,
493        with_credentials: bool,
494        can_gc: CanGc,
495    ) -> DomRoot<EventSource> {
496        reflect_dom_object_with_proto(
497            Box::new(EventSource::new_inherited(url, with_credentials)),
498            global,
499            proto,
500            can_gc,
501        )
502    }
503
    // https://html.spec.whatwg.org/multipage/#sse-processing-model:fail-the-connection-3
    /// Cancels the fetch through the stored canceller and then fails the connection.
    pub(crate) fn cancel(&self) {
        self.droppable.cancel();
        self.fail_the_connection();
    }
509
510    /// <https://html.spec.whatwg.org/multipage/#fail-the-connection>
511    pub(crate) fn fail_the_connection(&self) {
512        let global = self.global();
513        let event_source = Trusted::new(self);
514        global.task_manager().remote_event_task_source().queue(
515            task!(fail_the_event_source_connection: move || {
516                let event_source = event_source.root();
517                if event_source.ready_state.get() != ReadyState::Closed {
518                    event_source.ready_state.set(ReadyState::Closed);
519                    event_source.upcast::<EventTarget>().fire_event(atom!("error"), CanGc::note());
520                }
521            }),
522        );
523    }
524
525    pub(crate) fn request(&self) -> RequestBuilder {
526        self.request.borrow().clone().unwrap()
527    }
528
    /// Returns the URL this `EventSource` was created with.
    pub(crate) fn url(&self) -> &ServoUrl {
        &self.url
    }
532}
533
534impl EventSourceMethods<crate::DomTypeHolder> for EventSource {
535    /// <https://html.spec.whatwg.org/multipage/#dom-eventsource>
536    fn Constructor(
537        global: &GlobalScope,
538        proto: Option<HandleObject>,
539        can_gc: CanGc,
540        url: DOMString,
541        event_source_init: &EventSourceInit,
542    ) -> Fallible<DomRoot<EventSource>> {
543        // TODO: Step 2 relevant settings object
544        // Step 3 Let urlRecord be the result of encoding-parsing a URL given url,
545        // relative to settings.
546        let base_url = global.api_base_url();
547        let url_record = match base_url.join(&url.str()) {
548            Ok(u) => u,
549            // Step 4 If urlRecord is failure, then throw a "SyntaxError" DOMException.
550            Err(_) => return Err(Error::Syntax(None)),
551        };
552        // Step 1 Let ev be a new EventSource object.
553        let event_source = EventSource::new(
554            global,
555            proto,
556            // Step 5 Set ev's url to urlRecord.
557            url_record.clone(),
558            event_source_init.withCredentials,
559            can_gc,
560        );
561        global.track_event_source(&event_source);
562        let cors_attribute_state = if event_source_init.withCredentials {
563            // Step 7 If the value of eventSourceInitDict's withCredentials member is true,
564            // then set corsAttributeState to Use Credentials and set ev's withCredentials
565            // attribute to true.
566            CorsSettings::UseCredentials
567        } else {
568            // Step 6 Let corsAttributeState be Anonymous.
569            CorsSettings::Anonymous
570        };
571        // Step 8 Let request be the result of creating a potential-CORS request
572        // given urlRecord, the empty string, and corsAttributeState.
573        // TODO: Step 9 set request's client settings
574        let mut request = create_a_potential_cors_request(
575            global.webview_id(),
576            url_record,
577            Destination::None,
578            Some(cors_attribute_state),
579            Some(true),
580            global.get_referrer(),
581            global.insecure_requests_policy(),
582            global.has_trustworthy_ancestor_or_current_origin(),
583            global.policy_container(),
584        )
585        .origin(global.origin().immutable().clone())
586        .pipeline_id(Some(global.pipeline_id()));
587
588        // Step 10 User agents may set (`Accept`, `text/event-stream`) in request's header list.
589        // TODO(eijebong): Replace once typed headers allow it
590        request.headers.insert(
591            header::ACCEPT,
592            HeaderValue::from_static("text/event-stream"),
593        );
594        // Step 11 Set request's cache mode to "no-store".
595        request.cache_mode = CacheMode::NoStore;
596        // Step 13 Set ev's request to request.
597        *event_source.request.borrow_mut() = Some(request.clone());
598        // Step 14 Let processEventSourceEndOfBody given response res be the following step:
599        // if res is not a network error, then reestablish the connection.
600
601        event_source.droppable.set_canceller(FetchCanceller::new(
602            request.id,
603            global.core_resource_thread(),
604        ));
605
606        let context = EventSourceContext {
607            incomplete_utf8: None,
608            event_source: Trusted::new(&event_source),
609            gen_id: event_source.generation_id.get(),
610            parser_state: ParserState::Eol,
611            field: String::new(),
612            value: String::new(),
613            origin: String::new(),
614
615            event_type: String::new(),
616            data: String::new(),
617            last_event_id: String::new(),
618        };
619
620        let task_source = global.task_manager().networking_task_source().into();
621        global.fetch(request, context, task_source);
622
623        // Step 16 Return ev.
624        Ok(event_source)
625    }
626
    // Event handler attribute `onopen`.
    // https://html.spec.whatwg.org/multipage/#handler-eventsource-onopen
    event_handler!(open, GetOnopen, SetOnopen);

    // Event handler attribute `onmessage`.
    // https://html.spec.whatwg.org/multipage/#handler-eventsource-onmessage
    event_handler!(message, GetOnmessage, SetOnmessage);

    // Event handler attribute `onerror`.
    // https://html.spec.whatwg.org/multipage/#handler-eventsource-onerror
    event_handler!(error, GetOnerror, SetOnerror);
635
    /// <https://html.spec.whatwg.org/multipage/#dom-eventsource-url>
    ///
    /// Returns the `EventSource`'s URL as a string.
    fn Url(&self) -> DOMString {
        DOMString::from(self.url.as_str())
    }
640
    /// <https://html.spec.whatwg.org/multipage/#dom-eventsource-withcredentials>
    ///
    /// Returns the `withCredentials` value given at construction.
    fn WithCredentials(&self) -> bool {
        self.with_credentials
    }
645
    /// <https://html.spec.whatwg.org/multipage/#dom-eventsource-readystate>
    ///
    /// Returns the current ready state as its numeric discriminant.
    fn ReadyState(&self) -> u16 {
        self.ready_state.get() as u16
    }
650
651    /// <https://html.spec.whatwg.org/multipage/#dom-eventsource-close>
652    fn Close(&self) {
653        let GenerationId(prev_id) = self.generation_id.get();
654        self.generation_id.set(GenerationId(prev_id + 1));
655        self.droppable.cancel();
656        self.ready_state.set(ReadyState::Closed);
657    }
658}
659
/// Timer callback scheduled by `reestablish_the_connection`; invoking it starts the
/// reconnection fetch.
#[derive(JSTraceable, MallocSizeOf)]
pub(crate) struct EventSourceTimeoutCallback {
    // The `EventSource` to reconnect.
    #[ignore_malloc_size_of = "Because it is non-owning"]
    event_source: Trusted<EventSource>,
    // Context handed to the new fetch as its listener.
    #[no_trace]
    event_source_context: EventSourceContext,
}
667
668impl EventSourceTimeoutCallback {
669    /// <https://html.spec.whatwg.org/multipage/#reestablish-the-connection>
670    pub(crate) fn invoke(self) {
671        let event_source = self.event_source.root();
672        let global = event_source.global();
673
674        // Step 5.1: If the EventSource object's readyState attribute is not set to CONNECTING, then return.
675        if event_source.ready_state.get() != ReadyState::Connecting {
676            return;
677        }
678
679        // Step 5.2: Let request be the EventSource object's request.
680        let mut request = event_source.request();
681
682        // Step 5.3: If the EventSource object's last event ID string is not the empty string, then:
683        //  - Let lastEventIDValue be the EventSource object's last event ID string, encoded as UTF-8.
684        //  - Set (`Last-Event-ID`, lastEventIDValue) in request's header list.
685        if !event_source.last_event_id.borrow().is_empty() {
686            // TODO(eijebong): Change this once typed header support custom values
687            request.headers.insert(
688                HeaderName::from_static("last-event-id"),
689                HeaderValue::from_str(&String::from(event_source.last_event_id.borrow().clone()))
690                    .unwrap(),
691            );
692        }
693
694        // Step 5.4: Fetch request and process the response obtained in this fashion, if
695        // any, as described earlier in this section.
696        let task_source = global.task_manager().networking_task_source().into();
697        global.fetch(request, self.event_source_context, task_source);
698    }
699}