Delete stale rooms/participants after `RECONNECT_TIMEOUT`

Antonio Scandurra created

Change summary

crates/call/src/room.rs                                                              |   8 
crates/collab/migrations/20221207165001_add_connection_lost_to_room_participants.sql |   1 
crates/collab/src/db.rs                                                              |  85 
crates/collab/src/integration_tests.rs                                               | 516 
crates/collab/src/lib.rs                                                             |   2 
crates/collab/src/main.rs                                                            |   4 
crates/collab/src/rpc.rs                                                             | 125 
7 files changed, 579 insertions(+), 162 deletions(-)

Detailed changes

crates/call/src/room.rs 🔗

@@ -50,7 +50,7 @@ pub struct Room {
     user_store: ModelHandle<UserStore>,
     subscriptions: Vec<client::Subscription>,
     pending_room_update: Option<Task<()>>,
-    _maintain_connection: Task<Result<()>>,
+    maintain_connection: Option<Task<Result<()>>>,
 }
 
 impl Entity for Room {
@@ -121,7 +121,7 @@ impl Room {
             None
         };
 
-        let _maintain_connection =
+        let maintain_connection =
             cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx));
 
         Self {
@@ -138,7 +138,7 @@ impl Room {
             pending_room_update: None,
             client,
             user_store,
-            _maintain_connection,
+            maintain_connection: Some(maintain_connection),
         }
     }
 
@@ -235,6 +235,8 @@ impl Room {
         self.participant_user_ids.clear();
         self.subscriptions.clear();
         self.live_kit.take();
+        self.pending_room_update.take();
+        self.maintain_connection.take();
         self.client.send(proto::LeaveRoom {})?;
         Ok(())
     }

crates/collab/migrations/20221207165001_add_connection_lost_to_room_participants.sql 🔗

@@ -3,5 +3,6 @@ ALTER TABLE "room_participants"
 
 CREATE INDEX "index_project_collaborators_on_connection_id" ON "project_collaborators" ("connection_id");
 CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_connection_id_and_epoch" ON "project_collaborators" ("project_id", "connection_id", "connection_epoch");
+CREATE INDEX "index_room_participants_on_room_id_id" ON "room_participants" ("room_id");
 CREATE INDEX "index_room_participants_on_answering_connection_id" ON "room_participants" ("answering_connection_id");
 CREATE UNIQUE INDEX "index_room_participants_on_answering_connection_id_and_answering_connection_epoch" ON "room_participants" ("answering_connection_id", "answering_connection_epoch");

crates/collab/src/db.rs 🔗

@@ -131,29 +131,70 @@ impl Database {
         .await
     }
 
-    pub async fn delete_stale_rooms(&self) -> Result<()> {
+    pub async fn outdated_room_ids(&self) -> Result<Vec<RoomId>> {
         self.transaction(|tx| async move {
+            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
+            enum QueryAs {
+                RoomId,
+            }
+
+            Ok(room_participant::Entity::find()
+                .select_only()
+                .column(room_participant::Column::RoomId)
+                .distinct()
+                .filter(room_participant::Column::AnsweringConnectionEpoch.ne(self.epoch()))
+                .into_values::<_, QueryAs>()
+                .all(&*tx)
+                .await?)
+        })
+        .await
+    }
+
+    pub async fn refresh_room(&self, room_id: RoomId) -> Result<RoomGuard<RefreshedRoom>> {
+        self.room_transaction(|tx| async move {
+            let stale_participant_filter = Condition::all()
+                .add(room_participant::Column::RoomId.eq(room_id))
+                .add(room_participant::Column::AnsweringConnectionId.is_not_null())
+                .add(room_participant::Column::AnsweringConnectionEpoch.ne(self.epoch()));
+
+            let stale_participant_user_ids = room_participant::Entity::find()
+                .filter(stale_participant_filter.clone())
+                .all(&*tx)
+                .await?
+                .into_iter()
+                .map(|participant| participant.user_id)
+                .collect::<Vec<_>>();
+
+            // Delete participants who failed to reconnect.
             room_participant::Entity::delete_many()
-                .filter(
-                    room_participant::Column::AnsweringConnectionEpoch
-                        .ne(self.epoch())
-                        .or(room_participant::Column::CallingConnectionEpoch.ne(self.epoch())),
-                )
+                .filter(stale_participant_filter)
                 .exec(&*tx)
                 .await?;
-            room::Entity::delete_many()
-                .filter(
-                    room::Column::Id.not_in_subquery(
-                        Query::select()
-                            .column(room_participant::Column::RoomId)
-                            .from(room_participant::Entity)
-                            .distinct()
-                            .to_owned(),
-                    ),
-                )
-                .exec(&*tx)
-                .await?;
-            Ok(())
+
+            let room = self.get_room(room_id, &tx).await?;
+            let mut canceled_calls_to_user_ids = Vec::new();
+            // Delete the room if it becomes empty and cancel pending calls.
+            if room.participants.is_empty() {
+                canceled_calls_to_user_ids.extend(
+                    room.pending_participants
+                        .iter()
+                        .map(|pending_participant| UserId::from_proto(pending_participant.user_id)),
+                );
+                room_participant::Entity::delete_many()
+                    .filter(room_participant::Column::RoomId.eq(room_id))
+                    .exec(&*tx)
+                    .await?;
+                room::Entity::delete_by_id(room_id).exec(&*tx).await?;
+            }
+
+            Ok((
+                room_id,
+                RefreshedRoom {
+                    room,
+                    stale_participant_user_ids,
+                    canceled_calls_to_user_ids,
+                },
+            ))
         })
         .await
     }
@@ -2575,6 +2616,12 @@ pub struct LeftRoom {
     pub canceled_calls_to_user_ids: Vec<UserId>,
 }
 
+pub struct RefreshedRoom {
+    pub room: proto::Room,
+    pub stale_participant_user_ids: Vec<UserId>,
+    pub canceled_calls_to_user_ids: Vec<UserId>,
+}
+
 pub struct Project {
     pub collaborators: Vec<project_collaborator::Model>,
     pub worktrees: BTreeMap<u64, Worktree>,

crates/collab/src/integration_tests.rs 🔗

@@ -18,10 +18,8 @@ use editor::{
 use fs::{FakeFs, Fs as _, HomeDir, LineEnding};
 use futures::{channel::oneshot, StreamExt as _};
 use gpui::{
-    executor::{self, Deterministic},
-    geometry::vector::vec2f,
-    test::EmptyView,
-    ModelHandle, Task, TestAppContext, ViewHandle,
+    executor::Deterministic, geometry::vector::vec2f, test::EmptyView, ModelHandle, Task,
+    TestAppContext, ViewHandle,
 };
 use language::{
     range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
@@ -36,7 +34,7 @@ use serde_json::json;
 use settings::{Formatter, Settings};
 use std::{
     cell::{Cell, RefCell},
-    env, mem,
+    env, future, mem,
     ops::Deref,
     path::{Path, PathBuf},
     rc::Rc,
@@ -66,7 +64,7 @@ async fn test_basic_calls(
     cx_c: &mut TestAppContext,
 ) {
     deterministic.forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
 
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
@@ -265,7 +263,7 @@ async fn test_room_uniqueness(
     cx_c: &mut TestAppContext,
 ) {
     deterministic.forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let _client_a2 = server.create_client(cx_a2, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
@@ -370,7 +368,7 @@ async fn test_client_disconnecting_from_room(
     cx_b: &mut TestAppContext,
 ) {
     deterministic.forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -520,68 +518,295 @@ async fn test_server_restarts(
     deterministic: Arc<Deterministic>,
     cx_a: &mut TestAppContext,
     cx_b: &mut TestAppContext,
+    cx_c: &mut TestAppContext,
+    cx_d: &mut TestAppContext,
 ) {
     deterministic.forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
+    let client_c = server.create_client(cx_c, "user_c").await;
+    let client_d = server.create_client(cx_d, "user_d").await;
     server
-        .make_contacts(&mut [(&client_a, cx_a), (&client_b, cx_b)])
+        .make_contacts(&mut [
+            (&client_a, cx_a),
+            (&client_b, cx_b),
+            (&client_c, cx_c),
+            (&client_d, cx_d),
+        ])
         .await;
 
     let active_call_a = cx_a.read(ActiveCall::global);
     let active_call_b = cx_b.read(ActiveCall::global);
+    let active_call_c = cx_c.read(ActiveCall::global);
+    let active_call_d = cx_d.read(ActiveCall::global);
 
-    // Call user B from client A.
+    // User A calls users B, C, and D.
     active_call_a
         .update(cx_a, |call, cx| {
             call.invite(client_b.user_id().unwrap(), None, cx)
         })
         .await
         .unwrap();
+    active_call_a
+        .update(cx_a, |call, cx| {
+            call.invite(client_c.user_id().unwrap(), None, cx)
+        })
+        .await
+        .unwrap();
+    active_call_a
+        .update(cx_a, |call, cx| {
+            call.invite(client_d.user_id().unwrap(), None, cx)
+        })
+        .await
+        .unwrap();
     let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone());
 
     // User B receives the call and joins the room.
     let mut incoming_call_b = active_call_b.read_with(cx_b, |call, _| call.incoming());
-    incoming_call_b.next().await.unwrap().unwrap();
+    assert!(incoming_call_b.next().await.unwrap().is_some());
     active_call_b
         .update(cx_b, |call, cx| call.accept_incoming(cx))
         .await
         .unwrap();
     let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone());
+
+    // User C receives the call and joins the room.
+    let mut incoming_call_c = active_call_c.read_with(cx_c, |call, _| call.incoming());
+    assert!(incoming_call_c.next().await.unwrap().is_some());
+    active_call_c
+        .update(cx_c, |call, cx| call.accept_incoming(cx))
+        .await
+        .unwrap();
+    let room_c = active_call_c.read_with(cx_c, |call, _| call.room().unwrap().clone());
+
+    // User D receives the call but doesn't join the room yet.
+    let mut incoming_call_d = active_call_d.read_with(cx_d, |call, _| call.incoming());
+    assert!(incoming_call_d.next().await.unwrap().is_some());
+
+    deterministic.run_until_parked();
+    assert_eq!(
+        room_participants(&room_a, cx_a),
+        RoomParticipants {
+            remote: vec!["user_b".to_string(), "user_c".to_string()],
+            pending: vec!["user_d".to_string()]
+        }
+    );
+    assert_eq!(
+        room_participants(&room_b, cx_b),
+        RoomParticipants {
+            remote: vec!["user_a".to_string(), "user_c".to_string()],
+            pending: vec!["user_d".to_string()]
+        }
+    );
+    assert_eq!(
+        room_participants(&room_c, cx_c),
+        RoomParticipants {
+            remote: vec!["user_a".to_string(), "user_b".to_string()],
+            pending: vec!["user_d".to_string()]
+        }
+    );
+
+    // The server is torn down.
+    server.teardown();
+
+    // Users A and B reconnect to the call. User C has troubles reconnecting, so it leaves the room.
+    client_c.override_establish_connection(|_, cx| cx.spawn(|_| future::pending()));
+    deterministic.advance_clock(RECEIVE_TIMEOUT);
+    assert_eq!(
+        room_participants(&room_a, cx_a),
+        RoomParticipants {
+            remote: vec!["user_b".to_string(), "user_c".to_string()],
+            pending: vec!["user_d".to_string()]
+        }
+    );
+    assert_eq!(
+        room_participants(&room_b, cx_b),
+        RoomParticipants {
+            remote: vec!["user_a".to_string(), "user_c".to_string()],
+            pending: vec!["user_d".to_string()]
+        }
+    );
+    assert_eq!(
+        room_participants(&room_c, cx_c),
+        RoomParticipants {
+            remote: vec![],
+            pending: vec![]
+        }
+    );
+
+    // User D is notified again of the incoming call and accepts it.
+    assert!(incoming_call_d.next().await.unwrap().is_some());
+    active_call_d
+        .update(cx_d, |call, cx| call.accept_incoming(cx))
+        .await
+        .unwrap();
+    let room_d = active_call_d.read_with(cx_d, |call, _| call.room().unwrap().clone());
+    assert_eq!(
+        room_participants(&room_a, cx_a),
+        RoomParticipants {
+            remote: vec![
+                "user_b".to_string(),
+                "user_c".to_string(),
+                "user_d".to_string(),
+            ],
+            pending: vec![]
+        }
+    );
+    assert_eq!(
+        room_participants(&room_b, cx_b),
+        RoomParticipants {
+            remote: vec![
+                "user_a".to_string(),
+                "user_c".to_string(),
+                "user_d".to_string(),
+            ],
+            pending: vec![]
+        }
+    );
+    assert_eq!(
+        room_participants(&room_c, cx_c),
+        RoomParticipants {
+            remote: vec![],
+            pending: vec![]
+        }
+    );
+    assert_eq!(
+        room_participants(&room_d, cx_d),
+        RoomParticipants {
+            remote: vec![
+                "user_a".to_string(),
+                "user_b".to_string(),
+                "user_c".to_string(),
+            ],
+            pending: vec![]
+        }
+    );
+
+    // The server finishes restarting, cleaning up stale connections.
+    server.start().await.unwrap();
+    deterministic.advance_clock(RECONNECT_TIMEOUT);
+    assert_eq!(
+        room_participants(&room_a, cx_a),
+        RoomParticipants {
+            remote: vec!["user_b".to_string(), "user_d".to_string()],
+            pending: vec![]
+        }
+    );
+    assert_eq!(
+        room_participants(&room_b, cx_b),
+        RoomParticipants {
+            remote: vec!["user_a".to_string(), "user_d".to_string()],
+            pending: vec![]
+        }
+    );
+    assert_eq!(
+        room_participants(&room_c, cx_c),
+        RoomParticipants {
+            remote: vec![],
+            pending: vec![]
+        }
+    );
+    assert_eq!(
+        room_participants(&room_d, cx_d),
+        RoomParticipants {
+            remote: vec!["user_a".to_string(), "user_b".to_string()],
+            pending: vec![]
+        }
+    );
+
+    // User D hangs up.
+    active_call_d
+        .update(cx_d, |call, cx| call.hang_up(cx))
+        .unwrap();
     deterministic.run_until_parked();
     assert_eq!(
         room_participants(&room_a, cx_a),
         RoomParticipants {
             remote: vec!["user_b".to_string()],
-            pending: Default::default()
+            pending: vec![]
         }
     );
     assert_eq!(
         room_participants(&room_b, cx_b),
         RoomParticipants {
             remote: vec!["user_a".to_string()],
-            pending: Default::default()
+            pending: vec![]
+        }
+    );
+    assert_eq!(
+        room_participants(&room_c, cx_c),
+        RoomParticipants {
+            remote: vec![],
+            pending: vec![]
+        }
+    );
+    assert_eq!(
+        room_participants(&room_d, cx_d),
+        RoomParticipants {
+            remote: vec![],
+            pending: vec![]
         }
     );
 
-    // User A automatically reconnects to the room when the server restarts.
-    server.restart().await;
-    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
+    // User B calls user D again.
+    active_call_b
+        .update(cx_b, |call, cx| {
+            call.invite(client_d.user_id().unwrap(), None, cx)
+        })
+        .await
+        .unwrap();
+
+    // User D receives the call but doesn't join the room yet.
+    let mut incoming_call_d = active_call_d.read_with(cx_d, |call, _| call.incoming());
+    assert!(incoming_call_d.next().await.unwrap().is_some());
+    deterministic.run_until_parked();
     assert_eq!(
         room_participants(&room_a, cx_a),
         RoomParticipants {
             remote: vec!["user_b".to_string()],
-            pending: Default::default()
+            pending: vec!["user_d".to_string()]
         }
     );
     assert_eq!(
         room_participants(&room_b, cx_b),
         RoomParticipants {
             remote: vec!["user_a".to_string()],
-            pending: Default::default()
+            pending: vec!["user_d".to_string()]
+        }
+    );
+
+    // The server is torn down.
+    server.teardown();
+
+    // Users A and B have troubles reconnecting, so they leave the room.
+    client_a.override_establish_connection(|_, cx| cx.spawn(|_| future::pending()));
+    client_b.override_establish_connection(|_, cx| cx.spawn(|_| future::pending()));
+    client_c.override_establish_connection(|_, cx| cx.spawn(|_| future::pending()));
+    deterministic.advance_clock(RECEIVE_TIMEOUT);
+    assert_eq!(
+        room_participants(&room_a, cx_a),
+        RoomParticipants {
+            remote: vec![],
+            pending: vec![]
+        }
+    );
+    assert_eq!(
+        room_participants(&room_b, cx_b),
+        RoomParticipants {
+            remote: vec![],
+            pending: vec![]
         }
     );
+
+    // User D is notified again of the incoming call but doesn't accept it.
+    assert!(incoming_call_d.next().await.unwrap().is_some());
+
+    // The server finishes restarting, cleaning up stale connections and canceling the
+    // call to user D because the room has become empty.
+    server.start().await.unwrap();
+    deterministic.advance_clock(RECONNECT_TIMEOUT);
+    assert!(incoming_call_d.next().await.unwrap().is_none());
 }
 
 #[gpui::test(iterations = 10)]
@@ -592,7 +817,7 @@ async fn test_calls_on_multiple_connections(
     cx_b2: &mut TestAppContext,
 ) {
     deterministic.forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b1 = server.create_client(cx_b1, "user_b").await;
     let client_b2 = server.create_client(cx_b2, "user_b").await;
@@ -744,7 +969,7 @@ async fn test_share_project(
 ) {
     deterministic.forbid_parking();
     let (_, window_b) = cx_b.add_window(|_| EmptyView);
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     let client_c = server.create_client(cx_c, "user_c").await;
@@ -881,7 +1106,7 @@ async fn test_unshare_project(
     cx_c: &mut TestAppContext,
 ) {
     deterministic.forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     let client_c = server.create_client(cx_c, "user_c").await;
@@ -964,7 +1189,7 @@ async fn test_host_disconnect(
 ) {
     cx_b.update(editor::init);
     deterministic.forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     let client_c = server.create_client(cx_c, "user_c").await;
@@ -1082,7 +1307,7 @@ async fn test_active_call_events(
     cx_b: &mut TestAppContext,
 ) {
     deterministic.forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     client_a.fs.insert_tree("/a", json!({})).await;
@@ -1171,7 +1396,7 @@ async fn test_room_location(
     cx_b: &mut TestAppContext,
 ) {
     deterministic.forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     client_a.fs.insert_tree("/a", json!({})).await;
@@ -1337,7 +1562,7 @@ async fn test_propagate_saves_and_fs_changes(
     cx_c: &mut TestAppContext,
 ) {
     deterministic.forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     let client_c = server.create_client(cx_c, "user_c").await;
@@ -1507,12 +1732,12 @@ async fn test_propagate_saves_and_fs_changes(
 
 #[gpui::test(iterations = 10)]
 async fn test_git_diff_base_change(
-    executor: Arc<Deterministic>,
+    deterministic: Arc<Deterministic>,
     cx_a: &mut TestAppContext,
     cx_b: &mut TestAppContext,
 ) {
-    executor.forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+    deterministic.forbid_parking();
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -1581,7 +1806,7 @@ async fn test_git_diff_base_change(
         .unwrap();
 
     // Wait for it to catch up to the new diff
-    executor.run_until_parked();
+    deterministic.run_until_parked();
 
     // Smoke test diffing
     buffer_local_a.read_with(cx_a, |buffer, _| {
@@ -1601,7 +1826,7 @@ async fn test_git_diff_base_change(
         .unwrap();
 
     // Wait remote buffer to catch up to the new diff
-    executor.run_until_parked();
+    deterministic.run_until_parked();
 
     // Smoke test diffing
     buffer_remote_a.read_with(cx_b, |buffer, _| {
@@ -1624,7 +1849,7 @@ async fn test_git_diff_base_change(
         .await;
 
     // Wait for buffer_local_a to receive it
-    executor.run_until_parked();
+    deterministic.run_until_parked();
 
     // Smoke test new diffing
     buffer_local_a.read_with(cx_a, |buffer, _| {
@@ -1679,7 +1904,7 @@ async fn test_git_diff_base_change(
         .unwrap();
 
     // Wait for it to catch up to the new diff
-    executor.run_until_parked();
+    deterministic.run_until_parked();
 
     // Smoke test diffing
     buffer_local_b.read_with(cx_a, |buffer, _| {
@@ -1699,7 +1924,7 @@ async fn test_git_diff_base_change(
         .unwrap();
 
     // Wait remote buffer to catch up to the new diff
-    executor.run_until_parked();
+    deterministic.run_until_parked();
 
     // Smoke test diffing
     buffer_remote_b.read_with(cx_b, |buffer, _| {
@@ -1722,7 +1947,7 @@ async fn test_git_diff_base_change(
         .await;
 
     // Wait for buffer_local_b to receive it
-    executor.run_until_parked();
+    deterministic.run_until_parked();
 
     // Smoke test new diffing
     buffer_local_b.read_with(cx_a, |buffer, _| {
@@ -1759,12 +1984,12 @@ async fn test_git_diff_base_change(
 
 #[gpui::test(iterations = 10)]
 async fn test_fs_operations(
-    executor: Arc<Deterministic>,
+    deterministic: Arc<Deterministic>,
     cx_a: &mut TestAppContext,
     cx_b: &mut TestAppContext,
 ) {
-    executor.forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+    deterministic.forbid_parking();
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -2028,9 +2253,13 @@ async fn test_fs_operations(
 }
 
 #[gpui::test(iterations = 10)]
-async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
-    cx_a.foreground().forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+async fn test_buffer_conflict_after_save(
+    deterministic: Arc<Deterministic>,
+    cx_a: &mut TestAppContext,
+    cx_b: &mut TestAppContext,
+) {
+    deterministic.forbid_parking();
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -2082,9 +2311,13 @@ async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut T
 }
 
 #[gpui::test(iterations = 10)]
-async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
-    cx_a.foreground().forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+async fn test_buffer_reloading(
+    deterministic: Arc<Deterministic>,
+    cx_a: &mut TestAppContext,
+    cx_b: &mut TestAppContext,
+) {
+    deterministic.forbid_parking();
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -2139,11 +2372,12 @@ async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppCont
 
 #[gpui::test(iterations = 10)]
 async fn test_editing_while_guest_opens_buffer(
+    deterministic: Arc<Deterministic>,
     cx_a: &mut TestAppContext,
     cx_b: &mut TestAppContext,
 ) {
-    cx_a.foreground().forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+    deterministic.forbid_parking();
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -2186,11 +2420,12 @@ async fn test_editing_while_guest_opens_buffer(
 
 #[gpui::test(iterations = 10)]
 async fn test_leaving_worktree_while_opening_buffer(
+    deterministic: Arc<Deterministic>,
     cx_a: &mut TestAppContext,
     cx_b: &mut TestAppContext,
 ) {
-    cx_a.foreground().forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+    deterministic.forbid_parking();
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -2235,7 +2470,7 @@ async fn test_canceling_buffer_opening(
 ) {
     deterministic.forbid_parking();
 
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -2286,7 +2521,7 @@ async fn test_leaving_project(
     cx_c: &mut TestAppContext,
 ) {
     deterministic.forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     let client_c = server.create_client(cx_c, "user_c").await;
@@ -2397,7 +2632,7 @@ async fn test_leaving_project(
     // Simulate connection loss for client C and ensure client A observes client C leaving the project.
     client_c.wait_for_current_user(cx_c).await;
     server.disconnect_client(client_c.peer_id().unwrap());
-    cx_a.foreground().advance_clock(RECEIVE_TIMEOUT);
+    deterministic.advance_clock(RECEIVE_TIMEOUT);
     deterministic.run_until_parked();
     project_a.read_with(cx_a, |project, _| {
         assert_eq!(project.collaborators().len(), 0);
@@ -2418,7 +2653,7 @@ async fn test_collaborating_with_diagnostics(
     cx_c: &mut TestAppContext,
 ) {
     deterministic.forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     let client_c = server.create_client(cx_c, "user_c").await;
@@ -2678,9 +2913,13 @@ async fn test_collaborating_with_diagnostics(
 }
 
 #[gpui::test(iterations = 10)]
-async fn test_collaborating_with_completion(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
-    cx_a.foreground().forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+async fn test_collaborating_with_completion(
+    deterministic: Arc<Deterministic>,
+    cx_a: &mut TestAppContext,
+    cx_b: &mut TestAppContext,
+) {
+    deterministic.forbid_parking();
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -2852,9 +3091,13 @@ async fn test_collaborating_with_completion(cx_a: &mut TestAppContext, cx_b: &mu
 }
 
 #[gpui::test(iterations = 10)]
-async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
-    cx_a.foreground().forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+async fn test_reloading_buffer_manually(
+    deterministic: Arc<Deterministic>,
+    cx_a: &mut TestAppContext,
+    cx_b: &mut TestAppContext,
+) {
+    deterministic.forbid_parking();
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -2944,10 +3187,14 @@ async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut Te
 }
 
 #[gpui::test(iterations = 10)]
-async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
+async fn test_formatting_buffer(
+    deterministic: Arc<Deterministic>,
+    cx_a: &mut TestAppContext,
+    cx_b: &mut TestAppContext,
+) {
     use project::FormatTrigger;
 
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -3046,9 +3293,13 @@ async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppCon
 }
 
 #[gpui::test(iterations = 10)]
-async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
-    cx_a.foreground().forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+async fn test_definition(
+    deterministic: Arc<Deterministic>,
+    cx_a: &mut TestAppContext,
+    cx_b: &mut TestAppContext,
+) {
+    deterministic.forbid_parking();
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -3190,9 +3441,13 @@ async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
 }
 
 #[gpui::test(iterations = 10)]
-async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
-    cx_a.foreground().forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+async fn test_references(
+    deterministic: Arc<Deterministic>,
+    cx_a: &mut TestAppContext,
+    cx_b: &mut TestAppContext,
+) {
+    deterministic.forbid_parking();
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -3291,9 +3546,13 @@ async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
 }
 
 #[gpui::test(iterations = 10)]
-async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
-    cx_a.foreground().forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+async fn test_project_search(
+    deterministic: Arc<Deterministic>,
+    cx_a: &mut TestAppContext,
+    cx_b: &mut TestAppContext,
+) {
+    deterministic.forbid_parking();
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -3370,9 +3629,13 @@ async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContex
 }
 
 #[gpui::test(iterations = 10)]
-async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
-    cx_a.foreground().forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+async fn test_document_highlights(
+    deterministic: Arc<Deterministic>,
+    cx_a: &mut TestAppContext,
+    cx_b: &mut TestAppContext,
+) {
+    deterministic.forbid_parking();
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -3472,9 +3735,13 @@ async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppC
 }
 
 #[gpui::test(iterations = 10)]
-async fn test_lsp_hover(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
-    cx_a.foreground().forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+async fn test_lsp_hover(
+    deterministic: Arc<Deterministic>,
+    cx_a: &mut TestAppContext,
+    cx_b: &mut TestAppContext,
+) {
+    deterministic.forbid_parking();
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -3575,9 +3842,13 @@ async fn test_lsp_hover(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
 }
 
 #[gpui::test(iterations = 10)]
-async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
-    cx_a.foreground().forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+async fn test_project_symbols(
+    deterministic: Arc<Deterministic>,
+    cx_a: &mut TestAppContext,
+    cx_b: &mut TestAppContext,
+) {
+    deterministic.forbid_parking();
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -3680,12 +3951,13 @@ async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppConte
 
 #[gpui::test(iterations = 10)]
 async fn test_open_buffer_while_getting_definition_pointing_to_it(
+    deterministic: Arc<Deterministic>,
     cx_a: &mut TestAppContext,
     cx_b: &mut TestAppContext,
     mut rng: StdRng,
 ) {
-    cx_a.foreground().forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+    deterministic.forbid_parking();
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -3756,12 +4028,13 @@ async fn test_open_buffer_while_getting_definition_pointing_to_it(
 
 #[gpui::test(iterations = 10)]
 async fn test_collaborating_with_code_actions(
+    deterministic: Arc<Deterministic>,
     cx_a: &mut TestAppContext,
     cx_b: &mut TestAppContext,
 ) {
-    cx_a.foreground().forbid_parking();
+    deterministic.forbid_parking();
     cx_b.update(editor::init);
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -3976,10 +4249,14 @@ async fn test_collaborating_with_code_actions(
 }
 
 #[gpui::test(iterations = 10)]
-async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
-    cx_a.foreground().forbid_parking();
+async fn test_collaborating_with_renames(
+    deterministic: Arc<Deterministic>,
+    cx_a: &mut TestAppContext,
+    cx_b: &mut TestAppContext,
+) {
+    deterministic.forbid_parking();
     cx_b.update(editor::init);
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -4178,7 +4455,7 @@ async fn test_language_server_statuses(
     deterministic.forbid_parking();
 
     cx_b.update(editor::init);
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -4290,8 +4567,8 @@ async fn test_contacts(
     cx_c: &mut TestAppContext,
     cx_d: &mut TestAppContext,
 ) {
-    cx_a.foreground().forbid_parking();
-    let mut server = TestServer::start(cx_a.background()).await;
+    deterministic.forbid_parking();
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     let client_c = server.create_client(cx_c, "user_c").await;
@@ -4678,7 +4955,7 @@ async fn test_contacts(
 
 #[gpui::test(iterations = 10)]
 async fn test_contact_requests(
-    executor: Arc<Deterministic>,
+    deterministic: Arc<Deterministic>,
     cx_a: &mut TestAppContext,
     cx_a2: &mut TestAppContext,
     cx_b: &mut TestAppContext,
@@ -4686,10 +4963,10 @@ async fn test_contact_requests(
     cx_c: &mut TestAppContext,
     cx_c2: &mut TestAppContext,
 ) {
-    cx_a.foreground().forbid_parking();
+    deterministic.forbid_parking();
 
     // Connect to a server as 3 clients.
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_a2 = server.create_client(cx_a2, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
@@ -4716,7 +4993,7 @@ async fn test_contact_requests(
         })
         .await
         .unwrap();
-    executor.run_until_parked();
+    deterministic.run_until_parked();
 
     // All users see the pending request appear in all their clients.
     assert_eq!(
@@ -4748,7 +5025,7 @@ async fn test_contact_requests(
     disconnect_and_reconnect(&client_a, cx_a).await;
     disconnect_and_reconnect(&client_b, cx_b).await;
     disconnect_and_reconnect(&client_c, cx_c).await;
-    executor.run_until_parked();
+    deterministic.run_until_parked();
     assert_eq!(
         client_a.summarize_contacts(cx_a).outgoing_requests,
         &["user_b"]
@@ -4771,7 +5048,7 @@ async fn test_contact_requests(
         .await
         .unwrap();
 
-    executor.run_until_parked();
+    deterministic.run_until_parked();
 
     // User B sees user A as their contact now in all client, and the incoming request from them is removed.
     let contacts_b = client_b.summarize_contacts(cx_b);
@@ -4793,7 +5070,7 @@ async fn test_contact_requests(
     disconnect_and_reconnect(&client_a, cx_a).await;
     disconnect_and_reconnect(&client_b, cx_b).await;
     disconnect_and_reconnect(&client_c, cx_c).await;
-    executor.run_until_parked();
+    deterministic.run_until_parked();
     assert_eq!(client_a.summarize_contacts(cx_a).current, &["user_b"]);
     assert_eq!(client_b.summarize_contacts(cx_b).current, &["user_a"]);
     assert_eq!(
@@ -4815,7 +5092,7 @@ async fn test_contact_requests(
         .await
         .unwrap();
 
-    executor.run_until_parked();
+    deterministic.run_until_parked();
 
     // User B doesn't see user C as their contact, and the incoming request from them is removed.
     let contacts_b = client_b.summarize_contacts(cx_b);
@@ -4837,7 +5114,7 @@ async fn test_contact_requests(
     disconnect_and_reconnect(&client_a, cx_a).await;
     disconnect_and_reconnect(&client_b, cx_b).await;
     disconnect_and_reconnect(&client_c, cx_c).await;
-    executor.run_until_parked();
+    deterministic.run_until_parked();
     assert_eq!(client_a.summarize_contacts(cx_a).current, &["user_b"]);
     assert_eq!(client_b.summarize_contacts(cx_b).current, &["user_a"]);
     assert!(client_b
@@ -4866,11 +5143,11 @@ async fn test_following(
     cx_a: &mut TestAppContext,
     cx_b: &mut TestAppContext,
 ) {
-    cx_a.foreground().forbid_parking();
+    deterministic.forbid_parking();
     cx_a.update(editor::init);
     cx_b.update(editor::init);
 
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -5147,7 +5424,7 @@ async fn test_following_tab_order(
     cx_a.update(editor::init);
     cx_b.update(editor::init);
 
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -5262,12 +5539,16 @@ async fn test_following_tab_order(
 }
 
 #[gpui::test(iterations = 10)]
-async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
-    cx_a.foreground().forbid_parking();
+async fn test_peers_following_each_other(
+    deterministic: Arc<Deterministic>,
+    cx_a: &mut TestAppContext,
+    cx_b: &mut TestAppContext,
+) {
+    deterministic.forbid_parking();
     cx_a.update(editor::init);
     cx_b.update(editor::init);
 
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -5439,13 +5720,17 @@ async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut T
 }
 
 #[gpui::test(iterations = 10)]
-async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
-    cx_a.foreground().forbid_parking();
+async fn test_auto_unfollowing(
+    deterministic: Arc<Deterministic>,
+    cx_a: &mut TestAppContext,
+    cx_b: &mut TestAppContext,
+) {
+    deterministic.forbid_parking();
     cx_a.update(editor::init);
     cx_b.update(editor::init);
 
     // 2 clients connect to a server.
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -5619,7 +5904,7 @@ async fn test_peers_simultaneously_following_each_other(
     cx_a.update(editor::init);
     cx_b.update(editor::init);
 
-    let mut server = TestServer::start(cx_a.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let client_a = server.create_client(cx_a, "user_a").await;
     let client_b = server.create_client(cx_b, "user_b").await;
     server
@@ -5689,7 +5974,7 @@ async fn test_random_collaboration(
         .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
         .unwrap_or(10);
 
-    let mut server = TestServer::start(cx.background()).await;
+    let mut server = TestServer::start(&deterministic).await;
     let db = server.app_state.db.clone();
 
     let mut available_guests = Vec::new();
@@ -6010,27 +6295,32 @@ struct TestServer {
 }
 
 impl TestServer {
-    async fn start(background: Arc<executor::Background>) -> Self {
+    async fn start(deterministic: &Arc<Deterministic>) -> Self {
         static NEXT_LIVE_KIT_SERVER_ID: AtomicUsize = AtomicUsize::new(0);
 
         let use_postgres = env::var("USE_POSTGRES").ok();
         let use_postgres = use_postgres.as_deref();
         let test_db = if use_postgres == Some("true") || use_postgres == Some("1") {
-            TestDb::postgres(background.clone())
+            TestDb::postgres(deterministic.build_background())
         } else {
-            TestDb::sqlite(background.clone())
+            TestDb::sqlite(deterministic.build_background())
         };
         let live_kit_server_id = NEXT_LIVE_KIT_SERVER_ID.fetch_add(1, SeqCst);
         let live_kit_server = live_kit_client::TestServer::create(
             format!("http://livekit.{}.test", live_kit_server_id),
             format!("devkey-{}", live_kit_server_id),
             format!("secret-{}", live_kit_server_id),
-            background.clone(),
+            deterministic.build_background(),
         )
         .unwrap();
         let app_state = Self::build_app_state(&test_db, &live_kit_server).await;
-        let server = Server::new(app_state.clone());
+        let server = Server::new(
+            app_state.clone(),
+            Executor::Deterministic(deterministic.build_background()),
+        );
         server.start().await.unwrap();
+        // Advance clock to ensure the server's cleanup task is finished.
+        deterministic.advance_clock(RECONNECT_TIMEOUT);
         Self {
             app_state,
             server,

crates/collab/src/lib.rs 🔗

@@ -2,7 +2,7 @@ pub mod api;
 pub mod auth;
 pub mod db;
 pub mod env;
-mod executor;
+pub mod executor;
 #[cfg(test)]
 mod integration_tests;
 pub mod rpc;

crates/collab/src/main.rs 🔗

@@ -1,6 +1,6 @@
 use anyhow::anyhow;
 use axum::{routing::get, Router};
-use collab::{db, env, AppState, Config, MigrateConfig, Result};
+use collab::{db, env, executor::Executor, AppState, Config, MigrateConfig, Result};
 use db::Database;
 use std::{
     env::args,
@@ -56,7 +56,7 @@ async fn main() -> Result<()> {
             let listener = TcpListener::bind(&format!("0.0.0.0:{}", state.config.http_port))
                 .expect("failed to bind TCP listener");
 
-            let rpc_server = collab::rpc::Server::new(state.clone());
+            let rpc_server = collab::rpc::Server::new(state.clone(), Executor::Production);
             rpc_server.start().await?;
 
             let app = collab::api::routes(rpc_server.clone(), state.clone())

crates/collab/src/rpc.rs 🔗

@@ -142,6 +142,7 @@ pub struct Server {
     peer: Arc<Peer>,
     pub(crate) connection_pool: Arc<Mutex<ConnectionPool>>,
     app_state: Arc<AppState>,
+    executor: Executor,
     handlers: HashMap<TypeId, MessageHandler>,
     teardown: watch::Sender<()>,
 }
@@ -168,10 +169,11 @@ where
 }
 
 impl Server {
-    pub fn new(app_state: Arc<AppState>) -> Arc<Self> {
+    pub fn new(app_state: Arc<AppState>, executor: Executor) -> Arc<Self> {
         let mut server = Self {
             peer: Peer::new(),
             app_state,
+            executor,
             connection_pool: Default::default(),
             handlers: Default::default(),
             teardown: watch::channel(()).0,
@@ -239,8 +241,85 @@ impl Server {
 
     pub async fn start(&self) -> Result<()> {
         self.app_state.db.delete_stale_projects().await?;
-        // TODO: delete stale rooms after timeout.
-        // self.app_state.db.delete_stale_rooms().await?;
+        let db = self.app_state.db.clone();
+        let peer = self.peer.clone();
+        let timeout = self.executor.sleep(RECONNECT_TIMEOUT);
+        let pool = self.connection_pool.clone();
+        let live_kit_client = self.app_state.live_kit_client.clone();
+        self.executor.spawn_detached(async move {
+            timeout.await;
+            if let Some(room_ids) = db.outdated_room_ids().await.trace_err() {
+                for room_id in room_ids {
+                    let mut contacts_to_update = HashSet::default();
+                    let mut canceled_calls_to_user_ids = Vec::new();
+                    let mut live_kit_room = String::new();
+                    let mut delete_live_kit_room = false;
+
+                    if let Ok(mut refreshed_room) = db.refresh_room(room_id).await {
+                        room_updated(&refreshed_room.room, &peer);
+                        contacts_to_update
+                            .extend(refreshed_room.stale_participant_user_ids.iter().copied());
+                        contacts_to_update
+                            .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
+                        canceled_calls_to_user_ids =
+                            mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
+                        dbg!(&canceled_calls_to_user_ids);
+                        live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
+                        delete_live_kit_room = refreshed_room.room.participants.is_empty();
+                    }
+
+                    {
+                        let pool = pool.lock().await;
+                        for canceled_user_id in canceled_calls_to_user_ids {
+                            for connection_id in pool.user_connection_ids(canceled_user_id) {
+                                peer.send(connection_id, proto::CallCanceled {}).trace_err();
+                            }
+                        }
+                    }
+
+                    for user_id in contacts_to_update {
+                        if let Some((busy, contacts)) = db
+                            .is_user_busy(user_id)
+                            .await
+                            .trace_err()
+                            .zip(db.get_contacts(user_id).await.trace_err())
+                        {
+                            let pool = pool.lock().await;
+                            let updated_contact = contact_for_user(user_id, false, busy, &pool);
+                            for contact in contacts {
+                                if let db::Contact::Accepted {
+                                    user_id: contact_user_id,
+                                    ..
+                                } = contact
+                                {
+                                    for contact_conn_id in pool.user_connection_ids(contact_user_id)
+                                    {
+                                        peer.send(
+                                            contact_conn_id,
+                                            proto::UpdateContacts {
+                                                contacts: vec![updated_contact.clone()],
+                                                remove_contacts: Default::default(),
+                                                incoming_requests: Default::default(),
+                                                remove_incoming_requests: Default::default(),
+                                                outgoing_requests: Default::default(),
+                                                remove_outgoing_requests: Default::default(),
+                                            },
+                                        )
+                                        .trace_err();
+                                    }
+                                }
+                            }
+                        }
+                    }
+
+                    if let Some(live_kit) = live_kit_client.as_ref() {
+                        if delete_live_kit_room {
+                            live_kit.delete_room(live_kit_room).await.trace_err();
+                        }
+                    }
+                }
+            }
+        });
         Ok(())
     }
 
@@ -690,7 +769,7 @@ async fn sign_out(
             {
                 let db = session.db().await;
                 if let Some(room) = db.decline_call(None, session.user_id).await.trace_err() {
-                    room_updated(&room, &session);
+                    room_updated(&room, &session.peer);
                 }
             }
             update_user_contacts(session.user_id, &session).await?;
@@ -768,7 +847,7 @@ async fn join_room(
                 session.connection_id,
             )
             .await?;
-        room_updated(&room, &session);
+        room_updated(&room, &session.peer);
         room.clone()
     };
 
@@ -843,7 +922,7 @@ async fn call(
                 initial_project_id,
             )
             .await?;
-        room_updated(&room, &session);
+        room_updated(&room, &session.peer);
         mem::take(incoming_call)
     };
     update_user_contacts(called_user_id, &session).await?;
@@ -873,7 +952,7 @@ async fn call(
             .await
             .call_failed(room_id, called_user_id)
             .await?;
-        room_updated(&room, &session);
+        room_updated(&room, &session.peer);
     }
     update_user_contacts(called_user_id, &session).await?;
 
@@ -893,7 +972,7 @@ async fn cancel_call(
             .await
             .cancel_call(Some(room_id), session.connection_id, called_user_id)
             .await?;
-        room_updated(&room, &session);
+        room_updated(&room, &session.peer);
     }
 
     for connection_id in session
@@ -920,7 +999,7 @@ async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<(
             .await
             .decline_call(Some(room_id), session.user_id)
             .await?;
-        room_updated(&room, &session);
+        room_updated(&room, &session.peer);
     }
 
     for connection_id in session
@@ -951,7 +1030,7 @@ async fn update_participant_location(
         .await
         .update_room_participant_location(room_id, session.connection_id, location)
         .await?;
-    room_updated(&room, &session);
+    room_updated(&room, &session.peer);
     response.send(proto::Ack {})?;
     Ok(())
 }
@@ -973,7 +1052,7 @@ async fn share_project(
     response.send(proto::ShareProjectResponse {
         project_id: project_id.to_proto(),
     })?;
-    room_updated(&room, &session);
+    room_updated(&room, &session.peer);
 
     Ok(())
 }
@@ -992,7 +1071,7 @@ async fn unshare_project(message: proto::UnshareProject, session: Session) -> Re
         guest_connection_ids.iter().copied(),
         |conn_id| session.peer.send(conn_id, message.clone()),
     );
-    room_updated(&room, &session);
+    room_updated(&room, &session.peer);
 
     Ok(())
 }
@@ -1151,7 +1230,7 @@ async fn update_project(
                 .forward_send(session.connection_id, connection_id, request.clone())
         },
     );
-    room_updated(&room, &session);
+    room_updated(&room, &session.peer);
     response.send(proto::Ack {})?;
 
     Ok(())
@@ -1798,17 +1877,15 @@ fn contact_for_user(
     }
 }
 
-fn room_updated(room: &proto::Room, session: &Session) {
+fn room_updated(room: &proto::Room, peer: &Peer) {
     for participant in &room.participants {
-        session
-            .peer
-            .send(
-                ConnectionId(participant.peer_id),
-                proto::RoomUpdated {
-                    room: Some(room.clone()),
-                },
-            )
-            .trace_err();
+        peer.send(
+            ConnectionId(participant.peer_id),
+            proto::RoomUpdated {
+                room: Some(room.clone()),
+            },
+        )
+        .trace_err();
     }
 }
 
@@ -1860,7 +1937,7 @@ async fn leave_room_for_session(session: &Session) -> Result<()> {
             project_left(project, session);
         }
 
-        room_updated(&left_room.room, &session);
+        room_updated(&left_room.room, &session.peer);
         canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
         live_kit_room = mem::take(&mut left_room.room.live_kit_room);
         delete_live_kit_room = left_room.room.participants.is_empty();