Skip to main content

hyper/client/
dispatch.rs

1use std::task::{Context, Poll};
2#[cfg(feature = "http2")]
3use std::{future::Future, pin::Pin};
4
5#[cfg(feature = "http2")]
6use http::{Request, Response};
7#[cfg(feature = "http2")]
8use http_body::Body;
9#[cfg(feature = "http2")]
10use pin_project_lite::pin_project;
11use tokio::sync::{mpsc, oneshot};
12
13#[cfg(feature = "http2")]
14use crate::{body::Incoming, proto::h2::client::ResponseFutMap};
15
16pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, TrySendError<T>>>;
17pub(crate) type Promise<T> = oneshot::Receiver<Result<T, crate::Error>>;
18
19/// An error when calling `try_send_request`.
20///
21/// There is a possibility of an error occurring on a connection in-between the
22/// time that a request is queued and when it is actually written to the IO
23/// transport. If that happens, it is safe to return the request back to the
24/// caller, as it was never fully sent.
25#[derive(Debug)]
26pub struct TrySendError<T> {
27    pub(crate) error: crate::Error,
28    pub(crate) message: Option<T>,
29}
30
31pub(crate) fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
32    let (tx, rx) = mpsc::unbounded_channel();
33    let (giver, taker) = want::new();
34    let tx = Sender {
35        #[cfg(feature = "http1")]
36        buffered_once: false,
37        giver,
38        inner: tx,
39    };
40    let rx = Receiver { inner: rx, taker };
41    (tx, rx)
42}
43
44/// A bounded sender of requests and callbacks for when responses are ready.
45///
46/// While the inner sender is unbounded, the Giver is used to determine
47/// if the Receiver is ready for another request.
48pub(crate) struct Sender<T, U> {
49    /// One message is always allowed, even if the Receiver hasn't asked
50    /// for it yet. This boolean keeps track of whether we've sent one
51    /// without notice.
52    #[cfg(feature = "http1")]
53    buffered_once: bool,
54    /// The Giver helps watch that the Receiver side has been polled
55    /// when the queue is empty. This helps us know when a request and
56    /// response have been fully processed, and a connection is ready
57    /// for more.
58    giver: want::Giver,
59    /// Actually bounded by the Giver, plus `buffered_once`.
60    inner: mpsc::UnboundedSender<Envelope<T, U>>,
61}
62
63/// An unbounded version.
64///
65/// Cannot poll the Giver, but can still use it to determine if the Receiver
66/// has been dropped. However, this version can be cloned.
67#[cfg(feature = "http2")]
68pub(crate) struct UnboundedSender<T, U> {
69    /// Only used for `is_closed`, since `mpsc::UnboundedSender` cannot be checked.
70    giver: want::SharedGiver,
71    inner: mpsc::UnboundedSender<Envelope<T, U>>,
72}
73
74impl<T, U> Sender<T, U> {
75    #[cfg(feature = "http1")]
76    pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
77        self.giver
78            .poll_want(cx)
79            .map_err(|_| crate::Error::new_closed())
80    }
81
82    #[cfg(feature = "http1")]
83    pub(crate) fn is_ready(&self) -> bool {
84        self.giver.is_wanting()
85    }
86
87    #[cfg(feature = "http1")]
88    pub(crate) fn is_closed(&self) -> bool {
89        self.giver.is_canceled()
90    }
91
92    #[cfg(feature = "http1")]
93    fn can_send(&mut self) -> bool {
94        if self.giver.give() || !self.buffered_once {
95            // If the receiver is ready *now*, then of course we can send.
96            //
97            // If the receiver isn't ready yet, but we don't have anything
98            // in the channel yet, then allow one message.
99            self.buffered_once = true;
100            true
101        } else {
102            false
103        }
104    }
105
106    #[cfg(feature = "http1")]
107    pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
108        if !self.can_send() {
109            return Err(val);
110        }
111        let (tx, rx) = oneshot::channel();
112        self.inner
113            .send(Envelope(Some((val, Callback::Retry(Some(tx))))))
114            .map(move |_| rx)
115            .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
116    }
117
118    #[cfg(feature = "http1")]
119    pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> {
120        if !self.can_send() {
121            return Err(val);
122        }
123        let (tx, rx) = oneshot::channel();
124        self.inner
125            .send(Envelope(Some((val, Callback::NoRetry(Some(tx))))))
126            .map(move |_| rx)
127            .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
128    }
129
130    #[cfg(feature = "http2")]
131    pub(crate) fn unbound(self) -> UnboundedSender<T, U> {
132        UnboundedSender {
133            giver: self.giver.shared(),
134            inner: self.inner,
135        }
136    }
137}
138
139#[cfg(feature = "http2")]
140impl<T, U> UnboundedSender<T, U> {
141    pub(crate) fn is_ready(&self) -> bool {
142        !self.giver.is_canceled()
143    }
144
145    pub(crate) fn is_closed(&self) -> bool {
146        self.giver.is_canceled()
147    }
148
149    pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
150        let (tx, rx) = oneshot::channel();
151        self.inner
152            .send(Envelope(Some((val, Callback::Retry(Some(tx))))))
153            .map(move |_| rx)
154            .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
155    }
156
157    pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> {
158        let (tx, rx) = oneshot::channel();
159        self.inner
160            .send(Envelope(Some((val, Callback::NoRetry(Some(tx))))))
161            .map(move |_| rx)
162            .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
163    }
164}
165
166#[cfg(feature = "http2")]
167impl<T, U> Clone for UnboundedSender<T, U> {
168    fn clone(&self) -> Self {
169        UnboundedSender {
170            giver: self.giver.clone(),
171            inner: self.inner.clone(),
172        }
173    }
174}
175
176pub(crate) struct Receiver<T, U> {
177    inner: mpsc::UnboundedReceiver<Envelope<T, U>>,
178    taker: want::Taker,
179}
180
181impl<T, U> Receiver<T, U> {
182    pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<(T, Callback<T, U>)>> {
183        match self.inner.poll_recv(cx) {
184            Poll::Ready(item) => {
185                Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped")))
186            }
187            Poll::Pending => {
188                self.taker.want();
189                Poll::Pending
190            }
191        }
192    }
193
194    #[cfg(feature = "http1")]
195    pub(crate) fn close(&mut self) {
196        self.taker.cancel();
197        self.inner.close();
198    }
199
200    #[cfg(feature = "http1")]
201    pub(crate) fn try_recv(&mut self) -> Option<(T, Callback<T, U>)> {
202        match crate::common::task::now_or_never(self.inner.recv()) {
203            Some(Some(mut env)) => env.0.take(),
204            _ => None,
205        }
206    }
207}
208
209impl<T, U> Drop for Receiver<T, U> {
210    fn drop(&mut self) {
211        // Notify the giver about the closure first, before dropping
212        // the mpsc::Receiver.
213        self.taker.cancel();
214    }
215}
216
217struct Envelope<T, U>(Option<(T, Callback<T, U>)>);
218
219impl<T, U> Drop for Envelope<T, U> {
220    fn drop(&mut self) {
221        if let Some((val, cb)) = self.0.take() {
222            cb.send(Err(TrySendError {
223                error: crate::Error::new_canceled().with("connection closed"),
224                message: Some(val),
225            }));
226        }
227    }
228}
229
230pub(crate) enum Callback<T, U> {
231    #[allow(unused)]
232    Retry(Option<oneshot::Sender<Result<U, TrySendError<T>>>>),
233    NoRetry(Option<oneshot::Sender<Result<U, crate::Error>>>),
234}
235
236impl<T, U> Drop for Callback<T, U> {
237    fn drop(&mut self) {
238        match self {
239            Callback::Retry(tx) => {
240                if let Some(tx) = tx.take() {
241                    let _ = tx.send(Err(TrySendError {
242                        error: dispatch_gone(),
243                        message: None,
244                    }));
245                }
246            }
247            Callback::NoRetry(tx) => {
248                if let Some(tx) = tx.take() {
249                    let _ = tx.send(Err(dispatch_gone()));
250                }
251            }
252        }
253    }
254}
255
256#[cold]
257fn dispatch_gone() -> crate::Error {
258    // FIXME(nox): What errors do we want here?
259    crate::Error::new_user_dispatch_gone().with(if std::thread::panicking() {
260        "user code panicked"
261    } else {
262        "runtime dropped the dispatch task"
263    })
264}
265
266impl<T, U> Callback<T, U> {
267    #[cfg(feature = "http2")]
268    pub(crate) fn is_canceled(&self) -> bool {
269        match *self {
270            Callback::Retry(Some(ref tx)) => tx.is_closed(),
271            Callback::NoRetry(Some(ref tx)) => tx.is_closed(),
272            _ => unreachable!(),
273        }
274    }
275
276    pub(crate) fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> {
277        match *self {
278            Callback::Retry(Some(ref mut tx)) => tx.poll_closed(cx),
279            Callback::NoRetry(Some(ref mut tx)) => tx.poll_closed(cx),
280            _ => unreachable!(),
281        }
282    }
283
284    pub(crate) fn send(mut self, val: Result<U, TrySendError<T>>) {
285        match self {
286            Callback::Retry(ref mut tx) => {
287                let _ = tx
288                    .take()
289                    .expect("callback sender not dropped before send")
290                    .send(val);
291            }
292            Callback::NoRetry(ref mut tx) => {
293                let _ = tx
294                    .take()
295                    .expect("callback sender not dropped before send")
296                    .send(val.map_err(|e| e.error));
297            }
298        }
299    }
300}
301
302impl<T> TrySendError<T> {
303    /// Take the message from this error.
304    ///
305    /// The message will not always have been recovered. If an error occurs
306    /// after the message has been serialized onto the connection, it will not
307    /// be available here.
308    pub fn take_message(&mut self) -> Option<T> {
309        self.message.take()
310    }
311
312    /// Returns a reference to the recovered message.
313    ///
314    /// The message will not always have been recovered. If an error occurs
315    /// after the message has been serialized onto the connection, it will not
316    /// be available here.
317    pub fn message(&self) -> Option<&T> {
318        self.message.as_ref()
319    }
320
321    /// Consumes this to return the inner error.
322    pub fn into_error(self) -> crate::Error {
323        self.error
324    }
325
326    /// Returns a reference to the inner error.
327    pub fn error(&self) -> &crate::Error {
328        &self.error
329    }
330}
331
332#[cfg(feature = "http2")]
333pin_project! {
334    pub struct SendWhen<B, E>
335    where
336        B: Body,
337        B: 'static,
338    {
339        #[pin]
340        pub(crate) when: ResponseFutMap<B, E>,
341        #[pin]
342        pub(crate) call_back: Option<Callback<Request<B>, Response<Incoming>>>,
343    }
344}
345
346#[cfg(feature = "http2")]
347impl<B, E> Future for SendWhen<B, E>
348where
349    B: Body + 'static,
350    E: crate::rt::bounds::Http2UpgradedExec<B::Data>,
351{
352    type Output = ();
353
354    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
355        let mut this = self.project();
356
357        let mut call_back = this.call_back.take().expect("polled after complete");
358
359        match Pin::new(&mut this.when).poll(cx) {
360            Poll::Ready(Ok(res)) => {
361                call_back.send(Ok(res));
362                Poll::Ready(())
363            }
364            Poll::Pending => {
365                // check if the callback is canceled
366                match call_back.poll_canceled(cx) {
367                    Poll::Ready(v) => v,
368                    Poll::Pending => {
369                        // Move call_back back to struct before return
370                        this.call_back.set(Some(call_back));
371                        return Poll::Pending;
372                    }
373                };
374                trace!("send_when canceled");
375                // Tell pipe_task to reset the h2 stream so that
376                // RST_STREAM is sent and flow-control capacity freed.
377                this.when.as_mut().cancel();
378                Poll::Ready(())
379            }
380            Poll::Ready(Err((error, message))) => {
381                call_back.send(Err(TrySendError { error, message }));
382                Poll::Ready(())
383            }
384        }
385    }
386}
387
388#[cfg(test)]
389mod tests {
390    #[cfg(feature = "nightly")]
391    extern crate test;
392
393    use std::future::Future;
394    use std::pin::Pin;
395    use std::task::{Context, Poll};
396
397    use super::{channel, Callback, Receiver};
398
399    #[derive(Debug)]
400    struct Custom(#[allow(dead_code)] i32);
401
402    impl<T, U> Future for Receiver<T, U> {
403        type Output = Option<(T, Callback<T, U>)>;
404
405        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
406            self.poll_recv(cx)
407        }
408    }
409
410    /// Helper to check if the future is ready after polling once.
411    struct PollOnce<'a, F>(&'a mut F);
412
413    impl<F, T> Future for PollOnce<'_, F>
414    where
415        F: Future<Output = T> + Unpin,
416    {
417        type Output = Option<()>;
418
419        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
420            match Pin::new(&mut self.0).poll(cx) {
421                Poll::Ready(_) => Poll::Ready(Some(())),
422                Poll::Pending => Poll::Ready(None),
423            }
424        }
425    }
426
427    #[cfg(not(miri))]
428    #[tokio::test]
429    async fn drop_receiver_sends_cancel_errors() {
430        let _ = pretty_env_logger::try_init();
431
432        let (mut tx, mut rx) = channel::<Custom, ()>();
433
434        // must poll once for try_send to succeed
435        assert!(PollOnce(&mut rx).await.is_none(), "rx empty");
436
437        let promise = tx.try_send(Custom(43)).unwrap();
438        drop(rx);
439
440        let fulfilled = promise.await;
441        let err = fulfilled
442            .expect("fulfilled")
443            .expect_err("promise should error");
444        match (err.error.is_canceled(), err.message) {
445            (true, Some(_)) => (),
446            e => panic!("expected Error::Cancel(_), found {:?}", e),
447        }
448    }
449
450    #[cfg(not(miri))]
451    #[tokio::test]
452    async fn sender_checks_for_want_on_send() {
453        let (mut tx, mut rx) = channel::<Custom, ()>();
454
455        // one is allowed to buffer, second is rejected
456        tx.try_send(Custom(1)).expect("1 buffered");
457        tx.try_send(Custom(2)).expect_err("2 not ready");
458
459        assert!(PollOnce(&mut rx).await.is_some(), "rx once");
460
461        // Even though 1 has been popped, only 1 could be buffered for the
462        // lifetime of the channel.
463        tx.try_send(Custom(2)).expect_err("2 still not ready");
464
465        assert!(PollOnce(&mut rx).await.is_none(), "rx empty");
466
467        tx.try_send(Custom(2)).expect("2 ready");
468    }
469
470    #[cfg(feature = "http2")]
471    #[test]
472    fn unbounded_sender_doesnt_bound_on_want() {
473        let (tx, rx) = channel::<Custom, ()>();
474        let mut tx = tx.unbound();
475
476        tx.try_send(Custom(1)).unwrap();
477        tx.try_send(Custom(2)).unwrap();
478        tx.try_send(Custom(3)).unwrap();
479
480        drop(rx);
481
482        tx.try_send(Custom(4)).unwrap_err();
483    }
484
485    #[cfg(feature = "nightly")]
486    #[bench]
487    fn giver_queue_throughput(b: &mut test::Bencher) {
488        use crate::{body::Incoming, Request, Response};
489
490        let rt = tokio::runtime::Builder::new_current_thread()
491            .build()
492            .unwrap();
493        let (mut tx, mut rx) = channel::<Request<Incoming>, Response<Incoming>>();
494
495        b.iter(move || {
496            tx.send(Request::new(Incoming::empty())).unwrap();
497            rt.block_on(async {
498                loop {
499                    let poll_once = PollOnce(&mut rx);
500                    let opt = poll_once.await;
501                    if opt.is_none() {
502                        break;
503                    }
504                }
505            });
506        })
507    }
508
509    #[cfg(feature = "nightly")]
510    #[bench]
511    fn giver_queue_not_ready(b: &mut test::Bencher) {
512        let rt = tokio::runtime::Builder::new_current_thread()
513            .build()
514            .unwrap();
515        let (_tx, mut rx) = channel::<i32, ()>();
516        b.iter(move || {
517            rt.block_on(async {
518                let poll_once = PollOnce(&mut rx);
519                assert!(poll_once.await.is_none());
520            });
521        })
522    }
523
524    #[cfg(feature = "nightly")]
525    #[bench]
526    fn giver_queue_cancel(b: &mut test::Bencher) {
527        let (_tx, mut rx) = channel::<i32, ()>();
528
529        b.iter(move || {
530            rx.taker.cancel();
531        })
532    }
533}