Skip to main content

servo_media_gstreamer/
datachannel.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::sync::Mutex;
6
7use glib::prelude::*;
8use gstreamer_webrtc::{WebRTCDataChannel, WebRTCDataChannelState};
9use servo_media_webrtc::thread::InternalEvent;
10use servo_media_webrtc::{
11    DataChannelEvent, DataChannelId, DataChannelInit, DataChannelMessage, DataChannelState,
12    WebRtcController as WebRtcThread, WebRtcError,
13};
14
15pub struct GStreamerWebRtcDataChannel {
16    channel: WebRTCDataChannel,
17    id: DataChannelId,
18    thread: WebRtcThread,
19}
20
21impl GStreamerWebRtcDataChannel {
22    pub fn new(
23        servo_channel_id: &DataChannelId,
24        webrtc: &gstreamer::Element,
25        thread: &WebRtcThread,
26        init: &DataChannelInit,
27    ) -> Result<Self, String> {
28        let label = &init.label;
29        let mut init_struct = gstreamer::Structure::builder("options")
30            .field("ordered", init.ordered)
31            .field("protocol", &init.protocol)
32            .field("negotiated", init.negotiated)
33            .build();
34
35        if let Some(max_packet_life_time) = init.max_packet_life_time {
36            init_struct.set_value(
37                "max-packet-lifetime",
38                (max_packet_life_time as u32).to_send_value(),
39            );
40        }
41
42        if let Some(max_retransmits) = init.max_retransmits {
43            init_struct.set_value("max-retransmits", (max_retransmits as u32).to_send_value());
44        }
45
46        if let Some(id) = init.id {
47            init_struct.set_value("id", (id as u32).to_send_value());
48        }
49
50        let channel = webrtc
51            .emit_by_name::<WebRTCDataChannel>("create-data-channel", &[&label, &init_struct]);
52
53        GStreamerWebRtcDataChannel::from(servo_channel_id, channel, thread)
54    }
55
56    pub fn from(
57        id: &DataChannelId,
58        channel: WebRTCDataChannel,
59        thread: &WebRtcThread,
60    ) -> Result<Self, String> {
61        let id_ = *id;
62        let thread_ = Mutex::new(thread.clone());
63        channel.connect_on_open(move |_| {
64            thread_
65                .lock()
66                .unwrap()
67                .internal_event(InternalEvent::OnDataChannelEvent(
68                    id_,
69                    DataChannelEvent::Open,
70                ));
71        });
72
73        let id_ = *id;
74        let thread_ = Mutex::new(thread.clone());
75        channel.connect_on_close(move |_| {
76            thread_
77                .lock()
78                .unwrap()
79                .internal_event(InternalEvent::OnDataChannelEvent(
80                    id_,
81                    DataChannelEvent::Close,
82                ));
83        });
84
85        let id_ = *id;
86        let thread_ = Mutex::new(thread.clone());
87        channel.connect_on_error(move |_, error| {
88            thread_
89                .lock()
90                .unwrap()
91                .internal_event(InternalEvent::OnDataChannelEvent(
92                    id_,
93                    DataChannelEvent::Error(WebRtcError::Backend(error.to_string())),
94                ));
95        });
96
97        let id_ = *id;
98        let thread_ = Mutex::new(thread.clone());
99        channel.connect_on_message_string(move |_, message| {
100            let Some(message) = message.map(|s| s.to_owned()) else {
101                return;
102            };
103            thread_
104                .lock()
105                .unwrap()
106                .internal_event(InternalEvent::OnDataChannelEvent(
107                    id_,
108                    DataChannelEvent::OnMessage(DataChannelMessage::Text(message)),
109                ));
110        });
111
112        let id_ = *id;
113        let thread_ = Mutex::new(thread.clone());
114        channel.connect_on_message_data(move |_, message| {
115            let Some(message) = message.map(|b| b.to_owned()) else {
116                return;
117            };
118            thread_
119                .lock()
120                .unwrap()
121                .internal_event(InternalEvent::OnDataChannelEvent(
122                    id_,
123                    DataChannelEvent::OnMessage(DataChannelMessage::Binary(message.to_vec())),
124                ));
125        });
126
127        let id_ = *id;
128        let thread_ = Mutex::new(thread.clone());
129        channel.connect_ready_state_notify(move |channel| {
130            let ready_state = channel.ready_state();
131            let ready_state = match ready_state {
132                WebRTCDataChannelState::Connecting => DataChannelState::Connecting,
133                WebRTCDataChannelState::Open => DataChannelState::Open,
134                WebRTCDataChannelState::Closing => DataChannelState::Closing,
135                WebRTCDataChannelState::Closed => DataChannelState::Closed,
136                WebRTCDataChannelState::__Unknown(state) => DataChannelState::__Unknown(state),
137                _ => return,
138            };
139            thread_
140                .lock()
141                .unwrap()
142                .internal_event(InternalEvent::OnDataChannelEvent(
143                    id_,
144                    DataChannelEvent::StateChange(ready_state),
145                ));
146        });
147
148        Ok(Self {
149            id: *id,
150            thread: thread.to_owned(),
151            channel,
152        })
153    }
154
155    pub fn send(&self, message: &DataChannelMessage) {
156        match message {
157            DataChannelMessage::Text(text) => self.channel.send_string(Some(text)),
158            DataChannelMessage::Binary(data) => self
159                .channel
160                .send_data(Some(&glib::Bytes::from(data.as_slice()))),
161        }
162    }
163
164    pub fn close(&self) {
165        self.channel.close()
166    }
167}
168
169impl Drop for GStreamerWebRtcDataChannel {
170    fn drop(&mut self) {
171        self.thread
172            .internal_event(InternalEvent::OnDataChannelEvent(
173                self.id,
174                DataChannelEvent::Close,
175            ));
176    }
177}