async_body.rs

  1use std::{borrow::Cow, io::Read, pin::Pin, task::Poll};
  2
  3use futures::{AsyncRead, AsyncReadExt};
  4
  5/// Based on the implementation of AsyncBody in
  6/// https://github.com/sagebind/isahc/blob/5c533f1ef4d6bdf1fd291b5103c22110f41d0bf0/src/body/mod.rs
  7pub struct AsyncBody(pub Inner);
  8
  9pub enum Inner {
 10    /// An empty body.
 11    Empty,
 12
 13    /// A body stored in memory.
 14    SyncReader(std::io::Cursor<Cow<'static, [u8]>>),
 15
 16    /// An asynchronous reader.
 17    AsyncReader(Pin<Box<dyn futures::AsyncRead + Send + Sync>>),
 18}
 19
 20impl AsyncBody {
 21    /// Create a new empty body.
 22    ///
 23    /// An empty body represents the *absence* of a body, which is semantically
 24    /// different than the presence of a body of zero length.
 25    pub fn empty() -> Self {
 26        Self(Inner::Empty)
 27    }
 28    /// Create a streaming body that reads from the given reader.
 29    pub fn from_reader<R>(read: R) -> Self
 30    where
 31        R: AsyncRead + Send + Sync + 'static,
 32    {
 33        Self(Inner::AsyncReader(Box::pin(read)))
 34    }
 35}
 36
 37impl Default for AsyncBody {
 38    fn default() -> Self {
 39        Self(Inner::Empty)
 40    }
 41}
 42
 43impl From<()> for AsyncBody {
 44    fn from(_: ()) -> Self {
 45        Self(Inner::Empty)
 46    }
 47}
 48
 49impl From<Vec<u8>> for AsyncBody {
 50    fn from(body: Vec<u8>) -> Self {
 51        Self(Inner::SyncReader(std::io::Cursor::new(Cow::Owned(body))))
 52    }
 53}
 54
 55impl From<&'_ [u8]> for AsyncBody {
 56    fn from(body: &[u8]) -> Self {
 57        body.to_vec().into()
 58    }
 59}
 60
 61impl From<String> for AsyncBody {
 62    fn from(body: String) -> Self {
 63        body.into_bytes().into()
 64    }
 65}
 66
 67impl From<&'_ str> for AsyncBody {
 68    fn from(body: &str) -> Self {
 69        body.as_bytes().into()
 70    }
 71}
 72
 73impl<T: Into<Self>> From<Option<T>> for AsyncBody {
 74    fn from(body: Option<T>) -> Self {
 75        match body {
 76            Some(body) => body.into(),
 77            None => Self(Inner::Empty),
 78        }
 79    }
 80}
 81
 82impl std::io::Read for AsyncBody {
 83    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
 84        match &mut self.0 {
 85            Inner::Empty => Ok(0),
 86            Inner::SyncReader(cursor) => cursor.read(buf),
 87            Inner::AsyncReader(async_reader) => smol::block_on(async_reader.read(buf)),
 88        }
 89    }
 90}
 91
 92impl futures::AsyncRead for AsyncBody {
 93    fn poll_read(
 94        self: Pin<&mut Self>,
 95        cx: &mut std::task::Context<'_>,
 96        buf: &mut [u8],
 97    ) -> std::task::Poll<std::io::Result<usize>> {
 98        // SAFETY: Standard Enum pin projection
 99        let inner = unsafe { &mut self.get_unchecked_mut().0 };
100        match inner {
101            Inner::Empty => Poll::Ready(Ok(0)),
102            // Blocking call is over an in-memory buffer
103            Inner::SyncReader(cursor) => Poll::Ready(cursor.read(buf)),
104            Inner::AsyncReader(async_reader) => {
105                AsyncRead::poll_read(async_reader.as_mut(), cx, buf)
106            }
107        }
108    }
109}