servo_media_gstreamer/
media_stream.rs

1use super::BACKEND_BASE_TIME;
2use gst;
3use gst::prelude::*;
4use once_cell::sync::Lazy;
5use servo_media_streams::registry::{
6    get_stream, register_stream, unregister_stream, MediaStreamId,
7};
8use servo_media_streams::{MediaOutput, MediaSocket, MediaStream, MediaStreamType};
9use std::any::Any;
10use std::sync::{Arc, Mutex};
11
12pub static RTP_CAPS_OPUS: Lazy<gst::Caps> = Lazy::new(|| {
13    gst::Caps::builder("application/x-rtp")
14        .field("media", "audio")
15        .field("encoding-name", "OPUS")
16        .build()
17});
18
19pub static RTP_CAPS_VP8: Lazy<gst::Caps> = Lazy::new(|| {
20    gst::Caps::builder("application/x-rtp")
21        .field("media", "video")
22        .field("encoding-name", "VP8")
23        .build()
24});
25
/// A media stream backed by a chain of GStreamer elements.
pub struct GStreamerMediaStream {
    /// Registry id assigned through `MediaStream::set_id`; `None` until then.
    /// When set, the stream unregisters itself under this id on drop.
    id: Option<MediaStreamId>,
    /// Whether this stream carries audio or video.
    type_: MediaStreamType,
    /// Elements of the stream in link order; the last one is the element
    /// that consumers link against.
    elements: Vec<gst::Element>,
    /// Pipeline the elements were added to, or `None` while unattached.
    pipeline: Option<gst::Pipeline>,
}
32
33impl MediaStream for GStreamerMediaStream {
34    fn as_any(&self) -> &dyn Any {
35        self
36    }
37
38    fn as_mut_any(&mut self) -> &mut dyn Any {
39        self
40    }
41
42    fn set_id(&mut self, id: MediaStreamId) {
43        self.id = Some(id);
44    }
45
46    fn ty(&self) -> MediaStreamType {
47        self.type_
48    }
49}
50
51impl GStreamerMediaStream {
52    pub fn new(type_: MediaStreamType, elements: Vec<gst::Element>) -> Self {
53        Self {
54            id: None,
55            type_,
56            elements,
57            pipeline: None,
58        }
59    }
60
61    pub fn caps(&self) -> &gst::Caps {
62        match self.type_ {
63            MediaStreamType::Audio => &RTP_CAPS_OPUS,
64            MediaStreamType::Video => &RTP_CAPS_VP8,
65        }
66    }
67
68    pub fn caps_with_payload(&self, payload: i32) -> gst::Caps {
69        match self.type_ {
70            MediaStreamType::Audio => gst::Caps::builder("application/x-rtp")
71                .field("media", "audio")
72                .field("encoding-name", "OPUS")
73                .field("payload", payload)
74                .build(),
75            MediaStreamType::Video => gst::Caps::builder("application/x-rtp")
76                .field("media", "video")
77                .field("encoding-name", "VP8")
78                .field("payload", payload)
79                .build(),
80        }
81    }
82
83    pub fn src_element(&self) -> gst::Element {
84        self.elements.last().unwrap().clone()
85    }
86
87    pub fn attach_to_pipeline(&mut self, pipeline: &gst::Pipeline) {
88        assert!(self.pipeline.is_none());
89        let elements: Vec<_> = self.elements.iter().collect();
90        pipeline.add_many(&elements[..]).unwrap();
91        gst::Element::link_many(&elements[..]).unwrap();
92        for element in elements {
93            element.sync_state_with_parent().unwrap();
94        }
95        self.pipeline = Some(pipeline.clone());
96    }
97
98    pub fn pipeline_or_new(&mut self) -> gst::Pipeline {
99        if let Some(ref pipeline) = self.pipeline {
100            pipeline.clone()
101        } else {
102            let pipeline = gst::Pipeline::with_name("gstreamermediastream fresh pipeline");
103            let clock = gst::SystemClock::obtain();
104            pipeline.set_start_time(gst::ClockTime::NONE);
105            pipeline.set_base_time(*BACKEND_BASE_TIME);
106            pipeline.use_clock(Some(&clock));
107            self.attach_to_pipeline(&pipeline);
108            pipeline
109        }
110    }
111
112    pub fn create_video() -> MediaStreamId {
113        let videotestsrc = gst::ElementFactory::make("videotestsrc")
114            .property_from_str("pattern", "ball")
115            .property("is-live", true)
116            .build()
117            .unwrap();
118        Self::create_video_from(videotestsrc)
119    }
120
121    /// Attaches encoding adapters to the stream, returning the source element
122    pub fn encoded(&mut self) -> gst::Element {
123        let pipeline = self
124            .pipeline
125            .as_ref()
126            .expect("GStreamerMediaStream::encoded() should not be called without a pipeline");
127        let src = self.src_element();
128
129        let capsfilter = gst::ElementFactory::make("capsfilter")
130            .property("caps", self.caps())
131            .build()
132            .unwrap();
133        match self.type_ {
134            MediaStreamType::Video => {
135                let vp8enc = gst::ElementFactory::make("vp8enc")
136                    .property("deadline", 1i64)
137                    .property("error-resilient", "default")
138                    .property("cpu-used", -16i32)
139                    .property("lag-in-frames", 0i32)
140                    .build()
141                    .unwrap();
142
143                let rtpvp8pay = gst::ElementFactory::make("rtpvp8pay")
144                    .property("picture-id-mode", "15-bit")
145                    .property("mtu", 1200u32)
146                    .build()
147                    .unwrap();
148                let queue2 = gst::ElementFactory::make("queue").build().unwrap();
149
150                pipeline
151                    .add_many(&[&vp8enc, &rtpvp8pay, &queue2, &capsfilter])
152                    .unwrap();
153                gst::Element::link_many(&[&src, &vp8enc, &rtpvp8pay, &queue2, &capsfilter])
154                    .unwrap();
155                vp8enc.sync_state_with_parent().unwrap();
156                rtpvp8pay.sync_state_with_parent().unwrap();
157                queue2.sync_state_with_parent().unwrap();
158                capsfilter.sync_state_with_parent().unwrap();
159                capsfilter
160            }
161            MediaStreamType::Audio => {
162                let opusenc = gst::ElementFactory::make("opusenc").build().unwrap();
163                let rtpopuspay = gst::ElementFactory::make("rtpopuspay")
164                    .property("mtu", 1200u32)
165                    .build()
166                    .unwrap();
167                let queue3 = gst::ElementFactory::make("queue").build().unwrap();
168                pipeline
169                    .add_many(&[&opusenc, &rtpopuspay, &queue3, &capsfilter])
170                    .unwrap();
171                gst::Element::link_many(&[&src, &opusenc, &rtpopuspay, &queue3, &capsfilter])
172                    .unwrap();
173                opusenc.sync_state_with_parent().unwrap();
174                rtpopuspay.sync_state_with_parent().unwrap();
175                queue3.sync_state_with_parent().unwrap();
176                capsfilter
177            }
178        }
179    }
180
181    pub fn create_video_from(source: gst::Element) -> MediaStreamId {
182        let videoconvert = gst::ElementFactory::make("videoconvert").build().unwrap();
183        let queue = gst::ElementFactory::make("queue").build().unwrap();
184
185        register_stream(Arc::new(Mutex::new(GStreamerMediaStream::new(
186            MediaStreamType::Video,
187            vec![source, videoconvert, queue],
188        ))))
189    }
190
191    pub fn create_audio() -> MediaStreamId {
192        let audiotestsrc = gst::ElementFactory::make("audiotestsrc")
193            .property_from_str("wave", "sine")
194            .property("is-live", true)
195            .build()
196            .unwrap();
197
198        Self::create_audio_from(audiotestsrc)
199    }
200
201    pub fn create_audio_from(source: gst::Element) -> MediaStreamId {
202        let queue = gst::ElementFactory::make("queue").build().unwrap();
203        let audioconvert = gst::ElementFactory::make("audioconvert").build().unwrap();
204        let audioresample = gst::ElementFactory::make("audioresample").build().unwrap();
205        let queue2 = gst::ElementFactory::make("queue").build().unwrap();
206
207        register_stream(Arc::new(Mutex::new(GStreamerMediaStream::new(
208            MediaStreamType::Audio,
209            vec![source, queue, audioconvert, audioresample, queue2],
210        ))))
211    }
212
213    pub fn create_proxy(ty: MediaStreamType) -> (MediaStreamId, GstreamerMediaSocket) {
214        let proxy_sink = gst::ElementFactory::make("proxysink").build().unwrap();
215        let proxy_src = gst::ElementFactory::make("proxysrc")
216            .property("proxysink", &proxy_sink)
217            .build()
218            .unwrap();
219        let stream = match ty {
220            MediaStreamType::Audio => Self::create_audio_from(proxy_src),
221            MediaStreamType::Video => Self::create_video_from(proxy_src),
222        };
223
224        (stream, GstreamerMediaSocket { proxy_sink })
225    }
226}
227
228impl Drop for GStreamerMediaStream {
229    fn drop(&mut self) {
230        if let Some(ref id) = self.id {
231            unregister_stream(id);
232        }
233    }
234}
235
/// Media output that plays streams through automatically selected sinks.
pub struct MediaSink {
    /// Streams added through `MediaOutput::add_stream`, in insertion order.
    streams: Vec<Arc<Mutex<dyn MediaStream>>>,
}
239
240impl MediaSink {
241    pub fn new() -> Self {
242        MediaSink { streams: vec![] }
243    }
244}
245
246impl MediaOutput for MediaSink {
247    fn add_stream(&mut self, stream: &MediaStreamId) {
248        let stream = get_stream(stream).expect("Media streams registry does not contain such ID");
249        {
250            let mut stream = stream.lock().unwrap();
251            let stream = stream
252                .as_mut_any()
253                .downcast_mut::<GStreamerMediaStream>()
254                .unwrap();
255            let pipeline = stream.pipeline_or_new();
256            let last_element = stream.elements.last();
257            let last_element = last_element.as_ref().unwrap();
258            let sink = match stream.type_ {
259                MediaStreamType::Audio => "autoaudiosink",
260                MediaStreamType::Video => "autovideosink",
261            };
262            let sink = gst::ElementFactory::make(sink).build().unwrap();
263            pipeline.add(&sink).unwrap();
264            gst::Element::link_many(&[last_element, &sink][..]).unwrap();
265
266            pipeline.set_state(gst::State::Playing).unwrap();
267            sink.sync_state_with_parent().unwrap();
268        }
269        self.streams.push(stream.clone());
270    }
271}
272
/// Socket handed out by `GStreamerMediaStream::create_proxy`.
pub struct GstreamerMediaSocket {
    /// The `proxysink` element paired with the proxy stream's `proxysrc`.
    proxy_sink: gst::Element,
}
276
277impl GstreamerMediaSocket {
278    pub fn proxy_sink(&self) -> &gst::Element {
279        &self.proxy_sink
280    }
281}
282
283impl MediaSocket for GstreamerMediaSocket {
284    fn as_any(&self) -> &dyn Any {
285        self
286    }
287}