1use std::{
2 convert::Infallible,
3 future::Future,
4 marker::PhantomData,
5 pin::Pin,
6 task::{Context, Poll},
7 time::Duration,
8};
9
10use crate::rt::{Read, Write};
11use bytes::Bytes;
12use futures_channel::mpsc::{Receiver, Sender};
13use futures_channel::{mpsc, oneshot};
14use futures_core::{ready, FusedFuture, FusedStream, Stream};
15use h2::client::{Builder, Connection, SendRequest};
16use h2::SendStream;
17use http::{Method, StatusCode};
18use pin_project_lite::pin_project;
19
20use super::ping::{Ponger, Recorder};
21use super::{ping, PipeToSendStream, SendBuf};
22use crate::body::{Body, Incoming as IncomingBody};
23use crate::client::dispatch::{Callback, SendWhen, TrySendError};
24use crate::common::either::Either;
25use crate::common::io::Compat;
26use crate::common::time::Time;
27use crate::ext::Protocol;
28use crate::headers;
29use crate::proto::Dispatched;
30use crate::rt::bounds::{Http2ClientConnExec, Http2UpgradedExec};
31use crate::upgrade::Upgraded;
32use crate::{Request, Response};
33use h2::client::ResponseFuture;
34
35type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<IncomingBody>>;
36
37type ConnDropRef = mpsc::Sender<Infallible>;
40
41type ConnEof = oneshot::Receiver<Infallible>;
44
/// Default for `Config::initial_conn_window_size`: 5 MiB.
const DEFAULT_CONN_WINDOW: u32 = 5 * 1024 * 1024;

/// Default for `Config::initial_stream_window_size`: 2 MiB.
const DEFAULT_STREAM_WINDOW: u32 = 2 * 1024 * 1024;

/// Default for `Config::max_frame_size`: 16 KiB.
const DEFAULT_MAX_FRAME_SIZE: u32 = 16 * 1024;

/// Default for `Config::max_send_buffer_size`: 1 MiB.
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024;

/// Default for `Config::max_header_list_size`: 16 KiB.
const DEFAULT_MAX_HEADER_LIST_SIZE: u32 = 16 * 1024;

/// Default for `Config::initial_max_send_streams`.
const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100;
62
/// HTTP/2 client connection settings.
///
/// `new_builder` forwards most of these to the `h2` client builder, while
/// `new_ping_config` derives the ping / keep-alive configuration from the
/// remaining ones.
#[derive(Debug, Clone)]
pub(crate) struct Config {
    /// When `true`, the ping configuration receives
    /// `initial_stream_window_size` as its `bdp_initial_window`; when `false`
    /// that setting is left as `None`.
    pub(crate) adaptive_window: bool,
    /// Passed to the builder's `initial_connection_window_size`.
    pub(crate) initial_conn_window_size: u32,
    /// Passed to the builder's `initial_window_size`.
    pub(crate) initial_stream_window_size: u32,
    /// Passed to the builder's `initial_max_send_streams`.
    pub(crate) initial_max_send_streams: usize,
    /// Passed to the builder's `max_frame_size` when `Some`.
    pub(crate) max_frame_size: Option<u32>,
    /// Passed to the builder's `max_header_list_size`.
    pub(crate) max_header_list_size: u32,
    /// Copied into the ping configuration's `keep_alive_interval`.
    pub(crate) keep_alive_interval: Option<Duration>,
    /// Copied into the ping configuration's `keep_alive_timeout`.
    pub(crate) keep_alive_timeout: Duration,
    /// Copied into the ping configuration's `keep_alive_while_idle`.
    pub(crate) keep_alive_while_idle: bool,
    /// Passed to the builder's `max_concurrent_reset_streams` when `Some`.
    pub(crate) max_concurrent_reset_streams: Option<usize>,
    /// Passed to the builder's `max_send_buffer_size`.
    pub(crate) max_send_buffer_size: usize,
    /// Passed to the builder's `max_pending_accept_reset_streams` when `Some`.
    pub(crate) max_pending_accept_reset_streams: Option<usize>,
    /// Passed as-is (including `None`) to the builder's
    /// `max_local_error_reset_streams`.
    pub(crate) max_local_error_reset_streams: Option<usize>,
    /// Passed to the builder's `header_table_size` when `Some`.
    pub(crate) header_table_size: Option<u32>,
    /// Passed to the builder's `max_concurrent_streams` when `Some`.
    pub(crate) max_concurrent_streams: Option<u32>,
    /// Passed to the builder's `reset_stream_duration` when `Some`.
    pub(crate) reset_stream_duration: Option<Duration>,
}
82
83impl Default for Config {
84 fn default() -> Config {
85 Config {
86 adaptive_window: false,
87 initial_conn_window_size: DEFAULT_CONN_WINDOW,
88 initial_stream_window_size: DEFAULT_STREAM_WINDOW,
89 initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS,
90 max_frame_size: Some(DEFAULT_MAX_FRAME_SIZE),
91 max_header_list_size: DEFAULT_MAX_HEADER_LIST_SIZE,
92 keep_alive_interval: None,
93 keep_alive_timeout: Duration::from_secs(20),
94 keep_alive_while_idle: false,
95 max_concurrent_reset_streams: None,
96 max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
97 max_pending_accept_reset_streams: None,
98 max_local_error_reset_streams: Some(1024),
99 header_table_size: None,
100 max_concurrent_streams: None,
101 reset_stream_duration: None,
102 }
103 }
104}
105
106fn new_builder(config: &Config) -> Builder {
107 let mut builder = Builder::default();
108 builder
109 .initial_max_send_streams(config.initial_max_send_streams)
110 .initial_window_size(config.initial_stream_window_size)
111 .initial_connection_window_size(config.initial_conn_window_size)
112 .max_header_list_size(config.max_header_list_size)
113 .max_send_buffer_size(config.max_send_buffer_size)
114 .max_local_error_reset_streams(config.max_local_error_reset_streams)
115 .enable_push(false);
116 if let Some(max) = config.max_frame_size {
117 builder.max_frame_size(max);
118 }
119 if let Some(max) = config.max_concurrent_reset_streams {
120 builder.max_concurrent_reset_streams(max);
121 }
122 if let Some(max) = config.max_pending_accept_reset_streams {
123 builder.max_pending_accept_reset_streams(max);
124 }
125 if let Some(size) = config.header_table_size {
126 builder.header_table_size(size);
127 }
128 if let Some(max) = config.max_concurrent_streams {
129 builder.max_concurrent_streams(max);
130 }
131 if let Some(dur) = config.reset_stream_duration {
132 builder.reset_stream_duration(dur);
133 }
134 builder
135}
136
137fn new_ping_config(config: &Config) -> ping::Config {
138 ping::Config {
139 bdp_initial_window: if config.adaptive_window {
140 Some(config.initial_stream_window_size)
141 } else {
142 None
143 },
144 keep_alive_interval: config.keep_alive_interval,
145 keep_alive_timeout: config.keep_alive_timeout,
146 keep_alive_while_idle: config.keep_alive_while_idle,
147 }
148}
149
150pub(crate) async fn handshake<T, B, E>(
151 io: T,
152 req_rx: ClientRx<B>,
153 config: &Config,
154 mut exec: E,
155 timer: Time,
156) -> crate::Result<ClientTask<B, E, T>>
157where
158 T: Read + Write + Unpin,
159 B: Body + 'static,
160 B::Data: Send + 'static,
161 E: Http2ClientConnExec<B, T> + Clone + Unpin,
162 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
163{
164 let (h2_tx, mut conn) = new_builder(config)
165 .handshake::<_, SendBuf<B::Data>>(Compat::new(io))
166 .await
167 .map_err(crate::Error::new_h2)?;
168
169 let (conn_drop_ref, conn_drop_rx) = mpsc::channel(1);
174 let (cancel_tx, conn_eof) = oneshot::channel();
175
176 let ping_config = new_ping_config(config);
177
178 let (conn, ping) = if ping_config.is_enabled() {
179 let pp = conn.ping_pong().expect("conn.ping_pong");
180 let (recorder, ponger) = ping::channel(pp, ping_config, timer);
181
182 let conn: Conn<_, B> = Conn::new(ponger, conn);
183 (Either::left(conn), recorder)
184 } else {
185 (Either::right(conn), ping::disabled())
186 };
187 let conn: ConnMapErr<T, B> = ConnMapErr {
188 conn,
189 is_terminated: false,
190 };
191
192 exec.execute_h2_future(H2ClientFuture::Task {
193 task: ConnTask::new(conn, conn_drop_rx, cancel_tx),
194 });
195
196 Ok(ClientTask {
197 ping,
198 conn_drop_ref,
199 conn_eof,
200 executor: exec,
201 h2_tx,
202 req_rx,
203 fut_ctx: None,
204 marker: PhantomData,
205 })
206}
207
208pin_project! {
209 struct Conn<T, B>
210 where
211 B: Body,
212 {
213 #[pin]
214 ponger: Ponger,
215 #[pin]
216 conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>,
217 }
218}
219
220impl<T, B> Conn<T, B>
221where
222 B: Body,
223 T: Read + Write + Unpin,
224{
225 fn new(ponger: Ponger, conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>) -> Self {
226 Conn { ponger, conn }
227 }
228}
229
230impl<T, B> Future for Conn<T, B>
231where
232 B: Body,
233 T: Read + Write + Unpin,
234{
235 type Output = Result<(), h2::Error>;
236
237 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
238 let mut this = self.project();
239 match this.ponger.poll(cx) {
240 Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
241 this.conn.set_target_window_size(wnd);
242 this.conn.set_initial_window_size(wnd)?;
243 }
244 Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
245 debug!("connection keep-alive timed out");
246 return Poll::Ready(Ok(()));
247 }
248 Poll::Pending => {}
249 }
250
251 Pin::new(&mut this.conn).poll(cx)
252 }
253}
254
255pin_project! {
256 struct ConnMapErr<T, B>
257 where
258 B: Body,
259 T: Read,
260 T: Write,
261 T: Unpin,
262 {
263 #[pin]
264 conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>,
265 #[pin]
266 is_terminated: bool,
267 }
268}
269
270impl<T, B> Future for ConnMapErr<T, B>
271where
272 B: Body,
273 T: Read + Write + Unpin,
274{
275 type Output = Result<(), ()>;
276
277 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
278 let mut this = self.project();
279
280 if *this.is_terminated {
281 return Poll::Pending;
282 }
283 let polled = this.conn.poll(cx);
284 if polled.is_ready() {
285 *this.is_terminated = true;
286 }
287 polled.map_err(|_e| {
288 debug!(error = %_e, "connection error");
289 })
290 }
291}
292
293impl<T, B> FusedFuture for ConnMapErr<T, B>
294where
295 B: Body,
296 T: Read + Write + Unpin,
297{
298 fn is_terminated(&self) -> bool {
299 self.is_terminated
300 }
301}
302
303pin_project! {
304 pub struct ConnTask<T, B>
305 where
306 B: Body,
307 T: Read,
308 T: Write,
309 T: Unpin,
310 {
311 #[pin]
312 drop_rx: Receiver<Infallible>,
313 #[pin]
314 cancel_tx: Option<oneshot::Sender<Infallible>>,
315 #[pin]
316 conn: ConnMapErr<T, B>,
317 }
318}
319
320impl<T, B> ConnTask<T, B>
321where
322 B: Body,
323 T: Read + Write + Unpin,
324{
325 fn new(
326 conn: ConnMapErr<T, B>,
327 drop_rx: Receiver<Infallible>,
328 cancel_tx: oneshot::Sender<Infallible>,
329 ) -> Self {
330 Self {
331 drop_rx,
332 cancel_tx: Some(cancel_tx),
333 conn,
334 }
335 }
336}
337
338impl<T, B> Future for ConnTask<T, B>
339where
340 B: Body,
341 T: Read + Write + Unpin,
342{
343 type Output = ();
344
345 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
346 let mut this = self.project();
347
348 if !this.conn.is_terminated() && Pin::new(&mut this.conn).poll(cx).is_ready() {
349 return Poll::Ready(());
351 }
352
353 if !this.drop_rx.is_terminated() && Pin::new(&mut this.drop_rx).poll_next(cx).is_ready() {
354 trace!("send_request dropped, starting conn shutdown");
358 drop(this.cancel_tx.take().expect("ConnTask Future polled twice"));
359 }
360
361 Poll::Pending
362 }
363}
364
365pin_project! {
366 #[project = H2ClientFutureProject]
367 pub enum H2ClientFuture<B, T, E>
368 where
369 B: http_body::Body,
370 B: 'static,
371 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
372 T: Read,
373 T: Write,
374 T: Unpin,
375 {
376 Pipe {
377 #[pin]
378 pipe: PipeMap<B>,
379 },
380 Send {
381 #[pin]
382 send_when: SendWhen<B, E>,
383 },
384 Task {
385 #[pin]
386 task: ConnTask<T, B>,
387 },
388 }
389}
390
391impl<B, T, E> Future for H2ClientFuture<B, T, E>
392where
393 B: http_body::Body + 'static,
394 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
395 T: Read + Write + Unpin,
396 E: Http2UpgradedExec<B::Data>,
397{
398 type Output = ();
399
400 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
401 let this = self.project();
402
403 match this {
404 H2ClientFutureProject::Pipe { pipe } => pipe.poll(cx),
405 H2ClientFutureProject::Send { send_when } => send_when.poll(cx),
406 H2ClientFutureProject::Task { task } => task.poll(cx),
407 }
408 }
409}
410
411struct FutCtx<B>
412where
413 B: Body,
414{
415 is_connect: bool,
416 eos: bool,
417 fut: ResponseFuture,
418 body_tx: SendStream<SendBuf<B::Data>>,
419 body: B,
420 cb: Callback<Request<B>, Response<IncomingBody>>,
421}
422
423impl<B: Body> Unpin for FutCtx<B> {}
424
425pub(crate) struct ClientTask<B, E, T>
426where
427 B: Body,
428 E: Unpin,
429{
430 ping: ping::Recorder,
431 conn_drop_ref: ConnDropRef,
432 conn_eof: ConnEof,
433 executor: E,
434 h2_tx: SendRequest<SendBuf<B::Data>>,
435 req_rx: ClientRx<B>,
436 fut_ctx: Option<FutCtx<B>>,
437 marker: PhantomData<T>,
438}
439
440impl<B, E, T> ClientTask<B, E, T>
441where
442 B: Body + 'static,
443 E: Http2ClientConnExec<B, T> + Unpin,
444 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
445 T: Read + Write + Unpin,
446{
447 pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
448 self.h2_tx.is_extended_connect_protocol_enabled()
449 }
450 pub(crate) fn current_max_send_streams(&self) -> usize {
451 self.h2_tx.current_max_send_streams()
452 }
453 pub(crate) fn current_max_recv_streams(&self) -> usize {
454 self.h2_tx.current_max_recv_streams()
455 }
456}
457
458pin_project! {
459 pub struct PipeMap<S>
460 where
461 S: Body,
462 {
463 #[pin]
464 pipe: PipeToSendStream<S>,
465 #[pin]
466 conn_drop_ref: Option<Sender<Infallible>>,
467 #[pin]
468 ping: Option<Recorder>,
469 cancel_rx: Option<oneshot::Receiver<()>>,
470 }
471}
472
473impl<B> Future for PipeMap<B>
474where
475 B: http_body::Body,
476 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
477{
478 type Output = ();
479
480 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
481 let mut this = self.project();
482
483 let cancel_result = this.cancel_rx.as_mut().map(|rx| Pin::new(rx).poll(cx));
487 match cancel_result {
488 Some(Poll::Ready(Ok(()))) => {
489 debug!("client request body send cancelled, resetting stream");
490 this.pipe.as_mut().send_reset(h2::Reason::CANCEL);
491 drop(this.conn_drop_ref.take().expect("Future polled twice"));
492 drop(this.ping.take().expect("Future polled twice"));
493 return Poll::Ready(());
494 }
495 Some(Poll::Ready(Err(_))) => {
496 *this.cancel_rx = None;
499 }
500 Some(Poll::Pending) | None => {}
501 }
502
503 match Pin::new(&mut this.pipe).poll(cx) {
504 Poll::Ready(result) => {
505 if let Err(_e) = result {
506 debug!("client request body error: {}", _e);
507 }
508 drop(this.conn_drop_ref.take().expect("Future polled twice"));
509 drop(this.ping.take().expect("Future polled twice"));
510 return Poll::Ready(());
511 }
512 Poll::Pending => (),
513 };
514 Poll::Pending
515 }
516}
517
518impl<B, E, T> ClientTask<B, E, T>
519where
520 B: Body + 'static + Unpin,
521 B::Data: Send,
522 E: Http2ClientConnExec<B, T> + Clone + Unpin,
523 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
524 T: Read + Write + Unpin,
525{
526 fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) {
527 let ping = self.ping.clone();
528
529 let (cancel_tx, cancel_rx) = oneshot::channel::<()>();
532
533 let send_stream = if !f.is_connect {
534 if !f.eos {
535 let mut pipe = PipeToSendStream::new(f.body, f.body_tx);
536
537 match Pin::new(&mut pipe).poll(cx) {
540 Poll::Ready(_) => (),
541 Poll::Pending => {
542 let conn_drop_ref = self.conn_drop_ref.clone();
543 let ping = ping.clone();
547
548 let pipe = PipeMap {
549 pipe,
550 conn_drop_ref: Some(conn_drop_ref),
551 ping: Some(ping),
552 cancel_rx: Some(cancel_rx),
553 };
554 self.executor
556 .execute_h2_future(H2ClientFuture::Pipe { pipe });
557 }
558 }
559 }
560
561 None
562 } else {
563 Some(f.body_tx)
564 };
565
566 self.executor.execute_h2_future(H2ClientFuture::Send {
567 send_when: SendWhen {
568 when: ResponseFutMap {
569 fut: f.fut,
570 ping: Some(ping),
571 send_stream: Some(send_stream),
572 exec: self.executor.clone(),
573 cancel_tx: Some(cancel_tx),
574 },
575 call_back: Some(f.cb),
576 },
577 });
578 }
579}
580
581pin_project! {
582 pub(crate) struct ResponseFutMap<B, E>
583 where
584 B: Body,
585 B: 'static,
586 {
587 #[pin]
588 fut: ResponseFuture,
589 ping: Option<Recorder>,
590 #[pin]
591 send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>,
592 exec: E,
593 cancel_tx: Option<oneshot::Sender<()>>,
594 }
595}
596
597impl<B: Body + 'static, E> ResponseFutMap<B, E> {
598 pub(crate) fn cancel(self: Pin<&mut Self>) {
600 if let Some(cancel_tx) = self.project().cancel_tx.take() {
601 let _ = cancel_tx.send(());
602 }
603 }
604}
605
606impl<B, E> Future for ResponseFutMap<B, E>
607where
608 B: Body + 'static,
609 E: Http2UpgradedExec<B::Data>,
610{
611 type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>;
612
613 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
614 let mut this = self.as_mut().project();
615
616 let result = ready!(this.fut.poll(cx));
617
618 let ping = this.ping.take().expect("Future polled twice");
619 let send_stream = this.send_stream.take().expect("Future polled twice");
620
621 match result {
622 Ok(res) => {
623 ping.record_non_data();
625
626 let content_length = headers::content_length_parse_all(res.headers());
627 if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) {
628 if content_length.map_or(false, |len| len != 0) {
629 warn!("h2 connect response with non-zero body not supported");
630
631 send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
632 return Poll::Ready(Err((
633 crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
634 None::<Request<B>>,
635 )));
636 }
637 let (parts, recv_stream) = res.into_parts();
638 let mut res = Response::from_parts(parts, IncomingBody::empty());
639
640 let (pending, on_upgrade) = crate::upgrade::pending();
641
642 let (h2_up, up_task) = super::upgrade::pair(send_stream, recv_stream, ping);
643 self.exec.execute_upgrade(up_task);
644 let upgraded = Upgraded::new(h2_up, Bytes::new());
645
646 pending.fulfill(upgraded);
647 res.extensions_mut().insert(on_upgrade);
648
649 Poll::Ready(Ok(res))
650 } else {
651 let res = res.map(|stream| {
652 let ping = ping.for_stream(&stream);
653 IncomingBody::h2(stream, content_length.into(), ping)
654 });
655 Poll::Ready(Ok(res))
656 }
657 }
658 Err(err) => {
659 ping.ensure_not_timed_out().map_err(|e| (e, None))?;
660
661 debug!("client response error: {}", err);
662 Poll::Ready(Err((crate::Error::new_h2(err), None::<Request<B>>)))
663 }
664 }
665 }
666}
667
668impl<B, E, T> Future for ClientTask<B, E, T>
669where
670 B: Body + 'static + Unpin,
671 B::Data: Send,
672 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
673 E: Http2ClientConnExec<B, T> + Clone + Unpin,
674 T: Read + Write + Unpin,
675{
676 type Output = crate::Result<Dispatched>;
677
678 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
679 loop {
680 match ready!(self.h2_tx.poll_ready(cx)) {
681 Ok(()) => (),
682 Err(err) => {
683 self.ping.ensure_not_timed_out()?;
684 return if err.reason() == Some(::h2::Reason::NO_ERROR) {
685 trace!("connection gracefully shutdown");
686 Poll::Ready(Ok(Dispatched::Shutdown))
687 } else {
688 Poll::Ready(Err(crate::Error::new_h2(err)))
689 };
690 }
691 };
692
693 if let Some(f) = self.fut_ctx.take() {
696 self.poll_pipe(f, cx);
697 continue;
698 }
699
700 match self.req_rx.poll_recv(cx) {
701 Poll::Ready(Some((req, cb))) => {
702 if cb.is_canceled() {
704 trace!("request callback is canceled");
705 continue;
706 }
707 let (head, body) = req.into_parts();
708 let mut req = ::http::Request::from_parts(head, ());
709 super::strip_connection_headers(req.headers_mut(), true);
710 if let Some(len) = body.size_hint().exact() {
711 if len != 0 || headers::method_has_defined_payload_semantics(req.method()) {
712 headers::set_content_length_if_missing(req.headers_mut(), len);
713 }
714 }
715
716 let is_connect = req.method() == Method::CONNECT;
717 let eos = body.is_end_stream();
718
719 if is_connect
720 && headers::content_length_parse_all(req.headers())
721 .map_or(false, |len| len != 0)
722 {
723 debug!("h2 connect request with non-zero body not supported");
724 cb.send(Err(TrySendError {
725 error: crate::Error::new_user_invalid_connect(),
726 message: None,
727 }));
728 continue;
729 }
730
731 if let Some(protocol) = req.extensions_mut().remove::<Protocol>() {
732 req.extensions_mut().insert(protocol.into_inner());
733 }
734
735 let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
736 Ok(ok) => ok,
737 Err(err) => {
738 debug!("client send request error: {}", err);
739 cb.send(Err(TrySendError {
740 error: crate::Error::new_h2(err),
741 message: None,
742 }));
743 continue;
744 }
745 };
746
747 let f = FutCtx {
748 is_connect,
749 eos,
750 fut,
751 body_tx,
752 body,
753 cb,
754 };
755
756 match self.h2_tx.poll_ready(cx) {
760 Poll::Pending => {
761 self.fut_ctx = Some(f);
763 return Poll::Pending;
764 }
765 Poll::Ready(Ok(())) => (),
766 Poll::Ready(Err(err)) => {
767 f.cb.send(Err(TrySendError {
768 error: crate::Error::new_h2(err),
769 message: None,
770 }));
771 continue;
772 }
773 }
774 self.poll_pipe(f, cx);
775 continue;
776 }
777
778 Poll::Ready(None) => {
779 trace!("client::dispatch::Sender dropped");
780 return Poll::Ready(Ok(Dispatched::Shutdown));
781 }
782
783 Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) {
784 #[allow(unused)]
787 Ok(never) => match never {},
788 Err(_conn_is_eof) => {
789 trace!("connection task is closed, closing dispatch task");
790 return Poll::Ready(Ok(Dispatched::Shutdown));
791 }
792 },
793 }
794 }
795 }
796}