1use std::{
2 ffi::CString,
3 os::unix::{
4 io::{AsFd, BorrowedFd, OwnedFd, RawFd},
5 net::UnixStream,
6 },
7 sync::Arc,
8};
9
10use crate::{
11 core_interfaces::{WL_CALLBACK_INTERFACE, WL_DISPLAY_INTERFACE, WL_REGISTRY_INTERFACE},
12 debug,
13 protocol::{
14 check_for_signature, same_interface, same_interface_or_anonymous, AllowNull, Argument,
15 ArgumentType, Interface, Message, ObjectInfo, ProtocolError, ANONYMOUS_INTERFACE,
16 INLINE_ARGS,
17 },
18 rs::map::SERVER_ID_LIMIT,
19 types::server::{DisconnectReason, InvalidId},
20};
21
22use smallvec::SmallVec;
23
24use crate::rs::{
25 map::{Object, ObjectMap},
26 socket::{BufferedSocket, Socket},
27 wire::MessageParseError,
28};
29
30use super::{
31 handle::PendingDestructor, registry::Registry, ClientData, ClientId, Credentials, Data,
32 DumbObjectData, GlobalHandler, InnerClientId, InnerGlobalId, InnerObjectId, ObjectData,
33 ObjectId, UninitObjectData,
34};
35
/// Argument list of a message whose object arguments are full [`ObjectId`]s,
/// stored in a `SmallVec` with `INLINE_ARGS` inline slots.
type ArgSmallVec<Fd> = SmallVec<[Argument<ObjectId, Fd>; INLINE_ARGS]>;
37
/// Error codes that `Client::post_display_error` posts against the display
/// object (protocol id 1).
///
/// The discriminant of each variant is the `u32` value sent as the error code.
#[repr(u32)]
// Not every variant is constructed in this file, hence the lint allowance.
#[allow(dead_code)]
pub(crate) enum DisplayError {
    /// Used in this file when an object id, a new id or a global binding is rejected.
    InvalidObject = 0,
    /// Used in this file for an unknown opcode or a request newer than the object's version.
    InvalidMethod = 1,
    /// Not constructed in this file.
    NoMemory = 2,
    /// Not constructed in this file.
    Implementation = 3,
}
46
/// Server-side state of one connected client.
#[derive(Debug)]
pub(crate) struct Client<D: 'static> {
    /// Buffered socket used to read requests from and write events to the client.
    pub(crate) socket: BufferedSocket,
    /// Objects of this client, looked up by protocol id.
    pub(crate) map: ObjectMap<Data<D>>,
    /// When set, sent and dispatched messages are printed through the `debug` module.
    debug: bool,
    /// Last value handed out by `next_serial`.
    last_serial: u32,
    /// Identifier of this client.
    pub(crate) id: InnerClientId,
    /// Set by `kill`; once set, `send_event` is a no-op and `next_request` fails.
    pub(crate) killed: bool,
    /// User-provided client data, notified on initialization and disconnection.
    pub(crate) data: Arc<dyn ClientData>,
}
57
58impl<D> Client<D> {
59 fn next_serial(&mut self) -> u32 {
60 self.last_serial = self.last_serial.wrapping_add(1);
61 self.last_serial
62 }
63}
64
65impl<D> Client<D> {
66 pub(crate) fn new(
67 stream: UnixStream,
68 id: InnerClientId,
69 debug: bool,
70 data: Arc<dyn ClientData>,
71 buffer_size: usize,
72 ) -> Self {
73 let socket = BufferedSocket::new(Socket::from(stream), Some(buffer_size));
74 let mut map = ObjectMap::new();
75 map.insert_at(
76 1,
77 Object {
78 interface: &WL_DISPLAY_INTERFACE,
79 version: 1,
80 data: Data { user_data: Arc::new(DumbObjectData), serial: 0 },
81 },
82 )
83 .unwrap();
84
85 data.initialized(ClientId { id: id.clone() });
86
87 Self { socket, map, debug, id, killed: false, last_serial: 0, data }
88 }
89
90 pub(crate) fn create_object(
91 &mut self,
92 interface: &'static Interface,
93 version: u32,
94 user_data: Arc<dyn ObjectData<D>>,
95 ) -> InnerObjectId {
96 let serial = self.next_serial();
97 let id = self.map.server_insert_new(Object {
98 interface,
99 version,
100 data: Data { serial, user_data },
101 });
102 InnerObjectId { id, serial, client_id: self.id.clone(), interface }
103 }
104
105 pub(crate) fn destroy_object(
106 &mut self,
107 id: InnerObjectId,
108 pending_destructors: &mut Vec<super::handle::PendingDestructor<D>>,
109 ) -> Result<(), InvalidId> {
110 let object = self.get_object(id.clone())?;
111 pending_destructors.push((object.data.user_data.clone(), self.id.clone(), id.clone()));
112 self.send_delete_id(id.clone());
113 Ok(())
114 }
115
116 pub(crate) fn object_info(&self, id: InnerObjectId) -> Result<ObjectInfo, InvalidId> {
117 let object = self.get_object(id.clone())?;
118 Ok(ObjectInfo { id: id.id, interface: object.interface, version: object.version })
119 }
120
121 pub(crate) fn send_event(
122 &mut self,
123 Message { sender_id: object_id, opcode, args }: Message<ObjectId, RawFd>,
124 pending_destructors: Option<&mut Vec<super::handle::PendingDestructor<D>>>,
125 ) -> Result<(), InvalidId> {
126 if self.killed {
127 return Ok(());
128 }
129 let object = self.get_object(object_id.id.clone())?;
130
131 let message_desc = match object.interface.events.get(opcode as usize) {
132 Some(msg) => msg,
133 None => {
134 panic!(
135 "Unknown opcode {} for object {}@{}.",
136 opcode, object.interface.name, object_id.id
137 );
138 }
139 };
140
141 if !check_for_signature(message_desc.signature, &args) {
142 panic!(
143 "Unexpected signature for event {}@{}.{}: expected {:?}, got {:?}.",
144 object.interface.name,
145 object_id.id,
146 message_desc.name,
147 message_desc.signature,
148 args
149 );
150 }
151
152 if self.debug {
153 debug::print_send_message(
154 object.interface.name,
155 object_id.id.id,
156 message_desc.name,
157 &args,
158 false,
159 );
160 }
161
162 let mut msg_args = SmallVec::with_capacity(args.len());
163 let mut arg_interfaces = message_desc.arg_interfaces.iter();
164 for (i, arg) in args.into_iter().enumerate() {
165 msg_args.push(match arg {
166 Argument::Array(a) => Argument::Array(a),
167 Argument::Int(i) => Argument::Int(i),
168 Argument::Uint(u) => Argument::Uint(u),
169 Argument::Str(s) => Argument::Str(s),
170 Argument::Fixed(f) => Argument::Fixed(f),
171 Argument::Fd(f) => Argument::Fd(f),
172 Argument::NewId(o) => {
173 if o.id.id != 0 {
174 if o.id.client_id != self.id {
175 panic!("Attempting to send an event with objects from wrong client.")
176 }
177 let object = self.get_object(o.id.clone())?;
178 let child_interface = match message_desc.child_interface {
179 Some(iface) => iface,
180 None => panic!("Trying to send event {}@{}.{} which creates an object without specifying its interface, this is unsupported.", object_id.id.interface.name, object_id.id, message_desc.name),
181 };
182 if !same_interface(child_interface, object.interface) {
183 panic!("Event {}@{}.{} expects a newid argument of interface {} but {} was provided instead.", object.interface.name, object_id.id, message_desc.name, child_interface.name, object.interface.name);
184 }
185 } else if !matches!(message_desc.signature[i], ArgumentType::NewId) {
186 panic!("Request {}@{}.{} expects an non-null newid argument.", object.interface.name, object_id.id, message_desc.name);
187 }
188 Argument::Object(o.id.id)
189 },
190 Argument::Object(o) => {
191 let next_interface = arg_interfaces.next().unwrap();
192 if o.id.id != 0 {
193 if o.id.client_id != self.id {
194 panic!("Attempting to send an event with objects from wrong client.")
195 }
196 let arg_object = self.get_object(o.id.clone())?;
197 if !same_interface_or_anonymous(next_interface, arg_object.interface) {
198 panic!("Event {}@{}.{} expects an object argument of interface {} but {} was provided instead.", object.interface.name, object_id.id, message_desc.name, next_interface.name, arg_object.interface.name);
199 }
200 } else if !matches!(message_desc.signature[i], ArgumentType::Object(AllowNull::Yes)) {
201 panic!("Request {}@{}.{} expects an non-null object argument.", object.interface.name, object_id.id, message_desc.name);
202 }
203 Argument::Object(o.id.id)
204 }
205 });
206 }
207
208 let msg = Message { sender_id: object_id.id.id, opcode, args: msg_args };
209
210 if self.socket.write_message(&msg).is_err() {
211 self.kill(DisconnectReason::ConnectionClosed);
212 }
213
214 if message_desc.is_destructor {
216 if let Some(vec) = pending_destructors {
217 vec.push((object.data.user_data.clone(), self.id.clone(), object_id.id.clone()));
218 }
219 self.send_delete_id(object_id.id);
220 }
221
222 Ok(())
223 }
224
225 pub(crate) fn send_delete_id(&mut self, object_id: InnerObjectId) {
226 if object_id.id < SERVER_ID_LIMIT {
228 let msg = message!(1, 1, [Argument::Uint(object_id.id)]);
229 if self.socket.write_message(&msg).is_err() {
230 self.kill(DisconnectReason::ConnectionClosed);
231 }
232 }
233 self.map.remove(object_id.id);
234 }
235
236 pub(crate) fn get_object_data(
237 &self,
238 id: InnerObjectId,
239 ) -> Result<Arc<dyn ObjectData<D>>, InvalidId> {
240 let object = self.get_object(id)?;
241 Ok(object.data.user_data)
242 }
243
244 pub(crate) fn set_object_data(
245 &mut self,
246 id: InnerObjectId,
247 data: Arc<dyn ObjectData<D>>,
248 ) -> Result<(), InvalidId> {
249 self.map
250 .with(id.id, |objdata| {
251 if objdata.data.serial != id.serial {
252 Err(InvalidId)
253 } else {
254 objdata.data.user_data = data;
255 Ok(())
256 }
257 })
258 .unwrap_or(Err(InvalidId))
259 }
260
261 pub(crate) fn post_display_error(&mut self, code: DisplayError, message: CString) {
262 self.post_error(
263 InnerObjectId {
264 id: 1,
265 interface: &WL_DISPLAY_INTERFACE,
266 client_id: self.id.clone(),
267 serial: 0,
268 },
269 code as u32,
270 message,
271 )
272 }
273
274 pub(crate) fn post_error(
275 &mut self,
276 object_id: InnerObjectId,
277 error_code: u32,
278 message: CString,
279 ) {
280 let converted_message = message.to_string_lossy().into();
281 let _ = self.send_event(
283 message!(
284 ObjectId {
285 id: InnerObjectId {
286 id: 1,
287 interface: &WL_DISPLAY_INTERFACE,
288 client_id: self.id.clone(),
289 serial: 0
290 }
291 },
292 0, [
294 Argument::Object(ObjectId { id: object_id.clone() }),
295 Argument::Uint(error_code),
296 Argument::Str(Some(Box::new(message))),
297 ],
298 ),
299 None,
301 );
302 let _ = self.flush();
303 self.kill(DisconnectReason::ProtocolError(ProtocolError {
304 code: error_code,
305 object_id: object_id.id,
306 object_interface: object_id.interface.name.into(),
307 message: converted_message,
308 }));
309 }
310
311 #[cfg(any(target_os = "linux", target_os = "android"))]
312 pub(crate) fn get_credentials(&self) -> Credentials {
313 let creds =
314 rustix::net::sockopt::socket_peercred(&self.socket).expect("getsockopt failed!?");
315 let pid = rustix::process::Pid::as_raw(Some(creds.pid));
316 Credentials { pid, uid: creds.uid.as_raw(), gid: creds.gid.as_raw() }
317 }
318
319 #[cfg(not(any(target_os = "linux", target_os = "android")))]
320 pub(crate) fn get_credentials(&self) -> Credentials {
322 Credentials { pid: 0, uid: 0, gid: 0 }
323 }
324
325 pub(crate) fn kill(&mut self, reason: DisconnectReason) {
326 self.killed = true;
327 self.data.disconnected(ClientId { id: self.id.clone() }, reason);
328 }
329
330 pub(crate) fn flush(&mut self) -> std::io::Result<()> {
331 self.socket.flush()
332 }
333
334 pub(crate) fn all_objects(&self) -> impl Iterator<Item = ObjectId> + '_ {
335 let client_id = self.id.clone();
336 self.map.all_objects().map(move |(id, obj)| ObjectId {
337 id: InnerObjectId {
338 id,
339 client_id: client_id.clone(),
340 interface: obj.interface,
341 serial: obj.data.serial,
342 },
343 })
344 }
345
346 #[allow(clippy::type_complexity)]
347 pub(crate) fn next_request(
348 &mut self,
349 ) -> std::io::Result<(Message<u32, OwnedFd>, Object<Data<D>>)> {
350 if self.killed {
351 return Err(rustix::io::Errno::PIPE.into());
352 }
353 loop {
354 let map = &self.map;
355 let msg = match self.socket.read_one_message(|id, opcode| {
356 map.find(id)
357 .and_then(|o| o.interface.requests.get(opcode as usize))
358 .map(|desc| desc.signature)
359 }) {
360 Ok(msg) => msg,
361 Err(MessageParseError::MissingData) | Err(MessageParseError::MissingFD) => {
362 if let Err(e) = self.socket.fill_incoming_buffers() {
364 if e.kind() != std::io::ErrorKind::WouldBlock {
365 self.kill(DisconnectReason::ConnectionClosed);
366 }
367 return Err(e);
368 }
369 continue;
370 }
371 Err(MessageParseError::Malformed) => {
372 self.kill(DisconnectReason::ConnectionClosed);
373 return Err(rustix::io::Errno::PROTO.into());
374 }
375 };
376
377 let obj = self.map.find(msg.sender_id).unwrap();
378
379 if self.debug {
380 debug::print_dispatched_message(
381 obj.interface.name,
382 msg.sender_id,
383 obj.interface.requests.get(msg.opcode as usize).unwrap().name,
384 &msg.args,
385 );
386 }
387
388 return Ok((msg, obj));
389 }
390 }
391
392 pub(crate) fn get_object(&self, id: InnerObjectId) -> Result<Object<Data<D>>, InvalidId> {
393 let object = self.map.find(id.id).ok_or(InvalidId)?;
394 if object.data.serial != id.serial {
395 return Err(InvalidId);
396 }
397 Ok(object)
398 }
399
400 pub(crate) fn object_for_protocol_id(&self, pid: u32) -> Result<InnerObjectId, InvalidId> {
401 let object = self.map.find(pid).ok_or(InvalidId)?;
402 Ok(InnerObjectId {
403 id: pid,
404 client_id: self.id.clone(),
405 serial: object.data.serial,
406 interface: object.interface,
407 })
408 }
409
410 fn queue_all_destructors(&mut self, pending_destructors: &mut Vec<PendingDestructor<D>>) {
411 pending_destructors.extend(self.map.all_objects().map(|(id, obj)| {
412 (
413 obj.data.user_data.clone(),
414 self.id.clone(),
415 InnerObjectId {
416 id,
417 serial: obj.data.serial,
418 client_id: self.id.clone(),
419 interface: obj.interface,
420 },
421 )
422 }));
423 }
424
425 pub(crate) fn handle_display_request(
426 &mut self,
427 message: Message<u32, OwnedFd>,
428 registry: &mut Registry<D>,
429 ) {
430 match message.opcode {
431 0 => {
433 if let [Argument::NewId(new_id)] = message.args[..] {
434 let serial = self.next_serial();
435 let callback_obj = Object {
436 interface: &WL_CALLBACK_INTERFACE,
437 version: 1,
438 data: Data { user_data: Arc::new(DumbObjectData), serial },
439 };
440 if let Err(()) = self.map.insert_at(new_id, callback_obj) {
441 self.post_display_error(
442 DisplayError::InvalidObject,
443 CString::new(format!("Invalid new_id: {new_id}.")).unwrap(),
444 );
445 return;
446 }
447 let cb_id = ObjectId {
448 id: InnerObjectId {
449 id: new_id,
450 client_id: self.id.clone(),
451 serial,
452 interface: &WL_CALLBACK_INTERFACE,
453 },
454 };
455 self.send_event(message!(cb_id, 0, [Argument::Uint(0)]), None).unwrap();
457 } else {
458 unreachable!()
459 }
460 }
461 1 => {
463 if let [Argument::NewId(new_id)] = message.args[..] {
464 let serial = self.next_serial();
465 let registry_obj = Object {
466 interface: &WL_REGISTRY_INTERFACE,
467 version: 1,
468 data: Data { user_data: Arc::new(DumbObjectData), serial },
469 };
470 let registry_id = InnerObjectId {
471 id: new_id,
472 serial,
473 client_id: self.id.clone(),
474 interface: &WL_REGISTRY_INTERFACE,
475 };
476 if let Err(()) = self.map.insert_at(new_id, registry_obj) {
477 self.post_display_error(
478 DisplayError::InvalidObject,
479 CString::new(format!("Invalid new_id: {new_id}.")).unwrap(),
480 );
481 return;
482 }
483 let _ = registry.new_registry(registry_id, self);
484 } else {
485 unreachable!()
486 }
487 }
488 _ => {
489 self.post_display_error(
491 DisplayError::InvalidMethod,
492 CString::new(format!(
493 "Unknown opcode {} for interface wl_display.",
494 message.opcode
495 ))
496 .unwrap(),
497 );
498 }
499 }
500 }
501
502 #[allow(clippy::type_complexity)]
503 pub(crate) fn handle_registry_request(
504 &mut self,
505 message: Message<u32, OwnedFd>,
506 registry: &mut Registry<D>,
507 ) -> Option<(InnerClientId, InnerGlobalId, InnerObjectId, Arc<dyn GlobalHandler<D>>)> {
508 match message.opcode {
509 0 => {
511 if let [Argument::Uint(name), Argument::Str(Some(ref interface_name)), Argument::Uint(version), Argument::NewId(new_id)] =
512 message.args[..]
513 {
514 if let Some((interface, global_id, handler)) =
515 registry.check_bind(self, name, interface_name, version)
516 {
517 let serial = self.next_serial();
518 let object = Object {
519 interface,
520 version,
521 data: Data { serial, user_data: Arc::new(UninitObjectData) },
522 };
523 if let Err(()) = self.map.insert_at(new_id, object) {
524 self.post_display_error(
525 DisplayError::InvalidObject,
526 CString::new(format!("Invalid new_id: {new_id}.")).unwrap(),
527 );
528 return None;
529 }
530 Some((
531 self.id.clone(),
532 global_id,
533 InnerObjectId {
534 id: new_id,
535 client_id: self.id.clone(),
536 interface,
537 serial,
538 },
539 handler.clone(),
540 ))
541 } else {
542 self.post_display_error(
543 DisplayError::InvalidObject,
544 CString::new(format!(
545 "Invalid binding of {} version {} for global {}.",
546 interface_name.to_string_lossy(),
547 version,
548 name
549 ))
550 .unwrap(),
551 );
552 None
553 }
554 } else {
555 unreachable!()
556 }
557 }
558 _ => {
559 self.post_display_error(
561 DisplayError::InvalidMethod,
562 CString::new(format!(
563 "Unknown opcode {} for interface wl_registry.",
564 message.opcode
565 ))
566 .unwrap(),
567 );
568 None
569 }
570 }
571 }
572
573 pub(crate) fn process_request(
574 &mut self,
575 object: &Object<Data<D>>,
576 message: Message<u32, OwnedFd>,
577 ) -> Option<(ArgSmallVec<OwnedFd>, bool, Option<InnerObjectId>)> {
578 let message_desc = object.interface.requests.get(message.opcode as usize).unwrap();
579
580 if message_desc.since > object.version {
581 self.post_display_error(
582 DisplayError::InvalidMethod,
583 CString::new(format!(
584 "invalid method {} (since {} < {}), object {}#{}",
585 message.opcode,
586 object.version,
587 message_desc.since,
588 object.interface.name,
589 message.sender_id
590 ))
591 .unwrap(),
592 );
593 return None;
594 }
595
596 let mut new_args = SmallVec::with_capacity(message.args.len());
598 let mut arg_interfaces = message_desc.arg_interfaces.iter();
599 let mut created_id = None;
600 for (i, arg) in message.args.into_iter().enumerate() {
601 new_args.push(match arg {
602 Argument::Array(a) => Argument::Array(a),
603 Argument::Int(i) => Argument::Int(i),
604 Argument::Uint(u) => Argument::Uint(u),
605 Argument::Str(s) => Argument::Str(s),
606 Argument::Fixed(f) => Argument::Fixed(f),
607 Argument::Fd(f) => Argument::Fd(f),
608 Argument::Object(o) => {
609 let next_interface = arg_interfaces.next();
610 if o != 0 {
611 let obj = match self.map.find(o) {
613 Some(o) => o,
614 None => {
615 self.post_display_error(
616 DisplayError::InvalidObject,
617 CString::new(format!("Unknown id: {o}.")).unwrap()
618 );
619 return None;
620 }
621 };
622 if let Some(next_interface) = next_interface {
623 if !same_interface_or_anonymous(next_interface, obj.interface) {
624 self.post_display_error(
625 DisplayError::InvalidObject,
626 CString::new(format!(
627 "Invalid object {} in request {}.{}: expected {} but got {}.",
628 o,
629 object.interface.name,
630 message_desc.name,
631 next_interface.name,
632 obj.interface.name,
633 )).unwrap()
634 );
635 return None;
636 }
637 }
638 Argument::Object(ObjectId { id: InnerObjectId { id: o, client_id: self.id.clone(), serial: obj.data.serial, interface: obj.interface }})
639 } else if matches!(message_desc.signature[i], ArgumentType::Object(AllowNull::Yes)) {
640 Argument::Object(ObjectId { id: InnerObjectId { id: 0, client_id: self.id.clone(), serial: 0, interface: &ANONYMOUS_INTERFACE }})
641 } else {
642 self.post_display_error(
643 DisplayError::InvalidObject,
644 CString::new(format!(
645 "Invalid null object in request {}.{}.",
646 object.interface.name,
647 message_desc.name,
648 )).unwrap()
649 );
650 return None;
651 }
652 }
653 Argument::NewId(new_id) => {
654 let child_interface = match message_desc.child_interface {
656 Some(iface) => iface,
657 None => panic!("Received request {}@{}.{} which creates an object without specifying its interface, this is unsupported.", object.interface.name, message.sender_id, message_desc.name),
658 };
659
660 let child_udata = Arc::new(UninitObjectData);
661
662 let child_obj = Object {
663 interface: child_interface,
664 version: object.version,
665 data: Data {
666 user_data: child_udata,
667 serial: self.next_serial(),
668 }
669 };
670
671 let child_id = InnerObjectId { id: new_id, client_id: self.id.clone(), serial: child_obj.data.serial, interface: child_obj.interface };
672 created_id = Some(child_id.clone());
673
674 if let Err(()) = self.map.insert_at(new_id, child_obj) {
675 self.post_display_error(
677 DisplayError::InvalidObject,
678 CString::new(format!("Invalid new_id: {new_id}.")).unwrap()
679 );
680 return None;
681 }
682
683 Argument::NewId(ObjectId { id: child_id })
684 }
685 });
686 }
687 Some((new_args, message_desc.is_destructor, created_id))
688 }
689}
690
691impl<D> AsFd for Client<D> {
692 fn as_fd(&self) -> BorrowedFd<'_> {
693 self.socket.as_fd()
694 }
695}
696
/// Storage for all the clients known to the server.
#[derive(Debug)]
pub(crate) struct ClientStore<D: 'static> {
    /// Client slots, indexed by the numeric client id; `create_client` reuses
    /// `None` slots.
    clients: Vec<Option<Client<D>>>,
    /// Last value handed out by `next_serial`.
    last_serial: u32,
    /// Debug flag passed to every client created by this store.
    debug: bool,
}
703
704impl<D> ClientStore<D> {
705 pub(crate) fn new(debug: bool) -> Self {
706 Self { clients: Vec::new(), last_serial: 0, debug }
707 }
708
709 pub(crate) fn create_client(
710 &mut self,
711 stream: UnixStream,
712 data: Arc<dyn ClientData>,
713 buffer_size: usize,
714 ) -> InnerClientId {
715 let serial = self.next_serial();
716 let (id, place) = match self.clients.iter_mut().enumerate().find(|(_, c)| c.is_none()) {
718 Some((id, place)) => (id, place),
719 None => {
720 self.clients.push(None);
721 (self.clients.len() - 1, self.clients.last_mut().unwrap())
722 }
723 };
724
725 let id = InnerClientId { id: id as u32, serial };
726
727 *place = Some(Client::new(stream, id.clone(), self.debug, data, buffer_size));
728
729 id
730 }
731
732 pub(crate) fn get_client(&self, id: InnerClientId) -> Result<&Client<D>, InvalidId> {
733 match self.clients.get(id.id as usize) {
734 Some(Some(client)) if client.id == id => Ok(client),
735 _ => Err(InvalidId),
736 }
737 }
738
739 pub(crate) fn get_client_mut(
740 &mut self,
741 id: InnerClientId,
742 ) -> Result<&mut Client<D>, InvalidId> {
743 match self.clients.get_mut(id.id as usize) {
744 Some(&mut Some(ref mut client)) if client.id == id => Ok(client),
745 _ => Err(InvalidId),
746 }
747 }
748
749 pub(crate) fn cleanup(
750 &mut self,
751 pending_destructors: &mut Vec<PendingDestructor<D>>,
752 ) -> SmallVec<[Client<D>; 1]> {
753 let mut cleaned = SmallVec::new();
754 for place in &mut self.clients {
755 if place.as_ref().map(|client| client.killed).unwrap_or(false) {
756 let mut client = place.take().unwrap();
758 client.queue_all_destructors(pending_destructors);
759 let _ = client.flush();
760 cleaned.push(client);
761 }
762 }
763 cleaned
764 }
765
766 fn next_serial(&mut self) -> u32 {
767 self.last_serial = self.last_serial.wrapping_add(1);
768 self.last_serial
769 }
770
771 pub(crate) fn clients_mut(&mut self) -> impl Iterator<Item = &mut Client<D>> {
772 self.clients.iter_mut().flat_map(|o| o.as_mut()).filter(|c| !c.killed)
773 }
774
775 pub(crate) fn all_clients_id(&self) -> impl Iterator<Item = ClientId> + '_ {
776 self.clients.iter().flat_map(|opt| {
777 opt.as_ref().filter(|c| !c.killed).map(|client| ClientId { id: client.id.clone() })
778 })
779 }
780}