background_hang_monitor/
background_hang_monitor.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::collections::VecDeque;
6use std::thread::{self, Builder, JoinHandle};
7use std::time::{Duration, Instant};
8
9use background_hang_monitor_api::{
10    BackgroundHangMonitor, BackgroundHangMonitorClone, BackgroundHangMonitorControlMsg,
11    BackgroundHangMonitorExitSignal, BackgroundHangMonitorRegister, HangAlert, HangAnnotation,
12    HangMonitorAlert, MonitoredComponentId,
13};
14use base::generic_channel::{GenericReceiver, GenericSender, RoutedReceiver};
15use crossbeam_channel::{Receiver, Sender, after, never, select, unbounded};
16use rustc_hash::FxHashMap;
17
18use crate::SamplerImpl;
19use crate::sampler::{NativeStack, Sampler};
20
/// Cloneable handle through which components register themselves with the
/// hang monitor worker started by `HangMonitorRegister::init`.
#[derive(Clone)]
pub struct HangMonitorRegister {
    /// Sending half of the channel whose receiving half is handed to the worker.
    /// A clone of it is given to every channel wrapper created on registration.
    sender: MonitoredComponentSender,
    /// Forwarded to every channel wrapper created on registration, where it
    /// gates the activity and wait notifications.
    monitoring_enabled: bool,
}
26
27impl HangMonitorRegister {
28    /// Start a new hang monitor worker, and return a handle to register components for monitoring,
29    /// as well as a join handle on the worker thread.
30    pub fn init(
31        constellation_chan: GenericSender<HangMonitorAlert>,
32        control_port: GenericReceiver<BackgroundHangMonitorControlMsg>,
33        monitoring_enabled: bool,
34    ) -> (Box<dyn BackgroundHangMonitorRegister>, JoinHandle<()>) {
35        let (sender, port) = unbounded();
36
37        let join_handle = Builder::new()
38            .name("BackgroundHangMonitor".to_owned())
39            .spawn(move || {
40                let mut monitor = BackgroundHangMonitorWorker::new(
41                    constellation_chan,
42                    control_port,
43                    port,
44                    monitoring_enabled,
45                );
46                while monitor.run() {
47                    // Monitoring until all senders have been dropped...
48                }
49            })
50            .expect("Couldn't start BHM worker.");
51        (
52            Box::new(HangMonitorRegister {
53                sender,
54                monitoring_enabled,
55            }),
56            join_handle,
57        )
58    }
59}
60
61impl BackgroundHangMonitorRegister for HangMonitorRegister {
62    /// Register a component for monitoring.
63    /// Returns a dedicated wrapper around a sender
64    /// to be used for communication with the hang monitor worker.
65    fn register_component(
66        &self,
67        component_id: MonitoredComponentId,
68        transient_hang_timeout: Duration,
69        permanent_hang_timeout: Duration,
70        exit_signal: Box<dyn BackgroundHangMonitorExitSignal>,
71    ) -> Box<dyn BackgroundHangMonitor> {
72        let bhm_chan = BackgroundHangMonitorChan::new(
73            self.sender.clone(),
74            component_id,
75            self.monitoring_enabled,
76        );
77
78        bhm_chan.send(MonitoredComponentMsg::Register(
79            SamplerImpl::default(),
80            thread::current().name().map(str::to_owned),
81            transient_hang_timeout,
82            permanent_hang_timeout,
83            exit_signal,
84        ));
85        Box::new(bhm_chan)
86    }
87}
88
89impl BackgroundHangMonitorClone for HangMonitorRegister {
90    fn clone_box(&self) -> Box<dyn BackgroundHangMonitorRegister> {
91        Box::new(self.clone())
92    }
93}
94
/// Messages sent from monitored components to the monitor.
///
/// Each message travels over the channel paired with the id of the component
/// that sent it.
enum MonitoredComponentMsg {
    /// Register component for monitoring.
    ///
    /// Fields, in order: the sampler for the component, the name of the thread
    /// that registered it (if any), the transient hang timeout, the permanent
    /// hang timeout, and the signal used to ask the component to exit.
    Register(
        SamplerImpl,
        Option<String>,
        Duration,
        Duration,
        Box<dyn BackgroundHangMonitorExitSignal>,
    ),
    /// Unregister component for monitoring.
    Unregister,
    /// Notify start of new activity for a given component,
    /// described by the accompanying annotation.
    NotifyActivity(HangAnnotation),
    /// Notify start of waiting for a new task to come in for processing.
    NotifyWait,
}
112
/// A wrapper around a sender to the monitor,
/// which will send the Id of the monitored component along with each message.
///
/// NOTE(review): this comment previously said the wrapper keeps track of
/// whether the monitor is still listening. No such state is stored here;
/// `send` panics with "BHM is gone" if the message cannot be delivered.
struct BackgroundHangMonitorChan {
    /// Channel to the monitor worker.
    sender: MonitoredComponentSender,
    /// Id attached to every message sent through this wrapper.
    component_id: MonitoredComponentId,
    /// When false, activity and wait notifications are not sent at all.
    monitoring_enabled: bool,
}
121
122impl BackgroundHangMonitorChan {
123    fn new(
124        sender: MonitoredComponentSender,
125        component_id: MonitoredComponentId,
126        monitoring_enabled: bool,
127    ) -> Self {
128        BackgroundHangMonitorChan {
129            sender,
130            component_id,
131            monitoring_enabled,
132        }
133    }
134
135    fn send(&self, msg: MonitoredComponentMsg) {
136        self.sender
137            .send((self.component_id.clone(), msg))
138            .expect("BHM is gone");
139    }
140}
141
142impl BackgroundHangMonitor for BackgroundHangMonitorChan {
143    fn notify_activity(&self, annotation: HangAnnotation) {
144        if self.monitoring_enabled {
145            let msg = MonitoredComponentMsg::NotifyActivity(annotation);
146            self.send(msg);
147        }
148    }
149    fn notify_wait(&self) {
150        if self.monitoring_enabled {
151            let msg = MonitoredComponentMsg::NotifyWait;
152            self.send(msg);
153        }
154    }
155    fn unregister(&self) {
156        let msg = MonitoredComponentMsg::Unregister;
157        self.send(msg);
158    }
159}
160
/// Per-component state kept by the worker for every registered component.
struct MonitoredComponent {
    /// Used to suspend and sample the component's thread.
    sampler: SamplerImpl,
    /// When the component registered or last notified activity or wait.
    last_activity: Instant,
    /// Annotation of the most recent activity; `None` until the first
    /// activity notification arrives.
    last_annotation: Option<HangAnnotation>,
    /// Time since `last_activity` after which a transient hang is reported.
    transient_hang_timeout: Duration,
    /// Time since `last_activity` after which a permanent hang is reported.
    permanent_hang_timeout: Duration,
    /// Set once a transient alert was sent; cleared by the next notification.
    sent_transient_alert: bool,
    /// Set once a permanent alert was sent; cleared by the next notification.
    sent_permanent_alert: bool,
    /// True after registration and after a wait notification; waiting
    /// components are skipped by the hang checkpoint.
    is_waiting: bool,
    /// Signalled when the worker receives an exit control message, or at
    /// registration if the worker is already shutting down.
    exit_signal: Box<dyn BackgroundHangMonitorExitSignal>,
}
172
/// One profiler sample: the sampled component's id, the instant taken just
/// before sampling, and the captured native stack.
struct Sample(MonitoredComponentId, Instant, NativeStack);
174
/// State of the hang monitor worker, driven by repeated calls to `run`.
struct BackgroundHangMonitorWorker {
    /// Thread names recorded at registration; entries are not removed when a
    /// component unregisters.
    component_names: FxHashMap<MonitoredComponentId, String>,
    /// Components currently registered for monitoring.
    monitored_components: FxHashMap<MonitoredComponentId, MonitoredComponent>,
    /// Destination of hang alerts and finished profiles.
    constellation_chan: GenericSender<HangMonitorAlert>,
    /// Messages from monitored components.
    port: Receiver<(MonitoredComponentId, MonitoredComponentMsg)>,
    /// Control messages (sampler toggle, exit).
    control_port: RoutedReceiver<BackgroundHangMonitorControlMsg>,
    /// Interval between samples; `Some` while the sampler is on.
    sampling_duration: Option<Duration>,
    /// Once this much time has passed since `sampling_baseline`, the oldest
    /// sample is dropped for every new sample recorded.
    sampling_max_duration: Option<Duration>,
    /// When the last sampling pass happened.
    last_sample: Instant,
    /// When this worker was created.
    creation: Instant,
    /// When the sampler was last toggled on (initially the creation time).
    sampling_baseline: Instant,
    /// Samples collected since the sampler was toggled on.
    samples: VecDeque<Sample>,
    /// When true and the sampler is off, `run` wakes up every 100 ms to
    /// perform a hang checkpoint.
    monitoring_enabled: bool,
    /// Set when an exit control message was received; components registering
    /// afterwards are signalled to exit immediately.
    shutting_down: bool,
}
190
/// Sending half of the component-to-monitor channel; every message is paired
/// with the id of the component that sent it.
type MonitoredComponentSender = Sender<(MonitoredComponentId, MonitoredComponentMsg)>;
/// Receiving half of the component-to-monitor channel.
type MonitoredComponentReceiver = Receiver<(MonitoredComponentId, MonitoredComponentMsg)>;
193
194impl BackgroundHangMonitorWorker {
195    fn new(
196        constellation_chan: GenericSender<HangMonitorAlert>,
197        control_port: GenericReceiver<BackgroundHangMonitorControlMsg>,
198        port: MonitoredComponentReceiver,
199        monitoring_enabled: bool,
200    ) -> Self {
201        let control_port = control_port.route_preserving_errors();
202        Self {
203            component_names: Default::default(),
204            monitored_components: Default::default(),
205            constellation_chan,
206            port,
207            control_port,
208            sampling_duration: None,
209            sampling_max_duration: None,
210            last_sample: Instant::now(),
211            sampling_baseline: Instant::now(),
212            creation: Instant::now(),
213            samples: Default::default(),
214            monitoring_enabled,
215            shutting_down: Default::default(),
216        }
217    }
218
219    fn finish_sampled_profile(&mut self) {
220        let mut bytes = vec![];
221        bytes.extend(
222            format!(
223                "{{ \"rate\": {}, \"start\": {}, \"data\": [\n",
224                self.sampling_duration.unwrap().as_millis(),
225                (self.sampling_baseline - self.creation).as_millis(),
226            )
227            .as_bytes(),
228        );
229
230        let mut first = true;
231        let to_resolve = self.samples.len();
232        for (i, Sample(id, instant, stack)) in self.samples.drain(..).enumerate() {
233            println!("Resolving {}/{}", i + 1, to_resolve);
234            let profile = stack.to_hangprofile();
235            let name = match self.component_names.get(&id) {
236                Some(ref s) => format!("\"{}\"", s),
237                None => "null".to_string(),
238            };
239            let json = format!(
240                "{}{{ \"name\": {}, \"event loop id\": \"{:?}\", \
241                 \"time\": {}, \"frames\": {} }}",
242                if !first { ",\n" } else { "" },
243                name,
244                id,
245                (instant - self.sampling_baseline).as_millis(),
246                serde_json::to_string(&profile.backtrace).unwrap(),
247            );
248            bytes.extend(json.as_bytes());
249            first = false;
250        }
251
252        bytes.extend(b"\n] }");
253        let _ = self
254            .constellation_chan
255            .send(HangMonitorAlert::Profile(bytes));
256    }
257
258    fn run(&mut self) -> bool {
259        let tick = if let Some(duration) = self.sampling_duration {
260            let duration = duration
261                .checked_sub(Instant::now() - self.last_sample)
262                .unwrap_or_else(|| Duration::from_millis(0));
263            after(duration)
264        } else if self.monitoring_enabled {
265            after(Duration::from_millis(100))
266        } else {
267            never()
268        };
269
270        let received = select! {
271            recv(self.port) -> event => {
272                if let Ok(event) = event {
273                    Some(event)
274                } else {
275                    // All senders have dropped,
276                    // which means all monitored components have shut down,
277                    // and so we can as well.
278                    return false;
279                }
280            },
281            recv(self.control_port) -> event => {
282                match event {
283                    Ok(Ok(BackgroundHangMonitorControlMsg::ToggleSampler(rate, max_duration))) => {
284                        if self.sampling_duration.is_some() {
285                            println!("Enabling profiler.");
286                            self.finish_sampled_profile();
287                            self.sampling_duration = None;
288                        } else {
289                            println!("Disabling profiler.");
290                            self.sampling_duration = Some(rate);
291                            self.sampling_max_duration = Some(max_duration);
292                            self.sampling_baseline = Instant::now();
293                        }
294                        None
295                    },
296                    Ok(Ok(BackgroundHangMonitorControlMsg::Exit)) => {
297                        for component in self.monitored_components.values_mut() {
298                            component.exit_signal.signal_to_exit();
299                        }
300
301                        // Note the start of shutdown,
302                        // to ensure exit propagates,
303                        // even to components that have yet to register themselves,
304                        // from this point on.
305                        self.shutting_down = true;
306
307                        // Keep running; this worker thread will shutdown
308                        // when the monitored components have shutdown,
309                        // which we know has happened when `self.port` disconnects.
310                        None
311                    },
312                    Ok(Err(e)) => {
313                        log::warn!("BackgroundHangMonitorWorker control message deserialization error: {e:?}");
314                        None
315                    },
316                    Err(_) => return false,
317                }
318            }
319            recv(tick) -> _ => None,
320        };
321
322        if let Some(msg) = received {
323            self.handle_msg(msg);
324            while let Ok(another_msg) = self.port.try_recv() {
325                // Handle any other incoming messages,
326                // before performing a hang checkpoint.
327                self.handle_msg(another_msg);
328            }
329        }
330
331        if let Some(duration) = self.sampling_duration {
332            let now = Instant::now();
333            if now - self.last_sample > duration {
334                self.sample();
335                self.last_sample = now;
336            }
337        } else {
338            self.perform_a_hang_monitor_checkpoint();
339        }
340        true
341    }
342
343    fn handle_msg(&mut self, msg: (MonitoredComponentId, MonitoredComponentMsg)) {
344        match msg {
345            (
346                component_id,
347                MonitoredComponentMsg::Register(
348                    sampler,
349                    name,
350                    transient_hang_timeout,
351                    permanent_hang_timeout,
352                    exit_signal,
353                ),
354            ) => {
355                // If we are shutting down,
356                // propagate it to the component,
357                // and register it(the component will unregister itself
358                // as part of handling the exit).
359                if self.shutting_down {
360                    exit_signal.signal_to_exit();
361                }
362
363                let component = MonitoredComponent {
364                    sampler,
365                    last_activity: Instant::now(),
366                    last_annotation: None,
367                    transient_hang_timeout,
368                    permanent_hang_timeout,
369                    sent_transient_alert: false,
370                    sent_permanent_alert: false,
371                    is_waiting: true,
372                    exit_signal,
373                };
374                if let Some(name) = name {
375                    self.component_names.insert(component_id.clone(), name);
376                }
377                assert!(
378                    self.monitored_components
379                        .insert(component_id, component)
380                        .is_none(),
381                    "This component was already registered for monitoring."
382                );
383            },
384            (component_id, MonitoredComponentMsg::Unregister) => {
385                self.monitored_components
386                    .remove_entry(&component_id)
387                    .expect("Received Unregister for an unknown component");
388            },
389            (component_id, MonitoredComponentMsg::NotifyActivity(annotation)) => {
390                let component = self
391                    .monitored_components
392                    .get_mut(&component_id)
393                    .expect("Received NotifyActivity for an unknown component");
394                component.last_activity = Instant::now();
395                component.last_annotation = Some(annotation);
396                component.sent_transient_alert = false;
397                component.sent_permanent_alert = false;
398                component.is_waiting = false;
399            },
400            (component_id, MonitoredComponentMsg::NotifyWait) => {
401                let component = self
402                    .monitored_components
403                    .get_mut(&component_id)
404                    .expect("Received NotifyWait for an unknown component");
405                component.last_activity = Instant::now();
406                component.sent_transient_alert = false;
407                component.sent_permanent_alert = false;
408                component.is_waiting = true;
409            },
410        }
411    }
412
413    fn perform_a_hang_monitor_checkpoint(&mut self) {
414        for (component_id, monitored) in self.monitored_components.iter_mut() {
415            if monitored.is_waiting {
416                continue;
417            }
418            let last_annotation = monitored.last_annotation.unwrap();
419            if monitored.last_activity.elapsed() > monitored.permanent_hang_timeout {
420                if monitored.sent_permanent_alert {
421                    continue;
422                }
423                let profile = match monitored.sampler.suspend_and_sample_thread() {
424                    Ok(native_stack) => Some(native_stack.to_hangprofile()),
425                    Err(()) => None,
426                };
427                let _ = self
428                    .constellation_chan
429                    .send(HangMonitorAlert::Hang(HangAlert::Permanent(
430                        component_id.clone(),
431                        last_annotation,
432                        profile,
433                    )));
434                monitored.sent_permanent_alert = true;
435                continue;
436            }
437            if monitored.last_activity.elapsed() > monitored.transient_hang_timeout {
438                if monitored.sent_transient_alert {
439                    continue;
440                }
441                let _ = self
442                    .constellation_chan
443                    .send(HangMonitorAlert::Hang(HangAlert::Transient(
444                        component_id.clone(),
445                        last_annotation,
446                    )));
447                monitored.sent_transient_alert = true;
448            }
449        }
450    }
451
452    fn sample(&mut self) {
453        for (component_id, monitored) in self.monitored_components.iter_mut() {
454            let instant = Instant::now();
455            if let Ok(stack) = monitored.sampler.suspend_and_sample_thread() {
456                if self.sampling_baseline.elapsed() >
457                    self.sampling_max_duration
458                        .expect("Max duration has been set")
459                {
460                    // Buffer is full, start discarding older samples.
461                    self.samples.pop_front();
462                }
463                self.samples
464                    .push_back(Sample(component_id.clone(), instant, stack));
465            }
466        }
467    }
468}