ep: Replace snapshot events with settled event (#50356)

Ben Kunkle, Max, and Max Brunsfeld created

Release Notes:

- N/A

---------

Co-authored-by: Max <max@zed.dev>
Co-authored-by: Max Brunsfeld <maxbrunsfeld@gmail.com>

Change summary

crates/edit_prediction/Cargo.toml                   |   1 
crates/edit_prediction/src/edit_prediction.rs       | 145 ++++++++++++
crates/edit_prediction/src/edit_prediction_tests.rs | 176 ++++++++++++++
crates/edit_prediction/src/zeta.rs                  |  16 +
4 files changed, 335 insertions(+), 3 deletions(-)

Detailed changes

crates/edit_prediction/Cargo.toml 🔗

@@ -21,6 +21,7 @@ arrayvec.workspace = true
 brotli.workspace = true
 buffer_diff.workspace = true
 client.workspace = true
+clock.workspace = true
 cloud_api_types.workspace = true
 cloud_llm_client.workspace = true
 collections.workspace = true

crates/edit_prediction/src/edit_prediction.rs 🔗

@@ -40,7 +40,7 @@ use settings::{
 };
 use std::collections::{VecDeque, hash_map};
 use std::env;
-use text::Edit;
+use text::{AnchorRangeExt, Edit};
 use workspace::Workspace;
 use zeta_prompt::{ZetaFormat, ZetaPromptInput};
 
@@ -103,6 +103,9 @@ const CHANGE_GROUPING_LINE_SPAN: u32 = 8;
 const LAST_CHANGE_GROUPING_TIME: Duration = Duration::from_secs(1);
 const ZED_PREDICT_DATA_COLLECTION_CHOICE: &str = "zed_predict_data_collection_choice";
 const REJECT_REQUEST_DEBOUNCE: Duration = Duration::from_secs(15);
+const EDIT_PREDICTION_SETTLED_EVENT: &str = "Edit Prediction Settled"; // Telemetry event name.
+const EDIT_PREDICTION_SETTLED_TTL: Duration = Duration::from_secs(60 * 5); // Pending entries at least this old are dropped unreported.
+const EDIT_PREDICTION_SETTLED_QUIESCENCE: Duration = Duration::from_secs(10); // Time without overlapping edits before a prediction counts as settled.
 
 pub struct Zeta2FeatureFlag;
 pub struct EditPredictionJumpsFeatureFlag;
@@ -142,8 +145,11 @@ pub struct EditPredictionStore {
     pub mercury: Mercury,
     data_collection_choice: DataCollectionChoice,
     reject_predictions_tx: mpsc::UnboundedSender<EditPredictionRejection>,
+    settled_predictions_tx: mpsc::UnboundedSender<Instant>,
     shown_predictions: VecDeque<EditPrediction>,
     rated_predictions: HashSet<EditPredictionId>,
+    #[cfg(test)]
+    settled_event_callback: Option<Box<dyn Fn(EditPredictionId, String)>>,
 }
 
 #[derive(Copy, Clone, PartialEq, Eq)]
@@ -482,9 +488,18 @@ impl std::ops::Deref for BufferEditPrediction<'_> {
     }
 }
 
+#[derive(Clone)]
+struct PendingSettledPrediction { // A prediction waiting for its editable region to stop changing.
+    request_id: EditPredictionId, // Sent as `request_id` in the settled telemetry event.
+    editable_anchor_range: Range<Anchor>, // Region whose text is captured when the prediction settles.
+    enqueued_at: Instant, // Compared against EDIT_PREDICTION_SETTLED_TTL to expire the entry.
+    last_edit_at: Instant, // Reset whenever a buffer edit overlaps `editable_anchor_range`.
+}
+
 struct RegisteredBuffer {
     file: Option<Arc<dyn File>>,
     snapshot: TextBufferSnapshot,
+    pending_predictions: Vec<PendingSettledPrediction>, // Predictions in this buffer awaiting a settled report.
     last_position: Option<Anchor>,
     _subscriptions: [gpui::Subscription; 2],
 }
@@ -676,6 +691,12 @@ impl EditPredictionStore {
         })
         .detach();
 
+        let (settled_predictions_tx, settled_predictions_rx) = mpsc::unbounded();
+        cx.spawn(async move |this, cx| {
+            Self::run_settled_predictions_worker(this, settled_predictions_rx, cx).await;
+        })
+        .detach();
+
         let this = Self {
             projects: HashMap::default(),
             client,
@@ -701,8 +722,11 @@ impl EditPredictionStore {
 
             data_collection_choice,
             reject_predictions_tx: reject_tx,
+            settled_predictions_tx,
             rated_predictions: Default::default(),
             shown_predictions: Default::default(),
+            #[cfg(test)]
+            settled_event_callback: None,
         };
 
         this
@@ -1091,6 +1115,7 @@ impl EditPredictionStore {
                     snapshot,
                     file,
                     last_position: None,
+                    pending_predictions: Vec::new(),
                     _subscriptions: [
                         cx.subscribe(buffer, {
                             let project = project.downgrade();
@@ -1139,6 +1164,7 @@ impl EditPredictionStore {
         let mut total_inserted = 0usize;
         let mut edit_range: Option<Range<Anchor>> = None;
         let mut last_offset: Option<usize> = None;
+        let now = cx.background_executor().now();
 
         for (edit, anchor_range) in
             new_snapshot.anchored_edits_since::<usize>(&old_snapshot.version)
@@ -1157,6 +1183,12 @@ impl EditPredictionStore {
             return;
         };
 
+        for pending_prediction in &mut registered_buffer.pending_predictions {
+            if edit_range.overlaps(&pending_prediction.editable_anchor_range, &new_snapshot) {
+                pending_prediction.last_edit_at = now;
+            }
+        }
+
         let action_type = match (total_deleted, total_inserted, num_edits) {
             (0, ins, n) if ins == n => UserActionType::InsertChar,
             (0, _, _) => UserActionType::InsertSelection,
@@ -1183,7 +1215,6 @@ impl EditPredictionStore {
 
         let events = &mut project_state.events;
 
-        let now = cx.background_executor().now();
         if let Some(last_event) = project_state.last_event.as_mut() {
             let is_next_snapshot_of_same_buffer = old_snapshot.remote_id()
                 == last_event.new_snapshot.remote_id()
@@ -1386,6 +1417,116 @@ impl EditPredictionStore {
         }
     }
 
+    async fn run_settled_predictions_worker( // Long-running loop that reports each pending prediction once its region has been quiet long enough.
+        this: WeakEntity<Self>,
+        mut rx: UnboundedReceiver<Instant>, // Carries the enqueue time of each newly pending prediction.
+        cx: &mut AsyncApp,
+    ) {
+        let mut next_wake_time: Option<Instant> = None; // `None` means nothing is scheduled, so block on the channel.
+        loop {
+            let now = cx.background_executor().now();
+            if let Some(wake_time) = next_wake_time.take() {
+                cx.background_executor()
+                    .timer(wake_time.duration_since(now))
+                    .await;
+            } else {
+                let Some(new_enqueue_time) = rx.next().await else {
+                    break; // Channel ended; stop the worker.
+                };
+                next_wake_time = Some(new_enqueue_time + EDIT_PREDICTION_SETTLED_QUIESCENCE);
+                while rx.next().now_or_never().flatten().is_some() {} // Drain queued messages; a single wake-up scans every pending entry.
+                continue;
+            }

+            let Some(this) = this.upgrade() else {
+                break; // The store can no longer be upgraded; stop the worker.
+            };

+            let now = cx.background_executor().now(); // Re-read the clock after sleeping.

+            let mut oldest_edited_at = None; // Earliest `last_edit_at` among entries that stay pending.

+            this.update(cx, |this, _| {
+                for (_, project_state) in this.projects.iter_mut() {
+                    for (_, registered_buffer) in project_state.registered_buffers.iter_mut() {
+                        registered_buffer
+                            .pending_predictions
+                            .retain_mut(|pending_prediction| {
+                                let age =
+                                    now.saturating_duration_since(pending_prediction.enqueued_at);
+                                if age >= EDIT_PREDICTION_SETTLED_TTL {
+                                    return false; // Expired: drop without emitting an event.
+                                }

+                                let quiet_for =
+                                    now.saturating_duration_since(pending_prediction.last_edit_at);
+                                if quiet_for >= EDIT_PREDICTION_SETTLED_QUIESCENCE {
+                                    let settled_editable_region = registered_buffer
+                                        .snapshot
+                                        .text_for_range(
+                                            pending_prediction.editable_anchor_range.clone(),
+                                        )
+                                        .collect::<String>();

+                                    #[cfg(test)]
+                                    if let Some(callback) = &this.settled_event_callback {
+                                        callback(
+                                            pending_prediction.request_id.clone(),
+                                            settled_editable_region.clone(),
+                                        );
+                                    }

+                                    telemetry::event!(
+                                        EDIT_PREDICTION_SETTLED_EVENT,
+                                        request_id = pending_prediction.request_id.0.clone(),
+                                        settled_editable_region,
+                                    );

+                                    return false; // Settled and reported: remove from the pending list.
+                                }

+                                if oldest_edited_at
+                                    .is_none_or(|t| pending_prediction.last_edit_at < t)
+                                {
+                                    oldest_edited_at = Some(pending_prediction.last_edit_at);
+                                }

+                                true
+                            });
+                    }
+                }
+            });

+            next_wake_time = oldest_edited_at.map(|t| t + EDIT_PREDICTION_SETTLED_QUIESCENCE); // Wake when the earliest remaining entry could settle.
+        }
+    }
+
+    pub(crate) fn enqueue_settled_prediction( // Starts tracking a prediction so a settled event can be reported for it later.
+        &mut self,
+        request_id: EditPredictionId,
+        project: &Entity<Project>,
+        edited_buffer: &Entity<Buffer>,
+        edited_buffer_snapshot: &BufferSnapshot,
+        editable_offset_range: Range<usize>, // Offsets into `edited_buffer_snapshot`; stored as an anchor range.
+        cx: &mut Context<Self>,
+    ) {
+        let project_state = self.get_or_init_project(project, cx);
+        if let Some(buffer) = project_state // Nothing is tracked when the buffer is not registered.
+            .registered_buffers
+            .get_mut(&edited_buffer.entity_id())
+        {
+            let now = cx.background_executor().now();
+            buffer.pending_predictions.push(PendingSettledPrediction {
+                request_id,
+                editable_anchor_range: edited_buffer_snapshot
+                    .anchor_range_around(editable_offset_range),
+                enqueued_at: now,
+                last_edit_at: now,
+            });
+            self.settled_predictions_tx.unbounded_send(now).ok(); // Notify the worker; a failed send is ignored.
+        }
+    }
+
     fn reject_current_prediction(
         &mut self,
         reason: EditPredictionRejectReason,

crates/edit_prediction/src/edit_prediction_tests.rs 🔗

@@ -29,7 +29,10 @@ use util::path;
 use uuid::Uuid;
 use zeta_prompt::ZetaPromptInput;
 
-use crate::{BufferEditPrediction, EditPredictionId, EditPredictionStore, REJECT_REQUEST_DEBOUNCE};
+use crate::{
+    BufferEditPrediction, EDIT_PREDICTION_SETTLED_QUIESCENCE, EditPredictionId,
+    EditPredictionStore, REJECT_REQUEST_DEBOUNCE,
+};
 
 #[gpui::test]
 async fn test_current_state(cx: &mut TestAppContext) {
@@ -2574,6 +2577,177 @@ async fn test_diagnostic_jump_excludes_collaborator_regions(cx: &mut TestAppCont
     );
 }
 
+#[gpui::test]
+async fn test_edit_prediction_settled(cx: &mut TestAppContext) {
+    let (ep_store, _requests) = init_test_with_fake_client(cx);
+    let fs = FakeFs::new(cx.executor());

+    // Buffer with two clearly separated regions (each "line NN\n" is 8 bytes):
+    //   Region A = lines 0-9   (initial offsets 0..80)
+    //   Region B = lines 20-29 (initial offsets 160..240)
+    // A big gap in between so edits in one region never overlap the other.
+    let mut content = String::new();
+    for i in 0..30 {
+        content.push_str(&format!("line {i:02}\n"));
+    }

+    fs.insert_tree(
+        "/root",
+        json!({
+            "foo.md": content.clone()
+        }),
+    )
+    .await;
+    let project = Project::test(fs, vec![path!("/root").as_ref()], cx).await;

+    let buffer = project
+        .update(cx, |project, cx| {
+            let path = project.find_project_path(path!("root/foo.md"), cx).unwrap();
+            project.open_buffer(path, cx)
+        })
+        .await
+        .unwrap();

+    let settled_events: Arc<Mutex<Vec<(EditPredictionId, String)>>> =
+        Arc::new(Mutex::new(Vec::new()));

+    ep_store.update(cx, |ep_store, cx| {
+        ep_store.register_buffer(&buffer, &project, cx);

+        let settled_events = settled_events.clone();
+        ep_store.settled_event_callback = Some(Box::new(move |id, text| {
+            settled_events.lock().push((id, text));
+        }));
+    });

+    // --- Phase 1: edit in region A and enqueue prediction A ---

+    buffer.update(cx, |buffer, cx| {
+        // Edit at the start of line 0.
+        buffer.edit(vec![(0..0, "ADDED ")], None, cx);
+    });
+    cx.run_until_parked();

+    let snapshot_a = buffer.read_with(cx, |buffer, _cx| buffer.snapshot());

+    // Region A: first 10 lines of the buffer.
+    let editable_region_a = 0..snapshot_a.point_to_offset(Point::new(10, 0));
+    ep_store.update(cx, |ep_store, cx| {
+        ep_store.enqueue_settled_prediction(
+            EditPredictionId("prediction-a".into()),
+            &project,
+            &buffer,
+            &snapshot_a,
+            editable_region_a,
+            cx,
+        );
+    });

+    // --- Phase 2: repeatedly edit in region A to keep it unsettled ---

+    // Let the worker process the channel message before we start advancing.
+    cx.run_until_parked();

+    let mut region_a_edit_offset = 5;
+    for _ in 0..3 {
+        // Edit inside region A (not at the boundary) so `last_edit_at` is
+        // updated before the worker's next wake.
+        buffer.update(cx, |buffer, cx| {
+            buffer.edit(
+                vec![(region_a_edit_offset..region_a_edit_offset, "x")],
+                None,
+                cx,
+            );
+        });
+        region_a_edit_offset += 1;
+        cx.run_until_parked();

+        cx.executor()
+            .advance_clock(EDIT_PREDICTION_SETTLED_QUIESCENCE / 2);
+        cx.run_until_parked();
+        assert!(
+            settled_events.lock().is_empty(),
+            "no settled events should fire while region A is still being edited"
+        );
+    }

+    // Still nothing settled.
+    assert!(settled_events.lock().is_empty());

+    // --- Phase 3: edit in distinct region B, enqueue prediction B ---
+    // Advance a small amount so B's quiescence window starts later than A's,
+    // but not so much that A settles (A's last edit was at the start of
+    // iteration 3, and it needs a full Q to settle).
+    cx.executor()
+        .advance_clock(EDIT_PREDICTION_SETTLED_QUIESCENCE / 4);
+    cx.run_until_parked();
+    assert!(settled_events.lock().is_empty());

+    let snapshot_b = buffer.read_with(cx, |buffer, _cx| buffer.snapshot());
+    let line_20_offset = snapshot_b.point_to_offset(Point::new(20, 0));

+    buffer.update(cx, |buffer, cx| {
+        buffer.edit(vec![(line_20_offset..line_20_offset, "NEW ")], None, cx);
+    });
+    cx.run_until_parked();

+    let snapshot_b2 = buffer.read_with(cx, |buffer, _cx| buffer.snapshot());
+    let editable_region_b = line_20_offset..snapshot_b2.point_to_offset(Point::new(25, 0));
+    ep_store.update(cx, |ep_store, cx| {
+        ep_store.enqueue_settled_prediction(
+            EditPredictionId("prediction-b".into()),
+            &project,
+            &buffer,
+            &snapshot_b2,
+            editable_region_b,
+            cx,
+        );
+    });

+    cx.run_until_parked();
+    assert!(
+        settled_events.lock().is_empty(),
+        "neither prediction should have settled yet"
+    );

+    // --- Phase 4: let enough time pass for region A to settle ---
+    // A's last edit was at T_a (during the last loop iteration). The worker is
+    // sleeping until T_a + Q. We advance just enough to reach that wake time
+    // (Q/4 since we already advanced Q/4 in phase 3 on top of the loop's
+    // 3*Q/2). At that point A has been quiet for Q and settles, but B was
+    // enqueued only Q/4 ago and stays pending.
+    cx.executor()
+        .advance_clock(EDIT_PREDICTION_SETTLED_QUIESCENCE / 4);
+    cx.run_until_parked();

+    {
+        let events = settled_events.lock().clone();
+        assert_eq!(
+            events.len(),
+            1,
+            "only prediction A should have settled, got: {events:?}"
+        );
+        assert_eq!(events[0].0, EditPredictionId("prediction-a".into()));
+    }

+    // --- Phase 5: let more time pass for region B to settle ---
+    // B's last edit was Q/4 before A settled. The worker rescheduled to
+    // B's last_edit_at + Q, which is 3Q/4 from now.
+    cx.executor()
+        .advance_clock(EDIT_PREDICTION_SETTLED_QUIESCENCE * 3 / 4);
+    cx.run_until_parked();

+    {
+        let events = settled_events.lock().clone();
+        assert_eq!(
+            events.len(),
+            2,
+            "both predictions should have settled, got: {events:?}"
+        );
+        assert_eq!(events[1].0, EditPredictionId("prediction-b".into()));
+    }
+}
+
 #[ctor::ctor]
 fn init_logger() {
     zlog::init_test();

crates/edit_prediction/src/zeta.rs 🔗

@@ -309,6 +309,7 @@ pub fn request_prediction_with_zeta(
                         edits,
                         cursor_position,
                         received_response_at,
+                        editable_range_in_buffer,
                     )),
                     model_version,
                 )),
@@ -331,6 +332,7 @@ pub fn request_prediction_with_zeta(
             edits,
             cursor_position,
             received_response_at,
+            editable_range_in_buffer,
         )) = prediction
         else {
             return Ok(Some(EditPredictionResult {
@@ -339,6 +341,20 @@ pub fn request_prediction_with_zeta(
             }));
         };
 
+        if can_collect_data {
+            this.update(cx, |this, cx| {
+                this.enqueue_settled_prediction(
+                    id.clone(),
+                    &project,
+                    &edited_buffer,
+                    &edited_buffer_snapshot,
+                    editable_range_in_buffer,
+                    cx,
+                );
+            })
+            .ok();
+        }
+
         Ok(Some(
             EditPredictionResult::new(
                 id,