Optimize `workspace/didChangeWatchedFiles` handler (#49778)

John Tur created

Previously, `didChangeWatchedFiles` registrations triggered a rebuild of
a single `GlobSet` containing all file watch patterns for the language
server. This means that, each time `didChangeWatchedFiles` is
registered, the work it takes to construct the `GlobSet` increases. This
quadratic blowup caused massive lag with language servers which register
thousands of watched files (like Roslyn).

Instead, create one `GlobSet` per registration and try matching them
one-by-one each time a file watcher event is raised.

---

Before you mark this PR as ready for review, make sure that you have:
- [X] Added a solid test coverage and/or screenshots from doing manual
testing
- [X] Done a self-review taking into account security and performance
aspects

Release Notes:

- Optimized performance for language servers which register many
file-watching notifications.

Change summary

crates/project/src/lsp_store.rs                   | 247 +++++++---------
crates/project/tests/integration/project_tests.rs | 164 +++++++++++
2 files changed, 276 insertions(+), 135 deletions(-)

Detailed changes

crates/project/src/lsp_store.rs 🔗

@@ -62,7 +62,7 @@ use futures::{
     select, select_biased,
     stream::FuturesUnordered,
 };
-use globset::{Glob, GlobBuilder, GlobMatcher, GlobSet, GlobSetBuilder};
+use globset::{Candidate, Glob, GlobBuilder, GlobMatcher, GlobSet, GlobSetBuilder};
 use gpui::{
     App, AppContext, AsyncApp, Context, Entity, EventEmitter, PromptLevel, SharedString,
     Subscription, Task, WeakEntity,
@@ -3527,12 +3527,12 @@ impl LocalLspStore {
         servers_to_remove.into_iter().collect()
     }
 
-    fn rebuild_watched_paths_inner<'a>(
-        &'a self,
+    fn build_watched_paths_registration(
+        &self,
         language_server_id: LanguageServerId,
-        watchers: impl Iterator<Item = &'a FileSystemWatcher>,
+        watchers: &[FileSystemWatcher],
         cx: &mut Context<LspStore>,
-    ) -> LanguageServerWatchedPathsBuilder {
+    ) -> WatchedPathsRegistration {
         let worktrees = self
             .worktree_store
             .read(cx)
@@ -3636,18 +3636,55 @@ impl LocalLspStore {
             }
         }
 
-        let mut watch_builder = LanguageServerWatchedPathsBuilder::default();
-        for (worktree_id, builder) in worktree_globs {
-            if let Ok(globset) = builder.build() {
-                watch_builder.watch_worktree(worktree_id, globset);
-            }
-        }
-        for (abs_path, builder) in abs_globs {
-            if let Ok(globset) = builder.build() {
-                watch_builder.watch_abs_path(abs_path, globset);
-            }
+        let worktree_globs = worktree_globs
+            .into_iter()
+            .filter_map(|(worktree_id, builder)| builder.build().ok().map(|gs| (worktree_id, gs)))
+            .collect();
+
+        let lsp_store = self.weak.clone();
+        let fs = self.fs.clone();
+        const LSP_ABS_PATH_OBSERVE: Duration = Duration::from_millis(100);
+
+        let abs_path_watchers = abs_globs
+            .into_iter()
+            .filter_map(|(abs_path, builder): (Arc<Path>, _)| {
+                let globset = builder.build().ok()?;
+                let task = cx.spawn({
+                    let fs = fs.clone();
+                    let lsp_store = lsp_store.clone();
+                    async move |_, cx| {
+                        maybe!(async move {
+                            let mut push_updates = fs.watch(&abs_path, LSP_ABS_PATH_OBSERVE).await;
+                            while let Some(update) = push_updates.0.next().await {
+                                let matching_entries: Vec<_> = update
+                                    .into_iter()
+                                    .filter(|event| globset.is_match(&event.path))
+                                    .collect();
+                                if matching_entries.is_empty() {
+                                    continue;
+                                }
+                                lsp_store
+                                    .update(cx, |this, _| {
+                                        this.lsp_notify_abs_paths_changed(
+                                            language_server_id,
+                                            matching_entries,
+                                        );
+                                    })
+                                    .ok()?;
+                            }
+                            Some(())
+                        })
+                        .await;
+                    }
+                });
+                Some(task)
+            })
+            .collect();
+
+        WatchedPathsRegistration {
+            worktree_globs,
+            _abs_path_watchers: abs_path_watchers,
         }
-        watch_builder
     }
 
     fn worktree_and_path_for_file_watcher(
@@ -3693,30 +3730,6 @@ impl LocalLspStore {
         })
     }
 
-    fn rebuild_watched_paths(
-        &mut self,
-        language_server_id: LanguageServerId,
-        cx: &mut Context<LspStore>,
-    ) {
-        let Some(registrations) = self
-            .language_server_dynamic_registrations
-            .get(&language_server_id)
-        else {
-            return;
-        };
-
-        let watch_builder = self.rebuild_watched_paths_inner(
-            language_server_id,
-            registrations.did_change_watched_files.values().flatten(),
-            cx,
-        );
-        let watcher = watch_builder.build(self.fs.clone(), language_server_id, cx);
-        self.language_server_watched_paths
-            .insert(language_server_id, watcher);
-
-        cx.notify();
-    }
-
     fn on_lsp_did_change_watched_files(
         &mut self,
         language_server_id: LanguageServerId,
@@ -3724,16 +3737,22 @@ impl LocalLspStore {
         params: DidChangeWatchedFilesRegistrationOptions,
         cx: &mut Context<LspStore>,
     ) {
-        let registrations = self
-            .language_server_dynamic_registrations
-            .entry(language_server_id)
-            .or_default();
+        let registration =
+            self.build_watched_paths_registration(language_server_id, &params.watchers, cx);
 
-        registrations
+        self.language_server_dynamic_registrations
+            .entry(language_server_id)
+            .or_default()
             .did_change_watched_files
             .insert(registration_id.to_string(), params.watchers);
 
-        self.rebuild_watched_paths(language_server_id, cx);
+        self.language_server_watched_paths
+            .entry(language_server_id)
+            .or_default()
+            .registrations
+            .insert(registration_id.to_string(), registration);
+
+        cx.notify();
     }
 
     fn on_lsp_unregister_did_change_watched_files(
@@ -3765,7 +3784,14 @@ impl LocalLspStore {
             );
         }
 
-        self.rebuild_watched_paths(language_server_id, cx);
+        if let Some(watched_paths) = self
+            .language_server_watched_paths
+            .get_mut(&language_server_id)
+        {
+            watched_paths.registrations.remove(registration_id);
+        }
+
+        cx.notify();
     }
 
     async fn initialization_options_for_adapter(
@@ -11642,20 +11668,34 @@ impl LspStore {
         language_server_ids.sort();
         language_server_ids.dedup();
 
-        // let abs_path = worktree_handle.read(cx).abs_path();
-        for server_id in &language_server_ids {
+        let active_servers: Vec<(LanguageServerId, &LanguageServerWatchedPaths)> =
+            language_server_ids
+                .iter()
+                .filter_map(|server_id| {
+                    if !matches!(
+                        local.language_servers.get(server_id),
+                        Some(LanguageServerState::Running { .. })
+                    ) {
+                        return None;
+                    }
+                    let watched_paths = local.language_server_watched_paths.get(server_id)?;
+                    Some((*server_id, watched_paths))
+                })
+                .collect();
+
+        for (server_id, watched_paths) in &active_servers {
             if let Some(LanguageServerState::Running { server, .. }) =
                 local.language_servers.get(server_id)
-                && let Some(watched_paths) = local
-                    .language_server_watched_paths
-                    .get(server_id)
-                    .and_then(|paths| paths.worktree_paths.get(&worktree_id))
             {
                 let params = lsp::DidChangeWatchedFilesParams {
                     changes: changes
                         .iter()
                         .filter_map(|(path, _, change)| {
-                            if !watched_paths.is_match(path.as_std_path()) {
+                            let candidate = Candidate::new(path.as_std_path());
+
+                            if !watched_paths
+                                .is_worktree_path_match_candidate(worktree_id, &candidate)
+                            {
                                 return None;
                             }
                             let typ = match change {
@@ -11665,10 +11705,9 @@ impl LspStore {
                                 PathChange::Updated => lsp::FileChangeType::CHANGED,
                                 PathChange::AddedOrUpdated => lsp::FileChangeType::CHANGED,
                             };
-                            let uri = lsp::Uri::from_file_path(
-                                worktree_handle.read(cx).absolutize(&path),
-                            )
-                            .ok()?;
+                            let uri =
+                                lsp::Uri::from_file_path(worktree_handle.read(cx).absolutize(path))
+                                    .ok()?;
                             Some(lsp::FileEvent { uri, typ })
                         })
                         .collect(),
@@ -13488,87 +13527,25 @@ impl RenameActionPredicate {
 
 #[derive(Default)]
 struct LanguageServerWatchedPaths {
-    worktree_paths: HashMap<WorktreeId, GlobSet>,
-    abs_paths: HashMap<Arc<Path>, (GlobSet, Task<()>)>,
+    registrations: HashMap<String, WatchedPathsRegistration>,
 }
 
-#[derive(Default)]
-struct LanguageServerWatchedPathsBuilder {
-    worktree_paths: HashMap<WorktreeId, GlobSet>,
-    abs_paths: HashMap<Arc<Path>, GlobSet>,
+struct WatchedPathsRegistration {
+    worktree_globs: HashMap<WorktreeId, GlobSet>,
+    _abs_path_watchers: Vec<Task<()>>,
 }
 
-impl LanguageServerWatchedPathsBuilder {
-    fn watch_worktree(&mut self, worktree_id: WorktreeId, glob_set: GlobSet) {
-        self.worktree_paths.insert(worktree_id, glob_set);
-    }
-    fn watch_abs_path(&mut self, path: Arc<Path>, glob_set: GlobSet) {
-        self.abs_paths.insert(path, glob_set);
-    }
-    fn build(
-        self,
-        fs: Arc<dyn Fs>,
-        language_server_id: LanguageServerId,
-        cx: &mut Context<LspStore>,
-    ) -> LanguageServerWatchedPaths {
-        let lsp_store = cx.weak_entity();
-
-        const LSP_ABS_PATH_OBSERVE: Duration = Duration::from_millis(100);
-        let abs_paths = self
-            .abs_paths
-            .into_iter()
-            .map(|(abs_path, globset)| {
-                let task = cx.spawn({
-                    let abs_path = abs_path.clone();
-                    let fs = fs.clone();
-
-                    let lsp_store = lsp_store.clone();
-                    async move |_, cx| {
-                        maybe!(async move {
-                            let mut push_updates = fs.watch(&abs_path, LSP_ABS_PATH_OBSERVE).await;
-                            while let Some(update) = push_updates.0.next().await {
-                                let action = lsp_store
-                                    .update(cx, |this, _| {
-                                        let Some(local) = this.as_local() else {
-                                            return ControlFlow::Break(());
-                                        };
-                                        let Some(watcher) = local
-                                            .language_server_watched_paths
-                                            .get(&language_server_id)
-                                        else {
-                                            return ControlFlow::Break(());
-                                        };
-                                        let (globs, _) = watcher.abs_paths.get(&abs_path).expect(
-                                            "Watched abs path is not registered with a watcher",
-                                        );
-                                        let matching_entries = update
-                                            .into_iter()
-                                            .filter(|event| globs.is_match(&event.path))
-                                            .collect::<Vec<_>>();
-                                        this.lsp_notify_abs_paths_changed(
-                                            language_server_id,
-                                            matching_entries,
-                                        );
-                                        ControlFlow::Continue(())
-                                    })
-                                    .ok()?;
-
-                                if action.is_break() {
-                                    break;
-                                }
-                            }
-                            Some(())
-                        })
-                        .await;
-                    }
-                });
-                (abs_path, (globset, task))
-            })
-            .collect();
-        LanguageServerWatchedPaths {
-            worktree_paths: self.worktree_paths,
-            abs_paths,
-        }
+impl LanguageServerWatchedPaths {
+    fn is_worktree_path_match_candidate(
+        &self,
+        worktree_id: WorktreeId,
+        candidate: &Candidate,
+    ) -> bool {
+        self.registrations.values().any(|reg| {
+            reg.worktree_globs
+                .get(&worktree_id)
+                .is_some_and(|gs| gs.is_match_candidate(candidate))
+        })
     }
 }
 

crates/project/tests/integration/project_tests.rs 🔗

@@ -2379,6 +2379,170 @@ async fn test_reporting_fs_changes_to_language_servers(cx: &mut gpui::TestAppCon
     );
 }
 
+#[gpui::test]
+async fn test_multiple_did_change_watched_files_registrations(cx: &mut gpui::TestAppContext) {
+    init_test(cx);
+
+    let fs = FakeFs::new(cx.executor());
+    fs.insert_tree(
+        path!("/root"),
+        json!({
+            "src": {
+                "a.rs": "",
+                "b.rs": "",
+            },
+            "docs": {
+                "readme.md": "",
+            },
+        }),
+    )
+    .await;
+
+    let project = Project::test(fs.clone(), [path!("/root").as_ref()], cx).await;
+    let language_registry = project.read_with(cx, |project, _| project.languages().clone());
+    language_registry.add(rust_lang());
+    let mut fake_servers = language_registry.register_fake_lsp(
+        "Rust",
+        FakeLspAdapter {
+            name: "the-language-server",
+            ..Default::default()
+        },
+    );
+
+    cx.executor().run_until_parked();
+
+    project
+        .update(cx, |project, cx| {
+            project.open_local_buffer_with_lsp(path!("/root/src/a.rs"), cx)
+        })
+        .await
+        .unwrap();
+
+    let fake_server = fake_servers.next().await.unwrap();
+    cx.executor().run_until_parked();
+
+    let file_changes = Arc::new(Mutex::new(Vec::new()));
+
+    // Register two separate watched file registrations.
+    fake_server
+        .request::<lsp::request::RegisterCapability>(
+            lsp::RegistrationParams {
+                registrations: vec![lsp::Registration {
+                    id: "reg-1".to_string(),
+                    method: "workspace/didChangeWatchedFiles".to_string(),
+                    register_options: serde_json::to_value(
+                        lsp::DidChangeWatchedFilesRegistrationOptions {
+                            watchers: vec![lsp::FileSystemWatcher {
+                                glob_pattern: lsp::GlobPattern::String(
+                                    path!("/root/src/*.rs").to_string(),
+                                ),
+                                kind: None,
+                            }],
+                        },
+                    )
+                    .ok(),
+                }],
+            },
+            DEFAULT_LSP_REQUEST_TIMEOUT,
+        )
+        .await
+        .into_response()
+        .unwrap();
+
+    fake_server
+        .request::<lsp::request::RegisterCapability>(
+            lsp::RegistrationParams {
+                registrations: vec![lsp::Registration {
+                    id: "reg-2".to_string(),
+                    method: "workspace/didChangeWatchedFiles".to_string(),
+                    register_options: serde_json::to_value(
+                        lsp::DidChangeWatchedFilesRegistrationOptions {
+                            watchers: vec![lsp::FileSystemWatcher {
+                                glob_pattern: lsp::GlobPattern::String(
+                                    path!("/root/docs/*.md").to_string(),
+                                ),
+                                kind: None,
+                            }],
+                        },
+                    )
+                    .ok(),
+                }],
+            },
+            DEFAULT_LSP_REQUEST_TIMEOUT,
+        )
+        .await
+        .into_response()
+        .unwrap();
+
+    fake_server.handle_notification::<lsp::notification::DidChangeWatchedFiles, _>({
+        let file_changes = file_changes.clone();
+        move |params, _| {
+            let mut file_changes = file_changes.lock();
+            file_changes.extend(params.changes);
+            file_changes.sort_by(|a, b| a.uri.cmp(&b.uri));
+        }
+    });
+
+    cx.executor().run_until_parked();
+
+    // Both registrations should match their respective patterns.
+    fs.create_file(path!("/root/src/c.rs").as_ref(), Default::default())
+        .await
+        .unwrap();
+    fs.create_file(path!("/root/docs/guide.md").as_ref(), Default::default())
+        .await
+        .unwrap();
+    cx.executor().run_until_parked();
+
+    assert_eq!(
+        &*file_changes.lock(),
+        &[
+            lsp::FileEvent {
+                uri: lsp::Uri::from_file_path(path!("/root/docs/guide.md")).unwrap(),
+                typ: lsp::FileChangeType::CREATED,
+            },
+            lsp::FileEvent {
+                uri: lsp::Uri::from_file_path(path!("/root/src/c.rs")).unwrap(),
+                typ: lsp::FileChangeType::CREATED,
+            },
+        ]
+    );
+    file_changes.lock().clear();
+
+    // Unregister the first registration.
+    fake_server
+        .request::<lsp::request::UnregisterCapability>(
+            lsp::UnregistrationParams {
+                unregisterations: vec![lsp::Unregistration {
+                    id: "reg-1".to_string(),
+                    method: "workspace/didChangeWatchedFiles".to_string(),
+                }],
+            },
+            DEFAULT_LSP_REQUEST_TIMEOUT,
+        )
+        .await
+        .into_response()
+        .unwrap();
+    cx.executor().run_until_parked();
+
+    // Only the second registration should still match.
+    fs.create_file(path!("/root/src/d.rs").as_ref(), Default::default())
+        .await
+        .unwrap();
+    fs.create_file(path!("/root/docs/notes.md").as_ref(), Default::default())
+        .await
+        .unwrap();
+    cx.executor().run_until_parked();
+
+    assert_eq!(
+        &*file_changes.lock(),
+        &[lsp::FileEvent {
+            uri: lsp::Uri::from_file_path(path!("/root/docs/notes.md")).unwrap(),
+            typ: lsp::FileChangeType::CREATED,
+        }]
+    );
+}
+
 #[gpui::test]
 async fn test_single_file_worktrees_diagnostics(cx: &mut gpui::TestAppContext) {
     init_test(cx);