futures_util/sink/
unfold.rs

1use super::assert_sink;
2use crate::unfold_state::UnfoldState;
3use core::{future::Future, pin::Pin};
4use futures_core::ready;
5use futures_core::task::{Context, Poll};
6use futures_sink::Sink;
7use pin_project_lite::pin_project;
8
9pin_project! {
10    /// Sink for the [`unfold`] function.
11    #[derive(Debug)]
12    #[must_use = "sinks do nothing unless polled"]
13    pub struct Unfold<T, F, R> {
14        function: F,
15        #[pin]
16        state: UnfoldState<T, R>,
17    }
18}
19
20/// Create a sink from a function which processes one item at a time.
21///
22/// # Examples
23///
24/// ```
25/// # futures::executor::block_on(async {
26/// use core::pin::pin;
27///
28/// use futures::sink;
29/// use futures::sink::SinkExt;
30///
31/// let unfold = sink::unfold(0, |mut sum, i: i32| {
32///     async move {
33///         sum += i;
34///         eprintln!("{}", i);
35///         Ok::<_, futures::never::Never>(sum)
36///     }
37/// });
38/// let mut unfold = pin!(unfold);
39/// unfold.send(5).await?;
40/// # Ok::<(), futures::never::Never>(()) }).unwrap();
41/// ```
42pub fn unfold<T, F, R, Item, E>(init: T, function: F) -> Unfold<T, F, R>
43where
44    F: FnMut(T, Item) -> R,
45    R: Future<Output = Result<T, E>>,
46{
47    assert_sink::<Item, E, _>(Unfold { function, state: UnfoldState::Value { value: init } })
48}
49
50impl<T, F, R, Item, E> Sink<Item> for Unfold<T, F, R>
51where
52    F: FnMut(T, Item) -> R,
53    R: Future<Output = Result<T, E>>,
54{
55    type Error = E;
56
57    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
58        self.poll_flush(cx)
59    }
60
61    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
62        let mut this = self.project();
63        let future = match this.state.as_mut().take_value() {
64            Some(value) => (this.function)(value, item),
65            None => panic!("start_send called without poll_ready being called first"),
66        };
67        this.state.set(UnfoldState::Future { future });
68        Ok(())
69    }
70
71    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
72        let mut this = self.project();
73        Poll::Ready(if let Some(future) = this.state.as_mut().project_future() {
74            match ready!(future.poll(cx)) {
75                Ok(state) => {
76                    this.state.set(UnfoldState::Value { value: state });
77                    Ok(())
78                }
79                Err(err) => {
80                    this.state.set(UnfoldState::Empty);
81                    Err(err)
82                }
83            }
84        } else {
85            Ok(())
86        })
87    }
88
89    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
90        self.poll_flush(cx)
91    }
92}