Skip to main content

servo_media_gstreamer/
media_stream_source.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, LazyLock, Mutex};
7
8use glib::subclass::prelude::*;
9use gstreamer::prelude::*;
10use gstreamer::subclass::prelude::*;
11use gstreamer_base::UniqueFlowCombiner;
12use servo_media_player::PlayerError;
13use servo_media_streams::{MediaStream, MediaStreamType};
14use url::Url;
15
16use crate::media_stream::{GStreamerMediaStream, RTP_CAPS_OPUS, RTP_CAPS_VP8};
17
18// Implementation sub-module of the GObject
19mod imp {
20
21    use super::*;
22
23    static AUDIO_SRC_PAD_TEMPLATE: LazyLock<gstreamer::PadTemplate> = LazyLock::new(|| {
24        gstreamer::PadTemplate::new(
25            "audio_src",
26            gstreamer::PadDirection::Src,
27            gstreamer::PadPresence::Sometimes,
28            &RTP_CAPS_OPUS,
29        )
30        .expect("Could not create audio src pad template")
31    });
32
33    static VIDEO_SRC_PAD_TEMPLATE: LazyLock<gstreamer::PadTemplate> = LazyLock::new(|| {
34        gstreamer::PadTemplate::new(
35            "video_src",
36            gstreamer::PadDirection::Src,
37            gstreamer::PadPresence::Sometimes,
38            &RTP_CAPS_VP8,
39        )
40        .expect("Could not create video src pad template")
41    });
42
43    pub struct ServoMediaStreamSrc {
44        cat: gstreamer::DebugCategory,
45        audio_proxysrc: gstreamer::Element,
46        audio_srcpad: gstreamer::GhostPad,
47        video_proxysrc: gstreamer::Element,
48        video_srcpad: gstreamer::GhostPad,
49        flow_combiner: Arc<Mutex<UniqueFlowCombiner>>,
50        has_audio_stream: Arc<AtomicBool>,
51        has_video_stream: Arc<AtomicBool>,
52    }
53
54    impl ServoMediaStreamSrc {
55        pub fn set_stream(
56            &self,
57            stream: &mut GStreamerMediaStream,
58            src: &gstreamer::Element,
59            only_stream: bool,
60        ) -> Result<(), PlayerError> {
61            // XXXferjm the current design limits the number of streams to one
62            // per type. This fulfills the basic use case for WebRTC, but we should
63            // implement support for multiple streams per type at some point, which
64            // likely involves encoding and muxing all streams of the same type
65            // in a single stream.
66
67            gstreamer::log!(self.cat, "Setting stream");
68
69            // Append a proxysink to the media stream pipeline.
70            let pipeline = stream.pipeline_or_new();
71            let last_element = stream.encoded().map_err(|_| PlayerError::SetStreamFailed)?;
72            let sink = gstreamer::ElementFactory::make("proxysink")
73                .build()
74                .map_err(|_| PlayerError::SetStreamFailed)?;
75            pipeline
76                .add(&sink)
77                .map_err(|_| PlayerError::SetStreamFailed)?;
78            gstreamer::Element::link_many(&[&last_element, &sink][..])
79                .map_err(|_| PlayerError::SetStreamFailed)?;
80
81            // Create the appropriate proxysrc depending on the stream type
82            // and connect the media stream proxysink to it.
83            self.setup_proxy_src(stream.ty(), &sink, src, only_stream);
84
85            sink.sync_state_with_parent()
86                .map_err(|_| PlayerError::SetStreamFailed)?;
87            pipeline
88                .set_state(gstreamer::State::Playing)
89                .map_err(|_| PlayerError::SetStreamFailed)?;
90
91            Ok(())
92        }
93
94        fn setup_proxy_src(
95            &self,
96            stream_type: MediaStreamType,
97            sink: &gstreamer::Element,
98            src: &gstreamer::Element,
99            only_stream: bool,
100        ) {
101            let (proxysrc, src_pad, no_more_pads) = match stream_type {
102                MediaStreamType::Audio => {
103                    self.has_audio_stream.store(true, Ordering::Relaxed);
104                    (
105                        &self.audio_proxysrc,
106                        &self.audio_srcpad,
107                        self.has_video_stream.load(Ordering::Relaxed),
108                    )
109                },
110                MediaStreamType::Video => {
111                    self.has_video_stream.store(true, Ordering::Relaxed);
112                    (
113                        &self.video_proxysrc,
114                        &self.video_srcpad,
115                        self.has_audio_stream.load(Ordering::Relaxed),
116                    )
117                },
118            };
119            proxysrc.set_property("proxysink", sink);
120
121            // Add proxysrc to bin
122            let bin = src.downcast_ref::<gstreamer::Bin>().unwrap();
123            bin.add(proxysrc)
124                .expect("Could not add proxysrc element to bin");
125
126            let target_pad = proxysrc
127                .static_pad("src")
128                .expect("Could not get proxysrc's static src pad");
129            src_pad
130                .set_target(Some(&target_pad))
131                .expect("Could not set target pad");
132
133            src.add_pad(src_pad)
134                .expect("Could not add source pad to media stream src");
135            src.set_element_flags(gstreamer::ElementFlags::SOURCE);
136
137            let proxy_pad = src_pad.internal().unwrap();
138            src_pad.set_active(true).expect("Could not active pad");
139            self.flow_combiner.lock().unwrap().add_pad(&proxy_pad);
140
141            src.sync_state_with_parent().unwrap();
142
143            if no_more_pads || only_stream {
144                src.no_more_pads();
145            }
146        }
147    }
148
149    // Basic declaration of our type for the GObject type system.
150    #[glib::object_subclass]
151    impl ObjectSubclass for ServoMediaStreamSrc {
152        const NAME: &'static str = "ServoMediaStreamSrc";
153        type Type = super::ServoMediaStreamSrc;
154        type ParentType = gstreamer::Bin;
155        type Interfaces = (gstreamer::URIHandler,);
156
157        // Called once at the very beginning of instantiation of each instance and
158        // creates the data structure that contains all our state
159        fn with_class(_klass: &Self::Class) -> Self {
160            let flow_combiner = Arc::new(Mutex::new(UniqueFlowCombiner::new()));
161
162            fn create_ghost_pad_with_template(
163                name: &str,
164                pad_template: &gstreamer::PadTemplate,
165                flow_combiner: Arc<Mutex<UniqueFlowCombiner>>,
166            ) -> gstreamer::GhostPad {
167                gstreamer::GhostPad::builder_from_template(pad_template)
168                    .name(name)
169                    .chain_function({
170                        move |pad, parent, buffer| {
171                            let chain_result =
172                                gstreamer::ProxyPad::chain_default(pad, parent, buffer);
173                            let result = flow_combiner
174                                .lock()
175                                .unwrap()
176                                .update_pad_flow(pad, chain_result);
177                            if result == Err(gstreamer::FlowError::Flushing) {
178                                return chain_result;
179                            }
180                            result
181                        }
182                    })
183                    .build()
184            }
185
186            let audio_proxysrc = gstreamer::ElementFactory::make("proxysrc")
187                .build()
188                .expect("Could not create proxysrc element");
189            let audio_srcpad = create_ghost_pad_with_template(
190                "audio_src",
191                &AUDIO_SRC_PAD_TEMPLATE,
192                flow_combiner.clone(),
193            );
194
195            let video_proxysrc = gstreamer::ElementFactory::make("proxysrc")
196                .build()
197                .expect("Could not create proxysrc element");
198            let video_srcpad = create_ghost_pad_with_template(
199                "video_src",
200                &VIDEO_SRC_PAD_TEMPLATE,
201                flow_combiner.clone(),
202            );
203
204            Self {
205                cat: gstreamer::DebugCategory::new(
206                    "servomediastreamsrc",
207                    gstreamer::DebugColorFlags::empty(),
208                    Some("Servo media stream source"),
209                ),
210                audio_proxysrc,
211                audio_srcpad,
212                video_proxysrc,
213                video_srcpad,
214                flow_combiner,
215                has_video_stream: Arc::new(AtomicBool::new(false)),
216                has_audio_stream: Arc::new(AtomicBool::new(false)),
217            }
218        }
219    }
220
221    // The ObjectImpl trait provides the setters/getters for GObject properties.
222    // Here we need to provide the values that are internally stored back to the
223    // caller, or store whatever new value the caller is providing.
224    //
225    // This maps between the GObject properties and our internal storage of the
226    // corresponding values of the properties.
227    impl ObjectImpl for ServoMediaStreamSrc {
228        fn properties() -> &'static [glib::ParamSpec] {
229            static PROPERTIES: LazyLock<Vec<glib::ParamSpec>> = LazyLock::new(|| {
230                vec![
231                    // Let playbin3 know we are a live source.
232                    glib::ParamSpecBoolean::builder("is-live")
233                        .nick("Is Live")
234                        .blurb("Let playbin3 know we are a live source")
235                        .default_value(true)
236                        .readwrite()
237                        .build(),
238                ]
239            });
240
241            &PROPERTIES
242        }
243
244        fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
245            match pspec.name() {
246                "is-live" => true.to_value(),
247                _ => unimplemented!(),
248            }
249        }
250    }
251
252    impl GstObjectImpl for ServoMediaStreamSrc {}
253
254    // Implementation of gstreamer::Element virtual methods
255    impl ElementImpl for ServoMediaStreamSrc {
256        fn metadata() -> Option<&'static gstreamer::subclass::ElementMetadata> {
257            static ELEMENT_METADATA: LazyLock<gstreamer::subclass::ElementMetadata> =
258                LazyLock::new(|| {
259                    gstreamer::subclass::ElementMetadata::new(
260                        "Servo Media Stream Source",
261                        "Source/Audio/Video",
262                        "Feed player with media stream data",
263                        "Servo developers",
264                    )
265                });
266
267            Some(&*ELEMENT_METADATA)
268        }
269
270        fn pad_templates() -> &'static [gstreamer::PadTemplate] {
271            static PAD_TEMPLATES: LazyLock<Vec<gstreamer::PadTemplate>> = LazyLock::new(|| {
272                // Add pad templates for our audio and video source pads.
273                // These are later used for actually creating the pads and beforehand
274                // already provide information to GStreamer about all possible
275                // pads that could exist for this type.
276                vec![
277                    AUDIO_SRC_PAD_TEMPLATE.clone(),
278                    VIDEO_SRC_PAD_TEMPLATE.clone(),
279                ]
280            });
281
282            PAD_TEMPLATES.as_ref()
283        }
284    }
285
286    // Implementation of gstreamer::Bin virtual methods
287    impl BinImpl for ServoMediaStreamSrc {}
288
289    impl URIHandlerImpl for ServoMediaStreamSrc {
290        const URI_TYPE: gstreamer::URIType = gstreamer::URIType::Src;
291
292        fn protocols() -> &'static [&'static str] {
293            &["mediastream"]
294        }
295
296        fn uri(&self) -> Option<String> {
297            Some("mediastream://".to_string())
298        }
299
300        fn set_uri(&self, uri: &str) -> Result<(), glib::Error> {
301            if let Ok(uri) = Url::parse(uri) &&
302                uri.scheme() == "mediastream"
303            {
304                return Ok(());
305            }
306            Err(glib::Error::new(
307                gstreamer::URIError::BadUri,
308                format!("Invalid URI '{:?}'", uri,).as_str(),
309            ))
310        }
311    }
312}
313
314// Public part of the ServoMediaStreamSrc type. This behaves like a normal
315// GObject binding
316glib::wrapper! {
317    pub struct ServoMediaStreamSrc(ObjectSubclass<imp::ServoMediaStreamSrc>)
318        @extends gstreamer::Bin, gstreamer::Element, gstreamer::Object, @implements gstreamer::URIHandler;
319}
320
321unsafe impl Send for ServoMediaStreamSrc {}
322unsafe impl Sync for ServoMediaStreamSrc {}
323
324impl ServoMediaStreamSrc {
325    pub fn set_stream(
326        &self,
327        stream: &mut GStreamerMediaStream,
328        only_stream: bool,
329    ) -> Result<(), PlayerError> {
330        self.imp()
331            .set_stream(stream, self.upcast_ref::<gstreamer::Element>(), only_stream)
332    }
333}
334
335// Registers the type for our element, and then registers in GStreamer
336// under the name "servomediastreamsrc" for being able to instantiate it via e.g.
337// gstreamer::ElementFactory::make().
338pub fn register_servo_media_stream_src() -> Result<(), glib::BoolError> {
339    gstreamer::Element::register(
340        None,
341        "servomediastreamsrc",
342        gstreamer::Rank::NONE,
343        ServoMediaStreamSrc::static_type(),
344    )
345}