1use crate::analyser_node::AnalyserNode;
2use crate::biquad_filter_node::BiquadFilterNode;
3use crate::block::{Chunk, Tick, FRAMES_PER_BLOCK};
4use crate::buffer_source_node::AudioBufferSourceNode;
5use crate::channel_node::{ChannelMergerNode, ChannelSplitterNode};
6use crate::constant_source_node::ConstantSourceNode;
7use crate::context::{AudioContextOptions, ProcessingState, StateChangeResult};
8use crate::gain_node::GainNode;
9use crate::graph::{AudioGraph, InputPort, NodeId, OutputPort, PortId};
10use crate::iir_filter_node::IIRFilterNode;
11use crate::media_element_source_node::MediaElementSourceNode;
12use crate::media_stream_destination_node::MediaStreamDestinationNode;
13use crate::media_stream_source_node::MediaStreamSourceNode;
14use crate::node::{AudioNodeEngine, AudioNodeInit, AudioNodeMessage};
15use crate::node::{BlockInfo, ChannelInfo};
16use crate::offline_sink::OfflineAudioSink;
17use crate::oscillator_node::OscillatorNode;
18use crate::panner_node::PannerNode;
19use servo_media_streams::{MediaSocket, MediaStreamId};
20use crate::sink::{AudioSink, AudioSinkError};
21use std::sync::mpsc::{Receiver, Sender};
22use crate::stereo_panner::StereoPannerNode;
23use crate::wave_shaper_node::WaveShaperNode;
24use crate::{AudioBackend, AudioStreamReader};
25
/// Control messages consumed by the audio render thread's event loop.
///
/// Messages that need an answer carry a `Sender` over which the render
/// thread replies (node id, state-change result, or current time).
pub enum AudioRenderThreadMsg {
    /// Create a node of the given kind; the resulting `NodeId` is sent back
    /// over the provided channel.
    CreateNode(AudioNodeInit, Sender<NodeId>, ChannelInfo),
    /// Connect an output port of one node to an input port of another.
    ConnectPorts(PortId<OutputPort>, PortId<InputPort>),
    /// Deliver a node-specific message (e.g. parameter change) to a node.
    MessageNode(NodeId, AudioNodeMessage),
    /// Transition the context to `Running`; result reported back.
    Resume(Sender<StateChangeResult>),
    /// Transition the context to `Suspended`; result reported back.
    Suspend(Sender<StateChangeResult>),
    /// Shut the render thread down (the event loop exits after replying).
    Close(Sender<StateChangeResult>),
    /// Sink notification that it wants more data; used only to wake the loop.
    SinkNeedData,
    /// Reply with the current playback time in seconds.
    GetCurrentTime(Sender<f64>),

    // The disconnect variants mirror the Web Audio `disconnect()` overloads:
    /// Remove every outgoing connection of a node.
    DisconnectAllFrom(NodeId),
    /// Remove every connection from one specific output port.
    DisconnectOutput(PortId<OutputPort>),
    /// Remove all connections from one node to another.
    DisconnectBetween(NodeId, NodeId),
    /// Remove all connections from a node to a specific input port.
    DisconnectTo(NodeId, PortId<InputPort>),
    /// Remove connections from a specific output port to a node.
    DisconnectOutputBetween(PortId<OutputPort>, NodeId),
    /// Remove the connection from a specific output port to a specific input port.
    DisconnectOutputBetweenTo(PortId<OutputPort>, PortId<InputPort>),

    /// Install a callback invoked by the sink at end-of-stream with the
    /// rendered samples (used by offline rendering).
    SetSinkEosCallback(Box<dyn Fn(Box<dyn AsRef<[f32]>>) + Send + Sync + 'static>),

    /// Mute/unmute rendering; when muted the graph is not processed and
    /// explicit silence is pushed instead (see `AudioRenderThread::process`).
    SetMute(bool),
}
47
/// The output destination the render thread pushes processed blocks to:
/// either a backend-provided real-time sink or an in-memory offline sink
/// (used by `OfflineAudioContext`).
pub enum Sink {
    RealTime(Box<dyn AudioSink>),
    Offline(OfflineAudioSink),
}
52
53impl AudioSink for Sink {
54 fn init(
55 &self,
56 sample_rate: f32,
57 sender: Sender<AudioRenderThreadMsg>,
58 ) -> Result<(), AudioSinkError> {
59 match *self {
60 Sink::RealTime(ref sink) => sink.init(sample_rate, sender),
61 Sink::Offline(ref sink) => Ok(sink.init(sample_rate, sender).unwrap()),
62 }
63 }
64
65 fn init_stream(&self, _: u8, _: f32, _: Box<dyn MediaSocket>) -> Result<(), AudioSinkError> {
66 unreachable!("Sink should never be used for MediaStreamDestinationNode")
67 }
68
69 fn play(&self) -> Result<(), AudioSinkError> {
70 match *self {
71 Sink::RealTime(ref sink) => sink.play(),
72 Sink::Offline(ref sink) => Ok(sink.play().unwrap()),
73 }
74 }
75
76 fn stop(&self) -> Result<(), AudioSinkError> {
77 match *self {
78 Sink::RealTime(ref sink) => sink.stop(),
79 Sink::Offline(ref sink) => Ok(sink.stop().unwrap()),
80 }
81 }
82
83 fn has_enough_data(&self) -> bool {
84 match *self {
85 Sink::RealTime(ref sink) => sink.has_enough_data(),
86 Sink::Offline(ref sink) => sink.has_enough_data(),
87 }
88 }
89
90 fn push_data(&self, chunk: Chunk) -> Result<(), AudioSinkError> {
91 match *self {
92 Sink::RealTime(ref sink) => sink.push_data(chunk),
93 Sink::Offline(ref sink) => Ok(sink.push_data(chunk).unwrap()),
94 }
95 }
96
97 fn set_eos_callback(
98 &self,
99 callback: Box<dyn Fn(Box<dyn AsRef<[f32]>>) + Send + Sync + 'static>,
100 ) {
101 match *self {
102 Sink::RealTime(ref sink) => sink.set_eos_callback(callback),
103 Sink::Offline(ref sink) => sink.set_eos_callback(callback),
104 }
105 }
106}
107
/// State owned by the dedicated audio render thread.
pub struct AudioRenderThread {
    /// The node graph that is processed one block at a time.
    pub graph: AudioGraph,
    /// Where rendered blocks are pushed (real-time device or offline buffer).
    pub sink: Sink,
    /// Creates additional backend sinks, e.g. for MediaStreamDestinationNode
    /// (see `create_node`).
    pub sink_factory: Box<dyn Fn() -> Result<Box<dyn AudioSink + 'static>, AudioSinkError>>,
    /// Creates readers for MediaStreamSourceNode inputs at a given sample rate.
    pub reader_factory: Box<dyn Fn(MediaStreamId, f32) -> Box<dyn AudioStreamReader + Send>>,
    /// Current context state (starts `Suspended`, see `prepare_thread`).
    pub state: ProcessingState,
    /// Sample rate the context was created with, in Hz.
    pub sample_rate: f32,
    /// Playback time in seconds, derived from `current_frame` after each block.
    pub current_time: f64,
    /// Number of frames rendered so far, advanced by FRAMES_PER_BLOCK per block.
    pub current_frame: Tick,
    /// When true, `process` short-circuits and emits explicit silence.
    pub muted: bool,
}
119
impl AudioRenderThread {
    /// Builds the render-thread state for backend `B`: constructs the sink
    /// (real-time via the backend, or an in-memory offline sink, depending on
    /// `options`) and initializes it at `sample_rate`.
    ///
    /// Returns an error if real-time sink creation or initialization fails.
    fn prepare_thread<B: AudioBackend>(
        sender: Sender<AudioRenderThreadMsg>,
        sample_rate: f32,
        graph: AudioGraph,
        options: AudioContextOptions,
    ) -> Result<Self, AudioSinkError> {
        // Factories are boxed closures so the stored state does not carry the
        // backend type parameter `B` around after construction.
        let sink_factory = Box::new(|| B::make_sink().map(|s| Box::new(s) as Box<dyn AudioSink>));
        let reader_factory = Box::new(|id, sample_rate| B::make_streamreader(id, sample_rate));
        let sink = match options {
            AudioContextOptions::RealTimeAudioContext(_) => Sink::RealTime(sink_factory()?),
            AudioContextOptions::OfflineAudioContext(options) => Sink::Offline(
                OfflineAudioSink::new(options.channels as usize, options.length),
            ),
        };

        // The sink gets a sender so it can post messages (e.g. SinkNeedData)
        // back to this thread's event queue.
        sink.init(sample_rate, sender)?;

        Ok(Self {
            graph,
            sink,
            sink_factory,
            reader_factory,
            // Contexts start suspended; playback begins on a Resume message.
            state: ProcessingState::Suspended,
            sample_rate,
            current_time: 0.,
            current_frame: Tick(0),
            muted: false,
        })
    }

    /// Entry point for the render thread. Prepares the thread state, reports
    /// the outcome over `init_sender`, and — on success — runs the event loop
    /// until a `Close` message arrives. On failure the thread exits
    /// immediately after reporting the error.
    pub fn start<B: AudioBackend>(
        event_queue: Receiver<AudioRenderThreadMsg>,
        sender: Sender<AudioRenderThreadMsg>,
        sample_rate: f32,
        graph: AudioGraph,
        options: AudioContextOptions,
        init_sender: Sender<Result<(), AudioSinkError>>,
    ) {
        let mut thread =
            match Self::prepare_thread::<B>(sender.clone(), sample_rate, graph, options) {
                Ok(thread) => {
                    // The creator may have dropped its receiver; ignore send errors.
                    let _ = init_sender.send(Ok(()));
                    thread
                }
                Err(e) => {
                    let _ = init_sender.send(Err(e));
                    return;
                }
            };

        thread.event_loop(event_queue);
    }

    // Presumably expands to `fn resume(&mut self) -> StateChangeResult` that
    // switches `state` to Running and calls `sink.play()` (macro defined
    // elsewhere in the crate — confirm against its definition).
    make_render_thread_state_change!(resume, Running, play);

    // Likewise: `fn suspend(&mut self)` -> state Suspended, `sink.stop()`.
    make_render_thread_state_change!(suspend, Suspended, stop);

    /// Instantiates the engine for `node_type`, adds it to the graph, and
    /// returns its id. Panner nodes are additionally wired to the graph's
    /// listener; MediaStreamDestinationNodes are registered as extra
    /// destinations so they are processed even though they are not the main
    /// output.
    fn create_node(&mut self, node_type: AudioNodeInit, ch: ChannelInfo) -> NodeId {
        let mut needs_listener = false;
        let mut is_dest = false;
        let node: Box<dyn AudioNodeEngine> = match node_type {
            AudioNodeInit::AnalyserNode(sender) => Box::new(AnalyserNode::new(sender, ch)),
            AudioNodeInit::AudioBufferSourceNode(options) => {
                Box::new(AudioBufferSourceNode::new(options, ch))
            }
            AudioNodeInit::BiquadFilterNode(options) => {
                Box::new(BiquadFilterNode::new(options, ch, self.sample_rate))
            }
            AudioNodeInit::GainNode(options) => Box::new(GainNode::new(options, ch)),
            AudioNodeInit::StereoPannerNode(options) => {
                Box::new(StereoPannerNode::new(options, ch))
            }
            AudioNodeInit::PannerNode(options) => {
                // Panners consume listener position/orientation; edge added below.
                needs_listener = true;
                Box::new(PannerNode::new(options, ch))
            }
            AudioNodeInit::MediaStreamSourceNode(id) => {
                let reader = (self.reader_factory)(id, self.sample_rate);
                Box::new(MediaStreamSourceNode::new(reader, ch))
            }
            AudioNodeInit::OscillatorNode(options) => Box::new(OscillatorNode::new(options, ch)),
            AudioNodeInit::ChannelMergerNode(options) => {
                Box::new(ChannelMergerNode::new(options, ch))
            }
            AudioNodeInit::ConstantSourceNode(options) => {
                Box::new(ConstantSourceNode::new(options, ch))
            }
            AudioNodeInit::MediaStreamDestinationNode(socket) => {
                is_dest = true;
                // Each stream destination gets its own backend sink.
                // NOTE(review): `.unwrap()` panics if the backend cannot
                // create a sink here; there is no error channel back to the
                // caller on this path.
                Box::new(MediaStreamDestinationNode::new(
                    socket,
                    self.sample_rate,
                    (self.sink_factory)().unwrap(),
                    ch,
                ))
            }
            AudioNodeInit::ChannelSplitterNode => Box::new(ChannelSplitterNode::new(ch)),
            AudioNodeInit::WaveShaperNode(options) => Box::new(WaveShaperNode::new(options, ch)),
            AudioNodeInit::MediaElementSourceNode => Box::new(MediaElementSourceNode::new(ch)),
            AudioNodeInit::IIRFilterNode(options) => Box::new(IIRFilterNode::new(options, ch)),
            // Remaining AudioNodeInit variants are not supported yet.
            _ => unimplemented!(),
        };
        let id = self.graph.add_node(node);
        if needs_listener {
            let listener = self.graph.listener_id().output(0);
            self.graph.add_edge(listener, id.listener());
        }
        if is_dest {
            self.graph.add_extra_dest(id);
        }
        id
    }

    /// Adds an edge from an output port to an input port.
    fn connect_ports(&mut self, output: PortId<OutputPort>, input: PortId<InputPort>) {
        self.graph.add_edge(output, input)
    }

    /// Renders one block: explicit silence when muted, otherwise a full graph
    /// traversal at the current frame/time.
    fn process(&mut self) -> Chunk {
        if self.muted {
            return Chunk::explicit_silence();
        }

        let info = BlockInfo {
            sample_rate: self.sample_rate,
            frame: self.current_frame,
            time: self.current_time,
        };
        self.graph.process(&info)
    }

    /// Sets the mute flag read by `process`.
    fn set_mute(&mut self, val: bool) -> () {
        self.muted = val;
    }

    /// Main loop: alternates between handling control messages and rendering
    /// blocks. When the sink is satisfied (or the context is suspended) it
    /// blocks on the queue; otherwise it polls the queue and renders.
    fn event_loop(&mut self, event_queue: Receiver<AudioRenderThreadMsg>) {
        // Captured by value so the closure does not borrow `self`.
        let sample_rate = self.sample_rate;
        // Returns true when the loop should terminate (Close received).
        let handle_msg = move |context: &mut Self, msg: AudioRenderThreadMsg| -> bool {
            let mut break_loop = false;
            match msg {
                AudioRenderThreadMsg::CreateNode(node_type, tx, ch) => {
                    let _ = tx.send(context.create_node(node_type, ch));
                }
                AudioRenderThreadMsg::ConnectPorts(output, input) => {
                    context.connect_ports(output, input);
                }
                AudioRenderThreadMsg::Resume(tx) => {
                    let _ = tx.send(context.resume());
                }
                AudioRenderThreadMsg::Suspend(tx) => {
                    let _ = tx.send(context.suspend());
                }
                AudioRenderThreadMsg::Close(tx) => {
                    // Closing suspends the sink first, then exits the loop.
                    let _ = tx.send(context.suspend());
                    break_loop = true;
                }
                AudioRenderThreadMsg::GetCurrentTime(response) => {
                    response.send(context.current_time).unwrap()
                }
                AudioRenderThreadMsg::MessageNode(id, msg) => {
                    context.graph.node_mut(id).message(msg, sample_rate)
                }
                AudioRenderThreadMsg::SinkNeedData => {
                    // No work here: the message's only purpose is to wake the
                    // blocking recv below so rendering resumes.
                }
                AudioRenderThreadMsg::DisconnectAllFrom(id) => {
                    context.graph.disconnect_all_from(id)
                }
                AudioRenderThreadMsg::DisconnectOutput(out) => context.graph.disconnect_output(out),
                AudioRenderThreadMsg::DisconnectBetween(from, to) => {
                    context.graph.disconnect_between(from, to)
                }
                AudioRenderThreadMsg::DisconnectTo(from, to) => {
                    context.graph.disconnect_to(from, to)
                }
                AudioRenderThreadMsg::DisconnectOutputBetween(from, to) => {
                    context.graph.disconnect_output_between(from, to)
                }
                AudioRenderThreadMsg::DisconnectOutputBetweenTo(from, to) => {
                    context.graph.disconnect_output_between_to(from, to)
                }
                AudioRenderThreadMsg::SetSinkEosCallback(callback) => {
                    context.sink.set_eos_callback(callback);
                }
                AudioRenderThreadMsg::SetMute(val) => {
                    context.set_mute(val);
                }
            };

            break_loop
        };

        loop {
            if self.sink.has_enough_data() || self.state == ProcessingState::Suspended {
                // Nothing to render: block until the next control message
                // (e.g. Resume or SinkNeedData) arrives.
                if let Ok(msg) = event_queue.recv() {
                    if handle_msg(self, msg) {
                        break;
                    }
                }
            } else {
                // Sink is hungry: drain at most one pending message without
                // blocking, then render a block.
                if let Ok(msg) = event_queue.try_recv() {
                    if handle_msg(self, msg) {
                        break;
                    }
                }

                // The message just handled may have suspended us; re-check
                // before rendering.
                if self.state == ProcessingState::Suspended {
                    continue;
                }

                let data = self.process();
                if self.sink.push_data(data).is_ok() {
                    // Advance the clock only when the block was accepted.
                    self.current_frame += FRAMES_PER_BLOCK;
                    // Tick / f64 presumably yields seconds (frames / Hz) —
                    // confirm against Tick's Div impl.
                    self.current_time = self.current_frame / self.sample_rate as f64;
                } else {
                    eprintln!("Could not push data to audio sink");
                }
            }
        }
    }
}