Merge pull request #2471 from zed-industries/optimize-update-local-worktree-buffers

Created by Julia

Optimize update local worktree buffers

Change summary

crates/project/src/project.rs  | 175 ++++++++++++++++++++++-------------
crates/project/src/worktree.rs |  90 +++++++++++++----
2 files changed, 178 insertions(+), 87 deletions(-)

Detailed changes

crates/project/src/project.rs 🔗

@@ -122,6 +122,8 @@ pub struct Project {
     loading_local_worktrees:
         HashMap<Arc<Path>, Shared<Task<Result<ModelHandle<Worktree>, Arc<anyhow::Error>>>>>,
     opened_buffers: HashMap<u64, OpenBuffer>,
+    local_buffer_ids_by_path: HashMap<ProjectPath, u64>,
+    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, u64>,
     /// A mapping from a buffer ID to None means that we've started waiting for an ID but haven't finished loading it.
     /// Used for re-issuing buffer requests when peers temporarily disconnect
     incomplete_remote_buffers: HashMap<u64, Option<ModelHandle<Buffer>>>,
@@ -449,6 +451,8 @@ impl Project {
                 incomplete_remote_buffers: Default::default(),
                 loading_buffers_by_path: Default::default(),
                 loading_local_worktrees: Default::default(),
+                local_buffer_ids_by_path: Default::default(),
+                local_buffer_ids_by_entry_id: Default::default(),
                 buffer_snapshots: Default::default(),
                 join_project_response_message_id: 0,
                 client_state: None,
@@ -517,6 +521,8 @@ impl Project {
                 shared_buffers: Default::default(),
                 incomplete_remote_buffers: Default::default(),
                 loading_local_worktrees: Default::default(),
+                local_buffer_ids_by_path: Default::default(),
+                local_buffer_ids_by_entry_id: Default::default(),
                 active_entry: None,
                 collaborators: Default::default(),
                 join_project_response_message_id: response.message_id,
@@ -1628,6 +1634,21 @@ impl Project {
         })
         .detach();
 
+        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
+            if file.is_local {
+                self.local_buffer_ids_by_path.insert(
+                    ProjectPath {
+                        worktree_id: file.worktree_id(cx),
+                        path: file.path.clone(),
+                    },
+                    remote_id,
+                );
+
+                self.local_buffer_ids_by_entry_id
+                    .insert(file.entry_id, remote_id);
+            }
+        }
+
         self.detect_language_for_buffer(buffer, cx);
         self.register_buffer_with_language_servers(buffer, cx);
         self.register_buffer_with_copilot(buffer, cx);
@@ -4536,7 +4557,7 @@ impl Project {
         if worktree.read(cx).is_local() {
             cx.subscribe(worktree, |this, worktree, event, cx| match event {
                 worktree::Event::UpdatedEntries(changes) => {
-                    this.update_local_worktree_buffers(&worktree, cx);
+                    this.update_local_worktree_buffers(&worktree, &changes, cx);
                     this.update_local_worktree_language_servers(&worktree, changes, cx);
                 }
                 worktree::Event::UpdatedGitRepositories(updated_repos) => {
@@ -4570,80 +4591,106 @@ impl Project {
     fn update_local_worktree_buffers(
         &mut self,
         worktree_handle: &ModelHandle<Worktree>,
+        changes: &HashMap<(Arc<Path>, ProjectEntryId), PathChange>,
         cx: &mut ModelContext<Self>,
     ) {
         let snapshot = worktree_handle.read(cx).snapshot();
 
-        let mut buffers_to_delete = Vec::new();
         let mut renamed_buffers = Vec::new();
+        for (path, entry_id) in changes.keys() {
+            let worktree_id = worktree_handle.read(cx).id();
+            let project_path = ProjectPath {
+                worktree_id,
+                path: path.clone(),
+            };
 
-        for (buffer_id, buffer) in &self.opened_buffers {
-            if let Some(buffer) = buffer.upgrade(cx) {
-                buffer.update(cx, |buffer, cx| {
-                    if let Some(old_file) = File::from_dyn(buffer.file()) {
-                        if old_file.worktree != *worktree_handle {
-                            return;
-                        }
+            let buffer_id = match self.local_buffer_ids_by_entry_id.get(entry_id) {
+                Some(&buffer_id) => buffer_id,
+                None => match self.local_buffer_ids_by_path.get(&project_path) {
+                    Some(&buffer_id) => buffer_id,
+                    None => continue,
+                },
+            };
 
-                        let new_file = if let Some(entry) = snapshot.entry_for_id(old_file.entry_id)
-                        {
-                            File {
-                                is_local: true,
-                                entry_id: entry.id,
-                                mtime: entry.mtime,
-                                path: entry.path.clone(),
-                                worktree: worktree_handle.clone(),
-                                is_deleted: false,
-                            }
-                        } else if let Some(entry) =
-                            snapshot.entry_for_path(old_file.path().as_ref())
-                        {
-                            File {
-                                is_local: true,
-                                entry_id: entry.id,
-                                mtime: entry.mtime,
-                                path: entry.path.clone(),
-                                worktree: worktree_handle.clone(),
-                                is_deleted: false,
-                            }
-                        } else {
-                            File {
-                                is_local: true,
-                                entry_id: old_file.entry_id,
-                                path: old_file.path().clone(),
-                                mtime: old_file.mtime(),
-                                worktree: worktree_handle.clone(),
-                                is_deleted: true,
-                            }
-                        };
+            let open_buffer = self.opened_buffers.get(&buffer_id);
+            let buffer = if let Some(buffer) = open_buffer.and_then(|buffer| buffer.upgrade(cx)) {
+                buffer
+            } else {
+                self.opened_buffers.remove(&buffer_id);
+                self.local_buffer_ids_by_path.remove(&project_path);
+                self.local_buffer_ids_by_entry_id.remove(entry_id);
+                continue;
+            };
 
-                        let old_path = old_file.abs_path(cx);
-                        if new_file.abs_path(cx) != old_path {
-                            renamed_buffers.push((cx.handle(), old_file.clone()));
+            buffer.update(cx, |buffer, cx| {
+                if let Some(old_file) = File::from_dyn(buffer.file()) {
+                    if old_file.worktree != *worktree_handle {
+                        return;
+                    }
+
+                    let new_file = if let Some(entry) = snapshot.entry_for_id(old_file.entry_id) {
+                        File {
+                            is_local: true,
+                            entry_id: entry.id,
+                            mtime: entry.mtime,
+                            path: entry.path.clone(),
+                            worktree: worktree_handle.clone(),
+                            is_deleted: false,
                         }
+                    } else if let Some(entry) = snapshot.entry_for_path(old_file.path().as_ref()) {
+                        File {
+                            is_local: true,
+                            entry_id: entry.id,
+                            mtime: entry.mtime,
+                            path: entry.path.clone(),
+                            worktree: worktree_handle.clone(),
+                            is_deleted: false,
+                        }
+                    } else {
+                        File {
+                            is_local: true,
+                            entry_id: old_file.entry_id,
+                            path: old_file.path().clone(),
+                            mtime: old_file.mtime(),
+                            worktree: worktree_handle.clone(),
+                            is_deleted: true,
+                        }
+                    };
 
-                        if new_file != *old_file {
-                            if let Some(project_id) = self.remote_id() {
-                                self.client
-                                    .send(proto::UpdateBufferFile {
-                                        project_id,
-                                        buffer_id: *buffer_id as u64,
-                                        file: Some(new_file.to_proto()),
-                                    })
-                                    .log_err();
-                            }
+                    let old_path = old_file.abs_path(cx);
+                    if new_file.abs_path(cx) != old_path {
+                        renamed_buffers.push((cx.handle(), old_file.clone()));
+                        self.local_buffer_ids_by_path.remove(&project_path);
+                        self.local_buffer_ids_by_path.insert(
+                            ProjectPath {
+                                worktree_id,
+                                path: path.clone(),
+                            },
+                            buffer_id,
+                        );
+                    }
 
-                            buffer.file_updated(Arc::new(new_file), cx).detach();
-                        }
+                    if new_file.entry_id != *entry_id {
+                        self.local_buffer_ids_by_entry_id.remove(entry_id);
+                        self.local_buffer_ids_by_entry_id
+                            .insert(new_file.entry_id, buffer_id);
                     }
-                });
-            } else {
-                buffers_to_delete.push(*buffer_id);
-            }
-        }
 
-        for buffer_id in buffers_to_delete {
-            self.opened_buffers.remove(&buffer_id);
+                    if new_file != *old_file {
+                        if let Some(project_id) = self.remote_id() {
+                            self.client
+                                .send(proto::UpdateBufferFile {
+                                    project_id,
+                                    buffer_id: buffer_id as u64,
+                                    file: Some(new_file.to_proto()),
+                                })
+                                .log_err();
+                        }
+
+                        buffer.file_updated(Arc::new(new_file), cx).detach();
+                    }
+                }
+            });
         }
 
         for (buffer, old_file) in renamed_buffers {
@@ -4656,7 +4703,7 @@ impl Project {
     fn update_local_worktree_language_servers(
         &mut self,
         worktree_handle: &ModelHandle<Worktree>,
-        changes: &HashMap<Arc<Path>, PathChange>,
+        changes: &HashMap<(Arc<Path>, ProjectEntryId), PathChange>,
         cx: &mut ModelContext<Self>,
     ) {
         let worktree_id = worktree_handle.read(cx).id();
@@ -4673,7 +4720,7 @@ impl Project {
                         let params = lsp::DidChangeWatchedFilesParams {
                             changes: changes
                                 .iter()
-                                .filter_map(|(path, change)| {
+                                .filter_map(|((path, _), change)| {
                                     let path = abs_path.join(path);
                                     if watched_paths.matches(&path) {
                                         Some(lsp::FileEvent {

crates/project/src/worktree.rs 🔗

@@ -382,7 +382,7 @@ enum ScanState {
     Started,
     Updated {
         snapshot: LocalSnapshot,
-        changes: HashMap<Arc<Path>, PathChange>,
+        changes: HashMap<(Arc<Path>, ProjectEntryId), PathChange>,
         barrier: Option<barrier::Sender>,
         scanning: bool,
     },
@@ -396,7 +396,7 @@ struct ShareState {
 }
 
 pub enum Event {
-    UpdatedEntries(HashMap<Arc<Path>, PathChange>),
+    UpdatedEntries(HashMap<(Arc<Path>, ProjectEntryId), PathChange>),
     UpdatedGitRepositories(HashMap<Arc<Path>, LocalRepositoryEntry>),
 }
 
@@ -2532,10 +2532,15 @@ struct BackgroundScanner {
     status_updates_tx: UnboundedSender<ScanState>,
     executor: Arc<executor::Background>,
     refresh_requests_rx: channel::Receiver<(Vec<PathBuf>, barrier::Sender)>,
-    prev_state: Mutex<(Snapshot, Vec<Arc<Path>>)>,
+    prev_state: Mutex<BackgroundScannerState>,
     finished_initial_scan: bool,
 }
 
+struct BackgroundScannerState {
+    snapshot: Snapshot,
+    event_paths: Vec<Arc<Path>>,
+}
+
 impl BackgroundScanner {
     fn new(
         snapshot: LocalSnapshot,
@@ -2549,7 +2554,10 @@ impl BackgroundScanner {
             status_updates_tx,
             executor,
             refresh_requests_rx,
-            prev_state: Mutex::new((snapshot.snapshot.clone(), Vec::new())),
+            prev_state: Mutex::new(BackgroundScannerState {
+                snapshot: snapshot.snapshot.clone(),
+                event_paths: Default::default(),
+            }),
             snapshot: Mutex::new(snapshot),
             finished_initial_scan: false,
         }
@@ -2648,7 +2656,15 @@ impl BackgroundScanner {
     }
 
     async fn process_refresh_request(&self, paths: Vec<PathBuf>, barrier: barrier::Sender) -> bool {
-        self.reload_entries_for_paths(paths, None).await;
+        if let Some(mut paths) = self.reload_entries_for_paths(paths, None).await {
+            paths.sort_unstable();
+            util::extend_sorted(
+                &mut self.prev_state.lock().event_paths,
+                paths,
+                usize::MAX,
+                Ord::cmp,
+            );
+        }
         self.send_status_update(false, Some(barrier))
     }
 
@@ -2659,7 +2675,12 @@ impl BackgroundScanner {
             .await
         {
             paths.sort_unstable();
-            util::extend_sorted(&mut self.prev_state.lock().1, paths, usize::MAX, Ord::cmp);
+            util::extend_sorted(
+                &mut self.prev_state.lock().event_paths,
+                paths,
+                usize::MAX,
+                Ord::cmp,
+            );
         }
         drop(scan_job_tx);
         self.scan_dirs(false, scan_job_rx).await;
@@ -2693,6 +2714,7 @@ impl BackgroundScanner {
         drop(snapshot);
 
         self.send_status_update(false, None);
+        self.prev_state.lock().event_paths.clear();
     }
 
     async fn scan_dirs(
@@ -2770,14 +2792,18 @@ impl BackgroundScanner {
 
     fn send_status_update(&self, scanning: bool, barrier: Option<barrier::Sender>) -> bool {
         let mut prev_state = self.prev_state.lock();
-        let snapshot = self.snapshot.lock().clone();
-        let mut old_snapshot = snapshot.snapshot.clone();
-        mem::swap(&mut old_snapshot, &mut prev_state.0);
-        let changed_paths = mem::take(&mut prev_state.1);
-        let changes = self.build_change_set(&old_snapshot, &snapshot.snapshot, changed_paths);
+        let new_snapshot = self.snapshot.lock().clone();
+        let old_snapshot = mem::replace(&mut prev_state.snapshot, new_snapshot.snapshot.clone());
+
+        let changes = self.build_change_set(
+            &old_snapshot,
+            &new_snapshot.snapshot,
+            &prev_state.event_paths,
+        );
+
         self.status_updates_tx
             .unbounded_send(ScanState::Updated {
-                snapshot,
+                snapshot: new_snapshot,
                 changes,
                 scanning,
                 barrier,
@@ -3245,8 +3271,8 @@ impl BackgroundScanner {
         &self,
         old_snapshot: &Snapshot,
         new_snapshot: &Snapshot,
-        event_paths: Vec<Arc<Path>>,
-    ) -> HashMap<Arc<Path>, PathChange> {
+        event_paths: &[Arc<Path>],
+    ) -> HashMap<(Arc<Path>, ProjectEntryId), PathChange> {
         use PathChange::{Added, AddedOrUpdated, Removed, Updated};
 
         let mut changes = HashMap::default();
@@ -3255,7 +3281,7 @@ impl BackgroundScanner {
         let received_before_initialized = !self.finished_initial_scan;
 
         for path in event_paths {
-            let path = PathKey(path);
+            let path = PathKey(path.clone());
             old_paths.seek(&path, Bias::Left, &());
             new_paths.seek(&path, Bias::Left, &());
 
@@ -3272,7 +3298,7 @@ impl BackgroundScanner {
 
                         match Ord::cmp(&old_entry.path, &new_entry.path) {
                             Ordering::Less => {
-                                changes.insert(old_entry.path.clone(), Removed);
+                                changes.insert((old_entry.path.clone(), old_entry.id), Removed);
                                 old_paths.next(&());
                             }
                             Ordering::Equal => {
@@ -3280,31 +3306,35 @@ impl BackgroundScanner {
                                     // If the worktree was not fully initialized when this event was generated,
                                     // we can't know whether this entry was added during the scan or whether
                                     // it was merely updated.
-                                    changes.insert(new_entry.path.clone(), AddedOrUpdated);
+                                    changes.insert(
+                                        (new_entry.path.clone(), new_entry.id),
+                                        AddedOrUpdated,
+                                    );
                                 } else if old_entry.mtime != new_entry.mtime {
-                                    changes.insert(new_entry.path.clone(), Updated);
+                                    changes.insert((new_entry.path.clone(), new_entry.id), Updated);
                                 }
                                 old_paths.next(&());
                                 new_paths.next(&());
                             }
                             Ordering::Greater => {
-                                changes.insert(new_entry.path.clone(), Added);
+                                changes.insert((new_entry.path.clone(), new_entry.id), Added);
                                 new_paths.next(&());
                             }
                         }
                     }
                     (Some(old_entry), None) => {
-                        changes.insert(old_entry.path.clone(), Removed);
+                        changes.insert((old_entry.path.clone(), old_entry.id), Removed);
                         old_paths.next(&());
                     }
                     (None, Some(new_entry)) => {
-                        changes.insert(new_entry.path.clone(), Added);
+                        changes.insert((new_entry.path.clone(), new_entry.id), Added);
                         new_paths.next(&());
                     }
                     (None, None) => break,
                 }
             }
         }
+
         changes
     }
 
@@ -4382,7 +4412,7 @@ mod tests {
 
             cx.subscribe(&worktree, move |tree, _, event, _| {
                 if let Event::UpdatedEntries(changes) = event {
-                    for (path, change_type) in changes.iter() {
+                    for ((path, _), change_type) in changes.iter() {
                         let path = path.clone();
                         let ix = match paths.binary_search(&path) {
                             Ok(ix) | Err(ix) => ix,
@@ -4392,13 +4422,16 @@ mod tests {
                                 assert_ne!(paths.get(ix), Some(&path));
                                 paths.insert(ix, path);
                             }
+
                             PathChange::Removed => {
                                 assert_eq!(paths.get(ix), Some(&path));
                                 paths.remove(ix);
                             }
+
                             PathChange::Updated => {
                                 assert_eq!(paths.get(ix), Some(&path));
                             }
+
                             PathChange::AddedOrUpdated => {
                                 if paths[ix] != path {
                                     paths.insert(ix, path);
@@ -4406,6 +4439,7 @@ mod tests {
                             }
                         }
                     }
+
                     let new_paths = tree.paths().cloned().collect::<Vec<_>>();
                     assert_eq!(paths, new_paths, "incorrect changes: {:?}", changes);
                 }
@@ -4416,7 +4450,17 @@ mod tests {
         let mut snapshots = Vec::new();
         let mut mutations_len = operations;
         while mutations_len > 1 {
-            randomly_mutate_fs(&fs, root_dir, 1.0, &mut rng).await;
+            if rng.gen_bool(0.2) {
+                worktree
+                    .update(cx, |worktree, cx| {
+                        randomly_mutate_worktree(worktree, &mut rng, cx)
+                    })
+                    .await
+                    .unwrap();
+            } else {
+                randomly_mutate_fs(&fs, root_dir, 1.0, &mut rng).await;
+            }
+
             let buffered_event_count = fs.as_fake().buffered_event_count().await;
             if buffered_event_count > 0 && rng.gen_bool(0.3) {
                 let len = rng.gen_range(0..=buffered_event_count);