servo_media_gstreamer/
player.rs

1use super::BACKEND_BASE_TIME;
2use crate::media_stream::GStreamerMediaStream;
3use crate::media_stream_source::{register_servo_media_stream_src, ServoMediaStreamSrc};
4use crate::render::GStreamerRender;
5use crate::source::{register_servo_src, ServoSrc};
6use byte_slice_cast::AsSliceOf;
7use glib;
8use glib::prelude::*;
9use gst;
10use gst::prelude::*;
11use gst_app;
12use gst_play;
13use gst_play::prelude::*;
14use ipc_channel::ipc::{channel, IpcReceiver, IpcSender};
15use servo_media_player::audio::AudioRenderer;
16use servo_media_player::context::PlayerGLContext;
17use servo_media_player::metadata::Metadata;
18use servo_media_player::video::VideoFrameRenderer;
19use servo_media_player::{
20    PlaybackState, Player, PlayerError, PlayerEvent, SeekLock, SeekLockMsg, StreamType,
21};
22use servo_media_streams::registry::{get_stream, MediaStreamId};
23use servo_media_traits::{BackendMsg, ClientContextId, MediaInstance};
24use std::cell::RefCell;
25use std::ops::Range;
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::sync::mpsc::{self, Sender};
28use std::sync::{Arc, Mutex, Once};
29use std::time;
30use std::u64;
31
/// Value (500 * 1024 * 1024) assigned to the pipeline's `buffer-size` property
/// during player setup to cap the player buffer.
// NOTE(review): presumably expressed in bytes (i.e. 500 MiB) — confirm against
// the playbin `buffer-size` property documentation.
const MAX_BUFFER_SIZE: i32 = 500 * 1024 * 1024;
33
34fn metadata_from_media_info(media_info: &gst_play::PlayMediaInfo) -> Result<Metadata, ()> {
35    let dur = media_info.duration();
36    let duration = if let Some(dur) = dur {
37        let mut nanos = dur.nseconds();
38        nanos = nanos % 1_000_000_000;
39        let seconds = dur.seconds();
40        Some(time::Duration::new(seconds, nanos as u32))
41    } else {
42        None
43    };
44
45    let mut audio_tracks = Vec::new();
46    let mut video_tracks = Vec::new();
47
48    let format = media_info
49        .container_format()
50        .unwrap_or_else(|| glib::GString::from(""))
51        .to_string();
52
53    for stream_info in media_info.stream_list() {
54        let stream_type = stream_info.stream_type();
55        match stream_type.as_str() {
56            "audio" => {
57                let codec = stream_info
58                    .codec()
59                    .unwrap_or_else(|| glib::GString::from(""))
60                    .to_string();
61                audio_tracks.push(codec);
62            }
63            "video" => {
64                let codec = stream_info
65                    .codec()
66                    .unwrap_or_else(|| glib::GString::from(""))
67                    .to_string();
68                video_tracks.push(codec);
69            }
70            _ => {}
71        }
72    }
73
74    let mut width: u32 = 0;
75    let height: u32 = if media_info.number_of_video_streams() > 0 {
76        let first_video_stream = &media_info.video_streams()[0];
77        width = first_video_stream.width() as u32;
78        first_video_stream.height() as u32
79    } else {
80        0
81    };
82
83    let is_seekable = media_info.is_seekable();
84    let is_live = media_info.is_live();
85    let title = media_info.title().map(|s| s.as_str().to_string());
86
87    Ok(Metadata {
88        duration,
89        width,
90        height,
91        format,
92        is_seekable,
93        audio_tracks,
94        video_tracks,
95        is_live,
96        title,
97    })
98}
99
100pub struct GStreamerAudioChunk(gst::buffer::MappedBuffer<gst::buffer::Readable>);
101impl AsRef<[f32]> for GStreamerAudioChunk {
102    fn as_ref(&self) -> &[f32] {
103        self.0.as_ref().as_slice_of::<f32>().unwrap()
104    }
105}
106
/// Source element feeding the player, as captured from the pipeline's
/// `source-setup` signal.
#[derive(PartialEq)]
enum PlayerSource {
    /// `ServoSrc` element, used when the stream type is `StreamType::Seekable`.
    Seekable(ServoSrc),
    /// `ServoMediaStreamSrc` element, used when the stream type is `StreamType::Stream`.
    Stream(ServoMediaStreamSrc),
}
112
113struct PlayerInner {
114    player: gst_play::Play,
115    signal_adapter: gst_play::PlaySignalAdapter,
116    source: Option<PlayerSource>,
117    video_sink: gst_app::AppSink,
118    input_size: u64,
119    rate: f64,
120    stream_type: StreamType,
121    last_metadata: Option<Metadata>,
122    cat: gst::DebugCategory,
123    enough_data: Arc<AtomicBool>,
124}
125
126impl PlayerInner {
127    pub fn set_input_size(&mut self, size: u64) -> Result<(), PlayerError> {
128        // Set input_size to proxy its value, since it
129        // could be set by the user before calling .setup().
130        self.input_size = size;
131        match self.source {
132            // The input size is only useful for seekable streams.
133            Some(ref mut source) => {
134                if let PlayerSource::Seekable(source) = source {
135                    source.set_size(if size > 0 {
136                        size as i64
137                    } else {
138                        -1 // live source
139                    });
140                }
141            }
142            _ => (),
143        }
144        Ok(())
145    }
146
147    pub fn set_mute(&mut self, val: bool) -> Result<(), PlayerError> {
148        self.player.set_mute(val);
149        Ok(())
150    }
151
152    pub fn set_rate(&mut self, rate: f64) -> Result<(), PlayerError> {
153        // This method may be called before the player setup is done, so we safe the rate value
154        // and set it once the player is ready and after getting the media info
155        self.rate = rate;
156        if let Some(ref metadata) = self.last_metadata {
157            if !metadata.is_seekable {
158                gst::warning!(self.cat, obj = &self.player,
159                             "Player must be seekable in order to set the playback rate");
160                return Err(PlayerError::NonSeekableStream);
161            }
162            self.player.set_rate(rate);
163        }
164        Ok(())
165    }
166
167    pub fn play(&mut self) -> Result<(), PlayerError> {
168        self.player.play();
169        Ok(())
170    }
171
172    pub fn stop(&mut self) -> Result<(), PlayerError> {
173        self.player.stop();
174        self.last_metadata = None;
175        self.source = None;
176        Ok(())
177    }
178
179    pub fn pause(&mut self) -> Result<(), PlayerError> {
180        self.player.pause();
181        Ok(())
182    }
183
184    pub fn end_of_stream(&mut self) -> Result<(), PlayerError> {
185        match self.source {
186            Some(ref mut source) => {
187                if let PlayerSource::Seekable(source) = source {
188                    source
189                        .push_end_of_stream()
190                        .map(|_| ())
191                        .map_err(|_| PlayerError::EOSFailed)
192                } else {
193                    Ok(())
194                }
195            }
196            _ => Ok(()),
197        }
198    }
199
200    pub fn seek(&mut self, time: f64) -> Result<(), PlayerError> {
201        if self.stream_type != StreamType::Seekable {
202            return Err(PlayerError::NonSeekableStream);
203        }
204        if let Some(ref metadata) = self.last_metadata {
205            if let Some(ref duration) = metadata.duration {
206                if duration < &time::Duration::new(time as u64, 0) {
207                    gst::warning!(self.cat, obj = &self.player, "Trying to seek out of range");
208                    return Err(PlayerError::SeekOutOfRange);
209                }
210            }
211        }
212
213        let time = time * 1_000_000_000.;
214        self.player.seek(gst::ClockTime::from_nseconds(time as u64));
215        Ok(())
216    }
217
218    pub fn set_volume(&mut self, value: f64) -> Result<(), PlayerError> {
219        self.player.set_volume(value);
220        Ok(())
221    }
222
223    pub fn push_data(&mut self, data: Vec<u8>) -> Result<(), PlayerError> {
224        if let Some(ref mut source) = self.source {
225            if let PlayerSource::Seekable(source) = source {
226                if self.enough_data.load(Ordering::Relaxed) {
227                    return Err(PlayerError::EnoughData);
228                }
229                return source
230                    .push_buffer(data)
231                    .map(|_| ())
232                    .map_err(|_| PlayerError::BufferPushFailed);
233            }
234        }
235        Err(PlayerError::BufferPushFailed)
236    }
237
238    pub fn set_src(&mut self, source: PlayerSource) {
239        self.source = Some(source);
240    }
241
242    pub fn buffered(&mut self) -> Result<Vec<Range<f64>>, PlayerError> {
243        let mut result = vec![];
244        if let Some(ref metadata) = self.last_metadata {
245            if let Some(ref duration) = metadata.duration {
246                let pipeline = self.player.pipeline();
247                let mut buffering = gst::query::Buffering::new(gst::Format::Percent);
248                if pipeline.query(&mut buffering) {
249                    let ranges = buffering.ranges();
250                    for (start, end) in &ranges {
251                        let start = (if let gst::GenericFormattedValue::Percent(start) = start {
252                            start.unwrap()
253                        } else {
254                            gst::format::Percent::from_percent(0)
255                        } * duration.as_secs() as u32
256                            / gst::format::Percent::MAX) as f64;
257                        let end = (if let gst::GenericFormattedValue::Percent(end) = end {
258                            end.unwrap()
259                        } else {
260                            gst::format::Percent::from_percent(0)
261                        } * duration.as_secs() as u32
262                            / gst::format::Percent::MAX) as f64;
263                        result.push(Range { start, end });
264                    }
265                }
266            }
267        }
268        Ok(result)
269    }
270
271    fn set_stream(&mut self, stream: &MediaStreamId, only_stream: bool) -> Result<(), PlayerError> {
272        debug_assert!(self.stream_type == StreamType::Stream);
273        if let Some(ref source) = self.source {
274            if let PlayerSource::Stream(source) = source {
275                let stream =
276                    get_stream(stream).expect("Media streams registry does not contain such ID");
277                let mut stream = stream.lock().unwrap();
278                if let Some(mut stream) = stream.as_mut_any().downcast_mut::<GStreamerMediaStream>()
279                {
280                    let playbin = self
281                        .player
282                        .pipeline()
283                        .dynamic_cast::<gst::Pipeline>()
284                        .unwrap();
285                    let clock = gst::SystemClock::obtain();
286                    playbin.set_base_time(*BACKEND_BASE_TIME);
287                    playbin.set_start_time(gst::ClockTime::NONE);
288                    playbin.use_clock(Some(&clock));
289
290                    source.set_stream(&mut stream, only_stream);
291                    return Ok(());
292                }
293            }
294        }
295        Err(PlayerError::SetStreamFailed)
296    }
297
298    fn set_audio_track(&mut self, stream_index: i32, enabled: bool) -> Result<(), PlayerError> {
299        self.player
300            .set_audio_track(stream_index)
301            .map_err(|_| PlayerError::SetTrackFailed)?;
302        self.player.set_audio_track_enabled(enabled);
303        Ok(())
304    }
305
306    fn set_video_track(&mut self, stream_index: i32, enabled: bool) -> Result<(), PlayerError> {
307        self.player
308            .set_video_track(stream_index)
309            .map_err(|_| PlayerError::SetTrackFailed)?;
310        self.player.set_video_track_enabled(enabled);
311        Ok(())
312    }
313}
314
/// Locks the `$observer` mutex and sends `$event` through the sender it guards.
///
/// Evaluates to the result of `send`; panics if the mutex is poisoned.
macro_rules! notify {
    ($observer:expr, $event:expr) => {
        $observer.lock().unwrap().send($event)
    };
}
320
321struct SeekChannel {
322    sender: SeekLock,
323    recv: IpcReceiver<SeekLockMsg>,
324}
325
326impl SeekChannel {
327    fn new() -> Self {
328        let (sender, recv) = channel::<SeekLockMsg>().expect("Couldn't create IPC channel");
329        Self {
330            sender: SeekLock {
331                lock_channel: sender,
332            },
333            recv,
334        }
335    }
336
337    fn sender(&self) -> SeekLock {
338        self.sender.clone()
339    }
340
341    fn _await(&self) -> SeekLockMsg {
342        self.recv.recv().unwrap()
343    }
344}
345
346pub struct GStreamerPlayer {
347    /// The player unique ID.
348    id: usize,
349    /// The ID of the client context this player belongs to.
350    context_id: ClientContextId,
351    /// Channel to communicate with the owner GStreamerBackend instance.
352    backend_chan: Arc<Mutex<Sender<BackendMsg>>>,
353    inner: RefCell<Option<Arc<Mutex<PlayerInner>>>>,
354    observer: Arc<Mutex<IpcSender<PlayerEvent>>>,
355    audio_renderer: Option<Arc<Mutex<dyn AudioRenderer>>>,
356    video_renderer: Option<Arc<Mutex<dyn VideoFrameRenderer>>>,
357    /// Indicates whether the setup was succesfully performed and
358    /// we are ready to consume a/v data.
359    is_ready: Arc<Once>,
360    /// Indicates whether the type of media stream to be played is a live stream.
361    stream_type: StreamType,
362    /// Decorator used to setup the video sink and process the produced frames.
363    render: Arc<Mutex<GStreamerRender>>,
364}
365
366impl GStreamerPlayer {
367    pub fn new(
368        id: usize,
369        context_id: &ClientContextId,
370        backend_chan: Arc<Mutex<Sender<BackendMsg>>>,
371        stream_type: StreamType,
372        observer: IpcSender<PlayerEvent>,
373        video_renderer: Option<Arc<Mutex<dyn VideoFrameRenderer>>>,
374        audio_renderer: Option<Arc<Mutex<dyn AudioRenderer>>>,
375        gl_context: Box<dyn PlayerGLContext>,
376    ) -> GStreamerPlayer {
377        let _ = gst::DebugCategory::new(
378            "servoplayer",
379            gst::DebugColorFlags::empty(),
380            Some("Servo player"),
381        );
382
383        Self {
384            id,
385            context_id: *context_id,
386            backend_chan,
387            inner: RefCell::new(None),
388            observer: Arc::new(Mutex::new(observer)),
389            audio_renderer,
390            video_renderer,
391            is_ready: Arc::new(Once::new()),
392            stream_type,
393            render: Arc::new(Mutex::new(GStreamerRender::new(gl_context))),
394        }
395    }
396
397    fn setup(&self) -> Result<(), PlayerError> {
398        if self.inner.borrow().is_some() {
399            return Ok(());
400        }
401
402        // Check that we actually have the elements that we
403        // need to make this work.
404        for element in vec!["playbin3", "decodebin3", "queue"].iter() {
405            if gst::ElementFactory::find(element).is_none() {
406                return Err(PlayerError::Backend(format!(
407                    "Missing dependency: {}",
408                    element
409                )));
410            }
411        }
412
413        let player = gst_play::Play::default();
414        let signal_adapter = gst_play::PlaySignalAdapter::new_sync_emit(&player);
415        let pipeline = player.pipeline();
416
417        // FIXME(#282): The progressive downloading breaks playback on Windows and Android.
418        if !cfg!(any(target_os = "windows", target_os = "android")) {
419            // Set player to perform progressive downloading. This will make the
420            // player store the downloaded media in a local temporary file for
421            // faster playback of already-downloaded chunks.
422            let flags = pipeline.property_value("flags");
423            let flags_class = match glib::FlagsClass::with_type(flags.type_()) {
424                Some(flags) => flags,
425                None => {
426                    return Err(PlayerError::Backend(
427                        "FlagsClass creation failed".to_owned(),
428                    ));
429                }
430            };
431            let flags_class = match flags_class.builder_with_value(flags) {
432                Some(class) => class,
433                None => {
434                    return Err(PlayerError::Backend(
435                        "FlagsClass creation failed".to_owned(),
436                    ));
437                }
438            };
439            let Some(flags) = flags_class.set_by_nick("download").build() else {
440                return Err(PlayerError::Backend(
441                    "FlagsClass creation failed".to_owned(),
442                ));
443            };
444            pipeline.set_property_from_value("flags", &flags);
445        }
446
447        // Set max size for the player buffer.
448        pipeline.set_property("buffer-size", MAX_BUFFER_SIZE);
449
450        // Set player position interval update to 0.5 seconds.
451        let mut config = player.config();
452        config.set_position_update_interval(500u32);
453        player
454            .set_config(config)
455            .map_err(|e| PlayerError::Backend(e.to_string()))?;
456
457        if let Some(ref audio_renderer) = self.audio_renderer {
458            let audio_sink = gst::ElementFactory::make("appsink")
459                .build()
460                .map_err(|error| PlayerError::Backend(format!("appsink creation failed: {error:?}")))?;
461
462            pipeline.set_property("audio-sink", &audio_sink);
463
464            let audio_sink = audio_sink.dynamic_cast::<gst_app::AppSink>().unwrap();
465            let audio_renderer_ = audio_renderer.clone();
466            audio_sink.set_callbacks(
467                gst_app::AppSinkCallbacks::builder()
468                    .new_preroll(|_| Ok(gst::FlowSuccess::Ok))
469                    .new_sample(move |audio_sink| {
470                        let sample = audio_sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
471                        let buffer = sample.buffer_owned().ok_or(gst::FlowError::Error)?;
472                        let audio_info = sample
473                            .caps()
474                            .and_then(|caps| gst_audio::AudioInfo::from_caps(caps).ok())
475                            .ok_or(gst::FlowError::Error)?;
476                        let positions = audio_info.positions().ok_or(gst::FlowError::Error)?;
477                        for position in positions.iter() {
478                            let buffer = buffer.clone();
479                            let map = if let Ok(map) = buffer.into_mapped_buffer_readable() {
480                                map
481                            } else {
482                                return Err(gst::FlowError::Error);
483                            };
484                            let chunk = Box::new(GStreamerAudioChunk(map));
485                            let channel = position.to_mask() as u32;
486                            audio_renderer_.lock().unwrap().render(chunk, channel);
487                        }
488                        Ok(gst::FlowSuccess::Ok)
489                    })
490                    .build(),
491            );
492        }
493
494        let video_sink = self.render.lock().unwrap().setup_video_sink(&pipeline)?;
495
496        // There's a known bug in gstreamer that may cause a wrong transition
497        // to the ready state while setting the uri property:
498        // https://cgit.freedesktop.org/gstreamer/gst-plugins-bad/commit/?id=afbbc3a97ec391c6a582f3c746965fdc3eb3e1f3
499        // This may affect things like setting the config, so until the bug is
500        // fixed, make sure that state dependent code happens before this line.
501        // The estimated version for the fix is 1.14.5 / 1.15.1.
502        // https://github.com/servo/servo/issues/22010#issuecomment-432599657
503        let uri = match self.stream_type {
504            StreamType::Stream => {
505                register_servo_media_stream_src().map_err(|error| {
506                    PlayerError::Backend(format!("servomediastreamsrc registration error: {error:?}"))
507                })?;
508                "mediastream://".to_value()
509            }
510            StreamType::Seekable => {
511                register_servo_src()
512                    .map_err(|error| PlayerError::Backend(format!("servosrc registration error: {error:?}")))?;
513                "servosrc://".to_value()
514            }
515        };
516        player.set_property("uri", &uri);
517
518        // No video_renderers no video
519        if self.video_renderer.is_none() {
520            player.set_video_track_enabled(false);
521        }
522
523        *self.inner.borrow_mut() = Some(Arc::new(Mutex::new(PlayerInner {
524            player,
525            signal_adapter: signal_adapter.clone(),
526            source: None,
527            video_sink,
528            input_size: 0,
529            rate: 1.0,
530            stream_type: self.stream_type,
531            last_metadata: None,
532            cat: gst::DebugCategory::get("servoplayer").unwrap(),
533            enough_data: Arc::new(AtomicBool::new(false)),
534        })));
535
536        let inner = self.inner.borrow();
537        let inner = inner.as_ref().unwrap();
538        let observer = self.observer.clone();
539        // Handle `end-of-stream` signal.
540        signal_adapter.connect_end_of_stream(move |_| {
541            let _ = notify!(observer, PlayerEvent::EndOfStream);
542        });
543
544        let observer = self.observer.clone();
545        // Handle `error` signal
546        signal_adapter.connect_error(move |_self, error, _details| {
547            let _ = notify!(observer, PlayerEvent::Error(error.to_string()));
548        });
549
550        let observer = self.observer.clone();
551        // Handle `state-changed` signal.
552        signal_adapter.connect_state_changed(move |_, player_state| {
553            let state = match player_state {
554                gst_play::PlayState::Buffering => Some(PlaybackState::Buffering),
555                gst_play::PlayState::Stopped => Some(PlaybackState::Stopped),
556                gst_play::PlayState::Paused => Some(PlaybackState::Paused),
557                gst_play::PlayState::Playing => Some(PlaybackState::Playing),
558                _ => None,
559            };
560            if let Some(v) = state {
561                let _ = notify!(observer, PlayerEvent::StateChanged(v));
562            }
563        });
564
565        let observer = self.observer.clone();
566        // Handle `position-update` signal.
567        signal_adapter.connect_position_updated(move |_, position| {
568            if let Some(seconds) = position.map(|p| p.seconds()) {
569                let _ = notify!(observer, PlayerEvent::PositionChanged(seconds));
570            }
571        });
572
573        let observer = self.observer.clone();
574        // Handle `seek-done` signal.
575        signal_adapter.connect_seek_done(move |_, position| {
576            let _ = notify!(observer, PlayerEvent::SeekDone(position.seconds()));
577        });
578
579        // Handle `media-info-updated` signal.
580        let inner_clone = inner.clone();
581        let observer = self.observer.clone();
582        signal_adapter.connect_media_info_updated(move |_, info| {
583            let mut inner = inner_clone.lock().unwrap();
584            if let Ok(metadata) = metadata_from_media_info(info) {
585                if inner.last_metadata.as_ref() != Some(&metadata) {
586                    inner.last_metadata = Some(metadata.clone());
587                    if metadata.is_seekable {
588                        inner.player.set_rate(inner.rate);
589                    }
590                    gst::info!(inner.cat, obj = &inner.player, "Metadata updated: {:?}", metadata);
591                    let _ = notify!(observer, PlayerEvent::MetadataUpdated(metadata));
592                }
593            }
594        });
595
596        // Handle `duration-changed` signal.
597        let inner_clone = inner.clone();
598        let observer = self.observer.clone();
599        signal_adapter.connect_duration_changed(move |_, duration| {
600            let mut inner = inner_clone.lock().unwrap();
601            let duration = duration.map(|duration| {
602                let nanos = duration.nseconds();
603                let seconds = duration.seconds();
604                time::Duration::new(seconds, (nanos % 1_000_000_000) as u32)
605            });
606            let mut updated_metadata = None;
607            if let Some(ref mut metadata) = inner.last_metadata {
608                metadata.duration = duration;
609                updated_metadata = Some(metadata.clone());
610            }
611            if let Some(updated_metadata) = updated_metadata {
612                gst::info!(inner.cat, obj = &inner.player, "Duration updated: {:?}",
613                              updated_metadata);
614                let _ = notify!(observer, PlayerEvent::MetadataUpdated(updated_metadata));
615            }
616        });
617
618        if let Some(video_renderer) = self.video_renderer.clone() {
619            // Creates a closure that renders a frame using the video_renderer
620            // Used in the preroll and sample callbacks
621            let render_sample = {
622                let render = self.render.clone();
623                let observer = self.observer.clone();
624                move |sample: gst::Sample| {
625                    let frame = render
626                        .lock()
627                        .unwrap()
628                        .get_frame_from_sample(sample)
629                        .map_err(|_| gst::FlowError::Error)?;
630                    video_renderer.lock().unwrap().render(frame);
631                    let _ = notify!(observer, PlayerEvent::VideoFrameUpdated);
632                    Ok(gst::FlowSuccess::Ok)
633                }
634            };
635
636            // Set video_sink callbacks.
637            inner.lock().unwrap().video_sink.set_callbacks(
638                gst_app::AppSinkCallbacks::builder()
639                    .new_preroll({
640                        let render_sample = render_sample.clone();
641                        move |video_sink| {
642                            render_sample(video_sink.pull_preroll().map_err(|_| gst::FlowError::Eos)?)
643                        }
644                    })
645                    .new_sample(move |video_sink| {
646                        render_sample(video_sink.pull_sample().map_err(|_| gst::FlowError::Eos)?)
647                    })
648                    .build(),
649            );
650        };
651
652        let (receiver, error_handler_id) = {
653            let inner_clone = inner.clone();
654            let mut inner = inner.lock().unwrap();
655            let pipeline = inner.player.pipeline();
656
657            let (sender, receiver) = mpsc::channel();
658
659            let sender = Arc::new(Mutex::new(sender));
660            let sender_clone = sender.clone();
661            let is_ready_clone = self.is_ready.clone();
662            let observer = self.observer.clone();
663            pipeline.connect("source-setup", false, move |args| {
664                let source = args[1].get::<gst::Element>().unwrap();
665
666                let mut inner = inner_clone.lock().unwrap();
667                let source = match inner.stream_type {
668                    StreamType::Seekable => {
669                        let servosrc = source
670                            .dynamic_cast::<ServoSrc>()
671                            .expect("Source element is expected to be a ServoSrc!");
672
673                        if inner.input_size > 0 {
674                            servosrc.set_size(inner.input_size as i64);
675                        }
676
677                        let sender_clone = sender.clone();
678                        let is_ready = is_ready_clone.clone();
679                        let observer_ = observer.clone();
680                        let observer__ = observer.clone();
681                        let observer___ = observer.clone();
682                        let servosrc_ = servosrc.clone();
683                        let enough_data_ = inner.enough_data.clone();
684                        let enough_data__ = inner.enough_data.clone();
685                        let seek_channel = Arc::new(Mutex::new(SeekChannel::new()));
686                        servosrc.set_callbacks(
687                            gst_app::AppSrcCallbacks::builder()
688                                .need_data(move |_, _| {
689                                    // We block the caller of the setup method until we get
690                                    // the first need-data signal, so we ensure that we
691                                    // don't miss any data between the moment the client
692                                    // calls setup and the player is actually ready to
693                                    // get any data.
694                                    is_ready.call_once(|| {
695                                        let _ = sender_clone.lock().unwrap().send(Ok(()));
696                                    });
697
698                                    enough_data_.store(false, Ordering::Relaxed);
699                                    let _ = notify!(observer_, PlayerEvent::NeedData);
700                                })
701                                .enough_data(move |_| {
702                                    enough_data__.store(true, Ordering::Relaxed);
703                                    let _ = notify!(observer__, PlayerEvent::EnoughData);
704                                })
705                                .seek_data(move |_, offset| {
706                                    let (ret, ack_channel) = if servosrc_.set_seek_offset(offset) {
707                                        let _ = notify!(
708                                            observer___,
709                                            PlayerEvent::SeekData(
710                                                offset,
711                                                seek_channel.lock().unwrap().sender()
712                                            )
713                                        );
714                                        let (ret, ack_channel) =
715                                            seek_channel.lock().unwrap()._await();
716                                        (ret, Some(ack_channel))
717                                    } else {
718                                        (true, None)
719                                    };
720
721                                    servosrc_.set_seek_done();
722                                    if let Some(ack_channel) = ack_channel {
723                                        ack_channel.send(()).unwrap();
724                                    }
725                                    ret
726                                })
727                                .build(),
728                        );
729
730                        PlayerSource::Seekable(servosrc)
731                    }
732                    StreamType::Stream => {
733                        let media_stream_src = source
734                            .dynamic_cast::<ServoMediaStreamSrc>()
735                            .expect("Source element is expected to be a ServoMediaStreamSrc!");
736                        let sender_clone = sender.clone();
737                        is_ready_clone.call_once(|| {
738                            let _ = notify!(sender_clone, Ok(()));
739                        });
740                        PlayerSource::Stream(media_stream_src)
741                    }
742                };
743
744                inner.set_src(source);
745
746                None
747            });
748
749            let error_handler_id = signal_adapter.connect_error(move |signal_adapter, error, _details| {
750                let _ = notify!(sender_clone, Err(PlayerError::Backend(error.to_string())));
751                signal_adapter.play().stop();
752            });
753
754            let _ = inner.pause();
755
756            (receiver, error_handler_id)
757        };
758
759        let result = receiver.recv().unwrap();
760        glib::signal::signal_handler_disconnect(&inner.lock().unwrap().player, error_handler_id);
761        result
762    }
763}
764
/// Generates a method that forwards to the method of the same name on the
/// mutex-guarded inner player held in `self.inner`.
///
/// Every generated method first calls `self.setup()` and returns early with its
/// error. It then borrows `self.inner`, locks the mutex found inside and
/// delegates the call. It panics if `self.inner` holds `None` or if the mutex
/// is poisoned.
///
/// Two invocation shapes are accepted:
/// * `(name, ReturnType)`: no argument, returns `Result<ReturnType, PlayerError>`.
/// * `(name, arg, ArgType)`: one argument, returns `Result<(), PlayerError>`.
macro_rules! inner_player_proxy {
    ($fn_name:ident, $return_type:ty) => {
        fn $fn_name(&self) -> Result<$return_type, PlayerError> {
            self.setup()?;
            let slot = self.inner.borrow();
            let shared = slot.as_ref().unwrap();
            let mut guard = shared.lock().unwrap();
            guard.$fn_name()
        }
    };

    ($fn_name:ident, $arg1:ident, $arg1_type:ty) => {
        fn $fn_name(&self, $arg1: $arg1_type) -> Result<(), PlayerError> {
            self.setup()?;
            let slot = self.inner.borrow();
            let shared = slot.as_ref().unwrap();
            let mut guard = shared.lock().unwrap();
            guard.$fn_name($arg1)
        }
    };
}
784
785impl Player for GStreamerPlayer {
786    inner_player_proxy!(play, ());
787    inner_player_proxy!(pause, ());
788    inner_player_proxy!(stop, ());
789    inner_player_proxy!(end_of_stream, ());
790    inner_player_proxy!(set_input_size, size, u64);
791    inner_player_proxy!(set_mute, val, bool);
792    inner_player_proxy!(set_rate, rate, f64);
793    inner_player_proxy!(push_data, data, Vec<u8>);
794    inner_player_proxy!(seek, time, f64);
795    inner_player_proxy!(set_volume, value, f64);
796    inner_player_proxy!(buffered, Vec<Range<f64>>);
797
798    fn seekable(&self) -> Result<Vec<Range<f64>>, PlayerError> {
799        self.setup()?;
800        let inner = self.inner.borrow();
801        let mut inner = inner.as_ref().unwrap().lock().unwrap();
802        // if the servosrc is seekable, we should return the duration of the media
803        if let Some(metadata) = inner.last_metadata.as_ref() {
804            if metadata.is_seekable {
805                if let Some(duration) = metadata.duration {
806                    return Ok(vec![Range {
807                        start: 0.0,
808                        end: duration.as_secs_f64(),
809                    }]);
810                }
811            }
812        }
813        // if the servosrc is not seekable, we should return the buffered range
814        inner.buffered()
815    }
816
817    fn render_use_gl(&self) -> bool {
818        self.render.lock().unwrap().is_gl()
819    }
820
821    fn set_stream(&self, stream: &MediaStreamId, only_stream: bool) -> Result<(), PlayerError> {
822        self.setup()?;
823        let inner = self.inner.borrow();
824        let mut inner = inner.as_ref().unwrap().lock().unwrap();
825        inner.set_stream(stream, only_stream)
826    }
827
828    fn set_audio_track(&self, stream_index: i32, enabled: bool) -> Result<(), PlayerError> {
829        self.setup()?;
830        let inner = self.inner.borrow();
831        let mut inner = inner.as_ref().unwrap().lock().unwrap();
832        inner.set_audio_track(stream_index, enabled)
833    }
834
835    fn set_video_track(&self, stream_index: i32, enabled: bool) -> Result<(), PlayerError> {
836        self.setup()?;
837        let inner = self.inner.borrow();
838        let mut inner = inner.as_ref().unwrap().lock().unwrap();
839        inner.set_video_track(stream_index, enabled)
840    }
841}
842
843impl MediaInstance for GStreamerPlayer {
844    fn get_id(&self) -> usize {
845        self.id
846    }
847
848    fn mute(&self, val: bool) -> Result<(), ()> {
849        self.set_mute(val).map_err(|_| ())
850    }
851
852    fn suspend(&self) -> Result<(), ()> {
853        self.pause().map_err(|_| ())
854    }
855
856    fn resume(&self) -> Result<(), ()> {
857        self.play().map_err(|_| ())
858    }
859}
860
861impl Drop for GStreamerPlayer {
862    fn drop(&mut self) {
863        let _ = self.stop();
864        let (tx_ack, rx_ack) = mpsc::channel();
865        let _ = self
866            .backend_chan
867            .lock()
868            .unwrap()
869            .send(BackendMsg::Shutdown {
870                context: self.context_id,
871                id: self.id,
872                tx_ack,
873            });
874        let _ = rx_ack.recv();
875    }
876}