Rename td compact to td tidy; implement two-phase compaction

Amolith created

Change summary

README.md              |   2 
src/cli.rs             |   4 
src/cmd/mod.rs         |   6 
src/cmd/tidy.rs        |   9 +-
src/db.rs              | 100 ++++++++++++++++++++++---
tests/cli_query.rs     |   8 +-
tests/cli_tidy.rs      | 169 ++++++++++++++++++++++++++++++++++++++++++++
tests/repro_compact.rs |   4 
8 files changed, 274 insertions(+), 28 deletions(-)

Detailed changes

README.md 🔗

@@ -75,7 +75,7 @@ Commands:
   ready     Show tasks with no open blockers
   next      Recommend next task(s) to work on
   stats     Show task statistics (always JSON)
-  compact   Vacuum the database
+  tidy      Compact accumulated delta files into the base snapshot
   export    Export tasks to JSONL (one JSON object per line)
   import    Import tasks from a JSONL file
   sync      Sync project state with a peer via magic wormhole

src/cli.rs 🔗

@@ -192,8 +192,8 @@ pub enum Command {
     /// Show task statistics (always JSON)
     Stats,
 
-    /// Vacuum the database
-    Compact,
+    /// Compact accumulated delta files into the base snapshot
+    Tidy,
 
     /// Export tasks to JSONL (one JSON object per line)
     Export,

src/cmd/mod.rs 🔗

@@ -1,4 +1,3 @@
-mod compact;
 mod create;
 mod dep;
 mod done;
@@ -18,6 +17,7 @@ mod show;
 mod skill;
 mod stats;
 pub mod sync;
+mod tidy;
 mod update;
 mod r#use;
 
@@ -159,9 +159,9 @@ pub fn dispatch(cli: &Cli) -> Result<()> {
             let root = require_root()?;
             stats::run(&root)
         }
-        Command::Compact => {
+        Command::Tidy => {
             let root = require_root()?;
-            compact::run(&root)
+            tidy::run(&root)
         }
         Command::Export => {
             let root = require_root()?;

src/cmd/compact.rs → src/cmd/tidy.rs 🔗

@@ -6,10 +6,11 @@ use crate::db;
 pub fn run(root: &Path) -> Result<()> {
     let store = db::open(root)?;
     let c = crate::color::stderr_theme();
-    eprintln!("{}info:{} writing compacted snapshot...", c.blue, c.reset);
-    let out = store.write_snapshot()?;
-    let removed = store.purge_deltas()?;
-    eprintln!("{}info:{} wrote {}", c.blue, c.reset, out.display());
+    eprintln!(
+        "{}info:{} compacting deltas into snapshot...",
+        c.blue, c.reset
+    );
+    let removed = store.tidy()?;
     eprintln!("{}info:{} removed {removed} delta file(s)", c.blue, c.reset);
     Ok(())
 }

src/db.rs 🔗

@@ -330,26 +330,101 @@ impl Store {
     }
 
-    /// Export all current state to a fresh base snapshot.
-    pub fn write_snapshot(&self) -> Result<PathBuf> {
-        let out = project_dir(&self.root, &self.project).join(BASE_FILE);
+    /// Compact accumulated deltas into the base snapshot using a two-phase
+    /// protocol that is safe against concurrent writers.
+    ///
+    /// **Phase 1** — rename `changes/` to `changes.compacting.<ulid>/`, then
+    /// immediately create a fresh `changes/`.  Any concurrent `td` command
+    /// that writes a delta after this point lands in the new `changes/` and is
+    /// therefore never touched by this operation.
+    ///
+    /// **Phase 2** — write a fresh base snapshot from the in-memory document
+    /// (which was loaded from both `base.loro` and every delta at `open` time),
+    /// then remove the compacting directory.
+    ///
+    /// Any orphaned `changes.compacting.*` directories left by a previously
+    /// crashed tidy are also removed: they were already merged into `self.doc`
+    /// at open time, so the new snapshot includes their contents.
+    ///
+    /// Returns the number of delta files folded into the snapshot.
+    pub fn tidy(&self) -> Result<usize> {
+        let project_dir = project_dir(&self.root, &self.project);
+        let changes_dir = project_dir.join(CHANGES_DIR);
+
+        // Phase 1: atomically hand off the current changes/ to a compacting
+        // directory so new writers have a clean home immediately.
+        let compacting_dir = project_dir.join(format!("changes.compacting.{}", Ulid::new()));
+        if changes_dir.exists() {
+            fs::rename(&changes_dir, &compacting_dir).with_context(|| {
+                format!(
+                    "failed to rename '{}' to '{}'",
+                    changes_dir.display(),
+                    compacting_dir.display()
+                )
+            })?;
+        }
+        fs::create_dir_all(&changes_dir).context("failed to create fresh changes/")?;
+
+        // Re-import every delta from the compacting directories.  self.doc
+        // was populated at open() time, but a concurrent writer may have
+        // appended a delta to changes/ between open() and the Phase 1
+        // rename — that delta is now inside compacting_dir without being in
+        // self.doc.  CRDT import is idempotent (deduplicates by OpID), so
+        // re-importing already-known ops is harmless.
+        let mut compacting_deltas = collect_delta_paths(&project_dir)?;
+        compacting_deltas.sort_by_key(|path| {
+            path.file_stem()
+                .and_then(|s| s.to_str())
+                .and_then(|s| Ulid::from_string(s).ok())
+        });
+        for delta_path in &compacting_deltas {
+            if let Ok(bytes) = fs::read(delta_path) {
+                if let Err(err) = self.doc.import(&bytes) {
+                    eprintln!(
+                        "warning: skipping unreadable delta '{}': {err}",
+                        delta_path.display()
+                    );
+                }
+            }
+        }
+
+        // Phase 2: write the new base snapshot.  self.doc now holds the
+        // full merged state including any concurrent deltas.
+        let out = project_dir.join(BASE_FILE);
         let bytes = self
             .doc
             .export(ExportMode::Snapshot)
             .context("failed to export loro snapshot")?;
         atomic_write_file(&out, &bytes)?;
-        Ok(out)
-    }
 
-    /// Delete persisted delta files after a fresh snapshot has been written.
-    pub fn purge_deltas(&self) -> Result<usize> {
-        let project_dir = project_dir(&self.root, &self.project);
-        let paths = collect_delta_paths(&project_dir)?;
+        // Remove the compacting directory we created in phase 1 plus any
+        // orphaned changes.compacting.* dirs from previously crashed tidies.
         let mut removed = 0usize;
-        for path in paths {
-            fs::remove_file(&path)
-                .with_context(|| format!("failed removing delta '{}'", path.display()))?;
-            removed += 1;
+        for entry in fs::read_dir(&project_dir)
+            .with_context(|| format!("failed to read project dir '{}'", project_dir.display()))?
+        {
+            let path = entry?.path();
+            if !path.is_dir() {
+                continue;
+            }
+            let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
+                continue;
+            };
+            if !name.starts_with("changes.compacting.") {
+                continue;
+            }
+            // Count files before removing for the summary report.
+            for file in fs::read_dir(&path)
+                .with_context(|| format!("failed to read '{}'", path.display()))?
+            {
+                let fp = file?.path();
+                if fp.is_file() {
+                    removed += 1;
+                }
+            }
+            fs::remove_dir_all(&path)
+                .with_context(|| format!("failed to remove compacting dir '{}'", path.display()))?;
         }
+
         Ok(removed)
     }
 

tests/cli_query.rs 🔗

@@ -153,10 +153,10 @@ fn stats_counts_tasks() {
     assert_eq!(v["closed"].as_i64().unwrap(), 1);
 }
 
-// ── compact ──────────────────────────────────────────────────────────
+// ── tidy ─────────────────────────────────────────────────────────────
 
 #[test]
-fn compact_succeeds() {
+fn tidy_succeeds() {
     let tmp = init_tmp();
     create_task(&tmp, "Anything");
     create_task(&tmp, "Anything else");
@@ -171,11 +171,11 @@ fn compact_succeeds() {
     assert!(count_before > 0);
 
     td(&tmp)
-        .arg("compact")
+        .arg("tidy")
         .current_dir(&tmp)
         .assert()
         .success()
-        .stderr(predicate::str::contains("writing compacted snapshot"))
+        .stderr(predicate::str::contains("compacting deltas"))
         .stderr(predicate::str::contains("removed"));
 
     let count_after = std::fs::read_dir(&changes)

tests/cli_tidy.rs 🔗

@@ -0,0 +1,169 @@
+use assert_cmd::Command;
+use predicates::prelude::*;
+use tempfile::TempDir;
+
+fn td(home: &TempDir) -> Command {
+    let mut cmd = Command::cargo_bin("td").unwrap();
+    cmd.env("HOME", home.path());
+    cmd
+}
+
+fn init_tmp() -> TempDir {
+    let tmp = TempDir::new().unwrap();
+    td(&tmp)
+        .args(["init", "main"])
+        .current_dir(&tmp)
+        .assert()
+        .success();
+    tmp
+}
+
+/// td tidy exists and reports what it did.
+#[test]
+fn tidy_reports_progress() {
+    let tmp = init_tmp();
+
+    td(&tmp)
+        .args(["create", "Task 1"])
+        .current_dir(&tmp)
+        .assert()
+        .success();
+    td(&tmp)
+        .args(["create", "Task 2"])
+        .current_dir(&tmp)
+        .assert()
+        .success();
+
+    td(&tmp)
+        .arg("tidy")
+        .current_dir(&tmp)
+        .assert()
+        .success()
+        .stderr(predicate::str::contains("removed 2 delta file(s)"));
+}
+
+/// After td tidy, the changes/ directory exists but contains no delta files.
+#[test]
+fn tidy_empties_changes_dir() {
+    let tmp = init_tmp();
+
+    td(&tmp)
+        .args(["create", "Task A"])
+        .current_dir(&tmp)
+        .assert()
+        .success();
+
+    let project_dir = tmp.path().join(".local/share/td/projects/main");
+    let changes_dir = project_dir.join("changes");
+
+    let before = std::fs::read_dir(&changes_dir).unwrap().count();
+    assert!(before > 0, "deltas should exist before tidy");
+
+    td(&tmp).arg("tidy").current_dir(&tmp).assert().success();
+
+    // changes/ should still be there (ready for new deltas)…
+    assert!(changes_dir.exists(), "changes/ must still exist after tidy");
+    // …but empty.
+    let after = std::fs::read_dir(&changes_dir).unwrap().count();
+    assert_eq!(after, 0, "tidy should leave changes/ empty");
+}
+
+/// All tasks created before td tidy are still visible afterwards.
+#[test]
+fn tidy_preserves_tasks() {
+    let tmp = init_tmp();
+
+    td(&tmp)
+        .args(["create", "Keep me"])
+        .current_dir(&tmp)
+        .assert()
+        .success();
+    td(&tmp)
+        .args(["create", "Keep me too"])
+        .current_dir(&tmp)
+        .assert()
+        .success();
+
+    td(&tmp).arg("tidy").current_dir(&tmp).assert().success();
+
+    td(&tmp)
+        .arg("list")
+        .current_dir(&tmp)
+        .assert()
+        .success()
+        .stdout(predicate::str::contains("Keep me"))
+        .stdout(predicate::str::contains("Keep me too"));
+}
+
+/// Running td tidy twice in a row is idempotent: the second run has no deltas
+/// to remove, succeeds, and reports 0 files removed.
+#[test]
+fn tidy_is_idempotent() {
+    let tmp = init_tmp();
+
+    td(&tmp)
+        .args(["create", "Some task"])
+        .current_dir(&tmp)
+        .assert()
+        .success();
+
+    td(&tmp).arg("tidy").current_dir(&tmp).assert().success();
+
+    td(&tmp)
+        .arg("tidy")
+        .current_dir(&tmp)
+        .assert()
+        .success()
+        .stderr(predicate::str::contains("removed 0 delta file(s)"));
+}
+
+/// Crash-recovery: if a previous tidy renamed changes/ to changes.compacting.X/
+/// but crashed before finishing, the next tidy should absorb those deltas into
+/// the snapshot and remove the orphaned directory.
+#[test]
+fn tidy_recovers_orphaned_compacting_dir() {
+    let tmp = init_tmp();
+
+    td(&tmp)
+        .args(["create", "Orphaned task"])
+        .current_dir(&tmp)
+        .assert()
+        .success();
+
+    let project_dir = tmp.path().join(".local/share/td/projects/main");
+    let changes_dir = project_dir.join("changes");
+
+    // Simulate a crash after phase 1: rename changes/ to changes.compacting.X/
+    // without creating a fresh changes/ or writing the new snapshot.
+    let compacting_dir = project_dir.join("changes.compacting.01JNFAKEULID0000000000000");
+    std::fs::rename(&changes_dir, &compacting_dir)
+        .expect("simulate crash: rename changes/ to compacting dir");
+
+    // No changes/ exists now — a subsequent write would create it via
+    // create_dir_all, but let's leave that to tidy to exercise recovery.
+
+    // tidy must succeed and preserve the orphaned task.
+    td(&tmp).arg("tidy").current_dir(&tmp).assert().success();
+
+    // The orphaned compacting dir must be gone.
+    assert!(
+        !compacting_dir.exists(),
+        "tidy should remove the orphaned changes.compacting.* dir"
+    );
+
+    // The task from the orphaned delta must still be accessible.
+    td(&tmp)
+        .arg("list")
+        .current_dir(&tmp)
+        .assert()
+        .success()
+        .stdout(predicate::str::contains("Orphaned task"));
+}
+
+/// td compact is gone; using it should produce an error.
+#[test]
+fn compact_command_removed() {
+    let tmp = init_tmp();
+
+    td(&tmp).arg("compact").current_dir(&tmp).assert().failure();
+}

tests/repro_compact.rs 🔗

@@ -35,8 +35,8 @@ fn compact_cleans_delta_files() {
     let deltas = std::fs::read_dir(&changes_dir).unwrap().count();
     assert!(deltas > 0, "Deltas should exist before compaction");
 
-    // Compact
-    td(&tmp).arg("compact").current_dir(&tmp).assert().success();
+    // Tidy (formerly compact)
+    td(&tmp).arg("tidy").current_dir(&tmp).assert().success();
 
     // Deltas are folded into the snapshot and removed.
     let deltas_after = std::fs::read_dir(&changes_dir).unwrap().count();