sync.rs

  1//! Peer-to-peer project sync via magic-wormhole.
  2//!
  3//! Both peers open the same project, exchange Loro version vectors,
  4//! compute deltas containing only the ops the other side lacks, then
  5//! exchange and import those deltas.  The result is that both docs
  6//! converge to the same state without sending duplicate operations.
  7//!
  8//! When neither peer has a project selected, they exchange a `SyncAll`
  9//! handshake carrying a manifest of all their local projects.  Each side
 10//! computes the intersection by `project_id` and syncs every shared
 11//! project in one wormhole session.
 12
 13use std::borrow::Cow;
 14use std::collections::HashMap;
 15use std::path::Path;
 16
 17use anyhow::{bail, Context, Result};
 18use loro::{ExportMode, VersionVector};
 19use magic_wormhole::{AppConfig, AppID, Code, MailboxConnection, Wormhole};
 20use serde::{Deserialize, Serialize};
 21
 22use crate::db;
 23use crate::db::PROJECTS_DIR;
 24
 25/// Custom AppID scoping our wormhole traffic away from other protocols.
 26const APP_ID: &str = "td.sync.v1";
 27
 28/// Number of random words in the generated wormhole code.
 29const CODE_WORD_COUNT: usize = 2;
 30
 31/// One entry in a [`SyncAll`] manifest: name, stable id, and current VV.
 32#[derive(Debug, Clone, Serialize, Deserialize)]
 33pub struct ProjectEntry {
 34    pub project_name: String,
 35    pub project_id: String,
 36    #[serde(with = "vv_serde")]
 37    pub version_vector: VersionVector,
 38}
 39
 40/// Handshake message exchanged before the delta payload.
 41#[derive(Debug, Serialize, Deserialize)]
 42#[serde(tag = "mode")]
 43enum SyncHandshake {
 44    Sync {
 45        /// Human-readable project name.
 46        project_name: String,
 47        /// Stable identity (ULID stored in the doc's root meta map).
 48        project_id: String,
 49        /// Serialised version vector so the peer can compute a minimal delta.
 50        #[serde(with = "vv_serde")]
 51        version_vector: VersionVector,
 52    },
 53    Bootstrap {
 54        /// Serialised version vector so the peer can compute a minimal delta.
 55        #[serde(with = "vv_serde")]
 56        version_vector: VersionVector,
 57    },
 58    /// Sent when no project is selected locally.  Carries a manifest of every
 59    /// local project so the peer can compute the intersection.
 60    SyncAll { projects: Vec<ProjectEntry> },
 61}
 62
 63/// Serde adapter for `VersionVector` using its postcard `encode()`/`decode()`.
 64mod vv_serde {
 65    use loro::VersionVector;
 66    use serde::{self, Deserializer, Serializer};
 67
 68    pub fn serialize<S: Serializer>(vv: &VersionVector, ser: S) -> Result<S::Ok, S::Error> {
 69        ser.serialize_bytes(&vv.encode())
 70    }
 71
 72    pub fn deserialize<'de, D: Deserializer<'de>>(de: D) -> Result<VersionVector, D::Error> {
 73        let bytes: Vec<u8> = serde::Deserialize::deserialize(de)?;
 74        VersionVector::decode(&bytes).map_err(serde::de::Error::custom)
 75    }
 76}
 77
 78/// Outcome of a single-project sync exchange, returned by [`exchange`] and
 79/// [`sync_all_exchange`].
 80pub struct SyncReport {
 81    pub sent_bytes: usize,
 82    pub received_bytes: usize,
 83    pub imported: bool,
 84}
 85
 86pub fn wormhole_config() -> AppConfig<serde_json::Value> {
 87    AppConfig {
 88        id: AppID::new(APP_ID),
 89        rendezvous_url: Cow::Borrowed(magic_wormhole::rendezvous::DEFAULT_RENDEZVOUS_SERVER),
 90        app_version: serde_json::json!({"v": 1}),
 91    }
 92}
 93
 94/// Import a delta into a store and persist it.
 95///
 96/// Commits the document and saves the raw delta for storage. Returns true
 97/// if any changes were imported (delta was non-empty).
 98fn import_and_persist(store: &db::Store, delta: &[u8]) -> Result<bool> {
 99    if delta.is_empty() {
100        return Ok(false);
101    }
102    store
103        .doc()
104        .import(delta)
105        .context("failed to import delta")?;
106    store.doc().commit();
107    store.save_raw_delta(delta)?;
108    Ok(true)
109}
110
111/// Run the sync protocol over an already-established wormhole.
112///
113/// Both sides call this concurrently.  The protocol is symmetric: each
114/// peer sends its version vector, receives the other's, computes a
115/// minimal delta, sends it, receives the peer's delta, and imports it.
116///
117/// If the peer sends `SyncAll` (no project selected on their end), the local
118/// project is treated as a bootstrap source and a full delta is sent.
119pub async fn exchange(store: &db::Store, mut wormhole: Wormhole) -> Result<SyncReport> {
120    let my_handshake = SyncHandshake::Sync {
121        project_name: store.project_name().to_string(),
122        project_id: read_project_id(store)?,
123        version_vector: store.doc().oplog_vv(),
124    };
125
126    // --- Phase 1: exchange handshakes ---
127    wormhole
128        .send_json(&my_handshake)
129        .await
130        .context("failed to send handshake")?;
131
132    let their_handshake: SyncHandshake = wormhole
133        .receive_json::<SyncHandshake>()
134        .await
135        .context("failed to receive handshake")?
136        .context(
137            "peer sent incompatible handshake (are both sides running the same version of td?)",
138        )?;
139
140    // Determine the peer's version vector, validating project identity when
141    // both sides have a project selected.
142    let their_vv: VersionVector = match &their_handshake {
143        SyncHandshake::Sync {
144            project_name,
145            project_id,
146            version_vector,
147        } => {
148            let (my_project_name, my_project_id) = match &my_handshake {
149                SyncHandshake::Sync {
150                    project_name,
151                    project_id,
152                    ..
153                } => (project_name, project_id),
154                SyncHandshake::Bootstrap { .. } | SyncHandshake::SyncAll { .. } => {
155                    unreachable!("exchange always sends Sync")
156                }
157            };
158            if my_project_id != project_id {
159                let _ = wormhole.close().await;
160                bail!(
161                    "project identity mismatch: local '{}' ({}) vs peer '{}' ({}). If this is the same logical project, remove the accidentally initted local copy and bootstrap with 'td sync' instead of running 'td project init <project>' on both machines",
162                    my_project_name,
163                    my_project_id,
164                    project_name,
165                    project_id,
166                );
167            }
168            version_vector.clone()
169        }
170        // Peer has no project; treat as a bootstrap request and export everything.
171        SyncHandshake::Bootstrap { version_vector } => version_vector.clone(),
172        SyncHandshake::SyncAll { projects } => {
173            // Peer has no project selected but sent their manifest.
174            // Find our project in their manifest to get their VV for us.
175            let my_id = read_project_id(store)?;
176            projects
177                .iter()
178                .find(|p| p.project_id == my_id)
179                .map(|p| p.version_vector.clone())
180                .unwrap_or_default()
181        }
182    };
183
184    // --- Phase 2: compute and exchange deltas ---
185    let my_delta = store
186        .doc()
187        .export(ExportMode::updates(&their_vv))
188        .context("failed to export delta for peer")?;
189
190    wormhole
191        .send(my_delta.clone())
192        .await
193        .context("failed to send delta")?;
194
195    let their_delta = wormhole
196        .receive()
197        .await
198        .context("failed to receive delta from peer")?;
199
200    wormhole.close().await.context("failed to close wormhole")?;
201
202    // --- Phase 3: import the peer's delta locally ---
203    let imported =
204        import_and_persist(store, &their_delta).context("failed to import peer delta")?;
205
206    Ok(SyncReport {
207        sent_bytes: my_delta.len(),
208        received_bytes: their_delta.len(),
209        imported,
210    })
211}
212
213/// Build a manifest of all local projects using the given data root.
214///
215/// Each entry carries the project name, stable id, and current version vector.
216/// Using an explicit `data_root` (rather than `HOME`) makes this safe to call
217/// from async contexts where `HOME` may vary between peers.
218pub fn build_local_manifest(data_root: &Path) -> Result<Vec<ProjectEntry>> {
219    let names = db::list_projects_in(data_root)?;
220    let mut entries = Vec::with_capacity(names.len());
221    for name in names {
222        let store = db::Store::open(data_root, &name)?;
223        let project_id = store.project_id()?;
224        let version_vector = store.doc().oplog_vv();
225        entries.push(ProjectEntry {
226            project_name: name,
227            project_id,
228            version_vector,
229        });
230    }
231    Ok(entries)
232}
233
234/// Run the SyncAll protocol over an already-established wormhole.
235///
236/// Sends a `SyncAll` handshake carrying `local_manifest`, then handles the
237/// three possible responses:
238///
239/// - **`Sync`** — peer has a project selected; bootstrap that project from
240///   them (identical semantics to the old single-peer bootstrap path).
241/// - **`SyncAll`** — peer also has no project selected; compute the
242///   intersection by `project_id`, sync each shared project in order, and
243///   return one `(Store, SyncReport)` per synced project.  An empty
244///   intersection is not an error; an empty `Vec` is returned.
245/// - **`Bootstrap`** — peer is running an older td; bail with an upgrade hint.
246///
247/// `cwd` is only used when bootstrapping a brand-new project from a `Sync`
248/// peer (to create the directory binding).  It is ignored in the `SyncAll`
249/// case where all projects already exist locally.
250pub async fn sync_all_exchange(
251    cwd: &Path,
252    data_root: &Path,
253    local_manifest: Vec<ProjectEntry>,
254    mut wormhole: Wormhole,
255) -> Result<Vec<(db::Store, SyncReport)>> {
256    // Send our manifest.  Clone so we can still use `local_manifest` below.
257    wormhole
258        .send_json(&SyncHandshake::SyncAll {
259            projects: local_manifest.clone(),
260        })
261        .await
262        .context("failed to send SyncAll handshake")?;
263
264    let their_handshake: SyncHandshake = wormhole
265        .receive_json::<SyncHandshake>()
266        .await
267        .context("failed to receive handshake")?
268        .context(
269            "peer sent incompatible handshake (are both sides running the same version of td?)",
270        )?;
271
272    match their_handshake {
273        SyncHandshake::Sync {
274            project_name,
275            project_id,
276            version_vector,
277        } => {
278            // Peer has a specific project selected.
279            // Validate the peer's project name to prevent path traversal.
280            db::validate_project_name(&project_name)?;
281            let project_dir = data_root.join(PROJECTS_DIR).join(&project_name);
282
283            if project_dir.exists() {
284                // Project exists locally: open it, verify ID matches, then sync normally.
285                let store = db::Store::open(data_root, &project_name).with_context(|| {
286                    format!("failed to open existing project '{}'", project_name)
287                })?;
288                let my_id = read_project_id(&store)?;
289                if my_id != project_id {
290                    bail!(
291                        "project identity mismatch: local '{}' ({}) vs peer '{}' ({}). \
292                         Remove the accidentally initted local copy and bootstrap with 'td sync'",
293                        project_name,
294                        my_id,
295                        project_name,
296                        project_id
297                    );
298                }
299
300                // Sync the existing project.
301                let my_delta = store
302                    .doc()
303                    .export(ExportMode::updates(&version_vector))
304                    .context("failed to export delta for peer")?;
305
306                wormhole
307                    .send(my_delta.clone())
308                    .await
309                    .context("failed to send delta")?;
310
311                let their_delta = wormhole
312                    .receive()
313                    .await
314                    .context("failed to receive delta from peer")?;
315
316                wormhole.close().await.context("failed to close wormhole")?;
317
318                let imported = import_and_persist(&store, &their_delta)
319                    .context("failed to import peer's delta")?;
320
321                let report = SyncReport {
322                    sent_bytes: my_delta.len(),
323                    received_bytes: their_delta.len(),
324                    imported,
325                };
326                Ok(vec![(store, report)])
327            } else {
328                // Project doesn't exist: bootstrap from peer.
329                // The peer (who sent Sync) expects to send their full state and be done.
330                wormhole
331                    .send(Vec::new())
332                    .await
333                    .context("failed to send empty bootstrap delta")?;
334                let their_delta = wormhole
335                    .receive()
336                    .await
337                    .context("failed to receive bootstrap delta from peer")?;
338
339                if their_delta.is_empty() {
340                    bail!("peer sent empty bootstrap delta");
341                }
342
343                wormhole.close().await.context("failed to close wormhole")?;
344
345                // Don't bind cwd when bootstrapping from SyncAll→Sync fallback —
346                // the user didn't intend to bind their current directory to this project.
347                let store =
348                    db::bootstrap_sync_at(data_root, cwd, &project_name, &their_delta, false)?;
349
350                let report = SyncReport {
351                    sent_bytes: 0,
352                    received_bytes: their_delta.len(),
353                    imported: true,
354                };
355                Ok(vec![(store, report)])
356            }
357        }
358
359        SyncHandshake::Bootstrap { .. } => {
360            // Peer is running an older td that does not know about SyncAll.
361            // The peer will have already failed when it received our SyncAll
362            // handshake, so both sides are closing.
363            let _ = wormhole.close().await;
364            bail!(
365                "peer is running an older version of td that does not support SyncAll. \
366                 Upgrade td on both machines and try again"
367            );
368        }
369
370        SyncHandshake::SyncAll {
371            projects: their_projects,
372        } => {
373            // Both sides have no project selected.  Compute intersection by
374            // project_id and sync each shared project.
375            sync_shared_projects(data_root, local_manifest, their_projects, wormhole).await
376        }
377    }
378}
379
380/// Exchange deltas for every project whose `project_id` appears on both sides.
381///
382/// Both sides sort the intersection by `project_id` (deterministic ordering),
383/// then for each project: send my delta, receive theirs, import.  This mirrors
384/// the single-project [`exchange`] pattern — each side sends before it receives,
385/// so the relay buffers the message and neither side blocks waiting for the
386/// other to read first.
387async fn sync_shared_projects(
388    data_root: &Path,
389    local_projects: Vec<ProjectEntry>,
390    their_projects: Vec<ProjectEntry>,
391    mut wormhole: Wormhole,
392) -> Result<Vec<(db::Store, SyncReport)>> {
393    // Wrap the body in an async block so we can ensure close() runs on all paths.
394    let result: Result<Vec<(db::Store, SyncReport)>> = async {
395        // Build a fast lookup from project_id → their entry.
396        let their_by_id: HashMap<&str, &ProjectEntry> = their_projects
397            .iter()
398            .map(|p| (p.project_id.as_str(), p))
399            .collect();
400
401        // Collect the intersection, sorted by project_id for a deterministic wire
402        // ordering that both peers independently agree on.
403        let mut shared: Vec<(&ProjectEntry, &ProjectEntry)> = local_projects
404            .iter()
405            .filter_map(|mine| {
406                their_by_id
407                    .get(mine.project_id.as_str())
408                    .map(|theirs| (mine, *theirs))
409            })
410            .collect();
411        shared.sort_by(|a, b| a.0.project_id.cmp(&b.0.project_id));
412
413        if shared.is_empty() {
414            return Ok(vec![]);
415        }
416
417        // For each project: open store, export delta, send/receive, import.
418        // Stores are opened one at a time to avoid exhausting file descriptors
419        // when syncing many projects.
420        let mut results: Vec<(db::Store, SyncReport)> = Vec::with_capacity(shared.len());
421        for (mine, theirs) in shared {
422            if mine.project_name != theirs.project_name {
423                eprintln!(
424                    "warning: project name mismatch for id {}: \
425                     local '{}', peer '{}' — syncing by id",
426                    mine.project_id, mine.project_name, theirs.project_name,
427                );
428            }
429            let store = db::Store::open(data_root, &mine.project_name)
430                .with_context(|| format!("failed to open project '{}'", mine.project_name))?;
431            let my_delta = store
432                .doc()
433                .export(ExportMode::updates(&theirs.version_vector))
434                .with_context(|| {
435                    format!("failed to export delta for '{}'", store.project_name())
436                })?;
437
438            wormhole
439                .send(my_delta.clone())
440                .await
441                .with_context(|| format!("failed to send delta for '{}'", store.project_name()))?;
442
443            let their_delta = wormhole.receive().await.with_context(|| {
444                format!("failed to receive delta for '{}'", store.project_name())
445            })?;
446
447            let imported = import_and_persist(&store, &their_delta).with_context(|| {
448                format!("failed to import delta for '{}'", store.project_name())
449            })?;
450
451            results.push((
452                store,
453                SyncReport {
454                    sent_bytes: my_delta.len(),
455                    received_bytes: their_delta.len(),
456                    imported,
457                },
458            ));
459        }
460
461        Ok(results)
462    }
463    .await;
464
465    // Always close the wormhole, but don't let close errors mask the original result.
466    let _ = wormhole.close().await;
467    result
468}
469
470pub fn run(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
471    let rt = tokio::runtime::Runtime::new().context("failed to create async runtime")?;
472    rt.block_on(run_async(root, code, json))
473}
474
475async fn run_async(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
476    let maybe_store = db::try_open(root)?;
477    let c = crate::color::stderr_theme();
478
479    let wormhole = connect_wormhole(code, json, c).await?;
480
481    if let Some(store) = maybe_store {
482        // A project is selected: single-project sync.
483        if !json {
484            eprintln!("{}wormhole:{} connected, syncing...", c.blue, c.reset);
485        }
486        let report = exchange(&store, wormhole).await?;
487        print_sync_report(&store, &report, json, c)?;
488    } else {
489        // No project selected: enumerate local projects and attempt SyncAll.
490        let data_root = db::data_root()?;
491        let local_manifest = build_local_manifest(&data_root)?;
492
493        if !json {
494            if local_manifest.is_empty() {
495                eprintln!(
496                    "{}wormhole:{} connected, bootstrapping from peer...",
497                    c.blue, c.reset
498                );
499            } else {
500                eprintln!(
501                    "{}wormhole:{} connected, syncing all shared projects...",
502                    c.blue, c.reset
503                );
504            }
505        }
506
507        let results = sync_all_exchange(root, &data_root, local_manifest, wormhole).await?;
508
509        if results.is_empty() {
510            if json {
511                println!(
512                    "{}",
513                    serde_json::to_string(
514                        &serde_json::json!({"synced": false, "reason": "no_shared_projects"})
515                    )?
516                );
517            } else {
518                eprintln!("{}info:{} no shared projects to sync", c.blue, c.reset);
519            }
520        } else {
521            // Iterate by value to drop each Store (and its file handle) immediately
522            // after printing, rather than holding all handles until the loop ends.
523            for (store, report) in results {
524                print_sync_report(&store, &report, json, c)?;
525            }
526        }
527    }
528
529    Ok(())
530}
531
532async fn connect_wormhole(
533    code: Option<&str>,
534    json: bool,
535    c: &crate::color::Theme,
536) -> Result<Wormhole> {
537    match code {
538        None => {
539            let mailbox = MailboxConnection::create(wormhole_config(), CODE_WORD_COUNT)
540                .await
541                .context("failed to create wormhole mailbox")?;
542
543            let code = mailbox.code().clone();
544            if json {
545                println!(
546                    "{}",
547                    serde_json::to_string(&serde_json::json!({"code": code.to_string()}))?
548                );
549            } else {
550                eprintln!("{}wormhole:{} run on the other machine:\n", c.blue, c.reset);
551                eprintln!("  td sync {}{}{}\n", c.bold, code, c.reset);
552                eprintln!("waiting for peer...");
553            }
554
555            Wormhole::connect(mailbox)
556                .await
557                .context("wormhole key exchange failed")
558        }
559        Some(raw) => {
560            let code: Code = raw.parse().context("invalid wormhole code")?;
561            let mailbox = MailboxConnection::connect(wormhole_config(), code, false)
562                .await
563                .context("failed to connect to wormhole mailbox")?;
564
565            if !json {
566                eprintln!("{}wormhole:{} connecting...", c.blue, c.reset);
567            }
568
569            Wormhole::connect(mailbox)
570                .await
571                .context("wormhole key exchange failed")
572        }
573    }
574}
575
576fn print_sync_report(
577    store: &db::Store,
578    report: &SyncReport,
579    json: bool,
580    c: &crate::color::Theme,
581) -> Result<()> {
582    if json {
583        println!(
584            "{}",
585            serde_json::to_string(&serde_json::json!({
586                "synced": true,
587                "project": store.project_name(),
588                "sent_bytes": report.sent_bytes,
589                "received_bytes": report.received_bytes,
590            }))?
591        );
592    } else {
593        eprintln!(
594            "{}synced:{} {} (sent {} bytes, received {} bytes)",
595            c.green,
596            c.reset,
597            store.project_name(),
598            report.sent_bytes,
599            report.received_bytes,
600        );
601        if report.imported {
602            eprintln!("{}info:{} imported peer changes", c.blue, c.reset);
603        } else {
604            eprintln!("{}info:{} peer had no new changes", c.blue, c.reset);
605        }
606    }
607    Ok(())
608}
609
610/// Read the stable project identity from the doc's root meta map.
611fn read_project_id(store: &db::Store) -> Result<String> {
612    store.project_id()
613}