Implement rejoining projects as a guest when rejoining a room

Authored by Max Brunsfeld and Julia Risley

Co-authored-by: Julia Risley <julia@zed.dev>

Change summary

crates/collab/migrations.sqlite/20221109000000_test_schema.sql |   2 
crates/collab/src/db.rs                                        | 126 +++
crates/collab/src/db/worktree_entry.rs                         |   2 
crates/collab/src/integration_tests.rs                         | 120 +++
crates/db/src/db.rs                                            | 138 ++-
crates/project/src/project.rs                                  | 139 ++-
crates/workspace/src/persistence.rs                            |  78 +-
7 files changed, 443 insertions(+), 162 deletions(-)

Detailed changes

crates/collab/migrations.sqlite/20221109000000_test_schema.sql

@@ -65,6 +65,7 @@ CREATE INDEX "index_worktrees_on_project_id" ON "worktrees" ("project_id");
 CREATE TABLE "worktree_entries" (
     "project_id" INTEGER NOT NULL,
     "worktree_id" INTEGER NOT NULL,
+    "scan_id" INTEGER NOT NULL,
     "id" INTEGER NOT NULL,
     "is_dir" BOOL NOT NULL,
     "path" VARCHAR NOT NULL,
@@ -73,6 +74,7 @@ CREATE TABLE "worktree_entries" (
     "mtime_nanos" INTEGER NOT NULL,
     "is_symlink" BOOL NOT NULL,
     "is_ignored" BOOL NOT NULL,
+    "is_deleted" BOOL NOT NULL,
     PRIMARY KEY(project_id, worktree_id, id),
     FOREIGN KEY(project_id, worktree_id) REFERENCES worktrees (project_id, id) ON DELETE CASCADE
 );
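
Together, these two columns let the server answer a rejoin without replaying full worktrees: removed entries are tombstoned via is_deleted rather than deleted, and every write records the scan_id that produced it. A rough sketch of the two query shapes this enables (illustrative SQL with :param placeholders; the real code builds the equivalent conditions with SeaORM in db.rs below):

    -- Guest already had this worktree: send only rows that changed since
    -- its last-seen scan, including tombstones it must now remove.
    SELECT * FROM worktree_entries
    WHERE project_id = :project_id
      AND worktree_id = :worktree_id
      AND scan_id > :last_seen_scan_id;

    -- Guest never saw this worktree: send a full snapshot of live entries.
    SELECT * FROM worktree_entries
    WHERE project_id = :project_id
      AND worktree_id = :worktree_id
      AND is_deleted = FALSE;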

crates/collab/src/db.rs

@@ -1453,14 +1453,124 @@ impl Database {
                     .exec(&*tx)
                     .await?;
 
-                // TODO: handle left projects
+                let mut rejoined_projects = Vec::new();
+                for rejoined_project in &rejoin_room.rejoined_projects {
+                    let project_id = ProjectId::from_proto(rejoined_project.id);
+                    let Some(project) = project::Entity::find_by_id(project_id)
+                        .one(&*tx)
+                        .await? else {
+                            continue
+                        };
+
+                    let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
+                    let mut worktrees = Vec::new();
+                    for db_worktree in db_worktrees {
+                        let mut worktree = RejoinedWorktree {
+                            id: db_worktree.id as u64,
+                            abs_path: db_worktree.abs_path,
+                            root_name: db_worktree.root_name,
+                            visible: db_worktree.visible,
+                            updated_entries: Default::default(),
+                            removed_entries: Default::default(),
+                            diagnostic_summaries: Default::default(),
+                            scan_id: db_worktree.scan_id as u64,
+                            is_complete: db_worktree.is_complete,
+                        };
+
+                        let rejoined_worktree = rejoined_project
+                            .worktrees
+                            .iter()
+                            .find(|worktree| worktree.id == db_worktree.id as u64);
+
+                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
+                            Condition::all()
+                                .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
+                                .add(worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id))
+                        } else {
+                            Condition::all()
+                                .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
+                                .add(worktree_entry::Column::IsDeleted.eq(false))
+                        };
+
+                        let mut db_entries = worktree_entry::Entity::find()
+                            .filter(entry_filter)
+                            .stream(&*tx)
+                            .await?;
+
+                        while let Some(db_entry) = db_entries.next().await {
+                            let db_entry = db_entry?;
+
+                            if db_entry.is_deleted {
+                                worktree.removed_entries.push(db_entry.id as u64);
+                            } else {
+                                worktree.updated_entries.push(proto::Entry {
+                                    id: db_entry.id as u64,
+                                    is_dir: db_entry.is_dir,
+                                    path: db_entry.path,
+                                    inode: db_entry.inode as u64,
+                                    mtime: Some(proto::Timestamp {
+                                        seconds: db_entry.mtime_seconds as u64,
+                                        nanos: db_entry.mtime_nanos as u32,
+                                    }),
+                                    is_symlink: db_entry.is_symlink,
+                                    is_ignored: db_entry.is_ignored,
+                                });
+                            }
+                        }
+
+                        worktrees.push(worktree);
+                    }
+
+                    let language_servers = project
+                        .find_related(language_server::Entity)
+                        .all(&*tx)
+                        .await?
+                        .into_iter()
+                        .map(|language_server| proto::LanguageServer {
+                            id: language_server.id as u64,
+                            name: language_server.name,
+                        })
+                        .collect::<Vec<_>>();
+
+                    let mut collaborators = project
+                        .find_related(project_collaborator::Entity)
+                        .all(&*tx)
+                        .await?
+                        .into_iter()
+                        .map(|collaborator| ProjectCollaborator {
+                            connection_id: collaborator.connection(),
+                            user_id: collaborator.user_id,
+                            replica_id: collaborator.replica_id,
+                            is_host: collaborator.is_host,
+                        })
+                        .collect::<Vec<_>>();
+
+                    let old_connection_id;
+                    if let Some(self_collaborator_ix) = collaborators
+                        .iter()
+                        .position(|collaborator| collaborator.user_id == user_id)
+                    {
+                        let self_collaborator = collaborators.swap_remove(self_collaborator_ix);
+                        old_connection_id = self_collaborator.connection_id;
+                    } else {
+                        continue;
+                    }
+
+                    rejoined_projects.push(RejoinedProject {
+                        id: project_id,
+                        old_connection_id,
+                        collaborators,
+                        worktrees,
+                        language_servers,
+                    });
+                }
+
                 let room = self.get_room(room_id, &tx).await?;
                 Ok((
                     room_id,
                     RejoinedRoom {
                         room,
-                        // TODO: handle rejoined projects
-                        rejoined_projects: Default::default(),
+                        rejoined_projects,
                         reshared_projects,
                     },
                 ))
@@ -2079,6 +2189,8 @@ impl Database {
                         mtime_nanos: ActiveValue::set(mtime.nanos as i32),
                         is_symlink: ActiveValue::set(entry.is_symlink),
                         is_ignored: ActiveValue::set(entry.is_ignored),
+                        is_deleted: ActiveValue::set(false),
+                        scan_id: ActiveValue::set(update.scan_id as i64),
                     }
                 }))
                 .on_conflict(
@@ -2103,7 +2215,7 @@ impl Database {
             }
 
             if !update.removed_entries.is_empty() {
-                worktree_entry::Entity::delete_many()
+                worktree_entry::Entity::update_many()
                     .filter(
                         worktree_entry::Column::ProjectId
                             .eq(project_id)
@@ -2113,6 +2225,11 @@ impl Database {
                                     .is_in(update.removed_entries.iter().map(|id| *id as i64)),
                             ),
                     )
+                    .set(worktree_entry::ActiveModel {
+                        is_deleted: ActiveValue::Set(true),
+                        scan_id: ActiveValue::Set(update.scan_id as i64),
+                        ..Default::default()
+                    })
                     .exec(&*tx)
                     .await?;
             }
@@ -2935,6 +3052,7 @@ pub struct RejoinedProject {
     pub language_servers: Vec<proto::LanguageServer>,
 }
 
+#[derive(Debug)]
 pub struct RejoinedWorktree {
     pub id: u64,
     pub abs_path: String,
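
The write side mirrors those rejoin queries: removing entries becomes an UPDATE that stamps tombstones with the current scan_id (the delete_many-to-update_many change above), while the entry upsert sets is_deleted back to FALSE, so a path that reappears resurrects its tombstone. Roughly, as a sketch with illustrative :param placeholders (the actual statement is generated by SeaORM's update_many):

    UPDATE worktree_entries
    SET is_deleted = TRUE,
        scan_id = :scan_id
    WHERE project_id = :project_id
      AND id IN (:removed_entry_ids);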

crates/collab/src/db/worktree_entry.rs

@@ -17,6 +17,8 @@ pub struct Model {
     pub mtime_nanos: i32,
     pub is_symlink: bool,
     pub is_ignored: bool,
+    pub is_deleted: bool,
+    pub scan_id: i64,
 }
 
 #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

crates/collab/src/integration_tests.rs

@@ -1307,7 +1307,7 @@ async fn test_host_disconnect(
 }
 
 #[gpui::test(iterations = 10)]
-async fn test_host_reconnect(
+async fn test_project_reconnect(
     deterministic: Arc<Deterministic>,
     cx_a: &mut TestAppContext,
     cx_b: &mut TestAppContext,
@@ -1336,9 +1336,12 @@ async fn test_host_reconnect(
                     }
                 },
                 "dir2": {
-                    "x": "x-contents",
-                    "y": "y-contents",
-                    "z": "z-contents",
+                    "x.txt": "x-contents",
+                    "y.txt": "y-contents",
+                    "z.txt": "z-contents",
+                },
+                "dir3": {
+                    "w.txt": "w-contents",
                 },
             }),
         )
@@ -1348,7 +1351,16 @@ async fn test_host_reconnect(
         .insert_tree(
             "/root-2",
             json!({
-                "1.txt": "1-contents",
+                "2.txt": "2-contents",
+            }),
+        )
+        .await;
+    client_a
+        .fs
+        .insert_tree(
+            "/root-3",
+            json!({
+                "3.txt": "3-contents",
             }),
         )
         .await;
@@ -1356,6 +1368,7 @@ async fn test_host_reconnect(
     let active_call_a = cx_a.read(ActiveCall::global);
     let (project_a1, _) = client_a.build_local_project("/root-1/dir1", cx_a).await;
     let (project_a2, _) = client_a.build_local_project("/root-2", cx_a).await;
+    let (project_a3, _) = client_a.build_local_project("/root-3", cx_a).await;
     let worktree_a1 =
         project_a1.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
     let project1_id = active_call_a
@@ -1366,9 +1379,14 @@ async fn test_host_reconnect(
         .update(cx_a, |call, cx| call.share_project(project_a2.clone(), cx))
         .await
         .unwrap();
+    let project3_id = active_call_a
+        .update(cx_a, |call, cx| call.share_project(project_a3.clone(), cx))
+        .await
+        .unwrap();
 
     let project_b1 = client_b.build_remote_project(project1_id, cx_b).await;
     let project_b2 = client_b.build_remote_project(project2_id, cx_b).await;
+    let project_b3 = client_b.build_remote_project(project3_id, cx_b).await;
     deterministic.run_until_parked();
 
     let worktree1_id = worktree_a1.read_with(cx_a, |worktree, _| {
@@ -1473,7 +1491,7 @@ async fn test_host_reconnect(
                 .paths()
                 .map(|p| p.to_str().unwrap())
                 .collect::<Vec<_>>(),
-            vec!["x", "y", "z"]
+            vec!["x.txt", "y.txt", "z.txt"]
         );
     });
     project_b1.read_with(cx_b, |project, cx| {
@@ -1510,10 +1528,98 @@ async fn test_host_reconnect(
                 .paths()
                 .map(|p| p.to_str().unwrap())
                 .collect::<Vec<_>>(),
-            vec!["x", "y", "z"]
+            vec!["x.txt", "y.txt", "z.txt"]
         );
     });
     project_b2.read_with(cx_b, |project, _| assert!(project.is_read_only()));
+    project_b3.read_with(cx_b, |project, _| assert!(!project.is_read_only()));
+
+    // Drop client B's connection.
+    server.forbid_connections();
+    server.disconnect_client(client_b.peer_id().unwrap());
+    deterministic.advance_clock(RECEIVE_TIMEOUT);
+
+    // While client B is disconnected, add and remove files from client A's project
+    client_a
+        .fs
+        .insert_file("/root-1/dir1/subdir2/j.txt", "j-contents".into())
+        .await;
+    client_a
+        .fs
+        .remove_file("/root-1/dir1/subdir2/i.txt".as_ref(), Default::default())
+        .await
+        .unwrap();
+
+    // While client B is disconnected, add and remove worktrees from client A's project.
+    let (worktree_a3, _) = project_a1
+        .update(cx_a, |p, cx| {
+            p.find_or_create_local_worktree("/root-1/dir3", true, cx)
+        })
+        .await
+        .unwrap();
+    worktree_a3
+        .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
+        .await;
+    let worktree3_id = worktree_a3.read_with(cx_a, |tree, _| {
+        assert!(tree.as_local().unwrap().is_shared());
+        tree.id()
+    });
+    project_a1
+        .update(cx_a, |project, cx| {
+            project.remove_worktree(worktree2_id, cx)
+        })
+        .await;
+    deterministic.run_until_parked();
+
+    // While disconnected, close project 3
+    cx_a.update(|_| drop(project_a3));
+
+    // Client B reconnects. They re-join the room and the remaining shared project.
+    server.allow_connections();
+    client_b
+        .authenticate_and_connect(false, &cx_b.to_async())
+        .await
+        .unwrap();
+    deterministic.run_until_parked();
+    project_b1.read_with(cx_b, |project, cx| {
+        assert!(!project.is_read_only());
+        assert_eq!(
+            project
+                .worktree_for_id(worktree1_id, cx)
+                .unwrap()
+                .read(cx)
+                .snapshot()
+                .paths()
+                .map(|p| p.to_str().unwrap())
+                .collect::<Vec<_>>(),
+            vec![
+                "a.txt",
+                "b.txt",
+                "subdir1",
+                "subdir1/c.txt",
+                "subdir1/d.txt",
+                "subdir1/e.txt",
+                "subdir2",
+                "subdir2/f.txt",
+                "subdir2/g.txt",
+                "subdir2/h.txt",
+                "subdir2/j.txt"
+            ]
+        );
+        assert!(project.worktree_for_id(worktree2_id, cx).is_none());
+        assert_eq!(
+            project
+                .worktree_for_id(worktree3_id, cx)
+                .unwrap()
+                .read(cx)
+                .snapshot()
+                .paths()
+                .map(|p| p.to_str().unwrap())
+                .collect::<Vec<_>>(),
+            vec!["w.txt"]
+        );
+    });
+    project_b3.read_with(cx_b, |project, _| assert!(project.is_read_only()));
 }
 
 #[gpui::test(iterations = 10)]

crates/db/src/db.rs

@@ -20,8 +20,8 @@ use std::fs::create_dir_all;
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::{SystemTime, UNIX_EPOCH};
-use util::{async_iife, ResultExt};
 use util::channel::ReleaseChannel;
+use util::{async_iife, ResultExt};
 
 const CONNECTION_INITIALIZE_QUERY: &'static str = sql!(
     PRAGMA foreign_keys=TRUE;
@@ -41,14 +41,17 @@ const DB_FILE_NAME: &'static str = "db.sqlite";
 lazy_static::lazy_static! {
     static ref DB_FILE_OPERATIONS: Mutex<()> = Mutex::new(());
     pub static ref BACKUP_DB_PATH: RwLock<Option<PathBuf>> = RwLock::new(None);
-    pub static ref ALL_FILE_DB_FAILED: AtomicBool = AtomicBool::new(false);    
+    pub static ref ALL_FILE_DB_FAILED: AtomicBool = AtomicBool::new(false);
 }
 
 /// Open or create a database at the given directory path.
 /// This will retry a couple times if there are failures. If opening fails once, the db directory
 /// is moved to a backup folder and a new one is created. If that fails, a shared in memory db is created.
 /// In either case, static variables are set so that the user can be notified.
-pub async fn open_db<M: Migrator + 'static>(db_dir: &Path, release_channel: &ReleaseChannel) -> ThreadSafeConnection<M> {
+pub async fn open_db<M: Migrator + 'static>(
+    db_dir: &Path,
+    release_channel: &ReleaseChannel,
+) -> ThreadSafeConnection<M> {
     let release_channel_name = release_channel.dev_name();
     let main_db_dir = db_dir.join(Path::new(&format!("0-{}", release_channel_name)));
 
@@ -117,10 +120,10 @@ pub async fn open_db<M: Migrator + 'static>(db_dir: &Path, release_channel: &Rel
     if let Some(connection) = connection {
         return connection;
     }
-   
+
     // Set another static ref so that we can escalate the notification
     ALL_FILE_DB_FAILED.store(true, Ordering::Release);
-    
+
     // If still failed, create an in memory db with a known name
     open_fallback_db().await
 }
@@ -174,15 +177,15 @@ macro_rules! define_connection {
                 &self.0
             }
         }
-        
+
         impl $crate::sqlez::domain::Domain for $t {
             fn name() -> &'static str {
                 stringify!($t)
             }
-            
+
             fn migrations() -> &'static [&'static str] {
                 $migrations
-            } 
+            }
         }
 
         #[cfg(any(test, feature = "test-support"))]
@@ -205,15 +208,15 @@ macro_rules! define_connection {
                 &self.0
             }
         }
-        
+
         impl $crate::sqlez::domain::Domain for $t {
             fn name() -> &'static str {
                 stringify!($t)
             }
-            
+
             fn migrations() -> &'static [&'static str] {
                 $migrations
-            } 
+            }
         }
 
         #[cfg(any(test, feature = "test-support"))]
@@ -232,134 +235,157 @@ macro_rules! define_connection {
 mod tests {
     use std::{fs, thread};
 
-    use sqlez::{domain::Domain, connection::Connection};
+    use sqlez::{connection::Connection, domain::Domain};
     use sqlez_macros::sql;
     use tempdir::TempDir;
 
     use crate::{open_db, DB_FILE_NAME};
-        
+
     // Test bad migration panics
     #[gpui::test]
     #[should_panic]
     async fn test_bad_migration_panics() {
         enum BadDB {}
-        
+
         impl Domain for BadDB {
             fn name() -> &'static str {
                 "db_tests"
             }
-            
+
             fn migrations() -> &'static [&'static str] {
-                &[sql!(CREATE TABLE test(value);),
+                &[
+                    sql!(CREATE TABLE test(value);),
                     // failure because test already exists
-                  sql!(CREATE TABLE test(value);)]
+                    sql!(CREATE TABLE test(value);),
+                ]
             }
         }
-       
+
         let tempdir = TempDir::new("DbTests").unwrap();
         let _bad_db = open_db::<BadDB>(tempdir.path(), &util::channel::ReleaseChannel::Dev).await;
     }
-    
+
     /// Test that DB exists but corrupted (causing recreate)
     #[gpui::test]
     async fn test_db_corruption() {
         enum CorruptedDB {}
-        
+
         impl Domain for CorruptedDB {
             fn name() -> &'static str {
                 "db_tests"
             }
-            
+
             fn migrations() -> &'static [&'static str] {
                 &[sql!(CREATE TABLE test(value);)]
             }
         }
-        
+
         enum GoodDB {}
-        
+
         impl Domain for GoodDB {
             fn name() -> &'static str {
                 "db_tests" //Notice same name
             }
-            
+
             fn migrations() -> &'static [&'static str] {
                 &[sql!(CREATE TABLE test2(value);)] //But different migration
             }
         }
-       
+
         let tempdir = TempDir::new("DbTests").unwrap();
         {
-            let corrupt_db = open_db::<CorruptedDB>(tempdir.path(), &util::channel::ReleaseChannel::Dev).await;
+            let corrupt_db =
+                open_db::<CorruptedDB>(tempdir.path(), &util::channel::ReleaseChannel::Dev).await;
             assert!(corrupt_db.persistent());
         }
-        
-        
+
         let good_db = open_db::<GoodDB>(tempdir.path(), &util::channel::ReleaseChannel::Dev).await;
-        assert!(good_db.select_row::<usize>("SELECT * FROM test2").unwrap()().unwrap().is_none());
-        
-        let mut corrupted_backup_dir = fs::read_dir(
-            tempdir.path()
-        ).unwrap().find(|entry| {
-            !entry.as_ref().unwrap().file_name().to_str().unwrap().starts_with("0")
-        }
-        ).unwrap().unwrap().path();
+        assert!(
+            good_db.select_row::<usize>("SELECT * FROM test2").unwrap()()
+                .unwrap()
+                .is_none()
+        );
+
+        let mut corrupted_backup_dir = fs::read_dir(tempdir.path())
+            .unwrap()
+            .find(|entry| {
+                !entry
+                    .as_ref()
+                    .unwrap()
+                    .file_name()
+                    .to_str()
+                    .unwrap()
+                    .starts_with("0")
+            })
+            .unwrap()
+            .unwrap()
+            .path();
         corrupted_backup_dir.push(DB_FILE_NAME);
-        
+
         dbg!(&corrupted_backup_dir);
-        
+
         let backup = Connection::open_file(&corrupted_backup_dir.to_string_lossy());
-        assert!(backup.select_row::<usize>("SELECT * FROM test").unwrap()().unwrap().is_none());
+        assert!(backup.select_row::<usize>("SELECT * FROM test").unwrap()()
+            .unwrap()
+            .is_none());
     }
-    
+
     /// Test that DB exists but corrupted (causing recreate)
     #[gpui::test]
     async fn test_simultaneous_db_corruption() {
         enum CorruptedDB {}
-        
+
         impl Domain for CorruptedDB {
             fn name() -> &'static str {
                 "db_tests"
             }
-            
+
             fn migrations() -> &'static [&'static str] {
                 &[sql!(CREATE TABLE test(value);)]
             }
         }
-        
+
         enum GoodDB {}
-        
+
         impl Domain for GoodDB {
             fn name() -> &'static str {
                 "db_tests" //Notice same name
             }
-            
+
             fn migrations() -> &'static [&'static str] {
                 &[sql!(CREATE TABLE test2(value);)] //But different migration
             }
         }
-       
+
         let tempdir = TempDir::new("DbTests").unwrap();
         {
             // Setup the bad database
-            let corrupt_db = open_db::<CorruptedDB>(tempdir.path(), &util::channel::ReleaseChannel::Dev).await;
+            let corrupt_db =
+                open_db::<CorruptedDB>(tempdir.path(), &util::channel::ReleaseChannel::Dev).await;
             assert!(corrupt_db.persistent());
         }
-        
+
         // Try to connect to it a bunch of times at once
         let mut guards = vec![];
         for _ in 0..10 {
             let tmp_path = tempdir.path().to_path_buf();
             let guard = thread::spawn(move || {
-                let good_db = smol::block_on(open_db::<GoodDB>(tmp_path.as_path(), &util::channel::ReleaseChannel::Dev));
-                assert!(good_db.select_row::<usize>("SELECT * FROM test2").unwrap()().unwrap().is_none());
+                let good_db = smol::block_on(open_db::<GoodDB>(
+                    tmp_path.as_path(),
+                    &util::channel::ReleaseChannel::Dev,
+                ));
+                assert!(
+                    good_db.select_row::<usize>("SELECT * FROM test2").unwrap()()
+                        .unwrap()
+                        .is_none()
+                );
             });
-            
+
             guards.push(guard);
-        
         }
-        
-       for guard in guards.into_iter() {
-           assert!(guard.join().is_ok());
-       }
+
+        for guard in guards.into_iter() {
+            assert!(guard.join().is_ok());
+        }
     }
 }

crates/project/src/project.rs

@@ -1088,7 +1088,26 @@ impl Project {
         message: proto::RejoinedProject,
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
+        self.set_worktrees_from_proto(message.worktrees, cx)?;
         self.set_collaborators_from_proto(message.collaborators, cx)?;
+
+        self.language_server_statuses = message
+            .language_servers
+            .into_iter()
+            .map(|server| {
+                (
+                    server.id as usize,
+                    LanguageServerStatus {
+                        name: server.name,
+                        pending_work: Default::default(),
+                        has_pending_diagnostic_updates: false,
+                        progress_tokens: Default::default(),
+                    },
+                )
+            })
+            .collect();
+
+        cx.notify();
         Ok(())
     }
 
@@ -4647,39 +4666,11 @@ impl Project {
     async fn handle_update_project(
         this: ModelHandle<Self>,
         envelope: TypedEnvelope<proto::UpdateProject>,
-        client: Arc<Client>,
+        _: Arc<Client>,
         mut cx: AsyncAppContext,
     ) -> Result<()> {
         this.update(&mut cx, |this, cx| {
-            let replica_id = this.replica_id();
-            let remote_id = this.remote_id().ok_or_else(|| anyhow!("invalid project"))?;
-
-            let mut old_worktrees_by_id = this
-                .worktrees
-                .drain(..)
-                .filter_map(|worktree| {
-                    let worktree = worktree.upgrade(cx)?;
-                    Some((worktree.read(cx).id(), worktree))
-                })
-                .collect::<HashMap<_, _>>();
-
-            for worktree in envelope.payload.worktrees {
-                if let Some(old_worktree) =
-                    old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
-                {
-                    this.worktrees.push(WorktreeHandle::Strong(old_worktree));
-                } else {
-                    let worktree =
-                        Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx);
-                    let _ = this.add_worktree(&worktree, cx);
-                }
-            }
-
-            let _ = this.metadata_changed(cx);
-            for (id, _) in old_worktrees_by_id {
-                cx.emit(Event::WorktreeRemoved(id));
-            }
-
+            this.set_worktrees_from_proto(envelope.payload.worktrees, cx)?;
             Ok(())
         })
     }
@@ -4871,14 +4862,15 @@ impl Project {
         _: Arc<Client>,
         mut cx: AsyncAppContext,
     ) -> Result<()> {
-        let language_server_id = envelope.payload.language_server_id as usize;
-        match envelope
-            .payload
-            .variant
-            .ok_or_else(|| anyhow!("invalid variant"))?
-        {
-            proto::update_language_server::Variant::WorkStart(payload) => {
-                this.update(&mut cx, |this, cx| {
+        this.update(&mut cx, |this, cx| {
+            let language_server_id = envelope.payload.language_server_id as usize;
+
+            match envelope
+                .payload
+                .variant
+                .ok_or_else(|| anyhow!("invalid variant"))?
+            {
+                proto::update_language_server::Variant::WorkStart(payload) => {
                     this.on_lsp_work_start(
                         language_server_id,
                         payload.token,
@@ -4889,10 +4881,9 @@ impl Project {
                         },
                         cx,
                     );
-                })
-            }
-            proto::update_language_server::Variant::WorkProgress(payload) => {
-                this.update(&mut cx, |this, cx| {
+                }
+
+                proto::update_language_server::Variant::WorkProgress(payload) => {
                     this.on_lsp_work_progress(
                         language_server_id,
                         payload.token,
@@ -4903,26 +4894,23 @@ impl Project {
                         },
                         cx,
                     );
-                })
-            }
-            proto::update_language_server::Variant::WorkEnd(payload) => {
-                this.update(&mut cx, |this, cx| {
+                }
+
+                proto::update_language_server::Variant::WorkEnd(payload) => {
                     this.on_lsp_work_end(language_server_id, payload.token, cx);
-                })
-            }
-            proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(_) => {
-                this.update(&mut cx, |this, cx| {
+                }
+
+                proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(_) => {
                     this.disk_based_diagnostics_started(language_server_id, cx);
-                })
-            }
-            proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(_) => {
-                this.update(&mut cx, |this, cx| {
+                }
+
+                proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(_) => {
                     this.disk_based_diagnostics_finished(language_server_id, cx)
-                });
+                }
             }
-        }
 
-        Ok(())
+            Ok(())
+        })
     }
 
     async fn handle_update_buffer(
@@ -5638,6 +5626,43 @@ impl Project {
         })
     }
 
+    fn set_worktrees_from_proto(
+        &mut self,
+        worktrees: Vec<proto::WorktreeMetadata>,
+        cx: &mut ModelContext<Project>,
+    ) -> Result<()> {
+        let replica_id = self.replica_id();
+        let remote_id = self.remote_id().ok_or_else(|| anyhow!("invalid project"))?;
+
+        let mut old_worktrees_by_id = self
+            .worktrees
+            .drain(..)
+            .filter_map(|worktree| {
+                let worktree = worktree.upgrade(cx)?;
+                Some((worktree.read(cx).id(), worktree))
+            })
+            .collect::<HashMap<_, _>>();
+
+        for worktree in worktrees {
+            if let Some(old_worktree) =
+                old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
+            {
+                self.worktrees.push(WorktreeHandle::Strong(old_worktree));
+            } else {
+                let worktree =
+                    Worktree::remote(remote_id, replica_id, worktree, self.client.clone(), cx);
+                let _ = self.add_worktree(&worktree, cx);
+            }
+        }
+
+        let _ = self.metadata_changed(cx);
+        for (id, _) in old_worktrees_by_id {
+            cx.emit(Event::WorktreeRemoved(id));
+        }
+
+        Ok(())
+    }
+
     fn set_collaborators_from_proto(
         &mut self,
         messages: Vec<proto::Collaborator>,

crates/workspace/src/persistence.rs

@@ -8,7 +8,7 @@ use anyhow::{anyhow, bail, Context, Result};
 use db::{define_connection, query, sqlez::connection::Connection, sqlez_macros::sql};
 use gpui::Axis;
 
-use util::{ unzip_option, ResultExt};
+use util::{unzip_option, ResultExt};
 
 use crate::dock::DockPosition;
 use crate::WorkspaceId;
@@ -31,7 +31,7 @@ define_connection! {
                 timestamp TEXT DEFAULT CURRENT_TIMESTAMP NOT NULL,
                 FOREIGN KEY(dock_pane) REFERENCES panes(pane_id)
             ) STRICT;
-            
+
             CREATE TABLE pane_groups(
                 group_id INTEGER PRIMARY KEY,
                 workspace_id INTEGER NOT NULL,
@@ -43,7 +43,7 @@ define_connection! {
                 ON UPDATE CASCADE,
                 FOREIGN KEY(parent_group_id) REFERENCES pane_groups(group_id) ON DELETE CASCADE
             ) STRICT;
-            
+
             CREATE TABLE panes(
                 pane_id INTEGER PRIMARY KEY,
                 workspace_id INTEGER NOT NULL,
@@ -52,7 +52,7 @@ define_connection! {
                 ON DELETE CASCADE
                 ON UPDATE CASCADE
             ) STRICT;
-            
+
             CREATE TABLE center_panes(
                 pane_id INTEGER PRIMARY KEY,
                 parent_group_id INTEGER, // NULL means that this is a root pane
@@ -61,7 +61,7 @@ define_connection! {
                 ON DELETE CASCADE,
                 FOREIGN KEY(parent_group_id) REFERENCES pane_groups(group_id) ON DELETE CASCADE
             ) STRICT;
-            
+
             CREATE TABLE items(
                 item_id INTEGER NOT NULL, // This is the item's view id, so this is not unique
                 workspace_id INTEGER NOT NULL,
@@ -119,7 +119,7 @@ impl WorkspaceDb {
                 .context("Getting center group")
                 .log_err()?,
             dock_position,
-            left_sidebar_open
+            left_sidebar_open,
         })
     }
 
@@ -158,7 +158,12 @@ impl WorkspaceDb {
                             dock_visible = ?4,
                             dock_anchor = ?5,
                             timestamp = CURRENT_TIMESTAMP
-                ))?((workspace.id, &workspace.location, workspace.left_sidebar_open, workspace.dock_position))
+                ))?((
+                    workspace.id,
+                    &workspace.location,
+                    workspace.left_sidebar_open,
+                    workspace.dock_position,
+                ))
                 .context("Updating workspace")?;
 
                 // Save center pane group and dock pane
@@ -191,10 +196,10 @@ impl WorkspaceDb {
 
     query! {
         pub fn recent_workspaces(limit: usize) -> Result<Vec<(WorkspaceId, WorkspaceLocation)>> {
-            SELECT workspace_id, workspace_location 
+            SELECT workspace_id, workspace_location
             FROM workspaces
             WHERE workspace_location IS NOT NULL
-            ORDER BY timestamp DESC 
+            ORDER BY timestamp DESC
             LIMIT ?
         }
     }
@@ -210,10 +215,16 @@ impl WorkspaceDb {
     }
 
     fn get_center_pane_group(&self, workspace_id: WorkspaceId) -> Result<SerializedPaneGroup> {
-        Ok(self.get_pane_group(workspace_id, None)?
+        Ok(self
+            .get_pane_group(workspace_id, None)?
             .into_iter()
             .next()
-            .unwrap_or_else(|| SerializedPaneGroup::Pane(SerializedPane { active: true, children: vec![] })))
+            .unwrap_or_else(|| {
+                SerializedPaneGroup::Pane(SerializedPane {
+                    active: true,
+                    children: vec![],
+                })
+            }))
     }
 
     fn get_pane_group(
@@ -225,7 +236,7 @@ impl WorkspaceDb {
         type GroupOrPane = (Option<GroupId>, Option<Axis>, Option<PaneId>, Option<bool>);
         self.select_bound::<GroupKey, GroupOrPane>(sql!(
             SELECT group_id, axis, pane_id, active
-                FROM (SELECT 
+                FROM (SELECT
                         group_id,
                         axis,
                         NULL as pane_id,
@@ -233,18 +244,18 @@ impl WorkspaceDb {
                         position,
                         parent_group_id,
                         workspace_id
-                      FROM pane_groups 
+                      FROM pane_groups
                      UNION
-                      SELECT 
+                      SELECT
+                        NULL,
                         NULL,
-                        NULL,  
                         center_panes.pane_id,
                         panes.active as active,
                         position,
                         parent_group_id,
                         panes.workspace_id as workspace_id
                       FROM center_panes
-                      JOIN panes ON center_panes.pane_id = panes.pane_id) 
+                      JOIN panes ON center_panes.pane_id = panes.pane_id)
             WHERE parent_group_id IS ? AND workspace_id = ?
             ORDER BY position
         ))?((group_id, workspace_id))?
@@ -267,13 +278,12 @@ impl WorkspaceDb {
         // Filter out panes and pane groups which don't have any children or items
         .filter(|pane_group| match pane_group {
             Ok(SerializedPaneGroup::Group { children, .. }) => !children.is_empty(),
-            Ok(SerializedPaneGroup::Pane(pane)) => !pane.children.is_empty(), 
+            Ok(SerializedPaneGroup::Pane(pane)) => !pane.children.is_empty(),
             _ => true,
         })
         .collect::<Result<_>>()
     }
 
-   
     fn save_pane_group(
         conn: &Connection,
         workspace_id: WorkspaceId,
@@ -285,15 +295,10 @@ impl WorkspaceDb {
                 let (parent_id, position) = unzip_option(parent);
 
                 let group_id = conn.select_row_bound::<_, i64>(sql!(
-                        INSERT INTO pane_groups(workspace_id, parent_group_id, position, axis) 
-                        VALUES (?, ?, ?, ?) 
+                        INSERT INTO pane_groups(workspace_id, parent_group_id, position, axis)
+                        VALUES (?, ?, ?, ?)
                         RETURNING group_id
-                ))?((
-                    workspace_id,
-                    parent_id,
-                    position,
-                    *axis,
-                ))?
+                ))?((workspace_id, parent_id, position, *axis))?
                 .ok_or_else(|| anyhow!("Couldn't retrieve group_id from inserted pane_group"))?;
 
                 for (position, group) in children.iter().enumerate() {
@@ -314,9 +319,7 @@ impl WorkspaceDb {
             SELECT pane_id, active
             FROM panes
             WHERE pane_id = (SELECT dock_pane FROM workspaces WHERE workspace_id = ?)
-        ))?(
-            workspace_id,
-        )?
+        ))?(workspace_id)?
         .context("No dock pane for workspace")?;
 
         Ok(SerializedPane::new(
@@ -333,8 +336,8 @@ impl WorkspaceDb {
         dock: bool,
     ) -> Result<PaneId> {
         let pane_id = conn.select_row_bound::<_, i64>(sql!(
-            INSERT INTO panes(workspace_id, active) 
-            VALUES (?, ?) 
+            INSERT INTO panes(workspace_id, active)
+            VALUES (?, ?)
             RETURNING pane_id
         ))?((workspace_id, pane.active))?
         .ok_or_else(|| anyhow!("Could not retrieve inserted pane_id"))?;
@@ -376,14 +379,13 @@ impl WorkspaceDb {
         Ok(())
     }
 
-    query!{
+    query! {
         pub async fn update_timestamp(workspace_id: WorkspaceId) -> Result<()> {
             UPDATE workspaces
             SET timestamp = CURRENT_TIMESTAMP
             WHERE workspace_id = ?
         }
     }
-    
 }
 
 #[cfg(test)]
@@ -472,7 +474,7 @@ mod tests {
             dock_position: crate::dock::DockPosition::Shown(DockAnchor::Bottom),
             center_group: Default::default(),
             dock_pane: Default::default(),
-            left_sidebar_open: true
+            left_sidebar_open: true,
         };
 
         let mut workspace_2 = SerializedWorkspace {
@@ -481,7 +483,7 @@ mod tests {
             dock_position: crate::dock::DockPosition::Hidden(DockAnchor::Expanded),
             center_group: Default::default(),
             dock_pane: Default::default(),
-            left_sidebar_open: false
+            left_sidebar_open: false,
         };
 
         db.save_workspace(workspace_1.clone()).await;
@@ -587,7 +589,7 @@ mod tests {
             dock_position: DockPosition::Shown(DockAnchor::Bottom),
             center_group,
             dock_pane,
-            left_sidebar_open: true
+            left_sidebar_open: true,
         };
 
         db.save_workspace(workspace.clone()).await;
@@ -660,7 +662,7 @@ mod tests {
             dock_position: DockPosition::Shown(DockAnchor::Right),
             center_group: Default::default(),
             dock_pane: Default::default(),
-            left_sidebar_open: false
+            left_sidebar_open: false,
         };
 
         db.save_workspace(workspace_3.clone()).await;
@@ -695,7 +697,7 @@ mod tests {
             dock_position: crate::dock::DockPosition::Hidden(DockAnchor::Right),
             center_group: center_group.clone(),
             dock_pane,
-            left_sidebar_open: true
+            left_sidebar_open: true,
         }
     }