1use super::BACKEND_BASE_TIME;
2use crate::datachannel::GStreamerWebRtcDataChannel;
3use crate::media_stream::GStreamerMediaStream;
4use glib;
5use glib::prelude::*;
6use gst;
7use gst::prelude::*;
8use gst_sdp;
9use gst_webrtc;
10use log::warn;
11use servo_media_streams::registry::{get_stream, MediaStreamId};
12use servo_media_streams::MediaStreamType;
13use servo_media_webrtc::datachannel::DataChannelId;
14use servo_media_webrtc::thread::InternalEvent;
15use servo_media_webrtc::WebRtcController as WebRtcThread;
16use servo_media_webrtc::*;
17use std::collections::HashMap;
18use std::sync::atomic::{AtomicUsize, Ordering};
19use std::sync::{Arc, Mutex};
20use std::{cmp, mem};
21
22#[derive(Debug, Clone)]
26pub struct MLineInfo {
27 caps: gst::Caps,
29 is_used: bool,
31 payload: i32,
33}
34
35enum DataChannelEventTarget {
36 Buffered(Vec<DataChannelEvent>),
37 Created(GStreamerWebRtcDataChannel),
38}
39
40pub struct GStreamerWebRtcController {
41 webrtc: gst::Element,
42 pipeline: gst::Pipeline,
43 delayed_negotiation: bool,
46 thread: WebRtcThread,
49 signaller: Box<dyn WebRtcSignaller>,
50 streams: Vec<MediaStreamId>,
53 pending_streams: Vec<MediaStreamId>,
64 pt_counter: i32,
69 request_pad_counter: usize,
72 remote_mline_info: Vec<MLineInfo>,
75 pending_remote_mline_info: Vec<MLineInfo>,
80 remote_offer_generation: u32,
82 _main_loop: glib::MainLoop,
83 data_channels: Arc<Mutex<HashMap<DataChannelId, DataChannelEventTarget>>>,
84 next_data_channel_id: Arc<AtomicUsize>,
85}
86
87impl WebRtcControllerBackend for GStreamerWebRtcController {
88 fn add_ice_candidate(&mut self, candidate: IceCandidate) -> WebRtcResult {
89 self.webrtc.emit_by_name::<()>(
90 "add-ice-candidate",
91 &[&candidate.sdp_mline_index, &candidate.candidate],
92 );
93 Ok(())
94 }
95
96 fn set_remote_description(
97 &mut self,
98 desc: SessionDescription,
99 cb: Box<dyn FnOnce() + Send + 'static>,
100 ) -> WebRtcResult {
101 self.set_description(desc, DescriptionType::Remote, cb)
102 }
103
104 fn set_local_description(
105 &mut self,
106 desc: SessionDescription,
107 cb: Box<dyn FnOnce() + Send + 'static>,
108 ) -> WebRtcResult {
109 self.set_description(desc, DescriptionType::Local, cb)
110 }
111
112 fn create_offer(&mut self, cb: Box<dyn FnOnce(SessionDescription) + Send + 'static>) -> WebRtcResult {
113 self.flush_pending_streams(true)?;
114 self.pipeline.set_state(gst::State::Playing)?;
115 let promise = gst::Promise::with_change_func(move |res| {
116 res.map(|s| on_offer_or_answer_created(SdpType::Offer, s.unwrap(), cb))
117 .unwrap();
118 });
119
120 self.webrtc
121 .emit_by_name::<()>("create-offer", &[&None::<gst::Structure>, &promise]);
122 Ok(())
123 }
124
125 fn create_answer(&mut self, cb: Box<dyn FnOnce(SessionDescription) + Send + 'static>) -> WebRtcResult {
126 let promise = gst::Promise::with_change_func(move |res| {
127 res.map(|s| on_offer_or_answer_created(SdpType::Answer, s.unwrap(), cb))
128 .unwrap();
129 });
130
131 self.webrtc
132 .emit_by_name::<()>("create-answer", &[&None::<gst::Structure>, &promise]);
133 Ok(())
134 }
135
136 fn add_stream(&mut self, stream_id: &MediaStreamId) -> WebRtcResult {
137 let stream =
138 get_stream(stream_id).expect("Media streams registry does not contain such ID");
139 let mut stream = stream.lock().unwrap();
140 let mut stream = stream
141 .as_mut_any()
142 .downcast_mut::<GStreamerMediaStream>()
143 .ok_or("Does not currently support non-gstreamer streams")?;
144 self.link_stream(stream_id, &mut stream, false)?;
145 if self.delayed_negotiation && (self.streams.len() > 1 || self.pending_streams.len() > 1) {
146 self.delayed_negotiation = false;
147 self.signaller.on_negotiation_needed(&self.thread);
148 }
149 Ok(())
150 }
151
152 fn create_data_channel(&mut self, init: &DataChannelInit) -> WebRtcDataChannelResult {
153 let id = self.next_data_channel_id.fetch_add(1, Ordering::Relaxed);
154 match GStreamerWebRtcDataChannel::new(&id, &self.webrtc, &self.thread, init) {
155 Ok(channel) => register_data_channel(self.data_channels.clone(), id, channel),
156 Err(error) => Err(WebRtcError::Backend(error)),
157 }
158 }
159
160 fn close_data_channel(&mut self, id: &DataChannelId) -> WebRtcResult {
161 let mut data_channels = self.data_channels.lock().unwrap();
164 match data_channels.get(id) {
165 Some(ref channel) => match channel {
166 DataChannelEventTarget::Created(ref channel) => {
167 channel.close();
168 Ok(())
169 }
170 DataChannelEventTarget::Buffered(_) => data_channels
171 .remove(id)
172 .ok_or(WebRtcError::Backend("Unknown data channel".to_owned()))
173 .map(|_| ()),
174 },
175 None => Err(WebRtcError::Backend("Unknown data channel".to_owned())),
176 }
177 }
178
179 fn send_data_channel_message(
180 &mut self,
181 id: &DataChannelId,
182 message: &DataChannelMessage,
183 ) -> WebRtcResult {
184 match self.data_channels.lock().unwrap().get(id) {
185 Some(ref channel) => match channel {
186 DataChannelEventTarget::Created(ref channel) => {
187 channel.send(message);
188 Ok(())
189 }
190 _ => Ok(()),
191 },
192 None => Err(WebRtcError::Backend("Unknown data channel".to_owned())),
193 }
194 }
195
196 fn configure(&mut self, stun_server: &str, policy: BundlePolicy) -> WebRtcResult {
197 self.webrtc
198 .set_property_from_str("stun-server", stun_server);
199 self.webrtc
200 .set_property_from_str("bundle-policy", policy.as_str());
201 Ok(())
202 }
203
204 fn internal_event(&mut self, e: thread::InternalEvent) -> WebRtcResult {
205 match e {
206 InternalEvent::OnNegotiationNeeded => {
207 if self.streams.is_empty() && self.pending_streams.is_empty() {
208 self.delayed_negotiation = true;
213 } else {
214 self.signaller.on_negotiation_needed(&self.thread);
215 }
216 }
217 InternalEvent::OnIceCandidate(candidate) => {
218 self.signaller.on_ice_candidate(&self.thread, candidate);
219 }
220 InternalEvent::OnAddStream(stream, ty) => {
221 self.pipeline.set_state(gst::State::Playing)?;
222 self.signaller.on_add_stream(&stream, ty);
223 }
224 InternalEvent::OnDataChannelEvent(channel_id, event) => {
225 let mut data_channels = self.data_channels.lock().unwrap();
226 match data_channels.get_mut(&channel_id) {
227 None => {
228 data_channels
229 .insert(channel_id, DataChannelEventTarget::Buffered(vec![event]));
230 }
231 Some(ref mut channel) => match channel {
232 DataChannelEventTarget::Buffered(ref mut events) => {
233 events.push(event);
234 return Ok(());
235 }
236 DataChannelEventTarget::Created(_) => {
237 match event {
238 DataChannelEvent::Close => {
239 data_channels.remove(&channel_id);
240 }
241 _ => {}
242 }
243 self.signaller
244 .on_data_channel_event(channel_id, event, &self.thread);
245 }
246 },
247 }
248 }
249 InternalEvent::DescriptionAdded(cb, description_type, ty, remote_offer_generation) => {
250 if description_type == DescriptionType::Remote
251 && ty == SdpType::Offer
252 && remote_offer_generation == self.remote_offer_generation
253 {
254 mem::swap(
255 &mut self.pending_remote_mline_info,
256 &mut self.remote_mline_info,
257 );
258 self.pending_remote_mline_info.clear();
259 self.flush_pending_streams(false)?;
260 }
261 self.pipeline.set_state(gst::State::Playing)?;
262 cb();
263 }
264 InternalEvent::UpdateSignalingState => {
265 use gst_webrtc::WebRTCSignalingState::*;
266 let val = self
267 .webrtc
268 .property::<gst_webrtc::WebRTCSignalingState>("signaling-state");
269 let state = match val {
270 Stable => SignalingState::Stable,
271 HaveLocalOffer => SignalingState::HaveLocalOffer,
272 HaveRemoteOffer => SignalingState::HaveRemoteOffer,
273 HaveLocalPranswer => SignalingState::HaveLocalPranswer,
274 HaveRemotePranswer => SignalingState::HaveRemotePranswer,
275 Closed => SignalingState::Closed,
276 i => {
277 return Err(WebRtcError::Backend(format!(
278 "unknown signaling state: {:?}",
279 i
280 )))
281 }
282 };
283 self.signaller.update_signaling_state(state);
284 }
285 InternalEvent::UpdateGatheringState => {
286 use gst_webrtc::WebRTCICEGatheringState::*;
287 let val = self
288 .webrtc
289 .property::<gst_webrtc::WebRTCICEGatheringState>("ice-gathering-state");
290 let state = match val {
291 New => GatheringState::New,
292 Gathering => GatheringState::Gathering,
293 Complete => GatheringState::Complete,
294 i => {
295 return Err(WebRtcError::Backend(format!(
296 "unknown gathering state: {:?}",
297 i
298 )))
299 }
300 };
301 self.signaller.update_gathering_state(state);
302 }
303 InternalEvent::UpdateIceConnectionState => {
304 use gst_webrtc::WebRTCICEConnectionState::*;
305 let val = self
306 .webrtc
307 .property::<gst_webrtc::WebRTCICEConnectionState>("ice-connection-state");
308 let state = match val {
309 New => IceConnectionState::New,
310 Checking => IceConnectionState::Checking,
311 Connected => IceConnectionState::Connected,
312 Completed => IceConnectionState::Completed,
313 Disconnected => IceConnectionState::Disconnected,
314 Failed => IceConnectionState::Failed,
315 Closed => IceConnectionState::Closed,
316 i => {
317 return Err(WebRtcError::Backend(format!(
318 "unknown ICE connection state: {:?}",
319 i
320 )))
321 }
322 };
323 self.signaller.update_ice_connection_state(state);
324 }
325 }
326 Ok(())
327 }
328
329 fn quit(&mut self) {
330 self.signaller.close();
331
332 self.pipeline.set_state(gst::State::Null).unwrap();
333 }
334}
335
336impl GStreamerWebRtcController {
337 fn set_description(
338 &mut self,
339 desc: SessionDescription,
340 description_type: DescriptionType,
341 cb: Box<dyn FnOnce() + Send + 'static>,
342 ) -> WebRtcResult {
343 let ty = match desc.type_ {
344 SdpType::Answer => gst_webrtc::WebRTCSDPType::Answer,
345 SdpType::Offer => gst_webrtc::WebRTCSDPType::Offer,
346 SdpType::Pranswer => gst_webrtc::WebRTCSDPType::Pranswer,
347 SdpType::Rollback => gst_webrtc::WebRTCSDPType::Rollback,
348 };
349
350 let kind = match description_type {
351 DescriptionType::Local => "set-local-description",
352 DescriptionType::Remote => "set-remote-description",
353 };
354
355 let sdp = gst_sdp::SDPMessage::parse_buffer(desc.sdp.as_bytes()).unwrap();
356 if description_type == DescriptionType::Remote {
357 self.remote_offer_generation += 1;
358 self.store_remote_mline_info(&sdp);
359 }
360 let answer = gst_webrtc::WebRTCSessionDescription::new(ty, sdp);
361 let thread = self.thread.clone();
362 let remote_offer_generation = self.remote_offer_generation;
363 let promise = gst::Promise::with_change_func(move |_promise| {
364 thread.internal_event(InternalEvent::DescriptionAdded(
367 cb,
368 description_type,
369 desc.type_,
370 remote_offer_generation,
371 ));
372 });
373 self.webrtc.emit_by_name::<()>(kind, &[&answer, &promise]);
374 Ok(())
375 }
376
377 fn store_remote_mline_info(&mut self, sdp: &gst_sdp::SDPMessage) {
378 self.pending_remote_mline_info.clear();
379 for media in sdp.medias() {
380 let mut caps = gst::Caps::new_empty();
381 let caps_mut = caps.get_mut().expect("Fresh caps should be uniquely owned");
382 for format in media.formats() {
383 if format == "webrtc-datachannel" {
384 return;
385 }
386 let pt = format
387 .parse()
388 .expect("Gstreamer provided noninteger format");
389 caps_mut.append(
390 media
391 .caps_from_media(pt)
392 .expect("get_format() did not return a format from the SDP"),
393 );
394 self.pt_counter = cmp::max(self.pt_counter, pt + 1);
395 }
396 for s in caps_mut.iter_mut() {
397 s.set_name("application/x-rtp")
402 }
403 self.pending_remote_mline_info.push(MLineInfo {
407 caps,
408 is_used: false,
411 payload: media
415 .format(0)
416 .expect("Gstreamer reported incorrect formats_len()")
417 .parse()
418 .expect("Gstreamer provided noninteger format"),
419 });
420 }
421 }
422
423 fn link_stream(
436 &mut self,
437 stream_id: &MediaStreamId,
438 stream: &mut GStreamerMediaStream,
439 request_new_pads: bool,
440 ) -> WebRtcResult {
441 let caps = stream.caps();
442 let idx = self
443 .remote_mline_info
444 .iter()
445 .enumerate()
446 .filter(|(_, x)| !x.is_used)
447 .find(|(_, x)| x.caps.can_intersect(caps))
448 .map(|x| x.0);
449 if let Some(idx) = idx {
450 if idx >= self.request_pad_counter {
451 for i in self.request_pad_counter..=idx {
452 self.webrtc
461 .request_pad_simple(&format!("sink_{}", i))
462 .ok_or("Cannot request sink pad")?;
463 }
464 self.request_pad_counter = idx + 1;
465 }
466 stream.attach_to_pipeline(&self.pipeline);
467 let element = stream.encoded();
468 self.remote_mline_info[idx].is_used = true;
469 let caps = stream.caps_with_payload(self.remote_mline_info[idx].payload);
470 element.set_property("caps", &caps);
471 let src = element.static_pad("src").ok_or("Cannot request src pad")?;
472 let sink = self
473 .webrtc
474 .static_pad(&format!("sink_{}", idx))
475 .ok_or("Cannot request sink pad")?;
476 src.link(&sink)?;
477 self.streams.push(*stream_id);
478 } else if request_new_pads {
479 stream.attach_to_pipeline(&self.pipeline);
480 let element = stream.encoded();
481 let caps = stream.caps_with_payload(self.pt_counter);
482 self.pt_counter += 1;
483 element.set_property("caps", &caps);
484 let src = element.static_pad("src").ok_or("Cannot request src pad")?;
485 let sink = self
486 .webrtc
487 .request_pad_simple(&format!("sink_{}", self.request_pad_counter))
488 .ok_or("Cannot request sink pad")?;
489 self.request_pad_counter += 1;
490 src.link(&sink)?;
491 self.streams.push(*stream_id);
492 } else {
493 self.pending_streams.push(*stream_id);
494 }
495 Ok(())
496 }
497
498 fn flush_pending_streams(&mut self, request_new_pads: bool) -> WebRtcResult {
500 let pending_streams = mem::replace(&mut self.pending_streams, vec![]);
501 for stream_id in pending_streams {
502 let stream =
503 get_stream(&stream_id).expect("Media streams registry does not contain such ID");
504 let mut stream = stream.lock().unwrap();
505 let mut stream = stream
506 .as_mut_any()
507 .downcast_mut::<GStreamerMediaStream>()
508 .ok_or("Does not currently support non-gstreamer streams")?;
509 self.link_stream(&stream_id, &mut stream, request_new_pads)?;
510 }
511 Ok(())
512 }
513
514 fn start_pipeline(&mut self) -> WebRtcResult {
515 self.pipeline.add(&self.webrtc)?;
516
517 let thread = Mutex::new(self.thread.clone());
520 self.webrtc
521 .connect("on-ice-candidate", false, move |values| {
522 thread
523 .lock()
524 .unwrap()
525 .internal_event(InternalEvent::OnIceCandidate(candidate(values)));
526 None
527 });
528
529 let thread = Arc::new(Mutex::new(self.thread.clone()));
530 self.webrtc.connect_pad_added({
531 let pipeline_weak = self.pipeline.downgrade();
532 move |_element, pad| {
533 let Some(pipe) = pipeline_weak.upgrade() else {
534 warn!("Pipeline already deallocated");
535 return;
536 };
537 process_new_stream(pad, &pipe, thread.clone());
538 }
539 });
540
541 let thread = Mutex::new(self.thread.clone());
544 self.webrtc
545 .connect("on-negotiation-needed", false, move |_values| {
546 thread
547 .lock()
548 .unwrap()
549 .internal_event(InternalEvent::OnNegotiationNeeded);
550 None
551 });
552
553 let thread = Mutex::new(self.thread.clone());
554 self.webrtc
555 .connect("notify::signaling-state", false, move |_values| {
556 thread
557 .lock()
558 .unwrap()
559 .internal_event(InternalEvent::UpdateSignalingState);
560 None
561 });
562 let thread = Mutex::new(self.thread.clone());
563 self.webrtc
564 .connect("notify::ice-connection-state", false, move |_values| {
565 thread
566 .lock()
567 .unwrap()
568 .internal_event(InternalEvent::UpdateIceConnectionState);
569 None
570 });
571 let thread = Mutex::new(self.thread.clone());
572 self.webrtc
573 .connect("notify::ice-gathering-state", false, move |_values| {
574 thread
575 .lock()
576 .unwrap()
577 .internal_event(InternalEvent::UpdateGatheringState);
578 None
579 });
580 let thread = Mutex::new(self.thread.clone());
581 let data_channels = self.data_channels.clone();
582 let next_data_channel_id = self.next_data_channel_id.clone();
583 self.webrtc
584 .connect("on-data-channel", false, move |channel| {
585 let channel = channel[1]
586 .get::<gst_webrtc::WebRTCDataChannel>()
587 .map_err(|e| e.to_string())
588 .expect("Invalid data channel");
589 let id = next_data_channel_id.fetch_add(1, Ordering::Relaxed);
590 let thread_ = thread.lock().unwrap().clone();
591 match GStreamerWebRtcDataChannel::from(&id, channel, &thread_) {
592 Ok(channel) => {
593 let mut closed_channel = false;
594 {
595 thread_.internal_event(InternalEvent::OnDataChannelEvent(
596 id,
597 DataChannelEvent::NewChannel,
598 ));
599
600 let mut data_channels = data_channels.lock().unwrap();
601 if let Some(ref mut channel) = data_channels.get_mut(&id) {
602 match channel {
603 DataChannelEventTarget::Buffered(ref mut events) => {
604 for event in events.drain(0..) {
605 match event {
606 DataChannelEvent::Close => closed_channel = true,
607 _ => {}
608 }
609 thread_.internal_event(
610 InternalEvent::OnDataChannelEvent(id, event),
611 );
612 }
613 }
614 _ => debug_assert!(
615 false,
616 "Trying to register a data channel with an existing ID"
617 ),
618 }
619 }
620 data_channels.remove(&id);
621 }
622 if !closed_channel {
623 if register_data_channel(data_channels.clone(), id, channel).is_err() {
624 warn!("Could not register data channel {:?}", id);
625 return None;
626 }
627 }
628 }
629 Err(error) => {
630 warn!("Could not create data channel {:?}", error);
631 }
632 }
633 None
634 });
635
636 self.pipeline.set_state(gst::State::Ready)?;
637 Ok(())
638 }
639}
640
641pub fn construct(
642 signaller: Box<dyn WebRtcSignaller>,
643 thread: WebRtcThread,
644) -> Result<GStreamerWebRtcController, WebRtcError> {
645 let main_loop = glib::MainLoop::new(None, false);
646 let pipeline = gst::Pipeline::with_name("webrtc main");
647 pipeline.set_start_time(gst::ClockTime::NONE);
648 pipeline.set_base_time(*BACKEND_BASE_TIME);
649 pipeline.use_clock(Some(&gst::SystemClock::obtain()));
650 let webrtc = gst::ElementFactory::make("webrtcbin")
651 .name("sendrecv")
652 .build()
653 .map_err(|error| format!("webrtcbin element not found: {error:?}"))?;
654 let mut controller = GStreamerWebRtcController {
655 webrtc,
656 pipeline,
657 signaller,
658 thread,
659 remote_mline_info: vec![],
660 pending_remote_mline_info: vec![],
661 streams: vec![],
662 pending_streams: vec![],
663 pt_counter: 96,
664 request_pad_counter: 0,
665 remote_offer_generation: 0,
666 delayed_negotiation: false,
667 _main_loop: main_loop,
668 data_channels: Arc::new(Mutex::new(HashMap::new())),
669 next_data_channel_id: Arc::new(AtomicUsize::new(0)),
670 };
671 controller.start_pipeline()?;
672 Ok(controller)
673}
674
675fn on_offer_or_answer_created(
676 ty: SdpType,
677 reply: &gst::StructureRef,
678 cb: Box<dyn FnOnce(SessionDescription) + Send + 'static>,
679) {
680 debug_assert!(ty == SdpType::Offer || ty == SdpType::Answer);
681 let reply = reply
682 .value(ty.as_str())
683 .unwrap()
684 .get::<gst_webrtc::WebRTCSessionDescription>()
685 .expect("Invalid argument");
686
687 let type_ = match reply.type_() {
688 gst_webrtc::WebRTCSDPType::Answer => SdpType::Answer,
689 gst_webrtc::WebRTCSDPType::Offer => SdpType::Offer,
690 gst_webrtc::WebRTCSDPType::Pranswer => SdpType::Pranswer,
691 gst_webrtc::WebRTCSDPType::Rollback => SdpType::Rollback,
692 _ => panic!("unknown sdp response"),
693 };
694
695 let desc = SessionDescription {
696 sdp: reply.sdp().as_text().unwrap(),
697 type_,
698 };
699
700 cb(desc);
701}
702
703fn on_incoming_stream(pipe: &gst::Pipeline, thread: Arc<Mutex<WebRtcThread>>, pad: &gst::Pad) {
704 let decodebin = gst::ElementFactory::make("decodebin").build().unwrap();
705 let caps = pad.query_caps(None);
706 let name = caps
707 .structure(0)
708 .unwrap()
709 .get::<String>("media")
710 .expect("Invalid 'media' field");
711 let decodebin2 = decodebin.clone();
712 decodebin.connect_pad_added({
713 let pipeline_weak = pipe.downgrade();
714 move |_element, pad| {
715 let Some(pipe) = pipeline_weak.upgrade() else {
716 warn!("Pipeline already deallocated");
717 return;
718 };
719 on_incoming_decodebin_stream(pad, &pipe, thread.clone(), &name);
720 }
721 });
722 pipe.add(&decodebin).unwrap();
723
724 let decodepad = decodebin.static_pad("sink").unwrap();
725 pad.link(&decodepad).unwrap();
726 decodebin2.sync_state_with_parent().unwrap();
727}
728
729fn on_incoming_decodebin_stream(
730 pad: &gst::Pad,
731 pipe: &gst::Pipeline,
732 thread: Arc<Mutex<WebRtcThread>>,
733 name: &str,
734) {
735 let proxy_sink = gst::ElementFactory::make("proxysink").build().unwrap();
736 let proxy_src = gst::ElementFactory::make("proxysrc")
737 .property("proxysink", &proxy_sink)
738 .build()
739 .unwrap();
740 pipe.add(&proxy_sink).unwrap();
741 let sinkpad = proxy_sink.static_pad("sink").unwrap();
742
743 pad.link(&sinkpad).unwrap();
744 proxy_sink.sync_state_with_parent().unwrap();
745
746 let (stream, ty) = if name == "video" {
747 (
748 GStreamerMediaStream::create_video_from(proxy_src),
749 MediaStreamType::Video,
750 )
751 } else {
752 (
753 GStreamerMediaStream::create_audio_from(proxy_src),
754 MediaStreamType::Audio,
755 )
756 };
757 thread
758 .lock()
759 .unwrap()
760 .internal_event(InternalEvent::OnAddStream(stream, ty));
761}
762
763fn process_new_stream(pad: &gst::Pad, pipe: &gst::Pipeline, thread: Arc<Mutex<WebRtcThread>>) {
764 if pad.direction() != gst::PadDirection::Src {
765 return;
767 }
768 on_incoming_stream(pipe, thread, pad)
769}
770
771fn candidate(values: &[glib::Value]) -> IceCandidate {
772 let _webrtc = values[0].get::<gst::Element>().expect("Invalid argument");
773 let sdp_mline_index = values[1].get::<u32>().expect("Invalid argument");
774 let candidate = values[2].get::<String>().expect("Invalid argument");
775
776 IceCandidate {
777 sdp_mline_index,
778 candidate,
779 }
780}
781
782fn register_data_channel(
783 registry: Arc<Mutex<HashMap<DataChannelId, DataChannelEventTarget>>>,
784 id: DataChannelId,
785 channel: GStreamerWebRtcDataChannel,
786) -> WebRtcDataChannelResult {
787 if registry.lock().unwrap().contains_key(&id) {
788 return Err(WebRtcError::Backend(
789 "Could not register data channel. ID collision".to_owned(),
790 ));
791 }
792 registry
793 .lock()
794 .unwrap()
795 .insert(id, DataChannelEventTarget::Created(channel));
796 Ok(id)
797}