1use std::{
2 io::{Cursor, Read},
3 pin::Pin,
4 task::Poll,
5};
6
7use bytes::Bytes;
8use futures::AsyncRead;
9use http_body::{Body, Frame};
10
11/// Based on the implementation of AsyncBody in
12/// <https://github.com/sagebind/isahc/blob/5c533f1ef4d6bdf1fd291b5103c22110f41d0bf0/src/body/mod.rs>.
13pub struct AsyncBody(pub Inner);
14
15pub enum Inner {
16 /// An empty body.
17 Empty,
18
19 /// A body stored in memory.
20 Bytes(std::io::Cursor<Bytes>),
21
22 /// An asynchronous reader.
23 AsyncReader(Pin<Box<dyn futures::AsyncRead + Send + Sync>>),
24}
25
26impl AsyncBody {
27 /// Create a new empty body.
28 ///
29 /// An empty body represents the *absence* of a body, which is semantically
30 /// different than the presence of a body of zero length.
31 pub const fn empty() -> Self {
32 Self(Inner::Empty)
33 }
34 /// Create a streaming body that reads from the given reader.
35 pub fn from_reader<R>(read: R) -> Self
36 where
37 R: AsyncRead + Send + Sync + 'static,
38 {
39 Self(Inner::AsyncReader(Box::pin(read)))
40 }
41
42 pub const fn from_bytes(bytes: Bytes) -> Self {
43 Self(Inner::Bytes(Cursor::new(bytes)))
44 }
45}
46
47impl Default for AsyncBody {
48 fn default() -> Self {
49 Self(Inner::Empty)
50 }
51}
52
53impl From<()> for AsyncBody {
54 fn from(_: ()) -> Self {
55 Self(Inner::Empty)
56 }
57}
58
59impl From<Bytes> for AsyncBody {
60 fn from(bytes: Bytes) -> Self {
61 Self::from_bytes(bytes)
62 }
63}
64
65impl From<Vec<u8>> for AsyncBody {
66 fn from(body: Vec<u8>) -> Self {
67 Self::from_bytes(body.into())
68 }
69}
70
71impl From<String> for AsyncBody {
72 fn from(body: String) -> Self {
73 Self::from_bytes(body.into())
74 }
75}
76
77impl From<&'static [u8]> for AsyncBody {
78 #[inline]
79 fn from(s: &'static [u8]) -> Self {
80 Self::from_bytes(Bytes::from_static(s))
81 }
82}
83
84impl From<&'static str> for AsyncBody {
85 #[inline]
86 fn from(s: &'static str) -> Self {
87 Self::from_bytes(Bytes::from_static(s.as_bytes()))
88 }
89}
90
91impl TryFrom<reqwest::Body> for AsyncBody {
92 type Error = anyhow::Error;
93
94 fn try_from(value: reqwest::Body) -> Result<Self, Self::Error> {
95 value
96 .as_bytes()
97 .ok_or_else(|| anyhow::anyhow!("Underlying data is a stream"))
98 .map(|bytes| Self::from_bytes(Bytes::copy_from_slice(bytes)))
99 }
100}
101
102impl<T: Into<Self>> From<Option<T>> for AsyncBody {
103 fn from(body: Option<T>) -> Self {
104 match body {
105 Some(body) => body.into(),
106 None => Self::empty(),
107 }
108 }
109}
110
111impl futures::AsyncRead for AsyncBody {
112 fn poll_read(
113 self: Pin<&mut Self>,
114 cx: &mut std::task::Context<'_>,
115 buf: &mut [u8],
116 ) -> std::task::Poll<std::io::Result<usize>> {
117 // SAFETY: Standard Enum pin projection
118 let inner = unsafe { &mut self.get_unchecked_mut().0 };
119 match inner {
120 Inner::Empty => Poll::Ready(Ok(0)),
121 // Blocking call is over an in-memory buffer
122 Inner::Bytes(cursor) => Poll::Ready(cursor.read(buf)),
123 Inner::AsyncReader(async_reader) => {
124 AsyncRead::poll_read(async_reader.as_mut(), cx, buf)
125 }
126 }
127 }
128}
129
130impl Body for AsyncBody {
131 type Data = Bytes;
132 type Error = std::io::Error;
133
134 fn poll_frame(
135 mut self: Pin<&mut Self>,
136 cx: &mut std::task::Context<'_>,
137 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
138 let mut buffer = vec![0; 8192];
139 match AsyncRead::poll_read(self.as_mut(), cx, &mut buffer) {
140 Poll::Ready(Ok(0)) => Poll::Ready(None),
141 Poll::Ready(Ok(n)) => {
142 let data = Bytes::copy_from_slice(&buffer[..n]);
143 Poll::Ready(Some(Ok(Frame::data(data))))
144 }
145 Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
146 Poll::Pending => Poll::Pending,
147 }
148 }
149}