1use glib::subclass::prelude::*;
2use gst::prelude::*;
3use gst::subclass::prelude::*;
4use once_cell::sync::Lazy;
5use std::convert::TryFrom;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::Mutex;
8use url::Url;
9
10const MAX_SRC_QUEUE_SIZE: u64 = 50 * 1024 * 1024; mod imp {
14 use super::*;
15
16 macro_rules! inner_appsrc_proxy {
17 ($fn_name:ident, $return_type:ty) => {
18 pub fn $fn_name(&self) -> $return_type {
19 self.appsrc.$fn_name()
20 }
21 };
22
23 ($fn_name:ident, $arg1:ident, $arg1_type:ty, $return_type:ty) => {
24 pub fn $fn_name(&self, $arg1: $arg1_type) -> $return_type {
25 self.appsrc.$fn_name($arg1)
26 }
27 };
28 }
29
30 #[derive(Debug, Default)]
31 struct Position {
32 offset: u64,
33 requested_offset: u64,
34 }
35
36 pub struct ServoSrc {
39 cat: gst::DebugCategory,
40 appsrc: gst_app::AppSrc,
41 srcpad: gst::GhostPad,
42 position: Mutex<Position>,
43 seeking: AtomicBool,
44 size: Mutex<Option<i64>>,
45 }
46
47 impl ServoSrc {
48 pub fn set_size(&self, size: i64) {
49 if self.seeking.load(Ordering::Relaxed) {
50 *self.size.lock().unwrap() = Some(size);
54 return;
55 }
56
57 if self.appsrc.size() == -1 {
58 self.appsrc.set_size(size);
59 }
60 }
61
62 pub fn set_seek_offset<O: IsA<gst::Object>>(&self, parent: &O, offset: u64) -> bool {
63 let mut pos = self.position.lock().unwrap();
64
65 if pos.offset == offset || pos.requested_offset != 0 {
66 false
67 } else {
68 self.seeking.store(true, Ordering::Relaxed);
69 pos.requested_offset = offset;
70 gst::debug!(
71 self.cat,
72 obj = parent,
73 "seeking to offset: {}",
74 pos.requested_offset
75 );
76
77 true
78 }
79 }
80
81 pub fn set_seek_done(&self) {
82 self.seeking.store(false, Ordering::Relaxed);
83
84 if let Some(size) = self.size.lock().unwrap().take() {
85 if self.appsrc.size() == -1 {
86 self.appsrc.set_size(size);
87 }
88 }
89
90 let mut pos = self.position.lock().unwrap();
91 pos.offset = pos.requested_offset;
92 pos.requested_offset = 0;
93 }
94
95 pub fn push_buffer<O: IsA<gst::Object>>(
96 &self,
97 parent: &O,
98 data: Vec<u8>,
99 ) -> Result<gst::FlowSuccess, gst::FlowError> {
100 if self.seeking.load(Ordering::Relaxed) {
101 gst::debug!(self.cat, obj = parent, "seek in progress, ignored data");
102 return Ok(gst::FlowSuccess::Ok);
103 }
104
105 let mut pos = self.position.lock().unwrap(); let length = u64::try_from(data.len()).unwrap();
108 let mut data_offset = 0;
109
110 let buffer_starting_offset = pos.offset;
111
112 pos.offset += length;
116
117 gst::trace!(self.cat, obj = parent, "offset: {}", pos.offset);
118
119 let _ = u64::try_from(self.appsrc.size()).and_then(|size| {
122 if pos.offset > size {
123 gst::debug!(
124 self.cat,
125 obj = parent,
126 "Updating internal size from {} to {}",
127 size,
128 pos.offset
129 );
130 let new_size = i64::try_from(pos.offset).unwrap();
131 self.appsrc.set_size(new_size);
132 }
133 Ok(())
134 });
135
136 let block_size = 4096;
141 let num_blocks = ((length - data_offset) as f64 / block_size as f64).ceil() as u64;
142
143 gst::log!(
144 self.cat,
145 obj = parent,
146 "Splitting the received vec into {} blocks",
147 num_blocks
148 );
149
150 let mut ret: Result<gst::FlowSuccess, gst::FlowError> = Ok(gst::FlowSuccess::Ok);
151 for i in 0..num_blocks {
152 let start = usize::try_from(i * block_size + data_offset).unwrap();
153 data_offset = 0;
154 let size = usize::try_from(block_size.min((length - start as u64).into())).unwrap();
155 let end = start + size;
156
157 let buffer_offset = buffer_starting_offset + start as u64;
158 let buffer_offset_end = buffer_offset + size as u64;
159
160 let subdata = Vec::from(&data[start..end]);
161 let mut buffer = gst::Buffer::from_slice(subdata);
162 {
163 let buffer = buffer.get_mut().unwrap();
164 buffer.set_offset(buffer_offset);
165 buffer.set_offset_end(buffer_offset_end);
166 }
167
168 if self.seeking.load(Ordering::Relaxed) {
169 gst::trace!(self.cat, obj = parent, "stopping buffer appends due to seek");
170 ret = Ok(gst::FlowSuccess::Ok);
171 break;
172 }
173
174 gst::trace!(self.cat, obj = parent, "Pushing buffer {:?}", buffer);
175
176 ret = self.appsrc.push_buffer(buffer);
177 match ret {
178 Ok(_) => (),
179 Err(gst::FlowError::Eos) | Err(gst::FlowError::Flushing) => {
180 ret = Ok(gst::FlowSuccess::Ok)
181 }
182 Err(_) => break,
183 }
184 }
185
186 ret
187 }
188
189 inner_appsrc_proxy!(end_of_stream, Result<gst::FlowSuccess, gst::FlowError>);
190 inner_appsrc_proxy!(set_callbacks, callbacks, gst_app::AppSrcCallbacks, ());
191
192 fn query(&self, pad: &gst::GhostPad, query: &mut gst::QueryRef) -> bool {
193 gst::log!(self.cat, obj = pad, "Handling query {:?}", query);
194
195 let ret = match query.view_mut() {
209 gst::QueryViewMut::Scheduling(ref mut q) => {
210 let flags =
211 gst::SchedulingFlags::SEQUENTIAL | gst::SchedulingFlags::BANDWIDTH_LIMITED;
212 q.set(flags, 1, -1, 0);
213 q.add_scheduling_modes(&[gst::PadMode::Push]);
214 true
215 }
216 _ => gst::Pad::query_default(pad, Some(&*self.obj()), query),
217 };
218
219 if ret {
220 gst::log!(self.cat, obj = pad, "Handled query {:?}", query);
221 } else {
222 gst::info!(self.cat, obj = pad, "Didn't handle query {:?}", query);
223 }
224 ret
225 }
226 }
227
228 #[glib::object_subclass]
230 impl ObjectSubclass for ServoSrc {
231 const NAME: &'static str = "ServoSrc";
232 type Type = super::ServoSrc;
233 type ParentType = gst::Bin;
234 type Interfaces = (gst::URIHandler,);
235
236 fn with_class(klass: &Self::Class) -> Self {
239 let app_src = gst::ElementFactory::make("appsrc")
240 .build()
241 .map(|elem| elem.downcast::<gst_app::AppSrc>().unwrap())
242 .expect("Could not create appsrc element");
243
244 let pad_templ = klass.pad_template("src").unwrap();
245 let ghost_pad = gst::GhostPad::builder_from_template(&pad_templ)
246 .query_function(|pad, parent, query| {
247 ServoSrc::catch_panic_pad_function(
248 parent,
249 || false,
250 |servosrc| servosrc.query(pad, query),
251 )
252 })
253 .build();
254
255 Self {
256 cat: gst::DebugCategory::new(
257 "servosrc",
258 gst::DebugColorFlags::empty(),
259 Some("Servo source"),
260 ),
261 appsrc: app_src,
262 srcpad: ghost_pad,
263 position: Mutex::new(Default::default()),
264 seeking: AtomicBool::new(false),
265 size: Mutex::new(None),
266 }
267 }
268 }
269
270 impl ObjectImpl for ServoSrc {
277 fn constructed(&self) {
279 self.parent_constructed();
281
282 self.obj()
283 .add(&self.appsrc)
284 .expect("Could not add appsrc element to bin");
285
286 let target_pad = self.appsrc.static_pad("src");
287 self.srcpad.set_target(target_pad.as_ref()).unwrap();
288
289 self.obj()
290 .add_pad(&self.srcpad)
291 .expect("Could not add source pad to bin");
292
293 self.appsrc.set_caps(None::<&gst::Caps>);
294 self.appsrc.set_max_bytes(MAX_SRC_QUEUE_SIZE);
295 self.appsrc.set_block(false);
296 self.appsrc.set_format(gst::Format::Bytes);
297 self.appsrc
298 .set_stream_type(gst_app::AppStreamType::Seekable);
299
300 self.obj().set_element_flags(gst::ElementFlags::SOURCE);
301 }
302 }
303
304 impl GstObjectImpl for ServoSrc {}
305
306 impl ElementImpl for ServoSrc {
308 fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
309 static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
310 gst::subclass::ElementMetadata::new(
311 "Servo Media Source",
312 "Source/Audio/Video",
313 "Feed player with media data",
314 "Servo developers",
315 )
316 });
317
318 Some(&*ELEMENT_METADATA)
319 }
320
321 fn pad_templates() -> &'static [gst::PadTemplate] {
322 static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
323 let caps = gst::Caps::new_any();
324 let src_pad_template = gst::PadTemplate::new(
325 "src",
326 gst::PadDirection::Src,
327 gst::PadPresence::Always,
328 &caps,
329 )
330 .unwrap();
331
332 vec![src_pad_template]
333 });
334
335 PAD_TEMPLATES.as_ref()
336 }
337 }
338
339 impl BinImpl for ServoSrc {}
341
342 impl URIHandlerImpl for ServoSrc {
343 const URI_TYPE: gst::URIType = gst::URIType::Src;
344
345 fn protocols() -> &'static [&'static str] {
346 &["servosrc"]
347 }
348
349 fn uri(&self) -> Option<String> {
350 Some("servosrc://".to_string())
351 }
352
353 fn set_uri(&self, uri: &str) -> Result<(), glib::Error> {
354 if let Ok(uri) = Url::parse(uri) {
355 if uri.scheme() == "servosrc" {
356 return Ok(());
357 }
358 }
359 Err(glib::Error::new(
360 gst::URIError::BadUri,
361 format!("Invalid URI '{:?}'", uri,).as_str(),
362 ))
363 }
364 }
365}
366
367glib::wrapper! {
370 pub struct ServoSrc(ObjectSubclass<imp::ServoSrc>)
371 @extends gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler;
372}
373
374unsafe impl Send for ServoSrc {}
375unsafe impl Sync for ServoSrc {}
376
377impl ServoSrc {
378 pub fn set_size(&self, size: i64) {
379 self.imp().set_size(size);
380 }
381
382 pub fn set_seek_offset(&self, offset: u64) -> bool {
383 self.imp().set_seek_offset(self, offset)
384 }
385
386 pub fn set_seek_done(&self) {
387 self.imp().set_seek_done();
388 }
389
390 pub fn push_buffer(&self, data: Vec<u8>) -> Result<gst::FlowSuccess, gst::FlowError> {
391 self.imp().push_buffer(self, data)
392 }
393
394 pub fn push_end_of_stream(&self) -> Result<gst::FlowSuccess, gst::FlowError> {
395 self.imp().end_of_stream()
396 }
397
398 pub fn set_callbacks(&self, callbacks: gst_app::AppSrcCallbacks) {
399 self.imp().set_callbacks(callbacks)
400 }
401}
402
403pub fn register_servo_src() -> Result<(), glib::BoolError> {
407 gst::Element::register(None, "servosrc", gst::Rank::NONE, ServoSrc::static_type())
408}