1use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage};
2use std::{
3 io,
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8pub struct Channel {
9 tx: futures::channel::mpsc::UnboundedSender<WebSocketMessage>,
10 rx: futures::channel::mpsc::UnboundedReceiver<WebSocketMessage>,
11}
12
13impl Channel {
14 pub fn new() -> Self {
15 let (tx, rx) = futures::channel::mpsc::unbounded();
16 Self { tx, rx }
17 }
18
19 pub fn bidirectional() -> (Self, Self) {
20 let (a_tx, a_rx) = futures::channel::mpsc::unbounded();
21 let (b_tx, b_rx) = futures::channel::mpsc::unbounded();
22 let a = Self { tx: a_tx, rx: b_rx };
23 let b = Self { tx: b_tx, rx: a_rx };
24 (a, b)
25 }
26}
27
28impl futures::Sink<WebSocketMessage> for Channel {
29 type Error = WebSocketError;
30
31 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
32 Pin::new(&mut self.tx)
33 .poll_ready(cx)
34 .map_err(|err| io::Error::new(io::ErrorKind::Other, err).into())
35 }
36
37 fn start_send(mut self: Pin<&mut Self>, item: WebSocketMessage) -> Result<(), Self::Error> {
38 Pin::new(&mut self.tx)
39 .start_send(item)
40 .map_err(|err| io::Error::new(io::ErrorKind::Other, err).into())
41 }
42
43 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
44 Pin::new(&mut self.tx)
45 .poll_flush(cx)
46 .map_err(|err| io::Error::new(io::ErrorKind::Other, err).into())
47 }
48
49 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
50 Pin::new(&mut self.tx)
51 .poll_close(cx)
52 .map_err(|err| io::Error::new(io::ErrorKind::Other, err).into())
53 }
54}
55
56impl futures::Stream for Channel {
57 type Item = Result<WebSocketMessage, WebSocketError>;
58
59 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
60 Pin::new(&mut self.rx)
61 .poll_next(cx)
62 .map(|i| i.map(|i| Ok(i)))
63 }
64}