WIP - try representing snapshots_to_send as a watch

Max Brunsfeld created

Change summary

crates/project/src/worktree.rs | 196 +++++++++++++++++------------------
1 file changed, 93 insertions(+), 103 deletions(-)

Detailed changes

crates/project/src/worktree.rs 🔗

@@ -134,7 +134,7 @@ enum ScanState {
 
 struct ShareState {
     project_id: u64,
-    snapshots_tx: Sender<LocalSnapshot>,
+    snapshots_tx: watch::Sender<LocalSnapshot>,
     _maintain_remote_snapshot: Option<Task<Option<()>>>,
 }
 
@@ -334,38 +334,9 @@ impl Worktree {
 
     fn poll_snapshot(&mut self, cx: &mut ModelContext<Self>) {
         match self {
-            Self::Local(worktree) => {
-                let is_fake_fs = worktree.fs.is_fake();
-                worktree.snapshot = worktree.background_snapshot.lock().clone();
-                if matches!(worktree.scan_state(), ScanState::Initializing) {
-                    if worktree.poll_task.is_none() {
-                        worktree.poll_task = Some(cx.spawn_weak(|this, mut cx| async move {
-                            if is_fake_fs {
-                                #[cfg(any(test, feature = "test-support"))]
-                                cx.background().simulate_random_delay().await;
-                            } else {
-                                smol::Timer::after(Duration::from_millis(100)).await;
-                            }
-                            if let Some(this) = this.upgrade(&cx) {
-                                this.update(&mut cx, |this, cx| {
-                                    this.as_local_mut().unwrap().poll_task = None;
-                                    this.poll_snapshot(cx);
-                                });
-                            }
-                        }));
-                    }
-                } else {
-                    worktree.poll_task.take();
-                    cx.emit(Event::UpdatedEntries);
-                }
-            }
-            Self::Remote(worktree) => {
-                worktree.snapshot = worktree.background_snapshot.lock().clone();
-                cx.emit(Event::UpdatedEntries);
-            }
+            Self::Local(worktree) => worktree.poll_snapshot(cx),
+            Self::Remote(worktree) => worktree.poll_snapshot(cx),
         };
-
-        cx.notify();
     }
 }
 
@@ -441,9 +412,8 @@ impl LocalWorktree {
                         last_scan_state_tx.blocking_send(scan_state).ok();
                         this.update(&mut cx, |this, cx| {
                             this.poll_snapshot(cx);
-                            this.as_local().unwrap().broadcast_snapshot()
-                        })
-                        .await;
+                            this.as_local_mut().unwrap().broadcast_snapshot()
+                        });
                     } else {
                         break;
                     }
@@ -527,6 +497,40 @@ impl LocalWorktree {
         Ok(updated)
     }
 
+    fn poll_snapshot(&mut self, cx: &mut ModelContext<Worktree>) {
+        match self.scan_state() {
+            ScanState::Idle => {
+                self.snapshot = self.background_snapshot.lock().clone();
+                self.poll_task.take();
+                cx.emit(Event::UpdatedEntries);
+            }
+            ScanState::Initializing => {
+                self.snapshot = self.background_snapshot.lock().clone();
+                if self.poll_task.is_none() {
+                    let is_fake_fs = self.fs.is_fake();
+                    self.poll_task = Some(cx.spawn_weak(|this, mut cx| async move {
+                        if is_fake_fs {
+                            #[cfg(any(test, feature = "test-support"))]
+                            cx.background().simulate_random_delay().await;
+                        } else {
+                            smol::Timer::after(Duration::from_millis(100)).await;
+                        }
+                        if let Some(this) = this.upgrade(&cx) {
+                            this.update(&mut cx, |this, cx| {
+                                this.as_local_mut().unwrap().poll_task = None;
+                                this.poll_snapshot(cx);
+                            });
+                        }
+                    }));
+                }
+                cx.emit(Event::UpdatedEntries);
+            }
+            ScanState::Updating => {}
+            ScanState::Err(_) => {}
+        }
+        cx.notify();
+    }
+
     pub fn scan_complete(&self) -> impl Future<Output = ()> {
         let mut scan_state_rx = self.last_scan_state_rx.clone();
         async move {
@@ -666,16 +670,15 @@ impl LocalWorktree {
 
         Some(cx.spawn(|this, mut cx| async move {
             delete.await?;
-            this.update(&mut cx, |this, _| {
-                let this = this.as_local_mut().unwrap();
-                let mut snapshot = this.background_snapshot.lock();
-                snapshot.delete_entry(entry_id);
-            });
             this.update(&mut cx, |this, cx| {
+                let this = this.as_local_mut().unwrap();
+                {
+                    let mut snapshot = this.background_snapshot.lock();
+                    snapshot.delete_entry(entry_id);
+                }
                 this.poll_snapshot(cx);
-                this.as_local().unwrap().broadcast_snapshot()
-            })
-            .await;
+                this.broadcast_snapshot();
+            });
             Ok(())
         }))
     }
@@ -712,10 +715,10 @@ impl LocalWorktree {
                 })
                 .await?;
             this.update(&mut cx, |this, cx| {
+                let this = this.as_local_mut().unwrap();
                 this.poll_snapshot(cx);
-                this.as_local().unwrap().broadcast_snapshot()
-            })
-            .await;
+                this.broadcast_snapshot();
+            });
             Ok(entry)
         }))
     }
@@ -752,10 +755,10 @@ impl LocalWorktree {
                 })
                 .await?;
             this.update(&mut cx, |this, cx| {
+                let this = this.as_local_mut().unwrap();
                 this.poll_snapshot(cx);
-                this.as_local().unwrap().broadcast_snapshot()
-            })
-            .await;
+                this.broadcast_snapshot()
+            });
             Ok(entry)
         }))
     }
@@ -790,10 +793,10 @@ impl LocalWorktree {
                 })
                 .await?;
             this.update(&mut cx, |this, cx| {
+                let this = this.as_local_mut().unwrap();
                 this.poll_snapshot(cx);
-                this.as_local().unwrap().broadcast_snapshot()
-            })
-            .await;
+                this.broadcast_snapshot();
+            });
             Ok(entry)
         })
     }
@@ -826,45 +829,42 @@ impl LocalWorktree {
             let this = this
                 .upgrade(&cx)
                 .ok_or_else(|| anyhow!("worktree was dropped"))?;
-            let (entry, snapshot, snapshots_tx) = this.read_with(&cx, |this, _| {
-                let this = this.as_local().unwrap();
-                let mut snapshot = this.background_snapshot.lock();
-                entry.is_ignored = snapshot
-                    .ignore_stack_for_path(&path, entry.is_dir())
-                    .is_path_ignored(&path, entry.is_dir());
-                if let Some(old_path) = old_path {
-                    snapshot.remove_path(&old_path);
+            this.update(&mut cx, |this, cx| {
+                let this = this.as_local_mut().unwrap();
+                let inserted_entry;
+                {
+                    let mut snapshot = this.background_snapshot.lock();
+                    entry.is_ignored = snapshot
+                        .ignore_stack_for_path(&path, entry.is_dir())
+                        .is_path_ignored(&path, entry.is_dir());
+                    if let Some(old_path) = old_path {
+                        snapshot.remove_path(&old_path);
+                    }
+                    inserted_entry = snapshot.insert_entry(entry, fs.as_ref());
+                    snapshot.scan_id += 1;
                 }
-                let entry = snapshot.insert_entry(entry, fs.as_ref());
-                snapshot.scan_id += 1;
-                let snapshots_tx = this.share.as_ref().map(|s| s.snapshots_tx.clone());
-                (entry, snapshot.clone(), snapshots_tx)
-            });
-            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
-
-            if let Some(snapshots_tx) = snapshots_tx {
-                snapshots_tx.send(snapshot).await.ok();
-            }
-
-            Ok(entry)
+                this.poll_snapshot(cx);
+                this.broadcast_snapshot();
+                Ok(inserted_entry)
+            })
         })
     }
 
     pub fn share(&mut self, project_id: u64, cx: &mut ModelContext<Worktree>) -> Task<Result<()>> {
         let (share_tx, share_rx) = oneshot::channel();
-        let (snapshots_to_send_tx, snapshots_to_send_rx) =
-            smol::channel::unbounded::<LocalSnapshot>();
+
         if self.share.is_some() {
             let _ = share_tx.send(Ok(()));
         } else {
+            let (snapshots_tx, mut snapshots_rx) = watch::channel_with(self.snapshot());
             let rpc = self.client.clone();
             let worktree_id = cx.model_id() as u64;
             let maintain_remote_snapshot = cx.background().spawn({
                 let rpc = rpc.clone();
                 let diagnostic_summaries = self.diagnostic_summaries.clone();
                 async move {
-                    let mut prev_snapshot = match snapshots_to_send_rx.recv().await {
-                        Ok(snapshot) => {
+                    let mut prev_snapshot = match snapshots_rx.recv().await {
+                        Some(snapshot) => {
                             let update = proto::UpdateWorktree {
                                 project_id,
                                 worktree_id,
@@ -886,8 +886,10 @@ impl LocalWorktree {
                                 snapshot
                             }
                         }
-                        Err(error) => {
-                            let _ = share_tx.send(Err(error.into()));
+                        None => {
+                            share_tx
+                                .send(Err(anyhow!("worktree dropped before share completed")))
+                                .ok();
                             return Err(anyhow!("failed to send initial update worktree"));
                         }
                     };
@@ -900,11 +902,7 @@ impl LocalWorktree {
                         })?;
                     }
 
-                    while let Ok(mut snapshot) = snapshots_to_send_rx.recv().await {
-                        while let Ok(newer_snapshot) = snapshots_to_send_rx.try_recv() {
-                            snapshot = newer_snapshot;
-                        }
-
+                    while let Some(snapshot) = snapshots_rx.recv().await {
                         send_worktree_update(
                             &rpc,
                             snapshot.build_update(&prev_snapshot, project_id, worktree_id, true),
@@ -919,18 +917,12 @@ impl LocalWorktree {
             });
             self.share = Some(ShareState {
                 project_id,
-                snapshots_tx: snapshots_to_send_tx.clone(),
+                snapshots_tx,
                 _maintain_remote_snapshot: Some(maintain_remote_snapshot),
             });
         }
 
-        cx.spawn_weak(|this, cx| async move {
-            if let Some(this) = this.upgrade(&cx) {
-                this.read_with(&cx, |this, _| {
-                    let this = this.as_local().unwrap();
-                    let _ = snapshots_to_send_tx.try_send(this.snapshot());
-                });
-            }
+        cx.foreground().spawn(async move {
             share_rx
                 .await
                 .unwrap_or_else(|_| Err(anyhow!("share ended")))
@@ -945,19 +937,11 @@ impl LocalWorktree {
         self.share.is_some()
     }
 
-    fn broadcast_snapshot(&self) -> impl Future<Output = ()> {
-        let mut to_send = None;
+    fn broadcast_snapshot(&mut self) {
         if matches!(self.scan_state(), ScanState::Idle) {
-            if let Some(share) = self.share.as_ref() {
-                to_send = Some((self.snapshot(), share.snapshots_tx.clone()));
-            }
-        }
-
-        async move {
-            if let Some((snapshot, snapshots_to_send_tx)) = to_send {
-                if let Err(err) = snapshots_to_send_tx.send(snapshot).await {
-                    log::error!("error submitting snapshot to send {}", err);
-                }
+            let snapshot = self.snapshot();
+            if let Some(share) = self.share.as_mut() {
+                *share.snapshots_tx.borrow_mut() = snapshot;
             }
         }
     }
@@ -968,6 +952,12 @@ impl RemoteWorktree {
         self.snapshot.clone()
     }
 
+    fn poll_snapshot(&mut self, cx: &mut ModelContext<Worktree>) {
+        self.snapshot = self.background_snapshot.lock().clone();
+        cx.emit(Event::UpdatedEntries);
+        cx.notify();
+    }
+
     pub fn disconnected_from_host(&mut self) {
         self.updates_tx.take();
         self.snapshot_subscriptions.clear();