1use std::sync::mpsc::{Receiver, Sender};
6
7use malloc_size_of_derive::MallocSizeOf;
8use servo_media_streams::{MediaSocket, MediaStreamId};
9
10use crate::analyser_node::AnalyserNode;
11use crate::biquad_filter_node::BiquadFilterNode;
12use crate::block::{Chunk, FRAMES_PER_BLOCK, Tick};
13use crate::buffer_source_node::AudioBufferSourceNode;
14use crate::channel_node::{ChannelMergerNode, ChannelSplitterNode};
15use crate::constant_source_node::ConstantSourceNode;
16use crate::context::{AudioContextOptions, ProcessingState, StateChangeResult};
17use crate::gain_node::GainNode;
18use crate::graph::{AudioGraph, InputPort, NodeId, OutputPort, PortId};
19use crate::iir_filter_node::IIRFilterNode;
20use crate::media_element_source_node::MediaElementSourceNode;
21use crate::media_stream_destination_node::MediaStreamDestinationNode;
22use crate::media_stream_source_node::MediaStreamSourceNode;
23use crate::node::{AudioNodeEngine, AudioNodeInit, AudioNodeMessage, BlockInfo, ChannelInfo};
24use crate::offline_sink::OfflineAudioSink;
25use crate::oscillator_node::OscillatorNode;
26use crate::panner_node::PannerNode;
27use crate::sink::{AudioSink, AudioSinkError};
28use crate::stereo_panner::StereoPannerNode;
29use crate::wave_shaper_node::WaveShaperNode;
30use crate::{AudioBackend, AudioStreamReader};
31
32pub type SinkEosCallback = Box<dyn Fn(Box<dyn AsRef<[f32]>>) + Send + Sync + 'static>;
33
34#[derive(MallocSizeOf)]
35pub enum AudioRenderThreadMsg {
36 CreateNode(AudioNodeInit, Sender<Option<NodeId>>, ChannelInfo),
37 ConnectPorts(PortId<OutputPort>, PortId<InputPort>),
38 MessageNode(NodeId, AudioNodeMessage),
39 Resume(Sender<StateChangeResult>),
40 Suspend(Sender<StateChangeResult>),
41 Close(Sender<StateChangeResult>),
42 SinkNeedData,
43 GetCurrentTime(Sender<f64>),
44
45 DisconnectAllFrom(NodeId),
46 DisconnectOutput(PortId<OutputPort>),
47 DisconnectBetween(NodeId, NodeId),
48 DisconnectTo(NodeId, PortId<InputPort>),
49 DisconnectOutputBetween(PortId<OutputPort>, NodeId),
50 DisconnectOutputBetweenTo(PortId<OutputPort>, PortId<InputPort>),
51
52 SetSinkEosCallback(#[ignore_malloc_size_of = "Fn"] SinkEosCallback),
53
54 SetMute(bool),
55}
56
57pub enum Sink {
58 RealTime(Box<dyn AudioSink>),
59 Offline(OfflineAudioSink),
60}
61
62impl AudioSink for Sink {
63 fn init(
64 &self,
65 sample_rate: f32,
66 sender: Sender<AudioRenderThreadMsg>,
67 ) -> Result<(), AudioSinkError> {
68 match *self {
69 Sink::RealTime(ref sink) => sink.init(sample_rate, sender),
70 Sink::Offline(ref sink) => {
71 sink.init(sample_rate, sender).unwrap();
72 Ok(())
73 },
74 }
75 }
76
77 fn init_stream(&self, _: u8, _: f32, _: Box<dyn MediaSocket>) -> Result<(), AudioSinkError> {
78 unreachable!("Sink should never be used for MediaStreamDestinationNode")
79 }
80
81 fn play(&self) -> Result<(), AudioSinkError> {
82 match *self {
83 Sink::RealTime(ref sink) => sink.play(),
84 Sink::Offline(ref sink) => {
85 sink.play().unwrap();
86 Ok(())
87 },
88 }
89 }
90
91 fn stop(&self) -> Result<(), AudioSinkError> {
92 match *self {
93 Sink::RealTime(ref sink) => sink.stop(),
94 Sink::Offline(ref sink) => {
95 sink.stop().unwrap();
96 Ok(())
97 },
98 }
99 }
100
101 fn has_enough_data(&self) -> bool {
102 match *self {
103 Sink::RealTime(ref sink) => sink.has_enough_data(),
104 Sink::Offline(ref sink) => sink.has_enough_data(),
105 }
106 }
107
108 fn push_data(&self, chunk: Chunk) -> Result<(), AudioSinkError> {
109 match *self {
110 Sink::RealTime(ref sink) => sink.push_data(chunk),
111 Sink::Offline(ref sink) => {
112 sink.push_data(chunk).unwrap();
113 Ok(())
114 },
115 }
116 }
117
118 fn set_eos_callback(
119 &self,
120 callback: Box<dyn Fn(Box<dyn AsRef<[f32]>>) + Send + Sync + 'static>,
121 ) {
122 match *self {
123 Sink::RealTime(ref sink) => sink.set_eos_callback(callback),
124 Sink::Offline(ref sink) => sink.set_eos_callback(callback),
125 }
126 }
127}
128
129pub type ReaderFactoryCallback =
130 dyn Fn(MediaStreamId, f32) -> Result<Box<dyn AudioStreamReader + Send>, AudioSinkError>;
131
132pub struct AudioRenderThread {
133 pub graph: AudioGraph,
134 pub sink: Sink,
135 pub sink_factory: Box<dyn Fn() -> Result<Box<dyn AudioSink + 'static>, AudioSinkError>>,
136 pub reader_factory: Box<ReaderFactoryCallback>,
137 pub state: ProcessingState,
138 pub sample_rate: f32,
139 pub current_time: f64,
140 pub current_frame: Tick,
141 pub muted: bool,
142}
143
144impl AudioRenderThread {
145 fn prepare_thread<B: AudioBackend>(
149 sender: Sender<AudioRenderThreadMsg>,
150 sample_rate: f32,
151 graph: AudioGraph,
152 options: AudioContextOptions,
153 ) -> Result<Self, AudioSinkError> {
154 let sink_factory = Box::new(|| B::make_sink().map(|s| Box::new(s) as Box<dyn AudioSink>));
155 let reader_factory = Box::new(|id, sample_rate| B::make_streamreader(id, sample_rate));
156 let sink = match options {
157 AudioContextOptions::RealTimeAudioContext(_) => Sink::RealTime(sink_factory()?),
158 AudioContextOptions::OfflineAudioContext(options) => Sink::Offline(
159 OfflineAudioSink::new(options.channels as usize, options.length),
160 ),
161 };
162
163 sink.init(sample_rate, sender)?;
164
165 Ok(Self {
166 graph,
167 sink,
168 sink_factory,
169 reader_factory,
170 state: ProcessingState::Suspended,
171 sample_rate,
172 current_time: 0.,
173 current_frame: Tick(0),
174 muted: false,
175 })
176 }
177
178 pub fn start<B: AudioBackend>(
182 event_queue: Receiver<AudioRenderThreadMsg>,
183 sender: Sender<AudioRenderThreadMsg>,
184 sample_rate: f32,
185 graph: AudioGraph,
186 options: AudioContextOptions,
187 init_sender: Sender<Result<(), AudioSinkError>>,
188 ) {
189 let mut thread = match Self::prepare_thread::<B>(sender, sample_rate, graph, options) {
190 Ok(thread) => {
191 let _ = init_sender.send(Ok(()));
192 thread
193 },
194 Err(e) => {
195 let _ = init_sender.send(Err(e));
196 return;
197 },
198 };
199
200 thread.event_loop(event_queue);
201 }
202
203 make_render_thread_state_change!(resume, Running, play);
204
205 make_render_thread_state_change!(suspend, Suspended, stop);
206
207 fn create_node(&mut self, node_type: AudioNodeInit, ch: ChannelInfo) -> Option<NodeId> {
208 let mut needs_listener = false;
209 let mut is_dest = false;
210 let node: Box<dyn AudioNodeEngine> = match node_type {
211 AudioNodeInit::AnalyserNode(sender) => Box::new(AnalyserNode::new(sender, ch)),
212 AudioNodeInit::AudioBufferSourceNode(options) => {
213 Box::new(AudioBufferSourceNode::new(options, ch))
214 },
215 AudioNodeInit::BiquadFilterNode(options) => {
216 Box::new(BiquadFilterNode::new(options, ch, self.sample_rate))
217 },
218 AudioNodeInit::GainNode(options) => Box::new(GainNode::new(options, ch)),
219 AudioNodeInit::StereoPannerNode(options) => {
220 Box::new(StereoPannerNode::new(options, ch))
221 },
222 AudioNodeInit::PannerNode(options) => {
223 needs_listener = true;
224 Box::new(PannerNode::new(options, ch))
225 },
226 AudioNodeInit::MediaStreamSourceNode(id) => {
227 let reader = (self.reader_factory)(id, self.sample_rate);
228 Box::new(MediaStreamSourceNode::new(reader.ok()?, ch))
229 },
230 AudioNodeInit::OscillatorNode(options) => Box::new(OscillatorNode::new(options, ch)),
231 AudioNodeInit::ChannelMergerNode(options) => {
232 Box::new(ChannelMergerNode::new(options, ch))
233 },
234 AudioNodeInit::ConstantSourceNode(options) => {
235 Box::new(ConstantSourceNode::new(options, ch))
236 },
237 AudioNodeInit::MediaStreamDestinationNode(socket) => {
238 is_dest = true;
239 Box::new(MediaStreamDestinationNode::new(
240 socket,
241 self.sample_rate,
242 (self.sink_factory)().unwrap(),
243 ch,
244 ))
245 },
246 AudioNodeInit::ChannelSplitterNode => Box::new(ChannelSplitterNode::new(ch)),
247 AudioNodeInit::WaveShaperNode(options) => Box::new(WaveShaperNode::new(options, ch)),
248 AudioNodeInit::MediaElementSourceNode => Box::new(MediaElementSourceNode::new(ch)),
249 AudioNodeInit::IIRFilterNode(options) => Box::new(IIRFilterNode::new(options, ch)),
250 _ => unimplemented!(),
251 };
252 let id = self.graph.add_node(node);
253 if needs_listener {
254 let listener = self.graph.listener_id().output(0);
255 self.graph.add_edge(listener, id.listener());
256 }
257 if is_dest {
258 self.graph.add_extra_dest(id);
259 }
260 Some(id)
261 }
262
263 fn connect_ports(&mut self, output: PortId<OutputPort>, input: PortId<InputPort>) {
264 self.graph.add_edge(output, input)
265 }
266
267 fn process(&mut self) -> Chunk {
268 if self.muted {
269 return Chunk::explicit_silence();
270 }
271
272 let info = BlockInfo {
273 sample_rate: self.sample_rate,
274 frame: self.current_frame,
275 time: self.current_time,
276 };
277 self.graph.process(&info)
278 }
279
280 fn set_mute(&mut self, val: bool) {
281 self.muted = val;
282 }
283
284 fn event_loop(&mut self, event_queue: Receiver<AudioRenderThreadMsg>) {
285 let sample_rate = self.sample_rate;
286 let handle_msg = move |context: &mut Self, msg: AudioRenderThreadMsg| -> bool {
287 let mut break_loop = false;
288 match msg {
289 AudioRenderThreadMsg::CreateNode(node_type, tx, ch) => {
290 let _ = tx.send(context.create_node(node_type, ch));
291 },
292 AudioRenderThreadMsg::ConnectPorts(output, input) => {
293 context.connect_ports(output, input);
294 },
295 AudioRenderThreadMsg::Resume(tx) => {
296 let _ = tx.send(context.resume());
297 },
298 AudioRenderThreadMsg::Suspend(tx) => {
299 let _ = tx.send(context.suspend());
300 },
301 AudioRenderThreadMsg::Close(tx) => {
302 let _ = tx.send(context.suspend());
303 break_loop = true;
304 },
305 AudioRenderThreadMsg::GetCurrentTime(response) => {
306 response.send(context.current_time).unwrap()
307 },
308 AudioRenderThreadMsg::MessageNode(id, msg) => {
309 context.graph.node_mut(id).message(msg, sample_rate)
310 },
311 AudioRenderThreadMsg::SinkNeedData => {
312 },
315 AudioRenderThreadMsg::DisconnectAllFrom(id) => {
316 context.graph.disconnect_all_from(id)
317 },
318 AudioRenderThreadMsg::DisconnectOutput(out) => context.graph.disconnect_output(out),
319 AudioRenderThreadMsg::DisconnectBetween(from, to) => {
320 context.graph.disconnect_between(from, to)
321 },
322 AudioRenderThreadMsg::DisconnectTo(from, to) => {
323 context.graph.disconnect_to(from, to)
324 },
325 AudioRenderThreadMsg::DisconnectOutputBetween(from, to) => {
326 context.graph.disconnect_output_between(from, to)
327 },
328 AudioRenderThreadMsg::DisconnectOutputBetweenTo(from, to) => {
329 context.graph.disconnect_output_between_to(from, to)
330 },
331 AudioRenderThreadMsg::SetSinkEosCallback(callback) => {
332 context.sink.set_eos_callback(callback);
333 },
334 AudioRenderThreadMsg::SetMute(val) => {
335 context.set_mute(val);
336 },
337 };
338
339 break_loop
340 };
341
342 loop {
343 if self.sink.has_enough_data() || self.state == ProcessingState::Suspended {
344 if event_queue.recv().is_ok_and(|msg| handle_msg(self, msg)) {
350 break;
351 }
352 } else {
353 if event_queue
356 .try_recv()
357 .is_ok_and(|msg| handle_msg(self, msg))
358 {
359 break;
360 }
361
362 if self.state == ProcessingState::Suspended {
363 continue;
365 }
366
367 let data = self.process();
370 if self.sink.push_data(data).is_ok() {
371 self.current_frame += FRAMES_PER_BLOCK;
373 self.current_time = self.current_frame / self.sample_rate as f64;
374 } else {
375 eprintln!("Could not push data to audio sink");
376 }
377 }
378 }
379 }
380}