servo_media_gstreamer/
player.rs

1use super::BACKEND_BASE_TIME;
2use crate::media_stream::GStreamerMediaStream;
3use crate::media_stream_source::{register_servo_media_stream_src, ServoMediaStreamSrc};
4use crate::render::GStreamerRender;
5use crate::source::{register_servo_src, ServoSrc};
6use byte_slice_cast::AsSliceOf;
7use glib;
8use glib::prelude::*;
9use gst;
10use gst::prelude::*;
11use gst_app;
12use gst_play;
13use gst_play::prelude::*;
14use ipc_channel::ipc::{channel, IpcReceiver, IpcSender};
15use servo_media_player::audio::AudioRenderer;
16use servo_media_player::context::PlayerGLContext;
17use servo_media_player::metadata::Metadata;
18use servo_media_player::video::VideoFrameRenderer;
19use servo_media_player::{
20    PlaybackState, Player, PlayerError, PlayerEvent, SeekLock, SeekLockMsg, StreamType,
21};
22use servo_media_streams::registry::{get_stream, MediaStreamId};
23use servo_media_traits::{BackendMsg, ClientContextId, MediaInstance};
24use std::cell::RefCell;
25use std::ops::Range;
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::sync::mpsc::{self, Sender};
28use std::sync::{Arc, Mutex, Once};
29use std::time;
30use std::u64;
31
/// Value written to the pipeline's `buffer-size` property during player
/// setup: 500 MiB, expressed in bytes.
const MAX_BUFFER_SIZE: i32 = 500 << 20;
33
34fn metadata_from_media_info(media_info: &gst_play::PlayMediaInfo) -> Result<Metadata, ()> {
35    let dur = media_info.duration();
36    let duration = if let Some(dur) = dur {
37        let mut nanos = dur.nseconds();
38        nanos = nanos % 1_000_000_000;
39        let seconds = dur.seconds();
40        Some(time::Duration::new(seconds, nanos as u32))
41    } else {
42        None
43    };
44
45    let mut audio_tracks = Vec::new();
46    let mut video_tracks = Vec::new();
47
48    let format = media_info
49        .container_format()
50        .unwrap_or_else(|| glib::GString::from(""))
51        .to_string();
52
53    for stream_info in media_info.stream_list() {
54        let stream_type = stream_info.stream_type();
55        match stream_type.as_str() {
56            "audio" => {
57                let codec = stream_info
58                    .codec()
59                    .unwrap_or_else(|| glib::GString::from(""))
60                    .to_string();
61                audio_tracks.push(codec);
62            },
63            "video" => {
64                let codec = stream_info
65                    .codec()
66                    .unwrap_or_else(|| glib::GString::from(""))
67                    .to_string();
68                video_tracks.push(codec);
69            },
70            _ => {},
71        }
72    }
73
74    let mut width: u32 = 0;
75    let height: u32 = if media_info.number_of_video_streams() > 0 {
76        let first_video_stream = &media_info.video_streams()[0];
77        width = first_video_stream.width() as u32;
78        first_video_stream.height() as u32
79    } else {
80        0
81    };
82
83    let is_seekable = media_info.is_seekable();
84    let is_live = media_info.is_live();
85    let title = media_info.title().map(|s| s.as_str().to_string());
86
87    Ok(Metadata {
88        duration,
89        width,
90        height,
91        format,
92        is_seekable,
93        audio_tracks,
94        video_tracks,
95        is_live,
96        title,
97    })
98}
99
100pub struct GStreamerAudioChunk(gst::buffer::MappedBuffer<gst::buffer::Readable>);
101impl AsRef<[f32]> for GStreamerAudioChunk {
102    fn as_ref(&self) -> &[f32] {
103        self.0.as_ref().as_slice_of::<f32>().unwrap()
104    }
105}
106
107#[derive(PartialEq)]
108enum PlayerSource {
109    Seekable(ServoSrc),
110    Stream(ServoMediaStreamSrc),
111}
112
113struct PlayerInner {
114    player: gst_play::Play,
115    signal_adapter: gst_play::PlaySignalAdapter,
116    source: Option<PlayerSource>,
117    video_sink: gst_app::AppSink,
118    input_size: u64,
119    rate: f64,
120    stream_type: StreamType,
121    last_metadata: Option<Metadata>,
122    cat: gst::DebugCategory,
123    enough_data: Arc<AtomicBool>,
124}
125
126impl PlayerInner {
127    pub fn set_input_size(&mut self, size: u64) -> Result<(), PlayerError> {
128        // Set input_size to proxy its value, since it
129        // could be set by the user before calling .setup().
130        self.input_size = size;
131        match self.source {
132            // The input size is only useful for seekable streams.
133            Some(ref mut source) => {
134                if let PlayerSource::Seekable(source) = source {
135                    source.set_size(if size > 0 {
136                        size as i64
137                    } else {
138                        -1 // live source
139                    });
140                }
141            },
142            _ => (),
143        }
144        Ok(())
145    }
146
147    pub fn set_mute(&mut self, val: bool) -> Result<(), PlayerError> {
148        self.player.set_mute(val);
149        Ok(())
150    }
151
152    pub fn set_rate(&mut self, rate: f64) -> Result<(), PlayerError> {
153        // This method may be called before the player setup is done, so we safe the rate value
154        // and set it once the player is ready and after getting the media info
155        self.rate = rate;
156        if let Some(ref metadata) = self.last_metadata {
157            if !metadata.is_seekable {
158                gst::warning!(
159                    self.cat,
160                    obj = &self.player,
161                    "Player must be seekable in order to set the playback rate"
162                );
163                return Err(PlayerError::NonSeekableStream);
164            }
165            self.player.set_rate(rate);
166        }
167        Ok(())
168    }
169
170    pub fn play(&mut self) -> Result<(), PlayerError> {
171        self.player.play();
172        Ok(())
173    }
174
175    pub fn stop(&mut self) -> Result<(), PlayerError> {
176        self.player.stop();
177        self.last_metadata = None;
178        self.source = None;
179        Ok(())
180    }
181
182    pub fn pause(&mut self) -> Result<(), PlayerError> {
183        self.player.pause();
184        Ok(())
185    }
186
187    pub fn end_of_stream(&mut self) -> Result<(), PlayerError> {
188        match self.source {
189            Some(ref mut source) => {
190                if let PlayerSource::Seekable(source) = source {
191                    source
192                        .push_end_of_stream()
193                        .map(|_| ())
194                        .map_err(|_| PlayerError::EOSFailed)
195                } else {
196                    Ok(())
197                }
198            },
199            _ => Ok(()),
200        }
201    }
202
203    pub fn seek(&mut self, time: f64) -> Result<(), PlayerError> {
204        if self.stream_type != StreamType::Seekable {
205            return Err(PlayerError::NonSeekableStream);
206        }
207        if let Some(ref metadata) = self.last_metadata {
208            if let Some(ref duration) = metadata.duration {
209                if duration < &time::Duration::new(time as u64, 0) {
210                    gst::warning!(self.cat, obj = &self.player, "Trying to seek out of range");
211                    return Err(PlayerError::SeekOutOfRange);
212                }
213            }
214        }
215
216        let time = time * 1_000_000_000.;
217        self.player.seek(gst::ClockTime::from_nseconds(time as u64));
218        Ok(())
219    }
220
221    pub fn set_volume(&mut self, value: f64) -> Result<(), PlayerError> {
222        self.player.set_volume(value);
223        Ok(())
224    }
225
226    pub fn push_data(&mut self, data: Vec<u8>) -> Result<(), PlayerError> {
227        if let Some(ref mut source) = self.source {
228            if let PlayerSource::Seekable(source) = source {
229                if self.enough_data.load(Ordering::Relaxed) {
230                    return Err(PlayerError::EnoughData);
231                }
232                return source
233                    .push_buffer(data)
234                    .map(|_| ())
235                    .map_err(|_| PlayerError::BufferPushFailed);
236            }
237        }
238        Err(PlayerError::BufferPushFailed)
239    }
240
241    pub fn set_src(&mut self, source: PlayerSource) {
242        self.source = Some(source);
243    }
244
245    pub fn buffered(&mut self) -> Result<Vec<Range<f64>>, PlayerError> {
246        let mut result = vec![];
247        if let Some(ref metadata) = self.last_metadata {
248            if let Some(ref duration) = metadata.duration {
249                let pipeline = self.player.pipeline();
250                let mut buffering = gst::query::Buffering::new(gst::Format::Percent);
251                if pipeline.query(&mut buffering) {
252                    let ranges = buffering.ranges();
253                    for (start, end) in ranges {
254                        let start = (if let gst::GenericFormattedValue::Percent(start) = start {
255                            start.unwrap()
256                        } else {
257                            gst::format::Percent::from_percent(0)
258                        } * duration.as_secs() as u32
259                            / gst::format::Percent::MAX) as f64;
260                        let end = (if let gst::GenericFormattedValue::Percent(end) = end {
261                            end.unwrap()
262                        } else {
263                            gst::format::Percent::from_percent(0)
264                        } * duration.as_secs() as u32
265                            / gst::format::Percent::MAX) as f64;
266                        result.push(Range { start, end });
267                    }
268                }
269            }
270        }
271        Ok(result)
272    }
273
274    fn set_stream(&mut self, stream: &MediaStreamId, only_stream: bool) -> Result<(), PlayerError> {
275        debug_assert!(self.stream_type == StreamType::Stream);
276        if let Some(ref source) = self.source {
277            if let PlayerSource::Stream(source) = source {
278                let stream =
279                    get_stream(stream).expect("Media streams registry does not contain such ID");
280                let mut stream = stream.lock().unwrap();
281                if let Some(mut stream) = stream.as_mut_any().downcast_mut::<GStreamerMediaStream>()
282                {
283                    let playbin = self
284                        .player
285                        .pipeline()
286                        .dynamic_cast::<gst::Pipeline>()
287                        .unwrap();
288                    let clock = gst::SystemClock::obtain();
289                    playbin.set_base_time(*BACKEND_BASE_TIME);
290                    playbin.set_start_time(gst::ClockTime::NONE);
291                    playbin.use_clock(Some(&clock));
292
293                    source.set_stream(&mut stream, only_stream);
294                    return Ok(());
295                }
296            }
297        }
298        Err(PlayerError::SetStreamFailed)
299    }
300
301    fn set_audio_track(&mut self, stream_index: i32, enabled: bool) -> Result<(), PlayerError> {
302        self.player
303            .set_audio_track(stream_index)
304            .map_err(|_| PlayerError::SetTrackFailed)?;
305        self.player.set_audio_track_enabled(enabled);
306        Ok(())
307    }
308
309    fn set_video_track(&mut self, stream_index: i32, enabled: bool) -> Result<(), PlayerError> {
310        self.player
311            .set_video_track(stream_index)
312            .map_err(|_| PlayerError::SetTrackFailed)?;
313        self.player.set_video_track_enabled(enabled);
314        Ok(())
315    }
316}
317
/// Locks the shared `$observer` sender and sends `$event` through it.
///
/// Evaluates to the `Result` returned by `send`, so the caller decides what
/// to do on failure. Panics if the observer mutex is poisoned.
macro_rules! notify {
    ($observer:expr, $event:expr) => {
        $observer.lock().unwrap().send($event)
    };
}
323
324struct SeekChannel {
325    sender: SeekLock,
326    recv: IpcReceiver<SeekLockMsg>,
327}
328
329impl SeekChannel {
330    fn new() -> Self {
331        let (sender, recv) = channel::<SeekLockMsg>().expect("Couldn't create IPC channel");
332        Self {
333            sender: SeekLock {
334                lock_channel: sender,
335            },
336            recv,
337        }
338    }
339
340    fn sender(&self) -> SeekLock {
341        self.sender.clone()
342    }
343
344    fn _await(&self) -> SeekLockMsg {
345        self.recv.recv().unwrap()
346    }
347}
348
349pub struct GStreamerPlayer {
350    /// The player unique ID.
351    id: usize,
352    /// The ID of the client context this player belongs to.
353    context_id: ClientContextId,
354    /// Channel to communicate with the owner GStreamerBackend instance.
355    backend_chan: Arc<Mutex<Sender<BackendMsg>>>,
356    inner: RefCell<Option<Arc<Mutex<PlayerInner>>>>,
357    observer: Arc<Mutex<IpcSender<PlayerEvent>>>,
358    audio_renderer: Option<Arc<Mutex<dyn AudioRenderer>>>,
359    video_renderer: Option<Arc<Mutex<dyn VideoFrameRenderer>>>,
360    /// Indicates whether the setup was succesfully performed and
361    /// we are ready to consume a/v data.
362    is_ready: Arc<Once>,
363    /// Indicates whether the type of media stream to be played is a live stream.
364    stream_type: StreamType,
365    /// Decorator used to setup the video sink and process the produced frames.
366    render: Arc<Mutex<GStreamerRender>>,
367}
368
369impl GStreamerPlayer {
370    pub fn new(
371        id: usize,
372        context_id: &ClientContextId,
373        backend_chan: Arc<Mutex<Sender<BackendMsg>>>,
374        stream_type: StreamType,
375        observer: IpcSender<PlayerEvent>,
376        video_renderer: Option<Arc<Mutex<dyn VideoFrameRenderer>>>,
377        audio_renderer: Option<Arc<Mutex<dyn AudioRenderer>>>,
378        gl_context: Box<dyn PlayerGLContext>,
379    ) -> GStreamerPlayer {
380        let _ = gst::DebugCategory::new(
381            "servoplayer",
382            gst::DebugColorFlags::empty(),
383            Some("Servo player"),
384        );
385
386        Self {
387            id,
388            context_id: *context_id,
389            backend_chan,
390            inner: RefCell::new(None),
391            observer: Arc::new(Mutex::new(observer)),
392            audio_renderer,
393            video_renderer,
394            is_ready: Arc::new(Once::new()),
395            stream_type,
396            render: Arc::new(Mutex::new(GStreamerRender::new(gl_context))),
397        }
398    }
399
400    fn setup(&self) -> Result<(), PlayerError> {
401        if self.inner.borrow().is_some() {
402            return Ok(());
403        }
404
405        // Check that we actually have the elements that we
406        // need to make this work.
407        for element in vec!["playbin3", "decodebin3", "queue"].iter() {
408            if gst::ElementFactory::find(element).is_none() {
409                return Err(PlayerError::Backend(format!(
410                    "Missing dependency: {}",
411                    element
412                )));
413            }
414        }
415
416        let player = gst_play::Play::default();
417        let signal_adapter = gst_play::PlaySignalAdapter::new_sync_emit(&player);
418        let pipeline = player.pipeline();
419
420        // FIXME(#282): The progressive downloading breaks playback on Windows and Android.
421        if !cfg!(any(target_os = "windows", target_os = "android")) {
422            // Set player to perform progressive downloading. This will make the
423            // player store the downloaded media in a local temporary file for
424            // faster playback of already-downloaded chunks.
425            let flags = pipeline.property_value("flags");
426            let flags_class = match glib::FlagsClass::with_type(flags.type_()) {
427                Some(flags) => flags,
428                None => {
429                    return Err(PlayerError::Backend(
430                        "FlagsClass creation failed".to_owned(),
431                    ));
432                },
433            };
434            let flags_class = match flags_class.builder_with_value(flags) {
435                Some(class) => class,
436                None => {
437                    return Err(PlayerError::Backend(
438                        "FlagsClass creation failed".to_owned(),
439                    ));
440                },
441            };
442            let Some(flags) = flags_class.set_by_nick("download").build() else {
443                return Err(PlayerError::Backend(
444                    "FlagsClass creation failed".to_owned(),
445                ));
446            };
447            pipeline.set_property_from_value("flags", &flags);
448        }
449
450        // Set max size for the player buffer.
451        pipeline.set_property("buffer-size", MAX_BUFFER_SIZE);
452
453        // Set player position interval update to 0.5 seconds.
454        let mut config = player.config();
455        config.set_position_update_interval(500u32);
456        player
457            .set_config(config)
458            .map_err(|e| PlayerError::Backend(e.to_string()))?;
459
460        if let Some(ref audio_renderer) = self.audio_renderer {
461            let audio_sink = gst::ElementFactory::make("appsink")
462                .build()
463                .map_err(|error| {
464                    PlayerError::Backend(format!("appsink creation failed: {error:?}"))
465                })?;
466
467            pipeline.set_property("audio-sink", &audio_sink);
468
469            let audio_sink = audio_sink.dynamic_cast::<gst_app::AppSink>().unwrap();
470
471            let weak_audio_renderer = Arc::downgrade(&audio_renderer);
472
473            audio_sink.set_callbacks(
474                gst_app::AppSinkCallbacks::builder()
475                    .new_preroll(|_| Ok(gst::FlowSuccess::Ok))
476                    .new_sample(move |audio_sink| {
477                        let sample = audio_sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
478                        let buffer = sample.buffer_owned().ok_or(gst::FlowError::Error)?;
479                        let audio_info = sample
480                            .caps()
481                            .and_then(|caps| gst_audio::AudioInfo::from_caps(caps).ok())
482                            .ok_or(gst::FlowError::Error)?;
483                        let positions = audio_info.positions().ok_or(gst::FlowError::Error)?;
484
485                        let Some(audio_renderer) = weak_audio_renderer.upgrade() else {
486                            return Err(gst::FlowError::Flushing);
487                        };
488
489                        for position in positions.iter() {
490                            let buffer = buffer.clone();
491                            let map = if let Ok(map) = buffer.into_mapped_buffer_readable() {
492                                map
493                            } else {
494                                return Err(gst::FlowError::Error);
495                            };
496                            let chunk = Box::new(GStreamerAudioChunk(map));
497                            let channel = position.to_mask() as u32;
498
499                            audio_renderer.lock().unwrap().render(chunk, channel);
500                        }
501                        Ok(gst::FlowSuccess::Ok)
502                    })
503                    .build(),
504            );
505        }
506
507        let video_sink = self.render.lock().unwrap().setup_video_sink(&pipeline)?;
508
509        // There's a known bug in gstreamer that may cause a wrong transition
510        // to the ready state while setting the uri property:
511        // https://cgit.freedesktop.org/gstreamer/gst-plugins-bad/commit/?id=afbbc3a97ec391c6a582f3c746965fdc3eb3e1f3
512        // This may affect things like setting the config, so until the bug is
513        // fixed, make sure that state dependent code happens before this line.
514        // The estimated version for the fix is 1.14.5 / 1.15.1.
515        // https://github.com/servo/servo/issues/22010#issuecomment-432599657
516        let uri = match self.stream_type {
517            StreamType::Stream => {
518                register_servo_media_stream_src().map_err(|error| {
519                    PlayerError::Backend(format!(
520                        "servomediastreamsrc registration error: {error:?}"
521                    ))
522                })?;
523                "mediastream://".to_value()
524            },
525            StreamType::Seekable => {
526                register_servo_src().map_err(|error| {
527                    PlayerError::Backend(format!("servosrc registration error: {error:?}"))
528                })?;
529                "servosrc://".to_value()
530            },
531        };
532        player.set_property("uri", &uri);
533
534        // No video_renderers no video
535        if self.video_renderer.is_none() {
536            player.set_video_track_enabled(false);
537        }
538
539        *self.inner.borrow_mut() = Some(Arc::new(Mutex::new(PlayerInner {
540            player,
541            signal_adapter: signal_adapter.clone(),
542            source: None,
543            video_sink,
544            input_size: 0,
545            rate: 1.0,
546            stream_type: self.stream_type,
547            last_metadata: None,
548            cat: gst::DebugCategory::get("servoplayer").unwrap(),
549            enough_data: Arc::new(AtomicBool::new(false)),
550        })));
551
552        let inner = self.inner.borrow();
553        let inner = inner.as_ref().unwrap();
554        let observer = self.observer.clone();
555        // Handle `end-of-stream` signal.
556        signal_adapter.connect_end_of_stream(move |_| {
557            let _ = notify!(observer, PlayerEvent::EndOfStream);
558        });
559
560        let observer = self.observer.clone();
561        // Handle `error` signal
562        signal_adapter.connect_error(move |_self, error, _details| {
563            let _ = notify!(observer, PlayerEvent::Error(error.to_string()));
564        });
565
566        let observer = self.observer.clone();
567        // Handle `state-changed` signal.
568        signal_adapter.connect_state_changed(move |_, player_state| {
569            let state = match player_state {
570                gst_play::PlayState::Buffering => Some(PlaybackState::Buffering),
571                gst_play::PlayState::Stopped => Some(PlaybackState::Stopped),
572                gst_play::PlayState::Paused => Some(PlaybackState::Paused),
573                gst_play::PlayState::Playing => Some(PlaybackState::Playing),
574                _ => None,
575            };
576            if let Some(v) = state {
577                let _ = notify!(observer, PlayerEvent::StateChanged(v));
578            }
579        });
580
581        let observer = self.observer.clone();
582        // Handle `position-update` signal.
583        signal_adapter.connect_position_updated(move |_, position| {
584            if let Some(seconds) = position.map(|p| p.seconds()) {
585                let _ = notify!(observer, PlayerEvent::PositionChanged(seconds));
586            }
587        });
588
589        let observer = self.observer.clone();
590        // Handle `seek-done` signal.
591        signal_adapter.connect_seek_done(move |_, position| {
592            let _ = notify!(observer, PlayerEvent::SeekDone(position.seconds()));
593        });
594
595        // Handle `media-info-updated` signal.
596        let inner_clone = inner.clone();
597        let observer = self.observer.clone();
598        signal_adapter.connect_media_info_updated(move |_, info| {
599            let mut inner = inner_clone.lock().unwrap();
600            if let Ok(metadata) = metadata_from_media_info(info) {
601                if inner.last_metadata.as_ref() != Some(&metadata) {
602                    inner.last_metadata = Some(metadata.clone());
603                    if metadata.is_seekable {
604                        inner.player.set_rate(inner.rate);
605                    }
606                    gst::info!(
607                        inner.cat,
608                        obj = &inner.player,
609                        "Metadata updated: {:?}",
610                        metadata
611                    );
612                    let _ = notify!(observer, PlayerEvent::MetadataUpdated(metadata));
613                }
614            }
615        });
616
617        // Handle `duration-changed` signal.
618        let inner_clone = inner.clone();
619        let observer = self.observer.clone();
620        signal_adapter.connect_duration_changed(move |_, duration| {
621            let mut inner = inner_clone.lock().unwrap();
622            let duration = duration.map(|duration| {
623                let nanos = duration.nseconds();
624                let seconds = duration.seconds();
625                time::Duration::new(seconds, (nanos % 1_000_000_000) as u32)
626            });
627            let mut updated_metadata = None;
628            if let Some(ref mut metadata) = inner.last_metadata {
629                metadata.duration = duration;
630                updated_metadata = Some(metadata.clone());
631            }
632            if let Some(updated_metadata) = updated_metadata {
633                gst::info!(
634                    inner.cat,
635                    obj = &inner.player,
636                    "Duration updated: {:?}",
637                    updated_metadata
638                );
639                let _ = notify!(observer, PlayerEvent::MetadataUpdated(updated_metadata));
640            }
641        });
642
643        if let Some(video_renderer) = self.video_renderer.clone() {
644            // Creates a closure that renders a frame using the video_renderer
645            // Used in the preroll and sample callbacks
646            let render_sample = {
647                let render = self.render.clone();
648                let observer = self.observer.clone();
649                let weak_video_renderer = Arc::downgrade(&video_renderer);
650
651                move |sample: gst::Sample| {
652                    let frame = render
653                        .lock()
654                        .unwrap()
655                        .get_frame_from_sample(sample)
656                        .map_err(|_| gst::FlowError::Error)?;
657
658                    if let Some(video_renderer) = weak_video_renderer.upgrade() {
659                        video_renderer.lock().unwrap().render(frame);
660                    } else {
661                        return Err(gst::FlowError::Flushing);
662                    };
663
664                    let _ = notify!(observer, PlayerEvent::VideoFrameUpdated);
665                    Ok(gst::FlowSuccess::Ok)
666                }
667            };
668
669            // Set video_sink callbacks.
670            inner.lock().unwrap().video_sink.set_callbacks(
671                gst_app::AppSinkCallbacks::builder()
672                    .new_preroll({
673                        let render_sample = render_sample.clone();
674                        move |video_sink| {
675                            render_sample(
676                                video_sink.pull_preroll().map_err(|_| gst::FlowError::Eos)?,
677                            )
678                        }
679                    })
680                    .new_sample(move |video_sink| {
681                        render_sample(video_sink.pull_sample().map_err(|_| gst::FlowError::Eos)?)
682                    })
683                    .build(),
684            );
685        };
686
687        let (receiver, error_handler_id) = {
688            let inner_clone = inner.clone();
689            let mut inner = inner.lock().unwrap();
690            let pipeline = inner.player.pipeline();
691
692            let (sender, receiver) = mpsc::channel();
693
694            let sender = Arc::new(Mutex::new(sender));
695            let sender_clone = sender.clone();
696            let is_ready_clone = self.is_ready.clone();
697            let observer = self.observer.clone();
698            pipeline.connect("source-setup", false, move |args| {
699                let source = args[1].get::<gst::Element>().unwrap();
700
701                let mut inner = inner_clone.lock().unwrap();
702                let source = match inner.stream_type {
703                    StreamType::Seekable => {
704                        let servosrc = source
705                            .dynamic_cast::<ServoSrc>()
706                            .expect("Source element is expected to be a ServoSrc!");
707
708                        if inner.input_size > 0 {
709                            servosrc.set_size(inner.input_size as i64);
710                        }
711
712                        let sender_clone = sender.clone();
713                        let is_ready = is_ready_clone.clone();
714                        let observer_ = observer.clone();
715                        let observer__ = observer.clone();
716                        let observer___ = observer.clone();
717                        let servosrc_ = servosrc.clone();
718                        let enough_data_ = inner.enough_data.clone();
719                        let enough_data__ = inner.enough_data.clone();
720                        let seek_channel = Arc::new(Mutex::new(SeekChannel::new()));
721                        servosrc.set_callbacks(
722                            gst_app::AppSrcCallbacks::builder()
723                                .need_data(move |_, _| {
724                                    // We block the caller of the setup method until we get
725                                    // the first need-data signal, so we ensure that we
726                                    // don't miss any data between the moment the client
727                                    // calls setup and the player is actually ready to
728                                    // get any data.
729                                    is_ready.call_once(|| {
730                                        let _ = sender_clone.lock().unwrap().send(Ok(()));
731                                    });
732
733                                    enough_data_.store(false, Ordering::Relaxed);
734                                    let _ = notify!(observer_, PlayerEvent::NeedData);
735                                })
736                                .enough_data(move |_| {
737                                    enough_data__.store(true, Ordering::Relaxed);
738                                    let _ = notify!(observer__, PlayerEvent::EnoughData);
739                                })
740                                .seek_data(move |_, offset| {
741                                    let (ret, ack_channel) = if servosrc_.set_seek_offset(offset) {
742                                        let _ = notify!(
743                                            observer___,
744                                            PlayerEvent::SeekData(
745                                                offset,
746                                                seek_channel.lock().unwrap().sender()
747                                            )
748                                        );
749                                        let (ret, ack_channel) =
750                                            seek_channel.lock().unwrap()._await();
751                                        (ret, Some(ack_channel))
752                                    } else {
753                                        (true, None)
754                                    };
755
756                                    servosrc_.set_seek_done();
757                                    if let Some(ack_channel) = ack_channel {
758                                        ack_channel.send(()).unwrap();
759                                    }
760                                    ret
761                                })
762                                .build(),
763                        );
764
765                        PlayerSource::Seekable(servosrc)
766                    },
767                    StreamType::Stream => {
768                        let media_stream_src = source
769                            .dynamic_cast::<ServoMediaStreamSrc>()
770                            .expect("Source element is expected to be a ServoMediaStreamSrc!");
771                        let sender_clone = sender.clone();
772                        is_ready_clone.call_once(|| {
773                            let _ = notify!(sender_clone, Ok(()));
774                        });
775                        PlayerSource::Stream(media_stream_src)
776                    },
777                };
778
779                inner.set_src(source);
780
781                None
782            });
783
784            let error_handler_id =
785                signal_adapter.connect_error(move |signal_adapter, error, _details| {
786                    let _ = notify!(sender_clone, Err(PlayerError::Backend(error.to_string())));
787                    signal_adapter.play().stop();
788                });
789
790            let _ = inner.pause();
791
792            (receiver, error_handler_id)
793        };
794
795        let result = receiver.recv().unwrap();
796        glib::signal::signal_handler_disconnect(&inner.lock().unwrap().player, error_handler_id);
797        result
798    }
799}
800
/// Generates a `&self` method that forwards to the method of the same name on
/// the inner player.
///
/// Each generated method first calls `self.setup()` and propagates its error
/// with `?`. It then borrows `self.inner`, locks the contained mutex and calls
/// `$fn_name` on the guarded value. Both `unwrap`s panic if the inner player is
/// absent or the mutex is poisoned.
///
/// Two forms are accepted:
/// * `inner_player_proxy!(name, ReturnType)` produces
///   `fn name(&self) -> Result<ReturnType, PlayerError>`.
/// * `inner_player_proxy!(name, arg, ArgType)` produces
///   `fn name(&self, arg: ArgType) -> Result<(), PlayerError>`.
macro_rules! inner_player_proxy {
    // Argument-less form with a caller-chosen success type.
    ($fn_name:ident, $return_type:ty) => {
        fn $fn_name(&self) -> Result<$return_type, PlayerError> {
            self.setup()?;
            // The `Ref` must outlive the mutex guard borrowed from it, hence
            // the two separate bindings.
            let cell = self.inner.borrow();
            let mut guard = cell.as_ref().unwrap().lock().unwrap();
            guard.$fn_name()
        }
    };

    // Single-argument form; the generated method always yields `()` on success.
    ($fn_name:ident, $arg1:ident, $arg1_type:ty) => {
        fn $fn_name(&self, $arg1: $arg1_type) -> Result<(), PlayerError> {
            self.setup()?;
            let cell = self.inner.borrow();
            let mut guard = cell.as_ref().unwrap().lock().unwrap();
            guard.$fn_name($arg1)
        }
    };
}
820
821impl Player for GStreamerPlayer {
822    inner_player_proxy!(play, ());
823    inner_player_proxy!(pause, ());
824    inner_player_proxy!(stop, ());
825    inner_player_proxy!(end_of_stream, ());
826    inner_player_proxy!(set_input_size, size, u64);
827    inner_player_proxy!(set_mute, val, bool);
828    inner_player_proxy!(set_rate, rate, f64);
829    inner_player_proxy!(push_data, data, Vec<u8>);
830    inner_player_proxy!(seek, time, f64);
831    inner_player_proxy!(set_volume, value, f64);
832    inner_player_proxy!(buffered, Vec<Range<f64>>);
833
834    fn seekable(&self) -> Result<Vec<Range<f64>>, PlayerError> {
835        self.setup()?;
836        let inner = self.inner.borrow();
837        let mut inner = inner.as_ref().unwrap().lock().unwrap();
838        // if the servosrc is seekable, we should return the duration of the media
839        if let Some(metadata) = inner.last_metadata.as_ref() {
840            if metadata.is_seekable {
841                if let Some(duration) = metadata.duration {
842                    return Ok(vec![Range {
843                        start: 0.0,
844                        end: duration.as_secs_f64(),
845                    }]);
846                }
847            }
848        }
849        // if the servosrc is not seekable, we should return the buffered range
850        inner.buffered()
851    }
852
853    fn render_use_gl(&self) -> bool {
854        self.render.lock().unwrap().is_gl()
855    }
856
857    fn set_stream(&self, stream: &MediaStreamId, only_stream: bool) -> Result<(), PlayerError> {
858        self.setup()?;
859        let inner = self.inner.borrow();
860        let mut inner = inner.as_ref().unwrap().lock().unwrap();
861        inner.set_stream(stream, only_stream)
862    }
863
864    fn set_audio_track(&self, stream_index: i32, enabled: bool) -> Result<(), PlayerError> {
865        self.setup()?;
866        let inner = self.inner.borrow();
867        let mut inner = inner.as_ref().unwrap().lock().unwrap();
868        inner.set_audio_track(stream_index, enabled)
869    }
870
871    fn set_video_track(&self, stream_index: i32, enabled: bool) -> Result<(), PlayerError> {
872        self.setup()?;
873        let inner = self.inner.borrow();
874        let mut inner = inner.as_ref().unwrap().lock().unwrap();
875        inner.set_video_track(stream_index, enabled)
876    }
877}
878
879impl MediaInstance for GStreamerPlayer {
880    fn get_id(&self) -> usize {
881        self.id
882    }
883
884    fn mute(&self, val: bool) -> Result<(), ()> {
885        self.set_mute(val).map_err(|_| ())
886    }
887
888    fn suspend(&self) -> Result<(), ()> {
889        self.pause().map_err(|_| ())
890    }
891
892    fn resume(&self) -> Result<(), ()> {
893        self.play().map_err(|_| ())
894    }
895}
896
897impl Drop for GStreamerPlayer {
898    fn drop(&mut self) {
899        let _ = self.stop();
900        let (tx_ack, rx_ack) = mpsc::channel();
901        let _ = self
902            .backend_chan
903            .lock()
904            .unwrap()
905            .send(BackendMsg::Shutdown {
906                context: self.context_id,
907                id: self.id,
908                tx_ack,
909            });
910        let _ = rx_ack.recv();
911    }
912}