1use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage};
2use futures::{SinkExt as _, StreamExt as _};
3
4pub struct Connection {
5 pub(crate) tx:
6 Box<dyn 'static + Send + Unpin + futures::Sink<WebSocketMessage, Error = WebSocketError>>,
7 pub(crate) rx: Box<
8 dyn 'static
9 + Send
10 + Unpin
11 + futures::Stream<Item = Result<WebSocketMessage, WebSocketError>>,
12 >,
13}
14
15impl Connection {
16 pub fn new<S>(stream: S) -> Self
17 where
18 S: 'static
19 + Send
20 + Unpin
21 + futures::Sink<WebSocketMessage, Error = WebSocketError>
22 + futures::Stream<Item = Result<WebSocketMessage, WebSocketError>>,
23 {
24 let (tx, rx) = stream.split();
25 Self {
26 tx: Box::new(tx),
27 rx: Box::new(rx),
28 }
29 }
30
31 pub async fn send(&mut self, message: WebSocketMessage) -> Result<(), WebSocketError> {
32 self.tx.send(message).await
33 }
34
35 #[cfg(any(test, feature = "test-support"))]
36 pub fn in_memory(
37 executor: std::sync::Arc<gpui::executor::Background>,
38 ) -> (Self, Self, postage::barrier::Sender) {
39 use postage::prelude::Stream;
40
41 let (kill_tx, kill_rx) = postage::barrier::channel();
42 let (a_tx, a_rx) = channel(kill_rx.clone(), executor.clone());
43 let (b_tx, b_rx) = channel(kill_rx, executor);
44 return (
45 Self { tx: a_tx, rx: b_rx },
46 Self { tx: b_tx, rx: a_rx },
47 kill_tx,
48 );
49
50 fn channel(
51 kill_rx: postage::barrier::Receiver,
52 executor: std::sync::Arc<gpui::executor::Background>,
53 ) -> (
54 Box<dyn Send + Unpin + futures::Sink<WebSocketMessage, Error = WebSocketError>>,
55 Box<
56 dyn Send + Unpin + futures::Stream<Item = Result<WebSocketMessage, WebSocketError>>,
57 >,
58 ) {
59 use futures::channel::mpsc;
60 use std::{
61 io::{Error, ErrorKind},
62 sync::Arc,
63 };
64
65 let (tx, rx) = mpsc::unbounded::<WebSocketMessage>();
66
67 let tx = tx
68 .sink_map_err(|e| WebSocketError::from(Error::new(ErrorKind::Other, e)))
69 .with({
70 let kill_rx = kill_rx.clone();
71 let executor = Arc::downgrade(&executor);
72 move |msg| {
73 let mut kill_rx = kill_rx.clone();
74 let executor = executor.clone();
75 Box::pin(async move {
76 if let Some(executor) = executor.upgrade() {
77 executor.simulate_random_delay().await;
78 }
79
80 // Writes to a half-open TCP connection will error.
81 if kill_rx.try_recv().is_ok() {
82 std::io::Result::Err(
83 Error::new(ErrorKind::Other, "connection lost").into(),
84 )?;
85 }
86
87 Ok(msg)
88 })
89 }
90 });
91
92 let rx = rx.then({
93 let kill_rx = kill_rx.clone();
94 let executor = Arc::downgrade(&executor);
95 move |msg| {
96 let mut kill_rx = kill_rx.clone();
97 let executor = executor.clone();
98 Box::pin(async move {
99 if let Some(executor) = executor.upgrade() {
100 executor.simulate_random_delay().await;
101 }
102
103 // Reads from a half-open TCP connection will hang.
104 if kill_rx.try_recv().is_ok() {
105 futures::future::pending::<()>().await;
106 }
107
108 Ok(msg)
109 })
110 }
111 });
112
113 (Box::new(tx), Box::new(rx))
114 }
115 }
116}