servo_media_audio/
render_thread.rs

1use crate::analyser_node::AnalyserNode;
2use crate::biquad_filter_node::BiquadFilterNode;
3use crate::block::{Chunk, Tick, FRAMES_PER_BLOCK};
4use crate::buffer_source_node::AudioBufferSourceNode;
5use crate::channel_node::{ChannelMergerNode, ChannelSplitterNode};
6use crate::constant_source_node::ConstantSourceNode;
7use crate::context::{AudioContextOptions, ProcessingState, StateChangeResult};
8use crate::gain_node::GainNode;
9use crate::graph::{AudioGraph, InputPort, NodeId, OutputPort, PortId};
10use crate::iir_filter_node::IIRFilterNode;
11use crate::media_element_source_node::MediaElementSourceNode;
12use crate::media_stream_destination_node::MediaStreamDestinationNode;
13use crate::media_stream_source_node::MediaStreamSourceNode;
14use crate::node::{AudioNodeEngine, AudioNodeInit, AudioNodeMessage};
15use crate::node::{BlockInfo, ChannelInfo};
16use crate::offline_sink::OfflineAudioSink;
17use crate::oscillator_node::OscillatorNode;
18use crate::panner_node::PannerNode;
19use servo_media_streams::{MediaSocket, MediaStreamId};
20use crate::sink::{AudioSink, AudioSinkError};
21use std::sync::mpsc::{Receiver, Sender};
22use crate::stereo_panner::StereoPannerNode;
23use crate::wave_shaper_node::WaveShaperNode;
24use crate::{AudioBackend, AudioStreamReader};
25
26pub enum AudioRenderThreadMsg {
27    CreateNode(AudioNodeInit, Sender<NodeId>, ChannelInfo),
28    ConnectPorts(PortId<OutputPort>, PortId<InputPort>),
29    MessageNode(NodeId, AudioNodeMessage),
30    Resume(Sender<StateChangeResult>),
31    Suspend(Sender<StateChangeResult>),
32    Close(Sender<StateChangeResult>),
33    SinkNeedData,
34    GetCurrentTime(Sender<f64>),
35
36    DisconnectAllFrom(NodeId),
37    DisconnectOutput(PortId<OutputPort>),
38    DisconnectBetween(NodeId, NodeId),
39    DisconnectTo(NodeId, PortId<InputPort>),
40    DisconnectOutputBetween(PortId<OutputPort>, NodeId),
41    DisconnectOutputBetweenTo(PortId<OutputPort>, PortId<InputPort>),
42
43    SetSinkEosCallback(Box<dyn Fn(Box<dyn AsRef<[f32]>>) + Send + Sync + 'static>),
44
45    SetMute(bool),
46}
47
48pub enum Sink {
49    RealTime(Box<dyn AudioSink>),
50    Offline(OfflineAudioSink),
51}
52
53impl AudioSink for Sink {
54    fn init(
55        &self,
56        sample_rate: f32,
57        sender: Sender<AudioRenderThreadMsg>,
58    ) -> Result<(), AudioSinkError> {
59        match *self {
60            Sink::RealTime(ref sink) => sink.init(sample_rate, sender),
61            Sink::Offline(ref sink) => Ok(sink.init(sample_rate, sender).unwrap()),
62        }
63    }
64
65    fn init_stream(&self, _: u8, _: f32, _: Box<dyn MediaSocket>) -> Result<(), AudioSinkError> {
66        unreachable!("Sink should never be used for MediaStreamDestinationNode")
67    }
68
69    fn play(&self) -> Result<(), AudioSinkError> {
70        match *self {
71            Sink::RealTime(ref sink) => sink.play(),
72            Sink::Offline(ref sink) => Ok(sink.play().unwrap()),
73        }
74    }
75
76    fn stop(&self) -> Result<(), AudioSinkError> {
77        match *self {
78            Sink::RealTime(ref sink) => sink.stop(),
79            Sink::Offline(ref sink) => Ok(sink.stop().unwrap()),
80        }
81    }
82
83    fn has_enough_data(&self) -> bool {
84        match *self {
85            Sink::RealTime(ref sink) => sink.has_enough_data(),
86            Sink::Offline(ref sink) => sink.has_enough_data(),
87        }
88    }
89
90    fn push_data(&self, chunk: Chunk) -> Result<(), AudioSinkError> {
91        match *self {
92            Sink::RealTime(ref sink) => sink.push_data(chunk),
93            Sink::Offline(ref sink) => Ok(sink.push_data(chunk).unwrap()),
94        }
95    }
96
97    fn set_eos_callback(
98        &self,
99        callback: Box<dyn Fn(Box<dyn AsRef<[f32]>>) + Send + Sync + 'static>,
100    ) {
101        match *self {
102            Sink::RealTime(ref sink) => sink.set_eos_callback(callback),
103            Sink::Offline(ref sink) => sink.set_eos_callback(callback),
104        }
105    }
106}
107
108pub struct AudioRenderThread {
109    pub graph: AudioGraph,
110    pub sink: Sink,
111    pub sink_factory: Box<dyn Fn() -> Result<Box<dyn AudioSink + 'static>, AudioSinkError>>,
112    pub reader_factory: Box<dyn Fn(MediaStreamId, f32) -> Box<dyn AudioStreamReader + Send>>,
113    pub state: ProcessingState,
114    pub sample_rate: f32,
115    pub current_time: f64,
116    pub current_frame: Tick,
117    pub muted: bool,
118}
119
120impl AudioRenderThread {
121    /// Initializes the AudioRenderThread object
122    ///
123    /// You must call .event_loop() on this to run it!
124    fn prepare_thread<B: AudioBackend>(
125        sender: Sender<AudioRenderThreadMsg>,
126        sample_rate: f32,
127        graph: AudioGraph,
128        options: AudioContextOptions,
129    ) -> Result<Self, AudioSinkError> {
130        let sink_factory = Box::new(|| B::make_sink().map(|s| Box::new(s) as Box<dyn AudioSink>));
131        let reader_factory = Box::new(|id, sample_rate| B::make_streamreader(id, sample_rate));
132        let sink = match options {
133            AudioContextOptions::RealTimeAudioContext(_) => Sink::RealTime(sink_factory()?),
134            AudioContextOptions::OfflineAudioContext(options) => Sink::Offline(
135                OfflineAudioSink::new(options.channels as usize, options.length),
136            ),
137        };
138
139        sink.init(sample_rate, sender)?;
140
141        Ok(Self {
142            graph,
143            sink,
144            sink_factory,
145            reader_factory,
146            state: ProcessingState::Suspended,
147            sample_rate,
148            current_time: 0.,
149            current_frame: Tick(0),
150            muted: false,
151        })
152    }
153
154    /// Start the audio render thread
155    ///
156    /// In case something fails, it will instead start a thread with a dummy backend
157    pub fn start<B: AudioBackend>(
158        event_queue: Receiver<AudioRenderThreadMsg>,
159        sender: Sender<AudioRenderThreadMsg>,
160        sample_rate: f32,
161        graph: AudioGraph,
162        options: AudioContextOptions,
163        init_sender: Sender<Result<(), AudioSinkError>>,
164    ) {
165        let mut thread =
166            match Self::prepare_thread::<B>(sender.clone(), sample_rate, graph, options) {
167                Ok(thread) => {
168                    let _ = init_sender.send(Ok(()));
169                    thread
170                }
171                Err(e) => {
172                    let _ = init_sender.send(Err(e));
173                    return;
174                }
175            };
176
177        thread.event_loop(event_queue);
178    }
179
180    make_render_thread_state_change!(resume, Running, play);
181
182    make_render_thread_state_change!(suspend, Suspended, stop);
183
184    fn create_node(&mut self, node_type: AudioNodeInit, ch: ChannelInfo) -> NodeId {
185        let mut needs_listener = false;
186        let mut is_dest = false;
187        let node: Box<dyn AudioNodeEngine> = match node_type {
188            AudioNodeInit::AnalyserNode(sender) => Box::new(AnalyserNode::new(sender, ch)),
189            AudioNodeInit::AudioBufferSourceNode(options) => {
190                Box::new(AudioBufferSourceNode::new(options, ch))
191            }
192            AudioNodeInit::BiquadFilterNode(options) => {
193                Box::new(BiquadFilterNode::new(options, ch, self.sample_rate))
194            }
195            AudioNodeInit::GainNode(options) => Box::new(GainNode::new(options, ch)),
196            AudioNodeInit::StereoPannerNode(options) => {
197                Box::new(StereoPannerNode::new(options, ch))
198            }
199            AudioNodeInit::PannerNode(options) => {
200                needs_listener = true;
201                Box::new(PannerNode::new(options, ch))
202            }
203            AudioNodeInit::MediaStreamSourceNode(id) => {
204                let reader = (self.reader_factory)(id, self.sample_rate);
205                Box::new(MediaStreamSourceNode::new(reader, ch))
206            }
207            AudioNodeInit::OscillatorNode(options) => Box::new(OscillatorNode::new(options, ch)),
208            AudioNodeInit::ChannelMergerNode(options) => {
209                Box::new(ChannelMergerNode::new(options, ch))
210            }
211            AudioNodeInit::ConstantSourceNode(options) => {
212                Box::new(ConstantSourceNode::new(options, ch))
213            }
214            AudioNodeInit::MediaStreamDestinationNode(socket) => {
215                is_dest = true;
216                Box::new(MediaStreamDestinationNode::new(
217                    socket,
218                    self.sample_rate,
219                    (self.sink_factory)().unwrap(),
220                    ch,
221                ))
222            }
223            AudioNodeInit::ChannelSplitterNode => Box::new(ChannelSplitterNode::new(ch)),
224            AudioNodeInit::WaveShaperNode(options) => Box::new(WaveShaperNode::new(options, ch)),
225            AudioNodeInit::MediaElementSourceNode => Box::new(MediaElementSourceNode::new(ch)),
226            AudioNodeInit::IIRFilterNode(options) => Box::new(IIRFilterNode::new(options, ch)),
227            _ => unimplemented!(),
228        };
229        let id = self.graph.add_node(node);
230        if needs_listener {
231            let listener = self.graph.listener_id().output(0);
232            self.graph.add_edge(listener, id.listener());
233        }
234        if is_dest {
235            self.graph.add_extra_dest(id);
236        }
237        id
238    }
239
240    fn connect_ports(&mut self, output: PortId<OutputPort>, input: PortId<InputPort>) {
241        self.graph.add_edge(output, input)
242    }
243
244    fn process(&mut self) -> Chunk {
245        if self.muted {
246            return Chunk::explicit_silence();
247        }
248
249        let info = BlockInfo {
250            sample_rate: self.sample_rate,
251            frame: self.current_frame,
252            time: self.current_time,
253        };
254        self.graph.process(&info)
255    }
256
257    fn set_mute(&mut self, val: bool) -> () {
258        self.muted = val;
259    }
260
261    fn event_loop(&mut self, event_queue: Receiver<AudioRenderThreadMsg>) {
262        let sample_rate = self.sample_rate;
263        let handle_msg = move |context: &mut Self, msg: AudioRenderThreadMsg| -> bool {
264            let mut break_loop = false;
265            match msg {
266                AudioRenderThreadMsg::CreateNode(node_type, tx, ch) => {
267                    let _ = tx.send(context.create_node(node_type, ch));
268                }
269                AudioRenderThreadMsg::ConnectPorts(output, input) => {
270                    context.connect_ports(output, input);
271                }
272                AudioRenderThreadMsg::Resume(tx) => {
273                    let _ = tx.send(context.resume());
274                }
275                AudioRenderThreadMsg::Suspend(tx) => {
276                    let _ = tx.send(context.suspend());
277                }
278                AudioRenderThreadMsg::Close(tx) => {
279                    let _ = tx.send(context.suspend());
280                    break_loop = true;
281                }
282                AudioRenderThreadMsg::GetCurrentTime(response) => {
283                    response.send(context.current_time).unwrap()
284                }
285                AudioRenderThreadMsg::MessageNode(id, msg) => {
286                    context.graph.node_mut(id).message(msg, sample_rate)
287                }
288                AudioRenderThreadMsg::SinkNeedData => {
289                    // Do nothing. This will simply unblock the thread so we
290                    // can restart the non-blocking event loop.
291                }
292                AudioRenderThreadMsg::DisconnectAllFrom(id) => {
293                    context.graph.disconnect_all_from(id)
294                }
295                AudioRenderThreadMsg::DisconnectOutput(out) => context.graph.disconnect_output(out),
296                AudioRenderThreadMsg::DisconnectBetween(from, to) => {
297                    context.graph.disconnect_between(from, to)
298                }
299                AudioRenderThreadMsg::DisconnectTo(from, to) => {
300                    context.graph.disconnect_to(from, to)
301                }
302                AudioRenderThreadMsg::DisconnectOutputBetween(from, to) => {
303                    context.graph.disconnect_output_between(from, to)
304                }
305                AudioRenderThreadMsg::DisconnectOutputBetweenTo(from, to) => {
306                    context.graph.disconnect_output_between_to(from, to)
307                }
308                AudioRenderThreadMsg::SetSinkEosCallback(callback) => {
309                    context.sink.set_eos_callback(callback);
310                }
311                AudioRenderThreadMsg::SetMute(val) => {
312                    context.set_mute(val);
313                }
314            };
315
316            break_loop
317        };
318
319        loop {
320            if self.sink.has_enough_data() || self.state == ProcessingState::Suspended {
321                // If we are not processing audio or
322                // if we have already pushed enough data into the audio sink
323                // we wait for messages coming from the control thread or
324                // the audio sink. The audio sink will notify whenever it
325                // needs more data.
326                if let Ok(msg) = event_queue.recv() {
327                    if handle_msg(self, msg) {
328                        break;
329                    }
330                }
331            } else {
332                // If we have not pushed enough data into the audio sink yet,
333                // we process the control message queue
334                if let Ok(msg) = event_queue.try_recv() {
335                    if handle_msg(self, msg) {
336                        break;
337                    }
338                }
339
340                if self.state == ProcessingState::Suspended {
341                    // Bail out if we just suspended processing.
342                    continue;
343                }
344
345                // push into the audio sink the result of processing a
346                // render quantum.
347                let data = self.process();
348                if self.sink.push_data(data).is_ok() {
349                    // increment current frame by the render quantum size.
350                    self.current_frame += FRAMES_PER_BLOCK;
351                    self.current_time = self.current_frame / self.sample_rate as f64;
352                } else {
353                    eprintln!("Could not push data to audio sink");
354                }
355            }
356        }
357    }
358}