servo_media_gstreamer/
audio_sink.rs1use crate::media_stream::GstreamerMediaSocket;
2use byte_slice_cast::*;
3use gst;
4use gst::prelude::*;
5use gst_app::{AppSrc, AppSrcCallbacks};
6use gst_audio;
7use servo_media_audio::block::{Chunk, FRAMES_PER_BLOCK};
8use servo_media_audio::render_thread::AudioRenderThreadMsg;
9use servo_media_audio::sink::{AudioSink, AudioSinkError};
10use servo_media_streams::MediaSocket;
11use std::cell::{Cell, RefCell};
12use std::sync::mpsc::Sender;
13use std::sync::Arc;
14use std::thread::Builder;
15
/// Sample rate (frames per second) a freshly constructed sink reports until
/// `init` or `init_stream` supplies the real one.
const DEFAULT_SAMPLE_RATE: f32 = 44_100.0;
17
18pub struct GStreamerAudioSink {
19 pipeline: gst::Pipeline,
20 appsrc: Arc<AppSrc>,
21 sample_rate: Cell<f32>,
22 audio_info: RefCell<Option<gst_audio::AudioInfo>>,
23 sample_offset: Cell<u64>,
24}
25
26impl GStreamerAudioSink {
27 pub fn new() -> Result<Self, AudioSinkError> {
28 if let Some(category) = gst::DebugCategory::get("openslessink") {
29 category.set_threshold(gst::DebugLevel::Trace);
30 }
31 gst::init().map_err(|error| AudioSinkError::Backend(format!("GStreamer init failed: {error:?}")))?;
32
33 let appsrc = gst::ElementFactory::make("appsrc")
34 .build()
35 .map_err(|error| AudioSinkError::Backend(format!("appsrc creation failed: {error:?}")))?;
36 let appsrc = appsrc.downcast::<AppSrc>().unwrap();
37
38 Ok(Self {
39 pipeline: gst::Pipeline::new(),
40 appsrc: Arc::new(appsrc),
41 sample_rate: Cell::new(DEFAULT_SAMPLE_RATE),
42 audio_info: RefCell::new(None),
43 sample_offset: Cell::new(0),
44 })
45 }
46}
47
48impl GStreamerAudioSink {
49 fn set_audio_info(&self, sample_rate: f32, channels: u8) -> Result<(), AudioSinkError> {
50 let audio_info = gst_audio::AudioInfo::builder(
51 gst_audio::AUDIO_FORMAT_F32,
52 sample_rate as u32,
53 channels.into(),
54 )
55 .build()
56 .map_err(|error| AudioSinkError::Backend(format!("AudioInfo failed: {error:?}")))?;
57 self.appsrc.set_caps(audio_info.to_caps().ok().as_ref());
58 *self.audio_info.borrow_mut() = Some(audio_info);
59 Ok(())
60 }
61
62 fn set_channels_if_changed(&self, channels: u8) -> Result<(), AudioSinkError> {
63 let curr_channels = if let Some(ch) = self.audio_info.borrow().as_ref() {
64 ch.channels()
65 } else {
66 return Ok(());
67 };
68 if channels != curr_channels as u8 {
69 self.set_audio_info(self.sample_rate.get(), channels)?;
70 }
71 Ok(())
72 }
73}
74
75impl AudioSink for GStreamerAudioSink {
76 fn init(
77 &self,
78 sample_rate: f32,
79 graph_thread_channel: Sender<AudioRenderThreadMsg>,
80 ) -> Result<(), AudioSinkError> {
81 self.sample_rate.set(sample_rate);
82 self.set_audio_info(sample_rate, 2)?;
83 self.appsrc.set_format(gst::Format::Time);
84
85 self.appsrc.set_max_bytes(1);
87
88 let appsrc = self.appsrc.clone();
89 Builder::new()
90 .name("GstAppSrcCallbacks".to_owned())
91 .spawn(move || {
92 let need_data = move |_: &AppSrc, _: u32| {
93 if let Err(e) = graph_thread_channel
94 .send(AudioRenderThreadMsg::SinkNeedData)
95 {
96 log::warn!("Error sending need data event: {:?}", e);
97 }
98 };
99 appsrc.set_callbacks(AppSrcCallbacks::builder().need_data(need_data).build());
100 })
101 .unwrap();
102
103 let appsrc = self.appsrc.as_ref().clone().upcast();
104 let resample = gst::ElementFactory::make("audioresample")
105 .build()
106 .map_err(|error| AudioSinkError::Backend(format!("audioresample creation failed: {error:?}")))?;
107 let convert = gst::ElementFactory::make("audioconvert")
108 .build()
109 .map_err(|error| AudioSinkError::Backend(format!("audioconvert creation failed: {error:?}")))?;
110 let sink = gst::ElementFactory::make("autoaudiosink")
111 .build()
112 .map_err(|error| AudioSinkError::Backend(format!("autoaudiosink creation failed: {error:?}")))?;
113 self.pipeline
114 .add_many(&[&appsrc, &resample, &convert, &sink])
115 .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
116 gst::Element::link_many(&[&appsrc, &resample, &convert, &sink])
117 .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
118
119 Ok(())
120 }
121
122 fn init_stream(
123 &self,
124 channels: u8,
125 sample_rate: f32,
126 socket: Box<dyn MediaSocket>,
127 ) -> Result<(), AudioSinkError> {
128 self.sample_rate.set(sample_rate);
129 self.set_audio_info(sample_rate, channels)?;
130 self.appsrc.set_format(gst::Format::Time);
131
132 let appsrc = self.appsrc.as_ref().clone().upcast();
135 let convert = gst::ElementFactory::make("audioconvert")
136 .build()
137 .map_err(|error| AudioSinkError::Backend(format!("audioconvert creation failed: {error:?}")))?;
138 let sink = socket
139 .as_any()
140 .downcast_ref::<GstreamerMediaSocket>()
141 .unwrap()
142 .proxy_sink()
143 .clone();
144
145 self.pipeline
146 .add_many(&[&appsrc, &convert, &sink])
147 .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
148 gst::Element::link_many(&[&appsrc, &convert, &sink])
149 .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
150
151 Ok(())
152 }
153
154 fn play(&self) -> Result<(), AudioSinkError> {
155 self.pipeline
156 .set_state(gst::State::Playing)
157 .map(|_| ())
158 .map_err(|_| AudioSinkError::StateChangeFailed)
159 }
160
161 fn stop(&self) -> Result<(), AudioSinkError> {
162 self.pipeline
163 .set_state(gst::State::Paused)
164 .map(|_| ())
165 .map_err(|_| AudioSinkError::StateChangeFailed)
166 }
167
168 fn has_enough_data(&self) -> bool {
169 self.appsrc.current_level_bytes() >= self.appsrc.max_bytes()
170 }
171
172 fn push_data(&self, mut chunk: Chunk) -> Result<(), AudioSinkError> {
173 if let Some(block) = chunk.blocks.get(0) {
174 self.set_channels_if_changed(block.chan_count())?;
175 }
176
177 let sample_rate = self.sample_rate.get() as u64;
178 let audio_info = self.audio_info.borrow();
179 let audio_info = audio_info.as_ref().unwrap();
180 let channels = audio_info.channels();
181 let bpf = audio_info.bpf() as usize;
182 assert_eq!(bpf, 4 * channels as usize);
183 let n_samples = FRAMES_PER_BLOCK.0;
184 let buf_size = (n_samples as usize) * (bpf);
185 let mut buffer = gst::Buffer::with_size(buf_size).unwrap();
186 {
187 let buffer = buffer.get_mut().unwrap();
188 let mut sample_offset = self.sample_offset.get();
189 let pts = gst::ClockTime::from_nseconds(
193 sample_offset
194 .mul_div_floor(gst::ClockTime::SECOND.nseconds(), sample_rate)
195 .unwrap(),
196 );
197 let next_pts: gst::ClockTime = gst::ClockTime::from_nseconds(
198 (sample_offset + n_samples)
199 .mul_div_floor(gst::ClockTime::SECOND.nseconds(), sample_rate)
200 .unwrap(),
201 );
202 buffer.set_pts(Some(pts));
203 buffer.set_duration(next_pts - pts);
204
205 if chunk.len() == 0 {
207 chunk.blocks.push(Default::default());
208 chunk.blocks[0].repeat(channels as u8);
209 }
210 debug_assert!(chunk.len() == 1);
211 let mut data = chunk.blocks[0].interleave();
212 let data = data.as_mut_byte_slice().expect("casting failed");
213
214 buffer.copy_from_slice(0, data).expect("copying failed");
218
219 sample_offset += n_samples;
220 self.sample_offset.set(sample_offset);
221 }
222
223 self.appsrc
224 .push_buffer(buffer)
225 .map(|_| ())
226 .map_err(|_| AudioSinkError::BufferPushFailed)
227 }
228
229 fn set_eos_callback(&self, _: Box<dyn Fn(Box<dyn AsRef<[f32]>>) + Send + Sync + 'static>) {}
230}
231
232impl Drop for GStreamerAudioSink {
233 fn drop(&mut self) {
234 let _ = self.stop();
235 }
236}