sync.rs

  1//! Peer-to-peer project sync via magic-wormhole.
  2//!
  3//! Both peers open the same project, exchange Loro version vectors,
  4//! compute deltas containing only the ops the other side lacks, then
  5//! exchange and import those deltas.  The result is that both docs
  6//! converge to the same state without sending duplicate operations.
  7
  8use std::borrow::Cow;
  9use std::path::Path;
 10
 11use anyhow::{bail, Context, Result};
 12use loro::{ExportMode, VersionVector};
 13use magic_wormhole::{AppConfig, AppID, Code, MailboxConnection, Wormhole};
 14use serde::{Deserialize, Serialize};
 15
 16use crate::db;
 17
 18/// Custom AppID scoping our wormhole traffic away from other protocols.
 19const APP_ID: &str = "td.sync.v1";
 20
 21/// Number of random words in the generated wormhole code.
 22const CODE_WORD_COUNT: usize = 2;
 23
 24/// Handshake message exchanged before the delta payload.
 25#[derive(Debug, Serialize, Deserialize)]
 26#[serde(tag = "mode")]
 27enum SyncHandshake {
 28    Sync {
 29        /// Human-readable project name.
 30        project_name: String,
 31        /// Stable identity (ULID stored in the doc's root meta map).
 32        project_id: String,
 33        /// Serialised version vector so the peer can compute a minimal delta.
 34        #[serde(with = "vv_serde")]
 35        version_vector: VersionVector,
 36    },
 37    Bootstrap {
 38        /// Serialised version vector so the peer can compute a minimal delta.
 39        #[serde(with = "vv_serde")]
 40        version_vector: VersionVector,
 41    },
 42}
 43
 44/// Serde adapter for `VersionVector` using its postcard `encode()`/`decode()`.
 45mod vv_serde {
 46    use loro::VersionVector;
 47    use serde::{self, Deserializer, Serializer};
 48
 49    pub fn serialize<S: Serializer>(vv: &VersionVector, ser: S) -> Result<S::Ok, S::Error> {
 50        ser.serialize_bytes(&vv.encode())
 51    }
 52
 53    pub fn deserialize<'de, D: Deserializer<'de>>(de: D) -> Result<VersionVector, D::Error> {
 54        let bytes: Vec<u8> = serde::Deserialize::deserialize(de)?;
 55        VersionVector::decode(&bytes).map_err(serde::de::Error::custom)
 56    }
 57}
 58
 59/// Outcome of a sync exchange, returned by [`exchange`].
 60pub struct SyncReport {
 61    pub sent_bytes: usize,
 62    pub received_bytes: usize,
 63    pub imported: bool,
 64}
 65
 66pub fn wormhole_config() -> AppConfig<serde_json::Value> {
 67    AppConfig {
 68        id: AppID::new(APP_ID),
 69        rendezvous_url: Cow::Borrowed(magic_wormhole::rendezvous::DEFAULT_RENDEZVOUS_SERVER),
 70        app_version: serde_json::json!({"v": 1}),
 71    }
 72}
 73
 74/// Run the sync protocol over an already-established wormhole.
 75///
 76/// Both sides call this concurrently.  The protocol is symmetric: each
 77/// peer sends its version vector, receives the other's, computes a
 78/// minimal delta, sends it, receives the peer's delta, and imports it.
 79pub async fn exchange(store: &db::Store, mut wormhole: Wormhole) -> Result<SyncReport> {
 80    let my_handshake = SyncHandshake::Sync {
 81        project_name: store.project_name().to_string(),
 82        project_id: read_project_id(store)?,
 83        version_vector: store.doc().oplog_vv(),
 84    };
 85
 86    // --- Phase 1: exchange handshakes ---
 87    wormhole
 88        .send_json(&my_handshake)
 89        .await
 90        .context("failed to send handshake")?;
 91
 92    let their_handshake: SyncHandshake = wormhole
 93        .receive_json::<SyncHandshake>()
 94        .await
 95        .context("failed to receive handshake")?
 96        .context(
 97            "peer sent incompatible handshake (are both sides running the same version of td?)",
 98        )?;
 99
100    let their_vv = match &their_handshake {
101        SyncHandshake::Sync {
102            project_name,
103            project_id,
104            version_vector,
105        } => {
106            let (my_project_name, my_project_id) = match &my_handshake {
107                SyncHandshake::Sync {
108                    project_name,
109                    project_id,
110                    ..
111                } => (project_name, project_id),
112                SyncHandshake::Bootstrap { .. } => unreachable!("sync exchange always uses Sync"),
113            };
114            if my_project_id != project_id {
115                let _ = wormhole.close().await;
116                bail!(
117                    "project identity mismatch: local '{}' ({}) vs peer '{}' ({}). If this is the same logical project, remove the accidentally initted local copy and bootstrap with 'td sync' instead of running 'td project init <project>' on both machines",
118                    my_project_name,
119                    my_project_id,
120                    project_name,
121                    project_id,
122                );
123            }
124            version_vector
125        }
126        SyncHandshake::Bootstrap { version_vector } => version_vector,
127    };
128
129    // --- Phase 2: compute and exchange deltas ---
130    let my_delta = store
131        .doc()
132        .export(ExportMode::updates(their_vv))
133        .context("failed to export delta for peer")?;
134
135    wormhole
136        .send(my_delta.clone())
137        .await
138        .context("failed to send delta")?;
139
140    let their_delta = wormhole
141        .receive()
142        .await
143        .context("failed to receive delta from peer")?;
144
145    wormhole.close().await.context("failed to close wormhole")?;
146
147    // --- Phase 3: import the peer's delta locally ---
148    let imported = if !their_delta.is_empty() {
149        store
150            .doc()
151            .import(&their_delta)
152            .context("failed to import peer delta")?;
153        store.doc().commit();
154        store.save_raw_delta(&their_delta)?;
155        true
156    } else {
157        false
158    };
159
160    Ok(SyncReport {
161        sent_bytes: my_delta.len(),
162        received_bytes: their_delta.len(),
163        imported,
164    })
165}
166
167pub fn run(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
168    let rt = tokio::runtime::Runtime::new().context("failed to create async runtime")?;
169    rt.block_on(run_async(root, code, json))
170}
171
172async fn run_async(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
173    let maybe_store = db::try_open(root)?;
174    let c = crate::color::stderr_theme();
175
176    let wormhole = connect_wormhole(code, json, c).await?;
177
178    let (store, report) = if let Some(store) = maybe_store {
179        if !json {
180            eprintln!("{}wormhole:{} connected, syncing...", c.blue, c.reset);
181        }
182        let report = exchange(&store, wormhole).await?;
183        (store, report)
184    } else {
185        if !json {
186            eprintln!(
187                "{}wormhole:{} connected, bootstrapping from peer...",
188                c.blue, c.reset
189            );
190        }
191        bootstrap_exchange(root, wormhole).await?
192    };
193
194    print_sync_report(&store, &report, json, c)?;
195
196    Ok(())
197}
198
199async fn bootstrap_exchange(
200    root: &Path,
201    mut wormhole: Wormhole,
202) -> Result<(db::Store, SyncReport)> {
203    wormhole
204        .send_json(&SyncHandshake::Bootstrap {
205            version_vector: VersionVector::default(),
206        })
207        .await
208        .context("failed to send bootstrap handshake")?;
209
210    let their_handshake: SyncHandshake = wormhole
211        .receive_json::<SyncHandshake>()
212        .await
213        .context("failed to receive handshake")?
214        .context(
215            "peer sent incompatible handshake (are both sides running the same version of td?)",
216        )?;
217
218    let project_name = match their_handshake {
219        SyncHandshake::Sync { project_name, .. } => project_name,
220        SyncHandshake::Bootstrap { .. } => {
221            let _ = wormhole.close().await;
222            bail!(
223                "both peers are in bootstrap mode. Run 'td project init <project>' on one machine first, then run 'td sync' on the other"
224            );
225        }
226    };
227
228    wormhole
229        .send(Vec::new())
230        .await
231        .context("failed to send bootstrap delta")?;
232
233    let their_delta = wormhole
234        .receive()
235        .await
236        .context("failed to receive bootstrap delta from peer")?;
237
238    wormhole.close().await.context("failed to close wormhole")?;
239
240    if their_delta.is_empty() {
241        bail!("peer sent empty bootstrap delta");
242    }
243
244    let store = db::bootstrap_sync(root, &project_name, &their_delta)?;
245    let report = SyncReport {
246        sent_bytes: 0,
247        received_bytes: their_delta.len(),
248        imported: true,
249    };
250    Ok((store, report))
251}
252
253async fn connect_wormhole(
254    code: Option<&str>,
255    json: bool,
256    c: &crate::color::Theme,
257) -> Result<Wormhole> {
258    match code {
259        None => {
260            let mailbox = MailboxConnection::create(wormhole_config(), CODE_WORD_COUNT)
261                .await
262                .context("failed to create wormhole mailbox")?;
263
264            let code = mailbox.code().clone();
265            if json {
266                println!(
267                    "{}",
268                    serde_json::to_string(&serde_json::json!({"code": code.to_string()}))?
269                );
270            } else {
271                eprintln!("{}wormhole:{} run on the other machine:\n", c.blue, c.reset);
272                eprintln!("  td sync {}{}{}\n", c.bold, code, c.reset);
273                eprintln!("waiting for peer...");
274            }
275
276            Wormhole::connect(mailbox)
277                .await
278                .context("wormhole key exchange failed")
279        }
280        Some(raw) => {
281            let code: Code = raw.parse().context("invalid wormhole code")?;
282            let mailbox = MailboxConnection::connect(wormhole_config(), code, false)
283                .await
284                .context("failed to connect to wormhole mailbox")?;
285
286            if !json {
287                eprintln!("{}wormhole:{} connecting...", c.blue, c.reset);
288            }
289
290            Wormhole::connect(mailbox)
291                .await
292                .context("wormhole key exchange failed")
293        }
294    }
295}
296
297fn print_sync_report(
298    store: &db::Store,
299    report: &SyncReport,
300    json: bool,
301    c: &crate::color::Theme,
302) -> Result<()> {
303    if json {
304        println!(
305            "{}",
306            serde_json::to_string(&serde_json::json!({
307                "synced": true,
308                "project": store.project_name(),
309                "sent_bytes": report.sent_bytes,
310                "received_bytes": report.received_bytes,
311            }))?
312        );
313    } else {
314        eprintln!(
315            "{}synced:{} {} (sent {} bytes, received {} bytes)",
316            c.green,
317            c.reset,
318            store.project_name(),
319            report.sent_bytes,
320            report.received_bytes,
321        );
322        if report.imported {
323            eprintln!("{}info:{} imported peer changes", c.blue, c.reset);
324        } else {
325            eprintln!("{}info:{} peer had no new changes", c.blue, c.reset);
326        }
327    }
328    Ok(())
329}
330
331/// Read the stable project identity from the doc's root meta map.
332fn read_project_id(store: &db::Store) -> Result<String> {
333    let root = serde_json::to_value(store.doc().get_deep_value())?;
334    root.get("meta")
335        .and_then(|m| m.get("project_id"))
336        .and_then(|v| v.as_str())
337        .map(str::to_owned)
338        .ok_or_else(|| anyhow::anyhow!("missing meta.project_id in project doc"))
339}