// servo_media_audio/render_thread.rs

/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::sync::mpsc::{Receiver, Sender};
6
7use malloc_size_of_derive::MallocSizeOf;
8use servo_media_streams::{MediaSocket, MediaStreamId};
9
10use crate::analyser_node::AnalyserNode;
11use crate::biquad_filter_node::BiquadFilterNode;
12use crate::block::{Chunk, FRAMES_PER_BLOCK, Tick};
13use crate::buffer_source_node::AudioBufferSourceNode;
14use crate::channel_node::{ChannelMergerNode, ChannelSplitterNode};
15use crate::constant_source_node::ConstantSourceNode;
16use crate::context::{AudioContextOptions, ProcessingState, StateChangeResult};
17use crate::gain_node::GainNode;
18use crate::graph::{AudioGraph, InputPort, NodeId, OutputPort, PortId};
19use crate::iir_filter_node::IIRFilterNode;
20use crate::media_element_source_node::MediaElementSourceNode;
21use crate::media_stream_destination_node::MediaStreamDestinationNode;
22use crate::media_stream_source_node::MediaStreamSourceNode;
23use crate::node::{AudioNodeEngine, AudioNodeInit, AudioNodeMessage, BlockInfo, ChannelInfo};
24use crate::offline_sink::OfflineAudioSink;
25use crate::oscillator_node::OscillatorNode;
26use crate::panner_node::PannerNode;
27use crate::sink::{AudioSink, AudioSinkError};
28use crate::stereo_panner::StereoPannerNode;
29use crate::wave_shaper_node::WaveShaperNode;
30use crate::{AudioBackend, AudioStreamReader};
31
/// Boxed callback carried by `AudioRenderThreadMsg::SetSinkEosCallback` and handed to the
/// sink through `AudioSink::set_eos_callback`.
///
/// The callback receives a boxed value that can be viewed as a slice of `f32` and must be
/// `Send + Sync + 'static`.
///
/// NOTE(review): presumably the sink invokes it at end-of-stream with the rendered samples —
/// confirm against the sink implementations.
pub type SinkEosCallback = Box<dyn Fn(Box<dyn AsRef<[f32]>>) + Send + Sync + 'static>;
33
34#[derive(MallocSizeOf)]
35pub enum AudioRenderThreadMsg {
36    CreateNode(AudioNodeInit, Sender<Option<NodeId>>, ChannelInfo),
37    ConnectPorts(PortId<OutputPort>, PortId<InputPort>),
38    MessageNode(NodeId, AudioNodeMessage),
39    Resume(Sender<StateChangeResult>),
40    Suspend(Sender<StateChangeResult>),
41    Close(Sender<StateChangeResult>),
42    SinkNeedData,
43    GetCurrentTime(Sender<f64>),
44
45    DisconnectAllFrom(NodeId),
46    DisconnectOutput(PortId<OutputPort>),
47    DisconnectBetween(NodeId, NodeId),
48    DisconnectTo(NodeId, PortId<InputPort>),
49    DisconnectOutputBetween(PortId<OutputPort>, NodeId),
50    DisconnectOutputBetweenTo(PortId<OutputPort>, PortId<InputPort>),
51
52    SetSinkEosCallback(#[ignore_malloc_size_of = "Fn"] SinkEosCallback),
53
54    SetMute(bool),
55}
56
57pub enum Sink {
58    RealTime(Box<dyn AudioSink>),
59    Offline(OfflineAudioSink),
60}
61
62impl AudioSink for Sink {
63    fn init(
64        &self,
65        sample_rate: f32,
66        sender: Sender<AudioRenderThreadMsg>,
67    ) -> Result<(), AudioSinkError> {
68        match *self {
69            Sink::RealTime(ref sink) => sink.init(sample_rate, sender),
70            Sink::Offline(ref sink) => {
71                sink.init(sample_rate, sender).unwrap();
72                Ok(())
73            },
74        }
75    }
76
77    fn init_stream(&self, _: u8, _: f32, _: Box<dyn MediaSocket>) -> Result<(), AudioSinkError> {
78        unreachable!("Sink should never be used for MediaStreamDestinationNode")
79    }
80
81    fn play(&self) -> Result<(), AudioSinkError> {
82        match *self {
83            Sink::RealTime(ref sink) => sink.play(),
84            Sink::Offline(ref sink) => {
85                sink.play().unwrap();
86                Ok(())
87            },
88        }
89    }
90
91    fn stop(&self) -> Result<(), AudioSinkError> {
92        match *self {
93            Sink::RealTime(ref sink) => sink.stop(),
94            Sink::Offline(ref sink) => {
95                sink.stop().unwrap();
96                Ok(())
97            },
98        }
99    }
100
101    fn has_enough_data(&self) -> bool {
102        match *self {
103            Sink::RealTime(ref sink) => sink.has_enough_data(),
104            Sink::Offline(ref sink) => sink.has_enough_data(),
105        }
106    }
107
108    fn push_data(&self, chunk: Chunk) -> Result<(), AudioSinkError> {
109        match *self {
110            Sink::RealTime(ref sink) => sink.push_data(chunk),
111            Sink::Offline(ref sink) => {
112                sink.push_data(chunk).unwrap();
113                Ok(())
114            },
115        }
116    }
117
118    fn set_eos_callback(
119        &self,
120        callback: Box<dyn Fn(Box<dyn AsRef<[f32]>>) + Send + Sync + 'static>,
121    ) {
122        match *self {
123            Sink::RealTime(ref sink) => sink.set_eos_callback(callback),
124            Sink::Offline(ref sink) => sink.set_eos_callback(callback),
125        }
126    }
127}
128
129pub type ReaderFactoryCallback =
130    dyn Fn(MediaStreamId, f32) -> Result<Box<dyn AudioStreamReader + Send>, AudioSinkError>;
131
132pub struct AudioRenderThread {
133    pub graph: AudioGraph,
134    pub sink: Sink,
135    pub sink_factory: Box<dyn Fn() -> Result<Box<dyn AudioSink + 'static>, AudioSinkError>>,
136    pub reader_factory: Box<ReaderFactoryCallback>,
137    pub state: ProcessingState,
138    pub sample_rate: f32,
139    pub current_time: f64,
140    pub current_frame: Tick,
141    pub muted: bool,
142}
143
144impl AudioRenderThread {
145    /// Initializes the AudioRenderThread object
146    ///
147    /// You must call .event_loop() on this to run it!
148    fn prepare_thread<B: AudioBackend>(
149        sender: Sender<AudioRenderThreadMsg>,
150        sample_rate: f32,
151        graph: AudioGraph,
152        options: AudioContextOptions,
153    ) -> Result<Self, AudioSinkError> {
154        let sink_factory = Box::new(|| B::make_sink().map(|s| Box::new(s) as Box<dyn AudioSink>));
155        let reader_factory = Box::new(|id, sample_rate| B::make_streamreader(id, sample_rate));
156        let sink = match options {
157            AudioContextOptions::RealTimeAudioContext(_) => Sink::RealTime(sink_factory()?),
158            AudioContextOptions::OfflineAudioContext(options) => Sink::Offline(
159                OfflineAudioSink::new(options.channels as usize, options.length),
160            ),
161        };
162
163        sink.init(sample_rate, sender)?;
164
165        Ok(Self {
166            graph,
167            sink,
168            sink_factory,
169            reader_factory,
170            state: ProcessingState::Suspended,
171            sample_rate,
172            current_time: 0.,
173            current_frame: Tick(0),
174            muted: false,
175        })
176    }
177
178    /// Start the audio render thread
179    ///
180    /// In case something fails, it will instead start a thread with a dummy backend
181    pub fn start<B: AudioBackend>(
182        event_queue: Receiver<AudioRenderThreadMsg>,
183        sender: Sender<AudioRenderThreadMsg>,
184        sample_rate: f32,
185        graph: AudioGraph,
186        options: AudioContextOptions,
187        init_sender: Sender<Result<(), AudioSinkError>>,
188    ) {
189        let mut thread =
190            match Self::prepare_thread::<B>(sender.clone(), sample_rate, graph, options) {
191                Ok(thread) => {
192                    let _ = init_sender.send(Ok(()));
193                    thread
194                },
195                Err(e) => {
196                    let _ = init_sender.send(Err(e));
197                    return;
198                },
199            };
200
201        thread.event_loop(event_queue);
202    }
203
204    make_render_thread_state_change!(resume, Running, play);
205
206    make_render_thread_state_change!(suspend, Suspended, stop);
207
208    fn create_node(&mut self, node_type: AudioNodeInit, ch: ChannelInfo) -> Option<NodeId> {
209        let mut needs_listener = false;
210        let mut is_dest = false;
211        let node: Box<dyn AudioNodeEngine> = match node_type {
212            AudioNodeInit::AnalyserNode(sender) => Box::new(AnalyserNode::new(sender, ch)),
213            AudioNodeInit::AudioBufferSourceNode(options) => {
214                Box::new(AudioBufferSourceNode::new(options, ch))
215            },
216            AudioNodeInit::BiquadFilterNode(options) => {
217                Box::new(BiquadFilterNode::new(options, ch, self.sample_rate))
218            },
219            AudioNodeInit::GainNode(options) => Box::new(GainNode::new(options, ch)),
220            AudioNodeInit::StereoPannerNode(options) => {
221                Box::new(StereoPannerNode::new(options, ch))
222            },
223            AudioNodeInit::PannerNode(options) => {
224                needs_listener = true;
225                Box::new(PannerNode::new(options, ch))
226            },
227            AudioNodeInit::MediaStreamSourceNode(id) => {
228                let reader = (self.reader_factory)(id, self.sample_rate);
229                Box::new(MediaStreamSourceNode::new(reader.ok()?, ch))
230            },
231            AudioNodeInit::OscillatorNode(options) => Box::new(OscillatorNode::new(options, ch)),
232            AudioNodeInit::ChannelMergerNode(options) => {
233                Box::new(ChannelMergerNode::new(options, ch))
234            },
235            AudioNodeInit::ConstantSourceNode(options) => {
236                Box::new(ConstantSourceNode::new(options, ch))
237            },
238            AudioNodeInit::MediaStreamDestinationNode(socket) => {
239                is_dest = true;
240                Box::new(MediaStreamDestinationNode::new(
241                    socket,
242                    self.sample_rate,
243                    (self.sink_factory)().unwrap(),
244                    ch,
245                ))
246            },
247            AudioNodeInit::ChannelSplitterNode => Box::new(ChannelSplitterNode::new(ch)),
248            AudioNodeInit::WaveShaperNode(options) => Box::new(WaveShaperNode::new(options, ch)),
249            AudioNodeInit::MediaElementSourceNode => Box::new(MediaElementSourceNode::new(ch)),
250            AudioNodeInit::IIRFilterNode(options) => Box::new(IIRFilterNode::new(options, ch)),
251            _ => unimplemented!(),
252        };
253        let id = self.graph.add_node(node);
254        if needs_listener {
255            let listener = self.graph.listener_id().output(0);
256            self.graph.add_edge(listener, id.listener());
257        }
258        if is_dest {
259            self.graph.add_extra_dest(id);
260        }
261        Some(id)
262    }
263
264    fn connect_ports(&mut self, output: PortId<OutputPort>, input: PortId<InputPort>) {
265        self.graph.add_edge(output, input)
266    }
267
268    fn process(&mut self) -> Chunk {
269        if self.muted {
270            return Chunk::explicit_silence();
271        }
272
273        let info = BlockInfo {
274            sample_rate: self.sample_rate,
275            frame: self.current_frame,
276            time: self.current_time,
277        };
278        self.graph.process(&info)
279    }
280
281    fn set_mute(&mut self, val: bool) {
282        self.muted = val;
283    }
284
285    fn event_loop(&mut self, event_queue: Receiver<AudioRenderThreadMsg>) {
286        let sample_rate = self.sample_rate;
287        let handle_msg = move |context: &mut Self, msg: AudioRenderThreadMsg| -> bool {
288            let mut break_loop = false;
289            match msg {
290                AudioRenderThreadMsg::CreateNode(node_type, tx, ch) => {
291                    let _ = tx.send(context.create_node(node_type, ch));
292                },
293                AudioRenderThreadMsg::ConnectPorts(output, input) => {
294                    context.connect_ports(output, input);
295                },
296                AudioRenderThreadMsg::Resume(tx) => {
297                    let _ = tx.send(context.resume());
298                },
299                AudioRenderThreadMsg::Suspend(tx) => {
300                    let _ = tx.send(context.suspend());
301                },
302                AudioRenderThreadMsg::Close(tx) => {
303                    let _ = tx.send(context.suspend());
304                    break_loop = true;
305                },
306                AudioRenderThreadMsg::GetCurrentTime(response) => {
307                    response.send(context.current_time).unwrap()
308                },
309                AudioRenderThreadMsg::MessageNode(id, msg) => {
310                    context.graph.node_mut(id).message(msg, sample_rate)
311                },
312                AudioRenderThreadMsg::SinkNeedData => {
313                    // Do nothing. This will simply unblock the thread so we
314                    // can restart the non-blocking event loop.
315                },
316                AudioRenderThreadMsg::DisconnectAllFrom(id) => {
317                    context.graph.disconnect_all_from(id)
318                },
319                AudioRenderThreadMsg::DisconnectOutput(out) => context.graph.disconnect_output(out),
320                AudioRenderThreadMsg::DisconnectBetween(from, to) => {
321                    context.graph.disconnect_between(from, to)
322                },
323                AudioRenderThreadMsg::DisconnectTo(from, to) => {
324                    context.graph.disconnect_to(from, to)
325                },
326                AudioRenderThreadMsg::DisconnectOutputBetween(from, to) => {
327                    context.graph.disconnect_output_between(from, to)
328                },
329                AudioRenderThreadMsg::DisconnectOutputBetweenTo(from, to) => {
330                    context.graph.disconnect_output_between_to(from, to)
331                },
332                AudioRenderThreadMsg::SetSinkEosCallback(callback) => {
333                    context.sink.set_eos_callback(callback);
334                },
335                AudioRenderThreadMsg::SetMute(val) => {
336                    context.set_mute(val);
337                },
338            };
339
340            break_loop
341        };
342
343        loop {
344            if self.sink.has_enough_data() || self.state == ProcessingState::Suspended {
345                // If we are not processing audio or
346                // if we have already pushed enough data into the audio sink
347                // we wait for messages coming from the control thread or
348                // the audio sink. The audio sink will notify whenever it
349                // needs more data.
350                if event_queue.recv().is_ok_and(|msg| handle_msg(self, msg)) {
351                    break;
352                }
353            } else {
354                // If we have not pushed enough data into the audio sink yet,
355                // we process the control message queue
356                if event_queue
357                    .try_recv()
358                    .is_ok_and(|msg| handle_msg(self, msg))
359                {
360                    break;
361                }
362
363                if self.state == ProcessingState::Suspended {
364                    // Bail out if we just suspended processing.
365                    continue;
366                }
367
368                // push into the audio sink the result of processing a
369                // render quantum.
370                let data = self.process();
371                if self.sink.push_data(data).is_ok() {
372                    // increment current frame by the render quantum size.
373                    self.current_frame += FRAMES_PER_BLOCK;
374                    self.current_time = self.current_frame / self.sample_rate as f64;
375                } else {
376                    eprintln!("Could not push data to audio sink");
377                }
378            }
379        }
380    }
381}