From 4aa0d43b22e246880cb357da3a9ee15ac10dc10f Mon Sep 17 00:00:00 2001 From: Amolith Date: Mon, 2 Mar 2026 13:01:02 -0700 Subject: [PATCH] Rename td compact to td tidy; implement two-phase compaction --- README.md | 2 +- src/cli.rs | 4 +- src/cmd/mod.rs | 6 +- src/cmd/{compact.rs => tidy.rs} | 9 +- src/db.rs | 100 ++++++++++++++++--- tests/cli_query.rs | 8 +- tests/cli_tidy.rs | 169 ++++++++++++++++++++++++++++++++ tests/repro_compact.rs | 4 +- 8 files changed, 274 insertions(+), 28 deletions(-) rename src/cmd/{compact.rs => tidy.rs} (54%) create mode 100644 tests/cli_tidy.rs diff --git a/README.md b/README.md index 2eee2e9a4f21e0fec117bbb7a277cb7c4485afa5..f45381a29e0bf0cbc5134eea8cae013ce2db5d7f 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,7 @@ Commands: ready Show tasks with no open blockers next Recommend next task(s) to work on stats Show task statistics (always JSON) - compact Vacuum the database + tidy Compact accumulated delta files into the base snapshot export Export tasks to JSONL (one JSON object per line) import Import tasks from a JSONL file sync Sync project state with a peer via magic wormhole diff --git a/src/cli.rs b/src/cli.rs index f18dfd8d6e926bf57df95c34749f3875aa1f12e8..f16d4a03c3f0093cc1506973708f85458842f34d 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -192,8 +192,8 @@ pub enum Command { /// Show task statistics (always JSON) Stats, - /// Vacuum the database - Compact, + /// Compact accumulated delta files into the base snapshot + Tidy, /// Export tasks to JSONL (one JSON object per line) Export, diff --git a/src/cmd/mod.rs b/src/cmd/mod.rs index bb674331faeaa6629d298e924649420fe528df5a..26b3b2e0be8e3dd80c8f59bb55929fd657531d31 100644 --- a/src/cmd/mod.rs +++ b/src/cmd/mod.rs @@ -1,4 +1,3 @@ -mod compact; mod create; mod dep; mod done; @@ -18,6 +17,7 @@ mod show; mod skill; mod stats; pub mod sync; +mod tidy; mod update; mod r#use; @@ -159,9 +159,9 @@ pub fn dispatch(cli: &Cli) -> Result<()> { let root = require_root()?; 
             stats::run(&root)
         }
-        Command::Compact => {
+        Command::Tidy => {
             let root = require_root()?;
-            compact::run(&root)
+            tidy::run(&root)
         }
         Command::Export => {
             let root = require_root()?;
diff --git a/src/cmd/compact.rs b/src/cmd/tidy.rs
similarity index 54%
rename from src/cmd/compact.rs
rename to src/cmd/tidy.rs
index 7131ab013b4ded3b3a0022a4ac539e17814d9744..d8d3fb3b3472190eb52d39ff1251587515685463 100644
--- a/src/cmd/compact.rs
+++ b/src/cmd/tidy.rs
@@ -6,10 +6,11 @@ use crate::db;
 pub fn run(root: &Path) -> Result<()> {
     let store = db::open(root)?;
     let c = crate::color::stderr_theme();
-    eprintln!("{}info:{} writing compacted snapshot...", c.blue, c.reset);
-    let out = store.write_snapshot()?;
-    let removed = store.purge_deltas()?;
-    eprintln!("{}info:{} wrote {}", c.blue, c.reset, out.display());
+    eprintln!(
+        "{}info:{} compacting deltas into snapshot...",
+        c.blue, c.reset
+    );
+    let removed = store.tidy()?;
     eprintln!("{}info:{} removed {removed} delta file(s)", c.blue, c.reset);
     Ok(())
 }
diff --git a/src/db.rs b/src/db.rs
index ebecf6233dc3d09ab8dc67f30082ab567cc8e1ec..c312665a854af63cb226fe462d2573c0f16fc0a3 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -330,26 +330,101 @@ impl Store {
     }
 
-    /// Export all current state to a fresh base snapshot.
-    pub fn write_snapshot(&self) -> Result<PathBuf> {
-        let out = project_dir(&self.root, &self.project).join(BASE_FILE);
+    /// Compact accumulated deltas into the base snapshot using a two-phase
+    /// protocol that is safe against concurrent writers.
+    ///
+    /// **Phase 1** — rename `changes/` to `changes.compacting.<ulid>/`, then
+    /// immediately create a fresh `changes/`. Any concurrent `td` command
+    /// that writes a delta after this point lands in the new `changes/` and is
+    /// therefore never touched by this operation.
+    ///
+    /// **Phase 2** — write a fresh base snapshot from the in-memory document
+    /// (which was loaded from both `base.loro` and every delta at `open` time),
+    /// then remove the compacting directory.
+    ///
+    /// Any orphaned `changes.compacting.*` directories left by a previously
+    /// crashed tidy are also removed: they were already merged into `self.doc`
+    /// at open time, so the new snapshot includes their contents.
+    ///
+    /// Returns the number of delta files folded into the snapshot.
+    pub fn tidy(&self) -> Result<usize> {
+        let project_dir = project_dir(&self.root, &self.project);
+        let changes_dir = project_dir.join(CHANGES_DIR);
+
+        // Phase 1: atomically hand off the current changes/ to a compacting
+        // directory so new writers have a clean home immediately.
+        let compacting_dir = project_dir.join(format!("changes.compacting.{}", Ulid::new()));
+        if changes_dir.exists() {
+            fs::rename(&changes_dir, &compacting_dir).with_context(|| {
+                format!(
+                    "failed to rename '{}' to '{}'",
+                    changes_dir.display(),
+                    compacting_dir.display()
+                )
+            })?;
+        }
+        fs::create_dir_all(&changes_dir).context("failed to create fresh changes/")?;
+
+        // Re-import every delta from the compacting directories. self.doc
+        // was populated at open() time, but a concurrent writer may have
+        // appended a delta to changes/ between open() and the Phase 1
+        // rename — that delta is now inside compacting_dir without being in
+        // self.doc. CRDT import is idempotent (deduplicates by OpID), so
+        // re-importing already-known ops is harmless.
+        let mut compacting_deltas = collect_delta_paths(&project_dir)?;
+        compacting_deltas.sort_by_key(|path| {
+            path.file_stem()
+                .and_then(|s| s.to_str())
+                .and_then(|s| Ulid::from_string(s).ok())
+        });
+        for delta_path in &compacting_deltas {
+            if let Ok(bytes) = fs::read(delta_path) {
+                if let Err(err) = self.doc.import(&bytes) {
+                    eprintln!(
+                        "warning: skipping unreadable delta '{}': {err}",
+                        delta_path.display()
+                    );
+                }
+            }
+        }
+
+        // Phase 2: write the new base snapshot. self.doc now holds the
+        // full merged state including any concurrent deltas.
+        let out = project_dir.join(BASE_FILE);
         let bytes = self
             .doc
             .export(ExportMode::Snapshot)
             .context("failed to export loro snapshot")?;
         atomic_write_file(&out, &bytes)?;
-        Ok(out)
-    }
 
-    /// Delete persisted delta files after a fresh snapshot has been written.
-    pub fn purge_deltas(&self) -> Result<usize> {
-        let project_dir = project_dir(&self.root, &self.project);
-        let paths = collect_delta_paths(&project_dir)?;
+        // Remove the compacting directory we created in phase 1 plus any
+        // orphaned changes.compacting.* dirs from previously crashed tidies.
         let mut removed = 0usize;
-        for path in paths {
-            fs::remove_file(&path)
-                .with_context(|| format!("failed removing delta '{}'", path.display()))?;
-            removed += 1;
+        for entry in fs::read_dir(&project_dir)
+            .with_context(|| format!("failed to read project dir '{}'", project_dir.display()))?
+        {
+            let path = entry?.path();
+            if !path.is_dir() {
+                continue;
+            }
+            let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
+                continue;
+            };
+            if !name.starts_with("changes.compacting.") {
+                continue;
+            }
+            // Count files before removing for the summary report.
+            for file in fs::read_dir(&path)
+                .with_context(|| format!("failed to read '{}'", path.display()))?
+ { + let fp = file?.path(); + if fp.is_file() { + removed += 1; + } + } + fs::remove_dir_all(&path) + .with_context(|| format!("failed to remove compacting dir '{}'", path.display()))?; } + Ok(removed) } diff --git a/tests/cli_query.rs b/tests/cli_query.rs index a213940be83fa3e0c4537690ffc88bf9b863599d..a03c81ca4b5cadfff055dc66c780a5ae5e5ffa74 100644 --- a/tests/cli_query.rs +++ b/tests/cli_query.rs @@ -153,10 +153,10 @@ fn stats_counts_tasks() { assert_eq!(v["closed"].as_i64().unwrap(), 1); } -// ── compact ────────────────────────────────────────────────────────── +// ── tidy ───────────────────────────────────────────────────────────── #[test] -fn compact_succeeds() { +fn tidy_succeeds() { let tmp = init_tmp(); create_task(&tmp, "Anything"); create_task(&tmp, "Anything else"); @@ -171,11 +171,11 @@ fn compact_succeeds() { assert!(count_before > 0); td(&tmp) - .arg("compact") + .arg("tidy") .current_dir(&tmp) .assert() .success() - .stderr(predicate::str::contains("writing compacted snapshot")) + .stderr(predicate::str::contains("compacting deltas")) .stderr(predicate::str::contains("removed")); let count_after = std::fs::read_dir(&changes) diff --git a/tests/cli_tidy.rs b/tests/cli_tidy.rs new file mode 100644 index 0000000000000000000000000000000000000000..a189beafbc3d3e736ad77b70b76ccf1a0c4c1581 --- /dev/null +++ b/tests/cli_tidy.rs @@ -0,0 +1,169 @@ +use assert_cmd::Command; +use predicates::prelude::*; +use tempfile::TempDir; + +fn td(home: &TempDir) -> Command { + let mut cmd = Command::cargo_bin("td").unwrap(); + cmd.env("HOME", home.path()); + cmd +} + +fn init_tmp() -> TempDir { + let tmp = TempDir::new().unwrap(); + td(&tmp) + .args(["init", "main"]) + .current_dir(&tmp) + .assert() + .success(); + tmp +} + +/// td tidy exists and reports what it did. 
+#[test] +fn tidy_reports_progress() { + let tmp = init_tmp(); + + td(&tmp) + .args(["create", "Task 1"]) + .current_dir(&tmp) + .assert() + .success(); + td(&tmp) + .args(["create", "Task 2"]) + .current_dir(&tmp) + .assert() + .success(); + + td(&tmp) + .arg("tidy") + .current_dir(&tmp) + .assert() + .success() + .stderr(predicate::str::contains("removed 2 delta file(s)")); +} + +/// After td tidy, the changes/ directory exists but contains no delta files. +#[test] +fn tidy_empties_changes_dir() { + let tmp = init_tmp(); + + td(&tmp) + .args(["create", "Task A"]) + .current_dir(&tmp) + .assert() + .success(); + + let project_dir = tmp.path().join(".local/share/td/projects/main"); + let changes_dir = project_dir.join("changes"); + + let before = std::fs::read_dir(&changes_dir).unwrap().count(); + assert!(before > 0, "deltas should exist before tidy"); + + td(&tmp).arg("tidy").current_dir(&tmp).assert().success(); + + // changes/ should still be there (ready for new deltas)… + assert!(changes_dir.exists(), "changes/ must still exist after tidy"); + // …but empty. + let after = std::fs::read_dir(&changes_dir).unwrap().count(); + assert_eq!(after, 0, "tidy should leave changes/ empty"); +} + +/// All tasks created before td tidy are still visible afterwards. +#[test] +fn tidy_preserves_tasks() { + let tmp = init_tmp(); + + td(&tmp) + .args(["create", "Keep me"]) + .current_dir(&tmp) + .assert() + .success(); + td(&tmp) + .args(["create", "Keep me too"]) + .current_dir(&tmp) + .assert() + .success(); + + td(&tmp).arg("tidy").current_dir(&tmp).assert().success(); + + td(&tmp) + .arg("list") + .current_dir(&tmp) + .assert() + .success() + .stdout(predicate::str::contains("Keep me")) + .stdout(predicate::str::contains("Keep me too")); +} + +/// Running td tidy twice in a row is idempotent: the second run has no deltas +/// to remove, succeeds, and reports 0 files removed. 
+#[test] +fn tidy_is_idempotent() { + let tmp = init_tmp(); + + td(&tmp) + .args(["create", "Some task"]) + .current_dir(&tmp) + .assert() + .success(); + + td(&tmp).arg("tidy").current_dir(&tmp).assert().success(); + + td(&tmp) + .arg("tidy") + .current_dir(&tmp) + .assert() + .success() + .stderr(predicate::str::contains("removed 0 delta file(s)")); +} + +/// Crash-recovery: if a previous tidy renamed changes/ to changes.compacting.X/ +/// but crashed before finishing, the next tidy should absorb those deltas into +/// the snapshot and remove the orphaned directory. +#[test] +fn tidy_recovers_orphaned_compacting_dir() { + let tmp = init_tmp(); + + td(&tmp) + .args(["create", "Orphaned task"]) + .current_dir(&tmp) + .assert() + .success(); + + let project_dir = tmp.path().join(".local/share/td/projects/main"); + let changes_dir = project_dir.join("changes"); + + // Simulate a crash after phase 1: rename changes/ to changes.compacting.X/ + // without creating a fresh changes/ or writing the new snapshot. + let compacting_dir = project_dir.join("changes.compacting.01JNFAKEULID0000000000000"); + std::fs::rename(&changes_dir, &compacting_dir) + .expect("simulate crash: rename changes/ to compacting dir"); + + // No changes/ exists now — a subsequent write would create it via + // create_dir_all, but let's leave that to tidy to exercise recovery. + + // tidy must succeed and preserve the orphaned task. + td(&tmp).arg("tidy").current_dir(&tmp).assert().success(); + + // The orphaned compacting dir must be gone. + assert!( + !compacting_dir.exists(), + "tidy should remove the orphaned changes.compacting.* dir" + ); + + // The task from the orphaned delta must still be accessible. + td(&tmp) + .arg("list") + .current_dir(&tmp) + .assert() + .success() + .stdout(predicate::str::contains("Orphaned task")); +} + +/// td compact is gone; using it should produce an error. 
+#[test] +fn compact_command_removed() { + let tmp = init_tmp(); + + td(&tmp).arg("compact").current_dir(&tmp).assert().failure(); +} diff --git a/tests/repro_compact.rs b/tests/repro_compact.rs index ad4eb3d179bfe87af9f1c64c3cce2578573f992d..3d17267193507c565699f644a0122dfbd3d046f3 100644 --- a/tests/repro_compact.rs +++ b/tests/repro_compact.rs @@ -35,8 +35,8 @@ fn compact_cleans_delta_files() { let deltas = std::fs::read_dir(&changes_dir).unwrap().count(); assert!(deltas > 0, "Deltas should exist before compaction"); - // Compact - td(&tmp).arg("compact").current_dir(&tmp).assert().success(); + // Tidy (formerly compact) + td(&tmp).arg("tidy").current_dir(&tmp).assert().success(); // Deltas are folded into the snapshot and removed. let deltas_after = std::fs::read_dir(&changes_dir).unwrap().count();