1use std::{
2 io::{Cursor, Read},
3 pin::Pin,
4 task::Poll,
5};
6
7use bytes::Bytes;
8use futures::AsyncRead;
9use http_body::{Body, Frame};
10use serde::Serialize;
11
/// A request/response body that is either absent, held in memory, or streamed
/// from an asynchronous reader.
///
/// The concrete representation lives in the public [`Inner`] field. This file
/// implements both `futures::AsyncRead` and `http_body::Body` for the type.
///
/// Based on the implementation of AsyncBody in
/// <https://github.com/sagebind/isahc/blob/5c533f1ef4d6bdf1fd291b5103c22110f41d0bf0/src/body/mod.rs>.
pub struct AsyncBody(pub Inner);
15
/// The storage backing an [`AsyncBody`].
pub enum Inner {
    /// An empty body.
    Empty,

    /// A body stored in memory.
    ///
    /// The `Cursor` tracks how much of the buffer has already been read.
    Bytes(std::io::Cursor<Bytes>),

    /// An asynchronous reader.
    ///
    /// Boxed and pinned so that any `AsyncRead + Send + Sync` implementation
    /// can be stored behind this one variant.
    AsyncReader(Pin<Box<dyn futures::AsyncRead + Send + Sync>>),
}
26
27impl AsyncBody {
28 /// Create a new empty body.
29 ///
30 /// An empty body represents the *absence* of a body, which is semantically
31 /// different than the presence of a body of zero length.
32 pub fn empty() -> Self {
33 Self(Inner::Empty)
34 }
35 /// Create a streaming body that reads from the given reader.
36 pub fn from_reader<R>(read: R) -> Self
37 where
38 R: AsyncRead + Send + Sync + 'static,
39 {
40 Self(Inner::AsyncReader(Box::pin(read)))
41 }
42
43 pub fn from_bytes(bytes: Bytes) -> Self {
44 Self(Inner::Bytes(Cursor::new(bytes)))
45 }
46}
47
48impl Default for AsyncBody {
49 fn default() -> Self {
50 Self(Inner::Empty)
51 }
52}
53
54impl From<()> for AsyncBody {
55 fn from(_: ()) -> Self {
56 Self(Inner::Empty)
57 }
58}
59
60impl From<Bytes> for AsyncBody {
61 fn from(bytes: Bytes) -> Self {
62 Self::from_bytes(bytes)
63 }
64}
65
66impl From<Vec<u8>> for AsyncBody {
67 fn from(body: Vec<u8>) -> Self {
68 Self::from_bytes(body.into())
69 }
70}
71
72impl From<String> for AsyncBody {
73 fn from(body: String) -> Self {
74 Self::from_bytes(body.into())
75 }
76}
77
78impl From<&'static [u8]> for AsyncBody {
79 #[inline]
80 fn from(s: &'static [u8]) -> Self {
81 Self::from_bytes(Bytes::from_static(s))
82 }
83}
84
85impl From<&'static str> for AsyncBody {
86 #[inline]
87 fn from(s: &'static str) -> Self {
88 Self::from_bytes(Bytes::from_static(s.as_bytes()))
89 }
90}
91
/// Newtype wrapper that serializes a value as JSON into an `AsyncBody`.
///
/// Wrap any `Serialize` value in `Json(..)` and convert it with
/// `AsyncBody::from` / `.into()`. The wrapped value is the public field `.0`.
pub struct Json<T: Serialize>(pub T);
94
95impl<T: Serialize> From<Json<T>> for AsyncBody {
96 fn from(json: Json<T>) -> Self {
97 Self::from_bytes(
98 serde_json::to_vec(&json.0)
99 .expect("failed to serialize JSON")
100 .into(),
101 )
102 }
103}
104
105impl<T: Into<Self>> From<Option<T>> for AsyncBody {
106 fn from(body: Option<T>) -> Self {
107 match body {
108 Some(body) => body.into(),
109 None => Self::empty(),
110 }
111 }
112}
113
114impl futures::AsyncRead for AsyncBody {
115 fn poll_read(
116 self: Pin<&mut Self>,
117 cx: &mut std::task::Context<'_>,
118 buf: &mut [u8],
119 ) -> std::task::Poll<std::io::Result<usize>> {
120 // SAFETY: Standard Enum pin projection
121 let inner = unsafe { &mut self.get_unchecked_mut().0 };
122 match inner {
123 Inner::Empty => Poll::Ready(Ok(0)),
124 // Blocking call is over an in-memory buffer
125 Inner::Bytes(cursor) => Poll::Ready(cursor.read(buf)),
126 Inner::AsyncReader(async_reader) => {
127 AsyncRead::poll_read(async_reader.as_mut(), cx, buf)
128 }
129 }
130 }
131}
132
133impl Body for AsyncBody {
134 type Data = Bytes;
135 type Error = std::io::Error;
136
137 fn poll_frame(
138 mut self: Pin<&mut Self>,
139 cx: &mut std::task::Context<'_>,
140 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
141 let mut buffer = vec![0; 8192];
142 match AsyncRead::poll_read(self.as_mut(), cx, &mut buffer) {
143 Poll::Ready(Ok(0)) => Poll::Ready(None),
144 Poll::Ready(Ok(n)) => {
145 let data = Bytes::copy_from_slice(&buffer[..n]);
146 Poll::Ready(Some(Ok(Frame::data(data))))
147 }
148 Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
149 Poll::Pending => Poll::Pending,
150 }
151 }
152}