1use std::cell::Cell;
6use std::mem;
7use std::str::{Chars, FromStr};
8use std::sync::{Arc, Mutex};
9use std::time::Duration;
10
11use dom_struct::dom_struct;
12use headers::ContentType;
13use http::StatusCode;
14use http::header::{self, HeaderName, HeaderValue};
15use ipc_channel::ipc;
16use ipc_channel::router::ROUTER;
17use js::jsval::UndefinedValue;
18use js::rust::HandleObject;
19use mime::{self, Mime};
20use net_traits::request::{CacheMode, CorsSettings, Destination, RequestBuilder, RequestId};
21use net_traits::{
22 CoreResourceMsg, FetchChannels, FetchMetadata, FetchResponseListener, FetchResponseMsg,
23 FilteredMetadata, NetworkError, ResourceFetchTiming, ResourceTimingType,
24};
25use script_bindings::conversions::SafeToJSValConvertible;
26use servo_url::ServoUrl;
27use stylo_atoms::Atom;
28
29use crate::dom::bindings::cell::DomRefCell;
30use crate::dom::bindings::codegen::Bindings::EventSourceBinding::{
31 EventSourceInit, EventSourceMethods,
32};
33use crate::dom::bindings::error::{Error, Fallible};
34use crate::dom::bindings::inheritance::Castable;
35use crate::dom::bindings::refcounted::Trusted;
36use crate::dom::bindings::reflector::{DomGlobal, reflect_dom_object_with_proto};
37use crate::dom::bindings::root::DomRoot;
38use crate::dom::bindings::str::DOMString;
39use crate::dom::csp::{GlobalCspReporting, Violation};
40use crate::dom::event::Event;
41use crate::dom::eventtarget::EventTarget;
42use crate::dom::globalscope::GlobalScope;
43use crate::dom::messageevent::MessageEvent;
44use crate::dom::performanceresourcetiming::InitiatorType;
45use crate::fetch::{FetchCanceller, create_a_potential_cors_request};
46use crate::network_listener::{self, NetworkListener, PreInvoke, ResourceTimingListener};
47use crate::realms::enter_realm;
48use crate::script_runtime::CanGc;
49use crate::timers::OneshotTimerCallback;
50
51const DEFAULT_RECONNECTION_TIME: Duration = Duration::from_millis(5000);
52
53#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
54struct GenerationId(u32);
55
56#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
57enum ReadyState {
59 Connecting = 0,
60 Open = 1,
61 Closed = 2,
62}
63
64#[derive(JSTraceable, MallocSizeOf)]
65struct DroppableEventSource {
66 canceller: DomRefCell<FetchCanceller>,
67}
68
69impl DroppableEventSource {
70 pub(crate) fn new(canceller: DomRefCell<FetchCanceller>) -> Self {
71 DroppableEventSource { canceller }
72 }
73
74 pub(crate) fn cancel(&self) {
75 self.canceller.borrow_mut().cancel();
76 }
77
78 pub(crate) fn set_canceller(&self, data: FetchCanceller) {
79 *self.canceller.borrow_mut() = data;
80 }
81}
82
83impl Drop for DroppableEventSource {
85 fn drop(&mut self) {
86 self.cancel();
89 }
90}
91
92#[dom_struct]
93pub(crate) struct EventSource {
94 eventtarget: EventTarget,
95 #[no_trace]
96 url: ServoUrl,
97 #[no_trace]
98 request: DomRefCell<Option<RequestBuilder>>,
99 last_event_id: DomRefCell<DOMString>,
100 reconnection_time: Cell<Duration>,
101 generation_id: Cell<GenerationId>,
102
103 ready_state: Cell<ReadyState>,
104 with_credentials: bool,
105 droppable: DroppableEventSource,
106}
107
108enum ParserState {
109 Field,
110 Comment,
111 Value,
112 Eol,
113}
114
115struct EventSourceContext {
116 incomplete_utf8: Option<utf8::Incomplete>,
117
118 event_source: Trusted<EventSource>,
119 gen_id: GenerationId,
120 action_sender: ipc::IpcSender<FetchResponseMsg>,
121
122 parser_state: ParserState,
123 field: String,
124 value: String,
125 origin: String,
126
127 event_type: String,
128 data: String,
129 last_event_id: String,
130
131 resource_timing: ResourceFetchTiming,
132}
133
134impl EventSourceContext {
135 fn announce_the_connection(&self) {
137 let event_source = self.event_source.root();
138 if self.gen_id != event_source.generation_id.get() {
139 return;
140 }
141 let global = event_source.global();
142 let event_source = self.event_source.clone();
143 global.task_manager().remote_event_task_source().queue(
144 task!(announce_the_event_source_connection: move || {
145 let event_source = event_source.root();
146 if event_source.ready_state.get() != ReadyState::Closed {
147 event_source.ready_state.set(ReadyState::Open);
148 event_source.upcast::<EventTarget>().fire_event(atom!("open"), CanGc::note());
149 }
150 }),
151 );
152 }
153
154 fn fail_the_connection(&self) {
156 let event_source = self.event_source.root();
157 if self.gen_id != event_source.generation_id.get() {
158 return;
159 }
160 event_source.fail_the_connection();
161 }
162
163 fn reestablish_the_connection(&self) {
165 let event_source = self.event_source.root();
166
167 if self.gen_id != event_source.generation_id.get() {
168 return;
169 }
170
171 let trusted_event_source = self.event_source.clone();
172 let action_sender = self.action_sender.clone();
173 let global = event_source.global();
174 global.task_manager().remote_event_task_source().queue(
175 task!(reestablish_the_event_source_onnection: move || {
176 let event_source = trusted_event_source.root();
177
178 if event_source.ready_state.get() == ReadyState::Closed {
180 return;
181 }
182
183 event_source.ready_state.set(ReadyState::Connecting);
185
186 event_source.upcast::<EventTarget>().fire_event(atom!("error"), CanGc::note());
188
189 let duration = event_source.reconnection_time.get();
191
192 let callback = OneshotTimerCallback::EventSourceTimeout(
197 EventSourceTimeoutCallback {
198 event_source: trusted_event_source,
199 action_sender,
200 }
201 );
202 event_source.global().schedule_callback(callback, duration);
203 }),
204 );
205 }
206
207 fn process_field(&mut self) {
209 match &*self.field {
210 "event" => mem::swap(&mut self.event_type, &mut self.value),
211 "data" => {
212 self.data.push_str(&self.value);
213 self.data.push('\n');
214 },
215 "id" if !self.value.contains('\0') => {
216 mem::swap(&mut self.last_event_id, &mut self.value);
217 },
218 "retry" => {
219 if let Ok(time) = u64::from_str(&self.value) {
220 self.event_source
221 .root()
222 .reconnection_time
223 .set(Duration::from_millis(time));
224 }
225 },
226 _ => (),
227 }
228
229 self.field.clear();
230 self.value.clear();
231 }
232
233 fn dispatch_event(&mut self, can_gc: CanGc) {
235 let event_source = self.event_source.root();
236 *event_source.last_event_id.borrow_mut() = DOMString::from(self.last_event_id.clone());
238 if self.data.is_empty() {
240 self.data.clear();
241 self.event_type.clear();
242 return;
243 }
244 if let Some(last) = self.data.pop() {
246 if last != '\n' {
247 self.data.push(last);
248 }
249 }
250 let type_ = if !self.event_type.is_empty() {
252 Atom::from(self.event_type.clone())
253 } else {
254 atom!("message")
255 };
256 let event = {
258 let _ac = enter_realm(&*event_source);
259 rooted!(in(*GlobalScope::get_cx()) let mut data = UndefinedValue());
260 self.data
261 .safe_to_jsval(GlobalScope::get_cx(), data.handle_mut());
262 MessageEvent::new(
263 &event_source.global(),
264 type_,
265 false,
266 false,
267 data.handle(),
268 DOMString::from(self.origin.clone()),
269 None,
270 event_source.last_event_id.borrow().clone(),
271 Vec::with_capacity(0),
272 can_gc,
273 )
274 };
275 self.event_type.clear();
277 self.data.clear();
278
279 let global = event_source.global();
281 let event_source = self.event_source.clone();
282 let event = Trusted::new(&*event);
283 global.task_manager().remote_event_task_source().queue(
284 task!(dispatch_the_event_source_event: move || {
285 let event_source = event_source.root();
286 if event_source.ready_state.get() != ReadyState::Closed {
287 event.root().upcast::<Event>().fire(event_source.upcast(), CanGc::note());
288 }
289 }),
290 );
291 }
292
293 fn parse(&mut self, stream: Chars, can_gc: CanGc) {
295 let mut stream = stream.peekable();
296
297 while let Some(ch) = stream.next() {
298 match (ch, &self.parser_state) {
299 (':', &ParserState::Eol) => self.parser_state = ParserState::Comment,
300 (':', &ParserState::Field) => {
301 self.parser_state = ParserState::Value;
302 if let Some(&' ') = stream.peek() {
303 stream.next();
304 }
305 },
306
307 ('\n', &ParserState::Value) => {
308 self.parser_state = ParserState::Eol;
309 self.process_field();
310 },
311 ('\r', &ParserState::Value) => {
312 if let Some(&'\n') = stream.peek() {
313 continue;
314 }
315 self.parser_state = ParserState::Eol;
316 self.process_field();
317 },
318
319 ('\n', &ParserState::Field) => {
320 self.parser_state = ParserState::Eol;
321 self.process_field();
322 },
323 ('\r', &ParserState::Field) => {
324 if let Some(&'\n') = stream.peek() {
325 continue;
326 }
327 self.parser_state = ParserState::Eol;
328 self.process_field();
329 },
330
331 ('\n', &ParserState::Eol) => self.dispatch_event(can_gc),
332 ('\r', &ParserState::Eol) => {
333 if let Some(&'\n') = stream.peek() {
334 continue;
335 }
336 self.dispatch_event(can_gc);
337 },
338
339 ('\n', &ParserState::Comment) => self.parser_state = ParserState::Eol,
340 ('\r', &ParserState::Comment) => {
341 if let Some(&'\n') = stream.peek() {
342 continue;
343 }
344 self.parser_state = ParserState::Eol;
345 },
346
347 (_, &ParserState::Field) => self.field.push(ch),
348 (_, &ParserState::Value) => self.value.push(ch),
349 (_, &ParserState::Eol) => {
350 self.parser_state = ParserState::Field;
351 self.field.push(ch);
352 },
353 (_, &ParserState::Comment) => (),
354 }
355 }
356 }
357}
358
359impl FetchResponseListener for EventSourceContext {
360 fn process_request_body(&mut self, _: RequestId) {
361 }
363
364 fn process_request_eof(&mut self, _: RequestId) {
365 }
367
368 fn process_response(&mut self, _: RequestId, metadata: Result<FetchMetadata, NetworkError>) {
369 match metadata {
370 Ok(fm) => {
371 let meta = match fm {
372 FetchMetadata::Unfiltered(m) => m,
373 FetchMetadata::Filtered { unsafe_, filtered } => match filtered {
374 FilteredMetadata::Opaque | FilteredMetadata::OpaqueRedirect(_) => {
375 return self.fail_the_connection();
376 },
377 _ => unsafe_,
378 },
379 };
380 if meta.status.code() != StatusCode::OK {
383 return self.fail_the_connection();
384 }
385 let mime = match meta.content_type {
386 None => return self.fail_the_connection(),
387 Some(ct) => <ContentType as Into<Mime>>::into(ct.into_inner()),
388 };
389 if (mime.type_(), mime.subtype()) != (mime::TEXT, mime::EVENT_STREAM) {
390 return self.fail_the_connection();
391 }
392 self.origin = meta.final_url.origin().ascii_serialization();
393 self.announce_the_connection();
395 },
396 Err(_) => {
397 match self.event_source.root().url.scheme() {
403 "http" | "https" => self.reestablish_the_connection(),
404 _ => self.fail_the_connection(),
405 }
406 },
407 }
408 }
409
410 fn process_response_chunk(&mut self, _: RequestId, chunk: Vec<u8>) {
411 let mut input = &*chunk;
412 if let Some(mut incomplete) = self.incomplete_utf8.take() {
413 match incomplete.try_complete(input) {
414 None => return,
415 Some((result, remaining_input)) => {
416 self.parse(result.unwrap_or("\u{FFFD}").chars(), CanGc::note());
417 input = remaining_input;
418 },
419 }
420 }
421
422 while !input.is_empty() {
423 match utf8::decode(input) {
424 Ok(s) => {
425 self.parse(s.chars(), CanGc::note());
426 return;
427 },
428 Err(utf8::DecodeError::Invalid {
429 valid_prefix,
430 remaining_input,
431 ..
432 }) => {
433 self.parse(valid_prefix.chars(), CanGc::note());
434 self.parse("\u{FFFD}".chars(), CanGc::note());
435 input = remaining_input;
436 },
437 Err(utf8::DecodeError::Incomplete {
438 valid_prefix,
439 incomplete_suffix,
440 }) => {
441 self.parse(valid_prefix.chars(), CanGc::note());
442 self.incomplete_utf8 = Some(incomplete_suffix);
443 return;
444 },
445 }
446 }
447 }
448
449 fn process_response_eof(
450 &mut self,
451 _: RequestId,
452 response: Result<ResourceFetchTiming, NetworkError>,
453 ) {
454 if self.incomplete_utf8.take().is_some() {
455 self.parse("\u{FFFD}".chars(), CanGc::note());
456 }
457 if response.is_ok() {
458 self.reestablish_the_connection();
459 }
460 }
461
462 fn resource_timing_mut(&mut self) -> &mut ResourceFetchTiming {
463 &mut self.resource_timing
464 }
465
466 fn resource_timing(&self) -> &ResourceFetchTiming {
467 &self.resource_timing
468 }
469
470 fn submit_resource_timing(&mut self) {
471 network_listener::submit_timing(self, CanGc::note())
472 }
473
474 fn process_csp_violations(&mut self, _request_id: RequestId, violations: Vec<Violation>) {
475 let global = &self.resource_timing_global();
476 global.report_csp_violations(violations, None, None);
477 }
478}
479
480impl ResourceTimingListener for EventSourceContext {
481 fn resource_timing_information(&self) -> (InitiatorType, ServoUrl) {
482 (InitiatorType::Other, self.event_source.root().url().clone())
483 }
484
485 fn resource_timing_global(&self) -> DomRoot<GlobalScope> {
486 self.event_source.root().global()
487 }
488}
489
490impl PreInvoke for EventSourceContext {
491 fn should_invoke(&self) -> bool {
492 self.event_source.root().generation_id.get() == self.gen_id
493 }
494}
495
496impl EventSource {
497 fn new_inherited(url: ServoUrl, with_credentials: bool) -> EventSource {
498 EventSource {
499 eventtarget: EventTarget::new_inherited(),
500 url,
501 request: DomRefCell::new(None),
502 last_event_id: DomRefCell::new(DOMString::from("")),
503 reconnection_time: Cell::new(DEFAULT_RECONNECTION_TIME),
504 generation_id: Cell::new(GenerationId(0)),
505
506 ready_state: Cell::new(ReadyState::Connecting),
507 with_credentials,
508 droppable: DroppableEventSource::new(DomRefCell::new(Default::default())),
509 }
510 }
511
512 fn new(
513 global: &GlobalScope,
514 proto: Option<HandleObject>,
515 url: ServoUrl,
516 with_credentials: bool,
517 can_gc: CanGc,
518 ) -> DomRoot<EventSource> {
519 reflect_dom_object_with_proto(
520 Box::new(EventSource::new_inherited(url, with_credentials)),
521 global,
522 proto,
523 can_gc,
524 )
525 }
526
527 pub(crate) fn cancel(&self) {
529 self.droppable.cancel();
530 self.fail_the_connection();
531 }
532
533 pub(crate) fn fail_the_connection(&self) {
535 let global = self.global();
536 let event_source = Trusted::new(self);
537 global.task_manager().remote_event_task_source().queue(
538 task!(fail_the_event_source_connection: move || {
539 let event_source = event_source.root();
540 if event_source.ready_state.get() != ReadyState::Closed {
541 event_source.ready_state.set(ReadyState::Closed);
542 event_source.upcast::<EventTarget>().fire_event(atom!("error"), CanGc::note());
543 }
544 }),
545 );
546 }
547
548 pub(crate) fn request(&self) -> RequestBuilder {
549 self.request.borrow().clone().unwrap()
550 }
551
552 pub(crate) fn url(&self) -> &ServoUrl {
553 &self.url
554 }
555}
556
557impl EventSourceMethods<crate::DomTypeHolder> for EventSource {
558 fn Constructor(
560 global: &GlobalScope,
561 proto: Option<HandleObject>,
562 can_gc: CanGc,
563 url: DOMString,
564 event_source_init: &EventSourceInit,
565 ) -> Fallible<DomRoot<EventSource>> {
566 let base_url = global.api_base_url();
570 let url_record = match base_url.join(&url) {
571 Ok(u) => u,
572 Err(_) => return Err(Error::Syntax(None)),
574 };
575 let ev = EventSource::new(
577 global,
578 proto,
579 url_record.clone(),
581 event_source_init.withCredentials,
582 can_gc,
583 );
584 global.track_event_source(&ev);
585 let cors_attribute_state = if event_source_init.withCredentials {
586 CorsSettings::UseCredentials
590 } else {
591 CorsSettings::Anonymous
593 };
594 let mut request = create_a_potential_cors_request(
598 global.webview_id(),
599 url_record,
600 Destination::None,
601 Some(cors_attribute_state),
602 Some(true),
603 global.get_referrer(),
604 global.insecure_requests_policy(),
605 global.has_trustworthy_ancestor_or_current_origin(),
606 global.policy_container(),
607 )
608 .origin(global.origin().immutable().clone())
609 .pipeline_id(Some(global.pipeline_id()));
610
611 request.headers.insert(
614 header::ACCEPT,
615 HeaderValue::from_static("text/event-stream"),
616 );
617 request.cache_mode = CacheMode::NoStore;
619 *ev.request.borrow_mut() = Some(request.clone());
621 let (action_sender, action_receiver) = ipc::channel().unwrap();
624 let context = EventSourceContext {
625 incomplete_utf8: None,
626
627 event_source: Trusted::new(&ev),
628 gen_id: ev.generation_id.get(),
629 action_sender: action_sender.clone(),
630
631 parser_state: ParserState::Eol,
632 field: String::new(),
633 value: String::new(),
634 origin: String::new(),
635
636 event_type: String::new(),
637 data: String::new(),
638 last_event_id: String::new(),
639 resource_timing: ResourceFetchTiming::new(ResourceTimingType::Resource),
640 };
641 let mut listener = NetworkListener {
642 context: Arc::new(Mutex::new(context)),
643 task_source: global.task_manager().networking_task_source().into(),
644 };
645 ROUTER.add_typed_route(
646 action_receiver,
647 Box::new(move |message| {
648 listener.notify_fetch(message.unwrap());
649 }),
650 );
651 ev.droppable.set_canceller(FetchCanceller::new(
652 request.id,
653 global.core_resource_thread(),
654 ));
655 global
656 .core_resource_thread()
657 .send(CoreResourceMsg::Fetch(
658 request,
659 FetchChannels::ResponseMsg(action_sender),
660 ))
661 .unwrap();
662 Ok(ev)
664 }
665
666 event_handler!(open, GetOnopen, SetOnopen);
668
669 event_handler!(message, GetOnmessage, SetOnmessage);
671
672 event_handler!(error, GetOnerror, SetOnerror);
674
675 fn Url(&self) -> DOMString {
677 DOMString::from(self.url.as_str())
678 }
679
680 fn WithCredentials(&self) -> bool {
682 self.with_credentials
683 }
684
685 fn ReadyState(&self) -> u16 {
687 self.ready_state.get() as u16
688 }
689
690 fn Close(&self) {
692 let GenerationId(prev_id) = self.generation_id.get();
693 self.generation_id.set(GenerationId(prev_id + 1));
694 self.droppable.cancel();
695 self.ready_state.set(ReadyState::Closed);
696 }
697}
698
699#[derive(JSTraceable, MallocSizeOf)]
700pub(crate) struct EventSourceTimeoutCallback {
701 #[ignore_malloc_size_of = "Because it is non-owning"]
702 event_source: Trusted<EventSource>,
703 #[ignore_malloc_size_of = "Because it is non-owning"]
704 #[no_trace]
705 action_sender: ipc::IpcSender<FetchResponseMsg>,
706}
707
708impl EventSourceTimeoutCallback {
709 pub(crate) fn invoke(self) {
711 let event_source = self.event_source.root();
712 let global = event_source.global();
713 if event_source.ready_state.get() != ReadyState::Connecting {
715 return;
716 }
717 let mut request = event_source.request();
719 if !event_source.last_event_id.borrow().is_empty() {
721 request.headers.insert(
723 HeaderName::from_static("last-event-id"),
724 HeaderValue::from_str(&String::from(event_source.last_event_id.borrow().clone()))
725 .unwrap(),
726 );
727 }
728 global
730 .core_resource_thread()
731 .send(CoreResourceMsg::Fetch(
732 request,
733 FetchChannels::ResponseMsg(self.action_sender),
734 ))
735 .unwrap();
736 }
737}