Skip to main content

servo_media_gstreamer/
audio_stream_reader.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::sync::mpsc::{Receiver, channel};
6
7use byte_slice_cast::*;
8use gstreamer::Fraction;
9use gstreamer::prelude::*;
10use gstreamer_audio::AUDIO_FORMAT_F32;
11use servo_media_audio::AudioStreamReader;
12use servo_media_audio::block::{Block, FRAMES_PER_BLOCK_USIZE};
13use servo_media_streams::registry::{MediaStreamId, get_stream};
14
15use crate::media_stream::GStreamerMediaStream;
16
17pub struct GStreamerAudioStreamReader {
18    rx: Receiver<Block>,
19    pipeline: gstreamer::Pipeline,
20}
21
22impl GStreamerAudioStreamReader {
23    pub fn new(stream: MediaStreamId, sample_rate: f32) -> Result<Self, String> {
24        let (tx, rx) = channel();
25        let stream = get_stream(&stream).unwrap();
26        let mut stream = stream.lock().unwrap();
27        let g_stream = stream
28            .as_mut_any()
29            .downcast_mut::<GStreamerMediaStream>()
30            .unwrap();
31        let element = g_stream.src_element();
32        let pipeline = g_stream.pipeline_or_new();
33        drop(stream);
34        let time_per_block = Fraction::new(FRAMES_PER_BLOCK_USIZE as i32, sample_rate as i32);
35
36        // XXXManishearth this is only necessary because of an upstream
37        // gstreamer bug. https://github.com/servo/media/pull/362#issuecomment-647947034
38        let caps = gstreamer_audio::AudioCapsBuilder::new()
39            .layout(gstreamer_audio::AudioLayout::Interleaved)
40            .build();
41        let capsfilter0 = gstreamer::ElementFactory::make("capsfilter")
42            .property("caps", caps)
43            .build()
44            .map_err(|error| format!("capsfilter creation failed: {error:?}"))?;
45
46        let split = gstreamer::ElementFactory::make("audiobuffersplit")
47            .property("output-buffer-duration", time_per_block)
48            .build()
49            .map_err(|error| format!("audiobuffersplit creation failed: {error:?}"))?;
50        let convert = gstreamer::ElementFactory::make("audioconvert")
51            .build()
52            .map_err(|error| format!("audioconvert creation failed: {error:?}"))?;
53        let caps = gstreamer_audio::AudioCapsBuilder::new()
54            .layout(gstreamer_audio::AudioLayout::NonInterleaved)
55            .format(AUDIO_FORMAT_F32)
56            .rate(sample_rate as i32)
57            .build();
58        let capsfilter = gstreamer::ElementFactory::make("capsfilter")
59            .property("caps", caps)
60            .build()
61            .map_err(|error| format!("capsfilter creation failed: {error:?}"))?;
62        let sink = gstreamer::ElementFactory::make("appsink")
63            .property("sync", false)
64            .build()
65            .map_err(|error| format!("appsink creation failed: {error:?}"))?;
66
67        let appsink = sink
68            .clone()
69            .dynamic_cast::<gstreamer_app::AppSink>()
70            .unwrap();
71
72        let elements = [&element, &capsfilter0, &split, &convert, &capsfilter, &sink];
73        pipeline
74            .add_many(&elements[1..])
75            .map_err(|error| format!("pipeline adding failed: {error:?}"))?;
76        gstreamer::Element::link_many(elements)
77            .map_err(|error| format!("element linking failed: {error:?}"))?;
78        for e in &elements {
79            e.sync_state_with_parent().map_err(|e| e.to_string())?;
80        }
81        appsink.set_callbacks(
82            gstreamer_app::AppSinkCallbacks::builder()
83                .new_sample(move |appsink| {
84                    let sample = appsink
85                        .pull_sample()
86                        .map_err(|_| gstreamer::FlowError::Eos)?;
87                    let buffer = sample.buffer_owned().ok_or(gstreamer::FlowError::Error)?;
88
89                    let buffer = buffer
90                        .into_mapped_buffer_readable()
91                        .map_err(|_| gstreamer::FlowError::Error)?;
92                    let floatref = buffer
93                        .as_slice()
94                        .as_slice_of::<f32>()
95                        .map_err(|_| gstreamer::FlowError::Error)?;
96
97                    let block = Block::for_vec(floatref.into());
98                    tx.send(block).map_err(|_| gstreamer::FlowError::Error)?;
99                    Ok(gstreamer::FlowSuccess::Ok)
100                })
101                .build(),
102        );
103        Ok(Self { rx, pipeline })
104    }
105}
106
107impl AudioStreamReader for GStreamerAudioStreamReader {
108    fn pull(&self) -> Block {
109        self.rx.recv().unwrap()
110    }
111
112    fn start(&self) {
113        self.pipeline.set_state(gstreamer::State::Playing).unwrap();
114    }
115
116    fn stop(&self) {
117        self.pipeline.set_state(gstreamer::State::Null).unwrap();
118    }
119}