background_hang_monitor/
background_hang_monitor.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::collections::VecDeque;
6use std::thread::{self, Builder, JoinHandle};
7use std::time::{Duration, Instant};
8
9use background_hang_monitor_api::{
10    BackgroundHangMonitor, BackgroundHangMonitorClone, BackgroundHangMonitorControlMsg,
11    BackgroundHangMonitorExitSignal, BackgroundHangMonitorRegister, HangAlert, HangAnnotation,
12    HangMonitorAlert, MonitoredComponentId,
13};
14use base::generic_channel::{GenericReceiver, GenericSender, RoutedReceiver};
15use crossbeam_channel::{Receiver, Sender, after, never, select, unbounded};
16use rustc_hash::FxHashMap;
17
18use crate::SamplerImpl;
19use crate::sampler::{NativeStack, Sampler};
20
/// Handle used by components to register themselves with the hang monitor worker.
///
/// Cloning the handle clones the underlying sender, so every clone talks to the
/// same worker.
#[derive(Clone)]
pub struct HangMonitorRegister {
    /// Sending half of the channel whose receiving half is owned by the worker.
    sender: MonitoredComponentSender,
    /// Forwarded to every `BackgroundHangMonitorChan` created by this register,
    /// where it gates the activity and wait notifications.
    monitoring_enabled: bool,
}
26
27impl HangMonitorRegister {
28    /// Start a new hang monitor worker, and return a handle to register components for monitoring,
29    /// as well as a join handle on the worker thread.
30    pub fn init(
31        constellation_chan: GenericSender<HangMonitorAlert>,
32        control_port: GenericReceiver<BackgroundHangMonitorControlMsg>,
33        monitoring_enabled: bool,
34    ) -> (Box<dyn BackgroundHangMonitorRegister>, JoinHandle<()>) {
35        let (sender, port) = unbounded();
36        let sender_clone = sender.clone();
37
38        let join_handle = Builder::new()
39            .name("BackgroundHangMonitor".to_owned())
40            .spawn(move || {
41                let mut monitor = BackgroundHangMonitorWorker::new(
42                    constellation_chan,
43                    control_port,
44                    port,
45                    monitoring_enabled,
46                );
47                while monitor.run() {
48                    // Monitoring until all senders have been dropped...
49                }
50            })
51            .expect("Couldn't start BHM worker.");
52        (
53            Box::new(HangMonitorRegister {
54                sender: sender_clone,
55                monitoring_enabled,
56            }),
57            join_handle,
58        )
59    }
60}
61
62impl BackgroundHangMonitorRegister for HangMonitorRegister {
63    /// Register a component for monitoring.
64    /// Returns a dedicated wrapper around a sender
65    /// to be used for communication with the hang monitor worker.
66    fn register_component(
67        &self,
68        component_id: MonitoredComponentId,
69        transient_hang_timeout: Duration,
70        permanent_hang_timeout: Duration,
71        exit_signal: Box<dyn BackgroundHangMonitorExitSignal>,
72    ) -> Box<dyn BackgroundHangMonitor> {
73        let bhm_chan = BackgroundHangMonitorChan::new(
74            self.sender.clone(),
75            component_id,
76            self.monitoring_enabled,
77        );
78
79        bhm_chan.send(MonitoredComponentMsg::Register(
80            SamplerImpl::new_boxed(),
81            thread::current().name().map(str::to_owned),
82            transient_hang_timeout,
83            permanent_hang_timeout,
84            exit_signal,
85        ));
86        Box::new(bhm_chan)
87    }
88}
89
90impl BackgroundHangMonitorClone for HangMonitorRegister {
91    fn clone_box(&self) -> Box<dyn BackgroundHangMonitorRegister> {
92        Box::new(self.clone())
93    }
94}
95
/// Messages sent from monitored components to the monitor.
///
/// Each message travels together with the `MonitoredComponentId` of its sender.
enum MonitoredComponentMsg {
    /// Register component for monitoring.
    ///
    /// The fields are, in order: the sampler for the component's thread, the name of
    /// that thread (if it has one), the transient hang timeout, the permanent hang
    /// timeout, and the signal used to ask the component to exit.
    Register(
        Box<dyn Sampler>,
        Option<String>,
        Duration,
        Duration,
        Box<dyn BackgroundHangMonitorExitSignal>,
    ),
    /// Unregister component for monitoring.
    Unregister,
    /// Notify start of new activity for a given component.
    NotifyActivity(HangAnnotation),
    /// Notify start of waiting for a new task to come in for processing.
    NotifyWait,
}
113
/// A wrapper around a sender to the monitor,
/// which will send the Id of the monitored component along with each message.
///
/// The wrapper keeps no record of whether the monitor is still listening:
/// sending after the worker has gone away panics.
struct BackgroundHangMonitorChan {
    /// Channel to the monitor worker.
    sender: MonitoredComponentSender,
    /// Identifier attached to every message sent through this wrapper.
    component_id: MonitoredComponentId,
    /// When `false`, activity and wait notifications are not sent at all;
    /// registration and unregistration are still sent.
    monitoring_enabled: bool,
}
122
123impl BackgroundHangMonitorChan {
124    fn new(
125        sender: MonitoredComponentSender,
126        component_id: MonitoredComponentId,
127        monitoring_enabled: bool,
128    ) -> Self {
129        BackgroundHangMonitorChan {
130            sender,
131            component_id,
132            monitoring_enabled,
133        }
134    }
135
136    fn send(&self, msg: MonitoredComponentMsg) {
137        self.sender
138            .send((self.component_id.clone(), msg))
139            .expect("BHM is gone");
140    }
141}
142
143impl BackgroundHangMonitor for BackgroundHangMonitorChan {
144    fn notify_activity(&self, annotation: HangAnnotation) {
145        if self.monitoring_enabled {
146            let msg = MonitoredComponentMsg::NotifyActivity(annotation);
147            self.send(msg);
148        }
149    }
150    fn notify_wait(&self) {
151        if self.monitoring_enabled {
152            let msg = MonitoredComponentMsg::NotifyWait;
153            self.send(msg);
154        }
155    }
156    fn unregister(&self) {
157        let msg = MonitoredComponentMsg::Unregister;
158        self.send(msg);
159    }
160}
161
/// Per-component state kept by the worker for hang detection.
struct MonitoredComponent {
    /// Used to suspend the component's thread and capture its native stack.
    sampler: Box<dyn Sampler>,
    /// Time of registration or of the most recent activity/wait notification.
    last_activity: Instant,
    /// Annotation of the most recent activity; `None` until the first activity is notified.
    last_annotation: Option<HangAnnotation>,
    /// Time without notification after which a transient hang alert is sent.
    transient_hang_timeout: Duration,
    /// Time without notification after which a permanent hang alert is sent.
    permanent_hang_timeout: Duration,
    /// Whether a transient alert was already sent since the last notification.
    sent_transient_alert: bool,
    /// Whether a permanent alert was already sent since the last notification.
    sent_permanent_alert: bool,
    /// `true` after registration and after a wait notification;
    /// waiting components are skipped by the hang checkpoint.
    is_waiting: bool,
    /// Signalled when the worker is told to exit.
    exit_signal: Box<dyn BackgroundHangMonitorExitSignal>,
}
173
/// One profiler sample: the sampled component, the instant at which sampling
/// started for it, and the captured native stack.
struct Sample(MonitoredComponentId, Instant, NativeStack);
175
/// State of the hang monitor worker, driven by repeated calls to `run`.
struct BackgroundHangMonitorWorker {
    /// Thread names recorded at registration, used to label samples in a finished profile.
    /// Entries are not removed when a component unregisters.
    component_names: FxHashMap<MonitoredComponentId, String>,
    /// Components currently registered for monitoring.
    monitored_components: FxHashMap<MonitoredComponentId, MonitoredComponent>,
    /// Destination for hang alerts and finished profiles.
    constellation_chan: GenericSender<HangMonitorAlert>,
    /// Messages from monitored components.
    port: Receiver<(MonitoredComponentId, MonitoredComponentMsg)>,
    /// Control messages (sampler toggling, exit).
    control_port: RoutedReceiver<BackgroundHangMonitorControlMsg>,
    /// Interval between samples; `Some` exactly while the sampling profiler is on.
    sampling_duration: Option<Duration>,
    /// Once this much time has passed since `sampling_baseline`,
    /// the oldest sample is dropped for each new sample stored.
    sampling_max_duration: Option<Duration>,
    /// When the last round of sampling was performed.
    last_sample: Instant,
    /// When this worker was created.
    creation: Instant,
    /// When the sampling profiler was last switched on.
    sampling_baseline: Instant,
    /// Samples collected since the profiler was switched on.
    samples: VecDeque<Sample>,
    /// When `false` (and the profiler is off), the worker has no periodic tick
    /// and only wakes up for incoming messages.
    monitoring_enabled: bool,
    /// Set once an exit control message has been received.
    shutting_down: bool,
}
191
/// Sending half of the component-to-worker channel; each message is tagged with its sender's id.
type MonitoredComponentSender = Sender<(MonitoredComponentId, MonitoredComponentMsg)>;
/// Receiving half of the component-to-worker channel, owned by the worker.
type MonitoredComponentReceiver = Receiver<(MonitoredComponentId, MonitoredComponentMsg)>;
194
195impl BackgroundHangMonitorWorker {
196    fn new(
197        constellation_chan: GenericSender<HangMonitorAlert>,
198        control_port: GenericReceiver<BackgroundHangMonitorControlMsg>,
199        port: MonitoredComponentReceiver,
200        monitoring_enabled: bool,
201    ) -> Self {
202        let control_port = control_port.route_preserving_errors();
203        Self {
204            component_names: Default::default(),
205            monitored_components: Default::default(),
206            constellation_chan,
207            port,
208            control_port,
209            sampling_duration: None,
210            sampling_max_duration: None,
211            last_sample: Instant::now(),
212            sampling_baseline: Instant::now(),
213            creation: Instant::now(),
214            samples: Default::default(),
215            monitoring_enabled,
216            shutting_down: Default::default(),
217        }
218    }
219
220    fn finish_sampled_profile(&mut self) {
221        let mut bytes = vec![];
222        bytes.extend(
223            format!(
224                "{{ \"rate\": {}, \"start\": {}, \"data\": [\n",
225                self.sampling_duration.unwrap().as_millis(),
226                (self.sampling_baseline - self.creation).as_millis(),
227            )
228            .as_bytes(),
229        );
230
231        let mut first = true;
232        let to_resolve = self.samples.len();
233        for (i, Sample(id, instant, stack)) in self.samples.drain(..).enumerate() {
234            println!("Resolving {}/{}", i + 1, to_resolve);
235            let profile = stack.to_hangprofile();
236            let name = match self.component_names.get(&id) {
237                Some(ref s) => format!("\"{}\"", s),
238                None => "null".to_string(),
239            };
240            let json = format!(
241                "{}{{ \"name\": {}, \"namespace\": {}, \"index\": {}, \"type\": \"{:?}\", \
242                 \"time\": {}, \"frames\": {} }}",
243                if !first { ",\n" } else { "" },
244                name,
245                id.0.namespace_id.0,
246                id.0.index.0.get(),
247                id.1,
248                (instant - self.sampling_baseline).as_millis(),
249                serde_json::to_string(&profile.backtrace).unwrap(),
250            );
251            bytes.extend(json.as_bytes());
252            first = false;
253        }
254
255        bytes.extend(b"\n] }");
256        let _ = self
257            .constellation_chan
258            .send(HangMonitorAlert::Profile(bytes));
259    }
260
261    fn run(&mut self) -> bool {
262        let tick = if let Some(duration) = self.sampling_duration {
263            let duration = duration
264                .checked_sub(Instant::now() - self.last_sample)
265                .unwrap_or_else(|| Duration::from_millis(0));
266            after(duration)
267        } else if self.monitoring_enabled {
268            after(Duration::from_millis(100))
269        } else {
270            never()
271        };
272
273        let received = select! {
274            recv(self.port) -> event => {
275                if let Ok(event) = event {
276                    Some(event)
277                } else {
278                    // All senders have dropped,
279                    // which means all monitored components have shut down,
280                    // and so we can as well.
281                    return false;
282                }
283            },
284            recv(self.control_port) -> event => {
285                match event {
286                    Ok(Ok(BackgroundHangMonitorControlMsg::ToggleSampler(rate, max_duration))) => {
287                        if self.sampling_duration.is_some() {
288                            println!("Enabling profiler.");
289                            self.finish_sampled_profile();
290                            self.sampling_duration = None;
291                        } else {
292                            println!("Disabling profiler.");
293                            self.sampling_duration = Some(rate);
294                            self.sampling_max_duration = Some(max_duration);
295                            self.sampling_baseline = Instant::now();
296                        }
297                        None
298                    },
299                    Ok(Ok(BackgroundHangMonitorControlMsg::Exit)) => {
300                        for component in self.monitored_components.values_mut() {
301                            component.exit_signal.signal_to_exit();
302                        }
303
304                        // Note the start of shutdown,
305                        // to ensure exit propagates,
306                        // even to components that have yet to register themselves,
307                        // from this point on.
308                        self.shutting_down = true;
309
310                        // Keep running; this worker thread will shutdown
311                        // when the monitored components have shutdown,
312                        // which we know has happened when `self.port` disconnects.
313                        None
314                    },
315                    Ok(Err(e)) => {
316                        log::warn!("BackgroundHangMonitorWorker control message deserialization error: {e:?}");
317                        None
318                    },
319                    Err(_) => return false,
320                }
321            }
322            recv(tick) -> _ => None,
323        };
324
325        if let Some(msg) = received {
326            self.handle_msg(msg);
327            while let Ok(another_msg) = self.port.try_recv() {
328                // Handle any other incoming messages,
329                // before performing a hang checkpoint.
330                self.handle_msg(another_msg);
331            }
332        }
333
334        if let Some(duration) = self.sampling_duration {
335            let now = Instant::now();
336            if now - self.last_sample > duration {
337                self.sample();
338                self.last_sample = now;
339            }
340        } else {
341            self.perform_a_hang_monitor_checkpoint();
342        }
343        true
344    }
345
346    fn handle_msg(&mut self, msg: (MonitoredComponentId, MonitoredComponentMsg)) {
347        match msg {
348            (
349                component_id,
350                MonitoredComponentMsg::Register(
351                    sampler,
352                    name,
353                    transient_hang_timeout,
354                    permanent_hang_timeout,
355                    exit_signal,
356                ),
357            ) => {
358                // If we are shutting down,
359                // propagate it to the component,
360                // and register it(the component will unregister itself
361                // as part of handling the exit).
362                if self.shutting_down {
363                    exit_signal.signal_to_exit();
364                }
365
366                let component = MonitoredComponent {
367                    sampler,
368                    last_activity: Instant::now(),
369                    last_annotation: None,
370                    transient_hang_timeout,
371                    permanent_hang_timeout,
372                    sent_transient_alert: false,
373                    sent_permanent_alert: false,
374                    is_waiting: true,
375                    exit_signal,
376                };
377                if let Some(name) = name {
378                    self.component_names.insert(component_id.clone(), name);
379                }
380                assert!(
381                    self.monitored_components
382                        .insert(component_id, component)
383                        .is_none(),
384                    "This component was already registered for monitoring."
385                );
386            },
387            (component_id, MonitoredComponentMsg::Unregister) => {
388                self.monitored_components
389                    .remove_entry(&component_id)
390                    .expect("Received Unregister for an unknown component");
391            },
392            (component_id, MonitoredComponentMsg::NotifyActivity(annotation)) => {
393                let component = self
394                    .monitored_components
395                    .get_mut(&component_id)
396                    .expect("Received NotifyActivity for an unknown component");
397                component.last_activity = Instant::now();
398                component.last_annotation = Some(annotation);
399                component.sent_transient_alert = false;
400                component.sent_permanent_alert = false;
401                component.is_waiting = false;
402            },
403            (component_id, MonitoredComponentMsg::NotifyWait) => {
404                let component = self
405                    .monitored_components
406                    .get_mut(&component_id)
407                    .expect("Received NotifyWait for an unknown component");
408                component.last_activity = Instant::now();
409                component.sent_transient_alert = false;
410                component.sent_permanent_alert = false;
411                component.is_waiting = true;
412            },
413        }
414    }
415
416    fn perform_a_hang_monitor_checkpoint(&mut self) {
417        for (component_id, monitored) in self.monitored_components.iter_mut() {
418            if monitored.is_waiting {
419                continue;
420            }
421            let last_annotation = monitored.last_annotation.unwrap();
422            if monitored.last_activity.elapsed() > monitored.permanent_hang_timeout {
423                if monitored.sent_permanent_alert {
424                    continue;
425                }
426                let profile = match monitored.sampler.suspend_and_sample_thread() {
427                    Ok(native_stack) => Some(native_stack.to_hangprofile()),
428                    Err(()) => None,
429                };
430                let _ = self
431                    .constellation_chan
432                    .send(HangMonitorAlert::Hang(HangAlert::Permanent(
433                        component_id.clone(),
434                        last_annotation,
435                        profile,
436                    )));
437                monitored.sent_permanent_alert = true;
438                continue;
439            }
440            if monitored.last_activity.elapsed() > monitored.transient_hang_timeout {
441                if monitored.sent_transient_alert {
442                    continue;
443                }
444                let _ = self
445                    .constellation_chan
446                    .send(HangMonitorAlert::Hang(HangAlert::Transient(
447                        component_id.clone(),
448                        last_annotation,
449                    )));
450                monitored.sent_transient_alert = true;
451            }
452        }
453    }
454
455    fn sample(&mut self) {
456        for (component_id, monitored) in self.monitored_components.iter_mut() {
457            let instant = Instant::now();
458            if let Ok(stack) = monitored.sampler.suspend_and_sample_thread() {
459                if self.sampling_baseline.elapsed() >
460                    self.sampling_max_duration
461                        .expect("Max duration has been set")
462                {
463                    // Buffer is full, start discarding older samples.
464                    self.samples.pop_front();
465                }
466                self.samples
467                    .push_back(Sample(component_id.clone(), instant, stack));
468            }
469        }
470    }
471}