Fix latent race conditions in extension test (#54167)

Ben Kunkle created

Hit a flake in this test in
https://github.com/zed-industries/zed/actions/runs/24555985980/job/71792549618.
Ran for 1000 iterations and hit a second race, now also hopefully fixed.

Self-Review Checklist:

- [x] I've reviewed my own diff for quality, security, and reliability
- [x] Unsafe blocks (if any) have justifying comments
- [x] The content is consistent with the [UI/UX
checklist](https://github.com/zed-industries/zed/blob/main/CONTRIBUTING.md#uiux-checklist)
- [x] Tests cover the new/changed behavior
- [x] Performance impact has been considered and is acceptable

Closes #ISSUE

Release Notes:

- N/A or Added/Fixed/Improved ...

Change summary

crates/extension_host/src/extension_store_test.rs | 53 ++++++++++++++--
1 file changed, 44 insertions(+), 9 deletions(-)

Detailed changes

crates/extension_host/src/extension_store_test.rs 🔗

@@ -721,6 +721,11 @@ async fn test_extension_store_with_test_extension(cx: &mut TestAppContext) {
         .detach();
     });
 
+    let mut extension_events = cx.events(&cx.update(|cx| {
+        extension::ExtensionEvents::try_global(cx)
+            .expect("ExtensionEvents should be initialized in tests")
+    }));
+
     let executor = cx.executor();
     await_or_timeout(
         &executor,
@@ -733,6 +738,24 @@ async fn test_extension_store_with_test_extension(cx: &mut TestAppContext) {
     .await
     .unwrap();
 
+    await_or_timeout(
+        &executor,
+        "awaiting ExtensionsInstalledChanged",
+        10,
+        async {
+            while let Some(event) = extension_events.next().await {
+                if matches!(event, extension::Event::ExtensionsInstalledChanged) {
+                    return;
+                }
+            }
+
+            panic!(
+                "[test_extension_store_with_test_extension] extension event stream ended before ExtensionsInstalledChanged"
+            );
+        },
+    )
+    .await;
+
     let mut fake_servers = language_registry.register_fake_lsp_server(
         LanguageServerName("gleam".into()),
         lsp::ServerCapabilities {
@@ -743,18 +766,22 @@ async fn test_extension_store_with_test_extension(cx: &mut TestAppContext) {
     );
     cx.executor().run_until_parked();
 
+    let mut project_events = cx.events(&project);
+    let buffer_path = project_dir.join("test.gleam");
     let (buffer, _handle) = await_or_timeout(
         &executor,
         "awaiting open_local_buffer_with_lsp",
         5,
         project.update(cx, |project, cx| {
-            project.open_local_buffer_with_lsp(project_dir.join("test.gleam"), cx)
+            project.open_local_buffer_with_lsp(buffer_path.clone(), cx)
         }),
     )
     .await
     .unwrap();
     cx.executor().run_until_parked();
 
+    let buffer_remote_id = buffer.read_with(cx, |buffer, _cx| buffer.remote_id());
+
     let fake_server = await_or_timeout(
         &executor,
         "awaiting first fake server spawn",
@@ -870,18 +897,26 @@ async fn test_extension_store_with_test_extension(cx: &mut TestAppContext) {
         ])))
     });
 
-    // `register_fake_lsp_server` can yield a server instance before the client has finished the LSP
-    // initialization handshake. Wait until we observe the client's `initialized` notification before
-    // issuing requests like completion.
+    // `register_fake_lsp_server` can yield a server instance before the client has fully registered
+    // the buffer with the project LSP plumbing. Wait for the project to observe that registration
+    // before issuing requests like completion.
     await_or_timeout(
         &executor,
-        "awaiting LSP Initialized notification",
+        "awaiting LanguageServerBufferRegistered",
         5,
         async {
-            fake_server
-                .clone()
-                .try_receive_notification::<lsp::notification::Initialized>()
-                .await;
+            while let Some(event) = project_events.next().await {
+                if let project::Event::LanguageServerBufferRegistered { buffer_id, .. } = event {
+                    if buffer_id == buffer_remote_id {
+                        return;
+                    }
+                }
+            }
+
+            panic!(
+                "[test_extension_store_with_test_extension] project event stream ended before buffer registration for {}",
+                buffer_path.display()
+            );
         },
     )
     .await;