servo_media_gstreamer/
audio_sink.rs

1use crate::media_stream::GstreamerMediaSocket;
2use byte_slice_cast::*;
3use gst;
4use gst::prelude::*;
5use gst_app::{AppSrc, AppSrcCallbacks};
6use gst_audio;
7use servo_media_audio::block::{Chunk, FRAMES_PER_BLOCK};
8use servo_media_audio::render_thread::AudioRenderThreadMsg;
9use servo_media_audio::sink::{AudioSink, AudioSinkError};
10use servo_media_streams::MediaSocket;
11use std::cell::{Cell, RefCell};
12use std::sync::mpsc::Sender;
13use std::sync::Arc;
14use std::thread::Builder;
15
16const DEFAULT_SAMPLE_RATE: f32 = 44100.;
17
18pub struct GStreamerAudioSink {
19    pipeline: gst::Pipeline,
20    appsrc: Arc<AppSrc>,
21    sample_rate: Cell<f32>,
22    audio_info: RefCell<Option<gst_audio::AudioInfo>>,
23    sample_offset: Cell<u64>,
24}
25
26impl GStreamerAudioSink {
27    pub fn new() -> Result<Self, AudioSinkError> {
28        if let Some(category) = gst::DebugCategory::get("openslessink") {
29            category.set_threshold(gst::DebugLevel::Trace);
30        }
31        gst::init().map_err(|error| AudioSinkError::Backend(format!("GStreamer init failed: {error:?}")))?;
32
33        let appsrc = gst::ElementFactory::make("appsrc")
34            .build()
35            .map_err(|error| AudioSinkError::Backend(format!("appsrc creation failed: {error:?}")))?;
36        let appsrc = appsrc.downcast::<AppSrc>().unwrap();
37
38        Ok(Self {
39            pipeline: gst::Pipeline::new(),
40            appsrc: Arc::new(appsrc),
41            sample_rate: Cell::new(DEFAULT_SAMPLE_RATE),
42            audio_info: RefCell::new(None),
43            sample_offset: Cell::new(0),
44        })
45    }
46}
47
48impl GStreamerAudioSink {
49    fn set_audio_info(&self, sample_rate: f32, channels: u8) -> Result<(), AudioSinkError> {
50        let audio_info = gst_audio::AudioInfo::builder(
51            gst_audio::AUDIO_FORMAT_F32,
52            sample_rate as u32,
53            channels.into(),
54        )
55        .build()
56        .map_err(|error| AudioSinkError::Backend(format!("AudioInfo failed: {error:?}")))?;
57        self.appsrc.set_caps(audio_info.to_caps().ok().as_ref());
58        *self.audio_info.borrow_mut() = Some(audio_info);
59        Ok(())
60    }
61
62    fn set_channels_if_changed(&self, channels: u8) -> Result<(), AudioSinkError> {
63        let curr_channels = if let Some(ch) = self.audio_info.borrow().as_ref() {
64            ch.channels()
65        } else {
66            return Ok(());
67        };
68        if channels != curr_channels as u8 {
69            self.set_audio_info(self.sample_rate.get(), channels)?;
70        }
71        Ok(())
72    }
73}
74
75impl AudioSink for GStreamerAudioSink {
76    fn init(
77        &self,
78        sample_rate: f32,
79        graph_thread_channel: Sender<AudioRenderThreadMsg>,
80    ) -> Result<(), AudioSinkError> {
81        self.sample_rate.set(sample_rate);
82        self.set_audio_info(sample_rate, 2)?;
83        self.appsrc.set_format(gst::Format::Time);
84
85        // Allow only a single chunk.
86        self.appsrc.set_max_bytes(1);
87
88        let appsrc = self.appsrc.clone();
89        Builder::new()
90            .name("GstAppSrcCallbacks".to_owned())
91            .spawn(move || {
92                let need_data = move |_: &AppSrc, _: u32| {
93                    if let Err(e) = graph_thread_channel
94                        .send(AudioRenderThreadMsg::SinkNeedData)
95                    {
96                        log::warn!("Error sending need data event: {:?}", e);
97                    }
98                };
99                appsrc.set_callbacks(AppSrcCallbacks::builder().need_data(need_data).build());
100            })
101            .unwrap();
102
103        let appsrc = self.appsrc.as_ref().clone().upcast();
104        let resample = gst::ElementFactory::make("audioresample")
105            .build()
106            .map_err(|error| AudioSinkError::Backend(format!("audioresample creation failed: {error:?}")))?;
107        let convert = gst::ElementFactory::make("audioconvert")
108            .build()
109            .map_err(|error| AudioSinkError::Backend(format!("audioconvert creation failed: {error:?}")))?;
110        let sink = gst::ElementFactory::make("autoaudiosink")
111            .build()
112            .map_err(|error| AudioSinkError::Backend(format!("autoaudiosink creation failed: {error:?}")))?;
113        self.pipeline
114            .add_many(&[&appsrc, &resample, &convert, &sink])
115            .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
116        gst::Element::link_many(&[&appsrc, &resample, &convert, &sink])
117            .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
118
119        Ok(())
120    }
121
122    fn init_stream(
123        &self,
124        channels: u8,
125        sample_rate: f32,
126        socket: Box<dyn MediaSocket>,
127    ) -> Result<(), AudioSinkError> {
128        self.sample_rate.set(sample_rate);
129        self.set_audio_info(sample_rate, channels)?;
130        self.appsrc.set_format(gst::Format::Time);
131
132        // Do not set max bytes or callback, we will push as needed
133
134        let appsrc = self.appsrc.as_ref().clone().upcast();
135        let convert = gst::ElementFactory::make("audioconvert")
136            .build()
137            .map_err(|error| AudioSinkError::Backend(format!("audioconvert creation failed: {error:?}")))?;
138        let sink = socket
139            .as_any()
140            .downcast_ref::<GstreamerMediaSocket>()
141            .unwrap()
142            .proxy_sink()
143            .clone();
144
145        self.pipeline
146            .add_many(&[&appsrc, &convert, &sink])
147            .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
148        gst::Element::link_many(&[&appsrc, &convert, &sink])
149            .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
150
151        Ok(())
152    }
153
154    fn play(&self) -> Result<(), AudioSinkError> {
155        self.pipeline
156            .set_state(gst::State::Playing)
157            .map(|_| ())
158            .map_err(|_| AudioSinkError::StateChangeFailed)
159    }
160
161    fn stop(&self) -> Result<(), AudioSinkError> {
162        self.pipeline
163            .set_state(gst::State::Paused)
164            .map(|_| ())
165            .map_err(|_| AudioSinkError::StateChangeFailed)
166    }
167
168    fn has_enough_data(&self) -> bool {
169        self.appsrc.current_level_bytes() >= self.appsrc.max_bytes()
170    }
171
172    fn push_data(&self, mut chunk: Chunk) -> Result<(), AudioSinkError> {
173        if let Some(block) = chunk.blocks.get(0) {
174            self.set_channels_if_changed(block.chan_count())?;
175        }
176
177        let sample_rate = self.sample_rate.get() as u64;
178        let audio_info = self.audio_info.borrow();
179        let audio_info = audio_info.as_ref().unwrap();
180        let channels = audio_info.channels();
181        let bpf = audio_info.bpf() as usize;
182        assert_eq!(bpf, 4 * channels as usize);
183        let n_samples = FRAMES_PER_BLOCK.0;
184        let buf_size = (n_samples as usize) * (bpf);
185        let mut buffer = gst::Buffer::with_size(buf_size).unwrap();
186        {
187            let buffer = buffer.get_mut().unwrap();
188            let mut sample_offset = self.sample_offset.get();
189            // Calculate the current timestamp (PTS) and the next one,
190            // and calculate the duration from the difference instead of
191            // simply the number of samples to prevent rounding errors
192            let pts = gst::ClockTime::from_nseconds(
193                sample_offset
194                    .mul_div_floor(gst::ClockTime::SECOND.nseconds(), sample_rate)
195                    .unwrap(),
196            );
197            let next_pts: gst::ClockTime = gst::ClockTime::from_nseconds(
198                (sample_offset + n_samples)
199                    .mul_div_floor(gst::ClockTime::SECOND.nseconds(), sample_rate)
200                    .unwrap(),
201            );
202            buffer.set_pts(Some(pts));
203            buffer.set_duration(next_pts - pts);
204
205            // sometimes nothing reaches the output
206            if chunk.len() == 0 {
207                chunk.blocks.push(Default::default());
208                chunk.blocks[0].repeat(channels as u8);
209            }
210            debug_assert!(chunk.len() == 1);
211            let mut data = chunk.blocks[0].interleave();
212            let data = data.as_mut_byte_slice().expect("casting failed");
213
214            // XXXManishearth if we have a safe way to convert
215            // from Box<[f32]> to Box<[u8]> (similarly for Vec)
216            // we can use Buffer::from_slice instead
217            buffer.copy_from_slice(0, data).expect("copying failed");
218
219            sample_offset += n_samples;
220            self.sample_offset.set(sample_offset);
221        }
222
223        self.appsrc
224            .push_buffer(buffer)
225            .map(|_| ())
226            .map_err(|_| AudioSinkError::BufferPushFailed)
227    }
228
229    fn set_eos_callback(&self, _: Box<dyn Fn(Box<dyn AsRef<[f32]>>) + Send + Sync + 'static>) {}
230}
231
232impl Drop for GStreamerAudioSink {
233    fn drop(&mut self) {
234        let _ = self.stop();
235    }
236}