@@ -143,11 +143,16 @@ pub enum Status {
Authenticating,
Connecting,
ConnectionError,
- Connected { connection_id: ConnectionId },
+ Connected {
+ peer_id: PeerId,
+ connection_id: ConnectionId,
+ },
ConnectionLost,
Reauthenticating,
Reconnecting,
- ReconnectionError { next_reconnection: Instant },
+ ReconnectionError {
+ next_reconnection: Instant,
+ },
}
impl Status {
@@ -663,6 +668,7 @@ impl Client {
self.set_status(Status::Reconnecting, cx);
}
+ let mut timeout = cx.background().timer(CONNECTION_TIMEOUT).fuse();
futures::select_biased! {
connection = self.establish_connection(&credentials, cx).fuse() => {
match connection {
@@ -671,8 +677,14 @@ impl Client {
if !read_from_keychain && IMPERSONATE_LOGIN.is_none() {
write_credentials_to_keychain(&credentials, cx).log_err();
}
- self.set_connection(conn, cx);
- Ok(())
+
+ futures::select_biased! {
+ result = self.set_connection(conn, cx).fuse() => result,
+ _ = timeout => {
+ self.set_status(Status::ConnectionError, cx);
+ Err(anyhow!("timed out waiting on hello message from server"))
+ }
+ }
}
Err(EstablishConnectionError::Unauthorized) => {
self.state.write().credentials.take();
@@ -695,21 +707,65 @@ impl Client {
}
}
}
- _ = cx.background().timer(CONNECTION_TIMEOUT).fuse() => {
+ _ = &mut timeout => {
self.set_status(Status::ConnectionError, cx);
Err(anyhow!("timed out trying to establish connection"))
}
}
}
- fn set_connection(self: &Arc<Self>, conn: Connection, cx: &AsyncAppContext) {
+ async fn set_connection(
+ self: &Arc<Self>,
+ conn: Connection,
+ cx: &AsyncAppContext,
+ ) -> Result<()> {
let executor = cx.background();
log::info!("add connection to peer");
let (connection_id, handle_io, mut incoming) = self
.peer
.add_connection(conn, move |duration| executor.timer(duration));
- log::info!("set status to connected {}", connection_id);
- self.set_status(Status::Connected { connection_id }, cx);
+ let handle_io = cx.background().spawn(handle_io);
+
+ let peer_id = async {
+ log::info!("waiting for server hello");
+ let message = incoming
+ .next()
+ .await
+ .ok_or_else(|| anyhow!("no hello message received"))?;
+ log::info!("got server hello");
+ let hello_message_type_name = message.payload_type_name().to_string();
+ let hello = message
+ .into_any()
+ .downcast::<TypedEnvelope<proto::Hello>>()
+ .map_err(|_| {
+ anyhow!(
+ "invalid hello message received: {:?}",
+ hello_message_type_name
+ )
+ })?;
+ Ok(PeerId(hello.payload.peer_id))
+ };
+
+ let peer_id = match peer_id.await {
+ Ok(peer_id) => peer_id,
+ Err(error) => {
+ self.peer.disconnect(connection_id);
+ return Err(error);
+ }
+ };
+
+ log::info!(
+ "set status to connected (connection id: {}, peer id: {})",
+ connection_id,
+ peer_id
+ );
+ self.set_status(
+ Status::Connected {
+ peer_id,
+ connection_id,
+ },
+ cx,
+ );
cx.foreground()
.spawn({
let cx = cx.clone();
@@ -807,14 +863,18 @@ impl Client {
})
.detach();
- let handle_io = cx.background().spawn(handle_io);
let this = self.clone();
let cx = cx.clone();
cx.foreground()
.spawn(async move {
match handle_io.await {
Ok(()) => {
- if *this.status().borrow() == (Status::Connected { connection_id }) {
+ if *this.status().borrow()
+ == (Status::Connected {
+ connection_id,
+ peer_id,
+ })
+ {
this.set_status(Status::SignedOut, &cx);
}
}
@@ -825,6 +885,8 @@ impl Client {
}
})
.detach();
+
+ Ok(())
}
fn authenticate(self: &Arc<Self>, cx: &AsyncAppContext) -> Task<Result<Credentials>> {
@@ -369,6 +369,8 @@ impl Server {
});
tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
+ this.peer.send(connection_id, proto::Hello { peer_id: connection_id.0 })?;
+ tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");
if let Some(send_connection_id) = send_connection_id.as_mut() {
let _ = send_connection_id.send(connection_id).await;
@@ -121,6 +121,7 @@ messages!(
(GetProjectSymbols, Background),
(GetProjectSymbolsResponse, Background),
(GetUsers, Foreground),
+ (Hello, Foreground),
(IncomingCall, Foreground),
(UsersResponse, Foreground),
(JoinChannel, Foreground),