agent: Wait until linked git worktree state is loaded (#51395)

Anthony Eid , Ben Kunkle , cameron , and Zed Zippy created

### Context 

This fixes a bug where the sidebar would show a newly created git
worktree thread as its own project while the recently created workspace
is loading its git state. The fix is adding project APIs to await the
initial worktree store scan and then each git repo initial snapshot;
then awaiting on them before adding the new workspace to the
multi-workspace.

### Architecture:

I added the `Worktree::Remote::wait_for_snapshot` API to
`Worktree::Local` to enable `WorktreeStore` to await for both remote and
local projects until there's an initial scan. The `WorktreeStore` uses
the watcher pattern so it can update the initial scan state whenever
visible worktrees are added or removed from the store.

Before you mark this PR as ready for review, make sure that you have:
- [x] Added solid test coverage and/or screenshots from doing manual
testing
- [x] Done a self-review taking into account security and performance
aspects
- [x] Aligned any UI changes with the [UI
checklist](https://github.com/zed-industries/zed/blob/main/CONTRIBUTING.md#uiux-checklist)

Release Notes:

- N/A

---------

Co-authored-by: Ben Kunkle <ben@zed.dev>
Co-authored-by: cameron <cameron.studdstreet@gmail.com>
Co-authored-by: Zed Zippy <234243425+zed-zippy[bot]@users.noreply.github.com>

Change summary

crates/agent_ui/src/agent_panel.rs                                  | 28 
crates/edit_prediction_context/src/edit_prediction_context_tests.rs |  1 
crates/edit_prediction_context/src/fake_definition_lsp.rs           | 35 
crates/project/src/project.rs                                       |  7 
crates/project/src/worktree_store.rs                                | 78 
crates/project/tests/integration/project_tests.rs                   | 71 
crates/worktree/src/worktree.rs                                     | 43 
7 files changed, 250 insertions(+), 13 deletions(-)

Detailed changes

crates/agent_ui/src/agent_panel.rs 🔗

@@ -2945,13 +2945,35 @@ impl AgentPanel {
             })?
             .await?;
 
-        let panels_task = new_window_handle.update(cx, |_, _, cx| {
-            new_workspace.update(cx, |workspace, _cx| workspace.take_panels_task())
-        })?;
+        let panels_task = new_workspace.update(cx, |workspace, _cx| workspace.take_panels_task());
+
         if let Some(task) = panels_task {
             task.await.log_err();
         }
 
+        new_workspace
+            .update(cx, |workspace, cx| {
+                workspace.project().read(cx).wait_for_initial_scan(cx)
+            })
+            .await;
+
+        new_workspace
+            .update(cx, |workspace, cx| {
+                let repos = workspace
+                    .project()
+                    .read(cx)
+                    .repositories(cx)
+                    .values()
+                    .cloned()
+                    .collect::<Vec<_>>();
+
+                let tasks = repos
+                    .into_iter()
+                    .map(|repo| repo.update(cx, |repo, _| repo.barrier()));
+                futures::future::join_all(tasks)
+            })
+            .await;
+
         let initial_content = AgentInitialContent::ContentBlock {
             blocks: content,
             auto_submit: true,

crates/edit_prediction_context/src/fake_definition_lsp.rs 🔗

@@ -174,7 +174,7 @@ pub fn register_fake_definition_server(
 struct DefinitionIndex {
     language: Arc<Language>,
     definitions: HashMap<String, Vec<lsp::Location>>,
-    type_annotations: HashMap<String, String>,
+    type_annotations_by_file: HashMap<Uri, HashMap<String, String>>,
     files: HashMap<Uri, FileEntry>,
 }
 
@@ -189,7 +189,7 @@ impl DefinitionIndex {
         Self {
             language,
             definitions: HashMap::default(),
-            type_annotations: HashMap::default(),
+            type_annotations_by_file: HashMap::default(),
             files: HashMap::default(),
         }
     }
@@ -199,6 +199,7 @@ impl DefinitionIndex {
             locations.retain(|loc| &loc.uri != uri);
             !locations.is_empty()
         });
+        self.type_annotations_by_file.remove(uri);
         self.files.remove(uri);
     }
 
@@ -243,11 +244,11 @@ impl DefinitionIndex {
                 .push(location);
         }
 
-        for (identifier_name, type_name) in extract_type_annotations(content) {
-            self.type_annotations
-                .entry(identifier_name)
-                .or_insert(type_name);
-        }
+        let type_annotations = extract_type_annotations(content)
+            .into_iter()
+            .collect::<HashMap<_, _>>();
+        self.type_annotations_by_file
+            .insert(uri.clone(), type_annotations);
 
         self.files.insert(
             uri,
@@ -279,7 +280,11 @@ impl DefinitionIndex {
         let entry = self.files.get(&uri)?;
         let name = word_at_position(&entry.contents, position)?;
 
-        if let Some(type_name) = self.type_annotations.get(name) {
+        if let Some(type_name) = self
+            .type_annotations_by_file
+            .get(&uri)
+            .and_then(|annotations| annotations.get(name))
+        {
             if let Some(locations) = self.definitions.get(type_name) {
                 return Some(lsp::GotoDefinitionResponse::Array(locations.clone()));
             }
@@ -367,6 +372,20 @@ fn extract_base_type_name(type_str: &str) -> String {
         return outer.to_string();
     }
 
+    if let Some(call_start) = trimmed.find("::") {
+        let outer = &trimmed[..call_start];
+        if matches!(outer, "Arc" | "Box" | "Rc" | "Option" | "Vec" | "Cow") {
+            let rest = trimmed[call_start + 2..].trim_start();
+            if let Some(paren_start) = rest.find('(') {
+                let inner = &rest[paren_start + 1..];
+                let inner = inner.trim();
+                if !inner.is_empty() {
+                    return extract_base_type_name(inner);
+                }
+            }
+        }
+    }
+
     trimmed
         .split(|c: char| !c.is_alphanumeric() && c != '_')
         .next()

crates/project/src/project.rs 🔗

@@ -120,6 +120,7 @@ use std::{
     borrow::Cow,
     collections::BTreeMap,
     ffi::OsString,
+    future::Future,
     ops::{Not as _, Range},
     path::{Path, PathBuf},
     pin::pin,
@@ -2078,6 +2079,12 @@ impl Project {
         self.worktree_store.clone()
     }
 
+    /// Returns a future that resolves when all visible worktrees have completed
+    /// their initial scan.
+    pub fn wait_for_initial_scan(&self, cx: &App) -> impl Future<Output = ()> + use<> {
+        self.worktree_store.read(cx).wait_for_initial_scan()
+    }
+
     #[inline]
     pub fn context_server_store(&self) -> Entity<ContextServerStore> {
         self.context_server_store.clone()

crates/project/src/worktree_store.rs 🔗

@@ -1,4 +1,5 @@
 use std::{
+    future::Future,
     path::{Path, PathBuf},
     sync::{
         Arc,
@@ -15,6 +16,7 @@ use gpui::{
     WeakEntity,
 };
 use itertools::Either;
+use postage::{prelude::Stream as _, watch};
 use rpc::{
     AnyProtoClient, ErrorExt, TypedEnvelope,
     proto::{self, REMOTE_SERVER_PROJECT_ID},
@@ -75,6 +77,7 @@ pub struct WorktreeStore {
     #[allow(clippy::type_complexity)]
     loading_worktrees:
         HashMap<Arc<SanitizedPath>, Shared<Task<Result<Entity<Worktree>, Arc<anyhow::Error>>>>>,
+    initial_scan_complete: (watch::Sender<bool>, watch::Receiver<bool>),
     state: WorktreeStoreState,
 }
 
@@ -119,6 +122,7 @@ impl WorktreeStore {
             worktrees_reordered: false,
             scanning_enabled: true,
             retain_worktrees,
+            initial_scan_complete: watch::channel_with(true),
             state: WorktreeStoreState::Local { fs },
         }
     }
@@ -139,6 +143,7 @@ impl WorktreeStore {
             worktrees_reordered: false,
             scanning_enabled: true,
             retain_worktrees,
+            initial_scan_complete: watch::channel_with(true),
             state: WorktreeStoreState::Remote {
                 upstream_client,
                 upstream_project_id,
@@ -174,6 +179,57 @@ impl WorktreeStore {
 
     pub fn disable_scanner(&mut self) {
         self.scanning_enabled = false;
+        *self.initial_scan_complete.0.borrow_mut() = true;
+    }
+
+    /// Returns a future that resolves when all visible worktrees have completed
+    /// their initial scan (entries populated, git repos detected).
+    pub fn wait_for_initial_scan(&self) -> impl Future<Output = ()> + use<> {
+        let mut rx = self.initial_scan_complete.1.clone();
+        async move {
+            let mut done = *rx.borrow();
+            while !done {
+                if let Some(value) = rx.recv().await {
+                    done = value;
+                } else {
+                    break;
+                }
+            }
+        }
+    }
+
+    /// Returns whether all visible worktrees have completed their initial scan.
+    pub fn initial_scan_completed(&self) -> bool {
+        *self.initial_scan_complete.1.borrow()
+    }
+
+    /// Checks whether all visible worktrees have completed their initial scan
+    /// and no worktree creations are pending, and updates the watch channel accordingly.
+    fn update_initial_scan_state(&mut self, cx: &App) {
+        let complete = self.loading_worktrees.is_empty()
+            && self
+                .visible_worktrees(cx)
+                .all(|wt| wt.read(cx).completed_scan_id() >= 1);
+        *self.initial_scan_complete.0.borrow_mut() = complete;
+    }
+
+    /// Spawns a detached task that waits for a worktree's initial scan to complete,
+    /// then rechecks and updates the aggregate initial scan state.
+    fn observe_worktree_scan_completion(
+        &mut self,
+        worktree: &Entity<Worktree>,
+        cx: &mut Context<Self>,
+    ) {
+        let await_scan = worktree.update(cx, |worktree, _cx| worktree.wait_for_snapshot(1));
+        cx.spawn(async move |this, cx| {
+            await_scan.await.ok();
+            this.update(cx, |this, cx| {
+                this.update_initial_scan_state(cx);
+            })
+            .ok();
+            anyhow::Ok(())
+        })
+        .detach();
     }
 
     /// Iterates through all worktrees, including ones that don't appear in the project panel
@@ -554,12 +610,22 @@ impl WorktreeStore {
 
             self.loading_worktrees
                 .insert(abs_path.clone(), task.shared());
+
+            if visible && self.scanning_enabled {
+                *self.initial_scan_complete.0.borrow_mut() = false;
+            }
         }
         let task = self.loading_worktrees.get(&abs_path).unwrap().clone();
         cx.spawn(async move |this, cx| {
             let result = task.await;
-            this.update(cx, |this, _| this.loading_worktrees.remove(&abs_path))
-                .ok();
+            this.update(cx, |this, cx| {
+                this.loading_worktrees.remove(&abs_path);
+                if !visible || !this.scanning_enabled || result.is_err() {
+                    this.update_initial_scan_state(cx);
+                }
+            })
+            .ok();
+
             match result {
                 Ok(worktree) => {
                     if !is_via_collab {
@@ -578,6 +644,13 @@ impl WorktreeStore {
                                 );
                             });
                         }
+
+                        this.update(cx, |this, cx| {
+                            if this.scanning_enabled && visible {
+                                this.observe_worktree_scan_completion(&worktree, cx);
+                            }
+                        })
+                        .ok();
                     }
                     Ok(worktree)
                 }
@@ -768,6 +841,7 @@ impl WorktreeStore {
                 false
             }
         });
+        self.update_initial_scan_state(cx);
         self.send_project_updates(cx);
     }
 

crates/project/tests/integration/project_tests.rs 🔗

@@ -11883,6 +11883,77 @@ async fn test_undo_encoding_change(cx: &mut gpui::TestAppContext) {
     });
 }
 
+#[gpui::test]
+async fn test_initial_scan_complete(cx: &mut gpui::TestAppContext) {
+    init_test(cx);
+
+    let fs = FakeFs::new(cx.executor());
+    fs.insert_tree(
+        path!("/root"),
+        json!({
+            "a": {
+                ".git": {},
+                ".zed": {
+                    "tasks.json": r#"[{"label": "task-a", "command": "echo a"}]"#
+                },
+                "src": { "main.rs": "" }
+            },
+            "b": {
+                ".git": {},
+                ".zed": {
+                    "tasks.json": r#"[{"label": "task-b", "command": "echo b"}]"#
+                },
+                "src": { "lib.rs": "" }
+            },
+        }),
+    )
+    .await;
+
+    let repos_created = Rc::new(RefCell::new(Vec::new()));
+    let _observe = {
+        let repos_created = repos_created.clone();
+        cx.update(|cx| {
+            cx.observe_new::<Repository>(move |repo, _, cx| {
+                repos_created.borrow_mut().push(cx.entity().downgrade());
+                let _ = repo;
+            })
+        })
+    };
+
+    let project = Project::test(
+        fs.clone(),
+        [path!("/root/a").as_ref(), path!("/root/b").as_ref()],
+        cx,
+    )
+    .await;
+
+    let scan_complete = project.read_with(cx, |project, cx| project.wait_for_initial_scan(cx));
+    scan_complete.await;
+
+    project.read_with(cx, |project, cx| {
+        assert!(
+            project.worktree_store().read(cx).initial_scan_completed(),
+            "Expected initial scan to be completed after awaiting wait_for_initial_scan"
+        );
+    });
+
+    let created_repos_len = repos_created.borrow().len();
+    assert_eq!(
+        created_repos_len, 2,
+        "Expected 2 repositories to be created during scan, got {}",
+        created_repos_len
+    );
+
+    project.read_with(cx, |project, cx| {
+        let git_store = project.git_store().read(cx);
+        assert_eq!(
+            git_store.repositories().len(),
+            2,
+            "Expected 2 repositories in GitStore"
+        );
+    });
+}
+
 pub fn init_test(cx: &mut gpui::TestAppContext) {
     zlog::init_test();
 

crates/worktree/src/worktree.rs 🔗

@@ -128,6 +128,7 @@ pub struct LocalWorktree {
     scan_requests_tx: channel::Sender<ScanRequest>,
     path_prefixes_to_scan_tx: channel::Sender<PathPrefixScanRequest>,
     is_scanning: (watch::Sender<bool>, watch::Receiver<bool>),
+    snapshot_subscriptions: VecDeque<(usize, oneshot::Sender<()>)>,
     _background_scanner_tasks: Vec<Task<()>>,
     update_observer: Option<UpdateObservationState>,
     fs: Arc<dyn Fs>,
@@ -470,6 +471,7 @@ impl Worktree {
                 next_entry_id,
                 snapshot,
                 is_scanning: watch::channel_with(true),
+                snapshot_subscriptions: Default::default(),
                 update_observer: None,
                 scan_requests_tx,
                 path_prefixes_to_scan_tx,
@@ -714,6 +716,16 @@ impl Worktree {
         }
     }
 
+    pub fn wait_for_snapshot(
+        &mut self,
+        scan_id: usize,
+    ) -> impl Future<Output = Result<()>> + use<> {
+        match self {
+            Worktree::Local(this) => this.wait_for_snapshot(scan_id).boxed(),
+            Worktree::Remote(this) => this.wait_for_snapshot(scan_id).boxed(),
+        }
+    }
+
     #[cfg(feature = "test-support")]
     pub fn has_update_observer(&self) -> bool {
         match self {
@@ -1170,6 +1182,15 @@ impl LocalWorktree {
         if !repo_changes.is_empty() {
             cx.emit(Event::UpdatedGitRepositories(repo_changes));
         }
+
+        while let Some((scan_id, _)) = self.snapshot_subscriptions.front() {
+            if self.snapshot.completed_scan_id >= *scan_id {
+                let (_, tx) = self.snapshot_subscriptions.pop_front().unwrap();
+                tx.send(()).ok();
+            } else {
+                break;
+            }
+        }
     }
 
     fn changed_repos(
@@ -1286,6 +1307,28 @@ impl LocalWorktree {
         }
     }
 
+    pub fn wait_for_snapshot(
+        &mut self,
+        scan_id: usize,
+    ) -> impl Future<Output = Result<()>> + use<> {
+        let (tx, rx) = oneshot::channel();
+        if self.snapshot.completed_scan_id >= scan_id {
+            tx.send(()).ok();
+        } else {
+            match self
+                .snapshot_subscriptions
+                .binary_search_by_key(&scan_id, |probe| probe.0)
+            {
+                Ok(ix) | Err(ix) => self.snapshot_subscriptions.insert(ix, (scan_id, tx)),
+            }
+        }
+
+        async move {
+            rx.await?;
+            Ok(())
+        }
+    }
+
     pub fn snapshot(&self) -> LocalSnapshot {
         self.snapshot.clone()
     }