1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
use core::future::Future;
use core::pin::Pin;

use pin_project_lite::pin_project;

use crate::stream::Stream;
use crate::task::{Context, Poll};

pin_project! {
    /// Future that drains a stream of `(A, B)` pairs into two collections.
    ///
    /// It resolves to `(FromA, FromB)` once the wrapped stream reports that it
    /// has no more items. The first element of every pair is appended to
    /// `FromA` and the second to `FromB`, in the order the stream yields them.
    #[derive(Clone, Debug)]
    #[cfg(feature = "unstable")]
    #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
    pub struct UnzipFuture<S, FromA, FromB> {
        // Stream being consumed; structurally pinned so it can be polled in place.
        #[pin]
        stream: S,
        // Collections filled so far. Moved out (leaving `None`) when the future
        // completes, so the accumulated values can be returned by value.
        res: Option<(FromA, FromB)>,
    }
}

impl<S, FromA, FromB> UnzipFuture<S, FromA, FromB>
where
    S: Stream,
    FromA: Default,
    FromB: Default,
{
    /// Wraps `stream`, starting both output collections from their `Default` value.
    pub(super) fn new(stream: S) -> Self {
        let empty = (FromA::default(), FromB::default());
        Self {
            stream,
            res: Some(empty),
        }
    }
}

impl<S, A, B, FromA, FromB> Future for UnzipFuture<S, FromA, FromB>
where
    S: Stream<Item = (A, B)>,
    FromA: Default + Extend<A>,
    FromB: Default + Extend<B>,
{
    type Output = (FromA, FromB);

    /// Pulls items from the stream until it is pending or exhausted.
    ///
    /// Returns `Poll::Pending` whenever the stream is not ready, keeping what
    /// has been collected so far. Returns `Poll::Ready` with both collections
    /// when the stream yields `None`.
    ///
    /// # Panics
    ///
    /// Panics if polled again after completion and the stream then produces
    /// another result, because the collections have already been moved out.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut me = self.project();

        // `ready!` returns `Poll::Pending` from this function when the stream
        // has nothing available yet; otherwise it hands back the `Option`.
        while let Some((left_item, right_item)) =
            futures_core::ready!(me.stream.as_mut().poll_next(cx))
        {
            let (left, right) = me.res.as_mut().unwrap();
            left.extend(Some(left_item));
            right.extend(Some(right_item));
        }

        // Stream exhausted: hand the accumulated collections to the caller.
        Poll::Ready(me.res.take().unwrap())
    }
}