Implement sync bootstrap mode

Amolith created

Change summary

Makefile          |   5 
README.md         |  17 ++++
SKILL.md          |   6 +
src/cmd/sync.rs   | 171 +++++++++++++++++++++++++++++++++++++++---------
src/db.rs         |  85 ++++++++++++++++++++++--
tests/cli_sync.rs |  96 +++++++++++++++++++++++++++
6 files changed, 338 insertions(+), 42 deletions(-)

Detailed changes

Makefile 🔗

@@ -1,11 +1,12 @@
 BINDIR := $(or $(XDG_BIN_HOME),$(XDG_BIN_DIR),$(HOME)/.local/bin)
+JJ_FIX := jj --config 'fix.tools.rustfmt.command=["rustfmt","--emit","stdout","--edition","2021"]' --config 'fix.tools.rustfmt.patterns=["glob:**/*.rs"]' fix
 
 .PHONY: all check test fmt clippy verify install
 
 all: fmt check test
 
 verify:
-	@jj fix
+	@$(JJ_FIX)
 	@out=$$(cargo check --quiet 2>&1) || { printf '%s\n' "$$out"; exit 1; }; echo '✓ check'
 	@out=$$(cargo clippy --quiet -- -D warnings 2>&1) || { printf '%s\n' "$$out"; exit 1; }; echo '✓ clippy'
 	@out=$$(cargo test --quiet 2>&1) || { printf '%s\n' "$$out"; exit 1; }; echo '✓ tests'
@@ -18,7 +19,7 @@ test:
 	@cargo test --quiet
 
 fmt:
-	@jj fix
+	@$(JJ_FIX)
 
 clippy:
 	@cargo clippy --quiet -- -D warnings

README.md 🔗

@@ -16,6 +16,23 @@ allowing your agent to use other todo/issue tools in other repos even with this
 global skill. Td IDs are prefixed with `td-`, so pasting the ID should be
 enough of a mention.
 
+## Sync Bootstrapping
+
+When bringing a project to a second machine, do **not** run `td init` again.
+Initialize once on the first machine, then bootstrap the second machine by
+running `td sync` and entering the wormhole code from the first machine.
+
+```sh
+# Machine A (already initialized project)
+td sync
+
+# Machine B (same repo checkout, no td project yet)
+td sync <code-from-machine-a>
+```
+
+Running `td init` on both machines creates different `project_id` values and
+prevents sync from merging them.
+
 Inspired by [alosec/td].
 
 [alosec/td]: https://github.com/alosec/td/

SKILL.md 🔗

@@ -14,10 +14,16 @@ is a named Loro CRDT document. Directories are bound to projects via
 the canonical path. You can also override with `--project <name>` or the
 `TD_PROJECT` env var.
 
+For multi-machine setup, initialize only once. On the second machine, bootstrap
+from the first with `td sync` instead of `td init` so both sides share the same
+project identity.
+
 ```bash
 td init myproject           # create project + bind cwd to it
 td use myproject            # bind cwd to an existing project
 td projects                 # list all known projects
+td sync                     # machine A: print a wormhole code
+td sync 7-goldfish-soccer   # machine B: bootstrap from machine A
 td --project other list     # one-off override
 TD_PROJECT=other td list    # env override
 ```

src/cmd/sync.rs 🔗

@@ -23,14 +23,22 @@ const CODE_WORD_COUNT: usize = 2;
 
 /// Handshake message exchanged before the delta payload.
 #[derive(Debug, Serialize, Deserialize)]
-struct SyncHandshake {
-    /// Human-readable project name.
-    project_name: String,
-    /// Stable identity (ULID stored in the doc's root meta map).
-    project_id: String,
-    /// Serialised version vector so the peer can compute a minimal delta.
-    #[serde(with = "vv_serde")]
-    version_vector: VersionVector,
+#[serde(tag = "mode")]
+enum SyncHandshake {
+    Sync {
+        /// Human-readable project name.
+        project_name: String,
+        /// Stable identity (ULID stored in the doc's root meta map).
+        project_id: String,
+        /// Serialised version vector so the peer can compute a minimal delta.
+        #[serde(with = "vv_serde")]
+        version_vector: VersionVector,
+    },
+    Bootstrap {
+        /// Serialised version vector so the peer can compute a minimal delta.
+        #[serde(with = "vv_serde")]
+        version_vector: VersionVector,
+    },
 }
 
 /// Serde adapter for `VersionVector` using its postcard `encode()`/`decode()`.
@@ -69,11 +77,10 @@ pub fn wormhole_config() -> AppConfig<serde_json::Value> {
 /// peer sends its version vector, receives the other's, computes a
 /// minimal delta, sends it, receives the peer's delta, and imports it.
 pub async fn exchange(store: &db::Store, mut wormhole: Wormhole) -> Result<SyncReport> {
-    let my_vv = store.doc().oplog_vv();
-    let my_handshake = SyncHandshake {
+    let my_handshake = SyncHandshake::Sync {
         project_name: store.project_name().to_string(),
         project_id: read_project_id(store)?,
-        version_vector: my_vv,
+        version_vector: store.doc().oplog_vv(),
     };
 
     // --- Phase 1: exchange handshakes ---
@@ -88,21 +95,39 @@ pub async fn exchange(store: &db::Store, mut wormhole: Wormhole) -> Result<SyncR
         .context("failed to receive handshake")?
         .context("peer sent invalid handshake JSON")?;
 
-    if my_handshake.project_id != their_handshake.project_id {
-        let _ = wormhole.close().await;
-        bail!(
-            "project identity mismatch: local '{}' ({}) vs peer '{}' ({})",
-            my_handshake.project_name,
-            my_handshake.project_id,
-            their_handshake.project_name,
-            their_handshake.project_id,
-        );
-    }
+    let their_vv = match &their_handshake {
+        SyncHandshake::Sync {
+            project_name,
+            project_id,
+            version_vector,
+        } => {
+            let (my_project_name, my_project_id) = match &my_handshake {
+                SyncHandshake::Sync {
+                    project_name,
+                    project_id,
+                    ..
+                } => (project_name, project_id),
+                SyncHandshake::Bootstrap { .. } => unreachable!("sync exchange always uses Sync"),
+            };
+            if my_project_id != project_id {
+                let _ = wormhole.close().await;
+                bail!(
+                    "project identity mismatch: local '{}' ({}) vs peer '{}' ({}). If this is the same logical project, remove the accidentally initted local copy and bootstrap with 'td sync' instead of running 'td init' on both machines",
+                    my_project_name,
+                    my_project_id,
+                    project_name,
+                    project_id,
+                );
+            }
+            version_vector
+        }
+        SyncHandshake::Bootstrap { version_vector } => version_vector,
+    };
 
     // --- Phase 2: compute and exchange deltas ---
     let my_delta = store
         .doc()
-        .export(ExportMode::updates(&their_handshake.version_vector))
+        .export(ExportMode::updates(their_vv))
         .context("failed to export delta for peer")?;
 
     wormhole
@@ -143,10 +168,90 @@ pub fn run(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
 }
 
 async fn run_async(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
-    let store = db::open(root)?;
+    let maybe_store = db::try_open(root)?;
     let c = crate::color::stderr_theme();
 
-    let wormhole = match code {
+    let wormhole = connect_wormhole(code, json, c).await?;
+
+    let (store, report) = if let Some(store) = maybe_store {
+        if !json {
+            eprintln!("{}wormhole:{} connected, syncing...", c.blue, c.reset);
+        }
+        let report = exchange(&store, wormhole).await?;
+        (store, report)
+    } else {
+        if !json {
+            eprintln!(
+                "{}wormhole:{} connected, bootstrapping from peer...",
+                c.blue, c.reset
+            );
+        }
+        bootstrap_exchange(root, wormhole).await?
+    };
+
+    print_sync_report(&store, &report, json, c)?;
+
+    Ok(())
+}
+
+async fn bootstrap_exchange(
+    root: &Path,
+    mut wormhole: Wormhole,
+) -> Result<(db::Store, SyncReport)> {
+    wormhole
+        .send_json(&SyncHandshake::Bootstrap {
+            version_vector: VersionVector::default(),
+        })
+        .await
+        .context("failed to send bootstrap handshake")?;
+
+    let their_handshake: SyncHandshake = wormhole
+        .receive_json::<SyncHandshake>()
+        .await
+        .context("failed to receive handshake")?
+        .context("peer sent invalid handshake JSON")?;
+
+    let project_name = match their_handshake {
+        SyncHandshake::Sync { project_name, .. } => project_name,
+        SyncHandshake::Bootstrap { .. } => {
+            let _ = wormhole.close().await;
+            bail!(
+                "both peers are in bootstrap mode. Run 'td init <project>' on one machine first, then run 'td sync' on the other"
+            );
+        }
+    };
+
+    wormhole
+        .send(Vec::new())
+        .await
+        .context("failed to send bootstrap delta")?;
+
+    let their_delta = wormhole
+        .receive()
+        .await
+        .context("failed to receive bootstrap delta from peer")?;
+
+    wormhole.close().await.context("failed to close wormhole")?;
+
+    if their_delta.is_empty() {
+        bail!("peer sent empty bootstrap delta");
+    }
+
+    let store = db::bootstrap_sync(root, &project_name, &their_delta)?;
+    let report = SyncReport {
+        sent_bytes: 0,
+        received_bytes: their_delta.len(),
+        imported: true,
+    };
+    Ok((store, report))
+}
+
+async fn connect_wormhole(
+    code: Option<&str>,
+    json: bool,
+    c: &crate::color::Theme,
+) -> Result<Wormhole> {
+    match code {
         None => {
             let mailbox = MailboxConnection::create(wormhole_config(), CODE_WORD_COUNT)
                 .await
@@ -166,7 +271,7 @@ async fn run_async(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
 
             Wormhole::connect(mailbox)
                 .await
-                .context("wormhole key exchange failed")?
+                .context("wormhole key exchange failed")
         }
         Some(raw) => {
             let code: Code = raw.parse().context("invalid wormhole code")?;
@@ -180,16 +285,17 @@ async fn run_async(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
 
             Wormhole::connect(mailbox)
                 .await
-                .context("wormhole key exchange failed")?
+                .context("wormhole key exchange failed")
         }
-    };
-
-    if !json {
-        eprintln!("{}wormhole:{} connected, syncing...", c.blue, c.reset);
     }
+}
 
-    let report = exchange(&store, wormhole).await?;
-
+fn print_sync_report(
+    store: &db::Store,
+    report: &SyncReport,
+    json: bool,
+    c: &crate::color::Theme,
+) -> Result<()> {
     if json {
         println!(
             "{}",
@@ -215,7 +321,6 @@ async fn run_async(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
             eprintln!("{}info:{} peer had no new changes", c.blue, c.reset);
         }
     }
-
     Ok(())
 }
 

src/db.rs 🔗

@@ -273,6 +273,40 @@ impl Store {
         })
     }
 
+    /// Bootstrap a local project from peer-provided delta bytes.
+    ///
+    /// The incoming delta is imported into a fresh document, validated to
+    /// ensure it carries `meta.project_id`, and then persisted as a base
+    /// snapshot for future opens.
+    pub fn bootstrap_from_peer(root: &Path, project: &str, delta: &[u8]) -> Result<Self> {
+        validate_project_name(project)?;
+        let project_dir = project_dir(root, project);
+        if project_dir.exists() {
+            bail!("project '{project}' already exists");
+        }
+        fs::create_dir_all(project_dir.join(CHANGES_DIR))?;
+
+        let doc = LoroDoc::new();
+        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
+        doc.import(delta)
+            .context("failed to import bootstrap delta from peer")?;
+        doc.commit();
+
+        read_project_id_from_doc(&doc)
+            .context("bootstrap delta is missing required project identity")?;
+
+        let snapshot = doc
+            .export(ExportMode::Snapshot)
+            .context("failed to export bootstrap loro snapshot")?;
+        atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;
+
+        Ok(Self {
+            root: root.to_path_buf(),
+            project: project.to_string(),
+            doc,
+        })
+    }
+
     pub fn root(&self) -> &Path {
         &self.root
     }
@@ -449,10 +483,36 @@ pub fn use_project(cwd: &Path, project: &str) -> Result<()> {
 pub fn open(start: &Path) -> Result<Store> {
     let root = data_root()?;
     let explicit = std::env::var(PROJECT_ENV).ok();
-    let project = resolve_project_name(start, &root, explicit.as_deref())?;
+    let project = resolve_project_name(start, &root, explicit.as_deref())?.ok_or_else(|| {
+        anyhow!(
+            "no project selected. Use --project/TD_PROJECT, run 'td use <name>', or run 'td init <name>'"
+        )
+    })?;
     Store::open(&root, &project)
 }
 
+/// Open the project selected by `--project`/`TD_PROJECT`/bindings if one exists.
+///
+/// Returns `Ok(None)` when no project is selected by any mechanism.
+pub fn try_open(start: &Path) -> Result<Option<Store>> {
+    let root = data_root()?;
+    let explicit = std::env::var(PROJECT_ENV).ok();
+    let Some(project) = resolve_project_name(start, &root, explicit.as_deref())? else {
+        return Ok(None);
+    };
+    Store::open(&root, &project).map(Some)
+}
+
+/// Bootstrap a project from a peer delta and bind the current directory.
+pub fn bootstrap_sync(cwd: &Path, project: &str, delta: &[u8]) -> Result<Store> {
+    let root = data_root()?;
+    fs::create_dir_all(root.join(PROJECTS_DIR))?;
+    validate_project_name(project)?;
+    let store = Store::bootstrap_from_peer(&root, project, delta)?;
+    bind_project(cwd, project)?;
+    Ok(store)
+}
+
 pub fn list_projects() -> Result<Vec<String>> {
     let root = data_root()?;
     let mut out = Vec::new();
@@ -545,10 +605,14 @@ fn bindings_path(root: &Path) -> PathBuf {
     root.join(BINDINGS_FILE)
 }
 
-fn resolve_project_name(start: &Path, root: &Path, explicit: Option<&str>) -> Result<String> {
+fn resolve_project_name(
+    start: &Path,
+    root: &Path,
+    explicit: Option<&str>,
+) -> Result<Option<String>> {
     if let Some(project) = explicit {
         validate_project_name(project)?;
-        return Ok(project.to_string());
+        return Ok(Some(project.to_string()));
     }
 
     let cwd = canonicalize_binding_path(start)?;
@@ -567,12 +631,10 @@ fn resolve_project_name(start: &Path, root: &Path, explicit: Option<&str>) -> Re
     }
 
     if let Some((_, project)) = best {
-        return Ok(project);
+        return Ok(Some(project));
     }
 
-    bail!(
-        "no project selected. Use --project/TD_PROJECT, run 'td use <name>', or run 'td init <name>'"
-    )
+    Ok(None)
 }
 
 fn bind_project(cwd: &Path, project: &str) -> Result<()> {
@@ -637,6 +699,15 @@ fn validate_project_name(name: &str) -> Result<()> {
     Ok(())
 }
 
+fn read_project_id_from_doc(doc: &LoroDoc) -> Result<String> {
+    let root = serde_json::to_value(doc.get_deep_value())?;
+    root.get("meta")
+        .and_then(|m| m.get("project_id"))
+        .and_then(Value::as_str)
+        .map(str::to_owned)
+        .ok_or_else(|| anyhow!("missing meta.project_id in project doc"))
+}
+
 fn hydrate_task(task_id_raw: &str, value: &Value) -> Result<Task> {
     let obj = value
         .as_object()

tests/cli_sync.rs 🔗

@@ -1,4 +1,5 @@
 use assert_cmd::Command;
+use loro::{ExportMode, LoroDoc, VersionVector};
 use predicates::prelude::*;
 
 #[test]
@@ -176,3 +177,98 @@ fn sync_exchanges_tasks_between_peers() {
     assert!(b_titles.contains(&"task from A"));
     assert!(b_titles.contains(&"task from B"));
 }
+
+#[test]
+fn try_open_returns_none_without_binding() {
+    use yatd::db;
+
+    let home = tempfile::tempdir().unwrap();
+    let cwd = tempfile::tempdir().unwrap();
+
+    std::env::set_var("HOME", home.path());
+    assert!(
+        db::try_open(cwd.path()).unwrap().is_none(),
+        "expected no store when cwd is unbound and TD_PROJECT is unset"
+    );
+}
+
+#[test]
+fn bootstrap_from_peer_creates_openable_store() {
+    use yatd::db;
+
+    let home_a = tempfile::tempdir().unwrap();
+    let cwd_a = tempfile::tempdir().unwrap();
+    std::env::set_var("HOME", home_a.path());
+    let source = db::init(cwd_a.path(), "shared").unwrap();
+
+    let id = db::gen_id();
+    source
+        .apply_and_persist(|doc| {
+            let tasks = doc.get_map("tasks");
+            let task = db::insert_task_map(&tasks, &id)?;
+            task.insert("title", "bootstrapped task")?;
+            task.insert("description", "")?;
+            task.insert("type", "task")?;
+            task.insert("priority", "medium")?;
+            task.insert("status", "open")?;
+            task.insert("effort", "medium")?;
+            task.insert("parent", "")?;
+            task.insert("created_at", db::now_utc())?;
+            task.insert("updated_at", db::now_utc())?;
+            task.insert("deleted_at", "")?;
+            task.insert_container("labels", loro::LoroMap::new())?;
+            task.insert_container("blockers", loro::LoroMap::new())?;
+            task.insert_container("logs", loro::LoroMap::new())?;
+            Ok(())
+        })
+        .unwrap();
+
+    let full_delta = source
+        .doc()
+        .export(ExportMode::updates(&VersionVector::default()))
+        .unwrap();
+
+    let home_b = tempfile::tempdir().unwrap();
+    let root_b = home_b.path().join(".local/share/td");
+    let store_b = db::Store::bootstrap_from_peer(&root_b, "shared", &full_delta).unwrap();
+
+    assert_eq!(store_b.project_name(), "shared");
+    assert!(
+        root_b.join("projects/shared/base.loro").exists(),
+        "bootstrap should persist a base snapshot"
+    );
+
+    let reopened = db::Store::open(&root_b, "shared").unwrap();
+    let tasks = reopened.list_tasks().unwrap();
+    assert_eq!(tasks.len(), 1);
+    assert_eq!(tasks[0].title, "bootstrapped task");
+}
+
+#[test]
+fn bootstrap_from_peer_rejects_missing_project_id() {
+    use yatd::db;
+
+    let doc = LoroDoc::new();
+    doc.get_map("tasks");
+    let meta = doc.get_map("meta");
+    meta.insert("schema_version", 1i64).unwrap();
+    doc.commit();
+
+    let delta = doc
+        .export(ExportMode::updates(&VersionVector::default()))
+        .unwrap();
+
+    let home = tempfile::tempdir().unwrap();
+    let root = home.path().join(".local/share/td");
+    let err = db::Store::bootstrap_from_peer(&root, "shared", &delta).unwrap_err();
+
+    assert!(
+        err.to_string()
+            .contains("missing required project identity"),
+        "unexpected error: {err:#}"
+    );
+    assert!(
+        !root.join("projects/shared/base.loro").exists(),
+        "bootstrap should not persist snapshot for invalid peer doc"
+    );
+}