servo_media_gstreamer/
media_stream_source.rs

1use crate::media_stream::{GStreamerMediaStream, RTP_CAPS_OPUS, RTP_CAPS_VP8};
2use glib::subclass::prelude::*;
3use gst::prelude::*;
4use gst::subclass::prelude::*;
5use gst_base::UniqueFlowCombiner;
6use once_cell::sync::Lazy;
7use servo_media_streams::{MediaStream, MediaStreamType};
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use url::Url;
11
12// Implementation sub-module of the GObject
13mod imp {
14    use super::*;
15
16    static AUDIO_SRC_PAD_TEMPLATE: Lazy<gst::PadTemplate> = Lazy::new(|| {
17        gst::PadTemplate::new(
18            "audio_src",
19            gst::PadDirection::Src,
20            gst::PadPresence::Sometimes,
21            &RTP_CAPS_OPUS,
22        )
23        .expect("Could not create audio src pad template")
24    });
25
26    static VIDEO_SRC_PAD_TEMPLATE: Lazy<gst::PadTemplate> = Lazy::new(|| {
27        gst::PadTemplate::new(
28            "video_src",
29            gst::PadDirection::Src,
30            gst::PadPresence::Sometimes,
31            &RTP_CAPS_VP8,
32        )
33        .expect("Could not create video src pad template")
34    });
35
    /// Instance state of the `ServoMediaStreamSrc` GObject subclass.
    ///
    /// All fields are created once in `with_class`; the proxysrc elements and
    /// ghost pads are only added to the bin when a stream of the matching type
    /// is attached through `set_stream`.
    pub struct ServoMediaStreamSrc {
        // Debug category used for this element's log output.
        cat: gst::DebugCategory,
        // `proxysrc` element that gets connected to the audio stream's proxysink.
        audio_proxysrc: gst::Element,
        // Ghost pad built from the audio pad template; targets the audio
        // proxysrc's "src" pad once an audio stream is set.
        audio_srcpad: gst::GhostPad,
        // `proxysrc` element that gets connected to the video stream's proxysink.
        video_proxysrc: gst::Element,
        // Ghost pad built from the video pad template; targets the video
        // proxysrc's "src" pad once a video stream is set.
        video_srcpad: gst::GhostPad,
        // Flow combiner shared between this struct and the pad chain closures
        // created in `with_class`.
        flow_combiner: Arc<Mutex<UniqueFlowCombiner>>,
        // Set to true when an audio stream has been attached.
        has_audio_stream: Arc<AtomicBool>,
        // Set to true when a video stream has been attached.
        has_video_stream: Arc<AtomicBool>,
    }
46
47    impl ServoMediaStreamSrc {
48        pub fn set_stream(
49            &self,
50            stream: &mut GStreamerMediaStream,
51            src: &gst::Element,
52            only_stream: bool,
53        ) {
54            // XXXferjm the current design limits the number of streams to one
55            // per type. This fulfills the basic use case for WebRTC, but we should
56            // implement support for multiple streams per type at some point, which
57            // likely involves encoding and muxing all streams of the same type
58            // in a single stream.
59
60            gst::log!(self.cat, "Setting stream");
61
62            // Append a proxysink to the media stream pipeline.
63            let pipeline = stream.pipeline_or_new();
64            let last_element = stream.encoded();
65            let sink = gst::ElementFactory::make("proxysink").build().unwrap();
66            pipeline.add(&sink).unwrap();
67            gst::Element::link_many(&[&last_element, &sink][..]).unwrap();
68
69            // Create the appropriate proxysrc depending on the stream type
70            // and connect the media stream proxysink to it.
71            self.setup_proxy_src(stream.ty(), &sink, src, only_stream);
72
73            sink.sync_state_with_parent().unwrap();
74
75            pipeline.set_state(gst::State::Playing).unwrap();
76        }
77
78        fn setup_proxy_src(
79            &self,
80            stream_type: MediaStreamType,
81            sink: &gst::Element,
82            src: &gst::Element,
83            only_stream: bool,
84        ) {
85            let (proxysrc, src_pad, no_more_pads) = match stream_type {
86                MediaStreamType::Audio => {
87                    self.has_audio_stream.store(true, Ordering::Relaxed);
88                    (
89                        &self.audio_proxysrc,
90                        &self.audio_srcpad,
91                        self.has_video_stream.load(Ordering::Relaxed),
92                    )
93                },
94                MediaStreamType::Video => {
95                    self.has_video_stream.store(true, Ordering::Relaxed);
96                    (
97                        &self.video_proxysrc,
98                        &self.video_srcpad,
99                        self.has_audio_stream.load(Ordering::Relaxed),
100                    )
101                },
102            };
103            proxysrc.set_property("proxysink", sink);
104
105            // Add proxysrc to bin
106            let bin = src.downcast_ref::<gst::Bin>().unwrap();
107            bin.add(proxysrc)
108                .expect("Could not add proxysrc element to bin");
109
110            let target_pad = proxysrc
111                .static_pad("src")
112                .expect("Could not get proxysrc's static src pad");
113            src_pad
114                .set_target(Some(&target_pad))
115                .expect("Could not set target pad");
116
117            src.add_pad(src_pad)
118                .expect("Could not add source pad to media stream src");
119            src.set_element_flags(gst::ElementFlags::SOURCE);
120
121            let proxy_pad = src_pad.internal().unwrap();
122            src_pad.set_active(true).expect("Could not active pad");
123            self.flow_combiner.lock().unwrap().add_pad(&proxy_pad);
124
125            src.sync_state_with_parent().unwrap();
126
127            if no_more_pads || only_stream {
128                src.no_more_pads();
129            }
130        }
131    }
132
133    // Basic declaration of our type for the GObject type system.
134    #[glib::object_subclass]
135    impl ObjectSubclass for ServoMediaStreamSrc {
136        const NAME: &'static str = "ServoMediaStreamSrc";
137        type Type = super::ServoMediaStreamSrc;
138        type ParentType = gst::Bin;
139        type Interfaces = (gst::URIHandler,);
140
141        // Called once at the very beginning of instantiation of each instance and
142        // creates the data structure that contains all our state
143        fn with_class(_klass: &Self::Class) -> Self {
144            let flow_combiner = Arc::new(Mutex::new(UniqueFlowCombiner::new()));
145
146            fn create_ghost_pad_with_template(
147                name: &str,
148                pad_template: &gst::PadTemplate,
149                flow_combiner: Arc<Mutex<UniqueFlowCombiner>>,
150            ) -> gst::GhostPad {
151                gst::GhostPad::builder_from_template(pad_template)
152                    .name(name)
153                    .chain_function({
154                        move |pad, parent, buffer| {
155                            let chain_result = gst::ProxyPad::chain_default(pad, parent, buffer);
156                            let result = flow_combiner
157                                .lock()
158                                .unwrap()
159                                .update_pad_flow(pad, chain_result);
160                            if result == Err(gst::FlowError::Flushing) {
161                                return chain_result;
162                            }
163                            result
164                        }
165                    })
166                    .build()
167            }
168
169            let audio_proxysrc = gst::ElementFactory::make("proxysrc")
170                .build()
171                .expect("Could not create proxysrc element");
172            let audio_srcpad = create_ghost_pad_with_template(
173                "audio_src",
174                &AUDIO_SRC_PAD_TEMPLATE,
175                flow_combiner.clone(),
176            );
177
178            let video_proxysrc = gst::ElementFactory::make("proxysrc")
179                .build()
180                .expect("Could not create proxysrc element");
181            let video_srcpad = create_ghost_pad_with_template(
182                "video_src",
183                &VIDEO_SRC_PAD_TEMPLATE,
184                flow_combiner.clone(),
185            );
186
187            Self {
188                cat: gst::DebugCategory::new(
189                    "servomediastreamsrc",
190                    gst::DebugColorFlags::empty(),
191                    Some("Servo media stream source"),
192                ),
193                audio_proxysrc,
194                audio_srcpad,
195                video_proxysrc,
196                video_srcpad,
197                flow_combiner,
198                has_video_stream: Arc::new(AtomicBool::new(false)),
199                has_audio_stream: Arc::new(AtomicBool::new(false)),
200            }
201        }
202    }
203
204    // The ObjectImpl trait provides the setters/getters for GObject properties.
205    // Here we need to provide the values that are internally stored back to the
206    // caller, or store whatever new value the caller is providing.
207    //
208    // This maps between the GObject properties and our internal storage of the
209    // corresponding values of the properties.
210    impl ObjectImpl for ServoMediaStreamSrc {
211        fn properties() -> &'static [glib::ParamSpec] {
212            static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
213                vec![
214                    // Let playbin3 know we are a live source.
215                    glib::ParamSpecBoolean::builder("is-live")
216                        .nick("Is Live")
217                        .blurb("Let playbin3 know we are a live source")
218                        .default_value(true)
219                        .readwrite()
220                        .build(),
221                ]
222            });
223
224            &PROPERTIES
225        }
226
227        fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
228            match pspec.name() {
229                "is-live" => true.to_value(),
230                _ => unimplemented!(),
231            }
232        }
233    }
234
    // No gst::Object virtual methods are overridden; the empty impl only
    // declares that this type implements the trait.
    impl GstObjectImpl for ServoMediaStreamSrc {}
236
237    // Implementation of gst::Element virtual methods
238    impl ElementImpl for ServoMediaStreamSrc {
239        fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
240            static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
241                gst::subclass::ElementMetadata::new(
242                    "Servo Media Stream Source",
243                    "Source/Audio/Video",
244                    "Feed player with media stream data",
245                    "Servo developers",
246                )
247            });
248
249            Some(&*ELEMENT_METADATA)
250        }
251
252        fn pad_templates() -> &'static [gst::PadTemplate] {
253            static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
254                // Add pad templates for our audio and video source pads.
255                // These are later used for actually creating the pads and beforehand
256                // already provide information to GStreamer about all possible
257                // pads that could exist for this type.
258                vec![
259                    AUDIO_SRC_PAD_TEMPLATE.clone(),
260                    VIDEO_SRC_PAD_TEMPLATE.clone(),
261                ]
262            });
263
264            PAD_TEMPLATES.as_ref()
265        }
266    }
267
    // Implementation of gst::Bin virtual methods. Nothing is overridden here;
    // the empty impl only declares that this type implements the trait.
    impl BinImpl for ServoMediaStreamSrc {}
270
271    impl URIHandlerImpl for ServoMediaStreamSrc {
272        const URI_TYPE: gst::URIType = gst::URIType::Src;
273
274        fn protocols() -> &'static [&'static str] {
275            &["mediastream"]
276        }
277
278        fn uri(&self) -> Option<String> {
279            Some("mediastream://".to_string())
280        }
281
282        fn set_uri(&self, uri: &str) -> Result<(), glib::Error> {
283            if let Ok(uri) = Url::parse(uri) {
284                if uri.scheme() == "mediastream" {
285                    return Ok(());
286                }
287            }
288            Err(glib::Error::new(
289                gst::URIError::BadUri,
290                format!("Invalid URI '{:?}'", uri,).as_str(),
291            ))
292        }
293    }
294}
295
// Public part of the ServoMediaStreamSrc type. This behaves like a normal
// GObject binding: the wrapper is backed by the `imp::ServoMediaStreamSrc`
// subclass, extends gst::Bin, gst::Element and gst::Object, and implements
// the gst::URIHandler interface.
glib::wrapper! {
    pub struct ServoMediaStreamSrc(ObjectSubclass<imp::ServoMediaStreamSrc>)
        @extends gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler;
}
302
// SAFETY: NOTE(review) - these impls assert that the wrapper may be sent to
// and shared between threads. The subclass state declared in this file holds
// GStreamer objects plus `Arc<Mutex<_>>` and `Arc<AtomicBool>` fields;
// presumably that is what makes this sound - confirm before adding any
// non-thread-safe state to `imp::ServoMediaStreamSrc`.
unsafe impl Send for ServoMediaStreamSrc {}
unsafe impl Sync for ServoMediaStreamSrc {}
305
306impl ServoMediaStreamSrc {
307    pub fn set_stream(&self, stream: &mut GStreamerMediaStream, only_stream: bool) {
308        self.imp()
309            .set_stream(stream, self.upcast_ref::<gst::Element>(), only_stream)
310    }
311}
312
313// Registers the type for our element, and then registers in GStreamer
314// under the name "servomediastreamsrc" for being able to instantiate it via e.g.
315// gst::ElementFactory::make().
316pub fn register_servo_media_stream_src() -> Result<(), glib::BoolError> {
317    gst::Element::register(
318        None,
319        "servomediastreamsrc",
320        gst::Rank::NONE,
321        ServoMediaStreamSrc::static_type(),
322    )
323}