servo_media_gstreamer/
audio_stream_reader.rs

1use crate::media_stream::GStreamerMediaStream;
2use servo_media_audio::block::{Block, FRAMES_PER_BLOCK_USIZE};
3use servo_media_audio::AudioStreamReader;
4use servo_media_streams::registry::{get_stream, MediaStreamId};
5use std::sync::mpsc::{channel, Receiver};
6
7use byte_slice_cast::*;
8use gst::prelude::*;
9use gst::Fraction;
10use gst_audio::AUDIO_FORMAT_F32;
11
/// Reads audio from a GStreamer-backed media stream as a sequence of
/// [`Block`]s.
///
/// Blocks are produced by an `appsink` callback installed in
/// [`GStreamerAudioStreamReader::new`] and handed over through a channel.
pub struct GStreamerAudioStreamReader {
    /// Receiving end of the channel the `appsink` callback sends blocks into.
    rx: Receiver<Block>,
    /// Pipeline obtained from the source stream; its state is switched by
    /// `start` and `stop`.
    pipeline: gst::Pipeline,
}
16
17impl GStreamerAudioStreamReader {
18    pub fn new(stream: MediaStreamId, sample_rate: f32) -> Result<Self, String> {
19        let (tx, rx) = channel();
20        let stream = get_stream(&stream).unwrap();
21        let mut stream = stream.lock().unwrap();
22        let g_stream = stream
23            .as_mut_any()
24            .downcast_mut::<GStreamerMediaStream>()
25            .unwrap();
26        let element = g_stream.src_element();
27        let pipeline = g_stream.pipeline_or_new();
28        drop(stream);
29        let time_per_block = Fraction::new(FRAMES_PER_BLOCK_USIZE as i32, sample_rate as i32);
30
31        // XXXManishearth this is only necessary because of an upstream
32        // gstreamer bug. https://github.com/servo/media/pull/362#issuecomment-647947034
33        let caps = gst_audio::AudioCapsBuilder::new()
34            .layout(gst_audio::AudioLayout::Interleaved)
35            .build();
36        let capsfilter0 = gst::ElementFactory::make("capsfilter")
37            .property("caps", caps)
38            .build()
39            .map_err(|error| format!("capsfilter creation failed: {error:?}"))?;
40
41        let split = gst::ElementFactory::make("audiobuffersplit")
42            .property("output-buffer-duration", time_per_block)
43            .build()
44            .map_err(|error| format!("audiobuffersplit creation failed: {error:?}"))?;
45        let convert = gst::ElementFactory::make("audioconvert")
46            .build()
47            .map_err(|error| format!("audioconvert creation failed: {error:?}"))?;
48        let caps = gst_audio::AudioCapsBuilder::new()
49            .layout(gst_audio::AudioLayout::NonInterleaved)
50            .format(AUDIO_FORMAT_F32)
51            .rate(sample_rate as i32)
52            .build();
53        let capsfilter = gst::ElementFactory::make("capsfilter")
54            .property("caps", caps)
55            .build()
56            .map_err(|error| format!("capsfilter creation failed: {error:?}"))?;
57        let sink = gst::ElementFactory::make("appsink")
58            .property("sync", false)
59            .build()
60            .map_err(|error| format!("appsink creation failed: {error:?}"))?;
61
62        let appsink = sink.clone().dynamic_cast::<gst_app::AppSink>().unwrap();
63
64        let elements = [&element, &capsfilter0, &split, &convert, &capsfilter, &sink];
65        pipeline
66            .add_many(&elements[1..])
67            .map_err(|error| format!("pipeline adding failed: {error:?}"))?;
68        gst::Element::link_many(&elements).map_err(|error| format!("element linking failed: {error:?}"))?;
69        for e in &elements {
70            e.sync_state_with_parent().map_err(|e| e.to_string())?;
71        }
72        appsink.set_callbacks(
73            gst_app::AppSinkCallbacks::builder()
74                .new_sample(move |appsink| {
75                    let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
76                    let buffer = sample.buffer_owned().ok_or(gst::FlowError::Error)?;
77
78                    let buffer = buffer
79                        .into_mapped_buffer_readable()
80                        .map_err(|_| gst::FlowError::Error)?;
81                    let floatref = buffer
82                        .as_slice()
83                        .as_slice_of::<f32>()
84                        .map_err(|_| gst::FlowError::Error)?;
85
86                    let block = Block::for_vec(floatref.into());
87                    tx.send(block).map_err(|_| gst::FlowError::Error)?;
88                    Ok(gst::FlowSuccess::Ok)
89                })
90                .build(),
91        );
92        Ok(Self { rx, pipeline })
93    }
94}
95
96impl AudioStreamReader for GStreamerAudioStreamReader {
97    fn pull(&self) -> Block {
98        self.rx.recv().unwrap()
99    }
100
101    fn start(&self) {
102        self.pipeline.set_state(gst::State::Playing).unwrap();
103    }
104
105    fn stop(&self) {
106        self.pipeline.set_state(gst::State::Null).unwrap();
107    }
108}