1use crate::media_stream::{GStreamerMediaStream, RTP_CAPS_OPUS, RTP_CAPS_VP8};
2use glib::subclass::prelude::*;
3use gst::prelude::*;
4use gst::subclass::prelude::*;
5use gst_base::UniqueFlowCombiner;
6use once_cell::sync::Lazy;
7use servo_media_streams::{MediaStream, MediaStreamType};
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use url::Url;
11
12mod imp {
14 use super::*;
15
16 static AUDIO_SRC_PAD_TEMPLATE: Lazy<gst::PadTemplate> = Lazy::new(|| {
17 gst::PadTemplate::new(
18 "audio_src",
19 gst::PadDirection::Src,
20 gst::PadPresence::Sometimes,
21 &RTP_CAPS_OPUS,
22 )
23 .expect("Could not create audio src pad template")
24 });
25
26 static VIDEO_SRC_PAD_TEMPLATE: Lazy<gst::PadTemplate> = Lazy::new(|| {
27 gst::PadTemplate::new(
28 "video_src",
29 gst::PadDirection::Src,
30 gst::PadPresence::Sometimes,
31 &RTP_CAPS_VP8,
32 )
33 .expect("Could not create video src pad template")
34 });
35
/// Private implementation state of the `ServoMediaStreamSrc` element.
///
/// Holds one proxysrc element and one ghost source pad per media kind
/// (audio and video), plus the bookkeeping shared between the two.
pub struct ServoMediaStreamSrc {
    /// Debug category used for this element's log output.
    cat: gst::DebugCategory,
    /// proxysrc element that is added to the bin when an audio stream is set.
    audio_proxysrc: gst::Element,
    /// Ghost pad named "audio_src"; its target becomes `audio_proxysrc`'s
    /// static src pad when an audio stream is set.
    audio_srcpad: gst::GhostPad,
    /// proxysrc element that is added to the bin when a video stream is set.
    video_proxysrc: gst::Element,
    /// Ghost pad named "video_src"; its target becomes `video_proxysrc`'s
    /// static src pad when a video stream is set.
    video_srcpad: gst::GhostPad,
    /// Flow combiner shared with the chain functions of both ghost pads.
    flow_combiner: Arc<Mutex<UniqueFlowCombiner>>,
    /// Set to `true` once an audio stream has been attached.
    has_audio_stream: Arc<AtomicBool>,
    /// Set to `true` once a video stream has been attached.
    has_video_stream: Arc<AtomicBool>,
}
46
47 impl ServoMediaStreamSrc {
48 pub fn set_stream(
49 &self,
50 stream: &mut GStreamerMediaStream,
51 src: &gst::Element,
52 only_stream: bool,
53 ) {
54 gst::log!(self.cat, "Setting stream");
61
62 let pipeline = stream.pipeline_or_new();
64 let last_element = stream.encoded();
65 let sink = gst::ElementFactory::make("proxysink").build().unwrap();
66 pipeline.add(&sink).unwrap();
67 gst::Element::link_many(&[&last_element, &sink][..]).unwrap();
68
69 self.setup_proxy_src(stream.ty(), &sink, src, only_stream);
72
73 sink.sync_state_with_parent().unwrap();
74
75 pipeline.set_state(gst::State::Playing).unwrap();
76 }
77
78 fn setup_proxy_src(
79 &self,
80 stream_type: MediaStreamType,
81 sink: &gst::Element,
82 src: &gst::Element,
83 only_stream: bool,
84 ) {
85 let (proxysrc, src_pad, no_more_pads) = match stream_type {
86 MediaStreamType::Audio => {
87 self.has_audio_stream.store(true, Ordering::Relaxed);
88 (
89 &self.audio_proxysrc,
90 &self.audio_srcpad,
91 self.has_video_stream.load(Ordering::Relaxed),
92 )
93 },
94 MediaStreamType::Video => {
95 self.has_video_stream.store(true, Ordering::Relaxed);
96 (
97 &self.video_proxysrc,
98 &self.video_srcpad,
99 self.has_audio_stream.load(Ordering::Relaxed),
100 )
101 },
102 };
103 proxysrc.set_property("proxysink", sink);
104
105 let bin = src.downcast_ref::<gst::Bin>().unwrap();
107 bin.add(proxysrc)
108 .expect("Could not add proxysrc element to bin");
109
110 let target_pad = proxysrc
111 .static_pad("src")
112 .expect("Could not get proxysrc's static src pad");
113 src_pad
114 .set_target(Some(&target_pad))
115 .expect("Could not set target pad");
116
117 src.add_pad(src_pad)
118 .expect("Could not add source pad to media stream src");
119 src.set_element_flags(gst::ElementFlags::SOURCE);
120
121 let proxy_pad = src_pad.internal().unwrap();
122 src_pad.set_active(true).expect("Could not active pad");
123 self.flow_combiner.lock().unwrap().add_pad(&proxy_pad);
124
125 src.sync_state_with_parent().unwrap();
126
127 if no_more_pads || only_stream {
128 src.no_more_pads();
129 }
130 }
131 }
132
133 #[glib::object_subclass]
135 impl ObjectSubclass for ServoMediaStreamSrc {
136 const NAME: &'static str = "ServoMediaStreamSrc";
137 type Type = super::ServoMediaStreamSrc;
138 type ParentType = gst::Bin;
139 type Interfaces = (gst::URIHandler,);
140
141 fn with_class(_klass: &Self::Class) -> Self {
144 let flow_combiner = Arc::new(Mutex::new(UniqueFlowCombiner::new()));
145
146 fn create_ghost_pad_with_template(
147 name: &str,
148 pad_template: &gst::PadTemplate,
149 flow_combiner: Arc<Mutex<UniqueFlowCombiner>>,
150 ) -> gst::GhostPad {
151 gst::GhostPad::builder_from_template(pad_template)
152 .name(name)
153 .chain_function({
154 move |pad, parent, buffer| {
155 let chain_result = gst::ProxyPad::chain_default(pad, parent, buffer);
156 let result = flow_combiner
157 .lock()
158 .unwrap()
159 .update_pad_flow(pad, chain_result);
160 if result == Err(gst::FlowError::Flushing) {
161 return chain_result;
162 }
163 result
164 }
165 })
166 .build()
167 }
168
169 let audio_proxysrc = gst::ElementFactory::make("proxysrc")
170 .build()
171 .expect("Could not create proxysrc element");
172 let audio_srcpad = create_ghost_pad_with_template(
173 "audio_src",
174 &AUDIO_SRC_PAD_TEMPLATE,
175 flow_combiner.clone(),
176 );
177
178 let video_proxysrc = gst::ElementFactory::make("proxysrc")
179 .build()
180 .expect("Could not create proxysrc element");
181 let video_srcpad = create_ghost_pad_with_template(
182 "video_src",
183 &VIDEO_SRC_PAD_TEMPLATE,
184 flow_combiner.clone(),
185 );
186
187 Self {
188 cat: gst::DebugCategory::new(
189 "servomediastreamsrc",
190 gst::DebugColorFlags::empty(),
191 Some("Servo media stream source"),
192 ),
193 audio_proxysrc,
194 audio_srcpad,
195 video_proxysrc,
196 video_srcpad,
197 flow_combiner,
198 has_video_stream: Arc::new(AtomicBool::new(false)),
199 has_audio_stream: Arc::new(AtomicBool::new(false)),
200 }
201 }
202 }
203
204 impl ObjectImpl for ServoMediaStreamSrc {
211 fn properties() -> &'static [glib::ParamSpec] {
212 static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
213 vec![
214 glib::ParamSpecBoolean::builder("is-live")
216 .nick("Is Live")
217 .blurb("Let playbin3 know we are a live source")
218 .default_value(true)
219 .readwrite()
220 .build(),
221 ]
222 });
223
224 &PROPERTIES
225 }
226
227 fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
228 match pspec.name() {
229 "is-live" => true.to_value(),
230 _ => unimplemented!(),
231 }
232 }
233 }
234
// No `GstObjectImpl` methods are overridden; the trait defaults are used.
impl GstObjectImpl for ServoMediaStreamSrc {}
236
237 impl ElementImpl for ServoMediaStreamSrc {
239 fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
240 static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
241 gst::subclass::ElementMetadata::new(
242 "Servo Media Stream Source",
243 "Source/Audio/Video",
244 "Feed player with media stream data",
245 "Servo developers",
246 )
247 });
248
249 Some(&*ELEMENT_METADATA)
250 }
251
252 fn pad_templates() -> &'static [gst::PadTemplate] {
253 static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
254 vec![
259 AUDIO_SRC_PAD_TEMPLATE.clone(),
260 VIDEO_SRC_PAD_TEMPLATE.clone(),
261 ]
262 });
263
264 PAD_TEMPLATES.as_ref()
265 }
266 }
267
// No `BinImpl` methods are overridden; the trait defaults are used.
impl BinImpl for ServoMediaStreamSrc {}
270
271 impl URIHandlerImpl for ServoMediaStreamSrc {
272 const URI_TYPE: gst::URIType = gst::URIType::Src;
273
274 fn protocols() -> &'static [&'static str] {
275 &["mediastream"]
276 }
277
278 fn uri(&self) -> Option<String> {
279 Some("mediastream://".to_string())
280 }
281
282 fn set_uri(&self, uri: &str) -> Result<(), glib::Error> {
283 if let Ok(uri) = Url::parse(uri) {
284 if uri.scheme() == "mediastream" {
285 return Ok(());
286 }
287 }
288 Err(glib::Error::new(
289 gst::URIError::BadUri,
290 format!("Invalid URI '{:?}'", uri,).as_str(),
291 ))
292 }
293 }
294}
295
// Public wrapper type for the element. It is backed by
// `imp::ServoMediaStreamSrc`, extends `gst::Bin`, `gst::Element` and
// `gst::Object`, and implements the `gst::URIHandler` interface.
glib::wrapper! {
    pub struct ServoMediaStreamSrc(ObjectSubclass<imp::ServoMediaStreamSrc>)
        @extends gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler;
}
302
// NOTE(review): these impls assert that the wrapper may be moved to and shared
// between threads. The justification is not visible here; presumably it relies
// on the GStreamer object types and the `Arc`/`Mutex`/atomic fields of the
// implementation struct being thread-safe. TODO confirm and turn this into a
// proper `SAFETY:` comment.
unsafe impl Send for ServoMediaStreamSrc {}
unsafe impl Sync for ServoMediaStreamSrc {}
305
306impl ServoMediaStreamSrc {
307 pub fn set_stream(&self, stream: &mut GStreamerMediaStream, only_stream: bool) {
308 self.imp()
309 .set_stream(stream, self.upcast_ref::<gst::Element>(), only_stream)
310 }
311}
312
313pub fn register_servo_media_stream_src() -> Result<(), glib::BoolError> {
317 gst::Element::register(
318 None,
319 "servomediastreamsrc",
320 gst::Rank::NONE,
321 ServoMediaStreamSrc::static_type(),
322 )
323}