1pub mod audio_decoder;
6pub mod audio_sink;
7pub mod audio_stream_reader;
8mod datachannel;
9mod device_monitor;
10pub mod media_capture;
11pub mod media_stream;
12mod media_stream_source;
13pub mod player;
14mod registry_scanner;
15mod render;
16mod source;
17pub mod webrtc;
18
19use std::collections::HashMap;
20use std::path::PathBuf;
21use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
22use std::sync::mpsc::{self, Sender};
23use std::sync::{Arc, LazyLock, Mutex, OnceLock, Weak};
24use std::thread;
25use std::vec::Vec;
26
27use device_monitor::GStreamerDeviceMonitor;
28use gstreamer::prelude::*;
29use ipc_channel::ipc::IpcSender;
30use log::warn;
31use media_stream::GStreamerMediaStream;
32use mime::Mime;
33use registry_scanner::GSTREAMER_REGISTRY_SCANNER;
34use servo_media::{Backend, BackendDeInit, BackendInit, MediaInstanceError, SupportsMediaType};
35use servo_media_audio::context::{AudioContext, AudioContextOptions};
36use servo_media_audio::decoder::AudioDecoder;
37use servo_media_audio::sink::AudioSinkError;
38use servo_media_audio::{AudioBackend, AudioStreamReader};
39use servo_media_player::audio::AudioRenderer;
40use servo_media_player::context::PlayerGLContext;
41use servo_media_player::video::VideoFrameRenderer;
42use servo_media_player::{Player, PlayerEvent, StreamType};
43use servo_media_streams::capture::MediaTrackConstraintSet;
44use servo_media_streams::device_monitor::MediaDeviceMonitor;
45use servo_media_streams::registry::MediaStreamId;
46use servo_media_streams::{MediaOutput, MediaSocket, MediaStreamType};
47use servo_media_traits::{BackendMsg, ClientContextId, MediaInstance};
48use servo_media_webrtc::{WebRtcBackend, WebRtcController, WebRtcSignaller};
49
50static BACKEND_BASE_TIME: LazyLock<gstreamer::ClockTime> =
51 LazyLock::new(|| gstreamer::SystemClock::obtain().time());
52
53static BACKEND_THREAD: OnceLock<bool> = OnceLock::new();
54
55pub type WeakMediaInstance = Weak<Mutex<dyn MediaInstance>>;
56pub type WeakMediaInstanceHashMap = HashMap<ClientContextId, Vec<(usize, WeakMediaInstance)>>;
57
58pub struct GStreamerBackend {
59 capture_mocking: AtomicBool,
60 instances: Arc<Mutex<WeakMediaInstanceHashMap>>,
61 next_instance_id: AtomicUsize,
62 backend_chan: Arc<Mutex<Sender<BackendMsg>>>,
64}
65
66#[derive(Debug)]
67#[allow(dead_code)]
68pub struct ErrorLoadingPlugins<'a>(Vec<&'a str>);
69
70impl GStreamerBackend {
71 pub fn init_with_plugins<'a, T: AsRef<str>>(
72 plugin_dir: PathBuf,
73 plugins: &'a [T],
74 ) -> Result<Box<dyn Backend>, ErrorLoadingPlugins<'a>> {
75 gstreamer::init().unwrap();
76
77 let needs_background_glib_main_loop = {
82 let (major, minor, micro, _) = gstreamer::version();
83 (major, minor, micro) >= (1, 19, 1) && (major, minor, micro) <= (1, 22, 7)
84 };
85
86 if needs_background_glib_main_loop {
87 BACKEND_THREAD.get_or_init(|| {
88 thread::spawn(|| glib::MainLoop::new(None, false).run());
89 true
90 });
91 }
92
93 let mut errors = vec![];
94 for plugin in plugins {
95 let mut path = plugin_dir.clone();
96 path.push(plugin.as_ref());
97 let registry = gstreamer::Registry::get();
98 if gstreamer::Plugin::load_file(&path)
99 .is_ok_and(|plugin| registry.add_plugin(&plugin).is_ok())
100 {
101 continue;
102 }
103 errors.push(plugin.as_ref());
104 }
105
106 if !errors.is_empty() {
107 return Err(ErrorLoadingPlugins(errors));
108 }
109
110 type MediaInstancesVec = Vec<(usize, Weak<Mutex<dyn MediaInstance>>)>;
111 let instances: HashMap<ClientContextId, MediaInstancesVec> = Default::default();
112 let instances = Arc::new(Mutex::new(instances));
113
114 let instances_ = instances.clone();
115 let (backend_chan, recvr) = mpsc::channel();
116 thread::Builder::new()
117 .name("GStreamerBackend ShutdownThread".to_owned())
118 .spawn(move || {
119 match recvr.recv().unwrap() {
120 BackendMsg::Shutdown {
121 context,
122 id,
123 tx_ack,
124 } => {
125 let mut instances_ = instances_.lock().unwrap();
126 if let Some(vec) = instances_.get_mut(&context) {
127 vec.retain(|m| m.0 != id);
128 if vec.is_empty() {
129 instances_.remove(&context);
130 }
131 }
132 let _ = tx_ack.send(());
134 },
135 };
136 })
137 .unwrap();
138
139 Ok(Box::new(GStreamerBackend {
140 capture_mocking: AtomicBool::new(false),
141 instances,
142 next_instance_id: AtomicUsize::new(0),
143 backend_chan: Arc::new(Mutex::new(backend_chan)),
144 }))
145 }
146
147 fn media_instance_action(
148 &self,
149 id: &ClientContextId,
150 cb: &dyn Fn(&dyn MediaInstance) -> Result<(), MediaInstanceError>,
151 ) {
152 let mut instances = self.instances.lock().unwrap();
153 match instances.get_mut(id) {
154 Some(vec) => vec.retain(|(_, weak)| match weak.upgrade() {
155 Some(instance) => {
156 if cb(&*(instance.lock().unwrap())).is_err() {
157 warn!("Error executing media instance action");
158 }
159 true
160 },
161 _ => false,
162 }),
163 None => {
164 warn!("Trying to exec media action on an unknown client context");
165 },
166 }
167 }
168}
169
170impl Backend for GStreamerBackend {
171 #[expect(clippy::redundant_clone, reason = "False positive")]
172 fn create_player(
173 &self,
174 context_id: &ClientContextId,
175 stream_type: StreamType,
176 sender: IpcSender<PlayerEvent>,
177 renderer: Option<Arc<Mutex<dyn VideoFrameRenderer>>>,
178 audio_renderer: Option<Arc<Mutex<dyn AudioRenderer>>>,
179 gl_context: Box<dyn PlayerGLContext>,
180 ) -> Arc<Mutex<dyn Player>> {
181 let id = self.next_instance_id.fetch_add(1, Ordering::Relaxed);
182 let player = Arc::new(Mutex::new(player::GStreamerPlayer::new(
183 id,
184 context_id,
185 self.backend_chan.clone(),
186 stream_type,
187 sender,
188 renderer,
189 audio_renderer,
190 gl_context,
191 )));
192 let mut instances = self.instances.lock().unwrap();
193 let entry = instances.entry(*context_id).or_default();
194 entry.push((id, Arc::downgrade(&player).clone()));
195 player
196 }
197
198 #[expect(clippy::redundant_clone, reason = "False positive")]
199 fn create_audio_context(
200 &self,
201 client_context_id: &ClientContextId,
202 options: AudioContextOptions,
203 ) -> Result<Arc<Mutex<AudioContext>>, AudioSinkError> {
204 let id = self.next_instance_id.fetch_add(1, Ordering::Relaxed);
205 let audio_context =
206 AudioContext::new::<Self>(id, client_context_id, self.backend_chan.clone(), options)?;
207
208 let audio_context = Arc::new(Mutex::new(audio_context));
209
210 let mut instances = self.instances.lock().unwrap();
211 let entry = instances.entry(*client_context_id).or_default();
212 entry.push((id, Arc::downgrade(&audio_context).clone()));
213
214 Ok(audio_context)
215 }
216
217 fn create_webrtc(&self, signaller: Box<dyn WebRtcSignaller>) -> WebRtcController {
218 WebRtcController::new::<Self>(signaller)
219 }
220
221 fn create_audiostream(&self) -> MediaStreamId {
222 GStreamerMediaStream::create_audio()
223 }
224
225 fn create_videostream(&self) -> MediaStreamId {
226 GStreamerMediaStream::create_video()
227 }
228
229 fn create_stream_output(&self) -> Box<dyn MediaOutput> {
230 Box::new(media_stream::MediaSink::default())
231 }
232
233 fn create_stream_and_socket(
234 &self,
235 ty: MediaStreamType,
236 ) -> (Box<dyn MediaSocket>, MediaStreamId) {
237 let (id, socket) = GStreamerMediaStream::create_proxy(ty);
238 (Box::new(socket), id)
239 }
240
241 fn create_audioinput_stream(&self, set: MediaTrackConstraintSet) -> Option<MediaStreamId> {
242 if self.capture_mocking.load(Ordering::Acquire) {
243 return Some(self.create_audiostream());
245 }
246 media_capture::create_audioinput_stream(set)
247 }
248
249 fn create_videoinput_stream(&self, set: MediaTrackConstraintSet) -> Option<MediaStreamId> {
250 if self.capture_mocking.load(Ordering::Acquire) {
251 return Some(self.create_videostream());
253 }
254 media_capture::create_videoinput_stream(set)
255 }
256
257 fn can_play_type(&self, media_type: &str) -> SupportsMediaType {
258 if let Ok(mime) = media_type.parse::<Mime>() {
259 let mime_type = mime.type_().as_str().to_owned() + "/" + mime.subtype().as_str();
260 let codecs = match mime.get_param("codecs") {
261 Some(codecs) => codecs
262 .as_str()
263 .split(',')
264 .map(|codec| codec.trim())
265 .collect(),
266 None => vec![],
267 };
268
269 if GSTREAMER_REGISTRY_SCANNER.is_container_type_supported(&mime_type) {
270 if codecs.is_empty() {
271 return SupportsMediaType::Maybe;
272 } else if GSTREAMER_REGISTRY_SCANNER.are_all_codecs_supported(&codecs) {
273 return SupportsMediaType::Probably;
274 } else {
275 return SupportsMediaType::No;
276 }
277 }
278 }
279 SupportsMediaType::No
280 }
281
282 fn set_capture_mocking(&self, mock: bool) {
283 self.capture_mocking.store(mock, Ordering::Release)
284 }
285
286 fn mute(&self, id: &ClientContextId, val: bool) {
287 self.media_instance_action(
288 id,
289 &(move |instance: &dyn MediaInstance| instance.mute(val)),
290 );
291 }
292
293 fn suspend(&self, id: &ClientContextId) {
294 self.media_instance_action(id, &|instance: &dyn MediaInstance| instance.suspend());
295 }
296
297 fn resume(&self, id: &ClientContextId) {
298 self.media_instance_action(id, &|instance: &dyn MediaInstance| instance.resume());
299 }
300
301 fn get_device_monitor(&self) -> Box<dyn MediaDeviceMonitor> {
302 Box::new(GStreamerDeviceMonitor::new())
303 }
304}
305
306impl AudioBackend for GStreamerBackend {
307 type Sink = audio_sink::GStreamerAudioSink;
308 fn make_decoder() -> Box<dyn AudioDecoder> {
309 Box::new(audio_decoder::GStreamerAudioDecoder::new())
310 }
311 fn make_sink() -> Result<Self::Sink, AudioSinkError> {
312 audio_sink::GStreamerAudioSink::new()
313 }
314
315 fn make_streamreader(
316 id: MediaStreamId,
317 sample_rate: f32,
318 ) -> Result<Box<dyn AudioStreamReader + Send>, AudioSinkError> {
319 let reader = audio_stream_reader::GStreamerAudioStreamReader::new(id, sample_rate)
320 .map_err(AudioSinkError::Backend)?;
321 Ok(Box::new(reader))
322 }
323}
324
325impl WebRtcBackend for GStreamerBackend {
326 type Controller = webrtc::GStreamerWebRtcController;
327
328 fn construct_webrtc_controller(
329 signaller: Box<dyn WebRtcSignaller>,
330 thread: WebRtcController,
331 ) -> Self::Controller {
332 webrtc::construct(signaller, thread).expect("WebRTC creation failed")
333 }
334}
335
336impl BackendInit for GStreamerBackend {
337 fn init() -> Box<dyn Backend> {
338 Self::init_with_plugins::<&str>(PathBuf::new(), &[]).unwrap()
339 }
340}
341
342impl BackendDeInit for GStreamerBackend {
343 fn deinit(&self) {
344 let to_shutdown: Vec<(ClientContextId, usize)> = {
345 let map = self.instances.lock().unwrap();
346 map.iter()
347 .flat_map(|(ctx, v)| v.iter().map(move |(id, _)| (*ctx, *id)))
348 .collect()
349 };
350
351 for (ctx, id) in to_shutdown {
352 let (tx_ack, rx_ack) = mpsc::channel();
353 let _ = self
354 .backend_chan
355 .lock()
356 .unwrap()
357 .send(BackendMsg::Shutdown {
358 context: ctx,
359 id,
360 tx_ack,
361 });
362 let _ = rx_ack.recv();
363 }
364 }
365}