1use std::convert::TryFrom;
6use std::sync::Mutex;
7use std::sync::atomic::{AtomicBool, Ordering};
8
9use glib::subclass::prelude::*;
10use gstreamer::prelude::*;
11use gstreamer::subclass::prelude::*;
12use url::Url;
13
14const MAX_SRC_QUEUE_SIZE: u64 = 50 * 1024 * 1024; mod imp {
18 use std::sync::LazyLock;
19
20 use super::*;
21
22 macro_rules! inner_appsrc_proxy {
23 ($fn_name:ident, $return_type:ty) => {
24 pub fn $fn_name(&self) -> $return_type {
25 self.appsrc.$fn_name()
26 }
27 };
28
29 ($fn_name:ident, $arg1:ident, $arg1_type:ty, $return_type:ty) => {
30 pub fn $fn_name(&self, $arg1: $arg1_type) -> $return_type {
31 self.appsrc.$fn_name($arg1)
32 }
33 };
34 }
35
    /// Byte-offset bookkeeping shared between data pushes and seeks.
    #[derive(Debug, Default)]
    struct Position {
        /// Running byte offset: advanced by `push_buffer` by the length of the
        /// pushed data and overwritten with `requested_offset` in `set_seek_done`.
        offset: u64,
        /// Offset recorded by `set_seek_offset`; reset to `0` by `set_seek_done`.
        /// `set_seek_offset` treats any non-zero value as "a seek is pending".
        requested_offset: u64,
    }
41
    /// Implementation struct of the `ServoSrc` element: a bin that wraps an
    /// `appsrc` element and exposes it through a ghost pad.
    pub struct ServoSrc {
        /// Debug category passed to the logging macros in this module.
        cat: gstreamer::DebugCategory,
        /// Wrapped `appsrc` element that receives the pushed buffers.
        appsrc: gstreamer_app::AppSrc,
        /// Ghost pad whose target is set to the `appsrc` "src" pad in `constructed`.
        srcpad: gstreamer::GhostPad,
        /// Current and requested byte offsets.
        position: Mutex<Position>,
        /// Set to `true` by `set_seek_offset` and back to `false` by `set_seek_done`.
        seeking: AtomicBool,
        /// Size stored by `set_size` while `seeking` is set; consumed by `set_seek_done`.
        size: Mutex<Option<i64>>,
    }
52
53 impl ServoSrc {
54 pub fn set_size(&self, size: i64) {
55 if self.seeking.load(Ordering::Relaxed) {
56 *self.size.lock().unwrap() = Some(size);
60 return;
61 }
62
63 if self.appsrc.size() == -1 {
64 self.appsrc.set_size(size);
65 }
66 }
67
68 pub fn set_seek_offset<O: IsA<gstreamer::Object>>(&self, parent: &O, offset: u64) -> bool {
69 let mut pos = self.position.lock().unwrap();
70
71 if pos.offset == offset || pos.requested_offset != 0 {
72 false
73 } else {
74 self.seeking.store(true, Ordering::Relaxed);
75 pos.requested_offset = offset;
76 gstreamer::debug!(
77 self.cat,
78 obj = parent,
79 "seeking to offset: {}",
80 pos.requested_offset
81 );
82
83 true
84 }
85 }
86
87 pub fn set_seek_done(&self) {
88 self.seeking.store(false, Ordering::Relaxed);
89
90 if let Some(size) = self.size.lock().unwrap().take() &&
91 self.appsrc.size() == -1
92 {
93 self.appsrc.set_size(size);
94 }
95
96 let mut pos = self.position.lock().unwrap();
97 pos.offset = pos.requested_offset;
98 pos.requested_offset = 0;
99 }
100
101 pub fn push_buffer<O: IsA<gstreamer::Object>>(
102 &self,
103 parent: &O,
104 data: Vec<u8>,
105 ) -> Result<gstreamer::FlowSuccess, gstreamer::FlowError> {
106 if self.seeking.load(Ordering::Relaxed) {
107 gstreamer::debug!(self.cat, obj = parent, "seek in progress, ignored data");
108 return Ok(gstreamer::FlowSuccess::Ok);
109 }
110
111 let mut pos = self.position.lock().unwrap(); let length = u64::try_from(data.len()).unwrap();
114 let mut data_offset = 0;
115
116 let buffer_starting_offset = pos.offset;
117
118 pos.offset += length;
122
123 gstreamer::trace!(self.cat, obj = parent, "offset: {}", pos.offset);
124
125 if let Ok(size) = u64::try_from(self.appsrc.size()) &&
128 pos.offset > size
129 {
130 gstreamer::debug!(
131 self.cat,
132 obj = parent,
133 "Updating internal size from {} to {}",
134 size,
135 pos.offset
136 );
137 let new_size = i64::try_from(pos.offset).unwrap();
138 self.appsrc.set_size(new_size);
139 }
140
141 let block_size = 4096;
146 let num_blocks = ((length - data_offset) as f64 / block_size as f64).ceil() as u64;
147
148 gstreamer::log!(
149 self.cat,
150 obj = parent,
151 "Splitting the received vec into {} blocks",
152 num_blocks
153 );
154
155 let mut ret: Result<gstreamer::FlowSuccess, gstreamer::FlowError> =
156 Ok(gstreamer::FlowSuccess::Ok);
157 for i in 0..num_blocks {
158 let start = usize::try_from(i * block_size + data_offset).unwrap();
159 data_offset = 0;
160 let size = usize::try_from(block_size.min(length - start as u64)).unwrap();
161 let end = start + size;
162
163 let buffer_offset = buffer_starting_offset + start as u64;
164 let buffer_offset_end = buffer_offset + size as u64;
165
166 let subdata = Vec::from(&data[start..end]);
167 let mut buffer = gstreamer::Buffer::from_slice(subdata);
168 {
169 let buffer = buffer.get_mut().unwrap();
170 buffer.set_offset(buffer_offset);
171 buffer.set_offset_end(buffer_offset_end);
172 }
173
174 if self.seeking.load(Ordering::Relaxed) {
175 gstreamer::trace!(
176 self.cat,
177 obj = parent,
178 "stopping buffer appends due to seek"
179 );
180 ret = Ok(gstreamer::FlowSuccess::Ok);
181 break;
182 }
183
184 gstreamer::trace!(self.cat, obj = parent, "Pushing buffer {:?}", buffer);
185
186 ret = self.appsrc.push_buffer(buffer);
187 match ret {
188 Ok(_) => (),
189 Err(gstreamer::FlowError::Eos) | Err(gstreamer::FlowError::Flushing) => {
190 ret = Ok(gstreamer::FlowSuccess::Ok)
191 },
192 Err(_) => break,
193 }
194 }
195
196 ret
197 }
198
        // Generated forwarders (see `inner_appsrc_proxy!`): each expands to a
        // public method that calls the same-named method on `self.appsrc`.
        // `end_of_stream(&self)` takes no extra argument.
        inner_appsrc_proxy!(end_of_stream, Result<gstreamer::FlowSuccess, gstreamer::FlowError>);
        // `set_callbacks(&self, callbacks)` passes its argument straight through.
        inner_appsrc_proxy!(set_callbacks, callbacks, gstreamer_app::AppSrcCallbacks, ());
201
202 fn query(&self, pad: &gstreamer::GhostPad, query: &mut gstreamer::QueryRef) -> bool {
203 gstreamer::log!(self.cat, obj = pad, "Handling query {:?}", query);
204
205 let ret = match query.view_mut() {
219 gstreamer::QueryViewMut::Scheduling(ref mut q) => {
220 let flags = gstreamer::SchedulingFlags::SEQUENTIAL |
221 gstreamer::SchedulingFlags::BANDWIDTH_LIMITED;
222 q.set(flags, 1, -1, 0);
223 q.add_scheduling_modes([gstreamer::PadMode::Push]);
224 true
225 },
226 _ => gstreamer::Pad::query_default(pad, Some(&*self.obj()), query),
227 };
228
229 if ret {
230 gstreamer::log!(self.cat, obj = pad, "Handled query {:?}", query);
231 } else {
232 gstreamer::info!(self.cat, obj = pad, "Didn't handle query {:?}", query);
233 }
234 ret
235 }
236 }
237
238 #[glib::object_subclass]
240 impl ObjectSubclass for ServoSrc {
241 const NAME: &'static str = "ServoSrc";
242 type Type = super::ServoSrc;
243 type ParentType = gstreamer::Bin;
244 type Interfaces = (gstreamer::URIHandler,);
245
246 fn with_class(klass: &Self::Class) -> Self {
249 let app_src = gstreamer::ElementFactory::make("appsrc")
250 .build()
251 .map(|elem| elem.downcast::<gstreamer_app::AppSrc>().unwrap())
252 .expect("Could not create appsrc element");
253
254 let pad_templ = klass.pad_template("src").unwrap();
255 let ghost_pad = gstreamer::GhostPad::builder_from_template(&pad_templ)
256 .query_function(|pad, parent, query| {
257 ServoSrc::catch_panic_pad_function(
258 parent,
259 || false,
260 |servosrc| servosrc.query(pad, query),
261 )
262 })
263 .build();
264
265 Self {
266 cat: gstreamer::DebugCategory::new(
267 "servosrc",
268 gstreamer::DebugColorFlags::empty(),
269 Some("Servo source"),
270 ),
271 appsrc: app_src,
272 srcpad: ghost_pad,
273 position: Mutex::new(Default::default()),
274 seeking: AtomicBool::new(false),
275 size: Mutex::new(None),
276 }
277 }
278 }
279
280 impl ObjectImpl for ServoSrc {
287 fn constructed(&self) {
289 self.parent_constructed();
291
292 self.obj()
293 .add(&self.appsrc)
294 .expect("Could not add appsrc element to bin");
295
296 let target_pad = self.appsrc.static_pad("src");
297 self.srcpad.set_target(target_pad.as_ref()).unwrap();
298
299 self.obj()
300 .add_pad(&self.srcpad)
301 .expect("Could not add source pad to bin");
302
303 self.appsrc.set_caps(None::<&gstreamer::Caps>);
304 self.appsrc.set_max_bytes(MAX_SRC_QUEUE_SIZE);
305 self.appsrc.set_block(false);
306 self.appsrc.set_format(gstreamer::Format::Bytes);
307 self.appsrc
308 .set_stream_type(gstreamer_app::AppStreamType::Seekable);
309
310 self.obj()
311 .set_element_flags(gstreamer::ElementFlags::SOURCE);
312 }
313 }
314
    // Empty impl: no `GstObjectImpl` items are overridden for this element.
    impl GstObjectImpl for ServoSrc {}
316
317 impl ElementImpl for ServoSrc {
319 fn metadata() -> Option<&'static gstreamer::subclass::ElementMetadata> {
320 static ELEMENT_METADATA: LazyLock<gstreamer::subclass::ElementMetadata> =
321 LazyLock::new(|| {
322 gstreamer::subclass::ElementMetadata::new(
323 "Servo Media Source",
324 "Source/Audio/Video",
325 "Feed player with media data",
326 "Servo developers",
327 )
328 });
329
330 Some(&*ELEMENT_METADATA)
331 }
332
333 fn pad_templates() -> &'static [gstreamer::PadTemplate] {
334 static PAD_TEMPLATES: LazyLock<Vec<gstreamer::PadTemplate>> = LazyLock::new(|| {
335 let caps = gstreamer::Caps::new_any();
336 let src_pad_template = gstreamer::PadTemplate::new(
337 "src",
338 gstreamer::PadDirection::Src,
339 gstreamer::PadPresence::Always,
340 &caps,
341 )
342 .unwrap();
343
344 vec![src_pad_template]
345 });
346
347 PAD_TEMPLATES.as_ref()
348 }
349 }
350
    // Empty impl: no `BinImpl` items are overridden for this element.
    impl BinImpl for ServoSrc {}
353
354 impl URIHandlerImpl for ServoSrc {
355 const URI_TYPE: gstreamer::URIType = gstreamer::URIType::Src;
356
357 fn protocols() -> &'static [&'static str] {
358 &["servosrc"]
359 }
360
361 fn uri(&self) -> Option<String> {
362 Some("servosrc://".to_string())
363 }
364
365 fn set_uri(&self, uri: &str) -> Result<(), glib::Error> {
366 if let Ok(uri) = Url::parse(uri) &&
367 uri.scheme() == "servosrc"
368 {
369 return Ok(());
370 }
371 Err(glib::Error::new(
372 gstreamer::URIError::BadUri,
373 format!("Invalid URI '{:?}'", uri,).as_str(),
374 ))
375 }
376 }
377}
378
// Public wrapper type for the `imp::ServoSrc` subclass. The declaration lists
// `gstreamer::Bin`, `gstreamer::Element` and `gstreamer::Object` as parent
// classes and `gstreamer::URIHandler` as an implemented interface.
glib::wrapper! {
    pub struct ServoSrc(ObjectSubclass<imp::ServoSrc>)
        @extends gstreamer::Bin, gstreamer::Element, gstreamer::Object, @implements gstreamer::URIHandler;
}
385
// SAFETY (NOTE(review)): the mutable state visible in `imp::ServoSrc` sits behind
// a `Mutex` (`position`, `size`) or an atomic (`seeking`); the remaining fields
// are GStreamer handles, which are assumed here to be usable from any thread —
// TODO confirm.
unsafe impl Send for ServoSrc {}
unsafe impl Sync for ServoSrc {}
388
389impl ServoSrc {
390 pub fn set_size(&self, size: i64) {
391 self.imp().set_size(size);
392 }
393
394 pub fn set_seek_offset(&self, offset: u64) -> bool {
395 self.imp().set_seek_offset(self, offset)
396 }
397
398 pub fn set_seek_done(&self) {
399 self.imp().set_seek_done();
400 }
401
402 pub fn push_buffer(
403 &self,
404 data: Vec<u8>,
405 ) -> Result<gstreamer::FlowSuccess, gstreamer::FlowError> {
406 self.imp().push_buffer(self, data)
407 }
408
409 pub fn push_end_of_stream(&self) -> Result<gstreamer::FlowSuccess, gstreamer::FlowError> {
410 self.imp().end_of_stream()
411 }
412
413 pub fn set_callbacks(&self, callbacks: gstreamer_app::AppSrcCallbacks) {
414 self.imp().set_callbacks(callbacks)
415 }
416}
417
418pub fn register_servo_src() -> Result<(), glib::BoolError> {
422 gstreamer::Element::register(
423 None,
424 "servosrc",
425 gstreamer::Rank::NONE,
426 ServoSrc::static_type(),
427 )
428}