From 664e7796293822e00a7bf57a2cef71ebbb2c386e Mon Sep 17 00:00:00 2001 From: Amolith Date: Mon, 2 Mar 2026 15:55:38 -0700 Subject: [PATCH] Add SyncAll: sync all shared projects when neither peer has one selected When both peers run 'td sync' without a project selected (no binding, no --project, no TD_PROJECT), instead of bailing with 'both peers are in bootstrap mode', enumerate all local projects on both sides, compute the intersection by project_id, and sync each shared project. Protocol change: replace the Bootstrap handshake (for the 'no project selected' case) with a new SyncAll variant carrying a manifest of every local project (name, id, version_vector). Both sides compute the intersection by project_id, agree on an ordering (sort by project_id), then loop over the shared set exchanging deltas per-project exactly like the existing single-project Sync path. Fallback cases handled: - SyncAll peer + Sync peer: bootstrap from the peer that has a project selected, identical to the old Bootstrap path. - SyncAll + Bootstrap (old td): bail with an upgrade hint. - No intersection: friendly no-op message. - Both have zero projects: no intersection, friendly no-op. - Name mismatch for same project_id: sync anyway, print a warning. 
Also adds: - db::list_projects_in(root) - root-explicit helper used by build_local_manifest - db::Store::project_id() - exposes read_project_id_from_doc via Store - db::bootstrap_sync_at(data_root, cwd, project, delta) - HOME-free bootstrap used from async sync code where HOME may vary by peer Tests: sync_all_exchanges_shared_projects, sync_all_no_intersection_is_noop --- Cargo.lock | 11 ++ Cargo.toml | 1 + src/cmd/sync.rs | 436 +++++++++++++++++++++++++++++++++++++--------- src/db.rs | 70 +++++++- tests/cli_sync.rs | 184 +++++++++++++++++++ 5 files changed, 619 insertions(+), 83 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 69fdbc747e8db1a89b752db8f6227569f908d2b1..d9a0a446fda65096ca60e035ac0baedfc6439467 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1029,6 +1029,16 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "futures" version = "0.3.32" @@ -3588,6 +3598,7 @@ dependencies = [ "chrono", "clap", "comfy-table", + "fs2", "loro", "magic-wormhole", "predicates", diff --git a/Cargo.toml b/Cargo.toml index 1f19bc57b7660a5a24ec398146a85d69d4e99633..c67908c93b0753fe7a4b305b066fc4bd21fd4436 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ loro = "1" ulid = "1" magic-wormhole = "0.7.6" tokio = { version = "1.49.0", features = ["rt-multi-thread", "macros"] } +fs2 = "0.4" [dev-dependencies] assert_cmd = "2" diff --git a/src/cmd/sync.rs b/src/cmd/sync.rs index 059a8b00f485411f92f35386597d3df8d1419a7b..0d78ad932fe44f55a17c9853da64d10a748a4cdc 100644 --- a/src/cmd/sync.rs +++ b/src/cmd/sync.rs @@ -4,8 +4,14 @@ //! compute deltas containing only the ops the other side lacks, then //! exchange and import those deltas. The result is that both docs //! converge to the same state without sending duplicate operations. 
+//! +//! When neither peer has a project selected, they exchange a `SyncAll` +//! handshake carrying a manifest of all their local projects. Each side +//! computes the intersection by `project_id` and syncs every shared +//! project in one wormhole session. use std::borrow::Cow; +use std::collections::HashMap; use std::path::Path; use anyhow::{bail, Context, Result}; @@ -14,6 +20,7 @@ use magic_wormhole::{AppConfig, AppID, Code, MailboxConnection, Wormhole}; use serde::{Deserialize, Serialize}; use crate::db; +use crate::db::PROJECTS_DIR; /// Custom AppID scoping our wormhole traffic away from other protocols. const APP_ID: &str = "td.sync.v1"; @@ -21,6 +28,15 @@ const APP_ID: &str = "td.sync.v1"; /// Number of random words in the generated wormhole code. const CODE_WORD_COUNT: usize = 2; +/// One entry in a [`SyncAll`] manifest: name, stable id, and current VV. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ProjectEntry { + pub project_name: String, + pub project_id: String, + #[serde(with = "vv_serde")] + pub version_vector: VersionVector, +} + /// Handshake message exchanged before the delta payload. #[derive(Debug, Serialize, Deserialize)] #[serde(tag = "mode")] enum SyncHandshake { @@ -39,6 +55,9 @@ enum SyncHandshake { #[serde(with = "vv_serde")] version_vector: VersionVector, }, + /// Sent when no project is selected locally. Carries a manifest of every + /// local project so the peer can compute the intersection. + SyncAll { projects: Vec<ProjectEntry> }, } /// Serde adapter for `VersionVector` using its postcard `encode()`/`decode()`. @@ -56,7 +75,8 @@ mod vv_serde { } } -/// Outcome of a sync exchange, returned by [`exchange`]. +/// Outcome of a single-project sync exchange, returned by [`exchange`] and +/// [`sync_all_exchange`]. pub struct SyncReport { pub sent_bytes: usize, pub received_bytes: usize, @@ -71,11 +91,31 @@ pub fn wormhole_config() -> AppConfig { } } +/// Import a delta into a store and persist it. 
+/// +/// Commits the document and saves the raw delta for storage. Returns true +/// if any changes were imported (delta was non-empty). +fn import_and_persist(store: &db::Store, delta: &[u8]) -> Result { + if delta.is_empty() { + return Ok(false); + } + store + .doc() + .import(delta) + .context("failed to import delta")?; + store.doc().commit(); + store.save_raw_delta(delta)?; + Ok(true) +} + /// Run the sync protocol over an already-established wormhole. /// /// Both sides call this concurrently. The protocol is symmetric: each /// peer sends its version vector, receives the other's, computes a /// minimal delta, sends it, receives the peer's delta, and imports it. +/// +/// If the peer sends `SyncAll` (no project selected on their end), the local +/// project is treated as a bootstrap source and a full delta is sent. pub async fn exchange(store: &db::Store, mut wormhole: Wormhole) -> Result { let my_handshake = SyncHandshake::Sync { project_name: store.project_name().to_string(), @@ -97,7 +137,9 @@ pub async fn exchange(store: &db::Store, mut wormhole: Wormhole) -> Result Result (project_name, project_id), - SyncHandshake::Bootstrap { .. } => unreachable!("sync exchange always uses Sync"), + SyncHandshake::Bootstrap { .. } | SyncHandshake::SyncAll { .. } => { + unreachable!("exchange always sends Sync") + } }; if my_project_id != project_id { let _ = wormhole.close().await; @@ -121,15 +165,26 @@ pub async fn exchange(store: &db::Store, mut wormhole: Wormhole) -> Result version_vector.clone(), + SyncHandshake::SyncAll { projects } => { + // Peer has no project selected but sent their manifest. + // Find our project in their manifest to get their VV for us. 
+ let my_id = read_project_id(store)?; + projects + .iter() + .find(|p| p.project_id == my_id) + .map(|p| p.version_vector.clone()) + .unwrap_or_default() } - SyncHandshake::Bootstrap { version_vector } => version_vector, }; // --- Phase 2: compute and exchange deltas --- let my_delta = store .doc() - .export(ExportMode::updates(their_vv)) + .export(ExportMode::updates(&their_vv)) .context("failed to export delta for peer")?; wormhole @@ -145,17 +200,8 @@ pub async fn exchange(store: &db::Store, mut wormhole: Wormhole) -> Result Result, json: bool) -> Result<()> { - let rt = tokio::runtime::Runtime::new().context("failed to create async runtime")?; - rt.block_on(run_async(root, code, json)) -} - -async fn run_async(root: &Path, code: Option<&str>, json: bool) -> Result<()> { - let maybe_store = db::try_open(root)?; - let c = crate::color::stderr_theme(); - - let wormhole = connect_wormhole(code, json, c).await?; - - let (store, report) = if let Some(store) = maybe_store { - if !json { - eprintln!("{}wormhole:{} connected, syncing...", c.blue, c.reset); - } - let report = exchange(&store, wormhole).await?; - (store, report) - } else { - if !json { - eprintln!( - "{}wormhole:{} connected, bootstrapping from peer...", - c.blue, c.reset - ); - } - bootstrap_exchange(root, wormhole).await? - }; - - print_sync_report(&store, &report, json, c)?; - - Ok(()) +/// Build a manifest of all local projects using the given data root. +/// +/// Each entry carries the project name, stable id, and current version vector. +/// Using an explicit `data_root` (rather than `HOME`) makes this safe to call +/// from async contexts where `HOME` may vary between peers. 
+pub fn build_local_manifest(data_root: &Path) -> Result<Vec<ProjectEntry>> { + let names = db::list_projects_in(data_root)?; + let mut entries = Vec::with_capacity(names.len()); + for name in names { + let store = db::Store::open(data_root, &name)?; + let project_id = store.project_id()?; + let version_vector = store.doc().oplog_vv(); + entries.push(ProjectEntry { + project_name: name, + project_id, + version_vector, + }); + } + Ok(entries) } -async fn bootstrap_exchange( - root: &Path, +/// Run the SyncAll protocol over an already-established wormhole. +/// +/// Sends a `SyncAll` handshake carrying `local_manifest`, then handles the +/// three possible responses: +/// +/// - **`Sync`** — peer has a project selected; bootstrap that project from +/// them (identical semantics to the old single-peer bootstrap path). +/// - **`SyncAll`** — peer also has no project selected; compute the +/// intersection by `project_id`, sync each shared project in order, and +/// return one `(Store, SyncReport)` per synced project. An empty +/// intersection is not an error; an empty `Vec` is returned. +/// - **`Bootstrap`** — peer is running an older td; bail with an upgrade hint. +/// +/// `cwd` is only used when bootstrapping a brand-new project from a `Sync` +/// peer (to create the directory binding). It is ignored in the `SyncAll` +/// case where all projects already exist locally. +pub async fn sync_all_exchange( + cwd: &Path, + data_root: &Path, + local_manifest: Vec<ProjectEntry>, mut wormhole: Wormhole, -) -> Result<(db::Store, SyncReport)> { +) -> Result<Vec<(db::Store, SyncReport)>> { + // Send our manifest. Clone so we can still use `local_manifest` below. 
wormhole - .send_json(&SyncHandshake::Bootstrap { - version_vector: VersionVector::default(), + .send_json(&SyncHandshake::SyncAll { + projects: local_manifest.clone(), }) .await - .context("failed to send bootstrap handshake")?; + .context("failed to send SyncAll handshake")?; let their_handshake: SyncHandshake = wormhole .receive_json::<SyncHandshake>() .await
+ let my_delta = store + .doc() + .export(ExportMode::updates(&version_vector)) + .context("failed to export delta for peer")?; + + wormhole + .send(my_delta.clone()) + .await + .context("failed to send delta")?; + + let their_delta = wormhole + .receive() + .await + .context("failed to receive delta from peer")?; + + wormhole.close().await.context("failed to close wormhole")?; + + let imported = import_and_persist(&store, &their_delta) + .context("failed to import peer's delta")?; + + let report = SyncReport { + sent_bytes: my_delta.len(), + received_bytes: their_delta.len(), + imported, + }; + Ok(vec![(store, report)]) + } else { + // Project doesn't exist: bootstrap from peer. + // The peer (who sent Sync) expects to send their full state and be done. + wormhole + .send(Vec::new()) + .await + .context("failed to send empty bootstrap delta")?; + let their_delta = wormhole + .receive() + .await + .context("failed to receive bootstrap delta from peer")?; + + if their_delta.is_empty() { + bail!("peer sent empty bootstrap delta"); + } + + wormhole.close().await.context("failed to close wormhole")?; + + // Don't bind cwd when bootstrapping from SyncAll→Sync fallback — + // the user didn't intend to bind their current directory to this project. + let store = + db::bootstrap_sync_at(data_root, cwd, &project_name, &their_delta, false)?; + + let report = SyncReport { + sent_bytes: 0, + received_bytes: their_delta.len(), + imported: true, + }; + Ok(vec![(store, report)]) + } + } + SyncHandshake::Bootstrap { .. } => { + // Peer is running an older td that does not know about SyncAll. + // The peer will have already failed when it received our SyncAll + // handshake, so both sides are closing. let _ = wormhole.close().await; bail!( - "both peers are in bootstrap mode. Run 'td project init ' on one machine first, then run 'td sync' on the other" + "peer is running an older version of td that does not support SyncAll. 
\ + Upgrade td on both machines and try again" ); } - }; - wormhole - .send(Vec::new()) - .await - .context("failed to send bootstrap delta")?; + SyncHandshake::SyncAll { + projects: their_projects, + } => { + // Both sides have no project selected. Compute intersection by + // project_id and sync each shared project. + sync_shared_projects(data_root, local_manifest, their_projects, wormhole).await + } + } +} - let their_delta = wormhole - .receive() - .await - .context("failed to receive bootstrap delta from peer")?; +/// Exchange deltas for every project whose `project_id` appears on both sides. +/// +/// Both sides sort the intersection by `project_id` (deterministic ordering), +/// then for each project: send my delta, receive theirs, import. This mirrors +/// the single-project [`exchange`] pattern — each side sends before it receives, +/// so the relay buffers the message and neither side blocks waiting for the +/// other to read first. +async fn sync_shared_projects( + data_root: &Path, + local_projects: Vec, + their_projects: Vec, + mut wormhole: Wormhole, +) -> Result> { + // Wrap the body in an async block so we can ensure close() runs on all paths. + let result: Result> = async { + // Build a fast lookup from project_id → their entry. + let their_by_id: HashMap<&str, &ProjectEntry> = their_projects + .iter() + .map(|p| (p.project_id.as_str(), p)) + .collect(); + + // Collect the intersection, sorted by project_id for a deterministic wire + // ordering that both peers independently agree on. + let mut shared: Vec<(&ProjectEntry, &ProjectEntry)> = local_projects + .iter() + .filter_map(|mine| { + their_by_id + .get(mine.project_id.as_str()) + .map(|theirs| (mine, *theirs)) + }) + .collect(); + shared.sort_by(|a, b| a.0.project_id.cmp(&b.0.project_id)); + + if shared.is_empty() { + return Ok(vec![]); + } - wormhole.close().await.context("failed to close wormhole")?; + // For each project: open store, export delta, send/receive, import. 
+ // Stores are opened one at a time to avoid exhausting file descriptors + // when syncing many projects. + let mut results: Vec<(db::Store, SyncReport)> = Vec::with_capacity(shared.len()); + for (mine, theirs) in shared { + if mine.project_name != theirs.project_name { + eprintln!( + "warning: project name mismatch for id {}: \ + local '{}', peer '{}' — syncing by id", + mine.project_id, mine.project_name, theirs.project_name, + ); + } + let store = db::Store::open(data_root, &mine.project_name) + .with_context(|| format!("failed to open project '{}'", mine.project_name))?; + let my_delta = store + .doc() + .export(ExportMode::updates(&theirs.version_vector)) + .with_context(|| { + format!("failed to export delta for '{}'", store.project_name()) + })?; + + wormhole + .send(my_delta.clone()) + .await + .with_context(|| format!("failed to send delta for '{}'", store.project_name()))?; + + let their_delta = wormhole.receive().await.with_context(|| { + format!("failed to receive delta for '{}'", store.project_name()) + })?; + + let imported = import_and_persist(&store, &their_delta).with_context(|| { + format!("failed to import delta for '{}'", store.project_name()) + })?; + + results.push(( + store, + SyncReport { + sent_bytes: my_delta.len(), + received_bytes: their_delta.len(), + imported, + }, + )); + } - if their_delta.is_empty() { - bail!("peer sent empty bootstrap delta"); + Ok(results) } + .await; - let store = db::bootstrap_sync(root, &project_name, &their_delta)?; - let report = SyncReport { - sent_bytes: 0, - received_bytes: their_delta.len(), - imported: true, - }; - Ok((store, report)) + // Always close the wormhole, but don't let close errors mask the original result. 
+ let _ = wormhole.close().await; + result +} + +pub fn run(root: &Path, code: Option<&str>, json: bool) -> Result<()> { + let rt = tokio::runtime::Runtime::new().context("failed to create async runtime")?; + rt.block_on(run_async(root, code, json)) +} + +async fn run_async(root: &Path, code: Option<&str>, json: bool) -> Result<()> { + let maybe_store = db::try_open(root)?; + let c = crate::color::stderr_theme(); + + let wormhole = connect_wormhole(code, json, c).await?; + + if let Some(store) = maybe_store { + // A project is selected: single-project sync. + if !json { + eprintln!("{}wormhole:{} connected, syncing...", c.blue, c.reset); + } + let report = exchange(&store, wormhole).await?; + print_sync_report(&store, &report, json, c)?; + } else { + // No project selected: enumerate local projects and attempt SyncAll. + let data_root = db::data_root()?; + let local_manifest = build_local_manifest(&data_root)?; + + if !json { + if local_manifest.is_empty() { + eprintln!( + "{}wormhole:{} connected, bootstrapping from peer...", + c.blue, c.reset + ); + } else { + eprintln!( + "{}wormhole:{} connected, syncing all shared projects...", + c.blue, c.reset + ); + } + } + + let results = sync_all_exchange(root, &data_root, local_manifest, wormhole).await?; + + if results.is_empty() { + if json { + println!( + "{}", + serde_json::to_string( + &serde_json::json!({"synced": false, "reason": "no_shared_projects"}) + )? + ); + } else { + eprintln!("{}info:{} no shared projects to sync", c.blue, c.reset); + } + } else { + // Iterate by value to drop each Store (and its file handle) immediately + // after printing, rather than holding all handles until the loop ends. + for (store, report) in results { + print_sync_report(&store, &report, json, c)?; + } + } + } + + Ok(()) } async fn connect_wormhole( @@ -330,10 +609,5 @@ fn print_sync_report( /// Read the stable project identity from the doc's root meta map. 
fn read_project_id(store: &db::Store) -> Result { - let root = serde_json::to_value(store.doc().get_deep_value())?; - root.get("meta") - .and_then(|m| m.get("project_id")) - .and_then(|v| v.as_str()) - .map(str::to_owned) - .ok_or_else(|| anyhow::anyhow!("missing meta.project_id in project doc")) + store.project_id() } diff --git a/src/db.rs b/src/db.rs index e63b4948dd70ca7c24eb68b76a9e5ca6761dfb1e..968ed27f3d8765edb6adc524453e878bbfb8868b 100644 --- a/src/db.rs +++ b/src/db.rs @@ -7,11 +7,13 @@ use std::fmt; use std::fs::{self, File, OpenOptions}; use std::io::Write; use std::path::{Path, PathBuf}; + +use fs2::FileExt; use ulid::Ulid; pub const PROJECT_ENV: &str = "TD_PROJECT"; -const PROJECTS_DIR: &str = "projects"; +pub(crate) const PROJECTS_DIR: &str = "projects"; const CHANGES_DIR: &str = "changes"; const BINDINGS_FILE: &str = "bindings.json"; const BASE_FILE: &str = "base.loro"; @@ -520,6 +522,11 @@ impl Store { Ok(tasks) } + /// Return the stable project identity stored in `meta.project_id`. + pub fn project_id(&self) -> Result { + read_project_id_from_doc(&self.doc) + } + pub fn schema_version(&self) -> Result { migrate::read_schema_version(&self.doc) } @@ -609,8 +616,67 @@ pub fn bootstrap_sync(cwd: &Path, project: &str, delta: &[u8]) -> Result Ok(store) } +/// Bootstrap a project from a peer delta using an explicit data root. +/// +/// Unlike [`bootstrap_sync`], this function does not consult `HOME` and is +/// therefore safe to call from async contexts where `HOME` may vary by peer. +/// +/// If `bind_cwd` is true, the given working directory is bound to the new +/// project. Pass false when bootstrapping from a SyncAll context to avoid +/// unexpectedly binding directories like the user's home. +/// +/// Uses exclusive file locking to prevent race conditions when multiple +/// concurrent sync operations create projects or modify bindings. 
+pub fn bootstrap_sync_at( + data_root: &Path, + cwd: &Path, + project: &str, + delta: &[u8], + bind_cwd: bool, +) -> Result<Store> { + fs::create_dir_all(data_root.join(PROJECTS_DIR))?; + validate_project_name(project)?; + + // Exclusive lock prevents races when concurrent syncs create the same project + // or modify bindings simultaneously. + let lock_path = data_root.join(".bindings.lock"); + let lock_file = OpenOptions::new() + .create(true) + .truncate(false) + .write(true) + .open(&lock_path) + .with_context(|| format!("failed to open lock file '{}'", lock_path.display()))?; + lock_file + .lock_exclusive() + .context("failed to acquire exclusive lock on bindings")?; + + // Now holding the lock: create project and optionally update bindings atomically. + let store = Store::bootstrap_from_peer(data_root, project, delta)?; + + if bind_cwd { + let canonical = fs::canonicalize(cwd) + .with_context(|| format!("failed to canonicalize '{}'", cwd.display()))?; + let mut bindings = load_bindings(data_root)?; + bindings + .bindings + .insert(canonical.to_string_lossy().to_string(), project.to_string()); + save_bindings(data_root, &bindings)?; + } + + // Lock is released when lock_file is dropped. + Ok(store) +} + pub fn list_projects() -> Result<Vec<String>> { let root = data_root()?; + list_projects_in(&root) +} + +/// List project names rooted at an explicit data directory. +/// +/// Unlike [`list_projects`], this does not consult `HOME` and is therefore +/// safe to call from async contexts where `HOME` may vary between peers. 
+pub(crate) fn list_projects_in(root: &Path) -> Result> { let mut out = Vec::new(); let dir = root.join(PROJECTS_DIR); if !dir.exists() { @@ -818,7 +884,7 @@ fn is_prefix_path(prefix: &Path, target: &Path) -> bool { } } -fn validate_project_name(name: &str) -> Result<()> { +pub fn validate_project_name(name: &str) -> Result<()> { if name.is_empty() { bail!("project name cannot be empty"); } diff --git a/tests/cli_sync.rs b/tests/cli_sync.rs index 26df6bd466442b7d2b0c40729f32c44d2d21c7a5..8958d15a408c46ca480d1a7bcfae8caec45bc9c2 100644 --- a/tests/cli_sync.rs +++ b/tests/cli_sync.rs @@ -271,3 +271,187 @@ fn bootstrap_from_peer_rejects_missing_project_id() { "bootstrap should not persist snapshot for invalid peer doc" ); } + +/// Helper: insert a minimal valid task into a doc via apply_and_persist. +fn insert_task(store: &yatd::db::Store, title: &str) { + let id = yatd::db::gen_id(); + store + .apply_and_persist(|doc| { + let tasks = doc.get_map("tasks"); + let task = yatd::db::insert_task_map(&tasks, &id)?; + task.insert("title", title)?; + task.insert("description", "")?; + task.insert("type", "task")?; + task.insert("priority", "medium")?; + task.insert("status", "open")?; + task.insert("effort", "medium")?; + task.insert("parent", "")?; + task.insert("created_at", yatd::db::now_utc())?; + task.insert("updated_at", yatd::db::now_utc())?; + task.insert("deleted_at", "")?; + task.insert_container("labels", loro::LoroMap::new())?; + task.insert_container("blockers", loro::LoroMap::new())?; + task.insert_container("logs", loro::LoroMap::new())?; + Ok(()) + }) + .unwrap(); +} + +/// Both peers have the same project (same project_id) with no directory +/// binding/selection. SyncAll should discover the shared project and converge +/// both stores to the same state. 
+#[test] +fn sync_all_exchanges_shared_projects() { + use std::fs; + use yatd::cmd::sync::{build_local_manifest, sync_all_exchange, wormhole_config}; + use yatd::db; + + let home_a = tempfile::tempdir().unwrap(); + let home_b = tempfile::tempdir().unwrap(); + let cwd_a = tempfile::tempdir().unwrap(); + let cwd_b = tempfile::tempdir().unwrap(); + + let data_root_a = home_a.path().join(".local/share/td"); + let data_root_b = home_b.path().join(".local/share/td"); + fs::create_dir_all(data_root_a.join("projects")).unwrap(); + fs::create_dir_all(data_root_b.join("projects")).unwrap(); + + // Peer A: init "shared" and add a task. + let store_a = db::Store::init(&data_root_a, "shared").unwrap(); + insert_task(&store_a, "task from A"); + + // Peer B: bootstrap from A's base snapshot (same project_id), add its own task. + let proj_b = data_root_b.join("projects/shared"); + fs::create_dir_all(proj_b.join("changes")).unwrap(); + fs::copy( + data_root_a.join("projects/shared/base.loro"), + proj_b.join("base.loro"), + ) + .unwrap(); + let store_b = db::Store::open(&data_root_b, "shared").unwrap(); + insert_task(&store_b, "task from B"); + + // Build manifests from disk (HOME-free: uses explicit data_root). 
+ let manifest_a = build_local_manifest(&data_root_a).unwrap(); + let manifest_b = build_local_manifest(&data_root_b).unwrap(); + assert_eq!(manifest_a.len(), 1); + assert_eq!(manifest_b.len(), 1); + assert_eq!( + manifest_a[0].project_id, manifest_b[0].project_id, + "both sides must share the same project_id" + ); + + let rt = tokio::runtime::Runtime::new().unwrap(); + let (results_a, results_b) = rt.block_on(async { + use magic_wormhole::{MailboxConnection, Wormhole}; + + let mailbox_a = MailboxConnection::create(wormhole_config(), 2) + .await + .unwrap(); + let code = mailbox_a.code().clone(); + let mailbox_b = MailboxConnection::connect(wormhole_config(), code, false) + .await + .unwrap(); + let (wormhole_a, wormhole_b) = + tokio::try_join!(Wormhole::connect(mailbox_a), Wormhole::connect(mailbox_b)).unwrap(); + + tokio::try_join!( + sync_all_exchange(cwd_a.path(), &data_root_a, manifest_a, wormhole_a), + sync_all_exchange(cwd_b.path(), &data_root_b, manifest_b, wormhole_b), + ) + .unwrap() + }); + + assert_eq!( + results_a.len(), + 1, + "A should have synced exactly one project" + ); + assert_eq!( + results_b.len(), + 1, + "B should have synced exactly one project" + ); + + let (store_a_synced, report_a) = &results_a[0]; + let (store_b_synced, report_b) = &results_b[0]; + + // Both peers should have imported: A has "task A", B starts from A's empty + // base then adds "task B". After sync, both have distinct changes to exchange. 
+ assert!(report_a.imported, "A should have imported B's task"); + assert!(report_b.imported, "B should have imported A's task"); + + let a_tasks = store_a_synced.list_tasks().unwrap(); + let b_tasks = store_b_synced.list_tasks().unwrap(); + assert_eq!(a_tasks.len(), 2, "A should have 2 tasks after SyncAll"); + assert_eq!(b_tasks.len(), 2, "B should have 2 tasks after SyncAll"); + + let a_titles: Vec<&str> = a_tasks.iter().map(|t| t.title.as_str()).collect(); + let b_titles: Vec<&str> = b_tasks.iter().map(|t| t.title.as_str()).collect(); + assert!(a_titles.contains(&"task from A")); + assert!(a_titles.contains(&"task from B")); + assert!(b_titles.contains(&"task from A")); + assert!(b_titles.contains(&"task from B")); +} + +/// Both peers have projects but no project_ids in common. SyncAll should +/// complete without error and return an empty result on both sides. +#[test] +fn sync_all_no_intersection_is_noop() { + use std::fs; + use yatd::cmd::sync::{build_local_manifest, sync_all_exchange, wormhole_config}; + use yatd::db; + + let home_a = tempfile::tempdir().unwrap(); + let home_b = tempfile::tempdir().unwrap(); + let cwd_a = tempfile::tempdir().unwrap(); + let cwd_b = tempfile::tempdir().unwrap(); + + let data_root_a = home_a.path().join(".local/share/td"); + let data_root_b = home_b.path().join(".local/share/td"); + fs::create_dir_all(data_root_a.join("projects")).unwrap(); + fs::create_dir_all(data_root_b.join("projects")).unwrap(); + + // A has "alpha", B has "bravo" — independently initialised, different project_ids. 
+ let _ = db::Store::init(&data_root_a, "alpha").unwrap(); + let _ = db::Store::init(&data_root_b, "bravo").unwrap(); + + let manifest_a = build_local_manifest(&data_root_a).unwrap(); + let manifest_b = build_local_manifest(&data_root_b).unwrap(); + assert_eq!(manifest_a.len(), 1); + assert_eq!(manifest_b.len(), 1); + assert_ne!( + manifest_a[0].project_id, manifest_b[0].project_id, + "projects must have different ids" + ); + + let rt = tokio::runtime::Runtime::new().unwrap(); + let (results_a, results_b) = rt.block_on(async { + use magic_wormhole::{MailboxConnection, Wormhole}; + + let mailbox_a = MailboxConnection::create(wormhole_config(), 2) + .await + .unwrap(); + let code = mailbox_a.code().clone(); + let mailbox_b = MailboxConnection::connect(wormhole_config(), code, false) + .await + .unwrap(); + let (wormhole_a, wormhole_b) = + tokio::try_join!(Wormhole::connect(mailbox_a), Wormhole::connect(mailbox_b)).unwrap(); + + tokio::try_join!( + sync_all_exchange(cwd_a.path(), &data_root_a, manifest_a, wormhole_a), + sync_all_exchange(cwd_b.path(), &data_root_b, manifest_b, wormhole_b), + ) + .unwrap() + }); + + assert!( + results_a.is_empty(), + "A: no shared projects, result should be empty" + ); + assert!( + results_b.is_empty(), + "B: no shared projects, result should be empty" + ); +}