Filter collection of snowflake requests to those with latest schemas (#49424)

Max Brunsfeld created

This allows us to pull only the requests that use the latest EP request
schema, which includes the `predicted` boolean on events in the edit history.

Release Notes:

- N/A

Change summary

crates/edit_prediction/src/capture_example.rs   |   1 
crates/edit_prediction/src/example_spec.rs      |   2 
crates/edit_prediction_cli/src/main.rs          |  34 +++++
crates/edit_prediction_cli/src/pull_examples.rs | 105 +++++++++++++++++-
4 files changed, 129 insertions(+), 13 deletions(-)

Detailed changes

crates/edit_prediction/src/example_spec.rs 🔗

@@ -68,6 +68,8 @@ pub struct CapturedPromptInput {
     pub related_files: Vec<CapturedRelatedFile>,
     #[serde(default)]
     pub in_open_source_repo: bool,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub zed_version: Option<String>,
 }
 
 #[derive(Clone, Debug, PartialEq, Hash, Serialize, Deserialize)]

crates/edit_prediction_cli/src/main.rs 🔗

@@ -478,6 +478,14 @@ impl EpArgs {
     }
 }
 
+/// Minimum Zed version (0.224.1) passed as a filter to the Snowflake example queries.
+/// This version introduced the current request schema, which marks predicted edits in the
+/// edit history and distinguishes open source repos.
+const MIN_CAPTURE_VERSION: pull_examples::MinCaptureVersion = pull_examples::MinCaptureVersion {
+    minor: 224,
+    patch: 1,
+};
+
 async fn load_examples(
     http_client: Arc<dyn http_client::HttpClient>,
     args: &EpArgs,
@@ -514,6 +522,20 @@ async fn load_examples(
 
     let mut examples = read_example_files(&file_inputs);
 
+    // Apply offset to file examples first, then pass remaining offset to Snowflake.
+    let file_example_count = examples.len();
+    let remaining_offset = if let Some(offset) = args.offset {
+        if offset >= file_example_count {
+            examples.clear();
+            offset - file_example_count
+        } else {
+            examples.splice(0..offset, []);
+            0
+        }
+    } else {
+        0
+    };
+
     Progress::global().set_total_examples(examples.len());
 
     let remaining_limit_for_snowflake =
@@ -533,7 +555,9 @@ async fn load_examples(
                 http_client.clone(),
                 &captured_after_timestamps,
                 max_rows_per_timestamp,
+                remaining_offset,
                 background_executor.clone(),
+                Some(MIN_CAPTURE_VERSION),
             )
             .await?;
             examples.append(&mut captured_examples);
@@ -546,7 +570,9 @@ async fn load_examples(
                 http_client.clone(),
                 &rejected_after_timestamps,
                 max_rows_per_timestamp,
+                remaining_offset,
                 background_executor.clone(),
+                Some(MIN_CAPTURE_VERSION),
             )
             .await?;
             examples.append(&mut rejected_examples);
@@ -559,7 +585,9 @@ async fn load_examples(
                 http_client.clone(),
                 &requested_after_timestamps,
                 max_rows_per_timestamp,
+                remaining_offset,
                 background_executor.clone(),
+                Some(MIN_CAPTURE_VERSION),
             )
             .await?;
             examples.append(&mut requested_examples);
@@ -572,7 +600,9 @@ async fn load_examples(
                 http_client,
                 &rated_after_inputs,
                 max_rows_per_timestamp,
+                remaining_offset,
                 background_executor,
+                Some(MIN_CAPTURE_VERSION),
             )
             .await?;
             examples.append(&mut rated_examples);
@@ -598,10 +628,6 @@ async fn load_examples(
         }
     }
 
-    if let Some(offset) = args.offset {
-        examples.splice(0..offset, []);
-    }
-
     if let Some(limit) = args.limit {
         examples.truncate(limit);
     }

crates/edit_prediction_cli/src/pull_examples.rs 🔗

@@ -28,6 +28,15 @@ const PREDICTIVE_EDIT_REQUESTED_EVENT: &str = "Predictive Edit Requested";
 const PREDICTIVE_EDIT_REJECTED_EVENT: &str = "Predictive Edit Rejected";
 const EDIT_PREDICTION_RATED_EVENT: &str = "Edit Prediction Rated";
 
+/// Minimum Zed version that the Snowflake example queries can filter on.
+/// For example, `MinCaptureVersion { minor: 224, patch: 1 }` means only examples whose
+/// `zed_version` is at least `0.224.1` are pulled; there is no field for the major component.
+#[derive(Clone, Copy, Debug)]
+pub struct MinCaptureVersion {
+    pub minor: u32, // compared against the second dot-separated part of `zed_version`
+    pub patch: u32, // compared against the third part, up to any `+` suffix
+}
+
 const DEFAULT_STATEMENT_TIMEOUT_SECONDS: u64 = 120;
 pub(crate) const POLL_INTERVAL: Duration = Duration::from_secs(2);
 pub(crate) const MAX_POLL_ATTEMPTS: usize = 120;
@@ -66,7 +75,9 @@ pub async fn fetch_captured_examples_after(
     http_client: Arc<dyn HttpClient>,
     after_timestamps: &[String],
     max_rows_per_timestamp: usize,
+    offset: usize,
     background_executor: BackgroundExecutor,
+    _min_capture_version: Option<MinCaptureVersion>,
 ) -> Result<Vec<Example>> {
     if after_timestamps.is_empty() {
         return Ok(Vec::new());
@@ -96,6 +107,7 @@ pub async fn fetch_captured_examples_after(
                 AND time > TRY_TO_TIMESTAMP_NTZ(?)
             ORDER BY time ASC
             LIMIT ?
+            OFFSET ?
         "#};
 
         let request = json!({
@@ -108,7 +120,8 @@ pub async fn fetch_captured_examples_after(
             "bindings": {
                 "1": { "type": "TEXT", "value": EDIT_PREDICTION_EXAMPLE_CAPTURED_EVENT },
                 "2": { "type": "TEXT", "value": after_date },
-                "3": { "type": "FIXED", "value": max_rows_per_timestamp.to_string() }
+                "3": { "type": "FIXED", "value": max_rows_per_timestamp.to_string() },
+                "4": { "type": "FIXED", "value": offset.to_string() }
             }
         });
 
@@ -457,7 +470,9 @@ pub async fn fetch_rejected_examples_after(
     http_client: Arc<dyn HttpClient>,
     after_timestamps: &[String],
     max_rows_per_timestamp: usize,
+    offset: usize,
     background_executor: BackgroundExecutor,
+    min_capture_version: Option<MinCaptureVersion>,
 ) -> Result<Vec<Example>> {
     if after_timestamps.is_empty() {
         return Ok(Vec::new());
@@ -492,7 +507,8 @@ pub async fn fetch_rejected_examples_after(
                 req.event_properties:prompt::string AS prompt,
                 req.event_properties:output::string AS output,
                 rej.event_properties:was_shown::boolean AS was_shown,
-                rej.event_properties:reason::string AS reason
+                rej.event_properties:reason::string AS reason,
+                req.event_properties:zed_version::string AS zed_version
             FROM events req
             INNER JOIN events rej
                 ON req.event_properties:request_id = rej.event_properties:request_id
@@ -501,10 +517,22 @@ pub async fn fetch_rejected_examples_after(
                 AND req.event_properties:version = 'V3'
                 AND rej.event_properties:was_shown = true
                 AND req.time > TRY_TO_TIMESTAMP_NTZ(?)
+                AND (? IS NULL OR (
+                    TRY_CAST(SPLIT_PART(req.event_properties:zed_version::string, '.', 2) AS INTEGER) > ?
+                    OR (
+                        TRY_CAST(SPLIT_PART(req.event_properties:zed_version::string, '.', 2) AS INTEGER) = ?
+                        AND TRY_CAST(SPLIT_PART(SPLIT_PART(req.event_properties:zed_version::string, '.', 3), '+', 1) AS INTEGER) >= ?
+                    )
+                ))
             ORDER BY req.time ASC
             LIMIT ?
+            OFFSET ?
         "#};
 
+        let min_minor_str = min_capture_version.map(|v| v.minor.to_string());
+        let min_patch_str = min_capture_version.map(|v| v.patch.to_string());
+        let min_minor_str_ref = min_minor_str.as_deref();
+        let min_patch_str_ref = min_patch_str.as_deref();
         let request = json!({
             "statement": statement,
             "timeout": DEFAULT_STATEMENT_TIMEOUT_SECONDS,
@@ -516,7 +544,12 @@ pub async fn fetch_rejected_examples_after(
                 "1": { "type": "TEXT", "value": PREDICTIVE_EDIT_REQUESTED_EVENT },
                 "2": { "type": "TEXT", "value": PREDICTIVE_EDIT_REJECTED_EVENT },
                 "3": { "type": "TEXT", "value": after_date },
-                "4": { "type": "FIXED", "value": max_rows_per_timestamp.to_string() }
+                "4": { "type": "FIXED", "value": min_minor_str_ref },
+                "5": { "type": "FIXED", "value": min_minor_str_ref },
+                "6": { "type": "FIXED", "value": min_minor_str_ref },
+                "7": { "type": "FIXED", "value": min_patch_str_ref },
+                "8": { "type": "FIXED", "value": max_rows_per_timestamp.to_string() },
+                "9": { "type": "FIXED", "value": offset.to_string() }
             }
         });
 
@@ -557,6 +590,7 @@ pub async fn fetch_rejected_examples_after(
                 "output",
                 "was_shown",
                 "reason",
+                "zed_version",
             ],
         );
 
@@ -601,7 +635,9 @@ pub async fn fetch_requested_examples_after(
     http_client: Arc<dyn HttpClient>,
     after_timestamps: &[String],
     max_rows_per_timestamp: usize,
+    offset: usize,
     background_executor: BackgroundExecutor,
+    min_capture_version: Option<MinCaptureVersion>,
 ) -> Result<Vec<Example>> {
     if after_timestamps.is_empty() {
         return Ok(Vec::new());
@@ -628,15 +664,28 @@ pub async fn fetch_requested_examples_after(
                 req.event_properties:request_id::string AS request_id,
                 req.device_id::string AS device_id,
                 req.time::string AS time,
-                req.event_properties:input AS input
+                req.event_properties:input AS input,
+                req.event_properties:zed_version::string AS zed_version
             FROM events req
             WHERE req.event_type = ?
                 AND req.event_properties:version = 'V3'
                 AND req.time > TRY_TO_TIMESTAMP_NTZ(?)
+                AND (? IS NULL OR (
+                    TRY_CAST(SPLIT_PART(req.event_properties:zed_version::string, '.', 2) AS INTEGER) > ?
+                    OR (
+                        TRY_CAST(SPLIT_PART(req.event_properties:zed_version::string, '.', 2) AS INTEGER) = ?
+                        AND TRY_CAST(SPLIT_PART(SPLIT_PART(req.event_properties:zed_version::string, '.', 3), '+', 1) AS INTEGER) >= ?
+                    )
+                ))
             ORDER BY req.time ASC
             LIMIT ?
+            OFFSET ?
         "#};
 
+        let min_minor_str = min_capture_version.map(|v| v.minor.to_string());
+        let min_patch_str = min_capture_version.map(|v| v.patch.to_string());
+        let min_minor_str_ref = min_minor_str.as_deref();
+        let min_patch_str_ref = min_patch_str.as_deref();
         let request = json!({
             "statement": statement,
             "timeout": DEFAULT_STATEMENT_TIMEOUT_SECONDS,
@@ -647,7 +696,12 @@ pub async fn fetch_requested_examples_after(
             "bindings": {
                 "1": { "type": "TEXT", "value": PREDICTIVE_EDIT_REQUESTED_EVENT },
                 "2": { "type": "TEXT", "value": after_date },
-                "3": { "type": "FIXED", "value": max_rows_per_timestamp.to_string() }
+                "3": { "type": "FIXED", "value": min_minor_str_ref },
+                "4": { "type": "FIXED", "value": min_minor_str_ref },
+                "5": { "type": "FIXED", "value": min_minor_str_ref },
+                "6": { "type": "FIXED", "value": min_patch_str_ref },
+                "7": { "type": "FIXED", "value": max_rows_per_timestamp.to_string() },
+                "8": { "type": "FIXED", "value": offset.to_string() }
             }
         });
 
@@ -679,7 +733,7 @@ pub async fn fetch_requested_examples_after(
 
         let column_indices = get_column_indices(
             &response.result_set_meta_data,
-            &["request_id", "device_id", "time", "input"],
+            &["request_id", "device_id", "time", "input", "zed_version"],
         );
 
         all_examples.extend(requested_examples_from_response(
@@ -726,7 +780,9 @@ pub async fn fetch_rated_examples_after(
     http_client: Arc<dyn HttpClient>,
     inputs: &[(String, Option<EditPredictionRating>)],
     max_rows_per_timestamp: usize,
+    offset: usize,
     background_executor: BackgroundExecutor,
+    min_capture_version: Option<MinCaptureVersion>,
 ) -> Result<Vec<Example>> {
     if inputs.is_empty() {
         return Ok(Vec::new());
@@ -768,7 +824,8 @@ pub async fn fetch_rated_examples_after(
                 rated.device_id::string AS device_id,
                 rated.time::string AS time,
                 deploy.event_properties:experiment_name::string AS experiment_name,
-                deploy.event_properties:environment::string AS environment
+                deploy.event_properties:environment::string AS environment,
+                rated.event_properties:zed_version::string AS zed_version
             FROM events rated
             LEFT JOIN events req
                 ON rated.event_properties:request_id::string = req.event_properties:request_id::string
@@ -783,10 +840,22 @@ pub async fn fetch_rated_examples_after(
                 AND rated.event_properties:inputs IS NOT NULL
                 AND rated.event_properties:inputs:cursor_excerpt IS NOT NULL
                 AND rated.event_properties:output IS NOT NULL
+                AND (? IS NULL OR (
+                    TRY_CAST(SPLIT_PART(rated.event_properties:zed_version::string, '.', 2) AS INTEGER) > ?
+                    OR (
+                        TRY_CAST(SPLIT_PART(rated.event_properties:zed_version::string, '.', 2) AS INTEGER) = ?
+                        AND TRY_CAST(SPLIT_PART(SPLIT_PART(rated.event_properties:zed_version::string, '.', 3), '+', 1) AS INTEGER) >= ?
+                    )
+                ))
             ORDER BY rated.time ASC
             LIMIT ?
+            OFFSET ?
         "#};
 
+        let min_minor_str = min_capture_version.map(|v| v.minor.to_string());
+        let min_patch_str = min_capture_version.map(|v| v.patch.to_string());
+        let min_minor_str_ref = min_minor_str.as_deref();
+        let min_patch_str_ref = min_patch_str.as_deref();
         let bindings = json!({
             "1": { "type": "TEXT", "value": PREDICTIVE_EDIT_REQUESTED_EVENT },
             "2": { "type": "TEXT", "value": EDIT_PREDICTION_DEPLOYMENT_EVENT },
@@ -794,7 +863,12 @@ pub async fn fetch_rated_examples_after(
             "4": { "type": "TEXT", "value": rating_value },
             "5": { "type": "TEXT", "value": rating_value },
             "6": { "type": "TEXT", "value": after_date },
-            "7": { "type": "FIXED", "value": max_rows_per_timestamp.to_string() }
+            "7": { "type": "FIXED", "value": min_minor_str_ref },
+            "8": { "type": "FIXED", "value": min_minor_str_ref },
+            "9": { "type": "FIXED", "value": min_minor_str_ref },
+            "10": { "type": "FIXED", "value": min_patch_str_ref },
+            "11": { "type": "FIXED", "value": max_rows_per_timestamp.to_string() },
+            "12": { "type": "FIXED", "value": offset.to_string() }
         });
 
         let request = json!({
@@ -845,6 +919,7 @@ pub async fn fetch_rated_examples_after(
                 "time",
                 "experiment_name",
                 "environment",
+                "zed_version",
             ],
         );
 
@@ -945,6 +1020,7 @@ fn rated_examples_from_response<'a>(
             let time = get_string("time");
             let experiment_name = get_string("experiment_name");
             let environment = get_string("environment");
+            let zed_version = get_string("zed_version");
 
             match (inputs, output.clone(), rating.clone(), device_id.clone(), time.clone()) {
                 (Some(inputs), Some(output), Some(rating), Some(device_id), Some(time)) => {
@@ -958,6 +1034,7 @@ fn rated_examples_from_response<'a>(
                         feedback,
                         experiment_name,
                         environment,
+                        zed_version,
                     ))
                 }
                 _ => {
@@ -987,6 +1064,7 @@ fn build_rated_example(
     feedback: String,
     experiment_name: Option<String>,
     environment: Option<String>,
+    zed_version: Option<String>,
 ) -> Example {
     let parsed_rating = if rating == "Positive" {
         EditPredictionRating::Positive
@@ -1009,7 +1087,8 @@ fn build_rated_example(
         tags.push(format!("environment:{env}"));
     }
 
-    let mut example = build_example_from_snowflake(request_id, device_id, time, input, tags, None);
+    let mut example =
+        build_example_from_snowflake(request_id, device_id, time, input, tags, None, zed_version);
 
     example.spec.rating = Some(parsed_rating);
 
@@ -1074,6 +1153,7 @@ fn requested_examples_from_response<'a>(
             let input_json = get_json("input");
             let input: Option<ZetaPromptInput> =
                 input_json.clone().and_then(|v| serde_json::from_value(v).ok());
+            let zed_version = get_string("zed_version");
 
             match (request_id_str.clone(), device_id.clone(), time.clone(), input) {
                 (Some(request_id), Some(device_id), Some(time), Some(input)) => {
@@ -1084,6 +1164,7 @@ fn requested_examples_from_response<'a>(
                         input,
                         vec!["requested".to_string()],
                         None,
+                        zed_version,
                     ))
                 }
                 _ => {
@@ -1159,6 +1240,7 @@ fn rejected_examples_from_response<'a>(
             let output = get_string("output");
             let was_shown = get_bool("was_shown");
             let reason = get_string("reason");
+            let zed_version = get_string("zed_version");
 
             match (request_id_str.clone(), device_id.clone(), time.clone(), input, output.clone(), was_shown, reason.clone()) {
                 (Some(request_id), Some(device_id), Some(time), Some(input), Some(output), Some(was_shown), Some(reason)) => {
@@ -1170,6 +1252,7 @@ fn rejected_examples_from_response<'a>(
                         output,
                         was_shown,
                         reason,
+                        zed_version,
                     ))
                 }
                 _ => {
@@ -1199,6 +1282,7 @@ fn build_rejected_example(
     output: String,
     was_shown: bool,
     reason: String,
+    zed_version: Option<String>,
 ) -> Example {
     let rejected_patch = build_output_patch(
         &input.cursor_path,
@@ -1213,6 +1297,7 @@ fn build_rejected_example(
         input,
         vec![format!("rejection:{}", reason.to_lowercase())],
         Some(RejectionInfo { reason, was_shown }),
+        zed_version,
     );
     example.spec.rejected_patch = Some(rejected_patch);
     example
@@ -1230,6 +1315,7 @@ fn build_example_from_snowflake(
     input: ZetaPromptInput,
     tags: Vec<String>,
     rejection: Option<RejectionInfo>,
+    zed_version: Option<String>,
 ) -> Example {
     let events: Vec<CapturedEvent> = input
         .events
@@ -1305,6 +1391,7 @@ fn build_example_from_snowflake(
             events,
             related_files,
             in_open_source_repo: input.in_open_source_repo,
+            zed_version,
         }),
         telemetry: Some(TelemetrySource {
             request_id,