servo_media_gstreamer/
media_stream.rs1use super::BACKEND_BASE_TIME;
2use gst;
3use gst::prelude::*;
4use once_cell::sync::Lazy;
5use servo_media_streams::registry::{
6 get_stream, register_stream, unregister_stream, MediaStreamId,
7};
8use servo_media_streams::{MediaOutput, MediaSocket, MediaStream, MediaStreamType};
9use std::any::Any;
10use std::sync::{Arc, Mutex};
11
12pub static RTP_CAPS_OPUS: Lazy<gst::Caps> = Lazy::new(|| {
13 gst::Caps::builder("application/x-rtp")
14 .field("media", "audio")
15 .field("encoding-name", "OPUS")
16 .build()
17});
18
19pub static RTP_CAPS_VP8: Lazy<gst::Caps> = Lazy::new(|| {
20 gst::Caps::builder("application/x-rtp")
21 .field("media", "video")
22 .field("encoding-name", "VP8")
23 .build()
24});
25
26pub struct GStreamerMediaStream {
27 id: Option<MediaStreamId>,
28 type_: MediaStreamType,
29 elements: Vec<gst::Element>,
30 pipeline: Option<gst::Pipeline>,
31}
32
33impl MediaStream for GStreamerMediaStream {
34 fn as_any(&self) -> &dyn Any {
35 self
36 }
37
38 fn as_mut_any(&mut self) -> &mut dyn Any {
39 self
40 }
41
42 fn set_id(&mut self, id: MediaStreamId) {
43 self.id = Some(id);
44 }
45
46 fn ty(&self) -> MediaStreamType {
47 self.type_
48 }
49}
50
51impl GStreamerMediaStream {
52 pub fn new(type_: MediaStreamType, elements: Vec<gst::Element>) -> Self {
53 Self {
54 id: None,
55 type_,
56 elements,
57 pipeline: None,
58 }
59 }
60
61 pub fn caps(&self) -> &gst::Caps {
62 match self.type_ {
63 MediaStreamType::Audio => &RTP_CAPS_OPUS,
64 MediaStreamType::Video => &RTP_CAPS_VP8,
65 }
66 }
67
68 pub fn caps_with_payload(&self, payload: i32) -> gst::Caps {
69 match self.type_ {
70 MediaStreamType::Audio => gst::Caps::builder("application/x-rtp")
71 .field("media", "audio")
72 .field("encoding-name", "OPUS")
73 .field("payload", payload)
74 .build(),
75 MediaStreamType::Video => gst::Caps::builder("application/x-rtp")
76 .field("media", "video")
77 .field("encoding-name", "VP8")
78 .field("payload", payload)
79 .build(),
80 }
81 }
82
83 pub fn src_element(&self) -> gst::Element {
84 self.elements.last().unwrap().clone()
85 }
86
87 pub fn attach_to_pipeline(&mut self, pipeline: &gst::Pipeline) {
88 assert!(self.pipeline.is_none());
89 let elements: Vec<_> = self.elements.iter().collect();
90 pipeline.add_many(&elements[..]).unwrap();
91 gst::Element::link_many(&elements[..]).unwrap();
92 for element in elements {
93 element.sync_state_with_parent().unwrap();
94 }
95 self.pipeline = Some(pipeline.clone());
96 }
97
98 pub fn pipeline_or_new(&mut self) -> gst::Pipeline {
99 if let Some(ref pipeline) = self.pipeline {
100 pipeline.clone()
101 } else {
102 let pipeline = gst::Pipeline::with_name("gstreamermediastream fresh pipeline");
103 let clock = gst::SystemClock::obtain();
104 pipeline.set_start_time(gst::ClockTime::NONE);
105 pipeline.set_base_time(*BACKEND_BASE_TIME);
106 pipeline.use_clock(Some(&clock));
107 self.attach_to_pipeline(&pipeline);
108 pipeline
109 }
110 }
111
112 pub fn create_video() -> MediaStreamId {
113 let videotestsrc = gst::ElementFactory::make("videotestsrc")
114 .property_from_str("pattern", "ball")
115 .property("is-live", true)
116 .build()
117 .unwrap();
118 Self::create_video_from(videotestsrc)
119 }
120
121 pub fn encoded(&mut self) -> gst::Element {
123 let pipeline = self
124 .pipeline
125 .as_ref()
126 .expect("GStreamerMediaStream::encoded() should not be called without a pipeline");
127 let src = self.src_element();
128
129 let capsfilter = gst::ElementFactory::make("capsfilter")
130 .property("caps", self.caps())
131 .build()
132 .unwrap();
133 match self.type_ {
134 MediaStreamType::Video => {
135 let vp8enc = gst::ElementFactory::make("vp8enc")
136 .property("deadline", 1i64)
137 .property("error-resilient", "default")
138 .property("cpu-used", -16i32)
139 .property("lag-in-frames", 0i32)
140 .build()
141 .unwrap();
142
143 let rtpvp8pay = gst::ElementFactory::make("rtpvp8pay")
144 .property("picture-id-mode", "15-bit")
145 .property("mtu", 1200u32)
146 .build()
147 .unwrap();
148 let queue2 = gst::ElementFactory::make("queue").build().unwrap();
149
150 pipeline
151 .add_many(&[&vp8enc, &rtpvp8pay, &queue2, &capsfilter])
152 .unwrap();
153 gst::Element::link_many(&[&src, &vp8enc, &rtpvp8pay, &queue2, &capsfilter])
154 .unwrap();
155 vp8enc.sync_state_with_parent().unwrap();
156 rtpvp8pay.sync_state_with_parent().unwrap();
157 queue2.sync_state_with_parent().unwrap();
158 capsfilter.sync_state_with_parent().unwrap();
159 capsfilter
160 }
161 MediaStreamType::Audio => {
162 let opusenc = gst::ElementFactory::make("opusenc").build().unwrap();
163 let rtpopuspay = gst::ElementFactory::make("rtpopuspay")
164 .property("mtu", 1200u32)
165 .build()
166 .unwrap();
167 let queue3 = gst::ElementFactory::make("queue").build().unwrap();
168 pipeline
169 .add_many(&[&opusenc, &rtpopuspay, &queue3, &capsfilter])
170 .unwrap();
171 gst::Element::link_many(&[&src, &opusenc, &rtpopuspay, &queue3, &capsfilter])
172 .unwrap();
173 opusenc.sync_state_with_parent().unwrap();
174 rtpopuspay.sync_state_with_parent().unwrap();
175 queue3.sync_state_with_parent().unwrap();
176 capsfilter
177 }
178 }
179 }
180
181 pub fn create_video_from(source: gst::Element) -> MediaStreamId {
182 let videoconvert = gst::ElementFactory::make("videoconvert").build().unwrap();
183 let queue = gst::ElementFactory::make("queue").build().unwrap();
184
185 register_stream(Arc::new(Mutex::new(GStreamerMediaStream::new(
186 MediaStreamType::Video,
187 vec![source, videoconvert, queue],
188 ))))
189 }
190
191 pub fn create_audio() -> MediaStreamId {
192 let audiotestsrc = gst::ElementFactory::make("audiotestsrc")
193 .property_from_str("wave", "sine")
194 .property("is-live", true)
195 .build()
196 .unwrap();
197
198 Self::create_audio_from(audiotestsrc)
199 }
200
201 pub fn create_audio_from(source: gst::Element) -> MediaStreamId {
202 let queue = gst::ElementFactory::make("queue").build().unwrap();
203 let audioconvert = gst::ElementFactory::make("audioconvert").build().unwrap();
204 let audioresample = gst::ElementFactory::make("audioresample").build().unwrap();
205 let queue2 = gst::ElementFactory::make("queue").build().unwrap();
206
207 register_stream(Arc::new(Mutex::new(GStreamerMediaStream::new(
208 MediaStreamType::Audio,
209 vec![source, queue, audioconvert, audioresample, queue2],
210 ))))
211 }
212
213 pub fn create_proxy(ty: MediaStreamType) -> (MediaStreamId, GstreamerMediaSocket) {
214 let proxy_sink = gst::ElementFactory::make("proxysink").build().unwrap();
215 let proxy_src = gst::ElementFactory::make("proxysrc")
216 .property("proxysink", &proxy_sink)
217 .build()
218 .unwrap();
219 let stream = match ty {
220 MediaStreamType::Audio => Self::create_audio_from(proxy_src),
221 MediaStreamType::Video => Self::create_video_from(proxy_src),
222 };
223
224 (stream, GstreamerMediaSocket { proxy_sink })
225 }
226}
227
228impl Drop for GStreamerMediaStream {
229 fn drop(&mut self) {
230 if let Some(ref id) = self.id {
231 unregister_stream(id);
232 }
233 }
234}
235
236pub struct MediaSink {
237 streams: Vec<Arc<Mutex<dyn MediaStream>>>,
238}
239
240impl MediaSink {
241 pub fn new() -> Self {
242 MediaSink { streams: vec![] }
243 }
244}
245
246impl MediaOutput for MediaSink {
247 fn add_stream(&mut self, stream: &MediaStreamId) {
248 let stream = get_stream(stream).expect("Media streams registry does not contain such ID");
249 {
250 let mut stream = stream.lock().unwrap();
251 let stream = stream
252 .as_mut_any()
253 .downcast_mut::<GStreamerMediaStream>()
254 .unwrap();
255 let pipeline = stream.pipeline_or_new();
256 let last_element = stream.elements.last();
257 let last_element = last_element.as_ref().unwrap();
258 let sink = match stream.type_ {
259 MediaStreamType::Audio => "autoaudiosink",
260 MediaStreamType::Video => "autovideosink",
261 };
262 let sink = gst::ElementFactory::make(sink).build().unwrap();
263 pipeline.add(&sink).unwrap();
264 gst::Element::link_many(&[last_element, &sink][..]).unwrap();
265
266 pipeline.set_state(gst::State::Playing).unwrap();
267 sink.sync_state_with_parent().unwrap();
268 }
269 self.streams.push(stream.clone());
270 }
271}
272
273pub struct GstreamerMediaSocket {
274 proxy_sink: gst::Element,
275}
276
277impl GstreamerMediaSocket {
278 pub fn proxy_sink(&self) -> &gst::Element {
279 &self.proxy_sink
280 }
281}
282
283impl MediaSocket for GstreamerMediaSocket {
284 fn as_any(&self) -> &dyn Any {
285 self
286 }
287}