Skip to main content

servo_media_gstreamer/
audio_decoder.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::io::{Cursor, Read};
6use std::sync::{Arc, Mutex, mpsc};
7
8use byte_slice_cast::*;
9use gstreamer;
10use gstreamer::prelude::*;
11use gstreamer_app;
12use gstreamer_audio;
13use servo_media_audio::decoder::{
14    AudioDecoder, AudioDecoderCallbacks, AudioDecoderError, AudioDecoderOptions,
15};
16
17pub struct GStreamerAudioDecoderProgress(
18    gstreamer::buffer::MappedBuffer<gstreamer::buffer::Readable>,
19);
20
21impl AsRef<[f32]> for GStreamerAudioDecoderProgress {
22    fn as_ref(&self) -> &[f32] {
23        self.0.as_ref().as_slice_of::<f32>().unwrap()
24    }
25}
26
27#[derive(Default)]
28pub struct GStreamerAudioDecoder {}
29
30impl GStreamerAudioDecoder {
31    pub fn new() -> Self {
32        Default::default()
33    }
34}
35
36impl AudioDecoder for GStreamerAudioDecoder {
37    fn decode(
38        &self,
39        data: Vec<u8>,
40        callbacks: AudioDecoderCallbacks,
41        options: Option<AudioDecoderOptions>,
42    ) {
43        let pipeline = gstreamer::Pipeline::new();
44        let callbacks = Arc::new(callbacks);
45
46        let appsrc = match gstreamer::ElementFactory::make("appsrc").build() {
47            Ok(appsrc) => appsrc,
48            _ => {
49                return callbacks.error(AudioDecoderError::Backend(
50                    "appsrc creation failed".to_owned(),
51                ));
52            },
53        };
54
55        let decodebin = match gstreamer::ElementFactory::make("decodebin").build() {
56            Ok(decodebin) => decodebin,
57            _ => {
58                return callbacks.error(AudioDecoderError::Backend(
59                    "decodebin creation failed".to_owned(),
60                ));
61            },
62        };
63
64        // decodebin uses something called a "sometimes-pad", which is basically
65        // a pad that will show up when a certain condition is met,
66        // in decodebins case that is media being decoded
67        if let Err(e) = pipeline.add_many([&appsrc, &decodebin]) {
68            return callbacks.error(AudioDecoderError::Backend(e.to_string()));
69        }
70
71        if let Err(e) = gstreamer::Element::link_many([&appsrc, &decodebin]) {
72            return callbacks.error(AudioDecoderError::Backend(e.to_string()));
73        }
74
75        let appsrc = appsrc.downcast::<gstreamer_app::AppSrc>().unwrap();
76
77        let options = options.unwrap_or_default();
78
79        let (sender, receiver) = mpsc::channel();
80        let sender = Arc::new(Mutex::new(sender));
81
82        let pipeline_ = pipeline.downgrade();
83        let callbacks_ = callbacks.clone();
84        let sender_ = sender.clone();
85        // Initial pipeline looks like
86        //
87        // appsrc ! decodebin2! ...
88        //
89        // We plug in the second part of the pipeline, including the deinterleave element,
90        // once the media starts being decoded.
91        decodebin.connect_pad_added(move |_, src_pad| {
92            // A decodebin pad was added, if this is an audio file,
93            // plug in a deinterleave element to separate each planar channel.
94            //
95            // Sub pipeline looks like
96            //
97            // ... decodebin2 ! audioconvert ! audioresample ! capsfilter ! deinterleave ...
98            //
99            // deinterleave also uses a sometime-pad, so we need to wait until
100            // a pad for a planar channel is added to plug in the last part of
101            // the pipeline, with the appsink that will be pulling the data from
102            // each channel.
103
104            let callbacks = &callbacks_;
105            let sender = &sender_;
106            let pipeline = match pipeline_.upgrade() {
107                Some(pipeline) => pipeline,
108                None => {
109                    callbacks.error(AudioDecoderError::Backend(
110                        "Pipeline failed upgrade".to_owned(),
111                    ));
112                    let _ = sender.lock().unwrap().send(());
113                    return;
114                },
115            };
116
117            let (is_audio, caps) = {
118                let media_type = src_pad.current_caps().and_then(|caps| {
119                    caps.structure(0).map(|s| {
120                        let name = s.name();
121                        (name.starts_with("audio/"), caps.clone())
122                    })
123                });
124
125                match media_type {
126                    None => {
127                        callbacks.error(AudioDecoderError::Backend(
128                            "Failed to get media type from pad".to_owned(),
129                        ));
130                        let _ = sender.lock().unwrap().send(());
131                        return;
132                    },
133                    Some(media_type) => media_type,
134                }
135            };
136
137            if !is_audio {
138                callbacks.error(AudioDecoderError::InvalidMediaFormat);
139                let _ = sender.lock().unwrap().send(());
140                return;
141            }
142
143            let sample_audio_info = match gstreamer_audio::AudioInfo::from_caps(&caps) {
144                Ok(sample_audio_info) => sample_audio_info,
145                _ => {
146                    callbacks.error(AudioDecoderError::Backend("AudioInfo failed".to_owned()));
147                    let _ = sender.lock().unwrap().send(());
148                    return;
149                },
150            };
151            let channels = sample_audio_info.channels();
152            callbacks.ready(channels);
153
154            let insert_deinterleave = || -> Result<(), AudioDecoderError> {
155                let convert = gstreamer::ElementFactory::make("audioconvert")
156                    .build()
157                    .map_err(|error| {
158                        AudioDecoderError::Backend(format!(
159                            "audioconvert creation failed: {error:?}"
160                        ))
161                    })?;
162                let resample = gstreamer::ElementFactory::make("audioresample")
163                    .build()
164                    .map_err(|error| {
165                        AudioDecoderError::Backend(format!(
166                            "audioresample creation failed: {error:?}"
167                        ))
168                    })?;
169                let filter = gstreamer::ElementFactory::make("capsfilter")
170                    .build()
171                    .map_err(|error| {
172                        AudioDecoderError::Backend(format!("capsfilter creation failed: {error:?}"))
173                    })?;
174                let deinterleave = gstreamer::ElementFactory::make("deinterleave")
175                    .name("deinterleave")
176                    .property("keep-positions", true)
177                    .build()
178                    .map_err(|error| {
179                        AudioDecoderError::Backend(format!(
180                            "deinterleave creation failed: {error:?}"
181                        ))
182                    })?;
183
184                let pipeline_ = pipeline.downgrade();
185                let callbacks_ = callbacks.clone();
186                deinterleave.connect_pad_added(move |_, src_pad| {
187                    // A new pad for a planar channel was added in deinterleave.
188                    // Plug in an appsink so we can pull the data from each channel.
189                    //
190                    // The end of the pipeline looks like:
191                    //
192                    // ... deinterleave ! queue ! appsink.
193                    let callbacks = &callbacks_;
194                    let pipeline = match pipeline_.upgrade() {
195                        Some(pipeline) => pipeline,
196                        None => {
197                            return callbacks.error(AudioDecoderError::Backend(
198                                "Pipeline failedupgrade".to_owned(),
199                            ));
200                        },
201                    };
202                    let insert_sink = || -> Result<(), AudioDecoderError> {
203                        let queue =
204                            gstreamer::ElementFactory::make("queue")
205                                .build()
206                                .map_err(|error| {
207                                    AudioDecoderError::Backend(format!(
208                                        "queue creation failed: {error:?}"
209                                    ))
210                                })?;
211                        let sink = gstreamer::ElementFactory::make("appsink").build().map_err(
212                            |error| {
213                                AudioDecoderError::Backend(format!(
214                                    "appsink creation failed: {error:?}"
215                                ))
216                            },
217                        )?;
218                        let appsink = sink
219                            .clone()
220                            .dynamic_cast::<gstreamer_app::AppSink>()
221                            .unwrap();
222                        sink.set_property("sync", false);
223
224                        let callbacks_ = callbacks.clone();
225                        appsink.set_callbacks(
226                            gstreamer_app::AppSinkCallbacks::builder()
227                                .new_sample(move |appsink| {
228                                    let sample = appsink
229                                        .pull_sample()
230                                        .map_err(|_| gstreamer::FlowError::Eos)?;
231                                    let buffer = sample.buffer_owned().ok_or_else(|| {
232                                        callbacks_.error(AudioDecoderError::InvalidSample);
233                                        gstreamer::FlowError::Error
234                                    })?;
235
236                                    let audio_info = sample
237                                        .caps()
238                                        .and_then(|caps| {
239                                            gstreamer_audio::AudioInfo::from_caps(caps).ok()
240                                        })
241                                        .ok_or_else(|| {
242                                            callbacks_.error(AudioDecoderError::Backend(
243                                                "Could not get caps from sample".to_owned(),
244                                            ));
245                                            gstreamer::FlowError::Error
246                                        })?;
247                                    let positions = audio_info.positions().ok_or_else(|| {
248                                        callbacks_.error(AudioDecoderError::Backend(
249                                            "AudioInfo failed".to_owned(),
250                                        ));
251                                        gstreamer::FlowError::Error
252                                    })?;
253
254                                    for position in positions.iter() {
255                                        let buffer = buffer.clone();
256                                        let map = match buffer.into_mapped_buffer_readable() {
257                                            Ok(map) => map,
258                                            _ => {
259                                                callbacks_
260                                                    .error(AudioDecoderError::BufferReadFailed);
261                                                return Err(gstreamer::FlowError::Error);
262                                            },
263                                        };
264                                        let progress = Box::new(GStreamerAudioDecoderProgress(map));
265                                        let channel = position.to_mask() as u32;
266                                        callbacks_.progress(progress, channel);
267                                    }
268
269                                    Ok(gstreamer::FlowSuccess::Ok)
270                                })
271                                .build(),
272                        );
273
274                        let elements = &[&queue, &sink];
275                        pipeline
276                            .add_many(elements)
277                            .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
278                        gstreamer::Element::link_many(elements)
279                            .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
280
281                        for e in elements {
282                            e.sync_state_with_parent()
283                                .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
284                        }
285
286                        let sink_pad = queue.static_pad("sink").ok_or(
287                            AudioDecoderError::Backend("Could not get static pad sink".to_owned()),
288                        )?;
289                        src_pad.link(&sink_pad).map(|_| ()).map_err(|e| {
290                            AudioDecoderError::Backend(format!("Sink pad link failed: {}", e))
291                        })
292                    };
293
294                    if let Err(e) = insert_sink() {
295                        callbacks.error(e);
296                    }
297                });
298
299                let mut audio_info_builder = gstreamer_audio::AudioInfo::builder(
300                    gstreamer_audio::AUDIO_FORMAT_F32,
301                    options.sample_rate as u32,
302                    channels,
303                );
304                if let Some(positions) = sample_audio_info.positions() {
305                    audio_info_builder = audio_info_builder.positions(positions);
306                }
307                let audio_info = audio_info_builder.build().map_err(|error| {
308                    AudioDecoderError::Backend(format!("AudioInfo failed: {error:?}"))
309                })?;
310                let caps = audio_info.to_caps().map_err(|error| {
311                    AudioDecoderError::Backend(format!("AudioInfo failed: {error:?}"))
312                })?;
313                filter.set_property("caps", caps);
314
315                let elements = &[&convert, &resample, &filter, &deinterleave];
316                pipeline
317                    .add_many(elements)
318                    .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
319                gstreamer::Element::link_many(elements)
320                    .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
321
322                for e in elements {
323                    e.sync_state_with_parent()
324                        .map_err(|e| AudioDecoderError::Backend(e.to_string()))?;
325                }
326
327                let sink_pad = convert
328                    .static_pad("sink")
329                    .ok_or(AudioDecoderError::Backend(
330                        "Get static pad sink failed".to_owned(),
331                    ))?;
332                src_pad
333                    .link(&sink_pad)
334                    .map(|_| ())
335                    .map_err(|e| AudioDecoderError::Backend(format!("Sink pad link failed: {}", e)))
336            };
337
338            if let Err(e) = insert_deinterleave() {
339                callbacks.error(e);
340                let _ = sender.lock().unwrap().send(());
341            }
342        });
343
344        appsrc.set_format(gstreamer::Format::Bytes);
345        appsrc.set_block(true);
346
347        let bus = match pipeline.bus() {
348            Some(bus) => bus,
349            None => {
350                callbacks.error(AudioDecoderError::Backend(
351                    "Pipeline without bus. Shouldn't happen!".to_owned(),
352                ));
353                let _ = sender.lock().unwrap().send(());
354                return;
355            },
356        };
357
358        let callbacks_ = callbacks.clone();
359        bus.set_sync_handler(move |_, msg| {
360            use gstreamer::MessageView;
361
362            match msg.view() {
363                MessageView::Error(e) => {
364                    callbacks_.error(AudioDecoderError::Backend(
365                        e.debug()
366                            .map(|d| d.to_string())
367                            .unwrap_or_else(|| "Unknown".to_owned()),
368                    ));
369                    let _ = sender.lock().unwrap().send(());
370                },
371                MessageView::Eos(_) => {
372                    callbacks_.eos();
373                    let _ = sender.lock().unwrap().send(());
374                },
375                _ => (),
376            }
377            gstreamer::BusSyncReply::Drop
378        });
379
380        if pipeline.set_state(gstreamer::State::Playing).is_err() {
381            callbacks.error(AudioDecoderError::StateChangeFailed);
382            return;
383        }
384
385        let max_bytes = appsrc.max_bytes() as usize;
386        let data_len = data.len();
387        let mut reader = Cursor::new(data);
388        while (reader.position() as usize) < data_len {
389            let data_left = data_len - reader.position() as usize;
390            let buffer_size = if data_left < max_bytes {
391                data_left
392            } else {
393                max_bytes
394            };
395            let mut buffer = gstreamer::Buffer::with_size(buffer_size).unwrap();
396            {
397                let buffer = buffer.get_mut().unwrap();
398                let mut map = buffer.map_writable().unwrap();
399                let buffer = map.as_mut_slice();
400                let _ = reader.read(buffer);
401            }
402            let _ = appsrc.push_buffer(buffer);
403        }
404        let _ = appsrc.end_of_stream();
405
406        // Wait until we get an error or EOS.
407        receiver.recv().unwrap();
408        let _ = pipeline.set_state(gstreamer::State::Null);
409    }
410}