Skip to main content

servo_media_gstreamer/
media_stream.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::any::Any;
6use std::sync::{Arc, LazyLock, Mutex};
7
8use glib::BoolError;
9use gstreamer;
10use gstreamer::prelude::*;
11use servo_media_streams::registry::{
12    MediaStreamId, get_stream, register_stream, unregister_stream,
13};
14use servo_media_streams::{MediaOutput, MediaSocket, MediaStream, MediaStreamType};
15
16use super::BACKEND_BASE_TIME;
17
18pub static RTP_CAPS_OPUS: LazyLock<gstreamer::Caps> = LazyLock::new(|| {
19    gstreamer::Caps::builder("application/x-rtp")
20        .field("media", "audio")
21        .field("encoding-name", "OPUS")
22        .build()
23});
24
25pub static RTP_CAPS_VP8: LazyLock<gstreamer::Caps> = LazyLock::new(|| {
26    gstreamer::Caps::builder("application/x-rtp")
27        .field("media", "video")
28        .field("encoding-name", "VP8")
29        .build()
30});
31
32pub struct GStreamerMediaStream {
33    id: Option<MediaStreamId>,
34    type_: MediaStreamType,
35    elements: Vec<gstreamer::Element>,
36    pipeline: Option<gstreamer::Pipeline>,
37}
38
39impl MediaStream for GStreamerMediaStream {
40    fn as_any(&self) -> &dyn Any {
41        self
42    }
43
44    fn as_mut_any(&mut self) -> &mut dyn Any {
45        self
46    }
47
48    fn set_id(&mut self, id: MediaStreamId) {
49        self.id = Some(id);
50    }
51
52    fn ty(&self) -> MediaStreamType {
53        self.type_
54    }
55}
56
57impl GStreamerMediaStream {
58    pub fn new(type_: MediaStreamType, elements: Vec<gstreamer::Element>) -> Self {
59        Self {
60            id: None,
61            type_,
62            elements,
63            pipeline: None,
64        }
65    }
66
67    pub fn caps(&self) -> &gstreamer::Caps {
68        match self.type_ {
69            MediaStreamType::Audio => &RTP_CAPS_OPUS,
70            MediaStreamType::Video => &RTP_CAPS_VP8,
71        }
72    }
73
74    pub fn caps_with_payload(&self, payload: i32) -> gstreamer::Caps {
75        match self.type_ {
76            MediaStreamType::Audio => gstreamer::Caps::builder("application/x-rtp")
77                .field("media", "audio")
78                .field("encoding-name", "OPUS")
79                .field("payload", payload)
80                .build(),
81            MediaStreamType::Video => gstreamer::Caps::builder("application/x-rtp")
82                .field("media", "video")
83                .field("encoding-name", "VP8")
84                .field("payload", payload)
85                .build(),
86        }
87    }
88
89    pub fn src_element(&self) -> gstreamer::Element {
90        self.elements.last().unwrap().clone()
91    }
92
93    pub fn attach_to_pipeline(&mut self, pipeline: &gstreamer::Pipeline) {
94        assert!(self.pipeline.is_none());
95        let elements: Vec<_> = self.elements.iter().collect();
96        pipeline.add_many(&elements[..]).unwrap();
97        gstreamer::Element::link_many(&elements[..]).unwrap();
98        for element in elements {
99            element.sync_state_with_parent().unwrap();
100        }
101        self.pipeline = Some(pipeline.clone());
102    }
103
104    pub fn pipeline_or_new(&mut self) -> gstreamer::Pipeline {
105        match self.pipeline {
106            Some(ref pipeline) => pipeline.clone(),
107            _ => {
108                let pipeline =
109                    gstreamer::Pipeline::with_name("gstreamermediastream fresh pipeline");
110                let clock = gstreamer::SystemClock::obtain();
111                pipeline.set_start_time(gstreamer::ClockTime::NONE);
112                pipeline.set_base_time(*BACKEND_BASE_TIME);
113                pipeline.use_clock(Some(&clock));
114                self.attach_to_pipeline(&pipeline);
115                pipeline
116            },
117        }
118    }
119
120    pub fn create_video() -> MediaStreamId {
121        let videotestsrc = gstreamer::ElementFactory::make("videotestsrc")
122            .property_from_str("pattern", "ball")
123            .property("is-live", true)
124            .build()
125            .unwrap();
126        Self::create_video_from(videotestsrc)
127    }
128
129    /// Attaches encoding adapters to the stream, returning the source element when successful.
130    pub fn encoded(&mut self) -> Result<gstreamer::Element, BoolError> {
131        let pipeline = self
132            .pipeline
133            .as_ref()
134            .expect("GStreamerMediaStream::encoded() should not be called without a pipeline");
135        let src = self.src_element();
136
137        let capsfilter = gstreamer::ElementFactory::make("capsfilter")
138            .property("caps", self.caps())
139            .build()?;
140        match self.type_ {
141            MediaStreamType::Video => {
142                let vp8enc = gstreamer::ElementFactory::make("vp8enc")
143                    .property("deadline", 1i64)
144                    .property("error-resilient", "default")
145                    .property("cpu-used", -16i32)
146                    .property("lag-in-frames", 0i32)
147                    .build()?;
148
149                let rtpvp8pay = gstreamer::ElementFactory::make("rtpvp8pay")
150                    .property("picture-id-mode", "15-bit")
151                    .property("mtu", 1200u32)
152                    .build()?;
153                let queue2 = gstreamer::ElementFactory::make("queue").build()?;
154
155                pipeline.add_many([&vp8enc, &rtpvp8pay, &queue2, &capsfilter])?;
156                gstreamer::Element::link_many([&src, &vp8enc, &rtpvp8pay, &queue2, &capsfilter])?;
157                vp8enc.sync_state_with_parent()?;
158                rtpvp8pay.sync_state_with_parent()?;
159                queue2.sync_state_with_parent()?;
160                capsfilter.sync_state_with_parent()?;
161                Ok(capsfilter)
162            },
163            MediaStreamType::Audio => {
164                let opusenc = gstreamer::ElementFactory::make("opusenc").build()?;
165                let rtpopuspay = gstreamer::ElementFactory::make("rtpopuspay")
166                    .property("mtu", 1200u32)
167                    .build()?;
168                let queue3 = gstreamer::ElementFactory::make("queue").build()?;
169                pipeline.add_many([&opusenc, &rtpopuspay, &queue3, &capsfilter])?;
170                gstreamer::Element::link_many([&src, &opusenc, &rtpopuspay, &queue3, &capsfilter])?;
171                opusenc.sync_state_with_parent()?;
172                rtpopuspay.sync_state_with_parent()?;
173                queue3.sync_state_with_parent()?;
174                Ok(capsfilter)
175            },
176        }
177    }
178
179    pub fn create_video_from(source: gstreamer::Element) -> MediaStreamId {
180        let videoconvert = gstreamer::ElementFactory::make("videoconvert")
181            .build()
182            .unwrap();
183        let queue = gstreamer::ElementFactory::make("queue").build().unwrap();
184
185        register_stream(Arc::new(Mutex::new(GStreamerMediaStream::new(
186            MediaStreamType::Video,
187            vec![source, videoconvert, queue],
188        ))))
189    }
190
191    pub fn create_audio() -> MediaStreamId {
192        let audiotestsrc = gstreamer::ElementFactory::make("audiotestsrc")
193            .property_from_str("wave", "sine")
194            .property("is-live", true)
195            .build()
196            .unwrap();
197
198        Self::create_audio_from(audiotestsrc)
199    }
200
201    pub fn create_audio_from(source: gstreamer::Element) -> MediaStreamId {
202        let queue = gstreamer::ElementFactory::make("queue").build().unwrap();
203        let audioconvert = gstreamer::ElementFactory::make("audioconvert")
204            .build()
205            .unwrap();
206        let audioresample = gstreamer::ElementFactory::make("audioresample")
207            .build()
208            .unwrap();
209        let queue2 = gstreamer::ElementFactory::make("queue").build().unwrap();
210
211        register_stream(Arc::new(Mutex::new(GStreamerMediaStream::new(
212            MediaStreamType::Audio,
213            vec![source, queue, audioconvert, audioresample, queue2],
214        ))))
215    }
216
217    pub fn create_proxy(ty: MediaStreamType) -> (MediaStreamId, GstreamerMediaSocket) {
218        let proxy_sink = gstreamer::ElementFactory::make("proxysink")
219            .build()
220            .unwrap();
221        let proxy_src = gstreamer::ElementFactory::make("proxysrc")
222            .property("proxysink", &proxy_sink)
223            .build()
224            .unwrap();
225        let stream = match ty {
226            MediaStreamType::Audio => Self::create_audio_from(proxy_src),
227            MediaStreamType::Video => Self::create_video_from(proxy_src),
228        };
229
230        (stream, GstreamerMediaSocket { proxy_sink })
231    }
232}
233
234impl Drop for GStreamerMediaStream {
235    fn drop(&mut self) {
236        if let Some(ref id) = self.id {
237            unregister_stream(id);
238        }
239    }
240}
241
242#[derive(Default)]
243pub struct MediaSink {
244    streams: Vec<Arc<Mutex<dyn MediaStream>>>,
245}
246
247impl MediaOutput for MediaSink {
248    fn add_stream(&mut self, stream: &MediaStreamId) {
249        let stream = get_stream(stream).expect("Media streams registry does not contain such ID");
250        {
251            let mut stream = stream.lock().unwrap();
252            let stream = stream
253                .as_mut_any()
254                .downcast_mut::<GStreamerMediaStream>()
255                .unwrap();
256            let pipeline = stream.pipeline_or_new();
257            let last_element = stream.elements.last();
258            let last_element = last_element.as_ref().unwrap();
259            let sink = match stream.type_ {
260                MediaStreamType::Audio => "autoaudiosink",
261                MediaStreamType::Video => "autovideosink",
262            };
263            let sink = gstreamer::ElementFactory::make(sink).build().unwrap();
264            pipeline.add(&sink).unwrap();
265            gstreamer::Element::link_many(&[last_element, &sink][..]).unwrap();
266
267            pipeline.set_state(gstreamer::State::Playing).unwrap();
268            sink.sync_state_with_parent().unwrap();
269        }
270        self.streams.push(stream.clone());
271    }
272}
273
274pub struct GstreamerMediaSocket {
275    proxy_sink: gstreamer::Element,
276}
277
278impl GstreamerMediaSocket {
279    pub fn proxy_sink(&self) -> &gstreamer::Element {
280        &self.proxy_sink
281    }
282}
283
284impl MediaSocket for GstreamerMediaSocket {
285    fn as_any(&self) -> &dyn Any {
286        self
287    }
288}