1//! Peer-to-peer project sync via magic-wormhole.
2//!
3//! Both peers open the same project, exchange Loro version vectors,
4//! compute deltas containing only the ops the other side lacks, then
5//! exchange and import those deltas. The result is that both docs
6//! converge to the same state without sending duplicate operations.
7//!
8//! When neither peer has a project selected, they exchange a `SyncAll`
9//! handshake carrying a manifest of all their local projects. Each side
10//! computes the intersection by `project_id` and syncs every shared
11//! project in one wormhole session.
12
13use std::borrow::Cow;
14use std::collections::HashMap;
15use std::path::Path;
16
17use anyhow::{bail, Context, Result};
18use loro::{ExportMode, VersionVector};
19use magic_wormhole::{AppConfig, AppID, Code, MailboxConnection, Wormhole};
20use serde::{Deserialize, Serialize};
21
22use crate::db;
23use crate::db::PROJECTS_DIR;
24
/// Custom AppID scoping our wormhole traffic away from other protocols
/// (e.g. the stock `wormhole` file-transfer tool) sharing a rendezvous server.
const APP_ID: &str = "td.sync.v1";

/// Number of random words in the generated wormhole code; used when creating
/// the mailbox in [`connect_wormhole`]. Short enough to type by hand.
const CODE_WORD_COUNT: usize = 2;
30
/// One entry in a [`SyncAll`] manifest: name, stable id, and current VV.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProjectEntry {
    /// Human-readable project name; also the on-disk directory name under
    /// `PROJECTS_DIR` (see how `sync_all_exchange` joins it into a path).
    pub project_name: String,
    /// Stable identity used to match projects across peers, independent of
    /// what each machine named its local copy.
    pub project_id: String,
    /// Loro version vector at manifest-build time, serialised as raw bytes
    /// via [`vv_serde`] so the peer can compute a minimal delta.
    #[serde(with = "vv_serde")]
    pub version_vector: VersionVector,
}
39
/// Handshake message exchanged before the delta payload.
///
/// Serialised as JSON with an internal `"mode"` tag so peers can tell the
/// variants apart. Changing variant or field names breaks wire compatibility
/// with older peers.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "mode")]
enum SyncHandshake {
    /// A specific project is selected locally; sync exactly that project.
    Sync {
        /// Human-readable project name.
        project_name: String,
        /// Stable identity (ULID stored in the doc's root meta map).
        project_id: String,
        /// Serialised version vector so the peer can compute a minimal delta.
        #[serde(with = "vv_serde")]
        version_vector: VersionVector,
    },
    /// Legacy variant: this file never constructs it, but older td versions
    /// may send it (see the upgrade-hint handling in `sync_all_exchange`).
    Bootstrap {
        /// Serialised version vector so the peer can compute a minimal delta.
        #[serde(with = "vv_serde")]
        version_vector: VersionVector,
    },
    /// Sent when no project is selected locally. Carries a manifest of every
    /// local project so the peer can compute the intersection.
    SyncAll { projects: Vec<ProjectEntry> },
}
62
63/// Serde adapter for `VersionVector` using its postcard `encode()`/`decode()`.
64mod vv_serde {
65 use loro::VersionVector;
66 use serde::{self, Deserializer, Serializer};
67
68 pub fn serialize<S: Serializer>(vv: &VersionVector, ser: S) -> Result<S::Ok, S::Error> {
69 ser.serialize_bytes(&vv.encode())
70 }
71
72 pub fn deserialize<'de, D: Deserializer<'de>>(de: D) -> Result<VersionVector, D::Error> {
73 let bytes: Vec<u8> = serde::Deserialize::deserialize(de)?;
74 VersionVector::decode(&bytes).map_err(serde::de::Error::custom)
75 }
76}
77
/// Outcome of a single-project sync exchange, returned by [`exchange`] and
/// [`sync_all_exchange`].
pub struct SyncReport {
    /// Size in bytes of the delta we sent to the peer.
    pub sent_bytes: usize,
    /// Size in bytes of the delta the peer sent us.
    pub received_bytes: usize,
    /// True when the peer's delta was non-empty and was imported locally.
    pub imported: bool,
}
85
86pub fn wormhole_config() -> AppConfig<serde_json::Value> {
87 AppConfig {
88 id: AppID::new(APP_ID),
89 rendezvous_url: Cow::Borrowed(magic_wormhole::rendezvous::DEFAULT_RENDEZVOUS_SERVER),
90 app_version: serde_json::json!({"v": 1}),
91 }
92}
93
94/// Import a delta into a store and persist it.
95///
96/// Commits the document and saves the raw delta for storage. Returns true
97/// if any changes were imported (delta was non-empty).
98fn import_and_persist(store: &db::Store, delta: &[u8]) -> Result<bool> {
99 if delta.is_empty() {
100 return Ok(false);
101 }
102 store
103 .doc()
104 .import(delta)
105 .context("failed to import delta")?;
106 store.doc().commit();
107 store.save_raw_delta(delta)?;
108 Ok(true)
109}
110
111/// Run the sync protocol over an already-established wormhole.
112///
113/// Both sides call this concurrently. The protocol is symmetric: each
114/// peer sends its version vector, receives the other's, computes a
115/// minimal delta, sends it, receives the peer's delta, and imports it.
116///
117/// If the peer sends `SyncAll` (no project selected on their end), the local
118/// project is treated as a bootstrap source and a full delta is sent.
119pub async fn exchange(store: &db::Store, mut wormhole: Wormhole) -> Result<SyncReport> {
120 let my_handshake = SyncHandshake::Sync {
121 project_name: store.project_name().to_string(),
122 project_id: read_project_id(store)?,
123 version_vector: store.doc().oplog_vv(),
124 };
125
126 // --- Phase 1: exchange handshakes ---
127 wormhole
128 .send_json(&my_handshake)
129 .await
130 .context("failed to send handshake")?;
131
132 let their_handshake: SyncHandshake = wormhole
133 .receive_json::<SyncHandshake>()
134 .await
135 .context("failed to receive handshake")?
136 .context(
137 "peer sent incompatible handshake (are both sides running the same version of td?)",
138 )?;
139
140 // Determine the peer's version vector, validating project identity when
141 // both sides have a project selected.
142 let their_vv: VersionVector = match &their_handshake {
143 SyncHandshake::Sync {
144 project_name,
145 project_id,
146 version_vector,
147 } => {
148 let (my_project_name, my_project_id) = match &my_handshake {
149 SyncHandshake::Sync {
150 project_name,
151 project_id,
152 ..
153 } => (project_name, project_id),
154 SyncHandshake::Bootstrap { .. } | SyncHandshake::SyncAll { .. } => {
155 unreachable!("exchange always sends Sync")
156 }
157 };
158 if my_project_id != project_id {
159 let _ = wormhole.close().await;
160 bail!(
161 "project identity mismatch: local '{}' ({}) vs peer '{}' ({}). If this is the same logical project, remove the accidentally initted local copy and bootstrap with 'td sync' instead of running 'td project init <project>' on both machines",
162 my_project_name,
163 my_project_id,
164 project_name,
165 project_id,
166 );
167 }
168 version_vector.clone()
169 }
170 // Peer has no project; treat as a bootstrap request and export everything.
171 SyncHandshake::Bootstrap { version_vector } => version_vector.clone(),
172 SyncHandshake::SyncAll { projects } => {
173 // Peer has no project selected but sent their manifest.
174 // Find our project in their manifest to get their VV for us.
175 let my_id = read_project_id(store)?;
176 projects
177 .iter()
178 .find(|p| p.project_id == my_id)
179 .map(|p| p.version_vector.clone())
180 .unwrap_or_default()
181 }
182 };
183
184 // --- Phase 2: compute and exchange deltas ---
185 let my_delta = store
186 .doc()
187 .export(ExportMode::updates(&their_vv))
188 .context("failed to export delta for peer")?;
189
190 wormhole
191 .send(my_delta.clone())
192 .await
193 .context("failed to send delta")?;
194
195 let their_delta = wormhole
196 .receive()
197 .await
198 .context("failed to receive delta from peer")?;
199
200 wormhole.close().await.context("failed to close wormhole")?;
201
202 // --- Phase 3: import the peer's delta locally ---
203 let imported =
204 import_and_persist(store, &their_delta).context("failed to import peer delta")?;
205
206 Ok(SyncReport {
207 sent_bytes: my_delta.len(),
208 received_bytes: their_delta.len(),
209 imported,
210 })
211}
212
213/// Build a manifest of all local projects using the given data root.
214///
215/// Each entry carries the project name, stable id, and current version vector.
216/// Using an explicit `data_root` (rather than `HOME`) makes this safe to call
217/// from async contexts where `HOME` may vary between peers.
218pub fn build_local_manifest(data_root: &Path) -> Result<Vec<ProjectEntry>> {
219 let names = db::list_projects_in(data_root)?;
220 let mut entries = Vec::with_capacity(names.len());
221 for name in names {
222 let store = db::Store::open(data_root, &name)?;
223 let project_id = store.project_id()?;
224 let version_vector = store.doc().oplog_vv();
225 entries.push(ProjectEntry {
226 project_name: name,
227 project_id,
228 version_vector,
229 });
230 }
231 Ok(entries)
232}
233
234/// Run the SyncAll protocol over an already-established wormhole.
235///
236/// Sends a `SyncAll` handshake carrying `local_manifest`, then handles the
237/// three possible responses:
238///
239/// - **`Sync`** — peer has a project selected; bootstrap that project from
240/// them (identical semantics to the old single-peer bootstrap path).
241/// - **`SyncAll`** — peer also has no project selected; compute the
242/// intersection by `project_id`, sync each shared project in order, and
243/// return one `(Store, SyncReport)` per synced project. An empty
244/// intersection is not an error; an empty `Vec` is returned.
245/// - **`Bootstrap`** — peer is running an older td; bail with an upgrade hint.
246///
247/// `cwd` is only used when bootstrapping a brand-new project from a `Sync`
248/// peer (to create the directory binding). It is ignored in the `SyncAll`
249/// case where all projects already exist locally.
250pub async fn sync_all_exchange(
251 cwd: &Path,
252 data_root: &Path,
253 local_manifest: Vec<ProjectEntry>,
254 mut wormhole: Wormhole,
255) -> Result<Vec<(db::Store, SyncReport)>> {
256 // Send our manifest. Clone so we can still use `local_manifest` below.
257 wormhole
258 .send_json(&SyncHandshake::SyncAll {
259 projects: local_manifest.clone(),
260 })
261 .await
262 .context("failed to send SyncAll handshake")?;
263
264 let their_handshake: SyncHandshake = wormhole
265 .receive_json::<SyncHandshake>()
266 .await
267 .context("failed to receive handshake")?
268 .context(
269 "peer sent incompatible handshake (are both sides running the same version of td?)",
270 )?;
271
272 match their_handshake {
273 SyncHandshake::Sync {
274 project_name,
275 project_id,
276 version_vector,
277 } => {
278 // Peer has a specific project selected.
279 // Validate the peer's project name to prevent path traversal.
280 db::validate_project_name(&project_name)?;
281 let project_dir = data_root.join(PROJECTS_DIR).join(&project_name);
282
283 if project_dir.exists() {
284 // Project exists locally: open it, verify ID matches, then sync normally.
285 let store = db::Store::open(data_root, &project_name).with_context(|| {
286 format!("failed to open existing project '{}'", project_name)
287 })?;
288 let my_id = read_project_id(&store)?;
289 if my_id != project_id {
290 bail!(
291 "project identity mismatch: local '{}' ({}) vs peer '{}' ({}). \
292 Remove the accidentally initted local copy and bootstrap with 'td sync'",
293 project_name,
294 my_id,
295 project_name,
296 project_id
297 );
298 }
299
300 // Sync the existing project.
301 let my_delta = store
302 .doc()
303 .export(ExportMode::updates(&version_vector))
304 .context("failed to export delta for peer")?;
305
306 wormhole
307 .send(my_delta.clone())
308 .await
309 .context("failed to send delta")?;
310
311 let their_delta = wormhole
312 .receive()
313 .await
314 .context("failed to receive delta from peer")?;
315
316 wormhole.close().await.context("failed to close wormhole")?;
317
318 let imported = import_and_persist(&store, &their_delta)
319 .context("failed to import peer's delta")?;
320
321 let report = SyncReport {
322 sent_bytes: my_delta.len(),
323 received_bytes: their_delta.len(),
324 imported,
325 };
326 Ok(vec![(store, report)])
327 } else {
328 // Project doesn't exist: bootstrap from peer.
329 // The peer (who sent Sync) expects to send their full state and be done.
330 wormhole
331 .send(Vec::new())
332 .await
333 .context("failed to send empty bootstrap delta")?;
334 let their_delta = wormhole
335 .receive()
336 .await
337 .context("failed to receive bootstrap delta from peer")?;
338
339 if their_delta.is_empty() {
340 bail!("peer sent empty bootstrap delta");
341 }
342
343 wormhole.close().await.context("failed to close wormhole")?;
344
345 // Don't bind cwd when bootstrapping from SyncAll→Sync fallback —
346 // the user didn't intend to bind their current directory to this project.
347 let store =
348 db::bootstrap_sync_at(data_root, cwd, &project_name, &their_delta, false)?;
349
350 let report = SyncReport {
351 sent_bytes: 0,
352 received_bytes: their_delta.len(),
353 imported: true,
354 };
355 Ok(vec![(store, report)])
356 }
357 }
358
359 SyncHandshake::Bootstrap { .. } => {
360 // Peer is running an older td that does not know about SyncAll.
361 // The peer will have already failed when it received our SyncAll
362 // handshake, so both sides are closing.
363 let _ = wormhole.close().await;
364 bail!(
365 "peer is running an older version of td that does not support SyncAll. \
366 Upgrade td on both machines and try again"
367 );
368 }
369
370 SyncHandshake::SyncAll {
371 projects: their_projects,
372 } => {
373 // Both sides have no project selected. Compute intersection by
374 // project_id and sync each shared project.
375 sync_shared_projects(data_root, local_manifest, their_projects, wormhole).await
376 }
377 }
378}
379
380/// Exchange deltas for every project whose `project_id` appears on both sides.
381///
382/// Both sides sort the intersection by `project_id` (deterministic ordering),
383/// then for each project: send my delta, receive theirs, import. This mirrors
384/// the single-project [`exchange`] pattern — each side sends before it receives,
385/// so the relay buffers the message and neither side blocks waiting for the
386/// other to read first.
387async fn sync_shared_projects(
388 data_root: &Path,
389 local_projects: Vec<ProjectEntry>,
390 their_projects: Vec<ProjectEntry>,
391 mut wormhole: Wormhole,
392) -> Result<Vec<(db::Store, SyncReport)>> {
393 // Wrap the body in an async block so we can ensure close() runs on all paths.
394 let result: Result<Vec<(db::Store, SyncReport)>> = async {
395 // Build a fast lookup from project_id → their entry.
396 let their_by_id: HashMap<&str, &ProjectEntry> = their_projects
397 .iter()
398 .map(|p| (p.project_id.as_str(), p))
399 .collect();
400
401 // Collect the intersection, sorted by project_id for a deterministic wire
402 // ordering that both peers independently agree on.
403 let mut shared: Vec<(&ProjectEntry, &ProjectEntry)> = local_projects
404 .iter()
405 .filter_map(|mine| {
406 their_by_id
407 .get(mine.project_id.as_str())
408 .map(|theirs| (mine, *theirs))
409 })
410 .collect();
411 shared.sort_by(|a, b| a.0.project_id.cmp(&b.0.project_id));
412
413 if shared.is_empty() {
414 return Ok(vec![]);
415 }
416
417 // For each project: open store, export delta, send/receive, import.
418 // Stores are opened one at a time to avoid exhausting file descriptors
419 // when syncing many projects.
420 let mut results: Vec<(db::Store, SyncReport)> = Vec::with_capacity(shared.len());
421 for (mine, theirs) in shared {
422 if mine.project_name != theirs.project_name {
423 eprintln!(
424 "warning: project name mismatch for id {}: \
425 local '{}', peer '{}' — syncing by id",
426 mine.project_id, mine.project_name, theirs.project_name,
427 );
428 }
429 let store = db::Store::open(data_root, &mine.project_name)
430 .with_context(|| format!("failed to open project '{}'", mine.project_name))?;
431 let my_delta = store
432 .doc()
433 .export(ExportMode::updates(&theirs.version_vector))
434 .with_context(|| {
435 format!("failed to export delta for '{}'", store.project_name())
436 })?;
437
438 wormhole
439 .send(my_delta.clone())
440 .await
441 .with_context(|| format!("failed to send delta for '{}'", store.project_name()))?;
442
443 let their_delta = wormhole.receive().await.with_context(|| {
444 format!("failed to receive delta for '{}'", store.project_name())
445 })?;
446
447 let imported = import_and_persist(&store, &their_delta).with_context(|| {
448 format!("failed to import delta for '{}'", store.project_name())
449 })?;
450
451 results.push((
452 store,
453 SyncReport {
454 sent_bytes: my_delta.len(),
455 received_bytes: their_delta.len(),
456 imported,
457 },
458 ));
459 }
460
461 Ok(results)
462 }
463 .await;
464
465 // Always close the wormhole, but don't let close errors mask the original result.
466 let _ = wormhole.close().await;
467 result
468}
469
470pub fn run(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
471 let rt = tokio::runtime::Runtime::new().context("failed to create async runtime")?;
472 rt.block_on(run_async(root, code, json))
473}
474
475async fn run_async(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
476 let maybe_store = db::try_open(root)?;
477 let c = crate::color::stderr_theme();
478
479 let wormhole = connect_wormhole(code, json, c).await?;
480
481 if let Some(store) = maybe_store {
482 // A project is selected: single-project sync.
483 if !json {
484 eprintln!("{}wormhole:{} connected, syncing...", c.blue, c.reset);
485 }
486 let report = exchange(&store, wormhole).await?;
487 print_sync_report(&store, &report, json, c)?;
488 } else {
489 // No project selected: enumerate local projects and attempt SyncAll.
490 let data_root = db::data_root()?;
491 let local_manifest = build_local_manifest(&data_root)?;
492
493 if !json {
494 if local_manifest.is_empty() {
495 eprintln!(
496 "{}wormhole:{} connected, bootstrapping from peer...",
497 c.blue, c.reset
498 );
499 } else {
500 eprintln!(
501 "{}wormhole:{} connected, syncing all shared projects...",
502 c.blue, c.reset
503 );
504 }
505 }
506
507 let results = sync_all_exchange(root, &data_root, local_manifest, wormhole).await?;
508
509 if results.is_empty() {
510 if json {
511 println!(
512 "{}",
513 serde_json::to_string(
514 &serde_json::json!({"synced": false, "reason": "no_shared_projects"})
515 )?
516 );
517 } else {
518 eprintln!("{}info:{} no shared projects to sync", c.blue, c.reset);
519 }
520 } else {
521 // Iterate by value to drop each Store (and its file handle) immediately
522 // after printing, rather than holding all handles until the loop ends.
523 for (store, report) in results {
524 print_sync_report(&store, &report, json, c)?;
525 }
526 }
527 }
528
529 Ok(())
530}
531
532async fn connect_wormhole(
533 code: Option<&str>,
534 json: bool,
535 c: &crate::color::Theme,
536) -> Result<Wormhole> {
537 match code {
538 None => {
539 let mailbox = MailboxConnection::create(wormhole_config(), CODE_WORD_COUNT)
540 .await
541 .context("failed to create wormhole mailbox")?;
542
543 let code = mailbox.code().clone();
544 if json {
545 println!(
546 "{}",
547 serde_json::to_string(&serde_json::json!({"code": code.to_string()}))?
548 );
549 } else {
550 eprintln!("{}wormhole:{} run on the other machine:\n", c.blue, c.reset);
551 eprintln!(" td sync {}{}{}\n", c.bold, code, c.reset);
552 eprintln!("waiting for peer...");
553 }
554
555 Wormhole::connect(mailbox)
556 .await
557 .context("wormhole key exchange failed")
558 }
559 Some(raw) => {
560 let code: Code = raw.parse().context("invalid wormhole code")?;
561 let mailbox = MailboxConnection::connect(wormhole_config(), code, false)
562 .await
563 .context("failed to connect to wormhole mailbox")?;
564
565 if !json {
566 eprintln!("{}wormhole:{} connecting...", c.blue, c.reset);
567 }
568
569 Wormhole::connect(mailbox)
570 .await
571 .context("wormhole key exchange failed")
572 }
573 }
574}
575
576fn print_sync_report(
577 store: &db::Store,
578 report: &SyncReport,
579 json: bool,
580 c: &crate::color::Theme,
581) -> Result<()> {
582 if json {
583 println!(
584 "{}",
585 serde_json::to_string(&serde_json::json!({
586 "synced": true,
587 "project": store.project_name(),
588 "sent_bytes": report.sent_bytes,
589 "received_bytes": report.received_bytes,
590 }))?
591 );
592 } else {
593 eprintln!(
594 "{}synced:{} {} (sent {} bytes, received {} bytes)",
595 c.green,
596 c.reset,
597 store.project_name(),
598 report.sent_bytes,
599 report.received_bytes,
600 );
601 if report.imported {
602 eprintln!("{}info:{} imported peer changes", c.blue, c.reset);
603 } else {
604 eprintln!("{}info:{} peer had no new changes", c.blue, c.reset);
605 }
606 }
607 Ok(())
608}
609
/// Read the stable project identity from the doc's root meta map.
///
/// Thin alias over [`db::Store::project_id`], kept so sync-protocol call
/// sites read as intent ("identity check") rather than storage access.
fn read_project_id(store: &db::Store) -> Result<String> {
    store.project_id()
}