From 27a9b60e47bda1d651d41817b812726d2aca43d5 Mon Sep 17 00:00:00 2001 From: Oleksiy Syvokon Date: Thu, 5 Mar 2026 17:20:04 +0200 Subject: [PATCH] ep: Fix race condition resulting in duplicate requests (#50830) When multiple `queue_prediction_refresh` calls land in the same synchronous frame, all spawned tasks would read the same throttle state and each send an identical cloud request. Now the second task would see that the first request has been sent and would quit. Release Notes: - N/A --- crates/edit_prediction/src/edit_prediction.rs | 42 ++++++++++++----- .../src/edit_prediction_tests.rs | 46 +++++++++++++++++++ 2 files changed, 76 insertions(+), 12 deletions(-) diff --git a/crates/edit_prediction/src/edit_prediction.rs b/crates/edit_prediction/src/edit_prediction.rs index 5c7ce045121739f341b84dd87d827878550f4048..8a61a4a8fe0cd7a3dd7cc081c051839faa43112f 100644 --- a/crates/edit_prediction/src/edit_prediction.rs +++ b/crates/edit_prediction/src/edit_prediction.rs @@ -1989,31 +1989,49 @@ impl EditPredictionStore { let project_state = self.get_or_init_project(&project, cx); let pending_prediction_id = project_state.next_pending_prediction_id; project_state.next_pending_prediction_id += 1; - let last_request = *select_throttle(project_state, request_trigger); + let throttle_at_enqueue = *select_throttle(project_state, request_trigger); let task = cx.spawn(async move |this, cx| { - if let Some(timeout) = last_request.and_then(|(last_entity, last_timestamp)| { - if throttle_entity != last_entity { - return None; - } - (last_timestamp + throttle_timeout).checked_duration_since(Instant::now()) - }) { + let throttle_wait = this + .update(cx, |this, cx| { + let project_state = this.get_or_init_project(&project, cx); + let throttle = *select_throttle(project_state, request_trigger); + + throttle.and_then(|(last_entity, last_timestamp)| { + if throttle_entity != last_entity { + return None; + } + (last_timestamp + throttle_timeout).checked_duration_since(Instant::now()) + }) + }) + .ok() + .flatten(); + + if let Some(timeout) = throttle_wait { cx.background_executor().timer(timeout).await; } // If this task was cancelled before the throttle timeout expired, - // do not perform a request. + // do not perform a request. Also skip if another task already + // proceeded since we were enqueued (duplicate). let mut is_cancelled = true; this.update(cx, |this, cx| { let project_state = this.get_or_init_project(&project, cx); let was_cancelled = project_state .cancelled_predictions .remove(&pending_prediction_id); - if !was_cancelled { - let new_refresh = (throttle_entity, Instant::now()); - *select_throttle(project_state, request_trigger) = Some(new_refresh); - is_cancelled = false; + if was_cancelled { + return; } + + // Another request has been already sent since this was enqueued + if *select_throttle(project_state, request_trigger) != throttle_at_enqueue { + return; + } + + let new_refresh = (throttle_entity, Instant::now()); + *select_throttle(project_state, request_trigger) = Some(new_refresh); + is_cancelled = false; }) .ok(); if is_cancelled { diff --git a/crates/edit_prediction/src/edit_prediction_tests.rs b/crates/edit_prediction/src/edit_prediction_tests.rs index b34ff6fce71fe7afcaff68121510f48f6f8f98c4..594bfd482052950a5b3835798f83d5905573711c 100644 --- a/crates/edit_prediction/src/edit_prediction_tests.rs +++ b/crates/edit_prediction/src/edit_prediction_tests.rs @@ -1486,6 +1486,52 @@ async fn test_jump_and_edit_throttles_are_independent(cx: &mut TestAppContext) { cx.run_until_parked(); } +#[gpui::test] +async fn test_same_frame_duplicate_requests_deduplicated(cx: &mut TestAppContext) { + let (ep_store, mut requests) = init_test_with_fake_client(cx); + let fs = FakeFs::new(cx.executor()); + fs.insert_tree( + "/root", + json!({ + "foo.md": "Hello!\nHow\nBye\n" + }), + ) + .await; + let project = Project::test(fs, vec![path!("/root").as_ref()], cx).await; + + let buffer = project + .update(cx, |project, cx| { + let path = project.find_project_path(path!("root/foo.md"), cx).unwrap(); + project.open_buffer(path, cx) + }) + .await + .unwrap(); + let snapshot = buffer.read_with(cx, |buffer, _cx| buffer.snapshot()); + let position = snapshot.anchor_before(language::Point::new(1, 3)); + + // Enqueue two refresh calls in the same synchronous frame (no yielding). + // Both `cx.spawn` tasks are created before either executes, so they both + // capture the same `proceed_count_at_enqueue`. Only the first task should + // pass the deduplication gate; the second should be skipped. + ep_store.update(cx, |ep_store, cx| { + ep_store.refresh_prediction_from_buffer(project.clone(), buffer.clone(), position, cx); + ep_store.refresh_prediction_from_buffer(project.clone(), buffer.clone(), position, cx); + }); + + // Let both spawned tasks run to completion (including any throttle waits). + cx.run_until_parked(); + + // Exactly one prediction request should have been sent. + let (request, respond_tx) = requests.predict.next().await.unwrap(); + respond_tx + .send(model_response(&request, SIMPLE_DIFF)) + .unwrap(); + cx.run_until_parked(); + + // No second request should be pending. + assert_no_predict_request_ready(&mut requests.predict); +} + #[gpui::test] async fn test_rejections_flushing(cx: &mut TestAppContext) { let (ep_store, mut requests) = init_test_with_fake_client(cx);