servo_media_gstreamer/
audio_decoder.rs1use byte_slice_cast::*;
2use gst;
3use gst::prelude::*;
4use gst_app;
5use gst_audio;
6use servo_media_audio::decoder::{AudioDecoder, AudioDecoderCallbacks};
7use servo_media_audio::decoder::{AudioDecoderError, AudioDecoderOptions};
8use std::io::Cursor;
9use std::io::Read;
10use std::sync::{mpsc, Arc, Mutex};
11
12pub struct GStreamerAudioDecoderProgress(gst::buffer::MappedBuffer<gst::buffer::Readable>);
13
14impl AsRef<[f32]> for GStreamerAudioDecoderProgress {
15 fn as_ref(&self) -> &[f32] {
16 self.0.as_ref().as_slice_of::<f32>().unwrap()
17 }
18}
19
/// Stateless audio decoder backed by a GStreamer pipeline.
///
/// The type carries no data of its own; all the work happens in its
/// `AudioDecoder` implementation.
#[derive(Debug, Default)]
pub struct GStreamerAudioDecoder {}

impl GStreamerAudioDecoder {
    /// Creates a new decoder. Equivalent to [`GStreamerAudioDecoder::default`].
    pub fn new() -> Self {
        Self {}
    }
}
27
28impl AudioDecoder for GStreamerAudioDecoder {
29 fn decode(
30 &self,
31 data: Vec<u8>,
32 callbacks: AudioDecoderCallbacks,
33 options: Option<AudioDecoderOptions>,
34 ) {
35 let pipeline = gst::Pipeline::new();
36 let callbacks = Arc::new(callbacks);
37
38 let appsrc = match gst::ElementFactory::make("appsrc").build() {
39 Ok(appsrc) => appsrc,
40 _ => {
41 return callbacks.error(AudioDecoderError::Backend(
42 "appsrc creation failed".to_owned(),
43 ));
44 }
45 };
46
47 let decodebin = match gst::ElementFactory::make("decodebin").build() {
48 Ok(decodebin) => decodebin,
49 _ => {
50 return callbacks.error(AudioDecoderError::Backend(
51 "decodebin creation failed".to_owned(),
52 ));
53 }
54 };
55
56 if let Err(e) = pipeline.add_many(&[&appsrc, &decodebin]) {
60 return callbacks.error(AudioDecoderError::Backend(e.to_string()));
61 }
62
63 if let Err(e) = gst::Element::link_many(&[&appsrc, &decodebin]) {
64 return callbacks.error(AudioDecoderError::Backend(e.to_string()));
65 }
66
67 let appsrc = appsrc.downcast::<gst_app::AppSrc>().unwrap();
68
69 let options = options.unwrap_or_default();
70
71 let (sender, receiver) = mpsc::channel();
72 let sender = Arc::new(Mutex::new(sender));
73
74 let pipeline_ = pipeline.downgrade();
75 let callbacks_ = callbacks.clone();
76 let sender_ = sender.clone();
77 decodebin.connect_pad_added(move |_, src_pad| {
84 let callbacks = &callbacks_;
97 let sender = &sender_;
98 let pipeline = match pipeline_.upgrade() {
99 Some(pipeline) => pipeline,
100 None => {
101 callbacks.error(AudioDecoderError::Backend(
102 "Pipeline failed upgrade".to_owned(),
103 ));
104 let _ = sender.lock().unwrap().send(());
105 return;
106 }
107 };
108
109 let (is_audio, caps) = {
110 let media_type = src_pad.current_caps().and_then(|caps| {
111 caps.structure(0).map(|s| {
112 let name = s.name();
113 (name.starts_with("audio/"), caps.clone())
114 })
115 });
116
117 match media_type {
118 None => {
119 callbacks.error(AudioDecoderError::Backend(
120 "Failed to get media type from pad".to_owned(),
121 ));
122 let _ = sender.lock().unwrap().send(());
123 return;
124 }
125 Some(media_type) => media_type,
126 }
127 };
128
129 if !is_audio {
130 callbacks.error(AudioDecoderError::InvalidMediaFormat);
131 let _ = sender.lock().unwrap().send(());
132 return;
133 }
134
135 let sample_audio_info = match gst_audio::AudioInfo::from_caps(&caps) {
136 Ok(sample_audio_info) => sample_audio_info,
137 _ => {
138 callbacks.error(AudioDecoderError::Backend("AudioInfo failed".to_owned()));
139 let _ = sender.lock().unwrap().send(());
140 return;
141 }
142 };
143 let channels = sample_audio_info.channels();
144 callbacks.ready(channels);
145
146 let insert_deinterleave = || -> Result<(), AudioDecoderError> {
147 let convert = gst::ElementFactory::make("audioconvert")
148 .build()
149 .map_err(|error| {
150 AudioDecoderError::Backend(format!("audioconvert creation failed: {error:?}"))
151 })?;
152 let resample =
153 gst::ElementFactory::make("audioresample")
154 .build()
155 .map_err(|error| {
156 AudioDecoderError::Backend(format!("audioresample creation failed: {error:?}"))
157 })?;
158 let filter = gst::ElementFactory::make("capsfilter")
159 .build()
160 .map_err(|error| {
161 AudioDecoderError::Backend(format!("capsfilter creation failed: {error:?}"))
162 })?;
163 let deinterleave = gst::ElementFactory::make("deinterleave")
164 .name("deinterleave")
165 .property("keep-positions", true)
166 .build()
167 .map_err(|error| {
168 AudioDecoderError::Backend(format!("deinterleave creation failed: {error:?}"))
169 })?;
170
171 let pipeline_ = pipeline.downgrade();
172 let callbacks_ = callbacks.clone();
173 deinterleave.connect_pad_added(move |_, src_pad| {
174 let callbacks = &callbacks_;
181 let pipeline = match pipeline_.upgrade() {
182 Some(pipeline) => pipeline,
183 None => {
184 return callbacks.error(AudioDecoderError::Backend(
185 "Pipeline failedupgrade".to_owned(),
186 ));
187 }
188 };
189 let insert_sink = || -> Result<(), AudioDecoderError> {
190 let queue = gst::ElementFactory::make("queue").build().map_err(|error| {
191 AudioDecoderError::Backend(format!("queue creation failed: {error:?}"))
192 })?;
193 let sink = gst::ElementFactory::make("appsink").build().map_err(|error| {
194 AudioDecoderError::Backend(format!("appsink creation failed: {error:?}"))
195 })?;
196 let appsink = sink.clone().dynamic_cast::<gst_app::AppSink>().unwrap();
197 sink.set_property("sync", false);
198
199 let callbacks_ = callbacks.clone();
200 appsink.set_callbacks(
201 gst_app::AppSinkCallbacks::builder()
202 .new_sample(move |appsink| {
203 let sample =
204 appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
205 let buffer = sample.buffer_owned().ok_or_else(|| {
206 callbacks_.error(AudioDecoderError::InvalidSample);
207 gst::FlowError::Error
208 })?;
209
210 let audio_info = sample
211 .caps()
212 .and_then(|caps| gst_audio::AudioInfo::from_caps(caps).ok())
213 .ok_or_else(|| {
214 callbacks_.error(AudioDecoderError::Backend(
215 "Could not get caps from sample".to_owned(),
216 ));
217 gst::FlowError::Error
218 })?;
219 let positions = audio_info.positions().ok_or_else(|| {
220 callbacks_.error(AudioDecoderError::Backend(
221 "AudioInfo failed".to_owned(),
222 ));
223 gst::FlowError::Error
224 })?;
225
226 for position in positions.iter() {
227 let buffer = buffer.clone();
228 let map = if let Ok(map) =
229 buffer.into_mapped_buffer_readable()
230 {
231 map
232 } else {
233 callbacks_.error(AudioDecoderError::BufferReadFailed);
234 return Err(gst::FlowError::Error);
235 };
236 let progress = Box::new(GStreamerAudioDecoderProgress(map));
237 let channel = position.to_mask() as u32;
238 callbacks_.progress(progress, channel);
239 }
240
241 Ok(gst::FlowSuccess::Ok)
242 })
243 .build(),
244 );
245
246 let elements = &[&queue, &sink];
247 pipeline
248 .add_many(elements)
249 .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
250 gst::Element::link_many(elements)
251 .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
252
253 for e in elements {
254 e.sync_state_with_parent()
255 .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
256 }
257
258 let sink_pad = queue.static_pad("sink").ok_or(
259 AudioDecoderError::Backend("Could not get static pad sink".to_owned()),
260 )?;
261 src_pad.link(&sink_pad).map(|_| ()).map_err(|e| {
262 AudioDecoderError::Backend(format!("Sink pad link failed: {}", e))
263 })
264 };
265
266 if let Err(e) = insert_sink() {
267 callbacks.error(e);
268 }
269 });
270
271 let mut audio_info_builder = gst_audio::AudioInfo::builder(
272 gst_audio::AUDIO_FORMAT_F32,
273 options.sample_rate as u32,
274 channels,
275 );
276 if let Some(positions) = sample_audio_info.positions() {
277 audio_info_builder = audio_info_builder.positions(positions);
278 }
279 let audio_info = audio_info_builder
280 .build()
281 .map_err(|error| AudioDecoderError::Backend(format!("AudioInfo failed: {error:?}")))?;
282 let caps = audio_info
283 .to_caps()
284 .map_err(|error| AudioDecoderError::Backend(format!("AudioInfo failed: {error:?}")))?;
285 filter.set_property("caps", caps);
286
287 let elements = &[&convert, &resample, &filter, &deinterleave];
288 pipeline
289 .add_many(elements)
290 .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
291 gst::Element::link_many(elements)
292 .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
293
294 for e in elements {
295 e.sync_state_with_parent()
296 .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
297 }
298
299 let sink_pad = convert
300 .static_pad("sink")
301 .ok_or(AudioDecoderError::Backend(
302 "Get static pad sink failed".to_owned(),
303 ))?;
304 src_pad
305 .link(&sink_pad)
306 .map(|_| ())
307 .map_err(|e| AudioDecoderError::Backend(format!("Sink pad link failed: {}", e)))
308 };
309
310 if let Err(e) = insert_deinterleave() {
311 callbacks.error(e);
312 let _ = sender.lock().unwrap().send(());
313 }
314 });
315
316 appsrc.set_format(gst::Format::Bytes);
317 appsrc.set_block(true);
318
319 let bus = match pipeline.bus() {
320 Some(bus) => bus,
321 None => {
322 callbacks.error(AudioDecoderError::Backend(
323 "Pipeline without bus. Shouldn't happen!".to_owned(),
324 ));
325 let _ = sender.lock().unwrap().send(());
326 return;
327 }
328 };
329
330 let callbacks_ = callbacks.clone();
331 bus.set_sync_handler(move |_, msg| {
332 use gst::MessageView;
333
334 match msg.view() {
335 MessageView::Error(e) => {
336 callbacks_.error(AudioDecoderError::Backend(
337 e.debug()
338 .map(|d| d.to_string())
339 .unwrap_or_else(|| "Unknown".to_owned()),
340 ));
341 let _ = sender.lock().unwrap().send(());
342 }
343 MessageView::Eos(_) => {
344 callbacks_.eos();
345 let _ = sender.lock().unwrap().send(());
346 }
347 _ => (),
348 }
349 gst::BusSyncReply::Drop
350 });
351
352 if pipeline.set_state(gst::State::Playing).is_err() {
353 callbacks.error(AudioDecoderError::StateChangeFailed);
354 return;
355 }
356
357 let max_bytes = appsrc.max_bytes() as usize;
358 let data_len = data.len();
359 let mut reader = Cursor::new(data);
360 while (reader.position() as usize) < data_len {
361 let data_left = data_len - reader.position() as usize;
362 let buffer_size = if data_left < max_bytes {
363 data_left
364 } else {
365 max_bytes
366 };
367 let mut buffer = gst::Buffer::with_size(buffer_size).unwrap();
368 {
369 let buffer = buffer.get_mut().unwrap();
370 let mut map = buffer.map_writable().unwrap();
371 let buffer = map.as_mut_slice();
372 let _ = reader.read(buffer);
373 }
374 let _ = appsrc.push_buffer(buffer);
375 }
376 let _ = appsrc.end_of_stream();
377
378 receiver.recv().unwrap();
380 let _ = pipeline.set_state(gst::State::Null);
381 }
382}