futures_util/sink/
unfold.rs1use super::assert_sink;
2use crate::unfold_state::UnfoldState;
3use core::{future::Future, pin::Pin};
4use futures_core::ready;
5use futures_core::task::{Context, Poll};
6use futures_sink::Sink;
7use pin_project_lite::pin_project;
8
9pin_project! {
10 #[derive(Debug)]
12 #[must_use = "sinks do nothing unless polled"]
13 pub struct Unfold<T, F, R> {
14 function: F,
15 #[pin]
16 state: UnfoldState<T, R>,
17 }
18}
19
20pub fn unfold<T, F, R, Item, E>(init: T, function: F) -> Unfold<T, F, R>
43where
44 F: FnMut(T, Item) -> R,
45 R: Future<Output = Result<T, E>>,
46{
47 assert_sink::<Item, E, _>(Unfold { function, state: UnfoldState::Value { value: init } })
48}
49
50impl<T, F, R, Item, E> Sink<Item> for Unfold<T, F, R>
51where
52 F: FnMut(T, Item) -> R,
53 R: Future<Output = Result<T, E>>,
54{
55 type Error = E;
56
57 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
58 self.poll_flush(cx)
59 }
60
61 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
62 let mut this = self.project();
63 let future = match this.state.as_mut().take_value() {
64 Some(value) => (this.function)(value, item),
65 None => panic!("start_send called without poll_ready being called first"),
66 };
67 this.state.set(UnfoldState::Future { future });
68 Ok(())
69 }
70
71 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
72 let mut this = self.project();
73 Poll::Ready(if let Some(future) = this.state.as_mut().project_future() {
74 match ready!(future.poll(cx)) {
75 Ok(state) => {
76 this.state.set(UnfoldState::Value { value: state });
77 Ok(())
78 }
79 Err(err) => {
80 this.state.set(UnfoldState::Empty);
81 Err(err)
82 }
83 }
84 } else {
85 Ok(())
86 })
87 }
88
89 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
90 self.poll_flush(cx)
91 }
92}