1use std::cell::Cell;
6use std::mem;
7use std::str::{Chars, FromStr};
8use std::time::Duration;
9
10use dom_struct::dom_struct;
11use headers::ContentType;
12use http::StatusCode;
13use http::header::{self, HeaderName, HeaderValue};
14use js::jsval::UndefinedValue;
15use js::rust::HandleObject;
16use mime::{self, Mime};
17use net_traits::request::{CacheMode, CorsSettings, Destination, RequestBuilder, RequestId};
18use net_traits::{FetchMetadata, FilteredMetadata, NetworkError, ResourceFetchTiming};
19use script_bindings::conversions::SafeToJSValConvertible;
20use servo_url::ServoUrl;
21use stylo_atoms::Atom;
22
23use crate::dom::bindings::cell::DomRefCell;
24use crate::dom::bindings::codegen::Bindings::EventSourceBinding::{
25 EventSourceInit, EventSourceMethods,
26};
27use crate::dom::bindings::error::{Error, Fallible};
28use crate::dom::bindings::inheritance::Castable;
29use crate::dom::bindings::refcounted::Trusted;
30use crate::dom::bindings::reflector::{DomGlobal, reflect_dom_object_with_proto};
31use crate::dom::bindings::root::DomRoot;
32use crate::dom::bindings::str::DOMString;
33use crate::dom::csp::{GlobalCspReporting, Violation};
34use crate::dom::event::Event;
35use crate::dom::eventtarget::EventTarget;
36use crate::dom::globalscope::GlobalScope;
37use crate::dom::messageevent::MessageEvent;
38use crate::dom::performance::performanceresourcetiming::InitiatorType;
39use crate::fetch::{FetchCanceller, create_a_potential_cors_request};
40use crate::network_listener::{self, FetchResponseListener, ResourceTimingListener};
41use crate::realms::enter_realm;
42use crate::script_runtime::CanGc;
43use crate::timers::OneshotTimerCallback;
44
/// Initial delay before a reconnection attempt; a valid `retry` field in the stream replaces it.
const DEFAULT_RECONNECTION_TIME: Duration = Duration::from_secs(5);
46
47#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
48struct GenerationId(u32);
49
50#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
51enum ReadyState {
53 Connecting = 0,
54 Open = 1,
55 Closed = 2,
56}
57
58#[derive(JSTraceable, MallocSizeOf)]
59struct DroppableEventSource {
60 canceller: DomRefCell<FetchCanceller>,
61}
62
63impl DroppableEventSource {
64 pub(crate) fn new(canceller: DomRefCell<FetchCanceller>) -> Self {
65 DroppableEventSource { canceller }
66 }
67
68 pub(crate) fn cancel(&self) {
69 self.canceller.borrow_mut().cancel();
70 }
71
72 pub(crate) fn set_canceller(&self, data: FetchCanceller) {
73 *self.canceller.borrow_mut() = data;
74 }
75}
76
77impl Drop for DroppableEventSource {
79 fn drop(&mut self) {
80 self.cancel();
83 }
84}
85
86#[dom_struct]
87pub(crate) struct EventSource {
88 eventtarget: EventTarget,
89 #[no_trace]
90 url: ServoUrl,
91 #[no_trace]
92 request: DomRefCell<Option<RequestBuilder>>,
93 last_event_id: DomRefCell<DOMString>,
94 reconnection_time: Cell<Duration>,
95 generation_id: Cell<GenerationId>,
96
97 ready_state: Cell<ReadyState>,
98 with_credentials: bool,
99 droppable: DroppableEventSource,
100}
101
102#[derive(Clone, MallocSizeOf)]
103enum ParserState {
104 Field,
105 Comment,
106 Value,
107 Eol,
108}
109
110#[derive(Clone, MallocSizeOf)]
111struct EventSourceContext {
112 incomplete_utf8: Option<utf8::Incomplete>,
113 event_source: Trusted<EventSource>,
114 gen_id: GenerationId,
115 parser_state: ParserState,
116 field: String,
117 value: String,
118 origin: String,
119 event_type: String,
120 data: String,
121 last_event_id: String,
122}
123
124impl EventSourceContext {
125 fn announce_the_connection(&self) {
127 let event_source = self.event_source.root();
128 if self.gen_id != event_source.generation_id.get() {
129 return;
130 }
131 let global = event_source.global();
132 let event_source = self.event_source.clone();
133 global.task_manager().remote_event_task_source().queue(
134 task!(announce_the_event_source_connection: move || {
135 let event_source = event_source.root();
136 if event_source.ready_state.get() != ReadyState::Closed {
137 event_source.ready_state.set(ReadyState::Open);
138 event_source.upcast::<EventTarget>().fire_event(atom!("open"), CanGc::note());
139 }
140 }),
141 );
142 }
143
144 fn fail_the_connection(&self) {
146 let event_source = self.event_source.root();
147 if self.gen_id != event_source.generation_id.get() {
148 return;
149 }
150 event_source.fail_the_connection();
151 }
152
153 fn reestablish_the_connection(&self) {
155 let event_source = self.event_source.root();
156
157 if self.gen_id != event_source.generation_id.get() {
158 return;
159 }
160
161 let trusted_event_source = self.event_source.clone();
162 let global = event_source.global();
163 let event_source_context = self.clone();
164 global.task_manager().remote_event_task_source().queue(
165 task!(reestablish_the_event_source_onnection: move || {
166 let event_source = trusted_event_source.root();
167
168 if event_source.ready_state.get() == ReadyState::Closed {
170 return;
171 }
172
173 event_source.ready_state.set(ReadyState::Connecting);
175
176 event_source.upcast::<EventTarget>().fire_event(atom!("error"), CanGc::note());
178
179 let duration = event_source.reconnection_time.get();
181
182 let callback = OneshotTimerCallback::EventSourceTimeout(
187 EventSourceTimeoutCallback {
188 event_source: trusted_event_source,
189 event_source_context,
190 }
191 );
192 event_source.global().schedule_callback(callback, duration);
193 }),
194 );
195 }
196
197 fn process_field(&mut self) {
199 match &*self.field {
200 "event" => mem::swap(&mut self.event_type, &mut self.value),
201 "data" => {
202 self.data.push_str(&self.value);
203 self.data.push('\n');
204 },
205 "id" if !self.value.contains('\0') => {
206 mem::swap(&mut self.last_event_id, &mut self.value);
207 },
208 "retry" => {
209 if let Ok(time) = u64::from_str(&self.value) {
210 self.event_source
211 .root()
212 .reconnection_time
213 .set(Duration::from_millis(time));
214 }
215 },
216 _ => (),
217 }
218
219 self.field.clear();
220 self.value.clear();
221 }
222
223 fn dispatch_event(&mut self, can_gc: CanGc) {
225 let event_source = self.event_source.root();
226 *event_source.last_event_id.borrow_mut() = DOMString::from(self.last_event_id.clone());
228 if self.data.is_empty() {
230 self.data.clear();
231 self.event_type.clear();
232 return;
233 }
234 if let Some(last) = self.data.pop() {
236 if last != '\n' {
237 self.data.push(last);
238 }
239 }
240 let type_ = if !self.event_type.is_empty() {
242 Atom::from(self.event_type.clone())
243 } else {
244 atom!("message")
245 };
246 let event = {
248 let _ac = enter_realm(&*event_source);
249 rooted!(in(*GlobalScope::get_cx()) let mut data = UndefinedValue());
250 self.data
251 .safe_to_jsval(GlobalScope::get_cx(), data.handle_mut(), can_gc);
252 MessageEvent::new(
253 &event_source.global(),
254 type_,
255 false,
256 false,
257 data.handle(),
258 DOMString::from(self.origin.clone()),
259 None,
260 event_source.last_event_id.borrow().clone(),
261 Vec::with_capacity(0),
262 can_gc,
263 )
264 };
265 self.event_type.clear();
267 self.data.clear();
268
269 let global = event_source.global();
271 let event_source = self.event_source.clone();
272 let event = Trusted::new(&*event);
273 global.task_manager().remote_event_task_source().queue(
274 task!(dispatch_the_event_source_event: move || {
275 let event_source = event_source.root();
276 if event_source.ready_state.get() != ReadyState::Closed {
277 event.root().upcast::<Event>().fire(event_source.upcast(), CanGc::note());
278 }
279 }),
280 );
281 }
282
283 fn parse(&mut self, stream: Chars, can_gc: CanGc) {
285 let mut stream = stream.peekable();
286
287 while let Some(ch) = stream.next() {
288 match (ch, &self.parser_state) {
289 (':', &ParserState::Eol) => self.parser_state = ParserState::Comment,
290 (':', &ParserState::Field) => {
291 self.parser_state = ParserState::Value;
292 if let Some(&' ') = stream.peek() {
293 stream.next();
294 }
295 },
296
297 ('\n', &ParserState::Value) => {
298 self.parser_state = ParserState::Eol;
299 self.process_field();
300 },
301 ('\r', &ParserState::Value) => {
302 if let Some(&'\n') = stream.peek() {
303 continue;
304 }
305 self.parser_state = ParserState::Eol;
306 self.process_field();
307 },
308
309 ('\n', &ParserState::Field) => {
310 self.parser_state = ParserState::Eol;
311 self.process_field();
312 },
313 ('\r', &ParserState::Field) => {
314 if let Some(&'\n') = stream.peek() {
315 continue;
316 }
317 self.parser_state = ParserState::Eol;
318 self.process_field();
319 },
320
321 ('\n', &ParserState::Eol) => self.dispatch_event(can_gc),
322 ('\r', &ParserState::Eol) => {
323 if let Some(&'\n') = stream.peek() {
324 continue;
325 }
326 self.dispatch_event(can_gc);
327 },
328
329 ('\n', &ParserState::Comment) => self.parser_state = ParserState::Eol,
330 ('\r', &ParserState::Comment) => {
331 if let Some(&'\n') = stream.peek() {
332 continue;
333 }
334 self.parser_state = ParserState::Eol;
335 },
336
337 (_, &ParserState::Field) => self.field.push(ch),
338 (_, &ParserState::Value) => self.value.push(ch),
339 (_, &ParserState::Eol) => {
340 self.parser_state = ParserState::Field;
341 self.field.push(ch);
342 },
343 (_, &ParserState::Comment) => (),
344 }
345 }
346 }
347}
348
349impl FetchResponseListener for EventSourceContext {
350 fn should_invoke(&self) -> bool {
351 self.event_source.root().generation_id.get() == self.gen_id
352 }
353
354 fn process_request_body(&mut self, _: RequestId) {
355 }
357
358 fn process_request_eof(&mut self, _: RequestId) {
359 }
361
362 fn process_response(&mut self, _: RequestId, metadata: Result<FetchMetadata, NetworkError>) {
363 match metadata {
364 Ok(fm) => {
365 let meta = match fm {
366 FetchMetadata::Unfiltered(m) => m,
367 FetchMetadata::Filtered { unsafe_, filtered } => match filtered {
368 FilteredMetadata::Opaque | FilteredMetadata::OpaqueRedirect(_) => {
369 return self.fail_the_connection();
370 },
371 _ => unsafe_,
372 },
373 };
374 if meta.status.code() != StatusCode::OK {
377 return self.fail_the_connection();
378 }
379 let mime = match meta.content_type {
380 None => return self.fail_the_connection(),
381 Some(ct) => <ContentType as Into<Mime>>::into(ct.into_inner()),
382 };
383 if (mime.type_(), mime.subtype()) != (mime::TEXT, mime::EVENT_STREAM) {
384 return self.fail_the_connection();
385 }
386 self.origin = meta.final_url.origin().ascii_serialization();
387 self.announce_the_connection();
389 },
390 Err(_) => {
391 match self.event_source.root().url.scheme() {
397 "http" | "https" => self.reestablish_the_connection(),
398 _ => self.fail_the_connection(),
399 }
400 },
401 }
402 }
403
404 fn process_response_chunk(&mut self, _: RequestId, chunk: Vec<u8>) {
405 let mut input = &*chunk;
406 if let Some(mut incomplete) = self.incomplete_utf8.take() {
407 match incomplete.try_complete(input) {
408 None => return,
409 Some((result, remaining_input)) => {
410 self.parse(result.unwrap_or("\u{FFFD}").chars(), CanGc::note());
411 input = remaining_input;
412 },
413 }
414 }
415
416 while !input.is_empty() {
417 match utf8::decode(input) {
418 Ok(s) => {
419 self.parse(s.chars(), CanGc::note());
420 return;
421 },
422 Err(utf8::DecodeError::Invalid {
423 valid_prefix,
424 remaining_input,
425 ..
426 }) => {
427 self.parse(valid_prefix.chars(), CanGc::note());
428 self.parse("\u{FFFD}".chars(), CanGc::note());
429 input = remaining_input;
430 },
431 Err(utf8::DecodeError::Incomplete {
432 valid_prefix,
433 incomplete_suffix,
434 }) => {
435 self.parse(valid_prefix.chars(), CanGc::note());
436 self.incomplete_utf8 = Some(incomplete_suffix);
437 return;
438 },
439 }
440 }
441 }
442
443 fn process_response_eof(
444 mut self,
445 _: RequestId,
446 response: Result<ResourceFetchTiming, NetworkError>,
447 ) {
448 if self.incomplete_utf8.take().is_some() {
449 self.parse("\u{FFFD}".chars(), CanGc::note());
450 }
451 if let Ok(response) = response {
452 self.reestablish_the_connection();
453 network_listener::submit_timing(&self, &response, CanGc::note());
454 }
455 }
456
457 fn process_csp_violations(&mut self, _request_id: RequestId, violations: Vec<Violation>) {
458 let global = &self.resource_timing_global();
459 global.report_csp_violations(violations, None, None);
460 }
461}
462
463impl ResourceTimingListener for EventSourceContext {
464 fn resource_timing_information(&self) -> (InitiatorType, ServoUrl) {
465 (InitiatorType::Other, self.event_source.root().url().clone())
466 }
467
468 fn resource_timing_global(&self) -> DomRoot<GlobalScope> {
469 self.event_source.root().global()
470 }
471}
472
473impl EventSource {
474 fn new_inherited(url: ServoUrl, with_credentials: bool) -> EventSource {
475 EventSource {
476 eventtarget: EventTarget::new_inherited(),
477 url,
478 request: DomRefCell::new(None),
479 last_event_id: DomRefCell::new(DOMString::from("")),
480 reconnection_time: Cell::new(DEFAULT_RECONNECTION_TIME),
481 generation_id: Cell::new(GenerationId(0)),
482
483 ready_state: Cell::new(ReadyState::Connecting),
484 with_credentials,
485 droppable: DroppableEventSource::new(DomRefCell::new(Default::default())),
486 }
487 }
488
489 fn new(
490 global: &GlobalScope,
491 proto: Option<HandleObject>,
492 url: ServoUrl,
493 with_credentials: bool,
494 can_gc: CanGc,
495 ) -> DomRoot<EventSource> {
496 reflect_dom_object_with_proto(
497 Box::new(EventSource::new_inherited(url, with_credentials)),
498 global,
499 proto,
500 can_gc,
501 )
502 }
503
504 pub(crate) fn cancel(&self) {
506 self.droppable.cancel();
507 self.fail_the_connection();
508 }
509
510 pub(crate) fn fail_the_connection(&self) {
512 let global = self.global();
513 let event_source = Trusted::new(self);
514 global.task_manager().remote_event_task_source().queue(
515 task!(fail_the_event_source_connection: move || {
516 let event_source = event_source.root();
517 if event_source.ready_state.get() != ReadyState::Closed {
518 event_source.ready_state.set(ReadyState::Closed);
519 event_source.upcast::<EventTarget>().fire_event(atom!("error"), CanGc::note());
520 }
521 }),
522 );
523 }
524
525 pub(crate) fn request(&self) -> RequestBuilder {
526 self.request.borrow().clone().unwrap()
527 }
528
529 pub(crate) fn url(&self) -> &ServoUrl {
530 &self.url
531 }
532}
533
534impl EventSourceMethods<crate::DomTypeHolder> for EventSource {
535 fn Constructor(
537 global: &GlobalScope,
538 proto: Option<HandleObject>,
539 can_gc: CanGc,
540 url: DOMString,
541 event_source_init: &EventSourceInit,
542 ) -> Fallible<DomRoot<EventSource>> {
543 let base_url = global.api_base_url();
547 let url_record = match base_url.join(&url.str()) {
548 Ok(u) => u,
549 Err(_) => return Err(Error::Syntax(None)),
551 };
552 let event_source = EventSource::new(
554 global,
555 proto,
556 url_record.clone(),
558 event_source_init.withCredentials,
559 can_gc,
560 );
561 global.track_event_source(&event_source);
562 let cors_attribute_state = if event_source_init.withCredentials {
563 CorsSettings::UseCredentials
567 } else {
568 CorsSettings::Anonymous
570 };
571 let mut request = create_a_potential_cors_request(
575 global.webview_id(),
576 url_record,
577 Destination::None,
578 Some(cors_attribute_state),
579 Some(true),
580 global.get_referrer(),
581 global.insecure_requests_policy(),
582 global.has_trustworthy_ancestor_or_current_origin(),
583 global.policy_container(),
584 )
585 .origin(global.origin().immutable().clone())
586 .pipeline_id(Some(global.pipeline_id()));
587
588 request.headers.insert(
591 header::ACCEPT,
592 HeaderValue::from_static("text/event-stream"),
593 );
594 request.cache_mode = CacheMode::NoStore;
596 *event_source.request.borrow_mut() = Some(request.clone());
598 event_source.droppable.set_canceller(FetchCanceller::new(
602 request.id,
603 global.core_resource_thread(),
604 ));
605
606 let context = EventSourceContext {
607 incomplete_utf8: None,
608 event_source: Trusted::new(&event_source),
609 gen_id: event_source.generation_id.get(),
610 parser_state: ParserState::Eol,
611 field: String::new(),
612 value: String::new(),
613 origin: String::new(),
614
615 event_type: String::new(),
616 data: String::new(),
617 last_event_id: String::new(),
618 };
619
620 let task_source = global.task_manager().networking_task_source().into();
621 global.fetch(request, context, task_source);
622
623 Ok(event_source)
625 }
626
627 event_handler!(open, GetOnopen, SetOnopen);
629
630 event_handler!(message, GetOnmessage, SetOnmessage);
632
633 event_handler!(error, GetOnerror, SetOnerror);
635
636 fn Url(&self) -> DOMString {
638 DOMString::from(self.url.as_str())
639 }
640
641 fn WithCredentials(&self) -> bool {
643 self.with_credentials
644 }
645
646 fn ReadyState(&self) -> u16 {
648 self.ready_state.get() as u16
649 }
650
651 fn Close(&self) {
653 let GenerationId(prev_id) = self.generation_id.get();
654 self.generation_id.set(GenerationId(prev_id + 1));
655 self.droppable.cancel();
656 self.ready_state.set(ReadyState::Closed);
657 }
658}
659
660#[derive(JSTraceable, MallocSizeOf)]
661pub(crate) struct EventSourceTimeoutCallback {
662 #[ignore_malloc_size_of = "Because it is non-owning"]
663 event_source: Trusted<EventSource>,
664 #[no_trace]
665 event_source_context: EventSourceContext,
666}
667
668impl EventSourceTimeoutCallback {
669 pub(crate) fn invoke(self) {
671 let event_source = self.event_source.root();
672 let global = event_source.global();
673
674 if event_source.ready_state.get() != ReadyState::Connecting {
676 return;
677 }
678
679 let mut request = event_source.request();
681
682 if !event_source.last_event_id.borrow().is_empty() {
686 request.headers.insert(
688 HeaderName::from_static("last-event-id"),
689 HeaderValue::from_str(&String::from(event_source.last_event_id.borrow().clone()))
690 .unwrap(),
691 );
692 }
693
694 let task_source = global.task_manager().networking_task_source().into();
697 global.fetch(request, self.event_source_context, task_source);
698 }
699}