servo_media_gstreamer/
audio_decoder.rs

1use byte_slice_cast::*;
2use gst;
3use gst::prelude::*;
4use gst_app;
5use gst_audio;
6use servo_media_audio::decoder::{AudioDecoder, AudioDecoderCallbacks};
7use servo_media_audio::decoder::{AudioDecoderError, AudioDecoderOptions};
8use std::io::Cursor;
9use std::io::Read;
10use std::sync::{mpsc, Arc, Mutex};
11
12pub struct GStreamerAudioDecoderProgress(gst::buffer::MappedBuffer<gst::buffer::Readable>);
13
14impl AsRef<[f32]> for GStreamerAudioDecoderProgress {
15    fn as_ref(&self) -> &[f32] {
16        self.0.as_ref().as_slice_of::<f32>().unwrap()
17    }
18}
19
20pub struct GStreamerAudioDecoder {}
21
22impl GStreamerAudioDecoder {
23    pub fn new() -> Self {
24        Self {}
25    }
26}
27
28impl AudioDecoder for GStreamerAudioDecoder {
29    fn decode(
30        &self,
31        data: Vec<u8>,
32        callbacks: AudioDecoderCallbacks,
33        options: Option<AudioDecoderOptions>,
34    ) {
35        let pipeline = gst::Pipeline::new();
36        let callbacks = Arc::new(callbacks);
37
38        let appsrc = match gst::ElementFactory::make("appsrc").build() {
39            Ok(appsrc) => appsrc,
40            _ => {
41                return callbacks.error(AudioDecoderError::Backend(
42                    "appsrc creation failed".to_owned(),
43                ));
44            },
45        };
46
47        let decodebin = match gst::ElementFactory::make("decodebin").build() {
48            Ok(decodebin) => decodebin,
49            _ => {
50                return callbacks.error(AudioDecoderError::Backend(
51                    "decodebin creation failed".to_owned(),
52                ));
53            },
54        };
55
56        // decodebin uses something called a "sometimes-pad", which is basically
57        // a pad that will show up when a certain condition is met,
58        // in decodebins case that is media being decoded
59        if let Err(e) = pipeline.add_many(&[&appsrc, &decodebin]) {
60            return callbacks.error(AudioDecoderError::Backend(e.to_string()));
61        }
62
63        if let Err(e) = gst::Element::link_many(&[&appsrc, &decodebin]) {
64            return callbacks.error(AudioDecoderError::Backend(e.to_string()));
65        }
66
67        let appsrc = appsrc.downcast::<gst_app::AppSrc>().unwrap();
68
69        let options = options.unwrap_or_default();
70
71        let (sender, receiver) = mpsc::channel();
72        let sender = Arc::new(Mutex::new(sender));
73
74        let pipeline_ = pipeline.downgrade();
75        let callbacks_ = callbacks.clone();
76        let sender_ = sender.clone();
77        // Initial pipeline looks like
78        //
79        // appsrc ! decodebin2! ...
80        //
81        // We plug in the second part of the pipeline, including the deinterleave element,
82        // once the media starts being decoded.
83        decodebin.connect_pad_added(move |_, src_pad| {
84            // A decodebin pad was added, if this is an audio file,
85            // plug in a deinterleave element to separate each planar channel.
86            //
87            // Sub pipeline looks like
88            //
89            // ... decodebin2 ! audioconvert ! audioresample ! capsfilter ! deinterleave ...
90            //
91            // deinterleave also uses a sometime-pad, so we need to wait until
92            // a pad for a planar channel is added to plug in the last part of
93            // the pipeline, with the appsink that will be pulling the data from
94            // each channel.
95
96            let callbacks = &callbacks_;
97            let sender = &sender_;
98            let pipeline = match pipeline_.upgrade() {
99                Some(pipeline) => pipeline,
100                None => {
101                    callbacks.error(AudioDecoderError::Backend(
102                        "Pipeline failed upgrade".to_owned(),
103                    ));
104                    let _ = sender.lock().unwrap().send(());
105                    return;
106                },
107            };
108
109            let (is_audio, caps) = {
110                let media_type = src_pad.current_caps().and_then(|caps| {
111                    caps.structure(0).map(|s| {
112                        let name = s.name();
113                        (name.starts_with("audio/"), caps.clone())
114                    })
115                });
116
117                match media_type {
118                    None => {
119                        callbacks.error(AudioDecoderError::Backend(
120                            "Failed to get media type from pad".to_owned(),
121                        ));
122                        let _ = sender.lock().unwrap().send(());
123                        return;
124                    },
125                    Some(media_type) => media_type,
126                }
127            };
128
129            if !is_audio {
130                callbacks.error(AudioDecoderError::InvalidMediaFormat);
131                let _ = sender.lock().unwrap().send(());
132                return;
133            }
134
135            let sample_audio_info = match gst_audio::AudioInfo::from_caps(&caps) {
136                Ok(sample_audio_info) => sample_audio_info,
137                _ => {
138                    callbacks.error(AudioDecoderError::Backend("AudioInfo failed".to_owned()));
139                    let _ = sender.lock().unwrap().send(());
140                    return;
141                },
142            };
143            let channels = sample_audio_info.channels();
144            callbacks.ready(channels);
145
146            let insert_deinterleave = || -> Result<(), AudioDecoderError> {
147                let convert =
148                    gst::ElementFactory::make("audioconvert")
149                        .build()
150                        .map_err(|error| {
151                            AudioDecoderError::Backend(format!(
152                                "audioconvert creation failed: {error:?}"
153                            ))
154                        })?;
155                let resample =
156                    gst::ElementFactory::make("audioresample")
157                        .build()
158                        .map_err(|error| {
159                            AudioDecoderError::Backend(format!(
160                                "audioresample creation failed: {error:?}"
161                            ))
162                        })?;
163                let filter = gst::ElementFactory::make("capsfilter")
164                    .build()
165                    .map_err(|error| {
166                        AudioDecoderError::Backend(format!("capsfilter creation failed: {error:?}"))
167                    })?;
168                let deinterleave = gst::ElementFactory::make("deinterleave")
169                    .name("deinterleave")
170                    .property("keep-positions", true)
171                    .build()
172                    .map_err(|error| {
173                        AudioDecoderError::Backend(format!(
174                            "deinterleave creation failed: {error:?}"
175                        ))
176                    })?;
177
178                let pipeline_ = pipeline.downgrade();
179                let callbacks_ = callbacks.clone();
180                deinterleave.connect_pad_added(move |_, src_pad| {
181                    // A new pad for a planar channel was added in deinterleave.
182                    // Plug in an appsink so we can pull the data from each channel.
183                    //
184                    // The end of the pipeline looks like:
185                    //
186                    // ... deinterleave ! queue ! appsink.
187                    let callbacks = &callbacks_;
188                    let pipeline = match pipeline_.upgrade() {
189                        Some(pipeline) => pipeline,
190                        None => {
191                            return callbacks.error(AudioDecoderError::Backend(
192                                "Pipeline failedupgrade".to_owned(),
193                            ));
194                        },
195                    };
196                    let insert_sink = || -> Result<(), AudioDecoderError> {
197                        let queue =
198                            gst::ElementFactory::make("queue")
199                                .build()
200                                .map_err(|error| {
201                                    AudioDecoderError::Backend(format!(
202                                        "queue creation failed: {error:?}"
203                                    ))
204                                })?;
205                        let sink =
206                            gst::ElementFactory::make("appsink")
207                                .build()
208                                .map_err(|error| {
209                                    AudioDecoderError::Backend(format!(
210                                        "appsink creation failed: {error:?}"
211                                    ))
212                                })?;
213                        let appsink = sink.clone().dynamic_cast::<gst_app::AppSink>().unwrap();
214                        sink.set_property("sync", false);
215
216                        let callbacks_ = callbacks.clone();
217                        appsink.set_callbacks(
218                            gst_app::AppSinkCallbacks::builder()
219                                .new_sample(move |appsink| {
220                                    let sample =
221                                        appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
222                                    let buffer = sample.buffer_owned().ok_or_else(|| {
223                                        callbacks_.error(AudioDecoderError::InvalidSample);
224                                        gst::FlowError::Error
225                                    })?;
226
227                                    let audio_info = sample
228                                        .caps()
229                                        .and_then(|caps| gst_audio::AudioInfo::from_caps(caps).ok())
230                                        .ok_or_else(|| {
231                                            callbacks_.error(AudioDecoderError::Backend(
232                                                "Could not get caps from sample".to_owned(),
233                                            ));
234                                            gst::FlowError::Error
235                                        })?;
236                                    let positions = audio_info.positions().ok_or_else(|| {
237                                        callbacks_.error(AudioDecoderError::Backend(
238                                            "AudioInfo failed".to_owned(),
239                                        ));
240                                        gst::FlowError::Error
241                                    })?;
242
243                                    for position in positions.iter() {
244                                        let buffer = buffer.clone();
245                                        let map = if let Ok(map) =
246                                            buffer.into_mapped_buffer_readable()
247                                        {
248                                            map
249                                        } else {
250                                            callbacks_.error(AudioDecoderError::BufferReadFailed);
251                                            return Err(gst::FlowError::Error);
252                                        };
253                                        let progress = Box::new(GStreamerAudioDecoderProgress(map));
254                                        let channel = position.to_mask() as u32;
255                                        callbacks_.progress(progress, channel);
256                                    }
257
258                                    Ok(gst::FlowSuccess::Ok)
259                                })
260                                .build(),
261                        );
262
263                        let elements = &[&queue, &sink];
264                        pipeline
265                            .add_many(elements)
266                            .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
267                        gst::Element::link_many(elements)
268                            .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
269
270                        for e in elements {
271                            e.sync_state_with_parent()
272                                .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
273                        }
274
275                        let sink_pad = queue.static_pad("sink").ok_or(
276                            AudioDecoderError::Backend("Could not get static pad sink".to_owned()),
277                        )?;
278                        src_pad.link(&sink_pad).map(|_| ()).map_err(|e| {
279                            AudioDecoderError::Backend(format!("Sink pad link failed: {}", e))
280                        })
281                    };
282
283                    if let Err(e) = insert_sink() {
284                        callbacks.error(e);
285                    }
286                });
287
288                let mut audio_info_builder = gst_audio::AudioInfo::builder(
289                    gst_audio::AUDIO_FORMAT_F32,
290                    options.sample_rate as u32,
291                    channels,
292                );
293                if let Some(positions) = sample_audio_info.positions() {
294                    audio_info_builder = audio_info_builder.positions(positions);
295                }
296                let audio_info = audio_info_builder.build().map_err(|error| {
297                    AudioDecoderError::Backend(format!("AudioInfo failed: {error:?}"))
298                })?;
299                let caps = audio_info.to_caps().map_err(|error| {
300                    AudioDecoderError::Backend(format!("AudioInfo failed: {error:?}"))
301                })?;
302                filter.set_property("caps", caps);
303
304                let elements = &[&convert, &resample, &filter, &deinterleave];
305                pipeline
306                    .add_many(elements)
307                    .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
308                gst::Element::link_many(elements)
309                    .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
310
311                for e in elements {
312                    e.sync_state_with_parent()
313                        .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
314                }
315
316                let sink_pad = convert
317                    .static_pad("sink")
318                    .ok_or(AudioDecoderError::Backend(
319                        "Get static pad sink failed".to_owned(),
320                    ))?;
321                src_pad
322                    .link(&sink_pad)
323                    .map(|_| ())
324                    .map_err(|e| AudioDecoderError::Backend(format!("Sink pad link failed: {}", e)))
325            };
326
327            if let Err(e) = insert_deinterleave() {
328                callbacks.error(e);
329                let _ = sender.lock().unwrap().send(());
330            }
331        });
332
333        appsrc.set_format(gst::Format::Bytes);
334        appsrc.set_block(true);
335
336        let bus = match pipeline.bus() {
337            Some(bus) => bus,
338            None => {
339                callbacks.error(AudioDecoderError::Backend(
340                    "Pipeline without bus. Shouldn't happen!".to_owned(),
341                ));
342                let _ = sender.lock().unwrap().send(());
343                return;
344            },
345        };
346
347        let callbacks_ = callbacks.clone();
348        bus.set_sync_handler(move |_, msg| {
349            use gst::MessageView;
350
351            match msg.view() {
352                MessageView::Error(e) => {
353                    callbacks_.error(AudioDecoderError::Backend(
354                        e.debug()
355                            .map(|d| d.to_string())
356                            .unwrap_or_else(|| "Unknown".to_owned()),
357                    ));
358                    let _ = sender.lock().unwrap().send(());
359                },
360                MessageView::Eos(_) => {
361                    callbacks_.eos();
362                    let _ = sender.lock().unwrap().send(());
363                },
364                _ => (),
365            }
366            gst::BusSyncReply::Drop
367        });
368
369        if pipeline.set_state(gst::State::Playing).is_err() {
370            callbacks.error(AudioDecoderError::StateChangeFailed);
371            return;
372        }
373
374        let max_bytes = appsrc.max_bytes() as usize;
375        let data_len = data.len();
376        let mut reader = Cursor::new(data);
377        while (reader.position() as usize) < data_len {
378            let data_left = data_len - reader.position() as usize;
379            let buffer_size = if data_left < max_bytes {
380                data_left
381            } else {
382                max_bytes
383            };
384            let mut buffer = gst::Buffer::with_size(buffer_size).unwrap();
385            {
386                let buffer = buffer.get_mut().unwrap();
387                let mut map = buffer.map_writable().unwrap();
388                let buffer = map.as_mut_slice();
389                let _ = reader.read(buffer);
390            }
391            let _ = appsrc.push_buffer(buffer);
392        }
393        let _ = appsrc.end_of_stream();
394
395        // Wait until we get an error or EOS.
396        receiver.recv().unwrap();
397        let _ = pipeline.set_state(gst::State::Null);
398    }
399}