1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures_util::{ready, TryFuture};
6use pin_project::pin_project;
7
8use super::{Filter, FilterBase, Func, Internal};
9use crate::reject::CombineRejection;
10
11#[derive(Clone, Copy, Debug)]
12pub struct AndThen<T, F> {
13 pub(super) filter: T,
14 pub(super) callback: F,
15}
16
17impl<T, F> FilterBase for AndThen<T, F>
18where
19 T: Filter,
20 F: Func<T::Extract> + Clone + Send,
21 F::Output: TryFuture + Send,
22 <F::Output as TryFuture>::Error: CombineRejection<T::Error>,
23{
24 type Extract = (<F::Output as TryFuture>::Ok,);
25 type Error = <<F::Output as TryFuture>::Error as CombineRejection<T::Error>>::One;
26 type Future = AndThenFuture<T, F>;
27 #[inline]
28 fn filter(&self, _: Internal) -> Self::Future {
29 AndThenFuture {
30 state: State::First(self.filter.filter(Internal), self.callback.clone()),
31 }
32 }
33}
34
35#[allow(missing_debug_implementations)]
36#[pin_project]
37pub struct AndThenFuture<T, F>
38where
39 T: Filter,
40 F: Func<T::Extract>,
41 F::Output: TryFuture + Send,
42 <F::Output as TryFuture>::Error: CombineRejection<T::Error>,
43{
44 #[pin]
45 state: State<T::Future, F>,
46}
47
48#[pin_project(project = StateProj)]
49enum State<T, F>
50where
51 T: TryFuture,
52 F: Func<T::Ok>,
53 F::Output: TryFuture + Send,
54 <F::Output as TryFuture>::Error: CombineRejection<T::Error>,
55{
56 First(#[pin] T, F),
57 Second(#[pin] F::Output),
58 Done,
59}
60
61impl<T, F> Future for AndThenFuture<T, F>
62where
63 T: Filter,
64 F: Func<T::Extract>,
65 F::Output: TryFuture + Send,
66 <F::Output as TryFuture>::Error: CombineRejection<T::Error>,
67{
68 type Output = Result<
69 (<F::Output as TryFuture>::Ok,),
70 <<F::Output as TryFuture>::Error as CombineRejection<T::Error>>::One,
71 >;
72
73 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
74 self.project().state.poll(cx)
75 }
76}
77
78impl<T, F> Future for State<T, F>
79where
80 T: TryFuture,
81 F: Func<T::Ok>,
82 F::Output: TryFuture + Send,
83 <F::Output as TryFuture>::Error: CombineRejection<T::Error>,
84{
85 type Output = Result<
86 (<F::Output as TryFuture>::Ok,),
87 <<F::Output as TryFuture>::Error as CombineRejection<T::Error>>::One,
88 >;
89
90 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
91 loop {
92 match self.as_mut().project() {
93 StateProj::First(first, second) => {
94 let ex1 = ready!(first.try_poll(cx))?;
95 let fut2 = second.call(ex1);
96 self.set(State::Second(fut2));
97 }
98 StateProj::Second(second) => {
99 let ex2 = match ready!(second.try_poll(cx)) {
100 Ok(item) => Ok((item,)),
101 Err(err) => Err(From::from(err)),
102 };
103 self.set(State::Done);
104 return Poll::Ready(ex2);
105 }
106 StateProj::Done => panic!("polled after complete"),
107 }
108 }
109 }
110}