// servo_media_audio/render_thread.rs

/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::sync::mpsc::{Receiver, Sender};
6
7use malloc_size_of_derive::MallocSizeOf;
8use servo_media_streams::{MediaSocket, MediaStreamId};
9
10use crate::analyser_node::AnalyserNode;
11use crate::biquad_filter_node::BiquadFilterNode;
12use crate::block::{Chunk, FRAMES_PER_BLOCK, Tick};
13use crate::buffer_source_node::AudioBufferSourceNode;
14use crate::channel_node::{ChannelMergerNode, ChannelSplitterNode};
15use crate::constant_source_node::ConstantSourceNode;
16use crate::context::{AudioContextOptions, ProcessingState, StateChangeResult};
17use crate::gain_node::GainNode;
18use crate::graph::{AudioGraph, InputPort, NodeId, OutputPort, PortId};
19use crate::iir_filter_node::IIRFilterNode;
20use crate::media_element_source_node::MediaElementSourceNode;
21use crate::media_stream_destination_node::MediaStreamDestinationNode;
22use crate::media_stream_source_node::MediaStreamSourceNode;
23use crate::node::{AudioNodeEngine, AudioNodeInit, AudioNodeMessage, BlockInfo, ChannelInfo};
24use crate::offline_sink::OfflineAudioSink;
25use crate::oscillator_node::OscillatorNode;
26use crate::panner_node::PannerNode;
27use crate::sink::{AudioSink, AudioSinkError};
28use crate::stereo_panner::StereoPannerNode;
29use crate::wave_shaper_node::WaveShaperNode;
30use crate::{AudioBackend, AudioStreamReader};
31
32pub type SinkEosCallback = Box<dyn Fn(Box<dyn AsRef<[f32]>>) + Send + Sync + 'static>;
33
/// Control messages sent from the owning context (and from the sink itself)
/// to the audio render thread's event loop.
#[derive(MallocSizeOf)]
pub enum AudioRenderThreadMsg {
    /// Create a new node in the graph; the resulting id (or `None` on
    /// failure) is sent back over the provided channel.
    CreateNode(AudioNodeInit, Sender<Option<NodeId>>, ChannelInfo),
    /// Connect an output port to an input port in the graph.
    ConnectPorts(PortId<OutputPort>, PortId<InputPort>),
    /// Deliver a message to a specific node.
    MessageNode(NodeId, AudioNodeMessage),
    /// Resume audio processing; the state-change result is sent back.
    Resume(Sender<StateChangeResult>),
    /// Suspend audio processing; the state-change result is sent back.
    Suspend(Sender<StateChangeResult>),
    /// Stop processing and shut down the event loop.
    Close(Sender<StateChangeResult>),
    /// Sent by the sink to unblock the event loop when it wants more data.
    SinkNeedData,
    /// Query the current playback time in seconds.
    GetCurrentTime(Sender<f64>),

    // Graph disconnection variants, mirroring the AudioGraph API.
    DisconnectAllFrom(NodeId),
    DisconnectOutput(PortId<OutputPort>),
    DisconnectBetween(NodeId, NodeId),
    DisconnectTo(NodeId, PortId<InputPort>),
    DisconnectOutputBetween(PortId<OutputPort>, NodeId),
    DisconnectOutputBetweenTo(PortId<OutputPort>, PortId<InputPort>),

    /// Install an end-of-stream callback on the sink (offline rendering).
    SetSinkEosCallback(#[ignore_malloc_size_of = "Fn"] SinkEosCallback),

    /// Mute or unmute output; while muted, processing yields silence.
    SetMute(bool),
}
56
/// The destination for rendered audio: either a backend-provided real-time
/// sink, or an in-memory sink used for offline (OfflineAudioContext) rendering.
pub enum Sink {
    RealTime(Box<dyn AudioSink>),
    Offline(OfflineAudioSink),
}
61
62impl AudioSink for Sink {
63    fn init(
64        &self,
65        sample_rate: f32,
66        sender: Sender<AudioRenderThreadMsg>,
67    ) -> Result<(), AudioSinkError> {
68        match *self {
69            Sink::RealTime(ref sink) => sink.init(sample_rate, sender),
70            Sink::Offline(ref sink) => {
71                sink.init(sample_rate, sender).unwrap();
72                Ok(())
73            },
74        }
75    }
76
77    fn init_stream(&self, _: u8, _: f32, _: Box<dyn MediaSocket>) -> Result<(), AudioSinkError> {
78        unreachable!("Sink should never be used for MediaStreamDestinationNode")
79    }
80
81    fn play(&self) -> Result<(), AudioSinkError> {
82        match *self {
83            Sink::RealTime(ref sink) => sink.play(),
84            Sink::Offline(ref sink) => {
85                sink.play().unwrap();
86                Ok(())
87            },
88        }
89    }
90
91    fn stop(&self) -> Result<(), AudioSinkError> {
92        match *self {
93            Sink::RealTime(ref sink) => sink.stop(),
94            Sink::Offline(ref sink) => {
95                sink.stop().unwrap();
96                Ok(())
97            },
98        }
99    }
100
101    fn has_enough_data(&self) -> bool {
102        match *self {
103            Sink::RealTime(ref sink) => sink.has_enough_data(),
104            Sink::Offline(ref sink) => sink.has_enough_data(),
105        }
106    }
107
108    fn push_data(&self, chunk: Chunk) -> Result<(), AudioSinkError> {
109        match *self {
110            Sink::RealTime(ref sink) => sink.push_data(chunk),
111            Sink::Offline(ref sink) => {
112                sink.push_data(chunk).unwrap();
113                Ok(())
114            },
115        }
116    }
117
118    fn set_eos_callback(
119        &self,
120        callback: Box<dyn Fn(Box<dyn AsRef<[f32]>>) + Send + Sync + 'static>,
121    ) {
122        match *self {
123            Sink::RealTime(ref sink) => sink.set_eos_callback(callback),
124            Sink::Offline(ref sink) => sink.set_eos_callback(callback),
125        }
126    }
127}
128
/// Factory producing an audio reader for a given media stream at a given
/// sample rate (backs MediaStreamSourceNode creation).
pub type ReaderFactoryCallback =
    dyn Fn(MediaStreamId, f32) -> Result<Box<dyn AudioStreamReader + Send>, AudioSinkError>;
131
/// State owned by the audio render thread: the node graph, the output sink,
/// factories for additional sinks/readers, and playback-clock bookkeeping.
pub struct AudioRenderThread {
    // The audio processing graph of connected nodes.
    pub graph: AudioGraph,
    // Destination for rendered audio (real-time or offline).
    pub sink: Sink,
    // Creates extra sinks, e.g. for MediaStreamDestinationNode.
    pub sink_factory: Box<dyn Fn() -> Result<Box<dyn AudioSink + 'static>, AudioSinkError>>,
    // Creates stream readers for MediaStreamSourceNode.
    pub reader_factory: Box<ReaderFactoryCallback>,
    // Current processing state (Running/Suspended/...).
    pub state: ProcessingState,
    // Context sample rate in Hz.
    pub sample_rate: f32,
    // Playback time in seconds, derived from current_frame.
    pub current_time: f64,
    // Number of frames rendered so far.
    pub current_frame: Tick,
    // While true, process() yields explicit silence.
    pub muted: bool,
}
143
144impl AudioRenderThread {
145    /// Initializes the AudioRenderThread object
146    ///
147    /// You must call .event_loop() on this to run it!
148    fn prepare_thread<B: AudioBackend>(
149        sender: Sender<AudioRenderThreadMsg>,
150        sample_rate: f32,
151        graph: AudioGraph,
152        options: AudioContextOptions,
153    ) -> Result<Self, AudioSinkError> {
154        let sink_factory = Box::new(|| B::make_sink().map(|s| Box::new(s) as Box<dyn AudioSink>));
155        let reader_factory = Box::new(|id, sample_rate| B::make_streamreader(id, sample_rate));
156        let sink = match options {
157            AudioContextOptions::RealTimeAudioContext(_) => Sink::RealTime(sink_factory()?),
158            AudioContextOptions::OfflineAudioContext(options) => Sink::Offline(
159                OfflineAudioSink::new(options.channels as usize, options.length),
160            ),
161        };
162
163        sink.init(sample_rate, sender)?;
164
165        Ok(Self {
166            graph,
167            sink,
168            sink_factory,
169            reader_factory,
170            state: ProcessingState::Suspended,
171            sample_rate,
172            current_time: 0.,
173            current_frame: Tick(0),
174            muted: false,
175        })
176    }
177
178    /// Start the audio render thread
179    ///
180    /// In case something fails, it will instead start a thread with a dummy backend
181    pub fn start<B: AudioBackend>(
182        event_queue: Receiver<AudioRenderThreadMsg>,
183        sender: Sender<AudioRenderThreadMsg>,
184        sample_rate: f32,
185        graph: AudioGraph,
186        options: AudioContextOptions,
187        init_sender: Sender<Result<(), AudioSinkError>>,
188    ) {
189        let mut thread = match Self::prepare_thread::<B>(sender, sample_rate, graph, options) {
190            Ok(thread) => {
191                let _ = init_sender.send(Ok(()));
192                thread
193            },
194            Err(e) => {
195                let _ = init_sender.send(Err(e));
196                return;
197            },
198        };
199
200        thread.event_loop(event_queue);
201    }
202
203    make_render_thread_state_change!(resume, Running, play);
204
205    make_render_thread_state_change!(suspend, Suspended, stop);
206
207    fn create_node(&mut self, node_type: AudioNodeInit, ch: ChannelInfo) -> Option<NodeId> {
208        let mut needs_listener = false;
209        let mut is_dest = false;
210        let node: Box<dyn AudioNodeEngine> = match node_type {
211            AudioNodeInit::AnalyserNode(sender) => Box::new(AnalyserNode::new(sender, ch)),
212            AudioNodeInit::AudioBufferSourceNode(options) => {
213                Box::new(AudioBufferSourceNode::new(options, ch))
214            },
215            AudioNodeInit::BiquadFilterNode(options) => {
216                Box::new(BiquadFilterNode::new(options, ch, self.sample_rate))
217            },
218            AudioNodeInit::GainNode(options) => Box::new(GainNode::new(options, ch)),
219            AudioNodeInit::StereoPannerNode(options) => {
220                Box::new(StereoPannerNode::new(options, ch))
221            },
222            AudioNodeInit::PannerNode(options) => {
223                needs_listener = true;
224                Box::new(PannerNode::new(options, ch))
225            },
226            AudioNodeInit::MediaStreamSourceNode(id) => {
227                let reader = (self.reader_factory)(id, self.sample_rate);
228                Box::new(MediaStreamSourceNode::new(reader.ok()?, ch))
229            },
230            AudioNodeInit::OscillatorNode(options) => Box::new(OscillatorNode::new(options, ch)),
231            AudioNodeInit::ChannelMergerNode(options) => {
232                Box::new(ChannelMergerNode::new(options, ch))
233            },
234            AudioNodeInit::ConstantSourceNode(options) => {
235                Box::new(ConstantSourceNode::new(options, ch))
236            },
237            AudioNodeInit::MediaStreamDestinationNode(socket) => {
238                is_dest = true;
239                Box::new(MediaStreamDestinationNode::new(
240                    socket,
241                    self.sample_rate,
242                    (self.sink_factory)().unwrap(),
243                    ch,
244                ))
245            },
246            AudioNodeInit::ChannelSplitterNode => Box::new(ChannelSplitterNode::new(ch)),
247            AudioNodeInit::WaveShaperNode(options) => Box::new(WaveShaperNode::new(options, ch)),
248            AudioNodeInit::MediaElementSourceNode => Box::new(MediaElementSourceNode::new(ch)),
249            AudioNodeInit::IIRFilterNode(options) => Box::new(IIRFilterNode::new(options, ch)),
250            _ => unimplemented!(),
251        };
252        let id = self.graph.add_node(node);
253        if needs_listener {
254            let listener = self.graph.listener_id().output(0);
255            self.graph.add_edge(listener, id.listener());
256        }
257        if is_dest {
258            self.graph.add_extra_dest(id);
259        }
260        Some(id)
261    }
262
263    fn connect_ports(&mut self, output: PortId<OutputPort>, input: PortId<InputPort>) {
264        self.graph.add_edge(output, input)
265    }
266
267    fn process(&mut self) -> Chunk {
268        if self.muted {
269            return Chunk::explicit_silence();
270        }
271
272        let info = BlockInfo {
273            sample_rate: self.sample_rate,
274            frame: self.current_frame,
275            time: self.current_time,
276        };
277        self.graph.process(&info)
278    }
279
280    fn set_mute(&mut self, val: bool) {
281        self.muted = val;
282    }
283
284    fn event_loop(&mut self, event_queue: Receiver<AudioRenderThreadMsg>) {
285        let sample_rate = self.sample_rate;
286        let handle_msg = move |context: &mut Self, msg: AudioRenderThreadMsg| -> bool {
287            let mut break_loop = false;
288            match msg {
289                AudioRenderThreadMsg::CreateNode(node_type, tx, ch) => {
290                    let _ = tx.send(context.create_node(node_type, ch));
291                },
292                AudioRenderThreadMsg::ConnectPorts(output, input) => {
293                    context.connect_ports(output, input);
294                },
295                AudioRenderThreadMsg::Resume(tx) => {
296                    let _ = tx.send(context.resume());
297                },
298                AudioRenderThreadMsg::Suspend(tx) => {
299                    let _ = tx.send(context.suspend());
300                },
301                AudioRenderThreadMsg::Close(tx) => {
302                    let _ = tx.send(context.suspend());
303                    break_loop = true;
304                },
305                AudioRenderThreadMsg::GetCurrentTime(response) => {
306                    response.send(context.current_time).unwrap()
307                },
308                AudioRenderThreadMsg::MessageNode(id, msg) => {
309                    context.graph.node_mut(id).message(msg, sample_rate)
310                },
311                AudioRenderThreadMsg::SinkNeedData => {
312                    // Do nothing. This will simply unblock the thread so we
313                    // can restart the non-blocking event loop.
314                },
315                AudioRenderThreadMsg::DisconnectAllFrom(id) => {
316                    context.graph.disconnect_all_from(id)
317                },
318                AudioRenderThreadMsg::DisconnectOutput(out) => context.graph.disconnect_output(out),
319                AudioRenderThreadMsg::DisconnectBetween(from, to) => {
320                    context.graph.disconnect_between(from, to)
321                },
322                AudioRenderThreadMsg::DisconnectTo(from, to) => {
323                    context.graph.disconnect_to(from, to)
324                },
325                AudioRenderThreadMsg::DisconnectOutputBetween(from, to) => {
326                    context.graph.disconnect_output_between(from, to)
327                },
328                AudioRenderThreadMsg::DisconnectOutputBetweenTo(from, to) => {
329                    context.graph.disconnect_output_between_to(from, to)
330                },
331                AudioRenderThreadMsg::SetSinkEosCallback(callback) => {
332                    context.sink.set_eos_callback(callback);
333                },
334                AudioRenderThreadMsg::SetMute(val) => {
335                    context.set_mute(val);
336                },
337            };
338
339            break_loop
340        };
341
342        loop {
343            if self.sink.has_enough_data() || self.state == ProcessingState::Suspended {
344                // If we are not processing audio or
345                // if we have already pushed enough data into the audio sink
346                // we wait for messages coming from the control thread or
347                // the audio sink. The audio sink will notify whenever it
348                // needs more data.
349                if event_queue.recv().is_ok_and(|msg| handle_msg(self, msg)) {
350                    break;
351                }
352            } else {
353                // If we have not pushed enough data into the audio sink yet,
354                // we process the control message queue
355                if event_queue
356                    .try_recv()
357                    .is_ok_and(|msg| handle_msg(self, msg))
358                {
359                    break;
360                }
361
362                if self.state == ProcessingState::Suspended {
363                    // Bail out if we just suspended processing.
364                    continue;
365                }
366
367                // push into the audio sink the result of processing a
368                // render quantum.
369                let data = self.process();
370                if self.sink.push_data(data).is_ok() {
371                    // increment current frame by the render quantum size.
372                    self.current_frame += FRAMES_PER_BLOCK;
373                    self.current_time = self.current_frame / self.sample_rate as f64;
374                } else {
375                    eprintln!("Could not push data to audio sink");
376                }
377            }
378        }
379    }
380}