Handle unshared projects when rejoining a room

Max Brunsfeld and Antonio Scandurra created

Also, construct remote projects via the room, to guarantee
that the room can manage the projects' sharing lifecycle.

Co-authored-by: Antonio Scandurra <antonio@zed.dev>

Change summary

Cargo.lock                             |  2 
crates/call/Cargo.toml                 |  4 +
crates/call/src/room.rs                | 49 ++++++++++++++++---
crates/collab/src/db.rs                | 16 +++++
crates/collab/src/integration_tests.rs | 66 ++++++++++++++++++---------
crates/collab_ui/src/collab_ui.rs      | 29 ++++++-----
crates/project/src/project.rs          |  2 
7 files changed, 120 insertions(+), 48 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -820,8 +820,10 @@ dependencies = [
  "async-broadcast",
  "client",
  "collections",
+ "fs",
  "futures 0.3.25",
  "gpui",
+ "language",
  "live_kit_client",
  "log",
  "media",

crates/call/Cargo.toml 🔗

@@ -23,6 +23,8 @@ collections = { path = "../collections" }
 gpui = { path = "../gpui" }
 log = "0.4"
 live_kit_client = { path = "../live_kit_client" }
+fs = { path = "../fs" }
+language = { path = "../language" }
 media = { path = "../media" }
 project = { path = "../project" }
 util = { path = "../util" }
@@ -34,6 +36,8 @@ postage = { version = "0.4.1", features = ["futures-traits"] }
 
 [dev-dependencies]
 client = { path = "../client", features = ["test-support"] }
+fs = { path = "../fs", features = ["test-support"] }
+language = { path = "../language", features = ["test-support"] }
 collections = { path = "../collections", features = ["test-support"] }
 gpui = { path = "../gpui", features = ["test-support"] }
 live_kit_client = { path = "../live_kit_client", features = ["test-support"] }

crates/call/src/room.rs 🔗

@@ -8,10 +8,12 @@ use client::{
     Client, TypedEnvelope, User, UserStore,
 };
 use collections::{BTreeMap, HashMap, HashSet};
+use fs::Fs;
 use futures::{FutureExt, StreamExt};
 use gpui::{
     AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
 };
+use language::LanguageRegistry;
 use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
 use postage::stream::Stream;
 use project::Project;
@@ -523,6 +525,20 @@ impl Room {
                         }
 
                         for unshared_project_id in old_projects.difference(&new_projects) {
+                            this.joined_projects.retain(|project| {
+                                if let Some(project) = project.upgrade(cx) {
+                                    project.update(cx, |project, cx| {
+                                        if project.remote_id() == Some(*unshared_project_id) {
+                                            project.disconnected_from_host(cx);
+                                            false
+                                        } else {
+                                            true
+                                        }
+                                    })
+                                } else {
+                                    false
+                                }
+                            });
                             cx.emit(Event::RemoteProjectUnshared {
                                 project_id: *unshared_project_id,
                             });
@@ -699,15 +715,30 @@ impl Room {
         })
     }
 
-    pub fn joined_project(&mut self, project: ModelHandle<Project>, cx: &mut ModelContext<Self>) {
-        self.joined_projects.retain(|project| {
-            if let Some(project) = project.upgrade(cx) {
-                !project.read(cx).is_read_only()
-            } else {
-                false
-            }
-        });
-        self.joined_projects.insert(project.downgrade());
+    pub fn join_project(
+        &mut self,
+        id: u64,
+        language_registry: Arc<LanguageRegistry>,
+        fs: Arc<dyn Fs>,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<ModelHandle<Project>>> {
+        let client = self.client.clone();
+        let user_store = self.user_store.clone();
+        cx.spawn(|this, mut cx| async move {
+            let project =
+                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
+            this.update(&mut cx, |this, cx| {
+                this.joined_projects.retain(|project| {
+                    if let Some(project) = project.upgrade(cx) {
+                        !project.read(cx).is_read_only()
+                    } else {
+                        false
+                    }
+                });
+                this.joined_projects.insert(project.downgrade());
+            });
+            Ok(project)
+        })
     }
 
     pub(crate) fn share_project(

crates/collab/src/db.rs 🔗

@@ -1440,9 +1440,20 @@ impl Database {
                     });
                 }
 
-                // TODO: handle unshared projects
-                // TODO: handle left projects
+                project::Entity::delete_many()
+                    .filter(
+                        Condition::all()
+                            .add(project::Column::RoomId.eq(room_id))
+                            .add(project::Column::HostUserId.eq(user_id))
+                            .add(
+                                project::Column::Id
+                                    .is_not_in(reshared_projects.iter().map(|project| project.id)),
+                            ),
+                    )
+                    .exec(&*tx)
+                    .await?;
 
+                // TODO: handle left projects
                 let room = self.get_room(room_id, &tx).await?;
                 Ok((
                     room_id,
@@ -2971,6 +2982,7 @@ impl ProjectCollaborator {
     }
 }
 
+#[derive(Debug)]
 pub struct LeftProject {
     pub id: ProjectId,
     pub host_user_id: UserId,

crates/collab/src/integration_tests.rs 🔗

@@ -1324,7 +1324,7 @@ async fn test_host_reconnect(
     client_a
         .fs
         .insert_tree(
-            "/root",
+            "/root-1",
             json!({
                 "dir1": {
                     "a.txt": "a-contents",
@@ -1343,17 +1343,32 @@ async fn test_host_reconnect(
             }),
         )
         .await;
+    client_a
+        .fs
+        .insert_tree(
+            "/root-2",
+            json!({
+                "1.txt": "1-contents",
+            }),
+        )
+        .await;
 
     let active_call_a = cx_a.read(ActiveCall::global);
-    let (project_a, _) = client_a.build_local_project("/root/dir1", cx_a).await;
+    let (project_a1, _) = client_a.build_local_project("/root-1/dir1", cx_a).await;
+    let (project_a2, _) = client_a.build_local_project("/root-2", cx_a).await;
     let worktree_a1 =
-        project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
-    let project_id = active_call_a
-        .update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
+        project_a1.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
+    let project1_id = active_call_a
+        .update(cx_a, |call, cx| call.share_project(project_a1.clone(), cx))
+        .await
+        .unwrap();
+    let project2_id = active_call_a
+        .update(cx_a, |call, cx| call.share_project(project_a2.clone(), cx))
         .await
         .unwrap();
 
-    let project_b = client_b.build_remote_project(project_id, cx_b).await;
+    let project_b1 = client_b.build_remote_project(project1_id, cx_b).await;
+    let project_b2 = client_b.build_remote_project(project2_id, cx_b).await;
     deterministic.run_until_parked();
 
     let worktree1_id = worktree_a1.read_with(cx_a, |worktree, _| {
@@ -1365,11 +1380,11 @@ async fn test_host_reconnect(
     server.forbid_connections();
     server.disconnect_client(client_a.peer_id().unwrap());
     deterministic.advance_clock(RECEIVE_TIMEOUT);
-    project_a.read_with(cx_a, |project, _| {
+    project_a1.read_with(cx_a, |project, _| {
         assert!(project.is_shared());
         assert_eq!(project.collaborators().len(), 1);
     });
-    project_b.read_with(cx_b, |project, _| {
+    project_b1.read_with(cx_b, |project, _| {
         assert!(!project.is_read_only());
         assert_eq!(project.collaborators().len(), 1);
     });
@@ -1377,11 +1392,11 @@ async fn test_host_reconnect(
         assert!(tree.as_local().unwrap().is_shared())
     });
 
-    // While disconnected, add/remove files and worktrees from client A's project.
+    // While disconnected, add and remove files from client A's project.
     client_a
         .fs
         .insert_tree(
-            "/root/dir1/subdir2",
+            "/root-1/dir1/subdir2",
             json!({
                 "f.txt": "f-contents",
                 "g.txt": "g-contents",
@@ -1393,7 +1408,7 @@ async fn test_host_reconnect(
     client_a
         .fs
         .remove_dir(
-            "/root/dir1/subdir1".as_ref(),
+            "/root-1/dir1/subdir1".as_ref(),
             RemoveOptions {
                 recursive: true,
                 ..Default::default()
@@ -1401,9 +1416,11 @@ async fn test_host_reconnect(
         )
         .await
         .unwrap();
-    let (worktree_a2, _) = project_a
+
+    // While disconnected, add a worktree to client A's project.
+    let (worktree_a2, _) = project_a1
         .update(cx_a, |p, cx| {
-            p.find_or_create_local_worktree("/root/dir2", true, cx)
+            p.find_or_create_local_worktree("/root-1/dir2", true, cx)
         })
         .await
         .unwrap();
@@ -1416,6 +1433,9 @@ async fn test_host_reconnect(
     });
     deterministic.run_until_parked();
 
+    // While disconnected, close project 2
+    cx_a.update(|_| drop(project_a2));
+
     // Client A reconnects. Their project is re-shared, and client B re-joins it.
     server.allow_connections();
     client_a
@@ -1423,7 +1443,7 @@ async fn test_host_reconnect(
         .await
         .unwrap();
     deterministic.run_until_parked();
-    project_a.read_with(cx_a, |project, cx| {
+    project_a1.read_with(cx_a, |project, cx| {
         assert!(project.is_shared());
         assert_eq!(
             worktree_a1
@@ -1456,7 +1476,7 @@ async fn test_host_reconnect(
             vec!["x", "y", "z"]
         );
     });
-    project_b.read_with(cx_b, |project, cx| {
+    project_b1.read_with(cx_b, |project, cx| {
         assert!(!project.is_read_only());
         assert_eq!(
             project
@@ -1493,6 +1513,7 @@ async fn test_host_reconnect(
             vec!["x", "y", "z"]
         );
     });
+    project_b2.read_with(cx_b, |project, _| assert!(project.is_read_only()));
 }
 
 #[gpui::test(iterations = 10)]
@@ -6930,17 +6951,18 @@ impl TestClient {
         host_project_id: u64,
         guest_cx: &mut TestAppContext,
     ) -> ModelHandle<Project> {
-        let project_b = guest_cx.spawn(|cx| {
-            Project::remote(
+        let active_call = guest_cx.read(ActiveCall::global);
+        let room = active_call.read_with(guest_cx, |call, _| call.room().unwrap().clone());
+        room.update(guest_cx, |room, cx| {
+            room.join_project(
                 host_project_id,
-                self.client.clone(),
-                self.user_store.clone(),
                 self.language_registry.clone(),
-                FakeFs::new(cx.background()),
+                self.fs.clone(),
                 cx,
             )
-        });
-        project_b.await.unwrap()
+        })
+        .await
+        .unwrap()
     }
 
     fn build_workspace(

crates/collab_ui/src/collab_ui.rs 🔗

@@ -7,10 +7,10 @@ mod incoming_call_notification;
 mod notifications;
 mod project_shared_notification;
 
+use anyhow::anyhow;
 use call::ActiveCall;
 pub use collab_titlebar_item::{CollabTitlebarItem, ToggleCollaborationMenu};
 use gpui::MutableAppContext;
-use project::Project;
 use std::sync::Arc;
 use workspace::{AppState, JoinProject, ToggleFollow, Workspace};
 
@@ -39,15 +39,20 @@ pub fn init(app_state: Arc<AppState>, cx: &mut MutableAppContext) {
             let workspace = if let Some(existing_workspace) = existing_workspace {
                 existing_workspace
             } else {
-                let project = Project::remote(
-                    project_id,
-                    app_state.client.clone(),
-                    app_state.user_store.clone(),
-                    app_state.languages.clone(),
-                    app_state.fs.clone(),
-                    cx.clone(),
-                )
-                .await?;
+                let active_call = cx.read(ActiveCall::global);
+                let room = active_call
+                    .read_with(&cx, |call, _| call.room().cloned())
+                    .ok_or_else(|| anyhow!("not in a call"))?;
+                let project = room
+                    .update(&mut cx, |room, cx| {
+                        room.join_project(
+                            project_id,
+                            app_state.languages.clone(),
+                            app_state.fs.clone(),
+                            cx,
+                        )
+                    })
+                    .await?;
 
                 let (_, workspace) = cx.add_window((app_state.build_window_options)(), |cx| {
                     let mut workspace = Workspace::new(
@@ -68,10 +73,6 @@ pub fn init(app_state: Arc<AppState>, cx: &mut MutableAppContext) {
 
             workspace.update(&mut cx, |workspace, cx| {
                 if let Some(room) = ActiveCall::global(cx).read(cx).room().cloned() {
-                    room.update(cx, |room, cx| {
-                        room.joined_project(workspace.project().clone(), cx);
-                    });
-
                     let follow_peer_id = room
                         .read(cx)
                         .remote_participants()

crates/project/src/project.rs 🔗

@@ -1146,7 +1146,7 @@ impl Project {
         }
     }
 
-    fn disconnected_from_host(&mut self, cx: &mut ModelContext<Self>) {
+    pub fn disconnected_from_host(&mut self, cx: &mut ModelContext<Self>) {
         if let Some(ProjectClientState::Remote {
             sharing_has_stopped,
             ..