1//! Peer-to-peer project sync via magic-wormhole.
2//!
3//! Both peers open the same project, exchange Loro version vectors,
4//! compute deltas containing only the ops the other side lacks, then
5//! exchange and import those deltas. The result is that both docs
6//! converge to the same state without sending duplicate operations.
7
8use std::borrow::Cow;
9use std::path::Path;
10
11use anyhow::{bail, Context, Result};
12use loro::{ExportMode, VersionVector};
13use magic_wormhole::{AppConfig, AppID, Code, MailboxConnection, Wormhole};
14use serde::{Deserialize, Serialize};
15
16use crate::db;
17
/// Application identifier handed to the wormhole `AppConfig`, so this sync
/// protocol runs under its own AppID rather than another application's.
const APP_ID: &str = "td.sync.v1";
20
/// How many random words to request when a fresh wormhole code is generated
/// (passed to `MailboxConnection::create`).
const CODE_WORD_COUNT: usize = 2;
23
24/// Handshake message exchanged before the delta payload.
25#[derive(Debug, Serialize, Deserialize)]
26#[serde(tag = "mode")]
27enum SyncHandshake {
28 Sync {
29 /// Human-readable project name.
30 project_name: String,
31 /// Stable identity (ULID stored in the doc's root meta map).
32 project_id: String,
33 /// Serialised version vector so the peer can compute a minimal delta.
34 #[serde(with = "vv_serde")]
35 version_vector: VersionVector,
36 },
37 Bootstrap {
38 /// Serialised version vector so the peer can compute a minimal delta.
39 #[serde(with = "vv_serde")]
40 version_vector: VersionVector,
41 },
42}
43
44/// Serde adapter for `VersionVector` using its postcard `encode()`/`decode()`.
45mod vv_serde {
46 use loro::VersionVector;
47 use serde::{self, Deserializer, Serializer};
48
49 pub fn serialize<S: Serializer>(vv: &VersionVector, ser: S) -> Result<S::Ok, S::Error> {
50 ser.serialize_bytes(&vv.encode())
51 }
52
53 pub fn deserialize<'de, D: Deserializer<'de>>(de: D) -> Result<VersionVector, D::Error> {
54 let bytes: Vec<u8> = serde::Deserialize::deserialize(de)?;
55 VersionVector::decode(&bytes).map_err(serde::de::Error::custom)
56 }
57}
58
/// Outcome of a sync exchange, returned by [`exchange`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncReport {
    /// Size in bytes of the delta sent to the peer.
    pub sent_bytes: usize,
    /// Size in bytes of the delta received from the peer.
    pub received_bytes: usize,
    /// `true` when a non-empty peer delta was imported into the local doc.
    pub imported: bool,
}
65
66pub fn wormhole_config() -> AppConfig<serde_json::Value> {
67 AppConfig {
68 id: AppID::new(APP_ID),
69 rendezvous_url: Cow::Borrowed(magic_wormhole::rendezvous::DEFAULT_RENDEZVOUS_SERVER),
70 app_version: serde_json::json!({"v": 1}),
71 }
72}
73
74/// Run the sync protocol over an already-established wormhole.
75///
76/// Both sides call this concurrently. The protocol is symmetric: each
77/// peer sends its version vector, receives the other's, computes a
78/// minimal delta, sends it, receives the peer's delta, and imports it.
79pub async fn exchange(store: &db::Store, mut wormhole: Wormhole) -> Result<SyncReport> {
80 let my_handshake = SyncHandshake::Sync {
81 project_name: store.project_name().to_string(),
82 project_id: read_project_id(store)?,
83 version_vector: store.doc().oplog_vv(),
84 };
85
86 // --- Phase 1: exchange handshakes ---
87 wormhole
88 .send_json(&my_handshake)
89 .await
90 .context("failed to send handshake")?;
91
92 let their_handshake: SyncHandshake = wormhole
93 .receive_json::<SyncHandshake>()
94 .await
95 .context("failed to receive handshake")?
96 .context("peer sent invalid handshake JSON")?;
97
98 let their_vv = match &their_handshake {
99 SyncHandshake::Sync {
100 project_name,
101 project_id,
102 version_vector,
103 } => {
104 let (my_project_name, my_project_id) = match &my_handshake {
105 SyncHandshake::Sync {
106 project_name,
107 project_id,
108 ..
109 } => (project_name, project_id),
110 SyncHandshake::Bootstrap { .. } => unreachable!("sync exchange always uses Sync"),
111 };
112 if my_project_id != project_id {
113 let _ = wormhole.close().await;
114 bail!(
115 "project identity mismatch: local '{}' ({}) vs peer '{}' ({}). If this is the same logical project, remove the accidentally initted local copy and bootstrap with 'td sync' instead of running 'td init' on both machines",
116 my_project_name,
117 my_project_id,
118 project_name,
119 project_id,
120 );
121 }
122 version_vector
123 }
124 SyncHandshake::Bootstrap { version_vector } => version_vector,
125 };
126
127 // --- Phase 2: compute and exchange deltas ---
128 let my_delta = store
129 .doc()
130 .export(ExportMode::updates(their_vv))
131 .context("failed to export delta for peer")?;
132
133 wormhole
134 .send(my_delta.clone())
135 .await
136 .context("failed to send delta")?;
137
138 let their_delta = wormhole
139 .receive()
140 .await
141 .context("failed to receive delta from peer")?;
142
143 wormhole.close().await.context("failed to close wormhole")?;
144
145 // --- Phase 3: import the peer's delta locally ---
146 let imported = if !their_delta.is_empty() {
147 store
148 .doc()
149 .import(&their_delta)
150 .context("failed to import peer delta")?;
151 store.doc().commit();
152 store.save_raw_delta(&their_delta)?;
153 true
154 } else {
155 false
156 };
157
158 Ok(SyncReport {
159 sent_bytes: my_delta.len(),
160 received_bytes: their_delta.len(),
161 imported,
162 })
163}
164
165pub fn run(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
166 let rt = tokio::runtime::Runtime::new().context("failed to create async runtime")?;
167 rt.block_on(run_async(root, code, json))
168}
169
170async fn run_async(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
171 let maybe_store = db::try_open(root)?;
172 let c = crate::color::stderr_theme();
173
174 let wormhole = connect_wormhole(code, json, c).await?;
175
176 let (store, report) = if let Some(store) = maybe_store {
177 if !json {
178 eprintln!("{}wormhole:{} connected, syncing...", c.blue, c.reset);
179 }
180 let report = exchange(&store, wormhole).await?;
181 (store, report)
182 } else {
183 if !json {
184 eprintln!(
185 "{}wormhole:{} connected, bootstrapping from peer...",
186 c.blue, c.reset
187 );
188 }
189 bootstrap_exchange(root, wormhole).await?
190 };
191
192 print_sync_report(&store, &report, json, c)?;
193
194 Ok(())
195}
196
197async fn bootstrap_exchange(
198 root: &Path,
199 mut wormhole: Wormhole,
200) -> Result<(db::Store, SyncReport)> {
201 wormhole
202 .send_json(&SyncHandshake::Bootstrap {
203 version_vector: VersionVector::default(),
204 })
205 .await
206 .context("failed to send bootstrap handshake")?;
207
208 let their_handshake: SyncHandshake = wormhole
209 .receive_json::<SyncHandshake>()
210 .await
211 .context("failed to receive handshake")?
212 .context("peer sent invalid handshake JSON")?;
213
214 let project_name = match their_handshake {
215 SyncHandshake::Sync { project_name, .. } => project_name,
216 SyncHandshake::Bootstrap { .. } => {
217 let _ = wormhole.close().await;
218 bail!(
219 "both peers are in bootstrap mode. Run 'td init <project>' on one machine first, then run 'td sync' on the other"
220 );
221 }
222 };
223
224 wormhole
225 .send(Vec::new())
226 .await
227 .context("failed to send bootstrap delta")?;
228
229 let their_delta = wormhole
230 .receive()
231 .await
232 .context("failed to receive bootstrap delta from peer")?;
233
234 wormhole.close().await.context("failed to close wormhole")?;
235
236 if their_delta.is_empty() {
237 bail!("peer sent empty bootstrap delta");
238 }
239
240 let store = db::bootstrap_sync(root, &project_name, &their_delta)?;
241 let report = SyncReport {
242 sent_bytes: 0,
243 received_bytes: their_delta.len(),
244 imported: true,
245 };
246 Ok((store, report))
247}
248
249async fn connect_wormhole(
250 code: Option<&str>,
251 json: bool,
252 c: &crate::color::Theme,
253) -> Result<Wormhole> {
254 match code {
255 None => {
256 let mailbox = MailboxConnection::create(wormhole_config(), CODE_WORD_COUNT)
257 .await
258 .context("failed to create wormhole mailbox")?;
259
260 let code = mailbox.code().clone();
261 if json {
262 println!(
263 "{}",
264 serde_json::to_string(&serde_json::json!({"code": code.to_string()}))?
265 );
266 } else {
267 eprintln!("{}wormhole:{} run on the other machine:\n", c.blue, c.reset);
268 eprintln!(" td sync {}{}{}\n", c.bold, code, c.reset);
269 eprintln!("waiting for peer...");
270 }
271
272 Wormhole::connect(mailbox)
273 .await
274 .context("wormhole key exchange failed")
275 }
276 Some(raw) => {
277 let code: Code = raw.parse().context("invalid wormhole code")?;
278 let mailbox = MailboxConnection::connect(wormhole_config(), code, false)
279 .await
280 .context("failed to connect to wormhole mailbox")?;
281
282 if !json {
283 eprintln!("{}wormhole:{} connecting...", c.blue, c.reset);
284 }
285
286 Wormhole::connect(mailbox)
287 .await
288 .context("wormhole key exchange failed")
289 }
290 }
291}
292
293fn print_sync_report(
294 store: &db::Store,
295 report: &SyncReport,
296 json: bool,
297 c: &crate::color::Theme,
298) -> Result<()> {
299 if json {
300 println!(
301 "{}",
302 serde_json::to_string(&serde_json::json!({
303 "synced": true,
304 "project": store.project_name(),
305 "sent_bytes": report.sent_bytes,
306 "received_bytes": report.received_bytes,
307 }))?
308 );
309 } else {
310 eprintln!(
311 "{}synced:{} {} (sent {} bytes, received {} bytes)",
312 c.green,
313 c.reset,
314 store.project_name(),
315 report.sent_bytes,
316 report.received_bytes,
317 );
318 if report.imported {
319 eprintln!("{}info:{} imported peer changes", c.blue, c.reset);
320 } else {
321 eprintln!("{}info:{} peer had no new changes", c.blue, c.reset);
322 }
323 }
324 Ok(())
325}
326
327/// Read the stable project identity from the doc's root meta map.
328fn read_project_id(store: &db::Store) -> Result<String> {
329 let root = serde_json::to_value(store.doc().get_deep_value())?;
330 root.get("meta")
331 .and_then(|m| m.get("project_id"))
332 .and_then(|v| v.as_str())
333 .map(str::to_owned)
334 .ok_or_else(|| anyhow::anyhow!("missing meta.project_id in project doc"))
335}