futures_util/stream/stream/
peek.rs1use crate::fns::FnOnce1;
2use crate::stream::{Fuse, StreamExt};
3use core::fmt;
4use core::marker::PhantomData;
5use core::pin::Pin;
6use futures_core::future::{FusedFuture, Future};
7use futures_core::ready;
8use futures_core::stream::{FusedStream, Stream};
9use futures_core::task::{Context, Poll};
10#[cfg(feature = "sink")]
11use futures_sink::Sink;
12use pin_project_lite::pin_project;
13
14pin_project! {
15 #[derive(Debug)]
21 #[must_use = "streams do nothing unless polled"]
22 pub struct Peekable<St: Stream> {
23 #[pin]
24 stream: Fuse<St>,
25 peeked: Option<St::Item>,
26 }
27}
28
29impl<St: Stream> Peekable<St> {
30 pub(super) fn new(stream: St) -> Self {
31 Self { stream: stream.fuse(), peeked: None }
32 }
33
34 delegate_access_inner!(stream, St, (.));
35
36 pub fn peek(self: Pin<&mut Self>) -> Peek<'_, St> {
39 Peek { inner: Some(self) }
40 }
41
42 pub fn poll_peek(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<&St::Item>> {
47 let mut this = self.project();
48
49 Poll::Ready(loop {
50 if this.peeked.is_some() {
51 break this.peeked.as_ref();
52 } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
53 *this.peeked = Some(item);
54 } else {
55 break None;
56 }
57 })
58 }
59
60 pub fn peek_mut(self: Pin<&mut Self>) -> PeekMut<'_, St> {
89 PeekMut { inner: Some(self) }
90 }
91
92 pub fn poll_peek_mut(
94 self: Pin<&mut Self>,
95 cx: &mut Context<'_>,
96 ) -> Poll<Option<&mut St::Item>> {
97 let mut this = self.project();
98
99 Poll::Ready(loop {
100 if this.peeked.is_some() {
101 break this.peeked.as_mut();
102 } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
103 *this.peeked = Some(item);
104 } else {
105 break None;
106 }
107 })
108 }
109
110 pub fn next_if<F>(self: Pin<&mut Self>, func: F) -> NextIf<'_, St, F>
156 where
157 F: FnOnce(&St::Item) -> bool,
158 {
159 NextIf { inner: Some((self, func)) }
160 }
161
162 pub fn next_if_eq<'a, T>(self: Pin<&'a mut Self>, expected: &'a T) -> NextIfEq<'a, St, T>
187 where
188 T: ?Sized,
189 St::Item: PartialEq<T>,
190 {
191 NextIfEq {
192 inner: NextIf { inner: Some((self, NextIfEqFn { expected, _next: PhantomData })) },
193 }
194 }
195}
196
197impl<St: Stream> FusedStream for Peekable<St> {
198 fn is_terminated(&self) -> bool {
199 self.peeked.is_none() && self.stream.is_terminated()
200 }
201}
202
203impl<S: Stream> Stream for Peekable<S> {
204 type Item = S::Item;
205
206 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
207 let this = self.project();
208 if let Some(item) = this.peeked.take() {
209 return Poll::Ready(Some(item));
210 }
211 this.stream.poll_next(cx)
212 }
213
214 fn size_hint(&self) -> (usize, Option<usize>) {
215 let peek_len = usize::from(self.peeked.is_some());
216 let (lower, upper) = self.stream.size_hint();
217 let lower = lower.saturating_add(peek_len);
218 let upper = match upper {
219 Some(x) => x.checked_add(peek_len),
220 None => None,
221 };
222 (lower, upper)
223 }
224}
225
// Sink implementation for `Peekable`; the methods are generated by
// `delegate_sink!` from the `stream` field. Only compiled with the `sink`
// feature.
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for Peekable<S>
where
    S: Stream + Sink<Item>,
{
    type Error = S::Error;

    delegate_sink!(stream, Item);
}
236
237pin_project! {
238 #[must_use = "futures do nothing unless polled"]
240 pub struct Peek<'a, St: Stream> {
241 inner: Option<Pin<&'a mut Peekable<St>>>,
242 }
243}
244
245impl<St> fmt::Debug for Peek<'_, St>
246where
247 St: Stream + fmt::Debug,
248 St::Item: fmt::Debug,
249{
250 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
251 f.debug_struct("Peek").field("inner", &self.inner).finish()
252 }
253}
254
255impl<St: Stream> FusedFuture for Peek<'_, St> {
256 fn is_terminated(&self) -> bool {
257 self.inner.is_none()
258 }
259}
260
261impl<'a, St> Future for Peek<'a, St>
262where
263 St: Stream,
264{
265 type Output = Option<&'a St::Item>;
266
267 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
268 let inner = self.project().inner;
269 if let Some(peekable) = inner {
270 ready!(peekable.as_mut().poll_peek(cx));
271
272 inner.take().unwrap().poll_peek(cx)
273 } else {
274 panic!("Peek polled after completion")
275 }
276 }
277}
278
279pin_project! {
280 #[must_use = "futures do nothing unless polled"]
282 pub struct PeekMut<'a, St: Stream> {
283 inner: Option<Pin<&'a mut Peekable<St>>>,
284 }
285}
286
287impl<St> fmt::Debug for PeekMut<'_, St>
288where
289 St: Stream + fmt::Debug,
290 St::Item: fmt::Debug,
291{
292 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
293 f.debug_struct("PeekMut").field("inner", &self.inner).finish()
294 }
295}
296
297impl<St: Stream> FusedFuture for PeekMut<'_, St> {
298 fn is_terminated(&self) -> bool {
299 self.inner.is_none()
300 }
301}
302
303impl<'a, St> Future for PeekMut<'a, St>
304where
305 St: Stream,
306{
307 type Output = Option<&'a mut St::Item>;
308
309 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
310 let inner = self.project().inner;
311 if let Some(peekable) = inner {
312 ready!(peekable.as_mut().poll_peek_mut(cx));
313
314 inner.take().unwrap().poll_peek_mut(cx)
315 } else {
316 panic!("PeekMut polled after completion")
317 }
318 }
319}
320
321pin_project! {
322 #[must_use = "futures do nothing unless polled"]
324 pub struct NextIf<'a, St: Stream, F> {
325 inner: Option<(Pin<&'a mut Peekable<St>>, F)>,
326 }
327}
328
329impl<St, F> fmt::Debug for NextIf<'_, St, F>
330where
331 St: Stream + fmt::Debug,
332 St::Item: fmt::Debug,
333{
334 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
335 f.debug_struct("NextIf").field("inner", &self.inner.as_ref().map(|(s, _f)| s)).finish()
336 }
337}
338
339#[allow(single_use_lifetimes)] impl<St, F> FusedFuture for NextIf<'_, St, F>
341where
342 St: Stream,
343 F: for<'a> FnOnce1<&'a St::Item, Output = bool>,
344{
345 fn is_terminated(&self) -> bool {
346 self.inner.is_none()
347 }
348}
349
350#[allow(single_use_lifetimes)] impl<St, F> Future for NextIf<'_, St, F>
352where
353 St: Stream,
354 F: for<'a> FnOnce1<&'a St::Item, Output = bool>,
355{
356 type Output = Option<St::Item>;
357
358 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
359 let inner = self.project().inner;
360 if let Some((peekable, _)) = inner {
361 let res = ready!(peekable.as_mut().poll_next(cx));
362
363 let (peekable, func) = inner.take().unwrap();
364 match res {
365 Some(ref matched) if func.call_once(matched) => Poll::Ready(res),
366 other => {
367 let peekable = peekable.project();
368 assert!(peekable.peeked.is_none());
370 *peekable.peeked = other;
371 Poll::Ready(None)
372 }
373 }
374 } else {
375 panic!("NextIf polled after completion")
376 }
377 }
378}
379
pin_project! {
    /// Future returned by `Peekable::next_if_eq`.
    ///
    /// Wraps a `NextIf` whose predicate is a `NextIfEqFn` holding the
    /// borrowed value to compare the next item against.
    #[must_use = "futures do nothing unless polled"]
    pub struct NextIfEq<'a, St: Stream, T: ?Sized> {
        #[pin]
        inner: NextIf<'a, St, NextIfEqFn<'a, T, St::Item>>,
    }
}
388
389impl<St, T> fmt::Debug for NextIfEq<'_, St, T>
390where
391 St: Stream + fmt::Debug,
392 St::Item: fmt::Debug,
393 T: ?Sized,
394{
395 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
396 f.debug_struct("NextIfEq")
397 .field("inner", &self.inner.inner.as_ref().map(|(s, _f)| s))
398 .finish()
399 }
400}
401
402impl<St, T> FusedFuture for NextIfEq<'_, St, T>
403where
404 St: Stream,
405 T: ?Sized,
406 St::Item: PartialEq<T>,
407{
408 fn is_terminated(&self) -> bool {
409 self.inner.is_terminated()
410 }
411}
412
413impl<St, T> Future for NextIfEq<'_, St, T>
414where
415 St: Stream,
416 T: ?Sized,
417 St::Item: PartialEq<T>,
418{
419 type Output = Option<St::Item>;
420
421 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
422 self.project().inner.poll(cx)
423 }
424}
425
/// Predicate object used by `NextIfEq`: holds the value the next item is
/// compared against.
///
/// `Item` appears only in `PhantomData`; no item is stored here.
struct NextIfEqFn<'a, T, Item>
where
    T: ?Sized,
{
    expected: &'a T,
    _next: PhantomData<Item>,
}
430
431impl<T, Item> FnOnce1<&Item> for NextIfEqFn<'_, T, Item>
432where
433 T: ?Sized,
434 Item: PartialEq<T>,
435{
436 type Output = bool;
437
438 fn call_once(self, next: &Item) -> Self::Output {
439 next == self.expected
440 }
441}