async_body.rs

  1use std::{
  2    io::{Cursor, Read},
  3    pin::Pin,
  4    task::Poll,
  5};
  6
  7use bytes::Bytes;
  8use futures::AsyncRead;
  9use http_body::{Body, Frame};
 10use serde::Serialize;
 11
 12/// Based on the implementation of AsyncBody in
 13/// <https://github.com/sagebind/isahc/blob/5c533f1ef4d6bdf1fd291b5103c22110f41d0bf0/src/body/mod.rs>.
 14pub struct AsyncBody(pub Inner);
 15
 16pub enum Inner {
 17    /// An empty body.
 18    Empty,
 19
 20    /// A body stored in memory.
 21    Bytes(std::io::Cursor<Bytes>),
 22
 23    /// An asynchronous reader.
 24    AsyncReader(Pin<Box<dyn futures::AsyncRead + Send + Sync>>),
 25}
 26
 27impl AsyncBody {
 28    /// Create a new empty body.
 29    ///
 30    /// An empty body represents the *absence* of a body, which is semantically
 31    /// different than the presence of a body of zero length.
 32    pub fn empty() -> Self {
 33        Self(Inner::Empty)
 34    }
 35    /// Create a streaming body that reads from the given reader.
 36    pub fn from_reader<R>(read: R) -> Self
 37    where
 38        R: AsyncRead + Send + Sync + 'static,
 39    {
 40        Self(Inner::AsyncReader(Box::pin(read)))
 41    }
 42
 43    pub fn from_bytes(bytes: Bytes) -> Self {
 44        Self(Inner::Bytes(Cursor::new(bytes)))
 45    }
 46}
 47
 48impl Default for AsyncBody {
 49    fn default() -> Self {
 50        Self(Inner::Empty)
 51    }
 52}
 53
 54impl From<()> for AsyncBody {
 55    fn from(_: ()) -> Self {
 56        Self(Inner::Empty)
 57    }
 58}
 59
 60impl From<Bytes> for AsyncBody {
 61    fn from(bytes: Bytes) -> Self {
 62        Self::from_bytes(bytes)
 63    }
 64}
 65
 66impl From<Vec<u8>> for AsyncBody {
 67    fn from(body: Vec<u8>) -> Self {
 68        Self::from_bytes(body.into())
 69    }
 70}
 71
 72impl From<String> for AsyncBody {
 73    fn from(body: String) -> Self {
 74        Self::from_bytes(body.into())
 75    }
 76}
 77
 78impl From<&'static [u8]> for AsyncBody {
 79    #[inline]
 80    fn from(s: &'static [u8]) -> Self {
 81        Self::from_bytes(Bytes::from_static(s))
 82    }
 83}
 84
 85impl From<&'static str> for AsyncBody {
 86    #[inline]
 87    fn from(s: &'static str) -> Self {
 88        Self::from_bytes(Bytes::from_static(s.as_bytes()))
 89    }
 90}
 91
 92/// Newtype wrapper that serializes a value as JSON into an `AsyncBody`.
 93pub struct Json<T: Serialize>(pub T);
 94
 95impl<T: Serialize> From<Json<T>> for AsyncBody {
 96    fn from(json: Json<T>) -> Self {
 97        Self::from_bytes(
 98            serde_json::to_vec(&json.0)
 99                .expect("failed to serialize JSON")
100                .into(),
101        )
102    }
103}
104
105impl<T: Into<Self>> From<Option<T>> for AsyncBody {
106    fn from(body: Option<T>) -> Self {
107        match body {
108            Some(body) => body.into(),
109            None => Self::empty(),
110        }
111    }
112}
113
114impl futures::AsyncRead for AsyncBody {
115    fn poll_read(
116        self: Pin<&mut Self>,
117        cx: &mut std::task::Context<'_>,
118        buf: &mut [u8],
119    ) -> std::task::Poll<std::io::Result<usize>> {
120        // SAFETY: Standard Enum pin projection
121        let inner = unsafe { &mut self.get_unchecked_mut().0 };
122        match inner {
123            Inner::Empty => Poll::Ready(Ok(0)),
124            // Blocking call is over an in-memory buffer
125            Inner::Bytes(cursor) => Poll::Ready(cursor.read(buf)),
126            Inner::AsyncReader(async_reader) => {
127                AsyncRead::poll_read(async_reader.as_mut(), cx, buf)
128            }
129        }
130    }
131}
132
133impl Body for AsyncBody {
134    type Data = Bytes;
135    type Error = std::io::Error;
136
137    fn poll_frame(
138        mut self: Pin<&mut Self>,
139        cx: &mut std::task::Context<'_>,
140    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
141        let mut buffer = vec![0; 8192];
142        match AsyncRead::poll_read(self.as_mut(), cx, &mut buffer) {
143            Poll::Ready(Ok(0)) => Poll::Ready(None),
144            Poll::Ready(Ok(n)) => {
145                let data = Bytes::copy_from_slice(&buffer[..n]);
146                Poll::Ready(Some(Ok(Frame::data(data))))
147            }
148            Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
149            Poll::Pending => Poll::Pending,
150        }
151    }
152}