1use std::{borrow::Cow, io::Read, pin::Pin, task::Poll};
2
3use futures::{AsyncRead, AsyncReadExt};
4
5/// Based on the implementation of AsyncBody in
6/// https://github.com/sagebind/isahc/blob/5c533f1ef4d6bdf1fd291b5103c22110f41d0bf0/src/body/mod.rs
7pub struct AsyncBody(pub Inner);
8
9pub enum Inner {
10 /// An empty body.
11 Empty,
12
13 /// A body stored in memory.
14 SyncReader(std::io::Cursor<Cow<'static, [u8]>>),
15
16 /// An asynchronous reader.
17 AsyncReader(Pin<Box<dyn futures::AsyncRead + Send + Sync>>),
18}
19
20impl AsyncBody {
21 /// Create a new empty body.
22 ///
23 /// An empty body represents the *absence* of a body, which is semantically
24 /// different than the presence of a body of zero length.
25 pub fn empty() -> Self {
26 Self(Inner::Empty)
27 }
28 /// Create a streaming body that reads from the given reader.
29 pub fn from_reader<R>(read: R) -> Self
30 where
31 R: AsyncRead + Send + Sync + 'static,
32 {
33 Self(Inner::AsyncReader(Box::pin(read)))
34 }
35}
36
37impl Default for AsyncBody {
38 fn default() -> Self {
39 Self(Inner::Empty)
40 }
41}
42
43impl From<()> for AsyncBody {
44 fn from(_: ()) -> Self {
45 Self(Inner::Empty)
46 }
47}
48
49impl From<Vec<u8>> for AsyncBody {
50 fn from(body: Vec<u8>) -> Self {
51 Self(Inner::SyncReader(std::io::Cursor::new(Cow::Owned(body))))
52 }
53}
54
55impl From<&'_ [u8]> for AsyncBody {
56 fn from(body: &[u8]) -> Self {
57 body.to_vec().into()
58 }
59}
60
61impl From<String> for AsyncBody {
62 fn from(body: String) -> Self {
63 body.into_bytes().into()
64 }
65}
66
67impl From<&'_ str> for AsyncBody {
68 fn from(body: &str) -> Self {
69 body.as_bytes().into()
70 }
71}
72
73impl<T: Into<Self>> From<Option<T>> for AsyncBody {
74 fn from(body: Option<T>) -> Self {
75 match body {
76 Some(body) => body.into(),
77 None => Self(Inner::Empty),
78 }
79 }
80}
81
82impl std::io::Read for AsyncBody {
83 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
84 match &mut self.0 {
85 Inner::Empty => Ok(0),
86 Inner::SyncReader(cursor) => cursor.read(buf),
87 Inner::AsyncReader(async_reader) => smol::block_on(async_reader.read(buf)),
88 }
89 }
90}
91
92impl futures::AsyncRead for AsyncBody {
93 fn poll_read(
94 self: Pin<&mut Self>,
95 cx: &mut std::task::Context<'_>,
96 buf: &mut [u8],
97 ) -> std::task::Poll<std::io::Result<usize>> {
98 // SAFETY: Standard Enum pin projection
99 let inner = unsafe { &mut self.get_unchecked_mut().0 };
100 match inner {
101 Inner::Empty => Poll::Ready(Ok(0)),
102 // Blocking call is over an in-memory buffer
103 Inner::SyncReader(cursor) => Poll::Ready(cursor.read(buf)),
104 Inner::AsyncReader(async_reader) => {
105 AsyncRead::poll_read(async_reader.as_mut(), cx, buf)
106 }
107 }
108 }
109}