1use std::cell::Cell;
6use std::mem;
7use std::str::{Chars, FromStr};
8use std::time::Duration;
9
10use dom_struct::dom_struct;
11use encoding_rs::{Decoder, UTF_8};
12use headers::ContentType;
13use http::StatusCode;
14use http::header::{self, HeaderName, HeaderValue};
15use js::jsval::UndefinedValue;
16use js::rust::HandleObject;
17use mime::{self, Mime};
18use net_traits::request::{CacheMode, CorsSettings, Destination, RequestBuilder, RequestId};
19use net_traits::{FetchMetadata, FilteredMetadata, NetworkError, ResourceFetchTiming};
20use script_bindings::cell::DomRefCell;
21use script_bindings::conversions::SafeToJSValConvertible;
22use script_bindings::reflector::reflect_dom_object_with_proto;
23use servo_url::ServoUrl;
24use stylo_atoms::Atom;
25
26use crate::dom::bindings::codegen::Bindings::EventSourceBinding::{
27 EventSourceInit, EventSourceMethods,
28};
29use crate::dom::bindings::error::{Error, Fallible};
30use crate::dom::bindings::inheritance::Castable;
31use crate::dom::bindings::refcounted::Trusted;
32use crate::dom::bindings::reflector::DomGlobal;
33use crate::dom::bindings::root::DomRoot;
34use crate::dom::bindings::str::DOMString;
35use crate::dom::csp::{GlobalCspReporting, Violation};
36use crate::dom::event::Event;
37use crate::dom::eventtarget::EventTarget;
38use crate::dom::globalscope::GlobalScope;
39use crate::dom::messageevent::MessageEvent;
40use crate::dom::performance::performanceresourcetiming::InitiatorType;
41use crate::fetch::{FetchCanceller, RequestWithGlobalScope, create_a_potential_cors_request};
42use crate::network_listener::{self, FetchResponseListener, ResourceTimingListener};
43use crate::realms::enter_realm;
44use crate::script_runtime::CanGc;
45use crate::timers::OneshotTimerCallback;
46
/// Reconnection delay (5000 ms) a new `EventSource` starts with; it stays in
/// effect until a `retry` field in the stream replaces it.
const DEFAULT_RECONNECTION_TIME: Duration = Duration::from_millis(5000);
48
/// Counter identifying the current "generation" of an `EventSource`.
///
/// `Close()` bumps it, and each `EventSourceContext` remembers the value it was
/// created with, so callbacks belonging to an older generation can be ignored.
#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
struct GenerationId(u32);
51
/// Connection state of an `EventSource`.
///
/// The discriminants are the numeric values returned by `ReadyState()`.
#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
enum ReadyState {
    /// Initial state, and the state entered again while waiting to reconnect.
    Connecting = 0,
    /// Set by the "announce the connection" task.
    Open = 1,
    /// Set by `Close()` and by the "fail the connection" task.
    Closed = 2,
}
59
/// Owner of the fetch canceller for an `EventSource`.
///
/// Kept as a separate type so that its `Drop` implementation can abort the
/// fetch when the owning object goes away.
#[derive(JSTraceable, MallocSizeOf)]
struct DroppableEventSource {
    /// Canceller for the current fetch; replaceable through `set_canceller`.
    canceller: DomRefCell<FetchCanceller>,
}
64
65impl DroppableEventSource {
66 pub(crate) fn new(canceller: DomRefCell<FetchCanceller>) -> Self {
67 DroppableEventSource { canceller }
68 }
69
70 pub(crate) fn cancel(&self) {
71 self.canceller.borrow_mut().abort();
72 }
73
74 pub(crate) fn set_canceller(&self, data: FetchCanceller) {
75 *self.canceller.borrow_mut() = data;
76 }
77}
78
79impl Drop for DroppableEventSource {
81 fn drop(&mut self) {
82 self.cancel();
85 }
86}
87
/// DOM object implementing the `EventSource` interface.
#[dom_struct]
pub(crate) struct EventSource {
    eventtarget: EventTarget,
    /// Parsed URL passed in by the constructor; returned by `Url()`.
    #[no_trace]
    url: ServoUrl,
    /// Request stored by the constructor; cloned again for every reconnection.
    #[no_trace]
    request: DomRefCell<Option<RequestBuilder>>,
    /// Last event ID, updated whenever the parser reaches a dispatch point.
    last_event_id: DomRefCell<DOMString>,
    /// Delay before reconnecting; replaced by a valid `retry` field.
    reconnection_time: Cell<Duration>,
    /// Bumped by `Close()` so that older fetch listeners are ignored.
    generation_id: Cell<GenerationId>,

    /// Current connection state, exposed through `ReadyState()`.
    ready_state: Cell<ReadyState>,
    /// Value of `withCredentials` from the init dictionary.
    with_credentials: bool,
    /// Holds the fetch canceller; aborts the fetch on drop.
    droppable: DroppableEventSource,
}
103
/// Position of the event-stream parser within the current line.
#[derive(Clone, MallocSizeOf)]
enum ParserState {
    /// Accumulating characters of a field name.
    Field,
    /// Inside a comment line (a line that started with `:`); input is skipped.
    Comment,
    /// Accumulating characters of a field value (after the first `:`).
    Value,
    /// At the start of a line; nothing of the line has been consumed yet.
    Eol,
}
111
/// Fetch listener state for one connection attempt: the streaming decoder and
/// the buffers of the event-stream parser.
#[derive(MallocSizeOf)]
struct EventSourceContext {
    /// Incremental decoder applied to the response chunks.
    decoder: Decoder,
    /// The `EventSource` this listener reports to.
    event_source: Trusted<EventSource>,
    /// Generation of the `EventSource` this context was created for.
    gen_id: GenerationId,
    /// Where the parser is within the current line.
    parser_state: ParserState,
    /// Name of the field currently being parsed.
    field: String,
    /// Value of the field currently being parsed.
    value: String,
    /// Serialized origin of the response's final URL, passed to `MessageEvent::new`.
    origin: String,
    /// Event type buffer (filled by the `event` field).
    event_type: String,
    /// Data buffer (filled by `data` fields, one `\n` appended per field).
    data: String,
    /// Last event ID buffer (filled by the `id` field).
    last_event_id: String,
}
125
126impl Clone for EventSourceContext {
127 fn clone(&self) -> Self {
128 EventSourceContext {
129 decoder: UTF_8.new_decoder_with_bom_removal(),
130 event_source: self.event_source.clone(),
131 gen_id: self.gen_id,
132 parser_state: self.parser_state.clone(),
133 field: self.field.clone(),
134 value: self.value.clone(),
135 origin: self.origin.clone(),
136 event_type: self.event_type.clone(),
137 data: self.data.clone(),
138 last_event_id: self.last_event_id.clone(),
139 }
140 }
141}
142
143impl EventSourceContext {
144 fn announce_the_connection(&self) {
146 let event_source = self.event_source.root();
147 if self.gen_id != event_source.generation_id.get() {
148 return;
149 }
150 let global = event_source.global();
151 let event_source = self.event_source.clone();
152 global.task_manager().remote_event_task_source().queue(
153 task!(announce_the_event_source_connection: move |cx| {
154 let event_source = event_source.root();
155 if event_source.ready_state.get() != ReadyState::Closed {
156 event_source.ready_state.set(ReadyState::Open);
157 event_source.upcast::<EventTarget>().fire_event(cx, atom!("open"));
158 }
159 }),
160 );
161 }
162
163 fn fail_the_connection(&self) {
165 let event_source = self.event_source.root();
166 if self.gen_id != event_source.generation_id.get() {
167 return;
168 }
169 event_source.fail_the_connection();
170 }
171
172 fn reestablish_the_connection(&self) {
174 let event_source = self.event_source.root();
175
176 if self.gen_id != event_source.generation_id.get() {
177 return;
178 }
179
180 let trusted_event_source = self.event_source.clone();
181 let global = event_source.global();
182 let event_source_context = EventSourceContext {
183 decoder: UTF_8.new_decoder_with_bom_removal(),
184 event_source: self.event_source.clone(),
185 gen_id: self.gen_id,
186 parser_state: ParserState::Eol,
187 field: String::new(),
188 value: String::new(),
189 origin: self.origin.clone(),
190 event_type: String::new(),
191 data: String::new(),
192 last_event_id: String::from(event_source.last_event_id.borrow().clone()),
193 };
194 global.task_manager().remote_event_task_source().queue(
195 task!(reestablish_the_event_source_onnection: move |cx| {
196 let event_source = trusted_event_source.root();
197
198 if event_source.ready_state.get() == ReadyState::Closed {
200 return;
201 }
202
203 event_source.ready_state.set(ReadyState::Connecting);
205
206 event_source.upcast::<EventTarget>().fire_event(cx, atom!("error"));
208
209 let duration = event_source.reconnection_time.get();
211
212 let callback = OneshotTimerCallback::EventSourceTimeout(
217 EventSourceTimeoutCallback {
218 event_source: trusted_event_source,
219 event_source_context,
220 }
221 );
222 event_source.global().schedule_callback(callback, duration);
223 }),
224 );
225 }
226
227 fn process_field(&mut self) {
229 match &*self.field {
230 "event" => mem::swap(&mut self.event_type, &mut self.value),
231 "data" => {
232 self.data.push_str(&self.value);
233 self.data.push('\n');
234 },
235 "id" if !self.value.contains('\0') => {
236 mem::swap(&mut self.last_event_id, &mut self.value);
237 },
238 "retry" => {
239 if let Ok(time) = u64::from_str(&self.value) {
240 self.event_source
241 .root()
242 .reconnection_time
243 .set(Duration::from_millis(time));
244 }
245 },
246 _ => (),
247 }
248
249 self.field.clear();
250 self.value.clear();
251 }
252
253 fn dispatch_event(&mut self, cx: &mut js::context::JSContext) {
255 let event_source = self.event_source.root();
256 *event_source.last_event_id.borrow_mut() = DOMString::from(self.last_event_id.clone());
258 if self.data.is_empty() {
260 self.data.clear();
261 self.event_type.clear();
262 return;
263 }
264 if let Some(last) = self.data.pop() &&
266 last != '\n'
267 {
268 self.data.push(last);
269 }
270 let type_ = if !self.event_type.is_empty() {
272 Atom::from(self.event_type.clone())
273 } else {
274 atom!("message")
275 };
276 let event = {
278 let _ac = enter_realm(&*event_source);
279 rooted!(&in(cx) let mut data = UndefinedValue());
280 self.data
281 .safe_to_jsval(cx.into(), data.handle_mut(), CanGc::from_cx(cx));
282 MessageEvent::new(
283 &event_source.global(),
284 type_,
285 false,
286 false,
287 data.handle(),
288 DOMString::from(self.origin.clone()),
289 None,
290 event_source.last_event_id.borrow().clone(),
291 Vec::with_capacity(0),
292 CanGc::from_cx(cx),
293 )
294 };
295 self.event_type.clear();
297 self.data.clear();
298
299 let global = event_source.global();
301 let event_source = self.event_source.clone();
302 let event = Trusted::new(&*event);
303 global.task_manager().remote_event_task_source().queue(
304 task!(dispatch_the_event_source_event: move |cx| {
305 let event_source = event_source.root();
306 if event_source.ready_state.get() != ReadyState::Closed {
307 event.root().upcast::<Event>().fire(event_source.upcast(), CanGc::from_cx(cx));
308 }
309 }),
310 );
311 }
312
313 fn parse(&mut self, cx: &mut js::context::JSContext, stream: Chars) {
315 let mut stream = stream.peekable();
316
317 while let Some(ch) = stream.next() {
318 match (ch, &self.parser_state) {
319 (':', &ParserState::Eol) => self.parser_state = ParserState::Comment,
320 (':', &ParserState::Field) => {
321 self.parser_state = ParserState::Value;
322 if let Some(&' ') = stream.peek() {
323 stream.next();
324 }
325 },
326
327 ('\n', &ParserState::Value) => {
328 self.parser_state = ParserState::Eol;
329 self.process_field();
330 },
331 ('\r', &ParserState::Value) => {
332 if let Some(&'\n') = stream.peek() {
333 continue;
334 }
335 self.parser_state = ParserState::Eol;
336 self.process_field();
337 },
338
339 ('\n', &ParserState::Field) => {
340 self.parser_state = ParserState::Eol;
341 self.process_field();
342 },
343 ('\r', &ParserState::Field) => {
344 if let Some(&'\n') = stream.peek() {
345 continue;
346 }
347 self.parser_state = ParserState::Eol;
348 self.process_field();
349 },
350
351 ('\n', &ParserState::Eol) => self.dispatch_event(cx),
352 ('\r', &ParserState::Eol) => {
353 if let Some(&'\n') = stream.peek() {
354 continue;
355 }
356 self.dispatch_event(cx);
357 },
358
359 ('\n', &ParserState::Comment) => self.parser_state = ParserState::Eol,
360 ('\r', &ParserState::Comment) => {
361 if let Some(&'\n') = stream.peek() {
362 continue;
363 }
364 self.parser_state = ParserState::Eol;
365 },
366
367 (_, &ParserState::Field) => self.field.push(ch),
368 (_, &ParserState::Value) => self.value.push(ch),
369 (_, &ParserState::Eol) => {
370 self.parser_state = ParserState::Field;
371 self.field.push(ch);
372 },
373 (_, &ParserState::Comment) => (),
374 }
375 }
376 }
377}
378
379impl FetchResponseListener for EventSourceContext {
380 fn should_invoke(&self) -> bool {
381 self.event_source.root().generation_id.get() == self.gen_id
382 }
383
384 fn process_request_body(&mut self, _: RequestId) {
385 }
387
388 fn process_response(
389 &mut self,
390 _: &mut js::context::JSContext,
391 _: RequestId,
392 metadata: Result<FetchMetadata, NetworkError>,
393 ) {
394 match metadata {
395 Ok(fm) => {
396 let meta = match fm {
397 FetchMetadata::Unfiltered(m) => m,
398 FetchMetadata::Filtered { unsafe_, filtered } => match filtered {
399 FilteredMetadata::Opaque | FilteredMetadata::OpaqueRedirect(_) => {
400 return self.fail_the_connection();
401 },
402 _ => unsafe_,
403 },
404 };
405 if meta.status.code() != StatusCode::OK {
408 return self.fail_the_connection();
409 }
410 let mime = match meta.content_type {
411 None => return self.fail_the_connection(),
412 Some(ct) => <ContentType as Into<Mime>>::into(ct.into_inner()),
413 };
414 if (mime.type_(), mime.subtype()) != (mime::TEXT, mime::EVENT_STREAM) {
415 return self.fail_the_connection();
416 }
417 self.origin = meta.final_url.origin().ascii_serialization();
418 self.announce_the_connection();
420 },
421 Err(error) => {
422 if error.is_permanent_failure() {
426 self.fail_the_connection()
427 } else {
428 self.reestablish_the_connection()
429 }
430 },
431 }
432 }
433
434 fn process_response_chunk(
435 &mut self,
436 cx: &mut js::context::JSContext,
437 _: RequestId,
438 chunk: Vec<u8>,
439 ) {
440 let mut output = String::with_capacity(chunk.len());
441 let mut input = &chunk[..];
442
443 loop {
444 if input.is_empty() {
445 return;
446 }
447 let (result, bytes_read) =
448 self.decoder
449 .decode_to_string_without_replacement(input, &mut output, false);
450 match result {
451 encoding_rs::DecoderResult::InputEmpty => {
452 self.parse(cx, output.chars());
453 return;
454 },
455 encoding_rs::DecoderResult::Malformed(_, _) => {
456 self.parse(cx, output.chars());
457 self.parse(cx, "\u{FFFD}".chars());
458 output.clear();
459 input = &input[bytes_read..];
460 },
461 encoding_rs::DecoderResult::OutputFull => {
462 self.parse(cx, output.chars());
463 output.clear();
464 input = &input[bytes_read..];
465 },
466 }
467 }
468 }
469
470 fn process_response_eof(
471 mut self,
472 cx: &mut js::context::JSContext,
473 _: RequestId,
474 response: Result<(), NetworkError>,
475 timing: ResourceFetchTiming,
476 ) {
477 let mut output = String::new();
478 let (result, _) = self
479 .decoder
480 .decode_to_string_without_replacement(&[], &mut output, true);
481 if !output.is_empty() {
482 self.parse(cx, "\u{FFFD}".chars());
483 }
484 if matches!(result, encoding_rs::DecoderResult::Malformed(_, _)) {
485 self.parse(cx, "\u{FFFD}".chars());
486 }
487 if response.is_ok() {
488 self.reestablish_the_connection();
489 }
490
491 network_listener::submit_timing(cx, &self, &response, &timing);
492 }
493
494 fn process_csp_violations(&mut self, _request_id: RequestId, violations: Vec<Violation>) {
495 let global = &self.resource_timing_global();
496 global.report_csp_violations(violations, None, None);
497 }
498}
499
500impl ResourceTimingListener for EventSourceContext {
501 fn resource_timing_information(&self) -> (InitiatorType, ServoUrl) {
502 (InitiatorType::Other, self.event_source.root().url().clone())
503 }
504
505 fn resource_timing_global(&self) -> DomRoot<GlobalScope> {
506 self.event_source.root().global()
507 }
508}
509
510impl EventSource {
511 fn new_inherited(url: ServoUrl, with_credentials: bool) -> EventSource {
512 EventSource {
513 eventtarget: EventTarget::new_inherited(),
514 url,
515 request: DomRefCell::new(None),
516 last_event_id: DomRefCell::new(DOMString::from("")),
517 reconnection_time: Cell::new(DEFAULT_RECONNECTION_TIME),
518 generation_id: Cell::new(GenerationId(0)),
519
520 ready_state: Cell::new(ReadyState::Connecting),
521 with_credentials,
522 droppable: DroppableEventSource::new(DomRefCell::new(Default::default())),
523 }
524 }
525
526 fn new(
527 global: &GlobalScope,
528 proto: Option<HandleObject>,
529 url: ServoUrl,
530 with_credentials: bool,
531 can_gc: CanGc,
532 ) -> DomRoot<EventSource> {
533 reflect_dom_object_with_proto(
534 Box::new(EventSource::new_inherited(url, with_credentials)),
535 global,
536 proto,
537 can_gc,
538 )
539 }
540
541 pub(crate) fn cancel(&self) {
543 self.droppable.cancel();
544 self.fail_the_connection();
545 }
546
547 pub(crate) fn fail_the_connection(&self) {
549 let global = self.global();
550 let event_source = Trusted::new(self);
551 global.task_manager().remote_event_task_source().queue(
552 task!(fail_the_event_source_connection: move |cx| {
553 let event_source = event_source.root();
554 if event_source.ready_state.get() != ReadyState::Closed {
555 event_source.ready_state.set(ReadyState::Closed);
556 event_source.upcast::<EventTarget>().fire_event(cx, atom!("error"));
557 }
558 }),
559 );
560 }
561
562 pub(crate) fn request(&self) -> RequestBuilder {
563 self.request.borrow().clone().unwrap()
564 }
565
566 pub(crate) fn url(&self) -> &ServoUrl {
567 &self.url
568 }
569}
570
571impl EventSourceMethods<crate::DomTypeHolder> for EventSource {
572 fn Constructor(
574 global: &GlobalScope,
575 proto: Option<HandleObject>,
576 can_gc: CanGc,
577 url: DOMString,
578 event_source_init: &EventSourceInit,
579 ) -> Fallible<DomRoot<EventSource>> {
580 let url_record = match global.encoding_parse_a_url(&url.str()) {
584 Ok(u) => u,
585 Err(_) => return Err(Error::Syntax(None)),
587 };
588 let event_source = EventSource::new(
590 global,
591 proto,
592 url_record.clone(),
594 event_source_init.withCredentials,
595 can_gc,
596 );
597 global.track_event_source(&event_source);
598 let cors_attribute_state = if event_source_init.withCredentials {
599 CorsSettings::UseCredentials
603 } else {
604 CorsSettings::Anonymous
606 };
607 let mut request = create_a_potential_cors_request(
612 global.webview_id(),
613 url_record,
614 Destination::None,
615 Some(cors_attribute_state),
616 Some(true),
617 global.get_referrer(),
618 )
619 .with_global_scope(global);
620
621 request.headers.insert(
624 header::ACCEPT,
625 HeaderValue::from_static("text/event-stream"),
626 );
627 request.cache_mode = CacheMode::NoStore;
629 *event_source.request.borrow_mut() = Some(request.clone());
631 event_source.droppable.set_canceller(FetchCanceller::new(
635 request.id,
636 false,
637 global.core_resource_thread(),
638 ));
639
640 let context = EventSourceContext {
641 decoder: UTF_8.new_decoder_with_bom_removal(),
642 event_source: Trusted::new(&event_source),
643 gen_id: event_source.generation_id.get(),
644 parser_state: ParserState::Eol,
645 field: String::new(),
646 value: String::new(),
647 origin: String::new(),
648
649 event_type: String::new(),
650 data: String::new(),
651 last_event_id: String::new(),
652 };
653
654 let task_source = global.task_manager().networking_task_source().into();
655 global.fetch(request, context, task_source);
656
657 Ok(event_source)
659 }
660
661 event_handler!(open, GetOnopen, SetOnopen);
663
664 event_handler!(message, GetOnmessage, SetOnmessage);
666
667 event_handler!(error, GetOnerror, SetOnerror);
669
670 fn Url(&self) -> DOMString {
672 DOMString::from(self.url.as_str())
673 }
674
675 fn WithCredentials(&self) -> bool {
677 self.with_credentials
678 }
679
680 fn ReadyState(&self) -> u16 {
682 self.ready_state.get() as u16
683 }
684
685 fn Close(&self) {
687 let GenerationId(prev_id) = self.generation_id.get();
688 self.generation_id.set(GenerationId(prev_id + 1));
689 self.droppable.cancel();
690 self.ready_state.set(ReadyState::Closed);
691 }
692}
693
/// One-shot timer payload that performs a reconnection attempt once the
/// reconnection delay has elapsed.
#[derive(JSTraceable, MallocSizeOf)]
pub(crate) struct EventSourceTimeoutCallback {
    /// The event source to reconnect.
    #[ignore_malloc_size_of = "Because it is non-owning"]
    event_source: Trusted<EventSource>,
    /// Fresh listener context handed to the new fetch.
    #[no_trace]
    event_source_context: EventSourceContext,
}
701
702impl EventSourceTimeoutCallback {
703 pub(crate) fn invoke(self) {
705 let event_source = self.event_source.root();
706 let global = event_source.global();
707
708 if event_source.ready_state.get() != ReadyState::Connecting {
710 return;
711 }
712
713 let mut request = event_source.request();
715
716 if !event_source.last_event_id.borrow().is_empty() {
720 request.headers.insert(
722 HeaderName::from_static("last-event-id"),
723 HeaderValue::from_str(&String::from(event_source.last_event_id.borrow().clone()))
724 .unwrap(),
725 );
726 }
727
728 let task_source = global.task_manager().networking_task_source().into();
731 global.fetch(request, self.event_source_context, task_source);
732 }
733}