Skip to main content

servo_media_gstreamer/
audio_sink.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::sync::Arc;
7use std::sync::mpsc::Sender;
8use std::thread::Builder;
9
10use byte_slice_cast::*;
11use gstreamer;
12use gstreamer::prelude::*;
13use gstreamer_app::{AppSrc, AppSrcCallbacks};
14use gstreamer_audio;
15use servo_media_audio::block::{Chunk, FRAMES_PER_BLOCK};
16use servo_media_audio::render_thread::AudioRenderThreadMsg;
17use servo_media_audio::sink::{AudioSink, AudioSinkError};
18use servo_media_streams::MediaSocket;
19
20use crate::media_stream::GstreamerMediaSocket;
21
22const DEFAULT_SAMPLE_RATE: f32 = 44100.;
23
24pub struct GStreamerAudioSink {
25    pipeline: gstreamer::Pipeline,
26    appsrc: Arc<AppSrc>,
27    sample_rate: Cell<f32>,
28    audio_info: RefCell<Option<gstreamer_audio::AudioInfo>>,
29    sample_offset: Cell<u64>,
30}
31
32impl GStreamerAudioSink {
33    pub fn new() -> Result<Self, AudioSinkError> {
34        if let Some(category) = gstreamer::DebugCategory::get("openslessink") {
35            category.set_threshold(gstreamer::DebugLevel::Trace);
36        }
37        gstreamer::init().map_err(|error| {
38            AudioSinkError::Backend(format!("GStreamer init failed: {error:?}"))
39        })?;
40
41        let appsrc = gstreamer::ElementFactory::make("appsrc")
42            .build()
43            .map_err(|error| {
44                AudioSinkError::Backend(format!("appsrc creation failed: {error:?}"))
45            })?;
46        let appsrc = appsrc.downcast::<AppSrc>().unwrap();
47
48        Ok(Self {
49            pipeline: gstreamer::Pipeline::new(),
50            appsrc: Arc::new(appsrc),
51            sample_rate: Cell::new(DEFAULT_SAMPLE_RATE),
52            audio_info: RefCell::new(None),
53            sample_offset: Cell::new(0),
54        })
55    }
56}
57
58impl GStreamerAudioSink {
59    fn set_audio_info(&self, sample_rate: f32, channels: u8) -> Result<(), AudioSinkError> {
60        let audio_info = gstreamer_audio::AudioInfo::builder(
61            gstreamer_audio::AUDIO_FORMAT_F32,
62            sample_rate as u32,
63            channels.into(),
64        )
65        .build()
66        .map_err(|error| AudioSinkError::Backend(format!("AudioInfo failed: {error:?}")))?;
67        self.appsrc.set_caps(audio_info.to_caps().ok().as_ref());
68        *self.audio_info.borrow_mut() = Some(audio_info);
69        Ok(())
70    }
71
72    fn set_channels_if_changed(&self, channels: u8) -> Result<(), AudioSinkError> {
73        let curr_channels = match self.audio_info.borrow().as_ref() {
74            Some(ch) => ch.channels(),
75            _ => {
76                return Ok(());
77            },
78        };
79        if channels != curr_channels as u8 {
80            self.set_audio_info(self.sample_rate.get(), channels)?;
81        }
82        Ok(())
83    }
84}
85
86impl AudioSink for GStreamerAudioSink {
87    fn init(
88        &self,
89        sample_rate: f32,
90        graph_thread_channel: Sender<AudioRenderThreadMsg>,
91    ) -> Result<(), AudioSinkError> {
92        self.sample_rate.set(sample_rate);
93        self.set_audio_info(sample_rate, 2)?;
94        self.appsrc.set_format(gstreamer::Format::Time);
95
96        // Allow only a single chunk.
97        self.appsrc.set_max_bytes(1);
98
99        let appsrc = self.appsrc.clone();
100        Builder::new()
101            .name("GstAppSrcCallbacks".to_owned())
102            .spawn(move || {
103                let need_data = move |_: &AppSrc, _: u32| {
104                    if let Err(e) = graph_thread_channel.send(AudioRenderThreadMsg::SinkNeedData) {
105                        log::warn!("Error sending need data event: {:?}", e);
106                    }
107                };
108                appsrc.set_callbacks(AppSrcCallbacks::builder().need_data(need_data).build());
109            })
110            .unwrap();
111
112        let appsrc = self.appsrc.as_ref().clone().upcast();
113        let resample = gstreamer::ElementFactory::make("audioresample")
114            .build()
115            .map_err(|error| {
116                AudioSinkError::Backend(format!("audioresample creation failed: {error:?}"))
117            })?;
118        let convert = gstreamer::ElementFactory::make("audioconvert")
119            .build()
120            .map_err(|error| {
121                AudioSinkError::Backend(format!("audioconvert creation failed: {error:?}"))
122            })?;
123        let sink = gstreamer::ElementFactory::make("autoaudiosink")
124            .build()
125            .map_err(|error| {
126                AudioSinkError::Backend(format!("autoaudiosink creation failed: {error:?}"))
127            })?;
128        self.pipeline
129            .add_many([&appsrc, &resample, &convert, &sink])
130            .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
131        gstreamer::Element::link_many([&appsrc, &resample, &convert, &sink])
132            .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
133
134        Ok(())
135    }
136
137    fn init_stream(
138        &self,
139        channels: u8,
140        sample_rate: f32,
141        socket: Box<dyn MediaSocket>,
142    ) -> Result<(), AudioSinkError> {
143        self.sample_rate.set(sample_rate);
144        self.set_audio_info(sample_rate, channels)?;
145        self.appsrc.set_format(gstreamer::Format::Time);
146
147        // Do not set max bytes or callback, we will push as needed
148
149        let appsrc = self.appsrc.as_ref().clone().upcast();
150        let convert = gstreamer::ElementFactory::make("audioconvert")
151            .build()
152            .map_err(|error| {
153                AudioSinkError::Backend(format!("audioconvert creation failed: {error:?}"))
154            })?;
155        let sink = socket
156            .as_any()
157            .downcast_ref::<GstreamerMediaSocket>()
158            .unwrap()
159            .proxy_sink()
160            .clone();
161
162        self.pipeline
163            .add_many([&appsrc, &convert, &sink])
164            .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
165        gstreamer::Element::link_many([&appsrc, &convert, &sink])
166            .map_err(|error| AudioSinkError::Backend(error.to_string()))?;
167
168        Ok(())
169    }
170
171    fn play(&self) -> Result<(), AudioSinkError> {
172        self.pipeline
173            .set_state(gstreamer::State::Playing)
174            .map(|_| ())
175            .map_err(|_| AudioSinkError::StateChangeFailed)
176    }
177
178    fn stop(&self) -> Result<(), AudioSinkError> {
179        self.pipeline
180            .set_state(gstreamer::State::Paused)
181            .map(|_| ())
182            .map_err(|_| AudioSinkError::StateChangeFailed)
183    }
184
185    fn has_enough_data(&self) -> bool {
186        self.appsrc.current_level_bytes() >= self.appsrc.max_bytes()
187    }
188
189    fn push_data(&self, mut chunk: Chunk) -> Result<(), AudioSinkError> {
190        if let Some(block) = chunk.blocks.first() {
191            self.set_channels_if_changed(block.chan_count())?;
192        }
193
194        let sample_rate = self.sample_rate.get() as u64;
195        let audio_info = self.audio_info.borrow();
196        let audio_info = audio_info.as_ref().unwrap();
197        let channels = audio_info.channels();
198        let bpf = audio_info.bpf() as usize;
199        assert_eq!(bpf, 4 * channels as usize);
200        let n_samples = FRAMES_PER_BLOCK.0;
201        let buf_size = (n_samples as usize) * (bpf);
202        let mut buffer = gstreamer::Buffer::with_size(buf_size).unwrap();
203        {
204            let buffer = buffer.get_mut().unwrap();
205            let mut sample_offset = self.sample_offset.get();
206            // Calculate the current timestamp (PTS) and the next one,
207            // and calculate the duration from the difference instead of
208            // simply the number of samples to prevent rounding errors
209            let pts = gstreamer::ClockTime::from_nseconds(
210                sample_offset
211                    .mul_div_floor(gstreamer::ClockTime::SECOND.nseconds(), sample_rate)
212                    .unwrap(),
213            );
214            let next_pts: gstreamer::ClockTime = gstreamer::ClockTime::from_nseconds(
215                (sample_offset + n_samples)
216                    .mul_div_floor(gstreamer::ClockTime::SECOND.nseconds(), sample_rate)
217                    .unwrap(),
218            );
219            buffer.set_pts(Some(pts));
220            buffer.set_duration(next_pts - pts);
221
222            // sometimes nothing reaches the output
223            if chunk.is_empty() {
224                chunk.blocks.push(Default::default());
225                chunk.blocks[0].repeat(channels as u8);
226            }
227            debug_assert!(chunk.len() == 1);
228            let mut data = chunk.blocks[0].interleave();
229            let data = data.as_mut_byte_slice();
230
231            // XXXManishearth if we have a safe way to convert
232            // from Box<[f32]> to Box<[u8]> (similarly for Vec)
233            // we can use Buffer::from_slice instead
234            buffer.copy_from_slice(0, data).expect("copying failed");
235
236            sample_offset += n_samples;
237            self.sample_offset.set(sample_offset);
238        }
239
240        self.appsrc
241            .push_buffer(buffer)
242            .map(|_| ())
243            .map_err(|_| AudioSinkError::BufferPushFailed)
244    }
245
246    fn set_eos_callback(&self, _: Box<dyn Fn(Box<dyn AsRef<[f32]>>) + Send + Sync + 'static>) {}
247}
248
249impl Drop for GStreamerAudioSink {
250    fn drop(&mut self) {
251        let _ = self.stop();
252    }
253}