1use std::cell::{Cell, RefCell};
6use std::ops::Range;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::mpsc::{self, Sender};
9use std::sync::{Arc, Mutex, Once};
10use std::time;
11
12use byte_slice_cast::AsSliceOf;
13use glib;
14use glib::prelude::*;
15use gstreamer;
16use gstreamer_app;
17use gstreamer_play;
18use gstreamer_play::prelude::*;
19use ipc_channel::ipc::{IpcReceiver, IpcSender, channel};
20use servo_media::MediaInstanceError;
21use servo_media_player::audio::AudioRenderer;
22use servo_media_player::context::PlayerGLContext;
23use servo_media_player::metadata::Metadata;
24use servo_media_player::video::VideoFrameRenderer;
25use servo_media_player::{
26 PlaybackState, Player, PlayerError, PlayerEvent, SeekLock, SeekLockMsg, StreamType,
27};
28use servo_media_streams::registry::{MediaStreamId, get_stream};
29use servo_media_traits::{BackendMsg, ClientContextId, MediaInstance};
30
31use super::BACKEND_BASE_TIME;
32use crate::media_stream::GStreamerMediaStream;
33use crate::media_stream_source::{ServoMediaStreamSrc, register_servo_media_stream_src};
34use crate::render::GStreamerRender;
35use crate::source::{ServoSrc, register_servo_src};
36
37const DEFAULT_MUTED: bool = false;
38const DEFAULT_PAUSED: bool = true;
39const DEFAULT_CAN_RESUME: bool = false;
40const DEFAULT_PLAYBACK_RATE: f64 = 1.0;
41const DEFAULT_VOLUME: f64 = 1.0;
42const DEFAULT_TIME_RANGES: Vec<Range<f64>> = vec![];
43
44const MAX_BUFFER_SIZE: i32 = 500 * 1024 * 1024;
45
46fn metadata_from_media_info(media_info: &gstreamer_play::PlayMediaInfo) -> Result<Metadata, ()> {
47 let dur = media_info.duration();
48 let duration = if let Some(dur) = dur {
49 let mut nanos = dur.nseconds();
50 nanos %= 1_000_000_000;
51 let seconds = dur.seconds();
52 Some(time::Duration::new(seconds, nanos as u32))
53 } else {
54 None
55 };
56
57 let mut audio_tracks = Vec::new();
58 let mut video_tracks = Vec::new();
59
60 let format = media_info
61 .container_format()
62 .unwrap_or_else(|| glib::GString::from(""))
63 .to_string();
64
65 for stream_info in media_info.stream_list() {
66 let stream_type = stream_info.stream_type();
67 match stream_type.as_str() {
68 "audio" => {
69 let codec = stream_info
70 .codec()
71 .unwrap_or_else(|| glib::GString::from(""))
72 .to_string();
73 audio_tracks.push(codec);
74 },
75 "video" => {
76 let codec = stream_info
77 .codec()
78 .unwrap_or_else(|| glib::GString::from(""))
79 .to_string();
80 video_tracks.push(codec);
81 },
82 _ => {},
83 }
84 }
85
86 let mut width: u32 = 0;
87 let height: u32 = if media_info.number_of_video_streams() > 0 {
88 let first_video_stream = &media_info.video_streams()[0];
89 width = first_video_stream.width() as u32;
90 first_video_stream.height() as u32
91 } else {
92 0
93 };
94
95 let is_seekable = media_info.is_seekable();
96 let is_live = media_info.is_live();
97 let title = media_info.title().map(|s| s.as_str().to_string());
98
99 Ok(Metadata {
100 duration,
101 width,
102 height,
103 format,
104 is_seekable,
105 audio_tracks,
106 video_tracks,
107 is_live,
108 title,
109 })
110}
111
112pub struct GStreamerAudioChunk(gstreamer::buffer::MappedBuffer<gstreamer::buffer::Readable>);
113impl AsRef<[f32]> for GStreamerAudioChunk {
114 fn as_ref(&self) -> &[f32] {
115 self.0.as_ref().as_slice_of::<f32>().unwrap_or_default()
116 }
117}
118
119#[derive(PartialEq)]
120enum PlayerSource {
121 Seekable(ServoSrc),
122 Stream(ServoMediaStreamSrc),
123}
124
125struct PlayerInner {
126 player: gstreamer_play::Play,
127 _signal_adapter: gstreamer_play::PlaySignalAdapter,
128 source: Option<PlayerSource>,
129 video_sink: gstreamer_app::AppSink,
130 input_size: u64,
131 play_state: gstreamer_play::PlayState,
132 paused: Cell<bool>,
133 can_resume: Cell<bool>,
134 playback_rate: Cell<f64>,
135 muted: Cell<bool>,
136 volume: Cell<f64>,
137 stream_type: StreamType,
138 last_metadata: Option<Metadata>,
139 cat: gstreamer::DebugCategory,
140 enough_data: Arc<AtomicBool>,
141}
142
143impl PlayerInner {
144 pub fn set_input_size(&mut self, size: u64) -> Result<(), PlayerError> {
145 self.input_size = size;
148 if let Some(PlayerSource::Seekable(ref mut source)) = self.source {
149 source.set_size(if size > 0 {
150 size as i64
151 } else {
152 -1 });
154 }
155 Ok(())
156 }
157
158 pub fn set_mute(&mut self, muted: bool) -> Result<(), PlayerError> {
159 if self.muted.get() == muted {
160 return Ok(());
161 }
162
163 self.muted.set(muted);
164 self.player.set_mute(muted);
165 Ok(())
166 }
167
168 pub fn muted(&self) -> bool {
169 self.muted.get()
170 }
171
172 pub fn set_playback_rate(&mut self, playback_rate: f64) -> Result<(), PlayerError> {
173 if self.stream_type != StreamType::Seekable {
174 return Err(PlayerError::NonSeekableStream);
175 }
176
177 if self.playback_rate.get() == playback_rate {
178 return Ok(());
179 }
180
181 self.playback_rate.set(playback_rate);
182
183 if self.last_metadata.is_some() {
188 self.player.set_rate(playback_rate);
189 }
190 Ok(())
191 }
192
193 pub fn playback_rate(&self) -> f64 {
194 self.playback_rate.get()
195 }
196
197 pub fn play(&mut self) -> Result<(), PlayerError> {
198 if !self.paused.get() {
199 return Ok(());
200 }
201
202 self.paused.set(false);
203 self.can_resume.set(false);
204 self.player.play();
205 Ok(())
206 }
207
208 pub fn stop(&mut self) -> Result<(), PlayerError> {
209 self.player.stop();
210 self.paused.set(true);
211 self.can_resume.set(false);
212 self.last_metadata = None;
213 self.source = None;
214 Ok(())
215 }
216
217 pub fn pause(&mut self) -> Result<(), PlayerError> {
218 if self.paused.get() {
219 return Ok(());
220 }
221
222 self.paused.set(true);
223 self.can_resume.set(true);
224 self.player.pause();
225 Ok(())
226 }
227
228 pub fn paused(&self) -> bool {
229 self.paused.get()
230 }
231
232 pub fn can_resume(&self) -> bool {
233 self.can_resume.get()
234 }
235
236 pub fn end_of_stream(&mut self) -> Result<(), PlayerError> {
237 match self.source {
238 Some(ref mut source) => {
239 if let PlayerSource::Seekable(source) = source {
240 source
241 .push_end_of_stream()
242 .map(|_| ())
243 .map_err(|_| PlayerError::EOSFailed)
244 } else {
245 Ok(())
246 }
247 },
248 _ => Ok(()),
249 }
250 }
251
252 pub fn seek(&mut self, time: f64) -> Result<(), PlayerError> {
253 if self.stream_type != StreamType::Seekable {
254 return Err(PlayerError::NonSeekableStream);
255 }
256 if let Some(ref metadata) = self.last_metadata &&
257 let Some(ref duration) = metadata.duration &&
258 duration < &time::Duration::new(time as u64, 0)
259 {
260 gstreamer::warning!(self.cat, obj = &self.player, "Trying to seek out of range");
261 return Err(PlayerError::SeekOutOfRange);
262 }
263
264 let time = time * 1_000_000_000.;
265 self.player
266 .seek(gstreamer::ClockTime::from_nseconds(time as u64));
267 Ok(())
268 }
269
270 pub fn set_volume(&mut self, volume: f64) -> Result<(), PlayerError> {
271 if self.volume.get() == volume {
272 return Ok(());
273 }
274
275 self.volume.set(volume);
276 self.player.set_volume(volume);
277 Ok(())
278 }
279
280 pub fn volume(&self) -> f64 {
281 self.volume.get()
282 }
283
284 pub fn push_data(&mut self, data: Vec<u8>) -> Result<(), PlayerError> {
285 if let Some(PlayerSource::Seekable(ref mut source)) = self.source {
286 if self.enough_data.load(Ordering::Relaxed) {
287 return Err(PlayerError::EnoughData);
288 }
289 return source
290 .push_buffer(data)
291 .map(|_| ())
292 .map_err(|_| PlayerError::BufferPushFailed);
293 }
294 Err(PlayerError::BufferPushFailed)
295 }
296
297 pub fn set_src(&mut self, source: PlayerSource) {
298 self.source = Some(source);
299 }
300
301 pub fn buffered(&self) -> Vec<Range<f64>> {
302 let mut buffered_ranges = vec![];
303
304 let Some(duration) = self
305 .last_metadata
306 .as_ref()
307 .and_then(|metadata| metadata.duration)
308 else {
309 return buffered_ranges;
310 };
311
312 let pipeline = self.player.pipeline();
313 let mut buffering = gstreamer::query::Buffering::new(gstreamer::Format::Percent);
314 if pipeline.query(&mut buffering) {
315 let ranges = buffering.ranges();
316 for (start, end) in ranges {
317 let start = (if let gstreamer::GenericFormattedValue::Percent(start) = start {
318 start.unwrap()
319 } else {
320 gstreamer::format::Percent::from_percent(0)
321 } / gstreamer::format::Percent::MAX) as f64 *
322 duration.as_secs_f64();
323 let end = (if let gstreamer::GenericFormattedValue::Percent(end) = end {
324 end.unwrap()
325 } else {
326 gstreamer::format::Percent::from_percent(0)
327 } / gstreamer::format::Percent::MAX) as f64 *
328 duration.as_secs_f64();
329 buffered_ranges.push(Range { start, end });
330 }
331 }
332
333 buffered_ranges
334 }
335
336 pub fn seekable(&self) -> Vec<Range<f64>> {
337 if let Some(metadata) = self.last_metadata.as_ref() &&
339 metadata.is_seekable &&
340 let Some(duration) = metadata.duration
341 {
342 return vec![Range {
343 start: 0.0,
344 end: duration.as_secs_f64(),
345 }];
346 }
347
348 self.buffered()
350 }
351
352 fn set_stream(&mut self, stream: &MediaStreamId, only_stream: bool) -> Result<(), PlayerError> {
353 debug_assert!(self.stream_type == StreamType::Stream);
354 let Some(PlayerSource::Stream(ref source)) = self.source else {
355 return Err(PlayerError::SetStreamFailed);
356 };
357
358 let stream = get_stream(stream).expect("Media streams registry does not contain such ID");
359 let mut stream = stream.lock().unwrap();
360 let Some(stream) = stream.as_mut_any().downcast_mut::<GStreamerMediaStream>() else {
361 return Err(PlayerError::SetStreamFailed);
362 };
363
364 let playbin = self
365 .player
366 .pipeline()
367 .dynamic_cast::<gstreamer::Pipeline>()
368 .unwrap();
369 let clock = gstreamer::SystemClock::obtain();
370 playbin.set_base_time(*BACKEND_BASE_TIME);
371 playbin.set_start_time(gstreamer::ClockTime::NONE);
372 playbin.use_clock(Some(&clock));
373 source
374 .set_stream(stream, only_stream)
375 .map_err(|_| PlayerError::SetStreamFailed)
376 }
377
378 fn set_audio_track(&mut self, stream_index: i32, enabled: bool) -> Result<(), PlayerError> {
379 self.player
380 .set_audio_track(stream_index)
381 .map_err(|_| PlayerError::SetTrackFailed)?;
382 self.player.set_audio_track_enabled(enabled);
383 Ok(())
384 }
385
386 fn set_video_track(&mut self, stream_index: i32, enabled: bool) -> Result<(), PlayerError> {
387 self.player
388 .set_video_track(stream_index)
389 .map_err(|_| PlayerError::SetTrackFailed)?;
390 self.player.set_video_track_enabled(enabled);
391 Ok(())
392 }
393}
394
395macro_rules! notify(
396 ($observer:expr_2021, $event:expr_2021) => {
397 $observer.lock().unwrap().send($event)
398 };
399);
400
401struct SeekChannel {
402 sender: SeekLock,
403 recv: IpcReceiver<SeekLockMsg>,
404}
405
406impl SeekChannel {
407 fn new() -> Self {
408 let (sender, recv) = channel::<SeekLockMsg>().expect("Couldn't create IPC channel");
409 Self {
410 sender: SeekLock {
411 lock_channel: sender,
412 },
413 recv,
414 }
415 }
416
417 fn sender(&self) -> SeekLock {
418 self.sender.clone()
419 }
420
421 fn _await(&self) -> SeekLockMsg {
422 self.recv.recv().unwrap()
423 }
424}
425
426pub struct GStreamerPlayer {
427 id: usize,
429 context_id: ClientContextId,
431 backend_chan: Arc<Mutex<Sender<BackendMsg>>>,
433 inner: RefCell<Option<Arc<Mutex<PlayerInner>>>>,
434 observer: Arc<Mutex<IpcSender<PlayerEvent>>>,
435 audio_renderer: Option<Arc<Mutex<dyn AudioRenderer>>>,
436 video_renderer: Option<Arc<Mutex<dyn VideoFrameRenderer>>>,
437 is_ready: Arc<Once>,
440 stream_type: StreamType,
442 render: Arc<Mutex<GStreamerRender>>,
444}
445
446impl GStreamerPlayer {
447 #[allow(clippy::too_many_arguments)]
448 pub fn new(
449 id: usize,
450 context_id: &ClientContextId,
451 backend_chan: Arc<Mutex<Sender<BackendMsg>>>,
452 stream_type: StreamType,
453 observer: IpcSender<PlayerEvent>,
454 video_renderer: Option<Arc<Mutex<dyn VideoFrameRenderer>>>,
455 audio_renderer: Option<Arc<Mutex<dyn AudioRenderer>>>,
456 gl_context: Box<dyn PlayerGLContext>,
457 ) -> GStreamerPlayer {
458 let _ = gstreamer::DebugCategory::new(
459 "servoplayer",
460 gstreamer::DebugColorFlags::empty(),
461 Some("Servo player"),
462 );
463
464 Self {
465 id,
466 context_id: *context_id,
467 backend_chan,
468 inner: RefCell::new(None),
469 observer: Arc::new(Mutex::new(observer)),
470 audio_renderer,
471 video_renderer,
472 is_ready: Arc::new(Once::new()),
473 stream_type,
474 render: Arc::new(Mutex::new(GStreamerRender::new(gl_context))),
475 }
476 }
477
478 fn setup(&self) -> Result<(), PlayerError> {
479 if self.inner.borrow().is_some() {
480 return Ok(());
481 }
482
483 for element in ["playbin3", "decodebin3", "queue"] {
486 if gstreamer::ElementFactory::find(element).is_none() {
487 return Err(PlayerError::Backend(format!(
488 "Missing dependency: {}",
489 element
490 )));
491 }
492 }
493
494 let player = gstreamer_play::Play::default();
495 let signal_adapter = gstreamer_play::PlaySignalAdapter::new_sync_emit(&player);
496 let pipeline = player.pipeline();
497
498 if !cfg!(any(target_os = "windows", target_os = "android")) {
500 let flags = pipeline.property_value("flags");
504 let flags_class = match glib::FlagsClass::with_type(flags.type_()) {
505 Some(flags) => flags,
506 None => {
507 return Err(PlayerError::Backend(
508 "FlagsClass creation failed".to_owned(),
509 ));
510 },
511 };
512 let flags_class = match flags_class.builder_with_value(flags) {
513 Some(class) => class,
514 None => {
515 return Err(PlayerError::Backend(
516 "FlagsClass creation failed".to_owned(),
517 ));
518 },
519 };
520 let Some(flags) = flags_class.set_by_nick("download").build() else {
521 return Err(PlayerError::Backend(
522 "FlagsClass creation failed".to_owned(),
523 ));
524 };
525 pipeline.set_property_from_value("flags", &flags);
526 }
527
528 pipeline.set_property("buffer-size", MAX_BUFFER_SIZE);
530
531 let mut config = player.config();
533 config.set_position_update_interval(500u32);
534 player
535 .set_config(config)
536 .map_err(|e| PlayerError::Backend(e.to_string()))?;
537
538 if let Some(ref audio_renderer) = self.audio_renderer {
539 let audio_sink =
540 gstreamer::ElementFactory::make("appsink")
541 .build()
542 .map_err(|error| {
543 PlayerError::Backend(format!("appsink creation failed: {error:?}"))
544 })?;
545
546 pipeline.set_property("audio-sink", &audio_sink);
547
548 let audio_sink = audio_sink.dynamic_cast::<gstreamer_app::AppSink>().unwrap();
549
550 let weak_audio_renderer = Arc::downgrade(audio_renderer);
551
552 audio_sink.set_callbacks(
553 gstreamer_app::AppSinkCallbacks::builder()
554 .new_preroll(|_| Ok(gstreamer::FlowSuccess::Ok))
555 .new_sample(move |audio_sink| {
556 let sample = audio_sink
557 .pull_sample()
558 .map_err(|_| gstreamer::FlowError::Eos)?;
559 let buffer = sample.buffer_owned().ok_or(gstreamer::FlowError::Error)?;
560 let audio_info = sample
561 .caps()
562 .and_then(|caps| gstreamer_audio::AudioInfo::from_caps(caps).ok())
563 .ok_or(gstreamer::FlowError::Error)?;
564 let positions =
565 audio_info.positions().ok_or(gstreamer::FlowError::Error)?;
566
567 let Some(audio_renderer) = weak_audio_renderer.upgrade() else {
568 return Err(gstreamer::FlowError::Flushing);
569 };
570
571 for position in positions.iter() {
572 let buffer = buffer.clone();
573 let map = match buffer.into_mapped_buffer_readable() {
574 Ok(map) => map,
575 _ => {
576 return Err(gstreamer::FlowError::Error);
577 },
578 };
579 let chunk = Box::new(GStreamerAudioChunk(map));
580 let channel = position.to_mask() as u32;
581
582 audio_renderer.lock().unwrap().render(chunk, channel);
583 }
584 Ok(gstreamer::FlowSuccess::Ok)
585 })
586 .build(),
587 );
588 }
589
590 let video_sink = self.render.lock().unwrap().setup_video_sink(&pipeline)?;
591
592 let uri = match self.stream_type {
600 StreamType::Stream => {
601 register_servo_media_stream_src().map_err(|error| {
602 PlayerError::Backend(format!(
603 "servomediastreamsrc registration error: {error:?}"
604 ))
605 })?;
606 "mediastream://".to_value()
607 },
608 StreamType::Seekable => {
609 register_servo_src().map_err(|error| {
610 PlayerError::Backend(format!("servosrc registration error: {error:?}"))
611 })?;
612 "servosrc://".to_value()
613 },
614 };
615 player.set_property("uri", &uri);
616
617 if self.video_renderer.is_none() {
619 player.set_video_track_enabled(false);
620 }
621
622 *self.inner.borrow_mut() = Some(Arc::new(Mutex::new(PlayerInner {
623 player,
624 _signal_adapter: signal_adapter.clone(),
625 source: None,
626 video_sink,
627 input_size: 0,
628 play_state: gstreamer_play::PlayState::Stopped,
629 paused: Cell::new(DEFAULT_PAUSED),
630 can_resume: Cell::new(DEFAULT_CAN_RESUME),
631 playback_rate: Cell::new(DEFAULT_PLAYBACK_RATE),
632 muted: Cell::new(DEFAULT_MUTED),
633 volume: Cell::new(DEFAULT_VOLUME),
634 stream_type: self.stream_type,
635 last_metadata: None,
636 cat: gstreamer::DebugCategory::get("servoplayer").unwrap(),
637 enough_data: Arc::new(AtomicBool::new(false)),
638 })));
639
640 let inner = self.inner.borrow();
641 let inner = inner.as_ref().unwrap();
642 let observer = self.observer.clone();
643 signal_adapter.connect_end_of_stream(move |_| {
645 let _ = notify!(observer, PlayerEvent::EndOfStream);
646 });
647
648 let observer = self.observer.clone();
649 signal_adapter.connect_error(move |_self, error, _details| {
651 let _ = notify!(observer, PlayerEvent::Error(error.to_string()));
652 });
653
654 let inner_clone = inner.clone();
655 let observer = self.observer.clone();
656 signal_adapter.connect_state_changed(move |_, play_state| {
658 inner_clone.lock().unwrap().play_state = play_state;
659
660 let state = match play_state {
661 gstreamer_play::PlayState::Buffering => Some(PlaybackState::Buffering),
662 gstreamer_play::PlayState::Stopped => Some(PlaybackState::Stopped),
663 gstreamer_play::PlayState::Paused => Some(PlaybackState::Paused),
664 gstreamer_play::PlayState::Playing => Some(PlaybackState::Playing),
665 _ => None,
666 };
667 if let Some(v) = state {
668 let _ = notify!(observer, PlayerEvent::StateChanged(v));
669 }
670 });
671
672 let observer = self.observer.clone();
673 signal_adapter.connect_position_updated(move |_, position| {
675 if let Some(seconds) = position.map(|p| p.seconds_f64()) {
676 let _ = notify!(observer, PlayerEvent::PositionChanged(seconds));
677 }
678 });
679
680 let observer = self.observer.clone();
681 signal_adapter.connect_seek_done(move |_, position| {
683 let _ = notify!(observer, PlayerEvent::SeekDone(position.seconds_f64()));
684 });
685
686 let inner_clone = inner.clone();
688 let observer = self.observer.clone();
689 signal_adapter.connect_media_info_updated(move |_, info| {
690 let Ok(metadata) = metadata_from_media_info(info) else {
691 return;
692 };
693
694 let mut inner = inner_clone.lock().unwrap();
695
696 if inner.last_metadata.as_ref() == Some(&metadata) {
697 return;
698 }
699
700 let mut send_pause_event = false;
703
704 if inner.last_metadata.is_none() && metadata.is_seekable {
705 if inner.playback_rate.get() != DEFAULT_PLAYBACK_RATE {
706 inner.player.set_rate(inner.playback_rate.get());
710 } else if inner.play_state == gstreamer_play::PlayState::Paused {
711 send_pause_event = true;
712 }
713 }
714
715 inner.last_metadata = Some(metadata.clone());
716 gstreamer::info!(
717 inner.cat,
718 obj = &inner.player,
719 "Metadata updated: {:?}",
720 metadata
721 );
722 let _ = notify!(observer, PlayerEvent::MetadataUpdated(metadata));
723
724 if send_pause_event {
725 let _ = notify!(observer, PlayerEvent::StateChanged(PlaybackState::Paused));
726 }
727 });
728
729 let inner_clone = inner.clone();
731 let observer = self.observer.clone();
732 signal_adapter.connect_duration_changed(move |_, duration| {
733 let duration = duration.map(|duration| {
734 time::Duration::new(
735 duration.seconds(),
736 (duration.nseconds() % 1_000_000_000) as u32,
737 )
738 });
739
740 let mut inner = inner_clone.lock().unwrap();
741 if let Some(ref mut metadata) = inner.last_metadata &&
742 metadata.duration != duration
743 {
744 metadata.duration = duration;
745 gstreamer::info!(
746 inner.cat,
747 obj = &inner.player,
748 "Duration changed: {:?}",
749 duration
750 );
751 let _ = notify!(observer, PlayerEvent::DurationChanged(duration));
752 }
753 });
754
755 if let Some(video_renderer) = self.video_renderer.clone() {
756 let render_sample = {
759 let render = self.render.clone();
760 let observer = self.observer.clone();
761 let weak_video_renderer = Arc::downgrade(&video_renderer);
762
763 move |sample: gstreamer::Sample| {
764 let Some(frame) = render.lock().unwrap().get_frame_from_sample(sample) else {
765 return Err(gstreamer::FlowError::Error);
766 };
767
768 match weak_video_renderer.upgrade() {
769 Some(video_renderer) => {
770 video_renderer.lock().unwrap().render(frame);
771 },
772 _ => {
773 return Err(gstreamer::FlowError::Flushing);
774 },
775 };
776
777 let _ = notify!(observer, PlayerEvent::VideoFrameUpdated);
778 Ok(gstreamer::FlowSuccess::Ok)
779 }
780 };
781
782 inner.lock().unwrap().video_sink.set_callbacks(
784 gstreamer_app::AppSinkCallbacks::builder()
785 .new_preroll({
786 let render_sample = render_sample.clone();
787 move |video_sink| {
788 render_sample(
789 video_sink
790 .pull_preroll()
791 .map_err(|_| gstreamer::FlowError::Eos)?,
792 )
793 }
794 })
795 .new_sample(move |video_sink| {
796 render_sample(
797 video_sink
798 .pull_sample()
799 .map_err(|_| gstreamer::FlowError::Eos)?,
800 )
801 })
802 .build(),
803 );
804 };
805
806 let (receiver, error_handler_id) = {
807 let inner_clone = inner.clone();
808 let inner = inner.lock().unwrap();
809 let pipeline = inner.player.pipeline();
810
811 let (sender, receiver) = mpsc::channel();
812
813 let sender = Arc::new(Mutex::new(sender));
814 let sender_clone = sender.clone();
815 let is_ready_clone = self.is_ready.clone();
816 let observer = self.observer.clone();
817 pipeline.connect("source-setup", false, move |args| {
818 let source = args[1].get::<gstreamer::Element>().unwrap();
819
820 let mut inner = inner_clone.lock().unwrap();
821 let source = match inner.stream_type {
822 StreamType::Seekable => {
823 let servosrc = source
824 .dynamic_cast::<ServoSrc>()
825 .expect("Source element is expected to be a ServoSrc!");
826
827 if inner.input_size > 0 {
828 servosrc.set_size(inner.input_size as i64);
829 }
830
831 let sender_clone = sender.clone();
832 let is_ready = is_ready_clone.clone();
833 let observer_ = observer.clone();
834 let observer__ = observer.clone();
835 let observer___ = observer.clone();
836 let servosrc_ = servosrc.clone();
837 let enough_data_ = inner.enough_data.clone();
838 let enough_data__ = inner.enough_data.clone();
839 let seek_channel = Arc::new(Mutex::new(SeekChannel::new()));
840 servosrc.set_callbacks(
841 gstreamer_app::AppSrcCallbacks::builder()
842 .need_data(move |_, _| {
843 is_ready.call_once(|| {
849 let _ = sender_clone.lock().unwrap().send(Ok(()));
850 });
851
852 enough_data_.store(false, Ordering::Relaxed);
853 let _ = notify!(observer_, PlayerEvent::NeedData);
854 })
855 .enough_data(move |_| {
856 enough_data__.store(true, Ordering::Relaxed);
857 let _ = notify!(observer__, PlayerEvent::EnoughData);
858 })
859 .seek_data(move |_, offset| {
860 let (ret, ack_channel) = if servosrc_.set_seek_offset(offset) {
861 let _ = notify!(
862 observer___,
863 PlayerEvent::SeekData(
864 offset,
865 seek_channel.lock().unwrap().sender()
866 )
867 );
868 let (ret, ack_channel) =
869 seek_channel.lock().unwrap()._await();
870 (ret, Some(ack_channel))
871 } else {
872 (true, None)
873 };
874
875 servosrc_.set_seek_done();
876 if let Some(ack_channel) = ack_channel {
877 ack_channel.send(()).unwrap();
878 }
879 ret
880 })
881 .build(),
882 );
883
884 PlayerSource::Seekable(servosrc)
885 },
886 StreamType::Stream => {
887 let media_stream_src = source
888 .dynamic_cast::<ServoMediaStreamSrc>()
889 .expect("Source element is expected to be a ServoMediaStreamSrc!");
890 let sender_clone = sender.clone();
891 is_ready_clone.call_once(|| {
892 let _ = notify!(sender_clone, Ok(()));
893 });
894 PlayerSource::Stream(media_stream_src)
895 },
896 };
897
898 inner.set_src(source);
899
900 None
901 });
902
903 let error_handler_id =
904 signal_adapter.connect_error(move |signal_adapter, error, _details| {
905 let _ = notify!(sender_clone, Err(PlayerError::Backend(error.to_string())));
906 signal_adapter.play().stop();
907 });
908
909 inner.player.pause();
910
911 (receiver, error_handler_id)
912 };
913
914 let result = receiver.recv().unwrap();
915 glib::signal::signal_handler_disconnect(&inner.lock().unwrap().player, error_handler_id);
916 result
917 }
918}
919
920macro_rules! inner_player_proxy_getter {
921 ($fn_name:ident, $return_type:ty, $default_value:expr_2021) => {
922 fn $fn_name(&self) -> $return_type {
923 if self.setup().is_err() {
924 return $default_value;
925 }
926
927 let inner = self.inner.borrow();
928 let inner = inner.as_ref().unwrap().lock().unwrap();
929 inner.$fn_name()
930 }
931 };
932}
933
934macro_rules! inner_player_proxy {
935 ($fn_name:ident, $return_type:ty) => {
936 fn $fn_name(&self) -> Result<$return_type, PlayerError> {
937 self.setup()?;
938 let inner = self.inner.borrow();
939 let mut inner = inner.as_ref().unwrap().lock().unwrap();
940 inner.$fn_name()
941 }
942 };
943
944 ($fn_name:ident, $arg1:ident, $arg1_type:ty) => {
945 fn $fn_name(&self, $arg1: $arg1_type) -> Result<(), PlayerError> {
946 self.setup()?;
947 let inner = self.inner.borrow();
948 let mut inner = inner.as_ref().unwrap().lock().unwrap();
949 inner.$fn_name($arg1)
950 }
951 };
952
953 ($fn_name:ident, $arg1:ident, $arg1_type:ty, $arg2:ident, $arg2_type:ty) => {
954 fn $fn_name(&self, $arg1: $arg1_type, $arg2: $arg2_type) -> Result<(), PlayerError> {
955 self.setup()?;
956 let inner = self.inner.borrow();
957 let mut inner = inner.as_ref().unwrap().lock().unwrap();
958 inner.$fn_name($arg1, $arg2)
959 }
960 };
961}
962
963impl Player for GStreamerPlayer {
964 inner_player_proxy!(play, ());
965 inner_player_proxy!(pause, ());
966 inner_player_proxy_getter!(paused, bool, DEFAULT_PAUSED);
967 inner_player_proxy_getter!(can_resume, bool, DEFAULT_CAN_RESUME);
968 inner_player_proxy!(stop, ());
969 inner_player_proxy!(end_of_stream, ());
970 inner_player_proxy!(set_input_size, size, u64);
971 inner_player_proxy!(set_mute, muted, bool);
972 inner_player_proxy_getter!(muted, bool, DEFAULT_MUTED);
973 inner_player_proxy!(set_playback_rate, playback_rate, f64);
974 inner_player_proxy_getter!(playback_rate, f64, DEFAULT_PLAYBACK_RATE);
975 inner_player_proxy!(push_data, data, Vec<u8>);
976 inner_player_proxy!(seek, time, f64);
977 inner_player_proxy!(set_volume, volume, f64);
978 inner_player_proxy_getter!(volume, f64, DEFAULT_VOLUME);
979 inner_player_proxy_getter!(buffered, Vec<Range<f64>>, DEFAULT_TIME_RANGES);
980 inner_player_proxy_getter!(seekable, Vec<Range<f64>>, DEFAULT_TIME_RANGES);
981 inner_player_proxy!(set_stream, stream, &MediaStreamId, only_stream, bool);
982 inner_player_proxy!(set_audio_track, stream_index, i32, enabled, bool);
983 inner_player_proxy!(set_video_track, stream_index, i32, enabled, bool);
984
985 fn render_use_gl(&self) -> bool {
986 self.render.lock().unwrap().is_gl()
987 }
988}
989
990impl MediaInstance for GStreamerPlayer {
991 fn get_id(&self) -> usize {
992 self.id
993 }
994
995 fn mute(&self, val: bool) -> Result<(), MediaInstanceError> {
996 self.set_mute(val).map_err(|_| MediaInstanceError)
997 }
998
999 fn suspend(&self) -> Result<(), MediaInstanceError> {
1000 self.pause().map_err(|_| MediaInstanceError)
1001 }
1002
1003 fn resume(&self) -> Result<(), MediaInstanceError> {
1004 if !self.can_resume() {
1005 return Ok(());
1006 }
1007
1008 self.play().map_err(|_| MediaInstanceError)
1009 }
1010}
1011
1012impl Drop for GStreamerPlayer {
1013 fn drop(&mut self) {
1014 let _ = self.stop();
1015 let (tx_ack, rx_ack) = mpsc::channel();
1016 let _ = self
1017 .backend_chan
1018 .lock()
1019 .unwrap()
1020 .send(BackendMsg::Shutdown {
1021 context: self.context_id,
1022 id: self.id,
1023 tx_ack,
1024 });
1025 let _ = rx_ack.recv();
1026 }
1027}