1use std::collections::VecDeque;
6use std::thread::{self, Builder, JoinHandle};
7use std::time::{Duration, Instant};
8
9use background_hang_monitor_api::{
10 BackgroundHangMonitor, BackgroundHangMonitorClone, BackgroundHangMonitorControlMsg,
11 BackgroundHangMonitorExitSignal, BackgroundHangMonitorRegister, HangAlert, HangAnnotation,
12 HangMonitorAlert, MonitoredComponentId,
13};
14use base::generic_channel::{GenericReceiver, GenericSender, RoutedReceiver};
15use crossbeam_channel::{Receiver, Sender, after, never, select, unbounded};
16use rustc_hash::FxHashMap;
17
18use crate::SamplerImpl;
19use crate::sampler::{NativeStack, Sampler};
20
/// Handle through which components register themselves with the background hang
/// monitor worker.
#[derive(Clone)]
pub struct HangMonitorRegister {
    /// Sending half of the channel whose receiving half is handed to the worker in `init`.
    sender: MonitoredComponentSender,
    /// Copied into every `BackgroundHangMonitorChan` created by `register_component`.
    monitoring_enabled: bool,
}
26
27impl HangMonitorRegister {
28 pub fn init(
31 constellation_chan: GenericSender<HangMonitorAlert>,
32 control_port: GenericReceiver<BackgroundHangMonitorControlMsg>,
33 monitoring_enabled: bool,
34 ) -> (Box<dyn BackgroundHangMonitorRegister>, JoinHandle<()>) {
35 let (sender, port) = unbounded();
36 let sender_clone = sender.clone();
37
38 let join_handle = Builder::new()
39 .name("BackgroundHangMonitor".to_owned())
40 .spawn(move || {
41 let mut monitor = BackgroundHangMonitorWorker::new(
42 constellation_chan,
43 control_port,
44 port,
45 monitoring_enabled,
46 );
47 while monitor.run() {
48 }
50 })
51 .expect("Couldn't start BHM worker.");
52 (
53 Box::new(HangMonitorRegister {
54 sender: sender_clone,
55 monitoring_enabled,
56 }),
57 join_handle,
58 )
59 }
60}
61
62impl BackgroundHangMonitorRegister for HangMonitorRegister {
63 fn register_component(
67 &self,
68 component_id: MonitoredComponentId,
69 transient_hang_timeout: Duration,
70 permanent_hang_timeout: Duration,
71 exit_signal: Box<dyn BackgroundHangMonitorExitSignal>,
72 ) -> Box<dyn BackgroundHangMonitor> {
73 let bhm_chan = BackgroundHangMonitorChan::new(
74 self.sender.clone(),
75 component_id,
76 self.monitoring_enabled,
77 );
78
79 bhm_chan.send(MonitoredComponentMsg::Register(
80 SamplerImpl::new_boxed(),
81 thread::current().name().map(str::to_owned),
82 transient_hang_timeout,
83 permanent_hang_timeout,
84 exit_signal,
85 ));
86 Box::new(bhm_chan)
87 }
88}
89
90impl BackgroundHangMonitorClone for HangMonitorRegister {
91 fn clone_box(&self) -> Box<dyn BackgroundHangMonitorRegister> {
92 Box::new(self.clone())
93 }
94}
95
/// Messages sent from a monitored component's handle to the worker. On the
/// channel each message is paired with the sender's `MonitoredComponentId`.
enum MonitoredComponentMsg {
    /// Start monitoring a component. Fields, in order: the sampler for the
    /// component, the optional name of the registering thread, the transient
    /// hang timeout, the permanent hang timeout, and the signal used to ask the
    /// component to exit.
    Register(
        Box<dyn Sampler>,
        Option<String>,
        Duration,
        Duration,
        Box<dyn BackgroundHangMonitorExitSignal>,
    ),
    /// Stop monitoring the component.
    Unregister,
    /// The component started an activity described by the annotation.
    NotifyActivity(HangAnnotation),
    /// The component is waiting; hang checks skip it until its next activity.
    NotifyWait,
}
113
/// Per-component handle that forwards notifications to the worker, tagging each
/// message with the component's id.
struct BackgroundHangMonitorChan {
    /// Channel to the worker.
    sender: MonitoredComponentSender,
    /// Id attached to every message sent through this handle.
    component_id: MonitoredComponentId,
    /// When `false`, activity and wait notifications are not sent.
    monitoring_enabled: bool,
}
122
123impl BackgroundHangMonitorChan {
124 fn new(
125 sender: MonitoredComponentSender,
126 component_id: MonitoredComponentId,
127 monitoring_enabled: bool,
128 ) -> Self {
129 BackgroundHangMonitorChan {
130 sender,
131 component_id,
132 monitoring_enabled,
133 }
134 }
135
136 fn send(&self, msg: MonitoredComponentMsg) {
137 self.sender
138 .send((self.component_id.clone(), msg))
139 .expect("BHM is gone");
140 }
141}
142
143impl BackgroundHangMonitor for BackgroundHangMonitorChan {
144 fn notify_activity(&self, annotation: HangAnnotation) {
145 if self.monitoring_enabled {
146 let msg = MonitoredComponentMsg::NotifyActivity(annotation);
147 self.send(msg);
148 }
149 }
150 fn notify_wait(&self) {
151 if self.monitoring_enabled {
152 let msg = MonitoredComponentMsg::NotifyWait;
153 self.send(msg);
154 }
155 }
156 fn unregister(&self) {
157 let msg = MonitoredComponentMsg::Unregister;
158 self.send(msg);
159 }
160}
161
/// Worker-side bookkeeping for one registered component.
struct MonitoredComponent {
    /// Used to suspend the component's thread and capture its stack.
    sampler: Box<dyn Sampler>,
    /// When the component registered or last sent an activity/wait notification.
    last_activity: Instant,
    /// Annotation from the most recent `NotifyActivity`; `None` until the first one.
    last_annotation: Option<HangAnnotation>,
    /// Time without activity after which a transient hang alert is sent.
    transient_hang_timeout: Duration,
    /// Time without activity after which a permanent hang alert is sent.
    permanent_hang_timeout: Duration,
    /// Whether a transient alert was already sent since the last notification.
    sent_transient_alert: bool,
    /// Whether a permanent alert was already sent since the last notification.
    sent_permanent_alert: bool,
    /// `true` after registration and `NotifyWait`; hang checks skip waiting components.
    is_waiting: bool,
    /// Signalled when the worker is told to exit.
    exit_signal: Box<dyn BackgroundHangMonitorExitSignal>,
}
173
/// One captured stack: the sampled component, the time the capture was taken,
/// and the native stack itself.
struct Sample(MonitoredComponentId, Instant, NativeStack);
175
/// State of the background hang monitor thread.
struct BackgroundHangMonitorWorker {
    /// Thread names recorded at registration, keyed by component id. Entries
    /// are kept after a component unregisters.
    component_names: FxHashMap<MonitoredComponentId, String>,
    /// Components currently being monitored.
    monitored_components: FxHashMap<MonitoredComponentId, MonitoredComponent>,
    /// Where hang alerts and finished sampling profiles are sent.
    constellation_chan: GenericSender<HangMonitorAlert>,
    /// Messages from monitored components.
    port: Receiver<(MonitoredComponentId, MonitoredComponentMsg)>,
    /// Control messages (`ToggleSampler`, `Exit`).
    control_port: RoutedReceiver<BackgroundHangMonitorControlMsg>,
    /// Sampling interval; `Some` exactly while the sampler is switched on.
    sampling_duration: Option<Duration>,
    /// Set when the sampler is switched on; once this much time has passed
    /// since `sampling_baseline`, the oldest sample is dropped for each new one.
    sampling_max_duration: Option<Duration>,
    /// Time of the most recent sampling pass.
    last_sample: Instant,
    /// Time the worker was created.
    creation: Instant,
    /// Time the sampler was last switched on (initially, worker creation).
    sampling_baseline: Instant,
    /// Samples collected since the sampler was switched on.
    samples: VecDeque<Sample>,
    /// When `true` and the sampler is off, `run` wakes up periodically to check for hangs.
    monitoring_enabled: bool,
    /// Set once `Exit` was received; later registrations are told to exit right away.
    shutting_down: bool,
}
191
/// Sending half of the component-to-worker channel.
type MonitoredComponentSender = Sender<(MonitoredComponentId, MonitoredComponentMsg)>;
/// Receiving half of the component-to-worker channel.
type MonitoredComponentReceiver = Receiver<(MonitoredComponentId, MonitoredComponentMsg)>;
194
195impl BackgroundHangMonitorWorker {
196 fn new(
197 constellation_chan: GenericSender<HangMonitorAlert>,
198 control_port: GenericReceiver<BackgroundHangMonitorControlMsg>,
199 port: MonitoredComponentReceiver,
200 monitoring_enabled: bool,
201 ) -> Self {
202 let control_port = control_port.route_preserving_errors();
203 Self {
204 component_names: Default::default(),
205 monitored_components: Default::default(),
206 constellation_chan,
207 port,
208 control_port,
209 sampling_duration: None,
210 sampling_max_duration: None,
211 last_sample: Instant::now(),
212 sampling_baseline: Instant::now(),
213 creation: Instant::now(),
214 samples: Default::default(),
215 monitoring_enabled,
216 shutting_down: Default::default(),
217 }
218 }
219
220 fn finish_sampled_profile(&mut self) {
221 let mut bytes = vec![];
222 bytes.extend(
223 format!(
224 "{{ \"rate\": {}, \"start\": {}, \"data\": [\n",
225 self.sampling_duration.unwrap().as_millis(),
226 (self.sampling_baseline - self.creation).as_millis(),
227 )
228 .as_bytes(),
229 );
230
231 let mut first = true;
232 let to_resolve = self.samples.len();
233 for (i, Sample(id, instant, stack)) in self.samples.drain(..).enumerate() {
234 println!("Resolving {}/{}", i + 1, to_resolve);
235 let profile = stack.to_hangprofile();
236 let name = match self.component_names.get(&id) {
237 Some(ref s) => format!("\"{}\"", s),
238 None => "null".to_string(),
239 };
240 let json = format!(
241 "{}{{ \"name\": {}, \"namespace\": {}, \"index\": {}, \"type\": \"{:?}\", \
242 \"time\": {}, \"frames\": {} }}",
243 if !first { ",\n" } else { "" },
244 name,
245 id.0.namespace_id.0,
246 id.0.index.0.get(),
247 id.1,
248 (instant - self.sampling_baseline).as_millis(),
249 serde_json::to_string(&profile.backtrace).unwrap(),
250 );
251 bytes.extend(json.as_bytes());
252 first = false;
253 }
254
255 bytes.extend(b"\n] }");
256 let _ = self
257 .constellation_chan
258 .send(HangMonitorAlert::Profile(bytes));
259 }
260
261 fn run(&mut self) -> bool {
262 let tick = if let Some(duration) = self.sampling_duration {
263 let duration = duration
264 .checked_sub(Instant::now() - self.last_sample)
265 .unwrap_or_else(|| Duration::from_millis(0));
266 after(duration)
267 } else if self.monitoring_enabled {
268 after(Duration::from_millis(100))
269 } else {
270 never()
271 };
272
273 let received = select! {
274 recv(self.port) -> event => {
275 if let Ok(event) = event {
276 Some(event)
277 } else {
278 return false;
282 }
283 },
284 recv(self.control_port) -> event => {
285 match event {
286 Ok(Ok(BackgroundHangMonitorControlMsg::ToggleSampler(rate, max_duration))) => {
287 if self.sampling_duration.is_some() {
288 println!("Enabling profiler.");
289 self.finish_sampled_profile();
290 self.sampling_duration = None;
291 } else {
292 println!("Disabling profiler.");
293 self.sampling_duration = Some(rate);
294 self.sampling_max_duration = Some(max_duration);
295 self.sampling_baseline = Instant::now();
296 }
297 None
298 },
299 Ok(Ok(BackgroundHangMonitorControlMsg::Exit)) => {
300 for component in self.monitored_components.values_mut() {
301 component.exit_signal.signal_to_exit();
302 }
303
304 self.shutting_down = true;
309
310 None
314 },
315 Ok(Err(e)) => {
316 log::warn!("BackgroundHangMonitorWorker control message deserialization error: {e:?}");
317 None
318 },
319 Err(_) => return false,
320 }
321 }
322 recv(tick) -> _ => None,
323 };
324
325 if let Some(msg) = received {
326 self.handle_msg(msg);
327 while let Ok(another_msg) = self.port.try_recv() {
328 self.handle_msg(another_msg);
331 }
332 }
333
334 if let Some(duration) = self.sampling_duration {
335 let now = Instant::now();
336 if now - self.last_sample > duration {
337 self.sample();
338 self.last_sample = now;
339 }
340 } else {
341 self.perform_a_hang_monitor_checkpoint();
342 }
343 true
344 }
345
346 fn handle_msg(&mut self, msg: (MonitoredComponentId, MonitoredComponentMsg)) {
347 match msg {
348 (
349 component_id,
350 MonitoredComponentMsg::Register(
351 sampler,
352 name,
353 transient_hang_timeout,
354 permanent_hang_timeout,
355 exit_signal,
356 ),
357 ) => {
358 if self.shutting_down {
363 exit_signal.signal_to_exit();
364 }
365
366 let component = MonitoredComponent {
367 sampler,
368 last_activity: Instant::now(),
369 last_annotation: None,
370 transient_hang_timeout,
371 permanent_hang_timeout,
372 sent_transient_alert: false,
373 sent_permanent_alert: false,
374 is_waiting: true,
375 exit_signal,
376 };
377 if let Some(name) = name {
378 self.component_names.insert(component_id.clone(), name);
379 }
380 assert!(
381 self.monitored_components
382 .insert(component_id, component)
383 .is_none(),
384 "This component was already registered for monitoring."
385 );
386 },
387 (component_id, MonitoredComponentMsg::Unregister) => {
388 self.monitored_components
389 .remove_entry(&component_id)
390 .expect("Received Unregister for an unknown component");
391 },
392 (component_id, MonitoredComponentMsg::NotifyActivity(annotation)) => {
393 let component = self
394 .monitored_components
395 .get_mut(&component_id)
396 .expect("Received NotifyActivity for an unknown component");
397 component.last_activity = Instant::now();
398 component.last_annotation = Some(annotation);
399 component.sent_transient_alert = false;
400 component.sent_permanent_alert = false;
401 component.is_waiting = false;
402 },
403 (component_id, MonitoredComponentMsg::NotifyWait) => {
404 let component = self
405 .monitored_components
406 .get_mut(&component_id)
407 .expect("Received NotifyWait for an unknown component");
408 component.last_activity = Instant::now();
409 component.sent_transient_alert = false;
410 component.sent_permanent_alert = false;
411 component.is_waiting = true;
412 },
413 }
414 }
415
416 fn perform_a_hang_monitor_checkpoint(&mut self) {
417 for (component_id, monitored) in self.monitored_components.iter_mut() {
418 if monitored.is_waiting {
419 continue;
420 }
421 let last_annotation = monitored.last_annotation.unwrap();
422 if monitored.last_activity.elapsed() > monitored.permanent_hang_timeout {
423 if monitored.sent_permanent_alert {
424 continue;
425 }
426 let profile = match monitored.sampler.suspend_and_sample_thread() {
427 Ok(native_stack) => Some(native_stack.to_hangprofile()),
428 Err(()) => None,
429 };
430 let _ = self
431 .constellation_chan
432 .send(HangMonitorAlert::Hang(HangAlert::Permanent(
433 component_id.clone(),
434 last_annotation,
435 profile,
436 )));
437 monitored.sent_permanent_alert = true;
438 continue;
439 }
440 if monitored.last_activity.elapsed() > monitored.transient_hang_timeout {
441 if monitored.sent_transient_alert {
442 continue;
443 }
444 let _ = self
445 .constellation_chan
446 .send(HangMonitorAlert::Hang(HangAlert::Transient(
447 component_id.clone(),
448 last_annotation,
449 )));
450 monitored.sent_transient_alert = true;
451 }
452 }
453 }
454
455 fn sample(&mut self) {
456 for (component_id, monitored) in self.monitored_components.iter_mut() {
457 let instant = Instant::now();
458 if let Ok(stack) = monitored.sampler.suspend_and_sample_thread() {
459 if self.sampling_baseline.elapsed() >
460 self.sampling_max_duration
461 .expect("Max duration has been set")
462 {
463 self.samples.pop_front();
465 }
466 self.samples
467 .push_back(Sample(component_id.clone(), instant, stack));
468 }
469 }
470 }
471}