1use std::{
2 io::{Cursor, Read},
3 pin::Pin,
4 task::Poll,
5};
6
7use bytes::Bytes;
8use futures::AsyncRead;
9use http_body::{Body, Frame};
10
11/// Based on the implementation of AsyncBody in
12/// <https://github.com/sagebind/isahc/blob/5c533f1ef4d6bdf1fd291b5103c22110f41d0bf0/src/body/mod.rs>.
13pub struct AsyncBody(pub Inner);
14
15pub enum Inner {
16 /// An empty body.
17 Empty,
18
19 /// A body stored in memory.
20 Bytes(std::io::Cursor<Bytes>),
21
22 /// An asynchronous reader.
23 AsyncReader(Pin<Box<dyn futures::AsyncRead + Send + Sync>>),
24}
25
26impl AsyncBody {
27 /// Create a new empty body.
28 ///
29 /// An empty body represents the *absence* of a body, which is semantically
30 /// different than the presence of a body of zero length.
31 pub fn empty() -> Self {
32 Self(Inner::Empty)
33 }
34 /// Create a streaming body that reads from the given reader.
35 pub fn from_reader<R>(read: R) -> Self
36 where
37 R: AsyncRead + Send + Sync + 'static,
38 {
39 Self(Inner::AsyncReader(Box::pin(read)))
40 }
41
42 pub fn from_bytes(bytes: Bytes) -> Self {
43 Self(Inner::Bytes(Cursor::new(bytes.clone())))
44 }
45}
46
47impl Default for AsyncBody {
48 fn default() -> Self {
49 Self(Inner::Empty)
50 }
51}
52
53impl From<()> for AsyncBody {
54 fn from(_: ()) -> Self {
55 Self(Inner::Empty)
56 }
57}
58
59impl From<Bytes> for AsyncBody {
60 fn from(bytes: Bytes) -> Self {
61 Self::from_bytes(bytes)
62 }
63}
64
65impl From<Vec<u8>> for AsyncBody {
66 fn from(body: Vec<u8>) -> Self {
67 Self::from_bytes(body.into())
68 }
69}
70
71impl From<String> for AsyncBody {
72 fn from(body: String) -> Self {
73 Self::from_bytes(body.into())
74 }
75}
76
77impl From<&'static [u8]> for AsyncBody {
78 #[inline]
79 fn from(s: &'static [u8]) -> Self {
80 Self::from_bytes(Bytes::from_static(s))
81 }
82}
83
84impl From<&'static str> for AsyncBody {
85 #[inline]
86 fn from(s: &'static str) -> Self {
87 Self::from_bytes(Bytes::from_static(s.as_bytes()))
88 }
89}
90
91impl<T: Into<Self>> From<Option<T>> for AsyncBody {
92 fn from(body: Option<T>) -> Self {
93 match body {
94 Some(body) => body.into(),
95 None => Self::empty(),
96 }
97 }
98}
99
100impl futures::AsyncRead for AsyncBody {
101 fn poll_read(
102 self: Pin<&mut Self>,
103 cx: &mut std::task::Context<'_>,
104 buf: &mut [u8],
105 ) -> std::task::Poll<std::io::Result<usize>> {
106 // SAFETY: Standard Enum pin projection
107 let inner = unsafe { &mut self.get_unchecked_mut().0 };
108 match inner {
109 Inner::Empty => Poll::Ready(Ok(0)),
110 // Blocking call is over an in-memory buffer
111 Inner::Bytes(cursor) => Poll::Ready(cursor.read(buf)),
112 Inner::AsyncReader(async_reader) => {
113 AsyncRead::poll_read(async_reader.as_mut(), cx, buf)
114 }
115 }
116 }
117}
118
119impl Body for AsyncBody {
120 type Data = Bytes;
121 type Error = std::io::Error;
122
123 fn poll_frame(
124 mut self: Pin<&mut Self>,
125 cx: &mut std::task::Context<'_>,
126 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
127 let mut buffer = vec![0; 8192];
128 match AsyncRead::poll_read(self.as_mut(), cx, &mut buffer) {
129 Poll::Ready(Ok(0)) => Poll::Ready(None),
130 Poll::Ready(Ok(n)) => {
131 let data = Bytes::copy_from_slice(&buffer[..n]);
132 Poll::Ready(Some(Ok(Frame::data(data))))
133 }
134 Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
135 Poll::Pending => Poll::Pending,
136 }
137 }
138}