diff --git a/crates/edit_prediction_cli/src/main.rs b/crates/edit_prediction_cli/src/main.rs index 8bb4b2a8e2f50d448fc314a70e2fc94cfa2c3d71..afe25c5badcfff03babd5e951ae66839ce0f790b 100644 --- a/crates/edit_prediction_cli/src/main.rs +++ b/crates/edit_prediction_cli/src/main.rs @@ -738,6 +738,21 @@ async fn load_examples( examples.append(&mut requested_examples); } + if !captured_after_timestamps.is_empty() { + captured_after_timestamps.sort(); + + let mut captured_examples = pull_examples::fetch_captured_examples_after( + http_client.clone(), + &captured_after_timestamps, + max_rows_per_timestamp, + remaining_offset, + background_executor.clone(), + Some(MIN_CAPTURE_VERSION), + ) + .await?; + examples.append(&mut captured_examples); + } + if !settled_after_timestamps.is_empty() { settled_after_timestamps.sort(); diff --git a/crates/edit_prediction_cli/src/pull_examples.rs b/crates/edit_prediction_cli/src/pull_examples.rs index cccd351dcdeda0dbf059d851a44b02bc1e558654..15591ae03ccd7b0d537b437c1da2c0898e7e9446 100644 --- a/crates/edit_prediction_cli/src/pull_examples.rs +++ b/crates/edit_prediction_cli/src/pull_examples.rs @@ -565,6 +565,101 @@ pub async fn fetch_requested_examples_after( Ok(all_examples) } +pub async fn fetch_captured_examples_after( + http_client: Arc, + after_timestamps: &[String], + max_rows_per_timestamp: usize, + offset: usize, + background_executor: BackgroundExecutor, + min_capture_version: Option, +) -> Result> { + if after_timestamps.is_empty() { + return Ok(Vec::new()); + } + + let progress = Progress::global(); + + let mut all_examples = Vec::new(); + + for after_date in after_timestamps.iter() { + let step_progress_name = format!("captured>{after_date}"); + let step_progress = progress.start(Step::PullExamples, &step_progress_name); + step_progress.set_substatus("querying"); + + let min_minor_str = min_capture_version.map(|version| version.minor.to_string()); + let min_patch_str = min_capture_version.map(|version| version.patch.to_string()); + let min_minor_str_ref = min_minor_str.as_deref(); + let min_patch_str_ref = min_patch_str.as_deref(); + + let statement = indoc! {r#" + SELECT + settled.event_properties:request_id::string AS request_id, + settled.device_id::string AS device_id, + settled.time::string AS time, + req.event_properties:input AS input, + settled.event_properties:settled_editable_region::string AS settled_editable_region, + settled.event_properties:example AS example, + req.event_properties:zed_version::string AS zed_version + FROM events settled + INNER JOIN events req + ON settled.event_properties:request_id::string = req.event_properties:request_id::string + WHERE settled.event_type = ? + AND req.event_type = ? + AND req.event_properties:version = 'V3' + AND req.event_properties:input:can_collect_data = true + AND settled.event_properties:example IS NOT NULL + AND TYPEOF(settled.event_properties:example) != 'NULL_VALUE' + AND settled.time > TRY_TO_TIMESTAMP_NTZ(?) + AND (? IS NULL OR ( + TRY_CAST(SPLIT_PART(req.event_properties:zed_version::string, '.', 2) AS INTEGER) > ? + OR ( + TRY_CAST(SPLIT_PART(req.event_properties:zed_version::string, '.', 2) AS INTEGER) = ? + AND TRY_CAST(SPLIT_PART(SPLIT_PART(req.event_properties:zed_version::string, '.', 3), '+', 1) AS INTEGER) >= ? + ) + )) + ORDER BY settled.time ASC + LIMIT ? + OFFSET ? + "#}; + + let bindings = json!({ + "1": { "type": "TEXT", "value": EDIT_PREDICTION_SETTLED_EVENT }, + "2": { "type": "TEXT", "value": PREDICTIVE_EDIT_REQUESTED_EVENT }, + "3": { "type": "TEXT", "value": after_date }, + "4": { "type": "FIXED", "value": min_minor_str_ref }, + "5": { "type": "FIXED", "value": min_minor_str_ref }, + "6": { "type": "FIXED", "value": min_minor_str_ref }, + "7": { "type": "FIXED", "value": min_patch_str_ref }, + "8": { "type": "FIXED", "value": max_rows_per_timestamp.to_string() }, + "9": { "type": "FIXED", "value": offset.to_string() } + }); + + let examples = fetch_examples_with_query( + http_client.clone(), + &step_progress, + background_executor.clone(), + statement, + bindings, + DEFAULT_STATEMENT_TIMEOUT_SECONDS, + &[ + "request_id", + "device_id", + "time", + "input", + "settled_editable_region", + "example", + "zed_version", + ], + captured_examples_from_response, + ) + .await?; + + all_examples.extend(examples); + } + + Ok(all_examples) +} + pub async fn fetch_settled_examples_after( http_client: Arc, after_timestamps: &[String], @@ -1018,7 +1113,7 @@ fn settled_examples_from_response<'a>( } }; - let parse_json_value = |_: &str, raw: Option<&JsonValue>| -> Option { + let parse_json_value = |raw: Option<&JsonValue>| -> Option { let value = raw?; match value { JsonValue::String(s) => serde_json::from_str::(s).ok(), @@ -1030,7 +1125,7 @@ fn settled_examples_from_response<'a>( let device_id = get_string("device_id"); let time = get_string("time"); let input_raw = get_value("input"); - let input_json = parse_json_value("input", input_raw.as_ref()); + let input_json = parse_json_value(input_raw.as_ref()); let input: Option = input_json .as_ref() .and_then(|parsed| serde_json::from_value(parsed.clone()).ok()); @@ -1104,6 +1199,133 @@ fn settled_examples_from_response<'a>( Ok(Box::new(iter)) } +fn captured_examples_from_response<'a>( + response: &'a SnowflakeStatementResponse, + column_indices: &'a std::collections::HashMap, +) -> Result + 'a>> { + if let Some(code) = &response.code { + if code != SNOWFLAKE_SUCCESS_CODE { + anyhow::bail!( + "snowflake sql api returned error code={code} message={}", + response.message.as_deref().unwrap_or("") + ); + } + } + + let iter = response + .data + .iter() + .enumerate() + .filter_map(move |(row_index, data_row)| { + let get_value = |name: &str| -> Option { + let index = column_indices.get(name).copied()?; + let value = data_row.get(index)?; + if value.is_null() { + None + } else { + Some(value.clone()) + } + }; + + let get_string = |name: &str| -> Option { + match get_value(name)? { + JsonValue::String(s) => Some(s), + other => Some(other.to_string()), + } + }; + + let parse_json_value = |raw: Option<&JsonValue>| -> Option { + let value = raw?; + match value { + JsonValue::String(s) => serde_json::from_str::(s).ok(), + other => Some(other.clone()), + } + }; + + let request_id = get_string("request_id"); + let device_id = get_string("device_id"); + let time = get_string("time"); + let input_raw = get_value("input"); + let input_json = parse_json_value(input_raw.as_ref()); + let input: Option = input_json + .as_ref() + .and_then(|parsed| serde_json::from_value(parsed.clone()).ok()); + let example_raw = get_value("example"); + let example_json = parse_json_value(example_raw.as_ref()); + let example_spec: Option = example_json.as_ref().and_then(|parsed| { + serde_json::from_value(parsed.clone()) + .or_else(|_| { + parsed + .as_str() + .and_then(|markdown| ExampleSpec::from_markdown(markdown).ok()) + .ok_or_else(|| { + serde_json::Error::io(std::io::Error::other("not markdown")) + }) + }) + .ok() + }); + let has_example_spec = example_spec.is_some(); + let settled_editable_region = get_string("settled_editable_region"); + let zed_version = get_string("zed_version"); + + match ( + request_id.clone(), + device_id.clone(), + time.clone(), + input.clone(), + example_spec, + settled_editable_region.clone(), + ) { + ( + Some(request_id), + Some(device_id), + Some(time), + Some(input), + Some(example_spec), + Some(settled_editable_region), + ) => Some(build_captured_example( + request_id, + device_id, + time, + input, + example_spec, + settled_editable_region, + zed_version, + )), + _ => { + let mut missing_fields = Vec::new(); + + if request_id.is_none() { + missing_fields.push("request_id"); + } + if device_id.is_none() { + missing_fields.push("device_id"); + } + if time.is_none() { + missing_fields.push("time"); + } + if input_raw.is_none() || input_json.is_none() || input.is_none() { + missing_fields.push("input"); + } + if example_raw.is_none() || !has_example_spec { + missing_fields.push("example"); + } + if settled_editable_region.is_none() { + missing_fields.push("settled_editable_region"); + } + + log::warn!( + "skipping captured row {row_index}: [{}]", + missing_fields.join(", "), + ); + None + } + } + }); + + Ok(Box::new(iter)) +} + fn build_settled_example( request_id: String, device_id: String, @@ -1160,6 +1382,43 @@ fn build_settled_example( example } +fn build_captured_example( + request_id: String, + device_id: String, + time: String, + input: ZetaPromptInput, + mut example_spec: ExampleSpec, + settled_editable_region: String, + zed_version: Option, +) -> Example { + let expected_patch = build_output_patch( + &input.cursor_path, + input.cursor_excerpt.as_ref(), + &input.excerpt_ranges.editable_350, + settled_editable_region.as_str(), + ); + + example_spec.expected_patches = vec![expected_patch]; + example_spec.telemetry = Some(TelemetrySource { + request_id, + device_id, + time, + rejection_reason: String::new(), + was_shown: false, + }); + + Example { + spec: example_spec, + zed_version, + prompt_inputs: Some(input), + prompt: None, + predictions: Vec::new(), + score: Vec::new(), + qa: Vec::new(), + state: None, + } +} + fn rejected_examples_from_response<'a>( response: &'a SnowflakeStatementResponse, column_indices: &'a std::collections::HashMap,