1use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, LazyLock, Mutex};
7
8use glib::subclass::prelude::*;
9use gstreamer::prelude::*;
10use gstreamer::subclass::prelude::*;
11use gstreamer_base::UniqueFlowCombiner;
12use servo_media_player::PlayerError;
13use servo_media_streams::{MediaStream, MediaStreamType};
14use url::Url;
15
16use crate::media_stream::{GStreamerMediaStream, RTP_CAPS_OPUS, RTP_CAPS_VP8};
17
18mod imp {
20
21 use super::*;
22
23 static AUDIO_SRC_PAD_TEMPLATE: LazyLock<gstreamer::PadTemplate> = LazyLock::new(|| {
24 gstreamer::PadTemplate::new(
25 "audio_src",
26 gstreamer::PadDirection::Src,
27 gstreamer::PadPresence::Sometimes,
28 &RTP_CAPS_OPUS,
29 )
30 .expect("Could not create audio src pad template")
31 });
32
33 static VIDEO_SRC_PAD_TEMPLATE: LazyLock<gstreamer::PadTemplate> = LazyLock::new(|| {
34 gstreamer::PadTemplate::new(
35 "video_src",
36 gstreamer::PadDirection::Src,
37 gstreamer::PadPresence::Sometimes,
38 &RTP_CAPS_VP8,
39 )
40 .expect("Could not create video src pad template")
41 });
42
43 pub struct ServoMediaStreamSrc {
44 cat: gstreamer::DebugCategory,
45 audio_proxysrc: gstreamer::Element,
46 audio_srcpad: gstreamer::GhostPad,
47 video_proxysrc: gstreamer::Element,
48 video_srcpad: gstreamer::GhostPad,
49 flow_combiner: Arc<Mutex<UniqueFlowCombiner>>,
50 has_audio_stream: Arc<AtomicBool>,
51 has_video_stream: Arc<AtomicBool>,
52 }
53
54 impl ServoMediaStreamSrc {
55 pub fn set_stream(
56 &self,
57 stream: &mut GStreamerMediaStream,
58 src: &gstreamer::Element,
59 only_stream: bool,
60 ) -> Result<(), PlayerError> {
61 gstreamer::log!(self.cat, "Setting stream");
68
69 let pipeline = stream.pipeline_or_new();
71 let last_element = stream.encoded().map_err(|_| PlayerError::SetStreamFailed)?;
72 let sink = gstreamer::ElementFactory::make("proxysink")
73 .build()
74 .map_err(|_| PlayerError::SetStreamFailed)?;
75 pipeline
76 .add(&sink)
77 .map_err(|_| PlayerError::SetStreamFailed)?;
78 gstreamer::Element::link_many(&[&last_element, &sink][..])
79 .map_err(|_| PlayerError::SetStreamFailed)?;
80
81 self.setup_proxy_src(stream.ty(), &sink, src, only_stream);
84
85 sink.sync_state_with_parent()
86 .map_err(|_| PlayerError::SetStreamFailed)?;
87 pipeline
88 .set_state(gstreamer::State::Playing)
89 .map_err(|_| PlayerError::SetStreamFailed)?;
90
91 Ok(())
92 }
93
94 fn setup_proxy_src(
95 &self,
96 stream_type: MediaStreamType,
97 sink: &gstreamer::Element,
98 src: &gstreamer::Element,
99 only_stream: bool,
100 ) {
101 let (proxysrc, src_pad, no_more_pads) = match stream_type {
102 MediaStreamType::Audio => {
103 self.has_audio_stream.store(true, Ordering::Relaxed);
104 (
105 &self.audio_proxysrc,
106 &self.audio_srcpad,
107 self.has_video_stream.load(Ordering::Relaxed),
108 )
109 },
110 MediaStreamType::Video => {
111 self.has_video_stream.store(true, Ordering::Relaxed);
112 (
113 &self.video_proxysrc,
114 &self.video_srcpad,
115 self.has_audio_stream.load(Ordering::Relaxed),
116 )
117 },
118 };
119 proxysrc.set_property("proxysink", sink);
120
121 let bin = src.downcast_ref::<gstreamer::Bin>().unwrap();
123 bin.add(proxysrc)
124 .expect("Could not add proxysrc element to bin");
125
126 let target_pad = proxysrc
127 .static_pad("src")
128 .expect("Could not get proxysrc's static src pad");
129 src_pad
130 .set_target(Some(&target_pad))
131 .expect("Could not set target pad");
132
133 src.add_pad(src_pad)
134 .expect("Could not add source pad to media stream src");
135 src.set_element_flags(gstreamer::ElementFlags::SOURCE);
136
137 let proxy_pad = src_pad.internal().unwrap();
138 src_pad.set_active(true).expect("Could not active pad");
139 self.flow_combiner.lock().unwrap().add_pad(&proxy_pad);
140
141 src.sync_state_with_parent().unwrap();
142
143 if no_more_pads || only_stream {
144 src.no_more_pads();
145 }
146 }
147 }
148
149 #[glib::object_subclass]
151 impl ObjectSubclass for ServoMediaStreamSrc {
152 const NAME: &'static str = "ServoMediaStreamSrc";
153 type Type = super::ServoMediaStreamSrc;
154 type ParentType = gstreamer::Bin;
155 type Interfaces = (gstreamer::URIHandler,);
156
157 fn with_class(_klass: &Self::Class) -> Self {
160 let flow_combiner = Arc::new(Mutex::new(UniqueFlowCombiner::new()));
161
162 fn create_ghost_pad_with_template(
163 name: &str,
164 pad_template: &gstreamer::PadTemplate,
165 flow_combiner: Arc<Mutex<UniqueFlowCombiner>>,
166 ) -> gstreamer::GhostPad {
167 gstreamer::GhostPad::builder_from_template(pad_template)
168 .name(name)
169 .chain_function({
170 move |pad, parent, buffer| {
171 let chain_result =
172 gstreamer::ProxyPad::chain_default(pad, parent, buffer);
173 let result = flow_combiner
174 .lock()
175 .unwrap()
176 .update_pad_flow(pad, chain_result);
177 if result == Err(gstreamer::FlowError::Flushing) {
178 return chain_result;
179 }
180 result
181 }
182 })
183 .build()
184 }
185
186 let audio_proxysrc = gstreamer::ElementFactory::make("proxysrc")
187 .build()
188 .expect("Could not create proxysrc element");
189 let audio_srcpad = create_ghost_pad_with_template(
190 "audio_src",
191 &AUDIO_SRC_PAD_TEMPLATE,
192 flow_combiner.clone(),
193 );
194
195 let video_proxysrc = gstreamer::ElementFactory::make("proxysrc")
196 .build()
197 .expect("Could not create proxysrc element");
198 let video_srcpad = create_ghost_pad_with_template(
199 "video_src",
200 &VIDEO_SRC_PAD_TEMPLATE,
201 flow_combiner.clone(),
202 );
203
204 Self {
205 cat: gstreamer::DebugCategory::new(
206 "servomediastreamsrc",
207 gstreamer::DebugColorFlags::empty(),
208 Some("Servo media stream source"),
209 ),
210 audio_proxysrc,
211 audio_srcpad,
212 video_proxysrc,
213 video_srcpad,
214 flow_combiner,
215 has_video_stream: Arc::new(AtomicBool::new(false)),
216 has_audio_stream: Arc::new(AtomicBool::new(false)),
217 }
218 }
219 }
220
221 impl ObjectImpl for ServoMediaStreamSrc {
228 fn properties() -> &'static [glib::ParamSpec] {
229 static PROPERTIES: LazyLock<Vec<glib::ParamSpec>> = LazyLock::new(|| {
230 vec![
231 glib::ParamSpecBoolean::builder("is-live")
233 .nick("Is Live")
234 .blurb("Let playbin3 know we are a live source")
235 .default_value(true)
236 .readwrite()
237 .build(),
238 ]
239 });
240
241 &PROPERTIES
242 }
243
244 fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
245 match pspec.name() {
246 "is-live" => true.to_value(),
247 _ => unimplemented!(),
248 }
249 }
250 }
251
252 impl GstObjectImpl for ServoMediaStreamSrc {}
253
254 impl ElementImpl for ServoMediaStreamSrc {
256 fn metadata() -> Option<&'static gstreamer::subclass::ElementMetadata> {
257 static ELEMENT_METADATA: LazyLock<gstreamer::subclass::ElementMetadata> =
258 LazyLock::new(|| {
259 gstreamer::subclass::ElementMetadata::new(
260 "Servo Media Stream Source",
261 "Source/Audio/Video",
262 "Feed player with media stream data",
263 "Servo developers",
264 )
265 });
266
267 Some(&*ELEMENT_METADATA)
268 }
269
270 fn pad_templates() -> &'static [gstreamer::PadTemplate] {
271 static PAD_TEMPLATES: LazyLock<Vec<gstreamer::PadTemplate>> = LazyLock::new(|| {
272 vec![
277 AUDIO_SRC_PAD_TEMPLATE.clone(),
278 VIDEO_SRC_PAD_TEMPLATE.clone(),
279 ]
280 });
281
282 PAD_TEMPLATES.as_ref()
283 }
284 }
285
286 impl BinImpl for ServoMediaStreamSrc {}
288
289 impl URIHandlerImpl for ServoMediaStreamSrc {
290 const URI_TYPE: gstreamer::URIType = gstreamer::URIType::Src;
291
292 fn protocols() -> &'static [&'static str] {
293 &["mediastream"]
294 }
295
296 fn uri(&self) -> Option<String> {
297 Some("mediastream://".to_string())
298 }
299
300 fn set_uri(&self, uri: &str) -> Result<(), glib::Error> {
301 if let Ok(uri) = Url::parse(uri) &&
302 uri.scheme() == "mediastream"
303 {
304 return Ok(());
305 }
306 Err(glib::Error::new(
307 gstreamer::URIError::BadUri,
308 format!("Invalid URI '{:?}'", uri,).as_str(),
309 ))
310 }
311 }
312}
313
314glib::wrapper! {
317 pub struct ServoMediaStreamSrc(ObjectSubclass<imp::ServoMediaStreamSrc>)
318 @extends gstreamer::Bin, gstreamer::Element, gstreamer::Object, @implements gstreamer::URIHandler;
319}
320
321unsafe impl Send for ServoMediaStreamSrc {}
322unsafe impl Sync for ServoMediaStreamSrc {}
323
324impl ServoMediaStreamSrc {
325 pub fn set_stream(
326 &self,
327 stream: &mut GStreamerMediaStream,
328 only_stream: bool,
329 ) -> Result<(), PlayerError> {
330 self.imp()
331 .set_stream(stream, self.upcast_ref::<gstreamer::Element>(), only_stream)
332 }
333}
334
335pub fn register_servo_media_stream_src() -> Result<(), glib::BoolError> {
339 gstreamer::Element::register(
340 None,
341 "servomediastreamsrc",
342 gstreamer::Rank::NONE,
343 ServoMediaStreamSrc::static_type(),
344 )
345}