From bbb31f29c8b6147bb0dd599f1a0f2df1275b2f53 Mon Sep 17 00:00:00 2001 From: Ben Kunkle Date: Fri, 27 Feb 2026 18:09:45 -0600 Subject: [PATCH] ep: Replace snapshot events with settled event (#50356) Release Notes: - N/A --------- Co-authored-by: Max Co-authored-by: Max Brunsfeld --- crates/edit_prediction/Cargo.toml | 1 + crates/edit_prediction/src/edit_prediction.rs | 145 ++++++++++++++- .../src/edit_prediction_tests.rs | 176 +++++++++++++++++- crates/edit_prediction/src/zeta.rs | 16 ++ 4 files changed, 335 insertions(+), 3 deletions(-) diff --git a/crates/edit_prediction/Cargo.toml b/crates/edit_prediction/Cargo.toml index ace898fb6004668fbde916ab4b0447d8e5b8a553..9f867584b57c8aed86f7003cca3a2b034c184476 100644 --- a/crates/edit_prediction/Cargo.toml +++ b/crates/edit_prediction/Cargo.toml @@ -21,6 +21,7 @@ arrayvec.workspace = true brotli.workspace = true buffer_diff.workspace = true client.workspace = true +clock.workspace = true cloud_api_types.workspace = true cloud_llm_client.workspace = true collections.workspace = true diff --git a/crates/edit_prediction/src/edit_prediction.rs b/crates/edit_prediction/src/edit_prediction.rs index 836b4a477f62e2da6674568d0a7a1ccfc2b603cf..02ffcbe065e8b0334ab7c200c0e43b817cdad416 100644 --- a/crates/edit_prediction/src/edit_prediction.rs +++ b/crates/edit_prediction/src/edit_prediction.rs @@ -40,7 +40,7 @@ use settings::{ }; use std::collections::{VecDeque, hash_map}; use std::env; -use text::Edit; +use text::{AnchorRangeExt, Edit}; use workspace::Workspace; use zeta_prompt::{ZetaFormat, ZetaPromptInput}; @@ -103,6 +103,9 @@ const CHANGE_GROUPING_LINE_SPAN: u32 = 8; const LAST_CHANGE_GROUPING_TIME: Duration = Duration::from_secs(1); const ZED_PREDICT_DATA_COLLECTION_CHOICE: &str = "zed_predict_data_collection_choice"; const REJECT_REQUEST_DEBOUNCE: Duration = Duration::from_secs(15); +const EDIT_PREDICTION_SETTLED_EVENT: &str = "Edit Prediction Settled"; +const EDIT_PREDICTION_SETTLED_TTL: Duration = Duration::from_secs(60 * 5); +const EDIT_PREDICTION_SETTLED_QUIESCENCE: Duration = Duration::from_secs(10); pub struct Zeta2FeatureFlag; pub struct EditPredictionJumpsFeatureFlag; @@ -142,8 +145,11 @@ pub struct EditPredictionStore { pub mercury: Mercury, data_collection_choice: DataCollectionChoice, reject_predictions_tx: mpsc::UnboundedSender, + settled_predictions_tx: mpsc::UnboundedSender, shown_predictions: VecDeque, rated_predictions: HashSet, + #[cfg(test)] + settled_event_callback: Option>, } #[derive(Copy, Clone, PartialEq, Eq)] @@ -482,9 +488,18 @@ impl std::ops::Deref for BufferEditPrediction<'_> { } } +#[derive(Clone)] +struct PendingSettledPrediction { + request_id: EditPredictionId, + editable_anchor_range: Range, + enqueued_at: Instant, + last_edit_at: Instant, +} + struct RegisteredBuffer { file: Option>, snapshot: TextBufferSnapshot, + pending_predictions: Vec, last_position: Option, _subscriptions: [gpui::Subscription; 2], } @@ -676,6 +691,12 @@ impl EditPredictionStore { }) .detach(); + let (settled_predictions_tx, settled_predictions_rx) = mpsc::unbounded(); + cx.spawn(async move |this, cx| { + Self::run_settled_predictions_worker(this, settled_predictions_rx, cx).await; + }) + .detach(); + let this = Self { projects: HashMap::default(), client, @@ -701,8 +722,11 @@ impl EditPredictionStore { data_collection_choice, reject_predictions_tx: reject_tx, + settled_predictions_tx, rated_predictions: Default::default(), shown_predictions: Default::default(), + #[cfg(test)] + settled_event_callback: None, }; this @@ -1091,6 +1115,7 @@ impl EditPredictionStore { snapshot, file, last_position: None, + pending_predictions: Vec::new(), _subscriptions: [ cx.subscribe(buffer, { let project = project.downgrade(); @@ -1139,6 +1164,7 @@ impl EditPredictionStore { let mut total_inserted = 0usize; let mut edit_range: Option> = None; let mut last_offset: Option = None; + let now = cx.background_executor().now(); for (edit, anchor_range) in new_snapshot.anchored_edits_since::(&old_snapshot.version) @@ -1157,6 +1183,12 @@ impl EditPredictionStore { return; }; + for pending_prediction in &mut registered_buffer.pending_predictions { + if edit_range.overlaps(&pending_prediction.editable_anchor_range, &new_snapshot) { + pending_prediction.last_edit_at = now; + } + } + let action_type = match (total_deleted, total_inserted, num_edits) { (0, ins, n) if ins == n => UserActionType::InsertChar, (0, _, _) => UserActionType::InsertSelection, @@ -1183,7 +1215,6 @@ impl EditPredictionStore { let events = &mut project_state.events; - let now = cx.background_executor().now(); if let Some(last_event) = project_state.last_event.as_mut() { let is_next_snapshot_of_same_buffer = old_snapshot.remote_id() == last_event.new_snapshot.remote_id() @@ -1386,6 +1417,116 @@ impl EditPredictionStore { } } + async fn run_settled_predictions_worker( + this: WeakEntity, + mut rx: UnboundedReceiver, + cx: &mut AsyncApp, + ) { + let mut next_wake_time: Option = None; + loop { + let now = cx.background_executor().now(); + if let Some(wake_time) = next_wake_time.take() { + cx.background_executor() + .timer(wake_time.duration_since(now)) + .await; + } else { + let Some(new_enqueue_time) = rx.next().await else { + break; + }; + next_wake_time = Some(new_enqueue_time + EDIT_PREDICTION_SETTLED_QUIESCENCE); + while rx.next().now_or_never().flatten().is_some() {} + continue; + } + + let Some(this) = this.upgrade() else { + break; + }; + + let now = cx.background_executor().now(); + + let mut oldest_edited_at = None; + + this.update(cx, |this, _| { + for (_, project_state) in this.projects.iter_mut() { + for (_, registered_buffer) in project_state.registered_buffers.iter_mut() { + registered_buffer + .pending_predictions + .retain_mut(|pending_prediction| { + let age = + now.saturating_duration_since(pending_prediction.enqueued_at); + if age >= EDIT_PREDICTION_SETTLED_TTL { + return false; + } + + let quiet_for = + now.saturating_duration_since(pending_prediction.last_edit_at); + if quiet_for >= EDIT_PREDICTION_SETTLED_QUIESCENCE { + let settled_editable_region = registered_buffer + .snapshot + .text_for_range( + pending_prediction.editable_anchor_range.clone(), + ) + .collect::(); + + #[cfg(test)] + if let Some(callback) = &this.settled_event_callback { + callback( + pending_prediction.request_id.clone(), + settled_editable_region.clone(), + ); + } + + telemetry::event!( + EDIT_PREDICTION_SETTLED_EVENT, + request_id = pending_prediction.request_id.0.clone(), + settled_editable_region, + ); + + return false; + } + + if oldest_edited_at + .is_none_or(|t| pending_prediction.last_edit_at < t) + { + oldest_edited_at = Some(pending_prediction.last_edit_at); + } + + true + }); + } + } + }); + + next_wake_time = oldest_edited_at.map(|t| t + EDIT_PREDICTION_SETTLED_QUIESCENCE); + } + } + + pub(crate) fn enqueue_settled_prediction( + &mut self, + request_id: EditPredictionId, + project: &Entity, + edited_buffer: &Entity, + edited_buffer_snapshot: &BufferSnapshot, + editable_offset_range: Range, + cx: &mut Context, + ) { + let project_state = self.get_or_init_project(project, cx); + if let Some(buffer) = project_state + .registered_buffers + .get_mut(&edited_buffer.entity_id()) + { + let now = cx.background_executor().now(); + buffer.pending_predictions.push(PendingSettledPrediction { + request_id, + editable_anchor_range: edited_buffer_snapshot + .anchor_range_around(editable_offset_range), + enqueued_at: now, + last_edit_at: now, + }); + self.settled_predictions_tx.unbounded_send(now).ok(); + } + } + fn reject_current_prediction( &mut self, reason: EditPredictionRejectReason, diff --git a/crates/edit_prediction/src/edit_prediction_tests.rs b/crates/edit_prediction/src/edit_prediction_tests.rs index abe522494fc8962a995313ffb1a57b8672c22ca4..beeb855c7b84bae53ea2f8f8bd6a117403e77db1 100644 --- a/crates/edit_prediction/src/edit_prediction_tests.rs +++ b/crates/edit_prediction/src/edit_prediction_tests.rs @@ -29,7 +29,10 @@ use util::path; use uuid::Uuid; use zeta_prompt::ZetaPromptInput; -use crate::{BufferEditPrediction, EditPredictionId, EditPredictionStore, REJECT_REQUEST_DEBOUNCE}; +use crate::{ + BufferEditPrediction, EDIT_PREDICTION_SETTLED_QUIESCENCE, EditPredictionId, + EditPredictionStore, REJECT_REQUEST_DEBOUNCE, +}; #[gpui::test] async fn test_current_state(cx: &mut TestAppContext) { @@ -2574,6 +2577,177 @@ async fn test_diagnostic_jump_excludes_collaborator_regions(cx: &mut TestAppCont ); } +#[gpui::test] +async fn test_edit_prediction_settled(cx: &mut TestAppContext) { + let (ep_store, _requests) = init_test_with_fake_client(cx); + let fs = FakeFs::new(cx.executor()); + + // Buffer with two clearly separated regions: + // Region A = lines 0-9 (offsets 0..50) + // Region B = lines 20-29 (offsets 105..155) + // A big gap in between so edits in one region never overlap the other. + let mut content = String::new(); + for i in 0..30 { + content.push_str(&format!("line {i:02}\n")); + } + + fs.insert_tree( + "/root", + json!({ + "foo.md": content.clone() + }), + ) + .await; + let project = Project::test(fs, vec![path!("/root").as_ref()], cx).await; + + let buffer = project + .update(cx, |project, cx| { + let path = project.find_project_path(path!("root/foo.md"), cx).unwrap(); + project.open_buffer(path, cx) + }) + .await + .unwrap(); + + let settled_events: Arc>> = + Arc::new(Mutex::new(Vec::new())); + + ep_store.update(cx, |ep_store, cx| { + ep_store.register_buffer(&buffer, &project, cx); + + let settled_events = settled_events.clone(); + ep_store.settled_event_callback = Some(Box::new(move |id, text| { + settled_events.lock().push((id, text)); + })); + }); + + // --- Phase 1: edit in region A and enqueue prediction A --- + + buffer.update(cx, |buffer, cx| { + // Edit at the start of line 0. + buffer.edit(vec![(0..0, "ADDED ")], None, cx); + }); + cx.run_until_parked(); + + let snapshot_a = buffer.read_with(cx, |buffer, _cx| buffer.snapshot()); + + // Region A: first 10 lines of the buffer. + let editable_region_a = 0..snapshot_a.point_to_offset(Point::new(10, 0)); + ep_store.update(cx, |ep_store, cx| { + ep_store.enqueue_settled_prediction( + EditPredictionId("prediction-a".into()), + &project, + &buffer, + &snapshot_a, + editable_region_a, + cx, + ); + }); + + // --- Phase 2: repeatedly edit in region A to keep it unsettled --- + + // Let the worker process the channel message before we start advancing. + cx.run_until_parked(); + + let mut region_a_edit_offset = 5; + for _ in 0..3 { + // Edit inside region A (not at the boundary) so `last_edit_at` is + // updated before the worker's next wake. + buffer.update(cx, |buffer, cx| { + buffer.edit( + vec![(region_a_edit_offset..region_a_edit_offset, "x")], + None, + cx, + ); + }); + region_a_edit_offset += 1; + cx.run_until_parked(); + + cx.executor() + .advance_clock(EDIT_PREDICTION_SETTLED_QUIESCENCE / 2); + cx.run_until_parked(); + assert!( + settled_events.lock().is_empty(), + "no settled events should fire while region A is still being edited" + ); + } + + // Still nothing settled. + assert!(settled_events.lock().is_empty()); + + // --- Phase 3: edit in distinct region B, enqueue prediction B --- + // Advance a small amount so B's quiescence window starts later than A's, + // but not so much that A settles (A's last edit was at the start of + // iteration 3, and it needs a full Q to settle). + cx.executor() + .advance_clock(EDIT_PREDICTION_SETTLED_QUIESCENCE / 4); + cx.run_until_parked(); + assert!(settled_events.lock().is_empty()); + + let snapshot_b = buffer.read_with(cx, |buffer, _cx| buffer.snapshot()); + let line_20_offset = snapshot_b.point_to_offset(Point::new(20, 0)); + + buffer.update(cx, |buffer, cx| { + buffer.edit(vec![(line_20_offset..line_20_offset, "NEW ")], None, cx); + }); + cx.run_until_parked(); + + let snapshot_b2 = buffer.read_with(cx, |buffer, _cx| buffer.snapshot()); + let editable_region_b = line_20_offset..snapshot_b2.point_to_offset(Point::new(25, 0)); + ep_store.update(cx, |ep_store, cx| { + ep_store.enqueue_settled_prediction( + EditPredictionId("prediction-b".into()), + &project, + &buffer, + &snapshot_b2, + editable_region_b, + cx, + ); + }); + + cx.run_until_parked(); + assert!( + settled_events.lock().is_empty(), + "neither prediction should have settled yet" + ); + + // --- Phase 4: let enough time pass for region A to settle --- + // A's last edit was at T_a (during the last loop iteration). The worker is + // sleeping until T_a + Q. We advance just enough to reach that wake time + // (Q/4 since we already advanced Q/4 in phase 3 on top of the loop's + // 3*Q/2). At that point A has been quiet for Q and settles, but B was + // enqueued only Q/4 ago and stays pending. + cx.executor() + .advance_clock(EDIT_PREDICTION_SETTLED_QUIESCENCE / 4); + cx.run_until_parked(); + + { + let events = settled_events.lock().clone(); + assert_eq!( + events.len(), + 1, + "only prediction A should have settled, got: {events:?}" + ); + assert_eq!(events[0].0, EditPredictionId("prediction-a".into())); + } + + // --- Phase 5: let more time pass for region B to settle --- + // B's last edit was Q/4 before A settled. The worker rescheduled to + // B's last_edit_at + Q, which is 3Q/4 from now. + cx.executor() + .advance_clock(EDIT_PREDICTION_SETTLED_QUIESCENCE * 3 / 4); + cx.run_until_parked(); + + { + let events = settled_events.lock().clone(); + assert_eq!( + events.len(), + 2, + "both predictions should have settled, got: {events:?}" + ); + assert_eq!(events[1].0, EditPredictionId("prediction-b".into())); + } +} + #[ctor::ctor] fn init_logger() { zlog::init_test(); diff --git a/crates/edit_prediction/src/zeta.rs b/crates/edit_prediction/src/zeta.rs index 1159c67aec41128262e75c31fdfff5954a8b3952..9c6e9e30d94c5e1988d54da7966a58fd8e69e233 100644 --- a/crates/edit_prediction/src/zeta.rs +++ b/crates/edit_prediction/src/zeta.rs @@ -309,6 +309,7 @@ pub fn request_prediction_with_zeta( edits, cursor_position, received_response_at, + editable_range_in_buffer, )), model_version, )), @@ -331,6 +332,7 @@ pub fn request_prediction_with_zeta( edits, cursor_position, received_response_at, + editable_range_in_buffer, )) = prediction else { return Ok(Some(EditPredictionResult { @@ -339,6 +341,20 @@ pub fn request_prediction_with_zeta( })); }; + if can_collect_data { + this.update(cx, |this, cx| { + this.enqueue_settled_prediction( + id.clone(), + &project, + &edited_buffer, + &edited_buffer_snapshot, + editable_range_in_buffer, + cx, + ); + }) + .ok(); + } + Ok(Some( EditPredictionResult::new( id,