warp/filter/
then.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures_util::{ready, TryFuture};
6use pin_project::pin_project;
7
8use super::{Filter, FilterBase, Func, Internal};
9
10#[derive(Clone, Copy, Debug)]
11pub struct Then<T, F> {
12    pub(super) filter: T,
13    pub(super) callback: F,
14}
15
16impl<T, F> FilterBase for Then<T, F>
17where
18    T: Filter,
19    F: Func<T::Extract> + Clone + Send,
20    F::Output: Future + Send,
21{
22    type Extract = (<F::Output as Future>::Output,);
23    type Error = T::Error;
24    type Future = ThenFuture<T, F>;
25    #[inline]
26    fn filter(&self, _: Internal) -> Self::Future {
27        ThenFuture {
28            state: State::First(self.filter.filter(Internal), self.callback.clone()),
29        }
30    }
31}
32
33#[allow(missing_debug_implementations)]
34#[pin_project]
35pub struct ThenFuture<T, F>
36where
37    T: Filter,
38    F: Func<T::Extract>,
39    F::Output: Future + Send,
40{
41    #[pin]
42    state: State<T::Future, F>,
43}
44
45#[pin_project(project = StateProj)]
46enum State<T, F>
47where
48    T: TryFuture,
49    F: Func<T::Ok>,
50    F::Output: Future + Send,
51{
52    First(#[pin] T, F),
53    Second(#[pin] F::Output),
54    Done,
55}
56
57impl<T, F> Future for ThenFuture<T, F>
58where
59    T: Filter,
60    F: Func<T::Extract>,
61    F::Output: Future + Send,
62{
63    type Output = Result<(<F::Output as Future>::Output,), T::Error>;
64
65    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
66        self.project().state.poll(cx)
67    }
68}
69
70impl<T, F> Future for State<T, F>
71where
72    T: TryFuture,
73    F: Func<T::Ok>,
74    F::Output: Future + Send,
75{
76    type Output = Result<(<F::Output as Future>::Output,), T::Error>;
77
78    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
79        loop {
80            match self.as_mut().project() {
81                StateProj::First(first, second) => {
82                    let ex1 = ready!(first.try_poll(cx))?;
83                    let fut2 = second.call(ex1);
84                    self.set(State::Second(fut2));
85                }
86                StateProj::Second(second) => {
87                    let ex2 = (ready!(second.poll(cx)),);
88                    self.set(State::Done);
89                    return Poll::Ready(Ok(ex2));
90                }
91                StateProj::Done => panic!("polled after complete"),
92            }
93        }
94    }
95}