servo_media_gstreamer/
audio_decoder.rs

1use byte_slice_cast::*;
2use gst;
3use gst::prelude::*;
4use gst_app;
5use gst_audio;
6use servo_media_audio::decoder::{AudioDecoder, AudioDecoderCallbacks};
7use servo_media_audio::decoder::{AudioDecoderError, AudioDecoderOptions};
8use std::io::Cursor;
9use std::io::Read;
10use std::sync::{mpsc, Arc, Mutex};
11
12pub struct GStreamerAudioDecoderProgress(gst::buffer::MappedBuffer<gst::buffer::Readable>);
13
14impl AsRef<[f32]> for GStreamerAudioDecoderProgress {
15    fn as_ref(&self) -> &[f32] {
16        self.0.as_ref().as_slice_of::<f32>().unwrap()
17    }
18}
19
/// Stateless audio decoder handle.
///
/// The type has no fields, so `new()` and `Default::default()` produce
/// equivalent values.
#[derive(Default)]
pub struct GStreamerAudioDecoder {}

impl GStreamerAudioDecoder {
    /// Creates a new decoder.
    pub fn new() -> Self {
        Self {}
    }
}
27
28impl AudioDecoder for GStreamerAudioDecoder {
29    fn decode(
30        &self,
31        data: Vec<u8>,
32        callbacks: AudioDecoderCallbacks,
33        options: Option<AudioDecoderOptions>,
34    ) {
35        let pipeline = gst::Pipeline::new();
36        let callbacks = Arc::new(callbacks);
37
38        let appsrc = match gst::ElementFactory::make("appsrc").build() {
39            Ok(appsrc) => appsrc,
40            _ => {
41                return callbacks.error(AudioDecoderError::Backend(
42                    "appsrc creation failed".to_owned(),
43                ));
44            }
45        };
46
47        let decodebin = match gst::ElementFactory::make("decodebin").build() {
48            Ok(decodebin) => decodebin,
49            _ => {
50                return callbacks.error(AudioDecoderError::Backend(
51                    "decodebin creation failed".to_owned(),
52                ));
53            }
54        };
55
56        // decodebin uses something called a "sometimes-pad", which is basically
57        // a pad that will show up when a certain condition is met,
58        // in decodebins case that is media being decoded
59        if let Err(e) = pipeline.add_many(&[&appsrc, &decodebin]) {
60            return callbacks.error(AudioDecoderError::Backend(e.to_string()));
61        }
62
63        if let Err(e) = gst::Element::link_many(&[&appsrc, &decodebin]) {
64            return callbacks.error(AudioDecoderError::Backend(e.to_string()));
65        }
66
67        let appsrc = appsrc.downcast::<gst_app::AppSrc>().unwrap();
68
69        let options = options.unwrap_or_default();
70
71        let (sender, receiver) = mpsc::channel();
72        let sender = Arc::new(Mutex::new(sender));
73
74        let pipeline_ = pipeline.downgrade();
75        let callbacks_ = callbacks.clone();
76        let sender_ = sender.clone();
77        // Initial pipeline looks like
78        //
79        // appsrc ! decodebin2! ...
80        //
81        // We plug in the second part of the pipeline, including the deinterleave element,
82        // once the media starts being decoded.
83        decodebin.connect_pad_added(move |_, src_pad| {
84            // A decodebin pad was added, if this is an audio file,
85            // plug in a deinterleave element to separate each planar channel.
86            //
87            // Sub pipeline looks like
88            //
89            // ... decodebin2 ! audioconvert ! audioresample ! capsfilter ! deinterleave ...
90            //
91            // deinterleave also uses a sometime-pad, so we need to wait until
92            // a pad for a planar channel is added to plug in the last part of
93            // the pipeline, with the appsink that will be pulling the data from
94            // each channel.
95
96            let callbacks = &callbacks_;
97            let sender = &sender_;
98            let pipeline = match pipeline_.upgrade() {
99                Some(pipeline) => pipeline,
100                None => {
101                    callbacks.error(AudioDecoderError::Backend(
102                        "Pipeline failed upgrade".to_owned(),
103                    ));
104                    let _ = sender.lock().unwrap().send(());
105                    return;
106                }
107            };
108
109            let (is_audio, caps) = {
110                let media_type = src_pad.current_caps().and_then(|caps| {
111                    caps.structure(0).map(|s| {
112                        let name = s.name();
113                        (name.starts_with("audio/"), caps.clone())
114                    })
115                });
116
117                match media_type {
118                    None => {
119                        callbacks.error(AudioDecoderError::Backend(
120                            "Failed to get media type from pad".to_owned(),
121                        ));
122                        let _ = sender.lock().unwrap().send(());
123                        return;
124                    }
125                    Some(media_type) => media_type,
126                }
127            };
128
129            if !is_audio {
130                callbacks.error(AudioDecoderError::InvalidMediaFormat);
131                let _ = sender.lock().unwrap().send(());
132                return;
133            }
134
135            let sample_audio_info = match gst_audio::AudioInfo::from_caps(&caps) {
136                Ok(sample_audio_info) => sample_audio_info,
137                _ => {
138                    callbacks.error(AudioDecoderError::Backend("AudioInfo failed".to_owned()));
139                    let _ = sender.lock().unwrap().send(());
140                    return;
141                }
142            };
143            let channels = sample_audio_info.channels();
144            callbacks.ready(channels);
145
146            let insert_deinterleave = || -> Result<(), AudioDecoderError> {
147                let convert = gst::ElementFactory::make("audioconvert")
148                    .build()
149                    .map_err(|error| {
150                        AudioDecoderError::Backend(format!("audioconvert creation failed: {error:?}"))
151                    })?;
152                let resample =
153                    gst::ElementFactory::make("audioresample")
154                        .build()
155                        .map_err(|error| {
156                            AudioDecoderError::Backend(format!("audioresample creation failed: {error:?}"))
157                        })?;
158                let filter = gst::ElementFactory::make("capsfilter")
159                    .build()
160                    .map_err(|error| {
161                        AudioDecoderError::Backend(format!("capsfilter creation failed: {error:?}"))
162                    })?;
163                let deinterleave = gst::ElementFactory::make("deinterleave")
164                    .name("deinterleave")
165                    .property("keep-positions", true)
166                    .build()
167                    .map_err(|error| {
168                        AudioDecoderError::Backend(format!("deinterleave creation failed: {error:?}"))
169                    })?;
170
171                let pipeline_ = pipeline.downgrade();
172                let callbacks_ = callbacks.clone();
173                deinterleave.connect_pad_added(move |_, src_pad| {
174                    // A new pad for a planar channel was added in deinterleave.
175                    // Plug in an appsink so we can pull the data from each channel.
176                    //
177                    // The end of the pipeline looks like:
178                    //
179                    // ... deinterleave ! queue ! appsink.
180                    let callbacks = &callbacks_;
181                    let pipeline = match pipeline_.upgrade() {
182                        Some(pipeline) => pipeline,
183                        None => {
184                            return callbacks.error(AudioDecoderError::Backend(
185                                "Pipeline failedupgrade".to_owned(),
186                            ));
187                        }
188                    };
189                    let insert_sink = || -> Result<(), AudioDecoderError> {
190                        let queue = gst::ElementFactory::make("queue").build().map_err(|error| {
191                            AudioDecoderError::Backend(format!("queue creation failed: {error:?}"))
192                        })?;
193                        let sink = gst::ElementFactory::make("appsink").build().map_err(|error| {
194                            AudioDecoderError::Backend(format!("appsink creation failed: {error:?}"))
195                        })?;
196                        let appsink = sink.clone().dynamic_cast::<gst_app::AppSink>().unwrap();
197                        sink.set_property("sync", false);
198
199                        let callbacks_ = callbacks.clone();
200                        appsink.set_callbacks(
201                            gst_app::AppSinkCallbacks::builder()
202                                .new_sample(move |appsink| {
203                                    let sample =
204                                        appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
205                                    let buffer = sample.buffer_owned().ok_or_else(|| {
206                                        callbacks_.error(AudioDecoderError::InvalidSample);
207                                        gst::FlowError::Error
208                                    })?;
209
210                                    let audio_info = sample
211                                        .caps()
212                                        .and_then(|caps| gst_audio::AudioInfo::from_caps(caps).ok())
213                                        .ok_or_else(|| {
214                                            callbacks_.error(AudioDecoderError::Backend(
215                                                "Could not get caps from sample".to_owned(),
216                                            ));
217                                            gst::FlowError::Error
218                                        })?;
219                                    let positions = audio_info.positions().ok_or_else(|| {
220                                        callbacks_.error(AudioDecoderError::Backend(
221                                            "AudioInfo failed".to_owned(),
222                                        ));
223                                        gst::FlowError::Error
224                                    })?;
225
226                                    for position in positions.iter() {
227                                        let buffer = buffer.clone();
228                                        let map = if let Ok(map) =
229                                            buffer.into_mapped_buffer_readable()
230                                        {
231                                            map
232                                        } else {
233                                            callbacks_.error(AudioDecoderError::BufferReadFailed);
234                                            return Err(gst::FlowError::Error);
235                                        };
236                                        let progress = Box::new(GStreamerAudioDecoderProgress(map));
237                                        let channel = position.to_mask() as u32;
238                                        callbacks_.progress(progress, channel);
239                                    }
240
241                                    Ok(gst::FlowSuccess::Ok)
242                                })
243                                .build(),
244                        );
245
246                        let elements = &[&queue, &sink];
247                        pipeline
248                            .add_many(elements)
249                            .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
250                        gst::Element::link_many(elements)
251                            .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
252
253                        for e in elements {
254                            e.sync_state_with_parent()
255                                .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
256                        }
257
258                        let sink_pad = queue.static_pad("sink").ok_or(
259                            AudioDecoderError::Backend("Could not get static pad sink".to_owned()),
260                        )?;
261                        src_pad.link(&sink_pad).map(|_| ()).map_err(|e| {
262                            AudioDecoderError::Backend(format!("Sink pad link failed: {}", e))
263                        })
264                    };
265
266                    if let Err(e) = insert_sink() {
267                        callbacks.error(e);
268                    }
269                });
270
271                let mut audio_info_builder = gst_audio::AudioInfo::builder(
272                    gst_audio::AUDIO_FORMAT_F32,
273                    options.sample_rate as u32,
274                    channels,
275                );
276                if let Some(positions) = sample_audio_info.positions() {
277                    audio_info_builder = audio_info_builder.positions(positions);
278                }
279                let audio_info = audio_info_builder
280                    .build()
281                    .map_err(|error| AudioDecoderError::Backend(format!("AudioInfo failed: {error:?}")))?;
282                let caps = audio_info
283                    .to_caps()
284                    .map_err(|error| AudioDecoderError::Backend(format!("AudioInfo failed: {error:?}")))?;
285                filter.set_property("caps", caps);
286
287                let elements = &[&convert, &resample, &filter, &deinterleave];
288                pipeline
289                    .add_many(elements)
290                    .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
291                gst::Element::link_many(elements)
292                    .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
293
294                for e in elements {
295                    e.sync_state_with_parent()
296                        .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
297                }
298
299                let sink_pad = convert
300                    .static_pad("sink")
301                    .ok_or(AudioDecoderError::Backend(
302                        "Get static pad sink failed".to_owned(),
303                    ))?;
304                src_pad
305                    .link(&sink_pad)
306                    .map(|_| ())
307                    .map_err(|e| AudioDecoderError::Backend(format!("Sink pad link failed: {}", e)))
308            };
309
310            if let Err(e) = insert_deinterleave() {
311                callbacks.error(e);
312                let _ = sender.lock().unwrap().send(());
313            }
314        });
315
316        appsrc.set_format(gst::Format::Bytes);
317        appsrc.set_block(true);
318
319        let bus = match pipeline.bus() {
320            Some(bus) => bus,
321            None => {
322                callbacks.error(AudioDecoderError::Backend(
323                    "Pipeline without bus. Shouldn't happen!".to_owned(),
324                ));
325                let _ = sender.lock().unwrap().send(());
326                return;
327            }
328        };
329
330        let callbacks_ = callbacks.clone();
331        bus.set_sync_handler(move |_, msg| {
332            use gst::MessageView;
333
334            match msg.view() {
335                MessageView::Error(e) => {
336                    callbacks_.error(AudioDecoderError::Backend(
337                        e.debug()
338                            .map(|d| d.to_string())
339                            .unwrap_or_else(|| "Unknown".to_owned()),
340                    ));
341                    let _ = sender.lock().unwrap().send(());
342                }
343                MessageView::Eos(_) => {
344                    callbacks_.eos();
345                    let _ = sender.lock().unwrap().send(());
346                }
347                _ => (),
348            }
349            gst::BusSyncReply::Drop
350        });
351
352        if pipeline.set_state(gst::State::Playing).is_err() {
353            callbacks.error(AudioDecoderError::StateChangeFailed);
354            return;
355        }
356
357        let max_bytes = appsrc.max_bytes() as usize;
358        let data_len = data.len();
359        let mut reader = Cursor::new(data);
360        while (reader.position() as usize) < data_len {
361            let data_left = data_len - reader.position() as usize;
362            let buffer_size = if data_left < max_bytes {
363                data_left
364            } else {
365                max_bytes
366            };
367            let mut buffer = gst::Buffer::with_size(buffer_size).unwrap();
368            {
369                let buffer = buffer.get_mut().unwrap();
370                let mut map = buffer.map_writable().unwrap();
371                let buffer = map.as_mut_slice();
372                let _ = reader.read(buffer);
373            }
374            let _ = appsrc.push_buffer(buffer);
375        }
376        let _ = appsrc.end_of_stream();
377
378        // Wait until we get an error or EOS.
379        receiver.recv().unwrap();
380        let _ = pipeline.set_state(gst::State::Null);
381    }
382}