1use super::BACKEND_BASE_TIME;
2use crate::media_stream::GStreamerMediaStream;
3use crate::media_stream_source::{register_servo_media_stream_src, ServoMediaStreamSrc};
4use crate::render::GStreamerRender;
5use crate::source::{register_servo_src, ServoSrc};
6use byte_slice_cast::AsSliceOf;
7use glib;
8use glib::prelude::*;
9use gst;
10use gst::prelude::*;
11use gst_app;
12use gst_play;
13use gst_play::prelude::*;
14use ipc_channel::ipc::{channel, IpcReceiver, IpcSender};
15use servo_media_player::audio::AudioRenderer;
16use servo_media_player::context::PlayerGLContext;
17use servo_media_player::metadata::Metadata;
18use servo_media_player::video::VideoFrameRenderer;
19use servo_media_player::{
20 PlaybackState, Player, PlayerError, PlayerEvent, SeekLock, SeekLockMsg, StreamType,
21};
22use servo_media_streams::registry::{get_stream, MediaStreamId};
23use servo_media_traits::{BackendMsg, ClientContextId, MediaInstance};
24use std::cell::RefCell;
25use std::ops::Range;
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::sync::mpsc::{self, Sender};
28use std::sync::{Arc, Mutex, Once};
29use std::time;
30use std::u64;
31
32const MAX_BUFFER_SIZE: i32 = 500 * 1024 * 1024;
33
34fn metadata_from_media_info(media_info: &gst_play::PlayMediaInfo) -> Result<Metadata, ()> {
35 let dur = media_info.duration();
36 let duration = if let Some(dur) = dur {
37 let mut nanos = dur.nseconds();
38 nanos = nanos % 1_000_000_000;
39 let seconds = dur.seconds();
40 Some(time::Duration::new(seconds, nanos as u32))
41 } else {
42 None
43 };
44
45 let mut audio_tracks = Vec::new();
46 let mut video_tracks = Vec::new();
47
48 let format = media_info
49 .container_format()
50 .unwrap_or_else(|| glib::GString::from(""))
51 .to_string();
52
53 for stream_info in media_info.stream_list() {
54 let stream_type = stream_info.stream_type();
55 match stream_type.as_str() {
56 "audio" => {
57 let codec = stream_info
58 .codec()
59 .unwrap_or_else(|| glib::GString::from(""))
60 .to_string();
61 audio_tracks.push(codec);
62 },
63 "video" => {
64 let codec = stream_info
65 .codec()
66 .unwrap_or_else(|| glib::GString::from(""))
67 .to_string();
68 video_tracks.push(codec);
69 },
70 _ => {},
71 }
72 }
73
74 let mut width: u32 = 0;
75 let height: u32 = if media_info.number_of_video_streams() > 0 {
76 let first_video_stream = &media_info.video_streams()[0];
77 width = first_video_stream.width() as u32;
78 first_video_stream.height() as u32
79 } else {
80 0
81 };
82
83 let is_seekable = media_info.is_seekable();
84 let is_live = media_info.is_live();
85 let title = media_info.title().map(|s| s.as_str().to_string());
86
87 Ok(Metadata {
88 duration,
89 width,
90 height,
91 format,
92 is_seekable,
93 audio_tracks,
94 video_tracks,
95 is_live,
96 title,
97 })
98}
99
100pub struct GStreamerAudioChunk(gst::buffer::MappedBuffer<gst::buffer::Readable>);
101impl AsRef<[f32]> for GStreamerAudioChunk {
102 fn as_ref(&self) -> &[f32] {
103 self.0.as_ref().as_slice_of::<f32>().unwrap()
104 }
105}
106
/// The source element currently feeding the player pipeline.
#[derive(PartialEq)]
enum PlayerSource {
    /// A `ServoSrc` element; this is the only source that accepts pushed
    /// buffers, an input size and an end-of-stream notification.
    Seekable(ServoSrc),
    /// A `ServoMediaStreamSrc` element, fed from a registered media stream.
    Stream(ServoMediaStreamSrc),
}
112
/// Pipeline state shared between the `GStreamerPlayer` handle and the
/// GStreamer signal/appsrc/appsink callbacks.
struct PlayerInner {
    /// The underlying `gst_play` player.
    player: gst_play::Play,
    /// Signal adapter created for `player`.
    signal_adapter: gst_play::PlaySignalAdapter,
    /// Source element, stored once the pipeline emits `source-setup` and
    /// cleared again by `stop`.
    source: Option<PlayerSource>,
    /// App sink on which the video frame callbacks are installed.
    video_sink: gst_app::AppSink,
    /// Last size passed to `set_input_size`; `0` until a size is provided.
    input_size: u64,
    /// Last requested playback rate.
    rate: f64,
    /// Kind of source this player was created for.
    stream_type: StreamType,
    /// Most recent metadata reported by the player, if any.
    last_metadata: Option<Metadata>,
    /// Debug category used for log messages.
    cat: gst::DebugCategory,
    /// Set by the source's `enough_data` callback and cleared by `need_data`;
    /// while set, `push_data` refuses new buffers.
    enough_data: Arc<AtomicBool>,
}
125
126impl PlayerInner {
127 pub fn set_input_size(&mut self, size: u64) -> Result<(), PlayerError> {
128 self.input_size = size;
131 match self.source {
132 Some(ref mut source) => {
134 if let PlayerSource::Seekable(source) = source {
135 source.set_size(if size > 0 {
136 size as i64
137 } else {
138 -1 });
140 }
141 },
142 _ => (),
143 }
144 Ok(())
145 }
146
147 pub fn set_mute(&mut self, val: bool) -> Result<(), PlayerError> {
148 self.player.set_mute(val);
149 Ok(())
150 }
151
152 pub fn set_rate(&mut self, rate: f64) -> Result<(), PlayerError> {
153 self.rate = rate;
156 if let Some(ref metadata) = self.last_metadata {
157 if !metadata.is_seekable {
158 gst::warning!(
159 self.cat,
160 obj = &self.player,
161 "Player must be seekable in order to set the playback rate"
162 );
163 return Err(PlayerError::NonSeekableStream);
164 }
165 self.player.set_rate(rate);
166 }
167 Ok(())
168 }
169
170 pub fn play(&mut self) -> Result<(), PlayerError> {
171 self.player.play();
172 Ok(())
173 }
174
175 pub fn stop(&mut self) -> Result<(), PlayerError> {
176 self.player.stop();
177 self.last_metadata = None;
178 self.source = None;
179 Ok(())
180 }
181
182 pub fn pause(&mut self) -> Result<(), PlayerError> {
183 self.player.pause();
184 Ok(())
185 }
186
187 pub fn end_of_stream(&mut self) -> Result<(), PlayerError> {
188 match self.source {
189 Some(ref mut source) => {
190 if let PlayerSource::Seekable(source) = source {
191 source
192 .push_end_of_stream()
193 .map(|_| ())
194 .map_err(|_| PlayerError::EOSFailed)
195 } else {
196 Ok(())
197 }
198 },
199 _ => Ok(()),
200 }
201 }
202
203 pub fn seek(&mut self, time: f64) -> Result<(), PlayerError> {
204 if self.stream_type != StreamType::Seekable {
205 return Err(PlayerError::NonSeekableStream);
206 }
207 if let Some(ref metadata) = self.last_metadata {
208 if let Some(ref duration) = metadata.duration {
209 if duration < &time::Duration::new(time as u64, 0) {
210 gst::warning!(self.cat, obj = &self.player, "Trying to seek out of range");
211 return Err(PlayerError::SeekOutOfRange);
212 }
213 }
214 }
215
216 let time = time * 1_000_000_000.;
217 self.player.seek(gst::ClockTime::from_nseconds(time as u64));
218 Ok(())
219 }
220
221 pub fn set_volume(&mut self, value: f64) -> Result<(), PlayerError> {
222 self.player.set_volume(value);
223 Ok(())
224 }
225
226 pub fn push_data(&mut self, data: Vec<u8>) -> Result<(), PlayerError> {
227 if let Some(ref mut source) = self.source {
228 if let PlayerSource::Seekable(source) = source {
229 if self.enough_data.load(Ordering::Relaxed) {
230 return Err(PlayerError::EnoughData);
231 }
232 return source
233 .push_buffer(data)
234 .map(|_| ())
235 .map_err(|_| PlayerError::BufferPushFailed);
236 }
237 }
238 Err(PlayerError::BufferPushFailed)
239 }
240
241 pub fn set_src(&mut self, source: PlayerSource) {
242 self.source = Some(source);
243 }
244
245 pub fn buffered(&mut self) -> Result<Vec<Range<f64>>, PlayerError> {
246 let mut result = vec![];
247 if let Some(ref metadata) = self.last_metadata {
248 if let Some(ref duration) = metadata.duration {
249 let pipeline = self.player.pipeline();
250 let mut buffering = gst::query::Buffering::new(gst::Format::Percent);
251 if pipeline.query(&mut buffering) {
252 let ranges = buffering.ranges();
253 for (start, end) in ranges {
254 let start = (if let gst::GenericFormattedValue::Percent(start) = start {
255 start.unwrap()
256 } else {
257 gst::format::Percent::from_percent(0)
258 } * duration.as_secs() as u32
259 / gst::format::Percent::MAX) as f64;
260 let end = (if let gst::GenericFormattedValue::Percent(end) = end {
261 end.unwrap()
262 } else {
263 gst::format::Percent::from_percent(0)
264 } * duration.as_secs() as u32
265 / gst::format::Percent::MAX) as f64;
266 result.push(Range { start, end });
267 }
268 }
269 }
270 }
271 Ok(result)
272 }
273
274 fn set_stream(&mut self, stream: &MediaStreamId, only_stream: bool) -> Result<(), PlayerError> {
275 debug_assert!(self.stream_type == StreamType::Stream);
276 if let Some(ref source) = self.source {
277 if let PlayerSource::Stream(source) = source {
278 let stream =
279 get_stream(stream).expect("Media streams registry does not contain such ID");
280 let mut stream = stream.lock().unwrap();
281 if let Some(mut stream) = stream.as_mut_any().downcast_mut::<GStreamerMediaStream>()
282 {
283 let playbin = self
284 .player
285 .pipeline()
286 .dynamic_cast::<gst::Pipeline>()
287 .unwrap();
288 let clock = gst::SystemClock::obtain();
289 playbin.set_base_time(*BACKEND_BASE_TIME);
290 playbin.set_start_time(gst::ClockTime::NONE);
291 playbin.use_clock(Some(&clock));
292
293 source.set_stream(&mut stream, only_stream);
294 return Ok(());
295 }
296 }
297 }
298 Err(PlayerError::SetStreamFailed)
299 }
300
301 fn set_audio_track(&mut self, stream_index: i32, enabled: bool) -> Result<(), PlayerError> {
302 self.player
303 .set_audio_track(stream_index)
304 .map_err(|_| PlayerError::SetTrackFailed)?;
305 self.player.set_audio_track_enabled(enabled);
306 Ok(())
307 }
308
309 fn set_video_track(&mut self, stream_index: i32, enabled: bool) -> Result<(), PlayerError> {
310 self.player
311 .set_video_track(stream_index)
312 .map_err(|_| PlayerError::SetTrackFailed)?;
313 self.player.set_video_track_enabled(enabled);
314 Ok(())
315 }
316}
317
/// Locks `$observer` (a mutex-guarded sender) and sends `$event` through it.
///
/// Evaluates to the result of `send`; panics if the mutex is poisoned.
macro_rules! notify(
    ($observer:expr, $event:expr) => {
        $observer.lock().unwrap().send($event)
    };
);
323
324struct SeekChannel {
325 sender: SeekLock,
326 recv: IpcReceiver<SeekLockMsg>,
327}
328
329impl SeekChannel {
330 fn new() -> Self {
331 let (sender, recv) = channel::<SeekLockMsg>().expect("Couldn't create IPC channel");
332 Self {
333 sender: SeekLock {
334 lock_channel: sender,
335 },
336 recv,
337 }
338 }
339
340 fn sender(&self) -> SeekLock {
341 self.sender.clone()
342 }
343
344 fn _await(&self) -> SeekLockMsg {
345 self.recv.recv().unwrap()
346 }
347}
348
/// GStreamer-backed implementation of the `Player` and `MediaInstance` traits.
pub struct GStreamerPlayer {
    /// Identifier of this media instance.
    id: usize,
    /// Identifier of the client context that owns this player.
    context_id: ClientContextId,
    /// Channel to the backend, used to announce shutdown when dropped.
    backend_chan: Arc<Mutex<Sender<BackendMsg>>>,
    /// Pipeline state; `None` until `setup` has created it.
    inner: RefCell<Option<Arc<Mutex<PlayerInner>>>>,
    /// Sender through which `PlayerEvent`s are delivered.
    observer: Arc<Mutex<IpcSender<PlayerEvent>>>,
    /// Optional renderer receiving audio chunks.
    audio_renderer: Option<Arc<Mutex<dyn AudioRenderer>>>,
    /// Optional renderer receiving video frames.
    video_renderer: Option<Arc<Mutex<dyn VideoFrameRenderer>>>,
    /// Guards the one-time "source is ready" notification sent during `setup`.
    is_ready: Arc<Once>,
    /// Kind of source this player is fed from.
    stream_type: StreamType,
    /// Renderer that provides the video sink and turns samples into frames.
    render: Arc<Mutex<GStreamerRender>>,
}
368
369impl GStreamerPlayer {
370 pub fn new(
371 id: usize,
372 context_id: &ClientContextId,
373 backend_chan: Arc<Mutex<Sender<BackendMsg>>>,
374 stream_type: StreamType,
375 observer: IpcSender<PlayerEvent>,
376 video_renderer: Option<Arc<Mutex<dyn VideoFrameRenderer>>>,
377 audio_renderer: Option<Arc<Mutex<dyn AudioRenderer>>>,
378 gl_context: Box<dyn PlayerGLContext>,
379 ) -> GStreamerPlayer {
380 let _ = gst::DebugCategory::new(
381 "servoplayer",
382 gst::DebugColorFlags::empty(),
383 Some("Servo player"),
384 );
385
386 Self {
387 id,
388 context_id: *context_id,
389 backend_chan,
390 inner: RefCell::new(None),
391 observer: Arc::new(Mutex::new(observer)),
392 audio_renderer,
393 video_renderer,
394 is_ready: Arc::new(Once::new()),
395 stream_type,
396 render: Arc::new(Mutex::new(GStreamerRender::new(gl_context))),
397 }
398 }
399
400 fn setup(&self) -> Result<(), PlayerError> {
401 if self.inner.borrow().is_some() {
402 return Ok(());
403 }
404
405 for element in vec!["playbin3", "decodebin3", "queue"].iter() {
408 if gst::ElementFactory::find(element).is_none() {
409 return Err(PlayerError::Backend(format!(
410 "Missing dependency: {}",
411 element
412 )));
413 }
414 }
415
416 let player = gst_play::Play::default();
417 let signal_adapter = gst_play::PlaySignalAdapter::new_sync_emit(&player);
418 let pipeline = player.pipeline();
419
420 if !cfg!(any(target_os = "windows", target_os = "android")) {
422 let flags = pipeline.property_value("flags");
426 let flags_class = match glib::FlagsClass::with_type(flags.type_()) {
427 Some(flags) => flags,
428 None => {
429 return Err(PlayerError::Backend(
430 "FlagsClass creation failed".to_owned(),
431 ));
432 },
433 };
434 let flags_class = match flags_class.builder_with_value(flags) {
435 Some(class) => class,
436 None => {
437 return Err(PlayerError::Backend(
438 "FlagsClass creation failed".to_owned(),
439 ));
440 },
441 };
442 let Some(flags) = flags_class.set_by_nick("download").build() else {
443 return Err(PlayerError::Backend(
444 "FlagsClass creation failed".to_owned(),
445 ));
446 };
447 pipeline.set_property_from_value("flags", &flags);
448 }
449
450 pipeline.set_property("buffer-size", MAX_BUFFER_SIZE);
452
453 let mut config = player.config();
455 config.set_position_update_interval(500u32);
456 player
457 .set_config(config)
458 .map_err(|e| PlayerError::Backend(e.to_string()))?;
459
460 if let Some(ref audio_renderer) = self.audio_renderer {
461 let audio_sink = gst::ElementFactory::make("appsink")
462 .build()
463 .map_err(|error| {
464 PlayerError::Backend(format!("appsink creation failed: {error:?}"))
465 })?;
466
467 pipeline.set_property("audio-sink", &audio_sink);
468
469 let audio_sink = audio_sink.dynamic_cast::<gst_app::AppSink>().unwrap();
470
471 let weak_audio_renderer = Arc::downgrade(&audio_renderer);
472
473 audio_sink.set_callbacks(
474 gst_app::AppSinkCallbacks::builder()
475 .new_preroll(|_| Ok(gst::FlowSuccess::Ok))
476 .new_sample(move |audio_sink| {
477 let sample = audio_sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
478 let buffer = sample.buffer_owned().ok_or(gst::FlowError::Error)?;
479 let audio_info = sample
480 .caps()
481 .and_then(|caps| gst_audio::AudioInfo::from_caps(caps).ok())
482 .ok_or(gst::FlowError::Error)?;
483 let positions = audio_info.positions().ok_or(gst::FlowError::Error)?;
484
485 let Some(audio_renderer) = weak_audio_renderer.upgrade() else {
486 return Err(gst::FlowError::Flushing);
487 };
488
489 for position in positions.iter() {
490 let buffer = buffer.clone();
491 let map = if let Ok(map) = buffer.into_mapped_buffer_readable() {
492 map
493 } else {
494 return Err(gst::FlowError::Error);
495 };
496 let chunk = Box::new(GStreamerAudioChunk(map));
497 let channel = position.to_mask() as u32;
498
499 audio_renderer.lock().unwrap().render(chunk, channel);
500 }
501 Ok(gst::FlowSuccess::Ok)
502 })
503 .build(),
504 );
505 }
506
507 let video_sink = self.render.lock().unwrap().setup_video_sink(&pipeline)?;
508
509 let uri = match self.stream_type {
517 StreamType::Stream => {
518 register_servo_media_stream_src().map_err(|error| {
519 PlayerError::Backend(format!(
520 "servomediastreamsrc registration error: {error:?}"
521 ))
522 })?;
523 "mediastream://".to_value()
524 },
525 StreamType::Seekable => {
526 register_servo_src().map_err(|error| {
527 PlayerError::Backend(format!("servosrc registration error: {error:?}"))
528 })?;
529 "servosrc://".to_value()
530 },
531 };
532 player.set_property("uri", &uri);
533
534 if self.video_renderer.is_none() {
536 player.set_video_track_enabled(false);
537 }
538
539 *self.inner.borrow_mut() = Some(Arc::new(Mutex::new(PlayerInner {
540 player,
541 signal_adapter: signal_adapter.clone(),
542 source: None,
543 video_sink,
544 input_size: 0,
545 rate: 1.0,
546 stream_type: self.stream_type,
547 last_metadata: None,
548 cat: gst::DebugCategory::get("servoplayer").unwrap(),
549 enough_data: Arc::new(AtomicBool::new(false)),
550 })));
551
552 let inner = self.inner.borrow();
553 let inner = inner.as_ref().unwrap();
554 let observer = self.observer.clone();
555 signal_adapter.connect_end_of_stream(move |_| {
557 let _ = notify!(observer, PlayerEvent::EndOfStream);
558 });
559
560 let observer = self.observer.clone();
561 signal_adapter.connect_error(move |_self, error, _details| {
563 let _ = notify!(observer, PlayerEvent::Error(error.to_string()));
564 });
565
566 let observer = self.observer.clone();
567 signal_adapter.connect_state_changed(move |_, player_state| {
569 let state = match player_state {
570 gst_play::PlayState::Buffering => Some(PlaybackState::Buffering),
571 gst_play::PlayState::Stopped => Some(PlaybackState::Stopped),
572 gst_play::PlayState::Paused => Some(PlaybackState::Paused),
573 gst_play::PlayState::Playing => Some(PlaybackState::Playing),
574 _ => None,
575 };
576 if let Some(v) = state {
577 let _ = notify!(observer, PlayerEvent::StateChanged(v));
578 }
579 });
580
581 let observer = self.observer.clone();
582 signal_adapter.connect_position_updated(move |_, position| {
584 if let Some(seconds) = position.map(|p| p.seconds()) {
585 let _ = notify!(observer, PlayerEvent::PositionChanged(seconds));
586 }
587 });
588
589 let observer = self.observer.clone();
590 signal_adapter.connect_seek_done(move |_, position| {
592 let _ = notify!(observer, PlayerEvent::SeekDone(position.seconds()));
593 });
594
595 let inner_clone = inner.clone();
597 let observer = self.observer.clone();
598 signal_adapter.connect_media_info_updated(move |_, info| {
599 let mut inner = inner_clone.lock().unwrap();
600 if let Ok(metadata) = metadata_from_media_info(info) {
601 if inner.last_metadata.as_ref() != Some(&metadata) {
602 inner.last_metadata = Some(metadata.clone());
603 if metadata.is_seekable {
604 inner.player.set_rate(inner.rate);
605 }
606 gst::info!(
607 inner.cat,
608 obj = &inner.player,
609 "Metadata updated: {:?}",
610 metadata
611 );
612 let _ = notify!(observer, PlayerEvent::MetadataUpdated(metadata));
613 }
614 }
615 });
616
617 let inner_clone = inner.clone();
619 let observer = self.observer.clone();
620 signal_adapter.connect_duration_changed(move |_, duration| {
621 let mut inner = inner_clone.lock().unwrap();
622 let duration = duration.map(|duration| {
623 let nanos = duration.nseconds();
624 let seconds = duration.seconds();
625 time::Duration::new(seconds, (nanos % 1_000_000_000) as u32)
626 });
627 let mut updated_metadata = None;
628 if let Some(ref mut metadata) = inner.last_metadata {
629 metadata.duration = duration;
630 updated_metadata = Some(metadata.clone());
631 }
632 if let Some(updated_metadata) = updated_metadata {
633 gst::info!(
634 inner.cat,
635 obj = &inner.player,
636 "Duration updated: {:?}",
637 updated_metadata
638 );
639 let _ = notify!(observer, PlayerEvent::MetadataUpdated(updated_metadata));
640 }
641 });
642
643 if let Some(video_renderer) = self.video_renderer.clone() {
644 let render_sample = {
647 let render = self.render.clone();
648 let observer = self.observer.clone();
649 let weak_video_renderer = Arc::downgrade(&video_renderer);
650
651 move |sample: gst::Sample| {
652 let frame = render
653 .lock()
654 .unwrap()
655 .get_frame_from_sample(sample)
656 .map_err(|_| gst::FlowError::Error)?;
657
658 if let Some(video_renderer) = weak_video_renderer.upgrade() {
659 video_renderer.lock().unwrap().render(frame);
660 } else {
661 return Err(gst::FlowError::Flushing);
662 };
663
664 let _ = notify!(observer, PlayerEvent::VideoFrameUpdated);
665 Ok(gst::FlowSuccess::Ok)
666 }
667 };
668
669 inner.lock().unwrap().video_sink.set_callbacks(
671 gst_app::AppSinkCallbacks::builder()
672 .new_preroll({
673 let render_sample = render_sample.clone();
674 move |video_sink| {
675 render_sample(
676 video_sink.pull_preroll().map_err(|_| gst::FlowError::Eos)?,
677 )
678 }
679 })
680 .new_sample(move |video_sink| {
681 render_sample(video_sink.pull_sample().map_err(|_| gst::FlowError::Eos)?)
682 })
683 .build(),
684 );
685 };
686
687 let (receiver, error_handler_id) = {
688 let inner_clone = inner.clone();
689 let mut inner = inner.lock().unwrap();
690 let pipeline = inner.player.pipeline();
691
692 let (sender, receiver) = mpsc::channel();
693
694 let sender = Arc::new(Mutex::new(sender));
695 let sender_clone = sender.clone();
696 let is_ready_clone = self.is_ready.clone();
697 let observer = self.observer.clone();
698 pipeline.connect("source-setup", false, move |args| {
699 let source = args[1].get::<gst::Element>().unwrap();
700
701 let mut inner = inner_clone.lock().unwrap();
702 let source = match inner.stream_type {
703 StreamType::Seekable => {
704 let servosrc = source
705 .dynamic_cast::<ServoSrc>()
706 .expect("Source element is expected to be a ServoSrc!");
707
708 if inner.input_size > 0 {
709 servosrc.set_size(inner.input_size as i64);
710 }
711
712 let sender_clone = sender.clone();
713 let is_ready = is_ready_clone.clone();
714 let observer_ = observer.clone();
715 let observer__ = observer.clone();
716 let observer___ = observer.clone();
717 let servosrc_ = servosrc.clone();
718 let enough_data_ = inner.enough_data.clone();
719 let enough_data__ = inner.enough_data.clone();
720 let seek_channel = Arc::new(Mutex::new(SeekChannel::new()));
721 servosrc.set_callbacks(
722 gst_app::AppSrcCallbacks::builder()
723 .need_data(move |_, _| {
724 is_ready.call_once(|| {
730 let _ = sender_clone.lock().unwrap().send(Ok(()));
731 });
732
733 enough_data_.store(false, Ordering::Relaxed);
734 let _ = notify!(observer_, PlayerEvent::NeedData);
735 })
736 .enough_data(move |_| {
737 enough_data__.store(true, Ordering::Relaxed);
738 let _ = notify!(observer__, PlayerEvent::EnoughData);
739 })
740 .seek_data(move |_, offset| {
741 let (ret, ack_channel) = if servosrc_.set_seek_offset(offset) {
742 let _ = notify!(
743 observer___,
744 PlayerEvent::SeekData(
745 offset,
746 seek_channel.lock().unwrap().sender()
747 )
748 );
749 let (ret, ack_channel) =
750 seek_channel.lock().unwrap()._await();
751 (ret, Some(ack_channel))
752 } else {
753 (true, None)
754 };
755
756 servosrc_.set_seek_done();
757 if let Some(ack_channel) = ack_channel {
758 ack_channel.send(()).unwrap();
759 }
760 ret
761 })
762 .build(),
763 );
764
765 PlayerSource::Seekable(servosrc)
766 },
767 StreamType::Stream => {
768 let media_stream_src = source
769 .dynamic_cast::<ServoMediaStreamSrc>()
770 .expect("Source element is expected to be a ServoMediaStreamSrc!");
771 let sender_clone = sender.clone();
772 is_ready_clone.call_once(|| {
773 let _ = notify!(sender_clone, Ok(()));
774 });
775 PlayerSource::Stream(media_stream_src)
776 },
777 };
778
779 inner.set_src(source);
780
781 None
782 });
783
784 let error_handler_id =
785 signal_adapter.connect_error(move |signal_adapter, error, _details| {
786 let _ = notify!(sender_clone, Err(PlayerError::Backend(error.to_string())));
787 signal_adapter.play().stop();
788 });
789
790 let _ = inner.pause();
791
792 (receiver, error_handler_id)
793 };
794
795 let result = receiver.recv().unwrap();
796 glib::signal::signal_handler_disconnect(&inner.lock().unwrap().player, error_handler_id);
797 result
798 }
799}
800
/// Generates a `Player` method that forwards to the `PlayerInner` method of
/// the same name.
///
/// Each generated method first runs `setup()`, returning its error if it
/// fails, then locks the inner player and delegates to it.
macro_rules! inner_player_proxy {
    // Method without arguments, returning `Result<$return_type, PlayerError>`.
    ($fn_name:ident, $return_type:ty) => {
        fn $fn_name(&self) -> Result<$return_type, PlayerError> {
            self.setup()?;
            let inner = self.inner.borrow();
            let mut inner = inner.as_ref().unwrap().lock().unwrap();
            inner.$fn_name()
        }
    };

    // Method with a single argument, returning `Result<(), PlayerError>`.
    ($fn_name:ident, $arg1:ident, $arg1_type:ty) => {
        fn $fn_name(&self, $arg1: $arg1_type) -> Result<(), PlayerError> {
            self.setup()?;
            let inner = self.inner.borrow();
            let mut inner = inner.as_ref().unwrap().lock().unwrap();
            inner.$fn_name($arg1)
        }
    };
}
820
821impl Player for GStreamerPlayer {
822 inner_player_proxy!(play, ());
823 inner_player_proxy!(pause, ());
824 inner_player_proxy!(stop, ());
825 inner_player_proxy!(end_of_stream, ());
826 inner_player_proxy!(set_input_size, size, u64);
827 inner_player_proxy!(set_mute, val, bool);
828 inner_player_proxy!(set_rate, rate, f64);
829 inner_player_proxy!(push_data, data, Vec<u8>);
830 inner_player_proxy!(seek, time, f64);
831 inner_player_proxy!(set_volume, value, f64);
832 inner_player_proxy!(buffered, Vec<Range<f64>>);
833
834 fn seekable(&self) -> Result<Vec<Range<f64>>, PlayerError> {
835 self.setup()?;
836 let inner = self.inner.borrow();
837 let mut inner = inner.as_ref().unwrap().lock().unwrap();
838 if let Some(metadata) = inner.last_metadata.as_ref() {
840 if metadata.is_seekable {
841 if let Some(duration) = metadata.duration {
842 return Ok(vec![Range {
843 start: 0.0,
844 end: duration.as_secs_f64(),
845 }]);
846 }
847 }
848 }
849 inner.buffered()
851 }
852
853 fn render_use_gl(&self) -> bool {
854 self.render.lock().unwrap().is_gl()
855 }
856
857 fn set_stream(&self, stream: &MediaStreamId, only_stream: bool) -> Result<(), PlayerError> {
858 self.setup()?;
859 let inner = self.inner.borrow();
860 let mut inner = inner.as_ref().unwrap().lock().unwrap();
861 inner.set_stream(stream, only_stream)
862 }
863
864 fn set_audio_track(&self, stream_index: i32, enabled: bool) -> Result<(), PlayerError> {
865 self.setup()?;
866 let inner = self.inner.borrow();
867 let mut inner = inner.as_ref().unwrap().lock().unwrap();
868 inner.set_audio_track(stream_index, enabled)
869 }
870
871 fn set_video_track(&self, stream_index: i32, enabled: bool) -> Result<(), PlayerError> {
872 self.setup()?;
873 let inner = self.inner.borrow();
874 let mut inner = inner.as_ref().unwrap().lock().unwrap();
875 inner.set_video_track(stream_index, enabled)
876 }
877}
878
879impl MediaInstance for GStreamerPlayer {
880 fn get_id(&self) -> usize {
881 self.id
882 }
883
884 fn mute(&self, val: bool) -> Result<(), ()> {
885 self.set_mute(val).map_err(|_| ())
886 }
887
888 fn suspend(&self) -> Result<(), ()> {
889 self.pause().map_err(|_| ())
890 }
891
892 fn resume(&self) -> Result<(), ()> {
893 self.play().map_err(|_| ())
894 }
895}
896
897impl Drop for GStreamerPlayer {
898 fn drop(&mut self) {
899 let _ = self.stop();
900 let (tx_ack, rx_ack) = mpsc::channel();
901 let _ = self
902 .backend_chan
903 .lock()
904 .unwrap()
905 .send(BackendMsg::Shutdown {
906 context: self.context_id,
907 id: self.id,
908 tx_ack,
909 });
910 let _ = rx_ack.recv();
911 }
912}