Skip to main content

servo_media_gstreamer/
webrtc.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::collections::HashMap;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::{Arc, Mutex};
8use std::{cmp, mem};
9
10use glib;
11use glib::prelude::*;
12use gstreamer;
13use gstreamer::prelude::*;
14use gstreamer_sdp;
15use gstreamer_webrtc;
16use log::warn;
17use servo_media_streams::MediaStreamType;
18use servo_media_streams::registry::{MediaStreamId, get_stream};
19use servo_media_webrtc::datachannel::DataChannelId;
20use servo_media_webrtc::thread::InternalEvent;
21use servo_media_webrtc::{WebRtcController as WebRtcThread, *};
22
23use super::BACKEND_BASE_TIME;
24use crate::datachannel::GStreamerWebRtcDataChannel;
25use crate::media_stream::GStreamerMediaStream;
26
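// TODO:
// - Figure out the purpose of the glib main loop (`_main_loop`): it is created in
//   `construct` and stored on the controller, but nothing visible here ever runs it.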
29
30#[derive(Debug, Clone)]
31pub struct MLineInfo {
32    /// The caps for the given m-line
33    caps: gstreamer::Caps,
34    /// Whether or not this sink pad has already been connected
35    is_used: bool,
36    /// The payload value of the given m-line
37    payload: i32,
38}
39
40enum DataChannelEventTarget {
41    Buffered(Vec<DataChannelEvent>),
42    Created(GStreamerWebRtcDataChannel),
43}
44
45pub struct GStreamerWebRtcController {
46    webrtc: gstreamer::Element,
47    pipeline: gstreamer::Pipeline,
48    /// We can't trigger a negotiation-needed event until we have streams, or otherwise
49    /// a createOffer() call will lead to bad SDP. Instead, we delay negotiation.
50    delayed_negotiation: bool,
51    /// A handle to the event loop abstraction surrounding the webrtc implementations,
52    /// which lets gstreamer callbacks send events back to the event loop to run on this object
53    thread: WebRtcThread,
54    signaller: Box<dyn WebRtcSignaller>,
55    /// All the streams that are actually connected to the webrtcbin (i.e., their presence has already
56    /// been negotiated)
57    streams: Vec<MediaStreamId>,
58    /// Disconnected streams that are waiting to be linked. Streams are
59    /// only linked when:
60    ///
61    /// - An offer is made (all pending streams are flushed)
62    /// - An offer is received (all matching pending streams are flushed)
63    /// - A stream is added when there is a so-far-disconnected remote-m-line
64    ///
65    /// In other words, these are all yet to be negotiated
66    ///
67    /// See link_stream
68    pending_streams: Vec<MediaStreamId>,
69    /// Each new webrtc stream should have a new payload/pt value, starting at 96
70    ///
71    /// This is maintained as a known yet-unused payload number, being incremented whenever
72    /// we use it, and set to (remote_pt + 1) if the remote sends us a stream with a higher pt
73    pt_counter: i32,
74    /// We keep track of how many request pads have been created on webrtcbin
75    /// so that we can request more to fill in the gaps and acquire a specific pad if necessary
76    request_pad_counter: usize,
77    /// Streams need to be connected to the relevant sink pad, and we figure this out
78    /// by keeping track of the caps of each m-line in the SDP.
79    remote_mline_info: Vec<MLineInfo>,
80    /// Temporary storage for remote_mline_info until the remote description is applied
81    ///
82    /// Without this, a unluckily timed call to link_stream() may happen before the webrtcbin
83    /// knows the remote description, but while we _think_ it does
84    pending_remote_mline_info: Vec<MLineInfo>,
85    /// In case we get multiple remote offers, this lets us keep track of which is the newest
86    remote_offer_generation: u32,
87    _main_loop: glib::MainLoop,
88    data_channels: Arc<Mutex<HashMap<DataChannelId, DataChannelEventTarget>>>,
89    next_data_channel_id: Arc<AtomicUsize>,
90}
91
92impl WebRtcControllerBackend for GStreamerWebRtcController {
93    fn add_ice_candidate(&mut self, candidate: IceCandidate) -> WebRtcResult {
94        self.webrtc.emit_by_name::<()>(
95            "add-ice-candidate",
96            &[&candidate.sdp_mline_index, &candidate.candidate],
97        );
98        Ok(())
99    }
100
101    fn set_remote_description(
102        &mut self,
103        desc: SessionDescription,
104        cb: Box<dyn FnOnce() + Send + 'static>,
105    ) -> WebRtcResult {
106        self.set_description(desc, DescriptionType::Remote, cb)
107    }
108
109    fn set_local_description(
110        &mut self,
111        desc: SessionDescription,
112        cb: Box<dyn FnOnce() + Send + 'static>,
113    ) -> WebRtcResult {
114        self.set_description(desc, DescriptionType::Local, cb)
115    }
116
117    fn create_offer(
118        &mut self,
119        cb: Box<dyn FnOnce(SessionDescription) + Send + 'static>,
120    ) -> WebRtcResult {
121        self.flush_pending_streams(true)?;
122        self.pipeline.set_state(gstreamer::State::Playing)?;
123        let promise = gstreamer::Promise::with_change_func(move |res| {
124            res.map(|s| on_offer_or_answer_created(SdpType::Offer, s.unwrap(), cb))
125                .unwrap();
126        });
127
128        self.webrtc
129            .emit_by_name::<()>("create-offer", &[&None::<gstreamer::Structure>, &promise]);
130        Ok(())
131    }
132
133    fn create_answer(
134        &mut self,
135        cb: Box<dyn FnOnce(SessionDescription) + Send + 'static>,
136    ) -> WebRtcResult {
137        let promise = gstreamer::Promise::with_change_func(move |res| {
138            res.map(|s| on_offer_or_answer_created(SdpType::Answer, s.unwrap(), cb))
139                .unwrap();
140        });
141
142        self.webrtc
143            .emit_by_name::<()>("create-answer", &[&None::<gstreamer::Structure>, &promise]);
144        Ok(())
145    }
146
147    fn add_stream(&mut self, stream_id: &MediaStreamId) -> WebRtcResult {
148        let stream =
149            get_stream(stream_id).expect("Media streams registry does not contain such ID");
150        let mut stream = stream.lock().unwrap();
151        let stream = stream
152            .as_mut_any()
153            .downcast_mut::<GStreamerMediaStream>()
154            .ok_or("Does not currently support non-gstreamer streams")?;
155        self.link_stream(stream_id, stream, false)?;
156        if self.delayed_negotiation && (self.streams.len() > 1 || self.pending_streams.len() > 1) {
157            self.delayed_negotiation = false;
158            self.signaller.on_negotiation_needed(&self.thread);
159        }
160        Ok(())
161    }
162
163    fn create_data_channel(&mut self, init: &DataChannelInit) -> WebRtcDataChannelResult {
164        let id = self.next_data_channel_id.fetch_add(1, Ordering::Relaxed);
165        match GStreamerWebRtcDataChannel::new(&id, &self.webrtc, &self.thread, init) {
166            Ok(channel) => register_data_channel(self.data_channels.clone(), id, channel),
167            Err(error) => Err(WebRtcError::Backend(error)),
168        }
169    }
170
171    fn close_data_channel(&mut self, id: &DataChannelId) -> WebRtcResult {
172        // There is no need to unregister the channel here. It will be unregistered
173        // when the data channel backend triggers the on closed event.
174        let mut data_channels = self.data_channels.lock().unwrap();
175        match data_channels.get(id) {
176            Some(ref channel) => match channel {
177                DataChannelEventTarget::Created(channel) => {
178                    channel.close();
179                    Ok(())
180                },
181                DataChannelEventTarget::Buffered(_) => data_channels
182                    .remove(id)
183                    .ok_or(WebRtcError::Backend("Unknown data channel".to_owned()))
184                    .map(|_| ()),
185            },
186            None => Err(WebRtcError::Backend("Unknown data channel".to_owned())),
187        }
188    }
189
190    fn send_data_channel_message(
191        &mut self,
192        id: &DataChannelId,
193        message: &DataChannelMessage,
194    ) -> WebRtcResult {
195        match self.data_channels.lock().unwrap().get(id) {
196            Some(ref channel) => match channel {
197                DataChannelEventTarget::Created(channel) => {
198                    channel.send(message);
199                    Ok(())
200                },
201                _ => Ok(()),
202            },
203            None => Err(WebRtcError::Backend("Unknown data channel".to_owned())),
204        }
205    }
206
207    fn configure(&mut self, stun_server: &str, policy: BundlePolicy) -> WebRtcResult {
208        self.webrtc
209            .set_property_from_str("stun-server", stun_server);
210        self.webrtc
211            .set_property_from_str("bundle-policy", policy.as_str());
212        Ok(())
213    }
214
215    fn internal_event(&mut self, e: thread::InternalEvent) -> WebRtcResult {
216        match e {
217            InternalEvent::OnNegotiationNeeded => {
218                if self.streams.is_empty() && self.pending_streams.is_empty() {
219                    // we have no streams
220
221                    // If the pipeline starts playing and on-negotiation-needed is present before there are any
222                    // media streams, an invalid SDP offer will be created. Therefore, delay emitting the signal
223                    self.delayed_negotiation = true;
224                } else {
225                    self.signaller.on_negotiation_needed(&self.thread);
226                }
227            },
228            InternalEvent::OnIceCandidate(candidate) => {
229                self.signaller.on_ice_candidate(&self.thread, candidate);
230            },
231            InternalEvent::OnAddStream(stream, ty) => {
232                self.pipeline.set_state(gstreamer::State::Playing)?;
233                self.signaller.on_add_stream(&stream, ty);
234            },
235            InternalEvent::OnDataChannelEvent(channel_id, event) => {
236                let mut data_channels = self.data_channels.lock().unwrap();
237                match data_channels.get_mut(&channel_id) {
238                    None => {
239                        data_channels
240                            .insert(channel_id, DataChannelEventTarget::Buffered(vec![event]));
241                    },
242                    Some(ref mut channel) => match channel {
243                        &mut &mut DataChannelEventTarget::Buffered(ref mut events) => {
244                            events.push(event);
245                            return Ok(());
246                        },
247                        DataChannelEventTarget::Created(_) => {
248                            if let DataChannelEvent::Close = event {
249                                data_channels.remove(&channel_id);
250                            }
251                            self.signaller
252                                .on_data_channel_event(channel_id, event, &self.thread);
253                        },
254                    },
255                }
256            },
257            InternalEvent::DescriptionAdded(cb, description_type, ty, remote_offer_generation) => {
258                if description_type == DescriptionType::Remote &&
259                    ty == SdpType::Offer &&
260                    remote_offer_generation == self.remote_offer_generation
261                {
262                    mem::swap(
263                        &mut self.pending_remote_mline_info,
264                        &mut self.remote_mline_info,
265                    );
266                    self.pending_remote_mline_info.clear();
267                    self.flush_pending_streams(false)?;
268                }
269                self.pipeline.set_state(gstreamer::State::Playing)?;
270                cb();
271            },
272            InternalEvent::UpdateSignalingState => {
273                use gstreamer_webrtc::WebRTCSignalingState::*;
274                let val = self
275                    .webrtc
276                    .property::<gstreamer_webrtc::WebRTCSignalingState>("signaling-state");
277                let state = match val {
278                    Stable => SignalingState::Stable,
279                    HaveLocalOffer => SignalingState::HaveLocalOffer,
280                    HaveRemoteOffer => SignalingState::HaveRemoteOffer,
281                    HaveLocalPranswer => SignalingState::HaveLocalPranswer,
282                    HaveRemotePranswer => SignalingState::HaveRemotePranswer,
283                    Closed => SignalingState::Closed,
284                    i => {
285                        return Err(WebRtcError::Backend(format!(
286                            "unknown signaling state: {:?}",
287                            i
288                        )));
289                    },
290                };
291                self.signaller.update_signaling_state(state);
292            },
293            InternalEvent::UpdateGatheringState => {
294                use gstreamer_webrtc::WebRTCICEGatheringState::*;
295                let val = self
296                    .webrtc
297                    .property::<gstreamer_webrtc::WebRTCICEGatheringState>("ice-gathering-state");
298                let state = match val {
299                    New => GatheringState::New,
300                    Gathering => GatheringState::Gathering,
301                    Complete => GatheringState::Complete,
302                    i => {
303                        return Err(WebRtcError::Backend(format!(
304                            "unknown gathering state: {:?}",
305                            i
306                        )));
307                    },
308                };
309                self.signaller.update_gathering_state(state);
310            },
311            InternalEvent::UpdateIceConnectionState => {
312                use gstreamer_webrtc::WebRTCICEConnectionState::*;
313                let val = self
314                    .webrtc
315                    .property::<gstreamer_webrtc::WebRTCICEConnectionState>("ice-connection-state");
316                let state = match val {
317                    New => IceConnectionState::New,
318                    Checking => IceConnectionState::Checking,
319                    Connected => IceConnectionState::Connected,
320                    Completed => IceConnectionState::Completed,
321                    Disconnected => IceConnectionState::Disconnected,
322                    Failed => IceConnectionState::Failed,
323                    Closed => IceConnectionState::Closed,
324                    i => {
325                        return Err(WebRtcError::Backend(format!(
326                            "unknown ICE connection state: {:?}",
327                            i
328                        )));
329                    },
330                };
331                self.signaller.update_ice_connection_state(state);
332            },
333        }
334        Ok(())
335    }
336
337    fn quit(&mut self) {
338        self.signaller.close();
339
340        self.pipeline.set_state(gstreamer::State::Null).unwrap();
341    }
342}
343
344impl GStreamerWebRtcController {
345    fn set_description(
346        &mut self,
347        desc: SessionDescription,
348        description_type: DescriptionType,
349        cb: Box<dyn FnOnce() + Send + 'static>,
350    ) -> WebRtcResult {
351        let ty = match desc.type_ {
352            SdpType::Answer => gstreamer_webrtc::WebRTCSDPType::Answer,
353            SdpType::Offer => gstreamer_webrtc::WebRTCSDPType::Offer,
354            SdpType::Pranswer => gstreamer_webrtc::WebRTCSDPType::Pranswer,
355            SdpType::Rollback => gstreamer_webrtc::WebRTCSDPType::Rollback,
356        };
357
358        let kind = match description_type {
359            DescriptionType::Local => "set-local-description",
360            DescriptionType::Remote => "set-remote-description",
361        };
362
363        let sdp = gstreamer_sdp::SDPMessage::parse_buffer(desc.sdp.as_bytes()).unwrap();
364        if description_type == DescriptionType::Remote {
365            self.remote_offer_generation += 1;
366            self.store_remote_mline_info(&sdp);
367        }
368        let answer = gstreamer_webrtc::WebRTCSessionDescription::new(ty, sdp);
369        let thread = self.thread.clone();
370        let remote_offer_generation = self.remote_offer_generation;
371        let promise = gstreamer::Promise::with_change_func(move |_promise| {
372            // remote_offer_generation here ensures that DescriptionAdded doesn't
373            // flush pending_remote_mline_info for stale remote offer callbacks
374            thread.internal_event(InternalEvent::DescriptionAdded(
375                cb,
376                description_type,
377                desc.type_,
378                remote_offer_generation,
379            ));
380        });
381        self.webrtc.emit_by_name::<()>(kind, &[&answer, &promise]);
382        Ok(())
383    }
384
385    fn store_remote_mline_info(&mut self, sdp: &gstreamer_sdp::SDPMessage) {
386        self.pending_remote_mline_info.clear();
387        for media in sdp.medias() {
388            let mut caps = gstreamer::Caps::new_empty();
389            let caps_mut = caps.get_mut().expect("Fresh caps should be uniquely owned");
390            for format in media.formats() {
391                if format == "webrtc-datachannel" {
392                    return;
393                }
394                let pt = format
395                    .parse()
396                    .expect("Gstreamer provided noninteger format");
397                caps_mut.append(
398                    media
399                        .caps_from_media(pt)
400                        .expect("get_format() did not return a format from the SDP"),
401                );
402                self.pt_counter = cmp::max(self.pt_counter, pt + 1);
403            }
404            for s in caps_mut.iter_mut() {
405                // the caps are application/x-unknown by default, which will fail
406                // to intersect
407                //
408                // see https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/blob/ba62917fbfd98ea76d4e066a6f18b4a14b847362/ext/webrtc/gstwebrtcbin.c#L2521
409                s.set_name("application/x-rtp")
410            }
411            // This info is not current until the promise from set-remote-description is resolved,
412            // to avoid any races where we attempt to link streams before the promise resolves we
413            // queue this up in a pending buffer
414            self.pending_remote_mline_info.push(MLineInfo {
415                caps,
416                // XXXManishearth in the (yet unsupported) case of dynamic stream addition and renegotiation
417                // this will need to be checked against the current set of streams
418                is_used: false,
419                // XXXManishearth ideally, we keep track of all payloads and have the capability of picking
420                // the appropriate decoder. For this, a bunch of the streams code will have to be moved into
421                // a webrtc-specific abstraction.
422                payload: media
423                    .format(0)
424                    .expect("Gstreamer reported incorrect formats_len()")
425                    .parse()
426                    .expect("Gstreamer provided noninteger format"),
427            });
428        }
429    }
430
431    /// Streams need to be linked to the correct pads, so we buffer them up until we know enough
432    /// to do this.
433    ///
434    /// When we get a remote offer, we store the relevant m-line information so that we can
435    /// pick the correct sink pad and payload. Shortly after we look for any pending streams
436    /// and connect them to available compatible m-lines using link_stream.
437    ///
438    /// When we create an offer, we're controlling the pad order, so we set request_new_pads
439    /// to true and forcefully link all pending streams before generating the offer.
440    ///
441    /// When request_new_pads is false, we may still request new pads, however we only do this for
442    /// streams that have already been negotiated by the remote.
443    fn link_stream(
444        &mut self,
445        stream_id: &MediaStreamId,
446        stream: &mut GStreamerMediaStream,
447        request_new_pads: bool,
448    ) -> WebRtcResult {
449        let caps = stream.caps();
450        let idx = self
451            .remote_mline_info
452            .iter()
453            .enumerate()
454            .filter(|(_, x)| !x.is_used)
455            .find(|(_, x)| x.caps.can_intersect(caps))
456            .map(|x| x.0);
457        if let Some(idx) = idx {
458            if idx >= self.request_pad_counter {
459                for i in self.request_pad_counter..=idx {
460                    // webrtcbin needs you to request pads (or use element.link(webrtcbin))
461                    // however, it also wants them to be connected in the correct order.
462                    //
463                    // Here, we make sure all the numbered sink pads have been created beforehand, up to
464                    // and including the one we need here.
465                    //
466                    // An alternate fix is to sort pending_streams according to the m-line index
467                    // and just do it in order. This also seems brittle.
468                    self.webrtc
469                        .request_pad_simple(&format!("sink_{}", i))
470                        .ok_or("Cannot request sink pad")?;
471                }
472                self.request_pad_counter = idx + 1;
473            }
474            stream.attach_to_pipeline(&self.pipeline);
475            let element = stream.encoded().map_err(|_| {
476                WebRtcError::Backend(String::from("Failed to attach encoding adapters to stream"))
477            })?;
478            self.remote_mline_info[idx].is_used = true;
479            let caps = stream.caps_with_payload(self.remote_mline_info[idx].payload);
480            element.set_property("caps", &caps);
481            let src = element.static_pad("src").ok_or("Cannot request src pad")?;
482            let sink = self
483                .webrtc
484                .static_pad(&format!("sink_{}", idx))
485                .ok_or("Cannot request sink pad")?;
486            src.link(&sink)?;
487            self.streams.push(*stream_id);
488        } else if request_new_pads {
489            stream.attach_to_pipeline(&self.pipeline);
490            let element = stream.encoded().map_err(|_| {
491                WebRtcError::Backend(String::from("Failed to attach encoding adapters to stream"))
492            })?;
493            let caps = stream.caps_with_payload(self.pt_counter);
494            self.pt_counter += 1;
495            element.set_property("caps", &caps);
496            let src = element.static_pad("src").ok_or("Cannot request src pad")?;
497            let sink = self
498                .webrtc
499                .request_pad_simple(&format!("sink_{}", self.request_pad_counter))
500                .ok_or("Cannot request sink pad")?;
501            self.request_pad_counter += 1;
502            src.link(&sink)?;
503            self.streams.push(*stream_id);
504        } else {
505            self.pending_streams.push(*stream_id);
506        }
507        Ok(())
508    }
509
510    /// link_stream, but for all pending streams
511    fn flush_pending_streams(&mut self, request_new_pads: bool) -> WebRtcResult {
512        let pending_streams = std::mem::take(&mut self.pending_streams);
513        for stream_id in pending_streams {
514            let stream =
515                get_stream(&stream_id).expect("Media streams registry does not contain such ID");
516            let mut stream = stream.lock().unwrap();
517            let stream = stream
518                .as_mut_any()
519                .downcast_mut::<GStreamerMediaStream>()
520                .ok_or("Does not currently support non-gstreamer streams")?;
521            self.link_stream(&stream_id, stream, request_new_pads)?;
522        }
523        Ok(())
524    }
525
526    fn start_pipeline(&mut self) -> WebRtcResult {
527        self.pipeline.add(&self.webrtc)?;
528
529        // gstreamer needs Sync on these callbacks for some reason
530        // https://github.com/sdroege/gstreamer-rs/issues/154
531        let thread = Mutex::new(self.thread.clone());
532        self.webrtc
533            .connect("on-ice-candidate", false, move |values| {
534                thread
535                    .lock()
536                    .unwrap()
537                    .internal_event(InternalEvent::OnIceCandidate(candidate(values)));
538                None
539            });
540
541        let thread = Arc::new(Mutex::new(self.thread.clone()));
542        self.webrtc.connect_pad_added({
543            let pipeline_weak = self.pipeline.downgrade();
544            move |_element, pad| {
545                let Some(pipe) = pipeline_weak.upgrade() else {
546                    warn!("Pipeline already deallocated");
547                    return;
548                };
549                process_new_stream(pad, &pipe, thread.clone());
550            }
551        });
552
553        // gstreamer needs Sync on these callbacks for some reason
554        // https://github.com/sdroege/gstreamer-rs/issues/154
555        let thread = Mutex::new(self.thread.clone());
556        self.webrtc
557            .connect("on-negotiation-needed", false, move |_values| {
558                thread
559                    .lock()
560                    .unwrap()
561                    .internal_event(InternalEvent::OnNegotiationNeeded);
562                None
563            });
564
565        let thread = Mutex::new(self.thread.clone());
566        self.webrtc
567            .connect("notify::signaling-state", false, move |_values| {
568                thread
569                    .lock()
570                    .unwrap()
571                    .internal_event(InternalEvent::UpdateSignalingState);
572                None
573            });
574        let thread = Mutex::new(self.thread.clone());
575        self.webrtc
576            .connect("notify::ice-connection-state", false, move |_values| {
577                thread
578                    .lock()
579                    .unwrap()
580                    .internal_event(InternalEvent::UpdateIceConnectionState);
581                None
582            });
583        let thread = Mutex::new(self.thread.clone());
584        self.webrtc
585            .connect("notify::ice-gathering-state", false, move |_values| {
586                thread
587                    .lock()
588                    .unwrap()
589                    .internal_event(InternalEvent::UpdateGatheringState);
590                None
591            });
592        let thread = Mutex::new(self.thread.clone());
593        let data_channels = self.data_channels.clone();
594        let next_data_channel_id = self.next_data_channel_id.clone();
595        self.webrtc
596            .connect("on-data-channel", false, move |channel| {
597                let channel = channel[1]
598                    .get::<gstreamer_webrtc::WebRTCDataChannel>()
599                    .map_err(|e| e.to_string())
600                    .expect("Invalid data channel");
601                let id = next_data_channel_id.fetch_add(1, Ordering::Relaxed);
602                let thread_ = thread.lock().unwrap().clone();
603                match GStreamerWebRtcDataChannel::from(&id, channel, &thread_) {
604                    Ok(channel) => {
605                        let mut closed_channel = false;
606                        {
607                            thread_.internal_event(InternalEvent::OnDataChannelEvent(
608                                id,
609                                DataChannelEvent::NewChannel,
610                            ));
611
612                            let mut data_channels = data_channels.lock().unwrap();
613                            if let Some(ref mut channel) = data_channels.get_mut(&id) {
614                                match channel {
615                                    &mut &mut DataChannelEventTarget::Buffered(ref mut events) => {
616                                        for event in events.drain(0..) {
617                                            if let DataChannelEvent::Close = event {
618                                                closed_channel = true
619                                            }
620                                            thread_.internal_event(
621                                                InternalEvent::OnDataChannelEvent(id, event),
622                                            );
623                                        }
624                                    },
625                                    _ => debug_assert!(
626                                        false,
627                                        "Trying to register a data channel with an existing ID"
628                                    ),
629                                }
630                            }
631                            data_channels.remove(&id);
632                        }
633                        if !closed_channel &&
634                            register_data_channel(data_channels.clone(), id, channel).is_err()
635                        {
636                            warn!("Could not register data channel {:?}", id);
637                            return None;
638                        }
639                    },
640                    Err(error) => {
641                        warn!("Could not create data channel {:?}", error);
642                    },
643                }
644                None
645            });
646
647        self.pipeline.set_state(gstreamer::State::Ready)?;
648        Ok(())
649    }
650}
651
652pub fn construct(
653    signaller: Box<dyn WebRtcSignaller>,
654    thread: WebRtcThread,
655) -> Result<GStreamerWebRtcController, WebRtcError> {
656    let main_loop = glib::MainLoop::new(None, false);
657    let pipeline = gstreamer::Pipeline::with_name("webrtc main");
658    pipeline.set_start_time(gstreamer::ClockTime::NONE);
659    pipeline.set_base_time(*BACKEND_BASE_TIME);
660    pipeline.use_clock(Some(&gstreamer::SystemClock::obtain()));
661    let webrtc = gstreamer::ElementFactory::make("webrtcbin")
662        .name("sendrecv")
663        .build()
664        .map_err(|error| format!("webrtcbin element not found: {error:?}"))?;
665    let mut controller = GStreamerWebRtcController {
666        webrtc,
667        pipeline,
668        signaller,
669        thread,
670        remote_mline_info: vec![],
671        pending_remote_mline_info: vec![],
672        streams: vec![],
673        pending_streams: vec![],
674        pt_counter: 96,
675        request_pad_counter: 0,
676        remote_offer_generation: 0,
677        delayed_negotiation: false,
678        _main_loop: main_loop,
679        data_channels: Arc::new(Mutex::new(HashMap::new())),
680        next_data_channel_id: Arc::new(AtomicUsize::new(0)),
681    };
682    controller.start_pipeline()?;
683    Ok(controller)
684}
685
686fn on_offer_or_answer_created(
687    ty: SdpType,
688    reply: &gstreamer::StructureRef,
689    cb: Box<dyn FnOnce(SessionDescription) + Send + 'static>,
690) {
691    debug_assert!(ty == SdpType::Offer || ty == SdpType::Answer);
692    let reply = reply
693        .value(ty.as_str())
694        .unwrap()
695        .get::<gstreamer_webrtc::WebRTCSessionDescription>()
696        .expect("Invalid argument");
697
698    let type_ = match reply.type_() {
699        gstreamer_webrtc::WebRTCSDPType::Answer => SdpType::Answer,
700        gstreamer_webrtc::WebRTCSDPType::Offer => SdpType::Offer,
701        gstreamer_webrtc::WebRTCSDPType::Pranswer => SdpType::Pranswer,
702        gstreamer_webrtc::WebRTCSDPType::Rollback => SdpType::Rollback,
703        _ => panic!("unknown sdp response"),
704    };
705
706    let desc = SessionDescription {
707        sdp: reply.sdp().as_text().unwrap(),
708        type_,
709    };
710
711    cb(desc);
712}
713
714fn on_incoming_stream(
715    pipe: &gstreamer::Pipeline,
716    thread: Arc<Mutex<WebRtcThread>>,
717    pad: &gstreamer::Pad,
718) {
719    let decodebin = gstreamer::ElementFactory::make("decodebin")
720        .build()
721        .unwrap();
722    let caps = pad.query_caps(None);
723    let name = caps
724        .structure(0)
725        .unwrap()
726        .get::<String>("media")
727        .expect("Invalid 'media' field");
728    let decodebin2 = decodebin.clone();
729    decodebin.connect_pad_added({
730        let pipeline_weak = pipe.downgrade();
731        move |_element, pad| {
732            let Some(pipe) = pipeline_weak.upgrade() else {
733                warn!("Pipeline already deallocated");
734                return;
735            };
736            on_incoming_decodebin_stream(pad, &pipe, thread.clone(), &name);
737        }
738    });
739    pipe.add(&decodebin).unwrap();
740
741    let decodepad = decodebin.static_pad("sink").unwrap();
742    pad.link(&decodepad).unwrap();
743    decodebin2.sync_state_with_parent().unwrap();
744}
745
746fn on_incoming_decodebin_stream(
747    pad: &gstreamer::Pad,
748    pipe: &gstreamer::Pipeline,
749    thread: Arc<Mutex<WebRtcThread>>,
750    name: &str,
751) {
752    let proxy_sink = gstreamer::ElementFactory::make("proxysink")
753        .build()
754        .unwrap();
755    let proxy_src = gstreamer::ElementFactory::make("proxysrc")
756        .property("proxysink", &proxy_sink)
757        .build()
758        .unwrap();
759    pipe.add(&proxy_sink).unwrap();
760    let sinkpad = proxy_sink.static_pad("sink").unwrap();
761
762    pad.link(&sinkpad).unwrap();
763    proxy_sink.sync_state_with_parent().unwrap();
764
765    let (stream, ty) = if name == "video" {
766        (
767            GStreamerMediaStream::create_video_from(proxy_src),
768            MediaStreamType::Video,
769        )
770    } else {
771        (
772            GStreamerMediaStream::create_audio_from(proxy_src),
773            MediaStreamType::Audio,
774        )
775    };
776    thread
777        .lock()
778        .unwrap()
779        .internal_event(InternalEvent::OnAddStream(stream, ty));
780}
781
782fn process_new_stream(
783    pad: &gstreamer::Pad,
784    pipe: &gstreamer::Pipeline,
785    thread: Arc<Mutex<WebRtcThread>>,
786) {
787    if pad.direction() != gstreamer::PadDirection::Src {
788        // Ignore outgoing pad notifications.
789        return;
790    }
791    on_incoming_stream(pipe, thread, pad)
792}
793
794fn candidate(values: &[glib::Value]) -> IceCandidate {
795    let _webrtc = values[0]
796        .get::<gstreamer::Element>()
797        .expect("Invalid argument");
798    let sdp_mline_index = values[1].get::<u32>().expect("Invalid argument");
799    let candidate = values[2].get::<String>().expect("Invalid argument");
800
801    IceCandidate {
802        sdp_mline_index,
803        candidate,
804    }
805}
806
807fn register_data_channel(
808    registry: Arc<Mutex<HashMap<DataChannelId, DataChannelEventTarget>>>,
809    id: DataChannelId,
810    channel: GStreamerWebRtcDataChannel,
811) -> WebRtcDataChannelResult {
812    if registry.lock().unwrap().contains_key(&id) {
813        return Err(WebRtcError::Backend(
814            "Could not register data channel. ID collision".to_owned(),
815        ));
816    }
817    registry
818        .lock()
819        .unwrap()
820        .insert(id, DataChannelEventTarget::Created(channel));
821    Ok(id)
822}