async_body.rs

  1use std::{
  2    io::{Cursor, Read},
  3    pin::Pin,
  4    task::Poll,
  5};
  6
  7use bytes::Bytes;
  8use futures::AsyncRead;
  9use http_body::{Body, Frame};
 10
 11/// Based on the implementation of AsyncBody in
 12/// <https://github.com/sagebind/isahc/blob/5c533f1ef4d6bdf1fd291b5103c22110f41d0bf0/src/body/mod.rs>.
 13pub struct AsyncBody(pub Inner);
 14
 15pub enum Inner {
 16    /// An empty body.
 17    Empty,
 18
 19    /// A body stored in memory.
 20    Bytes(std::io::Cursor<Bytes>),
 21
 22    /// An asynchronous reader.
 23    AsyncReader(Pin<Box<dyn futures::AsyncRead + Send + Sync>>),
 24}
 25
 26impl AsyncBody {
 27    /// Create a new empty body.
 28    ///
 29    /// An empty body represents the *absence* of a body, which is semantically
 30    /// different than the presence of a body of zero length.
 31    pub const fn empty() -> Self {
 32        Self(Inner::Empty)
 33    }
 34    /// Create a streaming body that reads from the given reader.
 35    pub fn from_reader<R>(read: R) -> Self
 36    where
 37        R: AsyncRead + Send + Sync + 'static,
 38    {
 39        Self(Inner::AsyncReader(Box::pin(read)))
 40    }
 41
 42    pub const fn from_bytes(bytes: Bytes) -> Self {
 43        Self(Inner::Bytes(Cursor::new(bytes)))
 44    }
 45}
 46
 47impl Default for AsyncBody {
 48    fn default() -> Self {
 49        Self(Inner::Empty)
 50    }
 51}
 52
 53impl From<()> for AsyncBody {
 54    fn from(_: ()) -> Self {
 55        Self(Inner::Empty)
 56    }
 57}
 58
 59impl From<Bytes> for AsyncBody {
 60    fn from(bytes: Bytes) -> Self {
 61        Self::from_bytes(bytes)
 62    }
 63}
 64
 65impl From<Vec<u8>> for AsyncBody {
 66    fn from(body: Vec<u8>) -> Self {
 67        Self::from_bytes(body.into())
 68    }
 69}
 70
 71impl From<String> for AsyncBody {
 72    fn from(body: String) -> Self {
 73        Self::from_bytes(body.into())
 74    }
 75}
 76
 77impl From<&'static [u8]> for AsyncBody {
 78    #[inline]
 79    fn from(s: &'static [u8]) -> Self {
 80        Self::from_bytes(Bytes::from_static(s))
 81    }
 82}
 83
 84impl From<&'static str> for AsyncBody {
 85    #[inline]
 86    fn from(s: &'static str) -> Self {
 87        Self::from_bytes(Bytes::from_static(s.as_bytes()))
 88    }
 89}
 90
 91impl TryFrom<reqwest::Body> for AsyncBody {
 92    type Error = anyhow::Error;
 93
 94    fn try_from(value: reqwest::Body) -> Result<Self, Self::Error> {
 95        value
 96            .as_bytes()
 97            .ok_or_else(|| anyhow::anyhow!("Underlying data is a stream"))
 98            .map(|bytes| Self::from_bytes(Bytes::copy_from_slice(bytes)))
 99    }
100}
101
102impl<T: Into<Self>> From<Option<T>> for AsyncBody {
103    fn from(body: Option<T>) -> Self {
104        match body {
105            Some(body) => body.into(),
106            None => Self::empty(),
107        }
108    }
109}
110
111impl futures::AsyncRead for AsyncBody {
112    fn poll_read(
113        self: Pin<&mut Self>,
114        cx: &mut std::task::Context<'_>,
115        buf: &mut [u8],
116    ) -> std::task::Poll<std::io::Result<usize>> {
117        // SAFETY: Standard Enum pin projection
118        let inner = unsafe { &mut self.get_unchecked_mut().0 };
119        match inner {
120            Inner::Empty => Poll::Ready(Ok(0)),
121            // Blocking call is over an in-memory buffer
122            Inner::Bytes(cursor) => Poll::Ready(cursor.read(buf)),
123            Inner::AsyncReader(async_reader) => {
124                AsyncRead::poll_read(async_reader.as_mut(), cx, buf)
125            }
126        }
127    }
128}
129
130impl Body for AsyncBody {
131    type Data = Bytes;
132    type Error = std::io::Error;
133
134    fn poll_frame(
135        mut self: Pin<&mut Self>,
136        cx: &mut std::task::Context<'_>,
137    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
138        let mut buffer = vec![0; 8192];
139        match AsyncRead::poll_read(self.as_mut(), cx, &mut buffer) {
140            Poll::Ready(Ok(0)) => Poll::Ready(None),
141            Poll::Ready(Ok(n)) => {
142                let data = Bytes::copy_from_slice(&buffer[..n]);
143                Poll::Ready(Some(Ok(Frame::data(data))))
144            }
145            Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
146            Poll::Pending => Poll::Pending,
147        }
148    }
149}