1use std::collections::HashMap;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, Mutex};
8use std::{cmp, mem};
9
10use glib;
11use glib::prelude::*;
12use gstreamer;
13use gstreamer::prelude::*;
14use gstreamer_sdp;
15use gstreamer_webrtc;
16use log::warn;
17use servo_media_streams::MediaStreamType;
18use servo_media_streams::registry::{MediaStreamId, get_stream};
19use servo_media_webrtc::datachannel::DataChannelId;
20use servo_media_webrtc::thread::InternalEvent;
21use servo_media_webrtc::{WebRtcController as WebRtcThread, *};
22
23use super::BACKEND_BASE_TIME;
24use crate::datachannel::GStreamerWebRtcDataChannel;
25use crate::media_stream::GStreamerMediaStream;
26
27#[derive(Debug, Clone)]
31pub struct MLineInfo {
32 caps: gstreamer::Caps,
34 is_used: bool,
36 payload: i32,
38}
39
40enum DataChannelEventTarget {
41 Buffered(Vec<DataChannelEvent>),
42 Created(GStreamerWebRtcDataChannel),
43}
44
45pub struct GStreamerWebRtcController {
46 webrtc: gstreamer::Element,
47 pipeline: gstreamer::Pipeline,
48 delayed_negotiation: bool,
51 thread: WebRtcThread,
54 signaller: Box<dyn WebRtcSignaller>,
55 streams: Vec<MediaStreamId>,
58 pending_streams: Vec<MediaStreamId>,
69 pt_counter: i32,
74 request_pad_counter: usize,
77 remote_mline_info: Vec<MLineInfo>,
80 pending_remote_mline_info: Vec<MLineInfo>,
85 remote_offer_generation: u32,
87 _main_loop: glib::MainLoop,
88 data_channels: Arc<Mutex<HashMap<DataChannelId, DataChannelEventTarget>>>,
89 next_data_channel_id: Arc<AtomicUsize>,
90}
91
92impl WebRtcControllerBackend for GStreamerWebRtcController {
93 fn add_ice_candidate(&mut self, candidate: IceCandidate) -> WebRtcResult {
94 self.webrtc.emit_by_name::<()>(
95 "add-ice-candidate",
96 &[&candidate.sdp_mline_index, &candidate.candidate],
97 );
98 Ok(())
99 }
100
101 fn set_remote_description(
102 &mut self,
103 desc: SessionDescription,
104 cb: Box<dyn FnOnce() + Send + 'static>,
105 ) -> WebRtcResult {
106 self.set_description(desc, DescriptionType::Remote, cb)
107 }
108
109 fn set_local_description(
110 &mut self,
111 desc: SessionDescription,
112 cb: Box<dyn FnOnce() + Send + 'static>,
113 ) -> WebRtcResult {
114 self.set_description(desc, DescriptionType::Local, cb)
115 }
116
117 fn create_offer(
118 &mut self,
119 cb: Box<dyn FnOnce(SessionDescription) + Send + 'static>,
120 ) -> WebRtcResult {
121 self.flush_pending_streams(true)?;
122 self.pipeline.set_state(gstreamer::State::Playing)?;
123 let promise = gstreamer::Promise::with_change_func(move |res| {
124 res.map(|s| on_offer_or_answer_created(SdpType::Offer, s.unwrap(), cb))
125 .unwrap();
126 });
127
128 self.webrtc
129 .emit_by_name::<()>("create-offer", &[&None::<gstreamer::Structure>, &promise]);
130 Ok(())
131 }
132
133 fn create_answer(
134 &mut self,
135 cb: Box<dyn FnOnce(SessionDescription) + Send + 'static>,
136 ) -> WebRtcResult {
137 let promise = gstreamer::Promise::with_change_func(move |res| {
138 res.map(|s| on_offer_or_answer_created(SdpType::Answer, s.unwrap(), cb))
139 .unwrap();
140 });
141
142 self.webrtc
143 .emit_by_name::<()>("create-answer", &[&None::<gstreamer::Structure>, &promise]);
144 Ok(())
145 }
146
147 fn add_stream(&mut self, stream_id: &MediaStreamId) -> WebRtcResult {
148 let stream =
149 get_stream(stream_id).expect("Media streams registry does not contain such ID");
150 let mut stream = stream.lock().unwrap();
151 let stream = stream
152 .as_mut_any()
153 .downcast_mut::<GStreamerMediaStream>()
154 .ok_or("Does not currently support non-gstreamer streams")?;
155 self.link_stream(stream_id, stream, false)?;
156 if self.delayed_negotiation && (self.streams.len() > 1 || self.pending_streams.len() > 1) {
157 self.delayed_negotiation = false;
158 self.signaller.on_negotiation_needed(&self.thread);
159 }
160 Ok(())
161 }
162
163 fn create_data_channel(&mut self, init: &DataChannelInit) -> WebRtcDataChannelResult {
164 let id = self.next_data_channel_id.fetch_add(1, Ordering::Relaxed);
165 match GStreamerWebRtcDataChannel::new(&id, &self.webrtc, &self.thread, init) {
166 Ok(channel) => register_data_channel(self.data_channels.clone(), id, channel),
167 Err(error) => Err(WebRtcError::Backend(error)),
168 }
169 }
170
171 fn close_data_channel(&mut self, id: &DataChannelId) -> WebRtcResult {
172 let mut data_channels = self.data_channels.lock().unwrap();
175 match data_channels.get(id) {
176 Some(ref channel) => match channel {
177 DataChannelEventTarget::Created(channel) => {
178 channel.close();
179 Ok(())
180 },
181 DataChannelEventTarget::Buffered(_) => data_channels
182 .remove(id)
183 .ok_or(WebRtcError::Backend("Unknown data channel".to_owned()))
184 .map(|_| ()),
185 },
186 None => Err(WebRtcError::Backend("Unknown data channel".to_owned())),
187 }
188 }
189
190 fn send_data_channel_message(
191 &mut self,
192 id: &DataChannelId,
193 message: &DataChannelMessage,
194 ) -> WebRtcResult {
195 match self.data_channels.lock().unwrap().get(id) {
196 Some(ref channel) => match channel {
197 DataChannelEventTarget::Created(channel) => {
198 channel.send(message);
199 Ok(())
200 },
201 _ => Ok(()),
202 },
203 None => Err(WebRtcError::Backend("Unknown data channel".to_owned())),
204 }
205 }
206
207 fn configure(&mut self, stun_server: &str, policy: BundlePolicy) -> WebRtcResult {
208 self.webrtc
209 .set_property_from_str("stun-server", stun_server);
210 self.webrtc
211 .set_property_from_str("bundle-policy", policy.as_str());
212 Ok(())
213 }
214
215 fn internal_event(&mut self, e: thread::InternalEvent) -> WebRtcResult {
216 match e {
217 InternalEvent::OnNegotiationNeeded => {
218 if self.streams.is_empty() && self.pending_streams.is_empty() {
219 self.delayed_negotiation = true;
224 } else {
225 self.signaller.on_negotiation_needed(&self.thread);
226 }
227 },
228 InternalEvent::OnIceCandidate(candidate) => {
229 self.signaller.on_ice_candidate(&self.thread, candidate);
230 },
231 InternalEvent::OnAddStream(stream, ty) => {
232 self.pipeline.set_state(gstreamer::State::Playing)?;
233 self.signaller.on_add_stream(&stream, ty);
234 },
235 InternalEvent::OnDataChannelEvent(channel_id, event) => {
236 let mut data_channels = self.data_channels.lock().unwrap();
237 match data_channels.get_mut(&channel_id) {
238 None => {
239 data_channels
240 .insert(channel_id, DataChannelEventTarget::Buffered(vec![event]));
241 },
242 Some(ref mut channel) => match channel {
243 &mut &mut DataChannelEventTarget::Buffered(ref mut events) => {
244 events.push(event);
245 return Ok(());
246 },
247 DataChannelEventTarget::Created(_) => {
248 if let DataChannelEvent::Close = event {
249 data_channels.remove(&channel_id);
250 }
251 self.signaller
252 .on_data_channel_event(channel_id, event, &self.thread);
253 },
254 },
255 }
256 },
257 InternalEvent::DescriptionAdded(cb, description_type, ty, remote_offer_generation) => {
258 if description_type == DescriptionType::Remote &&
259 ty == SdpType::Offer &&
260 remote_offer_generation == self.remote_offer_generation
261 {
262 mem::swap(
263 &mut self.pending_remote_mline_info,
264 &mut self.remote_mline_info,
265 );
266 self.pending_remote_mline_info.clear();
267 self.flush_pending_streams(false)?;
268 }
269 self.pipeline.set_state(gstreamer::State::Playing)?;
270 cb();
271 },
272 InternalEvent::UpdateSignalingState => {
273 use gstreamer_webrtc::WebRTCSignalingState::*;
274 let val = self
275 .webrtc
276 .property::<gstreamer_webrtc::WebRTCSignalingState>("signaling-state");
277 let state = match val {
278 Stable => SignalingState::Stable,
279 HaveLocalOffer => SignalingState::HaveLocalOffer,
280 HaveRemoteOffer => SignalingState::HaveRemoteOffer,
281 HaveLocalPranswer => SignalingState::HaveLocalPranswer,
282 HaveRemotePranswer => SignalingState::HaveRemotePranswer,
283 Closed => SignalingState::Closed,
284 i => {
285 return Err(WebRtcError::Backend(format!(
286 "unknown signaling state: {:?}",
287 i
288 )));
289 },
290 };
291 self.signaller.update_signaling_state(state);
292 },
293 InternalEvent::UpdateGatheringState => {
294 use gstreamer_webrtc::WebRTCICEGatheringState::*;
295 let val = self
296 .webrtc
297 .property::<gstreamer_webrtc::WebRTCICEGatheringState>("ice-gathering-state");
298 let state = match val {
299 New => GatheringState::New,
300 Gathering => GatheringState::Gathering,
301 Complete => GatheringState::Complete,
302 i => {
303 return Err(WebRtcError::Backend(format!(
304 "unknown gathering state: {:?}",
305 i
306 )));
307 },
308 };
309 self.signaller.update_gathering_state(state);
310 },
311 InternalEvent::UpdateIceConnectionState => {
312 use gstreamer_webrtc::WebRTCICEConnectionState::*;
313 let val = self
314 .webrtc
315 .property::<gstreamer_webrtc::WebRTCICEConnectionState>("ice-connection-state");
316 let state = match val {
317 New => IceConnectionState::New,
318 Checking => IceConnectionState::Checking,
319 Connected => IceConnectionState::Connected,
320 Completed => IceConnectionState::Completed,
321 Disconnected => IceConnectionState::Disconnected,
322 Failed => IceConnectionState::Failed,
323 Closed => IceConnectionState::Closed,
324 i => {
325 return Err(WebRtcError::Backend(format!(
326 "unknown ICE connection state: {:?}",
327 i
328 )));
329 },
330 };
331 self.signaller.update_ice_connection_state(state);
332 },
333 }
334 Ok(())
335 }
336
337 fn quit(&mut self) {
338 self.signaller.close();
339
340 self.pipeline.set_state(gstreamer::State::Null).unwrap();
341 }
342}
343
344impl GStreamerWebRtcController {
345 fn set_description(
346 &mut self,
347 desc: SessionDescription,
348 description_type: DescriptionType,
349 cb: Box<dyn FnOnce() + Send + 'static>,
350 ) -> WebRtcResult {
351 let ty = match desc.type_ {
352 SdpType::Answer => gstreamer_webrtc::WebRTCSDPType::Answer,
353 SdpType::Offer => gstreamer_webrtc::WebRTCSDPType::Offer,
354 SdpType::Pranswer => gstreamer_webrtc::WebRTCSDPType::Pranswer,
355 SdpType::Rollback => gstreamer_webrtc::WebRTCSDPType::Rollback,
356 };
357
358 let kind = match description_type {
359 DescriptionType::Local => "set-local-description",
360 DescriptionType::Remote => "set-remote-description",
361 };
362
363 let sdp = gstreamer_sdp::SDPMessage::parse_buffer(desc.sdp.as_bytes()).unwrap();
364 if description_type == DescriptionType::Remote {
365 self.remote_offer_generation += 1;
366 self.store_remote_mline_info(&sdp);
367 }
368 let answer = gstreamer_webrtc::WebRTCSessionDescription::new(ty, sdp);
369 let thread = self.thread.clone();
370 let remote_offer_generation = self.remote_offer_generation;
371 let promise = gstreamer::Promise::with_change_func(move |_promise| {
372 thread.internal_event(InternalEvent::DescriptionAdded(
375 cb,
376 description_type,
377 desc.type_,
378 remote_offer_generation,
379 ));
380 });
381 self.webrtc.emit_by_name::<()>(kind, &[&answer, &promise]);
382 Ok(())
383 }
384
385 fn store_remote_mline_info(&mut self, sdp: &gstreamer_sdp::SDPMessage) {
386 self.pending_remote_mline_info.clear();
387 for media in sdp.medias() {
388 let mut caps = gstreamer::Caps::new_empty();
389 let caps_mut = caps.get_mut().expect("Fresh caps should be uniquely owned");
390 for format in media.formats() {
391 if format == "webrtc-datachannel" {
392 return;
393 }
394 let pt = format
395 .parse()
396 .expect("Gstreamer provided noninteger format");
397 caps_mut.append(
398 media
399 .caps_from_media(pt)
400 .expect("get_format() did not return a format from the SDP"),
401 );
402 self.pt_counter = cmp::max(self.pt_counter, pt + 1);
403 }
404 for s in caps_mut.iter_mut() {
405 s.set_name("application/x-rtp")
410 }
411 self.pending_remote_mline_info.push(MLineInfo {
415 caps,
416 is_used: false,
419 payload: media
423 .format(0)
424 .expect("Gstreamer reported incorrect formats_len()")
425 .parse()
426 .expect("Gstreamer provided noninteger format"),
427 });
428 }
429 }
430
431 fn link_stream(
444 &mut self,
445 stream_id: &MediaStreamId,
446 stream: &mut GStreamerMediaStream,
447 request_new_pads: bool,
448 ) -> WebRtcResult {
449 let caps = stream.caps();
450 let idx = self
451 .remote_mline_info
452 .iter()
453 .enumerate()
454 .filter(|(_, x)| !x.is_used)
455 .find(|(_, x)| x.caps.can_intersect(caps))
456 .map(|x| x.0);
457 if let Some(idx) = idx {
458 if idx >= self.request_pad_counter {
459 for i in self.request_pad_counter..=idx {
460 self.webrtc
469 .request_pad_simple(&format!("sink_{}", i))
470 .ok_or("Cannot request sink pad")?;
471 }
472 self.request_pad_counter = idx + 1;
473 }
474 stream.attach_to_pipeline(&self.pipeline);
475 let element = stream.encoded().map_err(|_| {
476 WebRtcError::Backend(String::from("Failed to attach encoding adapters to stream"))
477 })?;
478 self.remote_mline_info[idx].is_used = true;
479 let caps = stream.caps_with_payload(self.remote_mline_info[idx].payload);
480 element.set_property("caps", &caps);
481 let src = element.static_pad("src").ok_or("Cannot request src pad")?;
482 let sink = self
483 .webrtc
484 .static_pad(&format!("sink_{}", idx))
485 .ok_or("Cannot request sink pad")?;
486 src.link(&sink)?;
487 self.streams.push(*stream_id);
488 } else if request_new_pads {
489 stream.attach_to_pipeline(&self.pipeline);
490 let element = stream.encoded().map_err(|_| {
491 WebRtcError::Backend(String::from("Failed to attach encoding adapters to stream"))
492 })?;
493 let caps = stream.caps_with_payload(self.pt_counter);
494 self.pt_counter += 1;
495 element.set_property("caps", &caps);
496 let src = element.static_pad("src").ok_or("Cannot request src pad")?;
497 let sink = self
498 .webrtc
499 .request_pad_simple(&format!("sink_{}", self.request_pad_counter))
500 .ok_or("Cannot request sink pad")?;
501 self.request_pad_counter += 1;
502 src.link(&sink)?;
503 self.streams.push(*stream_id);
504 } else {
505 self.pending_streams.push(*stream_id);
506 }
507 Ok(())
508 }
509
510 fn flush_pending_streams(&mut self, request_new_pads: bool) -> WebRtcResult {
512 let pending_streams = std::mem::take(&mut self.pending_streams);
513 for stream_id in pending_streams {
514 let stream =
515 get_stream(&stream_id).expect("Media streams registry does not contain such ID");
516 let mut stream = stream.lock().unwrap();
517 let stream = stream
518 .as_mut_any()
519 .downcast_mut::<GStreamerMediaStream>()
520 .ok_or("Does not currently support non-gstreamer streams")?;
521 self.link_stream(&stream_id, stream, request_new_pads)?;
522 }
523 Ok(())
524 }
525
526 fn start_pipeline(&mut self) -> WebRtcResult {
527 self.pipeline.add(&self.webrtc)?;
528
529 let thread = Mutex::new(self.thread.clone());
532 self.webrtc
533 .connect("on-ice-candidate", false, move |values| {
534 thread
535 .lock()
536 .unwrap()
537 .internal_event(InternalEvent::OnIceCandidate(candidate(values)));
538 None
539 });
540
541 let thread = Arc::new(Mutex::new(self.thread.clone()));
542 self.webrtc.connect_pad_added({
543 let pipeline_weak = self.pipeline.downgrade();
544 move |_element, pad| {
545 let Some(pipe) = pipeline_weak.upgrade() else {
546 warn!("Pipeline already deallocated");
547 return;
548 };
549 process_new_stream(pad, &pipe, thread.clone());
550 }
551 });
552
553 let thread = Mutex::new(self.thread.clone());
556 self.webrtc
557 .connect("on-negotiation-needed", false, move |_values| {
558 thread
559 .lock()
560 .unwrap()
561 .internal_event(InternalEvent::OnNegotiationNeeded);
562 None
563 });
564
565 let thread = Mutex::new(self.thread.clone());
566 self.webrtc
567 .connect("notify::signaling-state", false, move |_values| {
568 thread
569 .lock()
570 .unwrap()
571 .internal_event(InternalEvent::UpdateSignalingState);
572 None
573 });
574 let thread = Mutex::new(self.thread.clone());
575 self.webrtc
576 .connect("notify::ice-connection-state", false, move |_values| {
577 thread
578 .lock()
579 .unwrap()
580 .internal_event(InternalEvent::UpdateIceConnectionState);
581 None
582 });
583 let thread = Mutex::new(self.thread.clone());
584 self.webrtc
585 .connect("notify::ice-gathering-state", false, move |_values| {
586 thread
587 .lock()
588 .unwrap()
589 .internal_event(InternalEvent::UpdateGatheringState);
590 None
591 });
592 let thread = Mutex::new(self.thread.clone());
593 let data_channels = self.data_channels.clone();
594 let next_data_channel_id = self.next_data_channel_id.clone();
595 self.webrtc
596 .connect("on-data-channel", false, move |channel| {
597 let channel = channel[1]
598 .get::<gstreamer_webrtc::WebRTCDataChannel>()
599 .map_err(|e| e.to_string())
600 .expect("Invalid data channel");
601 let id = next_data_channel_id.fetch_add(1, Ordering::Relaxed);
602 let thread_ = thread.lock().unwrap().clone();
603 match GStreamerWebRtcDataChannel::from(&id, channel, &thread_) {
604 Ok(channel) => {
605 let mut closed_channel = false;
606 {
607 thread_.internal_event(InternalEvent::OnDataChannelEvent(
608 id,
609 DataChannelEvent::NewChannel,
610 ));
611
612 let mut data_channels = data_channels.lock().unwrap();
613 if let Some(ref mut channel) = data_channels.get_mut(&id) {
614 match channel {
615 &mut &mut DataChannelEventTarget::Buffered(ref mut events) => {
616 for event in events.drain(0..) {
617 if let DataChannelEvent::Close = event {
618 closed_channel = true
619 }
620 thread_.internal_event(
621 InternalEvent::OnDataChannelEvent(id, event),
622 );
623 }
624 },
625 _ => debug_assert!(
626 false,
627 "Trying to register a data channel with an existing ID"
628 ),
629 }
630 }
631 data_channels.remove(&id);
632 }
633 if !closed_channel &&
634 register_data_channel(data_channels.clone(), id, channel).is_err()
635 {
636 warn!("Could not register data channel {:?}", id);
637 return None;
638 }
639 },
640 Err(error) => {
641 warn!("Could not create data channel {:?}", error);
642 },
643 }
644 None
645 });
646
647 self.pipeline.set_state(gstreamer::State::Ready)?;
648 Ok(())
649 }
650}
651
652pub fn construct(
653 signaller: Box<dyn WebRtcSignaller>,
654 thread: WebRtcThread,
655) -> Result<GStreamerWebRtcController, WebRtcError> {
656 let main_loop = glib::MainLoop::new(None, false);
657 let pipeline = gstreamer::Pipeline::with_name("webrtc main");
658 pipeline.set_start_time(gstreamer::ClockTime::NONE);
659 pipeline.set_base_time(*BACKEND_BASE_TIME);
660 pipeline.use_clock(Some(&gstreamer::SystemClock::obtain()));
661 let webrtc = gstreamer::ElementFactory::make("webrtcbin")
662 .name("sendrecv")
663 .build()
664 .map_err(|error| format!("webrtcbin element not found: {error:?}"))?;
665 let mut controller = GStreamerWebRtcController {
666 webrtc,
667 pipeline,
668 signaller,
669 thread,
670 remote_mline_info: vec![],
671 pending_remote_mline_info: vec![],
672 streams: vec![],
673 pending_streams: vec![],
674 pt_counter: 96,
675 request_pad_counter: 0,
676 remote_offer_generation: 0,
677 delayed_negotiation: false,
678 _main_loop: main_loop,
679 data_channels: Arc::new(Mutex::new(HashMap::new())),
680 next_data_channel_id: Arc::new(AtomicUsize::new(0)),
681 };
682 controller.start_pipeline()?;
683 Ok(controller)
684}
685
686fn on_offer_or_answer_created(
687 ty: SdpType,
688 reply: &gstreamer::StructureRef,
689 cb: Box<dyn FnOnce(SessionDescription) + Send + 'static>,
690) {
691 debug_assert!(ty == SdpType::Offer || ty == SdpType::Answer);
692 let reply = reply
693 .value(ty.as_str())
694 .unwrap()
695 .get::<gstreamer_webrtc::WebRTCSessionDescription>()
696 .expect("Invalid argument");
697
698 let type_ = match reply.type_() {
699 gstreamer_webrtc::WebRTCSDPType::Answer => SdpType::Answer,
700 gstreamer_webrtc::WebRTCSDPType::Offer => SdpType::Offer,
701 gstreamer_webrtc::WebRTCSDPType::Pranswer => SdpType::Pranswer,
702 gstreamer_webrtc::WebRTCSDPType::Rollback => SdpType::Rollback,
703 _ => panic!("unknown sdp response"),
704 };
705
706 let desc = SessionDescription {
707 sdp: reply.sdp().as_text().unwrap(),
708 type_,
709 };
710
711 cb(desc);
712}
713
714fn on_incoming_stream(
715 pipe: &gstreamer::Pipeline,
716 thread: Arc<Mutex<WebRtcThread>>,
717 pad: &gstreamer::Pad,
718) {
719 let decodebin = gstreamer::ElementFactory::make("decodebin")
720 .build()
721 .unwrap();
722 let caps = pad.query_caps(None);
723 let name = caps
724 .structure(0)
725 .unwrap()
726 .get::<String>("media")
727 .expect("Invalid 'media' field");
728 let decodebin2 = decodebin.clone();
729 decodebin.connect_pad_added({
730 let pipeline_weak = pipe.downgrade();
731 move |_element, pad| {
732 let Some(pipe) = pipeline_weak.upgrade() else {
733 warn!("Pipeline already deallocated");
734 return;
735 };
736 on_incoming_decodebin_stream(pad, &pipe, thread.clone(), &name);
737 }
738 });
739 pipe.add(&decodebin).unwrap();
740
741 let decodepad = decodebin.static_pad("sink").unwrap();
742 pad.link(&decodepad).unwrap();
743 decodebin2.sync_state_with_parent().unwrap();
744}
745
746fn on_incoming_decodebin_stream(
747 pad: &gstreamer::Pad,
748 pipe: &gstreamer::Pipeline,
749 thread: Arc<Mutex<WebRtcThread>>,
750 name: &str,
751) {
752 let proxy_sink = gstreamer::ElementFactory::make("proxysink")
753 .build()
754 .unwrap();
755 let proxy_src = gstreamer::ElementFactory::make("proxysrc")
756 .property("proxysink", &proxy_sink)
757 .build()
758 .unwrap();
759 pipe.add(&proxy_sink).unwrap();
760 let sinkpad = proxy_sink.static_pad("sink").unwrap();
761
762 pad.link(&sinkpad).unwrap();
763 proxy_sink.sync_state_with_parent().unwrap();
764
765 let (stream, ty) = if name == "video" {
766 (
767 GStreamerMediaStream::create_video_from(proxy_src),
768 MediaStreamType::Video,
769 )
770 } else {
771 (
772 GStreamerMediaStream::create_audio_from(proxy_src),
773 MediaStreamType::Audio,
774 )
775 };
776 thread
777 .lock()
778 .unwrap()
779 .internal_event(InternalEvent::OnAddStream(stream, ty));
780}
781
782fn process_new_stream(
783 pad: &gstreamer::Pad,
784 pipe: &gstreamer::Pipeline,
785 thread: Arc<Mutex<WebRtcThread>>,
786) {
787 if pad.direction() != gstreamer::PadDirection::Src {
788 return;
790 }
791 on_incoming_stream(pipe, thread, pad)
792}
793
794fn candidate(values: &[glib::Value]) -> IceCandidate {
795 let _webrtc = values[0]
796 .get::<gstreamer::Element>()
797 .expect("Invalid argument");
798 let sdp_mline_index = values[1].get::<u32>().expect("Invalid argument");
799 let candidate = values[2].get::<String>().expect("Invalid argument");
800
801 IceCandidate {
802 sdp_mline_index,
803 candidate,
804 }
805}
806
807fn register_data_channel(
808 registry: Arc<Mutex<HashMap<DataChannelId, DataChannelEventTarget>>>,
809 id: DataChannelId,
810 channel: GStreamerWebRtcDataChannel,
811) -> WebRtcDataChannelResult {
812 if registry.lock().unwrap().contains_key(&id) {
813 return Err(WebRtcError::Backend(
814 "Could not register data channel. ID collision".to_owned(),
815 ));
816 }
817 registry
818 .lock()
819 .unwrap()
820 .insert(id, DataChannelEventTarget::Created(channel));
821 Ok(id)
822}