servo_media_gstreamer/
webrtc.rs

1use super::BACKEND_BASE_TIME;
2use crate::datachannel::GStreamerWebRtcDataChannel;
3use crate::media_stream::GStreamerMediaStream;
4use glib;
5use glib::prelude::*;
6use gst;
7use gst::prelude::*;
8use gst_sdp;
9use gst_webrtc;
10use log::warn;
11use servo_media_streams::registry::{get_stream, MediaStreamId};
12use servo_media_streams::MediaStreamType;
13use servo_media_webrtc::datachannel::DataChannelId;
14use servo_media_webrtc::thread::InternalEvent;
15use servo_media_webrtc::WebRtcController as WebRtcThread;
16use servo_media_webrtc::*;
17use std::collections::HashMap;
18use std::sync::atomic::{AtomicUsize, Ordering};
19use std::sync::{Arc, Mutex};
20use std::{cmp, mem};
21
22// TODO:
23// - figure out purpose of glib loop
24
25#[derive(Debug, Clone)]
26pub struct MLineInfo {
27    /// The caps for the given m-line
28    caps: gst::Caps,
29    /// Whether or not this sink pad has already been connected
30    is_used: bool,
31    /// The payload value of the given m-line
32    payload: i32,
33}
34
35enum DataChannelEventTarget {
36    Buffered(Vec<DataChannelEvent>),
37    Created(GStreamerWebRtcDataChannel),
38}
39
40pub struct GStreamerWebRtcController {
41    webrtc: gst::Element,
42    pipeline: gst::Pipeline,
43    /// We can't trigger a negotiation-needed event until we have streams, or otherwise
44    /// a createOffer() call will lead to bad SDP. Instead, we delay negotiation.
45    delayed_negotiation: bool,
46    /// A handle to the event loop abstraction surrounding the webrtc implementations,
47    /// which lets gstreamer callbacks send events back to the event loop to run on this object
48    thread: WebRtcThread,
49    signaller: Box<dyn WebRtcSignaller>,
50    /// All the streams that are actually connected to the webrtcbin (i.e., their presence has already
51    /// been negotiated)
52    streams: Vec<MediaStreamId>,
53    /// Disconnected streams that are waiting to be linked. Streams are
54    /// only linked when:
55    ///
56    /// - An offer is made (all pending streams are flushed)
57    /// - An offer is received (all matching pending streams are flushed)
58    /// - A stream is added when there is a so-far-disconnected remote-m-line
59    ///
60    /// In other words, these are all yet to be negotiated
61    ///
62    /// See link_stream
63    pending_streams: Vec<MediaStreamId>,
64    /// Each new webrtc stream should have a new payload/pt value, starting at 96
65    ///
66    /// This is maintained as a known yet-unused payload number, being incremented whenever
67    /// we use it, and set to (remote_pt + 1) if the remote sends us a stream with a higher pt
68    pt_counter: i32,
69    /// We keep track of how many request pads have been created on webrtcbin
70    /// so that we can request more to fill in the gaps and acquire a specific pad if necessary
71    request_pad_counter: usize,
72    /// Streams need to be connected to the relevant sink pad, and we figure this out
73    /// by keeping track of the caps of each m-line in the SDP.
74    remote_mline_info: Vec<MLineInfo>,
75    /// Temporary storage for remote_mline_info until the remote description is applied
76    ///
77    /// Without this, a unluckily timed call to link_stream() may happen before the webrtcbin
78    /// knows the remote description, but while we _think_ it does
79    pending_remote_mline_info: Vec<MLineInfo>,
80    /// In case we get multiple remote offers, this lets us keep track of which is the newest
81    remote_offer_generation: u32,
82    _main_loop: glib::MainLoop,
83    data_channels: Arc<Mutex<HashMap<DataChannelId, DataChannelEventTarget>>>,
84    next_data_channel_id: Arc<AtomicUsize>,
85}
86
87impl WebRtcControllerBackend for GStreamerWebRtcController {
88    fn add_ice_candidate(&mut self, candidate: IceCandidate) -> WebRtcResult {
89        self.webrtc.emit_by_name::<()>(
90            "add-ice-candidate",
91            &[&candidate.sdp_mline_index, &candidate.candidate],
92        );
93        Ok(())
94    }
95
96    fn set_remote_description(
97        &mut self,
98        desc: SessionDescription,
99        cb: Box<dyn FnOnce() + Send + 'static>,
100    ) -> WebRtcResult {
101        self.set_description(desc, DescriptionType::Remote, cb)
102    }
103
104    fn set_local_description(
105        &mut self,
106        desc: SessionDescription,
107        cb: Box<dyn FnOnce() + Send + 'static>,
108    ) -> WebRtcResult {
109        self.set_description(desc, DescriptionType::Local, cb)
110    }
111
112    fn create_offer(
113        &mut self,
114        cb: Box<dyn FnOnce(SessionDescription) + Send + 'static>,
115    ) -> WebRtcResult {
116        self.flush_pending_streams(true)?;
117        self.pipeline.set_state(gst::State::Playing)?;
118        let promise = gst::Promise::with_change_func(move |res| {
119            res.map(|s| on_offer_or_answer_created(SdpType::Offer, s.unwrap(), cb))
120                .unwrap();
121        });
122
123        self.webrtc
124            .emit_by_name::<()>("create-offer", &[&None::<gst::Structure>, &promise]);
125        Ok(())
126    }
127
128    fn create_answer(
129        &mut self,
130        cb: Box<dyn FnOnce(SessionDescription) + Send + 'static>,
131    ) -> WebRtcResult {
132        let promise = gst::Promise::with_change_func(move |res| {
133            res.map(|s| on_offer_or_answer_created(SdpType::Answer, s.unwrap(), cb))
134                .unwrap();
135        });
136
137        self.webrtc
138            .emit_by_name::<()>("create-answer", &[&None::<gst::Structure>, &promise]);
139        Ok(())
140    }
141
142    fn add_stream(&mut self, stream_id: &MediaStreamId) -> WebRtcResult {
143        let stream =
144            get_stream(stream_id).expect("Media streams registry does not contain such ID");
145        let mut stream = stream.lock().unwrap();
146        let mut stream = stream
147            .as_mut_any()
148            .downcast_mut::<GStreamerMediaStream>()
149            .ok_or("Does not currently support non-gstreamer streams")?;
150        self.link_stream(stream_id, &mut stream, false)?;
151        if self.delayed_negotiation && (self.streams.len() > 1 || self.pending_streams.len() > 1) {
152            self.delayed_negotiation = false;
153            self.signaller.on_negotiation_needed(&self.thread);
154        }
155        Ok(())
156    }
157
158    fn create_data_channel(&mut self, init: &DataChannelInit) -> WebRtcDataChannelResult {
159        let id = self.next_data_channel_id.fetch_add(1, Ordering::Relaxed);
160        match GStreamerWebRtcDataChannel::new(&id, &self.webrtc, &self.thread, init) {
161            Ok(channel) => register_data_channel(self.data_channels.clone(), id, channel),
162            Err(error) => Err(WebRtcError::Backend(error)),
163        }
164    }
165
166    fn close_data_channel(&mut self, id: &DataChannelId) -> WebRtcResult {
167        // There is no need to unregister the channel here. It will be unregistered
168        // when the data channel backend triggers the on closed event.
169        let mut data_channels = self.data_channels.lock().unwrap();
170        match data_channels.get(id) {
171            Some(ref channel) => match channel {
172                DataChannelEventTarget::Created(ref channel) => {
173                    channel.close();
174                    Ok(())
175                },
176                DataChannelEventTarget::Buffered(_) => data_channels
177                    .remove(id)
178                    .ok_or(WebRtcError::Backend("Unknown data channel".to_owned()))
179                    .map(|_| ()),
180            },
181            None => Err(WebRtcError::Backend("Unknown data channel".to_owned())),
182        }
183    }
184
185    fn send_data_channel_message(
186        &mut self,
187        id: &DataChannelId,
188        message: &DataChannelMessage,
189    ) -> WebRtcResult {
190        match self.data_channels.lock().unwrap().get(id) {
191            Some(ref channel) => match channel {
192                DataChannelEventTarget::Created(ref channel) => {
193                    channel.send(message);
194                    Ok(())
195                },
196                _ => Ok(()),
197            },
198            None => Err(WebRtcError::Backend("Unknown data channel".to_owned())),
199        }
200    }
201
202    fn configure(&mut self, stun_server: &str, policy: BundlePolicy) -> WebRtcResult {
203        self.webrtc
204            .set_property_from_str("stun-server", stun_server);
205        self.webrtc
206            .set_property_from_str("bundle-policy", policy.as_str());
207        Ok(())
208    }
209
210    fn internal_event(&mut self, e: thread::InternalEvent) -> WebRtcResult {
211        match e {
212            InternalEvent::OnNegotiationNeeded => {
213                if self.streams.is_empty() && self.pending_streams.is_empty() {
214                    // we have no streams
215
216                    // If the pipeline starts playing and on-negotiation-needed is present before there are any
217                    // media streams, an invalid SDP offer will be created. Therefore, delay emitting the signal
218                    self.delayed_negotiation = true;
219                } else {
220                    self.signaller.on_negotiation_needed(&self.thread);
221                }
222            },
223            InternalEvent::OnIceCandidate(candidate) => {
224                self.signaller.on_ice_candidate(&self.thread, candidate);
225            },
226            InternalEvent::OnAddStream(stream, ty) => {
227                self.pipeline.set_state(gst::State::Playing)?;
228                self.signaller.on_add_stream(&stream, ty);
229            },
230            InternalEvent::OnDataChannelEvent(channel_id, event) => {
231                let mut data_channels = self.data_channels.lock().unwrap();
232                match data_channels.get_mut(&channel_id) {
233                    None => {
234                        data_channels
235                            .insert(channel_id, DataChannelEventTarget::Buffered(vec![event]));
236                    },
237                    Some(ref mut channel) => match channel {
238                        DataChannelEventTarget::Buffered(ref mut events) => {
239                            events.push(event);
240                            return Ok(());
241                        },
242                        DataChannelEventTarget::Created(_) => {
243                            match event {
244                                DataChannelEvent::Close => {
245                                    data_channels.remove(&channel_id);
246                                },
247                                _ => {},
248                            }
249                            self.signaller
250                                .on_data_channel_event(channel_id, event, &self.thread);
251                        },
252                    },
253                }
254            },
255            InternalEvent::DescriptionAdded(cb, description_type, ty, remote_offer_generation) => {
256                if description_type == DescriptionType::Remote
257                    && ty == SdpType::Offer
258                    && remote_offer_generation == self.remote_offer_generation
259                {
260                    mem::swap(
261                        &mut self.pending_remote_mline_info,
262                        &mut self.remote_mline_info,
263                    );
264                    self.pending_remote_mline_info.clear();
265                    self.flush_pending_streams(false)?;
266                }
267                self.pipeline.set_state(gst::State::Playing)?;
268                cb();
269            },
270            InternalEvent::UpdateSignalingState => {
271                use gst_webrtc::WebRTCSignalingState::*;
272                let val = self
273                    .webrtc
274                    .property::<gst_webrtc::WebRTCSignalingState>("signaling-state");
275                let state = match val {
276                    Stable => SignalingState::Stable,
277                    HaveLocalOffer => SignalingState::HaveLocalOffer,
278                    HaveRemoteOffer => SignalingState::HaveRemoteOffer,
279                    HaveLocalPranswer => SignalingState::HaveLocalPranswer,
280                    HaveRemotePranswer => SignalingState::HaveRemotePranswer,
281                    Closed => SignalingState::Closed,
282                    i => {
283                        return Err(WebRtcError::Backend(format!(
284                            "unknown signaling state: {:?}",
285                            i
286                        )))
287                    },
288                };
289                self.signaller.update_signaling_state(state);
290            },
291            InternalEvent::UpdateGatheringState => {
292                use gst_webrtc::WebRTCICEGatheringState::*;
293                let val = self
294                    .webrtc
295                    .property::<gst_webrtc::WebRTCICEGatheringState>("ice-gathering-state");
296                let state = match val {
297                    New => GatheringState::New,
298                    Gathering => GatheringState::Gathering,
299                    Complete => GatheringState::Complete,
300                    i => {
301                        return Err(WebRtcError::Backend(format!(
302                            "unknown gathering state: {:?}",
303                            i
304                        )))
305                    },
306                };
307                self.signaller.update_gathering_state(state);
308            },
309            InternalEvent::UpdateIceConnectionState => {
310                use gst_webrtc::WebRTCICEConnectionState::*;
311                let val = self
312                    .webrtc
313                    .property::<gst_webrtc::WebRTCICEConnectionState>("ice-connection-state");
314                let state = match val {
315                    New => IceConnectionState::New,
316                    Checking => IceConnectionState::Checking,
317                    Connected => IceConnectionState::Connected,
318                    Completed => IceConnectionState::Completed,
319                    Disconnected => IceConnectionState::Disconnected,
320                    Failed => IceConnectionState::Failed,
321                    Closed => IceConnectionState::Closed,
322                    i => {
323                        return Err(WebRtcError::Backend(format!(
324                            "unknown ICE connection state: {:?}",
325                            i
326                        )))
327                    },
328                };
329                self.signaller.update_ice_connection_state(state);
330            },
331        }
332        Ok(())
333    }
334
335    fn quit(&mut self) {
336        self.signaller.close();
337
338        self.pipeline.set_state(gst::State::Null).unwrap();
339    }
340}
341
342impl GStreamerWebRtcController {
343    fn set_description(
344        &mut self,
345        desc: SessionDescription,
346        description_type: DescriptionType,
347        cb: Box<dyn FnOnce() + Send + 'static>,
348    ) -> WebRtcResult {
349        let ty = match desc.type_ {
350            SdpType::Answer => gst_webrtc::WebRTCSDPType::Answer,
351            SdpType::Offer => gst_webrtc::WebRTCSDPType::Offer,
352            SdpType::Pranswer => gst_webrtc::WebRTCSDPType::Pranswer,
353            SdpType::Rollback => gst_webrtc::WebRTCSDPType::Rollback,
354        };
355
356        let kind = match description_type {
357            DescriptionType::Local => "set-local-description",
358            DescriptionType::Remote => "set-remote-description",
359        };
360
361        let sdp = gst_sdp::SDPMessage::parse_buffer(desc.sdp.as_bytes()).unwrap();
362        if description_type == DescriptionType::Remote {
363            self.remote_offer_generation += 1;
364            self.store_remote_mline_info(&sdp);
365        }
366        let answer = gst_webrtc::WebRTCSessionDescription::new(ty, sdp);
367        let thread = self.thread.clone();
368        let remote_offer_generation = self.remote_offer_generation;
369        let promise = gst::Promise::with_change_func(move |_promise| {
370            // remote_offer_generation here ensures that DescriptionAdded doesn't
371            // flush pending_remote_mline_info for stale remote offer callbacks
372            thread.internal_event(InternalEvent::DescriptionAdded(
373                cb,
374                description_type,
375                desc.type_,
376                remote_offer_generation,
377            ));
378        });
379        self.webrtc.emit_by_name::<()>(kind, &[&answer, &promise]);
380        Ok(())
381    }
382
383    fn store_remote_mline_info(&mut self, sdp: &gst_sdp::SDPMessage) {
384        self.pending_remote_mline_info.clear();
385        for media in sdp.medias() {
386            let mut caps = gst::Caps::new_empty();
387            let caps_mut = caps.get_mut().expect("Fresh caps should be uniquely owned");
388            for format in media.formats() {
389                if format == "webrtc-datachannel" {
390                    return;
391                }
392                let pt = format
393                    .parse()
394                    .expect("Gstreamer provided noninteger format");
395                caps_mut.append(
396                    media
397                        .caps_from_media(pt)
398                        .expect("get_format() did not return a format from the SDP"),
399                );
400                self.pt_counter = cmp::max(self.pt_counter, pt + 1);
401            }
402            for s in caps_mut.iter_mut() {
403                // the caps are application/x-unknown by default, which will fail
404                // to intersect
405                //
406                // see https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/blob/ba62917fbfd98ea76d4e066a6f18b4a14b847362/ext/webrtc/gstwebrtcbin.c#L2521
407                s.set_name("application/x-rtp")
408            }
409            // This info is not current until the promise from set-remote-description is resolved,
410            // to avoid any races where we attempt to link streams before the promise resolves we
411            // queue this up in a pending buffer
412            self.pending_remote_mline_info.push(MLineInfo {
413                caps,
414                // XXXManishearth in the (yet unsupported) case of dynamic stream addition and renegotiation
415                // this will need to be checked against the current set of streams
416                is_used: false,
417                // XXXManishearth ideally, we keep track of all payloads and have the capability of picking
418                // the appropriate decoder. For this, a bunch of the streams code will have to be moved into
419                // a webrtc-specific abstraction.
420                payload: media
421                    .format(0)
422                    .expect("Gstreamer reported incorrect formats_len()")
423                    .parse()
424                    .expect("Gstreamer provided noninteger format"),
425            });
426        }
427    }
428
429    /// Streams need to be linked to the correct pads, so we buffer them up until we know enough
430    /// to do this.
431    ///
432    /// When we get a remote offer, we store the relevant m-line information so that we can
433    /// pick the correct sink pad and payload. Shortly after we look for any pending streams
434    /// and connect them to available compatible m-lines using link_stream.
435    ///
436    /// When we create an offer, we're controlling the pad order, so we set request_new_pads
437    /// to true and forcefully link all pending streams before generating the offer.
438    ///
439    /// When request_new_pads is false, we may still request new pads, however we only do this for
440    /// streams that have already been negotiated by the remote.
441    fn link_stream(
442        &mut self,
443        stream_id: &MediaStreamId,
444        stream: &mut GStreamerMediaStream,
445        request_new_pads: bool,
446    ) -> WebRtcResult {
447        let caps = stream.caps();
448        let idx = self
449            .remote_mline_info
450            .iter()
451            .enumerate()
452            .filter(|(_, x)| !x.is_used)
453            .find(|(_, x)| x.caps.can_intersect(caps))
454            .map(|x| x.0);
455        if let Some(idx) = idx {
456            if idx >= self.request_pad_counter {
457                for i in self.request_pad_counter..=idx {
458                    // webrtcbin needs you to request pads (or use element.link(webrtcbin))
459                    // however, it also wants them to be connected in the correct order.
460                    //
461                    // Here, we make sure all the numbered sink pads have been created beforehand, up to
462                    // and including the one we need here.
463                    //
464                    // An alternate fix is to sort pending_streams according to the m-line index
465                    // and just do it in order. This also seems brittle.
466                    self.webrtc
467                        .request_pad_simple(&format!("sink_{}", i))
468                        .ok_or("Cannot request sink pad")?;
469                }
470                self.request_pad_counter = idx + 1;
471            }
472            stream.attach_to_pipeline(&self.pipeline);
473            let element = stream.encoded();
474            self.remote_mline_info[idx].is_used = true;
475            let caps = stream.caps_with_payload(self.remote_mline_info[idx].payload);
476            element.set_property("caps", &caps);
477            let src = element.static_pad("src").ok_or("Cannot request src pad")?;
478            let sink = self
479                .webrtc
480                .static_pad(&format!("sink_{}", idx))
481                .ok_or("Cannot request sink pad")?;
482            src.link(&sink)?;
483            self.streams.push(*stream_id);
484        } else if request_new_pads {
485            stream.attach_to_pipeline(&self.pipeline);
486            let element = stream.encoded();
487            let caps = stream.caps_with_payload(self.pt_counter);
488            self.pt_counter += 1;
489            element.set_property("caps", &caps);
490            let src = element.static_pad("src").ok_or("Cannot request src pad")?;
491            let sink = self
492                .webrtc
493                .request_pad_simple(&format!("sink_{}", self.request_pad_counter))
494                .ok_or("Cannot request sink pad")?;
495            self.request_pad_counter += 1;
496            src.link(&sink)?;
497            self.streams.push(*stream_id);
498        } else {
499            self.pending_streams.push(*stream_id);
500        }
501        Ok(())
502    }
503
504    /// link_stream, but for all pending streams
505    fn flush_pending_streams(&mut self, request_new_pads: bool) -> WebRtcResult {
506        let pending_streams = mem::replace(&mut self.pending_streams, vec![]);
507        for stream_id in pending_streams {
508            let stream =
509                get_stream(&stream_id).expect("Media streams registry does not contain such ID");
510            let mut stream = stream.lock().unwrap();
511            let mut stream = stream
512                .as_mut_any()
513                .downcast_mut::<GStreamerMediaStream>()
514                .ok_or("Does not currently support non-gstreamer streams")?;
515            self.link_stream(&stream_id, &mut stream, request_new_pads)?;
516        }
517        Ok(())
518    }
519
520    fn start_pipeline(&mut self) -> WebRtcResult {
521        self.pipeline.add(&self.webrtc)?;
522
523        // gstreamer needs Sync on these callbacks for some reason
524        // https://github.com/sdroege/gstreamer-rs/issues/154
525        let thread = Mutex::new(self.thread.clone());
526        self.webrtc
527            .connect("on-ice-candidate", false, move |values| {
528                thread
529                    .lock()
530                    .unwrap()
531                    .internal_event(InternalEvent::OnIceCandidate(candidate(values)));
532                None
533            });
534
535        let thread = Arc::new(Mutex::new(self.thread.clone()));
536        self.webrtc.connect_pad_added({
537            let pipeline_weak = self.pipeline.downgrade();
538            move |_element, pad| {
539                let Some(pipe) = pipeline_weak.upgrade() else {
540                    warn!("Pipeline already deallocated");
541                    return;
542                };
543                process_new_stream(pad, &pipe, thread.clone());
544            }
545        });
546
547        // gstreamer needs Sync on these callbacks for some reason
548        // https://github.com/sdroege/gstreamer-rs/issues/154
549        let thread = Mutex::new(self.thread.clone());
550        self.webrtc
551            .connect("on-negotiation-needed", false, move |_values| {
552                thread
553                    .lock()
554                    .unwrap()
555                    .internal_event(InternalEvent::OnNegotiationNeeded);
556                None
557            });
558
559        let thread = Mutex::new(self.thread.clone());
560        self.webrtc
561            .connect("notify::signaling-state", false, move |_values| {
562                thread
563                    .lock()
564                    .unwrap()
565                    .internal_event(InternalEvent::UpdateSignalingState);
566                None
567            });
568        let thread = Mutex::new(self.thread.clone());
569        self.webrtc
570            .connect("notify::ice-connection-state", false, move |_values| {
571                thread
572                    .lock()
573                    .unwrap()
574                    .internal_event(InternalEvent::UpdateIceConnectionState);
575                None
576            });
577        let thread = Mutex::new(self.thread.clone());
578        self.webrtc
579            .connect("notify::ice-gathering-state", false, move |_values| {
580                thread
581                    .lock()
582                    .unwrap()
583                    .internal_event(InternalEvent::UpdateGatheringState);
584                None
585            });
586        let thread = Mutex::new(self.thread.clone());
587        let data_channels = self.data_channels.clone();
588        let next_data_channel_id = self.next_data_channel_id.clone();
589        self.webrtc
590            .connect("on-data-channel", false, move |channel| {
591                let channel = channel[1]
592                    .get::<gst_webrtc::WebRTCDataChannel>()
593                    .map_err(|e| e.to_string())
594                    .expect("Invalid data channel");
595                let id = next_data_channel_id.fetch_add(1, Ordering::Relaxed);
596                let thread_ = thread.lock().unwrap().clone();
597                match GStreamerWebRtcDataChannel::from(&id, channel, &thread_) {
598                    Ok(channel) => {
599                        let mut closed_channel = false;
600                        {
601                            thread_.internal_event(InternalEvent::OnDataChannelEvent(
602                                id,
603                                DataChannelEvent::NewChannel,
604                            ));
605
606                            let mut data_channels = data_channels.lock().unwrap();
607                            if let Some(ref mut channel) = data_channels.get_mut(&id) {
608                                match channel {
609                                    DataChannelEventTarget::Buffered(ref mut events) => {
610                                        for event in events.drain(0..) {
611                                            match event {
612                                                DataChannelEvent::Close => closed_channel = true,
613                                                _ => {},
614                                            }
615                                            thread_.internal_event(
616                                                InternalEvent::OnDataChannelEvent(id, event),
617                                            );
618                                        }
619                                    },
620                                    _ => debug_assert!(
621                                        false,
622                                        "Trying to register a data channel with an existing ID"
623                                    ),
624                                }
625                            }
626                            data_channels.remove(&id);
627                        }
628                        if !closed_channel {
629                            if register_data_channel(data_channels.clone(), id, channel).is_err() {
630                                warn!("Could not register data channel {:?}", id);
631                                return None;
632                            }
633                        }
634                    },
635                    Err(error) => {
636                        warn!("Could not create data channel {:?}", error);
637                    },
638                }
639                None
640            });
641
642        self.pipeline.set_state(gst::State::Ready)?;
643        Ok(())
644    }
645}
646
647pub fn construct(
648    signaller: Box<dyn WebRtcSignaller>,
649    thread: WebRtcThread,
650) -> Result<GStreamerWebRtcController, WebRtcError> {
651    let main_loop = glib::MainLoop::new(None, false);
652    let pipeline = gst::Pipeline::with_name("webrtc main");
653    pipeline.set_start_time(gst::ClockTime::NONE);
654    pipeline.set_base_time(*BACKEND_BASE_TIME);
655    pipeline.use_clock(Some(&gst::SystemClock::obtain()));
656    let webrtc = gst::ElementFactory::make("webrtcbin")
657        .name("sendrecv")
658        .build()
659        .map_err(|error| format!("webrtcbin element not found: {error:?}"))?;
660    let mut controller = GStreamerWebRtcController {
661        webrtc,
662        pipeline,
663        signaller,
664        thread,
665        remote_mline_info: vec![],
666        pending_remote_mline_info: vec![],
667        streams: vec![],
668        pending_streams: vec![],
669        pt_counter: 96,
670        request_pad_counter: 0,
671        remote_offer_generation: 0,
672        delayed_negotiation: false,
673        _main_loop: main_loop,
674        data_channels: Arc::new(Mutex::new(HashMap::new())),
675        next_data_channel_id: Arc::new(AtomicUsize::new(0)),
676    };
677    controller.start_pipeline()?;
678    Ok(controller)
679}
680
681fn on_offer_or_answer_created(
682    ty: SdpType,
683    reply: &gst::StructureRef,
684    cb: Box<dyn FnOnce(SessionDescription) + Send + 'static>,
685) {
686    debug_assert!(ty == SdpType::Offer || ty == SdpType::Answer);
687    let reply = reply
688        .value(ty.as_str())
689        .unwrap()
690        .get::<gst_webrtc::WebRTCSessionDescription>()
691        .expect("Invalid argument");
692
693    let type_ = match reply.type_() {
694        gst_webrtc::WebRTCSDPType::Answer => SdpType::Answer,
695        gst_webrtc::WebRTCSDPType::Offer => SdpType::Offer,
696        gst_webrtc::WebRTCSDPType::Pranswer => SdpType::Pranswer,
697        gst_webrtc::WebRTCSDPType::Rollback => SdpType::Rollback,
698        _ => panic!("unknown sdp response"),
699    };
700
701    let desc = SessionDescription {
702        sdp: reply.sdp().as_text().unwrap(),
703        type_,
704    };
705
706    cb(desc);
707}
708
709fn on_incoming_stream(pipe: &gst::Pipeline, thread: Arc<Mutex<WebRtcThread>>, pad: &gst::Pad) {
710    let decodebin = gst::ElementFactory::make("decodebin").build().unwrap();
711    let caps = pad.query_caps(None);
712    let name = caps
713        .structure(0)
714        .unwrap()
715        .get::<String>("media")
716        .expect("Invalid 'media' field");
717    let decodebin2 = decodebin.clone();
718    decodebin.connect_pad_added({
719        let pipeline_weak = pipe.downgrade();
720        move |_element, pad| {
721            let Some(pipe) = pipeline_weak.upgrade() else {
722                warn!("Pipeline already deallocated");
723                return;
724            };
725            on_incoming_decodebin_stream(pad, &pipe, thread.clone(), &name);
726        }
727    });
728    pipe.add(&decodebin).unwrap();
729
730    let decodepad = decodebin.static_pad("sink").unwrap();
731    pad.link(&decodepad).unwrap();
732    decodebin2.sync_state_with_parent().unwrap();
733}
734
735fn on_incoming_decodebin_stream(
736    pad: &gst::Pad,
737    pipe: &gst::Pipeline,
738    thread: Arc<Mutex<WebRtcThread>>,
739    name: &str,
740) {
741    let proxy_sink = gst::ElementFactory::make("proxysink").build().unwrap();
742    let proxy_src = gst::ElementFactory::make("proxysrc")
743        .property("proxysink", &proxy_sink)
744        .build()
745        .unwrap();
746    pipe.add(&proxy_sink).unwrap();
747    let sinkpad = proxy_sink.static_pad("sink").unwrap();
748
749    pad.link(&sinkpad).unwrap();
750    proxy_sink.sync_state_with_parent().unwrap();
751
752    let (stream, ty) = if name == "video" {
753        (
754            GStreamerMediaStream::create_video_from(proxy_src),
755            MediaStreamType::Video,
756        )
757    } else {
758        (
759            GStreamerMediaStream::create_audio_from(proxy_src),
760            MediaStreamType::Audio,
761        )
762    };
763    thread
764        .lock()
765        .unwrap()
766        .internal_event(InternalEvent::OnAddStream(stream, ty));
767}
768
769fn process_new_stream(pad: &gst::Pad, pipe: &gst::Pipeline, thread: Arc<Mutex<WebRtcThread>>) {
770    if pad.direction() != gst::PadDirection::Src {
771        // Ignore outgoing pad notifications.
772        return;
773    }
774    on_incoming_stream(pipe, thread, pad)
775}
776
777fn candidate(values: &[glib::Value]) -> IceCandidate {
778    let _webrtc = values[0].get::<gst::Element>().expect("Invalid argument");
779    let sdp_mline_index = values[1].get::<u32>().expect("Invalid argument");
780    let candidate = values[2].get::<String>().expect("Invalid argument");
781
782    IceCandidate {
783        sdp_mline_index,
784        candidate,
785    }
786}
787
788fn register_data_channel(
789    registry: Arc<Mutex<HashMap<DataChannelId, DataChannelEventTarget>>>,
790    id: DataChannelId,
791    channel: GStreamerWebRtcDataChannel,
792) -> WebRtcDataChannelResult {
793    if registry.lock().unwrap().contains_key(&id) {
794        return Err(WebRtcError::Backend(
795            "Could not register data channel. ID collision".to_owned(),
796        ));
797    }
798    registry
799        .lock()
800        .unwrap()
801        .insert(id, DataChannelEventTarget::Created(channel));
802    Ok(id)
803}