servo_media_gstreamer/
source.rs

1use glib::subclass::prelude::*;
2use gst::prelude::*;
3use gst::subclass::prelude::*;
4use once_cell::sync::Lazy;
5use std::convert::TryFrom;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::Mutex;
8use url::Url;
9
/// Byte limit (50 MiB) passed to `set_max_bytes` on the inner `appsrc` when the
/// element is constructed.
const MAX_SRC_QUEUE_SIZE: u64 = 50 * 1024 * 1024;
11
12// Implementation sub-module of the GObject
13mod imp {
14    use super::*;
15
/// Generates a public method that forwards to the method of the same name on
/// `self.appsrc`.
///
/// Two invocation shapes are accepted:
///
/// * `inner_appsrc_proxy!(method, ReturnType)` for a method without arguments;
/// * `inner_appsrc_proxy!(method, arg, ArgType, ReturnType)` for a method that
///   takes exactly one argument, which is passed through unchanged.
macro_rules! inner_appsrc_proxy {
    // No-argument form.
    ($method:ident, $ret:ty) => {
        pub fn $method(&self) -> $ret {
            self.appsrc.$method()
        }
    };

    // Single-argument form.
    ($method:ident, $param:ident, $param_ty:ty, $ret:ty) => {
        pub fn $method(&self, $param: $param_ty) -> $ret {
            self.appsrc.$method($param)
        }
    };
}
29
/// Byte offsets tracked for the data pushed through the element.
#[derive(Default, Debug)]
struct Position {
    /// Stream offset at which the next pushed data starts; `push_buffer`
    /// advances it by the length of the data it receives.
    offset: u64,
    /// Offset requested by a pending seek. `set_seek_done` resets it to `0`,
    /// and `set_seek_offset` refuses a new seek while it is non-zero.
    requested_offset: u64,
}
35
36    // The actual data structure that stores our values. This is not accessible
37    // directly from the outside.
38    pub struct ServoSrc {
39        cat: gst::DebugCategory,
40        appsrc: gst_app::AppSrc,
41        srcpad: gst::GhostPad,
42        position: Mutex<Position>,
43        seeking: AtomicBool,
44        size: Mutex<Option<i64>>,
45    }
46
47    impl ServoSrc {
48        pub fn set_size(&self, size: i64) {
49            if self.seeking.load(Ordering::Relaxed) {
50                // We ignore set_size requests if we are seeking.
51                // The size value is temporarily stored so it
52                // is properly set once we are done seeking.
53                *self.size.lock().unwrap() = Some(size);
54                return;
55            }
56
57            if self.appsrc.size() == -1 {
58                self.appsrc.set_size(size);
59            }
60        }
61
62        pub fn set_seek_offset<O: IsA<gst::Object>>(&self, parent: &O, offset: u64) -> bool {
63            let mut pos = self.position.lock().unwrap();
64
65            if pos.offset == offset || pos.requested_offset != 0 {
66                false
67            } else {
68                self.seeking.store(true, Ordering::Relaxed);
69                pos.requested_offset = offset;
70                gst::debug!(
71                    self.cat,
72                    obj = parent,
73                    "seeking to offset: {}",
74                    pos.requested_offset
75                );
76
77                true
78            }
79        }
80
81        pub fn set_seek_done(&self) {
82            self.seeking.store(false, Ordering::Relaxed);
83
84            if let Some(size) = self.size.lock().unwrap().take() {
85                if self.appsrc.size() == -1 {
86                    self.appsrc.set_size(size);
87                }
88            }
89
90            let mut pos = self.position.lock().unwrap();
91            pos.offset = pos.requested_offset;
92            pos.requested_offset = 0;
93        }
94
95        pub fn push_buffer<O: IsA<gst::Object>>(
96            &self,
97            parent: &O,
98            data: Vec<u8>,
99        ) -> Result<gst::FlowSuccess, gst::FlowError> {
100            if self.seeking.load(Ordering::Relaxed) {
101                gst::debug!(self.cat, obj = parent, "seek in progress, ignored data");
102                return Ok(gst::FlowSuccess::Ok);
103            }
104
105            let mut pos = self.position.lock().unwrap(); // will block seeking
106
107            let length = u64::try_from(data.len()).unwrap();
108            let mut data_offset = 0;
109
110            let buffer_starting_offset = pos.offset;
111
112            // @TODO: optimization: update the element's blocksize by
113            // X factor given current length
114
115            pos.offset += length;
116
117            gst::trace!(self.cat, obj = parent, "offset: {}", pos.offset);
118
119            // set the stream size (in bytes) to current offset if
120            // size is lesser than it
121            let _ = u64::try_from(self.appsrc.size()).and_then(|size| {
122                if pos.offset > size {
123                    gst::debug!(
124                        self.cat,
125                        obj = parent,
126                        "Updating internal size from {} to {}",
127                        size,
128                        pos.offset
129                    );
130                    let new_size = i64::try_from(pos.offset).unwrap();
131                    self.appsrc.set_size(new_size);
132                }
133                Ok(())
134            });
135
136            // Split the received vec<> into buffers that are of a
137            // size basesrc suggest. It is important not to push
138            // buffers that are too large, otherwise incorrect
139            // buffering messages can be sent from the pipeline
140            let block_size = 4096;
141            let num_blocks = ((length - data_offset) as f64 / block_size as f64).ceil() as u64;
142
143            gst::log!(
144                self.cat,
145                obj = parent,
146                "Splitting the received vec into {} blocks",
147                num_blocks
148            );
149
150            let mut ret: Result<gst::FlowSuccess, gst::FlowError> = Ok(gst::FlowSuccess::Ok);
151            for i in 0..num_blocks {
152                let start = usize::try_from(i * block_size + data_offset).unwrap();
153                data_offset = 0;
154                let size = usize::try_from(block_size.min((length - start as u64).into())).unwrap();
155                let end = start + size;
156
157                let buffer_offset = buffer_starting_offset + start as u64;
158                let buffer_offset_end = buffer_offset + size as u64;
159
160                let subdata = Vec::from(&data[start..end]);
161                let mut buffer = gst::Buffer::from_slice(subdata);
162                {
163                    let buffer = buffer.get_mut().unwrap();
164                    buffer.set_offset(buffer_offset);
165                    buffer.set_offset_end(buffer_offset_end);
166                }
167
168                if self.seeking.load(Ordering::Relaxed) {
169                    gst::trace!(self.cat, obj = parent, "stopping buffer appends due to seek");
170                    ret = Ok(gst::FlowSuccess::Ok);
171                    break;
172                }
173
174                gst::trace!(self.cat, obj = parent, "Pushing buffer {:?}", buffer);
175
176                ret = self.appsrc.push_buffer(buffer);
177                match ret {
178                    Ok(_) => (),
179                    Err(gst::FlowError::Eos) | Err(gst::FlowError::Flushing) => {
180                        ret = Ok(gst::FlowSuccess::Ok)
181                    }
182                    Err(_) => break,
183                }
184            }
185
186            ret
187        }
188
189        inner_appsrc_proxy!(end_of_stream, Result<gst::FlowSuccess, gst::FlowError>);
190        inner_appsrc_proxy!(set_callbacks, callbacks, gst_app::AppSrcCallbacks, ());
191
192        fn query(&self, pad: &gst::GhostPad, query: &mut gst::QueryRef) -> bool {
193            gst::log!(self.cat, obj = pad, "Handling query {:?}", query);
194
195            // In order to make buffering/downloading work as we want, apart from
196            // setting the appropriate flags on the player playbin,
197            // the source needs to either:
198            //
199            // 1. be an http, mms, etc. scheme
200            // 2. report that it is "bandwidth limited".
201            //
202            // 1. is not straightforward because we are using a servosrc scheme for now.
203            // This may change in the future if we end up handling http/https/data
204            // URIs, which is what WebKit does.
205            //
206            // For 2. we need to make servosrc handle the scheduling properties query
207            // to report that it "is bandwidth limited".
208            let ret = match query.view_mut() {
209                gst::QueryViewMut::Scheduling(ref mut q) => {
210                    let flags =
211                        gst::SchedulingFlags::SEQUENTIAL | gst::SchedulingFlags::BANDWIDTH_LIMITED;
212                    q.set(flags, 1, -1, 0);
213                    q.add_scheduling_modes(&[gst::PadMode::Push]);
214                    true
215                }
216                _ => gst::Pad::query_default(pad, Some(&*self.obj()), query),
217            };
218
219            if ret {
220                gst::log!(self.cat, obj = pad, "Handled query {:?}", query);
221            } else {
222                gst::info!(self.cat, obj = pad, "Didn't handle query {:?}", query);
223            }
224            ret
225        }
226    }
227
228    // Basic declaration of our type for the GObject type system
229    #[glib::object_subclass]
230    impl ObjectSubclass for ServoSrc {
231        const NAME: &'static str = "ServoSrc";
232        type Type = super::ServoSrc;
233        type ParentType = gst::Bin;
234        type Interfaces = (gst::URIHandler,);
235
236        // Called once at the very beginning of instantiation of each instance and
237        // creates the data structure that contains all our state
238        fn with_class(klass: &Self::Class) -> Self {
239            let app_src = gst::ElementFactory::make("appsrc")
240                .build()
241                .map(|elem| elem.downcast::<gst_app::AppSrc>().unwrap())
242                .expect("Could not create appsrc element");
243
244            let pad_templ = klass.pad_template("src").unwrap();
245            let ghost_pad = gst::GhostPad::builder_from_template(&pad_templ)
246                .query_function(|pad, parent, query| {
247                    ServoSrc::catch_panic_pad_function(
248                        parent,
249                        || false,
250                        |servosrc| servosrc.query(pad, query),
251                    )
252                })
253                .build();
254
255            Self {
256                cat: gst::DebugCategory::new(
257                    "servosrc",
258                    gst::DebugColorFlags::empty(),
259                    Some("Servo source"),
260                ),
261                appsrc: app_src,
262                srcpad: ghost_pad,
263                position: Mutex::new(Default::default()),
264                seeking: AtomicBool::new(false),
265                size: Mutex::new(None),
266            }
267        }
268    }
269
270    // The ObjectImpl trait provides the setters/getters for GObject properties.
271    // Here we need to provide the values that are internally stored back to the
272    // caller, or store whatever new value the caller is providing.
273    //
274    // This maps between the GObject properties and our internal storage of the
275    // corresponding values of the properties.
276    impl ObjectImpl for ServoSrc {
277        // Called right after construction of a new instance
278        fn constructed(&self) {
279            // Call the parent class' ::constructed() implementation first
280            self.parent_constructed();
281
282            self.obj()
283                .add(&self.appsrc)
284                .expect("Could not add appsrc element to bin");
285
286            let target_pad = self.appsrc.static_pad("src");
287            self.srcpad.set_target(target_pad.as_ref()).unwrap();
288
289            self.obj()
290                .add_pad(&self.srcpad)
291                .expect("Could not add source pad to bin");
292
293            self.appsrc.set_caps(None::<&gst::Caps>);
294            self.appsrc.set_max_bytes(MAX_SRC_QUEUE_SIZE);
295            self.appsrc.set_block(false);
296            self.appsrc.set_format(gst::Format::Bytes);
297            self.appsrc
298                .set_stream_type(gst_app::AppStreamType::Seekable);
299
300            self.obj().set_element_flags(gst::ElementFlags::SOURCE);
301        }
302    }
303
304    impl GstObjectImpl for ServoSrc {}
305
306    // Implementation of gst::Element virtual methods
307    impl ElementImpl for ServoSrc {
308        fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
309            static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
310                gst::subclass::ElementMetadata::new(
311                    "Servo Media Source",
312                    "Source/Audio/Video",
313                    "Feed player with media data",
314                    "Servo developers",
315                )
316            });
317
318            Some(&*ELEMENT_METADATA)
319        }
320
321        fn pad_templates() -> &'static [gst::PadTemplate] {
322            static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
323                let caps = gst::Caps::new_any();
324                let src_pad_template = gst::PadTemplate::new(
325                    "src",
326                    gst::PadDirection::Src,
327                    gst::PadPresence::Always,
328                    &caps,
329                )
330                .unwrap();
331
332                vec![src_pad_template]
333            });
334
335            PAD_TEMPLATES.as_ref()
336        }
337    }
338
339    // Implementation of gst::Bin virtual methods
340    impl BinImpl for ServoSrc {}
341
342    impl URIHandlerImpl for ServoSrc {
343        const URI_TYPE: gst::URIType = gst::URIType::Src;
344
345        fn protocols() -> &'static [&'static str] {
346            &["servosrc"]
347        }
348
349        fn uri(&self) -> Option<String> {
350            Some("servosrc://".to_string())
351        }
352
353        fn set_uri(&self, uri: &str) -> Result<(), glib::Error> {
354            if let Ok(uri) = Url::parse(uri) {
355                if uri.scheme() == "servosrc" {
356                    return Ok(());
357                }
358            }
359            Err(glib::Error::new(
360                gst::URIError::BadUri,
361                format!("Invalid URI '{:?}'", uri,).as_str(),
362            ))
363        }
364    }
365}
366
367// Public part of the ServoSrc type. This behaves like a normal
368// GObject binding
369glib::wrapper! {
370    pub struct ServoSrc(ObjectSubclass<imp::ServoSrc>)
371        @extends gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler;
372}
373
374unsafe impl Send for ServoSrc {}
375unsafe impl Sync for ServoSrc {}
376
377impl ServoSrc {
378    pub fn set_size(&self, size: i64) {
379        self.imp().set_size(size);
380    }
381
382    pub fn set_seek_offset(&self, offset: u64) -> bool {
383        self.imp().set_seek_offset(self, offset)
384    }
385
386    pub fn set_seek_done(&self) {
387        self.imp().set_seek_done();
388    }
389
390    pub fn push_buffer(&self, data: Vec<u8>) -> Result<gst::FlowSuccess, gst::FlowError> {
391        self.imp().push_buffer(self, data)
392    }
393
394    pub fn push_end_of_stream(&self) -> Result<gst::FlowSuccess, gst::FlowError> {
395        self.imp().end_of_stream()
396    }
397
398    pub fn set_callbacks(&self, callbacks: gst_app::AppSrcCallbacks) {
399        self.imp().set_callbacks(callbacks)
400    }
401}
402
403// Registers the type for our element, and then registers in GStreamer
404// under the name "servosrc" for being able to instantiate it via e.g.
405// gst::ElementFactory::make().
406pub fn register_servo_src() -> Result<(), glib::BoolError> {
407    gst::Element::register(None, "servosrc", gst::Rank::NONE, ServoSrc::static_type())
408}