ep: Fix race condition resulting in duplicate requests (#50830)

Oleksiy Syvokon created

When multiple `queue_prediction_refresh` calls land in the same
synchronous frame, all spawned tasks would read the same throttle state
and each send an identical cloud request.

Now, before sending, each task checks whether another request has already
been sent since it was enqueued; if so, it exits without sending a duplicate.

Release Notes:

- N/A

Change summary

crates/edit_prediction/src/edit_prediction.rs       | 42 +++++++++---
crates/edit_prediction/src/edit_prediction_tests.rs | 46 ++++++++++++++
2 files changed, 76 insertions(+), 12 deletions(-)

Detailed changes

crates/edit_prediction/src/edit_prediction.rs 🔗

@@ -1989,31 +1989,49 @@ impl EditPredictionStore {
         let project_state = self.get_or_init_project(&project, cx);
         let pending_prediction_id = project_state.next_pending_prediction_id;
         project_state.next_pending_prediction_id += 1;
-        let last_request = *select_throttle(project_state, request_trigger);
+        let throttle_at_enqueue = *select_throttle(project_state, request_trigger);
 
         let task = cx.spawn(async move |this, cx| {
-            if let Some(timeout) = last_request.and_then(|(last_entity, last_timestamp)| {
-                if throttle_entity != last_entity {
-                    return None;
-                }
-                (last_timestamp + throttle_timeout).checked_duration_since(Instant::now())
-            }) {
+            let throttle_wait = this
+                .update(cx, |this, cx| {
+                    let project_state = this.get_or_init_project(&project, cx);
+                    let throttle = *select_throttle(project_state, request_trigger);
+
+                    throttle.and_then(|(last_entity, last_timestamp)| {
+                        if throttle_entity != last_entity {
+                            return None;
+                        }
+                        (last_timestamp + throttle_timeout).checked_duration_since(Instant::now())
+                    })
+                })
+                .ok()
+                .flatten();
+
+            if let Some(timeout) = throttle_wait {
                 cx.background_executor().timer(timeout).await;
             }
 
             // If this task was cancelled before the throttle timeout expired,
-            // do not perform a request.
+            // do not perform a request. Also skip if another task already
+            // proceeded since we were enqueued (duplicate).
             let mut is_cancelled = true;
             this.update(cx, |this, cx| {
                 let project_state = this.get_or_init_project(&project, cx);
                 let was_cancelled = project_state
                     .cancelled_predictions
                     .remove(&pending_prediction_id);
-                if !was_cancelled {
-                    let new_refresh = (throttle_entity, Instant::now());
-                    *select_throttle(project_state, request_trigger) = Some(new_refresh);
-                    is_cancelled = false;
+                if was_cancelled {
+                    return;
                 }
+
+                // Another request has already been sent since this was enqueued
+                if *select_throttle(project_state, request_trigger) != throttle_at_enqueue {
+                    return;
+                }
+
+                let new_refresh = (throttle_entity, Instant::now());
+                *select_throttle(project_state, request_trigger) = Some(new_refresh);
+                is_cancelled = false;
             })
             .ok();
             if is_cancelled {

crates/edit_prediction/src/edit_prediction_tests.rs 🔗

@@ -1486,6 +1486,52 @@ async fn test_jump_and_edit_throttles_are_independent(cx: &mut TestAppContext) {
     cx.run_until_parked();
 }
 
+#[gpui::test]
+async fn test_same_frame_duplicate_requests_deduplicated(cx: &mut TestAppContext) {
+    let (ep_store, mut requests) = init_test_with_fake_client(cx);
+    let fs = FakeFs::new(cx.executor());
+    fs.insert_tree(
+        "/root",
+        json!({
+            "foo.md":  "Hello!\nHow\nBye\n"
+        }),
+    )
+    .await;
+    let project = Project::test(fs, vec![path!("/root").as_ref()], cx).await;
+
+    let buffer = project
+        .update(cx, |project, cx| {
+            let path = project.find_project_path(path!("root/foo.md"), cx).unwrap();
+            project.open_buffer(path, cx)
+        })
+        .await
+        .unwrap();
+    let snapshot = buffer.read_with(cx, |buffer, _cx| buffer.snapshot());
+    let position = snapshot.anchor_before(language::Point::new(1, 3));
+
+    // Enqueue two refresh calls in the same synchronous frame (no yielding).
+    // Both `cx.spawn` tasks are created before either executes, so they both
+    // capture the same `throttle_at_enqueue`. Only the first task should
+    // pass the deduplication gate; the second should be skipped.
+    ep_store.update(cx, |ep_store, cx| {
+        ep_store.refresh_prediction_from_buffer(project.clone(), buffer.clone(), position, cx);
+        ep_store.refresh_prediction_from_buffer(project.clone(), buffer.clone(), position, cx);
+    });
+
+    // Let both spawned tasks run to completion (including any throttle waits).
+    cx.run_until_parked();
+
+    // Exactly one prediction request should have been sent.
+    let (request, respond_tx) = requests.predict.next().await.unwrap();
+    respond_tx
+        .send(model_response(&request, SIMPLE_DIFF))
+        .unwrap();
+    cx.run_until_parked();
+
+    // No second request should be pending.
+    assert_no_predict_request_ready(&mut requests.predict);
+}
+
 #[gpui::test]
 async fn test_rejections_flushing(cx: &mut TestAppContext) {
     let (ep_store, mut requests) = init_test_with_fake_client(cx);