use std::borrow::Cow;
#[cfg(feature = "stream")]
use std::error::Error as StdError;
use std::fmt;
use bytes::Bytes;
use futures_channel::{mpsc, oneshot};
use futures_core::Stream; 
#[cfg(feature = "stream")]
use futures_util::TryStreamExt;
use http::HeaderMap;
use http_body::{Body as HttpBody, SizeHint};
#[cfg(feature = "stream")]
use crate::common::sync_wrapper::SyncWrapper;
use crate::common::{task, watch, Future, Never, Pin, Poll};
use crate::proto::h2::ping;
use crate::proto::DecodedLength;
use crate::upgrade::OnUpgrade;
type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
#[must_use = "streams do nothing unless polled"]
pub struct Body {
    kind: Kind,
    
    
    extra: Option<Box<Extra>>,
}
enum Kind {
    Once(Option<Bytes>),
    Chan {
        content_length: DecodedLength,
        want_tx: watch::Sender,
        rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
    },
    H2 {
        ping: ping::Recorder,
        content_length: DecodedLength,
        recv: h2::RecvStream,
    },
    #[cfg(feature = "stream")]
    Wrapped(
        SyncWrapper<
            Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>,
        >,
    ),
}
struct Extra {
    
    
    
    
    
    
    
    
    delayed_eof: Option<DelayEof>,
    on_upgrade: OnUpgrade,
}
type DelayEofUntil = oneshot::Receiver<Never>;
enum DelayEof {
    
    NotEof(DelayEofUntil),
    
    
    
    Eof(DelayEofUntil),
}
#[must_use = "Sender does nothing unless sent on"]
pub struct Sender {
    want_rx: watch::Receiver,
    tx: BodySender,
}
const WANT_PENDING: usize = 1;
const WANT_READY: usize = 2;
impl Body {
    
    
    
    
    
    
    
    
    
    
    #[inline]
    pub fn empty() -> Body {
        Body::new(Kind::Once(None))
    }
    
    
    
    #[inline]
    pub fn channel() -> (Sender, Body) {
        Self::new_channel(DecodedLength::CHUNKED,  false)
    }
    pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) {
        let (tx, rx) = mpsc::channel(0);
        
        
        let want = if wanter { WANT_PENDING } else { WANT_READY };
        let (want_tx, want_rx) = watch::channel(want);
        let tx = Sender { want_rx, tx };
        let rx = Body::new(Kind::Chan {
            content_length,
            want_tx,
            rx,
        });
        (tx, rx)
    }
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    #[cfg(feature = "stream")]
    pub fn wrap_stream<S, O, E>(stream: S) -> Body
    where
        S: Stream<Item = Result<O, E>> + Send + 'static,
        O: Into<Bytes> + 'static,
        E: Into<Box<dyn StdError + Send + Sync>> + 'static,
    {
        let mapped = stream.map_ok(Into::into).map_err(Into::into);
        Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
    }
    
    
    
    pub fn on_upgrade(self) -> OnUpgrade {
        self.extra
            .map(|ex| ex.on_upgrade)
            .unwrap_or_else(OnUpgrade::none)
    }
    fn new(kind: Kind) -> Body {
        Body { kind, extra: None }
    }
    pub(crate) fn h2(
        recv: h2::RecvStream,
        content_length: DecodedLength,
        ping: ping::Recorder,
    ) -> Self {
        let body = Body::new(Kind::H2 {
            ping,
            content_length,
            recv,
        });
        body
    }
    pub(crate) fn set_on_upgrade(&mut self, upgrade: OnUpgrade) {
        debug_assert!(!upgrade.is_none(), "set_on_upgrade with empty upgrade");
        let extra = self.extra_mut();
        debug_assert!(extra.on_upgrade.is_none(), "set_on_upgrade twice");
        extra.on_upgrade = upgrade;
    }
    pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
        self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut));
    }
    fn take_delayed_eof(&mut self) -> Option<DelayEof> {
        self.extra
            .as_mut()
            .and_then(|extra| extra.delayed_eof.take())
    }
    fn extra_mut(&mut self) -> &mut Extra {
        self.extra.get_or_insert_with(|| {
            Box::new(Extra {
                delayed_eof: None,
                on_upgrade: OnUpgrade::none(),
            })
        })
    }
    fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
        match self.take_delayed_eof() {
            Some(DelayEof::NotEof(mut delay)) => match self.poll_inner(cx) {
                ok @ Poll::Ready(Some(Ok(..))) | ok @ Poll::Pending => {
                    self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay));
                    ok
                }
                Poll::Ready(None) => match Pin::new(&mut delay).poll(cx) {
                    Poll::Ready(Ok(never)) => match never {},
                    Poll::Pending => {
                        self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
                        Poll::Pending
                    }
                    Poll::Ready(Err(_done)) => Poll::Ready(None),
                },
                Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
            },
            Some(DelayEof::Eof(mut delay)) => match Pin::new(&mut delay).poll(cx) {
                Poll::Ready(Ok(never)) => match never {},
                Poll::Pending => {
                    self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
                    Poll::Pending
                }
                Poll::Ready(Err(_done)) => Poll::Ready(None),
            },
            None => self.poll_inner(cx),
        }
    }
    fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
        match self.kind {
            Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)),
            Kind::Chan {
                content_length: ref mut len,
                ref mut rx,
                ref mut want_tx,
            } => {
                want_tx.send(WANT_READY);
                match ready!(Pin::new(rx).poll_next(cx)?) {
                    Some(chunk) => {
                        len.sub_if(chunk.len() as u64);
                        Poll::Ready(Some(Ok(chunk)))
                    }
                    None => Poll::Ready(None),
                }
            }
            Kind::H2 {
                ref ping,
                recv: ref mut h2,
                content_length: ref mut len,
            } => match ready!(h2.poll_data(cx)) {
                Some(Ok(bytes)) => {
                    let _ = h2.flow_control().release_capacity(bytes.len());
                    len.sub_if(bytes.len() as u64);
                    ping.record_data(bytes.len());
                    Poll::Ready(Some(Ok(bytes)))
                }
                Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
                None => Poll::Ready(None),
            },
            #[cfg(feature = "stream")]
            Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) {
                Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
                None => Poll::Ready(None),
            },
        }
    }
    pub(super) fn take_full_data(&mut self) -> Option<Bytes> {
        if let Kind::Once(ref mut chunk) = self.kind {
            chunk.take()
        } else {
            None
        }
    }
}
impl Default for Body {
    
    #[inline]
    fn default() -> Body {
        Body::empty()
    }
}
impl HttpBody for Body {
    type Data = Bytes;
    type Error = crate::Error;
    fn poll_data(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
        self.poll_eof(cx)
    }
    fn poll_trailers(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
        match self.kind {
            Kind::H2 {
                recv: ref mut h2,
                ref ping,
                ..
            } => match ready!(h2.poll_trailers(cx)) {
                Ok(t) => {
                    ping.record_non_data();
                    Poll::Ready(Ok(t))
                }
                Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))),
            },
            _ => Poll::Ready(Ok(None)),
        }
    }
    fn is_end_stream(&self) -> bool {
        match self.kind {
            Kind::Once(ref val) => val.is_none(),
            Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
            Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
            #[cfg(feature = "stream")]
            Kind::Wrapped(..) => false,
        }
    }
    fn size_hint(&self) -> SizeHint {
        match self.kind {
            Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64),
            Kind::Once(None) => SizeHint::with_exact(0),
            #[cfg(feature = "stream")]
            Kind::Wrapped(..) => SizeHint::default(),
            Kind::Chan { content_length, .. } | Kind::H2 { content_length, .. } => {
                let mut hint = SizeHint::default();
                if let Some(content_length) = content_length.into_opt() {
                    hint.set_exact(content_length);
                }
                hint
            }
        }
    }
}
impl fmt::Debug for Body {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        #[derive(Debug)]
        struct Streaming;
        #[derive(Debug)]
        struct Empty;
        #[derive(Debug)]
        struct Full<'a>(&'a Bytes);
        let mut builder = f.debug_tuple("Body");
        match self.kind {
            Kind::Once(None) => builder.field(&Empty),
            Kind::Once(Some(ref chunk)) => builder.field(&Full(chunk)),
            _ => builder.field(&Streaming),
        };
        builder.finish()
    }
}
#[cfg(feature = "stream")]
impl Stream for Body {
    type Item = crate::Result<Bytes>;
    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
        HttpBody::poll_data(self, cx)
    }
}
#[cfg(feature = "stream")]
impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body {
    #[inline]
    fn from(
        stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>,
    ) -> Body {
        Body::new(Kind::Wrapped(SyncWrapper::new(stream.into())))
    }
}
impl From<Bytes> for Body {
    #[inline]
    fn from(chunk: Bytes) -> Body {
        if chunk.is_empty() {
            Body::empty()
        } else {
            Body::new(Kind::Once(Some(chunk)))
        }
    }
}
impl From<Vec<u8>> for Body {
    #[inline]
    fn from(vec: Vec<u8>) -> Body {
        Body::from(Bytes::from(vec))
    }
}
impl From<&'static [u8]> for Body {
    #[inline]
    fn from(slice: &'static [u8]) -> Body {
        Body::from(Bytes::from(slice))
    }
}
impl From<Cow<'static, [u8]>> for Body {
    #[inline]
    fn from(cow: Cow<'static, [u8]>) -> Body {
        match cow {
            Cow::Borrowed(b) => Body::from(b),
            Cow::Owned(o) => Body::from(o),
        }
    }
}
impl From<String> for Body {
    #[inline]
    fn from(s: String) -> Body {
        Body::from(Bytes::from(s.into_bytes()))
    }
}
impl From<&'static str> for Body {
    #[inline]
    fn from(slice: &'static str) -> Body {
        Body::from(Bytes::from(slice.as_bytes()))
    }
}
impl From<Cow<'static, str>> for Body {
    #[inline]
    fn from(cow: Cow<'static, str>) -> Body {
        match cow {
            Cow::Borrowed(b) => Body::from(b),
            Cow::Owned(o) => Body::from(o),
        }
    }
}
impl Sender {
    
    pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
        
        ready!(self.poll_want(cx)?);
        self.tx
            .poll_ready(cx)
            .map_err(|_| crate::Error::new_closed())
    }
    fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
        match self.want_rx.load(cx) {
            WANT_READY => Poll::Ready(Ok(())),
            WANT_PENDING => Poll::Pending,
            watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
            unexpected => unreachable!("want_rx value: {}", unexpected),
        }
    }
    async fn ready(&mut self) -> crate::Result<()> {
        futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
    }
    
    pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
        self.ready().await?;
        self.tx
            .try_send(Ok(chunk))
            .map_err(|_| crate::Error::new_closed())
    }
    
    
    
    
    
    
    
    
    
    
    
    
    pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
        self.tx
            .try_send(Ok(chunk))
            .map_err(|err| err.into_inner().expect("just sent Ok"))
    }
    
    pub fn abort(self) {
        let _ = self
            .tx
            
            .clone()
            .try_send(Err(crate::Error::new_body_write_aborted()));
    }
    pub(crate) fn send_error(&mut self, err: crate::Error) {
        let _ = self.tx.try_send(Err(err));
    }
}
impl fmt::Debug for Sender {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        #[derive(Debug)]
        struct Open;
        #[derive(Debug)]
        struct Closed;
        let mut builder = f.debug_tuple("Sender");
        match self.want_rx.peek() {
            watch::CLOSED => builder.field(&Closed),
            _ => builder.field(&Open),
        };
        builder.finish()
    }
}
#[cfg(test)]
mod tests {
    use std::mem;
    use std::task::Poll;
    use super::{Body, DecodedLength, HttpBody, Sender, SizeHint};
    #[test]
    fn test_size_of() {
        
        
        let body_size = mem::size_of::<Body>();
        let body_expected_size = mem::size_of::<u64>() * 6;
        assert!(
            body_size <= body_expected_size,
            "Body size = {} <= {}",
            body_size,
            body_expected_size,
        );
        assert_eq!(body_size, mem::size_of::<Option<Body>>(), "Option<Body>");
        assert_eq!(
            mem::size_of::<Sender>(),
            mem::size_of::<usize>() * 4,
            "Sender"
        );
        assert_eq!(
            mem::size_of::<Sender>(),
            mem::size_of::<Option<Sender>>(),
            "Option<Sender>"
        );
    }
    #[test]
    fn size_hint() {
        fn eq(body: Body, b: SizeHint, note: &str) {
            let a = body.size_hint();
            assert_eq!(a.lower(), b.lower(), "lower for {:?}", note);
            assert_eq!(a.upper(), b.upper(), "upper for {:?}", note);
        }
        eq(Body::from("Hello"), SizeHint::with_exact(5), "from str");
        eq(Body::empty(), SizeHint::with_exact(0), "empty");
        eq(Body::channel().1, SizeHint::new(), "channel");
        eq(
            Body::new_channel(DecodedLength::new(4),  false).1,
            SizeHint::with_exact(4),
            "channel with length",
        );
    }
    #[tokio::test]
    async fn channel_abort() {
        let (tx, mut rx) = Body::channel();
        tx.abort();
        let err = rx.data().await.unwrap().unwrap_err();
        assert!(err.is_body_write_aborted(), "{:?}", err);
    }
    #[tokio::test]
    async fn channel_abort_when_buffer_is_full() {
        let (mut tx, mut rx) = Body::channel();
        tx.try_send_data("chunk 1".into()).expect("send 1");
        
        tx.abort();
        let chunk1 = rx.data().await.expect("item 1").expect("chunk 1");
        assert_eq!(chunk1, "chunk 1");
        let err = rx.data().await.unwrap().unwrap_err();
        assert!(err.is_body_write_aborted(), "{:?}", err);
    }
    #[test]
    fn channel_buffers_one() {
        let (mut tx, _rx) = Body::channel();
        tx.try_send_data("chunk 1".into()).expect("send 1");
        
        let chunk2 = tx.try_send_data("chunk 2".into()).expect_err("send 2");
        assert_eq!(chunk2, "chunk 2");
    }
    #[tokio::test]
    async fn channel_empty() {
        let (_, mut rx) = Body::channel();
        assert!(rx.data().await.is_none());
    }
    #[test]
    fn channel_ready() {
        let (mut tx, _rx) = Body::new_channel(DecodedLength::CHUNKED,  false);
        let mut tx_ready = tokio_test::task::spawn(tx.ready());
        assert!(tx_ready.poll().is_ready(), "tx is ready immediately");
    }
    #[test]
    fn channel_wanter() {
        let (mut tx, mut rx) = Body::new_channel(DecodedLength::CHUNKED,  true);
        let mut tx_ready = tokio_test::task::spawn(tx.ready());
        let mut rx_data = tokio_test::task::spawn(rx.data());
        assert!(
            tx_ready.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );
        assert!(rx_data.poll().is_pending(), "poll rx.data");
        assert!(tx_ready.is_woken(), "rx poll wakes tx");
        assert!(
            tx_ready.poll().is_ready(),
            "tx is ready after rx has been polled"
        );
    }
    #[test]
    fn channel_notices_closure() {
        let (mut tx, rx) = Body::new_channel(DecodedLength::CHUNKED,  true);
        let mut tx_ready = tokio_test::task::spawn(tx.ready());
        assert!(
            tx_ready.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );
        drop(rx);
        assert!(tx_ready.is_woken(), "dropping rx wakes tx");
        match tx_ready.poll() {
            Poll::Ready(Err(ref e)) if e.is_closed() => (),
            unexpected => panic!("tx poll ready unexpected: {:?}", unexpected),
        }
    }
}