use std::sync::mpsc::{Receiver, Sender};

use servo_media_streams::{MediaSocket, MediaStreamId};

use crate::analyser_node::AnalyserNode;
use crate::biquad_filter_node::BiquadFilterNode;
use crate::block::{Chunk, FRAMES_PER_BLOCK, Tick};
use crate::buffer_source_node::AudioBufferSourceNode;
use crate::channel_node::{ChannelMergerNode, ChannelSplitterNode};
use crate::constant_source_node::ConstantSourceNode;
use crate::context::{AudioContextOptions, ProcessingState, StateChangeResult};
use crate::gain_node::GainNode;
use crate::graph::{AudioGraph, InputPort, NodeId, OutputPort, PortId};
use crate::iir_filter_node::IIRFilterNode;
use crate::media_element_source_node::MediaElementSourceNode;
use crate::media_stream_destination_node::MediaStreamDestinationNode;
use crate::media_stream_source_node::MediaStreamSourceNode;
use crate::node::{AudioNodeEngine, AudioNodeInit, AudioNodeMessage, BlockInfo, ChannelInfo};
use crate::offline_sink::OfflineAudioSink;
use crate::oscillator_node::OscillatorNode;
use crate::panner_node::PannerNode;
use crate::sink::{AudioSink, AudioSinkError};
use crate::stereo_panner::StereoPannerNode;
use crate::wave_shaper_node::WaveShaperNode;
use crate::{AudioBackend, AudioStreamReader};

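/// Callback handed to the audio sink and invoked when it reaches end of
/// stream, receiving the rendered samples.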
pub type SinkEosCallback = Box<dyn Fn(Box<dyn AsRef<[f32]>>) + Send + Sync + 'static>;

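/// Messages that the control thread sends to the render thread's event loop.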
pub enum AudioRenderThreadMsg {
    CreateNode(AudioNodeInit, Sender<NodeId>, ChannelInfo),
    ConnectPorts(PortId<OutputPort>, PortId<InputPort>),
    MessageNode(NodeId, AudioNodeMessage),
    Resume(Sender<StateChangeResult>),
    Suspend(Sender<StateChangeResult>),
    Close(Sender<StateChangeResult>),
    SinkNeedData,
    GetCurrentTime(Sender<f64>),

    DisconnectAllFrom(NodeId),
    DisconnectOutput(PortId<OutputPort>),
    DisconnectBetween(NodeId, NodeId),
    DisconnectTo(NodeId, PortId<InputPort>),
    DisconnectOutputBetween(PortId<OutputPort>, NodeId),
    DisconnectOutputBetweenTo(PortId<OutputPort>, PortId<InputPort>),

    SetSinkEosCallback(SinkEosCallback),

    SetMute(bool),
}

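/// The render thread's output: either a real-time sink provided by the audio
/// backend or the offline sink used for `OfflineAudioContext` rendering.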
pub enum Sink {
    RealTime(Box<dyn AudioSink>),
    Offline(OfflineAudioSink),
}

impl AudioSink for Sink {
    fn init(
        &self,
        sample_rate: f32,
        sender: Sender<AudioRenderThreadMsg>,
    ) -> Result<(), AudioSinkError> {
        match *self {
            Sink::RealTime(ref sink) => sink.init(sample_rate, sender),
            Sink::Offline(ref sink) => {
                sink.init(sample_rate, sender).unwrap();
                Ok(())
            },
        }
    }

    fn init_stream(&self, _: u8, _: f32, _: Box<dyn MediaSocket>) -> Result<(), AudioSinkError> {
        unreachable!("Sink should never be used for MediaStreamDestinationNode")
    }

    fn play(&self) -> Result<(), AudioSinkError> {
        match *self {
            Sink::RealTime(ref sink) => sink.play(),
            Sink::Offline(ref sink) => {
                sink.play().unwrap();
                Ok(())
            },
        }
    }

    fn stop(&self) -> Result<(), AudioSinkError> {
        match *self {
            Sink::RealTime(ref sink) => sink.stop(),
            Sink::Offline(ref sink) => {
                sink.stop().unwrap();
                Ok(())
            },
        }
    }

    fn has_enough_data(&self) -> bool {
        match *self {
            Sink::RealTime(ref sink) => sink.has_enough_data(),
            Sink::Offline(ref sink) => sink.has_enough_data(),
        }
    }

    fn push_data(&self, chunk: Chunk) -> Result<(), AudioSinkError> {
        match *self {
            Sink::RealTime(ref sink) => sink.push_data(chunk),
            Sink::Offline(ref sink) => {
                sink.push_data(chunk).unwrap();
                Ok(())
            },
        }
    }

    fn set_eos_callback(
        &self,
        callback: Box<dyn Fn(Box<dyn AsRef<[f32]>>) + Send + Sync + 'static>,
    ) {
        match *self {
            Sink::RealTime(ref sink) => sink.set_eos_callback(callback),
            Sink::Offline(ref sink) => sink.set_eos_callback(callback),
        }
    }
}

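/// State owned by the audio render thread: the node graph, the output sink,
/// factories for creating extra sinks and media stream readers, and the
/// current playback position.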
pub struct AudioRenderThread {
    pub graph: AudioGraph,
    pub sink: Sink,
    pub sink_factory: Box<dyn Fn() -> Result<Box<dyn AudioSink + 'static>, AudioSinkError>>,
    pub reader_factory: Box<dyn Fn(MediaStreamId, f32) -> Box<dyn AudioStreamReader + Send>>,
    pub state: ProcessingState,
    pub sample_rate: f32,
    pub current_time: f64,
    pub current_frame: Tick,
    pub muted: bool,
}

impl AudioRenderThread {
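    /// Builds the thread state and initializes the sink, without starting the
    /// event loop.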
    fn prepare_thread<B: AudioBackend>(
        sender: Sender<AudioRenderThreadMsg>,
        sample_rate: f32,
        graph: AudioGraph,
        options: AudioContextOptions,
    ) -> Result<Self, AudioSinkError> {
        let sink_factory = Box::new(|| B::make_sink().map(|s| Box::new(s) as Box<dyn AudioSink>));
        let reader_factory = Box::new(|id, sample_rate| B::make_streamreader(id, sample_rate));
        let sink = match options {
            AudioContextOptions::RealTimeAudioContext(_) => Sink::RealTime(sink_factory()?),
            AudioContextOptions::OfflineAudioContext(options) => Sink::Offline(
                OfflineAudioSink::new(options.channels as usize, options.length),
            ),
        };

        sink.init(sample_rate, sender)?;

        Ok(Self {
            graph,
            sink,
            sink_factory,
            reader_factory,
            state: ProcessingState::Suspended,
            sample_rate,
            current_time: 0.,
            current_frame: Tick(0),
            muted: false,
        })
    }

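    /// Entry point of the render thread: prepares the thread state, reports
    /// the result of sink initialization through `init_sender`, and runs the
    /// event loop until the context is closed.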
    pub fn start<B: AudioBackend>(
        event_queue: Receiver<AudioRenderThreadMsg>,
        sender: Sender<AudioRenderThreadMsg>,
        sample_rate: f32,
        graph: AudioGraph,
        options: AudioContextOptions,
        init_sender: Sender<Result<(), AudioSinkError>>,
    ) {
        let mut thread =
            match Self::prepare_thread::<B>(sender.clone(), sample_rate, graph, options) {
                Ok(thread) => {
                    let _ = init_sender.send(Ok(()));
                    thread
                },
                Err(e) => {
                    let _ = init_sender.send(Err(e));
                    return;
                },
            };

        thread.event_loop(event_queue);
    }

    make_render_thread_state_change!(resume, Running, play);

    make_render_thread_state_change!(suspend, Suspended, stop);

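    /// Instantiates the engine for a new node, adds it to the graph, and wires
    /// up the listener (panner nodes) or registers it as an extra destination
    /// (media stream destination nodes).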
    fn create_node(&mut self, node_type: AudioNodeInit, ch: ChannelInfo) -> NodeId {
        let mut needs_listener = false;
        let mut is_dest = false;
        let node: Box<dyn AudioNodeEngine> = match node_type {
            AudioNodeInit::AnalyserNode(sender) => Box::new(AnalyserNode::new(sender, ch)),
            AudioNodeInit::AudioBufferSourceNode(options) => {
                Box::new(AudioBufferSourceNode::new(options, ch))
            },
            AudioNodeInit::BiquadFilterNode(options) => {
                Box::new(BiquadFilterNode::new(options, ch, self.sample_rate))
            },
            AudioNodeInit::GainNode(options) => Box::new(GainNode::new(options, ch)),
            AudioNodeInit::StereoPannerNode(options) => {
                Box::new(StereoPannerNode::new(options, ch))
            },
            AudioNodeInit::PannerNode(options) => {
                needs_listener = true;
                Box::new(PannerNode::new(options, ch))
            },
            AudioNodeInit::MediaStreamSourceNode(id) => {
                let reader = (self.reader_factory)(id, self.sample_rate);
                Box::new(MediaStreamSourceNode::new(reader, ch))
            },
            AudioNodeInit::OscillatorNode(options) => Box::new(OscillatorNode::new(options, ch)),
            AudioNodeInit::ChannelMergerNode(options) => {
                Box::new(ChannelMergerNode::new(options, ch))
            },
            AudioNodeInit::ConstantSourceNode(options) => {
                Box::new(ConstantSourceNode::new(options, ch))
            },
            AudioNodeInit::MediaStreamDestinationNode(socket) => {
                is_dest = true;
                Box::new(MediaStreamDestinationNode::new(
                    socket,
                    self.sample_rate,
                    (self.sink_factory)().unwrap(),
                    ch,
                ))
            },
            AudioNodeInit::ChannelSplitterNode => Box::new(ChannelSplitterNode::new(ch)),
            AudioNodeInit::WaveShaperNode(options) => Box::new(WaveShaperNode::new(options, ch)),
            AudioNodeInit::MediaElementSourceNode => Box::new(MediaElementSourceNode::new(ch)),
            AudioNodeInit::IIRFilterNode(options) => Box::new(IIRFilterNode::new(options, ch)),
            _ => unimplemented!(),
        };
        let id = self.graph.add_node(node);
        if needs_listener {
            let listener = self.graph.listener_id().output(0);
            self.graph.add_edge(listener, id.listener());
        }
        if is_dest {
            self.graph.add_extra_dest(id);
        }
        id
    }

    fn connect_ports(&mut self, output: PortId<OutputPort>, input: PortId<InputPort>) {
        self.graph.add_edge(output, input)
    }

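    /// Renders one block of audio from the graph, or explicit silence when the
    /// context is muted.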
    fn process(&mut self) -> Chunk {
        if self.muted {
            return Chunk::explicit_silence();
        }

        let info = BlockInfo {
            sample_rate: self.sample_rate,
            frame: self.current_frame,
            time: self.current_time,
        };
        self.graph.process(&info)
    }

    fn set_mute(&mut self, val: bool) {
        self.muted = val;
    }

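    /// Runs the render thread's main loop until a `Close` message is received.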
    fn event_loop(&mut self, event_queue: Receiver<AudioRenderThreadMsg>) {
        let sample_rate = self.sample_rate;
        let handle_msg = move |context: &mut Self, msg: AudioRenderThreadMsg| -> bool {
            let mut break_loop = false;
            match msg {
                AudioRenderThreadMsg::CreateNode(node_type, tx, ch) => {
                    let _ = tx.send(context.create_node(node_type, ch));
                },
                AudioRenderThreadMsg::ConnectPorts(output, input) => {
                    context.connect_ports(output, input);
                },
                AudioRenderThreadMsg::Resume(tx) => {
                    let _ = tx.send(context.resume());
                },
                AudioRenderThreadMsg::Suspend(tx) => {
                    let _ = tx.send(context.suspend());
                },
                AudioRenderThreadMsg::Close(tx) => {
                    let _ = tx.send(context.suspend());
                    break_loop = true;
                },
                AudioRenderThreadMsg::GetCurrentTime(response) => {
                    response.send(context.current_time).unwrap()
                },
                AudioRenderThreadMsg::MessageNode(id, msg) => {
                    context.graph.node_mut(id).message(msg, sample_rate)
                },
                AudioRenderThreadMsg::SinkNeedData => {
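                    // Nothing to do here: receiving this message just wakes up
                    // the loop so it can render and push more data to the sink.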
                },
                AudioRenderThreadMsg::DisconnectAllFrom(id) => {
                    context.graph.disconnect_all_from(id)
                },
                AudioRenderThreadMsg::DisconnectOutput(out) => context.graph.disconnect_output(out),
                AudioRenderThreadMsg::DisconnectBetween(from, to) => {
                    context.graph.disconnect_between(from, to)
                },
                AudioRenderThreadMsg::DisconnectTo(from, to) => {
                    context.graph.disconnect_to(from, to)
                },
                AudioRenderThreadMsg::DisconnectOutputBetween(from, to) => {
                    context.graph.disconnect_output_between(from, to)
                },
                AudioRenderThreadMsg::DisconnectOutputBetweenTo(from, to) => {
                    context.graph.disconnect_output_between_to(from, to)
                },
                AudioRenderThreadMsg::SetSinkEosCallback(callback) => {
                    context.sink.set_eos_callback(callback);
                },
                AudioRenderThreadMsg::SetMute(val) => {
                    context.set_mute(val);
                },
            };

            break_loop
        };

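        // If the sink already has enough queued data, or processing is
        // suspended, block on the control channel; otherwise poll it without
        // blocking and render the next block.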
        loop {
            if self.sink.has_enough_data() || self.state == ProcessingState::Suspended {
                if event_queue.recv().is_ok_and(|msg| handle_msg(self, msg)) {
                    break;
                }
            } else {
                if event_queue
                    .try_recv()
                    .is_ok_and(|msg| handle_msg(self, msg))
                {
                    break;
                }

                if self.state == ProcessingState::Suspended {
                    continue;
                }

                let data = self.process();
                if self.sink.push_data(data).is_ok() {
                    self.current_frame += FRAMES_PER_BLOCK;
                    self.current_time = self.current_frame / self.sample_rate as f64;
                } else {
                    eprintln!("Could not push data to audio sink");
                }
            }
        }
    }
}