servo_media_gstreamer/
datachannel.rs1use glib::prelude::*;
2use gst_webrtc::{WebRTCDataChannel, WebRTCDataChannelState};
3use servo_media_webrtc::thread::InternalEvent;
4use servo_media_webrtc::WebRtcController as WebRtcThread;
5use servo_media_webrtc::{
6 DataChannelEvent, DataChannelId, DataChannelInit, DataChannelMessage, DataChannelState,
7 WebRtcError,
8};
9use std::sync::Mutex;
10
11pub struct GStreamerWebRtcDataChannel {
12 channel: WebRTCDataChannel,
13 id: DataChannelId,
14 thread: WebRtcThread,
15}
16
17impl GStreamerWebRtcDataChannel {
18 pub fn new(
19 servo_channel_id: &DataChannelId,
20 webrtc: &gst::Element,
21 thread: &WebRtcThread,
22 init: &DataChannelInit,
23 ) -> Result<Self, String> {
24 let label = &init.label;
25 let mut init_struct = gst::Structure::builder("options")
26 .field("ordered", init.ordered)
27 .field("protocol", &init.protocol)
28 .field("negotiated", init.negotiated)
29 .build();
30
31 if let Some(max_packet_life_time) = init.max_packet_life_time {
32 init_struct.set_value(
33 "max-packet-lifetime",
34 (max_packet_life_time as u32).to_send_value(),
35 );
36 }
37
38 if let Some(max_retransmits) = init.max_retransmits {
39 init_struct.set_value("max-retransmits", (max_retransmits as u32).to_send_value());
40 }
41
42 if let Some(id) = init.id {
43 init_struct.set_value("id", (id as u32).to_send_value());
44 }
45
46 let channel = webrtc
47 .emit_by_name::<WebRTCDataChannel>("create-data-channel", &[&label, &init_struct]);
48
49 GStreamerWebRtcDataChannel::from(servo_channel_id, channel, thread)
50 }
51
52 pub fn from(
53 id: &DataChannelId,
54 channel: WebRTCDataChannel,
55 thread: &WebRtcThread,
56 ) -> Result<Self, String> {
57 let id_ = *id;
58 let thread_ = Mutex::new(thread.clone());
59 channel.connect_on_open(move |_| {
60 thread_
61 .lock()
62 .unwrap()
63 .internal_event(InternalEvent::OnDataChannelEvent(
64 id_,
65 DataChannelEvent::Open,
66 ));
67 });
68
69 let id_ = *id;
70 let thread_ = Mutex::new(thread.clone());
71 channel.connect_on_close(move |_| {
72 thread_
73 .lock()
74 .unwrap()
75 .internal_event(InternalEvent::OnDataChannelEvent(
76 id_,
77 DataChannelEvent::Close,
78 ));
79 });
80
81 let id_ = *id;
82 let thread_ = Mutex::new(thread.clone());
83 channel.connect_on_error(move |_, error| {
84 thread_
85 .lock()
86 .unwrap()
87 .internal_event(InternalEvent::OnDataChannelEvent(
88 id_,
89 DataChannelEvent::Error(WebRtcError::Backend(error.to_string())),
90 ));
91 });
92
93 let id_ = *id;
94 let thread_ = Mutex::new(thread.clone());
95 channel.connect_on_message_string(move |_, message| {
96 let Some(message) = message.map(|s| s.to_owned()) else {
97 return;
98 };
99 thread_
100 .lock()
101 .unwrap()
102 .internal_event(InternalEvent::OnDataChannelEvent(
103 id_,
104 DataChannelEvent::OnMessage(DataChannelMessage::Text(message)),
105 ));
106 });
107
108 let id_ = *id;
109 let thread_ = Mutex::new(thread.clone());
110 channel.connect_on_message_data(move |_, message| {
111 let Some(message) = message.map(|b| b.to_owned()) else {
112 return;
113 };
114 thread_
115 .lock()
116 .unwrap()
117 .internal_event(InternalEvent::OnDataChannelEvent(
118 id_,
119 DataChannelEvent::OnMessage(DataChannelMessage::Binary(message.to_vec())),
120 ));
121 });
122
123 let id_ = *id;
124 let thread_ = Mutex::new(thread.clone());
125 channel.connect_ready_state_notify(move |channel| {
126 let ready_state = channel.ready_state();
127 let ready_state = match ready_state {
128 WebRTCDataChannelState::Connecting => DataChannelState::Connecting,
129 WebRTCDataChannelState::Open => DataChannelState::Open,
130 WebRTCDataChannelState::Closing => DataChannelState::Closing,
131 WebRTCDataChannelState::Closed => DataChannelState::Closed,
132 WebRTCDataChannelState::__Unknown(state) => DataChannelState::__Unknown(state),
133 _ => return,
134 };
135 thread_
136 .lock()
137 .unwrap()
138 .internal_event(InternalEvent::OnDataChannelEvent(
139 id_,
140 DataChannelEvent::StateChange(ready_state),
141 ));
142 });
143
144 Ok(Self {
145 id: *id,
146 thread: thread.to_owned(),
147 channel,
148 })
149 }
150
151 pub fn send(&self, message: &DataChannelMessage) {
152 match message {
153 DataChannelMessage::Text(text) => self.channel.send_string(Some(text)),
154 DataChannelMessage::Binary(data) => self
155 .channel
156 .send_data(Some(&glib::Bytes::from(data.as_slice()))),
157 }
158 }
159
160 pub fn close(&self) {
161 self.channel.close()
162 }
163}
164
165impl Drop for GStreamerWebRtcDataChannel {
166 fn drop(&mut self) {
167 self.thread
168 .internal_event(InternalEvent::OnDataChannelEvent(
169 self.id,
170 DataChannelEvent::Close,
171 ));
172 }
173}