servo_media_gstreamer/
datachannel.rs1use std::sync::Mutex;
6
7use glib::prelude::*;
8use gstreamer_webrtc::{WebRTCDataChannel, WebRTCDataChannelState};
9use servo_media_webrtc::thread::InternalEvent;
10use servo_media_webrtc::{
11 DataChannelEvent, DataChannelId, DataChannelInit, DataChannelMessage, DataChannelState,
12 WebRtcController as WebRtcThread, WebRtcError,
13};
14
15pub struct GStreamerWebRtcDataChannel {
16 channel: WebRTCDataChannel,
17 id: DataChannelId,
18 thread: WebRtcThread,
19}
20
21impl GStreamerWebRtcDataChannel {
22 pub fn new(
23 servo_channel_id: &DataChannelId,
24 webrtc: &gstreamer::Element,
25 thread: &WebRtcThread,
26 init: &DataChannelInit,
27 ) -> Result<Self, String> {
28 let label = &init.label;
29 let mut init_struct = gstreamer::Structure::builder("options")
30 .field("ordered", init.ordered)
31 .field("protocol", &init.protocol)
32 .field("negotiated", init.negotiated)
33 .build();
34
35 if let Some(max_packet_life_time) = init.max_packet_life_time {
36 init_struct.set_value(
37 "max-packet-lifetime",
38 (max_packet_life_time as u32).to_send_value(),
39 );
40 }
41
42 if let Some(max_retransmits) = init.max_retransmits {
43 init_struct.set_value("max-retransmits", (max_retransmits as u32).to_send_value());
44 }
45
46 if let Some(id) = init.id {
47 init_struct.set_value("id", (id as u32).to_send_value());
48 }
49
50 let channel = webrtc
51 .emit_by_name::<WebRTCDataChannel>("create-data-channel", &[&label, &init_struct]);
52
53 GStreamerWebRtcDataChannel::from(servo_channel_id, channel, thread)
54 }
55
56 pub fn from(
57 id: &DataChannelId,
58 channel: WebRTCDataChannel,
59 thread: &WebRtcThread,
60 ) -> Result<Self, String> {
61 let id_ = *id;
62 let thread_ = Mutex::new(thread.clone());
63 channel.connect_on_open(move |_| {
64 thread_
65 .lock()
66 .unwrap()
67 .internal_event(InternalEvent::OnDataChannelEvent(
68 id_,
69 DataChannelEvent::Open,
70 ));
71 });
72
73 let id_ = *id;
74 let thread_ = Mutex::new(thread.clone());
75 channel.connect_on_close(move |_| {
76 thread_
77 .lock()
78 .unwrap()
79 .internal_event(InternalEvent::OnDataChannelEvent(
80 id_,
81 DataChannelEvent::Close,
82 ));
83 });
84
85 let id_ = *id;
86 let thread_ = Mutex::new(thread.clone());
87 channel.connect_on_error(move |_, error| {
88 thread_
89 .lock()
90 .unwrap()
91 .internal_event(InternalEvent::OnDataChannelEvent(
92 id_,
93 DataChannelEvent::Error(WebRtcError::Backend(error.to_string())),
94 ));
95 });
96
97 let id_ = *id;
98 let thread_ = Mutex::new(thread.clone());
99 channel.connect_on_message_string(move |_, message| {
100 let Some(message) = message.map(|s| s.to_owned()) else {
101 return;
102 };
103 thread_
104 .lock()
105 .unwrap()
106 .internal_event(InternalEvent::OnDataChannelEvent(
107 id_,
108 DataChannelEvent::OnMessage(DataChannelMessage::Text(message)),
109 ));
110 });
111
112 let id_ = *id;
113 let thread_ = Mutex::new(thread.clone());
114 channel.connect_on_message_data(move |_, message| {
115 let Some(message) = message.map(|b| b.to_owned()) else {
116 return;
117 };
118 thread_
119 .lock()
120 .unwrap()
121 .internal_event(InternalEvent::OnDataChannelEvent(
122 id_,
123 DataChannelEvent::OnMessage(DataChannelMessage::Binary(message.to_vec())),
124 ));
125 });
126
127 let id_ = *id;
128 let thread_ = Mutex::new(thread.clone());
129 channel.connect_ready_state_notify(move |channel| {
130 let ready_state = channel.ready_state();
131 let ready_state = match ready_state {
132 WebRTCDataChannelState::Connecting => DataChannelState::Connecting,
133 WebRTCDataChannelState::Open => DataChannelState::Open,
134 WebRTCDataChannelState::Closing => DataChannelState::Closing,
135 WebRTCDataChannelState::Closed => DataChannelState::Closed,
136 WebRTCDataChannelState::__Unknown(state) => DataChannelState::__Unknown(state),
137 _ => return,
138 };
139 thread_
140 .lock()
141 .unwrap()
142 .internal_event(InternalEvent::OnDataChannelEvent(
143 id_,
144 DataChannelEvent::StateChange(ready_state),
145 ));
146 });
147
148 Ok(Self {
149 id: *id,
150 thread: thread.to_owned(),
151 channel,
152 })
153 }
154
155 pub fn send(&self, message: &DataChannelMessage) {
156 match message {
157 DataChannelMessage::Text(text) => self.channel.send_string(Some(text)),
158 DataChannelMessage::Binary(data) => self
159 .channel
160 .send_data(Some(&glib::Bytes::from(data.as_slice()))),
161 }
162 }
163
164 pub fn close(&self) {
165 self.channel.close()
166 }
167}
168
169impl Drop for GStreamerWebRtcDataChannel {
170 fn drop(&mut self) {
171 self.thread
172 .internal_event(InternalEvent::OnDataChannelEvent(
173 self.id,
174 DataChannelEvent::Close,
175 ));
176 }
177}