1use std::sync::mpsc::{Receiver, Sender};
6
7use malloc_size_of_derive::MallocSizeOf;
8use servo_media_streams::{MediaSocket, MediaStreamId};
9
10use crate::analyser_node::AnalyserNode;
11use crate::biquad_filter_node::BiquadFilterNode;
12use crate::block::{Chunk, FRAMES_PER_BLOCK, Tick};
13use crate::buffer_source_node::AudioBufferSourceNode;
14use crate::channel_node::{ChannelMergerNode, ChannelSplitterNode};
15use crate::constant_source_node::ConstantSourceNode;
16use crate::context::{AudioContextOptions, ProcessingState, StateChangeResult};
17use crate::gain_node::GainNode;
18use crate::graph::{AudioGraph, InputPort, NodeId, OutputPort, PortId};
19use crate::iir_filter_node::IIRFilterNode;
20use crate::media_element_source_node::MediaElementSourceNode;
21use crate::media_stream_destination_node::MediaStreamDestinationNode;
22use crate::media_stream_source_node::MediaStreamSourceNode;
23use crate::node::{AudioNodeEngine, AudioNodeInit, AudioNodeMessage, BlockInfo, ChannelInfo};
24use crate::offline_sink::OfflineAudioSink;
25use crate::oscillator_node::OscillatorNode;
26use crate::panner_node::PannerNode;
27use crate::sink::{AudioSink, AudioSinkError};
28use crate::stereo_panner::StereoPannerNode;
29use crate::wave_shaper_node::WaveShaperNode;
30use crate::{AudioBackend, AudioStreamReader};
31
/// Callback invoked by a sink at end-of-stream, handed the rendered samples
/// (see `AudioSink::set_eos_callback`). Boxed so it can be sent across the
/// render-thread channel.
pub type SinkEosCallback = Box<dyn Fn(Box<dyn AsRef<[f32]>>) + Send + Sync + 'static>;
33
/// Messages accepted by the audio render thread's event loop
/// (see `AudioRenderThread::event_loop`).
#[derive(MallocSizeOf)]
pub enum AudioRenderThreadMsg {
    /// Create a node in the graph; the resulting `NodeId` (or `None` on
    /// failure, e.g. a media stream reader that could not be created) is sent
    /// back over the provided channel.
    CreateNode(AudioNodeInit, Sender<Option<NodeId>>, ChannelInfo),
    /// Connect an output port to an input port in the graph.
    ConnectPorts(PortId<OutputPort>, PortId<InputPort>),
    /// Forward an `AudioNodeMessage` to a specific node.
    MessageNode(NodeId, AudioNodeMessage),
    /// Resume processing; the state-change result is sent back to the caller.
    Resume(Sender<StateChangeResult>),
    /// Suspend processing; the state-change result is sent back to the caller.
    Suspend(Sender<StateChangeResult>),
    /// Suspend processing and exit the event loop.
    Close(Sender<StateChangeResult>),
    /// Notification from the sink that it wants more data; handled as a no-op
    /// (the loop polls `has_enough_data` itself).
    SinkNeedData,
    /// Request the context's current time in seconds.
    GetCurrentTime(Sender<f64>),

    // Graph disconnection operations, mirroring the `AudioGraph` API.
    DisconnectAllFrom(NodeId),
    DisconnectOutput(PortId<OutputPort>),
    DisconnectBetween(NodeId, NodeId),
    DisconnectTo(NodeId, PortId<InputPort>),
    DisconnectOutputBetween(PortId<OutputPort>, NodeId),
    DisconnectOutputBetweenTo(PortId<OutputPort>, PortId<InputPort>),

    /// Install an end-of-stream callback on the sink.
    SetSinkEosCallback(#[ignore_malloc_size_of = "Fn"] SinkEosCallback),

    /// Mute/unmute rendering; while muted, `process` emits explicit silence.
    SetMute(bool),
}
56
/// The output the render thread pushes audio into: either a backend-provided
/// real-time sink, or an in-memory sink for offline (render-to-buffer) contexts.
pub enum Sink {
    RealTime(Box<dyn AudioSink>),
    Offline(OfflineAudioSink),
}
61
62impl AudioSink for Sink {
63 fn init(
64 &self,
65 sample_rate: f32,
66 sender: Sender<AudioRenderThreadMsg>,
67 ) -> Result<(), AudioSinkError> {
68 match *self {
69 Sink::RealTime(ref sink) => sink.init(sample_rate, sender),
70 Sink::Offline(ref sink) => {
71 sink.init(sample_rate, sender).unwrap();
72 Ok(())
73 },
74 }
75 }
76
77 fn init_stream(&self, _: u8, _: f32, _: Box<dyn MediaSocket>) -> Result<(), AudioSinkError> {
78 unreachable!("Sink should never be used for MediaStreamDestinationNode")
79 }
80
81 fn play(&self) -> Result<(), AudioSinkError> {
82 match *self {
83 Sink::RealTime(ref sink) => sink.play(),
84 Sink::Offline(ref sink) => {
85 sink.play().unwrap();
86 Ok(())
87 },
88 }
89 }
90
91 fn stop(&self) -> Result<(), AudioSinkError> {
92 match *self {
93 Sink::RealTime(ref sink) => sink.stop(),
94 Sink::Offline(ref sink) => {
95 sink.stop().unwrap();
96 Ok(())
97 },
98 }
99 }
100
101 fn has_enough_data(&self) -> bool {
102 match *self {
103 Sink::RealTime(ref sink) => sink.has_enough_data(),
104 Sink::Offline(ref sink) => sink.has_enough_data(),
105 }
106 }
107
108 fn push_data(&self, chunk: Chunk) -> Result<(), AudioSinkError> {
109 match *self {
110 Sink::RealTime(ref sink) => sink.push_data(chunk),
111 Sink::Offline(ref sink) => {
112 sink.push_data(chunk).unwrap();
113 Ok(())
114 },
115 }
116 }
117
118 fn set_eos_callback(
119 &self,
120 callback: Box<dyn Fn(Box<dyn AsRef<[f32]>>) + Send + Sync + 'static>,
121 ) {
122 match *self {
123 Sink::RealTime(ref sink) => sink.set_eos_callback(callback),
124 Sink::Offline(ref sink) => sink.set_eos_callback(callback),
125 }
126 }
127}
128
/// Factory producing an `AudioStreamReader` for a given media stream at a
/// given sample rate; used by `create_node` for `MediaStreamSourceNode`.
pub type ReaderFactoryCallback =
    dyn Fn(MediaStreamId, f32) -> Result<Box<dyn AudioStreamReader + Send>, AudioSinkError>;
131
/// State owned by the audio render thread: the processing graph, the output
/// sink, and rendering-clock bookkeeping.
pub struct AudioRenderThread {
    /// The audio processing graph being rendered.
    pub graph: AudioGraph,
    /// Where rendered blocks are pushed (real-time backend or offline buffer).
    pub sink: Sink,
    /// Creates additional backend sinks (used by MediaStreamDestinationNode).
    pub sink_factory: Box<dyn Fn() -> Result<Box<dyn AudioSink + 'static>, AudioSinkError>>,
    /// Creates stream readers for MediaStreamSourceNode.
    pub reader_factory: Box<ReaderFactoryCallback>,
    /// Current processing state (Running / Suspended / ...).
    pub state: ProcessingState,
    /// Context sample rate in Hz.
    pub sample_rate: f32,
    /// Rendered time in seconds, derived from `current_frame`.
    pub current_time: f64,
    /// Number of frames rendered so far.
    pub current_frame: Tick,
    /// While true, `process` produces explicit silence instead of running the graph.
    pub muted: bool,
}
143
impl AudioRenderThread {
    /// Builds the thread state for backend `B`: constructs the sink (real-time
    /// via the backend's factory, or an `OfflineAudioSink` sized from the
    /// offline options), initializes it, and returns a `Suspended` thread with
    /// the clock at zero.
    ///
    /// # Errors
    /// Returns the `AudioSinkError` from sink creation or `init`.
    fn prepare_thread<B: AudioBackend>(
        sender: Sender<AudioRenderThreadMsg>,
        sample_rate: f32,
        graph: AudioGraph,
        options: AudioContextOptions,
    ) -> Result<Self, AudioSinkError> {
        // Factories are kept around so more sinks/readers can be created later
        // (e.g. for MediaStreamDestinationNode / MediaStreamSourceNode).
        let sink_factory = Box::new(|| B::make_sink().map(|s| Box::new(s) as Box<dyn AudioSink>));
        let reader_factory = Box::new(|id, sample_rate| B::make_streamreader(id, sample_rate));
        let sink = match options {
            AudioContextOptions::RealTimeAudioContext(_) => Sink::RealTime(sink_factory()?),
            AudioContextOptions::OfflineAudioContext(options) => Sink::Offline(
                OfflineAudioSink::new(options.channels as usize, options.length),
            ),
        };

        // The sender lets the sink post messages (e.g. SinkNeedData) back to us.
        sink.init(sample_rate, sender)?;

        Ok(Self {
            graph,
            sink,
            sink_factory,
            reader_factory,
            state: ProcessingState::Suspended,
            sample_rate,
            current_time: 0.,
            current_frame: Tick(0),
            muted: false,
        })
    }

    /// Entry point of the render thread: prepares the thread state, reports
    /// the result over `init_sender`, then runs the event loop until a `Close`
    /// message (or channel disconnection) ends it.
    pub fn start<B: AudioBackend>(
        event_queue: Receiver<AudioRenderThreadMsg>,
        sender: Sender<AudioRenderThreadMsg>,
        sample_rate: f32,
        graph: AudioGraph,
        options: AudioContextOptions,
        init_sender: Sender<Result<(), AudioSinkError>>,
    ) {
        let mut thread =
            match Self::prepare_thread::<B>(sender.clone(), sample_rate, graph, options) {
                Ok(thread) => {
                    let _ = init_sender.send(Ok(()));
                    thread
                },
                Err(e) => {
                    // Creation failed; report and bail out without a loop.
                    let _ = init_sender.send(Err(e));
                    return;
                },
            };

        thread.event_loop(event_queue);
    }

    // Generated state-change methods: `resume` moves to Running and plays the
    // sink; `suspend` moves to Suspended and stops it. (Macro defined elsewhere
    // in the crate.)
    make_render_thread_state_change!(resume, Running, play);

    make_render_thread_state_change!(suspend, Suspended, stop);

    /// Instantiates the engine for `node_type`, adds it to the graph, and
    /// returns its id. Returns `None` if a media stream reader could not be
    /// created. Panner nodes are additionally wired to the listener; media
    /// stream destinations are registered as extra graph destinations.
    fn create_node(&mut self, node_type: AudioNodeInit, ch: ChannelInfo) -> Option<NodeId> {
        let mut needs_listener = false;
        let mut is_dest = false;
        let node: Box<dyn AudioNodeEngine> = match node_type {
            AudioNodeInit::AnalyserNode(sender) => Box::new(AnalyserNode::new(sender, ch)),
            AudioNodeInit::AudioBufferSourceNode(options) => {
                Box::new(AudioBufferSourceNode::new(options, ch))
            },
            AudioNodeInit::BiquadFilterNode(options) => {
                Box::new(BiquadFilterNode::new(options, ch, self.sample_rate))
            },
            AudioNodeInit::GainNode(options) => Box::new(GainNode::new(options, ch)),
            AudioNodeInit::StereoPannerNode(options) => {
                Box::new(StereoPannerNode::new(options, ch))
            },
            AudioNodeInit::PannerNode(options) => {
                needs_listener = true;
                Box::new(PannerNode::new(options, ch))
            },
            AudioNodeInit::MediaStreamSourceNode(id) => {
                let reader = (self.reader_factory)(id, self.sample_rate);
                // Reader creation can fail; `ok()?` turns that into `None`
                // for the caller waiting on the CreateNode reply.
                Box::new(MediaStreamSourceNode::new(reader.ok()?, ch))
            },
            AudioNodeInit::OscillatorNode(options) => Box::new(OscillatorNode::new(options, ch)),
            AudioNodeInit::ChannelMergerNode(options) => {
                Box::new(ChannelMergerNode::new(options, ch))
            },
            AudioNodeInit::ConstantSourceNode(options) => {
                Box::new(ConstantSourceNode::new(options, ch))
            },
            AudioNodeInit::MediaStreamDestinationNode(socket) => {
                is_dest = true;
                // NOTE(review): sink creation failure panics the render thread
                // here, unlike the reader failure above which yields `None` —
                // consider propagating instead.
                Box::new(MediaStreamDestinationNode::new(
                    socket,
                    self.sample_rate,
                    (self.sink_factory)().unwrap(),
                    ch,
                ))
            },
            AudioNodeInit::ChannelSplitterNode => Box::new(ChannelSplitterNode::new(ch)),
            AudioNodeInit::WaveShaperNode(options) => Box::new(WaveShaperNode::new(options, ch)),
            AudioNodeInit::MediaElementSourceNode => Box::new(MediaElementSourceNode::new(ch)),
            AudioNodeInit::IIRFilterNode(options) => Box::new(IIRFilterNode::new(options, ch)),
            // Any AudioNodeInit variant not handled above panics.
            _ => unimplemented!(),
        };
        let id = self.graph.add_node(node);
        if needs_listener {
            // Route the context listener's output into the panner's listener input.
            let listener = self.graph.listener_id().output(0);
            self.graph.add_edge(listener, id.listener());
        }
        if is_dest {
            self.graph.add_extra_dest(id);
        }
        Some(id)
    }

    /// Adds an edge from `output` to `input` in the graph.
    fn connect_ports(&mut self, output: PortId<OutputPort>, input: PortId<InputPort>) {
        self.graph.add_edge(output, input)
    }

    /// Renders one block: explicit silence while muted, otherwise the graph's
    /// output for the current frame/time.
    fn process(&mut self) -> Chunk {
        if self.muted {
            return Chunk::explicit_silence();
        }

        let info = BlockInfo {
            sample_rate: self.sample_rate,
            frame: self.current_frame,
            time: self.current_time,
        };
        self.graph.process(&info)
    }

    /// Sets the mute flag consulted by `process`.
    fn set_mute(&mut self, val: bool) {
        self.muted = val;
    }

    /// Runs until a `Close` message arrives or the message channel closes.
    /// When the sink has enough data (or we are suspended) the loop blocks on
    /// the queue; otherwise it polls the queue without blocking and renders
    /// one block per iteration.
    fn event_loop(&mut self, event_queue: Receiver<AudioRenderThreadMsg>) {
        let sample_rate = self.sample_rate;
        // Returns true when the loop should terminate (Close received).
        let handle_msg = move |context: &mut Self, msg: AudioRenderThreadMsg| -> bool {
            let mut break_loop = false;
            match msg {
                AudioRenderThreadMsg::CreateNode(node_type, tx, ch) => {
                    let _ = tx.send(context.create_node(node_type, ch));
                },
                AudioRenderThreadMsg::ConnectPorts(output, input) => {
                    context.connect_ports(output, input);
                },
                AudioRenderThreadMsg::Resume(tx) => {
                    let _ = tx.send(context.resume());
                },
                AudioRenderThreadMsg::Suspend(tx) => {
                    let _ = tx.send(context.suspend());
                },
                AudioRenderThreadMsg::Close(tx) => {
                    // Closing suspends (stops the sink) and exits the loop.
                    let _ = tx.send(context.suspend());
                    break_loop = true;
                },
                AudioRenderThreadMsg::GetCurrentTime(response) => {
                    // NOTE(review): this arm panics if the requester hung up,
                    // unlike the `let _ =` pattern used elsewhere — confirm
                    // whether that's intentional.
                    response.send(context.current_time).unwrap()
                },
                AudioRenderThreadMsg::MessageNode(id, msg) => {
                    context.graph.node_mut(id).message(msg, sample_rate)
                },
                AudioRenderThreadMsg::SinkNeedData => {
                    // No-op: waking from recv/try_recv is enough; the loop
                    // below decides whether to render.
                },
                AudioRenderThreadMsg::DisconnectAllFrom(id) => {
                    context.graph.disconnect_all_from(id)
                },
                AudioRenderThreadMsg::DisconnectOutput(out) => context.graph.disconnect_output(out),
                AudioRenderThreadMsg::DisconnectBetween(from, to) => {
                    context.graph.disconnect_between(from, to)
                },
                AudioRenderThreadMsg::DisconnectTo(from, to) => {
                    context.graph.disconnect_to(from, to)
                },
                AudioRenderThreadMsg::DisconnectOutputBetween(from, to) => {
                    context.graph.disconnect_output_between(from, to)
                },
                AudioRenderThreadMsg::DisconnectOutputBetweenTo(from, to) => {
                    context.graph.disconnect_output_between_to(from, to)
                },
                AudioRenderThreadMsg::SetSinkEosCallback(callback) => {
                    context.sink.set_eos_callback(callback);
                },
                AudioRenderThreadMsg::SetMute(val) => {
                    context.set_mute(val);
                },
            };

            break_loop
        };

        loop {
            if self.sink.has_enough_data() || self.state == ProcessingState::Suspended {
                // Nothing to render: block until the next message. A closed
                // channel (recv Err) falls through and loops again.
                if event_queue.recv().is_ok_and(|msg| handle_msg(self, msg)) {
                    break;
                }
            } else {
                // Sink is hungry and we're running: drain at most one pending
                // message, then render a block.
                if event_queue
                    .try_recv()
                    .is_ok_and(|msg| handle_msg(self, msg))
                {
                    break;
                }

                // The message just handled may have suspended us; re-check
                // before rendering.
                if self.state == ProcessingState::Suspended {
                    continue;
                }

                let data = self.process();
                if self.sink.push_data(data).is_ok() {
                    // Advance the rendering clock by one block.
                    self.current_frame += FRAMES_PER_BLOCK;
                    self.current_time = self.current_frame / self.sample_rate as f64;
                } else {
                    eprintln!("Could not push data to audio sink");
                }
            }
        }
    }
}