1//! Peer-to-peer project sync via magic-wormhole.
2//!
3//! Both peers open the same project, exchange Loro version vectors,
4//! compute deltas containing only the ops the other side lacks, then
5//! exchange and import those deltas. The result is that both docs
6//! converge to the same state without sending duplicate operations.
7
8use std::borrow::Cow;
9use std::path::Path;
10
11use anyhow::{bail, Context, Result};
12use loro::{ExportMode, VersionVector};
13use magic_wormhole::{AppConfig, AppID, Code, MailboxConnection, Wormhole};
14use serde::{Deserialize, Serialize};
15
16use crate::db;
17
18/// Custom AppID scoping our wormhole traffic away from other protocols.
19const APP_ID: &str = "td.sync.v1";
20
21/// Number of random words in the generated wormhole code.
22const CODE_WORD_COUNT: usize = 2;
23
24/// Handshake message exchanged before the delta payload.
25#[derive(Debug, Serialize, Deserialize)]
26#[serde(tag = "mode")]
27enum SyncHandshake {
28 Sync {
29 /// Human-readable project name.
30 project_name: String,
31 /// Stable identity (ULID stored in the doc's root meta map).
32 project_id: String,
33 /// Serialised version vector so the peer can compute a minimal delta.
34 #[serde(with = "vv_serde")]
35 version_vector: VersionVector,
36 },
37 Bootstrap {
38 /// Serialised version vector so the peer can compute a minimal delta.
39 #[serde(with = "vv_serde")]
40 version_vector: VersionVector,
41 },
42}
43
44/// Serde adapter for `VersionVector` using its postcard `encode()`/`decode()`.
45mod vv_serde {
46 use loro::VersionVector;
47 use serde::{self, Deserializer, Serializer};
48
49 pub fn serialize<S: Serializer>(vv: &VersionVector, ser: S) -> Result<S::Ok, S::Error> {
50 ser.serialize_bytes(&vv.encode())
51 }
52
53 pub fn deserialize<'de, D: Deserializer<'de>>(de: D) -> Result<VersionVector, D::Error> {
54 let bytes: Vec<u8> = serde::Deserialize::deserialize(de)?;
55 VersionVector::decode(&bytes).map_err(serde::de::Error::custom)
56 }
57}
58
59/// Outcome of a sync exchange, returned by [`exchange`].
60pub struct SyncReport {
61 pub sent_bytes: usize,
62 pub received_bytes: usize,
63 pub imported: bool,
64}
65
66pub fn wormhole_config() -> AppConfig<serde_json::Value> {
67 AppConfig {
68 id: AppID::new(APP_ID),
69 rendezvous_url: Cow::Borrowed(magic_wormhole::rendezvous::DEFAULT_RENDEZVOUS_SERVER),
70 app_version: serde_json::json!({"v": 1}),
71 }
72}
73
74/// Run the sync protocol over an already-established wormhole.
75///
76/// Both sides call this concurrently. The protocol is symmetric: each
77/// peer sends its version vector, receives the other's, computes a
78/// minimal delta, sends it, receives the peer's delta, and imports it.
79pub async fn exchange(store: &db::Store, mut wormhole: Wormhole) -> Result<SyncReport> {
80 let my_handshake = SyncHandshake::Sync {
81 project_name: store.project_name().to_string(),
82 project_id: read_project_id(store)?,
83 version_vector: store.doc().oplog_vv(),
84 };
85
86 // --- Phase 1: exchange handshakes ---
87 wormhole
88 .send_json(&my_handshake)
89 .await
90 .context("failed to send handshake")?;
91
92 let their_handshake: SyncHandshake = wormhole
93 .receive_json::<SyncHandshake>()
94 .await
95 .context("failed to receive handshake")?
96 .context(
97 "peer sent incompatible handshake (are both sides running the same version of td?)",
98 )?;
99
100 let their_vv = match &their_handshake {
101 SyncHandshake::Sync {
102 project_name,
103 project_id,
104 version_vector,
105 } => {
106 let (my_project_name, my_project_id) = match &my_handshake {
107 SyncHandshake::Sync {
108 project_name,
109 project_id,
110 ..
111 } => (project_name, project_id),
112 SyncHandshake::Bootstrap { .. } => unreachable!("sync exchange always uses Sync"),
113 };
114 if my_project_id != project_id {
115 let _ = wormhole.close().await;
116 bail!(
117 "project identity mismatch: local '{}' ({}) vs peer '{}' ({}). If this is the same logical project, remove the accidentally initted local copy and bootstrap with 'td sync' instead of running 'td init' on both machines",
118 my_project_name,
119 my_project_id,
120 project_name,
121 project_id,
122 );
123 }
124 version_vector
125 }
126 SyncHandshake::Bootstrap { version_vector } => version_vector,
127 };
128
129 // --- Phase 2: compute and exchange deltas ---
130 let my_delta = store
131 .doc()
132 .export(ExportMode::updates(their_vv))
133 .context("failed to export delta for peer")?;
134
135 wormhole
136 .send(my_delta.clone())
137 .await
138 .context("failed to send delta")?;
139
140 let their_delta = wormhole
141 .receive()
142 .await
143 .context("failed to receive delta from peer")?;
144
145 wormhole.close().await.context("failed to close wormhole")?;
146
147 // --- Phase 3: import the peer's delta locally ---
148 let imported = if !their_delta.is_empty() {
149 store
150 .doc()
151 .import(&their_delta)
152 .context("failed to import peer delta")?;
153 store.doc().commit();
154 store.save_raw_delta(&their_delta)?;
155 true
156 } else {
157 false
158 };
159
160 Ok(SyncReport {
161 sent_bytes: my_delta.len(),
162 received_bytes: their_delta.len(),
163 imported,
164 })
165}
166
167pub fn run(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
168 let rt = tokio::runtime::Runtime::new().context("failed to create async runtime")?;
169 rt.block_on(run_async(root, code, json))
170}
171
172async fn run_async(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
173 let maybe_store = db::try_open(root)?;
174 let c = crate::color::stderr_theme();
175
176 let wormhole = connect_wormhole(code, json, c).await?;
177
178 let (store, report) = if let Some(store) = maybe_store {
179 if !json {
180 eprintln!("{}wormhole:{} connected, syncing...", c.blue, c.reset);
181 }
182 let report = exchange(&store, wormhole).await?;
183 (store, report)
184 } else {
185 if !json {
186 eprintln!(
187 "{}wormhole:{} connected, bootstrapping from peer...",
188 c.blue, c.reset
189 );
190 }
191 bootstrap_exchange(root, wormhole).await?
192 };
193
194 print_sync_report(&store, &report, json, c)?;
195
196 Ok(())
197}
198
199async fn bootstrap_exchange(
200 root: &Path,
201 mut wormhole: Wormhole,
202) -> Result<(db::Store, SyncReport)> {
203 wormhole
204 .send_json(&SyncHandshake::Bootstrap {
205 version_vector: VersionVector::default(),
206 })
207 .await
208 .context("failed to send bootstrap handshake")?;
209
210 let their_handshake: SyncHandshake = wormhole
211 .receive_json::<SyncHandshake>()
212 .await
213 .context("failed to receive handshake")?
214 .context(
215 "peer sent incompatible handshake (are both sides running the same version of td?)",
216 )?;
217
218 let project_name = match their_handshake {
219 SyncHandshake::Sync { project_name, .. } => project_name,
220 SyncHandshake::Bootstrap { .. } => {
221 let _ = wormhole.close().await;
222 bail!(
223 "both peers are in bootstrap mode. Run 'td init <project>' on one machine first, then run 'td sync' on the other"
224 );
225 }
226 };
227
228 wormhole
229 .send(Vec::new())
230 .await
231 .context("failed to send bootstrap delta")?;
232
233 let their_delta = wormhole
234 .receive()
235 .await
236 .context("failed to receive bootstrap delta from peer")?;
237
238 wormhole.close().await.context("failed to close wormhole")?;
239
240 if their_delta.is_empty() {
241 bail!("peer sent empty bootstrap delta");
242 }
243
244 let store = db::bootstrap_sync(root, &project_name, &their_delta)?;
245 let report = SyncReport {
246 sent_bytes: 0,
247 received_bytes: their_delta.len(),
248 imported: true,
249 };
250 Ok((store, report))
251}
252
253async fn connect_wormhole(
254 code: Option<&str>,
255 json: bool,
256 c: &crate::color::Theme,
257) -> Result<Wormhole> {
258 match code {
259 None => {
260 let mailbox = MailboxConnection::create(wormhole_config(), CODE_WORD_COUNT)
261 .await
262 .context("failed to create wormhole mailbox")?;
263
264 let code = mailbox.code().clone();
265 if json {
266 println!(
267 "{}",
268 serde_json::to_string(&serde_json::json!({"code": code.to_string()}))?
269 );
270 } else {
271 eprintln!("{}wormhole:{} run on the other machine:\n", c.blue, c.reset);
272 eprintln!(" td sync {}{}{}\n", c.bold, code, c.reset);
273 eprintln!("waiting for peer...");
274 }
275
276 Wormhole::connect(mailbox)
277 .await
278 .context("wormhole key exchange failed")
279 }
280 Some(raw) => {
281 let code: Code = raw.parse().context("invalid wormhole code")?;
282 let mailbox = MailboxConnection::connect(wormhole_config(), code, false)
283 .await
284 .context("failed to connect to wormhole mailbox")?;
285
286 if !json {
287 eprintln!("{}wormhole:{} connecting...", c.blue, c.reset);
288 }
289
290 Wormhole::connect(mailbox)
291 .await
292 .context("wormhole key exchange failed")
293 }
294 }
295}
296
297fn print_sync_report(
298 store: &db::Store,
299 report: &SyncReport,
300 json: bool,
301 c: &crate::color::Theme,
302) -> Result<()> {
303 if json {
304 println!(
305 "{}",
306 serde_json::to_string(&serde_json::json!({
307 "synced": true,
308 "project": store.project_name(),
309 "sent_bytes": report.sent_bytes,
310 "received_bytes": report.received_bytes,
311 }))?
312 );
313 } else {
314 eprintln!(
315 "{}synced:{} {} (sent {} bytes, received {} bytes)",
316 c.green,
317 c.reset,
318 store.project_name(),
319 report.sent_bytes,
320 report.received_bytes,
321 );
322 if report.imported {
323 eprintln!("{}info:{} imported peer changes", c.blue, c.reset);
324 } else {
325 eprintln!("{}info:{} peer had no new changes", c.blue, c.reset);
326 }
327 }
328 Ok(())
329}
330
331/// Read the stable project identity from the doc's root meta map.
332fn read_project_id(store: &db::Store) -> Result<String> {
333 let root = serde_json::to_value(store.doc().get_deep_value())?;
334 root.get("meta")
335 .and_then(|m| m.get("project_id"))
336 .and_then(|v| v.as_str())
337 .map(str::to_owned)
338 .ok_or_else(|| anyhow::anyhow!("missing meta.project_id in project doc"))
339}