sync.rs

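//! Peer-to-peer project sync via magic-wormhole.
//!
//! Both peers open the same project, exchange Loro version vectors,
//! compute deltas containing only the ops the other side lacks, then
//! exchange and import those deltas.  The result is that both docs
//! converge to the same state without sending duplicate operations.
//!
//! A peer that has no local project yet runs in bootstrap mode instead:
//! it sends a default version vector, sends an empty delta, and builds
//! its local store from the delta the other peer sends back.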
  7
  8use std::borrow::Cow;
  9use std::path::Path;
 10
 11use anyhow::{bail, Context, Result};
 12use loro::{ExportMode, VersionVector};
 13use magic_wormhole::{AppConfig, AppID, Code, MailboxConnection, Wormhole};
 14use serde::{Deserialize, Serialize};
 15
 16use crate::db;
 17
 18/// Custom AppID scoping our wormhole traffic away from other protocols.
 19const APP_ID: &str = "td.sync.v1";
 20
 21/// Number of random words in the generated wormhole code.
 22const CODE_WORD_COUNT: usize = 2;
 23
 24/// Handshake message exchanged before the delta payload.
 25#[derive(Debug, Serialize, Deserialize)]
 26#[serde(tag = "mode")]
 27enum SyncHandshake {
 28    Sync {
 29        /// Human-readable project name.
 30        project_name: String,
 31        /// Stable identity (ULID stored in the doc's root meta map).
 32        project_id: String,
 33        /// Serialised version vector so the peer can compute a minimal delta.
 34        #[serde(with = "vv_serde")]
 35        version_vector: VersionVector,
 36    },
 37    Bootstrap {
 38        /// Serialised version vector so the peer can compute a minimal delta.
 39        #[serde(with = "vv_serde")]
 40        version_vector: VersionVector,
 41    },
 42}
 43
 44/// Serde adapter for `VersionVector` using its postcard `encode()`/`decode()`.
 45mod vv_serde {
 46    use loro::VersionVector;
 47    use serde::{self, Deserializer, Serializer};
 48
 49    pub fn serialize<S: Serializer>(vv: &VersionVector, ser: S) -> Result<S::Ok, S::Error> {
 50        ser.serialize_bytes(&vv.encode())
 51    }
 52
 53    pub fn deserialize<'de, D: Deserializer<'de>>(de: D) -> Result<VersionVector, D::Error> {
 54        let bytes: Vec<u8> = serde::Deserialize::deserialize(de)?;
 55        VersionVector::decode(&bytes).map_err(serde::de::Error::custom)
 56    }
 57}
 58
 59/// Outcome of a sync exchange, returned by [`exchange`].
 60pub struct SyncReport {
 61    pub sent_bytes: usize,
 62    pub received_bytes: usize,
 63    pub imported: bool,
 64}
 65
 66pub fn wormhole_config() -> AppConfig<serde_json::Value> {
 67    AppConfig {
 68        id: AppID::new(APP_ID),
 69        rendezvous_url: Cow::Borrowed(magic_wormhole::rendezvous::DEFAULT_RENDEZVOUS_SERVER),
 70        app_version: serde_json::json!({"v": 1}),
 71    }
 72}
 73
 74/// Run the sync protocol over an already-established wormhole.
 75///
 76/// Both sides call this concurrently.  The protocol is symmetric: each
 77/// peer sends its version vector, receives the other's, computes a
 78/// minimal delta, sends it, receives the peer's delta, and imports it.
 79pub async fn exchange(store: &db::Store, mut wormhole: Wormhole) -> Result<SyncReport> {
 80    let my_handshake = SyncHandshake::Sync {
 81        project_name: store.project_name().to_string(),
 82        project_id: read_project_id(store)?,
 83        version_vector: store.doc().oplog_vv(),
 84    };
 85
 86    // --- Phase 1: exchange handshakes ---
 87    wormhole
 88        .send_json(&my_handshake)
 89        .await
 90        .context("failed to send handshake")?;
 91
 92    let their_handshake: SyncHandshake = wormhole
 93        .receive_json::<SyncHandshake>()
 94        .await
 95        .context("failed to receive handshake")?
 96        .context("peer sent invalid handshake JSON")?;
 97
 98    let their_vv = match &their_handshake {
 99        SyncHandshake::Sync {
100            project_name,
101            project_id,
102            version_vector,
103        } => {
104            let (my_project_name, my_project_id) = match &my_handshake {
105                SyncHandshake::Sync {
106                    project_name,
107                    project_id,
108                    ..
109                } => (project_name, project_id),
110                SyncHandshake::Bootstrap { .. } => unreachable!("sync exchange always uses Sync"),
111            };
112            if my_project_id != project_id {
113                let _ = wormhole.close().await;
114                bail!(
115                    "project identity mismatch: local '{}' ({}) vs peer '{}' ({}). If this is the same logical project, remove the accidentally initted local copy and bootstrap with 'td sync' instead of running 'td init' on both machines",
116                    my_project_name,
117                    my_project_id,
118                    project_name,
119                    project_id,
120                );
121            }
122            version_vector
123        }
124        SyncHandshake::Bootstrap { version_vector } => version_vector,
125    };
126
127    // --- Phase 2: compute and exchange deltas ---
128    let my_delta = store
129        .doc()
130        .export(ExportMode::updates(their_vv))
131        .context("failed to export delta for peer")?;
132
133    wormhole
134        .send(my_delta.clone())
135        .await
136        .context("failed to send delta")?;
137
138    let their_delta = wormhole
139        .receive()
140        .await
141        .context("failed to receive delta from peer")?;
142
143    wormhole.close().await.context("failed to close wormhole")?;
144
145    // --- Phase 3: import the peer's delta locally ---
146    let imported = if !their_delta.is_empty() {
147        store
148            .doc()
149            .import(&their_delta)
150            .context("failed to import peer delta")?;
151        store.doc().commit();
152        store.save_raw_delta(&their_delta)?;
153        true
154    } else {
155        false
156    };
157
158    Ok(SyncReport {
159        sent_bytes: my_delta.len(),
160        received_bytes: their_delta.len(),
161        imported,
162    })
163}
164
165pub fn run(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
166    let rt = tokio::runtime::Runtime::new().context("failed to create async runtime")?;
167    rt.block_on(run_async(root, code, json))
168}
169
170async fn run_async(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
171    let maybe_store = db::try_open(root)?;
172    let c = crate::color::stderr_theme();
173
174    let wormhole = connect_wormhole(code, json, c).await?;
175
176    let (store, report) = if let Some(store) = maybe_store {
177        if !json {
178            eprintln!("{}wormhole:{} connected, syncing...", c.blue, c.reset);
179        }
180        let report = exchange(&store, wormhole).await?;
181        (store, report)
182    } else {
183        if !json {
184            eprintln!(
185                "{}wormhole:{} connected, bootstrapping from peer...",
186                c.blue, c.reset
187            );
188        }
189        bootstrap_exchange(root, wormhole).await?
190    };
191
192    print_sync_report(&store, &report, json, c)?;
193
194    Ok(())
195}
196
197async fn bootstrap_exchange(
198    root: &Path,
199    mut wormhole: Wormhole,
200) -> Result<(db::Store, SyncReport)> {
201    wormhole
202        .send_json(&SyncHandshake::Bootstrap {
203            version_vector: VersionVector::default(),
204        })
205        .await
206        .context("failed to send bootstrap handshake")?;
207
208    let their_handshake: SyncHandshake = wormhole
209        .receive_json::<SyncHandshake>()
210        .await
211        .context("failed to receive handshake")?
212        .context("peer sent invalid handshake JSON")?;
213
214    let project_name = match their_handshake {
215        SyncHandshake::Sync { project_name, .. } => project_name,
216        SyncHandshake::Bootstrap { .. } => {
217            let _ = wormhole.close().await;
218            bail!(
219                "both peers are in bootstrap mode. Run 'td init <project>' on one machine first, then run 'td sync' on the other"
220            );
221        }
222    };
223
224    wormhole
225        .send(Vec::new())
226        .await
227        .context("failed to send bootstrap delta")?;
228
229    let their_delta = wormhole
230        .receive()
231        .await
232        .context("failed to receive bootstrap delta from peer")?;
233
234    wormhole.close().await.context("failed to close wormhole")?;
235
236    if their_delta.is_empty() {
237        bail!("peer sent empty bootstrap delta");
238    }
239
240    let store = db::bootstrap_sync(root, &project_name, &their_delta)?;
241    let report = SyncReport {
242        sent_bytes: 0,
243        received_bytes: their_delta.len(),
244        imported: true,
245    };
246    Ok((store, report))
247}
248
249async fn connect_wormhole(
250    code: Option<&str>,
251    json: bool,
252    c: &crate::color::Theme,
253) -> Result<Wormhole> {
254    match code {
255        None => {
256            let mailbox = MailboxConnection::create(wormhole_config(), CODE_WORD_COUNT)
257                .await
258                .context("failed to create wormhole mailbox")?;
259
260            let code = mailbox.code().clone();
261            if json {
262                println!(
263                    "{}",
264                    serde_json::to_string(&serde_json::json!({"code": code.to_string()}))?
265                );
266            } else {
267                eprintln!("{}wormhole:{} run on the other machine:\n", c.blue, c.reset);
268                eprintln!("  td sync {}{}{}\n", c.bold, code, c.reset);
269                eprintln!("waiting for peer...");
270            }
271
272            Wormhole::connect(mailbox)
273                .await
274                .context("wormhole key exchange failed")
275        }
276        Some(raw) => {
277            let code: Code = raw.parse().context("invalid wormhole code")?;
278            let mailbox = MailboxConnection::connect(wormhole_config(), code, false)
279                .await
280                .context("failed to connect to wormhole mailbox")?;
281
282            if !json {
283                eprintln!("{}wormhole:{} connecting...", c.blue, c.reset);
284            }
285
286            Wormhole::connect(mailbox)
287                .await
288                .context("wormhole key exchange failed")
289        }
290    }
291}
292
293fn print_sync_report(
294    store: &db::Store,
295    report: &SyncReport,
296    json: bool,
297    c: &crate::color::Theme,
298) -> Result<()> {
299    if json {
300        println!(
301            "{}",
302            serde_json::to_string(&serde_json::json!({
303                "synced": true,
304                "project": store.project_name(),
305                "sent_bytes": report.sent_bytes,
306                "received_bytes": report.received_bytes,
307            }))?
308        );
309    } else {
310        eprintln!(
311            "{}synced:{} {} (sent {} bytes, received {} bytes)",
312            c.green,
313            c.reset,
314            store.project_name(),
315            report.sent_bytes,
316            report.received_bytes,
317        );
318        if report.imported {
319            eprintln!("{}info:{} imported peer changes", c.blue, c.reset);
320        } else {
321            eprintln!("{}info:{} peer had no new changes", c.blue, c.reset);
322        }
323    }
324    Ok(())
325}
326
327/// Read the stable project identity from the doc's root meta map.
328fn read_project_id(store: &db::Store) -> Result<String> {
329    let root = serde_json::to_value(store.doc().get_deep_value())?;
330    root.get("meta")
331        .and_then(|m| m.get("project_id"))
332        .and_then(|v| v.as_str())
333        .map(str::to_owned)
334        .ok_or_else(|| anyhow::anyhow!("missing meta.project_id in project doc"))
335}