Skip to main content

servo_media_gstreamer/
player.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::cell::{Cell, RefCell};
6use std::ops::Range;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::mpsc::{self, Sender};
9use std::sync::{Arc, Mutex, Once};
10use std::time;
11
12use byte_slice_cast::AsSliceOf;
13use glib;
14use glib::prelude::*;
15use gstreamer;
16use gstreamer_app;
17use gstreamer_play;
18use gstreamer_play::prelude::*;
19use ipc_channel::ipc::{IpcReceiver, IpcSender, channel};
20use servo_media::MediaInstanceError;
21use servo_media_player::audio::AudioRenderer;
22use servo_media_player::context::PlayerGLContext;
23use servo_media_player::metadata::Metadata;
24use servo_media_player::video::VideoFrameRenderer;
25use servo_media_player::{
26    PlaybackState, Player, PlayerError, PlayerEvent, SeekLock, SeekLockMsg, StreamType,
27};
28use servo_media_streams::registry::{MediaStreamId, get_stream};
29use servo_media_traits::{BackendMsg, ClientContextId, MediaInstance};
30
31use super::BACKEND_BASE_TIME;
32use crate::media_stream::GStreamerMediaStream;
33use crate::media_stream_source::{ServoMediaStreamSrc, register_servo_media_stream_src};
34use crate::render::GStreamerRender;
35use crate::source::{ServoSrc, register_servo_src};
36
37const DEFAULT_MUTED: bool = false;
38const DEFAULT_PAUSED: bool = true;
39const DEFAULT_CAN_RESUME: bool = false;
40const DEFAULT_PLAYBACK_RATE: f64 = 1.0;
41const DEFAULT_VOLUME: f64 = 1.0;
42const DEFAULT_TIME_RANGES: Vec<Range<f64>> = vec![];
43
44const MAX_BUFFER_SIZE: i32 = 500 * 1024 * 1024;
45
46fn metadata_from_media_info(media_info: &gstreamer_play::PlayMediaInfo) -> Result<Metadata, ()> {
47    let dur = media_info.duration();
48    let duration = if let Some(dur) = dur {
49        let mut nanos = dur.nseconds();
50        nanos %= 1_000_000_000;
51        let seconds = dur.seconds();
52        Some(time::Duration::new(seconds, nanos as u32))
53    } else {
54        None
55    };
56
57    let mut audio_tracks = Vec::new();
58    let mut video_tracks = Vec::new();
59
60    let format = media_info
61        .container_format()
62        .unwrap_or_else(|| glib::GString::from(""))
63        .to_string();
64
65    for stream_info in media_info.stream_list() {
66        let stream_type = stream_info.stream_type();
67        match stream_type.as_str() {
68            "audio" => {
69                let codec = stream_info
70                    .codec()
71                    .unwrap_or_else(|| glib::GString::from(""))
72                    .to_string();
73                audio_tracks.push(codec);
74            },
75            "video" => {
76                let codec = stream_info
77                    .codec()
78                    .unwrap_or_else(|| glib::GString::from(""))
79                    .to_string();
80                video_tracks.push(codec);
81            },
82            _ => {},
83        }
84    }
85
86    let mut width: u32 = 0;
87    let height: u32 = if media_info.number_of_video_streams() > 0 {
88        let first_video_stream = &media_info.video_streams()[0];
89        width = first_video_stream.width() as u32;
90        first_video_stream.height() as u32
91    } else {
92        0
93    };
94
95    let is_seekable = media_info.is_seekable();
96    let is_live = media_info.is_live();
97    let title = media_info.title().map(|s| s.as_str().to_string());
98
99    Ok(Metadata {
100        duration,
101        width,
102        height,
103        format,
104        is_seekable,
105        audio_tracks,
106        video_tracks,
107        is_live,
108        title,
109    })
110}
111
112pub struct GStreamerAudioChunk(gstreamer::buffer::MappedBuffer<gstreamer::buffer::Readable>);
113impl AsRef<[f32]> for GStreamerAudioChunk {
114    fn as_ref(&self) -> &[f32] {
115        self.0.as_ref().as_slice_of::<f32>().unwrap_or_default()
116    }
117}
118
119#[derive(PartialEq)]
120enum PlayerSource {
121    Seekable(ServoSrc),
122    Stream(ServoMediaStreamSrc),
123}
124
125struct PlayerInner {
126    player: gstreamer_play::Play,
127    _signal_adapter: gstreamer_play::PlaySignalAdapter,
128    source: Option<PlayerSource>,
129    video_sink: gstreamer_app::AppSink,
130    input_size: u64,
131    play_state: gstreamer_play::PlayState,
132    paused: Cell<bool>,
133    can_resume: Cell<bool>,
134    playback_rate: Cell<f64>,
135    muted: Cell<bool>,
136    volume: Cell<f64>,
137    stream_type: StreamType,
138    last_metadata: Option<Metadata>,
139    cat: gstreamer::DebugCategory,
140    enough_data: Arc<AtomicBool>,
141}
142
143impl PlayerInner {
144    pub fn set_input_size(&mut self, size: u64) -> Result<(), PlayerError> {
145        // Set input_size to proxy its value, since it
146        // could be set by the user before calling .setup().
147        self.input_size = size;
148        if let Some(PlayerSource::Seekable(ref mut source)) = self.source {
149            source.set_size(if size > 0 {
150                size as i64
151            } else {
152                -1 // live source
153            });
154        }
155        Ok(())
156    }
157
158    pub fn set_mute(&mut self, muted: bool) -> Result<(), PlayerError> {
159        if self.muted.get() == muted {
160            return Ok(());
161        }
162
163        self.muted.set(muted);
164        self.player.set_mute(muted);
165        Ok(())
166    }
167
168    pub fn muted(&self) -> bool {
169        self.muted.get()
170    }
171
172    pub fn set_playback_rate(&mut self, playback_rate: f64) -> Result<(), PlayerError> {
173        if self.stream_type != StreamType::Seekable {
174            return Err(PlayerError::NonSeekableStream);
175        }
176
177        if self.playback_rate.get() == playback_rate {
178            return Ok(());
179        }
180
181        self.playback_rate.set(playback_rate);
182
183        // The new playback rate will not be passed to the pipeline if the
184        // current GstPlay state is less than GST_STATE_PAUSED, which will be
185        // set immediately before the initial gstreamer_play_MESSAGE_MEDIA_INFO_UPDATED
186        // message is posted to bus.
187        if self.last_metadata.is_some() {
188            self.player.set_rate(playback_rate);
189        }
190        Ok(())
191    }
192
193    pub fn playback_rate(&self) -> f64 {
194        self.playback_rate.get()
195    }
196
197    pub fn play(&mut self) -> Result<(), PlayerError> {
198        if !self.paused.get() {
199            return Ok(());
200        }
201
202        self.paused.set(false);
203        self.can_resume.set(false);
204        self.player.play();
205        Ok(())
206    }
207
208    pub fn stop(&mut self) -> Result<(), PlayerError> {
209        self.player.stop();
210        self.paused.set(true);
211        self.can_resume.set(false);
212        self.last_metadata = None;
213        self.source = None;
214        Ok(())
215    }
216
217    pub fn pause(&mut self) -> Result<(), PlayerError> {
218        if self.paused.get() {
219            return Ok(());
220        }
221
222        self.paused.set(true);
223        self.can_resume.set(true);
224        self.player.pause();
225        Ok(())
226    }
227
228    pub fn paused(&self) -> bool {
229        self.paused.get()
230    }
231
232    pub fn can_resume(&self) -> bool {
233        self.can_resume.get()
234    }
235
236    pub fn end_of_stream(&mut self) -> Result<(), PlayerError> {
237        match self.source {
238            Some(ref mut source) => {
239                if let PlayerSource::Seekable(source) = source {
240                    source
241                        .push_end_of_stream()
242                        .map(|_| ())
243                        .map_err(|_| PlayerError::EOSFailed)
244                } else {
245                    Ok(())
246                }
247            },
248            _ => Ok(()),
249        }
250    }
251
252    pub fn seek(&mut self, time: f64) -> Result<(), PlayerError> {
253        if self.stream_type != StreamType::Seekable {
254            return Err(PlayerError::NonSeekableStream);
255        }
256        if let Some(ref metadata) = self.last_metadata &&
257            let Some(ref duration) = metadata.duration &&
258            duration < &time::Duration::new(time as u64, 0)
259        {
260            gstreamer::warning!(self.cat, obj = &self.player, "Trying to seek out of range");
261            return Err(PlayerError::SeekOutOfRange);
262        }
263
264        let time = time * 1_000_000_000.;
265        self.player
266            .seek(gstreamer::ClockTime::from_nseconds(time as u64));
267        Ok(())
268    }
269
270    pub fn set_volume(&mut self, volume: f64) -> Result<(), PlayerError> {
271        if self.volume.get() == volume {
272            return Ok(());
273        }
274
275        self.volume.set(volume);
276        self.player.set_volume(volume);
277        Ok(())
278    }
279
280    pub fn volume(&self) -> f64 {
281        self.volume.get()
282    }
283
284    pub fn push_data(&mut self, data: Vec<u8>) -> Result<(), PlayerError> {
285        if let Some(PlayerSource::Seekable(ref mut source)) = self.source {
286            if self.enough_data.load(Ordering::Relaxed) {
287                return Err(PlayerError::EnoughData);
288            }
289            return source
290                .push_buffer(data)
291                .map(|_| ())
292                .map_err(|_| PlayerError::BufferPushFailed);
293        }
294        Err(PlayerError::BufferPushFailed)
295    }
296
297    pub fn set_src(&mut self, source: PlayerSource) {
298        self.source = Some(source);
299    }
300
301    pub fn buffered(&self) -> Vec<Range<f64>> {
302        let mut buffered_ranges = vec![];
303
304        let Some(duration) = self
305            .last_metadata
306            .as_ref()
307            .and_then(|metadata| metadata.duration)
308        else {
309            return buffered_ranges;
310        };
311
312        let pipeline = self.player.pipeline();
313        let mut buffering = gstreamer::query::Buffering::new(gstreamer::Format::Percent);
314        if pipeline.query(&mut buffering) {
315            let ranges = buffering.ranges();
316            for (start, end) in ranges {
317                let start = (if let gstreamer::GenericFormattedValue::Percent(start) = start {
318                    start.unwrap()
319                } else {
320                    gstreamer::format::Percent::from_percent(0)
321                } / gstreamer::format::Percent::MAX) as f64 *
322                    duration.as_secs_f64();
323                let end = (if let gstreamer::GenericFormattedValue::Percent(end) = end {
324                    end.unwrap()
325                } else {
326                    gstreamer::format::Percent::from_percent(0)
327                } / gstreamer::format::Percent::MAX) as f64 *
328                    duration.as_secs_f64();
329                buffered_ranges.push(Range { start, end });
330            }
331        }
332
333        buffered_ranges
334    }
335
336    pub fn seekable(&self) -> Vec<Range<f64>> {
337        // if the servosrc is seekable, we should return the duration of the media
338        if let Some(metadata) = self.last_metadata.as_ref() &&
339            metadata.is_seekable &&
340            let Some(duration) = metadata.duration
341        {
342            return vec![Range {
343                start: 0.0,
344                end: duration.as_secs_f64(),
345            }];
346        }
347
348        // if the servosrc is not seekable, we should return the buffered range
349        self.buffered()
350    }
351
352    fn set_stream(&mut self, stream: &MediaStreamId, only_stream: bool) -> Result<(), PlayerError> {
353        debug_assert!(self.stream_type == StreamType::Stream);
354        let Some(PlayerSource::Stream(ref source)) = self.source else {
355            return Err(PlayerError::SetStreamFailed);
356        };
357
358        let stream = get_stream(stream).expect("Media streams registry does not contain such ID");
359        let mut stream = stream.lock().unwrap();
360        let Some(stream) = stream.as_mut_any().downcast_mut::<GStreamerMediaStream>() else {
361            return Err(PlayerError::SetStreamFailed);
362        };
363
364        let playbin = self
365            .player
366            .pipeline()
367            .dynamic_cast::<gstreamer::Pipeline>()
368            .unwrap();
369        let clock = gstreamer::SystemClock::obtain();
370        playbin.set_base_time(*BACKEND_BASE_TIME);
371        playbin.set_start_time(gstreamer::ClockTime::NONE);
372        playbin.use_clock(Some(&clock));
373        source
374            .set_stream(stream, only_stream)
375            .map_err(|_| PlayerError::SetStreamFailed)
376    }
377
378    fn set_audio_track(&mut self, stream_index: i32, enabled: bool) -> Result<(), PlayerError> {
379        self.player
380            .set_audio_track(stream_index)
381            .map_err(|_| PlayerError::SetTrackFailed)?;
382        self.player.set_audio_track_enabled(enabled);
383        Ok(())
384    }
385
386    fn set_video_track(&mut self, stream_index: i32, enabled: bool) -> Result<(), PlayerError> {
387        self.player
388            .set_video_track(stream_index)
389            .map_err(|_| PlayerError::SetTrackFailed)?;
390        self.player.set_video_track_enabled(enabled);
391        Ok(())
392    }
393}
394
395macro_rules! notify(
396    ($observer:expr_2021, $event:expr_2021) => {
397        $observer.lock().unwrap().send($event)
398    };
399);
400
401struct SeekChannel {
402    sender: SeekLock,
403    recv: IpcReceiver<SeekLockMsg>,
404}
405
406impl SeekChannel {
407    fn new() -> Self {
408        let (sender, recv) = channel::<SeekLockMsg>().expect("Couldn't create IPC channel");
409        Self {
410            sender: SeekLock {
411                lock_channel: sender,
412            },
413            recv,
414        }
415    }
416
417    fn sender(&self) -> SeekLock {
418        self.sender.clone()
419    }
420
421    fn _await(&self) -> SeekLockMsg {
422        self.recv.recv().unwrap()
423    }
424}
425
426pub struct GStreamerPlayer {
427    /// The player unique ID.
428    id: usize,
429    /// The ID of the client context this player belongs to.
430    context_id: ClientContextId,
431    /// Channel to communicate with the owner GStreamerBackend instance.
432    backend_chan: Arc<Mutex<Sender<BackendMsg>>>,
433    inner: RefCell<Option<Arc<Mutex<PlayerInner>>>>,
434    observer: Arc<Mutex<IpcSender<PlayerEvent>>>,
435    audio_renderer: Option<Arc<Mutex<dyn AudioRenderer>>>,
436    video_renderer: Option<Arc<Mutex<dyn VideoFrameRenderer>>>,
437    /// Indicates whether the setup was succesfully performed and
438    /// we are ready to consume a/v data.
439    is_ready: Arc<Once>,
440    /// Indicates whether the type of media stream to be played is a live stream.
441    stream_type: StreamType,
442    /// Decorator used to setup the video sink and process the produced frames.
443    render: Arc<Mutex<GStreamerRender>>,
444}
445
446impl GStreamerPlayer {
447    #[allow(clippy::too_many_arguments)]
448    pub fn new(
449        id: usize,
450        context_id: &ClientContextId,
451        backend_chan: Arc<Mutex<Sender<BackendMsg>>>,
452        stream_type: StreamType,
453        observer: IpcSender<PlayerEvent>,
454        video_renderer: Option<Arc<Mutex<dyn VideoFrameRenderer>>>,
455        audio_renderer: Option<Arc<Mutex<dyn AudioRenderer>>>,
456        gl_context: Box<dyn PlayerGLContext>,
457    ) -> GStreamerPlayer {
458        let _ = gstreamer::DebugCategory::new(
459            "servoplayer",
460            gstreamer::DebugColorFlags::empty(),
461            Some("Servo player"),
462        );
463
464        Self {
465            id,
466            context_id: *context_id,
467            backend_chan,
468            inner: RefCell::new(None),
469            observer: Arc::new(Mutex::new(observer)),
470            audio_renderer,
471            video_renderer,
472            is_ready: Arc::new(Once::new()),
473            stream_type,
474            render: Arc::new(Mutex::new(GStreamerRender::new(gl_context))),
475        }
476    }
477
478    fn setup(&self) -> Result<(), PlayerError> {
479        if self.inner.borrow().is_some() {
480            return Ok(());
481        }
482
483        // Check that we actually have the elements that we
484        // need to make this work.
485        for element in ["playbin3", "decodebin3", "queue"] {
486            if gstreamer::ElementFactory::find(element).is_none() {
487                return Err(PlayerError::Backend(format!(
488                    "Missing dependency: {}",
489                    element
490                )));
491            }
492        }
493
494        let player = gstreamer_play::Play::default();
495        let signal_adapter = gstreamer_play::PlaySignalAdapter::new_sync_emit(&player);
496        let pipeline = player.pipeline();
497
498        // FIXME(#282): The progressive downloading breaks playback on Windows and Android.
499        if !cfg!(any(target_os = "windows", target_os = "android")) {
500            // Set player to perform progressive downloading. This will make the
501            // player store the downloaded media in a local temporary file for
502            // faster playback of already-downloaded chunks.
503            let flags = pipeline.property_value("flags");
504            let flags_class = match glib::FlagsClass::with_type(flags.type_()) {
505                Some(flags) => flags,
506                None => {
507                    return Err(PlayerError::Backend(
508                        "FlagsClass creation failed".to_owned(),
509                    ));
510                },
511            };
512            let flags_class = match flags_class.builder_with_value(flags) {
513                Some(class) => class,
514                None => {
515                    return Err(PlayerError::Backend(
516                        "FlagsClass creation failed".to_owned(),
517                    ));
518                },
519            };
520            let Some(flags) = flags_class.set_by_nick("download").build() else {
521                return Err(PlayerError::Backend(
522                    "FlagsClass creation failed".to_owned(),
523                ));
524            };
525            pipeline.set_property_from_value("flags", &flags);
526        }
527
528        // Set max size for the player buffer.
529        pipeline.set_property("buffer-size", MAX_BUFFER_SIZE);
530
531        // Set player position interval update to 0.5 seconds.
532        let mut config = player.config();
533        config.set_position_update_interval(500u32);
534        player
535            .set_config(config)
536            .map_err(|e| PlayerError::Backend(e.to_string()))?;
537
538        if let Some(ref audio_renderer) = self.audio_renderer {
539            let audio_sink =
540                gstreamer::ElementFactory::make("appsink")
541                    .build()
542                    .map_err(|error| {
543                        PlayerError::Backend(format!("appsink creation failed: {error:?}"))
544                    })?;
545
546            pipeline.set_property("audio-sink", &audio_sink);
547
548            let audio_sink = audio_sink.dynamic_cast::<gstreamer_app::AppSink>().unwrap();
549
550            let weak_audio_renderer = Arc::downgrade(audio_renderer);
551
552            audio_sink.set_callbacks(
553                gstreamer_app::AppSinkCallbacks::builder()
554                    .new_preroll(|_| Ok(gstreamer::FlowSuccess::Ok))
555                    .new_sample(move |audio_sink| {
556                        let sample = audio_sink
557                            .pull_sample()
558                            .map_err(|_| gstreamer::FlowError::Eos)?;
559                        let buffer = sample.buffer_owned().ok_or(gstreamer::FlowError::Error)?;
560                        let audio_info = sample
561                            .caps()
562                            .and_then(|caps| gstreamer_audio::AudioInfo::from_caps(caps).ok())
563                            .ok_or(gstreamer::FlowError::Error)?;
564                        let positions =
565                            audio_info.positions().ok_or(gstreamer::FlowError::Error)?;
566
567                        let Some(audio_renderer) = weak_audio_renderer.upgrade() else {
568                            return Err(gstreamer::FlowError::Flushing);
569                        };
570
571                        for position in positions.iter() {
572                            let buffer = buffer.clone();
573                            let map = match buffer.into_mapped_buffer_readable() {
574                                Ok(map) => map,
575                                _ => {
576                                    return Err(gstreamer::FlowError::Error);
577                                },
578                            };
579                            let chunk = Box::new(GStreamerAudioChunk(map));
580                            let channel = position.to_mask() as u32;
581
582                            audio_renderer.lock().unwrap().render(chunk, channel);
583                        }
584                        Ok(gstreamer::FlowSuccess::Ok)
585                    })
586                    .build(),
587            );
588        }
589
590        let video_sink = self.render.lock().unwrap().setup_video_sink(&pipeline)?;
591
592        // There's a known bug in gstreamer that may cause a wrong transition
593        // to the ready state while setting the uri property:
594        // https://cgit.freedesktop.org/gstreamer/gst-plugins-bad/commit/?id=afbbc3a97ec391c6a582f3c746965fdc3eb3e1f3
595        // This may affect things like setting the config, so until the bug is
596        // fixed, make sure that state dependent code happens before this line.
597        // The estimated version for the fix is 1.14.5 / 1.15.1.
598        // https://github.com/servo/servo/issues/22010#issuecomment-432599657
599        let uri = match self.stream_type {
600            StreamType::Stream => {
601                register_servo_media_stream_src().map_err(|error| {
602                    PlayerError::Backend(format!(
603                        "servomediastreamsrc registration error: {error:?}"
604                    ))
605                })?;
606                "mediastream://".to_value()
607            },
608            StreamType::Seekable => {
609                register_servo_src().map_err(|error| {
610                    PlayerError::Backend(format!("servosrc registration error: {error:?}"))
611                })?;
612                "servosrc://".to_value()
613            },
614        };
615        player.set_property("uri", &uri);
616
617        // No video_renderers no video
618        if self.video_renderer.is_none() {
619            player.set_video_track_enabled(false);
620        }
621
622        *self.inner.borrow_mut() = Some(Arc::new(Mutex::new(PlayerInner {
623            player,
624            _signal_adapter: signal_adapter.clone(),
625            source: None,
626            video_sink,
627            input_size: 0,
628            play_state: gstreamer_play::PlayState::Stopped,
629            paused: Cell::new(DEFAULT_PAUSED),
630            can_resume: Cell::new(DEFAULT_CAN_RESUME),
631            playback_rate: Cell::new(DEFAULT_PLAYBACK_RATE),
632            muted: Cell::new(DEFAULT_MUTED),
633            volume: Cell::new(DEFAULT_VOLUME),
634            stream_type: self.stream_type,
635            last_metadata: None,
636            cat: gstreamer::DebugCategory::get("servoplayer").unwrap(),
637            enough_data: Arc::new(AtomicBool::new(false)),
638        })));
639
640        let inner = self.inner.borrow();
641        let inner = inner.as_ref().unwrap();
642        let observer = self.observer.clone();
643        // Handle `end-of-stream` signal.
644        signal_adapter.connect_end_of_stream(move |_| {
645            let _ = notify!(observer, PlayerEvent::EndOfStream);
646        });
647
648        let observer = self.observer.clone();
649        // Handle `error` signal
650        signal_adapter.connect_error(move |_self, error, _details| {
651            let _ = notify!(observer, PlayerEvent::Error(error.to_string()));
652        });
653
654        let inner_clone = inner.clone();
655        let observer = self.observer.clone();
656        // Handle `state-changed` signal.
657        signal_adapter.connect_state_changed(move |_, play_state| {
658            inner_clone.lock().unwrap().play_state = play_state;
659
660            let state = match play_state {
661                gstreamer_play::PlayState::Buffering => Some(PlaybackState::Buffering),
662                gstreamer_play::PlayState::Stopped => Some(PlaybackState::Stopped),
663                gstreamer_play::PlayState::Paused => Some(PlaybackState::Paused),
664                gstreamer_play::PlayState::Playing => Some(PlaybackState::Playing),
665                _ => None,
666            };
667            if let Some(v) = state {
668                let _ = notify!(observer, PlayerEvent::StateChanged(v));
669            }
670        });
671
672        let observer = self.observer.clone();
673        // Handle `position-update` signal.
674        signal_adapter.connect_position_updated(move |_, position| {
675            if let Some(seconds) = position.map(|p| p.seconds_f64()) {
676                let _ = notify!(observer, PlayerEvent::PositionChanged(seconds));
677            }
678        });
679
680        let observer = self.observer.clone();
681        // Handle `seek-done` signal.
682        signal_adapter.connect_seek_done(move |_, position| {
683            let _ = notify!(observer, PlayerEvent::SeekDone(position.seconds_f64()));
684        });
685
686        // Handle `media-info-updated` signal.
687        let inner_clone = inner.clone();
688        let observer = self.observer.clone();
689        signal_adapter.connect_media_info_updated(move |_, info| {
690            let Ok(metadata) = metadata_from_media_info(info) else {
691                return;
692            };
693
694            let mut inner = inner_clone.lock().unwrap();
695
696            if inner.last_metadata.as_ref() == Some(&metadata) {
697                return;
698            }
699
700            // TODO: Workaround to generate expected `paused` state change event.
701            // <https://github.com/servo/servo/issues/40740>
702            let mut send_pause_event = false;
703
704            if inner.last_metadata.is_none() && metadata.is_seekable {
705                if inner.playback_rate.get() != DEFAULT_PLAYBACK_RATE {
706                    // The `paused` state change event will be fired after the
707                    // seek initiated by the playback rate change has
708                    // completed.
709                    inner.player.set_rate(inner.playback_rate.get());
710                } else if inner.play_state == gstreamer_play::PlayState::Paused {
711                    send_pause_event = true;
712                }
713            }
714
715            inner.last_metadata = Some(metadata.clone());
716            gstreamer::info!(
717                inner.cat,
718                obj = &inner.player,
719                "Metadata updated: {:?}",
720                metadata
721            );
722            let _ = notify!(observer, PlayerEvent::MetadataUpdated(metadata));
723
724            if send_pause_event {
725                let _ = notify!(observer, PlayerEvent::StateChanged(PlaybackState::Paused));
726            }
727        });
728
729        // Handle `duration-changed` signal.
730        let inner_clone = inner.clone();
731        let observer = self.observer.clone();
732        signal_adapter.connect_duration_changed(move |_, duration| {
733            let duration = duration.map(|duration| {
734                time::Duration::new(
735                    duration.seconds(),
736                    (duration.nseconds() % 1_000_000_000) as u32,
737                )
738            });
739
740            let mut inner = inner_clone.lock().unwrap();
741            if let Some(ref mut metadata) = inner.last_metadata &&
742                metadata.duration != duration
743            {
744                metadata.duration = duration;
745                gstreamer::info!(
746                    inner.cat,
747                    obj = &inner.player,
748                    "Duration changed: {:?}",
749                    duration
750                );
751                let _ = notify!(observer, PlayerEvent::DurationChanged(duration));
752            }
753        });
754
755        if let Some(video_renderer) = self.video_renderer.clone() {
756            // Creates a closure that renders a frame using the video_renderer
757            // Used in the preroll and sample callbacks
758            let render_sample = {
759                let render = self.render.clone();
760                let observer = self.observer.clone();
761                let weak_video_renderer = Arc::downgrade(&video_renderer);
762
763                move |sample: gstreamer::Sample| {
764                    let Some(frame) = render.lock().unwrap().get_frame_from_sample(sample) else {
765                        return Err(gstreamer::FlowError::Error);
766                    };
767
768                    match weak_video_renderer.upgrade() {
769                        Some(video_renderer) => {
770                            video_renderer.lock().unwrap().render(frame);
771                        },
772                        _ => {
773                            return Err(gstreamer::FlowError::Flushing);
774                        },
775                    };
776
777                    let _ = notify!(observer, PlayerEvent::VideoFrameUpdated);
778                    Ok(gstreamer::FlowSuccess::Ok)
779                }
780            };
781
782            // Set video_sink callbacks.
783            inner.lock().unwrap().video_sink.set_callbacks(
784                gstreamer_app::AppSinkCallbacks::builder()
785                    .new_preroll({
786                        let render_sample = render_sample.clone();
787                        move |video_sink| {
788                            render_sample(
789                                video_sink
790                                    .pull_preroll()
791                                    .map_err(|_| gstreamer::FlowError::Eos)?,
792                            )
793                        }
794                    })
795                    .new_sample(move |video_sink| {
796                        render_sample(
797                            video_sink
798                                .pull_sample()
799                                .map_err(|_| gstreamer::FlowError::Eos)?,
800                        )
801                    })
802                    .build(),
803            );
804        };
805
806        let (receiver, error_handler_id) = {
807            let inner_clone = inner.clone();
808            let inner = inner.lock().unwrap();
809            let pipeline = inner.player.pipeline();
810
811            let (sender, receiver) = mpsc::channel();
812
813            let sender = Arc::new(Mutex::new(sender));
814            let sender_clone = sender.clone();
815            let is_ready_clone = self.is_ready.clone();
816            let observer = self.observer.clone();
817            pipeline.connect("source-setup", false, move |args| {
818                let source = args[1].get::<gstreamer::Element>().unwrap();
819
820                let mut inner = inner_clone.lock().unwrap();
821                let source = match inner.stream_type {
822                    StreamType::Seekable => {
823                        let servosrc = source
824                            .dynamic_cast::<ServoSrc>()
825                            .expect("Source element is expected to be a ServoSrc!");
826
827                        if inner.input_size > 0 {
828                            servosrc.set_size(inner.input_size as i64);
829                        }
830
831                        let sender_clone = sender.clone();
832                        let is_ready = is_ready_clone.clone();
833                        let observer_ = observer.clone();
834                        let observer__ = observer.clone();
835                        let observer___ = observer.clone();
836                        let servosrc_ = servosrc.clone();
837                        let enough_data_ = inner.enough_data.clone();
838                        let enough_data__ = inner.enough_data.clone();
839                        let seek_channel = Arc::new(Mutex::new(SeekChannel::new()));
840                        servosrc.set_callbacks(
841                            gstreamer_app::AppSrcCallbacks::builder()
842                                .need_data(move |_, _| {
843                                    // We block the caller of the setup method until we get
844                                    // the first need-data signal, so we ensure that we
845                                    // don't miss any data between the moment the client
846                                    // calls setup and the player is actually ready to
847                                    // get any data.
848                                    is_ready.call_once(|| {
849                                        let _ = sender_clone.lock().unwrap().send(Ok(()));
850                                    });
851
852                                    enough_data_.store(false, Ordering::Relaxed);
853                                    let _ = notify!(observer_, PlayerEvent::NeedData);
854                                })
855                                .enough_data(move |_| {
856                                    enough_data__.store(true, Ordering::Relaxed);
857                                    let _ = notify!(observer__, PlayerEvent::EnoughData);
858                                })
859                                .seek_data(move |_, offset| {
860                                    let (ret, ack_channel) = if servosrc_.set_seek_offset(offset) {
861                                        let _ = notify!(
862                                            observer___,
863                                            PlayerEvent::SeekData(
864                                                offset,
865                                                seek_channel.lock().unwrap().sender()
866                                            )
867                                        );
868                                        let (ret, ack_channel) =
869                                            seek_channel.lock().unwrap()._await();
870                                        (ret, Some(ack_channel))
871                                    } else {
872                                        (true, None)
873                                    };
874
875                                    servosrc_.set_seek_done();
876                                    if let Some(ack_channel) = ack_channel {
877                                        ack_channel.send(()).unwrap();
878                                    }
879                                    ret
880                                })
881                                .build(),
882                        );
883
884                        PlayerSource::Seekable(servosrc)
885                    },
886                    StreamType::Stream => {
887                        let media_stream_src = source
888                            .dynamic_cast::<ServoMediaStreamSrc>()
889                            .expect("Source element is expected to be a ServoMediaStreamSrc!");
890                        let sender_clone = sender.clone();
891                        is_ready_clone.call_once(|| {
892                            let _ = notify!(sender_clone, Ok(()));
893                        });
894                        PlayerSource::Stream(media_stream_src)
895                    },
896                };
897
898                inner.set_src(source);
899
900                None
901            });
902
903            let error_handler_id =
904                signal_adapter.connect_error(move |signal_adapter, error, _details| {
905                    let _ = notify!(sender_clone, Err(PlayerError::Backend(error.to_string())));
906                    signal_adapter.play().stop();
907                });
908
909            inner.player.pause();
910
911            (receiver, error_handler_id)
912        };
913
914        let result = receiver.recv().unwrap();
915        glib::signal::signal_handler_disconnect(&inner.lock().unwrap().player, error_handler_id);
916        result
917    }
918}
919
920macro_rules! inner_player_proxy_getter {
921    ($fn_name:ident, $return_type:ty, $default_value:expr_2021) => {
922        fn $fn_name(&self) -> $return_type {
923            if self.setup().is_err() {
924                return $default_value;
925            }
926
927            let inner = self.inner.borrow();
928            let inner = inner.as_ref().unwrap().lock().unwrap();
929            inner.$fn_name()
930        }
931    };
932}
933
934macro_rules! inner_player_proxy {
935    ($fn_name:ident, $return_type:ty) => {
936        fn $fn_name(&self) -> Result<$return_type, PlayerError> {
937            self.setup()?;
938            let inner = self.inner.borrow();
939            let mut inner = inner.as_ref().unwrap().lock().unwrap();
940            inner.$fn_name()
941        }
942    };
943
944    ($fn_name:ident, $arg1:ident, $arg1_type:ty) => {
945        fn $fn_name(&self, $arg1: $arg1_type) -> Result<(), PlayerError> {
946            self.setup()?;
947            let inner = self.inner.borrow();
948            let mut inner = inner.as_ref().unwrap().lock().unwrap();
949            inner.$fn_name($arg1)
950        }
951    };
952
953    ($fn_name:ident, $arg1:ident, $arg1_type:ty, $arg2:ident, $arg2_type:ty) => {
954        fn $fn_name(&self, $arg1: $arg1_type, $arg2: $arg2_type) -> Result<(), PlayerError> {
955            self.setup()?;
956            let inner = self.inner.borrow();
957            let mut inner = inner.as_ref().unwrap().lock().unwrap();
958            inner.$fn_name($arg1, $arg2)
959        }
960    };
961}
962
963impl Player for GStreamerPlayer {
964    inner_player_proxy!(play, ());
965    inner_player_proxy!(pause, ());
966    inner_player_proxy_getter!(paused, bool, DEFAULT_PAUSED);
967    inner_player_proxy_getter!(can_resume, bool, DEFAULT_CAN_RESUME);
968    inner_player_proxy!(stop, ());
969    inner_player_proxy!(end_of_stream, ());
970    inner_player_proxy!(set_input_size, size, u64);
971    inner_player_proxy!(set_mute, muted, bool);
972    inner_player_proxy_getter!(muted, bool, DEFAULT_MUTED);
973    inner_player_proxy!(set_playback_rate, playback_rate, f64);
974    inner_player_proxy_getter!(playback_rate, f64, DEFAULT_PLAYBACK_RATE);
975    inner_player_proxy!(push_data, data, Vec<u8>);
976    inner_player_proxy!(seek, time, f64);
977    inner_player_proxy!(set_volume, volume, f64);
978    inner_player_proxy_getter!(volume, f64, DEFAULT_VOLUME);
979    inner_player_proxy_getter!(buffered, Vec<Range<f64>>, DEFAULT_TIME_RANGES);
980    inner_player_proxy_getter!(seekable, Vec<Range<f64>>, DEFAULT_TIME_RANGES);
981    inner_player_proxy!(set_stream, stream, &MediaStreamId, only_stream, bool);
982    inner_player_proxy!(set_audio_track, stream_index, i32, enabled, bool);
983    inner_player_proxy!(set_video_track, stream_index, i32, enabled, bool);
984
985    fn render_use_gl(&self) -> bool {
986        self.render.lock().unwrap().is_gl()
987    }
988}
989
990impl MediaInstance for GStreamerPlayer {
991    fn get_id(&self) -> usize {
992        self.id
993    }
994
995    fn mute(&self, val: bool) -> Result<(), MediaInstanceError> {
996        self.set_mute(val).map_err(|_| MediaInstanceError)
997    }
998
999    fn suspend(&self) -> Result<(), MediaInstanceError> {
1000        self.pause().map_err(|_| MediaInstanceError)
1001    }
1002
1003    fn resume(&self) -> Result<(), MediaInstanceError> {
1004        if !self.can_resume() {
1005            return Ok(());
1006        }
1007
1008        self.play().map_err(|_| MediaInstanceError)
1009    }
1010}
1011
1012impl Drop for GStreamerPlayer {
1013    fn drop(&mut self) {
1014        let _ = self.stop();
1015        let (tx_ack, rx_ack) = mpsc::channel();
1016        let _ = self
1017            .backend_chan
1018            .lock()
1019            .unwrap()
1020            .send(BackendMsg::Shutdown {
1021                context: self.context_id,
1022                id: self.id,
1023                tx_ack,
1024            });
1025        let _ = rx_ack.recv();
1026    }
1027}