1pub mod audio_decoder;
2pub mod audio_sink;
3pub mod audio_stream_reader;
4mod datachannel;
5mod device_monitor;
6pub mod media_capture;
7pub mod media_stream;
8mod media_stream_source;
9pub mod player;
10mod registry_scanner;
11mod render;
12mod source;
13pub mod webrtc;
14
15use device_monitor::GStreamerDeviceMonitor;
16use gst::prelude::*;
17use ipc_channel::ipc::IpcSender;
18use log::warn;
19use media_stream::GStreamerMediaStream;
20use mime::Mime;
21use once_cell::sync::{Lazy, OnceCell};
22use registry_scanner::GSTREAMER_REGISTRY_SCANNER;
23use servo_media::{Backend, BackendDeInit, BackendInit, SupportsMediaType};
24use servo_media_audio::context::{AudioContext, AudioContextOptions};
25use servo_media_audio::decoder::AudioDecoder;
26use servo_media_audio::sink::AudioSinkError;
27use servo_media_audio::{AudioBackend, AudioStreamReader};
28use servo_media_player::audio::AudioRenderer;
29use servo_media_player::context::PlayerGLContext;
30use servo_media_player::video::VideoFrameRenderer;
31use servo_media_player::{Player, PlayerEvent, StreamType};
32use servo_media_streams::capture::MediaTrackConstraintSet;
33use servo_media_streams::device_monitor::MediaDeviceMonitor;
34use servo_media_streams::registry::MediaStreamId;
35use servo_media_streams::{MediaOutput, MediaSocket, MediaStreamType};
36use servo_media_traits::{BackendMsg, ClientContextId, MediaInstance};
37use servo_media_webrtc::{WebRtcBackend, WebRtcController, WebRtcSignaller};
38use std::collections::HashMap;
39use std::path::PathBuf;
40use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
41use std::sync::mpsc::{self, Sender};
42use std::sync::{Arc, Mutex, Weak};
43use std::thread;
44use std::vec::Vec;
45
46static BACKEND_BASE_TIME: Lazy<gst::ClockTime> = Lazy::new(|| gst::SystemClock::obtain().time());
47
48static BACKEND_THREAD: OnceCell<bool> = OnceCell::new();
49
50pub struct GStreamerBackend {
51 capture_mocking: AtomicBool,
52 instances: Arc<Mutex<HashMap<ClientContextId, Vec<(usize, Weak<Mutex<dyn MediaInstance>>)>>>>,
53 next_instance_id: AtomicUsize,
54 backend_chan: Arc<Mutex<Sender<BackendMsg>>>,
56}
57
58#[derive(Debug)]
59#[allow(dead_code)]
60pub struct ErrorLoadingPlugins(Vec<&'static str>);
61
62impl GStreamerBackend {
63 pub fn init_with_plugins(
64 plugin_dir: PathBuf,
65 plugins: &[&'static str],
66 ) -> Result<Box<dyn Backend>, ErrorLoadingPlugins> {
67 gst::init().unwrap();
68
69 let needs_background_glib_main_loop = {
74 let (major, minor, micro, _) = gst::version();
75 (major, minor, micro) >= (1, 19, 1) && (major, minor, micro) <= (1, 22, 7)
76 };
77
78 if needs_background_glib_main_loop {
79 BACKEND_THREAD.get_or_init(|| {
80 thread::spawn(|| glib::MainLoop::new(None, false).run());
81 true
82 });
83 }
84
85 let mut errors = vec![];
86 for plugin in plugins {
87 let mut path = plugin_dir.clone();
88 path.push(plugin);
89 let registry = gst::Registry::get();
90 if let Ok(p) = gst::Plugin::load_file(&path) {
91 if registry.add_plugin(&p).is_ok() {
92 continue;
93 }
94 }
95 errors.push(*plugin);
96 }
97
98 if !errors.is_empty() {
99 return Err(ErrorLoadingPlugins(errors));
100 }
101
102 let instances: Arc<
103 Mutex<HashMap<ClientContextId, Vec<(usize, Weak<Mutex<dyn MediaInstance>>)>>>,
104 > = Arc::new(Mutex::new(HashMap::new()));
105
106 let instances_ = instances.clone();
107 let (backend_chan, recvr) = mpsc::channel();
108 thread::Builder::new()
109 .name("GStreamerBackend ShutdownThread".to_owned())
110 .spawn(move || {
111 match recvr.recv().unwrap() {
112 BackendMsg::Shutdown {
113 context,
114 id,
115 tx_ack,
116 } => {
117 let mut instances_ = instances_.lock().unwrap();
118 if let Some(vec) = instances_.get_mut(&context) {
119 vec.retain(|m| m.0 != id);
120 if vec.is_empty() {
121 instances_.remove(&context);
122 }
123 }
124 let _ = tx_ack.send(());
126 },
127 };
128 })
129 .unwrap();
130
131 Ok(Box::new(GStreamerBackend {
132 capture_mocking: AtomicBool::new(false),
133 instances,
134 next_instance_id: AtomicUsize::new(0),
135 backend_chan: Arc::new(Mutex::new(backend_chan)),
136 }))
137 }
138
139 fn media_instance_action(
140 &self,
141 id: &ClientContextId,
142 cb: &dyn Fn(&dyn MediaInstance) -> Result<(), ()>,
143 ) {
144 let mut instances = self.instances.lock().unwrap();
145 match instances.get_mut(id) {
146 Some(vec) => vec.retain(|(_, weak)| {
147 if let Some(instance) = weak.upgrade() {
148 if cb(&*(instance.lock().unwrap())).is_err() {
149 warn!("Error executing media instance action");
150 }
151 true
152 } else {
153 false
154 }
155 }),
156 None => {
157 warn!("Trying to exec media action on an unknown client context");
158 },
159 }
160 }
161}
162
163impl Backend for GStreamerBackend {
164 fn create_player(
165 &self,
166 context_id: &ClientContextId,
167 stream_type: StreamType,
168 sender: IpcSender<PlayerEvent>,
169 renderer: Option<Arc<Mutex<dyn VideoFrameRenderer>>>,
170 audio_renderer: Option<Arc<Mutex<dyn AudioRenderer>>>,
171 gl_context: Box<dyn PlayerGLContext>,
172 ) -> Arc<Mutex<dyn Player>> {
173 let id = self.next_instance_id.fetch_add(1, Ordering::Relaxed);
174 let player = Arc::new(Mutex::new(player::GStreamerPlayer::new(
175 id,
176 context_id,
177 self.backend_chan.clone(),
178 stream_type,
179 sender,
180 renderer,
181 audio_renderer,
182 gl_context,
183 )));
184 let mut instances = self.instances.lock().unwrap();
185 let entry = instances.entry(*context_id).or_insert(Vec::new());
186 entry.push((id, Arc::downgrade(&player).clone()));
187 player
188 }
189
190 fn create_audio_context(
191 &self,
192 client_context_id: &ClientContextId,
193 options: AudioContextOptions,
194 ) -> Result<Arc<Mutex<AudioContext>>, AudioSinkError> {
195 let id = self.next_instance_id.fetch_add(1, Ordering::Relaxed);
196 let audio_context =
197 AudioContext::new::<Self>(id, client_context_id, self.backend_chan.clone(), options)?;
198
199 let audio_context = Arc::new(Mutex::new(audio_context));
200
201 let mut instances = self.instances.lock().unwrap();
202 let entry = instances.entry(*client_context_id).or_insert(Vec::new());
203 entry.push((id, Arc::downgrade(&audio_context).clone()));
204
205 Ok(audio_context)
206 }
207
208 fn create_webrtc(&self, signaller: Box<dyn WebRtcSignaller>) -> WebRtcController {
209 WebRtcController::new::<Self>(signaller)
210 }
211
212 fn create_audiostream(&self) -> MediaStreamId {
213 GStreamerMediaStream::create_audio()
214 }
215
216 fn create_videostream(&self) -> MediaStreamId {
217 GStreamerMediaStream::create_video()
218 }
219
220 fn create_stream_output(&self) -> Box<dyn MediaOutput> {
221 Box::new(media_stream::MediaSink::new())
222 }
223
224 fn create_stream_and_socket(
225 &self,
226 ty: MediaStreamType,
227 ) -> (Box<dyn MediaSocket>, MediaStreamId) {
228 let (id, socket) = GStreamerMediaStream::create_proxy(ty);
229 (Box::new(socket), id)
230 }
231
232 fn create_audioinput_stream(&self, set: MediaTrackConstraintSet) -> Option<MediaStreamId> {
233 if self.capture_mocking.load(Ordering::Acquire) {
234 return Some(self.create_audiostream());
236 }
237 media_capture::create_audioinput_stream(set)
238 }
239
240 fn create_videoinput_stream(&self, set: MediaTrackConstraintSet) -> Option<MediaStreamId> {
241 if self.capture_mocking.load(Ordering::Acquire) {
242 return Some(self.create_videostream());
244 }
245 media_capture::create_videoinput_stream(set)
246 }
247
248 fn can_play_type(&self, media_type: &str) -> SupportsMediaType {
249 if let Ok(mime) = media_type.parse::<Mime>() {
250 if mime.subtype() == mime::OGG {
256 return SupportsMediaType::No;
257 }
258
259 let mime_type = mime.type_().as_str().to_owned() + "/" + mime.subtype().as_str();
260 let codecs = match mime.get_param("codecs") {
261 Some(codecs) => codecs
262 .as_str()
263 .split(',')
264 .map(|codec| codec.trim())
265 .collect(),
266 None => vec![],
267 };
268
269 if GSTREAMER_REGISTRY_SCANNER.is_container_type_supported(&mime_type) {
270 if codecs.is_empty() {
271 return SupportsMediaType::Maybe;
272 } else if GSTREAMER_REGISTRY_SCANNER.are_all_codecs_supported(&codecs) {
273 return SupportsMediaType::Probably;
274 } else {
275 return SupportsMediaType::No;
276 }
277 }
278 }
279 SupportsMediaType::No
280 }
281
282 fn set_capture_mocking(&self, mock: bool) {
283 self.capture_mocking.store(mock, Ordering::Release)
284 }
285
286 fn mute(&self, id: &ClientContextId, val: bool) {
287 self.media_instance_action(
288 id,
289 &(move |instance: &dyn MediaInstance| instance.mute(val)),
290 );
291 }
292
293 fn suspend(&self, id: &ClientContextId) {
294 self.media_instance_action(id, &|instance: &dyn MediaInstance| instance.suspend());
295 }
296
297 fn resume(&self, id: &ClientContextId) {
298 self.media_instance_action(id, &|instance: &dyn MediaInstance| instance.resume());
299 }
300
301 fn get_device_monitor(&self) -> Box<dyn MediaDeviceMonitor> {
302 Box::new(GStreamerDeviceMonitor::new())
303 }
304}
305
306impl AudioBackend for GStreamerBackend {
307 type Sink = audio_sink::GStreamerAudioSink;
308 fn make_decoder() -> Box<dyn AudioDecoder> {
309 Box::new(audio_decoder::GStreamerAudioDecoder::new())
310 }
311 fn make_sink() -> Result<Self::Sink, AudioSinkError> {
312 audio_sink::GStreamerAudioSink::new()
313 }
314
315 fn make_streamreader(id: MediaStreamId, sample_rate: f32) -> Box<dyn AudioStreamReader + Send> {
316 Box::new(audio_stream_reader::GStreamerAudioStreamReader::new(id, sample_rate).unwrap())
317 }
318}
319
320impl WebRtcBackend for GStreamerBackend {
321 type Controller = webrtc::GStreamerWebRtcController;
322
323 fn construct_webrtc_controller(
324 signaller: Box<dyn WebRtcSignaller>,
325 thread: WebRtcController,
326 ) -> Self::Controller {
327 webrtc::construct(signaller, thread).expect("WebRTC creation failed")
328 }
329}
330
331impl BackendInit for GStreamerBackend {
332 fn init() -> Box<dyn Backend> {
333 Self::init_with_plugins(PathBuf::new(), &[]).unwrap()
334 }
335}
336
337impl BackendDeInit for GStreamerBackend {
338 fn deinit(&self) {
339 let to_shutdown: Vec<(ClientContextId, usize)> = {
340 let map = self.instances.lock().unwrap();
341 map.iter()
342 .flat_map(|(ctx, v)| v.iter().map(move |(id, _)| (*ctx, *id)))
343 .collect()
344 };
345
346 for (ctx, id) in to_shutdown {
347 let (tx_ack, rx_ack) = mpsc::channel();
348 let _ = self
349 .backend_chan
350 .lock()
351 .unwrap()
352 .send(BackendMsg::Shutdown {
353 context: ctx,
354 id,
355 tx_ack,
356 });
357 let _ = rx_ack.recv();
358 }
359 }
360}