Allow reconnect before disconnect (#8684)

Conrad Irwin and Max created

Co-Authored-By: Max <max@zed.dev>



Release Notes:

- Improved handling of reconnections during calls

---------

Co-authored-by: Max <max@zed.dev>

Change summary

crates/collab/src/db/queries/buffers.rs    |  8 +++-----
crates/collab/src/db/queries/rooms.rs      | 10 +---------
crates/collab/src/rpc.rs                   | 12 ++++++++----
crates/collab/src/tests/following_tests.rs |  6 +++---
crates/collab/src/tests/test_server.rs     |  7 ++++++-
crates/rpc/src/peer.rs                     |  2 +-
6 files changed, 22 insertions(+), 23 deletions(-)

Detailed changes

crates/collab/src/db/queries/buffers.rs 🔗

@@ -161,11 +161,9 @@ impl Database {
 
                 // Find the collaborator record for this user's previous lost
                 // connection. Update it with the new connection id.
-                let server_id = ServerId(connection_id.owner_id as i32);
-                let Some(self_collaborator) = collaborators.iter_mut().find(|c| {
-                    c.user_id == user_id
-                        && (c.connection_lost || c.connection_server_id != server_id)
-                }) else {
+                let Some(self_collaborator) =
+                    collaborators.iter_mut().find(|c| c.user_id == user_id)
+                else {
                     log::info!("can't rejoin buffer, no previous collaborator found");
                     continue;
                 };

crates/collab/src/db/queries/rooms.rs 🔗

@@ -468,15 +468,7 @@ impl Database {
                     Condition::all()
                         .add(room_participant::Column::RoomId.eq(room_id))
                         .add(room_participant::Column::UserId.eq(user_id))
-                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
-                        .add(
-                            Condition::any()
-                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
-                                .add(
-                                    room_participant::Column::AnsweringConnectionServerId
-                                        .ne(connection.owner_id as i32),
-                                ),
-                        ),
+                        .add(room_participant::Column::AnsweringConnectionId.is_not_null()),
                 )
                 .set(room_participant::ActiveModel {
                     answering_connection_id: ActiveValue::set(Some(connection.id as i32)),

crates/collab/src/rpc.rs 🔗

@@ -147,7 +147,7 @@ pub struct Server {
     app_state: Arc<AppState>,
     executor: Executor,
     handlers: HashMap<TypeId, MessageHandler>,
-    teardown: watch::Sender<()>,
+    teardown: watch::Sender<bool>,
 }
 
 pub(crate) struct ConnectionPoolGuard<'a> {
@@ -180,7 +180,7 @@ impl Server {
             executor,
             connection_pool: Default::default(),
             handlers: Default::default(),
-            teardown: watch::channel(()).0,
+            teardown: watch::channel(false).0,
         };
 
         server
@@ -436,7 +436,7 @@ impl Server {
     pub fn teardown(&self) {
         self.peer.teardown();
         self.connection_pool.lock().reset();
-        let _ = self.teardown.send(());
+        let _ = self.teardown.send(true);
     }
 
     #[cfg(test)]
@@ -444,6 +444,7 @@ impl Server {
         self.teardown();
         *self.id.lock() = id;
         self.peer.reset(id.0 as u32);
+        let _ = self.teardown.send(false);
     }
 
     #[cfg(test)]
@@ -561,6 +562,9 @@ impl Server {
         }
         let mut teardown = self.teardown.subscribe();
         async move {
+            if *teardown.borrow() {
+                return Err(anyhow!("server is tearing down"))?;
+            }
             let (connection_id, handle_io, mut incoming_rx) = this
                 .peer
                 .add_connection(connection, {
@@ -943,7 +947,7 @@ pub async fn handle_metrics(Extension(server): Extension<Arc<Server>>) -> Result
 #[instrument(err, skip(executor))]
 async fn connection_lost(
     session: Session,
-    mut teardown: watch::Receiver<()>,
+    mut teardown: watch::Receiver<bool>,
     executor: Executor,
 ) -> Result<()> {
     session.peer.disconnect(session.connection_id);

crates/collab/src/tests/following_tests.rs 🔗

@@ -1578,7 +1578,7 @@ async fn test_following_across_workspaces(cx_a: &mut TestAppContext, cx_b: &mut
 
 #[gpui::test]
 async fn test_following_stops_on_unshare(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
-    let (_, client_a, client_b, channel_id) = TestServer::start2(cx_a, cx_b).await;
+    let (_server, client_a, client_b, channel_id) = TestServer::start2(cx_a, cx_b).await;
 
     let (workspace_a, cx_a) = client_a.build_test_workspace(cx_a).await;
     client_a
@@ -2024,7 +2024,7 @@ async fn test_following_to_channel_notes_other_workspace(
     cx_a: &mut TestAppContext,
     cx_b: &mut TestAppContext,
 ) {
-    let (_, client_a, client_b, channel) = TestServer::start2(cx_a, cx_b).await;
+    let (_server, client_a, client_b, channel) = TestServer::start2(cx_a, cx_b).await;
 
     let mut cx_a2 = cx_a.clone();
     let (workspace_a, cx_a) = client_a.build_test_workspace(cx_a).await;
@@ -2081,7 +2081,7 @@ async fn test_following_to_channel_notes_other_workspace(
 
 #[gpui::test]
 async fn test_following_while_deactivated(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
-    let (_, client_a, client_b, channel) = TestServer::start2(cx_a, cx_b).await;
+    let (_server, client_a, client_b, channel) = TestServer::start2(cx_a, cx_b).await;
 
     let mut cx_a2 = cx_a.clone();
     let (workspace_a, cx_a) = client_a.build_test_workspace(cx_a).await;

crates/collab/src/tests/test_server.rs 🔗

@@ -240,7 +240,12 @@ impl TestServer {
                                 Executor::Deterministic(cx.background_executor().clone()),
                             ))
                             .detach();
-                        let connection_id = connection_id_rx.await.unwrap();
+                        let connection_id = connection_id_rx.await.map_err(|e| {
+                            EstablishConnectionError::Other(anyhow!(
+                                "{} (is server shutting down?)",
+                                e
+                            ))
+                        })?;
                         connection_killers
                             .lock()
                             .insert(connection_id.into(), killed);

crates/rpc/src/peer.rs 🔗

@@ -361,8 +361,8 @@ impl Peer {
         self.connections.write().remove(&connection_id);
     }
 
+    #[cfg(any(test, feature = "test-support"))]
     pub fn reset(&self, epoch: u32) {
-        self.teardown();
         self.next_connection_id.store(0, SeqCst);
         self.epoch.store(epoch, SeqCst);
     }