servo_media_gstreamer/
audio_stream_reader.rs

1use crate::media_stream::GStreamerMediaStream;
2use servo_media_audio::block::{Block, FRAMES_PER_BLOCK_USIZE};
3use servo_media_audio::AudioStreamReader;
4use servo_media_streams::registry::{get_stream, MediaStreamId};
5use std::sync::mpsc::{channel, Receiver};
6
7use byte_slice_cast::*;
8use gst::prelude::*;
9use gst::Fraction;
10use gst_audio::AUDIO_FORMAT_F32;
11
12pub struct GStreamerAudioStreamReader {
13    rx: Receiver<Block>,
14    pipeline: gst::Pipeline,
15}
16
17impl GStreamerAudioStreamReader {
18    pub fn new(stream: MediaStreamId, sample_rate: f32) -> Result<Self, String> {
19        let (tx, rx) = channel();
20        let stream = get_stream(&stream).unwrap();
21        let mut stream = stream.lock().unwrap();
22        let g_stream = stream
23            .as_mut_any()
24            .downcast_mut::<GStreamerMediaStream>()
25            .unwrap();
26        let element = g_stream.src_element();
27        let pipeline = g_stream.pipeline_or_new();
28        drop(stream);
29        let time_per_block = Fraction::new(FRAMES_PER_BLOCK_USIZE as i32, sample_rate as i32);
30
31        // XXXManishearth this is only necessary because of an upstream
32        // gstreamer bug. https://github.com/servo/media/pull/362#issuecomment-647947034
33        let caps = gst_audio::AudioCapsBuilder::new()
34            .layout(gst_audio::AudioLayout::Interleaved)
35            .build();
36        let capsfilter0 = gst::ElementFactory::make("capsfilter")
37            .property("caps", caps)
38            .build()
39            .map_err(|error| format!("capsfilter creation failed: {error:?}"))?;
40
41        let split = gst::ElementFactory::make("audiobuffersplit")
42            .property("output-buffer-duration", time_per_block)
43            .build()
44            .map_err(|error| format!("audiobuffersplit creation failed: {error:?}"))?;
45        let convert = gst::ElementFactory::make("audioconvert")
46            .build()
47            .map_err(|error| format!("audioconvert creation failed: {error:?}"))?;
48        let caps = gst_audio::AudioCapsBuilder::new()
49            .layout(gst_audio::AudioLayout::NonInterleaved)
50            .format(AUDIO_FORMAT_F32)
51            .rate(sample_rate as i32)
52            .build();
53        let capsfilter = gst::ElementFactory::make("capsfilter")
54            .property("caps", caps)
55            .build()
56            .map_err(|error| format!("capsfilter creation failed: {error:?}"))?;
57        let sink = gst::ElementFactory::make("appsink")
58            .property("sync", false)
59            .build()
60            .map_err(|error| format!("appsink creation failed: {error:?}"))?;
61
62        let appsink = sink.clone().dynamic_cast::<gst_app::AppSink>().unwrap();
63
64        let elements = [&element, &capsfilter0, &split, &convert, &capsfilter, &sink];
65        pipeline
66            .add_many(&elements[1..])
67            .map_err(|error| format!("pipeline adding failed: {error:?}"))?;
68        gst::Element::link_many(&elements)
69            .map_err(|error| format!("element linking failed: {error:?}"))?;
70        for e in &elements {
71            e.sync_state_with_parent().map_err(|e| e.to_string())?;
72        }
73        appsink.set_callbacks(
74            gst_app::AppSinkCallbacks::builder()
75                .new_sample(move |appsink| {
76                    let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
77                    let buffer = sample.buffer_owned().ok_or(gst::FlowError::Error)?;
78
79                    let buffer = buffer
80                        .into_mapped_buffer_readable()
81                        .map_err(|_| gst::FlowError::Error)?;
82                    let floatref = buffer
83                        .as_slice()
84                        .as_slice_of::<f32>()
85                        .map_err(|_| gst::FlowError::Error)?;
86
87                    let block = Block::for_vec(floatref.into());
88                    tx.send(block).map_err(|_| gst::FlowError::Error)?;
89                    Ok(gst::FlowSuccess::Ok)
90                })
91                .build(),
92        );
93        Ok(Self { rx, pipeline })
94    }
95}
96
97impl AudioStreamReader for GStreamerAudioStreamReader {
98    fn pull(&self) -> Block {
99        self.rx.recv().unwrap()
100    }
101
102    fn start(&self) {
103        self.pipeline.set_state(gst::State::Playing).unwrap();
104    }
105
106    fn stop(&self) {
107        self.pipeline.set_state(gst::State::Null).unwrap();
108    }
109}