// servo_media_gstreamer/source.rs

/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */

use std::convert::TryFrom;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};

use glib::subclass::prelude::*;
use gstreamer::prelude::*;
use gstreamer::subclass::prelude::*;
use url::Url;

/// Value passed to `set_max_bytes` on the inner appsrc when the element is
/// constructed: 50 MB.
const MAX_SRC_QUEUE_SIZE: u64 = 50 * 1024 * 1024; // 50 MB.

16// Implementation sub-module of the GObject
17mod imp {
18    use std::sync::LazyLock;
19
20    use super::*;
21
22    macro_rules! inner_appsrc_proxy {
23        ($fn_name:ident, $return_type:ty) => {
24            pub fn $fn_name(&self) -> $return_type {
25                self.appsrc.$fn_name()
26            }
27        };
28
29        ($fn_name:ident, $arg1:ident, $arg1_type:ty, $return_type:ty) => {
30            pub fn $fn_name(&self, $arg1: $arg1_type) -> $return_type {
31                self.appsrc.$fn_name($arg1)
32            }
33        };
34    }
35
36    #[derive(Debug, Default)]
37    struct Position {
38        offset: u64,
39        requested_offset: u64,
40    }
41
42    // The actual data structure that stores our values. This is not accessible
43    // directly from the outside.
44    pub struct ServoSrc {
45        cat: gstreamer::DebugCategory,
46        appsrc: gstreamer_app::AppSrc,
47        srcpad: gstreamer::GhostPad,
48        position: Mutex<Position>,
49        seeking: AtomicBool,
50        size: Mutex<Option<i64>>,
51    }
52
53    impl ServoSrc {
54        pub fn set_size(&self, size: i64) {
55            if self.seeking.load(Ordering::Relaxed) {
56                // We ignore set_size requests if we are seeking.
57                // The size value is temporarily stored so it
58                // is properly set once we are done seeking.
59                *self.size.lock().unwrap() = Some(size);
60                return;
61            }
62
63            if self.appsrc.size() == -1 {
64                self.appsrc.set_size(size);
65            }
66        }
67
68        pub fn set_seek_offset<O: IsA<gstreamer::Object>>(&self, parent: &O, offset: u64) -> bool {
69            let mut pos = self.position.lock().unwrap();
70
71            if pos.offset == offset || pos.requested_offset != 0 {
72                false
73            } else {
74                self.seeking.store(true, Ordering::Relaxed);
75                pos.requested_offset = offset;
76                gstreamer::debug!(
77                    self.cat,
78                    obj = parent,
79                    "seeking to offset: {}",
80                    pos.requested_offset
81                );
82
83                true
84            }
85        }
86
87        pub fn set_seek_done(&self) {
88            self.seeking.store(false, Ordering::Relaxed);
89
90            if let Some(size) = self.size.lock().unwrap().take() &&
91                self.appsrc.size() == -1
92            {
93                self.appsrc.set_size(size);
94            }
95
96            let mut pos = self.position.lock().unwrap();
97            pos.offset = pos.requested_offset;
98            pos.requested_offset = 0;
99        }
100
101        pub fn push_buffer<O: IsA<gstreamer::Object>>(
102            &self,
103            parent: &O,
104            data: Vec<u8>,
105        ) -> Result<gstreamer::FlowSuccess, gstreamer::FlowError> {
106            if self.seeking.load(Ordering::Relaxed) {
107                gstreamer::debug!(self.cat, obj = parent, "seek in progress, ignored data");
108                return Ok(gstreamer::FlowSuccess::Ok);
109            }
110
111            let mut pos = self.position.lock().unwrap(); // will block seeking
112
113            let length = u64::try_from(data.len()).unwrap();
114            let mut data_offset = 0;
115
116            let buffer_starting_offset = pos.offset;
117
118            // @TODO: optimization: update the element's blocksize by
119            // X factor given current length
120
121            pos.offset += length;
122
123            gstreamer::trace!(self.cat, obj = parent, "offset: {}", pos.offset);
124
125            // set the stream size (in bytes) to current offset if
126            // size is lesser than it
127            if let Ok(size) = u64::try_from(self.appsrc.size()) &&
128                pos.offset > size
129            {
130                gstreamer::debug!(
131                    self.cat,
132                    obj = parent,
133                    "Updating internal size from {} to {}",
134                    size,
135                    pos.offset
136                );
137                let new_size = i64::try_from(pos.offset).unwrap();
138                self.appsrc.set_size(new_size);
139            }
140
141            // Split the received vec<> into buffers that are of a
142            // size basesrc suggest. It is important not to push
143            // buffers that are too large, otherwise incorrect
144            // buffering messages can be sent from the pipeline
145            let block_size = 4096;
146            let num_blocks = ((length - data_offset) as f64 / block_size as f64).ceil() as u64;
147
148            gstreamer::log!(
149                self.cat,
150                obj = parent,
151                "Splitting the received vec into {} blocks",
152                num_blocks
153            );
154
155            let mut ret: Result<gstreamer::FlowSuccess, gstreamer::FlowError> =
156                Ok(gstreamer::FlowSuccess::Ok);
157            for i in 0..num_blocks {
158                let start = usize::try_from(i * block_size + data_offset).unwrap();
159                data_offset = 0;
160                let size = usize::try_from(block_size.min(length - start as u64)).unwrap();
161                let end = start + size;
162
163                let buffer_offset = buffer_starting_offset + start as u64;
164                let buffer_offset_end = buffer_offset + size as u64;
165
166                let subdata = Vec::from(&data[start..end]);
167                let mut buffer = gstreamer::Buffer::from_slice(subdata);
168                {
169                    let buffer = buffer.get_mut().unwrap();
170                    buffer.set_offset(buffer_offset);
171                    buffer.set_offset_end(buffer_offset_end);
172                }
173
174                if self.seeking.load(Ordering::Relaxed) {
175                    gstreamer::trace!(
176                        self.cat,
177                        obj = parent,
178                        "stopping buffer appends due to seek"
179                    );
180                    ret = Ok(gstreamer::FlowSuccess::Ok);
181                    break;
182                }
183
184                gstreamer::trace!(self.cat, obj = parent, "Pushing buffer {:?}", buffer);
185
186                ret = self.appsrc.push_buffer(buffer);
187                match ret {
188                    Ok(_) => (),
189                    Err(gstreamer::FlowError::Eos) | Err(gstreamer::FlowError::Flushing) => {
190                        ret = Ok(gstreamer::FlowSuccess::Ok)
191                    },
192                    Err(_) => break,
193                }
194            }
195
196            ret
197        }
198
199        inner_appsrc_proxy!(end_of_stream, Result<gstreamer::FlowSuccess, gstreamer::FlowError>);
200        inner_appsrc_proxy!(set_callbacks, callbacks, gstreamer_app::AppSrcCallbacks, ());
201
202        fn query(&self, pad: &gstreamer::GhostPad, query: &mut gstreamer::QueryRef) -> bool {
203            gstreamer::log!(self.cat, obj = pad, "Handling query {:?}", query);
204
205            // In order to make buffering/downloading work as we want, apart from
206            // setting the appropriate flags on the player playbin,
207            // the source needs to either:
208            //
209            // 1. be an http, mms, etc. scheme
210            // 2. report that it is "bandwidth limited".
211            //
212            // 1. is not straightforward because we are using a servosrc scheme for now.
213            // This may change in the future if we end up handling http/https/data
214            // URIs, which is what WebKit does.
215            //
216            // For 2. we need to make servosrc handle the scheduling properties query
217            // to report that it "is bandwidth limited".
218            let ret = match query.view_mut() {
219                gstreamer::QueryViewMut::Scheduling(ref mut q) => {
220                    let flags = gstreamer::SchedulingFlags::SEQUENTIAL |
221                        gstreamer::SchedulingFlags::BANDWIDTH_LIMITED;
222                    q.set(flags, 1, -1, 0);
223                    q.add_scheduling_modes([gstreamer::PadMode::Push]);
224                    true
225                },
226                _ => gstreamer::Pad::query_default(pad, Some(&*self.obj()), query),
227            };
228
229            if ret {
230                gstreamer::log!(self.cat, obj = pad, "Handled query {:?}", query);
231            } else {
232                gstreamer::info!(self.cat, obj = pad, "Didn't handle query {:?}", query);
233            }
234            ret
235        }
236    }
237
238    // Basic declaration of our type for the GObject type system
239    #[glib::object_subclass]
240    impl ObjectSubclass for ServoSrc {
241        const NAME: &'static str = "ServoSrc";
242        type Type = super::ServoSrc;
243        type ParentType = gstreamer::Bin;
244        type Interfaces = (gstreamer::URIHandler,);
245
246        // Called once at the very beginning of instantiation of each instance and
247        // creates the data structure that contains all our state
248        fn with_class(klass: &Self::Class) -> Self {
249            let app_src = gstreamer::ElementFactory::make("appsrc")
250                .build()
251                .map(|elem| elem.downcast::<gstreamer_app::AppSrc>().unwrap())
252                .expect("Could not create appsrc element");
253
254            let pad_templ = klass.pad_template("src").unwrap();
255            let ghost_pad = gstreamer::GhostPad::builder_from_template(&pad_templ)
256                .query_function(|pad, parent, query| {
257                    ServoSrc::catch_panic_pad_function(
258                        parent,
259                        || false,
260                        |servosrc| servosrc.query(pad, query),
261                    )
262                })
263                .build();
264
265            Self {
266                cat: gstreamer::DebugCategory::new(
267                    "servosrc",
268                    gstreamer::DebugColorFlags::empty(),
269                    Some("Servo source"),
270                ),
271                appsrc: app_src,
272                srcpad: ghost_pad,
273                position: Mutex::new(Default::default()),
274                seeking: AtomicBool::new(false),
275                size: Mutex::new(None),
276            }
277        }
278    }
279
280    // The ObjectImpl trait provides the setters/getters for GObject properties.
281    // Here we need to provide the values that are internally stored back to the
282    // caller, or store whatever new value the caller is providing.
283    //
284    // This maps between the GObject properties and our internal storage of the
285    // corresponding values of the properties.
286    impl ObjectImpl for ServoSrc {
287        // Called right after construction of a new instance
288        fn constructed(&self) {
289            // Call the parent class' ::constructed() implementation first
290            self.parent_constructed();
291
292            self.obj()
293                .add(&self.appsrc)
294                .expect("Could not add appsrc element to bin");
295
296            let target_pad = self.appsrc.static_pad("src");
297            self.srcpad.set_target(target_pad.as_ref()).unwrap();
298
299            self.obj()
300                .add_pad(&self.srcpad)
301                .expect("Could not add source pad to bin");
302
303            self.appsrc.set_caps(None::<&gstreamer::Caps>);
304            self.appsrc.set_max_bytes(MAX_SRC_QUEUE_SIZE);
305            self.appsrc.set_block(false);
306            self.appsrc.set_format(gstreamer::Format::Bytes);
307            self.appsrc
308                .set_stream_type(gstreamer_app::AppStreamType::Seekable);
309
310            self.obj()
311                .set_element_flags(gstreamer::ElementFlags::SOURCE);
312        }
313    }
314
315    impl GstObjectImpl for ServoSrc {}
316
317    // Implementation of gstreamer::Element virtual methods
318    impl ElementImpl for ServoSrc {
319        fn metadata() -> Option<&'static gstreamer::subclass::ElementMetadata> {
320            static ELEMENT_METADATA: LazyLock<gstreamer::subclass::ElementMetadata> =
321                LazyLock::new(|| {
322                    gstreamer::subclass::ElementMetadata::new(
323                        "Servo Media Source",
324                        "Source/Audio/Video",
325                        "Feed player with media data",
326                        "Servo developers",
327                    )
328                });
329
330            Some(&*ELEMENT_METADATA)
331        }
332
333        fn pad_templates() -> &'static [gstreamer::PadTemplate] {
334            static PAD_TEMPLATES: LazyLock<Vec<gstreamer::PadTemplate>> = LazyLock::new(|| {
335                let caps = gstreamer::Caps::new_any();
336                let src_pad_template = gstreamer::PadTemplate::new(
337                    "src",
338                    gstreamer::PadDirection::Src,
339                    gstreamer::PadPresence::Always,
340                    &caps,
341                )
342                .unwrap();
343
344                vec![src_pad_template]
345            });
346
347            PAD_TEMPLATES.as_ref()
348        }
349    }
350
351    // Implementation of gstreamer::Bin virtual methods
352    impl BinImpl for ServoSrc {}
353
354    impl URIHandlerImpl for ServoSrc {
355        const URI_TYPE: gstreamer::URIType = gstreamer::URIType::Src;
356
357        fn protocols() -> &'static [&'static str] {
358            &["servosrc"]
359        }
360
361        fn uri(&self) -> Option<String> {
362            Some("servosrc://".to_string())
363        }
364
365        fn set_uri(&self, uri: &str) -> Result<(), glib::Error> {
366            if let Ok(uri) = Url::parse(uri) &&
367                uri.scheme() == "servosrc"
368            {
369                return Ok(());
370            }
371            Err(glib::Error::new(
372                gstreamer::URIError::BadUri,
373                format!("Invalid URI '{:?}'", uri,).as_str(),
374            ))
375        }
376    }
377}
378
379// Public part of the ServoSrc type. This behaves like a normal
380// GObject binding
381glib::wrapper! {
382    pub struct ServoSrc(ObjectSubclass<imp::ServoSrc>)
383        @extends gstreamer::Bin, gstreamer::Element, gstreamer::Object, @implements gstreamer::URIHandler;
384}
385
386unsafe impl Send for ServoSrc {}
387unsafe impl Sync for ServoSrc {}
388
389impl ServoSrc {
390    pub fn set_size(&self, size: i64) {
391        self.imp().set_size(size);
392    }
393
394    pub fn set_seek_offset(&self, offset: u64) -> bool {
395        self.imp().set_seek_offset(self, offset)
396    }
397
398    pub fn set_seek_done(&self) {
399        self.imp().set_seek_done();
400    }
401
402    pub fn push_buffer(
403        &self,
404        data: Vec<u8>,
405    ) -> Result<gstreamer::FlowSuccess, gstreamer::FlowError> {
406        self.imp().push_buffer(self, data)
407    }
408
409    pub fn push_end_of_stream(&self) -> Result<gstreamer::FlowSuccess, gstreamer::FlowError> {
410        self.imp().end_of_stream()
411    }
412
413    pub fn set_callbacks(&self, callbacks: gstreamer_app::AppSrcCallbacks) {
414        self.imp().set_callbacks(callbacks)
415    }
416}
417
418// Registers the type for our element, and then registers in GStreamer
419// under the name "servosrc" for being able to instantiate it via e.g.
420// gstreamer::ElementFactory::make().
421pub fn register_servo_src() -> Result<(), glib::BoolError> {
422    gstreamer::Element::register(
423        None,
424        "servosrc",
425        gstreamer::Rank::NONE,
426        ServoSrc::static_type(),
427    )
428}