Add SyncAll: sync all shared projects when neither peer has one selected

Amolith created

When both peers run 'td sync' without a project selected (no binding,
no --project, no TD_PROJECT), instead of bailing with 'both peers are
in bootstrap mode', enumerate all local projects on both sides, compute
the intersection by project_id, and sync each shared project.

Protocol change: replace the Bootstrap handshake (for the 'no project
selected' case) with a new SyncAll variant carrying a manifest of every
local project (name, id, version_vector). Both sides compute the
intersection by project_id, agree on an ordering (sort by project_id),
then loop over the shared set exchanging deltas per-project exactly like
the existing single-project Sync path.

Fallback cases handled:
- SyncAll peer + Sync peer: bootstrap from the peer that has a project
  selected, identical to the old Bootstrap path.
- SyncAll + Bootstrap (old td): bail with an upgrade hint.
- No intersection: friendly no-op message.
- Both have zero projects: no intersection, friendly no-op.
- Name mismatch for same project_id: sync anyway, print a warning.

Also adds:
- db::list_projects_in(root) - root-explicit helper used by build_local_manifest
- db::Store::project_id() - exposes read_project_id_from_doc via Store
- db::bootstrap_sync_at(data_root, cwd, project, delta) - HOME-free
  bootstrap used from async sync code where HOME may vary by peer

Tests: sync_all_exchanges_shared_projects, sync_all_no_intersection_is_noop

Change summary

Cargo.lock        |  11 +
Cargo.toml        |   1 
src/cmd/sync.rs   | 436 +++++++++++++++++++++++++++++++++++++++---------
src/db.rs         |  70 +++++++
tests/cli_sync.rs | 184 ++++++++++++++++++++
5 files changed, 619 insertions(+), 83 deletions(-)

Detailed changes

Cargo.lock πŸ”—

@@ -1029,6 +1029,16 @@ dependencies = [
  "percent-encoding",
 ]
 
+[[package]]
+name = "fs2"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213"
+dependencies = [
+ "libc",
+ "winapi",
+]
+
 [[package]]
 name = "futures"
 version = "0.3.32"
@@ -3588,6 +3598,7 @@ dependencies = [
  "chrono",
  "clap",
  "comfy-table",
+ "fs2",
  "loro",
  "magic-wormhole",
  "predicates",

Cargo.toml πŸ”—

@@ -19,6 +19,7 @@ loro = "1"
 ulid = "1"
 magic-wormhole = "0.7.6"
 tokio = { version = "1.49.0", features = ["rt-multi-thread", "macros"] }
+fs2 = "0.4"
 
 [dev-dependencies]
 assert_cmd = "2"

src/cmd/sync.rs πŸ”—

@@ -4,8 +4,14 @@
 //! compute deltas containing only the ops the other side lacks, then
 //! exchange and import those deltas.  The result is that both docs
 //! converge to the same state without sending duplicate operations.
+//!
+//! When neither peer has a project selected, they exchange a `SyncAll`
+//! handshake carrying a manifest of all their local projects.  Each side
+//! computes the intersection by `project_id` and syncs every shared
+//! project in one wormhole session.
 
 use std::borrow::Cow;
+use std::collections::HashMap;
 use std::path::Path;
 
 use anyhow::{bail, Context, Result};
@@ -14,6 +20,7 @@ use magic_wormhole::{AppConfig, AppID, Code, MailboxConnection, Wormhole};
 use serde::{Deserialize, Serialize};
 
 use crate::db;
+use crate::db::PROJECTS_DIR;
 
 /// Custom AppID scoping our wormhole traffic away from other protocols.
 const APP_ID: &str = "td.sync.v1";
@@ -21,6 +28,15 @@ const APP_ID: &str = "td.sync.v1";
 /// Number of random words in the generated wormhole code.
 const CODE_WORD_COUNT: usize = 2;
 
+/// One entry in a [`SyncAll`] manifest: name, stable id, and current VV.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ProjectEntry {
+    pub project_name: String,
+    pub project_id: String,
+    #[serde(with = "vv_serde")]
+    pub version_vector: VersionVector,
+}
+
 /// Handshake message exchanged before the delta payload.
 #[derive(Debug, Serialize, Deserialize)]
 #[serde(tag = "mode")]
@@ -39,6 +55,9 @@ enum SyncHandshake {
         #[serde(with = "vv_serde")]
         version_vector: VersionVector,
     },
+    /// Sent when no project is selected locally.  Carries a manifest of every
+    /// local project so the peer can compute the intersection.
+    SyncAll { projects: Vec<ProjectEntry> },
 }
 
 /// Serde adapter for `VersionVector` using its postcard `encode()`/`decode()`.
@@ -56,7 +75,8 @@ mod vv_serde {
     }
 }
 
-/// Outcome of a sync exchange, returned by [`exchange`].
+/// Outcome of a single-project sync exchange, returned by [`exchange`] and
+/// [`sync_all_exchange`].
 pub struct SyncReport {
     pub sent_bytes: usize,
     pub received_bytes: usize,
@@ -71,11 +91,31 @@ pub fn wormhole_config() -> AppConfig<serde_json::Value> {
     }
 }
 
+/// Import a delta into a store and persist it.
+///
+/// Commits the document and saves the raw delta for storage. Returns true
+/// if any changes were imported (delta was non-empty).
+fn import_and_persist(store: &db::Store, delta: &[u8]) -> Result<bool> {
+    if delta.is_empty() {
+        return Ok(false);
+    }
+    store
+        .doc()
+        .import(delta)
+        .context("failed to import delta")?;
+    store.doc().commit();
+    store.save_raw_delta(delta)?;
+    Ok(true)
+}
+
 /// Run the sync protocol over an already-established wormhole.
 ///
 /// Both sides call this concurrently.  The protocol is symmetric: each
 /// peer sends its version vector, receives the other's, computes a
 /// minimal delta, sends it, receives the peer's delta, and imports it.
+///
+/// If the peer sends `SyncAll` (no project selected on their end), the local
+/// project is treated as a bootstrap source and a full delta is sent.
 pub async fn exchange(store: &db::Store, mut wormhole: Wormhole) -> Result<SyncReport> {
     let my_handshake = SyncHandshake::Sync {
         project_name: store.project_name().to_string(),
@@ -97,7 +137,9 @@ pub async fn exchange(store: &db::Store, mut wormhole: Wormhole) -> Result<SyncR
             "peer sent incompatible handshake (are both sides running the same version of td?)",
         )?;
 
-    let their_vv = match &their_handshake {
+    // Determine the peer's version vector, validating project identity when
+    // both sides have a project selected.
+    let their_vv: VersionVector = match &their_handshake {
         SyncHandshake::Sync {
             project_name,
             project_id,
@@ -109,7 +151,9 @@ pub async fn exchange(store: &db::Store, mut wormhole: Wormhole) -> Result<SyncR
                     project_id,
                     ..
                 } => (project_name, project_id),
-                SyncHandshake::Bootstrap { .. } => unreachable!("sync exchange always uses Sync"),
+                SyncHandshake::Bootstrap { .. } | SyncHandshake::SyncAll { .. } => {
+                    unreachable!("exchange always sends Sync")
+                }
             };
             if my_project_id != project_id {
                 let _ = wormhole.close().await;
@@ -121,15 +165,26 @@ pub async fn exchange(store: &db::Store, mut wormhole: Wormhole) -> Result<SyncR
                     project_id,
                 );
             }
-            version_vector
+            version_vector.clone()
+        }
+        // Peer has no project; treat as a bootstrap request and export everything.
+        SyncHandshake::Bootstrap { version_vector } => version_vector.clone(),
+        SyncHandshake::SyncAll { projects } => {
+            // Peer has no project selected but sent their manifest.
+            // Find our project in their manifest to get their VV for us.
+            let my_id = read_project_id(store)?;
+            projects
+                .iter()
+                .find(|p| p.project_id == my_id)
+                .map(|p| p.version_vector.clone())
+                .unwrap_or_default()
         }
-        SyncHandshake::Bootstrap { version_vector } => version_vector,
     };
 
     // --- Phase 2: compute and exchange deltas ---
     let my_delta = store
         .doc()
-        .export(ExportMode::updates(their_vv))
+        .export(ExportMode::updates(&their_vv))
         .context("failed to export delta for peer")?;
 
     wormhole
@@ -145,17 +200,8 @@ pub async fn exchange(store: &db::Store, mut wormhole: Wormhole) -> Result<SyncR
     wormhole.close().await.context("failed to close wormhole")?;
 
     // --- Phase 3: import the peer's delta locally ---
-    let imported = if !their_delta.is_empty() {
-        store
-            .doc()
-            .import(&their_delta)
-            .context("failed to import peer delta")?;
-        store.doc().commit();
-        store.save_raw_delta(&their_delta)?;
-        true
-    } else {
-        false
-    };
+    let imported =
+        import_and_persist(store, &their_delta).context("failed to import peer delta")?;
 
     Ok(SyncReport {
         sent_bytes: my_delta.len(),
@@ -164,48 +210,56 @@ pub async fn exchange(store: &db::Store, mut wormhole: Wormhole) -> Result<SyncR
     })
 }
 
-pub fn run(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
-    let rt = tokio::runtime::Runtime::new().context("failed to create async runtime")?;
-    rt.block_on(run_async(root, code, json))
-}
-
-async fn run_async(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
-    let maybe_store = db::try_open(root)?;
-    let c = crate::color::stderr_theme();
-
-    let wormhole = connect_wormhole(code, json, c).await?;
-
-    let (store, report) = if let Some(store) = maybe_store {
-        if !json {
-            eprintln!("{}wormhole:{} connected, syncing...", c.blue, c.reset);
-        }
-        let report = exchange(&store, wormhole).await?;
-        (store, report)
-    } else {
-        if !json {
-            eprintln!(
-                "{}wormhole:{} connected, bootstrapping from peer...",
-                c.blue, c.reset
-            );
-        }
-        bootstrap_exchange(root, wormhole).await?
-    };
-
-    print_sync_report(&store, &report, json, c)?;
-
-    Ok(())
+/// Build a manifest of all local projects using the given data root.
+///
+/// Each entry carries the project name, stable id, and current version vector.
+/// Using an explicit `data_root` (rather than `HOME`) makes this safe to call
+/// from async contexts where `HOME` may vary between peers.
+pub fn build_local_manifest(data_root: &Path) -> Result<Vec<ProjectEntry>> {
+    let names = db::list_projects_in(data_root)?;
+    let mut entries = Vec::with_capacity(names.len());
+    for name in names {
+        let store = db::Store::open(data_root, &name)?;
+        let project_id = store.project_id()?;
+        let version_vector = store.doc().oplog_vv();
+        entries.push(ProjectEntry {
+            project_name: name,
+            project_id,
+            version_vector,
+        });
+    }
+    Ok(entries)
 }
 
-async fn bootstrap_exchange(
-    root: &Path,
+/// Run the SyncAll protocol over an already-established wormhole.
+///
+/// Sends a `SyncAll` handshake carrying `local_manifest`, then handles the
+/// three possible responses:
+///
+/// - **`Sync`** β€” peer has a project selected; bootstrap that project from
+///   them (identical semantics to the old single-peer bootstrap path).
+/// - **`SyncAll`** β€” peer also has no project selected; compute the
+///   intersection by `project_id`, sync each shared project in order, and
+///   return one `(Store, SyncReport)` per synced project.  An empty
+///   intersection is not an error; an empty `Vec` is returned.
+/// - **`Bootstrap`** β€” peer is running an older td; bail with an upgrade hint.
+///
+/// `cwd` is only used when bootstrapping a brand-new project from a `Sync`
+/// peer (to create the directory binding).  It is ignored in the `SyncAll`
+/// case where all projects already exist locally.
+pub async fn sync_all_exchange(
+    cwd: &Path,
+    data_root: &Path,
+    local_manifest: Vec<ProjectEntry>,
     mut wormhole: Wormhole,
-) -> Result<(db::Store, SyncReport)> {
+) -> Result<Vec<(db::Store, SyncReport)>> {
+    // Send our manifest.  Clone so we can still use `local_manifest` below.
     wormhole
-        .send_json(&SyncHandshake::Bootstrap {
-            version_vector: VersionVector::default(),
+        .send_json(&SyncHandshake::SyncAll {
+            projects: local_manifest.clone(),
         })
         .await
-        .context("failed to send bootstrap handshake")?;
+        .context("failed to send SyncAll handshake")?;
 
     let their_handshake: SyncHandshake = wormhole
         .receive_json::<SyncHandshake>()
@@ -215,39 +269,264 @@ async fn bootstrap_exchange(
             "peer sent incompatible handshake (are both sides running the same version of td?)",
         )?;
 
-    let project_name = match their_handshake {
-        SyncHandshake::Sync { project_name, .. } => project_name,
+    match their_handshake {
+        SyncHandshake::Sync {
+            project_name,
+            project_id,
+            version_vector,
+        } => {
+            // Peer has a specific project selected.
+            // Validate the peer's project name to prevent path traversal.
+            db::validate_project_name(&project_name)?;
+            let project_dir = data_root.join(PROJECTS_DIR).join(&project_name);
+
+            if project_dir.exists() {
+                // Project exists locally: open it, verify ID matches, then sync normally.
+                let store = db::Store::open(data_root, &project_name).with_context(|| {
+                    format!("failed to open existing project '{}'", project_name)
+                })?;
+                let my_id = read_project_id(&store)?;
+                if my_id != project_id {
+                    bail!(
+                        "project identity mismatch: local '{}' ({}) vs peer '{}' ({}). \
+                         Remove the accidentally initted local copy and bootstrap with 'td sync'",
+                        project_name,
+                        my_id,
+                        project_name,
+                        project_id
+                    );
+                }
+
+                // Sync the existing project.
+                let my_delta = store
+                    .doc()
+                    .export(ExportMode::updates(&version_vector))
+                    .context("failed to export delta for peer")?;
+
+                wormhole
+                    .send(my_delta.clone())
+                    .await
+                    .context("failed to send delta")?;
+
+                let their_delta = wormhole
+                    .receive()
+                    .await
+                    .context("failed to receive delta from peer")?;
+
+                wormhole.close().await.context("failed to close wormhole")?;
+
+                let imported = import_and_persist(&store, &their_delta)
+                    .context("failed to import peer's delta")?;
+
+                let report = SyncReport {
+                    sent_bytes: my_delta.len(),
+                    received_bytes: their_delta.len(),
+                    imported,
+                };
+                Ok(vec![(store, report)])
+            } else {
+                // Project doesn't exist: bootstrap from peer.
+                // The peer (who sent Sync) expects to send their full state and be done.
+                wormhole
+                    .send(Vec::new())
+                    .await
+                    .context("failed to send empty bootstrap delta")?;
+                let their_delta = wormhole
+                    .receive()
+                    .await
+                    .context("failed to receive bootstrap delta from peer")?;
+
+                if their_delta.is_empty() {
+                    bail!("peer sent empty bootstrap delta");
+                }
+
+                wormhole.close().await.context("failed to close wormhole")?;
+
+                // Don't bind cwd when bootstrapping from SyncAll→Sync fallback —
+                // the user didn't intend to bind their current directory to this project.
+                let store =
+                    db::bootstrap_sync_at(data_root, cwd, &project_name, &their_delta, false)?;
+
+                let report = SyncReport {
+                    sent_bytes: 0,
+                    received_bytes: their_delta.len(),
+                    imported: true,
+                };
+                Ok(vec![(store, report)])
+            }
+        }
+
         SyncHandshake::Bootstrap { .. } => {
+            // Peer is running an older td that does not know about SyncAll.
+            // The peer will have already failed when it received our SyncAll
+            // handshake, so both sides are closing.
             let _ = wormhole.close().await;
             bail!(
-                "both peers are in bootstrap mode. Run 'td project init <project>' on one machine first, then run 'td sync' on the other"
+                "peer is running an older version of td that does not support SyncAll. \
+                 Upgrade td on both machines and try again"
             );
         }
-    };
 
-    wormhole
-        .send(Vec::new())
-        .await
-        .context("failed to send bootstrap delta")?;
+        SyncHandshake::SyncAll {
+            projects: their_projects,
+        } => {
+            // Both sides have no project selected.  Compute intersection by
+            // project_id and sync each shared project.
+            sync_shared_projects(data_root, local_manifest, their_projects, wormhole).await
+        }
+    }
+}
 
-    let their_delta = wormhole
-        .receive()
-        .await
-        .context("failed to receive bootstrap delta from peer")?;
+/// Exchange deltas for every project whose `project_id` appears on both sides.
+///
+/// Both sides sort the intersection by `project_id` (deterministic ordering),
+/// then for each project: send my delta, receive theirs, import.  This mirrors
+/// the single-project [`exchange`] pattern β€” each side sends before it receives,
+/// so the relay buffers the message and neither side blocks waiting for the
+/// other to read first.
+async fn sync_shared_projects(
+    data_root: &Path,
+    local_projects: Vec<ProjectEntry>,
+    their_projects: Vec<ProjectEntry>,
+    mut wormhole: Wormhole,
+) -> Result<Vec<(db::Store, SyncReport)>> {
+    // Wrap the body in an async block so we can ensure close() runs on all paths.
+    let result: Result<Vec<(db::Store, SyncReport)>> = async {
+        // Build a fast lookup from project_id β†’ their entry.
+        let their_by_id: HashMap<&str, &ProjectEntry> = their_projects
+            .iter()
+            .map(|p| (p.project_id.as_str(), p))
+            .collect();
+
+        // Collect the intersection, sorted by project_id for a deterministic wire
+        // ordering that both peers independently agree on.
+        let mut shared: Vec<(&ProjectEntry, &ProjectEntry)> = local_projects
+            .iter()
+            .filter_map(|mine| {
+                their_by_id
+                    .get(mine.project_id.as_str())
+                    .map(|theirs| (mine, *theirs))
+            })
+            .collect();
+        shared.sort_by(|a, b| a.0.project_id.cmp(&b.0.project_id));
+
+        if shared.is_empty() {
+            return Ok(vec![]);
+        }
 
-    wormhole.close().await.context("failed to close wormhole")?;
+        // For each project: open store, export delta, send/receive, import.
+        // Stores are opened one at a time to avoid exhausting file descriptors
+        // when syncing many projects.
+        let mut results: Vec<(db::Store, SyncReport)> = Vec::with_capacity(shared.len());
+        for (mine, theirs) in shared {
+            if mine.project_name != theirs.project_name {
+                eprintln!(
+                    "warning: project name mismatch for id {}: \
+                     local '{}', peer '{}' β€” syncing by id",
+                    mine.project_id, mine.project_name, theirs.project_name,
+                );
+            }
+            let store = db::Store::open(data_root, &mine.project_name)
+                .with_context(|| format!("failed to open project '{}'", mine.project_name))?;
+            let my_delta = store
+                .doc()
+                .export(ExportMode::updates(&theirs.version_vector))
+                .with_context(|| {
+                    format!("failed to export delta for '{}'", store.project_name())
+                })?;
+
+            wormhole
+                .send(my_delta.clone())
+                .await
+                .with_context(|| format!("failed to send delta for '{}'", store.project_name()))?;
+
+            let their_delta = wormhole.receive().await.with_context(|| {
+                format!("failed to receive delta for '{}'", store.project_name())
+            })?;
+
+            let imported = import_and_persist(&store, &their_delta).with_context(|| {
+                format!("failed to import delta for '{}'", store.project_name())
+            })?;
+
+            results.push((
+                store,
+                SyncReport {
+                    sent_bytes: my_delta.len(),
+                    received_bytes: their_delta.len(),
+                    imported,
+                },
+            ));
+        }
 
-    if their_delta.is_empty() {
-        bail!("peer sent empty bootstrap delta");
+        Ok(results)
     }
+    .await;
 
-    let store = db::bootstrap_sync(root, &project_name, &their_delta)?;
-    let report = SyncReport {
-        sent_bytes: 0,
-        received_bytes: their_delta.len(),
-        imported: true,
-    };
-    Ok((store, report))
+    // Always close the wormhole, but don't let close errors mask the original result.
+    let _ = wormhole.close().await;
+    result
+}
+
+pub fn run(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
+    let rt = tokio::runtime::Runtime::new().context("failed to create async runtime")?;
+    rt.block_on(run_async(root, code, json))
+}
+
+async fn run_async(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
+    let maybe_store = db::try_open(root)?;
+    let c = crate::color::stderr_theme();
+
+    let wormhole = connect_wormhole(code, json, c).await?;
+
+    if let Some(store) = maybe_store {
+        // A project is selected: single-project sync.
+        if !json {
+            eprintln!("{}wormhole:{} connected, syncing...", c.blue, c.reset);
+        }
+        let report = exchange(&store, wormhole).await?;
+        print_sync_report(&store, &report, json, c)?;
+    } else {
+        // No project selected: enumerate local projects and attempt SyncAll.
+        let data_root = db::data_root()?;
+        let local_manifest = build_local_manifest(&data_root)?;
+
+        if !json {
+            if local_manifest.is_empty() {
+                eprintln!(
+                    "{}wormhole:{} connected, bootstrapping from peer...",
+                    c.blue, c.reset
+                );
+            } else {
+                eprintln!(
+                    "{}wormhole:{} connected, syncing all shared projects...",
+                    c.blue, c.reset
+                );
+            }
+        }
+
+        let results = sync_all_exchange(root, &data_root, local_manifest, wormhole).await?;
+
+        if results.is_empty() {
+            if json {
+                println!(
+                    "{}",
+                    serde_json::to_string(
+                        &serde_json::json!({"synced": false, "reason": "no_shared_projects"})
+                    )?
+                );
+            } else {
+                eprintln!("{}info:{} no shared projects to sync", c.blue, c.reset);
+            }
+        } else {
+            // Iterate by value to drop each Store (and its file handle) immediately
+            // after printing, rather than holding all handles until the loop ends.
+            for (store, report) in results {
+                print_sync_report(&store, &report, json, c)?;
+            }
+        }
+    }
+
+    Ok(())
 }
 
 async fn connect_wormhole(
@@ -330,10 +609,5 @@ fn print_sync_report(
 
 /// Read the stable project identity from the doc's root meta map.
 fn read_project_id(store: &db::Store) -> Result<String> {
-    let root = serde_json::to_value(store.doc().get_deep_value())?;
-    root.get("meta")
-        .and_then(|m| m.get("project_id"))
-        .and_then(|v| v.as_str())
-        .map(str::to_owned)
-        .ok_or_else(|| anyhow::anyhow!("missing meta.project_id in project doc"))
+    store.project_id()
 }

src/db.rs πŸ”—

@@ -7,11 +7,13 @@ use std::fmt;
 use std::fs::{self, File, OpenOptions};
 use std::io::Write;
 use std::path::{Path, PathBuf};
+
+use fs2::FileExt;
 use ulid::Ulid;
 
 pub const PROJECT_ENV: &str = "TD_PROJECT";
 
-const PROJECTS_DIR: &str = "projects";
+pub(crate) const PROJECTS_DIR: &str = "projects";
 const CHANGES_DIR: &str = "changes";
 const BINDINGS_FILE: &str = "bindings.json";
 const BASE_FILE: &str = "base.loro";
@@ -520,6 +522,11 @@ impl Store {
         Ok(tasks)
     }
 
+    /// Return the stable project identity stored in `meta.project_id`.
+    pub fn project_id(&self) -> Result<String> {
+        read_project_id_from_doc(&self.doc)
+    }
+
     pub fn schema_version(&self) -> Result<u32> {
         migrate::read_schema_version(&self.doc)
     }
@@ -609,8 +616,67 @@ pub fn bootstrap_sync(cwd: &Path, project: &str, delta: &[u8]) -> Result<Store>
     Ok(store)
 }
 
+/// Bootstrap a project from a peer delta using an explicit data root.
+///
+/// Unlike [`bootstrap_sync`], this function does not consult `HOME` and is
+/// therefore safe to call from async contexts where `HOME` may vary by peer.
+///
+/// If `bind_cwd` is true, the given working directory is bound to the new
+/// project. Pass false when bootstrapping from a SyncAll context to avoid
+/// unexpectedly binding directories like the user's home.
+///
+/// Uses exclusive file locking to prevent race conditions when multiple
+/// concurrent sync operations create projects or modify bindings.
+pub fn bootstrap_sync_at(
+    data_root: &Path,
+    cwd: &Path,
+    project: &str,
+    delta: &[u8],
+    bind_cwd: bool,
+) -> Result<Store> {
+    fs::create_dir_all(data_root.join(PROJECTS_DIR))?;
+    validate_project_name(project)?;
+
+    // Exclusive lock prevents races when concurrent syncs create the same project
+    // or modify bindings simultaneously.
+    let lock_path = data_root.join(".bindings.lock");
+    let lock_file = OpenOptions::new()
+        .create(true)
+        .truncate(false)
+        .write(true)
+        .open(&lock_path)
+        .with_context(|| format!("failed to open lock file '{}'", lock_path.display()))?;
+    lock_file
+        .lock_exclusive()
+        .context("failed to acquire exclusive lock on bindings")?;
+
+    // Now holding the lock: create project and optionally update bindings atomically.
+    let store = Store::bootstrap_from_peer(data_root, project, delta)?;
+
+    if bind_cwd {
+        let canonical = fs::canonicalize(cwd)
+            .with_context(|| format!("failed to canonicalize '{}'", cwd.display()))?;
+        let mut bindings = load_bindings(data_root)?;
+        bindings
+            .bindings
+            .insert(canonical.to_string_lossy().to_string(), project.to_string());
+        save_bindings(data_root, &bindings)?;
+    }
+
+    // Lock is released when lock_file is dropped.
+    Ok(store)
+}
+
 pub fn list_projects() -> Result<Vec<String>> {
     let root = data_root()?;
+    list_projects_in(&root)
+}
+
+/// List project names rooted at an explicit data directory.
+///
+/// Unlike [`list_projects`], this does not consult `HOME` and is therefore
+/// safe to call from async contexts where `HOME` may vary between peers.
+pub(crate) fn list_projects_in(root: &Path) -> Result<Vec<String>> {
     let mut out = Vec::new();
     let dir = root.join(PROJECTS_DIR);
     if !dir.exists() {
@@ -818,7 +884,7 @@ fn is_prefix_path(prefix: &Path, target: &Path) -> bool {
     }
 }
 
-fn validate_project_name(name: &str) -> Result<()> {
+pub fn validate_project_name(name: &str) -> Result<()> {
     if name.is_empty() {
         bail!("project name cannot be empty");
     }

tests/cli_sync.rs πŸ”—

@@ -271,3 +271,187 @@ fn bootstrap_from_peer_rejects_missing_project_id() {
         "bootstrap should not persist snapshot for invalid peer doc"
     );
 }
+
+/// Helper: insert a minimal valid task into a doc via apply_and_persist.
+fn insert_task(store: &yatd::db::Store, title: &str) {
+    let id = yatd::db::gen_id();
+    store
+        .apply_and_persist(|doc| {
+            let tasks = doc.get_map("tasks");
+            let task = yatd::db::insert_task_map(&tasks, &id)?;
+            task.insert("title", title)?;
+            task.insert("description", "")?;
+            task.insert("type", "task")?;
+            task.insert("priority", "medium")?;
+            task.insert("status", "open")?;
+            task.insert("effort", "medium")?;
+            task.insert("parent", "")?;
+            task.insert("created_at", yatd::db::now_utc())?;
+            task.insert("updated_at", yatd::db::now_utc())?;
+            task.insert("deleted_at", "")?;
+            task.insert_container("labels", loro::LoroMap::new())?;
+            task.insert_container("blockers", loro::LoroMap::new())?;
+            task.insert_container("logs", loro::LoroMap::new())?;
+            Ok(())
+        })
+        .unwrap();
+}
+
+/// Both peers have the same project (same project_id) with no directory
+/// binding/selection.  SyncAll should discover the shared project and converge
+/// both stores to the same state.
+#[test]
+fn sync_all_exchanges_shared_projects() {
+    use std::fs;
+    use yatd::cmd::sync::{build_local_manifest, sync_all_exchange, wormhole_config};
+    use yatd::db;
+
+    let home_a = tempfile::tempdir().unwrap();
+    let home_b = tempfile::tempdir().unwrap();
+    let cwd_a = tempfile::tempdir().unwrap();
+    let cwd_b = tempfile::tempdir().unwrap();
+
+    let data_root_a = home_a.path().join(".local/share/td");
+    let data_root_b = home_b.path().join(".local/share/td");
+    fs::create_dir_all(data_root_a.join("projects")).unwrap();
+    fs::create_dir_all(data_root_b.join("projects")).unwrap();
+
+    // Peer A: init "shared" and add a task.
+    let store_a = db::Store::init(&data_root_a, "shared").unwrap();
+    insert_task(&store_a, "task from A");
+
+    // Peer B: bootstrap from A's base snapshot (same project_id), add its own task.
+    let proj_b = data_root_b.join("projects/shared");
+    fs::create_dir_all(proj_b.join("changes")).unwrap();
+    fs::copy(
+        data_root_a.join("projects/shared/base.loro"),
+        proj_b.join("base.loro"),
+    )
+    .unwrap();
+    let store_b = db::Store::open(&data_root_b, "shared").unwrap();
+    insert_task(&store_b, "task from B");
+
+    // Build manifests from disk (HOME-free: uses explicit data_root).
+    let manifest_a = build_local_manifest(&data_root_a).unwrap();
+    let manifest_b = build_local_manifest(&data_root_b).unwrap();
+    assert_eq!(manifest_a.len(), 1);
+    assert_eq!(manifest_b.len(), 1);
+    assert_eq!(
+        manifest_a[0].project_id, manifest_b[0].project_id,
+        "both sides must share the same project_id"
+    );
+
+    let rt = tokio::runtime::Runtime::new().unwrap();
+    let (results_a, results_b) = rt.block_on(async {
+        use magic_wormhole::{MailboxConnection, Wormhole};
+
+        let mailbox_a = MailboxConnection::create(wormhole_config(), 2)
+            .await
+            .unwrap();
+        let code = mailbox_a.code().clone();
+        let mailbox_b = MailboxConnection::connect(wormhole_config(), code, false)
+            .await
+            .unwrap();
+        let (wormhole_a, wormhole_b) =
+            tokio::try_join!(Wormhole::connect(mailbox_a), Wormhole::connect(mailbox_b)).unwrap();
+
+        tokio::try_join!(
+            sync_all_exchange(cwd_a.path(), &data_root_a, manifest_a, wormhole_a),
+            sync_all_exchange(cwd_b.path(), &data_root_b, manifest_b, wormhole_b),
+        )
+        .unwrap()
+    });
+
+    assert_eq!(
+        results_a.len(),
+        1,
+        "A should have synced exactly one project"
+    );
+    assert_eq!(
+        results_b.len(),
+        1,
+        "B should have synced exactly one project"
+    );
+
+    let (store_a_synced, report_a) = &results_a[0];
+    let (store_b_synced, report_b) = &results_b[0];
+
+    // Both peers should have imported: A has "task A", B starts from A's empty
+    // base then adds "task B". After sync, both have distinct changes to exchange.
+    assert!(report_a.imported, "A should have imported B's task");
+    assert!(report_b.imported, "B should have imported A's task");
+
+    let a_tasks = store_a_synced.list_tasks().unwrap();
+    let b_tasks = store_b_synced.list_tasks().unwrap();
+    assert_eq!(a_tasks.len(), 2, "A should have 2 tasks after SyncAll");
+    assert_eq!(b_tasks.len(), 2, "B should have 2 tasks after SyncAll");
+
+    let a_titles: Vec<&str> = a_tasks.iter().map(|t| t.title.as_str()).collect();
+    let b_titles: Vec<&str> = b_tasks.iter().map(|t| t.title.as_str()).collect();
+    assert!(a_titles.contains(&"task from A"));
+    assert!(a_titles.contains(&"task from B"));
+    assert!(b_titles.contains(&"task from A"));
+    assert!(b_titles.contains(&"task from B"));
+}
+
+/// Both peers have projects but no project_ids in common.  SyncAll should
+/// complete without error and return an empty result on both sides.
+#[test]
+fn sync_all_no_intersection_is_noop() {
+    use std::fs;
+    use yatd::cmd::sync::{build_local_manifest, sync_all_exchange, wormhole_config};
+    use yatd::db;
+
+    let home_a = tempfile::tempdir().unwrap();
+    let home_b = tempfile::tempdir().unwrap();
+    let cwd_a = tempfile::tempdir().unwrap();
+    let cwd_b = tempfile::tempdir().unwrap();
+
+    let data_root_a = home_a.path().join(".local/share/td");
+    let data_root_b = home_b.path().join(".local/share/td");
+    fs::create_dir_all(data_root_a.join("projects")).unwrap();
+    fs::create_dir_all(data_root_b.join("projects")).unwrap();
+
+    // A has "alpha", B has "bravo" β€” independently initialised, different project_ids.
+    let _ = db::Store::init(&data_root_a, "alpha").unwrap();
+    let _ = db::Store::init(&data_root_b, "bravo").unwrap();
+
+    let manifest_a = build_local_manifest(&data_root_a).unwrap();
+    let manifest_b = build_local_manifest(&data_root_b).unwrap();
+    assert_eq!(manifest_a.len(), 1);
+    assert_eq!(manifest_b.len(), 1);
+    assert_ne!(
+        manifest_a[0].project_id, manifest_b[0].project_id,
+        "projects must have different ids"
+    );
+
+    let rt = tokio::runtime::Runtime::new().unwrap();
+    let (results_a, results_b) = rt.block_on(async {
+        use magic_wormhole::{MailboxConnection, Wormhole};
+
+        let mailbox_a = MailboxConnection::create(wormhole_config(), 2)
+            .await
+            .unwrap();
+        let code = mailbox_a.code().clone();
+        let mailbox_b = MailboxConnection::connect(wormhole_config(), code, false)
+            .await
+            .unwrap();
+        let (wormhole_a, wormhole_b) =
+            tokio::try_join!(Wormhole::connect(mailbox_a), Wormhole::connect(mailbox_b)).unwrap();
+
+        tokio::try_join!(
+            sync_all_exchange(cwd_a.path(), &data_root_a, manifest_a, wormhole_a),
+            sync_all_exchange(cwd_b.path(), &data_root_b, manifest_b, wormhole_b),
+        )
+        .unwrap()
+    });
+
+    assert!(
+        results_a.is_empty(),
+        "A: no shared projects, result should be empty"
+    );
+    assert!(
+        results_b.is_empty(),
+        "B: no shared projects, result should be empty"
+    );
+}