1use std::cell::Cell;
6use std::mem;
7use std::str::{Chars, FromStr};
8use std::time::Duration;
9
10use dom_struct::dom_struct;
11use headers::ContentType;
12use http::StatusCode;
13use http::header::{self, HeaderName, HeaderValue};
14use js::jsval::UndefinedValue;
15use js::rust::HandleObject;
16use mime::{self, Mime};
17use net_traits::request::{CacheMode, CorsSettings, Destination, RequestBuilder, RequestId};
18use net_traits::{FetchMetadata, FilteredMetadata, NetworkError, ResourceFetchTiming};
19use script_bindings::conversions::SafeToJSValConvertible;
20use servo_url::ServoUrl;
21use stylo_atoms::Atom;
22
23use crate::dom::bindings::cell::DomRefCell;
24use crate::dom::bindings::codegen::Bindings::EventSourceBinding::{
25 EventSourceInit, EventSourceMethods,
26};
27use crate::dom::bindings::error::{Error, Fallible};
28use crate::dom::bindings::inheritance::Castable;
29use crate::dom::bindings::refcounted::Trusted;
30use crate::dom::bindings::reflector::{DomGlobal, reflect_dom_object_with_proto};
31use crate::dom::bindings::root::DomRoot;
32use crate::dom::bindings::str::DOMString;
33use crate::dom::csp::{GlobalCspReporting, Violation};
34use crate::dom::event::Event;
35use crate::dom::eventtarget::EventTarget;
36use crate::dom::globalscope::GlobalScope;
37use crate::dom::messageevent::MessageEvent;
38use crate::dom::performance::performanceresourcetiming::InitiatorType;
39use crate::fetch::{FetchCanceller, RequestWithGlobalScope, create_a_potential_cors_request};
40use crate::network_listener::{self, FetchResponseListener, ResourceTimingListener};
41use crate::realms::enter_realm;
42use crate::script_runtime::CanGc;
43use crate::timers::OneshotTimerCallback;
44
/// Reconnection delay used until the event stream supplies its own value
/// through a `retry` field.
const DEFAULT_RECONNECTION_TIME: Duration = Duration::from_secs(5);
46
/// Identifies one "generation" of an `EventSource` connection.
///
/// `EventSource::Close` increments the stored id, and every
/// `EventSourceContext` compares the id it was created with against the
/// current one, so callbacks that belong to a closed connection are ignored.
#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
struct GenerationId(u32);
49
/// Connection state of an `EventSource`.
///
/// The discriminants are the numeric values returned by the `ReadyState`
/// getter (which casts the variant to `u16`), so they must not be renumbered.
#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
enum ReadyState {
    /// Initial state, and the state re-entered while a reconnection is pending.
    Connecting = 0,
    /// Set by the task queued when the connection is announced.
    Open = 1,
    /// Set when the connection is failed or `Close` is called.
    Closed = 2,
}
57
/// Owns the `FetchCanceller` of an `EventSource` so that the canceller is
/// aborted when the owner is dropped (see the `Drop` impl for this type).
#[derive(JSTraceable, MallocSizeOf)]
struct DroppableEventSource {
    /// Canceller for the current fetch; replaced through `set_canceller`.
    canceller: DomRefCell<FetchCanceller>,
}
62
63impl DroppableEventSource {
64 pub(crate) fn new(canceller: DomRefCell<FetchCanceller>) -> Self {
65 DroppableEventSource { canceller }
66 }
67
68 pub(crate) fn cancel(&self) {
69 self.canceller.borrow_mut().abort();
70 }
71
72 pub(crate) fn set_canceller(&self, data: FetchCanceller) {
73 *self.canceller.borrow_mut() = data;
74 }
75}
76
77impl Drop for DroppableEventSource {
79 fn drop(&mut self) {
80 self.cancel();
83 }
84}
85
/// DOM object implementing the `EventSource` interface.
#[dom_struct]
pub(crate) struct EventSource {
    /// Parent `EventTarget`; `open`, `error` and message events are fired on it.
    eventtarget: EventTarget,
    /// URL passed to the constructor after resolution against the API base
    /// URL; returned by the `Url` getter.
    #[no_trace]
    url: ServoUrl,
    /// Request built by `Constructor` and cloned by `request()` when
    /// reconnecting. `None` only until the constructor stores it.
    #[no_trace]
    request: DomRefCell<Option<RequestBuilder>>,
    /// Last event ID string; updated by `EventSourceContext::dispatch_event`
    /// and sent as the `last-event-id` header when reconnecting.
    last_event_id: DomRefCell<DOMString>,
    /// Delay before a reconnection attempt. Starts at
    /// `DEFAULT_RECONNECTION_TIME` and is overwritten by a `retry` field.
    reconnection_time: Cell<Duration>,
    /// Current connection generation; incremented by `Close`.
    generation_id: Cell<GenerationId>,

    /// Current connection state, exposed through the `ReadyState` getter.
    ready_state: Cell<ReadyState>,
    /// Value of `withCredentials` from the init dictionary.
    with_credentials: bool,
    /// Holds the fetch canceller and aborts it on drop.
    droppable: DroppableEventSource,
}
101
/// Position of the event-stream parser within the current line.
#[derive(Clone, MallocSizeOf)]
enum ParserState {
    /// Reading a field name (a line that did not start with `:`).
    Field,
    /// Inside a comment line (a line that started with `:`); characters are dropped.
    Comment,
    /// Reading a field value, i.e. after the first `:` of a field line.
    Value,
    /// At the start of a line; nothing of the line has been consumed yet.
    Eol,
}
109
/// Fetch listener that decodes and parses the event stream for one
/// `EventSource` and queues the resulting events.
#[derive(Clone, MallocSizeOf)]
struct EventSourceContext {
    /// Bytes of a UTF-8 sequence that was cut off at the end of the previous
    /// chunk, if any.
    incomplete_utf8: Option<utf8::Incomplete>,
    /// The `EventSource` this context feeds.
    event_source: Trusted<EventSource>,
    /// Generation of the `EventSource` this context was created for.
    gen_id: GenerationId,
    /// Where the parser is within the current line.
    parser_state: ParserState,
    /// Name of the field currently being parsed.
    field: String,
    /// Value of the field currently being parsed.
    value: String,
    /// Serialized origin of the response's final URL; used as the origin of
    /// dispatched message events.
    origin: String,
    /// Event type buffer, filled by an `event` field.
    event_type: String,
    /// Data buffer; every `data` field appends its value followed by `\n`.
    data: String,
    /// Last event ID buffer, filled by an `id` field and copied to the
    /// `EventSource` whenever an event is dispatched.
    last_event_id: String,
}
123
124impl EventSourceContext {
125 fn announce_the_connection(&self) {
127 let event_source = self.event_source.root();
128 if self.gen_id != event_source.generation_id.get() {
129 return;
130 }
131 let global = event_source.global();
132 let event_source = self.event_source.clone();
133 global.task_manager().remote_event_task_source().queue(
134 task!(announce_the_event_source_connection: move || {
135 let event_source = event_source.root();
136 if event_source.ready_state.get() != ReadyState::Closed {
137 event_source.ready_state.set(ReadyState::Open);
138 event_source.upcast::<EventTarget>().fire_event(atom!("open"), CanGc::note());
139 }
140 }),
141 );
142 }
143
144 fn fail_the_connection(&self) {
146 let event_source = self.event_source.root();
147 if self.gen_id != event_source.generation_id.get() {
148 return;
149 }
150 event_source.fail_the_connection();
151 }
152
153 fn reestablish_the_connection(&self) {
155 let event_source = self.event_source.root();
156
157 if self.gen_id != event_source.generation_id.get() {
158 return;
159 }
160
161 let trusted_event_source = self.event_source.clone();
162 let global = event_source.global();
163 let event_source_context = EventSourceContext {
164 incomplete_utf8: None,
165 event_source: self.event_source.clone(),
166 gen_id: self.gen_id,
167 parser_state: ParserState::Eol,
168 field: String::new(),
169 value: String::new(),
170 origin: self.origin.clone(),
171 event_type: String::new(),
172 data: String::new(),
173 last_event_id: String::from(event_source.last_event_id.borrow().clone()),
174 };
175 global.task_manager().remote_event_task_source().queue(
176 task!(reestablish_the_event_source_onnection: move || {
177 let event_source = trusted_event_source.root();
178
179 if event_source.ready_state.get() == ReadyState::Closed {
181 return;
182 }
183
184 event_source.ready_state.set(ReadyState::Connecting);
186
187 event_source.upcast::<EventTarget>().fire_event(atom!("error"), CanGc::note());
189
190 let duration = event_source.reconnection_time.get();
192
193 let callback = OneshotTimerCallback::EventSourceTimeout(
198 EventSourceTimeoutCallback {
199 event_source: trusted_event_source,
200 event_source_context,
201 }
202 );
203 event_source.global().schedule_callback(callback, duration);
204 }),
205 );
206 }
207
208 fn process_field(&mut self) {
210 match &*self.field {
211 "event" => mem::swap(&mut self.event_type, &mut self.value),
212 "data" => {
213 self.data.push_str(&self.value);
214 self.data.push('\n');
215 },
216 "id" if !self.value.contains('\0') => {
217 mem::swap(&mut self.last_event_id, &mut self.value);
218 },
219 "retry" => {
220 if let Ok(time) = u64::from_str(&self.value) {
221 self.event_source
222 .root()
223 .reconnection_time
224 .set(Duration::from_millis(time));
225 }
226 },
227 _ => (),
228 }
229
230 self.field.clear();
231 self.value.clear();
232 }
233
234 fn dispatch_event(&mut self, can_gc: CanGc) {
236 let event_source = self.event_source.root();
237 *event_source.last_event_id.borrow_mut() = DOMString::from(self.last_event_id.clone());
239 if self.data.is_empty() {
241 self.data.clear();
242 self.event_type.clear();
243 return;
244 }
245 if let Some(last) = self.data.pop() {
247 if last != '\n' {
248 self.data.push(last);
249 }
250 }
251 let type_ = if !self.event_type.is_empty() {
253 Atom::from(self.event_type.clone())
254 } else {
255 atom!("message")
256 };
257 let event = {
259 let _ac = enter_realm(&*event_source);
260 rooted!(in(*GlobalScope::get_cx()) let mut data = UndefinedValue());
261 self.data
262 .safe_to_jsval(GlobalScope::get_cx(), data.handle_mut(), can_gc);
263 MessageEvent::new(
264 &event_source.global(),
265 type_,
266 false,
267 false,
268 data.handle(),
269 DOMString::from(self.origin.clone()),
270 None,
271 event_source.last_event_id.borrow().clone(),
272 Vec::with_capacity(0),
273 can_gc,
274 )
275 };
276 self.event_type.clear();
278 self.data.clear();
279
280 let global = event_source.global();
282 let event_source = self.event_source.clone();
283 let event = Trusted::new(&*event);
284 global.task_manager().remote_event_task_source().queue(
285 task!(dispatch_the_event_source_event: move || {
286 let event_source = event_source.root();
287 if event_source.ready_state.get() != ReadyState::Closed {
288 event.root().upcast::<Event>().fire(event_source.upcast(), CanGc::note());
289 }
290 }),
291 );
292 }
293
294 fn parse(&mut self, stream: Chars, can_gc: CanGc) {
296 let mut stream = stream.peekable();
297
298 while let Some(ch) = stream.next() {
299 match (ch, &self.parser_state) {
300 (':', &ParserState::Eol) => self.parser_state = ParserState::Comment,
301 (':', &ParserState::Field) => {
302 self.parser_state = ParserState::Value;
303 if let Some(&' ') = stream.peek() {
304 stream.next();
305 }
306 },
307
308 ('\n', &ParserState::Value) => {
309 self.parser_state = ParserState::Eol;
310 self.process_field();
311 },
312 ('\r', &ParserState::Value) => {
313 if let Some(&'\n') = stream.peek() {
314 continue;
315 }
316 self.parser_state = ParserState::Eol;
317 self.process_field();
318 },
319
320 ('\n', &ParserState::Field) => {
321 self.parser_state = ParserState::Eol;
322 self.process_field();
323 },
324 ('\r', &ParserState::Field) => {
325 if let Some(&'\n') = stream.peek() {
326 continue;
327 }
328 self.parser_state = ParserState::Eol;
329 self.process_field();
330 },
331
332 ('\n', &ParserState::Eol) => self.dispatch_event(can_gc),
333 ('\r', &ParserState::Eol) => {
334 if let Some(&'\n') = stream.peek() {
335 continue;
336 }
337 self.dispatch_event(can_gc);
338 },
339
340 ('\n', &ParserState::Comment) => self.parser_state = ParserState::Eol,
341 ('\r', &ParserState::Comment) => {
342 if let Some(&'\n') = stream.peek() {
343 continue;
344 }
345 self.parser_state = ParserState::Eol;
346 },
347
348 (_, &ParserState::Field) => self.field.push(ch),
349 (_, &ParserState::Value) => self.value.push(ch),
350 (_, &ParserState::Eol) => {
351 self.parser_state = ParserState::Field;
352 self.field.push(ch);
353 },
354 (_, &ParserState::Comment) => (),
355 }
356 }
357 }
358}
359
360impl FetchResponseListener for EventSourceContext {
361 fn should_invoke(&self) -> bool {
362 self.event_source.root().generation_id.get() == self.gen_id
363 }
364
365 fn process_request_body(&mut self, _: RequestId) {
366 }
368
369 fn process_request_eof(&mut self, _: RequestId) {
370 }
372
373 fn process_response(&mut self, _: RequestId, metadata: Result<FetchMetadata, NetworkError>) {
374 match metadata {
375 Ok(fm) => {
376 let meta = match fm {
377 FetchMetadata::Unfiltered(m) => m,
378 FetchMetadata::Filtered { unsafe_, filtered } => match filtered {
379 FilteredMetadata::Opaque | FilteredMetadata::OpaqueRedirect(_) => {
380 return self.fail_the_connection();
381 },
382 _ => unsafe_,
383 },
384 };
385 if meta.status.code() != StatusCode::OK {
388 return self.fail_the_connection();
389 }
390 let mime = match meta.content_type {
391 None => return self.fail_the_connection(),
392 Some(ct) => <ContentType as Into<Mime>>::into(ct.into_inner()),
393 };
394 if (mime.type_(), mime.subtype()) != (mime::TEXT, mime::EVENT_STREAM) {
395 return self.fail_the_connection();
396 }
397 self.origin = meta.final_url.origin().ascii_serialization();
398 self.announce_the_connection();
400 },
401 Err(error) => {
402 if error.is_permanent_failure() {
406 self.fail_the_connection()
407 } else {
408 self.reestablish_the_connection()
409 }
410 },
411 }
412 }
413
414 fn process_response_chunk(&mut self, _: RequestId, chunk: Vec<u8>) {
415 let mut input = &*chunk;
416 if let Some(mut incomplete) = self.incomplete_utf8.take() {
417 match incomplete.try_complete(input) {
418 None => return,
419 Some((result, remaining_input)) => {
420 self.parse(result.unwrap_or("\u{FFFD}").chars(), CanGc::note());
421 input = remaining_input;
422 },
423 }
424 }
425
426 while !input.is_empty() {
427 match utf8::decode(input) {
428 Ok(s) => {
429 self.parse(s.chars(), CanGc::note());
430 return;
431 },
432 Err(utf8::DecodeError::Invalid {
433 valid_prefix,
434 remaining_input,
435 ..
436 }) => {
437 self.parse(valid_prefix.chars(), CanGc::note());
438 self.parse("\u{FFFD}".chars(), CanGc::note());
439 input = remaining_input;
440 },
441 Err(utf8::DecodeError::Incomplete {
442 valid_prefix,
443 incomplete_suffix,
444 }) => {
445 self.parse(valid_prefix.chars(), CanGc::note());
446 self.incomplete_utf8 = Some(incomplete_suffix);
447 return;
448 },
449 }
450 }
451 }
452
453 fn process_response_eof(
454 mut self,
455 _: RequestId,
456 response: Result<(), NetworkError>,
457 timing: ResourceFetchTiming,
458 ) {
459 if self.incomplete_utf8.take().is_some() {
460 self.parse("\u{FFFD}".chars(), CanGc::note());
461 }
462 if response.is_ok() {
463 self.reestablish_the_connection();
464 }
465
466 network_listener::submit_timing(&self, &response, &timing, CanGc::note());
467 }
468
469 fn process_csp_violations(&mut self, _request_id: RequestId, violations: Vec<Violation>) {
470 let global = &self.resource_timing_global();
471 global.report_csp_violations(violations, None, None);
472 }
473}
474
475impl ResourceTimingListener for EventSourceContext {
476 fn resource_timing_information(&self) -> (InitiatorType, ServoUrl) {
477 (InitiatorType::Other, self.event_source.root().url().clone())
478 }
479
480 fn resource_timing_global(&self) -> DomRoot<GlobalScope> {
481 self.event_source.root().global()
482 }
483}
484
485impl EventSource {
486 fn new_inherited(url: ServoUrl, with_credentials: bool) -> EventSource {
487 EventSource {
488 eventtarget: EventTarget::new_inherited(),
489 url,
490 request: DomRefCell::new(None),
491 last_event_id: DomRefCell::new(DOMString::from("")),
492 reconnection_time: Cell::new(DEFAULT_RECONNECTION_TIME),
493 generation_id: Cell::new(GenerationId(0)),
494
495 ready_state: Cell::new(ReadyState::Connecting),
496 with_credentials,
497 droppable: DroppableEventSource::new(DomRefCell::new(Default::default())),
498 }
499 }
500
501 fn new(
502 global: &GlobalScope,
503 proto: Option<HandleObject>,
504 url: ServoUrl,
505 with_credentials: bool,
506 can_gc: CanGc,
507 ) -> DomRoot<EventSource> {
508 reflect_dom_object_with_proto(
509 Box::new(EventSource::new_inherited(url, with_credentials)),
510 global,
511 proto,
512 can_gc,
513 )
514 }
515
516 pub(crate) fn cancel(&self) {
518 self.droppable.cancel();
519 self.fail_the_connection();
520 }
521
522 pub(crate) fn fail_the_connection(&self) {
524 let global = self.global();
525 let event_source = Trusted::new(self);
526 global.task_manager().remote_event_task_source().queue(
527 task!(fail_the_event_source_connection: move || {
528 let event_source = event_source.root();
529 if event_source.ready_state.get() != ReadyState::Closed {
530 event_source.ready_state.set(ReadyState::Closed);
531 event_source.upcast::<EventTarget>().fire_event(atom!("error"), CanGc::note());
532 }
533 }),
534 );
535 }
536
537 pub(crate) fn request(&self) -> RequestBuilder {
538 self.request.borrow().clone().unwrap()
539 }
540
541 pub(crate) fn url(&self) -> &ServoUrl {
542 &self.url
543 }
544}
545
546impl EventSourceMethods<crate::DomTypeHolder> for EventSource {
547 fn Constructor(
549 global: &GlobalScope,
550 proto: Option<HandleObject>,
551 can_gc: CanGc,
552 url: DOMString,
553 event_source_init: &EventSourceInit,
554 ) -> Fallible<DomRoot<EventSource>> {
555 let base_url = global.api_base_url();
559 let url_record = match base_url.join(&url.str()) {
560 Ok(u) => u,
561 Err(_) => return Err(Error::Syntax(None)),
563 };
564 let event_source = EventSource::new(
566 global,
567 proto,
568 url_record.clone(),
570 event_source_init.withCredentials,
571 can_gc,
572 );
573 global.track_event_source(&event_source);
574 let cors_attribute_state = if event_source_init.withCredentials {
575 CorsSettings::UseCredentials
579 } else {
580 CorsSettings::Anonymous
582 };
583 let mut request = create_a_potential_cors_request(
587 global.webview_id(),
588 url_record,
589 Destination::None,
590 Some(cors_attribute_state),
591 Some(true),
592 global.get_referrer(),
593 )
594 .with_global_scope(global);
595
596 request.headers.insert(
599 header::ACCEPT,
600 HeaderValue::from_static("text/event-stream"),
601 );
602 request.cache_mode = CacheMode::NoStore;
604 *event_source.request.borrow_mut() = Some(request.clone());
606 event_source.droppable.set_canceller(FetchCanceller::new(
610 request.id,
611 false,
612 global.core_resource_thread(),
613 ));
614
615 let context = EventSourceContext {
616 incomplete_utf8: None,
617 event_source: Trusted::new(&event_source),
618 gen_id: event_source.generation_id.get(),
619 parser_state: ParserState::Eol,
620 field: String::new(),
621 value: String::new(),
622 origin: String::new(),
623
624 event_type: String::new(),
625 data: String::new(),
626 last_event_id: String::new(),
627 };
628
629 let task_source = global.task_manager().networking_task_source().into();
630 global.fetch(request, context, task_source);
631
632 Ok(event_source)
634 }
635
636 event_handler!(open, GetOnopen, SetOnopen);
638
639 event_handler!(message, GetOnmessage, SetOnmessage);
641
642 event_handler!(error, GetOnerror, SetOnerror);
644
645 fn Url(&self) -> DOMString {
647 DOMString::from(self.url.as_str())
648 }
649
650 fn WithCredentials(&self) -> bool {
652 self.with_credentials
653 }
654
655 fn ReadyState(&self) -> u16 {
657 self.ready_state.get() as u16
658 }
659
660 fn Close(&self) {
662 let GenerationId(prev_id) = self.generation_id.get();
663 self.generation_id.set(GenerationId(prev_id + 1));
664 self.droppable.cancel();
665 self.ready_state.set(ReadyState::Closed);
666 }
667}
668
/// Timer callback scheduled by `EventSourceContext::reestablish_the_connection`
/// to restart the fetch once the reconnection time has elapsed.
#[derive(JSTraceable, MallocSizeOf)]
pub(crate) struct EventSourceTimeoutCallback {
    /// The `EventSource` to reconnect.
    #[ignore_malloc_size_of = "Because it is non-owning"]
    event_source: Trusted<EventSource>,
    /// Fresh listener context handed to the new fetch.
    #[no_trace]
    event_source_context: EventSourceContext,
}
676
677impl EventSourceTimeoutCallback {
678 pub(crate) fn invoke(self) {
680 let event_source = self.event_source.root();
681 let global = event_source.global();
682
683 if event_source.ready_state.get() != ReadyState::Connecting {
685 return;
686 }
687
688 let mut request = event_source.request();
690
691 if !event_source.last_event_id.borrow().is_empty() {
695 request.headers.insert(
697 HeaderName::from_static("last-event-id"),
698 HeaderValue::from_str(&String::from(event_source.last_event_id.borrow().clone()))
699 .unwrap(),
700 );
701 }
702
703 let task_source = global.task_manager().networking_task_source().into();
706 global.fetch(request, self.event_source_context, task_source);
707 }
708}