Add remote worktree to project before it is fully deserialized

Antonio Scandurra created

This prevents a race condition where the host will send us messages and
responses about a worktree that we have seen but haven't yet finished loading.

Change summary

crates/project/src/project.rs  |  38 ++++------
crates/project/src/worktree.rs | 124 ++++++++++++++++++-----------------
2 files changed, 82 insertions(+), 80 deletions(-)

Detailed changes

crates/project/src/project.rs 🔗

@@ -247,8 +247,10 @@ impl Project {
 
         let mut worktrees = Vec::new();
         for worktree in response.worktrees {
-            worktrees
-                .push(Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx).await?);
+            let (worktree, load_task) = cx
+                .update(|cx| Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx));
+            worktrees.push(worktree);
+            load_task.detach();
         }
 
         let user_ids = response
@@ -1464,16 +1466,9 @@ impl Project {
             .payload
             .worktree
             .ok_or_else(|| anyhow!("invalid worktree"))?;
-        cx.spawn(|this, mut cx| {
-            async move {
-                let worktree =
-                    Worktree::remote(remote_id, replica_id, worktree, client, &mut cx).await?;
-                this.update(&mut cx, |this, cx| this.add_worktree(&worktree, cx));
-                Ok(())
-            }
-            .log_err()
-        })
-        .detach();
+        let (worktree, load_task) = Worktree::remote(remote_id, replica_id, worktree, client, cx);
+        self.add_worktree(&worktree, cx);
+        load_task.detach();
         Ok(())
     }
 
@@ -2551,15 +2546,16 @@ mod tests {
 
         // Create a remote copy of this worktree.
         let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
-        let remote = Worktree::remote(
-            1,
-            1,
-            initial_snapshot.to_proto(&Default::default(), Default::default()),
-            rpc.clone(),
-            &mut cx.to_async(),
-        )
-        .await
-        .unwrap();
+        let (remote, load_task) = cx.update(|cx| {
+            Worktree::remote(
+                1,
+                1,
+                initial_snapshot.to_proto(&Default::default(), Default::default()),
+                rpc.clone(),
+                cx,
+            )
+        });
+        load_task.await;
 
         cx.read(|cx| {
             assert!(!buffer2.read(cx).is_dirty());

crates/project/src/worktree.rs 🔗

@@ -190,13 +190,13 @@ impl Worktree {
         Ok(tree)
     }
 
-    pub async fn remote(
+    pub fn remote(
         project_remote_id: u64,
         replica_id: ReplicaId,
         worktree: proto::Worktree,
         client: Arc<Client>,
-        cx: &mut AsyncAppContext,
-    ) -> Result<ModelHandle<Self>> {
+        cx: &mut MutableAppContext,
+    ) -> (ModelHandle<Self>, Task<()>) {
         let remote_id = worktree.id;
         let root_char_bag: CharBag = worktree
             .root_name
@@ -205,32 +205,26 @@ impl Worktree {
             .collect();
         let root_name = worktree.root_name.clone();
         let weak = worktree.weak;
-        let (entries_by_path, entries_by_id, diagnostic_summaries) = cx
-            .background()
-            .spawn(async move {
-                let mut entries_by_path_edits = Vec::new();
-                let mut entries_by_id_edits = Vec::new();
-                for entry in worktree.entries {
-                    match Entry::try_from((&root_char_bag, entry)) {
-                        Ok(entry) => {
-                            entries_by_id_edits.push(Edit::Insert(PathEntry {
-                                id: entry.id,
-                                path: entry.path.clone(),
-                                is_ignored: entry.is_ignored,
-                                scan_id: 0,
-                            }));
-                            entries_by_path_edits.push(Edit::Insert(entry));
-                        }
-                        Err(err) => log::warn!("error for remote worktree entry {:?}", err),
-                    }
-                }
-
-                let mut entries_by_path = SumTree::new();
-                let mut entries_by_id = SumTree::new();
-                entries_by_path.edit(entries_by_path_edits, &());
-                entries_by_id.edit(entries_by_id_edits, &());
+        let snapshot = Snapshot {
+            id: WorktreeId(remote_id as usize),
+            root_name,
+            root_char_bag,
+            entries_by_path: Default::default(),
+            entries_by_id: Default::default(),
+        };
 
-                let diagnostic_summaries = TreeMap::from_ordered_entries(
+        let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
+        let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone());
+        let worktree_handle = cx.add_model(|_: &mut ModelContext<Worktree>| {
+            Worktree::Remote(RemoteWorktree {
+                project_id: project_remote_id,
+                replica_id,
+                snapshot: snapshot.clone(),
+                snapshot_rx: snapshot_rx.clone(),
+                updates_tx,
+                client: client.clone(),
+                queued_operations: Default::default(),
+                diagnostic_summaries: TreeMap::from_ordered_entries(
                     worktree.diagnostic_summaries.into_iter().map(|summary| {
                         (
                             PathKey(PathBuf::from(summary.path).into()),
@@ -242,24 +236,48 @@ impl Worktree {
                             },
                         )
                     }),
-                );
-
-                (entries_by_path, entries_by_id, diagnostic_summaries)
+                ),
+                weak,
             })
-            .await;
+        });
 
-        let worktree = cx.update(|cx| {
-            cx.add_model(|cx: &mut ModelContext<Worktree>| {
-                let snapshot = Snapshot {
-                    id: WorktreeId(remote_id as usize),
-                    root_name,
-                    root_char_bag,
-                    entries_by_path,
-                    entries_by_id,
-                };
+        let deserialize_task = cx.spawn({
+            let worktree_handle = worktree_handle.clone();
+            |cx| async move {
+                let (entries_by_path, entries_by_id) = cx
+                    .background()
+                    .spawn(async move {
+                        let mut entries_by_path_edits = Vec::new();
+                        let mut entries_by_id_edits = Vec::new();
+                        for entry in worktree.entries {
+                            match Entry::try_from((&root_char_bag, entry)) {
+                                Ok(entry) => {
+                                    entries_by_id_edits.push(Edit::Insert(PathEntry {
+                                        id: entry.id,
+                                        path: entry.path.clone(),
+                                        is_ignored: entry.is_ignored,
+                                        scan_id: 0,
+                                    }));
+                                    entries_by_path_edits.push(Edit::Insert(entry));
+                                }
+                                Err(err) => log::warn!("error for remote worktree entry {:?}", err),
+                            }
+                        }
 
-                let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
-                let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone());
+                        let mut entries_by_path = SumTree::new();
+                        let mut entries_by_id = SumTree::new();
+                        entries_by_path.edit(entries_by_path_edits, &());
+                        entries_by_id.edit(entries_by_id_edits, &());
+
+                        (entries_by_path, entries_by_id)
+                    })
+                    .await;
+
+                {
+                    let mut snapshot = snapshot_tx.borrow_mut();
+                    snapshot.entries_by_path = entries_by_path;
+                    snapshot.entries_by_id = entries_by_id;
+                }
 
                 cx.background()
                     .spawn(async move {
@@ -275,7 +293,8 @@ impl Worktree {
 
                 {
                     let mut snapshot_rx = snapshot_rx.clone();
-                    cx.spawn_weak(|this, mut cx| async move {
+                    let this = worktree_handle.downgrade();
+                    cx.spawn(|mut cx| async move {
                         while let Some(_) = snapshot_rx.recv().await {
                             if let Some(this) = cx.read(|cx| this.upgrade(cx)) {
                                 this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
@@ -286,22 +305,9 @@ impl Worktree {
                     })
                     .detach();
                 }
-
-                Worktree::Remote(RemoteWorktree {
-                    project_id: project_remote_id,
-                    replica_id,
-                    snapshot,
-                    snapshot_rx,
-                    updates_tx,
-                    client: client.clone(),
-                    queued_operations: Default::default(),
-                    diagnostic_summaries,
-                    weak,
-                })
-            })
+            }
         });
-
-        Ok(worktree)
+        (worktree_handle, deserialize_task)
     }
 
     pub fn as_local(&self) -> Option<&LocalWorktree> {