servo_media_gstreamer/
datachannel.rs

1use glib::prelude::*;
2use gst_webrtc::{WebRTCDataChannel, WebRTCDataChannelState};
3use servo_media_webrtc::thread::InternalEvent;
4use servo_media_webrtc::WebRtcController as WebRtcThread;
5use servo_media_webrtc::{
6    DataChannelEvent, DataChannelId, DataChannelInit, DataChannelMessage, DataChannelState,
7    WebRtcError,
8};
9use std::sync::Mutex;
10
11pub struct GStreamerWebRtcDataChannel {
12    channel: WebRTCDataChannel,
13    id: DataChannelId,
14    thread: WebRtcThread,
15}
16
17impl GStreamerWebRtcDataChannel {
18    pub fn new(
19        servo_channel_id: &DataChannelId,
20        webrtc: &gst::Element,
21        thread: &WebRtcThread,
22        init: &DataChannelInit,
23    ) -> Result<Self, String> {
24        let label = &init.label;
25        let mut init_struct = gst::Structure::builder("options")
26            .field("ordered", init.ordered)
27            .field("protocol", &init.protocol)
28            .field("negotiated", init.negotiated)
29            .build();
30
31        if let Some(max_packet_life_time) = init.max_packet_life_time {
32            init_struct.set_value(
33                "max-packet-lifetime",
34                (max_packet_life_time as u32).to_send_value(),
35            );
36        }
37
38        if let Some(max_retransmits) = init.max_retransmits {
39            init_struct.set_value("max-retransmits", (max_retransmits as u32).to_send_value());
40        }
41
42        if let Some(id) = init.id {
43            init_struct.set_value("id", (id as u32).to_send_value());
44        }
45
46        let channel = webrtc
47            .emit_by_name::<WebRTCDataChannel>("create-data-channel", &[&label, &init_struct]);
48
49        GStreamerWebRtcDataChannel::from(servo_channel_id, channel, thread)
50    }
51
52    pub fn from(
53        id: &DataChannelId,
54        channel: WebRTCDataChannel,
55        thread: &WebRtcThread,
56    ) -> Result<Self, String> {
57        let id_ = *id;
58        let thread_ = Mutex::new(thread.clone());
59        channel.connect_on_open(move |_| {
60            thread_
61                .lock()
62                .unwrap()
63                .internal_event(InternalEvent::OnDataChannelEvent(
64                    id_,
65                    DataChannelEvent::Open,
66                ));
67        });
68
69        let id_ = *id;
70        let thread_ = Mutex::new(thread.clone());
71        channel.connect_on_close(move |_| {
72            thread_
73                .lock()
74                .unwrap()
75                .internal_event(InternalEvent::OnDataChannelEvent(
76                    id_,
77                    DataChannelEvent::Close,
78                ));
79        });
80
81        let id_ = *id;
82        let thread_ = Mutex::new(thread.clone());
83        channel.connect_on_error(move |_, error| {
84            thread_
85                .lock()
86                .unwrap()
87                .internal_event(InternalEvent::OnDataChannelEvent(
88                    id_,
89                    DataChannelEvent::Error(WebRtcError::Backend(error.to_string())),
90                ));
91        });
92
93        let id_ = *id;
94        let thread_ = Mutex::new(thread.clone());
95        channel.connect_on_message_string(move |_, message| {
96            let Some(message) = message.map(|s| s.to_owned()) else {
97                return;
98            };
99            thread_
100                .lock()
101                .unwrap()
102                .internal_event(InternalEvent::OnDataChannelEvent(
103                    id_,
104                    DataChannelEvent::OnMessage(DataChannelMessage::Text(message)),
105                ));
106        });
107
108        let id_ = *id;
109        let thread_ = Mutex::new(thread.clone());
110        channel.connect_on_message_data(move |_, message| {
111            let Some(message) = message.map(|b| b.to_owned()) else {
112                return;
113            };
114            thread_
115                .lock()
116                .unwrap()
117                .internal_event(InternalEvent::OnDataChannelEvent(
118                    id_,
119                    DataChannelEvent::OnMessage(DataChannelMessage::Binary(message.to_vec())),
120                ));
121        });
122
123        let id_ = *id;
124        let thread_ = Mutex::new(thread.clone());
125        channel.connect_ready_state_notify(move |channel| {
126            let ready_state = channel.ready_state();
127            let ready_state = match ready_state {
128                WebRTCDataChannelState::Connecting => DataChannelState::Connecting,
129                WebRTCDataChannelState::Open => DataChannelState::Open,
130                WebRTCDataChannelState::Closing => DataChannelState::Closing,
131                WebRTCDataChannelState::Closed => DataChannelState::Closed,
132                WebRTCDataChannelState::__Unknown(state) => DataChannelState::__Unknown(state),
133                _ => return,
134            };
135            thread_
136                .lock()
137                .unwrap()
138                .internal_event(InternalEvent::OnDataChannelEvent(
139                    id_,
140                    DataChannelEvent::StateChange(ready_state),
141                ));
142        });
143
144        Ok(Self {
145            id: *id,
146            thread: thread.to_owned(),
147            channel,
148        })
149    }
150
151    pub fn send(&self, message: &DataChannelMessage) {
152        match message {
153            DataChannelMessage::Text(text) => self.channel.send_string(Some(text)),
154            DataChannelMessage::Binary(data) => self
155                .channel
156                .send_data(Some(&glib::Bytes::from(data.as_slice()))),
157        }
158    }
159
160    pub fn close(&self) {
161        self.channel.close()
162    }
163}
164
165impl Drop for GStreamerWebRtcDataChannel {
166    fn drop(&mut self) {
167        self.thread
168            .internal_event(InternalEvent::OnDataChannelEvent(
169                self.id,
170                DataChannelEvent::Close,
171            ));
172    }
173}