// main.rs

   1mod anthropic_client;
   2mod distill;
   3mod example;
   4mod filter_languages;
   5mod format_prompt;
   6mod git;
   7mod headless;
   8mod load_project;
   9mod metrics;
  10mod openai_client;
  11mod parse_output;
  12mod paths;
  13mod predict;
  14mod progress;
  15mod prompt_assets;
  16mod pull_examples;
  17mod qa;
  18mod reorder_patch;
  19mod repair;
  20mod retrieve_context;
  21mod reversal_tracking;
  22mod score;
  23mod split_commit;
  24mod split_dataset;
  25
  26mod synthesize;
  27mod truncate_expected_patch;
  28mod word_diff;
  29use clap::{Args, CommandFactory, Parser, Subcommand, ValueEnum};
  30use collections::{HashMap, HashSet};
  31use edit_prediction::EditPredictionStore;
  32use futures::channel::mpsc;
  33use futures::{SinkExt as _, StreamExt as _};
  34use gaoya::minhash::{
  35    MinHashIndex, MinHasher, MinHasher32, calculate_minhash_params, compute_minhash_similarity,
  36};
  37use gpui::{AppContext as _, BackgroundExecutor, Task};
  38use zeta_prompt::ZetaFormat;
  39
  40use reqwest_client::ReqwestClient;
  41use serde::{Deserialize, Deserializer, Serialize, Serializer};
  42use std::env;
  43use std::fmt::Display;
  44use std::fs::{File, OpenOptions};
  45use std::hash::{Hash, Hasher};
  46use std::io::{BufRead, BufReader, BufWriter, Write};
  47use std::sync::Mutex;
  48use std::{path::PathBuf, sync::Arc};
  49
  50use crate::distill::run_distill;
  51use crate::example::{Example, group_examples_by_repo, read_example_files};
  52use crate::filter_languages::{FilterLanguagesArgs, run_filter_languages};
  53use crate::format_prompt::run_format_prompt;
  54use crate::load_project::run_load_project;
  55use crate::paths::{FAILED_EXAMPLES_DIR, RUN_DIR};
  56use crate::predict::run_prediction;
  57use crate::progress::Progress;
  58use crate::pull_examples::{fetch_settled_examples_after, parse_settled_after_input};
  59use crate::retrieve_context::run_context_retrieval;
  60use crate::score::run_scoring;
  61use crate::split_commit::SplitCommitArgs;
  62use crate::split_dataset::SplitArgs;
  63use crate::synthesize::{SynthesizeConfig, run_synthesize};
  64use crate::truncate_expected_patch::TruncatePatchArgs;
  65
// Top-level CLI arguments for the `ep` tool. Most flags are marked `global`,
// so they may appear before or after the subcommand. (`///` comments on the
// fields below double as clap help text, so they are left untouched.)
#[derive(Parser, Debug)]
#[command(name = "ep")]
struct EpArgs {
    // NOTE(review): presumably prints required env vars and exits — confirm in main().
    #[arg(long, default_value_t = false)]
    printenv: bool,
    // Maximum number of examples processed concurrently.
    #[clap(long, default_value_t = 10, global = true)]
    max_parallelism: usize,
    /// The limit for the number of examples to process
    /// Default is unlimited for processing local datasets, 5000 when pulling from snowflake
    #[clap(long, global = true)]
    limit: Option<usize>,
    // Number of leading examples to skip before processing.
    #[clap(long, global = true)]
    offset: Option<usize>,
    /// Filter examples by name
    #[clap(long, global = true)]
    name: Option<String>,
    /// Filter examples by repository
    #[clap(long, global = true)]
    repo: Option<String>,
    /// Deduplicate by cursor position and keep at most this many examples per cluster
    #[clap(long, global = true)]
    max_duplicates: Option<usize>,
    // Subcommand to run; optional so a bare `ep` invocation is accepted.
    #[command(subcommand)]
    command: Option<Command>,
    /// Input file paths
    #[clap(global = true)]
    inputs: Vec<PathBuf>,
    // Output destination; see EpArgs::output_path for how this interacts
    // with --in-place.
    #[arg(long, short, global = true)]
    output: Option<PathBuf>,
    // Write results back over the single input file (requires exactly one input).
    #[arg(long, short, global = true)]
    in_place: bool,
    // Stop on the first failed example instead of continuing.
    #[arg(long, short, global = true)]
    failfast: bool,
    /// How to handle failed examples in output: keep them or skip them.
    /// Failed examples are always logged to the run's failed directory.
    #[arg(long, global = true, default_value = "keep")]
    failed: FailedHandling,
    /// Output as markdown files instead of JSONL. When set, -o specifies a directory
    /// where one .md file per example will be written (named after each example).
    #[arg(long, short, global = true)]
    markdown: bool,
}
 108
/// Controls whether failed examples are included in the main output.
/// Failed examples are always logged to the run's failed/ directory regardless of this setting.
// Parsed by clap as a ValueEnum; the variant `///` docs below are the
// user-visible help for each value, so they are left untouched.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, ValueEnum)]
pub enum FailedHandling {
    /// Include failed examples in the main output (default)
    #[default]
    Keep,
    /// Exclude failed examples from the main output
    Skip,
    /// Skip writing files
    SkipNoFiles,
}
 121
/// Extended help text shown after `--help` for commands that accept inputs
/// (attached via `#[command(after_help = INPUTS_HELP)]` on `ReadArgs`).
/// Documents both plain file paths and the Snowflake `*-after:{timestamp}`
/// input specifiers.
const INPUTS_HELP: &str = r#"
Inputs can be file paths or special specifiers:

  path
      Path to an example(s) file (.md, .json, or .jsonl)

  captured-after:{timestamp}
      Fetch captured examples from Snowflake after the given RFC3339 timestamp.
      These are examples captured via the "Capture Edit Prediction Example" action.

  rejected-after:{timestamp}
      Fetch rejected edit predictions from Snowflake after the given RFC3339 timestamp.
      These are predictions that were shown to users but rejected (useful for DPO training).

  settled-after:{timestamp}
      Fetch settled stream examples from Snowflake after the given RFC3339 timestamp.
      These are examples from the edit prediction settled stream.

  rated-after:{timestamp}
      Fetch user-rated edit predictions from Snowflake after the given RFC3339 timestamp.
      These are predictions that users explicitly rated as positive or negative via the
      rate completions modal. Only zeta2 predictions are included.
      - Positive ratings: output becomes expected_patches
      - Negative ratings: output becomes rejected_patch

  rated-positive-after:{timestamp}
      Same as rated-after, but only fetches positively rated predictions.

  rated-negative-after:{timestamp}
      Same as rated-after, but only fetches negatively rated predictions.

      Required environment variables to connect to Snowflake:
          EP_SNOWFLAKE_API_KEY
          EP_SNOWFLAKE_BASE_URL

      Optional:
          EP_SNOWFLAKE_ROLE

Examples:

  # Read examples from a file
  ep read examples.jsonl -o output.jsonl

  # Read captured examples after a timestamp
  ep read captured-after:2025-01-01T00:00:00Z -o captured.jsonl

  # Read rejected predictions for DPO training
  ep read rejected-after:2025-01-01T00:00:00Z -o rejected.jsonl

  # Read user-rated predictions
  ep read rated-after:2025-01-01T00:00:00Z -o rated.jsonl

  # Read settled stream examples
  ep read settled-after:2025-01-01T00:00:00Z -o settled.jsonl

  # Read only positively rated predictions
  ep read rated-positive-after:2025-01-01T00:00:00Z -o positive.jsonl

  # Read only negatively rated predictions
  ep read rated-negative-after:2025-01-01T00:00:00Z -o negative.jsonl

  # Mix multiple input sources
  ep predict examples.jsonl captured-after:2025-01-01T00:00:00Z
"#;
 186
// Subcommands of `ep`. The `///` docs on each variant are the clap help text
// for that subcommand and are left untouched. The Display impl below renders
// each variant back into its CLI spelling.
#[derive(Subcommand, Debug, Clone)]
enum Command {
    /// Read examples from files or fetch from Snowflake, output as .jsonl
    Read(ReadArgs),
    /// Create git worktrees for each example and load file contents
    LoadProject,
    /// Retrieve context for input examples.
    Context,
    /// Generate a prompt string for a specific model
    FormatPrompt(FormatPromptArgs),
    /// Runs edit prediction
    Predict(PredictArgs),
    /// Parse model outputs (actual_output) into unified diffs (actual_patch).
    /// Requires format-prompt to have been run first. Uses provider from prompt.
    ParseOutput,
    /// Computes a score based on actual and expected patches
    Score(PredictArgs),
    /// Prepares a distillation dataset by copying expected outputs to
    /// predicted outputs and removing actual outputs and prompts.
    Distill,
    /// Print aggregated scores
    Eval(EvalArgs),
    /// Generate eval examples by analyzing git commits from a repository
    Synthesize(SynthesizeArgs),
    /// Remove git repositories and worktrees
    Clean,
    /// Generate an evaluation example by splitting a chronologically-ordered commit
    SplitCommit(SplitCommitArgs),
    /// Truncate expected patch by the given criteria
    TruncatePatch(TruncatePatchArgs),
    /// Split a JSONL dataset into multiple files (stratified by repository_url if present)
    Split(SplitArgs),
    /// Filter a JSONL dataset by programming language (based on cursor_path extension)
    FilterLanguages(FilterLanguagesArgs),
    /// Import Anthropic batch results by batch IDs (useful for recovering after database loss)
    ImportBatch(ImportBatchArgs),
    /// Assess the quality of predictions using LLM-as-a-judge
    Qa(qa::QaArgs),
    /// Repair predictions that received poor QA scores by generating improved predictions
    Repair(repair::RepairArgs),
    /// Print all valid zeta formats (lowercase, one per line)
    PrintZetaFormats,
}
 230
 231impl Display for Command {
 232    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 233        match self {
 234            Command::Read(_) => write!(f, "read"),
 235            Command::LoadProject => write!(f, "load-project"),
 236            Command::Context => write!(f, "context"),
 237            Command::FormatPrompt(args) => {
 238                write!(f, "format-prompt --provider={}", args.provider)
 239            }
 240            Command::Predict(args) => match &args.provider {
 241                Some(provider) => write!(f, "predict --provider={}", provider),
 242                None => write!(f, "predict"),
 243            },
 244            Command::ParseOutput => write!(f, "parse-output"),
 245            Command::Score(args) => match &args.provider {
 246                Some(provider) => write!(f, "score --provider={}", provider),
 247                None => write!(f, "score"),
 248            },
 249            Command::Distill => write!(f, "distill"),
 250            Command::Eval(args) => match &args.predict.provider {
 251                Some(provider) => write!(f, "eval --provider={}", provider),
 252                None => write!(f, "eval"),
 253            },
 254            Command::Synthesize(args) => {
 255                write!(f, "synthesize --repos {}", args.repos.join(" "))
 256            }
 257            Command::Clean => write!(f, "clean"),
 258            Command::SplitCommit(_) => write!(f, "split-commit"),
 259            Command::TruncatePatch(_) => write!(f, "truncate-patch"),
 260            Command::Split(_) => write!(f, "split"),
 261            Command::FilterLanguages(_) => write!(f, "filter-languages"),
 262            Command::ImportBatch(args) => {
 263                write!(f, "import-batch --batch-ids {}", args.batch_ids.join(" "))
 264            }
 265            Command::Qa(_) => {
 266                write!(f, "qa")
 267            }
 268            Command::Repair(_) => {
 269                write!(f, "repair")
 270            }
 271            Command::PrintZetaFormats => {
 272                write!(f, "print-zeta-formats")
 273            }
 274        }
 275    }
 276}
 277
// Args for `ep read`. No flags of its own; inputs come from the global
// `EpArgs::inputs` list, and the extended input syntax is documented via
// `after_help`.
#[derive(Debug, Args, Clone)]
#[command(after_help = INPUTS_HELP)]
struct ReadArgs {}

// Args for `ep format-prompt`.
#[derive(Debug, Args, Clone)]
struct FormatPromptArgs {
    // Provider whose prompt format to generate; defaults to zeta2.
    #[clap(long, short('p'), default_value_t = PredictionProvider::default())]
    provider: PredictionProvider,
}

// Args shared by `ep predict` and `ep score`, and flattened into `ep eval`.
#[derive(Debug, Args, Clone)]
struct PredictArgs {
    // Optional provider; commands render as e.g. `predict --provider=...` when set.
    #[clap(long, short('p'))]
    provider: Option<PredictionProvider>,
    // NOTE(review): presumably the number of prediction runs per example —
    // confirm in run_prediction.
    #[clap(long, default_value_t = 1)]
    repetitions: usize,
    /// Only use cached responses, don't queue new requests for batching
    #[clap(long)]
    cache_only: bool,
}

// Args for `ep eval`: prediction options plus score-reporting options.
#[derive(Debug, Args, Clone)]
struct EvalArgs {
    #[clap(flatten)]
    predict: PredictArgs,
    /// Path to write summary scores as JSON
    #[clap(long)]
    summary_json: Option<PathBuf>,
    /// Print all individual example lines (default: up to 20)
    #[clap(long)]
    verbose: bool,
}
 310
/// Teacher model backing the `teacher*` prediction providers.
/// Parsed from strings such as `sonnet45` or `gpt52` (see the `FromStr` impl)
/// and mapped to a concrete API model name via [`TeacherBackend::model_name`].
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq, Hash)]
pub enum TeacherBackend {
    /// Maps to model name "claude-sonnet-4-6".
    Sonnet46,
    /// Maps to model name "claude-sonnet-4-5"; the default backend.
    #[default]
    Sonnet45,
    /// Maps to model name "gpt-5.2".
    Gpt52,
}
 318
 319impl std::fmt::Display for TeacherBackend {
 320    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 321        match self {
 322            TeacherBackend::Sonnet46 => write!(f, "sonnet46"),
 323            TeacherBackend::Sonnet45 => write!(f, "sonnet45"),
 324            TeacherBackend::Gpt52 => write!(f, "gpt52"),
 325        }
 326    }
 327}
 328
 329impl std::str::FromStr for TeacherBackend {
 330    type Err = anyhow::Error;
 331
 332    fn from_str(s: &str) -> Result<Self, Self::Err> {
 333        match s.to_lowercase().as_str() {
 334            "sonnet45" | "sonnet" | "claude" => Ok(TeacherBackend::Sonnet45),
 335            "sonnet46" => Ok(TeacherBackend::Sonnet46),
 336            "gpt52" | "gpt" | "openai" => Ok(TeacherBackend::Gpt52),
 337            "v0114180editableregion" => Ok(TeacherBackend::Sonnet45),
 338            _ => anyhow::bail!(
 339                "unknown teacher backend `{s}`. Valid options: sonnet45, sonnet46, gpt52"
 340            ),
 341        }
 342    }
 343}
 344
 345impl TeacherBackend {
 346    pub fn model_name(&self) -> &'static str {
 347        match self {
 348            TeacherBackend::Sonnet45 => "claude-sonnet-4-5",
 349            TeacherBackend::Sonnet46 => "claude-sonnet-4-6",
 350            TeacherBackend::Gpt52 => "gpt-5.2",
 351        }
 352    }
 353}
 354
/// Backend that produces edit predictions. Round-trips through `Display` /
/// `FromStr` as `name` or `name:arg` (e.g. `zeta2:ordered`,
/// `teacher:sonnet46`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum PredictionProvider {
    Sweep,
    Mercury,
    Zeta1,
    // Carries the zeta prompt format to use.
    Zeta2(ZetaFormat),
    // Also parameterized by a zeta prompt format.
    Baseten(ZetaFormat),
    // The four teacher variants differ in spelling only here; their runtime
    // differences (batching, multi-region) live in the predict module.
    Teacher(TeacherBackend),
    TeacherMultiRegion(TeacherBackend),
    TeacherNonBatching(TeacherBackend),
    TeacherMultiRegionNonBatching(TeacherBackend),
    Repair,
}
 368
 369impl Default for PredictionProvider {
 370    fn default() -> Self {
 371        PredictionProvider::Zeta2(ZetaFormat::default())
 372    }
 373}
 374
 375impl std::fmt::Display for PredictionProvider {
 376    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 377        match self {
 378            PredictionProvider::Sweep => write!(f, "sweep"),
 379            PredictionProvider::Mercury => write!(f, "mercury"),
 380            PredictionProvider::Zeta1 => write!(f, "zeta1"),
 381            PredictionProvider::Zeta2(format) => write!(f, "zeta2:{format}"),
 382            PredictionProvider::Baseten(format) => write!(f, "baseten:{format}"),
 383            PredictionProvider::Teacher(backend) => write!(f, "teacher:{backend}"),
 384            PredictionProvider::TeacherMultiRegion(backend) => {
 385                write!(f, "teacher-multi-region:{backend}")
 386            }
 387            PredictionProvider::TeacherNonBatching(backend) => {
 388                write!(f, "teacher-non-batching:{backend}")
 389            }
 390            PredictionProvider::TeacherMultiRegionNonBatching(backend) => {
 391                write!(f, "teacher-multi-region-non-batching:{backend}")
 392            }
 393            PredictionProvider::Repair => write!(f, "repair"),
 394        }
 395    }
 396}
 397
 398impl std::str::FromStr for PredictionProvider {
 399    type Err = anyhow::Error;
 400
 401    fn from_str(s: &str) -> Result<Self, Self::Err> {
 402        let (provider, arg) = s.split_once(':').map_or((s, None), |(p, a)| (p, Some(a)));
 403
 404        let provider_lower = provider.to_lowercase();
 405        match provider_lower.as_str() {
 406            "sweep" => Ok(PredictionProvider::Sweep),
 407            "mercury" => Ok(PredictionProvider::Mercury),
 408            "zeta1" => Ok(PredictionProvider::Zeta1),
 409            "zeta2" => {
 410                let format = arg.map(ZetaFormat::parse).transpose()?.unwrap_or_default();
 411                Ok(PredictionProvider::Zeta2(format))
 412            }
 413            "teacher" => {
 414                let backend = arg
 415                    .map(|a| a.parse())
 416                    .transpose()?
 417                    .unwrap_or(TeacherBackend::default());
 418                Ok(PredictionProvider::Teacher(backend))
 419            }
 420            "teacher-multi-region" | "teacher_multi_region" => {
 421                let backend = arg
 422                    .map(|a| a.parse())
 423                    .transpose()?
 424                    .unwrap_or(TeacherBackend::default());
 425                Ok(PredictionProvider::TeacherMultiRegion(backend))
 426            }
 427            "teacher-non-batching" | "teacher_non_batching" => {
 428                let backend = arg
 429                    .map(|a| a.parse())
 430                    .transpose()?
 431                    .unwrap_or(TeacherBackend::default());
 432                Ok(PredictionProvider::TeacherNonBatching(backend))
 433            }
 434            "teacher-multi-region-non-batching" | "teacher_multi_region_non_batching" => {
 435                let backend = arg
 436                    .map(|a| a.parse())
 437                    .transpose()?
 438                    .unwrap_or(TeacherBackend::default());
 439                Ok(PredictionProvider::TeacherMultiRegionNonBatching(backend))
 440            }
 441            "repair" => Ok(PredictionProvider::Repair),
 442            "baseten" => {
 443                let format = arg
 444                    .map(ZetaFormat::parse)
 445                    .transpose()?
 446                    .unwrap_or(ZetaFormat::default());
 447                Ok(PredictionProvider::Baseten(format))
 448            }
 449            _ => {
 450                anyhow::bail!(
 451                    "unknown provider `{provider}`. Valid options: sweep, mercury, zeta1, zeta2, zeta2:<version>, teacher, teacher:<backend>, teacher-multi-region, teacher-multi-region:<backend>, teacher-non-batching, teacher-multi-region-non-batching, repair\n\
 452                 For zeta2, you can optionally specify a version like `zeta2:ordered` or `zeta2:V0113_Ordered`.\n\
 453                 For teacher providers, you can specify a backend like `teacher:sonnet46`, `teacher-multi-region:sonnet46`, `teacher-multi-region-non-batching:sonnet46`, or `teacher:gpt52`.\n\
 454                 Available zeta versions:\n{}",
 455                    ZetaFormat::options_as_string()
 456                )
 457            }
 458        }
 459    }
 460}
 461
 462impl Serialize for PredictionProvider {
 463    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
 464    where
 465        S: Serializer,
 466    {
 467        serializer.serialize_str(&self.to_string())
 468    }
 469}
 470
 471impl<'de> Deserialize<'de> for PredictionProvider {
 472    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
 473    where
 474        D: Deserializer<'de>,
 475    {
 476        let s = String::deserialize(deserializer)?;
 477        s.parse().map_err(serde::de::Error::custom)
 478    }
 479}
 480
// Args for `ep synthesize`; consumed by run_synthesize via SynthesizeConfig.
#[derive(Debug, Args, Clone)]
struct SynthesizeArgs {
    /// Repository URLs (git@github.com:owner/repo or https://...)
    #[clap(long, required = true, num_args = 1..)]
    repos: Vec<String>,

    /// Number of examples to generate per repository
    #[clap(long, default_value_t = 5)]
    count: usize,

    /// Maximum commits to scan per repository before giving up
    #[clap(long, default_value_t = 100)]
    max_commits: usize,

    /// Ignore state file and reprocess all commits
    #[clap(long)]
    fresh: bool,
}
 499
// Args for `ep import-batch`.
#[derive(Debug, Args, Clone)]
struct ImportBatchArgs {
    /// Batch IDs to import (e.g., msgbatch_xxx for Anthropic, batch_xxx for OpenAI)
    #[clap(long, required = true, num_args = 1..)]
    batch_ids: Vec<String>,
    /// Which provider's batches to import (anthropic or openai)
    #[clap(long, default_value = "anthropic")]
    provider: BatchProvider,
}

// Batch API vendor for --provider; parsed by clap as a value enum
// (`anthropic` / `openai`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, clap::ValueEnum)]
enum BatchProvider {
    Anthropic,
    Openai,
}
 515
#[cfg(test)]
mod tests {
    use super::*;

    // Both tests below verify that the hyphenated and underscored spellings
    // of the multi-region non-batching teacher provider parse, and that
    // Display always re-emits the hyphenated primary spelling.
    #[test]
    fn prediction_provider_multi_region_non_batched_round_trips_to_primary_spelling() {
        let provider: PredictionProvider = "teacher-multi-region-non-batching:sonnet46"
            .parse()
            .unwrap();
        assert_eq!(
            provider,
            PredictionProvider::TeacherMultiRegionNonBatching(TeacherBackend::Sonnet46)
        );
        assert_eq!(
            provider.to_string(),
            "teacher-multi-region-non-batching:sonnet46"
        );
    }

    #[test]
    fn prediction_provider_multi_region_non_batched_alias_round_trips_to_primary_spelling() {
        let provider: PredictionProvider =
            "teacher_multi_region_non_batching:gpt52".parse().unwrap();
        assert_eq!(
            provider,
            PredictionProvider::TeacherMultiRegionNonBatching(TeacherBackend::Gpt52)
        );
        assert_eq!(
            provider.to_string(),
            "teacher-multi-region-non-batching:gpt52"
        );
    }
}
 549
 550impl EpArgs {
 551    fn output_path(&self) -> Option<PathBuf> {
 552        if self.in_place {
 553            if self.inputs.len() == 1 {
 554                self.inputs.first().cloned()
 555            } else {
 556                panic!("--in-place requires exactly one input file")
 557            }
 558        } else {
 559            self.output.clone()
 560        }
 561    }
 562}
 563
/// Minimum Zed version required for Snowflake queries.
/// This version introduced the current request schema with predicted edits in the edit
/// history, and open source repos distinguished.
const MIN_CAPTURE_VERSION: pull_examples::MinCaptureVersion = pull_examples::MinCaptureVersion {
    // i.e. 0.224.1 — NOTE(review): major version appears implied by
    // MinCaptureVersion; confirm in pull_examples.
    minor: 224,
    patch: 1,
};
 571
/// Deduplicates `examples` in place, in two passes:
/// 1. exact — drop examples whose `cursor_position` text was already seen;
/// 2. near-duplicate — cluster the rest by MinHash/LSH similarity of token
///    n-grams of `cursor_position`, then keep at most `max_per_cluster`
///    diverse representatives per cluster (see `greedy_max_min_diverse`).
/// Relative order of the surviving examples is preserved.
fn deduplicate_examples(examples: &mut Vec<Example>, max_per_cluster: usize) {
    // Pass 1: exact duplicates by cursor position.
    let total_before_exact = examples.len();
    let mut seen_positions = HashSet::default();
    examples.retain(|example| seen_positions.insert(example.spec.cursor_position.clone()));
    log::info!(
        "exact duplicate filter: {total_before_exact} examples → {} examples ({} removed)",
        examples.len(),
        total_before_exact - examples.len(),
    );

    // LSH tuning: pairs whose estimated Jaccard similarity exceeds the
    // threshold become merge candidates below.
    const JACCARD_THRESHOLD: f64 = 0.5;
    const NUM_HASHES: usize = 128;
    const TOKEN_NGRAM_SIZE: usize = 5;

    let (num_bands, band_width) = calculate_minhash_params(JACCARD_THRESHOLD, NUM_HASHES);
    // Round the hash count to a whole number of bands.
    let num_hashes = num_bands * band_width;
    let minhasher = MinHasher32::new(num_hashes);
    let mut index: MinHashIndex<u32, usize> =
        MinHashIndex::new(num_bands, band_width, JACCARD_THRESHOLD);

    // One MinHash signature per example, computed over token n-grams of the
    // cursor position text.
    let signatures: Vec<Vec<u32>> = examples
        .iter()
        .map(|example| {
            let shingles = code_token_ngrams(&example.spec.cursor_position, TOKEN_NGRAM_SIZE);
            minhasher.create_signature(shingles.iter())
        })
        .collect();

    for (id, signature) in signatures.iter().enumerate() {
        index.insert(id, signature.clone());
    }

    // Build clusters via union-find on LSH candidate pairs.
    let mut parent: Vec<usize> = (0..examples.len()).collect();

    // Union-find `find` with path halving.
    fn find(parent: &mut Vec<usize>, mut x: usize) -> usize {
        while parent[x] != x {
            parent[x] = parent[parent[x]];
            x = parent[x];
        }
        x
    }

    for (id, signature) in signatures.iter().enumerate() {
        for candidate in index.query_owned(signature) {
            let (a, b) = (find(&mut parent, id), find(&mut parent, candidate));
            if a != b {
                parent[a] = b;
            }
        }
    }

    // Group example indices by their union-find root.
    let mut clusters: HashMap<usize, Vec<usize>> = HashMap::default();
    for id in 0..examples.len() {
        clusters.entry(find(&mut parent, id)).or_default().push(id);
    }

    // Keep a diverse subset of each cluster.
    let mut keep: HashSet<usize> = HashSet::default();
    for members in clusters.values() {
        let selected = greedy_max_min_diverse(members, &signatures, max_per_cluster);
        keep.extend(selected);
    }

    let total = examples.len();
    let mut kept_indices: Vec<usize> = keep.into_iter().collect();
    kept_indices.sort();

    // Remove kept examples from the back so lower indices remain valid;
    // swap_remove is O(1) per removal. Pushing in descending index order and
    // reversing at the end restores the original relative order.
    let mut retained = Vec::with_capacity(kept_indices.len());
    for index in kept_indices.into_iter().rev() {
        retained.push(examples.swap_remove(index));
    }
    retained.reverse();

    *examples = retained;
    log::info!(
        "near-duplicate filter: {total} examples → {} examples ({} removed)",
        examples.len(),
        total - examples.len(),
    );
}
 652
 653fn greedy_max_min_diverse(members: &[usize], signatures: &[Vec<u32>], k: usize) -> Vec<usize> {
 654    if members.len() <= k {
 655        return members.to_vec();
 656    }
 657
 658    let mut selected = vec![members[0]];
 659    let mut min_dist: HashMap<usize, f64> = HashMap::default();
 660    for &member in &members[1..] {
 661        let dist = 1.0 - compute_minhash_similarity(&signatures[selected[0]], &signatures[member]);
 662        min_dist.insert(member, dist);
 663    }
 664
 665    while selected.len() < k {
 666        let &best = min_dist
 667            .iter()
 668            .max_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
 669            .map(|(id, _)| id)
 670            .expect("min_dist should not be empty when selected.len() < k");
 671        selected.push(best);
 672        min_dist.remove(&best);
 673
 674        let best_sig = &signatures[best];
 675        for (member, current_min) in min_dist.iter_mut() {
 676            let dist = 1.0 - compute_minhash_similarity(best_sig, &signatures[*member]);
 677            if dist < *current_min {
 678                *current_min = dist;
 679            }
 680        }
 681    }
 682
 683    selected
 684}
 685
 686fn code_token_ngrams(code: &str, ngram_size: usize) -> Vec<String> {
 687    let tokens: Vec<&str> = word_diff::tokenize(code)
 688        .into_iter()
 689        .filter(|t| !t.trim().is_empty())
 690        .collect();
 691
 692    if tokens.len() < ngram_size {
 693        return vec![tokens.join("\0")];
 694    }
 695
 696    tokens
 697        .windows(ngram_size)
 698        .map(|window| window.join("\0"))
 699        .collect()
 700}
 701
 702async fn load_examples(
 703    http_client: Arc<dyn http_client::HttpClient>,
 704    args: &EpArgs,
 705    output_path: Option<&PathBuf>,
 706    background_executor: BackgroundExecutor,
 707) -> anyhow::Result<Vec<Example>> {
 708    let mut captured_after_timestamps = Vec::new();
 709    let mut rejected_after_timestamps = Vec::new();
 710    let mut requested_after_timestamps = Vec::new();
 711    let mut settled_after_timestamps = Vec::new();
 712    let mut rated_after_inputs: Vec<(String, Option<telemetry_events::EditPredictionRating>)> =
 713        Vec::new();
 714    let mut file_inputs = Vec::new();
 715
 716    for input in &args.inputs {
 717        let input_string = input.to_string_lossy();
 718        if let Some(timestamp) = pull_examples::parse_captured_after_input(input_string.as_ref()) {
 719            captured_after_timestamps.push(timestamp.to_string());
 720        } else if let Some(timestamp) =
 721            pull_examples::parse_rejected_after_input(input_string.as_ref())
 722        {
 723            rejected_after_timestamps.push(timestamp.to_string());
 724        } else if let Some(timestamp) =
 725            pull_examples::parse_requested_after_input(input_string.as_ref())
 726        {
 727            requested_after_timestamps.push(timestamp.to_string());
 728        } else if let Some(timestamp) = parse_settled_after_input(input_string.as_ref()) {
 729            settled_after_timestamps.push(timestamp.to_string());
 730        } else if let Some((timestamp, rating_filter)) =
 731            pull_examples::parse_rated_after_input(input_string.as_ref())
 732        {
 733            rated_after_inputs.push((timestamp.to_string(), rating_filter));
 734        } else {
 735            file_inputs.push(input.clone());
 736        }
 737    }
 738
 739    let mut examples = read_example_files(&file_inputs);
 740
 741    // Apply offset to file examples first, then pass remaining offset to Snowflake.
 742    let file_example_count = examples.len();
 743    let remaining_offset = if let Some(offset) = args.offset {
 744        if offset >= file_example_count {
 745            examples.clear();
 746            offset - file_example_count
 747        } else {
 748            examples.splice(0..offset, []);
 749            0
 750        }
 751    } else {
 752        0
 753    };
 754
 755    Progress::global().set_total_examples(examples.len());
 756
 757    let remaining_limit_for_snowflake =
 758        args.limit.map(|limit| limit.saturating_sub(examples.len()));
 759
 760    if let Some(0) = remaining_limit_for_snowflake {
 761        log::info!(
 762            "skipping Snowflake inputs because --limit is already satisfied by example files"
 763        );
 764    } else {
 765        let max_rows_per_timestamp = remaining_limit_for_snowflake.unwrap_or(5000);
 766
 767        if !rejected_after_timestamps.is_empty() {
 768            rejected_after_timestamps.sort();
 769
 770            let mut rejected_examples = pull_examples::fetch_rejected_examples_after(
 771                http_client.clone(),
 772                &rejected_after_timestamps,
 773                max_rows_per_timestamp,
 774                remaining_offset,
 775                background_executor.clone(),
 776                Some(MIN_CAPTURE_VERSION),
 777            )
 778            .await?;
 779            examples.append(&mut rejected_examples);
 780        }
 781
 782        if !requested_after_timestamps.is_empty() {
 783            requested_after_timestamps.sort();
 784
 785            let mut requested_examples = pull_examples::fetch_requested_examples_after(
 786                http_client.clone(),
 787                &requested_after_timestamps,
 788                max_rows_per_timestamp,
 789                remaining_offset,
 790                background_executor.clone(),
 791                Some(MIN_CAPTURE_VERSION),
 792            )
 793            .await?;
 794            examples.append(&mut requested_examples);
 795        }
 796
 797        if !captured_after_timestamps.is_empty() {
 798            captured_after_timestamps.sort();
 799
 800            let mut captured_examples = pull_examples::fetch_captured_examples_after(
 801                http_client.clone(),
 802                &captured_after_timestamps,
 803                max_rows_per_timestamp,
 804                remaining_offset,
 805                background_executor.clone(),
 806                Some(MIN_CAPTURE_VERSION),
 807            )
 808            .await?;
 809            examples.append(&mut captured_examples);
 810        }
 811
 812        if !settled_after_timestamps.is_empty() {
 813            settled_after_timestamps.sort();
 814
 815            let mut settled_examples = fetch_settled_examples_after(
 816                http_client.clone(),
 817                &settled_after_timestamps,
 818                max_rows_per_timestamp,
 819                remaining_offset,
 820                background_executor.clone(),
 821                Some(MIN_CAPTURE_VERSION),
 822            )
 823            .await?;
 824            examples.append(&mut settled_examples);
 825        }
 826
 827        if !rated_after_inputs.is_empty() {
 828            rated_after_inputs.sort();
 829
 830            let mut rated_examples = pull_examples::fetch_rated_examples_after(
 831                http_client,
 832                &rated_after_inputs,
 833                max_rows_per_timestamp,
 834                remaining_offset,
 835                background_executor,
 836                Some(MIN_CAPTURE_VERSION),
 837            )
 838            .await?;
 839            examples.append(&mut rated_examples);
 840        }
 841    }
 842
 843    crate::example::sort_examples_by_repo_and_rev(&mut examples);
 844
 845    if let Some(name_filter) = &args.name {
 846        examples.retain(|example| example.spec.name.contains(name_filter));
 847    }
 848    if let Some(repo_filter) = &args.repo {
 849        examples.retain(|example| example.spec.repository_url.contains(repo_filter));
 850    }
 851
 852    // Skip resume logic for --in-place since input and output are the same file,
 853    // which would incorrectly treat all input examples as already processed.
 854    if !args.in_place {
 855        if let Some(path) = output_path
 856            && let Some(command) = &args.command
 857        {
 858            resume_from_output(path, &mut examples, command);
 859        }
 860    }
 861
 862    if let Some(max_duplicates) = args.max_duplicates {
 863        deduplicate_examples(&mut examples, max_duplicates);
 864    }
 865
 866    if let Some(limit) = args.limit {
 867        examples.truncate(limit);
 868    }
 869
 870    let progress = Progress::global();
 871    progress.set_total_examples(examples.len());
 872    progress.set_max_example_name_len(examples.iter().map(|e| &e.spec.name));
 873
 874    Ok(examples)
 875}
 876
 877fn spec_hash(spec: &edit_prediction::example_spec::ExampleSpec) -> u64 {
 878    let mut hasher = collections::FxHasher::default();
 879    spec.hash(&mut hasher);
 880    hasher.finish()
 881}
 882
 883fn resume_from_output(path: &PathBuf, examples: &mut Vec<Example>, command: &Command) {
 884    let file = match File::open(path) {
 885        Ok(f) => f,
 886        Err(_) => return,
 887    };
 888
 889    let input_hashes: HashSet<u64> = examples.iter().map(|e| spec_hash(&e.spec)).collect();
 890
 891    let reader = BufReader::new(file);
 892    let mut kept_lines = Vec::new();
 893    let mut kept_hashes = HashSet::default();
 894
 895    for line in reader.lines() {
 896        let line = match line {
 897            Ok(l) => l,
 898            Err(_) => continue,
 899        };
 900
 901        if let Ok(output_example) = serde_json::from_str::<Example>(&line) {
 902            let hash = spec_hash(&output_example.spec);
 903            if input_hashes.contains(&hash) && !kept_hashes.contains(&hash) {
 904                let is_complete = match command {
 905                    Command::Qa(_) => output_example
 906                        .qa
 907                        .first()
 908                        .and_then(|q| q.as_ref())
 909                        .and_then(|q| q.confidence)
 910                        .is_some(),
 911                    Command::Repair(_) => output_example.predictions.iter().any(|p| {
 912                        p.provider == PredictionProvider::Repair && p.actual_patch.is_some()
 913                    }),
 914                    _ => true,
 915                };
 916                if is_complete {
 917                    kept_hashes.insert(hash);
 918                    kept_lines.push(line);
 919                }
 920            }
 921        }
 922    }
 923
 924    let total = examples.len();
 925    let already_processed = kept_hashes.len();
 926
 927    eprintln!(
 928        "Resuming: {}/{} examples already processed",
 929        already_processed, total
 930    );
 931
 932    let file = OpenOptions::new()
 933        .write(true)
 934        .truncate(true)
 935        .open(path)
 936        .expect("Failed to open output file for rewriting");
 937    let mut writer = BufWriter::new(file);
 938    for line in &kept_lines {
 939        writeln!(writer, "{}", line).expect("Failed to write to output file");
 940    }
 941    writer.flush().expect("Failed to flush output file");
 942
 943    examples.retain(|e| !kept_hashes.contains(&spec_hash(&e.spec)));
 944}
 945
/// CLI entry point.
///
/// Parses arguments, dispatches commands that can run standalone (batch
/// import, cleanup, synthesis, dataset splitting/filtering) and returns early
/// for them; all remaining commands are executed per-example inside a
/// headless GPUI app using a bounded pool of worker tasks.
fn main() {
    let args = EpArgs::parse();

    // --printenv just dumps the shell environment and exits.
    if args.printenv {
        ::util::shell_env::print_env();
        return;
    }

    let output = args.output_path();

    // --markdown writes one .md file per example, so it requires an output dir.
    if args.markdown && output.is_none() {
        eprintln!("--markdown requires -o to specify the output directory");
        std::process::exit(1);
    }

    // With no subcommand, print help and exit.
    let command = match &args.command {
        Some(cmd) => cmd.clone(),
        None => {
            EpArgs::command().print_help().unwrap();
            return;
        }
    };

    // Standalone commands: these complete here and return without starting
    // the headless app.
    match &command {
        Command::ImportBatch(import_args) => {
            smol::block_on(async {
                match import_args.provider {
                    BatchProvider::Anthropic => {
                        let client = anthropic_client::AnthropicClient::batch(&paths::LLM_CACHE_DB)
                            .expect("Failed to create Anthropic client");
                        if let Err(e) = client.import_batches(&import_args.batch_ids).await {
                            eprintln!("Error importing Anthropic batches: {:?}", e);
                            std::process::exit(1);
                        }
                    }
                    BatchProvider::Openai => {
                        let client = openai_client::OpenAiClient::batch(&paths::LLM_CACHE_DB)
                            .expect("Failed to create OpenAI client");
                        if let Err(e) = client.import_batches(&import_args.batch_ids).await {
                            eprintln!("Error importing OpenAI batches: {:?}", e);
                            std::process::exit(1);
                        }
                    }
                }
                println!(
                    "Successfully imported {} batch(es)",
                    import_args.batch_ids.len()
                );
            });
            return;
        }
        Command::Clean => {
            // Removes the whole data directory.
            std::fs::remove_dir_all(&*paths::DATA_DIR).unwrap();
            return;
        }
        Command::PrintZetaFormats => {
            use strum::IntoEnumIterator as _;
            for format in ZetaFormat::iter() {
                println!("{}", format.to_string().to_lowercase());
            }
            return;
        }

        Command::Synthesize(synth_args) => {
            // Default output dir is created next to the crate sources when -o
            // is not given; otherwise synthesis refuses to run.
            let output_dir = if let Some(output_dir) = args.output {
                output_dir
            } else {
                let default_output_dir = env::current_dir()
                    .unwrap()
                    .join("crates/edit_prediction_cli/evals-generated");
                if default_output_dir.parent().unwrap().exists() {
                    std::fs::create_dir(&default_output_dir).ok();
                    default_output_dir
                } else {
                    panic!("output dir is required");
                }
            };
            let config = SynthesizeConfig {
                repo_urls: synth_args.repos.clone(),
                count: synth_args.count,
                max_commits: synth_args.max_commits,
                output_dir,
                fresh: synth_args.fresh,
            };
            smol::block_on(async {
                if let Err(e) = run_synthesize(config).await {
                    eprintln!("Error: {:?}", e);
                    std::process::exit(1);
                }
            });
            return;
        }
        Command::SplitCommit(split_commit_args) => {
            if let Err(error) = split_commit::run_split_commit(
                split_commit_args,
                &args.inputs,
                output.as_ref(),
                args.failed,
            ) {
                eprintln!("{error:#}");
                std::process::exit(1);
            }
            return;
        }
        Command::TruncatePatch(truncate_args) => {
            if let Err(error) =
                truncate_expected_patch::run_truncate_expected_patch(truncate_args, &args.inputs)
            {
                eprintln!("{error:#}");
                std::process::exit(1);
            }
            return;
        }
        Command::Split(split_args) => {
            if let Err(error) = split_dataset::run_split(split_args, &args.inputs) {
                eprintln!("{error:#}");
                std::process::exit(1);
            }
            return;
        }
        Command::FilterLanguages(filter_args) => {
            if let Err(error) =
                run_filter_languages(filter_args, &args.inputs, args.output.as_ref())
            {
                eprintln!("{error:#}");
                std::process::exit(1);
            }
            return;
        }

        // All other commands fall through to the headless-app path below.
        _ => {}
    }

    // Remaining commands need the headless GPUI app, an HTTP client, and the
    // shared app state (client, user store, fs).
    let http_client = Arc::new(ReqwestClient::new());
    let app = gpui_platform::headless().with_http_client(http_client);

    app.run(move |cx| {
        let app_state = Arc::new(headless::init(cx));
        EditPredictionStore::global(&app_state.client, &app_state.user_store, cx);

        cx.spawn(async move |cx| {
            let result = async {
                let examples = load_examples(
                    app_state.client.http_client(),
                    &args,
                    output.as_ref(),
                    cx.background_executor().clone(),
                )
                .await?;

                // Sync any outstanding provider batches before processing
                // begins (per-command sync_batches implementations).
                match &command {
                    Command::Predict(args) | Command::Score(args) => {
                        predict::sync_batches(args.provider.as_ref()).await?;
                    }
                    Command::Eval(args) => {
                        predict::sync_batches(args.predict.provider.as_ref()).await?;
                    }
                    Command::Qa(args) => {
                        qa::sync_batches(args).await?;
                    }
                    Command::Repair(args) => {
                        repair::sync_batches(args).await?;
                    }
                    _ => (),
                }

                // A single-example run fails fast even without --failfast.
                let failfast_on_single_example = examples.len() == 1;

                // For --markdown mode, create the output directory if it doesn't exist
                if args.markdown {
                    let dir = output.as_ref().expect("--markdown requires -o");
                    if !dir.exists() {
                        std::fs::create_dir_all(dir)
                            .expect("Failed to create markdown output directory");
                    }
                }

                // Set up JSONL output writer (not used in markdown mode)
                // Lines are sent over a channel to a single background task so
                // concurrent workers never interleave partial writes. For
                // --in-place, writes go to a temp file that is renamed over
                // the original at the end of the run.
                let mut output_sender: Option<mpsc::UnboundedSender<String>> = None;
                let mut in_place_temp_path: Option<PathBuf> = None;
                if !args.markdown
                    && let Some(output_path) = output.as_ref()
                {
                    let write_path = if args.in_place {
                        let temp = output_path.with_extension("jsonl.tmp");
                        in_place_temp_path = Some(temp.clone());
                        temp
                    } else {
                        output_path.clone()
                    };

                    let file = OpenOptions::new()
                        .create(true)
                        .write(true)
                        .truncate(args.in_place)
                        .append(!args.in_place)
                        .open(&write_path)
                        .expect("Failed to open output file");

                    let mut writer = BufWriter::new(file);
                    let (sender, mut receiver) = mpsc::unbounded::<String>();
                    cx.background_spawn(async move {
                        while let Some(line) = receiver.next().await {
                            writeln!(writer, "{}", line).expect("Failed to write example");
                            writer.flush().expect("Failed to flush output");
                        }
                    })
                    .detach();
                    output_sender = Some(sender);
                }

                // Examples are grouped per repository; a worker claims a whole
                // repo at a time so a repo's project state stays on one worker.
                let grouped_examples = Mutex::new(group_examples_by_repo(examples));
                let finished_examples = Mutex::new(Vec::new());

                // Spawn up to --max-parallelism cooperative worker tasks.
                let mut tasks = Vec::new();
                for _ in 0..args.max_parallelism {
                    tasks.push(async {
                        loop {
                            let Some(mut repo_examples) =
                                grouped_examples.lock().unwrap().pop_front()
                            else {
                                break;
                            };
                            for example in &mut repo_examples {
                                let example_progress =
                                    Progress::global().start_group(&example.spec.name);

                                // Dispatch the per-example stage for the
                                // active command.
                                let result = async {
                                    match &command {
                                        Command::Read(_) => {}
                                        Command::LoadProject => {
                                            run_load_project(
                                                example,
                                                app_state.clone(),
                                                &example_progress,
                                                cx.clone(),
                                            )
                                            .await?;
                                        }
                                        Command::Context => {
                                            run_context_retrieval(
                                                example,
                                                app_state.clone(),
                                                &example_progress,
                                                cx.clone(),
                                            )
                                            .await?;
                                        }
                                        Command::FormatPrompt(args) => {
                                            run_format_prompt(
                                                example,
                                                args,
                                                app_state.clone(),
                                                &example_progress,
                                                cx.clone(),
                                            )
                                            .await?;
                                        }
                                        Command::Predict(args) => {
                                            run_prediction(
                                                example,
                                                args,
                                                app_state.clone(),
                                                &example_progress,
                                                cx.clone(),
                                            )
                                            .await?;
                                        }
                                        Command::ParseOutput => {
                                            parse_output::run_parse_output(example)?;
                                        }
                                        Command::Distill => {
                                            run_distill(example).await?;
                                        }
                                        Command::Score(args) => {
                                            run_scoring(
                                                example,
                                                args,
                                                app_state.clone(),
                                                &example_progress,
                                                cx.clone(),
                                            )
                                            .await?;
                                        }
                                        Command::Eval(args) => {
                                            run_scoring(
                                                example,
                                                &args.predict,
                                                app_state.clone(),
                                                &example_progress,
                                                cx.clone(),
                                            )
                                            .await?;
                                        }
                                        Command::Qa(args) => {
                                            qa::run_qa(example, args, &example_progress).await?;
                                        }
                                        Command::Repair(args) => {
                                            repair::run_repair(example, args, &example_progress)
                                                .await?;
                                        }
                                        // Standalone commands returned before
                                        // this loop was reached.
                                        Command::Clean
                                        | Command::Synthesize(_)
                                        | Command::SplitCommit(_)
                                        | Command::Split(_)
                                        | Command::TruncatePatch(_)
                                        | Command::FilterLanguages(_)
                                        | Command::ImportBatch(_)
                                        | Command::PrintZetaFormats => {
                                            unreachable!()
                                        }
                                    }
                                    anyhow::Ok(())
                                }
                                .await;

                                // handle_error writes failure artifacts and may
                                // panic under --failfast.
                                let failed = if let Err(error) = result {
                                    handle_error(
                                        error,
                                        &args,
                                        &command,
                                        &app_state,
                                        failfast_on_single_example,
                                        &example,
                                    )
                                    .await;
                                    true
                                } else {
                                    false
                                };

                                // Failed examples are written only with --failed=keep.
                                let should_write = !failed || args.failed == FailedHandling::Keep;
                                if should_write {
                                    if args.markdown {
                                        let markdown_dir =
                                            output.as_ref().expect("--markdown requires -o");
                                        let filename = format!("{}.md", example.spec.filename());
                                        let path = markdown_dir.join(&filename);
                                        let markdown = example.spec.to_markdown();
                                        std::fs::write(&path, &markdown)
                                            .expect("Failed to write markdown file");
                                    } else if let Some(ref mut sender) = output_sender.clone() {
                                        let line = serde_json::to_string(&example).unwrap();
                                        sender
                                            .send(line)
                                            .await
                                            .expect("Failed to send to output writer");
                                    } else if args.output.is_none()
                                        && !matches!(command, Command::Eval(_))
                                    {
                                        // No output path: stream JSONL to stdout
                                        // (except for eval, which prints a report).
                                        let line = serde_json::to_string(&example).unwrap();
                                        println!("{}", line);
                                    }
                                }
                            }

                            // Tear down this repo's language servers and remove
                            // its project from the edit-prediction store before
                            // moving on to the next repo.
                            let project = repo_examples
                                .iter()
                                .find_map(|e| e.state.as_ref().map(|s| s.project.clone()));

                            if let Some(project) = project {
                                let mut cx = cx.clone();

                                let shutdown_task: Task<()> =
                                    project.update(&mut cx, |project, cx| {
                                        let lsp_store = project.lsp_store();
                                        lsp_store.update(cx, |lsp_store, cx| {
                                            lsp_store.shutdown_all_language_servers(cx)
                                        })
                                    });

                                shutdown_task.await;

                                if let Some(ep_store) =
                                    cx.update(|cx| EditPredictionStore::try_global(cx))
                                {
                                    ep_store.update(&mut cx, |store, _| {
                                        store.remove_project(&project);
                                    });
                                }
                            }

                            // Drop per-example project state before collecting
                            // the finished examples.
                            for example in &mut repo_examples {
                                example.state.take();
                            }
                            finished_examples
                                .lock()
                                .unwrap()
                                .extend_from_slice(&repo_examples);
                        }
                    });
                }
                futures::future::join_all(tasks).await;

                Progress::global().finalize();

                // Flush any batches that were enqueued during processing.
                match &command {
                    Command::Predict(args) | Command::Score(args) => {
                        predict::sync_batches(args.provider.as_ref()).await?;
                    }
                    Command::Eval(args) => {
                        predict::sync_batches(args.predict.provider.as_ref()).await?;
                    }
                    Command::Qa(args) => {
                        qa::sync_batches(args).await?;
                    }
                    Command::Repair(args) => {
                        repair::sync_batches(args).await?;
                    }
                    _ => (),
                }

                // Command-specific summary reports.
                match &command {
                    Command::Eval(args) => {
                        let examples = finished_examples.lock().unwrap();
                        score::print_report(&examples, args.verbose);
                        if let Some(summary_path) = &args.summary_json {
                            score::write_summary_json(&examples, summary_path)?;
                        }
                    }
                    Command::Repair(args) => {
                        let examples = finished_examples.lock().unwrap();
                        repair::print_report(&examples, args.confidence_threshold);
                    }
                    _ => (),
                };

                // For --in-place, atomically rename temp file to original
                if let Some(temp_path) = &in_place_temp_path {
                    let final_path = output.as_ref().expect("in_place_temp_path requires output");
                    std::fs::rename(temp_path, final_path)
                        .expect("Failed to rename temp file to final output");
                }

                anyhow::Ok(())
            }
            .await;

            if let Err(e) = result {
                panic!("Fatal error: {:?}", e);
            }

            // Quit the headless app once the run completes.
            let _ = cx.update(|cx| cx.quit());
        })
        .detach();
    });
}
1393
1394async fn handle_error(
1395    error: anyhow::Error,
1396    args: &EpArgs,
1397    command: &Command,
1398    app_state: &Arc<headless::EpAppState>,
1399    failfast_on_single_example: bool,
1400    example: &Example,
1401) {
1402    Progress::global().increment_failed();
1403
1404    let msg;
1405    if !matches!(args.failed, FailedHandling::SkipNoFiles) {
1406        let example_name = example.spec.filename();
1407
1408        let failed_example_path = FAILED_EXAMPLES_DIR.join(format!("{}.json", example_name));
1409        app_state
1410            .fs
1411            .write(
1412                &failed_example_path,
1413                &serde_json::to_vec_pretty(&example).unwrap(),
1414            )
1415            .await
1416            .unwrap();
1417        let err_path = FAILED_EXAMPLES_DIR.join(format!("{}_err.txt", example_name));
1418        app_state
1419            .fs
1420            .write(&err_path, format!("{error:?}").as_bytes())
1421            .await
1422            .unwrap();
1423
1424        let failed_jsonl_path = RUN_DIR.join("failed.jsonl");
1425        let mut file = OpenOptions::new()
1426            .create(true)
1427            .append(true)
1428            .open(&failed_jsonl_path)
1429            .expect("Failed to open failed.jsonl");
1430        writeln!(file, "{}", serde_json::to_string(example).unwrap())
1431            .expect("Failed to write to failed.jsonl");
1432
1433        let cursor_path = match example.repo_name() {
1434            Ok(repo_name) => repo_name.worktree_path().join(&example.spec.cursor_path),
1435            Err(_) => example.spec.cursor_path.as_ref().to_path_buf(),
1436        };
1437        msg = format!(
1438            indoc::indoc! {"
1439                While processing \"{}\":
1440
1441                \x1b[31m{:?}\x1b[0m
1442
1443                Example:        \x1b[36m{}\x1b[0m
1444                Error file:     \x1b[36m{}\x1b[0m
1445                Cursor file:    \x1b[36m{}\x1b[0m
1446                Re-run:         cargo run -p edit_prediction_cli -- {} \x1b[36m{}\x1b[0m
1447            "},
1448            example.spec.name,
1449            error,
1450            failed_example_path.display(),
1451            err_path.display(),
1452            cursor_path.display(),
1453            command,
1454            failed_example_path.display(),
1455        );
1456    } else {
1457        msg = format!(
1458            indoc::indoc! {"
1459            While processing \"{}\":
1460
1461                \x1b[31m{:?}\x1b[0m
1462            "},
1463            example.spec.name, error
1464        );
1465    }
1466
1467    if args.failfast || failfast_on_single_example {
1468        Progress::global().finalize();
1469        panic!("{}", msg);
1470    } else {
1471        log::error!("{}", msg);
1472    }
1473}