main.rs

  1mod anthropic_client;
  2mod distill;
  3mod example;
  4mod format_prompt;
  5mod git;
  6mod headless;
  7mod load_project;
  8mod metrics;
  9mod parse_output;
 10mod paths;
 11mod predict;
 12mod progress;
 13mod pull_examples;
 14mod reorder_patch;
 15mod retrieve_context;
 16mod score;
 17mod split_commit;
 18mod split_dataset;
 19mod synthesize;
 20use clap::{Args, CommandFactory, Parser, Subcommand, ValueEnum};
 21use collections::HashSet;
 22use edit_prediction::EditPredictionStore;
 23use futures::channel::mpsc;
 24use futures::{SinkExt as _, StreamExt as _};
 25use gpui::{AppContext as _, Application, BackgroundExecutor, Task};
 26use zeta_prompt::ZetaVersion;
 27
 28use reqwest_client::ReqwestClient;
 29use serde::{Deserialize, Deserializer, Serialize, Serializer};
 30use std::fmt::Display;
 31use std::fs::{File, OpenOptions};
 32use std::hash::{Hash, Hasher};
 33use std::io::{BufRead, BufReader, BufWriter, Write};
 34use std::sync::Mutex;
 35use std::{path::PathBuf, sync::Arc};
 36
 37use crate::distill::run_distill;
 38use crate::example::{Example, group_examples_by_repo, read_example_files};
 39use crate::format_prompt::run_format_prompt;
 40use crate::load_project::run_load_project;
 41use crate::paths::{FAILED_EXAMPLES_DIR, RUN_DIR};
 42use crate::predict::run_prediction;
 43use crate::progress::Progress;
 44use crate::retrieve_context::run_context_retrieval;
 45use crate::score::run_scoring;
 46use crate::split_commit::SplitCommitArgs;
 47use crate::split_dataset::SplitArgs;
 48use crate::synthesize::{SynthesizeConfig, run_synthesize};
 49
// Top-level command-line arguments for the `ep` binary.
//
// Added notes use plain `//` comments on purpose: clap turns `///` doc comments
// on derive structs and fields into help text, so `///` here would change `--help`.
#[derive(Parser, Debug)]
#[command(name = "ep")]
struct EpArgs {
    // When set, `main` calls `util::shell_env::print_env()` and returns immediately.
    #[arg(long, default_value_t = false)]
    printenv: bool,
    // Number of worker futures `main` creates to process repository groups.
    #[clap(long, default_value_t = 10, global = true)]
    max_parallelism: usize,
    // Maximum number of examples kept by `load_examples` (applied after `offset`).
    #[clap(long, global = true)]
    limit: Option<usize>,
    // Number of leading examples dropped by `load_examples` after filtering.
    #[clap(long, global = true)]
    offset: Option<usize>,
    /// Filter examples by name
    #[clap(long, global = true)]
    name: Option<String>,
    /// Filter examples by repository
    #[clap(long, global = true)]
    repo: Option<String>,
    // `None` makes `main` print the help text and exit.
    #[command(subcommand)]
    command: Option<Command>,
    // Positional inputs: file paths or `captured-after:` specifiers (see INPUTS_HELP).
    #[clap(global = true, help = INPUTS_HELP)]
    inputs: Vec<PathBuf>,
    // Output path; ignored by `output_path()` when `in_place` is set.
    #[arg(long, short, global = true)]
    output: Option<PathBuf>,
    // Use the single input file as the output path (see `EpArgs::output_path`).
    #[arg(long, short, global = true)]
    in_place: bool,
    // Makes `handle_error` panic on the first failed example instead of logging it.
    #[arg(long, short, global = true)]
    failfast: bool,
    /// How to handle failed examples in output: keep them or skip them.
    /// Failed examples are always logged to the run's failed directory.
    #[arg(long, global = true, default_value = "keep")]
    failed: FailedHandling,
}
 82
/// Controls whether failed examples are included in the main output.
/// Failed examples are always logged to the run's failed/ directory regardless of this setting.
// NOTE(review): the sentence above does not hold for `SkipNoFiles` — `handle_error`
// skips all failed-example file writes when that variant is selected.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, ValueEnum)]
pub enum FailedHandling {
    /// Include failed examples in the main output (default)
    #[default]
    Keep,
    /// Exclude failed examples from the main output
    Skip,
    /// Skip writing files
    // Like `Skip` for the main output (only `Keep` writes failed examples there),
    // and additionally suppresses the per-example failure files and `failed.jsonl`.
    SkipNoFiles,
}
 95
// Help text attached to the positional `inputs` argument of `EpArgs`
// (via `help = INPUTS_HELP`). The content is user-facing and emitted verbatim.
const INPUTS_HELP: &str = r#"
Inputs can be file paths or special specifiers:

  path
      Path to an example(s) file (.md, .json, or .jsonl)

  captured-after:{timestamp}
      Fetch captured examples from Snowflake after the given RFC3339 timestamp.

      You can specify this multiple times and mix it with file inputs.

      Required environment variables to connect to Snowflake:
          EP_SNOWFLAKE_API_KEY
          EP_SNOWFLAKE_BASE_URL

      Optional:
          EP_SNOWFLAKE_ROLE

Examples:

  # Predict from a file
  ep predict examples.jsonl

  # Predict from captured examples after a timestamp
  ep predict captured-after:2025-01-01T00:00:00Z

  # Mix file inputs and captured-after in the same invocation
  ep predict examples.jsonl captured-after:2025-01-01T00:00:00Z
"#;
125
// Subcommands of `ep`.
//
// `Clean`, `Synthesize`, `SplitCommit` and `Split` are handled directly in `main`
// before the headless application starts; every other variant is run once per
// loaded example inside the worker loop. The `///` comments below double as the
// subcommand help text generated by clap, so they are left untouched.
#[derive(Subcommand, Debug, Clone)]
enum Command {
    /// Parse markdown examples and output a combined .jsonl file
    ParseExample,
    /// Create git worktrees for each example and load file contents
    LoadProject,
    /// Retrieve context for input examples.
    Context,
    /// Generate a prompt string for a specific model
    FormatPrompt(FormatPromptArgs),
    /// Runs edit prediction
    Predict(PredictArgs),
    /// Parse model outputs (actual_output) into unified diffs (actual_patch).
    /// Requires format-prompt to have been run first. Uses provider from prompt.
    ParseOutput,
    /// Computes a score based on actual and expected patches
    Score(PredictArgs),
    /// Prepares a distillation dataset by copying expected outputs to
    /// predicted outputs and removing actual outputs and prompts.
    Distill,
    /// Print aggregated scores
    Eval(PredictArgs),
    /// Generate eval examples by analyzing git commits from a repository
    Synthesize(SynthesizeArgs),
    /// Remove git repositories and worktrees
    Clean,
    /// Generate an evaluation example by splitting a chronologically-ordered commit
    SplitCommit(SplitCommitArgs),
    /// Split a JSONL dataset into multiple files (stratified by repository_url if present)
    Split(SplitArgs),
}
157
158impl Display for Command {
159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160        match self {
161            Command::ParseExample => write!(f, "parse-example"),
162            Command::LoadProject => write!(f, "load-project"),
163            Command::Context => write!(f, "context"),
164            Command::FormatPrompt(args) => {
165                write!(f, "format-prompt --provider={}", args.provider)
166            }
167            Command::Predict(args) => match &args.provider {
168                Some(provider) => write!(f, "predict --provider={}", provider),
169                None => write!(f, "predict"),
170            },
171            Command::ParseOutput => write!(f, "parse-output"),
172            Command::Score(args) => match &args.provider {
173                Some(provider) => write!(f, "score --provider={}", provider),
174                None => write!(f, "score"),
175            },
176            Command::Distill => write!(f, "distill"),
177            Command::Eval(args) => match &args.provider {
178                Some(provider) => write!(f, "eval --provider={}", provider),
179                None => write!(f, "eval"),
180            },
181            Command::Synthesize(args) => {
182                write!(f, "synthesize --repos {}", args.repos.join(" "))
183            }
184            Command::Clean => write!(f, "clean"),
185            Command::SplitCommit(_) => write!(f, "split-commit"),
186            Command::Split(_) => write!(f, "split"),
187        }
188    }
189}
190
// Arguments for `format-prompt`. Unlike `PredictArgs`, the provider is not
// optional here: it falls back to `PredictionProvider::default()`.
#[derive(Debug, Args, Clone)]
struct FormatPromptArgs {
    #[clap(long, short('p'), default_value_t = PredictionProvider::default())]
    provider: PredictionProvider,
}
196
// Arguments shared by the `predict`, `score` and `eval` subcommands.
#[derive(Debug, Args, Clone)]
struct PredictArgs {
    // Optional provider; has no CLI default, so it is `None` when the flag is omitted.
    #[clap(long, short('p'))]
    provider: Option<PredictionProvider>,
    #[clap(long, default_value_t = 1)]
    repetitions: usize,
}
204
/// Identifies which prediction backend to use.
///
/// The textual form (see the `Display` / `FromStr` impls) is the lowercase
/// provider name; the variants carrying a `ZetaVersion` render as
/// `name:version`, e.g. `zeta2:<version>`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum PredictionProvider {
    Sweep,
    Mercury,
    Zeta1,
    Zeta2(ZetaVersion),
    Teacher(ZetaVersion),
    TeacherNonBatching(ZetaVersion),
}
214
215impl Default for PredictionProvider {
216    fn default() -> Self {
217        PredictionProvider::Zeta2(ZetaVersion::default())
218    }
219}
220
221impl std::fmt::Display for PredictionProvider {
222    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223        match self {
224            PredictionProvider::Sweep => write!(f, "sweep"),
225            PredictionProvider::Mercury => write!(f, "mercury"),
226            PredictionProvider::Zeta1 => write!(f, "zeta1"),
227            PredictionProvider::Zeta2(version) => write!(f, "zeta2:{version}"),
228            PredictionProvider::Teacher(version) => write!(f, "teacher:{version}"),
229            PredictionProvider::TeacherNonBatching(version) => {
230                write!(f, "teacher-non-batching:{version}")
231            }
232        }
233    }
234}
235
236impl std::str::FromStr for PredictionProvider {
237    type Err = anyhow::Error;
238
239    fn from_str(mut s: &str) -> Result<Self, Self::Err> {
240        let mut version = ZetaVersion::default();
241        if let Some((first, second)) = s.split_once(':') {
242            version = ZetaVersion::parse(second)?;
243            s = first;
244        }
245
246        let s_lower = s.to_lowercase();
247        match s_lower.as_str() {
248            "sweep" => Ok(PredictionProvider::Sweep),
249            "mercury" => Ok(PredictionProvider::Mercury),
250            "zeta1" => Ok(PredictionProvider::Zeta1),
251            "zeta2" => Ok(PredictionProvider::Zeta2(version)),
252            "teacher" => Ok(PredictionProvider::Teacher(version)),
253            "teacher-non-batching" | "teacher_non_batching" | "teachernonbatching" => {
254                Ok(PredictionProvider::TeacherNonBatching(version))
255            }
256            _ => {
257                anyhow::bail!(
258                    "unknown provider `{s}`. Valid options: sweep, mercury, zeta1, zeta2, zeta2:<version>, teacher, teacher-non-batching\n\
259                 For zeta2, you can optionally specify a version like `zeta2:ordered` or `zeta2:V0113_Ordered`.\n\
260                 Available zeta versions:\n{}",
261                    ZetaVersion::options_as_string()
262                )
263            }
264        }
265    }
266}
267
268impl Serialize for PredictionProvider {
269    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
270    where
271        S: Serializer,
272    {
273        serializer.serialize_str(&self.to_string())
274    }
275}
276
277impl<'de> Deserialize<'de> for PredictionProvider {
278    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
279    where
280        D: Deserializer<'de>,
281    {
282        let s = String::deserialize(deserializer)?;
283        s.parse().map_err(serde::de::Error::custom)
284    }
285}
286
// Arguments for the `synthesize` subcommand; `main` copies these into a
// `SynthesizeConfig` together with the required `--output` directory.
#[derive(Debug, Args, Clone)]
struct SynthesizeArgs {
    /// Repository URLs (git@github.com:owner/repo or https://...)
    #[clap(long, required = true, num_args = 1..)]
    repos: Vec<String>,

    /// Number of examples to generate per repository
    #[clap(long, default_value_t = 5)]
    count: usize,

    /// Maximum commits to scan per repository before giving up
    #[clap(long, default_value_t = 100)]
    max_commits: usize,

    /// Ignore state file and reprocess all commits
    #[clap(long)]
    fresh: bool,
}
305
306impl EpArgs {
307    fn output_path(&self) -> Option<PathBuf> {
308        if self.in_place {
309            if self.inputs.len() == 1 {
310                self.inputs.first().cloned()
311            } else {
312                panic!("--in-place requires exactly one input file")
313            }
314        } else {
315            self.output.clone()
316        }
317    }
318}
319
320async fn load_examples(
321    http_client: Arc<dyn http_client::HttpClient>,
322    args: &EpArgs,
323    output_path: Option<&PathBuf>,
324    background_executor: BackgroundExecutor,
325) -> anyhow::Result<Vec<Example>> {
326    let mut captured_after_timestamps = Vec::new();
327    let mut file_inputs = Vec::new();
328
329    for input in &args.inputs {
330        let input_string = input.to_string_lossy();
331        if let Some(timestamp) = pull_examples::parse_captured_after_input(input_string.as_ref()) {
332            captured_after_timestamps.push(timestamp.to_string());
333        } else {
334            file_inputs.push(input.clone());
335        }
336    }
337
338    let mut examples = read_example_files(&file_inputs);
339
340    Progress::global().set_total_examples(examples.len());
341
342    let remaining_limit_for_snowflake =
343        args.limit.map(|limit| limit.saturating_sub(examples.len()));
344
345    if let Some(0) = remaining_limit_for_snowflake {
346        log::info!(
347            "skipping captured-after inputs because --limit is already satisfied by example files"
348        );
349    } else if !captured_after_timestamps.is_empty() {
350        captured_after_timestamps.sort();
351
352        let max_rows_per_timestamp = remaining_limit_for_snowflake.unwrap_or(5000);
353
354        let mut captured_examples = pull_examples::fetch_captured_examples_after(
355            http_client,
356            &captured_after_timestamps,
357            max_rows_per_timestamp,
358            background_executor,
359        )
360        .await?;
361        examples.append(&mut captured_examples);
362    }
363
364    crate::example::sort_examples_by_repo_and_rev(&mut examples);
365
366    if let Some(name_filter) = &args.name {
367        examples.retain(|example| example.spec.name.contains(name_filter));
368    }
369    if let Some(repo_filter) = &args.repo {
370        examples.retain(|example| example.spec.repository_url.contains(repo_filter));
371    }
372
373    // Skip resume logic for --in-place since input and output are the same file,
374    // which would incorrectly treat all input examples as already processed.
375    if !args.in_place {
376        if let Some(path) = output_path {
377            resume_from_output(path, &mut examples);
378        }
379    }
380
381    if let Some(offset) = args.offset {
382        examples.splice(0..offset, []);
383    }
384
385    if let Some(limit) = args.limit {
386        examples.truncate(limit);
387    }
388
389    Progress::global().set_total_examples(examples.len());
390
391    Ok(examples)
392}
393
394fn spec_hash(spec: &edit_prediction::example_spec::ExampleSpec) -> u64 {
395    let mut hasher = collections::FxHasher::default();
396    spec.hash(&mut hasher);
397    hasher.finish()
398}
399
400fn resume_from_output(path: &PathBuf, examples: &mut Vec<Example>) {
401    let file = match File::open(path) {
402        Ok(f) => f,
403        Err(_) => return,
404    };
405
406    let input_hashes: HashSet<u64> = examples.iter().map(|e| spec_hash(&e.spec)).collect();
407
408    let reader = BufReader::new(file);
409    let mut kept_lines = Vec::new();
410    let mut kept_hashes = HashSet::default();
411
412    for line in reader.lines() {
413        let line = match line {
414            Ok(l) => l,
415            Err(_) => continue,
416        };
417
418        if let Ok(output_example) = serde_json::from_str::<Example>(&line) {
419            let hash = spec_hash(&output_example.spec);
420            if input_hashes.contains(&hash) && !kept_hashes.contains(&hash) {
421                kept_hashes.insert(hash);
422                kept_lines.push(line);
423            }
424        }
425    }
426
427    let total = examples.len();
428    let already_processed = kept_hashes.len();
429
430    eprintln!(
431        "Resuming: {}/{} examples already processed",
432        already_processed, total
433    );
434
435    let file = OpenOptions::new()
436        .write(true)
437        .truncate(true)
438        .open(path)
439        .expect("Failed to open output file for rewriting");
440    let mut writer = BufWriter::new(file);
441    for line in &kept_lines {
442        writeln!(writer, "{}", line).expect("Failed to write to output file");
443    }
444    writer.flush().expect("Failed to flush output file");
445
446    examples.retain(|e| !kept_hashes.contains(&spec_hash(&e.spec)));
447}
448
449fn main() {
450    let args = EpArgs::parse();
451
452    if args.printenv {
453        ::util::shell_env::print_env();
454        return;
455    }
456
457    let output = args.output_path();
458    let command = match &args.command {
459        Some(cmd) => cmd.clone(),
460        None => {
461            EpArgs::command().print_help().unwrap();
462            return;
463        }
464    };
465
466    match &command {
467        Command::Clean => {
468            std::fs::remove_dir_all(&*paths::DATA_DIR).unwrap();
469            return;
470        }
471        Command::Synthesize(synth_args) => {
472            let Some(output_dir) = args.output else {
473                panic!("output dir is required");
474            };
475            let config = SynthesizeConfig {
476                repo_urls: synth_args.repos.clone(),
477                count: synth_args.count,
478                max_commits: synth_args.max_commits,
479                output_dir,
480                fresh: synth_args.fresh,
481            };
482            smol::block_on(async {
483                if let Err(e) = run_synthesize(config).await {
484                    eprintln!("Error: {:?}", e);
485                    std::process::exit(1);
486                }
487            });
488            return;
489        }
490        Command::SplitCommit(split_commit_args) => {
491            if let Err(error) = split_commit::run_split_commit(
492                split_commit_args,
493                &args.inputs,
494                output.as_ref(),
495                args.failed,
496            ) {
497                eprintln!("{error:#}");
498                std::process::exit(1);
499            }
500            return;
501        }
502        Command::Split(split_args) => {
503            if let Err(error) = split_dataset::run_split(split_args, &args.inputs) {
504                eprintln!("{error:#}");
505                std::process::exit(1);
506            }
507            return;
508        }
509        _ => {}
510    }
511
512    let http_client = Arc::new(ReqwestClient::new());
513    let app = Application::headless().with_http_client(http_client);
514
515    app.run(move |cx| {
516        let app_state = Arc::new(headless::init(cx));
517        EditPredictionStore::global(&app_state.client, &app_state.user_store, cx);
518
519        cx.spawn(async move |cx| {
520            let result = async {
521                let examples = load_examples(
522                    app_state.client.http_client(),
523                    &args,
524                    output.as_ref(),
525                    cx.background_executor().clone(),
526                )
527                .await?;
528
529                match &command {
530                    Command::Predict(args) | Command::Score(args) | Command::Eval(args) => {
531                        predict::sync_batches(args.provider.as_ref()).await?;
532                    }
533                    _ => (),
534                }
535
536                let failfast_on_single_example = examples.len() == 1;
537
538                let output_sender: Option<mpsc::UnboundedSender<String>> =
539                    if args.output.is_some() || !matches!(command, Command::Eval(_)) {
540                        output.as_ref().map(|path| {
541                            let file = OpenOptions::new()
542                                .create(true)
543                                .append(true)
544                                .open(path)
545                                .expect("Failed to open output file");
546                            let mut writer = BufWriter::new(file);
547                            let (sender, mut receiver) = mpsc::unbounded::<String>();
548                            cx.background_spawn(async move {
549                                while let Some(line) = receiver.next().await {
550                                    writeln!(writer, "{}", line).expect("Failed to write example");
551                                    writer.flush().expect("Failed to flush output");
552                                }
553                            })
554                            .detach();
555                            sender
556                        })
557                    } else {
558                        None
559                    };
560
561                let grouped_examples = Mutex::new(group_examples_by_repo(examples));
562                let finished_examples = Mutex::new(Vec::new());
563
564                let mut tasks = Vec::new();
565                for _ in 0..args.max_parallelism {
566                    tasks.push(async {
567                        loop {
568                            let Some(mut repo_examples) =
569                                grouped_examples.lock().unwrap().pop_front()
570                            else {
571                                break;
572                            };
573                            for example in &mut repo_examples {
574                                let example_progress =
575                                    Progress::global().start_group(&example.spec.name);
576
577                                let result = async {
578                                    match &command {
579                                        Command::ParseExample => {}
580                                        Command::LoadProject => {
581                                            run_load_project(
582                                                example,
583                                                app_state.clone(),
584                                                &example_progress,
585                                                cx.clone(),
586                                            )
587                                            .await?;
588                                        }
589                                        Command::Context => {
590                                            run_context_retrieval(
591                                                example,
592                                                app_state.clone(),
593                                                &example_progress,
594                                                cx.clone(),
595                                            )
596                                            .await?;
597                                        }
598                                        Command::FormatPrompt(args) => {
599                                            run_format_prompt(
600                                                example,
601                                                args,
602                                                app_state.clone(),
603                                                &example_progress,
604                                                cx.clone(),
605                                            )
606                                            .await?;
607                                        }
608                                        Command::Predict(args) => {
609                                            run_prediction(
610                                                example,
611                                                args,
612                                                app_state.clone(),
613                                                &example_progress,
614                                                cx.clone(),
615                                            )
616                                            .await?;
617                                        }
618                                        Command::ParseOutput => {
619                                            parse_output::run_parse_output(example)?;
620                                        }
621                                        Command::Distill => {
622                                            run_distill(example).await?;
623                                        }
624                                        Command::Score(args) | Command::Eval(args) => {
625                                            run_scoring(
626                                                example,
627                                                &args,
628                                                app_state.clone(),
629                                                &example_progress,
630                                                cx.clone(),
631                                            )
632                                            .await?;
633                                        }
634                                        Command::Clean
635                                        | Command::Synthesize(_)
636                                        | Command::SplitCommit(_)
637                                        | Command::Split(_) => {
638                                            unreachable!()
639                                        }
640                                    }
641                                    anyhow::Ok(())
642                                }
643                                .await;
644
645                                let failed = if let Err(error) = result {
646                                    handle_error(
647                                        error,
648                                        &args,
649                                        &command,
650                                        &app_state,
651                                        failfast_on_single_example,
652                                        &example,
653                                    )
654                                    .await;
655                                    true
656                                } else {
657                                    false
658                                };
659
660                                let should_write = !failed || args.failed == FailedHandling::Keep;
661                                if should_write {
662                                    if let Some(ref mut sender) = output_sender.clone() {
663                                        let line = serde_json::to_string(&example).unwrap();
664                                        sender
665                                            .send(line)
666                                            .await
667                                            .expect("Failed to send to output writer");
668                                    } else if args.output.is_none()
669                                        && !matches!(command, Command::Eval(_))
670                                    {
671                                        let line = serde_json::to_string(&example).unwrap();
672                                        println!("{}", line);
673                                    }
674                                }
675                            }
676
677                            let repo_url = &repo_examples.first().unwrap().spec.repository_url;
678                            let project = repo_examples
679                                .iter()
680                                .find_map(|e| e.state.as_ref().map(|s| s.project.clone()))
681                                .or_else(|| app_state.project_cache.get(repo_url));
682
683                            if let Some(project) = project {
684                                let mut cx = cx.clone();
685
686                                let shutdown_task: Task<()> =
687                                    project.update(&mut cx, |project, cx| {
688                                        let lsp_store = project.lsp_store();
689                                        lsp_store.update(cx, |lsp_store, cx| {
690                                            lsp_store.shutdown_all_language_servers(cx)
691                                        })
692                                    });
693
694                                shutdown_task.await;
695
696                                if let Some(ep_store) =
697                                    cx.update(|cx| EditPredictionStore::try_global(cx))
698                                {
699                                    ep_store.update(&mut cx, |store, _| {
700                                        store.remove_project(&project);
701                                    });
702                                }
703                            }
704
705                            app_state.project_cache.remove(repo_url);
706                            for example in &mut repo_examples {
707                                example.state.take();
708                            }
709                            finished_examples
710                                .lock()
711                                .unwrap()
712                                .extend_from_slice(&repo_examples);
713                        }
714                    });
715                }
716                futures::future::join_all(tasks).await;
717
718                Progress::global().finalize();
719
720                match &command {
721                    Command::Predict(args) | Command::Score(args) | Command::Eval(args) => {
722                        predict::sync_batches(args.provider.as_ref()).await?;
723                    }
724                    _ => (),
725                }
726
727                match &command {
728                    Command::Eval(_) => score::print_report(&finished_examples.lock().unwrap()),
729                    _ => (),
730                };
731
732                anyhow::Ok(())
733            }
734            .await;
735
736            if let Err(e) = result {
737                panic!("Fatal error: {:?}", e);
738            }
739
740            let _ = cx.update(|cx| cx.quit());
741        })
742        .detach();
743    });
744}
745
746async fn handle_error(
747    error: anyhow::Error,
748    args: &EpArgs,
749    command: &Command,
750    app_state: &Arc<headless::EpAppState>,
751    failfast_on_single_example: bool,
752    example: &Example,
753) {
754    Progress::global().increment_failed();
755
756    let msg;
757    if !matches!(args.failed, FailedHandling::SkipNoFiles) {
758        let example_name = example.spec.filename();
759
760        let failed_example_path = FAILED_EXAMPLES_DIR.join(format!("{}.json", example_name));
761        app_state
762            .fs
763            .write(
764                &failed_example_path,
765                &serde_json::to_vec_pretty(&example).unwrap(),
766            )
767            .await
768            .unwrap();
769        let err_path = FAILED_EXAMPLES_DIR.join(format!("{}_err.txt", example_name));
770        app_state
771            .fs
772            .write(&err_path, format!("{error:?}").as_bytes())
773            .await
774            .unwrap();
775
776        let failed_jsonl_path = RUN_DIR.join("failed.jsonl");
777        let mut file = OpenOptions::new()
778            .create(true)
779            .append(true)
780            .open(&failed_jsonl_path)
781            .expect("Failed to open failed.jsonl");
782        writeln!(file, "{}", serde_json::to_string(example).unwrap())
783            .expect("Failed to write to failed.jsonl");
784
785        let cursor_path = example
786            .repo_name()
787            .unwrap()
788            .worktree_path()
789            .join(&example.spec.cursor_path);
790        msg = format!(
791            indoc::indoc! {"
792                While processing \"{}\":
793
794                \x1b[31m{:?}\x1b[0m
795
796                Example:        \x1b[36m{}\x1b[0m
797                Error file:     \x1b[36m{}\x1b[0m
798                Cursor file:    \x1b[36m{}\x1b[0m
799                Re-run:         cargo run -p edit_prediction_cli -- {} \x1b[36m{}\x1b[0m
800            "},
801            example.spec.name,
802            error,
803            failed_example_path.display(),
804            err_path.display(),
805            cursor_path.display(),
806            command,
807            failed_example_path.display(),
808        );
809    } else {
810        msg = format!(
811            indoc::indoc! {"
812            While processing \"{}\":
813
814                \x1b[31m{:?}\x1b[0m
815            "},
816            example.spec.name, error
817        );
818    }
819
820    if args.failfast || failfast_on_single_example {
821        Progress::global().finalize();
822        panic!("{}", msg);
823    } else {
824        log::error!("{}", msg);
825    }
826}