servo_media_gstreamer/
audio_decoder.rs1use byte_slice_cast::*;
2use gst;
3use gst::prelude::*;
4use gst_app;
5use gst_audio;
6use servo_media_audio::decoder::{AudioDecoder, AudioDecoderCallbacks};
7use servo_media_audio::decoder::{AudioDecoderError, AudioDecoderOptions};
8use std::io::Cursor;
9use std::io::Read;
10use std::sync::{mpsc, Arc, Mutex};
11
12pub struct GStreamerAudioDecoderProgress(gst::buffer::MappedBuffer<gst::buffer::Readable>);
13
14impl AsRef<[f32]> for GStreamerAudioDecoderProgress {
15 fn as_ref(&self) -> &[f32] {
16 self.0.as_ref().as_slice_of::<f32>().unwrap()
17 }
18}
19
/// Audio decoder that decodes in-memory media with a GStreamer pipeline.
///
/// The type carries no state of its own; every call to `decode` builds and
/// tears down its own pipeline.
#[derive(Debug, Default)]
pub struct GStreamerAudioDecoder {}

impl GStreamerAudioDecoder {
    /// Creates a new decoder. Equivalent to `GStreamerAudioDecoder::default()`.
    pub fn new() -> Self {
        Self {}
    }
}
27
28impl AudioDecoder for GStreamerAudioDecoder {
29 fn decode(
30 &self,
31 data: Vec<u8>,
32 callbacks: AudioDecoderCallbacks,
33 options: Option<AudioDecoderOptions>,
34 ) {
35 let pipeline = gst::Pipeline::new();
36 let callbacks = Arc::new(callbacks);
37
38 let appsrc = match gst::ElementFactory::make("appsrc").build() {
39 Ok(appsrc) => appsrc,
40 _ => {
41 return callbacks.error(AudioDecoderError::Backend(
42 "appsrc creation failed".to_owned(),
43 ));
44 },
45 };
46
47 let decodebin = match gst::ElementFactory::make("decodebin").build() {
48 Ok(decodebin) => decodebin,
49 _ => {
50 return callbacks.error(AudioDecoderError::Backend(
51 "decodebin creation failed".to_owned(),
52 ));
53 },
54 };
55
56 if let Err(e) = pipeline.add_many(&[&appsrc, &decodebin]) {
60 return callbacks.error(AudioDecoderError::Backend(e.to_string()));
61 }
62
63 if let Err(e) = gst::Element::link_many(&[&appsrc, &decodebin]) {
64 return callbacks.error(AudioDecoderError::Backend(e.to_string()));
65 }
66
67 let appsrc = appsrc.downcast::<gst_app::AppSrc>().unwrap();
68
69 let options = options.unwrap_or_default();
70
71 let (sender, receiver) = mpsc::channel();
72 let sender = Arc::new(Mutex::new(sender));
73
74 let pipeline_ = pipeline.downgrade();
75 let callbacks_ = callbacks.clone();
76 let sender_ = sender.clone();
77 decodebin.connect_pad_added(move |_, src_pad| {
84 let callbacks = &callbacks_;
97 let sender = &sender_;
98 let pipeline = match pipeline_.upgrade() {
99 Some(pipeline) => pipeline,
100 None => {
101 callbacks.error(AudioDecoderError::Backend(
102 "Pipeline failed upgrade".to_owned(),
103 ));
104 let _ = sender.lock().unwrap().send(());
105 return;
106 },
107 };
108
109 let (is_audio, caps) = {
110 let media_type = src_pad.current_caps().and_then(|caps| {
111 caps.structure(0).map(|s| {
112 let name = s.name();
113 (name.starts_with("audio/"), caps.clone())
114 })
115 });
116
117 match media_type {
118 None => {
119 callbacks.error(AudioDecoderError::Backend(
120 "Failed to get media type from pad".to_owned(),
121 ));
122 let _ = sender.lock().unwrap().send(());
123 return;
124 },
125 Some(media_type) => media_type,
126 }
127 };
128
129 if !is_audio {
130 callbacks.error(AudioDecoderError::InvalidMediaFormat);
131 let _ = sender.lock().unwrap().send(());
132 return;
133 }
134
135 let sample_audio_info = match gst_audio::AudioInfo::from_caps(&caps) {
136 Ok(sample_audio_info) => sample_audio_info,
137 _ => {
138 callbacks.error(AudioDecoderError::Backend("AudioInfo failed".to_owned()));
139 let _ = sender.lock().unwrap().send(());
140 return;
141 },
142 };
143 let channels = sample_audio_info.channels();
144 callbacks.ready(channels);
145
146 let insert_deinterleave = || -> Result<(), AudioDecoderError> {
147 let convert =
148 gst::ElementFactory::make("audioconvert")
149 .build()
150 .map_err(|error| {
151 AudioDecoderError::Backend(format!(
152 "audioconvert creation failed: {error:?}"
153 ))
154 })?;
155 let resample =
156 gst::ElementFactory::make("audioresample")
157 .build()
158 .map_err(|error| {
159 AudioDecoderError::Backend(format!(
160 "audioresample creation failed: {error:?}"
161 ))
162 })?;
163 let filter = gst::ElementFactory::make("capsfilter")
164 .build()
165 .map_err(|error| {
166 AudioDecoderError::Backend(format!("capsfilter creation failed: {error:?}"))
167 })?;
168 let deinterleave = gst::ElementFactory::make("deinterleave")
169 .name("deinterleave")
170 .property("keep-positions", true)
171 .build()
172 .map_err(|error| {
173 AudioDecoderError::Backend(format!(
174 "deinterleave creation failed: {error:?}"
175 ))
176 })?;
177
178 let pipeline_ = pipeline.downgrade();
179 let callbacks_ = callbacks.clone();
180 deinterleave.connect_pad_added(move |_, src_pad| {
181 let callbacks = &callbacks_;
188 let pipeline = match pipeline_.upgrade() {
189 Some(pipeline) => pipeline,
190 None => {
191 return callbacks.error(AudioDecoderError::Backend(
192 "Pipeline failedupgrade".to_owned(),
193 ));
194 },
195 };
196 let insert_sink = || -> Result<(), AudioDecoderError> {
197 let queue =
198 gst::ElementFactory::make("queue")
199 .build()
200 .map_err(|error| {
201 AudioDecoderError::Backend(format!(
202 "queue creation failed: {error:?}"
203 ))
204 })?;
205 let sink =
206 gst::ElementFactory::make("appsink")
207 .build()
208 .map_err(|error| {
209 AudioDecoderError::Backend(format!(
210 "appsink creation failed: {error:?}"
211 ))
212 })?;
213 let appsink = sink.clone().dynamic_cast::<gst_app::AppSink>().unwrap();
214 sink.set_property("sync", false);
215
216 let callbacks_ = callbacks.clone();
217 appsink.set_callbacks(
218 gst_app::AppSinkCallbacks::builder()
219 .new_sample(move |appsink| {
220 let sample =
221 appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
222 let buffer = sample.buffer_owned().ok_or_else(|| {
223 callbacks_.error(AudioDecoderError::InvalidSample);
224 gst::FlowError::Error
225 })?;
226
227 let audio_info = sample
228 .caps()
229 .and_then(|caps| gst_audio::AudioInfo::from_caps(caps).ok())
230 .ok_or_else(|| {
231 callbacks_.error(AudioDecoderError::Backend(
232 "Could not get caps from sample".to_owned(),
233 ));
234 gst::FlowError::Error
235 })?;
236 let positions = audio_info.positions().ok_or_else(|| {
237 callbacks_.error(AudioDecoderError::Backend(
238 "AudioInfo failed".to_owned(),
239 ));
240 gst::FlowError::Error
241 })?;
242
243 for position in positions.iter() {
244 let buffer = buffer.clone();
245 let map = if let Ok(map) =
246 buffer.into_mapped_buffer_readable()
247 {
248 map
249 } else {
250 callbacks_.error(AudioDecoderError::BufferReadFailed);
251 return Err(gst::FlowError::Error);
252 };
253 let progress = Box::new(GStreamerAudioDecoderProgress(map));
254 let channel = position.to_mask() as u32;
255 callbacks_.progress(progress, channel);
256 }
257
258 Ok(gst::FlowSuccess::Ok)
259 })
260 .build(),
261 );
262
263 let elements = &[&queue, &sink];
264 pipeline
265 .add_many(elements)
266 .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
267 gst::Element::link_many(elements)
268 .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
269
270 for e in elements {
271 e.sync_state_with_parent()
272 .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
273 }
274
275 let sink_pad = queue.static_pad("sink").ok_or(
276 AudioDecoderError::Backend("Could not get static pad sink".to_owned()),
277 )?;
278 src_pad.link(&sink_pad).map(|_| ()).map_err(|e| {
279 AudioDecoderError::Backend(format!("Sink pad link failed: {}", e))
280 })
281 };
282
283 if let Err(e) = insert_sink() {
284 callbacks.error(e);
285 }
286 });
287
288 let mut audio_info_builder = gst_audio::AudioInfo::builder(
289 gst_audio::AUDIO_FORMAT_F32,
290 options.sample_rate as u32,
291 channels,
292 );
293 if let Some(positions) = sample_audio_info.positions() {
294 audio_info_builder = audio_info_builder.positions(positions);
295 }
296 let audio_info = audio_info_builder.build().map_err(|error| {
297 AudioDecoderError::Backend(format!("AudioInfo failed: {error:?}"))
298 })?;
299 let caps = audio_info.to_caps().map_err(|error| {
300 AudioDecoderError::Backend(format!("AudioInfo failed: {error:?}"))
301 })?;
302 filter.set_property("caps", caps);
303
304 let elements = &[&convert, &resample, &filter, &deinterleave];
305 pipeline
306 .add_many(elements)
307 .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
308 gst::Element::link_many(elements)
309 .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
310
311 for e in elements {
312 e.sync_state_with_parent()
313 .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
314 }
315
316 let sink_pad = convert
317 .static_pad("sink")
318 .ok_or(AudioDecoderError::Backend(
319 "Get static pad sink failed".to_owned(),
320 ))?;
321 src_pad
322 .link(&sink_pad)
323 .map(|_| ())
324 .map_err(|e| AudioDecoderError::Backend(format!("Sink pad link failed: {}", e)))
325 };
326
327 if let Err(e) = insert_deinterleave() {
328 callbacks.error(e);
329 let _ = sender.lock().unwrap().send(());
330 }
331 });
332
333 appsrc.set_format(gst::Format::Bytes);
334 appsrc.set_block(true);
335
336 let bus = match pipeline.bus() {
337 Some(bus) => bus,
338 None => {
339 callbacks.error(AudioDecoderError::Backend(
340 "Pipeline without bus. Shouldn't happen!".to_owned(),
341 ));
342 let _ = sender.lock().unwrap().send(());
343 return;
344 },
345 };
346
347 let callbacks_ = callbacks.clone();
348 bus.set_sync_handler(move |_, msg| {
349 use gst::MessageView;
350
351 match msg.view() {
352 MessageView::Error(e) => {
353 callbacks_.error(AudioDecoderError::Backend(
354 e.debug()
355 .map(|d| d.to_string())
356 .unwrap_or_else(|| "Unknown".to_owned()),
357 ));
358 let _ = sender.lock().unwrap().send(());
359 },
360 MessageView::Eos(_) => {
361 callbacks_.eos();
362 let _ = sender.lock().unwrap().send(());
363 },
364 _ => (),
365 }
366 gst::BusSyncReply::Drop
367 });
368
369 if pipeline.set_state(gst::State::Playing).is_err() {
370 callbacks.error(AudioDecoderError::StateChangeFailed);
371 return;
372 }
373
374 let max_bytes = appsrc.max_bytes() as usize;
375 let data_len = data.len();
376 let mut reader = Cursor::new(data);
377 while (reader.position() as usize) < data_len {
378 let data_left = data_len - reader.position() as usize;
379 let buffer_size = if data_left < max_bytes {
380 data_left
381 } else {
382 max_bytes
383 };
384 let mut buffer = gst::Buffer::with_size(buffer_size).unwrap();
385 {
386 let buffer = buffer.get_mut().unwrap();
387 let mut map = buffer.map_writable().unwrap();
388 let buffer = map.as_mut_slice();
389 let _ = reader.read(buffer);
390 }
391 let _ = appsrc.push_buffer(buffer);
392 }
393 let _ = appsrc.end_of_stream();
394
395 receiver.recv().unwrap();
397 let _ = pipeline.set_state(gst::State::Null);
398 }
399}