1use std::cell::Cell;
6use std::mem;
7use std::str::{Chars, FromStr};
8use std::time::Duration;
9
10use dom_struct::dom_struct;
11use encoding_rs::{Decoder, UTF_8};
12use headers::ContentType;
13use http::StatusCode;
14use http::header::{self, HeaderName, HeaderValue};
15use js::jsval::UndefinedValue;
16use js::rust::HandleObject;
17use mime::{self, Mime};
18use net_traits::request::{CacheMode, CorsSettings, Destination, RequestBuilder, RequestId};
19use net_traits::{FetchMetadata, FilteredMetadata, NetworkError, ResourceFetchTiming};
20use script_bindings::conversions::SafeToJSValConvertible;
21use servo_url::ServoUrl;
22use stylo_atoms::Atom;
23
24use crate::dom::bindings::cell::DomRefCell;
25use crate::dom::bindings::codegen::Bindings::EventSourceBinding::{
26 EventSourceInit, EventSourceMethods,
27};
28use crate::dom::bindings::error::{Error, Fallible};
29use crate::dom::bindings::inheritance::Castable;
30use crate::dom::bindings::refcounted::Trusted;
31use crate::dom::bindings::reflector::{DomGlobal, reflect_dom_object_with_proto};
32use crate::dom::bindings::root::DomRoot;
33use crate::dom::bindings::str::DOMString;
34use crate::dom::csp::{GlobalCspReporting, Violation};
35use crate::dom::event::Event;
36use crate::dom::eventtarget::EventTarget;
37use crate::dom::globalscope::GlobalScope;
38use crate::dom::messageevent::MessageEvent;
39use crate::dom::performance::performanceresourcetiming::InitiatorType;
40use crate::fetch::{FetchCanceller, RequestWithGlobalScope, create_a_potential_cors_request};
41use crate::network_listener::{self, FetchResponseListener, ResourceTimingListener};
42use crate::realms::enter_realm;
43use crate::script_runtime::CanGc;
44use crate::timers::OneshotTimerCallback;
45
46const DEFAULT_RECONNECTION_TIME: Duration = Duration::from_millis(5000);
47
48#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
49struct GenerationId(u32);
50
51#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
52enum ReadyState {
54 Connecting = 0,
55 Open = 1,
56 Closed = 2,
57}
58
59#[derive(JSTraceable, MallocSizeOf)]
60struct DroppableEventSource {
61 canceller: DomRefCell<FetchCanceller>,
62}
63
64impl DroppableEventSource {
65 pub(crate) fn new(canceller: DomRefCell<FetchCanceller>) -> Self {
66 DroppableEventSource { canceller }
67 }
68
69 pub(crate) fn cancel(&self) {
70 self.canceller.borrow_mut().abort();
71 }
72
73 pub(crate) fn set_canceller(&self, data: FetchCanceller) {
74 *self.canceller.borrow_mut() = data;
75 }
76}
77
78impl Drop for DroppableEventSource {
80 fn drop(&mut self) {
81 self.cancel();
84 }
85}
86
87#[dom_struct]
88pub(crate) struct EventSource {
89 eventtarget: EventTarget,
90 #[no_trace]
91 url: ServoUrl,
92 #[no_trace]
93 request: DomRefCell<Option<RequestBuilder>>,
94 last_event_id: DomRefCell<DOMString>,
95 reconnection_time: Cell<Duration>,
96 generation_id: Cell<GenerationId>,
97
98 ready_state: Cell<ReadyState>,
99 with_credentials: bool,
100 droppable: DroppableEventSource,
101}
102
103#[derive(Clone, MallocSizeOf)]
104enum ParserState {
105 Field,
106 Comment,
107 Value,
108 Eol,
109}
110
111#[derive(MallocSizeOf)]
112struct EventSourceContext {
113 decoder: Decoder,
114 event_source: Trusted<EventSource>,
115 gen_id: GenerationId,
116 parser_state: ParserState,
117 field: String,
118 value: String,
119 origin: String,
120 event_type: String,
121 data: String,
122 last_event_id: String,
123}
124
125impl Clone for EventSourceContext {
126 fn clone(&self) -> Self {
127 EventSourceContext {
128 decoder: UTF_8.new_decoder_with_bom_removal(),
129 event_source: self.event_source.clone(),
130 gen_id: self.gen_id,
131 parser_state: self.parser_state.clone(),
132 field: self.field.clone(),
133 value: self.value.clone(),
134 origin: self.origin.clone(),
135 event_type: self.event_type.clone(),
136 data: self.data.clone(),
137 last_event_id: self.last_event_id.clone(),
138 }
139 }
140}
141
142impl EventSourceContext {
143 fn announce_the_connection(&self) {
145 let event_source = self.event_source.root();
146 if self.gen_id != event_source.generation_id.get() {
147 return;
148 }
149 let global = event_source.global();
150 let event_source = self.event_source.clone();
151 global.task_manager().remote_event_task_source().queue(
152 task!(announce_the_event_source_connection: move || {
153 let event_source = event_source.root();
154 if event_source.ready_state.get() != ReadyState::Closed {
155 event_source.ready_state.set(ReadyState::Open);
156 event_source.upcast::<EventTarget>().fire_event(atom!("open"), CanGc::note());
157 }
158 }),
159 );
160 }
161
162 fn fail_the_connection(&self) {
164 let event_source = self.event_source.root();
165 if self.gen_id != event_source.generation_id.get() {
166 return;
167 }
168 event_source.fail_the_connection();
169 }
170
171 fn reestablish_the_connection(&self) {
173 let event_source = self.event_source.root();
174
175 if self.gen_id != event_source.generation_id.get() {
176 return;
177 }
178
179 let trusted_event_source = self.event_source.clone();
180 let global = event_source.global();
181 let event_source_context = EventSourceContext {
182 decoder: UTF_8.new_decoder_with_bom_removal(),
183 event_source: self.event_source.clone(),
184 gen_id: self.gen_id,
185 parser_state: ParserState::Eol,
186 field: String::new(),
187 value: String::new(),
188 origin: self.origin.clone(),
189 event_type: String::new(),
190 data: String::new(),
191 last_event_id: String::from(event_source.last_event_id.borrow().clone()),
192 };
193 global.task_manager().remote_event_task_source().queue(
194 task!(reestablish_the_event_source_onnection: move || {
195 let event_source = trusted_event_source.root();
196
197 if event_source.ready_state.get() == ReadyState::Closed {
199 return;
200 }
201
202 event_source.ready_state.set(ReadyState::Connecting);
204
205 event_source.upcast::<EventTarget>().fire_event(atom!("error"), CanGc::note());
207
208 let duration = event_source.reconnection_time.get();
210
211 let callback = OneshotTimerCallback::EventSourceTimeout(
216 EventSourceTimeoutCallback {
217 event_source: trusted_event_source,
218 event_source_context,
219 }
220 );
221 event_source.global().schedule_callback(callback, duration);
222 }),
223 );
224 }
225
226 fn process_field(&mut self) {
228 match &*self.field {
229 "event" => mem::swap(&mut self.event_type, &mut self.value),
230 "data" => {
231 self.data.push_str(&self.value);
232 self.data.push('\n');
233 },
234 "id" if !self.value.contains('\0') => {
235 mem::swap(&mut self.last_event_id, &mut self.value);
236 },
237 "retry" => {
238 if let Ok(time) = u64::from_str(&self.value) {
239 self.event_source
240 .root()
241 .reconnection_time
242 .set(Duration::from_millis(time));
243 }
244 },
245 _ => (),
246 }
247
248 self.field.clear();
249 self.value.clear();
250 }
251
252 fn dispatch_event(&mut self, cx: &mut js::context::JSContext) {
254 let event_source = self.event_source.root();
255 *event_source.last_event_id.borrow_mut() = DOMString::from(self.last_event_id.clone());
257 if self.data.is_empty() {
259 self.data.clear();
260 self.event_type.clear();
261 return;
262 }
263 if let Some(last) = self.data.pop() {
265 if last != '\n' {
266 self.data.push(last);
267 }
268 }
269 let type_ = if !self.event_type.is_empty() {
271 Atom::from(self.event_type.clone())
272 } else {
273 atom!("message")
274 };
275 let event = {
277 let _ac = enter_realm(&*event_source);
278 rooted!(&in(cx) let mut data = UndefinedValue());
279 self.data
280 .safe_to_jsval(cx.into(), data.handle_mut(), CanGc::from_cx(cx));
281 MessageEvent::new(
282 &event_source.global(),
283 type_,
284 false,
285 false,
286 data.handle(),
287 DOMString::from(self.origin.clone()),
288 None,
289 event_source.last_event_id.borrow().clone(),
290 Vec::with_capacity(0),
291 CanGc::from_cx(cx),
292 )
293 };
294 self.event_type.clear();
296 self.data.clear();
297
298 let global = event_source.global();
300 let event_source = self.event_source.clone();
301 let event = Trusted::new(&*event);
302 global.task_manager().remote_event_task_source().queue(
303 task!(dispatch_the_event_source_event: move |cx| {
304 let event_source = event_source.root();
305 if event_source.ready_state.get() != ReadyState::Closed {
306 event.root().upcast::<Event>().fire(event_source.upcast(), CanGc::from_cx(cx));
307 }
308 }),
309 );
310 }
311
312 fn parse(&mut self, cx: &mut js::context::JSContext, stream: Chars) {
314 let mut stream = stream.peekable();
315
316 while let Some(ch) = stream.next() {
317 match (ch, &self.parser_state) {
318 (':', &ParserState::Eol) => self.parser_state = ParserState::Comment,
319 (':', &ParserState::Field) => {
320 self.parser_state = ParserState::Value;
321 if let Some(&' ') = stream.peek() {
322 stream.next();
323 }
324 },
325
326 ('\n', &ParserState::Value) => {
327 self.parser_state = ParserState::Eol;
328 self.process_field();
329 },
330 ('\r', &ParserState::Value) => {
331 if let Some(&'\n') = stream.peek() {
332 continue;
333 }
334 self.parser_state = ParserState::Eol;
335 self.process_field();
336 },
337
338 ('\n', &ParserState::Field) => {
339 self.parser_state = ParserState::Eol;
340 self.process_field();
341 },
342 ('\r', &ParserState::Field) => {
343 if let Some(&'\n') = stream.peek() {
344 continue;
345 }
346 self.parser_state = ParserState::Eol;
347 self.process_field();
348 },
349
350 ('\n', &ParserState::Eol) => self.dispatch_event(cx),
351 ('\r', &ParserState::Eol) => {
352 if let Some(&'\n') = stream.peek() {
353 continue;
354 }
355 self.dispatch_event(cx);
356 },
357
358 ('\n', &ParserState::Comment) => self.parser_state = ParserState::Eol,
359 ('\r', &ParserState::Comment) => {
360 if let Some(&'\n') = stream.peek() {
361 continue;
362 }
363 self.parser_state = ParserState::Eol;
364 },
365
366 (_, &ParserState::Field) => self.field.push(ch),
367 (_, &ParserState::Value) => self.value.push(ch),
368 (_, &ParserState::Eol) => {
369 self.parser_state = ParserState::Field;
370 self.field.push(ch);
371 },
372 (_, &ParserState::Comment) => (),
373 }
374 }
375 }
376}
377
378impl FetchResponseListener for EventSourceContext {
379 fn should_invoke(&self) -> bool {
380 self.event_source.root().generation_id.get() == self.gen_id
381 }
382
383 fn process_request_body(&mut self, _: RequestId) {
384 }
386
387 fn process_response(
388 &mut self,
389 _: &mut js::context::JSContext,
390 _: RequestId,
391 metadata: Result<FetchMetadata, NetworkError>,
392 ) {
393 match metadata {
394 Ok(fm) => {
395 let meta = match fm {
396 FetchMetadata::Unfiltered(m) => m,
397 FetchMetadata::Filtered { unsafe_, filtered } => match filtered {
398 FilteredMetadata::Opaque | FilteredMetadata::OpaqueRedirect(_) => {
399 return self.fail_the_connection();
400 },
401 _ => unsafe_,
402 },
403 };
404 if meta.status.code() != StatusCode::OK {
407 return self.fail_the_connection();
408 }
409 let mime = match meta.content_type {
410 None => return self.fail_the_connection(),
411 Some(ct) => <ContentType as Into<Mime>>::into(ct.into_inner()),
412 };
413 if (mime.type_(), mime.subtype()) != (mime::TEXT, mime::EVENT_STREAM) {
414 return self.fail_the_connection();
415 }
416 self.origin = meta.final_url.origin().ascii_serialization();
417 self.announce_the_connection();
419 },
420 Err(error) => {
421 if error.is_permanent_failure() {
425 self.fail_the_connection()
426 } else {
427 self.reestablish_the_connection()
428 }
429 },
430 }
431 }
432
433 fn process_response_chunk(
434 &mut self,
435 cx: &mut js::context::JSContext,
436 _: RequestId,
437 chunk: Vec<u8>,
438 ) {
439 let mut output = String::with_capacity(chunk.len());
440 let mut input = &chunk[..];
441
442 loop {
443 if input.is_empty() {
444 return;
445 }
446 let (result, bytes_read) =
447 self.decoder
448 .decode_to_string_without_replacement(input, &mut output, false);
449 match result {
450 encoding_rs::DecoderResult::InputEmpty => {
451 self.parse(cx, output.chars());
452 return;
453 },
454 encoding_rs::DecoderResult::Malformed(_, _) => {
455 self.parse(cx, output.chars());
456 self.parse(cx, "\u{FFFD}".chars());
457 output.clear();
458 input = &input[bytes_read..];
459 },
460 encoding_rs::DecoderResult::OutputFull => {
461 self.parse(cx, output.chars());
462 output.clear();
463 input = &input[bytes_read..];
464 },
465 }
466 }
467 }
468
469 fn process_response_eof(
470 mut self,
471 cx: &mut js::context::JSContext,
472 _: RequestId,
473 response: Result<(), NetworkError>,
474 timing: ResourceFetchTiming,
475 ) {
476 let mut output = String::new();
477 let (result, _) = self
478 .decoder
479 .decode_to_string_without_replacement(&[], &mut output, true);
480 if !output.is_empty() {
481 self.parse(cx, "\u{FFFD}".chars());
482 }
483 if matches!(result, encoding_rs::DecoderResult::Malformed(_, _)) {
484 self.parse(cx, "\u{FFFD}".chars());
485 }
486 if response.is_ok() {
487 self.reestablish_the_connection();
488 }
489
490 network_listener::submit_timing(cx, &self, &response, &timing);
491 }
492
493 fn process_csp_violations(&mut self, _request_id: RequestId, violations: Vec<Violation>) {
494 let global = &self.resource_timing_global();
495 global.report_csp_violations(violations, None, None);
496 }
497}
498
499impl ResourceTimingListener for EventSourceContext {
500 fn resource_timing_information(&self) -> (InitiatorType, ServoUrl) {
501 (InitiatorType::Other, self.event_source.root().url().clone())
502 }
503
504 fn resource_timing_global(&self) -> DomRoot<GlobalScope> {
505 self.event_source.root().global()
506 }
507}
508
509impl EventSource {
510 fn new_inherited(url: ServoUrl, with_credentials: bool) -> EventSource {
511 EventSource {
512 eventtarget: EventTarget::new_inherited(),
513 url,
514 request: DomRefCell::new(None),
515 last_event_id: DomRefCell::new(DOMString::from("")),
516 reconnection_time: Cell::new(DEFAULT_RECONNECTION_TIME),
517 generation_id: Cell::new(GenerationId(0)),
518
519 ready_state: Cell::new(ReadyState::Connecting),
520 with_credentials,
521 droppable: DroppableEventSource::new(DomRefCell::new(Default::default())),
522 }
523 }
524
525 fn new(
526 global: &GlobalScope,
527 proto: Option<HandleObject>,
528 url: ServoUrl,
529 with_credentials: bool,
530 can_gc: CanGc,
531 ) -> DomRoot<EventSource> {
532 reflect_dom_object_with_proto(
533 Box::new(EventSource::new_inherited(url, with_credentials)),
534 global,
535 proto,
536 can_gc,
537 )
538 }
539
540 pub(crate) fn cancel(&self) {
542 self.droppable.cancel();
543 self.fail_the_connection();
544 }
545
546 pub(crate) fn fail_the_connection(&self) {
548 let global = self.global();
549 let event_source = Trusted::new(self);
550 global.task_manager().remote_event_task_source().queue(
551 task!(fail_the_event_source_connection: move || {
552 let event_source = event_source.root();
553 if event_source.ready_state.get() != ReadyState::Closed {
554 event_source.ready_state.set(ReadyState::Closed);
555 event_source.upcast::<EventTarget>().fire_event(atom!("error"), CanGc::note());
556 }
557 }),
558 );
559 }
560
561 pub(crate) fn request(&self) -> RequestBuilder {
562 self.request.borrow().clone().unwrap()
563 }
564
565 pub(crate) fn url(&self) -> &ServoUrl {
566 &self.url
567 }
568}
569
570impl EventSourceMethods<crate::DomTypeHolder> for EventSource {
571 fn Constructor(
573 global: &GlobalScope,
574 proto: Option<HandleObject>,
575 can_gc: CanGc,
576 url: DOMString,
577 event_source_init: &EventSourceInit,
578 ) -> Fallible<DomRoot<EventSource>> {
579 let url_record = match global.encoding_parse_a_url(&url.str()) {
583 Ok(u) => u,
584 Err(_) => return Err(Error::Syntax(None)),
586 };
587 let event_source = EventSource::new(
589 global,
590 proto,
591 url_record.clone(),
593 event_source_init.withCredentials,
594 can_gc,
595 );
596 global.track_event_source(&event_source);
597 let cors_attribute_state = if event_source_init.withCredentials {
598 CorsSettings::UseCredentials
602 } else {
603 CorsSettings::Anonymous
605 };
606 let mut request = create_a_potential_cors_request(
611 global.webview_id(),
612 url_record,
613 Destination::None,
614 Some(cors_attribute_state),
615 Some(true),
616 global.get_referrer(),
617 )
618 .with_global_scope(global);
619
620 request.headers.insert(
623 header::ACCEPT,
624 HeaderValue::from_static("text/event-stream"),
625 );
626 request.cache_mode = CacheMode::NoStore;
628 *event_source.request.borrow_mut() = Some(request.clone());
630 event_source.droppable.set_canceller(FetchCanceller::new(
634 request.id,
635 false,
636 global.core_resource_thread(),
637 ));
638
639 let context = EventSourceContext {
640 decoder: UTF_8.new_decoder_with_bom_removal(),
641 event_source: Trusted::new(&event_source),
642 gen_id: event_source.generation_id.get(),
643 parser_state: ParserState::Eol,
644 field: String::new(),
645 value: String::new(),
646 origin: String::new(),
647
648 event_type: String::new(),
649 data: String::new(),
650 last_event_id: String::new(),
651 };
652
653 let task_source = global.task_manager().networking_task_source().into();
654 global.fetch(request, context, task_source);
655
656 Ok(event_source)
658 }
659
660 event_handler!(open, GetOnopen, SetOnopen);
662
663 event_handler!(message, GetOnmessage, SetOnmessage);
665
666 event_handler!(error, GetOnerror, SetOnerror);
668
669 fn Url(&self) -> DOMString {
671 DOMString::from(self.url.as_str())
672 }
673
674 fn WithCredentials(&self) -> bool {
676 self.with_credentials
677 }
678
679 fn ReadyState(&self) -> u16 {
681 self.ready_state.get() as u16
682 }
683
684 fn Close(&self) {
686 let GenerationId(prev_id) = self.generation_id.get();
687 self.generation_id.set(GenerationId(prev_id + 1));
688 self.droppable.cancel();
689 self.ready_state.set(ReadyState::Closed);
690 }
691}
692
693#[derive(JSTraceable, MallocSizeOf)]
694pub(crate) struct EventSourceTimeoutCallback {
695 #[ignore_malloc_size_of = "Because it is non-owning"]
696 event_source: Trusted<EventSource>,
697 #[no_trace]
698 event_source_context: EventSourceContext,
699}
700
701impl EventSourceTimeoutCallback {
702 pub(crate) fn invoke(self) {
704 let event_source = self.event_source.root();
705 let global = event_source.global();
706
707 if event_source.ready_state.get() != ReadyState::Connecting {
709 return;
710 }
711
712 let mut request = event_source.request();
714
715 if !event_source.last_event_id.borrow().is_empty() {
719 request.headers.insert(
721 HeaderName::from_static("last-event-id"),
722 HeaderValue::from_str(&String::from(event_source.last_event_id.borrow().clone()))
723 .unwrap(),
724 );
725 }
726
727 let task_source = global.task_manager().networking_task_source().into();
730 global.fetch(request, self.event_source_context, task_source);
731 }
732}