From 717ea9e9981acec67977bbfb8f8223bbdaa859c5 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Tue, 17 Feb 2026 16:37:28 -0800 Subject: [PATCH] Filter collection of snowflake requests to those with latest schemas (#49424) This allows us to just pull requests that have the latest EP request schema with the `predicted` boolean on events in the edit history. Release Notes: - N/A --- crates/edit_prediction/src/capture_example.rs | 1 + crates/edit_prediction/src/example_spec.rs | 2 + crates/edit_prediction_cli/src/main.rs | 34 +++++- .../edit_prediction_cli/src/pull_examples.rs | 105 ++++++++++++++++-- 4 files changed, 129 insertions(+), 13 deletions(-) diff --git a/crates/edit_prediction/src/capture_example.rs b/crates/edit_prediction/src/capture_example.rs index bfe56408dc5ea9c1017c8c77c54068e3ae0f99cf..d9cccda9fee5abe93b3bff3823e2a6897c22f07f 100644 --- a/crates/edit_prediction/src/capture_example.rs +++ b/crates/edit_prediction/src/capture_example.rs @@ -155,6 +155,7 @@ pub fn capture_example( events: captured_events, related_files: captured_related_files, in_open_source_repo: false, + zed_version: None, } }); diff --git a/crates/edit_prediction/src/example_spec.rs b/crates/edit_prediction/src/example_spec.rs index 81e786670056814482fc0642a8ea79546366f2ed..32f696639b2c4643e79bc2cd571548b4f04bd0f9 100644 --- a/crates/edit_prediction/src/example_spec.rs +++ b/crates/edit_prediction/src/example_spec.rs @@ -68,6 +68,8 @@ pub struct CapturedPromptInput { pub related_files: Vec, #[serde(default)] pub in_open_source_repo: bool, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub zed_version: Option, } #[derive(Clone, Debug, PartialEq, Hash, Serialize, Deserialize)] diff --git a/crates/edit_prediction_cli/src/main.rs b/crates/edit_prediction_cli/src/main.rs index d2b4950e868a3afa477ac141481bb234fe79aedb..887b313235936df9d9c7b7dc3fd02366790ec9cc 100644 --- a/crates/edit_prediction_cli/src/main.rs +++ b/crates/edit_prediction_cli/src/main.rs @@ -478,6 +478,14 @@ impl EpArgs { } } +/// Minimum Zed version required for Snowflake queries. +/// This version introduced the current request schema with predicted edits in the edit +/// history, and open source repos distinguished. +const MIN_CAPTURE_VERSION: pull_examples::MinCaptureVersion = pull_examples::MinCaptureVersion { + minor: 224, + patch: 1, +}; + async fn load_examples( http_client: Arc, args: &EpArgs, @@ -514,6 +522,20 @@ async fn load_examples( let mut examples = read_example_files(&file_inputs); + // Apply offset to file examples first, then pass remaining offset to Snowflake. + let file_example_count = examples.len(); + let remaining_offset = if let Some(offset) = args.offset { + if offset >= file_example_count { + examples.clear(); + offset - file_example_count + } else { + examples.splice(0..offset, []); + 0 + } + } else { + 0 + }; + Progress::global().set_total_examples(examples.len()); let remaining_limit_for_snowflake = @@ -533,7 +555,9 @@ async fn load_examples( http_client.clone(), &captured_after_timestamps, max_rows_per_timestamp, + remaining_offset, background_executor.clone(), + Some(MIN_CAPTURE_VERSION), ) .await?; examples.append(&mut captured_examples); @@ -546,7 +570,9 @@ async fn load_examples( http_client.clone(), &rejected_after_timestamps, max_rows_per_timestamp, + remaining_offset, background_executor.clone(), + Some(MIN_CAPTURE_VERSION), ) .await?; examples.append(&mut rejected_examples); @@ -559,7 +585,9 @@ async fn load_examples( http_client.clone(), &requested_after_timestamps, max_rows_per_timestamp, + remaining_offset, background_executor.clone(), + Some(MIN_CAPTURE_VERSION), ) .await?; examples.append(&mut requested_examples); @@ -572,7 +600,9 @@ async fn load_examples( http_client, &rated_after_inputs, max_rows_per_timestamp, + remaining_offset, background_executor, + Some(MIN_CAPTURE_VERSION), ) .await?; examples.append(&mut rated_examples); @@ -598,10 +628,6 @@ async fn load_examples( } } - if let Some(offset) = args.offset { - examples.splice(0..offset, []); - } - if let Some(limit) = args.limit { examples.truncate(limit); } diff --git a/crates/edit_prediction_cli/src/pull_examples.rs b/crates/edit_prediction_cli/src/pull_examples.rs index 46ee3ba590ed98aad0e05aac527cf671018fd162..fbf4782db02fa6ef3d74c8f0cb7bf3c73fb42a30 100644 --- a/crates/edit_prediction_cli/src/pull_examples.rs +++ b/crates/edit_prediction_cli/src/pull_examples.rs @@ -28,6 +28,15 @@ const PREDICTIVE_EDIT_REQUESTED_EVENT: &str = "Predictive Edit Requested"; const PREDICTIVE_EDIT_REJECTED_EVENT: &str = "Predictive Edit Rejected"; const EDIT_PREDICTION_RATED_EVENT: &str = "Edit Prediction Rated"; +/// Minimum Zed version for filtering captured examples. +/// For example, `MinCaptureVersion { minor: 224, patch: 1 }` means only pull examples +/// where `zed_version >= 0.224.1`. +#[derive(Clone, Copy, Debug)] +pub struct MinCaptureVersion { + pub minor: u32, + pub patch: u32, +} + const DEFAULT_STATEMENT_TIMEOUT_SECONDS: u64 = 120; pub(crate) const POLL_INTERVAL: Duration = Duration::from_secs(2); pub(crate) const MAX_POLL_ATTEMPTS: usize = 120; @@ -66,7 +75,9 @@ pub async fn fetch_captured_examples_after( http_client: Arc, after_timestamps: &[String], max_rows_per_timestamp: usize, + offset: usize, background_executor: BackgroundExecutor, + _min_capture_version: Option, ) -> Result> { if after_timestamps.is_empty() { return Ok(Vec::new()); @@ -96,6 +107,7 @@ pub async fn fetch_captured_examples_after( AND time > TRY_TO_TIMESTAMP_NTZ(?) ORDER BY time ASC LIMIT ? + OFFSET ? "#}; let request = json!({ @@ -108,7 +120,8 @@ pub async fn fetch_captured_examples_after( "bindings": { "1": { "type": "TEXT", "value": EDIT_PREDICTION_EXAMPLE_CAPTURED_EVENT }, "2": { "type": "TEXT", "value": after_date }, - "3": { "type": "FIXED", "value": max_rows_per_timestamp.to_string() } + "3": { "type": "FIXED", "value": max_rows_per_timestamp.to_string() }, + "4": { "type": "FIXED", "value": offset.to_string() } } }); @@ -457,7 +470,9 @@ pub async fn fetch_rejected_examples_after( http_client: Arc, after_timestamps: &[String], max_rows_per_timestamp: usize, + offset: usize, background_executor: BackgroundExecutor, + min_capture_version: Option, ) -> Result> { if after_timestamps.is_empty() { return Ok(Vec::new()); @@ -492,7 +507,8 @@ pub async fn fetch_rejected_examples_after( req.event_properties:prompt::string AS prompt, req.event_properties:output::string AS output, rej.event_properties:was_shown::boolean AS was_shown, - rej.event_properties:reason::string AS reason + rej.event_properties:reason::string AS reason, + req.event_properties:zed_version::string AS zed_version FROM events req INNER JOIN events rej ON req.event_properties:request_id = rej.event_properties:request_id @@ -501,10 +517,22 @@ pub async fn fetch_rejected_examples_after( AND req.event_properties:version = 'V3' AND rej.event_properties:was_shown = true AND req.time > TRY_TO_TIMESTAMP_NTZ(?) + AND (? IS NULL OR ( + TRY_CAST(SPLIT_PART(req.event_properties:zed_version::string, '.', 2) AS INTEGER) > ? + OR ( + TRY_CAST(SPLIT_PART(req.event_properties:zed_version::string, '.', 2) AS INTEGER) = ? + AND TRY_CAST(SPLIT_PART(SPLIT_PART(req.event_properties:zed_version::string, '.', 3), '+', 1) AS INTEGER) >= ? + ) + )) ORDER BY req.time ASC LIMIT ? + OFFSET ? "#}; + let min_minor_str = min_capture_version.map(|v| v.minor.to_string()); + let min_patch_str = min_capture_version.map(|v| v.patch.to_string()); + let min_minor_str_ref = min_minor_str.as_deref(); + let min_patch_str_ref = min_patch_str.as_deref(); let request = json!({ "statement": statement, "timeout": DEFAULT_STATEMENT_TIMEOUT_SECONDS, @@ -516,7 +544,12 @@ pub async fn fetch_rejected_examples_after( "1": { "type": "TEXT", "value": PREDICTIVE_EDIT_REQUESTED_EVENT }, "2": { "type": "TEXT", "value": PREDICTIVE_EDIT_REJECTED_EVENT }, "3": { "type": "TEXT", "value": after_date }, - "4": { "type": "FIXED", "value": max_rows_per_timestamp.to_string() } + "4": { "type": "FIXED", "value": min_minor_str_ref }, + "5": { "type": "FIXED", "value": min_minor_str_ref }, + "6": { "type": "FIXED", "value": min_minor_str_ref }, + "7": { "type": "FIXED", "value": min_patch_str_ref }, + "8": { "type": "FIXED", "value": max_rows_per_timestamp.to_string() }, + "9": { "type": "FIXED", "value": offset.to_string() } } }); @@ -557,6 +590,7 @@ pub async fn fetch_rejected_examples_after( "output", "was_shown", "reason", + "zed_version", ], ); @@ -601,7 +635,9 @@ pub async fn fetch_requested_examples_after( http_client: Arc, after_timestamps: &[String], max_rows_per_timestamp: usize, + offset: usize, background_executor: BackgroundExecutor, + min_capture_version: Option, ) -> Result> { if after_timestamps.is_empty() { return Ok(Vec::new()); @@ -628,15 +664,28 @@ pub async fn fetch_requested_examples_after( req.event_properties:request_id::string AS request_id, req.device_id::string AS device_id, req.time::string AS time, - req.event_properties:input AS input + req.event_properties:input AS input, + req.event_properties:zed_version::string AS zed_version FROM events req WHERE req.event_type = ? AND req.event_properties:version = 'V3' AND req.time > TRY_TO_TIMESTAMP_NTZ(?) + AND (? IS NULL OR ( + TRY_CAST(SPLIT_PART(req.event_properties:zed_version::string, '.', 2) AS INTEGER) > ? + OR ( + TRY_CAST(SPLIT_PART(req.event_properties:zed_version::string, '.', 2) AS INTEGER) = ? + AND TRY_CAST(SPLIT_PART(SPLIT_PART(req.event_properties:zed_version::string, '.', 3), '+', 1) AS INTEGER) >= ? + ) + )) ORDER BY req.time ASC LIMIT ? + OFFSET ? "#}; + let min_minor_str = min_capture_version.map(|v| v.minor.to_string()); + let min_patch_str = min_capture_version.map(|v| v.patch.to_string()); + let min_minor_str_ref = min_minor_str.as_deref(); + let min_patch_str_ref = min_patch_str.as_deref(); let request = json!({ "statement": statement, "timeout": DEFAULT_STATEMENT_TIMEOUT_SECONDS, @@ -647,7 +696,12 @@ pub async fn fetch_requested_examples_after( "bindings": { "1": { "type": "TEXT", "value": PREDICTIVE_EDIT_REQUESTED_EVENT }, "2": { "type": "TEXT", "value": after_date }, - "3": { "type": "FIXED", "value": max_rows_per_timestamp.to_string() } + "3": { "type": "FIXED", "value": min_minor_str_ref }, + "4": { "type": "FIXED", "value": min_minor_str_ref }, + "5": { "type": "FIXED", "value": min_minor_str_ref }, + "6": { "type": "FIXED", "value": min_patch_str_ref }, + "7": { "type": "FIXED", "value": max_rows_per_timestamp.to_string() }, + "8": { "type": "FIXED", "value": offset.to_string() } } }); @@ -679,7 +733,7 @@ pub async fn fetch_requested_examples_after( let column_indices = get_column_indices( &response.result_set_meta_data, - &["request_id", "device_id", "time", "input"], + &["request_id", "device_id", "time", "input", "zed_version"], ); all_examples.extend(requested_examples_from_response( @@ -726,7 +780,9 @@ pub async fn fetch_rated_examples_after( http_client: Arc, inputs: &[(String, Option)], max_rows_per_timestamp: usize, + offset: usize, background_executor: BackgroundExecutor, + min_capture_version: Option, ) -> Result> { if inputs.is_empty() { return Ok(Vec::new()); @@ -768,7 +824,8 @@ pub async fn fetch_rated_examples_after( rated.device_id::string AS device_id, rated.time::string AS time, deploy.event_properties:experiment_name::string AS experiment_name, - deploy.event_properties:environment::string AS environment + deploy.event_properties:environment::string AS environment, + rated.event_properties:zed_version::string AS zed_version FROM events rated LEFT JOIN events req ON rated.event_properties:request_id::string = req.event_properties:request_id::string @@ -783,10 +840,22 @@ pub async fn fetch_rated_examples_after( AND rated.event_properties:inputs IS NOT NULL AND rated.event_properties:inputs:cursor_excerpt IS NOT NULL AND rated.event_properties:output IS NOT NULL + AND (? IS NULL OR ( + TRY_CAST(SPLIT_PART(rated.event_properties:zed_version::string, '.', 2) AS INTEGER) > ? + OR ( + TRY_CAST(SPLIT_PART(rated.event_properties:zed_version::string, '.', 2) AS INTEGER) = ? + AND TRY_CAST(SPLIT_PART(SPLIT_PART(rated.event_properties:zed_version::string, '.', 3), '+', 1) AS INTEGER) >= ? + ) + )) ORDER BY rated.time ASC LIMIT ? + OFFSET ? "#}; + let min_minor_str = min_capture_version.map(|v| v.minor.to_string()); + let min_patch_str = min_capture_version.map(|v| v.patch.to_string()); + let min_minor_str_ref = min_minor_str.as_deref(); + let min_patch_str_ref = min_patch_str.as_deref(); let bindings = json!({ "1": { "type": "TEXT", "value": PREDICTIVE_EDIT_REQUESTED_EVENT }, "2": { "type": "TEXT", "value": EDIT_PREDICTION_DEPLOYMENT_EVENT }, @@ -794,7 +863,12 @@ pub async fn fetch_rated_examples_after( "4": { "type": "TEXT", "value": rating_value }, "5": { "type": "TEXT", "value": rating_value }, "6": { "type": "TEXT", "value": after_date }, - "7": { "type": "FIXED", "value": max_rows_per_timestamp.to_string() } + "7": { "type": "FIXED", "value": min_minor_str_ref }, + "8": { "type": "FIXED", "value": min_minor_str_ref }, + "9": { "type": "FIXED", "value": min_minor_str_ref }, + "10": { "type": "FIXED", "value": min_patch_str_ref }, + "11": { "type": "FIXED", "value": max_rows_per_timestamp.to_string() }, + "12": { "type": "FIXED", "value": offset.to_string() } }); let request = json!({ @@ -845,6 +919,7 @@ pub async fn fetch_rated_examples_after( "time", "experiment_name", "environment", + "zed_version", ], ); @@ -945,6 +1020,7 @@ fn rated_examples_from_response<'a>( let time = get_string("time"); let experiment_name = get_string("experiment_name"); let environment = get_string("environment"); + let zed_version = get_string("zed_version"); match (inputs, output.clone(), rating.clone(), device_id.clone(), time.clone()) { (Some(inputs), Some(output), Some(rating), Some(device_id), Some(time)) => { @@ -958,6 +1034,7 @@ fn rated_examples_from_response<'a>( feedback, experiment_name, environment, + zed_version, )) } _ => { @@ -987,6 +1064,7 @@ fn build_rated_example( feedback: String, experiment_name: Option, environment: Option, + zed_version: Option, ) -> Example { let parsed_rating = if rating == "Positive" { EditPredictionRating::Positive @@ -1009,7 +1087,8 @@ fn build_rated_example( tags.push(format!("environment:{env}")); } - let mut example = build_example_from_snowflake(request_id, device_id, time, input, tags, None); + let mut example = + build_example_from_snowflake(request_id, device_id, time, input, tags, None, zed_version); example.spec.rating = Some(parsed_rating); @@ -1074,6 +1153,7 @@ fn requested_examples_from_response<'a>( let input_json = get_json("input"); let input: Option = input_json.clone().and_then(|v| serde_json::from_value(v).ok()); + let zed_version = get_string("zed_version"); match (request_id_str.clone(), device_id.clone(), time.clone(), input) { (Some(request_id), Some(device_id), Some(time), Some(input)) => { @@ -1084,6 +1164,7 @@ fn requested_examples_from_response<'a>( input, vec!["requested".to_string()], None, + zed_version, )) } _ => { @@ -1159,6 +1240,7 @@ fn rejected_examples_from_response<'a>( let output = get_string("output"); let was_shown = get_bool("was_shown"); let reason = get_string("reason"); + let zed_version = get_string("zed_version"); match (request_id_str.clone(), device_id.clone(), time.clone(), input, output.clone(), was_shown, reason.clone()) { (Some(request_id), Some(device_id), Some(time), Some(input), Some(output), Some(was_shown), Some(reason)) => { @@ -1170,6 +1252,7 @@ fn rejected_examples_from_response<'a>( output, was_shown, reason, + zed_version, )) } _ => { @@ -1199,6 +1282,7 @@ fn build_rejected_example( output: String, was_shown: bool, reason: String, + zed_version: Option, ) -> Example { let rejected_patch = build_output_patch( &input.cursor_path, @@ -1213,6 +1297,7 @@ fn build_rejected_example( input, vec![format!("rejection:{}", reason.to_lowercase())], Some(RejectionInfo { reason, was_shown }), + zed_version, ); example.spec.rejected_patch = Some(rejected_patch); example @@ -1230,6 +1315,7 @@ fn build_example_from_snowflake( input: ZetaPromptInput, tags: Vec, rejection: Option, + zed_version: Option, ) -> Example { let events: Vec = input .events @@ -1305,6 +1391,7 @@ fn build_example_from_snowflake( events, related_files, in_open_source_repo: input.in_open_source_repo, + zed_version, }), telemetry: Some(TelemetrySource { request_id,