servo_media_gstreamer/
audio_sink.rs1use std::cell::{Cell, RefCell};
6use std::sync::Arc;
7use std::sync::mpsc::Sender;
8use std::thread::Builder;
9
10use byte_slice_cast::*;
11use gstreamer;
12use gstreamer::prelude::*;
13use gstreamer_app::{AppSrc, AppSrcCallbacks};
14use gstreamer_audio;
15use servo_media_audio::block::{Chunk, FRAMES_PER_BLOCK};
16use servo_media_audio::render_thread::AudioRenderThreadMsg;
17use servo_media_audio::sink::{AudioSink, AudioSinkError};
18use servo_media_streams::MediaSocket;
19
20use crate::media_stream::GstreamerMediaSocket;
21
/// Sample rate, in Hz, that a newly constructed sink holds until `init` or
/// `init_stream` stores the rate requested by the caller.
const DEFAULT_SAMPLE_RATE: f32 = 44_100.0;
23
24pub struct GStreamerAudioSink {
25 pipeline: gstreamer::Pipeline,
26 appsrc: Arc<AppSrc>,
27 sample_rate: Cell<f32>,
28 audio_info: RefCell<Option<gstreamer_audio::AudioInfo>>,
29 sample_offset: Cell<u64>,
30}
31
32impl GStreamerAudioSink {
33 pub fn new() -> Result<Self, AudioSinkError> {
34 if let Some(category) = gstreamer::DebugCategory::get("openslessink") {
35 category.set_threshold(gstreamer::DebugLevel::Trace);
36 }
37 gstreamer::init().map_err(|error| {
38 AudioSinkError::Backend(format!("GStreamer init failed: {error:?}"))
39 })?;
40
41 let appsrc = gstreamer::ElementFactory::make("appsrc")
42 .build()
43 .map_err(|error| {
44 AudioSinkError::Backend(format!("appsrc creation failed: {error:?}"))
45 })?;
46 let appsrc = appsrc.downcast::<AppSrc>().unwrap();
47
48 Ok(Self {
49 pipeline: gstreamer::Pipeline::new(),
50 appsrc: Arc::new(appsrc),
51 sample_rate: Cell::new(DEFAULT_SAMPLE_RATE),
52 audio_info: RefCell::new(None),
53 sample_offset: Cell::new(0),
54 })
55 }
56}
57
58impl GStreamerAudioSink {
59 fn set_audio_info(&self, sample_rate: f32, channels: u8) -> Result<(), AudioSinkError> {
60 let audio_info = gstreamer_audio::AudioInfo::builder(
61 gstreamer_audio::AUDIO_FORMAT_F32,
62 sample_rate as u32,
63 channels.into(),
64 )
65 .build()
66 .map_err(|error| AudioSinkError::Backend(format!("AudioInfo failed: {error:?}")))?;
67 self.appsrc.set_caps(audio_info.to_caps().ok().as_ref());
68 *self.audio_info.borrow_mut() = Some(audio_info);
69 Ok(())
70 }
71
72 fn set_channels_if_changed(&self, channels: u8) -> Result<(), AudioSinkError> {
73 let curr_channels = match self.audio_info.borrow().as_ref() {
74 Some(ch) => ch.channels(),
75 _ => {
76 return Ok(());
77 },
78 };
79 if channels != curr_channels as u8 {
80 self.set_audio_info(self.sample_rate.get(), channels)?;
81 }
82 Ok(())
83 }
84}
85
86impl AudioSink for GStreamerAudioSink {
87 fn init(
88 &self,
89 sample_rate: f32,
90 graph_thread_channel: Sender<AudioRenderThreadMsg>,
91 ) -> Result<(), AudioSinkError> {
92 self.sample_rate.set(sample_rate);
93 self.set_audio_info(sample_rate, 2)?;
94 self.appsrc.set_format(gstreamer::Format::Time);
95
96 self.appsrc.set_max_bytes(1);
98
99 let appsrc = self.appsrc.clone();
100 Builder::new()
101 .name("GstAppSrcCallbacks".to_owned())
102 .spawn(move || {
103 let need_data = move |_: &AppSrc, _: u32| {
104 if let Err(e) = graph_thread_channel.send(AudioRenderThreadMsg::SinkNeedData) {
105 log::warn!("Error sending need data event: {:?}", e);
106 }
107 };
108 appsrc.set_callbacks(AppSrcCallbacks::builder().need_data(need_data).build());
109 })
110 .unwrap();
111
112 let appsrc = self.appsrc.as_ref().clone().upcast();
113 let resample = gstreamer::ElementFactory::make("audioresample")
114 .build()
115 .map_err(|error| {
116 AudioSinkError::Backend(format!("audioresample creation failed: {error:?}"))
117 })?;
118 let convert = gstreamer::ElementFactory::make("audioconvert")
119 .build()
120 .map_err(|error| {
121 AudioSinkError::Backend(format!("audioconvert creation failed: {error:?}"))
122 })?;
123 let sink = gstreamer::ElementFactory::make("autoaudiosink")
124 .build()
125 .map_err(|error| {
126 AudioSinkError::Backend(format!("autoaudiosink creation failed: {error:?}"))
127 })?;
128 self.pipeline
129 .add_many([&appsrc, &resample, &convert, &sink])
130 .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
131 gstreamer::Element::link_many([&appsrc, &resample, &convert, &sink])
132 .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
133
134 Ok(())
135 }
136
137 fn init_stream(
138 &self,
139 channels: u8,
140 sample_rate: f32,
141 socket: Box<dyn MediaSocket>,
142 ) -> Result<(), AudioSinkError> {
143 self.sample_rate.set(sample_rate);
144 self.set_audio_info(sample_rate, channels)?;
145 self.appsrc.set_format(gstreamer::Format::Time);
146
147 let appsrc = self.appsrc.as_ref().clone().upcast();
150 let convert = gstreamer::ElementFactory::make("audioconvert")
151 .build()
152 .map_err(|error| {
153 AudioSinkError::Backend(format!("audioconvert creation failed: {error:?}"))
154 })?;
155 let sink = socket
156 .as_any()
157 .downcast_ref::<GstreamerMediaSocket>()
158 .unwrap()
159 .proxy_sink()
160 .clone();
161
162 self.pipeline
163 .add_many([&appsrc, &convert, &sink])
164 .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
165 gstreamer::Element::link_many([&appsrc, &convert, &sink])
166 .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
167
168 Ok(())
169 }
170
171 fn play(&self) -> Result<(), AudioSinkError> {
172 self.pipeline
173 .set_state(gstreamer::State::Playing)
174 .map(|_| ())
175 .map_err(|_| AudioSinkError::StateChangeFailed)
176 }
177
178 fn stop(&self) -> Result<(), AudioSinkError> {
179 self.pipeline
180 .set_state(gstreamer::State::Paused)
181 .map(|_| ())
182 .map_err(|_| AudioSinkError::StateChangeFailed)
183 }
184
185 fn has_enough_data(&self) -> bool {
186 self.appsrc.current_level_bytes() >= self.appsrc.max_bytes()
187 }
188
189 fn push_data(&self, mut chunk: Chunk) -> Result<(), AudioSinkError> {
190 if let Some(block) = chunk.blocks.first() {
191 self.set_channels_if_changed(block.chan_count())?;
192 }
193
194 let sample_rate = self.sample_rate.get() as u64;
195 let audio_info = self.audio_info.borrow();
196 let audio_info = audio_info.as_ref().unwrap();
197 let channels = audio_info.channels();
198 let bpf = audio_info.bpf() as usize;
199 assert_eq!(bpf, 4 * channels as usize);
200 let n_samples = FRAMES_PER_BLOCK.0;
201 let buf_size = (n_samples as usize) * (bpf);
202 let mut buffer = gstreamer::Buffer::with_size(buf_size).unwrap();
203 {
204 let buffer = buffer.get_mut().unwrap();
205 let mut sample_offset = self.sample_offset.get();
206 let pts = gstreamer::ClockTime::from_nseconds(
210 sample_offset
211 .mul_div_floor(gstreamer::ClockTime::SECOND.nseconds(), sample_rate)
212 .unwrap(),
213 );
214 let next_pts: gstreamer::ClockTime = gstreamer::ClockTime::from_nseconds(
215 (sample_offset + n_samples)
216 .mul_div_floor(gstreamer::ClockTime::SECOND.nseconds(), sample_rate)
217 .unwrap(),
218 );
219 buffer.set_pts(Some(pts));
220 buffer.set_duration(next_pts - pts);
221
222 if chunk.is_empty() {
224 chunk.blocks.push(Default::default());
225 chunk.blocks[0].repeat(channels as u8);
226 }
227 debug_assert!(chunk.len() == 1);
228 let mut data = chunk.blocks[0].interleave();
229 let data = data.as_mut_byte_slice();
230
231 buffer.copy_from_slice(0, data).expect("copying failed");
235
236 sample_offset += n_samples;
237 self.sample_offset.set(sample_offset);
238 }
239
240 self.appsrc
241 .push_buffer(buffer)
242 .map(|_| ())
243 .map_err(|_| AudioSinkError::BufferPushFailed)
244 }
245
246 fn set_eos_callback(&self, _: Box<dyn Fn(Box<dyn AsRef<[f32]>>) + Send + Sync + 'static>) {}
247}
248
249impl Drop for GStreamerAudioSink {
250 fn drop(&mut self) {
251 let _ = self.stop();
252 }
253}