@@ -143,11 +143,16 @@ pub enum Status {
Authenticating,
Connecting,
ConnectionError,
- Connected { connection_id: ConnectionId },
+ Connected {
+ peer_id: PeerId,
+ connection_id: ConnectionId,
+ },
ConnectionLost,
Reauthenticating,
Reconnecting,
- ReconnectionError { next_reconnection: Instant },
+ ReconnectionError {
+ next_reconnection: Instant,
+ },
}
impl Status {
@@ -314,6 +319,14 @@ impl Client {
.map(|credentials| credentials.user_id)
}
+ pub fn peer_id(&self) -> Option<PeerId> {
+ if let Status::Connected { peer_id, .. } = &*self.status().borrow() {
+ Some(*peer_id)
+ } else {
+ None
+ }
+ }
+
pub fn status(&self) -> watch::Receiver<Status> {
self.state.read().status.1.clone()
}
@@ -663,6 +676,7 @@ impl Client {
self.set_status(Status::Reconnecting, cx);
}
+ let mut timeout = cx.background().timer(CONNECTION_TIMEOUT).fuse();
futures::select_biased! {
connection = self.establish_connection(&credentials, cx).fuse() => {
match connection {
@@ -671,8 +685,14 @@ impl Client {
if !read_from_keychain && IMPERSONATE_LOGIN.is_none() {
write_credentials_to_keychain(&credentials, cx).log_err();
}
- self.set_connection(conn, cx);
- Ok(())
+
+ futures::select_biased! {
+ result = self.set_connection(conn, cx).fuse() => result,
+ _ = timeout => {
+ self.set_status(Status::ConnectionError, cx);
+ Err(anyhow!("timed out waiting on hello message from server"))
+ }
+ }
}
Err(EstablishConnectionError::Unauthorized) => {
self.state.write().credentials.take();
@@ -695,21 +715,65 @@ impl Client {
}
}
}
- _ = cx.background().timer(CONNECTION_TIMEOUT).fuse() => {
+ _ = &mut timeout => {
self.set_status(Status::ConnectionError, cx);
Err(anyhow!("timed out trying to establish connection"))
}
}
}
- fn set_connection(self: &Arc<Self>, conn: Connection, cx: &AsyncAppContext) {
+ async fn set_connection(
+ self: &Arc<Self>,
+ conn: Connection,
+ cx: &AsyncAppContext,
+ ) -> Result<()> {
let executor = cx.background();
log::info!("add connection to peer");
let (connection_id, handle_io, mut incoming) = self
.peer
.add_connection(conn, move |duration| executor.timer(duration));
- log::info!("set status to connected {}", connection_id);
- self.set_status(Status::Connected { connection_id }, cx);
+ let handle_io = cx.background().spawn(handle_io);
+
+ let peer_id = async {
+ log::info!("waiting for server hello");
+ let message = incoming
+ .next()
+ .await
+ .ok_or_else(|| anyhow!("no hello message received"))?;
+ log::info!("got server hello");
+ let hello_message_type_name = message.payload_type_name().to_string();
+ let hello = message
+ .into_any()
+ .downcast::<TypedEnvelope<proto::Hello>>()
+ .map_err(|_| {
+ anyhow!(
+ "invalid hello message received: {:?}",
+ hello_message_type_name
+ )
+ })?;
+ Ok(PeerId(hello.payload.peer_id))
+ };
+
+ let peer_id = match peer_id.await {
+ Ok(peer_id) => peer_id,
+ Err(error) => {
+ self.peer.disconnect(connection_id);
+ return Err(error);
+ }
+ };
+
+ log::info!(
+ "set status to connected (connection id: {}, peer id: {})",
+ connection_id,
+ peer_id
+ );
+ self.set_status(
+ Status::Connected {
+ peer_id,
+ connection_id,
+ },
+ cx,
+ );
cx.foreground()
.spawn({
let cx = cx.clone();
@@ -807,14 +871,18 @@ impl Client {
})
.detach();
- let handle_io = cx.background().spawn(handle_io);
let this = self.clone();
let cx = cx.clone();
cx.foreground()
.spawn(async move {
match handle_io.await {
Ok(()) => {
- if *this.status().borrow() == (Status::Connected { connection_id }) {
+ if *this.status().borrow()
+ == (Status::Connected {
+ connection_id,
+ peer_id,
+ })
+ {
this.set_status(Status::SignedOut, &cx);
}
}
@@ -825,6 +893,8 @@ impl Client {
}
})
.detach();
+
+ Ok(())
}
fn authenticate(self: &Arc<Self>, cx: &AsyncAppContext) -> Task<Result<Credentials>> {
@@ -8,7 +8,7 @@ use anyhow::anyhow;
use call::{room, ActiveCall, ParticipantLocation, Room};
use client::{
self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Connection,
- Credentials, EstablishConnectionError, User, UserStore, RECEIVE_TIMEOUT,
+ Credentials, EstablishConnectionError, PeerId, User, UserStore, RECEIVE_TIMEOUT,
};
use collections::{BTreeMap, HashMap, HashSet};
use editor::{
@@ -16,7 +16,10 @@ use editor::{
ToggleCodeActions, Undo,
};
use fs::{FakeFs, Fs as _, HomeDir, LineEnding};
-use futures::{channel::mpsc, Future, StreamExt as _};
+use futures::{
+ channel::{mpsc, oneshot},
+ Future, StreamExt as _,
+};
use gpui::{
executor::{self, Deterministic},
geometry::vector::vec2f,
@@ -34,7 +37,6 @@ use project::{
ProjectStore, WorktreeId,
};
use rand::prelude::*;
-use rpc::PeerId;
use serde_json::json;
use settings::{Formatter, Settings};
use sqlx::types::time::OffsetDateTime;
@@ -385,7 +387,7 @@ async fn test_leaving_room_on_disconnection(
);
// When user A disconnects, both client A and B clear their room on the active call.
- server.disconnect_client(client_a.current_user_id(cx_a));
+ server.disconnect_client(client_a.peer_id().unwrap());
cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
active_call_a.read_with(cx_a, |call, _| assert!(call.room().is_none()));
active_call_b.read_with(cx_b, |call, _| assert!(call.room().is_none()));
@@ -416,7 +418,7 @@ async fn test_calls_on_multiple_connections(
let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
let client_a = server.create_client(cx_a, "user_a").await;
let client_b1 = server.create_client(cx_b1, "user_b").await;
- let _client_b2 = server.create_client(cx_b2, "user_b").await;
+ let client_b2 = server.create_client(cx_b2, "user_b").await;
server
.make_contacts(&mut [(&client_a, cx_a), (&client_b1, cx_b1)])
.await;
@@ -468,6 +470,14 @@ async fn test_calls_on_multiple_connections(
assert!(incoming_call_b1.next().await.unwrap().is_none());
assert!(incoming_call_b2.next().await.unwrap().is_none());
+ // User B disconnects the client that is not on the call. Everything should be fine.
+ client_b1.disconnect(&cx_b1.to_async()).unwrap();
+ deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
+ client_b1
+ .authenticate_and_connect(false, &cx_b1.to_async())
+ .await
+ .unwrap();
+
// User B hangs up, and user A calls them again.
active_call_b2.update(cx_b2, |call, cx| call.hang_up(cx).unwrap());
deterministic.run_until_parked();
@@ -520,11 +530,29 @@ async fn test_calls_on_multiple_connections(
assert!(incoming_call_b1.next().await.unwrap().is_some());
assert!(incoming_call_b2.next().await.unwrap().is_some());
- // User A disconnects up, causing both connections to stop ringing.
- server.disconnect_client(client_a.current_user_id(cx_a));
- cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
+ // User A disconnects, causing both connections to stop ringing.
+ server.disconnect_client(client_a.peer_id().unwrap());
+ deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
assert!(incoming_call_b1.next().await.unwrap().is_none());
assert!(incoming_call_b2.next().await.unwrap().is_none());
+
+ // User A reconnects automatically, then calls user B again.
+ active_call_a
+ .update(cx_a, |call, cx| {
+ call.invite(client_b1.user_id().unwrap(), None, cx)
+ })
+ .await
+ .unwrap();
+ deterministic.run_until_parked();
+ assert!(incoming_call_b1.next().await.unwrap().is_some());
+ assert!(incoming_call_b2.next().await.unwrap().is_some());
+
+ // User B disconnects all clients, causing user A to no longer see a pending call for them.
+ server.forbid_connections();
+ server.disconnect_client(client_b1.peer_id().unwrap());
+ server.disconnect_client(client_b2.peer_id().unwrap());
+ deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
+ active_call_a.read_with(cx_a, |call, _| assert!(call.room().is_none()));
}
#[gpui::test(iterations = 10)]
@@ -582,7 +610,7 @@ async fn test_share_project(
.update(cx_b, |call, cx| call.accept_incoming(cx))
.await
.unwrap();
- let client_b_peer_id = client_b.peer_id;
+ let client_b_peer_id = client_b.peer_id().unwrap();
let project_b = client_b
.build_remote_project(initial_project.id, cx_b)
.await;
@@ -806,7 +834,7 @@ async fn test_host_disconnect(
assert!(cx_b.is_window_edited(workspace_b.window_id()));
// Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
- server.disconnect_client(client_a.current_user_id(cx_a));
+ server.disconnect_client(client_a.peer_id().unwrap());
deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
project_a
.condition(cx_a, |project, _| project.collaborators().is_empty())
@@ -849,7 +877,7 @@ async fn test_host_disconnect(
.unwrap();
// Drop client A's connection again. We should still unshare it successfully.
- server.disconnect_client(client_a.current_user_id(cx_a));
+ server.disconnect_client(client_a.peer_id().unwrap());
deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
}
@@ -2150,7 +2178,7 @@ async fn test_leaving_project(
// Simulate connection loss for client C and ensure client A observes client C leaving the project.
client_c.wait_for_current_user(cx_c).await;
- server.disconnect_client(client_c.current_user_id(cx_c));
+ server.disconnect_client(client_c.peer_id().unwrap());
cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
deterministic.run_until_parked();
project_a.read_with(cx_a, |project, _| {
@@ -4313,7 +4341,7 @@ async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppCon
// Disconnect client B, ensuring we can still access its cached channel data.
server.forbid_connections();
- server.disconnect_client(client_b.current_user_id(cx_b));
+ server.disconnect_client(client_b.peer_id().unwrap());
cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
while !matches!(
status_b.next().await,
@@ -4476,7 +4504,7 @@ async fn test_contacts(
]
);
- server.disconnect_client(client_c.current_user_id(cx_c));
+ server.disconnect_client(client_c.peer_id().unwrap());
server.forbid_connections();
deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
assert_eq!(
@@ -4716,7 +4744,7 @@ async fn test_contacts(
);
server.forbid_connections();
- server.disconnect_client(client_a.current_user_id(cx_a));
+ server.disconnect_client(client_a.peer_id().unwrap());
deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
assert_eq!(contacts(&client_a, cx_a), []);
assert_eq!(
@@ -5626,6 +5654,7 @@ async fn test_random_collaboration(
let mut clients = Vec::new();
let mut user_ids = Vec::new();
+ let mut peer_ids = Vec::new();
let mut op_start_signals = Vec::new();
let mut next_entity_id = 100000;
@@ -5814,6 +5843,7 @@ async fn test_random_collaboration(
let op_start_signal = futures::channel::mpsc::unbounded();
user_ids.push(host_user_id);
+ peer_ids.push(host.peer_id().unwrap());
op_start_signals.push(op_start_signal.0);
clients.push(host_cx.foreground().spawn(host.simulate_host(
host_project,
@@ -5831,7 +5861,7 @@ async fn test_random_collaboration(
let mut operations = 0;
while operations < max_operations {
if operations == disconnect_host_at {
- server.disconnect_client(user_ids[0]);
+ server.disconnect_client(peer_ids[0]);
deterministic.advance_clock(RECEIVE_TIMEOUT);
drop(op_start_signals);
@@ -5914,6 +5944,7 @@ async fn test_random_collaboration(
let op_start_signal = futures::channel::mpsc::unbounded();
user_ids.push(guest_user_id);
+ peer_ids.push(guest.peer_id().unwrap());
op_start_signals.push(op_start_signal.0);
clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
guest_username.clone(),
@@ -5930,10 +5961,11 @@ async fn test_random_collaboration(
let guest_ix = rng.lock().gen_range(1..clients.len());
log::info!("Removing guest {}", user_ids[guest_ix]);
let removed_guest_id = user_ids.remove(guest_ix);
+ let removed_peer_id = peer_ids.remove(guest_ix);
let guest = clients.remove(guest_ix);
op_start_signals.remove(guest_ix);
server.forbid_connections();
- server.disconnect_client(removed_guest_id);
+ server.disconnect_client(removed_peer_id);
deterministic.advance_clock(RECEIVE_TIMEOUT);
deterministic.start_waiting();
log::info!("Waiting for guest {} to exit...", removed_guest_id);
@@ -6057,8 +6089,10 @@ async fn test_random_collaboration(
let host_buffer = host_project.read_with(&host_cx, |project, cx| {
project.buffer_for_id(buffer_id, cx).unwrap_or_else(|| {
panic!(
- "host does not have buffer for guest:{}, peer:{}, id:{}",
- guest_client.username, guest_client.peer_id, buffer_id
+ "host does not have buffer for guest:{}, peer:{:?}, id:{}",
+ guest_client.username,
+ guest_client.peer_id(),
+ buffer_id
)
})
});
@@ -6101,7 +6135,7 @@ struct TestServer {
server: Arc<Server>,
foreground: Rc<executor::Foreground>,
notifications: mpsc::UnboundedReceiver<()>,
- connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
+ connection_killers: Arc<Mutex<HashMap<PeerId, Arc<AtomicBool>>>>,
forbid_connections: Arc<AtomicBool>,
_test_db: TestDb,
}
@@ -6167,7 +6201,6 @@ impl TestServer {
let db = self.app_state.db.clone();
let connection_killers = self.connection_killers.clone();
let forbid_connections = self.forbid_connections.clone();
- let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
Arc::get_mut(&mut client)
.unwrap()
@@ -6190,7 +6223,6 @@ impl TestServer {
let connection_killers = connection_killers.clone();
let forbid_connections = forbid_connections.clone();
let client_name = client_name.clone();
- let connection_id_tx = connection_id_tx.clone();
cx.spawn(move |cx| async move {
if forbid_connections.load(SeqCst) {
Err(EstablishConnectionError::other(anyhow!(
@@ -6199,7 +6231,7 @@ impl TestServer {
} else {
let (client_conn, server_conn, killed) =
Connection::in_memory(cx.background());
- connection_killers.lock().insert(user_id, killed);
+ let (connection_id_tx, connection_id_rx) = oneshot::channel();
let user = db.get_user_by_id(user_id).await.unwrap().unwrap();
cx.background()
.spawn(server.handle_connection(
@@ -6210,6 +6242,10 @@ impl TestServer {
cx.background(),
))
.detach();
+ let connection_id = connection_id_rx.await.unwrap();
+ connection_killers
+ .lock()
+ .insert(PeerId(connection_id.0), killed);
Ok(client_conn)
}
})
@@ -6241,11 +6277,9 @@ impl TestServer {
.authenticate_and_connect(false, &cx.to_async())
.await
.unwrap();
- let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
let client = TestClient {
client,
- peer_id,
username: name.to_string(),
user_store,
project_store,
@@ -6257,10 +6291,10 @@ impl TestServer {
client
}
- fn disconnect_client(&self, user_id: UserId) {
+ fn disconnect_client(&self, peer_id: PeerId) {
self.connection_killers
.lock()
- .remove(&user_id)
+ .remove(&peer_id)
.unwrap()
.store(true, SeqCst);
}
@@ -6361,7 +6395,6 @@ impl Drop for TestServer {
struct TestClient {
client: Arc<Client>,
username: String,
- pub peer_id: PeerId,
pub user_store: ModelHandle<UserStore>,
pub project_store: ModelHandle<ProjectStore>,
language_registry: Arc<LanguageRegistry>,
@@ -24,7 +24,7 @@ use axum::{
};
use collections::{HashMap, HashSet};
use futures::{
- channel::mpsc,
+ channel::{mpsc, oneshot},
future::{self, BoxFuture},
stream::FuturesUnordered,
FutureExt, SinkExt, StreamExt, TryStreamExt,
@@ -348,7 +348,7 @@ impl Server {
connection: Connection,
address: String,
user: User,
- mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
+ mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
executor: E,
) -> impl Future<Output = Result<()>> {
let mut this = self.clone();
@@ -369,9 +369,11 @@ impl Server {
});
tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
+ this.peer.send(connection_id, proto::Hello { peer_id: connection_id.0 })?;
+ tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");
- if let Some(send_connection_id) = send_connection_id.as_mut() {
- let _ = send_connection_id.send(connection_id).await;
+ if let Some(send_connection_id) = send_connection_id.take() {
+ let _ = send_connection_id.send(connection_id);
}
if !user.connected_once {
@@ -477,6 +479,10 @@ impl Server {
let mut contacts_to_update = HashSet::default();
{
let mut store = self.store().await;
+
+ #[cfg(test)]
+ let removed_connection = store.remove_connection(connection_id).unwrap();
+ #[cfg(not(test))]
let removed_connection = store.remove_connection(connection_id)?;
for project in removed_connection.hosted_projects {