Add unit test for worktree update messages, fix minor issues

Max Brunsfeld and Nathan Sobo created

Co-Authored-By: Nathan Sobo <nathan@zed.dev>

Change summary

zed-rpc/proto/zed.proto |   7 +
zed/src/workspace.rs    |   2 
zed/src/worktree.rs     | 162 +++++++++++++++++++++++++++++++-----------
3 files changed, 122 insertions(+), 49 deletions(-)

Detailed changes

zed-rpc/proto/zed.proto 🔗

@@ -49,9 +49,10 @@ message OpenWorktree {
 }
 
 message OpenWorktreeResponse {
-    Worktree worktree = 1;
-    uint32 replica_id = 2;
-    repeated Peer peers = 3;
+    uint64 worktree_id = 1;
+    Worktree worktree = 2;
+    uint32 replica_id = 3;
+    repeated Peer peers = 4;
 }
 
 message UpdateWorktree {

zed/src/workspace.rs 🔗

@@ -727,7 +727,7 @@ impl Workspace {
             log::info!("read worktree url from clipboard: {}", worktree_url.text());
 
             let worktree =
-                Worktree::remote(rpc.clone(), worktree_id, access_token, languages, &mut cx)
+                Worktree::open_remote(rpc.clone(), worktree_id, access_token, languages, &mut cx)
                     .await?;
             this.update(&mut cx, |workspace, cx| {
                 cx.observe_model(&worktree, |_, _, cx| cx.notify());

zed/src/worktree.rs 🔗

@@ -109,36 +109,48 @@ impl Worktree {
         Worktree::Local(LocalWorktree::new(path, languages, cx))
     }
 
-    pub async fn remote(
+    pub async fn open_remote(
         rpc: rpc::Client,
         id: u64,
         access_token: String,
         languages: Arc<LanguageRegistry>,
         cx: &mut AsyncAppContext,
     ) -> Result<ModelHandle<Self>> {
-        let open_worktree_response = rpc
+        let response = rpc
             .request(proto::OpenWorktree {
                 worktree_id: id,
                 access_token,
             })
             .await?;
-        let worktree_message = open_worktree_response
+
+        Worktree::remote(response, rpc, languages, cx).await
+    }
+
+    async fn remote(
+        open_response: proto::OpenWorktreeResponse,
+        rpc: rpc::Client,
+        languages: Arc<LanguageRegistry>,
+        cx: &mut AsyncAppContext,
+    ) -> Result<ModelHandle<Self>> {
+        let worktree = open_response
             .worktree
             .ok_or_else(|| anyhow!("empty worktree"))?;
-        let replica_id = open_worktree_response.replica_id as ReplicaId;
-        let peers = open_worktree_response.peers;
-        let root_char_bag: CharBag = worktree_message
+
+        let remote_id = open_response.worktree_id;
+        let replica_id = open_response.replica_id as ReplicaId;
+        let peers = open_response.peers;
+        let root_char_bag: CharBag = worktree
             .root_name
             .chars()
             .map(|c| c.to_ascii_lowercase())
             .collect();
-        let root_name = worktree_message.root_name.clone();
+        let root_name = worktree.root_name.clone();
         let (entries, paths_by_id) = cx
             .background()
             .spawn(async move {
                 let mut paths_by_id = rpds::RedBlackTreeMapSync::default();
                 let mut edits = Vec::new();
-                for entry in worktree_message.entries {
+                for entry in worktree.entries {
                     match Entry::try_from((&root_char_bag, entry)) {
                         Ok(entry) => {
                             paths_by_id.insert_mut(entry.id as usize, (entry.path.clone(), 0));
@@ -197,7 +209,7 @@ impl Worktree {
                 .detach();
 
                 Worktree::Remote(RemoteWorktree {
-                    remote_id: id,
+                    remote_id,
                     replica_id,
                     snapshot,
                     updates_tx,
@@ -215,7 +227,7 @@ impl Worktree {
             .write()
             .await
             .shared_worktrees
-            .insert(id, worktree.downgrade());
+            .insert(open_response.worktree_id, worktree.downgrade());
 
         Ok(worktree)
     }
@@ -795,21 +807,12 @@ impl LocalWorktree {
         rpc: rpc::Client,
         cx: &mut ModelContext<Worktree>,
     ) -> Task<anyhow::Result<(u64, String)>> {
-        let root_name = self.root_name.clone();
         let snapshot = self.snapshot();
+        let share_request = self.share_request(cx);
         let handle = cx.handle();
         cx.spawn(|this, mut cx| async move {
-            let entries = {
-                let entries = snapshot.entries.clone();
-                cx.background()
-                    .spawn(async move { entries.cursor::<(), ()>().map(Into::into).collect() })
-                    .await
-            };
-            let share_response = rpc
-                .request(proto::ShareWorktree {
-                    worktree: Some(proto::Worktree { root_name, entries }),
-                })
-                .await?;
+            let share_request = share_request.await;
+            let share_response = rpc.request(share_request).await?;
 
             rpc.state
                 .write()
@@ -821,29 +824,47 @@ impl LocalWorktree {
             let (snapshots_to_send_tx, snapshots_to_send_rx) =
                 smol::channel::unbounded::<Snapshot>();
 
-            {
-                let rpc = rpc.clone();
-                let worktree_id = share_response.worktree_id;
-                std::thread::spawn(move || {
-                    let mut prev_snapshot = snapshot;
-                    while let Ok(snapshot) = smol::block_on(snapshots_to_send_rx.recv()) {
-                        let message = snapshot.build_update(&prev_snapshot, worktree_id);
-                        match smol::block_on(rpc.send(message)) {
-                            Ok(()) => prev_snapshot = snapshot,
-                            Err(err) => log::error!("error sending snapshot diff {}", err),
+            cx.background()
+                .spawn({
+                    let rpc = rpc.clone();
+                    let worktree_id = share_response.worktree_id;
+                    async move {
+                        let mut prev_snapshot = snapshot;
+                        while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
+                            let message = snapshot.build_update(&prev_snapshot, worktree_id);
+                            match rpc.send(message).await {
+                                Ok(()) => prev_snapshot = snapshot,
+                                Err(err) => log::error!("error sending snapshot diff {}", err),
+                            }
                         }
                     }
-                });
-            }
+                })
+                .detach();
 
             this.update(&mut cx, |worktree, _| {
                 let worktree = worktree.as_local_mut().unwrap();
                 worktree.rpc = Some((rpc, share_response.worktree_id));
                 worktree.snapshots_to_send_tx = Some(snapshots_to_send_tx);
             });
+
             Ok((share_response.worktree_id, share_response.access_token))
         })
     }
+
+    fn share_request(&self, cx: &mut ModelContext<Worktree>) -> Task<proto::ShareWorktree> {
+        let snapshot = self.snapshot();
+        let root_name = self.root_name.clone();
+        cx.background().spawn(async move {
+            let entries = snapshot
+                .entries
+                .cursor::<(), ()>()
+                .map(Into::into)
+                .collect();
+            proto::ShareWorktree {
+                worktree: Some(proto::Worktree { root_name, entries }),
+            }
+        })
+    }
 }
 
 pub fn refresh_buffer(abs_path: PathBuf, cx: &mut ModelContext<Buffer>) {
@@ -1120,6 +1141,9 @@ impl Snapshot {
 
         for entry in update.updated_entries {
             let entry = Entry::try_from((&self.root_char_bag, entry))?;
+            if let Some((path, _)) = self.paths_by_id.get(&entry.id) {
+                edits.push(Edit::Remove(PathKey(path.clone())));
+            }
             self.paths_by_id
                 .insert_mut(entry.id, (entry.path.clone(), scan_id));
             edits.push(Edit::Insert(entry));
@@ -2572,7 +2596,7 @@ mod tests {
     }
 
     #[gpui::test]
-    async fn test_rescan_simple(mut cx: gpui::TestAppContext) {
+    async fn test_rescan_and_remote_updates(mut cx: gpui::TestAppContext) {
         let dir = temp_tree(json!({
             "a": {
                 "file1": "",
@@ -2610,10 +2634,32 @@ mod tests {
         let file3_id = id_for_path("a/file3", &cx);
         let file4_id = id_for_path("b/c/file4", &cx);
 
-        // After scanning, the worktree knows which files exist and which don't.
+        // Wait for the initial scan.
         cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
             .await;
 
+        // Create a remote copy of this worktree.
+        let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
+        let worktree_id = 1;
+        let share_request = tree
+            .update(&mut cx, |tree, cx| {
+                tree.as_local().unwrap().share_request(cx)
+            })
+            .await;
+        let remote = Worktree::remote(
+            proto::OpenWorktreeResponse {
+                worktree_id,
+                worktree: share_request.worktree,
+                replica_id: 1,
+                peers: Vec::new(),
+            },
+            rpc::Client::new(Default::default()),
+            Default::default(),
+            &mut cx.to_async(),
+        )
+        .await
+        .unwrap();
+
         cx.read(|cx| {
             assert!(!buffer2.read(cx).is_dirty());
             assert!(!buffer3.read(cx).is_dirty());
@@ -2621,6 +2667,7 @@ mod tests {
             assert!(!buffer5.read(cx).is_dirty());
         });
 
+        // Rename and delete files and directories.
         tree.flush_fs_events(&cx).await;
         std::fs::rename(dir.path().join("a/file3"), dir.path().join("b/c/file3")).unwrap();
         std::fs::remove_file(dir.path().join("b/c/file5")).unwrap();
@@ -2628,21 +2675,23 @@ mod tests {
         std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap();
         tree.flush_fs_events(&cx).await;
 
+        let expected_paths = vec![
+            "a",
+            "a/file1",
+            "a/file2.new",
+            "b",
+            "d",
+            "d/file3",
+            "d/file4",
+        ];
+
         cx.read(|app| {
             assert_eq!(
                 tree.read(app)
                     .paths()
                     .map(|p| p.to_str().unwrap())
                     .collect::<Vec<_>>(),
-                vec![
-                    "a",
-                    "a/file1",
-                    "a/file2.new",
-                    "b",
-                    "d",
-                    "d/file3",
-                    "d/file4"
-                ]
+                expected_paths
             );
 
             assert_eq!(id_for_path("a/file2.new", &cx), file2_id);
@@ -2671,6 +2720,29 @@ mod tests {
             assert!(!buffer4.read(app).file().unwrap().is_deleted());
             assert!(buffer5.read(app).file().unwrap().is_deleted());
         });
+
+        // Update the remote worktree. Check that it becomes consistent with the
+        // local worktree.
+        remote.update(&mut cx, |remote, cx| {
+            let update_message = tree
+                .read(cx)
+                .snapshot()
+                .build_update(&initial_snapshot, worktree_id);
+            remote
+                .as_remote_mut()
+                .unwrap()
+                .snapshot
+                .apply_update(update_message)
+                .unwrap();
+
+            assert_eq!(
+                remote
+                    .paths()
+                    .map(|p| p.to_str().unwrap())
+                    .collect::<Vec<_>>(),
+                expected_paths
+            );
+        });
     }
 
     #[gpui::test]