Process remote worktree updates in the background

Nathan Sobo and Max Brunsfeld created

Co-Authored-By: Max Brunsfeld <maxbrunsfeld@gmail.com>

Change summary

zed/src/worktree.rs | 66 +++++++++++++++++++++++++++++++++-------------
1 file changed, 47 insertions(+), 19 deletions(-)

Detailed changes

zed/src/worktree.rs 🔗

@@ -154,7 +154,7 @@ impl Worktree {
             .await;
 
         let worktree = cx.update(|cx| {
-            cx.add_model(|cx| {
+            cx.add_model(|cx: &mut ModelContext<Worktree>| {
                 let snapshot = Snapshot {
                     id: cx.model_id(),
                     scan_id: 0,
@@ -168,10 +168,39 @@ impl Worktree {
                     next_entry_id: Default::default(),
                 };
 
+                let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
+                let (mut snapshot_tx, mut snapshot_rx) =
+                    postage::watch::channel_with(snapshot.clone());
+
+                cx.background()
+                    .spawn(async move {
+                        while let Some(update) = updates_rx.recv().await {
+                            let mut snapshot = snapshot_tx.borrow().clone();
+                            if let Err(error) = snapshot.apply_update(update) {
+                                log::error!("error applying worktree update: {}", error);
+                            }
+                            *snapshot_tx.borrow_mut() = snapshot;
+                        }
+                    })
+                    .detach();
+
+                cx.spawn(|this, mut cx| async move {
+                    while let Some(snapshot) = snapshot_rx.recv().await {
+                        this.update(&mut cx, |this, cx| {
+                            let this = this.as_remote_mut().unwrap();
+                            this.snapshot = snapshot;
+                            cx.notify();
+                            this.update_open_buffers(cx);
+                        })
+                    }
+                })
+                .detach();
+
                 Worktree::Remote(RemoteWorktree {
                     remote_id: id,
                     replica_id,
                     snapshot,
+                    updates_tx,
                     rpc: rpc.clone(),
                     open_buffers: Default::default(),
                     peers: peers
@@ -218,7 +247,7 @@ impl Worktree {
     pub fn snapshot(&self) -> Snapshot {
         match self {
             Worktree::Local(worktree) => worktree.snapshot(),
-            Worktree::Remote(worktree) => worktree.snapshot.clone(),
+            Worktree::Remote(worktree) => worktree.snapshot(),
         }
     }
 
@@ -861,6 +890,7 @@ pub struct RemoteWorktree {
     remote_id: u64,
     snapshot: Snapshot,
     rpc: rpc::Client,
+    updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
     replica_id: ReplicaId,
     open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
     peers: HashMap<PeerId, ReplicaId>,
@@ -923,6 +953,10 @@ impl RemoteWorktree {
         })
     }
 
+    fn snapshot(&self) -> Snapshot {
+        self.snapshot.clone()
+    }
+
     pub fn add_peer(
         &mut self,
         envelope: TypedEnvelope<proto::AddPeer>,
@@ -954,17 +988,6 @@ impl RemoteWorktree {
         Ok(())
     }
 
-    pub fn update(
-        &mut self,
-        envelope: TypedEnvelope<proto::UpdateWorktree>,
-        cx: &mut ModelContext<Worktree>,
-    ) -> Result<()> {
-        self.snapshot.apply_update(envelope.payload)?;
-        self.update_open_buffers(cx);
-        cx.notify();
-        Ok(())
-    }
-
     fn update_open_buffers(&mut self, cx: &mut ModelContext<Worktree>) {
         let mut buffers_to_delete = Vec::new();
         for (buffer_id, buffer) in &self.open_buffers {
@@ -2305,17 +2328,22 @@ mod remote {
             .read()
             .await
             .shared_worktree(envelope.payload.worktree_id, cx)?
-            .update(cx, |worktree, cx| {
+            .update(cx, |worktree, _| {
                 if let Some(worktree) = worktree.as_remote_mut() {
-                    worktree.update(envelope, cx)?;
+                    let mut tx = worktree.updates_tx.clone();
+                    Ok(async move {
+                        tx.send(envelope.payload)
+                            .await
+                            .expect("receiver runs to completion");
+                    })
                 } else {
-                    log::error!(
+                    Err(anyhow!(
                         "invalid update message for local worktree {}",
                         envelope.payload.worktree_id
-                    );
+                    ))
                 }
-                Result::<_, anyhow::Error>::Ok(())
-            })?;
+            })?
+            .await;
 
         Ok(())
     }