servo_media_gstreamer/
audio_stream_reader.rs1use std::sync::mpsc::{Receiver, channel};
6
7use byte_slice_cast::*;
8use gstreamer::Fraction;
9use gstreamer::prelude::*;
10use gstreamer_audio::AUDIO_FORMAT_F32;
11use servo_media_audio::AudioStreamReader;
12use servo_media_audio::block::{Block, FRAMES_PER_BLOCK_USIZE};
13use servo_media_streams::registry::{MediaStreamId, get_stream};
14
15use crate::media_stream::GStreamerMediaStream;
16
17pub struct GStreamerAudioStreamReader {
18 rx: Receiver<Block>,
19 pipeline: gstreamer::Pipeline,
20}
21
22impl GStreamerAudioStreamReader {
23 pub fn new(stream: MediaStreamId, sample_rate: f32) -> Result<Self, String> {
24 let (tx, rx) = channel();
25 let stream = get_stream(&stream).unwrap();
26 let mut stream = stream.lock().unwrap();
27 let g_stream = stream
28 .as_mut_any()
29 .downcast_mut::<GStreamerMediaStream>()
30 .unwrap();
31 let element = g_stream.src_element();
32 let pipeline = g_stream.pipeline_or_new();
33 drop(stream);
34 let time_per_block = Fraction::new(FRAMES_PER_BLOCK_USIZE as i32, sample_rate as i32);
35
36 let caps = gstreamer_audio::AudioCapsBuilder::new()
39 .layout(gstreamer_audio::AudioLayout::Interleaved)
40 .build();
41 let capsfilter0 = gstreamer::ElementFactory::make("capsfilter")
42 .property("caps", caps)
43 .build()
44 .map_err(|error| format!("capsfilter creation failed: {error:?}"))?;
45
46 let split = gstreamer::ElementFactory::make("audiobuffersplit")
47 .property("output-buffer-duration", time_per_block)
48 .build()
49 .map_err(|error| format!("audiobuffersplit creation failed: {error:?}"))?;
50 let convert = gstreamer::ElementFactory::make("audioconvert")
51 .build()
52 .map_err(|error| format!("audioconvert creation failed: {error:?}"))?;
53 let caps = gstreamer_audio::AudioCapsBuilder::new()
54 .layout(gstreamer_audio::AudioLayout::NonInterleaved)
55 .format(AUDIO_FORMAT_F32)
56 .rate(sample_rate as i32)
57 .build();
58 let capsfilter = gstreamer::ElementFactory::make("capsfilter")
59 .property("caps", caps)
60 .build()
61 .map_err(|error| format!("capsfilter creation failed: {error:?}"))?;
62 let sink = gstreamer::ElementFactory::make("appsink")
63 .property("sync", false)
64 .build()
65 .map_err(|error| format!("appsink creation failed: {error:?}"))?;
66
67 let appsink = sink
68 .clone()
69 .dynamic_cast::<gstreamer_app::AppSink>()
70 .unwrap();
71
72 let elements = [&element, &capsfilter0, &split, &convert, &capsfilter, &sink];
73 pipeline
74 .add_many(&elements[1..])
75 .map_err(|error| format!("pipeline adding failed: {error:?}"))?;
76 gstreamer::Element::link_many(elements)
77 .map_err(|error| format!("element linking failed: {error:?}"))?;
78 for e in &elements {
79 e.sync_state_with_parent().map_err(|e| e.to_string())?;
80 }
81 appsink.set_callbacks(
82 gstreamer_app::AppSinkCallbacks::builder()
83 .new_sample(move |appsink| {
84 let sample = appsink
85 .pull_sample()
86 .map_err(|_| gstreamer::FlowError::Eos)?;
87 let buffer = sample.buffer_owned().ok_or(gstreamer::FlowError::Error)?;
88
89 let buffer = buffer
90 .into_mapped_buffer_readable()
91 .map_err(|_| gstreamer::FlowError::Error)?;
92 let floatref = buffer
93 .as_slice()
94 .as_slice_of::<f32>()
95 .map_err(|_| gstreamer::FlowError::Error)?;
96
97 let block = Block::for_vec(floatref.into());
98 tx.send(block).map_err(|_| gstreamer::FlowError::Error)?;
99 Ok(gstreamer::FlowSuccess::Ok)
100 })
101 .build(),
102 );
103 Ok(Self { rx, pipeline })
104 }
105}
106
107impl AudioStreamReader for GStreamerAudioStreamReader {
108 fn pull(&self) -> Block {
109 self.rx.recv().unwrap()
110 }
111
112 fn start(&self) {
113 self.pipeline.set_state(gstreamer::State::Playing).unwrap();
114 }
115
116 fn stop(&self) {
117 self.pipeline.set_state(gstreamer::State::Null).unwrap();
118 }
119}