1use super::BACKEND_BASE_TIME;
2use crate::datachannel::GStreamerWebRtcDataChannel;
3use crate::media_stream::GStreamerMediaStream;
4use glib;
5use glib::prelude::*;
6use gst;
7use gst::prelude::*;
8use gst_sdp;
9use gst_webrtc;
10use log::warn;
11use servo_media_streams::registry::{get_stream, MediaStreamId};
12use servo_media_streams::MediaStreamType;
13use servo_media_webrtc::datachannel::DataChannelId;
14use servo_media_webrtc::thread::InternalEvent;
15use servo_media_webrtc::WebRtcController as WebRtcThread;
16use servo_media_webrtc::*;
17use std::collections::HashMap;
18use std::sync::atomic::{AtomicUsize, Ordering};
19use std::sync::{Arc, Mutex};
20use std::{cmp, mem};
21
/// Information recorded for one media section (m-line) of a remote SDP.
#[derive(Debug, Clone)]
pub struct MLineInfo {
    /// Caps built from every format listed in the media section, with each
    /// structure renamed to `application/x-rtp` so it can be intersected with
    /// the caps of a local stream.
    caps: gst::Caps,
    /// Set once a local stream has been linked to the sink pad for this m-line.
    is_used: bool,
    /// Payload type parsed from the first format of the media section.
    payload: i32,
}
34
/// Registry entry for a data channel id.
enum DataChannelEventTarget {
    /// Events that arrived for an id with no registered channel. They are kept
    /// here until the `on-data-channel` handler replays them.
    Buffered(Vec<DataChannelEvent>),
    /// A registered channel; its events are forwarded to the signaller.
    Created(GStreamerWebRtcDataChannel),
}
39
/// WebRTC controller backend built around a GStreamer `webrtcbin` element.
pub struct GStreamerWebRtcController {
    /// The `webrtcbin` element that performs negotiation and transport.
    webrtc: gst::Element,
    /// Pipeline that owns `webrtc` and the elements of attached streams.
    pipeline: gst::Pipeline,
    /// Set when `on-negotiation-needed` fired while no stream was known; the
    /// notification is delivered later from `add_stream`.
    delayed_negotiation: bool,
    /// Handle used to post `InternalEvent`s back to the controller.
    thread: WebRtcThread,
    /// Receiver of negotiation, ICE, stream and data channel notifications.
    signaller: Box<dyn WebRtcSignaller>,
    /// Ids of streams already linked to a `webrtcbin` sink pad.
    streams: Vec<MediaStreamId>,
    /// Ids of streams that could not be linked yet; retried by
    /// `flush_pending_streams`.
    pending_streams: Vec<MediaStreamId>,
    /// Next payload type to hand to a stream that matches no remote m-line.
    /// Kept above every payload type seen in a remote description.
    pt_counter: i32,
    /// Index of the next `sink_N` pad to request from `webrtcbin`.
    request_pad_counter: usize,
    /// M-line info of the remote offer that is currently in effect.
    remote_mline_info: Vec<MLineInfo>,
    /// M-line info of a remote description whose promise has not resolved yet.
    pending_remote_mline_info: Vec<MLineInfo>,
    /// Incremented for every remote description; lets `DescriptionAdded`
    /// events from superseded descriptions be recognised.
    remote_offer_generation: u32,
    /// Main loop created in `construct`; it is only stored, never used here.
    _main_loop: glib::MainLoop,
    /// Registry of data channels, shared with the `on-data-channel` handler.
    data_channels: Arc<Mutex<HashMap<DataChannelId, DataChannelEventTarget>>>,
    /// Source of fresh data channel ids, shared with the same handler.
    next_data_channel_id: Arc<AtomicUsize>,
}
86
87impl WebRtcControllerBackend for GStreamerWebRtcController {
88 fn add_ice_candidate(&mut self, candidate: IceCandidate) -> WebRtcResult {
89 self.webrtc.emit_by_name::<()>(
90 "add-ice-candidate",
91 &[&candidate.sdp_mline_index, &candidate.candidate],
92 );
93 Ok(())
94 }
95
96 fn set_remote_description(
97 &mut self,
98 desc: SessionDescription,
99 cb: Box<dyn FnOnce() + Send + 'static>,
100 ) -> WebRtcResult {
101 self.set_description(desc, DescriptionType::Remote, cb)
102 }
103
104 fn set_local_description(
105 &mut self,
106 desc: SessionDescription,
107 cb: Box<dyn FnOnce() + Send + 'static>,
108 ) -> WebRtcResult {
109 self.set_description(desc, DescriptionType::Local, cb)
110 }
111
112 fn create_offer(
113 &mut self,
114 cb: Box<dyn FnOnce(SessionDescription) + Send + 'static>,
115 ) -> WebRtcResult {
116 self.flush_pending_streams(true)?;
117 self.pipeline.set_state(gst::State::Playing)?;
118 let promise = gst::Promise::with_change_func(move |res| {
119 res.map(|s| on_offer_or_answer_created(SdpType::Offer, s.unwrap(), cb))
120 .unwrap();
121 });
122
123 self.webrtc
124 .emit_by_name::<()>("create-offer", &[&None::<gst::Structure>, &promise]);
125 Ok(())
126 }
127
128 fn create_answer(
129 &mut self,
130 cb: Box<dyn FnOnce(SessionDescription) + Send + 'static>,
131 ) -> WebRtcResult {
132 let promise = gst::Promise::with_change_func(move |res| {
133 res.map(|s| on_offer_or_answer_created(SdpType::Answer, s.unwrap(), cb))
134 .unwrap();
135 });
136
137 self.webrtc
138 .emit_by_name::<()>("create-answer", &[&None::<gst::Structure>, &promise]);
139 Ok(())
140 }
141
142 fn add_stream(&mut self, stream_id: &MediaStreamId) -> WebRtcResult {
143 let stream =
144 get_stream(stream_id).expect("Media streams registry does not contain such ID");
145 let mut stream = stream.lock().unwrap();
146 let mut stream = stream
147 .as_mut_any()
148 .downcast_mut::<GStreamerMediaStream>()
149 .ok_or("Does not currently support non-gstreamer streams")?;
150 self.link_stream(stream_id, &mut stream, false)?;
151 if self.delayed_negotiation && (self.streams.len() > 1 || self.pending_streams.len() > 1) {
152 self.delayed_negotiation = false;
153 self.signaller.on_negotiation_needed(&self.thread);
154 }
155 Ok(())
156 }
157
158 fn create_data_channel(&mut self, init: &DataChannelInit) -> WebRtcDataChannelResult {
159 let id = self.next_data_channel_id.fetch_add(1, Ordering::Relaxed);
160 match GStreamerWebRtcDataChannel::new(&id, &self.webrtc, &self.thread, init) {
161 Ok(channel) => register_data_channel(self.data_channels.clone(), id, channel),
162 Err(error) => Err(WebRtcError::Backend(error)),
163 }
164 }
165
166 fn close_data_channel(&mut self, id: &DataChannelId) -> WebRtcResult {
167 let mut data_channels = self.data_channels.lock().unwrap();
170 match data_channels.get(id) {
171 Some(ref channel) => match channel {
172 DataChannelEventTarget::Created(ref channel) => {
173 channel.close();
174 Ok(())
175 },
176 DataChannelEventTarget::Buffered(_) => data_channels
177 .remove(id)
178 .ok_or(WebRtcError::Backend("Unknown data channel".to_owned()))
179 .map(|_| ()),
180 },
181 None => Err(WebRtcError::Backend("Unknown data channel".to_owned())),
182 }
183 }
184
185 fn send_data_channel_message(
186 &mut self,
187 id: &DataChannelId,
188 message: &DataChannelMessage,
189 ) -> WebRtcResult {
190 match self.data_channels.lock().unwrap().get(id) {
191 Some(ref channel) => match channel {
192 DataChannelEventTarget::Created(ref channel) => {
193 channel.send(message);
194 Ok(())
195 },
196 _ => Ok(()),
197 },
198 None => Err(WebRtcError::Backend("Unknown data channel".to_owned())),
199 }
200 }
201
202 fn configure(&mut self, stun_server: &str, policy: BundlePolicy) -> WebRtcResult {
203 self.webrtc
204 .set_property_from_str("stun-server", stun_server);
205 self.webrtc
206 .set_property_from_str("bundle-policy", policy.as_str());
207 Ok(())
208 }
209
210 fn internal_event(&mut self, e: thread::InternalEvent) -> WebRtcResult {
211 match e {
212 InternalEvent::OnNegotiationNeeded => {
213 if self.streams.is_empty() && self.pending_streams.is_empty() {
214 self.delayed_negotiation = true;
219 } else {
220 self.signaller.on_negotiation_needed(&self.thread);
221 }
222 },
223 InternalEvent::OnIceCandidate(candidate) => {
224 self.signaller.on_ice_candidate(&self.thread, candidate);
225 },
226 InternalEvent::OnAddStream(stream, ty) => {
227 self.pipeline.set_state(gst::State::Playing)?;
228 self.signaller.on_add_stream(&stream, ty);
229 },
230 InternalEvent::OnDataChannelEvent(channel_id, event) => {
231 let mut data_channels = self.data_channels.lock().unwrap();
232 match data_channels.get_mut(&channel_id) {
233 None => {
234 data_channels
235 .insert(channel_id, DataChannelEventTarget::Buffered(vec![event]));
236 },
237 Some(ref mut channel) => match channel {
238 DataChannelEventTarget::Buffered(ref mut events) => {
239 events.push(event);
240 return Ok(());
241 },
242 DataChannelEventTarget::Created(_) => {
243 match event {
244 DataChannelEvent::Close => {
245 data_channels.remove(&channel_id);
246 },
247 _ => {},
248 }
249 self.signaller
250 .on_data_channel_event(channel_id, event, &self.thread);
251 },
252 },
253 }
254 },
255 InternalEvent::DescriptionAdded(cb, description_type, ty, remote_offer_generation) => {
256 if description_type == DescriptionType::Remote
257 && ty == SdpType::Offer
258 && remote_offer_generation == self.remote_offer_generation
259 {
260 mem::swap(
261 &mut self.pending_remote_mline_info,
262 &mut self.remote_mline_info,
263 );
264 self.pending_remote_mline_info.clear();
265 self.flush_pending_streams(false)?;
266 }
267 self.pipeline.set_state(gst::State::Playing)?;
268 cb();
269 },
270 InternalEvent::UpdateSignalingState => {
271 use gst_webrtc::WebRTCSignalingState::*;
272 let val = self
273 .webrtc
274 .property::<gst_webrtc::WebRTCSignalingState>("signaling-state");
275 let state = match val {
276 Stable => SignalingState::Stable,
277 HaveLocalOffer => SignalingState::HaveLocalOffer,
278 HaveRemoteOffer => SignalingState::HaveRemoteOffer,
279 HaveLocalPranswer => SignalingState::HaveLocalPranswer,
280 HaveRemotePranswer => SignalingState::HaveRemotePranswer,
281 Closed => SignalingState::Closed,
282 i => {
283 return Err(WebRtcError::Backend(format!(
284 "unknown signaling state: {:?}",
285 i
286 )))
287 },
288 };
289 self.signaller.update_signaling_state(state);
290 },
291 InternalEvent::UpdateGatheringState => {
292 use gst_webrtc::WebRTCICEGatheringState::*;
293 let val = self
294 .webrtc
295 .property::<gst_webrtc::WebRTCICEGatheringState>("ice-gathering-state");
296 let state = match val {
297 New => GatheringState::New,
298 Gathering => GatheringState::Gathering,
299 Complete => GatheringState::Complete,
300 i => {
301 return Err(WebRtcError::Backend(format!(
302 "unknown gathering state: {:?}",
303 i
304 )))
305 },
306 };
307 self.signaller.update_gathering_state(state);
308 },
309 InternalEvent::UpdateIceConnectionState => {
310 use gst_webrtc::WebRTCICEConnectionState::*;
311 let val = self
312 .webrtc
313 .property::<gst_webrtc::WebRTCICEConnectionState>("ice-connection-state");
314 let state = match val {
315 New => IceConnectionState::New,
316 Checking => IceConnectionState::Checking,
317 Connected => IceConnectionState::Connected,
318 Completed => IceConnectionState::Completed,
319 Disconnected => IceConnectionState::Disconnected,
320 Failed => IceConnectionState::Failed,
321 Closed => IceConnectionState::Closed,
322 i => {
323 return Err(WebRtcError::Backend(format!(
324 "unknown ICE connection state: {:?}",
325 i
326 )))
327 },
328 };
329 self.signaller.update_ice_connection_state(state);
330 },
331 }
332 Ok(())
333 }
334
335 fn quit(&mut self) {
336 self.signaller.close();
337
338 self.pipeline.set_state(gst::State::Null).unwrap();
339 }
340}
341
342impl GStreamerWebRtcController {
343 fn set_description(
344 &mut self,
345 desc: SessionDescription,
346 description_type: DescriptionType,
347 cb: Box<dyn FnOnce() + Send + 'static>,
348 ) -> WebRtcResult {
349 let ty = match desc.type_ {
350 SdpType::Answer => gst_webrtc::WebRTCSDPType::Answer,
351 SdpType::Offer => gst_webrtc::WebRTCSDPType::Offer,
352 SdpType::Pranswer => gst_webrtc::WebRTCSDPType::Pranswer,
353 SdpType::Rollback => gst_webrtc::WebRTCSDPType::Rollback,
354 };
355
356 let kind = match description_type {
357 DescriptionType::Local => "set-local-description",
358 DescriptionType::Remote => "set-remote-description",
359 };
360
361 let sdp = gst_sdp::SDPMessage::parse_buffer(desc.sdp.as_bytes()).unwrap();
362 if description_type == DescriptionType::Remote {
363 self.remote_offer_generation += 1;
364 self.store_remote_mline_info(&sdp);
365 }
366 let answer = gst_webrtc::WebRTCSessionDescription::new(ty, sdp);
367 let thread = self.thread.clone();
368 let remote_offer_generation = self.remote_offer_generation;
369 let promise = gst::Promise::with_change_func(move |_promise| {
370 thread.internal_event(InternalEvent::DescriptionAdded(
373 cb,
374 description_type,
375 desc.type_,
376 remote_offer_generation,
377 ));
378 });
379 self.webrtc.emit_by_name::<()>(kind, &[&answer, &promise]);
380 Ok(())
381 }
382
383 fn store_remote_mline_info(&mut self, sdp: &gst_sdp::SDPMessage) {
384 self.pending_remote_mline_info.clear();
385 for media in sdp.medias() {
386 let mut caps = gst::Caps::new_empty();
387 let caps_mut = caps.get_mut().expect("Fresh caps should be uniquely owned");
388 for format in media.formats() {
389 if format == "webrtc-datachannel" {
390 return;
391 }
392 let pt = format
393 .parse()
394 .expect("Gstreamer provided noninteger format");
395 caps_mut.append(
396 media
397 .caps_from_media(pt)
398 .expect("get_format() did not return a format from the SDP"),
399 );
400 self.pt_counter = cmp::max(self.pt_counter, pt + 1);
401 }
402 for s in caps_mut.iter_mut() {
403 s.set_name("application/x-rtp")
408 }
409 self.pending_remote_mline_info.push(MLineInfo {
413 caps,
414 is_used: false,
417 payload: media
421 .format(0)
422 .expect("Gstreamer reported incorrect formats_len()")
423 .parse()
424 .expect("Gstreamer provided noninteger format"),
425 });
426 }
427 }
428
429 fn link_stream(
442 &mut self,
443 stream_id: &MediaStreamId,
444 stream: &mut GStreamerMediaStream,
445 request_new_pads: bool,
446 ) -> WebRtcResult {
447 let caps = stream.caps();
448 let idx = self
449 .remote_mline_info
450 .iter()
451 .enumerate()
452 .filter(|(_, x)| !x.is_used)
453 .find(|(_, x)| x.caps.can_intersect(caps))
454 .map(|x| x.0);
455 if let Some(idx) = idx {
456 if idx >= self.request_pad_counter {
457 for i in self.request_pad_counter..=idx {
458 self.webrtc
467 .request_pad_simple(&format!("sink_{}", i))
468 .ok_or("Cannot request sink pad")?;
469 }
470 self.request_pad_counter = idx + 1;
471 }
472 stream.attach_to_pipeline(&self.pipeline);
473 let element = stream.encoded();
474 self.remote_mline_info[idx].is_used = true;
475 let caps = stream.caps_with_payload(self.remote_mline_info[idx].payload);
476 element.set_property("caps", &caps);
477 let src = element.static_pad("src").ok_or("Cannot request src pad")?;
478 let sink = self
479 .webrtc
480 .static_pad(&format!("sink_{}", idx))
481 .ok_or("Cannot request sink pad")?;
482 src.link(&sink)?;
483 self.streams.push(*stream_id);
484 } else if request_new_pads {
485 stream.attach_to_pipeline(&self.pipeline);
486 let element = stream.encoded();
487 let caps = stream.caps_with_payload(self.pt_counter);
488 self.pt_counter += 1;
489 element.set_property("caps", &caps);
490 let src = element.static_pad("src").ok_or("Cannot request src pad")?;
491 let sink = self
492 .webrtc
493 .request_pad_simple(&format!("sink_{}", self.request_pad_counter))
494 .ok_or("Cannot request sink pad")?;
495 self.request_pad_counter += 1;
496 src.link(&sink)?;
497 self.streams.push(*stream_id);
498 } else {
499 self.pending_streams.push(*stream_id);
500 }
501 Ok(())
502 }
503
504 fn flush_pending_streams(&mut self, request_new_pads: bool) -> WebRtcResult {
506 let pending_streams = mem::replace(&mut self.pending_streams, vec![]);
507 for stream_id in pending_streams {
508 let stream =
509 get_stream(&stream_id).expect("Media streams registry does not contain such ID");
510 let mut stream = stream.lock().unwrap();
511 let mut stream = stream
512 .as_mut_any()
513 .downcast_mut::<GStreamerMediaStream>()
514 .ok_or("Does not currently support non-gstreamer streams")?;
515 self.link_stream(&stream_id, &mut stream, request_new_pads)?;
516 }
517 Ok(())
518 }
519
520 fn start_pipeline(&mut self) -> WebRtcResult {
521 self.pipeline.add(&self.webrtc)?;
522
523 let thread = Mutex::new(self.thread.clone());
526 self.webrtc
527 .connect("on-ice-candidate", false, move |values| {
528 thread
529 .lock()
530 .unwrap()
531 .internal_event(InternalEvent::OnIceCandidate(candidate(values)));
532 None
533 });
534
535 let thread = Arc::new(Mutex::new(self.thread.clone()));
536 self.webrtc.connect_pad_added({
537 let pipeline_weak = self.pipeline.downgrade();
538 move |_element, pad| {
539 let Some(pipe) = pipeline_weak.upgrade() else {
540 warn!("Pipeline already deallocated");
541 return;
542 };
543 process_new_stream(pad, &pipe, thread.clone());
544 }
545 });
546
547 let thread = Mutex::new(self.thread.clone());
550 self.webrtc
551 .connect("on-negotiation-needed", false, move |_values| {
552 thread
553 .lock()
554 .unwrap()
555 .internal_event(InternalEvent::OnNegotiationNeeded);
556 None
557 });
558
559 let thread = Mutex::new(self.thread.clone());
560 self.webrtc
561 .connect("notify::signaling-state", false, move |_values| {
562 thread
563 .lock()
564 .unwrap()
565 .internal_event(InternalEvent::UpdateSignalingState);
566 None
567 });
568 let thread = Mutex::new(self.thread.clone());
569 self.webrtc
570 .connect("notify::ice-connection-state", false, move |_values| {
571 thread
572 .lock()
573 .unwrap()
574 .internal_event(InternalEvent::UpdateIceConnectionState);
575 None
576 });
577 let thread = Mutex::new(self.thread.clone());
578 self.webrtc
579 .connect("notify::ice-gathering-state", false, move |_values| {
580 thread
581 .lock()
582 .unwrap()
583 .internal_event(InternalEvent::UpdateGatheringState);
584 None
585 });
586 let thread = Mutex::new(self.thread.clone());
587 let data_channels = self.data_channels.clone();
588 let next_data_channel_id = self.next_data_channel_id.clone();
589 self.webrtc
590 .connect("on-data-channel", false, move |channel| {
591 let channel = channel[1]
592 .get::<gst_webrtc::WebRTCDataChannel>()
593 .map_err(|e| e.to_string())
594 .expect("Invalid data channel");
595 let id = next_data_channel_id.fetch_add(1, Ordering::Relaxed);
596 let thread_ = thread.lock().unwrap().clone();
597 match GStreamerWebRtcDataChannel::from(&id, channel, &thread_) {
598 Ok(channel) => {
599 let mut closed_channel = false;
600 {
601 thread_.internal_event(InternalEvent::OnDataChannelEvent(
602 id,
603 DataChannelEvent::NewChannel,
604 ));
605
606 let mut data_channels = data_channels.lock().unwrap();
607 if let Some(ref mut channel) = data_channels.get_mut(&id) {
608 match channel {
609 DataChannelEventTarget::Buffered(ref mut events) => {
610 for event in events.drain(0..) {
611 match event {
612 DataChannelEvent::Close => closed_channel = true,
613 _ => {},
614 }
615 thread_.internal_event(
616 InternalEvent::OnDataChannelEvent(id, event),
617 );
618 }
619 },
620 _ => debug_assert!(
621 false,
622 "Trying to register a data channel with an existing ID"
623 ),
624 }
625 }
626 data_channels.remove(&id);
627 }
628 if !closed_channel {
629 if register_data_channel(data_channels.clone(), id, channel).is_err() {
630 warn!("Could not register data channel {:?}", id);
631 return None;
632 }
633 }
634 },
635 Err(error) => {
636 warn!("Could not create data channel {:?}", error);
637 },
638 }
639 None
640 });
641
642 self.pipeline.set_state(gst::State::Ready)?;
643 Ok(())
644 }
645}
646
647pub fn construct(
648 signaller: Box<dyn WebRtcSignaller>,
649 thread: WebRtcThread,
650) -> Result<GStreamerWebRtcController, WebRtcError> {
651 let main_loop = glib::MainLoop::new(None, false);
652 let pipeline = gst::Pipeline::with_name("webrtc main");
653 pipeline.set_start_time(gst::ClockTime::NONE);
654 pipeline.set_base_time(*BACKEND_BASE_TIME);
655 pipeline.use_clock(Some(&gst::SystemClock::obtain()));
656 let webrtc = gst::ElementFactory::make("webrtcbin")
657 .name("sendrecv")
658 .build()
659 .map_err(|error| format!("webrtcbin element not found: {error:?}"))?;
660 let mut controller = GStreamerWebRtcController {
661 webrtc,
662 pipeline,
663 signaller,
664 thread,
665 remote_mline_info: vec![],
666 pending_remote_mline_info: vec![],
667 streams: vec![],
668 pending_streams: vec![],
669 pt_counter: 96,
670 request_pad_counter: 0,
671 remote_offer_generation: 0,
672 delayed_negotiation: false,
673 _main_loop: main_loop,
674 data_channels: Arc::new(Mutex::new(HashMap::new())),
675 next_data_channel_id: Arc::new(AtomicUsize::new(0)),
676 };
677 controller.start_pipeline()?;
678 Ok(controller)
679}
680
681fn on_offer_or_answer_created(
682 ty: SdpType,
683 reply: &gst::StructureRef,
684 cb: Box<dyn FnOnce(SessionDescription) + Send + 'static>,
685) {
686 debug_assert!(ty == SdpType::Offer || ty == SdpType::Answer);
687 let reply = reply
688 .value(ty.as_str())
689 .unwrap()
690 .get::<gst_webrtc::WebRTCSessionDescription>()
691 .expect("Invalid argument");
692
693 let type_ = match reply.type_() {
694 gst_webrtc::WebRTCSDPType::Answer => SdpType::Answer,
695 gst_webrtc::WebRTCSDPType::Offer => SdpType::Offer,
696 gst_webrtc::WebRTCSDPType::Pranswer => SdpType::Pranswer,
697 gst_webrtc::WebRTCSDPType::Rollback => SdpType::Rollback,
698 _ => panic!("unknown sdp response"),
699 };
700
701 let desc = SessionDescription {
702 sdp: reply.sdp().as_text().unwrap(),
703 type_,
704 };
705
706 cb(desc);
707}
708
709fn on_incoming_stream(pipe: &gst::Pipeline, thread: Arc<Mutex<WebRtcThread>>, pad: &gst::Pad) {
710 let decodebin = gst::ElementFactory::make("decodebin").build().unwrap();
711 let caps = pad.query_caps(None);
712 let name = caps
713 .structure(0)
714 .unwrap()
715 .get::<String>("media")
716 .expect("Invalid 'media' field");
717 let decodebin2 = decodebin.clone();
718 decodebin.connect_pad_added({
719 let pipeline_weak = pipe.downgrade();
720 move |_element, pad| {
721 let Some(pipe) = pipeline_weak.upgrade() else {
722 warn!("Pipeline already deallocated");
723 return;
724 };
725 on_incoming_decodebin_stream(pad, &pipe, thread.clone(), &name);
726 }
727 });
728 pipe.add(&decodebin).unwrap();
729
730 let decodepad = decodebin.static_pad("sink").unwrap();
731 pad.link(&decodepad).unwrap();
732 decodebin2.sync_state_with_parent().unwrap();
733}
734
735fn on_incoming_decodebin_stream(
736 pad: &gst::Pad,
737 pipe: &gst::Pipeline,
738 thread: Arc<Mutex<WebRtcThread>>,
739 name: &str,
740) {
741 let proxy_sink = gst::ElementFactory::make("proxysink").build().unwrap();
742 let proxy_src = gst::ElementFactory::make("proxysrc")
743 .property("proxysink", &proxy_sink)
744 .build()
745 .unwrap();
746 pipe.add(&proxy_sink).unwrap();
747 let sinkpad = proxy_sink.static_pad("sink").unwrap();
748
749 pad.link(&sinkpad).unwrap();
750 proxy_sink.sync_state_with_parent().unwrap();
751
752 let (stream, ty) = if name == "video" {
753 (
754 GStreamerMediaStream::create_video_from(proxy_src),
755 MediaStreamType::Video,
756 )
757 } else {
758 (
759 GStreamerMediaStream::create_audio_from(proxy_src),
760 MediaStreamType::Audio,
761 )
762 };
763 thread
764 .lock()
765 .unwrap()
766 .internal_event(InternalEvent::OnAddStream(stream, ty));
767}
768
769fn process_new_stream(pad: &gst::Pad, pipe: &gst::Pipeline, thread: Arc<Mutex<WebRtcThread>>) {
770 if pad.direction() != gst::PadDirection::Src {
771 return;
773 }
774 on_incoming_stream(pipe, thread, pad)
775}
776
777fn candidate(values: &[glib::Value]) -> IceCandidate {
778 let _webrtc = values[0].get::<gst::Element>().expect("Invalid argument");
779 let sdp_mline_index = values[1].get::<u32>().expect("Invalid argument");
780 let candidate = values[2].get::<String>().expect("Invalid argument");
781
782 IceCandidate {
783 sdp_mline_index,
784 candidate,
785 }
786}
787
788fn register_data_channel(
789 registry: Arc<Mutex<HashMap<DataChannelId, DataChannelEventTarget>>>,
790 id: DataChannelId,
791 channel: GStreamerWebRtcDataChannel,
792) -> WebRtcDataChannelResult {
793 if registry.lock().unwrap().contains_key(&id) {
794 return Err(WebRtcError::Backend(
795 "Could not register data channel. ID collision".to_owned(),
796 ));
797 }
798 registry
799 .lock()
800 .unwrap()
801 .insert(id, DataChannelEventTarget::Created(channel));
802 Ok(id)
803}