ep: Add captured example fetching (#50960)

Ben Kunkle created

Closes #ISSUE

Before you mark this PR as ready for review, make sure that you have:
- [ ] Added solid test coverage and/or screenshots from manual
testing
- [ ] Done a self-review taking into account security and performance
aspects
- [ ] Aligned any UI changes with the [UI
checklist](https://github.com/zed-industries/zed/blob/main/CONTRIBUTING.md#uiux-checklist)

Release Notes:

- N/A *or* Added/Fixed/Improved ...

Change summary

crates/edit_prediction_cli/src/main.rs          |  15 +
crates/edit_prediction_cli/src/pull_examples.rs | 263 ++++++++++++++++++
2 files changed, 276 insertions(+), 2 deletions(-)

Detailed changes

crates/edit_prediction_cli/src/main.rs 🔗

@@ -738,6 +738,21 @@ async fn load_examples(
             examples.append(&mut requested_examples);
         }
 
+        if !captured_after_timestamps.is_empty() {
+            captured_after_timestamps.sort();
+
+            let mut captured_examples = pull_examples::fetch_captured_examples_after(
+                http_client.clone(),
+                &captured_after_timestamps,
+                max_rows_per_timestamp,
+                remaining_offset,
+                background_executor.clone(),
+                Some(MIN_CAPTURE_VERSION),
+            )
+            .await?;
+            examples.append(&mut captured_examples);
+        }
+
         if !settled_after_timestamps.is_empty() {
             settled_after_timestamps.sort();
 

crates/edit_prediction_cli/src/pull_examples.rs 🔗

@@ -565,6 +565,101 @@ pub async fn fetch_requested_examples_after(
     Ok(all_examples)
 }
 
+pub async fn fetch_captured_examples_after(
+    http_client: Arc<dyn HttpClient>,
+    after_timestamps: &[String],
+    max_rows_per_timestamp: usize,
+    offset: usize,
+    background_executor: BackgroundExecutor,
+    min_capture_version: Option<MinCaptureVersion>,
+) -> Result<Vec<Example>> {
+    if after_timestamps.is_empty() {
+        return Ok(Vec::new());
+    }
+
+    let progress = Progress::global();
+
+    let mut all_examples = Vec::new();
+
+    for after_date in after_timestamps.iter() {
+        let step_progress_name = format!("captured>{after_date}");
+        let step_progress = progress.start(Step::PullExamples, &step_progress_name);
+        step_progress.set_substatus("querying");
+
+        let min_minor_str = min_capture_version.map(|version| version.minor.to_string());
+        let min_patch_str = min_capture_version.map(|version| version.patch.to_string());
+        let min_minor_str_ref = min_minor_str.as_deref();
+        let min_patch_str_ref = min_patch_str.as_deref();
+
+        let statement = indoc! {r#"
+            SELECT
+                settled.event_properties:request_id::string AS request_id,
+                settled.device_id::string AS device_id,
+                settled.time::string AS time,
+                req.event_properties:input AS input,
+                settled.event_properties:settled_editable_region::string AS settled_editable_region,
+                settled.event_properties:example AS example,
+                req.event_properties:zed_version::string AS zed_version
+            FROM events settled
+            INNER JOIN events req
+                ON settled.event_properties:request_id::string = req.event_properties:request_id::string
+            WHERE settled.event_type = ?
+                AND req.event_type = ?
+                AND req.event_properties:version = 'V3'
+                AND req.event_properties:input:can_collect_data = true
+                AND settled.event_properties:example IS NOT NULL
+                AND TYPEOF(settled.event_properties:example) != 'NULL_VALUE'
+                AND settled.time > TRY_TO_TIMESTAMP_NTZ(?)
+                AND (? IS NULL OR (
+                    TRY_CAST(SPLIT_PART(req.event_properties:zed_version::string, '.', 2) AS INTEGER) > ?
+                    OR (
+                        TRY_CAST(SPLIT_PART(req.event_properties:zed_version::string, '.', 2) AS INTEGER) = ?
+                        AND TRY_CAST(SPLIT_PART(SPLIT_PART(req.event_properties:zed_version::string, '.', 3), '+', 1) AS INTEGER) >= ?
+                    )
+                ))
+            ORDER BY settled.time ASC
+            LIMIT ?
+            OFFSET ?
+        "#};
+
+        let bindings = json!({
+            "1": { "type": "TEXT", "value": EDIT_PREDICTION_SETTLED_EVENT },
+            "2": { "type": "TEXT", "value": PREDICTIVE_EDIT_REQUESTED_EVENT },
+            "3": { "type": "TEXT", "value": after_date },
+            "4": { "type": "FIXED", "value": min_minor_str_ref },
+            "5": { "type": "FIXED", "value": min_minor_str_ref },
+            "6": { "type": "FIXED", "value": min_minor_str_ref },
+            "7": { "type": "FIXED", "value": min_patch_str_ref },
+            "8": { "type": "FIXED", "value": max_rows_per_timestamp.to_string() },
+            "9": { "type": "FIXED", "value": offset.to_string() }
+        });
+
+        let examples = fetch_examples_with_query(
+            http_client.clone(),
+            &step_progress,
+            background_executor.clone(),
+            statement,
+            bindings,
+            DEFAULT_STATEMENT_TIMEOUT_SECONDS,
+            &[
+                "request_id",
+                "device_id",
+                "time",
+                "input",
+                "settled_editable_region",
+                "example",
+                "zed_version",
+            ],
+            captured_examples_from_response,
+        )
+        .await?;
+
+        all_examples.extend(examples);
+    }
+
+    Ok(all_examples)
+}
+
 pub async fn fetch_settled_examples_after(
     http_client: Arc<dyn HttpClient>,
     after_timestamps: &[String],
@@ -1018,7 +1113,7 @@ fn settled_examples_from_response<'a>(
                 }
             };
 
-            let parse_json_value = |_: &str, raw: Option<&JsonValue>| -> Option<JsonValue> {
+            let parse_json_value = |raw: Option<&JsonValue>| -> Option<JsonValue> {
                 let value = raw?;
                 match value {
                     JsonValue::String(s) => serde_json::from_str::<JsonValue>(s).ok(),
@@ -1030,7 +1125,7 @@ fn settled_examples_from_response<'a>(
             let device_id = get_string("device_id");
             let time = get_string("time");
             let input_raw = get_value("input");
-            let input_json = parse_json_value("input", input_raw.as_ref());
+            let input_json = parse_json_value(input_raw.as_ref());
             let input: Option<ZetaPromptInput> = input_json
                 .as_ref()
                 .and_then(|parsed| serde_json::from_value(parsed.clone()).ok());
@@ -1104,6 +1199,133 @@ fn settled_examples_from_response<'a>(
     Ok(Box::new(iter))
 }
 
+fn captured_examples_from_response<'a>(
+    response: &'a SnowflakeStatementResponse,
+    column_indices: &'a std::collections::HashMap<String, usize>,
+) -> Result<Box<dyn Iterator<Item = Example> + 'a>> {
+    if let Some(code) = &response.code {
+        if code != SNOWFLAKE_SUCCESS_CODE {
+            anyhow::bail!(
+                "snowflake sql api returned error code={code} message={}",
+                response.message.as_deref().unwrap_or("<no message>")
+            );
+        }
+    }
+
+    let iter = response
+        .data
+        .iter()
+        .enumerate()
+        .filter_map(move |(row_index, data_row)| {
+            let get_value = |name: &str| -> Option<JsonValue> {
+                let index = column_indices.get(name).copied()?;
+                let value = data_row.get(index)?;
+                if value.is_null() {
+                    None
+                } else {
+                    Some(value.clone())
+                }
+            };
+
+            let get_string = |name: &str| -> Option<String> {
+                match get_value(name)? {
+                    JsonValue::String(s) => Some(s),
+                    other => Some(other.to_string()),
+                }
+            };
+
+            let parse_json_value = |raw: Option<&JsonValue>| -> Option<JsonValue> {
+                let value = raw?;
+                match value {
+                    JsonValue::String(s) => serde_json::from_str::<JsonValue>(s).ok(),
+                    other => Some(other.clone()),
+                }
+            };
+
+            let request_id = get_string("request_id");
+            let device_id = get_string("device_id");
+            let time = get_string("time");
+            let input_raw = get_value("input");
+            let input_json = parse_json_value(input_raw.as_ref());
+            let input: Option<ZetaPromptInput> = input_json
+                .as_ref()
+                .and_then(|parsed| serde_json::from_value(parsed.clone()).ok());
+            let example_raw = get_value("example");
+            let example_json = parse_json_value(example_raw.as_ref());
+            let example_spec: Option<ExampleSpec> = example_json.as_ref().and_then(|parsed| {
+                serde_json::from_value(parsed.clone())
+                    .or_else(|_| {
+                        parsed
+                            .as_str()
+                            .and_then(|markdown| ExampleSpec::from_markdown(markdown).ok())
+                            .ok_or_else(|| {
+                                serde_json::Error::io(std::io::Error::other("not markdown"))
+                            })
+                    })
+                    .ok()
+            });
+            let has_example_spec = example_spec.is_some();
+            let settled_editable_region = get_string("settled_editable_region");
+            let zed_version = get_string("zed_version");
+
+            match (
+                request_id.clone(),
+                device_id.clone(),
+                time.clone(),
+                input.clone(),
+                example_spec,
+                settled_editable_region.clone(),
+            ) {
+                (
+                    Some(request_id),
+                    Some(device_id),
+                    Some(time),
+                    Some(input),
+                    Some(example_spec),
+                    Some(settled_editable_region),
+                ) => Some(build_captured_example(
+                    request_id,
+                    device_id,
+                    time,
+                    input,
+                    example_spec,
+                    settled_editable_region,
+                    zed_version,
+                )),
+                _ => {
+                    let mut missing_fields = Vec::new();
+
+                    if request_id.is_none() {
+                        missing_fields.push("request_id");
+                    }
+                    if device_id.is_none() {
+                        missing_fields.push("device_id");
+                    }
+                    if time.is_none() {
+                        missing_fields.push("time");
+                    }
+                    if input_raw.is_none() || input_json.is_none() || input.is_none() {
+                        missing_fields.push("input");
+                    }
+                    if example_raw.is_none() || !has_example_spec {
+                        missing_fields.push("example");
+                    }
+                    if settled_editable_region.is_none() {
+                        missing_fields.push("settled_editable_region");
+                    }
+
+                    log::warn!(
+                        "skipping captured row {row_index}: [{}]",
+                        missing_fields.join(", "),
+                    );
+                    None
+                }
+            }
+        });
+
+    Ok(Box::new(iter))
+}
+
 fn build_settled_example(
     request_id: String,
     device_id: String,
@@ -1160,6 +1382,43 @@ fn build_settled_example(
     example
 }
 
+fn build_captured_example(
+    request_id: String,
+    device_id: String,
+    time: String,
+    input: ZetaPromptInput,
+    mut example_spec: ExampleSpec,
+    settled_editable_region: String,
+    zed_version: Option<String>,
+) -> Example {
+    let expected_patch = build_output_patch(
+        &input.cursor_path,
+        input.cursor_excerpt.as_ref(),
+        &input.excerpt_ranges.editable_350,
+        settled_editable_region.as_str(),
+    );
+
+    example_spec.expected_patches = vec![expected_patch];
+    example_spec.telemetry = Some(TelemetrySource {
+        request_id,
+        device_id,
+        time,
+        rejection_reason: String::new(),
+        was_shown: false,
+    });
+
+    Example {
+        spec: example_spec,
+        zed_version,
+        prompt_inputs: Some(input),
+        prompt: None,
+        predictions: Vec::new(),
+        score: Vec::new(),
+        qa: Vec::new(),
+        state: None,
+    }
+}
+
 fn rejected_examples_from_response<'a>(
     response: &'a SnowflakeStatementResponse,
     column_indices: &'a std::collections::HashMap<String, usize>,