1//! Peer-to-peer project sync via magic-wormhole.
2//!
3//! Both peers open the same project, exchange Loro version vectors,
4//! compute deltas containing only the ops the other side lacks, then
5//! exchange and import those deltas. The result is that both docs
6//! converge to the same state without sending duplicate operations.
7
8use std::borrow::Cow;
9use std::path::Path;
10
11use anyhow::{bail, Context, Result};
12use loro::{ExportMode, VersionVector};
13use magic_wormhole::{AppConfig, AppID, Code, MailboxConnection, Wormhole};
14use serde::{Deserialize, Serialize};
15
16use crate::db;
17
/// Custom AppID scoping our wormhole traffic away from other protocols.
///
/// Passed to `AppID::new` when building the config in `wormhole_config`.
const APP_ID: &str = "td.sync.v1";

/// Number of random words in the generated wormhole code.
///
/// Only used by the side that creates the mailbox (no code supplied on the
/// command line); the joining side parses the code it was given.
const CODE_WORD_COUNT: usize = 2;
23
/// Handshake message exchanged before the delta payload.
///
/// Each peer sends one of these as JSON and reads the other's before any
/// delta is exported, so both sides can verify they hold the same project
/// and know which ops the other side already has.
#[derive(Debug, Serialize, Deserialize)]
struct SyncHandshake {
    /// Human-readable project name.
    ///
    /// Only used to make the identity-mismatch error readable; it is not
    /// compared between peers.
    project_name: String,
    /// Stable identity (ULID stored in the doc's root meta map).
    ///
    /// The sync is aborted when the two peers' ids differ.
    project_id: String,
    /// Serialised version vector so the peer can compute a minimal delta.
    ///
    /// Goes over the wire as raw bytes via the `vv_serde` adapter.
    #[serde(with = "vv_serde")]
    version_vector: VersionVector,
}
35
36/// Serde adapter for `VersionVector` using its postcard `encode()`/`decode()`.
37mod vv_serde {
38 use loro::VersionVector;
39 use serde::{self, Deserializer, Serializer};
40
41 pub fn serialize<S: Serializer>(vv: &VersionVector, ser: S) -> Result<S::Ok, S::Error> {
42 ser.serialize_bytes(&vv.encode())
43 }
44
45 pub fn deserialize<'de, D: Deserializer<'de>>(de: D) -> Result<VersionVector, D::Error> {
46 let bytes: Vec<u8> = serde::Deserialize::deserialize(de)?;
47 VersionVector::decode(&bytes).map_err(serde::de::Error::custom)
48 }
49}
50
/// Outcome of a sync exchange, returned by [`exchange`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncReport {
    /// Size in bytes of the delta this side sent to the peer.
    pub sent_bytes: usize,
    /// Size in bytes of the delta received from the peer.
    pub received_bytes: usize,
    /// Whether the peer's delta was imported into the local doc.
    pub imported: bool,
}
57
58pub fn wormhole_config() -> AppConfig<serde_json::Value> {
59 AppConfig {
60 id: AppID::new(APP_ID),
61 rendezvous_url: Cow::Borrowed(magic_wormhole::rendezvous::DEFAULT_RENDEZVOUS_SERVER),
62 app_version: serde_json::json!({"v": 1}),
63 }
64}
65
66/// Run the sync protocol over an already-established wormhole.
67///
68/// Both sides call this concurrently. The protocol is symmetric: each
69/// peer sends its version vector, receives the other's, computes a
70/// minimal delta, sends it, receives the peer's delta, and imports it.
71pub async fn exchange(store: &db::Store, mut wormhole: Wormhole) -> Result<SyncReport> {
72 let my_vv = store.doc().oplog_vv();
73 let my_handshake = SyncHandshake {
74 project_name: store.project_name().to_string(),
75 project_id: read_project_id(store)?,
76 version_vector: my_vv,
77 };
78
79 // --- Phase 1: exchange handshakes ---
80 wormhole
81 .send_json(&my_handshake)
82 .await
83 .context("failed to send handshake")?;
84
85 let their_handshake: SyncHandshake = wormhole
86 .receive_json::<SyncHandshake>()
87 .await
88 .context("failed to receive handshake")?
89 .context("peer sent invalid handshake JSON")?;
90
91 if my_handshake.project_id != their_handshake.project_id {
92 let _ = wormhole.close().await;
93 bail!(
94 "project identity mismatch: local '{}' ({}) vs peer '{}' ({})",
95 my_handshake.project_name,
96 my_handshake.project_id,
97 their_handshake.project_name,
98 their_handshake.project_id,
99 );
100 }
101
102 // --- Phase 2: compute and exchange deltas ---
103 let my_delta = store
104 .doc()
105 .export(ExportMode::updates(&their_handshake.version_vector))
106 .context("failed to export delta for peer")?;
107
108 wormhole
109 .send(my_delta.clone())
110 .await
111 .context("failed to send delta")?;
112
113 let their_delta = wormhole
114 .receive()
115 .await
116 .context("failed to receive delta from peer")?;
117
118 wormhole.close().await.context("failed to close wormhole")?;
119
120 // --- Phase 3: import the peer's delta locally ---
121 let imported = if !their_delta.is_empty() {
122 store
123 .doc()
124 .import(&their_delta)
125 .context("failed to import peer delta")?;
126 store.doc().commit();
127 store.save_raw_delta(&their_delta)?;
128 true
129 } else {
130 false
131 };
132
133 Ok(SyncReport {
134 sent_bytes: my_delta.len(),
135 received_bytes: their_delta.len(),
136 imported,
137 })
138}
139
140pub fn run(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
141 let rt = tokio::runtime::Runtime::new().context("failed to create async runtime")?;
142 rt.block_on(run_async(root, code, json))
143}
144
145async fn run_async(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
146 let store = db::open(root)?;
147 let c = crate::color::stderr_theme();
148
149 let wormhole = match code {
150 None => {
151 let mailbox = MailboxConnection::create(wormhole_config(), CODE_WORD_COUNT)
152 .await
153 .context("failed to create wormhole mailbox")?;
154
155 let code = mailbox.code().clone();
156 if json {
157 println!(
158 "{}",
159 serde_json::to_string(&serde_json::json!({"code": code.to_string()}))?
160 );
161 } else {
162 eprintln!("{}wormhole:{} run on the other machine:\n", c.blue, c.reset);
163 eprintln!(" td sync {}{}{}\n", c.bold, code, c.reset);
164 eprintln!("waiting for peer...");
165 }
166
167 Wormhole::connect(mailbox)
168 .await
169 .context("wormhole key exchange failed")?
170 }
171 Some(raw) => {
172 let code: Code = raw.parse().context("invalid wormhole code")?;
173 let mailbox = MailboxConnection::connect(wormhole_config(), code, false)
174 .await
175 .context("failed to connect to wormhole mailbox")?;
176
177 if !json {
178 eprintln!("{}wormhole:{} connecting...", c.blue, c.reset);
179 }
180
181 Wormhole::connect(mailbox)
182 .await
183 .context("wormhole key exchange failed")?
184 }
185 };
186
187 if !json {
188 eprintln!("{}wormhole:{} connected, syncing...", c.blue, c.reset);
189 }
190
191 let report = exchange(&store, wormhole).await?;
192
193 if json {
194 println!(
195 "{}",
196 serde_json::to_string(&serde_json::json!({
197 "synced": true,
198 "project": store.project_name(),
199 "sent_bytes": report.sent_bytes,
200 "received_bytes": report.received_bytes,
201 }))?
202 );
203 } else {
204 eprintln!(
205 "{}synced:{} {} (sent {} bytes, received {} bytes)",
206 c.green,
207 c.reset,
208 store.project_name(),
209 report.sent_bytes,
210 report.received_bytes,
211 );
212 if report.imported {
213 eprintln!("{}info:{} imported peer changes", c.blue, c.reset);
214 } else {
215 eprintln!("{}info:{} peer had no new changes", c.blue, c.reset);
216 }
217 }
218
219 Ok(())
220}
221
222/// Read the stable project identity from the doc's root meta map.
223fn read_project_id(store: &db::Store) -> Result<String> {
224 let root = serde_json::to_value(store.doc().get_deep_value())?;
225 root.get("meta")
226 .and_then(|m| m.get("project_id"))
227 .and_then(|v| v.as_str())
228 .map(str::to_owned)
229 .ok_or_else(|| anyhow::anyhow!("missing meta.project_id in project doc"))
230}