servo_media_gstreamer/
audio_decoder.rs1use std::io::{Cursor, Read};
6use std::sync::{Arc, Mutex, mpsc};
7
8use byte_slice_cast::*;
9use gstreamer;
10use gstreamer::prelude::*;
11use gstreamer_app;
12use gstreamer_audio;
13use servo_media_audio::decoder::{
14 AudioDecoder, AudioDecoderCallbacks, AudioDecoderError, AudioDecoderOptions,
15};
16
17pub struct GStreamerAudioDecoderProgress(
18 gstreamer::buffer::MappedBuffer<gstreamer::buffer::Readable>,
19);
20
21impl AsRef<[f32]> for GStreamerAudioDecoderProgress {
22 fn as_ref(&self) -> &[f32] {
23 self.0.as_ref().as_slice_of::<f32>().unwrap()
24 }
25}
26
27#[derive(Default)]
28pub struct GStreamerAudioDecoder {}
29
30impl GStreamerAudioDecoder {
31 pub fn new() -> Self {
32 Default::default()
33 }
34}
35
36impl AudioDecoder for GStreamerAudioDecoder {
37 fn decode(
38 &self,
39 data: Vec<u8>,
40 callbacks: AudioDecoderCallbacks,
41 options: Option<AudioDecoderOptions>,
42 ) {
43 let pipeline = gstreamer::Pipeline::new();
44 let callbacks = Arc::new(callbacks);
45
46 let appsrc = match gstreamer::ElementFactory::make("appsrc").build() {
47 Ok(appsrc) => appsrc,
48 _ => {
49 return callbacks.error(AudioDecoderError::Backend(
50 "appsrc creation failed".to_owned(),
51 ));
52 },
53 };
54
55 let decodebin = match gstreamer::ElementFactory::make("decodebin").build() {
56 Ok(decodebin) => decodebin,
57 _ => {
58 return callbacks.error(AudioDecoderError::Backend(
59 "decodebin creation failed".to_owned(),
60 ));
61 },
62 };
63
64 if let Err(e) = pipeline.add_many([&appsrc, &decodebin]) {
68 return callbacks.error(AudioDecoderError::Backend(e.to_string()));
69 }
70
71 if let Err(e) = gstreamer::Element::link_many([&appsrc, &decodebin]) {
72 return callbacks.error(AudioDecoderError::Backend(e.to_string()));
73 }
74
75 let appsrc = appsrc.downcast::<gstreamer_app::AppSrc>().unwrap();
76
77 let options = options.unwrap_or_default();
78
79 let (sender, receiver) = mpsc::channel();
80 let sender = Arc::new(Mutex::new(sender));
81
82 let pipeline_ = pipeline.downgrade();
83 let callbacks_ = callbacks.clone();
84 let sender_ = sender.clone();
85 decodebin.connect_pad_added(move |_, src_pad| {
92 let callbacks = &callbacks_;
105 let sender = &sender_;
106 let pipeline = match pipeline_.upgrade() {
107 Some(pipeline) => pipeline,
108 None => {
109 callbacks.error(AudioDecoderError::Backend(
110 "Pipeline failed upgrade".to_owned(),
111 ));
112 let _ = sender.lock().unwrap().send(());
113 return;
114 },
115 };
116
117 let (is_audio, caps) = {
118 let media_type = src_pad.current_caps().and_then(|caps| {
119 caps.structure(0).map(|s| {
120 let name = s.name();
121 (name.starts_with("audio/"), caps.clone())
122 })
123 });
124
125 match media_type {
126 None => {
127 callbacks.error(AudioDecoderError::Backend(
128 "Failed to get media type from pad".to_owned(),
129 ));
130 let _ = sender.lock().unwrap().send(());
131 return;
132 },
133 Some(media_type) => media_type,
134 }
135 };
136
137 if !is_audio {
138 callbacks.error(AudioDecoderError::InvalidMediaFormat);
139 let _ = sender.lock().unwrap().send(());
140 return;
141 }
142
143 let sample_audio_info = match gstreamer_audio::AudioInfo::from_caps(&caps) {
144 Ok(sample_audio_info) => sample_audio_info,
145 _ => {
146 callbacks.error(AudioDecoderError::Backend("AudioInfo failed".to_owned()));
147 let _ = sender.lock().unwrap().send(());
148 return;
149 },
150 };
151 let channels = sample_audio_info.channels();
152 callbacks.ready(channels);
153
154 let insert_deinterleave = || -> Result<(), AudioDecoderError> {
155 let convert = gstreamer::ElementFactory::make("audioconvert")
156 .build()
157 .map_err(|error| {
158 AudioDecoderError::Backend(format!(
159 "audioconvert creation failed: {error:?}"
160 ))
161 })?;
162 let resample = gstreamer::ElementFactory::make("audioresample")
163 .build()
164 .map_err(|error| {
165 AudioDecoderError::Backend(format!(
166 "audioresample creation failed: {error:?}"
167 ))
168 })?;
169 let filter = gstreamer::ElementFactory::make("capsfilter")
170 .build()
171 .map_err(|error| {
172 AudioDecoderError::Backend(format!("capsfilter creation failed: {error:?}"))
173 })?;
174 let deinterleave = gstreamer::ElementFactory::make("deinterleave")
175 .name("deinterleave")
176 .property("keep-positions", true)
177 .build()
178 .map_err(|error| {
179 AudioDecoderError::Backend(format!(
180 "deinterleave creation failed: {error:?}"
181 ))
182 })?;
183
184 let pipeline_ = pipeline.downgrade();
185 let callbacks_ = callbacks.clone();
186 deinterleave.connect_pad_added(move |_, src_pad| {
187 let callbacks = &callbacks_;
194 let pipeline = match pipeline_.upgrade() {
195 Some(pipeline) => pipeline,
196 None => {
197 return callbacks.error(AudioDecoderError::Backend(
198 "Pipeline failedupgrade".to_owned(),
199 ));
200 },
201 };
202 let insert_sink = || -> Result<(), AudioDecoderError> {
203 let queue =
204 gstreamer::ElementFactory::make("queue")
205 .build()
206 .map_err(|error| {
207 AudioDecoderError::Backend(format!(
208 "queue creation failed: {error:?}"
209 ))
210 })?;
211 let sink = gstreamer::ElementFactory::make("appsink").build().map_err(
212 |error| {
213 AudioDecoderError::Backend(format!(
214 "appsink creation failed: {error:?}"
215 ))
216 },
217 )?;
218 let appsink = sink
219 .clone()
220 .dynamic_cast::<gstreamer_app::AppSink>()
221 .unwrap();
222 sink.set_property("sync", false);
223
224 let callbacks_ = callbacks.clone();
225 appsink.set_callbacks(
226 gstreamer_app::AppSinkCallbacks::builder()
227 .new_sample(move |appsink| {
228 let sample = appsink
229 .pull_sample()
230 .map_err(|_| gstreamer::FlowError::Eos)?;
231 let buffer = sample.buffer_owned().ok_or_else(|| {
232 callbacks_.error(AudioDecoderError::InvalidSample);
233 gstreamer::FlowError::Error
234 })?;
235
236 let audio_info = sample
237 .caps()
238 .and_then(|caps| {
239 gstreamer_audio::AudioInfo::from_caps(caps).ok()
240 })
241 .ok_or_else(|| {
242 callbacks_.error(AudioDecoderError::Backend(
243 "Could not get caps from sample".to_owned(),
244 ));
245 gstreamer::FlowError::Error
246 })?;
247 let positions = audio_info.positions().ok_or_else(|| {
248 callbacks_.error(AudioDecoderError::Backend(
249 "AudioInfo failed".to_owned(),
250 ));
251 gstreamer::FlowError::Error
252 })?;
253
254 for position in positions.iter() {
255 let buffer = buffer.clone();
256 let map = match buffer.into_mapped_buffer_readable() {
257 Ok(map) => map,
258 _ => {
259 callbacks_
260 .error(AudioDecoderError::BufferReadFailed);
261 return Err(gstreamer::FlowError::Error);
262 },
263 };
264 let progress = Box::new(GStreamerAudioDecoderProgress(map));
265 let channel = position.to_mask() as u32;
266 callbacks_.progress(progress, channel);
267 }
268
269 Ok(gstreamer::FlowSuccess::Ok)
270 })
271 .build(),
272 );
273
274 let elements = &[&queue, &sink];
275 pipeline
276 .add_many(elements)
277 .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
278 gstreamer::Element::link_many(elements)
279 .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
280
281 for e in elements {
282 e.sync_state_with_parent()
283 .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
284 }
285
286 let sink_pad = queue.static_pad("sink").ok_or(
287 AudioDecoderError::Backend("Could not get static pad sink".to_owned()),
288 )?;
289 src_pad.link(&sink_pad).map(|_| ()).map_err(|e| {
290 AudioDecoderError::Backend(format!("Sink pad link failed: {}", e))
291 })
292 };
293
294 if let Err(e) = insert_sink() {
295 callbacks.error(e);
296 }
297 });
298
299 let mut audio_info_builder = gstreamer_audio::AudioInfo::builder(
300 gstreamer_audio::AUDIO_FORMAT_F32,
301 options.sample_rate as u32,
302 channels,
303 );
304 if let Some(positions) = sample_audio_info.positions() {
305 audio_info_builder = audio_info_builder.positions(positions);
306 }
307 let audio_info = audio_info_builder.build().map_err(|error| {
308 AudioDecoderError::Backend(format!("AudioInfo failed: {error:?}"))
309 })?;
310 let caps = audio_info.to_caps().map_err(|error| {
311 AudioDecoderError::Backend(format!("AudioInfo failed: {error:?}"))
312 })?;
313 filter.set_property("caps", caps);
314
315 let elements = &[&convert, &resample, &filter, &deinterleave];
316 pipeline
317 .add_many(elements)
318 .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
319 gstreamer::Element::link_many(elements)
320 .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
321
322 for e in elements {
323 e.sync_state_with_parent()
324 .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
325 }
326
327 let sink_pad = convert
328 .static_pad("sink")
329 .ok_or(AudioDecoderError::Backend(
330 "Get static pad sink failed".to_owned(),
331 ))?;
332 src_pad
333 .link(&sink_pad)
334 .map(|_| ())
335 .map_err(|e| AudioDecoderError::Backend(format!("Sink pad link failed: {}", e)))
336 };
337
338 if let Err(e) = insert_deinterleave() {
339 callbacks.error(e);
340 let _ = sender.lock().unwrap().send(());
341 }
342 });
343
344 appsrc.set_format(gstreamer::Format::Bytes);
345 appsrc.set_block(true);
346
347 let bus = match pipeline.bus() {
348 Some(bus) => bus,
349 None => {
350 callbacks.error(AudioDecoderError::Backend(
351 "Pipeline without bus. Shouldn't happen!".to_owned(),
352 ));
353 let _ = sender.lock().unwrap().send(());
354 return;
355 },
356 };
357
358 let callbacks_ = callbacks.clone();
359 bus.set_sync_handler(move |_, msg| {
360 use gstreamer::MessageView;
361
362 match msg.view() {
363 MessageView::Error(e) => {
364 callbacks_.error(AudioDecoderError::Backend(
365 e.debug()
366 .map(|d| d.to_string())
367 .unwrap_or_else(|| "Unknown".to_owned()),
368 ));
369 let _ = sender.lock().unwrap().send(());
370 },
371 MessageView::Eos(_) => {
372 callbacks_.eos();
373 let _ = sender.lock().unwrap().send(());
374 },
375 _ => (),
376 }
377 gstreamer::BusSyncReply::Drop
378 });
379
380 if pipeline.set_state(gstreamer::State::Playing).is_err() {
381 callbacks.error(AudioDecoderError::StateChangeFailed);
382 return;
383 }
384
385 let max_bytes = appsrc.max_bytes() as usize;
386 let data_len = data.len();
387 let mut reader = Cursor::new(data);
388 while (reader.position() as usize) < data_len {
389 let data_left = data_len - reader.position() as usize;
390 let buffer_size = if data_left < max_bytes {
391 data_left
392 } else {
393 max_bytes
394 };
395 let mut buffer = gstreamer::Buffer::with_size(buffer_size).unwrap();
396 {
397 let buffer = buffer.get_mut().unwrap();
398 let mut map = buffer.map_writable().unwrap();
399 let buffer = map.as_mut_slice();
400 let _ = reader.read(buffer);
401 }
402 let _ = appsrc.push_buffer(buffer);
403 }
404 let _ = appsrc.end_of_stream();
405
406 receiver.recv().unwrap();
408 let _ = pipeline.set_state(gstreamer::State::Null);
409 }
410}