servo_media_gstreamer/
lib.rs

1pub mod audio_decoder;
2pub mod audio_sink;
3pub mod audio_stream_reader;
4mod datachannel;
5mod device_monitor;
6pub mod media_capture;
7pub mod media_stream;
8mod media_stream_source;
9pub mod player;
10mod registry_scanner;
11mod render;
12mod source;
13pub mod webrtc;
14
15use device_monitor::GStreamerDeviceMonitor;
16use gst::prelude::*;
17use ipc_channel::ipc::IpcSender;
18use log::warn;
19use media_stream::GStreamerMediaStream;
20use mime::Mime;
21use once_cell::sync::{Lazy, OnceCell};
22use registry_scanner::GSTREAMER_REGISTRY_SCANNER;
23use servo_media::{Backend, BackendDeInit, BackendInit, SupportsMediaType};
24use servo_media_audio::context::{AudioContext, AudioContextOptions};
25use servo_media_audio::decoder::AudioDecoder;
26use servo_media_audio::sink::AudioSinkError;
27use servo_media_audio::{AudioBackend, AudioStreamReader};
28use servo_media_player::audio::AudioRenderer;
29use servo_media_player::context::PlayerGLContext;
30use servo_media_player::video::VideoFrameRenderer;
31use servo_media_player::{Player, PlayerEvent, StreamType};
32use servo_media_streams::capture::MediaTrackConstraintSet;
33use servo_media_streams::device_monitor::MediaDeviceMonitor;
34use servo_media_streams::registry::MediaStreamId;
35use servo_media_streams::{MediaOutput, MediaSocket, MediaStreamType};
36use servo_media_traits::{BackendMsg, ClientContextId, MediaInstance};
37use servo_media_webrtc::{WebRtcBackend, WebRtcController, WebRtcSignaller};
38use std::collections::HashMap;
39use std::path::PathBuf;
40use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
41use std::sync::mpsc::{self, Sender};
42use std::sync::{Arc, Mutex, Weak};
43use std::thread;
44use std::vec::Vec;
45
46static BACKEND_BASE_TIME: Lazy<gst::ClockTime> = Lazy::new(|| gst::SystemClock::obtain().time());
47
48static BACKEND_THREAD: OnceCell<bool> = OnceCell::new();
49
50pub struct GStreamerBackend {
51    capture_mocking: AtomicBool,
52    instances: Arc<Mutex<HashMap<ClientContextId, Vec<(usize, Weak<Mutex<dyn MediaInstance>>)>>>>,
53    next_instance_id: AtomicUsize,
54    /// Channel to communicate media instances with its owner Backend.
55    backend_chan: Arc<Mutex<Sender<BackendMsg>>>,
56}
57
58#[derive(Debug)]
59#[allow(dead_code)]
60pub struct ErrorLoadingPlugins(Vec<&'static str>);
61
62impl GStreamerBackend {
63    pub fn init_with_plugins(
64        plugin_dir: PathBuf,
65        plugins: &[&'static str],
66    ) -> Result<Box<dyn Backend>, ErrorLoadingPlugins> {
67        gst::init().unwrap();
68
69        // GStreamer between 1.19.1 and 1.22.7 will not send messages like "end of stream"
70        // to GstPlayer unless there is a GLib main loop running somewhere. We should remove
71        // this workaround when we raise of required version of GStreamer.
72        // See https://github.com/servo/media/pull/393.
73        let needs_background_glib_main_loop = {
74            let (major, minor, micro, _) = gst::version();
75            (major, minor, micro) >= (1, 19, 1) && (major, minor, micro) <= (1, 22, 7)
76        };
77
78        if needs_background_glib_main_loop {
79            BACKEND_THREAD.get_or_init(|| {
80                thread::spawn(|| glib::MainLoop::new(None, false).run());
81                true
82            });
83        }
84
85        let mut errors = vec![];
86        for plugin in plugins {
87            let mut path = plugin_dir.clone();
88            path.push(plugin);
89            let registry = gst::Registry::get();
90            if let Ok(p) = gst::Plugin::load_file(&path) {
91                if registry.add_plugin(&p).is_ok() {
92                    continue;
93                }
94            }
95            errors.push(*plugin);
96        }
97
98        if !errors.is_empty() {
99            return Err(ErrorLoadingPlugins(errors));
100        }
101
102        let instances: Arc<
103            Mutex<HashMap<ClientContextId, Vec<(usize, Weak<Mutex<dyn MediaInstance>>)>>>,
104        > = Arc::new(Mutex::new(HashMap::new()));
105
106        let instances_ = instances.clone();
107        let (backend_chan, recvr) = mpsc::channel();
108        thread::Builder::new()
109            .name("GStreamerBackend ShutdownThread".to_owned())
110            .spawn(move || {
111                match recvr.recv().unwrap() {
112                    BackendMsg::Shutdown {
113                        context,
114                        id,
115                        tx_ack,
116                    } => {
117                        let mut instances_ = instances_.lock().unwrap();
118                        if let Some(vec) = instances_.get_mut(&context) {
119                            vec.retain(|m| m.0 != id);
120                            if vec.is_empty() {
121                                instances_.remove(&context);
122                            }
123                        }
124                        // tell caller we are done removing this instance
125                        let _ = tx_ack.send(());
126                    },
127                };
128            })
129            .unwrap();
130
131        Ok(Box::new(GStreamerBackend {
132            capture_mocking: AtomicBool::new(false),
133            instances,
134            next_instance_id: AtomicUsize::new(0),
135            backend_chan: Arc::new(Mutex::new(backend_chan)),
136        }))
137    }
138
139    fn media_instance_action(
140        &self,
141        id: &ClientContextId,
142        cb: &dyn Fn(&dyn MediaInstance) -> Result<(), ()>,
143    ) {
144        let mut instances = self.instances.lock().unwrap();
145        match instances.get_mut(id) {
146            Some(vec) => vec.retain(|(_, weak)| {
147                if let Some(instance) = weak.upgrade() {
148                    if cb(&*(instance.lock().unwrap())).is_err() {
149                        warn!("Error executing media instance action");
150                    }
151                    true
152                } else {
153                    false
154                }
155            }),
156            None => {
157                warn!("Trying to exec media action on an unknown client context");
158            },
159        }
160    }
161}
162
163impl Backend for GStreamerBackend {
164    fn create_player(
165        &self,
166        context_id: &ClientContextId,
167        stream_type: StreamType,
168        sender: IpcSender<PlayerEvent>,
169        renderer: Option<Arc<Mutex<dyn VideoFrameRenderer>>>,
170        audio_renderer: Option<Arc<Mutex<dyn AudioRenderer>>>,
171        gl_context: Box<dyn PlayerGLContext>,
172    ) -> Arc<Mutex<dyn Player>> {
173        let id = self.next_instance_id.fetch_add(1, Ordering::Relaxed);
174        let player = Arc::new(Mutex::new(player::GStreamerPlayer::new(
175            id,
176            context_id,
177            self.backend_chan.clone(),
178            stream_type,
179            sender,
180            renderer,
181            audio_renderer,
182            gl_context,
183        )));
184        let mut instances = self.instances.lock().unwrap();
185        let entry = instances.entry(*context_id).or_insert(Vec::new());
186        entry.push((id, Arc::downgrade(&player).clone()));
187        player
188    }
189
190    fn create_audio_context(
191        &self,
192        client_context_id: &ClientContextId,
193        options: AudioContextOptions,
194    ) -> Result<Arc<Mutex<AudioContext>>, AudioSinkError> {
195        let id = self.next_instance_id.fetch_add(1, Ordering::Relaxed);
196        let audio_context =
197            AudioContext::new::<Self>(id, client_context_id, self.backend_chan.clone(), options)?;
198
199        let audio_context = Arc::new(Mutex::new(audio_context));
200
201        let mut instances = self.instances.lock().unwrap();
202        let entry = instances.entry(*client_context_id).or_insert(Vec::new());
203        entry.push((id, Arc::downgrade(&audio_context).clone()));
204
205        Ok(audio_context)
206    }
207
208    fn create_webrtc(&self, signaller: Box<dyn WebRtcSignaller>) -> WebRtcController {
209        WebRtcController::new::<Self>(signaller)
210    }
211
212    fn create_audiostream(&self) -> MediaStreamId {
213        GStreamerMediaStream::create_audio()
214    }
215
216    fn create_videostream(&self) -> MediaStreamId {
217        GStreamerMediaStream::create_video()
218    }
219
220    fn create_stream_output(&self) -> Box<dyn MediaOutput> {
221        Box::new(media_stream::MediaSink::new())
222    }
223
224    fn create_stream_and_socket(
225        &self,
226        ty: MediaStreamType,
227    ) -> (Box<dyn MediaSocket>, MediaStreamId) {
228        let (id, socket) = GStreamerMediaStream::create_proxy(ty);
229        (Box::new(socket), id)
230    }
231
232    fn create_audioinput_stream(&self, set: MediaTrackConstraintSet) -> Option<MediaStreamId> {
233        if self.capture_mocking.load(Ordering::Acquire) {
234            // XXXManishearth we should caps filter this
235            return Some(self.create_audiostream());
236        }
237        media_capture::create_audioinput_stream(set)
238    }
239
240    fn create_videoinput_stream(&self, set: MediaTrackConstraintSet) -> Option<MediaStreamId> {
241        if self.capture_mocking.load(Ordering::Acquire) {
242            // XXXManishearth we should caps filter this
243            return Some(self.create_videostream());
244        }
245        media_capture::create_videoinput_stream(set)
246    }
247
248    fn can_play_type(&self, media_type: &str) -> SupportsMediaType {
249        if let Ok(mime) = media_type.parse::<Mime>() {
250            // XXX GStreamer is currently not very reliable playing OGG and most of
251            //     the media related WPTs uses OGG if we report that we are able to
252            //     play this type. So we report that we are unable to play it to force
253            //     the usage of other types.
254            //     https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/issues/520
255            if mime.subtype() == mime::OGG {
256                return SupportsMediaType::No;
257            }
258
259            let mime_type = mime.type_().as_str().to_owned() + "/" + mime.subtype().as_str();
260            let codecs = match mime.get_param("codecs") {
261                Some(codecs) => codecs
262                    .as_str()
263                    .split(',')
264                    .map(|codec| codec.trim())
265                    .collect(),
266                None => vec![],
267            };
268
269            if GSTREAMER_REGISTRY_SCANNER.is_container_type_supported(&mime_type) {
270                if codecs.is_empty() {
271                    return SupportsMediaType::Maybe;
272                } else if GSTREAMER_REGISTRY_SCANNER.are_all_codecs_supported(&codecs) {
273                    return SupportsMediaType::Probably;
274                } else {
275                    return SupportsMediaType::No;
276                }
277            }
278        }
279        SupportsMediaType::No
280    }
281
282    fn set_capture_mocking(&self, mock: bool) {
283        self.capture_mocking.store(mock, Ordering::Release)
284    }
285
286    fn mute(&self, id: &ClientContextId, val: bool) {
287        self.media_instance_action(
288            id,
289            &(move |instance: &dyn MediaInstance| instance.mute(val)),
290        );
291    }
292
293    fn suspend(&self, id: &ClientContextId) {
294        self.media_instance_action(id, &|instance: &dyn MediaInstance| instance.suspend());
295    }
296
297    fn resume(&self, id: &ClientContextId) {
298        self.media_instance_action(id, &|instance: &dyn MediaInstance| instance.resume());
299    }
300
301    fn get_device_monitor(&self) -> Box<dyn MediaDeviceMonitor> {
302        Box::new(GStreamerDeviceMonitor::new())
303    }
304}
305
306impl AudioBackend for GStreamerBackend {
307    type Sink = audio_sink::GStreamerAudioSink;
308    fn make_decoder() -> Box<dyn AudioDecoder> {
309        Box::new(audio_decoder::GStreamerAudioDecoder::new())
310    }
311    fn make_sink() -> Result<Self::Sink, AudioSinkError> {
312        audio_sink::GStreamerAudioSink::new()
313    }
314
315    fn make_streamreader(id: MediaStreamId, sample_rate: f32) -> Box<dyn AudioStreamReader + Send> {
316        Box::new(audio_stream_reader::GStreamerAudioStreamReader::new(id, sample_rate).unwrap())
317    }
318}
319
320impl WebRtcBackend for GStreamerBackend {
321    type Controller = webrtc::GStreamerWebRtcController;
322
323    fn construct_webrtc_controller(
324        signaller: Box<dyn WebRtcSignaller>,
325        thread: WebRtcController,
326    ) -> Self::Controller {
327        webrtc::construct(signaller, thread).expect("WebRTC creation failed")
328    }
329}
330
331impl BackendInit for GStreamerBackend {
332    fn init() -> Box<dyn Backend> {
333        Self::init_with_plugins(PathBuf::new(), &[]).unwrap()
334    }
335}
336
337impl BackendDeInit for GStreamerBackend {
338    fn deinit(&self) {
339        let to_shutdown: Vec<(ClientContextId, usize)> = {
340            let map = self.instances.lock().unwrap();
341            map.iter()
342                .flat_map(|(ctx, v)| v.iter().map(move |(id, _)| (*ctx, *id)))
343                .collect()
344        };
345
346        for (ctx, id) in to_shutdown {
347            let (tx_ack, rx_ack) = mpsc::channel();
348            let _ = self
349                .backend_chan
350                .lock()
351                .unwrap()
352                .send(BackendMsg::Shutdown {
353                    context: ctx,
354                    id,
355                    tx_ack,
356                });
357            let _ = rx_ack.recv();
358        }
359    }
360}