ep: Fix in-place processing (#47274)

Oleksiy Syvokon created

Release Notes:

- N/A

Change summary

crates/edit_prediction_cli/src/main.rs | 41 +++++++++++++++++++++++----
1 file changed, 35 insertions(+), 6 deletions(-)

Detailed changes

crates/edit_prediction_cli/src/main.rs 🔗

@@ -549,14 +549,37 @@ fn main() {
 
                 let failfast_on_single_example = examples.len() == 1;
 
+                // For --in-place, write to a temp file and rename at the end to avoid data loss on interruption
+                let in_place_temp_path = if args.in_place {
+                    output.as_ref().map(|path| {
+                        let mut temp_path = path.clone();
+                        temp_path.set_extension("jsonl.tmp");
+                        temp_path
+                    })
+                } else {
+                    None
+                };
+
                 let output_sender: Option<mpsc::UnboundedSender<String>> =
                     if args.output.is_some() || !matches!(command, Command::Eval(_)) {
-                        output.as_ref().map(|path| {
-                            let file = OpenOptions::new()
-                                .create(true)
-                                .append(true)
-                                .open(path)
-                                .expect("Failed to open output file");
+                        let write_path = in_place_temp_path.as_ref().or(output.as_ref());
+                        write_path.map(|path| {
+                            let file = if args.in_place {
+                                // For --in-place, write to temp file (truncate if exists)
+                                OpenOptions::new()
+                                    .create(true)
+                                    .write(true)
+                                    .truncate(true)
+                                    .open(path)
+                                    .expect("Failed to open temp output file")
+                            } else {
+                                // For regular output, append to support resuming
+                                OpenOptions::new()
+                                    .create(true)
+                                    .append(true)
+                                    .open(path)
+                                    .expect("Failed to open output file")
+                            };
                             let mut writer = BufWriter::new(file);
                             let (sender, mut receiver) = mpsc::unbounded::<String>();
                             cx.background_spawn(async move {
@@ -744,6 +767,12 @@ fn main() {
                     _ => (),
                 };
 
+                // For --in-place, atomically rename temp file to original
+                if let (Some(temp_path), Some(final_path)) = (&in_place_temp_path, &output) {
+                    std::fs::rename(temp_path, final_path)
+                        .expect("Failed to rename temp file to final output");
+                }
+
                 anyhow::Ok(())
             }
             .await;