// main.rs

   1mod anthropic_client;
   2mod distill;
   3mod example;
   4mod filter_languages;
   5mod format_prompt;
   6mod git;
   7mod headless;
   8mod load_project;
   9mod metrics;
  10mod openai_client;
  11mod parse_output;
  12mod paths;
  13mod predict;
  14mod progress;
  15mod prompt_assets;
  16mod pull_examples;
  17mod qa;
  18mod reorder_patch;
  19mod repair;
  20mod retrieve_context;
  21mod reversal_tracking;
  22mod score;
  23mod split_commit;
  24mod split_dataset;
  25mod token_match_debug;
  26
  27mod synthesize;
  28mod truncate_expected_patch;
  29mod word_diff;
  30use clap::{Args, CommandFactory, Parser, Subcommand, ValueEnum};
  31use collections::{HashMap, HashSet};
  32use edit_prediction::EditPredictionStore;
  33use futures::channel::mpsc;
  34use futures::{SinkExt as _, StreamExt as _};
  35use gaoya::minhash::{
  36    MinHashIndex, MinHasher, MinHasher32, calculate_minhash_params, compute_minhash_similarity,
  37};
  38use gpui::{AppContext as _, BackgroundExecutor, Task};
  39use zeta_prompt::ZetaFormat;
  40
  41use reqwest_client::ReqwestClient;
  42use serde::{Deserialize, Deserializer, Serialize, Serializer};
  43use std::env;
  44use std::fmt::Display;
  45use std::fs::{File, OpenOptions};
  46use std::hash::{Hash, Hasher};
  47use std::io::{BufRead, BufReader, BufWriter, Write};
  48use std::sync::Mutex;
  49use std::{path::PathBuf, sync::Arc};
  50
  51use crate::distill::run_distill;
  52use crate::example::{Example, group_examples_by_repo, read_example_files};
  53use crate::filter_languages::{FilterLanguagesArgs, run_filter_languages};
  54use crate::format_prompt::run_format_prompt;
  55use crate::load_project::run_load_project;
  56use crate::paths::{FAILED_EXAMPLES_DIR, RUN_DIR};
  57use crate::predict::run_prediction;
  58use crate::progress::Progress;
  59use crate::pull_examples::{fetch_settled_examples_after, parse_settled_after_input};
  60use crate::retrieve_context::run_context_retrieval;
  61use crate::score::run_scoring;
  62use crate::split_commit::SplitCommitArgs;
  63use crate::split_dataset::SplitArgs;
  64use crate::synthesize::{SynthesizeConfig, run_synthesize};
  65use crate::token_match_debug::{TokenMatchDebugArgs, run_token_match_debug};
  66use crate::truncate_expected_patch::TruncatePatchArgs;
  67
  68#[derive(Parser, Debug)]
  69#[command(name = "ep")]
  70struct EpArgs {
  71    #[arg(long, default_value_t = false)]
  72    printenv: bool,
  73    #[clap(long, default_value_t = 10, global = true)]
  74    max_parallelism: usize,
  75    /// The limit for the number of examples to process
  76    /// Default is unlimited for processing local datasets, 5000 when pulling from snowflake
  77    #[clap(long, global = true)]
  78    limit: Option<usize>,
  79    #[clap(long, global = true)]
  80    offset: Option<usize>,
  81    /// Filter examples by name
  82    #[clap(long, global = true)]
  83    name: Option<String>,
  84    /// Filter examples by repository
  85    #[clap(long, global = true)]
  86    repo: Option<String>,
  87    /// Deduplicate by cursor position and keep at most this many examples per cluster
  88    #[clap(long, global = true)]
  89    max_duplicates: Option<usize>,
  90    #[command(subcommand)]
  91    command: Option<Command>,
  92    /// Input file paths
  93    #[clap(global = true)]
  94    inputs: Vec<PathBuf>,
  95    #[arg(long, short, global = true)]
  96    output: Option<PathBuf>,
  97    #[arg(long, short, global = true)]
  98    in_place: bool,
  99    #[arg(long, short, global = true)]
 100    failfast: bool,
 101    /// How to handle failed examples in output: keep them or skip them.
 102    /// Failed examples are always logged to the run's failed directory.
 103    #[arg(long, global = true, default_value = "keep")]
 104    failed: FailedHandling,
 105    /// Output as markdown files instead of JSONL. When set, -o specifies a directory
 106    /// where one .md file per example will be written (named after each example).
 107    #[arg(long, short, global = true)]
 108    markdown: bool,
 109}
 110
/// Controls whether failed examples are included in the main output.
/// Failed examples are always logged to the run's failed/ directory regardless of this setting.
// NOTE: the `///` variant docs below are surfaced by clap's ValueEnum in
// `--help`, so they must stay user-facing.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, ValueEnum)]
pub enum FailedHandling {
    /// Include failed examples in the main output (default)
    #[default]
    Keep,
    /// Exclude failed examples from the main output
    Skip,
    /// Skip writing files
    // NOTE(review): name suggests this also suppresses the failed/ log files,
    // which would contradict the enum-level doc above — confirm at the use site.
    SkipNoFiles,
}
 123
/// After-help text attached to `ep read` (see `ReadArgs`), documenting the
/// accepted input path forms and the Snowflake-backed `*-after:{timestamp}`
/// specifiers parsed in `load_examples`.
const INPUTS_HELP: &str = r#"
Inputs can be file paths or special specifiers:

  path
      Path to an example(s) file (.md, .json, or .jsonl)

  captured-after:{timestamp}
      Fetch captured examples from Snowflake after the given RFC3339 timestamp.
      These are examples captured via the "Capture Edit Prediction Example" action.

  rejected-after:{timestamp}
      Fetch rejected edit predictions from Snowflake after the given RFC3339 timestamp.
      These are predictions that were shown to users but rejected (useful for DPO training).

  settled-after:{timestamp}
      Fetch settled stream examples from Snowflake after the given RFC3339 timestamp.
      These are examples from the edit prediction settled stream.

  rated-after:{timestamp}
      Fetch user-rated edit predictions from Snowflake after the given RFC3339 timestamp.
      These are predictions that users explicitly rated as positive or negative via the
      rate completions modal. Only zeta2 predictions are included.
      - Positive ratings: output becomes expected_patches
      - Negative ratings: output becomes rejected_patch

  rated-positive-after:{timestamp}
      Same as rated-after, but only fetches positively rated predictions.

  rated-negative-after:{timestamp}
      Same as rated-after, but only fetches negatively rated predictions.

      Required environment variables to connect to Snowflake:
          EP_SNOWFLAKE_API_KEY
          EP_SNOWFLAKE_BASE_URL

      Optional:
          EP_SNOWFLAKE_ROLE

Examples:

  # Read examples from a file
  ep read examples.jsonl -o output.jsonl

  # Read captured examples after a timestamp
  ep read captured-after:2025-01-01T00:00:00Z -o captured.jsonl

  # Read rejected predictions for DPO training
  ep read rejected-after:2025-01-01T00:00:00Z -o rejected.jsonl

  # Read user-rated predictions
  ep read rated-after:2025-01-01T00:00:00Z -o rated.jsonl

  # Read settled stream examples
  ep read settled-after:2025-01-01T00:00:00Z -o settled.jsonl

  # Read only positively rated predictions
  ep read rated-positive-after:2025-01-01T00:00:00Z -o positive.jsonl

  # Read only negatively rated predictions
  ep read rated-negative-after:2025-01-01T00:00:00Z -o negative.jsonl

  # Mix multiple input sources
  ep predict examples.jsonl captured-after:2025-01-01T00:00:00Z
"#;
 188
// All `ep` subcommands. The `///` docs on each variant are the per-subcommand
// help text shown by clap, so they must stay user-facing; internal notes here
// use plain `//` comments.
#[derive(Subcommand, Debug, Clone)]
enum Command {
    /// Read examples from files or fetch from Snowflake, output as .jsonl
    Read(ReadArgs),
    /// Create git worktrees for each example and load file contents
    LoadProject,
    /// Retrieve context for input examples.
    Context,
    /// Generate a prompt string for a specific model
    FormatPrompt(FormatPromptArgs),
    /// Runs edit prediction
    Predict(PredictArgs),
    /// Parse model outputs (actual_output) into unified diffs (actual_patch).
    /// Requires format-prompt to have been run first. Uses provider from prompt.
    ParseOutput,
    /// Computes a score based on actual and expected patches
    Score(PredictArgs),
    /// Prepares a distillation dataset by copying expected outputs to
    /// predicted outputs and removing actual outputs and prompts.
    Distill,
    /// Print aggregated scores
    Eval(EvalArgs),
    /// Generate eval examples by analyzing git commits from a repository
    Synthesize(SynthesizeArgs),
    /// Remove git repositories and worktrees
    Clean,
    /// Generate an evaluation example by splitting a chronologically-ordered commit
    SplitCommit(SplitCommitArgs),
    /// Truncate expected patch by the given criteria
    TruncatePatch(TruncatePatchArgs),
    /// Generate token-match debug HTML for expected vs predicted patches
    TokenMatchDebug(TokenMatchDebugArgs),
    /// Split a JSONL dataset into multiple files (stratified by repository_url if present)
    Split(SplitArgs),
    /// Filter a JSONL dataset by programming language (based on cursor_path extension)
    FilterLanguages(FilterLanguagesArgs),
    /// Import Anthropic batch results by batch IDs (useful for recovering after database loss)
    ImportBatch(ImportBatchArgs),
    /// Assess the quality of predictions using LLM-as-a-judge
    Qa(qa::QaArgs),
    /// Repair predictions that received poor QA scores by generating improved predictions
    Repair(repair::RepairArgs),
    /// Print all valid zeta formats (lowercase, one per line)
    PrintZetaFormats,
}
 234
 235impl Display for Command {
 236    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 237        match self {
 238            Command::Read(_) => write!(f, "read"),
 239            Command::LoadProject => write!(f, "load-project"),
 240            Command::Context => write!(f, "context"),
 241            Command::FormatPrompt(args) => {
 242                write!(f, "format-prompt --provider={}", args.provider)
 243            }
 244            Command::Predict(args) => match &args.provider {
 245                Some(provider) => write!(f, "predict --provider={}", provider),
 246                None => write!(f, "predict"),
 247            },
 248            Command::ParseOutput => write!(f, "parse-output"),
 249            Command::Score(args) => match &args.provider {
 250                Some(provider) => write!(f, "score --provider={}", provider),
 251                None => write!(f, "score"),
 252            },
 253            Command::Distill => write!(f, "distill"),
 254            Command::Eval(args) => match &args.predict.provider {
 255                Some(provider) => write!(f, "eval --provider={}", provider),
 256                None => write!(f, "eval"),
 257            },
 258            Command::Synthesize(args) => {
 259                write!(f, "synthesize --repos {}", args.repos.join(" "))
 260            }
 261            Command::Clean => write!(f, "clean"),
 262            Command::SplitCommit(_) => write!(f, "split-commit"),
 263            Command::TruncatePatch(_) => write!(f, "truncate-patch"),
 264            Command::TokenMatchDebug(_) => write!(f, "token-match-debug"),
 265            Command::Split(_) => write!(f, "split"),
 266            Command::FilterLanguages(_) => write!(f, "filter-languages"),
 267            Command::ImportBatch(args) => {
 268                write!(f, "import-batch --batch-ids {}", args.batch_ids.join(" "))
 269            }
 270            Command::Qa(_) => {
 271                write!(f, "qa")
 272            }
 273            Command::Repair(_) => {
 274                write!(f, "repair")
 275            }
 276            Command::PrintZetaFormats => {
 277                write!(f, "print-zeta-formats")
 278            }
 279        }
 280    }
 281}
 282
// Marker args for `ep read`: the shared global flags on `EpArgs` (inputs,
// -o, --limit, …) do all the work; the after_help text documents the accepted
// input specifiers.
#[derive(Debug, Args, Clone)]
#[command(after_help = INPUTS_HELP)]
struct ReadArgs {}
 286
// Args for `ep format-prompt`.
#[derive(Debug, Args, Clone)]
struct FormatPromptArgs {
    // Provider whose prompt format to generate; defaults to zeta2 with the
    // default zeta format (see `PredictionProvider::default`).
    #[clap(long, short('p'), default_value_t = PredictionProvider::default())]
    provider: PredictionProvider,
}
 292
// Shared args for `ep predict` and `ep score`, also flattened into `EvalArgs`.
#[derive(Debug, Args, Clone)]
struct PredictArgs {
    // Optional provider; `None` leaves the choice to the command (see the
    // Display impl for Command, which omits `--provider` in that case).
    #[clap(long, short('p'))]
    provider: Option<PredictionProvider>,
    // How many times to run prediction per example.
    #[clap(long, default_value_t = 1)]
    repetitions: usize,
    /// Only use cached responses, don't queue new requests for batching
    #[clap(long)]
    cache_only: bool,
}
 303
// Args for `ep eval`: prediction options plus score-reporting options.
#[derive(Debug, Args, Clone)]
struct EvalArgs {
    // Prediction options shared with `ep predict` (flattened into this command).
    #[clap(flatten)]
    predict: PredictArgs,
    /// Path to write summary scores as JSON
    #[clap(long)]
    summary_json: Option<PathBuf>,
    /// Print all individual example lines (default: up to 20)
    #[clap(long)]
    verbose: bool,
}
 315
/// Teacher model backing the `teacher*` prediction providers.
/// Maps to a concrete model identifier via [`TeacherBackend::model_name`].
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq, Hash)]
pub enum TeacherBackend {
    /// "claude-sonnet-4-6"
    Sonnet46,
    /// "claude-sonnet-4-5" (the default backend)
    #[default]
    Sonnet45,
    /// "gpt-5.2"
    Gpt52,
}
 323
 324impl std::fmt::Display for TeacherBackend {
 325    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 326        match self {
 327            TeacherBackend::Sonnet46 => write!(f, "sonnet46"),
 328            TeacherBackend::Sonnet45 => write!(f, "sonnet45"),
 329            TeacherBackend::Gpt52 => write!(f, "gpt52"),
 330        }
 331    }
 332}
 333
 334impl std::str::FromStr for TeacherBackend {
 335    type Err = anyhow::Error;
 336
 337    fn from_str(s: &str) -> Result<Self, Self::Err> {
 338        match s.to_lowercase().as_str() {
 339            "sonnet45" | "sonnet" | "claude" => Ok(TeacherBackend::Sonnet45),
 340            "sonnet46" => Ok(TeacherBackend::Sonnet46),
 341            "gpt52" | "gpt" | "openai" => Ok(TeacherBackend::Gpt52),
 342            "v0114180editableregion" => Ok(TeacherBackend::Sonnet45),
 343            _ => anyhow::bail!(
 344                "unknown teacher backend `{s}`. Valid options: sonnet45, sonnet46, gpt52"
 345            ),
 346        }
 347    }
 348}
 349
 350impl TeacherBackend {
 351    pub fn model_name(&self) -> &'static str {
 352        match self {
 353            TeacherBackend::Sonnet45 => "claude-sonnet-4-5",
 354            TeacherBackend::Sonnet46 => "claude-sonnet-4-6",
 355            TeacherBackend::Gpt52 => "gpt-5.2",
 356        }
 357    }
 358}
 359
/// Prediction backend selected via `--provider`.
/// Parsed from strings like `zeta2:<format>` or `teacher:<backend>`
/// (see the `FromStr` impl); `Display` renders the canonical spelling.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum PredictionProvider {
    Sweep,
    Mercury,
    Zeta1,
    /// Zeta2 with a specific prompt format.
    Zeta2(ZetaFormat),
    /// Baseten endpoint using a zeta prompt format.
    Baseten(ZetaFormat),
    /// Teacher model (batched requests).
    Teacher(TeacherBackend),
    /// Teacher model, multi-region variant.
    TeacherMultiRegion(TeacherBackend),
    /// Teacher model without request batching.
    TeacherNonBatching(TeacherBackend),
    /// Multi-region teacher without request batching.
    TeacherMultiRegionNonBatching(TeacherBackend),
    Repair,
}
 373
 374impl Default for PredictionProvider {
 375    fn default() -> Self {
 376        PredictionProvider::Zeta2(ZetaFormat::default())
 377    }
 378}
 379
 380impl std::fmt::Display for PredictionProvider {
 381    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 382        match self {
 383            PredictionProvider::Sweep => write!(f, "sweep"),
 384            PredictionProvider::Mercury => write!(f, "mercury"),
 385            PredictionProvider::Zeta1 => write!(f, "zeta1"),
 386            PredictionProvider::Zeta2(format) => write!(f, "zeta2:{format}"),
 387            PredictionProvider::Baseten(format) => write!(f, "baseten:{format}"),
 388            PredictionProvider::Teacher(backend) => write!(f, "teacher:{backend}"),
 389            PredictionProvider::TeacherMultiRegion(backend) => {
 390                write!(f, "teacher-multi-region:{backend}")
 391            }
 392            PredictionProvider::TeacherNonBatching(backend) => {
 393                write!(f, "teacher-non-batching:{backend}")
 394            }
 395            PredictionProvider::TeacherMultiRegionNonBatching(backend) => {
 396                write!(f, "teacher-multi-region-non-batching:{backend}")
 397            }
 398            PredictionProvider::Repair => write!(f, "repair"),
 399        }
 400    }
 401}
 402
 403impl std::str::FromStr for PredictionProvider {
 404    type Err = anyhow::Error;
 405
 406    fn from_str(s: &str) -> Result<Self, Self::Err> {
 407        let (provider, arg) = s.split_once(':').map_or((s, None), |(p, a)| (p, Some(a)));
 408
 409        let provider_lower = provider.to_lowercase();
 410        match provider_lower.as_str() {
 411            "sweep" => Ok(PredictionProvider::Sweep),
 412            "mercury" => Ok(PredictionProvider::Mercury),
 413            "zeta1" => Ok(PredictionProvider::Zeta1),
 414            "zeta2" => {
 415                let format = arg.map(ZetaFormat::parse).transpose()?.unwrap_or_default();
 416                Ok(PredictionProvider::Zeta2(format))
 417            }
 418            "teacher" => {
 419                let backend = arg
 420                    .map(|a| a.parse())
 421                    .transpose()?
 422                    .unwrap_or(TeacherBackend::default());
 423                Ok(PredictionProvider::Teacher(backend))
 424            }
 425            "teacher-multi-region" | "teacher_multi_region" => {
 426                let backend = arg
 427                    .map(|a| a.parse())
 428                    .transpose()?
 429                    .unwrap_or(TeacherBackend::default());
 430                Ok(PredictionProvider::TeacherMultiRegion(backend))
 431            }
 432            "teacher-non-batching" | "teacher_non_batching" => {
 433                let backend = arg
 434                    .map(|a| a.parse())
 435                    .transpose()?
 436                    .unwrap_or(TeacherBackend::default());
 437                Ok(PredictionProvider::TeacherNonBatching(backend))
 438            }
 439            "teacher-multi-region-non-batching" | "teacher_multi_region_non_batching" => {
 440                let backend = arg
 441                    .map(|a| a.parse())
 442                    .transpose()?
 443                    .unwrap_or(TeacherBackend::default());
 444                Ok(PredictionProvider::TeacherMultiRegionNonBatching(backend))
 445            }
 446            "repair" => Ok(PredictionProvider::Repair),
 447            "baseten" => {
 448                let format = arg
 449                    .map(ZetaFormat::parse)
 450                    .transpose()?
 451                    .unwrap_or(ZetaFormat::default());
 452                Ok(PredictionProvider::Baseten(format))
 453            }
 454            _ => {
 455                anyhow::bail!(
 456                    "unknown provider `{provider}`. Valid options: sweep, mercury, zeta1, zeta2, zeta2:<version>, teacher, teacher:<backend>, teacher-multi-region, teacher-multi-region:<backend>, teacher-non-batching, teacher-multi-region-non-batching, repair\n\
 457                 For zeta2, you can optionally specify a version like `zeta2:ordered` or `zeta2:V0113_Ordered`.\n\
 458                 For teacher providers, you can specify a backend like `teacher:sonnet46`, `teacher-multi-region:sonnet46`, `teacher-multi-region-non-batching:sonnet46`, or `teacher:gpt52`.\n\
 459                 Available zeta versions:\n{}",
 460                    ZetaFormat::options_as_string()
 461                )
 462            }
 463        }
 464    }
 465}
 466
 467impl Serialize for PredictionProvider {
 468    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
 469    where
 470        S: Serializer,
 471    {
 472        serializer.serialize_str(&self.to_string())
 473    }
 474}
 475
 476impl<'de> Deserialize<'de> for PredictionProvider {
 477    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
 478    where
 479        D: Deserializer<'de>,
 480    {
 481        let s = String::deserialize(deserializer)?;
 482        s.parse().map_err(serde::de::Error::custom)
 483    }
 484}
 485
// Args for `ep synthesize`: generate eval examples from git history.
#[derive(Debug, Args, Clone)]
struct SynthesizeArgs {
    /// Repository URLs (git@github.com:owner/repo or https://...)
    #[clap(long, required = true, num_args = 1..)]
    repos: Vec<String>,

    /// Number of examples to generate per repository
    #[clap(long, default_value_t = 5)]
    count: usize,

    /// Maximum commits to scan per repository before giving up
    #[clap(long, default_value_t = 100)]
    max_commits: usize,

    /// Ignore state file and reprocess all commits
    #[clap(long)]
    fresh: bool,
}
 504
// Args for `ep import-batch`: re-import provider batch results by ID.
#[derive(Debug, Args, Clone)]
struct ImportBatchArgs {
    /// Batch IDs to import (e.g., msgbatch_xxx for Anthropic, batch_xxx for OpenAI)
    #[clap(long, required = true, num_args = 1..)]
    batch_ids: Vec<String>,
    /// Which provider's batches to import (anthropic or openai)
    #[clap(long, default_value = "anthropic")]
    provider: BatchProvider,
}
 514
// Which batch API a batch ID in `ep import-batch` belongs to.
// (Plain `//` comments only: `///` on ValueEnum variants would change --help.)
#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
enum BatchProvider {
    Anthropic,
    Openai,
}
 520
#[cfg(test)]
mod tests {
    use super::*;

    // The canonical hyphenated spelling parses and re-renders unchanged.
    #[test]
    fn prediction_provider_multi_region_non_batched_round_trips_to_primary_spelling() {
        let provider: PredictionProvider = "teacher-multi-region-non-batching:sonnet46"
            .parse()
            .unwrap();
        assert_eq!(
            provider,
            PredictionProvider::TeacherMultiRegionNonBatching(TeacherBackend::Sonnet46)
        );
        assert_eq!(
            provider.to_string(),
            "teacher-multi-region-non-batching:sonnet46"
        );
    }

    // The underscore alias parses, but Display normalizes to the hyphenated form.
    #[test]
    fn prediction_provider_multi_region_non_batched_alias_round_trips_to_primary_spelling() {
        let provider: PredictionProvider =
            "teacher_multi_region_non_batching:gpt52".parse().unwrap();
        assert_eq!(
            provider,
            PredictionProvider::TeacherMultiRegionNonBatching(TeacherBackend::Gpt52)
        );
        assert_eq!(
            provider.to_string(),
            "teacher-multi-region-non-batching:gpt52"
        );
    }
}
 554
 555impl EpArgs {
 556    fn output_path(&self) -> Option<PathBuf> {
 557        if self.in_place {
 558            if self.inputs.len() == 1 {
 559                self.inputs.first().cloned()
 560            } else {
 561                panic!("--in-place requires exactly one input file")
 562            }
 563        } else {
 564            self.output.clone()
 565        }
 566    }
 567}
 568
/// Minimum Zed version required for Snowflake queries.
/// This version introduced the current request schema with predicted edits in the edit
/// history, and open source repos distinguished.
const MIN_CAPTURE_VERSION: pull_examples::MinCaptureVersion = pull_examples::MinCaptureVersion {
    // i.e. 0.224.1 — presumably the major version is implied by
    // `MinCaptureVersion`; confirm in pull_examples.
    minor: 224,
    patch: 1,
};
 576
/// Deduplicates `examples` in place, keeping at most `max_per_cluster`
/// examples per near-duplicate cluster.
///
/// Two passes:
/// 1. Exact dedup on `spec.cursor_position`.
/// 2. Near-dedup: MinHash signatures over token n-grams of the cursor
///    position, LSH candidate pairing, union-find clustering, then greedy
///    max-min diverse selection inside each cluster.
fn deduplicate_examples(examples: &mut Vec<Example>, max_per_cluster: usize) {
    // Pass 1: drop examples whose cursor_position was already seen.
    let total_before_exact = examples.len();
    let mut seen_positions = HashSet::default();
    examples.retain(|example| seen_positions.insert(example.spec.cursor_position.clone()));
    log::info!(
        "exact duplicate filter: {total_before_exact} examples → {} examples ({} removed)",
        examples.len(),
        total_before_exact - examples.len(),
    );

    // LSH tuning: target Jaccard similarity, hash budget, shingle size.
    const JACCARD_THRESHOLD: f64 = 0.5;
    const NUM_HASHES: usize = 128;
    const TOKEN_NGRAM_SIZE: usize = 5;

    let (num_bands, band_width) = calculate_minhash_params(JACCARD_THRESHOLD, NUM_HASHES);
    // Use the exact bands × band-width product in case the parameter
    // calculation rounded away from NUM_HASHES.
    let num_hashes = num_bands * band_width;
    let minhasher = MinHasher32::new(num_hashes);
    let mut index: MinHashIndex<u32, usize> =
        MinHashIndex::new(num_bands, band_width, JACCARD_THRESHOLD);

    // One signature per example, in the same order as `examples`.
    let signatures: Vec<Vec<u32>> = examples
        .iter()
        .map(|example| {
            let shingles = code_token_ngrams(&example.spec.cursor_position, TOKEN_NGRAM_SIZE);
            minhasher.create_signature(shingles.iter())
        })
        .collect();

    for (id, signature) in signatures.iter().enumerate() {
        index.insert(id, signature.clone());
    }

    // Build clusters via union-find on LSH candidate pairs.
    let mut parent: Vec<usize> = (0..examples.len()).collect();

    // Union-find `find` with path halving.
    fn find(parent: &mut Vec<usize>, mut x: usize) -> usize {
        while parent[x] != x {
            parent[x] = parent[parent[x]];
            x = parent[x];
        }
        x
    }

    for (id, signature) in signatures.iter().enumerate() {
        for candidate in index.query_owned(signature) {
            let (a, b) = (find(&mut parent, id), find(&mut parent, candidate));
            if a != b {
                parent[a] = b;
            }
        }
    }

    // Group example indices by their cluster root.
    let mut clusters: HashMap<usize, Vec<usize>> = HashMap::default();
    for id in 0..examples.len() {
        clusters.entry(find(&mut parent, id)).or_default().push(id);
    }

    // Keep up to max_per_cluster mutually diverse members of each cluster.
    let mut keep: HashSet<usize> = HashSet::default();
    for members in clusters.values() {
        let selected = greedy_max_min_diverse(members, &signatures, max_per_cluster);
        keep.extend(selected);
    }

    let total = examples.len();
    let mut kept_indices: Vec<usize> = keep.into_iter().collect();
    kept_indices.sort();

    // Remove kept elements from highest index to lowest: swap_remove only
    // disturbs positions at/after the removed index, so lower kept indices
    // stay valid. The final reverse restores ascending (original) order.
    let mut retained = Vec::with_capacity(kept_indices.len());
    for index in kept_indices.into_iter().rev() {
        retained.push(examples.swap_remove(index));
    }
    retained.reverse();

    *examples = retained;
    log::info!(
        "near-duplicate filter: {total} examples → {} examples ({} removed)",
        examples.len(),
        total - examples.len(),
    );
}
 657
 658fn greedy_max_min_diverse(members: &[usize], signatures: &[Vec<u32>], k: usize) -> Vec<usize> {
 659    if members.len() <= k {
 660        return members.to_vec();
 661    }
 662
 663    let mut selected = vec![members[0]];
 664    let mut min_dist: HashMap<usize, f64> = HashMap::default();
 665    for &member in &members[1..] {
 666        let dist = 1.0 - compute_minhash_similarity(&signatures[selected[0]], &signatures[member]);
 667        min_dist.insert(member, dist);
 668    }
 669
 670    while selected.len() < k {
 671        let &best = min_dist
 672            .iter()
 673            .max_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
 674            .map(|(id, _)| id)
 675            .expect("min_dist should not be empty when selected.len() < k");
 676        selected.push(best);
 677        min_dist.remove(&best);
 678
 679        let best_sig = &signatures[best];
 680        for (member, current_min) in min_dist.iter_mut() {
 681            let dist = 1.0 - compute_minhash_similarity(best_sig, &signatures[*member]);
 682            if dist < *current_min {
 683                *current_min = dist;
 684            }
 685        }
 686    }
 687
 688    selected
 689}
 690
 691fn code_token_ngrams(code: &str, ngram_size: usize) -> Vec<String> {
 692    let tokens: Vec<&str> = word_diff::tokenize(code)
 693        .into_iter()
 694        .filter(|t| !t.trim().is_empty())
 695        .collect();
 696
 697    if tokens.len() < ngram_size {
 698        return vec![tokens.join("\0")];
 699    }
 700
 701    tokens
 702        .windows(ngram_size)
 703        .map(|window| window.join("\0"))
 704        .collect()
 705}
 706
 707async fn load_examples(
 708    http_client: Arc<dyn http_client::HttpClient>,
 709    args: &EpArgs,
 710    output_path: Option<&PathBuf>,
 711    background_executor: BackgroundExecutor,
 712) -> anyhow::Result<Vec<Example>> {
 713    let mut captured_after_timestamps = Vec::new();
 714    let mut rejected_after_timestamps = Vec::new();
 715    let mut requested_after_timestamps = Vec::new();
 716    let mut settled_after_timestamps = Vec::new();
 717    let mut rated_after_inputs: Vec<(String, Option<telemetry_events::EditPredictionRating>)> =
 718        Vec::new();
 719    let mut file_inputs = Vec::new();
 720
 721    for input in &args.inputs {
 722        let input_string = input.to_string_lossy();
 723        if let Some(timestamp) = pull_examples::parse_captured_after_input(input_string.as_ref()) {
 724            captured_after_timestamps.push(timestamp.to_string());
 725        } else if let Some(timestamp) =
 726            pull_examples::parse_rejected_after_input(input_string.as_ref())
 727        {
 728            rejected_after_timestamps.push(timestamp.to_string());
 729        } else if let Some(timestamp) =
 730            pull_examples::parse_requested_after_input(input_string.as_ref())
 731        {
 732            requested_after_timestamps.push(timestamp.to_string());
 733        } else if let Some(timestamp) = parse_settled_after_input(input_string.as_ref()) {
 734            settled_after_timestamps.push(timestamp.to_string());
 735        } else if let Some((timestamp, rating_filter)) =
 736            pull_examples::parse_rated_after_input(input_string.as_ref())
 737        {
 738            rated_after_inputs.push((timestamp.to_string(), rating_filter));
 739        } else {
 740            file_inputs.push(input.clone());
 741        }
 742    }
 743
 744    let mut examples = read_example_files(&file_inputs);
 745
 746    // Apply offset to file examples first, then pass remaining offset to Snowflake.
 747    let file_example_count = examples.len();
 748    let remaining_offset = if let Some(offset) = args.offset {
 749        if offset >= file_example_count {
 750            examples.clear();
 751            offset - file_example_count
 752        } else {
 753            examples.splice(0..offset, []);
 754            0
 755        }
 756    } else {
 757        0
 758    };
 759
 760    Progress::global().set_total_examples(examples.len());
 761
 762    let remaining_limit_for_snowflake =
 763        args.limit.map(|limit| limit.saturating_sub(examples.len()));
 764
 765    if let Some(0) = remaining_limit_for_snowflake {
 766        log::info!(
 767            "skipping Snowflake inputs because --limit is already satisfied by example files"
 768        );
 769    } else {
 770        let max_rows_per_timestamp = remaining_limit_for_snowflake.unwrap_or(5000);
 771
 772        if !rejected_after_timestamps.is_empty() {
 773            rejected_after_timestamps.sort();
 774
 775            let mut rejected_examples = pull_examples::fetch_rejected_examples_after(
 776                http_client.clone(),
 777                &rejected_after_timestamps,
 778                max_rows_per_timestamp,
 779                remaining_offset,
 780                background_executor.clone(),
 781                Some(MIN_CAPTURE_VERSION),
 782            )
 783            .await?;
 784            examples.append(&mut rejected_examples);
 785        }
 786
 787        if !requested_after_timestamps.is_empty() {
 788            requested_after_timestamps.sort();
 789
 790            let mut requested_examples = pull_examples::fetch_requested_examples_after(
 791                http_client.clone(),
 792                &requested_after_timestamps,
 793                max_rows_per_timestamp,
 794                remaining_offset,
 795                background_executor.clone(),
 796                Some(MIN_CAPTURE_VERSION),
 797            )
 798            .await?;
 799            examples.append(&mut requested_examples);
 800        }
 801
 802        if !captured_after_timestamps.is_empty() {
 803            captured_after_timestamps.sort();
 804
 805            let mut captured_examples = pull_examples::fetch_captured_examples_after(
 806                http_client.clone(),
 807                &captured_after_timestamps,
 808                max_rows_per_timestamp,
 809                remaining_offset,
 810                background_executor.clone(),
 811                Some(MIN_CAPTURE_VERSION),
 812            )
 813            .await?;
 814            examples.append(&mut captured_examples);
 815        }
 816
 817        if !settled_after_timestamps.is_empty() {
 818            settled_after_timestamps.sort();
 819
 820            let mut settled_examples = fetch_settled_examples_after(
 821                http_client.clone(),
 822                &settled_after_timestamps,
 823                max_rows_per_timestamp,
 824                remaining_offset,
 825                background_executor.clone(),
 826                Some(MIN_CAPTURE_VERSION),
 827            )
 828            .await?;
 829            examples.append(&mut settled_examples);
 830        }
 831
 832        if !rated_after_inputs.is_empty() {
 833            rated_after_inputs.sort();
 834
 835            let mut rated_examples = pull_examples::fetch_rated_examples_after(
 836                http_client,
 837                &rated_after_inputs,
 838                max_rows_per_timestamp,
 839                remaining_offset,
 840                background_executor,
 841                Some(MIN_CAPTURE_VERSION),
 842            )
 843            .await?;
 844            examples.append(&mut rated_examples);
 845        }
 846    }
 847
 848    crate::example::sort_examples_by_repo_and_rev(&mut examples);
 849
 850    if let Some(name_filter) = &args.name {
 851        examples.retain(|example| example.spec.name.contains(name_filter));
 852    }
 853    if let Some(repo_filter) = &args.repo {
 854        examples.retain(|example| example.spec.repository_url.contains(repo_filter));
 855    }
 856
 857    // Skip resume logic for --in-place since input and output are the same file,
 858    // which would incorrectly treat all input examples as already processed.
 859    if !args.in_place {
 860        if let Some(path) = output_path
 861            && let Some(command) = &args.command
 862        {
 863            resume_from_output(path, &mut examples, command);
 864        }
 865    }
 866
 867    if let Some(max_duplicates) = args.max_duplicates {
 868        deduplicate_examples(&mut examples, max_duplicates);
 869    }
 870
 871    if let Some(limit) = args.limit {
 872        examples.truncate(limit);
 873    }
 874
 875    let progress = Progress::global();
 876    progress.set_total_examples(examples.len());
 877    progress.set_max_example_name_len(examples.iter().map(|e| &e.spec.name));
 878
 879    Ok(examples)
 880}
 881
 882fn spec_hash(spec: &edit_prediction::example_spec::ExampleSpec) -> u64 {
 883    let mut hasher = collections::FxHasher::default();
 884    spec.hash(&mut hasher);
 885    hasher.finish()
 886}
 887
 888fn resume_from_output(path: &PathBuf, examples: &mut Vec<Example>, command: &Command) {
 889    let file = match File::open(path) {
 890        Ok(f) => f,
 891        Err(_) => return,
 892    };
 893
 894    let input_hashes: HashSet<u64> = examples.iter().map(|e| spec_hash(&e.spec)).collect();
 895
 896    let reader = BufReader::new(file);
 897    let mut kept_lines = Vec::new();
 898    let mut kept_hashes = HashSet::default();
 899
 900    for line in reader.lines() {
 901        let line = match line {
 902            Ok(l) => l,
 903            Err(_) => continue,
 904        };
 905
 906        if let Ok(output_example) = serde_json::from_str::<Example>(&line) {
 907            let hash = spec_hash(&output_example.spec);
 908            if input_hashes.contains(&hash) && !kept_hashes.contains(&hash) {
 909                let is_complete = match command {
 910                    Command::Qa(_) => output_example
 911                        .qa
 912                        .first()
 913                        .and_then(|q| q.as_ref())
 914                        .and_then(|q| q.confidence)
 915                        .is_some(),
 916                    Command::Repair(_) => output_example.predictions.iter().any(|p| {
 917                        p.provider == PredictionProvider::Repair && p.actual_patch.is_some()
 918                    }),
 919                    _ => true,
 920                };
 921                if is_complete {
 922                    kept_hashes.insert(hash);
 923                    kept_lines.push(line);
 924                }
 925            }
 926        }
 927    }
 928
 929    let total = examples.len();
 930    let already_processed = kept_hashes.len();
 931
 932    eprintln!(
 933        "Resuming: {}/{} examples already processed",
 934        already_processed, total
 935    );
 936
 937    let file = OpenOptions::new()
 938        .write(true)
 939        .truncate(true)
 940        .open(path)
 941        .expect("Failed to open output file for rewriting");
 942    let mut writer = BufWriter::new(file);
 943    for line in &kept_lines {
 944        writeln!(writer, "{}", line).expect("Failed to write to output file");
 945    }
 946    writer.flush().expect("Failed to flush output file");
 947
 948    examples.retain(|e| !kept_hashes.contains(&spec_hash(&e.spec)));
 949}
 950
/// CLI entry point: parses arguments, dispatches standalone subcommands, and
/// otherwise runs the example-processing pipeline inside a headless gpui app.
fn main() {
    let args = EpArgs::parse();

    if args.printenv {
        ::util::shell_env::print_env();
        return;
    }

    let output = args.output_path();

    if args.markdown && output.is_none() {
        eprintln!("--markdown requires -o to specify the output directory");
        std::process::exit(1);
    }

    // No subcommand: print help and exit.
    let command = match &args.command {
        Some(cmd) => cmd.clone(),
        None => {
            EpArgs::command().print_help().unwrap();
            return;
        }
    };

    // Commands that run standalone (no headless app, no project loading) are
    // handled here and return early; everything else falls through to the
    // gpui-based pipeline below.
    match &command {
        Command::ImportBatch(import_args) => {
            smol::block_on(async {
                match import_args.provider {
                    BatchProvider::Anthropic => {
                        let client = anthropic_client::AnthropicClient::batch(&paths::LLM_CACHE_DB)
                            .expect("Failed to create Anthropic client");
                        if let Err(e) = client.import_batches(&import_args.batch_ids).await {
                            eprintln!("Error importing Anthropic batches: {:?}", e);
                            std::process::exit(1);
                        }
                    }
                    BatchProvider::Openai => {
                        let client = openai_client::OpenAiClient::batch(&paths::LLM_CACHE_DB)
                            .expect("Failed to create OpenAI client");
                        if let Err(e) = client.import_batches(&import_args.batch_ids).await {
                            eprintln!("Error importing OpenAI batches: {:?}", e);
                            std::process::exit(1);
                        }
                    }
                }
                println!(
                    "Successfully imported {} batch(es)",
                    import_args.batch_ids.len()
                );
            });
            return;
        }
        Command::Clean => {
            // Removes the entire data directory.
            std::fs::remove_dir_all(&*paths::DATA_DIR).unwrap();
            return;
        }
        Command::PrintZetaFormats => {
            use strum::IntoEnumIterator as _;
            for format in ZetaFormat::iter() {
                println!("{}", format.to_string().to_lowercase());
            }
            return;
        }

        Command::Synthesize(synth_args) => {
            // Use -o if given; otherwise fall back to a default directory
            // under the current working tree, creating it if its parent exists.
            let output_dir = if let Some(output_dir) = args.output {
                output_dir
            } else {
                let default_output_dir = env::current_dir()
                    .unwrap()
                    .join("crates/edit_prediction_cli/evals-generated");
                if default_output_dir.parent().unwrap().exists() {
                    std::fs::create_dir(&default_output_dir).ok();
                    default_output_dir
                } else {
                    panic!("output dir is required");
                }
            };
            let config = SynthesizeConfig {
                repo_urls: synth_args.repos.clone(),
                count: synth_args.count,
                max_commits: synth_args.max_commits,
                output_dir,
                fresh: synth_args.fresh,
            };
            smol::block_on(async {
                if let Err(e) = run_synthesize(config).await {
                    eprintln!("Error: {:?}", e);
                    std::process::exit(1);
                }
            });
            return;
        }
        Command::SplitCommit(split_commit_args) => {
            if let Err(error) = split_commit::run_split_commit(
                split_commit_args,
                &args.inputs,
                output.as_ref(),
                args.failed,
            ) {
                eprintln!("{error:#}");
                std::process::exit(1);
            }
            return;
        }
        Command::TruncatePatch(truncate_args) => {
            if let Err(error) =
                truncate_expected_patch::run_truncate_expected_patch(truncate_args, &args.inputs)
            {
                eprintln!("{error:#}");
                std::process::exit(1);
            }
            return;
        }
        Command::TokenMatchDebug(debug_args) => {
            if let Err(error) = run_token_match_debug(debug_args, &args.inputs) {
                eprintln!("{error:#}");
                std::process::exit(1);
            }
            return;
        }
        Command::Split(split_args) => {
            if let Err(error) = split_dataset::run_split(split_args, &args.inputs) {
                eprintln!("{error:#}");
                std::process::exit(1);
            }
            return;
        }
        Command::FilterLanguages(filter_args) => {
            if let Err(error) =
                run_filter_languages(filter_args, &args.inputs, args.output.as_ref())
            {
                eprintln!("{error:#}");
                std::process::exit(1);
            }
            return;
        }

        _ => {}
    }

    // Remaining commands need a headless gpui app with an HTTP client and an
    // edit-prediction store.
    let http_client = Arc::new(ReqwestClient::new());
    let app = gpui_platform::headless().with_http_client(http_client);

    app.run(move |cx| {
        let app_state = Arc::new(headless::init(cx));
        EditPredictionStore::global(&app_state.client, &app_state.user_store, cx);

        cx.spawn(async move |cx| {
            let result = async {
                let examples = load_examples(
                    app_state.client.http_client(),
                    &args,
                    output.as_ref(),
                    cx.background_executor().clone(),
                )
                .await?;

                // Sync any pending provider batches before processing begins.
                match &command {
                    Command::Predict(args) | Command::Score(args) => {
                        predict::sync_batches(args.provider.as_ref()).await?;
                    }
                    Command::Eval(args) => {
                        predict::sync_batches(args.predict.provider.as_ref()).await?;
                    }
                    Command::Qa(args) => {
                        qa::sync_batches(args).await?;
                    }
                    Command::Repair(args) => {
                        repair::sync_batches(args).await?;
                    }
                    _ => (),
                }

                // With exactly one example, any error should abort immediately.
                let failfast_on_single_example = examples.len() == 1;

                // For --markdown mode, create the output directory if it doesn't exist
                if args.markdown {
                    let dir = output.as_ref().expect("--markdown requires -o");
                    if !dir.exists() {
                        std::fs::create_dir_all(dir)
                            .expect("Failed to create markdown output directory");
                    }
                }

                // Set up JSONL output writer (not used in markdown mode)
                let mut output_sender: Option<mpsc::UnboundedSender<String>> = None;
                let mut in_place_temp_path: Option<PathBuf> = None;
                if !args.markdown
                    && let Some(output_path) = output.as_ref()
                {
                    // --in-place writes to a temp file that is renamed over the
                    // input at the end; otherwise append to the output path.
                    let write_path = if args.in_place {
                        let temp = output_path.with_extension("jsonl.tmp");
                        in_place_temp_path = Some(temp.clone());
                        temp
                    } else {
                        output_path.clone()
                    };

                    let file = OpenOptions::new()
                        .create(true)
                        .write(true)
                        .truncate(args.in_place)
                        .append(!args.in_place)
                        .open(&write_path)
                        .expect("Failed to open output file");

                    // A single background task owns the writer; workers send
                    // serialized lines over a channel, flushed per line.
                    let mut writer = BufWriter::new(file);
                    let (sender, mut receiver) = mpsc::unbounded::<String>();
                    cx.background_spawn(async move {
                        while let Some(line) = receiver.next().await {
                            writeln!(writer, "{}", line).expect("Failed to write example");
                            writer.flush().expect("Failed to flush output");
                        }
                    })
                    .detach();
                    output_sender = Some(sender);
                }

                let grouped_examples = Mutex::new(group_examples_by_repo(examples));
                let finished_examples = Mutex::new(Vec::new());

                // Spawn up to --max-parallelism workers. Each worker claims one
                // repo's batch of examples at a time, so examples within a repo
                // run serially while distinct repos run concurrently.
                let mut tasks = Vec::new();
                for _ in 0..args.max_parallelism {
                    tasks.push(async {
                        loop {
                            let Some(mut repo_examples) =
                                grouped_examples.lock().unwrap().pop_front()
                            else {
                                break;
                            };
                            for example in &mut repo_examples {
                                let example_progress =
                                    Progress::global().start_group(&example.spec.name);

                                // Dispatch the per-example work for the command.
                                let result = async {
                                    match &command {
                                        Command::Read(_) => {}
                                        Command::LoadProject => {
                                            run_load_project(
                                                example,
                                                app_state.clone(),
                                                &example_progress,
                                                cx.clone(),
                                            )
                                            .await?;
                                        }
                                        Command::Context => {
                                            run_context_retrieval(
                                                example,
                                                app_state.clone(),
                                                &example_progress,
                                                cx.clone(),
                                            )
                                            .await?;
                                        }
                                        Command::FormatPrompt(args) => {
                                            run_format_prompt(
                                                example,
                                                args,
                                                app_state.clone(),
                                                &example_progress,
                                                cx.clone(),
                                            )
                                            .await?;
                                        }
                                        Command::Predict(args) => {
                                            run_prediction(
                                                example,
                                                args,
                                                app_state.clone(),
                                                &example_progress,
                                                cx.clone(),
                                            )
                                            .await?;
                                        }
                                        Command::ParseOutput => {
                                            parse_output::run_parse_output(example)?;
                                        }
                                        Command::Distill => {
                                            run_distill(example).await?;
                                        }
                                        Command::Score(args) => {
                                            run_scoring(
                                                example,
                                                args,
                                                app_state.clone(),
                                                &example_progress,
                                                cx.clone(),
                                            )
                                            .await?;
                                        }
                                        Command::Eval(args) => {
                                            run_scoring(
                                                example,
                                                &args.predict,
                                                app_state.clone(),
                                                &example_progress,
                                                cx.clone(),
                                            )
                                            .await?;
                                        }
                                        Command::Qa(args) => {
                                            qa::run_qa(example, args, &example_progress).await?;
                                        }
                                        Command::Repair(args) => {
                                            repair::run_repair(example, args, &example_progress)
                                                .await?;
                                        }
                                        // Standalone commands returned early above.
                                        Command::Clean
                                        | Command::Synthesize(_)
                                        | Command::SplitCommit(_)
                                        | Command::Split(_)
                                        | Command::TruncatePatch(_)
                                        | Command::TokenMatchDebug(_)
                                        | Command::FilterLanguages(_)
                                        | Command::ImportBatch(_)
                                        | Command::PrintZetaFormats => {
                                            unreachable!()
                                        }
                                    }
                                    anyhow::Ok(())
                                }
                                .await;

                                // Failures are recorded (and may abort the run)
                                // by handle_error; the example is still written
                                // when --failed=keep.
                                let failed = if let Err(error) = result {
                                    handle_error(
                                        error,
                                        &args,
                                        &command,
                                        &app_state,
                                        failfast_on_single_example,
                                        &example,
                                    )
                                    .await;
                                    true
                                } else {
                                    false
                                };

                                let should_write = !failed || args.failed == FailedHandling::Keep;
                                if should_write {
                                    if args.markdown {
                                        // Markdown mode: one .md file per example.
                                        let markdown_dir =
                                            output.as_ref().expect("--markdown requires -o");
                                        let filename = format!("{}.md", example.spec.filename());
                                        let path = markdown_dir.join(&filename);
                                        let markdown = example.spec.to_markdown();
                                        std::fs::write(&path, &markdown)
                                            .expect("Failed to write markdown file");
                                    } else if let Some(ref mut sender) = output_sender.clone() {
                                        // JSONL mode: hand the line to the writer task.
                                        let line = serde_json::to_string(&example).unwrap();
                                        sender
                                            .send(line)
                                            .await
                                            .expect("Failed to send to output writer");
                                    } else if args.output.is_none()
                                        && !matches!(command, Command::Eval(_))
                                    {
                                        // No -o: print JSONL to stdout (except eval).
                                        let line = serde_json::to_string(&example).unwrap();
                                        println!("{}", line);
                                    }
                                }
                            }

                            // After a repo's batch: shut down its language
                            // servers and detach the project from the store.
                            let project = repo_examples
                                .iter()
                                .find_map(|e| e.state.as_ref().map(|s| s.project.clone()));

                            if let Some(project) = project {
                                let mut cx = cx.clone();

                                let shutdown_task: Task<()> =
                                    project.update(&mut cx, |project, cx| {
                                        let lsp_store = project.lsp_store();
                                        lsp_store.update(cx, |lsp_store, cx| {
                                            lsp_store.shutdown_all_language_servers(cx)
                                        })
                                    });

                                shutdown_task.await;

                                if let Some(ep_store) =
                                    cx.update(|cx| EditPredictionStore::try_global(cx))
                                {
                                    ep_store.update(&mut cx, |store, _| {
                                        store.remove_project(&project);
                                    });
                                }
                            }

                            // Drop per-example project state before collecting
                            // the finished examples.
                            for example in &mut repo_examples {
                                example.state.take();
                            }
                            finished_examples
                                .lock()
                                .unwrap()
                                .extend_from_slice(&repo_examples);
                        }
                    });
                }
                futures::future::join_all(tasks).await;

                Progress::global().finalize();

                // Sync provider batches again after processing completes.
                match &command {
                    Command::Predict(args) | Command::Score(args) => {
                        predict::sync_batches(args.provider.as_ref()).await?;
                    }
                    Command::Eval(args) => {
                        predict::sync_batches(args.predict.provider.as_ref()).await?;
                    }
                    Command::Qa(args) => {
                        qa::sync_batches(args).await?;
                    }
                    Command::Repair(args) => {
                        repair::sync_batches(args).await?;
                    }
                    _ => (),
                }

                // Command-specific reports over the finished examples.
                match &command {
                    Command::Eval(args) => {
                        let examples = finished_examples.lock().unwrap();
                        score::print_report(&examples, args.verbose);
                        if let Some(summary_path) = &args.summary_json {
                            score::write_summary_json(&examples, summary_path)?;
                        }
                    }
                    Command::Repair(args) => {
                        let examples = finished_examples.lock().unwrap();
                        repair::print_report(&examples, args.confidence_threshold);
                    }
                    _ => (),
                };

                // For --in-place, atomically rename temp file to original
                if let Some(temp_path) = &in_place_temp_path {
                    let final_path = output.as_ref().expect("in_place_temp_path requires output");
                    std::fs::rename(temp_path, final_path)
                        .expect("Failed to rename temp file to final output");
                }

                anyhow::Ok(())
            }
            .await;

            if let Err(e) = result {
                panic!("Fatal error: {:?}", e);
            }

            // Quit the headless app once the pipeline finishes.
            let _ = cx.update(|cx| cx.quit());
        })
        .detach();
    });
}
1406
1407async fn handle_error(
1408    error: anyhow::Error,
1409    args: &EpArgs,
1410    command: &Command,
1411    app_state: &Arc<headless::EpAppState>,
1412    failfast_on_single_example: bool,
1413    example: &Example,
1414) {
1415    Progress::global().increment_failed();
1416
1417    let msg;
1418    if !matches!(args.failed, FailedHandling::SkipNoFiles) {
1419        let example_name = example.spec.filename();
1420
1421        let failed_example_path = FAILED_EXAMPLES_DIR.join(format!("{}.json", example_name));
1422        app_state
1423            .fs
1424            .write(
1425                &failed_example_path,
1426                &serde_json::to_vec_pretty(&example).unwrap(),
1427            )
1428            .await
1429            .unwrap();
1430        let err_path = FAILED_EXAMPLES_DIR.join(format!("{}_err.txt", example_name));
1431        app_state
1432            .fs
1433            .write(&err_path, format!("{error:?}").as_bytes())
1434            .await
1435            .unwrap();
1436
1437        let failed_jsonl_path = RUN_DIR.join("failed.jsonl");
1438        let mut file = OpenOptions::new()
1439            .create(true)
1440            .append(true)
1441            .open(&failed_jsonl_path)
1442            .expect("Failed to open failed.jsonl");
1443        writeln!(file, "{}", serde_json::to_string(example).unwrap())
1444            .expect("Failed to write to failed.jsonl");
1445
1446        let cursor_path = match example.repo_name() {
1447            Ok(repo_name) => repo_name.worktree_path().join(&example.spec.cursor_path),
1448            Err(_) => example.spec.cursor_path.as_ref().to_path_buf(),
1449        };
1450        msg = format!(
1451            indoc::indoc! {"
1452                While processing \"{}\":
1453
1454                \x1b[31m{:?}\x1b[0m
1455
1456                Example:        \x1b[36m{}\x1b[0m
1457                Error file:     \x1b[36m{}\x1b[0m
1458                Cursor file:    \x1b[36m{}\x1b[0m
1459                Re-run:         cargo run -p edit_prediction_cli -- {} \x1b[36m{}\x1b[0m
1460            "},
1461            example.spec.name,
1462            error,
1463            failed_example_path.display(),
1464            err_path.display(),
1465            cursor_path.display(),
1466            command,
1467            failed_example_path.display(),
1468        );
1469    } else {
1470        msg = format!(
1471            indoc::indoc! {"
1472            While processing \"{}\":
1473
1474                \x1b[31m{:?}\x1b[0m
1475            "},
1476            example.spec.name, error
1477        );
1478    }
1479
1480    if args.failfast || failfast_on_single_example {
1481        Progress::global().finalize();
1482        panic!("{}", msg);
1483    } else {
1484        log::error!("{}", msg);
1485    }
1486}