servo_media_gstreamer/
webrtc.rs

1use super::BACKEND_BASE_TIME;
2use crate::datachannel::GStreamerWebRtcDataChannel;
3use crate::media_stream::GStreamerMediaStream;
4use glib;
5use glib::prelude::*;
6use gst;
7use gst::prelude::*;
8use gst_sdp;
9use gst_webrtc;
10use log::warn;
11use servo_media_streams::registry::{get_stream, MediaStreamId};
12use servo_media_streams::MediaStreamType;
13use servo_media_webrtc::datachannel::DataChannelId;
14use servo_media_webrtc::thread::InternalEvent;
15use servo_media_webrtc::WebRtcController as WebRtcThread;
16use servo_media_webrtc::*;
17use std::collections::HashMap;
18use std::sync::atomic::{AtomicUsize, Ordering};
19use std::sync::{Arc, Mutex};
20use std::{cmp, mem};
21
22// TODO:
23// - figure out purpose of glib loop
24
25#[derive(Debug, Clone)]
26pub struct MLineInfo {
27    /// The caps for the given m-line
28    caps: gst::Caps,
29    /// Whether or not this sink pad has already been connected
30    is_used: bool,
31    /// The payload value of the given m-line
32    payload: i32,
33}
34
35enum DataChannelEventTarget {
36    Buffered(Vec<DataChannelEvent>),
37    Created(GStreamerWebRtcDataChannel),
38}
39
40pub struct GStreamerWebRtcController {
41    webrtc: gst::Element,
42    pipeline: gst::Pipeline,
43    /// We can't trigger a negotiation-needed event until we have streams, or otherwise
44    /// a createOffer() call will lead to bad SDP. Instead, we delay negotiation.
45    delayed_negotiation: bool,
46    /// A handle to the event loop abstraction surrounding the webrtc implementations,
47    /// which lets gstreamer callbacks send events back to the event loop to run on this object
48    thread: WebRtcThread,
49    signaller: Box<dyn WebRtcSignaller>,
50    /// All the streams that are actually connected to the webrtcbin (i.e., their presence has already
51    /// been negotiated)
52    streams: Vec<MediaStreamId>,
53    /// Disconnected streams that are waiting to be linked. Streams are
54    /// only linked when:
55    ///
56    /// - An offer is made (all pending streams are flushed)
57    /// - An offer is received (all matching pending streams are flushed)
58    /// - A stream is added when there is a so-far-disconnected remote-m-line
59    ///
60    /// In other words, these are all yet to be negotiated
61    ///
62    /// See link_stream
63    pending_streams: Vec<MediaStreamId>,
64    /// Each new webrtc stream should have a new payload/pt value, starting at 96
65    ///
66    /// This is maintained as a known yet-unused payload number, being incremented whenever
67    /// we use it, and set to (remote_pt + 1) if the remote sends us a stream with a higher pt
68    pt_counter: i32,
69    /// We keep track of how many request pads have been created on webrtcbin
70    /// so that we can request more to fill in the gaps and acquire a specific pad if necessary
71    request_pad_counter: usize,
72    /// Streams need to be connected to the relevant sink pad, and we figure this out
73    /// by keeping track of the caps of each m-line in the SDP.
74    remote_mline_info: Vec<MLineInfo>,
75    /// Temporary storage for remote_mline_info until the remote description is applied
76    ///
77    /// Without this, a unluckily timed call to link_stream() may happen before the webrtcbin
78    /// knows the remote description, but while we _think_ it does
79    pending_remote_mline_info: Vec<MLineInfo>,
80    /// In case we get multiple remote offers, this lets us keep track of which is the newest
81    remote_offer_generation: u32,
82    _main_loop: glib::MainLoop,
83    data_channels: Arc<Mutex<HashMap<DataChannelId, DataChannelEventTarget>>>,
84    next_data_channel_id: Arc<AtomicUsize>,
85}
86
87impl WebRtcControllerBackend for GStreamerWebRtcController {
88    fn add_ice_candidate(&mut self, candidate: IceCandidate) -> WebRtcResult {
89        self.webrtc.emit_by_name::<()>(
90            "add-ice-candidate",
91            &[&candidate.sdp_mline_index, &candidate.candidate],
92        );
93        Ok(())
94    }
95
96    fn set_remote_description(
97        &mut self,
98        desc: SessionDescription,
99        cb: Box<dyn FnOnce() + Send + 'static>,
100    ) -> WebRtcResult {
101        self.set_description(desc, DescriptionType::Remote, cb)
102    }
103
104    fn set_local_description(
105        &mut self,
106        desc: SessionDescription,
107        cb: Box<dyn FnOnce() + Send + 'static>,
108    ) -> WebRtcResult {
109        self.set_description(desc, DescriptionType::Local, cb)
110    }
111
112    fn create_offer(&mut self, cb: Box<dyn FnOnce(SessionDescription) + Send + 'static>) -> WebRtcResult {
113        self.flush_pending_streams(true)?;
114        self.pipeline.set_state(gst::State::Playing)?;
115        let promise = gst::Promise::with_change_func(move |res| {
116            res.map(|s| on_offer_or_answer_created(SdpType::Offer, s.unwrap(), cb))
117                .unwrap();
118        });
119
120        self.webrtc
121            .emit_by_name::<()>("create-offer", &[&None::<gst::Structure>, &promise]);
122        Ok(())
123    }
124
125    fn create_answer(&mut self, cb: Box<dyn FnOnce(SessionDescription) + Send + 'static>) -> WebRtcResult {
126        let promise = gst::Promise::with_change_func(move |res| {
127            res.map(|s| on_offer_or_answer_created(SdpType::Answer, s.unwrap(), cb))
128                .unwrap();
129        });
130
131        self.webrtc
132            .emit_by_name::<()>("create-answer", &[&None::<gst::Structure>, &promise]);
133        Ok(())
134    }
135
136    fn add_stream(&mut self, stream_id: &MediaStreamId) -> WebRtcResult {
137        let stream =
138            get_stream(stream_id).expect("Media streams registry does not contain such ID");
139        let mut stream = stream.lock().unwrap();
140        let mut stream = stream
141            .as_mut_any()
142            .downcast_mut::<GStreamerMediaStream>()
143            .ok_or("Does not currently support non-gstreamer streams")?;
144        self.link_stream(stream_id, &mut stream, false)?;
145        if self.delayed_negotiation && (self.streams.len() > 1 || self.pending_streams.len() > 1) {
146            self.delayed_negotiation = false;
147            self.signaller.on_negotiation_needed(&self.thread);
148        }
149        Ok(())
150    }
151
152    fn create_data_channel(&mut self, init: &DataChannelInit) -> WebRtcDataChannelResult {
153        let id = self.next_data_channel_id.fetch_add(1, Ordering::Relaxed);
154        match GStreamerWebRtcDataChannel::new(&id, &self.webrtc, &self.thread, init) {
155            Ok(channel) => register_data_channel(self.data_channels.clone(), id, channel),
156            Err(error) => Err(WebRtcError::Backend(error)),
157        }
158    }
159
160    fn close_data_channel(&mut self, id: &DataChannelId) -> WebRtcResult {
161        // There is no need to unregister the channel here. It will be unregistered
162        // when the data channel backend triggers the on closed event.
163        let mut data_channels = self.data_channels.lock().unwrap();
164        match data_channels.get(id) {
165            Some(ref channel) => match channel {
166                DataChannelEventTarget::Created(ref channel) => {
167                    channel.close();
168                    Ok(())
169                }
170                DataChannelEventTarget::Buffered(_) => data_channels
171                    .remove(id)
172                    .ok_or(WebRtcError::Backend("Unknown data channel".to_owned()))
173                    .map(|_| ()),
174            },
175            None => Err(WebRtcError::Backend("Unknown data channel".to_owned())),
176        }
177    }
178
179    fn send_data_channel_message(
180        &mut self,
181        id: &DataChannelId,
182        message: &DataChannelMessage,
183    ) -> WebRtcResult {
184        match self.data_channels.lock().unwrap().get(id) {
185            Some(ref channel) => match channel {
186                DataChannelEventTarget::Created(ref channel) => {
187                    channel.send(message);
188                    Ok(())
189                }
190                _ => Ok(()),
191            },
192            None => Err(WebRtcError::Backend("Unknown data channel".to_owned())),
193        }
194    }
195
196    fn configure(&mut self, stun_server: &str, policy: BundlePolicy) -> WebRtcResult {
197        self.webrtc
198            .set_property_from_str("stun-server", stun_server);
199        self.webrtc
200            .set_property_from_str("bundle-policy", policy.as_str());
201        Ok(())
202    }
203
204    fn internal_event(&mut self, e: thread::InternalEvent) -> WebRtcResult {
205        match e {
206            InternalEvent::OnNegotiationNeeded => {
207                if self.streams.is_empty() && self.pending_streams.is_empty() {
208                    // we have no streams
209
210                    // If the pipeline starts playing and on-negotiation-needed is present before there are any
211                    // media streams, an invalid SDP offer will be created. Therefore, delay emitting the signal
212                    self.delayed_negotiation = true;
213                } else {
214                    self.signaller.on_negotiation_needed(&self.thread);
215                }
216            }
217            InternalEvent::OnIceCandidate(candidate) => {
218                self.signaller.on_ice_candidate(&self.thread, candidate);
219            }
220            InternalEvent::OnAddStream(stream, ty) => {
221                self.pipeline.set_state(gst::State::Playing)?;
222                self.signaller.on_add_stream(&stream, ty);
223            }
224            InternalEvent::OnDataChannelEvent(channel_id, event) => {
225                let mut data_channels = self.data_channels.lock().unwrap();
226                match data_channels.get_mut(&channel_id) {
227                    None => {
228                        data_channels
229                            .insert(channel_id, DataChannelEventTarget::Buffered(vec![event]));
230                    }
231                    Some(ref mut channel) => match channel {
232                        DataChannelEventTarget::Buffered(ref mut events) => {
233                            events.push(event);
234                            return Ok(());
235                        }
236                        DataChannelEventTarget::Created(_) => {
237                            match event {
238                                DataChannelEvent::Close => {
239                                    data_channels.remove(&channel_id);
240                                }
241                                _ => {}
242                            }
243                            self.signaller
244                                .on_data_channel_event(channel_id, event, &self.thread);
245                        }
246                    },
247                }
248            }
249            InternalEvent::DescriptionAdded(cb, description_type, ty, remote_offer_generation) => {
250                if description_type == DescriptionType::Remote
251                    && ty == SdpType::Offer
252                    && remote_offer_generation == self.remote_offer_generation
253                {
254                    mem::swap(
255                        &mut self.pending_remote_mline_info,
256                        &mut self.remote_mline_info,
257                    );
258                    self.pending_remote_mline_info.clear();
259                    self.flush_pending_streams(false)?;
260                }
261                self.pipeline.set_state(gst::State::Playing)?;
262                cb();
263            }
264            InternalEvent::UpdateSignalingState => {
265                use gst_webrtc::WebRTCSignalingState::*;
266                let val = self
267                    .webrtc
268                    .property::<gst_webrtc::WebRTCSignalingState>("signaling-state");
269                let state = match val {
270                    Stable => SignalingState::Stable,
271                    HaveLocalOffer => SignalingState::HaveLocalOffer,
272                    HaveRemoteOffer => SignalingState::HaveRemoteOffer,
273                    HaveLocalPranswer => SignalingState::HaveLocalPranswer,
274                    HaveRemotePranswer => SignalingState::HaveRemotePranswer,
275                    Closed => SignalingState::Closed,
276                    i => {
277                        return Err(WebRtcError::Backend(format!(
278                            "unknown signaling state: {:?}",
279                            i
280                        )))
281                    }
282                };
283                self.signaller.update_signaling_state(state);
284            }
285            InternalEvent::UpdateGatheringState => {
286                use gst_webrtc::WebRTCICEGatheringState::*;
287                let val = self
288                    .webrtc
289                    .property::<gst_webrtc::WebRTCICEGatheringState>("ice-gathering-state");
290                let state = match val {
291                    New => GatheringState::New,
292                    Gathering => GatheringState::Gathering,
293                    Complete => GatheringState::Complete,
294                    i => {
295                        return Err(WebRtcError::Backend(format!(
296                            "unknown gathering state: {:?}",
297                            i
298                        )))
299                    }
300                };
301                self.signaller.update_gathering_state(state);
302            }
303            InternalEvent::UpdateIceConnectionState => {
304                use gst_webrtc::WebRTCICEConnectionState::*;
305                let val = self
306                    .webrtc
307                    .property::<gst_webrtc::WebRTCICEConnectionState>("ice-connection-state");
308                let state = match val {
309                    New => IceConnectionState::New,
310                    Checking => IceConnectionState::Checking,
311                    Connected => IceConnectionState::Connected,
312                    Completed => IceConnectionState::Completed,
313                    Disconnected => IceConnectionState::Disconnected,
314                    Failed => IceConnectionState::Failed,
315                    Closed => IceConnectionState::Closed,
316                    i => {
317                        return Err(WebRtcError::Backend(format!(
318                            "unknown ICE connection state: {:?}",
319                            i
320                        )))
321                    }
322                };
323                self.signaller.update_ice_connection_state(state);
324            }
325        }
326        Ok(())
327    }
328
329    fn quit(&mut self) {
330        self.signaller.close();
331
332        self.pipeline.set_state(gst::State::Null).unwrap();
333    }
334}
335
336impl GStreamerWebRtcController {
337    fn set_description(
338        &mut self,
339        desc: SessionDescription,
340        description_type: DescriptionType,
341        cb: Box<dyn FnOnce() + Send + 'static>,
342    ) -> WebRtcResult {
343        let ty = match desc.type_ {
344            SdpType::Answer => gst_webrtc::WebRTCSDPType::Answer,
345            SdpType::Offer => gst_webrtc::WebRTCSDPType::Offer,
346            SdpType::Pranswer => gst_webrtc::WebRTCSDPType::Pranswer,
347            SdpType::Rollback => gst_webrtc::WebRTCSDPType::Rollback,
348        };
349
350        let kind = match description_type {
351            DescriptionType::Local => "set-local-description",
352            DescriptionType::Remote => "set-remote-description",
353        };
354
355        let sdp = gst_sdp::SDPMessage::parse_buffer(desc.sdp.as_bytes()).unwrap();
356        if description_type == DescriptionType::Remote {
357            self.remote_offer_generation += 1;
358            self.store_remote_mline_info(&sdp);
359        }
360        let answer = gst_webrtc::WebRTCSessionDescription::new(ty, sdp);
361        let thread = self.thread.clone();
362        let remote_offer_generation = self.remote_offer_generation;
363        let promise = gst::Promise::with_change_func(move |_promise| {
364            // remote_offer_generation here ensures that DescriptionAdded doesn't
365            // flush pending_remote_mline_info for stale remote offer callbacks
366            thread.internal_event(InternalEvent::DescriptionAdded(
367                cb,
368                description_type,
369                desc.type_,
370                remote_offer_generation,
371            ));
372        });
373        self.webrtc.emit_by_name::<()>(kind, &[&answer, &promise]);
374        Ok(())
375    }
376
377    fn store_remote_mline_info(&mut self, sdp: &gst_sdp::SDPMessage) {
378        self.pending_remote_mline_info.clear();
379        for media in sdp.medias() {
380            let mut caps = gst::Caps::new_empty();
381            let caps_mut = caps.get_mut().expect("Fresh caps should be uniquely owned");
382            for format in media.formats() {
383                if format == "webrtc-datachannel" {
384                    return;
385                }
386                let pt = format
387                    .parse()
388                    .expect("Gstreamer provided noninteger format");
389                caps_mut.append(
390                    media
391                        .caps_from_media(pt)
392                        .expect("get_format() did not return a format from the SDP"),
393                );
394                self.pt_counter = cmp::max(self.pt_counter, pt + 1);
395            }
396            for s in caps_mut.iter_mut() {
397                // the caps are application/x-unknown by default, which will fail
398                // to intersect
399                //
400                // see https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/blob/ba62917fbfd98ea76d4e066a6f18b4a14b847362/ext/webrtc/gstwebrtcbin.c#L2521
401                s.set_name("application/x-rtp")
402            }
403            // This info is not current until the promise from set-remote-description is resolved,
404            // to avoid any races where we attempt to link streams before the promise resolves we
405            // queue this up in a pending buffer
406            self.pending_remote_mline_info.push(MLineInfo {
407                caps,
408                // XXXManishearth in the (yet unsupported) case of dynamic stream addition and renegotiation
409                // this will need to be checked against the current set of streams
410                is_used: false,
411                // XXXManishearth ideally, we keep track of all payloads and have the capability of picking
412                // the appropriate decoder. For this, a bunch of the streams code will have to be moved into
413                // a webrtc-specific abstraction.
414                payload: media
415                    .format(0)
416                    .expect("Gstreamer reported incorrect formats_len()")
417                    .parse()
418                    .expect("Gstreamer provided noninteger format"),
419            });
420        }
421    }
422
423    /// Streams need to be linked to the correct pads, so we buffer them up until we know enough
424    /// to do this.
425    ///
426    /// When we get a remote offer, we store the relevant m-line information so that we can
427    /// pick the correct sink pad and payload. Shortly after we look for any pending streams
428    /// and connect them to available compatible m-lines using link_stream.
429    ///
430    /// When we create an offer, we're controlling the pad order, so we set request_new_pads
431    /// to true and forcefully link all pending streams before generating the offer.
432    ///
433    /// When request_new_pads is false, we may still request new pads, however we only do this for
434    /// streams that have already been negotiated by the remote.
435    fn link_stream(
436        &mut self,
437        stream_id: &MediaStreamId,
438        stream: &mut GStreamerMediaStream,
439        request_new_pads: bool,
440    ) -> WebRtcResult {
441        let caps = stream.caps();
442        let idx = self
443            .remote_mline_info
444            .iter()
445            .enumerate()
446            .filter(|(_, x)| !x.is_used)
447            .find(|(_, x)| x.caps.can_intersect(caps))
448            .map(|x| x.0);
449        if let Some(idx) = idx {
450            if idx >= self.request_pad_counter {
451                for i in self.request_pad_counter..=idx {
452                    // webrtcbin needs you to request pads (or use element.link(webrtcbin))
453                    // however, it also wants them to be connected in the correct order.
454                    //
455                    // Here, we make sure all the numbered sink pads have been created beforehand, up to
456                    // and including the one we need here.
457                    //
458                    // An alternate fix is to sort pending_streams according to the m-line index
459                    // and just do it in order. This also seems brittle.
460                    self.webrtc
461                        .request_pad_simple(&format!("sink_{}", i))
462                        .ok_or("Cannot request sink pad")?;
463                }
464                self.request_pad_counter = idx + 1;
465            }
466            stream.attach_to_pipeline(&self.pipeline);
467            let element = stream.encoded();
468            self.remote_mline_info[idx].is_used = true;
469            let caps = stream.caps_with_payload(self.remote_mline_info[idx].payload);
470            element.set_property("caps", &caps);
471            let src = element.static_pad("src").ok_or("Cannot request src pad")?;
472            let sink = self
473                .webrtc
474                .static_pad(&format!("sink_{}", idx))
475                .ok_or("Cannot request sink pad")?;
476            src.link(&sink)?;
477            self.streams.push(*stream_id);
478        } else if request_new_pads {
479            stream.attach_to_pipeline(&self.pipeline);
480            let element = stream.encoded();
481            let caps = stream.caps_with_payload(self.pt_counter);
482            self.pt_counter += 1;
483            element.set_property("caps", &caps);
484            let src = element.static_pad("src").ok_or("Cannot request src pad")?;
485            let sink = self
486                .webrtc
487                .request_pad_simple(&format!("sink_{}", self.request_pad_counter))
488                .ok_or("Cannot request sink pad")?;
489            self.request_pad_counter += 1;
490            src.link(&sink)?;
491            self.streams.push(*stream_id);
492        } else {
493            self.pending_streams.push(*stream_id);
494        }
495        Ok(())
496    }
497
498    /// link_stream, but for all pending streams
499    fn flush_pending_streams(&mut self, request_new_pads: bool) -> WebRtcResult {
500        let pending_streams = mem::replace(&mut self.pending_streams, vec![]);
501        for stream_id in pending_streams {
502            let stream =
503                get_stream(&stream_id).expect("Media streams registry does not contain such ID");
504            let mut stream = stream.lock().unwrap();
505            let mut stream = stream
506                .as_mut_any()
507                .downcast_mut::<GStreamerMediaStream>()
508                .ok_or("Does not currently support non-gstreamer streams")?;
509            self.link_stream(&stream_id, &mut stream, request_new_pads)?;
510        }
511        Ok(())
512    }
513
514    fn start_pipeline(&mut self) -> WebRtcResult {
515        self.pipeline.add(&self.webrtc)?;
516
517        // gstreamer needs Sync on these callbacks for some reason
518        // https://github.com/sdroege/gstreamer-rs/issues/154
519        let thread = Mutex::new(self.thread.clone());
520        self.webrtc
521            .connect("on-ice-candidate", false, move |values| {
522                thread
523                    .lock()
524                    .unwrap()
525                    .internal_event(InternalEvent::OnIceCandidate(candidate(values)));
526                None
527            });
528
529        let thread = Arc::new(Mutex::new(self.thread.clone()));
530        self.webrtc.connect_pad_added({
531            let pipeline_weak = self.pipeline.downgrade();
532            move |_element, pad| {
533                let Some(pipe) = pipeline_weak.upgrade() else {
534                    warn!("Pipeline already deallocated");
535                    return;
536                };
537                process_new_stream(pad, &pipe, thread.clone());
538            }
539        });
540
541        // gstreamer needs Sync on these callbacks for some reason
542        // https://github.com/sdroege/gstreamer-rs/issues/154
543        let thread = Mutex::new(self.thread.clone());
544        self.webrtc
545            .connect("on-negotiation-needed", false, move |_values| {
546                thread
547                    .lock()
548                    .unwrap()
549                    .internal_event(InternalEvent::OnNegotiationNeeded);
550                None
551            });
552
553        let thread = Mutex::new(self.thread.clone());
554        self.webrtc
555            .connect("notify::signaling-state", false, move |_values| {
556                thread
557                    .lock()
558                    .unwrap()
559                    .internal_event(InternalEvent::UpdateSignalingState);
560                None
561            });
562        let thread = Mutex::new(self.thread.clone());
563        self.webrtc
564            .connect("notify::ice-connection-state", false, move |_values| {
565                thread
566                    .lock()
567                    .unwrap()
568                    .internal_event(InternalEvent::UpdateIceConnectionState);
569                None
570            });
571        let thread = Mutex::new(self.thread.clone());
572        self.webrtc
573            .connect("notify::ice-gathering-state", false, move |_values| {
574                thread
575                    .lock()
576                    .unwrap()
577                    .internal_event(InternalEvent::UpdateGatheringState);
578                None
579            });
580        let thread = Mutex::new(self.thread.clone());
581        let data_channels = self.data_channels.clone();
582        let next_data_channel_id = self.next_data_channel_id.clone();
583        self.webrtc
584            .connect("on-data-channel", false, move |channel| {
585                let channel = channel[1]
586                    .get::<gst_webrtc::WebRTCDataChannel>()
587                    .map_err(|e| e.to_string())
588                    .expect("Invalid data channel");
589                let id = next_data_channel_id.fetch_add(1, Ordering::Relaxed);
590                let thread_ = thread.lock().unwrap().clone();
591                match GStreamerWebRtcDataChannel::from(&id, channel, &thread_) {
592                    Ok(channel) => {
593                        let mut closed_channel = false;
594                        {
595                            thread_.internal_event(InternalEvent::OnDataChannelEvent(
596                                id,
597                                DataChannelEvent::NewChannel,
598                            ));
599
600                            let mut data_channels = data_channels.lock().unwrap();
601                            if let Some(ref mut channel) = data_channels.get_mut(&id) {
602                                match channel {
603                                    DataChannelEventTarget::Buffered(ref mut events) => {
604                                        for event in events.drain(0..) {
605                                            match event {
606                                                DataChannelEvent::Close => closed_channel = true,
607                                                _ => {}
608                                            }
609                                            thread_.internal_event(
610                                                InternalEvent::OnDataChannelEvent(id, event),
611                                            );
612                                        }
613                                    }
614                                    _ => debug_assert!(
615                                        false,
616                                        "Trying to register a data channel with an existing ID"
617                                    ),
618                                }
619                            }
620                            data_channels.remove(&id);
621                        }
622                        if !closed_channel {
623                            if register_data_channel(data_channels.clone(), id, channel).is_err() {
624                                warn!("Could not register data channel {:?}", id);
625                                return None;
626                            }
627                        }
628                    }
629                    Err(error) => {
630                        warn!("Could not create data channel {:?}", error);
631                    }
632                }
633                None
634            });
635
636        self.pipeline.set_state(gst::State::Ready)?;
637        Ok(())
638    }
639}
640
641pub fn construct(
642    signaller: Box<dyn WebRtcSignaller>,
643    thread: WebRtcThread,
644) -> Result<GStreamerWebRtcController, WebRtcError> {
645    let main_loop = glib::MainLoop::new(None, false);
646    let pipeline = gst::Pipeline::with_name("webrtc main");
647    pipeline.set_start_time(gst::ClockTime::NONE);
648    pipeline.set_base_time(*BACKEND_BASE_TIME);
649    pipeline.use_clock(Some(&gst::SystemClock::obtain()));
650    let webrtc = gst::ElementFactory::make("webrtcbin")
651        .name("sendrecv")
652        .build()
653        .map_err(|error| format!("webrtcbin element not found: {error:?}"))?;
654    let mut controller = GStreamerWebRtcController {
655        webrtc,
656        pipeline,
657        signaller,
658        thread,
659        remote_mline_info: vec![],
660        pending_remote_mline_info: vec![],
661        streams: vec![],
662        pending_streams: vec![],
663        pt_counter: 96,
664        request_pad_counter: 0,
665        remote_offer_generation: 0,
666        delayed_negotiation: false,
667        _main_loop: main_loop,
668        data_channels: Arc::new(Mutex::new(HashMap::new())),
669        next_data_channel_id: Arc::new(AtomicUsize::new(0)),
670    };
671    controller.start_pipeline()?;
672    Ok(controller)
673}
674
675fn on_offer_or_answer_created(
676    ty: SdpType,
677    reply: &gst::StructureRef,
678    cb: Box<dyn FnOnce(SessionDescription) + Send + 'static>,
679) {
680    debug_assert!(ty == SdpType::Offer || ty == SdpType::Answer);
681    let reply = reply
682        .value(ty.as_str())
683        .unwrap()
684        .get::<gst_webrtc::WebRTCSessionDescription>()
685        .expect("Invalid argument");
686
687    let type_ = match reply.type_() {
688        gst_webrtc::WebRTCSDPType::Answer => SdpType::Answer,
689        gst_webrtc::WebRTCSDPType::Offer => SdpType::Offer,
690        gst_webrtc::WebRTCSDPType::Pranswer => SdpType::Pranswer,
691        gst_webrtc::WebRTCSDPType::Rollback => SdpType::Rollback,
692        _ => panic!("unknown sdp response"),
693    };
694
695    let desc = SessionDescription {
696        sdp: reply.sdp().as_text().unwrap(),
697        type_,
698    };
699
700    cb(desc);
701}
702
703fn on_incoming_stream(pipe: &gst::Pipeline, thread: Arc<Mutex<WebRtcThread>>, pad: &gst::Pad) {
704    let decodebin = gst::ElementFactory::make("decodebin").build().unwrap();
705    let caps = pad.query_caps(None);
706    let name = caps
707        .structure(0)
708        .unwrap()
709        .get::<String>("media")
710        .expect("Invalid 'media' field");
711    let decodebin2 = decodebin.clone();
712    decodebin.connect_pad_added({
713        let pipeline_weak = pipe.downgrade();
714        move |_element, pad| {
715            let Some(pipe) = pipeline_weak.upgrade() else {
716                warn!("Pipeline already deallocated");
717                return;
718            };
719            on_incoming_decodebin_stream(pad, &pipe, thread.clone(), &name);
720        }
721    });
722    pipe.add(&decodebin).unwrap();
723
724    let decodepad = decodebin.static_pad("sink").unwrap();
725    pad.link(&decodepad).unwrap();
726    decodebin2.sync_state_with_parent().unwrap();
727}
728
729fn on_incoming_decodebin_stream(
730    pad: &gst::Pad,
731    pipe: &gst::Pipeline,
732    thread: Arc<Mutex<WebRtcThread>>,
733    name: &str,
734) {
735    let proxy_sink = gst::ElementFactory::make("proxysink").build().unwrap();
736    let proxy_src = gst::ElementFactory::make("proxysrc")
737        .property("proxysink", &proxy_sink)
738        .build()
739        .unwrap();
740    pipe.add(&proxy_sink).unwrap();
741    let sinkpad = proxy_sink.static_pad("sink").unwrap();
742
743    pad.link(&sinkpad).unwrap();
744    proxy_sink.sync_state_with_parent().unwrap();
745
746    let (stream, ty) = if name == "video" {
747        (
748            GStreamerMediaStream::create_video_from(proxy_src),
749            MediaStreamType::Video,
750        )
751    } else {
752        (
753            GStreamerMediaStream::create_audio_from(proxy_src),
754            MediaStreamType::Audio,
755        )
756    };
757    thread
758        .lock()
759        .unwrap()
760        .internal_event(InternalEvent::OnAddStream(stream, ty));
761}
762
763fn process_new_stream(pad: &gst::Pad, pipe: &gst::Pipeline, thread: Arc<Mutex<WebRtcThread>>) {
764    if pad.direction() != gst::PadDirection::Src {
765        // Ignore outgoing pad notifications.
766        return;
767    }
768    on_incoming_stream(pipe, thread, pad)
769}
770
771fn candidate(values: &[glib::Value]) -> IceCandidate {
772    let _webrtc = values[0].get::<gst::Element>().expect("Invalid argument");
773    let sdp_mline_index = values[1].get::<u32>().expect("Invalid argument");
774    let candidate = values[2].get::<String>().expect("Invalid argument");
775
776    IceCandidate {
777        sdp_mline_index,
778        candidate,
779    }
780}
781
782fn register_data_channel(
783    registry: Arc<Mutex<HashMap<DataChannelId, DataChannelEventTarget>>>,
784    id: DataChannelId,
785    channel: GStreamerWebRtcDataChannel,
786) -> WebRtcDataChannelResult {
787    if registry.lock().unwrap().contains_key(&id) {
788        return Err(WebRtcError::Backend(
789            "Could not register data channel. ID collision".to_owned(),
790        ));
791    }
792    registry
793        .lock()
794        .unwrap()
795        .insert(id, DataChannelEventTarget::Created(channel));
796    Ok(id)
797}