1use std::any::Any;
6use std::sync::{Arc, LazyLock, Mutex};
7
8use glib::BoolError;
9use gstreamer;
10use gstreamer::prelude::*;
11use servo_media_streams::registry::{
12 MediaStreamId, get_stream, register_stream, unregister_stream,
13};
14use servo_media_streams::{MediaOutput, MediaSocket, MediaStream, MediaStreamType};
15
16use super::BACKEND_BASE_TIME;
17
18pub static RTP_CAPS_OPUS: LazyLock<gstreamer::Caps> = LazyLock::new(|| {
19 gstreamer::Caps::builder("application/x-rtp")
20 .field("media", "audio")
21 .field("encoding-name", "OPUS")
22 .build()
23});
24
25pub static RTP_CAPS_VP8: LazyLock<gstreamer::Caps> = LazyLock::new(|| {
26 gstreamer::Caps::builder("application/x-rtp")
27 .field("media", "video")
28 .field("encoding-name", "VP8")
29 .build()
30});
31
32pub struct GStreamerMediaStream {
33 id: Option<MediaStreamId>,
34 type_: MediaStreamType,
35 elements: Vec<gstreamer::Element>,
36 pipeline: Option<gstreamer::Pipeline>,
37}
38
39impl MediaStream for GStreamerMediaStream {
40 fn as_any(&self) -> &dyn Any {
41 self
42 }
43
44 fn as_mut_any(&mut self) -> &mut dyn Any {
45 self
46 }
47
48 fn set_id(&mut self, id: MediaStreamId) {
49 self.id = Some(id);
50 }
51
52 fn ty(&self) -> MediaStreamType {
53 self.type_
54 }
55}
56
57impl GStreamerMediaStream {
58 pub fn new(type_: MediaStreamType, elements: Vec<gstreamer::Element>) -> Self {
59 Self {
60 id: None,
61 type_,
62 elements,
63 pipeline: None,
64 }
65 }
66
67 pub fn caps(&self) -> &gstreamer::Caps {
68 match self.type_ {
69 MediaStreamType::Audio => &RTP_CAPS_OPUS,
70 MediaStreamType::Video => &RTP_CAPS_VP8,
71 }
72 }
73
74 pub fn caps_with_payload(&self, payload: i32) -> gstreamer::Caps {
75 match self.type_ {
76 MediaStreamType::Audio => gstreamer::Caps::builder("application/x-rtp")
77 .field("media", "audio")
78 .field("encoding-name", "OPUS")
79 .field("payload", payload)
80 .build(),
81 MediaStreamType::Video => gstreamer::Caps::builder("application/x-rtp")
82 .field("media", "video")
83 .field("encoding-name", "VP8")
84 .field("payload", payload)
85 .build(),
86 }
87 }
88
89 pub fn src_element(&self) -> gstreamer::Element {
90 self.elements.last().unwrap().clone()
91 }
92
93 pub fn attach_to_pipeline(&mut self, pipeline: &gstreamer::Pipeline) {
94 assert!(self.pipeline.is_none());
95 let elements: Vec<_> = self.elements.iter().collect();
96 pipeline.add_many(&elements[..]).unwrap();
97 gstreamer::Element::link_many(&elements[..]).unwrap();
98 for element in elements {
99 element.sync_state_with_parent().unwrap();
100 }
101 self.pipeline = Some(pipeline.clone());
102 }
103
104 pub fn pipeline_or_new(&mut self) -> gstreamer::Pipeline {
105 match self.pipeline {
106 Some(ref pipeline) => pipeline.clone(),
107 _ => {
108 let pipeline =
109 gstreamer::Pipeline::with_name("gstreamermediastream fresh pipeline");
110 let clock = gstreamer::SystemClock::obtain();
111 pipeline.set_start_time(gstreamer::ClockTime::NONE);
112 pipeline.set_base_time(*BACKEND_BASE_TIME);
113 pipeline.use_clock(Some(&clock));
114 self.attach_to_pipeline(&pipeline);
115 pipeline
116 },
117 }
118 }
119
120 pub fn create_video() -> MediaStreamId {
121 let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
122 .property_from_str("pattern", "ball")
123 .property("is-live", true)
124 .build()
125 .unwrap();
126 Self::create_video_from(videotestsrc)
127 }
128
129 pub fn encoded(&mut self) -> Result<gstreamer::Element, BoolError> {
131 let pipeline = self
132 .pipeline
133 .as_ref()
134 .expect("GStreamerMediaStream::encoded() should not be called without a pipeline");
135 let src = self.src_element();
136
137 let capsfilter = gstreamer::ElementFactory::make("capsfilter")
138 .property("caps", self.caps())
139 .build()?;
140 match self.type_ {
141 MediaStreamType::Video => {
142 let vp8enc = gstreamer::ElementFactory::make("vp8enc")
143 .property("deadline", 1i64)
144 .property("error-resilient", "default")
145 .property("cpu-used", -16i32)
146 .property("lag-in-frames", 0i32)
147 .build()?;
148
149 let rtpvp8pay = gstreamer::ElementFactory::make("rtpvp8pay")
150 .property("picture-id-mode", "15-bit")
151 .property("mtu", 1200u32)
152 .build()?;
153 let queue2 = gstreamer::ElementFactory::make("queue").build()?;
154
155 pipeline.add_many([&vp8enc, &rtpvp8pay, &queue2, &capsfilter])?;
156 gstreamer::Element::link_many([&src, &vp8enc, &rtpvp8pay, &queue2, &capsfilter])?;
157 vp8enc.sync_state_with_parent()?;
158 rtpvp8pay.sync_state_with_parent()?;
159 queue2.sync_state_with_parent()?;
160 capsfilter.sync_state_with_parent()?;
161 Ok(capsfilter)
162 },
163 MediaStreamType::Audio => {
164 let opusenc = gstreamer::ElementFactory::make("opusenc").build()?;
165 let rtpopuspay = gstreamer::ElementFactory::make("rtpopuspay")
166 .property("mtu", 1200u32)
167 .build()?;
168 let queue3 = gstreamer::ElementFactory::make("queue").build()?;
169 pipeline.add_many([&opusenc, &rtpopuspay, &queue3, &capsfilter])?;
170 gstreamer::Element::link_many([&src, &opusenc, &rtpopuspay, &queue3, &capsfilter])?;
171 opusenc.sync_state_with_parent()?;
172 rtpopuspay.sync_state_with_parent()?;
173 queue3.sync_state_with_parent()?;
174 Ok(capsfilter)
175 },
176 }
177 }
178
179 pub fn create_video_from(source: gstreamer::Element) -> MediaStreamId {
180 let videoconvert = gstreamer::ElementFactory::make("videoconvert")
181 .build()
182 .unwrap();
183 let queue = gstreamer::ElementFactory::make("queue").build().unwrap();
184
185 register_stream(Arc::new(Mutex::new(GStreamerMediaStream::new(
186 MediaStreamType::Video,
187 vec![source, videoconvert, queue],
188 ))))
189 }
190
191 pub fn create_audio() -> MediaStreamId {
192 let audiotestsrc = gstreamer::ElementFactory::make("audiotestsrc")
193 .property_from_str("wave", "sine")
194 .property("is-live", true)
195 .build()
196 .unwrap();
197
198 Self::create_audio_from(audiotestsrc)
199 }
200
201 pub fn create_audio_from(source: gstreamer::Element) -> MediaStreamId {
202 let queue = gstreamer::ElementFactory::make("queue").build().unwrap();
203 let audioconvert = gstreamer::ElementFactory::make("audioconvert")
204 .build()
205 .unwrap();
206 let audioresample = gstreamer::ElementFactory::make("audioresample")
207 .build()
208 .unwrap();
209 let queue2 = gstreamer::ElementFactory::make("queue").build().unwrap();
210
211 register_stream(Arc::new(Mutex::new(GStreamerMediaStream::new(
212 MediaStreamType::Audio,
213 vec![source, queue, audioconvert, audioresample, queue2],
214 ))))
215 }
216
217 pub fn create_proxy(ty: MediaStreamType) -> (MediaStreamId, GstreamerMediaSocket) {
218 let proxy_sink = gstreamer::ElementFactory::make("proxysink")
219 .build()
220 .unwrap();
221 let proxy_src = gstreamer::ElementFactory::make("proxysrc")
222 .property("proxysink", &proxy_sink)
223 .build()
224 .unwrap();
225 let stream = match ty {
226 MediaStreamType::Audio => Self::create_audio_from(proxy_src),
227 MediaStreamType::Video => Self::create_video_from(proxy_src),
228 };
229
230 (stream, GstreamerMediaSocket { proxy_sink })
231 }
232}
233
234impl Drop for GStreamerMediaStream {
235 fn drop(&mut self) {
236 if let Some(ref id) = self.id {
237 unregister_stream(id);
238 }
239 }
240}
241
242#[derive(Default)]
243pub struct MediaSink {
244 streams: Vec<Arc<Mutex<dyn MediaStream>>>,
245}
246
247impl MediaOutput for MediaSink {
248 fn add_stream(&mut self, stream: &MediaStreamId) {
249 let stream = get_stream(stream).expect("Media streams registry does not contain such ID");
250 {
251 let mut stream = stream.lock().unwrap();
252 let stream = stream
253 .as_mut_any()
254 .downcast_mut::<GStreamerMediaStream>()
255 .unwrap();
256 let pipeline = stream.pipeline_or_new();
257 let last_element = stream.elements.last();
258 let last_element = last_element.as_ref().unwrap();
259 let sink = match stream.type_ {
260 MediaStreamType::Audio => "autoaudiosink",
261 MediaStreamType::Video => "autovideosink",
262 };
263 let sink = gstreamer::ElementFactory::make(sink).build().unwrap();
264 pipeline.add(&sink).unwrap();
265 gstreamer::Element::link_many(&[last_element, &sink][..]).unwrap();
266
267 pipeline.set_state(gstreamer::State::Playing).unwrap();
268 sink.sync_state_with_parent().unwrap();
269 }
270 self.streams.push(stream.clone());
271 }
272}
273
274pub struct GstreamerMediaSocket {
275 proxy_sink: gstreamer::Element,
276}
277
278impl GstreamerMediaSocket {
279 pub fn proxy_sink(&self) -> &gstreamer::Element {
280 &self.proxy_sink
281 }
282}
283
284impl MediaSocket for GstreamerMediaSocket {
285 fn as_any(&self) -> &dyn Any {
286 self
287 }
288}