servo_media_gstreamer/
audio_sink.rs

1use crate::media_stream::GstreamerMediaSocket;
2use byte_slice_cast::*;
3use gst;
4use gst::prelude::*;
5use gst_app::{AppSrc, AppSrcCallbacks};
6use gst_audio;
7use servo_media_audio::block::{Chunk, FRAMES_PER_BLOCK};
8use servo_media_audio::render_thread::AudioRenderThreadMsg;
9use servo_media_audio::sink::{AudioSink, AudioSinkError};
10use servo_media_streams::MediaSocket;
11use std::cell::{Cell, RefCell};
12use std::sync::mpsc::Sender;
13use std::sync::Arc;
14use std::thread::Builder;
15
16const DEFAULT_SAMPLE_RATE: f32 = 44100.;
17
18pub struct GStreamerAudioSink {
19    pipeline: gst::Pipeline,
20    appsrc: Arc<AppSrc>,
21    sample_rate: Cell<f32>,
22    audio_info: RefCell<Option<gst_audio::AudioInfo>>,
23    sample_offset: Cell<u64>,
24}
25
26impl GStreamerAudioSink {
27    pub fn new() -> Result<Self, AudioSinkError> {
28        if let Some(category) = gst::DebugCategory::get("openslessink") {
29            category.set_threshold(gst::DebugLevel::Trace);
30        }
31        gst::init().map_err(|error| {
32            AudioSinkError::Backend(format!("GStreamer init failed: {error:?}"))
33        })?;
34
35        let appsrc = gst::ElementFactory::make("appsrc")
36            .build()
37            .map_err(|error| {
38                AudioSinkError::Backend(format!("appsrc creation failed: {error:?}"))
39            })?;
40        let appsrc = appsrc.downcast::<AppSrc>().unwrap();
41
42        Ok(Self {
43            pipeline: gst::Pipeline::new(),
44            appsrc: Arc::new(appsrc),
45            sample_rate: Cell::new(DEFAULT_SAMPLE_RATE),
46            audio_info: RefCell::new(None),
47            sample_offset: Cell::new(0),
48        })
49    }
50}
51
52impl GStreamerAudioSink {
53    fn set_audio_info(&self, sample_rate: f32, channels: u8) -> Result<(), AudioSinkError> {
54        let audio_info = gst_audio::AudioInfo::builder(
55            gst_audio::AUDIO_FORMAT_F32,
56            sample_rate as u32,
57            channels.into(),
58        )
59        .build()
60        .map_err(|error| AudioSinkError::Backend(format!("AudioInfo failed: {error:?}")))?;
61        self.appsrc.set_caps(audio_info.to_caps().ok().as_ref());
62        *self.audio_info.borrow_mut() = Some(audio_info);
63        Ok(())
64    }
65
66    fn set_channels_if_changed(&self, channels: u8) -> Result<(), AudioSinkError> {
67        let curr_channels = if let Some(ch) = self.audio_info.borrow().as_ref() {
68            ch.channels()
69        } else {
70            return Ok(());
71        };
72        if channels != curr_channels as u8 {
73            self.set_audio_info(self.sample_rate.get(), channels)?;
74        }
75        Ok(())
76    }
77}
78
79impl AudioSink for GStreamerAudioSink {
80    fn init(
81        &self,
82        sample_rate: f32,
83        graph_thread_channel: Sender<AudioRenderThreadMsg>,
84    ) -> Result<(), AudioSinkError> {
85        self.sample_rate.set(sample_rate);
86        self.set_audio_info(sample_rate, 2)?;
87        self.appsrc.set_format(gst::Format::Time);
88
89        // Allow only a single chunk.
90        self.appsrc.set_max_bytes(1);
91
92        let appsrc = self.appsrc.clone();
93        Builder::new()
94            .name("GstAppSrcCallbacks".to_owned())
95            .spawn(move || {
96                let need_data = move |_: &AppSrc, _: u32| {
97                    if let Err(e) = graph_thread_channel.send(AudioRenderThreadMsg::SinkNeedData) {
98                        log::warn!("Error sending need data event: {:?}", e);
99                    }
100                };
101                appsrc.set_callbacks(AppSrcCallbacks::builder().need_data(need_data).build());
102            })
103            .unwrap();
104
105        let appsrc = self.appsrc.as_ref().clone().upcast();
106        let resample = gst::ElementFactory::make("audioresample")
107            .build()
108            .map_err(|error| {
109                AudioSinkError::Backend(format!("audioresample creation failed: {error:?}"))
110            })?;
111        let convert = gst::ElementFactory::make("audioconvert")
112            .build()
113            .map_err(|error| {
114                AudioSinkError::Backend(format!("audioconvert creation failed: {error:?}"))
115            })?;
116        let sink = gst::ElementFactory::make("autoaudiosink")
117            .build()
118            .map_err(|error| {
119                AudioSinkError::Backend(format!("autoaudiosink creation failed: {error:?}"))
120            })?;
121        self.pipeline
122            .add_many(&[&appsrc, &resample, &convert, &sink])
123            .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
124        gst::Element::link_many(&[&appsrc, &resample, &convert, &sink])
125            .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
126
127        Ok(())
128    }
129
130    fn init_stream(
131        &self,
132        channels: u8,
133        sample_rate: f32,
134        socket: Box<dyn MediaSocket>,
135    ) -> Result<(), AudioSinkError> {
136        self.sample_rate.set(sample_rate);
137        self.set_audio_info(sample_rate, channels)?;
138        self.appsrc.set_format(gst::Format::Time);
139
140        // Do not set max bytes or callback, we will push as needed
141
142        let appsrc = self.appsrc.as_ref().clone().upcast();
143        let convert = gst::ElementFactory::make("audioconvert")
144            .build()
145            .map_err(|error| {
146                AudioSinkError::Backend(format!("audioconvert creation failed: {error:?}"))
147            })?;
148        let sink = socket
149            .as_any()
150            .downcast_ref::<GstreamerMediaSocket>()
151            .unwrap()
152            .proxy_sink()
153            .clone();
154
155        self.pipeline
156            .add_many(&[&appsrc, &convert, &sink])
157            .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
158        gst::Element::link_many(&[&appsrc, &convert, &sink])
159            .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
160
161        Ok(())
162    }
163
164    fn play(&self) -> Result<(), AudioSinkError> {
165        self.pipeline
166            .set_state(gst::State::Playing)
167            .map(|_| ())
168            .map_err(|_| AudioSinkError::StateChangeFailed)
169    }
170
171    fn stop(&self) -> Result<(), AudioSinkError> {
172        self.pipeline
173            .set_state(gst::State::Paused)
174            .map(|_| ())
175            .map_err(|_| AudioSinkError::StateChangeFailed)
176    }
177
178    fn has_enough_data(&self) -> bool {
179        self.appsrc.current_level_bytes() >= self.appsrc.max_bytes()
180    }
181
182    fn push_data(&self, mut chunk: Chunk) -> Result<(), AudioSinkError> {
183        if let Some(block) = chunk.blocks.get(0) {
184            self.set_channels_if_changed(block.chan_count())?;
185        }
186
187        let sample_rate = self.sample_rate.get() as u64;
188        let audio_info = self.audio_info.borrow();
189        let audio_info = audio_info.as_ref().unwrap();
190        let channels = audio_info.channels();
191        let bpf = audio_info.bpf() as usize;
192        assert_eq!(bpf, 4 * channels as usize);
193        let n_samples = FRAMES_PER_BLOCK.0;
194        let buf_size = (n_samples as usize) * (bpf);
195        let mut buffer = gst::Buffer::with_size(buf_size).unwrap();
196        {
197            let buffer = buffer.get_mut().unwrap();
198            let mut sample_offset = self.sample_offset.get();
199            // Calculate the current timestamp (PTS) and the next one,
200            // and calculate the duration from the difference instead of
201            // simply the number of samples to prevent rounding errors
202            let pts = gst::ClockTime::from_nseconds(
203                sample_offset
204                    .mul_div_floor(gst::ClockTime::SECOND.nseconds(), sample_rate)
205                    .unwrap(),
206            );
207            let next_pts: gst::ClockTime = gst::ClockTime::from_nseconds(
208                (sample_offset + n_samples)
209                    .mul_div_floor(gst::ClockTime::SECOND.nseconds(), sample_rate)
210                    .unwrap(),
211            );
212            buffer.set_pts(Some(pts));
213            buffer.set_duration(next_pts - pts);
214
215            // sometimes nothing reaches the output
216            if chunk.len() == 0 {
217                chunk.blocks.push(Default::default());
218                chunk.blocks[0].repeat(channels as u8);
219            }
220            debug_assert!(chunk.len() == 1);
221            let mut data = chunk.blocks[0].interleave();
222            let data = data.as_mut_byte_slice().expect("casting failed");
223
224            // XXXManishearth if we have a safe way to convert
225            // from Box<[f32]> to Box<[u8]> (similarly for Vec)
226            // we can use Buffer::from_slice instead
227            buffer.copy_from_slice(0, data).expect("copying failed");
228
229            sample_offset += n_samples;
230            self.sample_offset.set(sample_offset);
231        }
232
233        self.appsrc
234            .push_buffer(buffer)
235            .map(|_| ())
236            .map_err(|_| AudioSinkError::BufferPushFailed)
237    }
238
239    fn set_eos_callback(&self, _: Box<dyn Fn(Box<dyn AsRef<[f32]>>) + Send + Sync + 'static>) {}
240}
241
242impl Drop for GStreamerAudioSink {
243    fn drop(&mut self) {
244        let _ = self.stop();
245    }
246}