servo_media_gstreamer/
audio_sink.rs1use crate::media_stream::GstreamerMediaSocket;
2use byte_slice_cast::*;
3use gst;
4use gst::prelude::*;
5use gst_app::{AppSrc, AppSrcCallbacks};
6use gst_audio;
7use servo_media_audio::block::{Chunk, FRAMES_PER_BLOCK};
8use servo_media_audio::render_thread::AudioRenderThreadMsg;
9use servo_media_audio::sink::{AudioSink, AudioSinkError};
10use servo_media_streams::MediaSocket;
11use std::cell::{Cell, RefCell};
12use std::sync::mpsc::Sender;
13use std::sync::Arc;
14use std::thread::Builder;
15
/// Sample rate a freshly constructed sink holds until `init` or
/// `init_stream` stores the caller-supplied one.
const DEFAULT_SAMPLE_RATE: f32 = 44_100.0;
17
18pub struct GStreamerAudioSink {
19 pipeline: gst::Pipeline,
20 appsrc: Arc<AppSrc>,
21 sample_rate: Cell<f32>,
22 audio_info: RefCell<Option<gst_audio::AudioInfo>>,
23 sample_offset: Cell<u64>,
24}
25
26impl GStreamerAudioSink {
27 pub fn new() -> Result<Self, AudioSinkError> {
28 if let Some(category) = gst::DebugCategory::get("openslessink") {
29 category.set_threshold(gst::DebugLevel::Trace);
30 }
31 gst::init().map_err(|error| {
32 AudioSinkError::Backend(format!("GStreamer init failed: {error:?}"))
33 })?;
34
35 let appsrc = gst::ElementFactory::make("appsrc")
36 .build()
37 .map_err(|error| {
38 AudioSinkError::Backend(format!("appsrc creation failed: {error:?}"))
39 })?;
40 let appsrc = appsrc.downcast::<AppSrc>().unwrap();
41
42 Ok(Self {
43 pipeline: gst::Pipeline::new(),
44 appsrc: Arc::new(appsrc),
45 sample_rate: Cell::new(DEFAULT_SAMPLE_RATE),
46 audio_info: RefCell::new(None),
47 sample_offset: Cell::new(0),
48 })
49 }
50}
51
52impl GStreamerAudioSink {
53 fn set_audio_info(&self, sample_rate: f32, channels: u8) -> Result<(), AudioSinkError> {
54 let audio_info = gst_audio::AudioInfo::builder(
55 gst_audio::AUDIO_FORMAT_F32,
56 sample_rate as u32,
57 channels.into(),
58 )
59 .build()
60 .map_err(|error| AudioSinkError::Backend(format!("AudioInfo failed: {error:?}")))?;
61 self.appsrc.set_caps(audio_info.to_caps().ok().as_ref());
62 *self.audio_info.borrow_mut() = Some(audio_info);
63 Ok(())
64 }
65
66 fn set_channels_if_changed(&self, channels: u8) -> Result<(), AudioSinkError> {
67 let curr_channels = if let Some(ch) = self.audio_info.borrow().as_ref() {
68 ch.channels()
69 } else {
70 return Ok(());
71 };
72 if channels != curr_channels as u8 {
73 self.set_audio_info(self.sample_rate.get(), channels)?;
74 }
75 Ok(())
76 }
77}
78
79impl AudioSink for GStreamerAudioSink {
80 fn init(
81 &self,
82 sample_rate: f32,
83 graph_thread_channel: Sender<AudioRenderThreadMsg>,
84 ) -> Result<(), AudioSinkError> {
85 self.sample_rate.set(sample_rate);
86 self.set_audio_info(sample_rate, 2)?;
87 self.appsrc.set_format(gst::Format::Time);
88
89 self.appsrc.set_max_bytes(1);
91
92 let appsrc = self.appsrc.clone();
93 Builder::new()
94 .name("GstAppSrcCallbacks".to_owned())
95 .spawn(move || {
96 let need_data = move |_: &AppSrc, _: u32| {
97 if let Err(e) = graph_thread_channel.send(AudioRenderThreadMsg::SinkNeedData) {
98 log::warn!("Error sending need data event: {:?}", e);
99 }
100 };
101 appsrc.set_callbacks(AppSrcCallbacks::builder().need_data(need_data).build());
102 })
103 .unwrap();
104
105 let appsrc = self.appsrc.as_ref().clone().upcast();
106 let resample = gst::ElementFactory::make("audioresample")
107 .build()
108 .map_err(|error| {
109 AudioSinkError::Backend(format!("audioresample creation failed: {error:?}"))
110 })?;
111 let convert = gst::ElementFactory::make("audioconvert")
112 .build()
113 .map_err(|error| {
114 AudioSinkError::Backend(format!("audioconvert creation failed: {error:?}"))
115 })?;
116 let sink = gst::ElementFactory::make("autoaudiosink")
117 .build()
118 .map_err(|error| {
119 AudioSinkError::Backend(format!("autoaudiosink creation failed: {error:?}"))
120 })?;
121 self.pipeline
122 .add_many(&[&appsrc, &resample, &convert, &sink])
123 .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
124 gst::Element::link_many(&[&appsrc, &resample, &convert, &sink])
125 .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
126
127 Ok(())
128 }
129
130 fn init_stream(
131 &self,
132 channels: u8,
133 sample_rate: f32,
134 socket: Box<dyn MediaSocket>,
135 ) -> Result<(), AudioSinkError> {
136 self.sample_rate.set(sample_rate);
137 self.set_audio_info(sample_rate, channels)?;
138 self.appsrc.set_format(gst::Format::Time);
139
140 let appsrc = self.appsrc.as_ref().clone().upcast();
143 let convert = gst::ElementFactory::make("audioconvert")
144 .build()
145 .map_err(|error| {
146 AudioSinkError::Backend(format!("audioconvert creation failed: {error:?}"))
147 })?;
148 let sink = socket
149 .as_any()
150 .downcast_ref::<GstreamerMediaSocket>()
151 .unwrap()
152 .proxy_sink()
153 .clone();
154
155 self.pipeline
156 .add_many(&[&appsrc, &convert, &sink])
157 .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
158 gst::Element::link_many(&[&appsrc, &convert, &sink])
159 .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
160
161 Ok(())
162 }
163
164 fn play(&self) -> Result<(), AudioSinkError> {
165 self.pipeline
166 .set_state(gst::State::Playing)
167 .map(|_| ())
168 .map_err(|_| AudioSinkError::StateChangeFailed)
169 }
170
171 fn stop(&self) -> Result<(), AudioSinkError> {
172 self.pipeline
173 .set_state(gst::State::Paused)
174 .map(|_| ())
175 .map_err(|_| AudioSinkError::StateChangeFailed)
176 }
177
178 fn has_enough_data(&self) -> bool {
179 self.appsrc.current_level_bytes() >= self.appsrc.max_bytes()
180 }
181
182 fn push_data(&self, mut chunk: Chunk) -> Result<(), AudioSinkError> {
183 if let Some(block) = chunk.blocks.get(0) {
184 self.set_channels_if_changed(block.chan_count())?;
185 }
186
187 let sample_rate = self.sample_rate.get() as u64;
188 let audio_info = self.audio_info.borrow();
189 let audio_info = audio_info.as_ref().unwrap();
190 let channels = audio_info.channels();
191 let bpf = audio_info.bpf() as usize;
192 assert_eq!(bpf, 4 * channels as usize);
193 let n_samples = FRAMES_PER_BLOCK.0;
194 let buf_size = (n_samples as usize) * (bpf);
195 let mut buffer = gst::Buffer::with_size(buf_size).unwrap();
196 {
197 let buffer = buffer.get_mut().unwrap();
198 let mut sample_offset = self.sample_offset.get();
199 let pts = gst::ClockTime::from_nseconds(
203 sample_offset
204 .mul_div_floor(gst::ClockTime::SECOND.nseconds(), sample_rate)
205 .unwrap(),
206 );
207 let next_pts: gst::ClockTime = gst::ClockTime::from_nseconds(
208 (sample_offset + n_samples)
209 .mul_div_floor(gst::ClockTime::SECOND.nseconds(), sample_rate)
210 .unwrap(),
211 );
212 buffer.set_pts(Some(pts));
213 buffer.set_duration(next_pts - pts);
214
215 if chunk.len() == 0 {
217 chunk.blocks.push(Default::default());
218 chunk.blocks[0].repeat(channels as u8);
219 }
220 debug_assert!(chunk.len() == 1);
221 let mut data = chunk.blocks[0].interleave();
222 let data = data.as_mut_byte_slice().expect("casting failed");
223
224 buffer.copy_from_slice(0, data).expect("copying failed");
228
229 sample_offset += n_samples;
230 self.sample_offset.set(sample_offset);
231 }
232
233 self.appsrc
234 .push_buffer(buffer)
235 .map(|_| ())
236 .map_err(|_| AudioSinkError::BufferPushFailed)
237 }
238
239 fn set_eos_callback(&self, _: Box<dyn Fn(Box<dyn AsRef<[f32]>>) + Send + Sync + 'static>) {}
240}
241
242impl Drop for GStreamerAudioSink {
243 fn drop(&mut self) {
244 let _ = self.stop();
245 }
246}