1use glib::subclass::prelude::*;
2use gst::prelude::*;
3use gst::subclass::prelude::*;
4use once_cell::sync::Lazy;
5use std::convert::TryFrom;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::Mutex;
8use url::Url;
9
10const MAX_SRC_QUEUE_SIZE: u64 = 50 * 1024 * 1024; mod imp {
14 use super::*;
15
16 macro_rules! inner_appsrc_proxy {
17 ($fn_name:ident, $return_type:ty) => {
18 pub fn $fn_name(&self) -> $return_type {
19 self.appsrc.$fn_name()
20 }
21 };
22
23 ($fn_name:ident, $arg1:ident, $arg1_type:ty, $return_type:ty) => {
24 pub fn $fn_name(&self, $arg1: $arg1_type) -> $return_type {
25 self.appsrc.$fn_name($arg1)
26 }
27 };
28 }
29
30 #[derive(Debug, Default)]
31 struct Position {
32 offset: u64,
33 requested_offset: u64,
34 }
35
36 pub struct ServoSrc {
39 cat: gst::DebugCategory,
40 appsrc: gst_app::AppSrc,
41 srcpad: gst::GhostPad,
42 position: Mutex<Position>,
43 seeking: AtomicBool,
44 size: Mutex<Option<i64>>,
45 }
46
47 impl ServoSrc {
48 pub fn set_size(&self, size: i64) {
49 if self.seeking.load(Ordering::Relaxed) {
50 *self.size.lock().unwrap() = Some(size);
54 return;
55 }
56
57 if self.appsrc.size() == -1 {
58 self.appsrc.set_size(size);
59 }
60 }
61
62 pub fn set_seek_offset<O: IsA<gst::Object>>(&self, parent: &O, offset: u64) -> bool {
63 let mut pos = self.position.lock().unwrap();
64
65 if pos.offset == offset || pos.requested_offset != 0 {
66 false
67 } else {
68 self.seeking.store(true, Ordering::Relaxed);
69 pos.requested_offset = offset;
70 gst::debug!(
71 self.cat,
72 obj = parent,
73 "seeking to offset: {}",
74 pos.requested_offset
75 );
76
77 true
78 }
79 }
80
81 pub fn set_seek_done(&self) {
82 self.seeking.store(false, Ordering::Relaxed);
83
84 if let Some(size) = self.size.lock().unwrap().take() {
85 if self.appsrc.size() == -1 {
86 self.appsrc.set_size(size);
87 }
88 }
89
90 let mut pos = self.position.lock().unwrap();
91 pos.offset = pos.requested_offset;
92 pos.requested_offset = 0;
93 }
94
95 pub fn push_buffer<O: IsA<gst::Object>>(
96 &self,
97 parent: &O,
98 data: Vec<u8>,
99 ) -> Result<gst::FlowSuccess, gst::FlowError> {
100 if self.seeking.load(Ordering::Relaxed) {
101 gst::debug!(self.cat, obj = parent, "seek in progress, ignored data");
102 return Ok(gst::FlowSuccess::Ok);
103 }
104
105 let mut pos = self.position.lock().unwrap(); let length = u64::try_from(data.len()).unwrap();
108 let mut data_offset = 0;
109
110 let buffer_starting_offset = pos.offset;
111
112 pos.offset += length;
116
117 gst::trace!(self.cat, obj = parent, "offset: {}", pos.offset);
118
119 let _ = u64::try_from(self.appsrc.size()).and_then(|size| {
122 if pos.offset > size {
123 gst::debug!(
124 self.cat,
125 obj = parent,
126 "Updating internal size from {} to {}",
127 size,
128 pos.offset
129 );
130 let new_size = i64::try_from(pos.offset).unwrap();
131 self.appsrc.set_size(new_size);
132 }
133 Ok(())
134 });
135
136 let block_size = 4096;
141 let num_blocks = ((length - data_offset) as f64 / block_size as f64).ceil() as u64;
142
143 gst::log!(
144 self.cat,
145 obj = parent,
146 "Splitting the received vec into {} blocks",
147 num_blocks
148 );
149
150 let mut ret: Result<gst::FlowSuccess, gst::FlowError> = Ok(gst::FlowSuccess::Ok);
151 for i in 0..num_blocks {
152 let start = usize::try_from(i * block_size + data_offset).unwrap();
153 data_offset = 0;
154 let size = usize::try_from(block_size.min((length - start as u64).into())).unwrap();
155 let end = start + size;
156
157 let buffer_offset = buffer_starting_offset + start as u64;
158 let buffer_offset_end = buffer_offset + size as u64;
159
160 let subdata = Vec::from(&data[start..end]);
161 let mut buffer = gst::Buffer::from_slice(subdata);
162 {
163 let buffer = buffer.get_mut().unwrap();
164 buffer.set_offset(buffer_offset);
165 buffer.set_offset_end(buffer_offset_end);
166 }
167
168 if self.seeking.load(Ordering::Relaxed) {
169 gst::trace!(
170 self.cat,
171 obj = parent,
172 "stopping buffer appends due to seek"
173 );
174 ret = Ok(gst::FlowSuccess::Ok);
175 break;
176 }
177
178 gst::trace!(self.cat, obj = parent, "Pushing buffer {:?}", buffer);
179
180 ret = self.appsrc.push_buffer(buffer);
181 match ret {
182 Ok(_) => (),
183 Err(gst::FlowError::Eos) | Err(gst::FlowError::Flushing) => {
184 ret = Ok(gst::FlowSuccess::Ok)
185 },
186 Err(_) => break,
187 }
188 }
189
190 ret
191 }
192
193 inner_appsrc_proxy!(end_of_stream, Result<gst::FlowSuccess, gst::FlowError>);
194 inner_appsrc_proxy!(set_callbacks, callbacks, gst_app::AppSrcCallbacks, ());
195
196 fn query(&self, pad: &gst::GhostPad, query: &mut gst::QueryRef) -> bool {
197 gst::log!(self.cat, obj = pad, "Handling query {:?}", query);
198
199 let ret = match query.view_mut() {
213 gst::QueryViewMut::Scheduling(ref mut q) => {
214 let flags =
215 gst::SchedulingFlags::SEQUENTIAL | gst::SchedulingFlags::BANDWIDTH_LIMITED;
216 q.set(flags, 1, -1, 0);
217 q.add_scheduling_modes([gst::PadMode::Push]);
218 true
219 },
220 _ => gst::Pad::query_default(pad, Some(&*self.obj()), query),
221 };
222
223 if ret {
224 gst::log!(self.cat, obj = pad, "Handled query {:?}", query);
225 } else {
226 gst::info!(self.cat, obj = pad, "Didn't handle query {:?}", query);
227 }
228 ret
229 }
230 }
231
232 #[glib::object_subclass]
234 impl ObjectSubclass for ServoSrc {
235 const NAME: &'static str = "ServoSrc";
236 type Type = super::ServoSrc;
237 type ParentType = gst::Bin;
238 type Interfaces = (gst::URIHandler,);
239
240 fn with_class(klass: &Self::Class) -> Self {
243 let app_src = gst::ElementFactory::make("appsrc")
244 .build()
245 .map(|elem| elem.downcast::<gst_app::AppSrc>().unwrap())
246 .expect("Could not create appsrc element");
247
248 let pad_templ = klass.pad_template("src").unwrap();
249 let ghost_pad = gst::GhostPad::builder_from_template(&pad_templ)
250 .query_function(|pad, parent, query| {
251 ServoSrc::catch_panic_pad_function(
252 parent,
253 || false,
254 |servosrc| servosrc.query(pad, query),
255 )
256 })
257 .build();
258
259 Self {
260 cat: gst::DebugCategory::new(
261 "servosrc",
262 gst::DebugColorFlags::empty(),
263 Some("Servo source"),
264 ),
265 appsrc: app_src,
266 srcpad: ghost_pad,
267 position: Mutex::new(Default::default()),
268 seeking: AtomicBool::new(false),
269 size: Mutex::new(None),
270 }
271 }
272 }
273
274 impl ObjectImpl for ServoSrc {
281 fn constructed(&self) {
283 self.parent_constructed();
285
286 self.obj()
287 .add(&self.appsrc)
288 .expect("Could not add appsrc element to bin");
289
290 let target_pad = self.appsrc.static_pad("src");
291 self.srcpad.set_target(target_pad.as_ref()).unwrap();
292
293 self.obj()
294 .add_pad(&self.srcpad)
295 .expect("Could not add source pad to bin");
296
297 self.appsrc.set_caps(None::<&gst::Caps>);
298 self.appsrc.set_max_bytes(MAX_SRC_QUEUE_SIZE);
299 self.appsrc.set_block(false);
300 self.appsrc.set_format(gst::Format::Bytes);
301 self.appsrc
302 .set_stream_type(gst_app::AppStreamType::Seekable);
303
304 self.obj().set_element_flags(gst::ElementFlags::SOURCE);
305 }
306 }
307
308 impl GstObjectImpl for ServoSrc {}
309
310 impl ElementImpl for ServoSrc {
312 fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
313 static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
314 gst::subclass::ElementMetadata::new(
315 "Servo Media Source",
316 "Source/Audio/Video",
317 "Feed player with media data",
318 "Servo developers",
319 )
320 });
321
322 Some(&*ELEMENT_METADATA)
323 }
324
325 fn pad_templates() -> &'static [gst::PadTemplate] {
326 static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
327 let caps = gst::Caps::new_any();
328 let src_pad_template = gst::PadTemplate::new(
329 "src",
330 gst::PadDirection::Src,
331 gst::PadPresence::Always,
332 &caps,
333 )
334 .unwrap();
335
336 vec![src_pad_template]
337 });
338
339 PAD_TEMPLATES.as_ref()
340 }
341 }
342
343 impl BinImpl for ServoSrc {}
345
346 impl URIHandlerImpl for ServoSrc {
347 const URI_TYPE: gst::URIType = gst::URIType::Src;
348
349 fn protocols() -> &'static [&'static str] {
350 &["servosrc"]
351 }
352
353 fn uri(&self) -> Option<String> {
354 Some("servosrc://".to_string())
355 }
356
357 fn set_uri(&self, uri: &str) -> Result<(), glib::Error> {
358 if let Ok(uri) = Url::parse(uri) {
359 if uri.scheme() == "servosrc" {
360 return Ok(());
361 }
362 }
363 Err(glib::Error::new(
364 gst::URIError::BadUri,
365 format!("Invalid URI '{:?}'", uri,).as_str(),
366 ))
367 }
368 }
369}
370
371glib::wrapper! {
374 pub struct ServoSrc(ObjectSubclass<imp::ServoSrc>)
375 @extends gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler;
376}
377
378unsafe impl Send for ServoSrc {}
379unsafe impl Sync for ServoSrc {}
380
381impl ServoSrc {
382 pub fn set_size(&self, size: i64) {
383 self.imp().set_size(size);
384 }
385
386 pub fn set_seek_offset(&self, offset: u64) -> bool {
387 self.imp().set_seek_offset(self, offset)
388 }
389
390 pub fn set_seek_done(&self) {
391 self.imp().set_seek_done();
392 }
393
394 pub fn push_buffer(&self, data: Vec<u8>) -> Result<gst::FlowSuccess, gst::FlowError> {
395 self.imp().push_buffer(self, data)
396 }
397
398 pub fn push_end_of_stream(&self) -> Result<gst::FlowSuccess, gst::FlowError> {
399 self.imp().end_of_stream()
400 }
401
402 pub fn set_callbacks(&self, callbacks: gst_app::AppSrcCallbacks) {
403 self.imp().set_callbacks(callbacks)
404 }
405}
406
407pub fn register_servo_src() -> Result<(), glib::BoolError> {
411 gst::Element::register(None, "servosrc", gst::Rank::NONE, ServoSrc::static_type())
412}