sync.rs

  1//! Peer-to-peer project sync via magic-wormhole.
  2//!
  3//! Both peers open the same project, exchange Loro version vectors,
  4//! compute deltas containing only the ops the other side lacks, then
  5//! exchange and import those deltas.  The result is that both docs
  6//! converge to the same state without sending duplicate operations.
  7
  8use std::borrow::Cow;
  9use std::path::Path;
 10
 11use anyhow::{bail, Context, Result};
 12use loro::{ExportMode, VersionVector};
 13use magic_wormhole::{AppConfig, AppID, Code, MailboxConnection, Wormhole};
 14use serde::{Deserialize, Serialize};
 15
 16use crate::db;
 17
 18/// Custom AppID scoping our wormhole traffic away from other protocols.
 19const APP_ID: &str = "td.sync.v1";
 20
 21/// Number of random words in the generated wormhole code.
 22const CODE_WORD_COUNT: usize = 2;
 23
 24/// Handshake message exchanged before the delta payload.
 25#[derive(Debug, Serialize, Deserialize)]
 26struct SyncHandshake {
 27    /// Human-readable project name.
 28    project_name: String,
 29    /// Stable identity (ULID stored in the doc's root meta map).
 30    project_id: String,
 31    /// Serialised version vector so the peer can compute a minimal delta.
 32    #[serde(with = "vv_serde")]
 33    version_vector: VersionVector,
 34}
 35
 36/// Serde adapter for `VersionVector` using its postcard `encode()`/`decode()`.
 37mod vv_serde {
 38    use loro::VersionVector;
 39    use serde::{self, Deserializer, Serializer};
 40
 41    pub fn serialize<S: Serializer>(vv: &VersionVector, ser: S) -> Result<S::Ok, S::Error> {
 42        ser.serialize_bytes(&vv.encode())
 43    }
 44
 45    pub fn deserialize<'de, D: Deserializer<'de>>(de: D) -> Result<VersionVector, D::Error> {
 46        let bytes: Vec<u8> = serde::Deserialize::deserialize(de)?;
 47        VersionVector::decode(&bytes).map_err(serde::de::Error::custom)
 48    }
 49}
 50
 51/// Outcome of a sync exchange, returned by [`exchange`].
 52pub struct SyncReport {
 53    pub sent_bytes: usize,
 54    pub received_bytes: usize,
 55    pub imported: bool,
 56}
 57
 58pub fn wormhole_config() -> AppConfig<serde_json::Value> {
 59    AppConfig {
 60        id: AppID::new(APP_ID),
 61        rendezvous_url: Cow::Borrowed(magic_wormhole::rendezvous::DEFAULT_RENDEZVOUS_SERVER),
 62        app_version: serde_json::json!({"v": 1}),
 63    }
 64}
 65
 66/// Run the sync protocol over an already-established wormhole.
 67///
 68/// Both sides call this concurrently.  The protocol is symmetric: each
 69/// peer sends its version vector, receives the other's, computes a
 70/// minimal delta, sends it, receives the peer's delta, and imports it.
 71pub async fn exchange(store: &db::Store, mut wormhole: Wormhole) -> Result<SyncReport> {
 72    let my_vv = store.doc().oplog_vv();
 73    let my_handshake = SyncHandshake {
 74        project_name: store.project_name().to_string(),
 75        project_id: read_project_id(store)?,
 76        version_vector: my_vv,
 77    };
 78
 79    // --- Phase 1: exchange handshakes ---
 80    wormhole
 81        .send_json(&my_handshake)
 82        .await
 83        .context("failed to send handshake")?;
 84
 85    let their_handshake: SyncHandshake = wormhole
 86        .receive_json::<SyncHandshake>()
 87        .await
 88        .context("failed to receive handshake")?
 89        .context("peer sent invalid handshake JSON")?;
 90
 91    if my_handshake.project_id != their_handshake.project_id {
 92        let _ = wormhole.close().await;
 93        bail!(
 94            "project identity mismatch: local '{}' ({}) vs peer '{}' ({})",
 95            my_handshake.project_name,
 96            my_handshake.project_id,
 97            their_handshake.project_name,
 98            their_handshake.project_id,
 99        );
100    }
101
102    // --- Phase 2: compute and exchange deltas ---
103    let my_delta = store
104        .doc()
105        .export(ExportMode::updates(&their_handshake.version_vector))
106        .context("failed to export delta for peer")?;
107
108    wormhole
109        .send(my_delta.clone())
110        .await
111        .context("failed to send delta")?;
112
113    let their_delta = wormhole
114        .receive()
115        .await
116        .context("failed to receive delta from peer")?;
117
118    wormhole.close().await.context("failed to close wormhole")?;
119
120    // --- Phase 3: import the peer's delta locally ---
121    let imported = if !their_delta.is_empty() {
122        store
123            .doc()
124            .import(&their_delta)
125            .context("failed to import peer delta")?;
126        store.doc().commit();
127        store.save_raw_delta(&their_delta)?;
128        true
129    } else {
130        false
131    };
132
133    Ok(SyncReport {
134        sent_bytes: my_delta.len(),
135        received_bytes: their_delta.len(),
136        imported,
137    })
138}
139
140pub fn run(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
141    let rt = tokio::runtime::Runtime::new().context("failed to create async runtime")?;
142    rt.block_on(run_async(root, code, json))
143}
144
145async fn run_async(root: &Path, code: Option<&str>, json: bool) -> Result<()> {
146    let store = db::open(root)?;
147    let c = crate::color::stderr_theme();
148
149    let wormhole = match code {
150        None => {
151            let mailbox = MailboxConnection::create(wormhole_config(), CODE_WORD_COUNT)
152                .await
153                .context("failed to create wormhole mailbox")?;
154
155            let code = mailbox.code().clone();
156            if json {
157                println!(
158                    "{}",
159                    serde_json::to_string(&serde_json::json!({"code": code.to_string()}))?
160                );
161            } else {
162                eprintln!("{}wormhole:{} run on the other machine:\n", c.blue, c.reset);
163                eprintln!("  td sync {}{}{}\n", c.bold, code, c.reset);
164                eprintln!("waiting for peer...");
165            }
166
167            Wormhole::connect(mailbox)
168                .await
169                .context("wormhole key exchange failed")?
170        }
171        Some(raw) => {
172            let code: Code = raw.parse().context("invalid wormhole code")?;
173            let mailbox = MailboxConnection::connect(wormhole_config(), code, false)
174                .await
175                .context("failed to connect to wormhole mailbox")?;
176
177            if !json {
178                eprintln!("{}wormhole:{} connecting...", c.blue, c.reset);
179            }
180
181            Wormhole::connect(mailbox)
182                .await
183                .context("wormhole key exchange failed")?
184        }
185    };
186
187    if !json {
188        eprintln!("{}wormhole:{} connected, syncing...", c.blue, c.reset);
189    }
190
191    let report = exchange(&store, wormhole).await?;
192
193    if json {
194        println!(
195            "{}",
196            serde_json::to_string(&serde_json::json!({
197                "synced": true,
198                "project": store.project_name(),
199                "sent_bytes": report.sent_bytes,
200                "received_bytes": report.received_bytes,
201            }))?
202        );
203    } else {
204        eprintln!(
205            "{}synced:{} {} (sent {} bytes, received {} bytes)",
206            c.green,
207            c.reset,
208            store.project_name(),
209            report.sent_bytes,
210            report.received_bytes,
211        );
212        if report.imported {
213            eprintln!("{}info:{} imported peer changes", c.blue, c.reset);
214        } else {
215            eprintln!("{}info:{} peer had no new changes", c.blue, c.reset);
216        }
217    }
218
219    Ok(())
220}
221
222/// Read the stable project identity from the doc's root meta map.
223fn read_project_id(store: &db::Store) -> Result<String> {
224    let root = serde_json::to_value(store.doc().get_deep_value())?;
225    root.get("meta")
226        .and_then(|m| m.get("project_id"))
227        .and_then(|v| v.as_str())
228        .map(str::to_owned)
229        .ok_or_else(|| anyhow::anyhow!("missing meta.project_id in project doc"))
230}