async_body.rs

  1use std::{
  2    io::{Cursor, Read},
  3    pin::Pin,
  4    task::Poll,
  5};
  6
  7use bytes::Bytes;
  8use futures::AsyncRead;
  9
 10/// Based on the implementation of AsyncBody in
 11/// <https://github.com/sagebind/isahc/blob/5c533f1ef4d6bdf1fd291b5103c22110f41d0bf0/src/body/mod.rs>.
 12pub struct AsyncBody(pub Inner);
 13
 14pub enum Inner {
 15    /// An empty body.
 16    Empty,
 17
 18    /// A body stored in memory.
 19    Bytes(std::io::Cursor<Bytes>),
 20
 21    /// An asynchronous reader.
 22    AsyncReader(Pin<Box<dyn futures::AsyncRead + Send + Sync>>),
 23}
 24
 25impl AsyncBody {
 26    /// Create a new empty body.
 27    ///
 28    /// An empty body represents the *absence* of a body, which is semantically
 29    /// different than the presence of a body of zero length.
 30    pub fn empty() -> Self {
 31        Self(Inner::Empty)
 32    }
 33    /// Create a streaming body that reads from the given reader.
 34    pub fn from_reader<R>(read: R) -> Self
 35    where
 36        R: AsyncRead + Send + Sync + 'static,
 37    {
 38        Self(Inner::AsyncReader(Box::pin(read)))
 39    }
 40
 41    pub fn from_bytes(bytes: Bytes) -> Self {
 42        Self(Inner::Bytes(Cursor::new(bytes.clone())))
 43    }
 44}
 45
 46impl Default for AsyncBody {
 47    fn default() -> Self {
 48        Self(Inner::Empty)
 49    }
 50}
 51
 52impl From<()> for AsyncBody {
 53    fn from(_: ()) -> Self {
 54        Self(Inner::Empty)
 55    }
 56}
 57
 58impl From<Bytes> for AsyncBody {
 59    fn from(bytes: Bytes) -> Self {
 60        Self::from_bytes(bytes)
 61    }
 62}
 63
 64impl From<Vec<u8>> for AsyncBody {
 65    fn from(body: Vec<u8>) -> Self {
 66        Self::from_bytes(body.into())
 67    }
 68}
 69
 70impl From<String> for AsyncBody {
 71    fn from(body: String) -> Self {
 72        Self::from_bytes(body.into())
 73    }
 74}
 75
 76impl From<&'static [u8]> for AsyncBody {
 77    #[inline]
 78    fn from(s: &'static [u8]) -> Self {
 79        Self::from_bytes(Bytes::from_static(s))
 80    }
 81}
 82
 83impl From<&'static str> for AsyncBody {
 84    #[inline]
 85    fn from(s: &'static str) -> Self {
 86        Self::from_bytes(Bytes::from_static(s.as_bytes()))
 87    }
 88}
 89
 90impl<T: Into<Self>> From<Option<T>> for AsyncBody {
 91    fn from(body: Option<T>) -> Self {
 92        match body {
 93            Some(body) => body.into(),
 94            None => Self::empty(),
 95        }
 96    }
 97}
 98
 99impl futures::AsyncRead for AsyncBody {
100    fn poll_read(
101        self: Pin<&mut Self>,
102        cx: &mut std::task::Context<'_>,
103        buf: &mut [u8],
104    ) -> std::task::Poll<std::io::Result<usize>> {
105        // SAFETY: Standard Enum pin projection
106        let inner = unsafe { &mut self.get_unchecked_mut().0 };
107        match inner {
108            Inner::Empty => Poll::Ready(Ok(0)),
109            // Blocking call is over an in-memory buffer
110            Inner::Bytes(cursor) => Poll::Ready(cursor.read(buf)),
111            Inner::AsyncReader(async_reader) => {
112                AsyncRead::poll_read(async_reader.as_mut(), cx, buf)
113            }
114        }
115    }
116}