1use std::task::{Context, Poll};
2#[cfg(feature = "http2")]
3use std::{future::Future, pin::Pin};
4
5#[cfg(feature = "http2")]
6use http::{Request, Response};
7#[cfg(feature = "http2")]
8use http_body::Body;
9#[cfg(feature = "http2")]
10use pin_project_lite::pin_project;
11use tokio::sync::{mpsc, oneshot};
12
13#[cfg(feature = "http2")]
14use crate::{body::Incoming, proto::h2::client::ResponseFutMap};
15
/// Receiving half of a one-shot reply for a message sent with `try_send`.
///
/// On failure it resolves to a [`TrySendError`], which may still carry the
/// original message `T`.
pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, TrySendError<T>>>;
/// Receiving half of a one-shot reply for a message sent with `send`.
///
/// On failure it resolves to a plain `crate::Error`; the message is not
/// handed back.
pub(crate) type Promise<T> = oneshot::Receiver<Result<T, crate::Error>>;
18
/// An error produced when a message sent through the dispatch channel could
/// not be answered.
///
/// Pairs the underlying `crate::Error` with the original message, when the
/// message could be recovered.
#[derive(Debug)]
pub struct TrySendError<T> {
    // The reason the message could not be answered.
    pub(crate) error: crate::Error,
    // The original message, if it was recovered; `None` otherwise.
    pub(crate) message: Option<T>,
}
30
31pub(crate) fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
32 let (tx, rx) = mpsc::unbounded_channel();
33 let (giver, taker) = want::new();
34 let tx = Sender {
35 #[cfg(feature = "http1")]
36 buffered_once: false,
37 giver,
38 inner: tx,
39 };
40 let rx = Receiver { inner: rx, taker };
41 (tx, rx)
42}
43
/// Sending half of the dispatch channel.
///
/// The inner mpsc sender is unbounded, but `can_send` only lets a message
/// through when the `Giver` reports that the receiver wants one, or when
/// nothing has been buffered yet.
pub(crate) struct Sender<T, U> {
    /// Set once the first message has been let through; after that, further
    /// messages are only accepted when the giver reports that the receiver
    /// wants one.
    #[cfg(feature = "http1")]
    buffered_once: bool,
    /// Reports whether the receiver has signalled `want` (it does so when
    /// polled with an empty queue) or has been canceled.
    giver: want::Giver,
    /// The queue of messages and their reply callbacks. Unbounded by itself;
    /// the limit is enforced by `giver` together with `buffered_once`.
    inner: mpsc::UnboundedSender<Envelope<T, U>>,
}
62
/// A sender that does not wait for the receiver to want a message.
///
/// It only consults the shared giver to find out whether the receiver has
/// been canceled. Unlike [`Sender`], this type implements `Clone`.
#[cfg(feature = "http2")]
pub(crate) struct UnboundedSender<T, U> {
    /// Only used to detect that the receiver has been canceled.
    giver: want::SharedGiver,
    /// The queue of messages and their reply callbacks.
    inner: mpsc::UnboundedSender<Envelope<T, U>>,
}
73
74impl<T, U> Sender<T, U> {
75 #[cfg(feature = "http1")]
76 pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
77 self.giver
78 .poll_want(cx)
79 .map_err(|_| crate::Error::new_closed())
80 }
81
82 #[cfg(feature = "http1")]
83 pub(crate) fn is_ready(&self) -> bool {
84 self.giver.is_wanting()
85 }
86
87 #[cfg(feature = "http1")]
88 pub(crate) fn is_closed(&self) -> bool {
89 self.giver.is_canceled()
90 }
91
92 #[cfg(feature = "http1")]
93 fn can_send(&mut self) -> bool {
94 if self.giver.give() || !self.buffered_once {
95 self.buffered_once = true;
100 true
101 } else {
102 false
103 }
104 }
105
106 #[cfg(feature = "http1")]
107 pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
108 if !self.can_send() {
109 return Err(val);
110 }
111 let (tx, rx) = oneshot::channel();
112 self.inner
113 .send(Envelope(Some((val, Callback::Retry(Some(tx))))))
114 .map(move |_| rx)
115 .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
116 }
117
118 #[cfg(feature = "http1")]
119 pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> {
120 if !self.can_send() {
121 return Err(val);
122 }
123 let (tx, rx) = oneshot::channel();
124 self.inner
125 .send(Envelope(Some((val, Callback::NoRetry(Some(tx))))))
126 .map(move |_| rx)
127 .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0)
128 }
129
130 #[cfg(feature = "http2")]
131 pub(crate) fn unbound(self) -> UnboundedSender<T, U> {
132 UnboundedSender {
133 giver: self.giver.shared(),
134 inner: self.inner,
135 }
136 }
137}
138
#[cfg(feature = "http2")]
impl<T, U> UnboundedSender<T, U> {
    /// Returns `true` as long as the receiver has not been canceled.
    pub(crate) fn is_ready(&self) -> bool {
        !self.is_closed()
    }

    /// Returns `true` if the receiver side has been canceled.
    pub(crate) fn is_closed(&self) -> bool {
        self.giver.is_canceled()
    }

    /// Queues `val` without checking whether the receiver wants it, returning
    /// a promise for the reply.
    ///
    /// On failure the promise can hand the message back via [`TrySendError`].
    /// Returns `Err(val)` if the receiving side of the queue is gone.
    pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let envelope = Envelope(Some((val, Callback::Retry(Some(reply_tx)))));
        match self.inner.send(envelope) {
            Ok(()) => Ok(reply_rx),
            Err(mut rejected) => {
                // Nobody will wait on the promise; release it before the
                // callback that came back with the envelope is dropped.
                drop(reply_rx);
                let (message, _callback) = (rejected.0).0.take().expect("envelope not dropped");
                Err(message)
            }
        }
    }

    /// Queues `val` without checking whether the receiver wants it, returning
    /// a promise for the reply.
    ///
    /// A failed promise yields only the error. Returns `Err(val)` if the
    /// receiving side of the queue is gone.
    pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let envelope = Envelope(Some((val, Callback::NoRetry(Some(reply_tx)))));
        match self.inner.send(envelope) {
            Ok(()) => Ok(reply_rx),
            Err(mut rejected) => {
                // Nobody will wait on the promise; release it before the
                // callback that came back with the envelope is dropped.
                drop(reply_rx);
                let (message, _callback) = (rejected.0).0.take().expect("envelope not dropped");
                Err(message)
            }
        }
    }
}
165
// Written by hand so that the impl carries no bounds on `T` or `U`.
#[cfg(feature = "http2")]
impl<T, U> Clone for UnboundedSender<T, U> {
    /// Returns a new handle backed by clones of the shared giver and the queue sender.
    fn clone(&self) -> Self {
        let Self { giver, inner } = self;
        Self {
            giver: giver.clone(),
            inner: inner.clone(),
        }
    }
}
175
/// Receiving half of the dispatch channel.
pub(crate) struct Receiver<T, U> {
    /// The queue of messages and their reply callbacks.
    inner: mpsc::UnboundedReceiver<Envelope<T, U>>,
    /// Signals `want` to the sender when the queue is polled while empty,
    /// and `cancel` when the receiver is closed or dropped.
    taker: want::Taker,
}
180
181impl<T, U> Receiver<T, U> {
182 pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<(T, Callback<T, U>)>> {
183 match self.inner.poll_recv(cx) {
184 Poll::Ready(item) => {
185 Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped")))
186 }
187 Poll::Pending => {
188 self.taker.want();
189 Poll::Pending
190 }
191 }
192 }
193
194 #[cfg(feature = "http1")]
195 pub(crate) fn close(&mut self) {
196 self.taker.cancel();
197 self.inner.close();
198 }
199
200 #[cfg(feature = "http1")]
201 pub(crate) fn try_recv(&mut self) -> Option<(T, Callback<T, U>)> {
202 match crate::common::task::now_or_never(self.inner.recv()) {
203 Some(Some(mut env)) => env.0.take(),
204 _ => None,
205 }
206 }
207}
208
impl<T, U> Drop for Receiver<T, U> {
    fn drop(&mut self) {
        // Cancel the taker explicitly here: this body runs before the fields
        // (including the mpsc receiver) are dropped, so the sender side sees
        // the cancellation first.
        self.taker.cancel();
    }
}
216
/// Wrapper around a queued message and its reply callback.
///
/// The contents are taken out when the message is received; if the envelope
/// is dropped while still full, its `Drop` impl answers the callback with an
/// error that carries the message back.
struct Envelope<T, U>(Option<(T, Callback<T, U>)>);
218
219impl<T, U> Drop for Envelope<T, U> {
220 fn drop(&mut self) {
221 if let Some((val, cb)) = self.0.take() {
222 cb.send(Err(TrySendError {
223 error: crate::Error::new_canceled().with("connection closed"),
224 message: Some(val),
225 }));
226 }
227 }
228}
229
/// The reply side of a queued message.
///
/// Each variant wraps its one-shot sender in an `Option` so that it can be
/// taken out through `&mut self` (in `send` and in `Drop`).
pub(crate) enum Callback<T, U> {
    /// Failures are reported as [`TrySendError`], which may carry the message back.
    #[allow(unused)]
    Retry(Option<oneshot::Sender<Result<U, TrySendError<T>>>>),
    /// Failures are reported as a plain `crate::Error`.
    NoRetry(Option<oneshot::Sender<Result<U, crate::Error>>>),
}
235
236impl<T, U> Drop for Callback<T, U> {
237 fn drop(&mut self) {
238 match self {
239 Callback::Retry(tx) => {
240 if let Some(tx) = tx.take() {
241 let _ = tx.send(Err(TrySendError {
242 error: dispatch_gone(),
243 message: None,
244 }));
245 }
246 }
247 Callback::NoRetry(tx) => {
248 if let Some(tx) = tx.take() {
249 let _ = tx.send(Err(dispatch_gone()));
250 }
251 }
252 }
253 }
254}
255
256#[cold]
257fn dispatch_gone() -> crate::Error {
258 crate::Error::new_user_dispatch_gone().with(if std::thread::panicking() {
260 "user code panicked"
261 } else {
262 "runtime dropped the dispatch task"
263 })
264}
265
266impl<T, U> Callback<T, U> {
267 #[cfg(feature = "http2")]
268 pub(crate) fn is_canceled(&self) -> bool {
269 match *self {
270 Callback::Retry(Some(ref tx)) => tx.is_closed(),
271 Callback::NoRetry(Some(ref tx)) => tx.is_closed(),
272 _ => unreachable!(),
273 }
274 }
275
276 pub(crate) fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> {
277 match *self {
278 Callback::Retry(Some(ref mut tx)) => tx.poll_closed(cx),
279 Callback::NoRetry(Some(ref mut tx)) => tx.poll_closed(cx),
280 _ => unreachable!(),
281 }
282 }
283
284 pub(crate) fn send(mut self, val: Result<U, TrySendError<T>>) {
285 match self {
286 Callback::Retry(ref mut tx) => {
287 let _ = tx
288 .take()
289 .expect("callback sender not dropped before send")
290 .send(val);
291 }
292 Callback::NoRetry(ref mut tx) => {
293 let _ = tx
294 .take()
295 .expect("callback sender not dropped before send")
296 .send(val.map_err(|e| e.error));
297 }
298 }
299 }
300}
301
302impl<T> TrySendError<T> {
303 pub fn take_message(&mut self) -> Option<T> {
309 self.message.take()
310 }
311
312 pub fn message(&self) -> Option<&T> {
318 self.message.as_ref()
319 }
320
321 pub fn into_error(self) -> crate::Error {
323 self.error
324 }
325
326 pub fn error(&self) -> &crate::Error {
328 &self.error
329 }
330}
331
#[cfg(feature = "http2")]
pin_project! {
    /// Future that forwards the outcome of `when` to `call_back`.
    ///
    /// It also finishes early if the callback's promise is closed while
    /// `when` is still pending.
    pub struct SendWhen<B, E>
    where
        B: Body,
        B: 'static,
    {
        // The response future being driven.
        #[pin]
        pub(crate) when: ResponseFutMap<B, E>,
        // The reply callback; taken out on each poll and put back only when
        // the future stays pending.
        #[pin]
        pub(crate) call_back: Option<Callback<Request<B>, Response<Incoming>>>,
    }
}
345
#[cfg(feature = "http2")]
impl<B, E> Future for SendWhen<B, E>
where
    B: Body + 'static,
    E: crate::rt::bounds::Http2UpgradedExec<B::Data>,
{
    type Output = ();

    /// Drives `when` and forwards its outcome through the callback.
    ///
    /// While `when` is pending, the callback is also polled for cancellation;
    /// if the promise side is closed, `when` is canceled and this future
    /// completes.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has completed.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        let mut call_back = this.call_back.take().expect("polled after complete");

        let outcome = match Pin::new(&mut this.when).poll(cx) {
            Poll::Ready(outcome) => outcome,
            Poll::Pending => {
                // Nothing to deliver yet; see whether anyone is still waiting.
                if call_back.poll_canceled(cx).is_pending() {
                    // Still wanted: put the callback back for the next poll.
                    this.call_back.set(Some(call_back));
                    return Poll::Pending;
                }
                trace!("send_when canceled");
                // NOTE(review): presumably this tells the h2 side to abandon
                // the in-flight request - confirm against `ResponseFutMap::cancel`.
                this.when.as_mut().cancel();
                return Poll::Ready(());
            }
        };

        call_back.send(outcome.map_err(|(error, message)| TrySendError { error, message }));
        Poll::Ready(())
    }
}
387
#[cfg(test)]
mod tests {
    #[cfg(feature = "nightly")]
    extern crate test;

    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use super::{channel, Callback, Receiver};

    // Simple message payload used by the tests below.
    #[derive(Debug)]
    struct Custom(#[allow(dead_code)] i32);

    // Lets a `Receiver` be polled as a future in tests by delegating to `poll_recv`.
    impl<T, U> Future for Receiver<T, U> {
        type Output = Option<(T, Callback<T, U>)>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            self.poll_recv(cx)
        }
    }

    /// Helper that polls the wrapped future exactly once.
    ///
    /// Resolves to `Some(())` if the future was ready and `None` if it was
    /// still pending.
    struct PollOnce<'a, F>(&'a mut F);

    impl<F, T> Future for PollOnce<'_, F>
    where
        F: Future<Output = T> + Unpin,
    {
        type Output = Option<()>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            match Pin::new(&mut self.0).poll(cx) {
                Poll::Ready(_) => Poll::Ready(Some(())),
                Poll::Pending => Poll::Ready(None),
            }
        }
    }

    // Dropping the receiver while a message is still queued must fail the
    // promise with a canceled error that hands the message back.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn drop_receiver_sends_cancel_errors() {
        let _ = pretty_env_logger::try_init();

        let (mut tx, mut rx) = channel::<Custom, ()>();

        // Poll the empty receiver once; on `Pending` it signals `want`.
        assert!(PollOnce(&mut rx).await.is_none(), "rx empty");

        let promise = tx.try_send(Custom(43)).unwrap();
        drop(rx);

        let fulfilled = promise.await;
        let err = fulfilled
            .expect("fulfilled")
            .expect_err("promise should error");
        match (err.error.is_canceled(), err.message) {
            (true, Some(_)) => (),
            e => panic!("expected Error::Cancel(_), found {:?}", e),
        }
    }

    // Only one message may be buffered without the receiver asking for it.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn sender_checks_for_want_on_send() {
        let (mut tx, mut rx) = channel::<Custom, ()>();

        // One message is allowed to buffer; the second is rejected.
        tx.try_send(Custom(1)).expect("1 buffered");
        tx.try_send(Custom(2)).expect_err("2 not ready");

        assert!(PollOnce(&mut rx).await.is_some(), "rx once");

        // Message 1 has been received, but that poll found an item and so did
        // not signal `want`; the single free slot has already been used.
        tx.try_send(Custom(2)).expect_err("2 still not ready");

        // Polling the now-empty receiver signals `want`.
        assert!(PollOnce(&mut rx).await.is_none(), "rx empty");

        tx.try_send(Custom(2)).expect("2 ready");
    }

    // The unbounded sender accepts messages without the receiver asking, and
    // rejects them once the receiver has been dropped.
    #[cfg(feature = "http2")]
    #[test]
    fn unbounded_sender_doesnt_bound_on_want() {
        let (tx, rx) = channel::<Custom, ()>();
        let mut tx = tx.unbound();

        tx.try_send(Custom(1)).unwrap();
        tx.try_send(Custom(2)).unwrap();
        tx.try_send(Custom(3)).unwrap();

        drop(rx);

        tx.try_send(Custom(4)).unwrap_err();
    }

    // Benchmark: send one request, then poll the receiver until it is pending.
    #[cfg(feature = "nightly")]
    #[bench]
    fn giver_queue_throughput(b: &mut test::Bencher) {
        use crate::{body::Incoming, Request, Response};

        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();
        let (mut tx, mut rx) = channel::<Request<Incoming>, Response<Incoming>>();

        b.iter(move || {
            tx.send(Request::new(Incoming::empty())).unwrap();
            rt.block_on(async {
                loop {
                    let poll_once = PollOnce(&mut rx);
                    let opt = poll_once.await;
                    if opt.is_none() {
                        break;
                    }
                }
            });
        })
    }

    // Benchmark: poll a receiver whose queue is empty.
    #[cfg(feature = "nightly")]
    #[bench]
    fn giver_queue_not_ready(b: &mut test::Bencher) {
        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();
        let (_tx, mut rx) = channel::<i32, ()>();
        b.iter(move || {
            rt.block_on(async {
                let poll_once = PollOnce(&mut rx);
                assert!(poll_once.await.is_none());
            });
        })
    }

    // Benchmark: cancel the taker repeatedly.
    #[cfg(feature = "nightly")]
    #[bench]
    fn giver_queue_cancel(b: &mut test::Bencher) {
        let (_tx, mut rx) = channel::<i32, ()>();

        b.iter(move || {
            rx.taker.cancel();
        })
    }
}