servo_media_gstreamer/
audio_stream_reader.rs1use crate::media_stream::GStreamerMediaStream;
2use servo_media_audio::block::{Block, FRAMES_PER_BLOCK_USIZE};
3use servo_media_audio::AudioStreamReader;
4use servo_media_streams::registry::{get_stream, MediaStreamId};
5use std::sync::mpsc::{channel, Receiver};
6
7use byte_slice_cast::*;
8use gst::prelude::*;
9use gst::Fraction;
10use gst_audio::AUDIO_FORMAT_F32;
11
12pub struct GStreamerAudioStreamReader {
13 rx: Receiver<Block>,
14 pipeline: gst::Pipeline,
15}
16
17impl GStreamerAudioStreamReader {
18 pub fn new(stream: MediaStreamId, sample_rate: f32) -> Result<Self, String> {
19 let (tx, rx) = channel();
20 let stream = get_stream(&stream).unwrap();
21 let mut stream = stream.lock().unwrap();
22 let g_stream = stream
23 .as_mut_any()
24 .downcast_mut::<GStreamerMediaStream>()
25 .unwrap();
26 let element = g_stream.src_element();
27 let pipeline = g_stream.pipeline_or_new();
28 drop(stream);
29 let time_per_block = Fraction::new(FRAMES_PER_BLOCK_USIZE as i32, sample_rate as i32);
30
31 let caps = gst_audio::AudioCapsBuilder::new()
34 .layout(gst_audio::AudioLayout::Interleaved)
35 .build();
36 let capsfilter0 = gst::ElementFactory::make("capsfilter")
37 .property("caps", caps)
38 .build()
39 .map_err(|error| format!("capsfilter creation failed: {error:?}"))?;
40
41 let split = gst::ElementFactory::make("audiobuffersplit")
42 .property("output-buffer-duration", time_per_block)
43 .build()
44 .map_err(|error| format!("audiobuffersplit creation failed: {error:?}"))?;
45 let convert = gst::ElementFactory::make("audioconvert")
46 .build()
47 .map_err(|error| format!("audioconvert creation failed: {error:?}"))?;
48 let caps = gst_audio::AudioCapsBuilder::new()
49 .layout(gst_audio::AudioLayout::NonInterleaved)
50 .format(AUDIO_FORMAT_F32)
51 .rate(sample_rate as i32)
52 .build();
53 let capsfilter = gst::ElementFactory::make("capsfilter")
54 .property("caps", caps)
55 .build()
56 .map_err(|error| format!("capsfilter creation failed: {error:?}"))?;
57 let sink = gst::ElementFactory::make("appsink")
58 .property("sync", false)
59 .build()
60 .map_err(|error| format!("appsink creation failed: {error:?}"))?;
61
62 let appsink = sink.clone().dynamic_cast::<gst_app::AppSink>().unwrap();
63
64 let elements = [&element, &capsfilter0, &split, &convert, &capsfilter, &sink];
65 pipeline
66 .add_many(&elements[1..])
67 .map_err(|error| format!("pipeline adding failed: {error:?}"))?;
68 gst::Element::link_many(&elements).map_err(|error| format!("element linking failed: {error:?}"))?;
69 for e in &elements {
70 e.sync_state_with_parent().map_err(|e| e.to_string())?;
71 }
72 appsink.set_callbacks(
73 gst_app::AppSinkCallbacks::builder()
74 .new_sample(move |appsink| {
75 let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
76 let buffer = sample.buffer_owned().ok_or(gst::FlowError::Error)?;
77
78 let buffer = buffer
79 .into_mapped_buffer_readable()
80 .map_err(|_| gst::FlowError::Error)?;
81 let floatref = buffer
82 .as_slice()
83 .as_slice_of::<f32>()
84 .map_err(|_| gst::FlowError::Error)?;
85
86 let block = Block::for_vec(floatref.into());
87 tx.send(block).map_err(|_| gst::FlowError::Error)?;
88 Ok(gst::FlowSuccess::Ok)
89 })
90 .build(),
91 );
92 Ok(Self { rx, pipeline })
93 }
94}
95
96impl AudioStreamReader for GStreamerAudioStreamReader {
97 fn pull(&self) -> Block {
98 self.rx.recv().unwrap()
99 }
100
101 fn start(&self) {
102 self.pipeline.set_state(gst::State::Playing).unwrap();
103 }
104
105 fn stop(&self) {
106 self.pipeline.set_state(gst::State::Null).unwrap();
107 }
108}