Skip to main content

servo_media_gstreamer/
lib.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5pub mod audio_decoder;
6pub mod audio_sink;
7pub mod audio_stream_reader;
8mod datachannel;
9mod device_monitor;
10pub mod media_capture;
11pub mod media_stream;
12mod media_stream_source;
13pub mod player;
14mod registry_scanner;
15mod render;
16mod source;
17pub mod webrtc;
18
19use std::collections::HashMap;
20use std::path::PathBuf;
21use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
22use std::sync::mpsc::{self, Sender};
23use std::sync::{Arc, LazyLock, Mutex, OnceLock, Weak};
24use std::thread;
25use std::vec::Vec;
26
27use device_monitor::GStreamerDeviceMonitor;
28use gstreamer::prelude::*;
29use ipc_channel::ipc::IpcSender;
30use log::warn;
31use media_stream::GStreamerMediaStream;
32use mime::Mime;
33use registry_scanner::GSTREAMER_REGISTRY_SCANNER;
34use servo_media::{Backend, BackendDeInit, BackendInit, MediaInstanceError, SupportsMediaType};
35use servo_media_audio::context::{AudioContext, AudioContextOptions};
36use servo_media_audio::decoder::AudioDecoder;
37use servo_media_audio::sink::AudioSinkError;
38use servo_media_audio::{AudioBackend, AudioStreamReader};
39use servo_media_player::audio::AudioRenderer;
40use servo_media_player::context::PlayerGLContext;
41use servo_media_player::video::VideoFrameRenderer;
42use servo_media_player::{Player, PlayerEvent, StreamType};
43use servo_media_streams::capture::MediaTrackConstraintSet;
44use servo_media_streams::device_monitor::MediaDeviceMonitor;
45use servo_media_streams::registry::MediaStreamId;
46use servo_media_streams::{MediaOutput, MediaSocket, MediaStreamType};
47use servo_media_traits::{BackendMsg, ClientContextId, MediaInstance};
48use servo_media_webrtc::{WebRtcBackend, WebRtcController, WebRtcSignaller};
49
50static BACKEND_BASE_TIME: LazyLock<gstreamer::ClockTime> =
51    LazyLock::new(|| gstreamer::SystemClock::obtain().time());
52
53static BACKEND_THREAD: OnceLock<bool> = OnceLock::new();
54
55pub type WeakMediaInstance = Weak<Mutex<dyn MediaInstance>>;
56pub type WeakMediaInstanceHashMap = HashMap<ClientContextId, Vec<(usize, WeakMediaInstance)>>;
57
58pub struct GStreamerBackend {
59    capture_mocking: AtomicBool,
60    instances: Arc<Mutex<WeakMediaInstanceHashMap>>,
61    next_instance_id: AtomicUsize,
62    /// Channel to communicate media instances with its owner Backend.
63    backend_chan: Arc<Mutex<Sender<BackendMsg>>>,
64}
65
66#[derive(Debug)]
67#[allow(dead_code)]
68pub struct ErrorLoadingPlugins<'a>(Vec<&'a str>);
69
70impl GStreamerBackend {
71    pub fn init_with_plugins<'a, T: AsRef<str>>(
72        plugin_dir: PathBuf,
73        plugins: &'a [T],
74    ) -> Result<Box<dyn Backend>, ErrorLoadingPlugins<'a>> {
75        gstreamer::init().unwrap();
76
77        // GStreamer between 1.19.1 and 1.22.7 will not send messages like "end of stream"
78        // to GstPlayer unless there is a GLib main loop running somewhere. We should remove
79        // this workaround when we raise of required version of GStreamer.
80        // See https://github.com/servo/media/pull/393.
81        let needs_background_glib_main_loop = {
82            let (major, minor, micro, _) = gstreamer::version();
83            (major, minor, micro) >= (1, 19, 1) && (major, minor, micro) <= (1, 22, 7)
84        };
85
86        if needs_background_glib_main_loop {
87            BACKEND_THREAD.get_or_init(|| {
88                thread::spawn(|| glib::MainLoop::new(None, false).run());
89                true
90            });
91        }
92
93        let mut errors = vec![];
94        for plugin in plugins {
95            let mut path = plugin_dir.clone();
96            path.push(plugin.as_ref());
97            let registry = gstreamer::Registry::get();
98            if gstreamer::Plugin::load_file(&path)
99                .is_ok_and(|plugin| registry.add_plugin(&plugin).is_ok())
100            {
101                continue;
102            }
103            errors.push(plugin.as_ref());
104        }
105
106        if !errors.is_empty() {
107            return Err(ErrorLoadingPlugins(errors));
108        }
109
110        type MediaInstancesVec = Vec<(usize, Weak<Mutex<dyn MediaInstance>>)>;
111        let instances: HashMap<ClientContextId, MediaInstancesVec> = Default::default();
112        let instances = Arc::new(Mutex::new(instances));
113
114        let instances_ = instances.clone();
115        let (backend_chan, recvr) = mpsc::channel();
116        thread::Builder::new()
117            .name("GStreamerBackend ShutdownThread".to_owned())
118            .spawn(move || {
119                match recvr.recv().unwrap() {
120                    BackendMsg::Shutdown {
121                        context,
122                        id,
123                        tx_ack,
124                    } => {
125                        let mut instances_ = instances_.lock().unwrap();
126                        if let Some(vec) = instances_.get_mut(&context) {
127                            vec.retain(|m| m.0 != id);
128                            if vec.is_empty() {
129                                instances_.remove(&context);
130                            }
131                        }
132                        // tell caller we are done removing this instance
133                        let _ = tx_ack.send(());
134                    },
135                };
136            })
137            .unwrap();
138
139        Ok(Box::new(GStreamerBackend {
140            capture_mocking: AtomicBool::new(false),
141            instances,
142            next_instance_id: AtomicUsize::new(0),
143            backend_chan: Arc::new(Mutex::new(backend_chan)),
144        }))
145    }
146
147    fn media_instance_action(
148        &self,
149        id: &ClientContextId,
150        cb: &dyn Fn(&dyn MediaInstance) -> Result<(), MediaInstanceError>,
151    ) {
152        let mut instances = self.instances.lock().unwrap();
153        match instances.get_mut(id) {
154            Some(vec) => vec.retain(|(_, weak)| match weak.upgrade() {
155                Some(instance) => {
156                    if cb(&*(instance.lock().unwrap())).is_err() {
157                        warn!("Error executing media instance action");
158                    }
159                    true
160                },
161                _ => false,
162            }),
163            None => {
164                warn!("Trying to exec media action on an unknown client context");
165            },
166        }
167    }
168}
169
170impl Backend for GStreamerBackend {
171    #[expect(clippy::redundant_clone, reason = "False positive")]
172    fn create_player(
173        &self,
174        context_id: &ClientContextId,
175        stream_type: StreamType,
176        sender: IpcSender<PlayerEvent>,
177        renderer: Option<Arc<Mutex<dyn VideoFrameRenderer>>>,
178        audio_renderer: Option<Arc<Mutex<dyn AudioRenderer>>>,
179        gl_context: Box<dyn PlayerGLContext>,
180    ) -> Arc<Mutex<dyn Player>> {
181        let id = self.next_instance_id.fetch_add(1, Ordering::Relaxed);
182        let player = Arc::new(Mutex::new(player::GStreamerPlayer::new(
183            id,
184            context_id,
185            self.backend_chan.clone(),
186            stream_type,
187            sender,
188            renderer,
189            audio_renderer,
190            gl_context,
191        )));
192        let mut instances = self.instances.lock().unwrap();
193        let entry = instances.entry(*context_id).or_default();
194        entry.push((id, Arc::downgrade(&player).clone()));
195        player
196    }
197
198    #[expect(clippy::redundant_clone, reason = "False positive")]
199    fn create_audio_context(
200        &self,
201        client_context_id: &ClientContextId,
202        options: AudioContextOptions,
203    ) -> Result<Arc<Mutex<AudioContext>>, AudioSinkError> {
204        let id = self.next_instance_id.fetch_add(1, Ordering::Relaxed);
205        let audio_context =
206            AudioContext::new::<Self>(id, client_context_id, self.backend_chan.clone(), options)?;
207
208        let audio_context = Arc::new(Mutex::new(audio_context));
209
210        let mut instances = self.instances.lock().unwrap();
211        let entry = instances.entry(*client_context_id).or_default();
212        entry.push((id, Arc::downgrade(&audio_context).clone()));
213
214        Ok(audio_context)
215    }
216
217    fn create_webrtc(&self, signaller: Box<dyn WebRtcSignaller>) -> WebRtcController {
218        WebRtcController::new::<Self>(signaller)
219    }
220
221    fn create_audiostream(&self) -> MediaStreamId {
222        GStreamerMediaStream::create_audio()
223    }
224
225    fn create_videostream(&self) -> MediaStreamId {
226        GStreamerMediaStream::create_video()
227    }
228
229    fn create_stream_output(&self) -> Box<dyn MediaOutput> {
230        Box::new(media_stream::MediaSink::default())
231    }
232
233    fn create_stream_and_socket(
234        &self,
235        ty: MediaStreamType,
236    ) -> (Box<dyn MediaSocket>, MediaStreamId) {
237        let (id, socket) = GStreamerMediaStream::create_proxy(ty);
238        (Box::new(socket), id)
239    }
240
241    fn create_audioinput_stream(&self, set: MediaTrackConstraintSet) -> Option<MediaStreamId> {
242        if self.capture_mocking.load(Ordering::Acquire) {
243            // XXXManishearth we should caps filter this
244            return Some(self.create_audiostream());
245        }
246        media_capture::create_audioinput_stream(set)
247    }
248
249    fn create_videoinput_stream(&self, set: MediaTrackConstraintSet) -> Option<MediaStreamId> {
250        if self.capture_mocking.load(Ordering::Acquire) {
251            // XXXManishearth we should caps filter this
252            return Some(self.create_videostream());
253        }
254        media_capture::create_videoinput_stream(set)
255    }
256
257    fn can_play_type(&self, media_type: &str) -> SupportsMediaType {
258        if let Ok(mime) = media_type.parse::<Mime>() {
259            let mime_type = mime.type_().as_str().to_owned() + "/" + mime.subtype().as_str();
260            let codecs = match mime.get_param("codecs") {
261                Some(codecs) => codecs
262                    .as_str()
263                    .split(',')
264                    .map(|codec| codec.trim())
265                    .collect(),
266                None => vec![],
267            };
268
269            if GSTREAMER_REGISTRY_SCANNER.is_container_type_supported(&mime_type) {
270                if codecs.is_empty() {
271                    return SupportsMediaType::Maybe;
272                } else if GSTREAMER_REGISTRY_SCANNER.are_all_codecs_supported(&codecs) {
273                    return SupportsMediaType::Probably;
274                } else {
275                    return SupportsMediaType::No;
276                }
277            }
278        }
279        SupportsMediaType::No
280    }
281
282    fn set_capture_mocking(&self, mock: bool) {
283        self.capture_mocking.store(mock, Ordering::Release)
284    }
285
286    fn mute(&self, id: &ClientContextId, val: bool) {
287        self.media_instance_action(
288            id,
289            &(move |instance: &dyn MediaInstance| instance.mute(val)),
290        );
291    }
292
293    fn suspend(&self, id: &ClientContextId) {
294        self.media_instance_action(id, &|instance: &dyn MediaInstance| instance.suspend());
295    }
296
297    fn resume(&self, id: &ClientContextId) {
298        self.media_instance_action(id, &|instance: &dyn MediaInstance| instance.resume());
299    }
300
301    fn get_device_monitor(&self) -> Box<dyn MediaDeviceMonitor> {
302        Box::new(GStreamerDeviceMonitor::new())
303    }
304}
305
306impl AudioBackend for GStreamerBackend {
307    type Sink = audio_sink::GStreamerAudioSink;
308    fn make_decoder() -> Box<dyn AudioDecoder> {
309        Box::new(audio_decoder::GStreamerAudioDecoder::new())
310    }
311    fn make_sink() -> Result<Self::Sink, AudioSinkError> {
312        audio_sink::GStreamerAudioSink::new()
313    }
314
315    fn make_streamreader(
316        id: MediaStreamId,
317        sample_rate: f32,
318    ) -> Result<Box<dyn AudioStreamReader + Send>, AudioSinkError> {
319        let reader = audio_stream_reader::GStreamerAudioStreamReader::new(id, sample_rate)
320            .map_err(AudioSinkError::Backend)?;
321        Ok(Box::new(reader))
322    }
323}
324
325impl WebRtcBackend for GStreamerBackend {
326    type Controller = webrtc::GStreamerWebRtcController;
327
328    fn construct_webrtc_controller(
329        signaller: Box<dyn WebRtcSignaller>,
330        thread: WebRtcController,
331    ) -> Self::Controller {
332        webrtc::construct(signaller, thread).expect("WebRTC creation failed")
333    }
334}
335
336impl BackendInit for GStreamerBackend {
337    fn init() -> Box<dyn Backend> {
338        Self::init_with_plugins::<&str>(PathBuf::new(), &[]).unwrap()
339    }
340}
341
342impl BackendDeInit for GStreamerBackend {
343    fn deinit(&self) {
344        let to_shutdown: Vec<(ClientContextId, usize)> = {
345            let map = self.instances.lock().unwrap();
346            map.iter()
347                .flat_map(|(ctx, v)| v.iter().map(move |(id, _)| (*ctx, *id)))
348                .collect()
349        };
350
351        for (ctx, id) in to_shutdown {
352            let (tx_ack, rx_ack) = mpsc::channel();
353            let _ = self
354                .backend_chan
355                .lock()
356                .unwrap()
357                .send(BackendMsg::Shutdown {
358                    context: ctx,
359                    id,
360                    tx_ack,
361                });
362            let _ = rx_ack.recv();
363        }
364    }
365}