Unify maintenance of open buffers into `Worktree::poll_snapshot`

Antonio Scandurra created

Change summary

zed/src/worktree.rs | 314 +++++++++++++++++++++-------------------------
1 file changed, 143 insertions(+), 171 deletions(-)

Detailed changes

zed/src/worktree.rs 🔗

@@ -181,8 +181,7 @@ impl Worktree {
                 };
 
                 let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
-                let (mut snapshot_tx, mut snapshot_rx) =
-                    postage::watch::channel_with(snapshot.clone());
+                let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone());
 
                 cx.background()
                     .spawn(async move {
@@ -196,26 +195,25 @@ impl Worktree {
                     })
                     .detach();
 
-                cx.spawn_weak(|this, mut cx| async move {
-                    while let Some(snapshot) = snapshot_rx.recv().await {
-                        if let Some(this) = cx.read(|cx| this.upgrade(cx)) {
-                            this.update(&mut cx, |this, cx| {
-                                let this = this.as_remote_mut().unwrap();
-                                this.snapshot = snapshot;
-                                cx.notify();
-                                this.update_open_buffers(cx);
-                            });
-                        } else {
-                            break;
+                {
+                    let mut snapshot_rx = snapshot_rx.clone();
+                    cx.spawn_weak(|this, mut cx| async move {
+                        while let Some(_) = snapshot_rx.recv().await {
+                            if let Some(this) = cx.read(|cx| this.upgrade(cx)) {
+                                this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
+                            } else {
+                                break;
+                            }
                         }
-                    }
-                })
-                .detach();
+                    })
+                    .detach();
+                }
 
                 Worktree::Remote(RemoteWorktree {
                     remote_id,
                     replica_id,
                     snapshot,
+                    snapshot_rx,
                     updates_tx,
                     rpc: rpc.clone(),
                     open_buffers: Default::default(),
@@ -392,6 +390,112 @@ impl Worktree {
             ))
         }
     }
+
+    fn poll_snapshot(&mut self, cx: &mut ModelContext<Worktree>) {
+        let update_buffers = match self {
+            Self::Local(worktree) => {
+                worktree.snapshot = worktree.background_snapshot.lock().clone();
+                if worktree.is_scanning() {
+                    if !worktree.poll_scheduled {
+                        cx.spawn(|this, mut cx| async move {
+                            smol::Timer::after(Duration::from_millis(100)).await;
+                            this.update(&mut cx, |this, cx| {
+                                this.as_local_mut().unwrap().poll_scheduled = false;
+                                this.poll_snapshot(cx);
+                            })
+                        })
+                        .detach();
+                        worktree.poll_scheduled = true;
+                    }
+                    false
+                } else {
+                    true
+                }
+            }
+            Self::Remote(worktree) => {
+                worktree.snapshot = worktree.snapshot_rx.borrow().clone();
+                true
+            }
+        };
+
+        if update_buffers {
+            let mut buffers_to_delete = Vec::new();
+            for (buffer_id, buffer) in self.open_buffers() {
+                if let Some(buffer) = buffer.upgrade(&cx) {
+                    buffer.update(cx, |buffer, cx| {
+                        let buffer_is_clean = !buffer.is_dirty();
+
+                        if let Some(file) = buffer.file_mut() {
+                            let mut file_changed = false;
+
+                            if let Some(entry) = file
+                                .entry_id
+                                .and_then(|entry_id| self.entry_for_id(entry_id))
+                            {
+                                if entry.path != file.path {
+                                    file.path = entry.path.clone();
+                                    file_changed = true;
+                                }
+
+                                if entry.mtime != file.mtime {
+                                    file.mtime = entry.mtime;
+                                    file_changed = true;
+                                    if let Some(worktree) = self.as_local() {
+                                        if buffer_is_clean {
+                                            let abs_path = worktree.absolutize(&file.path);
+                                            refresh_buffer(abs_path, cx);
+                                        }
+                                    }
+                                }
+                            } else if let Some(entry) = self.entry_for_path(&file.path) {
+                                file.entry_id = Some(entry.id);
+                                file.mtime = entry.mtime;
+                                if let Some(worktree) = self.as_local() {
+                                    if buffer_is_clean {
+                                        let abs_path = worktree.absolutize(&file.path);
+                                        refresh_buffer(abs_path, cx);
+                                    }
+                                }
+                                file_changed = true;
+                            } else if !file.is_deleted() {
+                                if buffer_is_clean {
+                                    cx.emit(editor::buffer::Event::Dirtied);
+                                }
+                                file.entry_id = None;
+                                file_changed = true;
+                            }
+
+                            if file_changed {
+                                cx.emit(editor::buffer::Event::FileHandleChanged);
+                            }
+                        }
+                    });
+                } else {
+                    buffers_to_delete.push(*buffer_id);
+                }
+            }
+
+            for buffer_id in buffers_to_delete {
+                self.open_buffers_mut().remove(&buffer_id);
+            }
+        }
+
+        cx.notify();
+    }
+
+    fn open_buffers(&self) -> &HashMap<usize, WeakModelHandle<Buffer>> {
+        match self {
+            Self::Local(worktree) => &worktree.open_buffers,
+            Self::Remote(worktree) => &worktree.open_buffers,
+        }
+    }
+
+    fn open_buffers_mut(&mut self) -> &mut HashMap<usize, WeakModelHandle<Buffer>> {
+        match self {
+            Self::Local(worktree) => &mut worktree.open_buffers,
+            Self::Remote(worktree) => &mut worktree.open_buffers,
+        }
+    }
 }
 
 impl Deref for Worktree {
@@ -409,7 +513,7 @@ pub struct LocalWorktree {
     snapshot: Snapshot,
     background_snapshot: Arc<Mutex<Snapshot>>,
     snapshots_to_send_tx: Option<Sender<Snapshot>>,
-    scan_state: (watch::Sender<ScanState>, watch::Receiver<ScanState>),
+    last_scan_state_rx: watch::Receiver<ScanState>,
     _event_stream_handle: fsevent::Handle,
     poll_scheduled: bool,
     rpc: Option<(rpc::Client, u64)>,
@@ -426,7 +530,8 @@ impl LocalWorktree {
         cx: &mut ModelContext<Worktree>,
     ) -> Self {
         let abs_path = path.into();
-        let (scan_state_tx, scan_state_rx) = smol::channel::unbounded();
+        let (scan_states_tx, scan_states_rx) = smol::channel::unbounded();
+        let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
         let id = cx.model_id();
         let snapshot = Snapshot {
             id,
@@ -449,7 +554,7 @@ impl LocalWorktree {
             snapshot,
             background_snapshot: background_snapshot.clone(),
             snapshots_to_send_tx: None,
-            scan_state: watch::channel_with(ScanState::Scanning),
+            last_scan_state_rx,
             _event_stream_handle: event_stream_handle,
             poll_scheduled: false,
             open_buffers: Default::default(),
@@ -460,17 +565,26 @@ impl LocalWorktree {
         };
 
         std::thread::spawn(move || {
-            let scanner = BackgroundScanner::new(background_snapshot, scan_state_tx, id);
+            let scanner = BackgroundScanner::new(background_snapshot, scan_states_tx, id);
             scanner.run(event_stream)
         });
 
         cx.spawn_weak(|this, mut cx| async move {
-            while let Ok(scan_state) = scan_state_rx.recv().await {
+            while let Ok(scan_state) = scan_states_rx.recv().await {
                 if let Some(handle) = cx.read(|cx| this.upgrade(&cx)) {
                     handle.update(&mut cx, |this, cx| {
-                        this.as_local_mut()
-                            .unwrap()
-                            .observe_scan_state(scan_state, cx)
+                        last_scan_state_tx.blocking_send(scan_state).ok();
+                        this.poll_snapshot(cx);
+                        let tree = this.as_local_mut().unwrap();
+                        if !tree.is_scanning() {
+                            if let Some(snapshots_to_send_tx) = tree.snapshots_to_send_tx.clone() {
+                                if let Err(err) =
+                                    smol::block_on(snapshots_to_send_tx.send(tree.snapshot()))
+                                {
+                                    log::error!("error submitting snapshot to send {}", err);
+                                }
+                            }
+                        }
                     });
                 } else {
                     break;
@@ -600,7 +714,7 @@ impl LocalWorktree {
     }
 
     pub fn scan_complete(&self) -> impl Future<Output = ()> {
-        let mut scan_state_rx = self.scan_state.1.clone();
+        let mut scan_state_rx = self.last_scan_state_rx.clone();
         async move {
             let mut scan_state = Some(scan_state_rx.borrow().clone());
             while let Some(ScanState::Scanning) = scan_state {
@@ -609,96 +723,8 @@ impl LocalWorktree {
         }
     }
 
-    fn observe_scan_state(&mut self, scan_state: ScanState, cx: &mut ModelContext<Worktree>) {
-        self.scan_state.0.blocking_send(scan_state).ok();
-        self.poll_snapshot(cx);
-        if !self.is_scanning() {
-            if let Some(snapshots_to_send_tx) = self.snapshots_to_send_tx.clone() {
-                if let Err(err) = smol::block_on(snapshots_to_send_tx.send(self.snapshot())) {
-                    log::error!("error submitting snapshot to send {}", err);
-                }
-            }
-        }
-    }
-
-    fn poll_snapshot(&mut self, cx: &mut ModelContext<Worktree>) {
-        self.snapshot = self.background_snapshot.lock().clone();
-        if self.is_scanning() {
-            if !self.poll_scheduled {
-                cx.spawn(|this, mut cx| async move {
-                    smol::Timer::after(Duration::from_millis(100)).await;
-                    this.update(&mut cx, |this, cx| {
-                        let worktree = this.as_local_mut().unwrap();
-                        worktree.poll_scheduled = false;
-                        worktree.poll_snapshot(cx);
-                    })
-                })
-                .detach();
-                self.poll_scheduled = true;
-            }
-        } else {
-            let mut buffers_to_delete = Vec::new();
-            for (buffer_id, buffer) in &self.open_buffers {
-                if let Some(buffer) = buffer.upgrade(&cx) {
-                    buffer.update(cx, |buffer, cx| {
-                        let buffer_is_clean = !buffer.is_dirty();
-
-                        if let Some(file) = buffer.file_mut() {
-                            let mut file_changed = false;
-
-                            if let Some(entry) = file
-                                .entry_id
-                                .and_then(|entry_id| self.entry_for_id(entry_id))
-                            {
-                                if entry.path != file.path {
-                                    file.path = entry.path.clone();
-                                    file_changed = true;
-                                }
-
-                                if entry.mtime != file.mtime {
-                                    file.mtime = entry.mtime;
-                                    file_changed = true;
-                                    if buffer_is_clean {
-                                        let abs_path = self.absolutize(&file.path);
-                                        refresh_buffer(abs_path, cx);
-                                    }
-                                }
-                            } else if let Some(entry) = self.entry_for_path(&file.path) {
-                                file.entry_id = Some(entry.id);
-                                file.mtime = entry.mtime;
-                                if buffer_is_clean {
-                                    let abs_path = self.absolutize(&file.path);
-                                    refresh_buffer(abs_path, cx);
-                                }
-                                file_changed = true;
-                            } else if !file.is_deleted() {
-                                if buffer_is_clean {
-                                    cx.emit(editor::buffer::Event::Dirtied);
-                                }
-                                file.entry_id = None;
-                                file_changed = true;
-                            }
-
-                            if file_changed {
-                                cx.emit(editor::buffer::Event::FileHandleChanged);
-                            }
-                        }
-                    });
-                } else {
-                    buffers_to_delete.push(*buffer_id);
-                }
-            }
-
-            for buffer_id in buffers_to_delete {
-                self.open_buffers.remove(&buffer_id);
-            }
-        }
-
-        cx.notify();
-    }
-
     fn is_scanning(&self) -> bool {
-        if let ScanState::Scanning = *self.scan_state.1.borrow() {
+        if let ScanState::Scanning = *self.last_scan_state_rx.borrow() {
             true
         } else {
             false
@@ -736,10 +762,7 @@ impl LocalWorktree {
             file.read_to_string(&mut text).await?;
             // Eagerly populate the snapshot with an updated entry for the loaded file
             let entry = refresh_entry(&background_snapshot, path, &abs_path)?;
-            this.update(&mut cx, |this, cx| {
-                let this = this.as_local_mut().unwrap();
-                this.poll_snapshot(cx);
-            });
+            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
             Ok((File::new(entry.id, handle, entry.path, entry.mtime), text))
         })
     }
@@ -787,9 +810,7 @@ impl LocalWorktree {
 
         cx.spawn(|this, mut cx| async move {
             let entry = save.await?;
-            this.update(&mut cx, |this, cx| {
-                this.as_local_mut().unwrap().poll_snapshot(cx);
-            });
+            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
             Ok(entry)
         })
     }
@@ -902,6 +923,7 @@ impl fmt::Debug for LocalWorktree {
 pub struct RemoteWorktree {
     remote_id: u64,
     snapshot: Snapshot,
+    snapshot_rx: watch::Receiver<Snapshot>,
     rpc: rpc::Client,
     updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
     replica_id: ReplicaId,
@@ -1000,56 +1022,6 @@ impl RemoteWorktree {
         cx.notify();
         Ok(())
     }
-
-    fn update_open_buffers(&mut self, cx: &mut ModelContext<Worktree>) {
-        let mut buffers_to_delete = Vec::new();
-        for (buffer_id, buffer) in &self.open_buffers {
-            if let Some(buffer) = buffer.upgrade(&cx) {
-                buffer.update(cx, |buffer, cx| {
-                    let buffer_is_clean = !buffer.is_dirty();
-
-                    if let Some(file) = buffer.file_mut() {
-                        let mut file_changed = false;
-
-                        if let Some(entry) = file
-                            .entry_id
-                            .and_then(|entry_id| self.snapshot.entry_for_id(entry_id))
-                        {
-                            if entry.path != file.path {
-                                file.path = entry.path.clone();
-                                file_changed = true;
-                            }
-
-                            if entry.mtime != file.mtime {
-                                file.mtime = entry.mtime;
-                                file_changed = true;
-                            }
-                        } else if let Some(entry) = self.snapshot.entry_for_path(&file.path) {
-                            file.entry_id = Some(entry.id);
-                            file.mtime = entry.mtime;
-                            file_changed = true;
-                        } else if !file.is_deleted() {
-                            if buffer_is_clean {
-                                cx.emit(editor::buffer::Event::Dirtied);
-                            }
-                            file.entry_id = None;
-                            file_changed = true;
-                        }
-
-                        if file_changed {
-                            cx.emit(editor::buffer::Event::FileHandleChanged);
-                        }
-                    }
-                });
-            } else {
-                buffers_to_delete.push(*buffer_id);
-            }
-        }
-
-        for buffer_id in buffers_to_delete {
-            self.open_buffers.remove(&buffer_id);
-        }
-    }
 }
 
 #[derive(Clone)]