test.rs

 1use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage};
 2use std::{
 3    io,
 4    pin::Pin,
 5    task::{Context, Poll},
 6};
 7
 8pub struct Channel {
 9    tx: futures::channel::mpsc::UnboundedSender<WebSocketMessage>,
10    rx: futures::channel::mpsc::UnboundedReceiver<WebSocketMessage>,
11}
12
13impl Channel {
14    pub fn new() -> Self {
15        let (tx, rx) = futures::channel::mpsc::unbounded();
16        Self { tx, rx }
17    }
18
19    pub fn bidirectional() -> (Self, Self) {
20        let (a_tx, a_rx) = futures::channel::mpsc::unbounded();
21        let (b_tx, b_rx) = futures::channel::mpsc::unbounded();
22        let a = Self { tx: a_tx, rx: b_rx };
23        let b = Self { tx: b_tx, rx: a_rx };
24        (a, b)
25    }
26}
27
28impl futures::Sink<WebSocketMessage> for Channel {
29    type Error = WebSocketError;
30
31    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
32        Pin::new(&mut self.tx)
33            .poll_ready(cx)
34            .map_err(|err| io::Error::new(io::ErrorKind::Other, err).into())
35    }
36
37    fn start_send(mut self: Pin<&mut Self>, item: WebSocketMessage) -> Result<(), Self::Error> {
38        Pin::new(&mut self.tx)
39            .start_send(item)
40            .map_err(|err| io::Error::new(io::ErrorKind::Other, err).into())
41    }
42
43    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
44        Pin::new(&mut self.tx)
45            .poll_flush(cx)
46            .map_err(|err| io::Error::new(io::ErrorKind::Other, err).into())
47    }
48
49    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
50        Pin::new(&mut self.tx)
51            .poll_close(cx)
52            .map_err(|err| io::Error::new(io::ErrorKind::Other, err).into())
53    }
54}
55
56impl futures::Stream for Channel {
57    type Item = Result<WebSocketMessage, WebSocketError>;
58
59    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
60        Pin::new(&mut self.rx)
61            .poll_next(cx)
62            .map(|i| i.map(|i| Ok(i)))
63    }
64}