servo_media_audio/
render_thread.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::sync::mpsc::{Receiver, Sender};
6
7use servo_media_streams::{MediaSocket, MediaStreamId};
8
9use crate::analyser_node::AnalyserNode;
10use crate::biquad_filter_node::BiquadFilterNode;
11use crate::block::{Chunk, FRAMES_PER_BLOCK, Tick};
12use crate::buffer_source_node::AudioBufferSourceNode;
13use crate::channel_node::{ChannelMergerNode, ChannelSplitterNode};
14use crate::constant_source_node::ConstantSourceNode;
15use crate::context::{AudioContextOptions, ProcessingState, StateChangeResult};
16use crate::gain_node::GainNode;
17use crate::graph::{AudioGraph, InputPort, NodeId, OutputPort, PortId};
18use crate::iir_filter_node::IIRFilterNode;
19use crate::media_element_source_node::MediaElementSourceNode;
20use crate::media_stream_destination_node::MediaStreamDestinationNode;
21use crate::media_stream_source_node::MediaStreamSourceNode;
22use crate::node::{AudioNodeEngine, AudioNodeInit, AudioNodeMessage, BlockInfo, ChannelInfo};
23use crate::offline_sink::OfflineAudioSink;
24use crate::oscillator_node::OscillatorNode;
25use crate::panner_node::PannerNode;
26use crate::sink::{AudioSink, AudioSinkError};
27use crate::stereo_panner::StereoPannerNode;
28use crate::wave_shaper_node::WaveShaperNode;
29use crate::{AudioBackend, AudioStreamReader};
30
31pub type SinkEosCallback = Box<dyn Fn(Box<dyn AsRef<[f32]>>) + Send + Sync + 'static>;
32
33pub enum AudioRenderThreadMsg {
34    CreateNode(AudioNodeInit, Sender<NodeId>, ChannelInfo),
35    ConnectPorts(PortId<OutputPort>, PortId<InputPort>),
36    MessageNode(NodeId, AudioNodeMessage),
37    Resume(Sender<StateChangeResult>),
38    Suspend(Sender<StateChangeResult>),
39    Close(Sender<StateChangeResult>),
40    SinkNeedData,
41    GetCurrentTime(Sender<f64>),
42
43    DisconnectAllFrom(NodeId),
44    DisconnectOutput(PortId<OutputPort>),
45    DisconnectBetween(NodeId, NodeId),
46    DisconnectTo(NodeId, PortId<InputPort>),
47    DisconnectOutputBetween(PortId<OutputPort>, NodeId),
48    DisconnectOutputBetweenTo(PortId<OutputPort>, PortId<InputPort>),
49
50    SetSinkEosCallback(SinkEosCallback),
51
52    SetMute(bool),
53}
54
55pub enum Sink {
56    RealTime(Box<dyn AudioSink>),
57    Offline(OfflineAudioSink),
58}
59
60impl AudioSink for Sink {
61    fn init(
62        &self,
63        sample_rate: f32,
64        sender: Sender<AudioRenderThreadMsg>,
65    ) -> Result<(), AudioSinkError> {
66        match *self {
67            Sink::RealTime(ref sink) => sink.init(sample_rate, sender),
68            Sink::Offline(ref sink) => {
69                sink.init(sample_rate, sender).unwrap();
70                Ok(())
71            },
72        }
73    }
74
75    fn init_stream(&self, _: u8, _: f32, _: Box<dyn MediaSocket>) -> Result<(), AudioSinkError> {
76        unreachable!("Sink should never be used for MediaStreamDestinationNode")
77    }
78
79    fn play(&self) -> Result<(), AudioSinkError> {
80        match *self {
81            Sink::RealTime(ref sink) => sink.play(),
82            Sink::Offline(ref sink) => {
83                sink.play().unwrap();
84                Ok(())
85            },
86        }
87    }
88
89    fn stop(&self) -> Result<(), AudioSinkError> {
90        match *self {
91            Sink::RealTime(ref sink) => sink.stop(),
92            Sink::Offline(ref sink) => {
93                sink.stop().unwrap();
94                Ok(())
95            },
96        }
97    }
98
99    fn has_enough_data(&self) -> bool {
100        match *self {
101            Sink::RealTime(ref sink) => sink.has_enough_data(),
102            Sink::Offline(ref sink) => sink.has_enough_data(),
103        }
104    }
105
106    fn push_data(&self, chunk: Chunk) -> Result<(), AudioSinkError> {
107        match *self {
108            Sink::RealTime(ref sink) => sink.push_data(chunk),
109            Sink::Offline(ref sink) => {
110                sink.push_data(chunk).unwrap();
111                Ok(())
112            },
113        }
114    }
115
116    fn set_eos_callback(
117        &self,
118        callback: Box<dyn Fn(Box<dyn AsRef<[f32]>>) + Send + Sync + 'static>,
119    ) {
120        match *self {
121            Sink::RealTime(ref sink) => sink.set_eos_callback(callback),
122            Sink::Offline(ref sink) => sink.set_eos_callback(callback),
123        }
124    }
125}
126
127pub struct AudioRenderThread {
128    pub graph: AudioGraph,
129    pub sink: Sink,
130    pub sink_factory: Box<dyn Fn() -> Result<Box<dyn AudioSink + 'static>, AudioSinkError>>,
131    pub reader_factory: Box<dyn Fn(MediaStreamId, f32) -> Box<dyn AudioStreamReader + Send>>,
132    pub state: ProcessingState,
133    pub sample_rate: f32,
134    pub current_time: f64,
135    pub current_frame: Tick,
136    pub muted: bool,
137}
138
139impl AudioRenderThread {
140    /// Initializes the AudioRenderThread object
141    ///
142    /// You must call .event_loop() on this to run it!
143    fn prepare_thread<B: AudioBackend>(
144        sender: Sender<AudioRenderThreadMsg>,
145        sample_rate: f32,
146        graph: AudioGraph,
147        options: AudioContextOptions,
148    ) -> Result<Self, AudioSinkError> {
149        let sink_factory = Box::new(|| B::make_sink().map(|s| Box::new(s) as Box<dyn AudioSink>));
150        let reader_factory = Box::new(|id, sample_rate| B::make_streamreader(id, sample_rate));
151        let sink = match options {
152            AudioContextOptions::RealTimeAudioContext(_) => Sink::RealTime(sink_factory()?),
153            AudioContextOptions::OfflineAudioContext(options) => Sink::Offline(
154                OfflineAudioSink::new(options.channels as usize, options.length),
155            ),
156        };
157
158        sink.init(sample_rate, sender)?;
159
160        Ok(Self {
161            graph,
162            sink,
163            sink_factory,
164            reader_factory,
165            state: ProcessingState::Suspended,
166            sample_rate,
167            current_time: 0.,
168            current_frame: Tick(0),
169            muted: false,
170        })
171    }
172
173    /// Start the audio render thread
174    ///
175    /// In case something fails, it will instead start a thread with a dummy backend
176    pub fn start<B: AudioBackend>(
177        event_queue: Receiver<AudioRenderThreadMsg>,
178        sender: Sender<AudioRenderThreadMsg>,
179        sample_rate: f32,
180        graph: AudioGraph,
181        options: AudioContextOptions,
182        init_sender: Sender<Result<(), AudioSinkError>>,
183    ) {
184        let mut thread =
185            match Self::prepare_thread::<B>(sender.clone(), sample_rate, graph, options) {
186                Ok(thread) => {
187                    let _ = init_sender.send(Ok(()));
188                    thread
189                },
190                Err(e) => {
191                    let _ = init_sender.send(Err(e));
192                    return;
193                },
194            };
195
196        thread.event_loop(event_queue);
197    }
198
199    make_render_thread_state_change!(resume, Running, play);
200
201    make_render_thread_state_change!(suspend, Suspended, stop);
202
203    fn create_node(&mut self, node_type: AudioNodeInit, ch: ChannelInfo) -> NodeId {
204        let mut needs_listener = false;
205        let mut is_dest = false;
206        let node: Box<dyn AudioNodeEngine> = match node_type {
207            AudioNodeInit::AnalyserNode(sender) => Box::new(AnalyserNode::new(sender, ch)),
208            AudioNodeInit::AudioBufferSourceNode(options) => {
209                Box::new(AudioBufferSourceNode::new(options, ch))
210            },
211            AudioNodeInit::BiquadFilterNode(options) => {
212                Box::new(BiquadFilterNode::new(options, ch, self.sample_rate))
213            },
214            AudioNodeInit::GainNode(options) => Box::new(GainNode::new(options, ch)),
215            AudioNodeInit::StereoPannerNode(options) => {
216                Box::new(StereoPannerNode::new(options, ch))
217            },
218            AudioNodeInit::PannerNode(options) => {
219                needs_listener = true;
220                Box::new(PannerNode::new(options, ch))
221            },
222            AudioNodeInit::MediaStreamSourceNode(id) => {
223                let reader = (self.reader_factory)(id, self.sample_rate);
224                Box::new(MediaStreamSourceNode::new(reader, ch))
225            },
226            AudioNodeInit::OscillatorNode(options) => Box::new(OscillatorNode::new(options, ch)),
227            AudioNodeInit::ChannelMergerNode(options) => {
228                Box::new(ChannelMergerNode::new(options, ch))
229            },
230            AudioNodeInit::ConstantSourceNode(options) => {
231                Box::new(ConstantSourceNode::new(options, ch))
232            },
233            AudioNodeInit::MediaStreamDestinationNode(socket) => {
234                is_dest = true;
235                Box::new(MediaStreamDestinationNode::new(
236                    socket,
237                    self.sample_rate,
238                    (self.sink_factory)().unwrap(),
239                    ch,
240                ))
241            },
242            AudioNodeInit::ChannelSplitterNode => Box::new(ChannelSplitterNode::new(ch)),
243            AudioNodeInit::WaveShaperNode(options) => Box::new(WaveShaperNode::new(options, ch)),
244            AudioNodeInit::MediaElementSourceNode => Box::new(MediaElementSourceNode::new(ch)),
245            AudioNodeInit::IIRFilterNode(options) => Box::new(IIRFilterNode::new(options, ch)),
246            _ => unimplemented!(),
247        };
248        let id = self.graph.add_node(node);
249        if needs_listener {
250            let listener = self.graph.listener_id().output(0);
251            self.graph.add_edge(listener, id.listener());
252        }
253        if is_dest {
254            self.graph.add_extra_dest(id);
255        }
256        id
257    }
258
259    fn connect_ports(&mut self, output: PortId<OutputPort>, input: PortId<InputPort>) {
260        self.graph.add_edge(output, input)
261    }
262
263    fn process(&mut self) -> Chunk {
264        if self.muted {
265            return Chunk::explicit_silence();
266        }
267
268        let info = BlockInfo {
269            sample_rate: self.sample_rate,
270            frame: self.current_frame,
271            time: self.current_time,
272        };
273        self.graph.process(&info)
274    }
275
276    fn set_mute(&mut self, val: bool) {
277        self.muted = val;
278    }
279
280    fn event_loop(&mut self, event_queue: Receiver<AudioRenderThreadMsg>) {
281        let sample_rate = self.sample_rate;
282        let handle_msg = move |context: &mut Self, msg: AudioRenderThreadMsg| -> bool {
283            let mut break_loop = false;
284            match msg {
285                AudioRenderThreadMsg::CreateNode(node_type, tx, ch) => {
286                    let _ = tx.send(context.create_node(node_type, ch));
287                },
288                AudioRenderThreadMsg::ConnectPorts(output, input) => {
289                    context.connect_ports(output, input);
290                },
291                AudioRenderThreadMsg::Resume(tx) => {
292                    let _ = tx.send(context.resume());
293                },
294                AudioRenderThreadMsg::Suspend(tx) => {
295                    let _ = tx.send(context.suspend());
296                },
297                AudioRenderThreadMsg::Close(tx) => {
298                    let _ = tx.send(context.suspend());
299                    break_loop = true;
300                },
301                AudioRenderThreadMsg::GetCurrentTime(response) => {
302                    response.send(context.current_time).unwrap()
303                },
304                AudioRenderThreadMsg::MessageNode(id, msg) => {
305                    context.graph.node_mut(id).message(msg, sample_rate)
306                },
307                AudioRenderThreadMsg::SinkNeedData => {
308                    // Do nothing. This will simply unblock the thread so we
309                    // can restart the non-blocking event loop.
310                },
311                AudioRenderThreadMsg::DisconnectAllFrom(id) => {
312                    context.graph.disconnect_all_from(id)
313                },
314                AudioRenderThreadMsg::DisconnectOutput(out) => context.graph.disconnect_output(out),
315                AudioRenderThreadMsg::DisconnectBetween(from, to) => {
316                    context.graph.disconnect_between(from, to)
317                },
318                AudioRenderThreadMsg::DisconnectTo(from, to) => {
319                    context.graph.disconnect_to(from, to)
320                },
321                AudioRenderThreadMsg::DisconnectOutputBetween(from, to) => {
322                    context.graph.disconnect_output_between(from, to)
323                },
324                AudioRenderThreadMsg::DisconnectOutputBetweenTo(from, to) => {
325                    context.graph.disconnect_output_between_to(from, to)
326                },
327                AudioRenderThreadMsg::SetSinkEosCallback(callback) => {
328                    context.sink.set_eos_callback(callback);
329                },
330                AudioRenderThreadMsg::SetMute(val) => {
331                    context.set_mute(val);
332                },
333            };
334
335            break_loop
336        };
337
338        loop {
339            if self.sink.has_enough_data() || self.state == ProcessingState::Suspended {
340                // If we are not processing audio or
341                // if we have already pushed enough data into the audio sink
342                // we wait for messages coming from the control thread or
343                // the audio sink. The audio sink will notify whenever it
344                // needs more data.
345                if event_queue.recv().is_ok_and(|msg| handle_msg(self, msg)) {
346                    break;
347                }
348            } else {
349                // If we have not pushed enough data into the audio sink yet,
350                // we process the control message queue
351                if event_queue
352                    .try_recv()
353                    .is_ok_and(|msg| handle_msg(self, msg))
354                {
355                    break;
356                }
357
358                if self.state == ProcessingState::Suspended {
359                    // Bail out if we just suspended processing.
360                    continue;
361                }
362
363                // push into the audio sink the result of processing a
364                // render quantum.
365                let data = self.process();
366                if self.sink.push_data(data).is_ok() {
367                    // increment current frame by the render quantum size.
368                    self.current_frame += FRAMES_PER_BLOCK;
369                    self.current_time = self.current_frame / self.sample_rate as f64;
370                } else {
371                    eprintln!("Could not push data to audio sink");
372                }
373            }
374        }
375    }
376}