// warp/filter/and_then.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures_util::{ready, TryFuture};
6use pin_project::pin_project;
7
8use super::{Filter, FilterBase, Func, Internal};
9use crate::reject::CombineRejection;
10
11#[derive(Clone, Copy, Debug)]
12pub struct AndThen<T, F> {
13    pub(super) filter: T,
14    pub(super) callback: F,
15}
16
17impl<T, F> FilterBase for AndThen<T, F>
18where
19    T: Filter,
20    F: Func<T::Extract> + Clone + Send,
21    F::Output: TryFuture + Send,
22    <F::Output as TryFuture>::Error: CombineRejection<T::Error>,
23{
24    type Extract = (<F::Output as TryFuture>::Ok,);
25    type Error = <<F::Output as TryFuture>::Error as CombineRejection<T::Error>>::One;
26    type Future = AndThenFuture<T, F>;
27    #[inline]
28    fn filter(&self, _: Internal) -> Self::Future {
29        AndThenFuture {
30            state: State::First(self.filter.filter(Internal), self.callback.clone()),
31        }
32    }
33}
34
35#[allow(missing_debug_implementations)]
36#[pin_project]
37pub struct AndThenFuture<T, F>
38where
39    T: Filter,
40    F: Func<T::Extract>,
41    F::Output: TryFuture + Send,
42    <F::Output as TryFuture>::Error: CombineRejection<T::Error>,
43{
44    #[pin]
45    state: State<T::Future, F>,
46}
47
48#[pin_project(project = StateProj)]
49enum State<T, F>
50where
51    T: TryFuture,
52    F: Func<T::Ok>,
53    F::Output: TryFuture + Send,
54    <F::Output as TryFuture>::Error: CombineRejection<T::Error>,
55{
56    First(#[pin] T, F),
57    Second(#[pin] F::Output),
58    Done,
59}
60
61impl<T, F> Future for AndThenFuture<T, F>
62where
63    T: Filter,
64    F: Func<T::Extract>,
65    F::Output: TryFuture + Send,
66    <F::Output as TryFuture>::Error: CombineRejection<T::Error>,
67{
68    type Output = Result<
69        (<F::Output as TryFuture>::Ok,),
70        <<F::Output as TryFuture>::Error as CombineRejection<T::Error>>::One,
71    >;
72
73    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
74        self.project().state.poll(cx)
75    }
76}
77
78impl<T, F> Future for State<T, F>
79where
80    T: TryFuture,
81    F: Func<T::Ok>,
82    F::Output: TryFuture + Send,
83    <F::Output as TryFuture>::Error: CombineRejection<T::Error>,
84{
85    type Output = Result<
86        (<F::Output as TryFuture>::Ok,),
87        <<F::Output as TryFuture>::Error as CombineRejection<T::Error>>::One,
88    >;
89
90    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
91        loop {
92            match self.as_mut().project() {
93                StateProj::First(first, second) => {
94                    let ex1 = ready!(first.try_poll(cx))?;
95                    let fut2 = second.call(ex1);
96                    self.set(State::Second(fut2));
97                }
98                StateProj::Second(second) => {
99                    let ex2 = match ready!(second.try_poll(cx)) {
100                        Ok(item) => Ok((item,)),
101                        Err(err) => Err(From::from(err)),
102                    };
103                    self.set(State::Done);
104                    return Poll::Ready(ex2);
105                }
106                StateProj::Done => panic!("polled after complete"),
107            }
108        }
109    }
110}