Repurpose migrations to Loro schema upgrader

Created by Amolith

Change summary

src/db.rs                                   |  34 +++-
src/migrate.rs                              | 153 ++++++++++++++++++++--
src/migrations/0001_initial_schema.down.sql |   6 
src/migrations/0001_initial_schema.up.sql   |  29 ----
src/migrations/0002_add_effort.down.sql     |   1 
src/migrations/0002_add_effort.up.sql       |   1 
src/migrations/0003_blocker_fk.down.sql     |  15 --
src/migrations/0003_blocker_fk.up.sql       |  20 ---
src/migrations/0004_task_logs.down.sql      |   1 
src/migrations/0004_task_logs.up.sql        |   7 -
src/migrations/0005_cascade_fks.down.sql    |  30 ----
src/migrations/0005_cascade_fks.up.sql      |  34 -----
12 files changed, 160 insertions(+), 171 deletions(-)

Detailed changes

src/db.rs 🔗

@@ -16,7 +16,7 @@ const CHANGES_DIR: &str = "changes";
 const BINDINGS_FILE: &str = "bindings.json";
 const BASE_FILE: &str = "base.loro";
 const TMP_SUFFIX: &str = ".tmp";
-const SCHEMA_VERSION: u32 = 1;
+use crate::migrate;
 
 /// Current UTC time in ISO 8601 format.
 pub fn now_utc() -> String {
@@ -226,7 +226,7 @@ impl Store {
         doc.get_map("tasks");
 
         let meta = doc.get_map("meta");
-        meta.insert("schema_version", SCHEMA_VERSION as i64)?;
+        meta.insert("schema_version", migrate::CURRENT_SCHEMA_VERSION as i64)?;
         meta.insert("project_id", Ulid::new().to_string())?;
         meta.insert("created_at", now_utc())?;
 
@@ -276,6 +276,20 @@ impl Store {
             }
         }
 
+        // Apply any pending schema upgrades and persist the resulting delta
+        // so subsequent opens don't repeat the work.
+        let before_vv = doc.oplog_vv();
+        let upgraded = migrate::ensure_current(&doc)?;
+        if upgraded {
+            doc.commit();
+            let delta = doc
+                .export(ExportMode::updates(&before_vv))
+                .context("failed to export schema upgrade delta")?;
+            let filename = format!("{}.loro", Ulid::new());
+            let delta_path = project_dir.join(CHANGES_DIR).join(filename);
+            atomic_write_file(&delta_path, &delta)?;
+        }
+
         Ok(Self {
             root: root.to_path_buf(),
             project: project.to_string(),
@@ -305,6 +319,11 @@ impl Store {
         read_project_id_from_doc(&doc)
             .context("bootstrap delta is missing required project identity")?;
 
+        // Upgrade the peer's document before snapshotting so the local
+        // copy is always at CURRENT_SCHEMA_VERSION from the start.
+        migrate::ensure_current(&doc)?;
+        doc.commit();
+
         let snapshot = doc
             .export(ExportMode::Snapshot)
             .context("failed to export bootstrap loro snapshot")?;
@@ -502,16 +521,7 @@ impl Store {
     }
 
     pub fn schema_version(&self) -> Result<u32> {
-        let root = serde_json::to_value(self.doc.get_deep_value())?;
-        let meta = root
-            .get("meta")
-            .and_then(Value::as_object)
-            .ok_or_else(|| anyhow!("missing root meta map"))?;
-        let n = meta
-            .get("schema_version")
-            .and_then(Value::as_u64)
-            .ok_or_else(|| anyhow!("invalid or missing meta.schema_version"))?;
-        Ok(n as u32)
+        migrate::read_schema_version(&self.doc)
     }
 }
 

src/migrate.rs 🔗

@@ -1,29 +1,152 @@
-//! Loro-backed storage does not use SQL schema migrations.
+//! Forward-only document-schema upgrader for Loro state.
 //!
-//! The old SQLite migration flow has been replaced by document-level metadata
-//! (`meta.schema_version`) in the Loro snapshot. This module remains only as a
-//! compatibility shim for call sites that still invoke migration entry points.
+//! Each project's Loro document carries `meta.schema_version`.  On every
+//! [`Store::open`](crate::db::Store::open) call, [`ensure_current`] compares
+//! that version to [`CURRENT_SCHEMA_VERSION`] and applies any registered
+//! upgraders in sequence.  Upgraders are keyed by the source version they
+//! transform *from*; each one is responsible for mutating the document so it
+//! conforms to the next version's expectations.
+//!
+//! When a new schema change is introduced:
+//! 1. Bump [`CURRENT_SCHEMA_VERSION`].
+//! 2. Write an `upgrade_vN_to_vM` function.
+//! 3. Add a match arm in [`upgrader_for`].
+
+use anyhow::{anyhow, bail, Result};
+use loro::{LoroDoc, LoroValue, ValueOrContainer};
+
+/// The schema version that the running code expects every document to be at.
+pub const CURRENT_SCHEMA_VERSION: u32 = 1;
+
+/// Check a document's schema version and apply any necessary upgrades.
+///
+/// Returns `Ok(true)` when at least one upgrade was applied (the caller
+/// should persist the resulting delta), or `Ok(false)` when the document
+/// was already current.
+///
+/// # Errors
+///
+/// - Document has no `meta.schema_version`.
+/// - Document version is newer than `CURRENT_SCHEMA_VERSION`.
+/// - No upgrader is registered for a version in the upgrade path.
+/// - An upgrader itself fails.
+pub fn ensure_current(doc: &LoroDoc) -> Result<bool> {
+    let mut version = read_schema_version(doc)?;
+
+    if version == CURRENT_SCHEMA_VERSION {
+        return Ok(false);
+    }
+
+    if version > CURRENT_SCHEMA_VERSION {
+        bail!(
+            "document schema version {version} is newer than supported \
+             ({CURRENT_SCHEMA_VERSION}); please upgrade td"
+        );
+    }
 
-use anyhow::Result;
+    while version < CURRENT_SCHEMA_VERSION {
+        let next = version + 1;
+        let upgrader = upgrader_for(version).ok_or_else(|| {
+            anyhow!("no upgrader registered for schema version {version} → {next}")
+        })?;
+        upgrader(doc)?;
+        version = next;
+    }
+
+    let meta = doc.get_map("meta");
+    meta.insert("schema_version", CURRENT_SCHEMA_VERSION as i64)?;
+
+    Ok(true)
+}
 
-/// No-op compatibility function for legacy call sites.
-pub fn migrate_up<T>(_conn: &mut T) -> Result<()> {
-    Ok(())
+/// Read `meta.schema_version` from an already-loaded Loro document.
+///
+/// Reads directly from the Loro map handler, avoiding a full-document
+/// JSON serialisation.
+pub fn read_schema_version(doc: &LoroDoc) -> Result<u32> {
+    let meta = doc.get_map("meta");
+    match meta.get("schema_version") {
+        Some(ValueOrContainer::Value(LoroValue::I64(n))) => {
+            u32::try_from(n).map_err(|_| anyhow!("meta.schema_version out of u32 range: {n}"))
+        }
+        Some(_) => bail!("meta.schema_version has unexpected type"),
+        None => bail!("invalid or missing meta.schema_version"),
+    }
 }
 
-/// No-op compatibility function for legacy call sites.
-pub fn migrate_down<T>(_conn: &mut T, _target_version: u32) -> Result<()> {
-    Ok(())
+/// Look up the upgrade function that transforms a document *from* the given
+/// version to `version + 1`.  Returns `None` when no upgrader is registered.
+// The wildcard-only match is intentional: each new schema version adds an
+// arm here (e.g. `1 => Some(upgrade_v1_to_v2)`).  Clippy's
+// match_single_binding lint fires because there are no concrete arms yet.
+#[allow(clippy::match_single_binding)]
+fn upgrader_for(version: u32) -> Option<fn(&LoroDoc) -> Result<()>> {
+    match version {
+        _ => None,
+    }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
 
+    /// Build a minimal Loro document stamped at the given schema version.
+    fn doc_at_version(v: u32) -> LoroDoc {
+        let doc = LoroDoc::new();
+        let meta = doc.get_map("meta");
+        meta.insert("schema_version", v as i64).unwrap();
+        doc.commit();
+        doc
+    }
+
+    #[test]
+    fn current_version_is_noop() {
+        let doc = doc_at_version(CURRENT_SCHEMA_VERSION);
+        assert!(!ensure_current(&doc).unwrap());
+    }
+
+    #[test]
+    fn future_version_rejected() {
+        let doc = doc_at_version(CURRENT_SCHEMA_VERSION + 1);
+        let err = ensure_current(&doc).unwrap_err();
+        assert!(
+            err.to_string().contains("newer than supported"),
+            "unexpected error: {err}"
+        );
+    }
+
+    #[test]
+    fn old_version_without_upgrader_gives_clear_error() {
+        let doc = doc_at_version(0);
+        let err = ensure_current(&doc).unwrap_err();
+        assert!(
+            err.to_string().contains("no upgrader registered"),
+            "unexpected error: {err}"
+        );
+    }
+
+    #[test]
+    fn idempotent_on_current_version() {
+        let doc = doc_at_version(CURRENT_SCHEMA_VERSION);
+        assert!(!ensure_current(&doc).unwrap());
+        assert!(!ensure_current(&doc).unwrap());
+    }
+
+    #[test]
+    fn missing_meta_is_error() {
+        let doc = LoroDoc::new();
+        doc.get_map("meta"); // exists but has no schema_version key
+        doc.commit();
+        let err = ensure_current(&doc).unwrap_err();
+        assert!(
+            err.to_string().contains("meta.schema_version"),
+            "unexpected error: {err}"
+        );
+    }
+
     #[test]
-    fn compatibility_noops_succeed() {
-        let mut placeholder = ();
-        migrate_up(&mut placeholder).unwrap();
-        migrate_down(&mut placeholder, 0).unwrap();
+    fn read_schema_version_returns_stamped_value() {
+        let doc = doc_at_version(42);
+        assert_eq!(read_schema_version(&doc).unwrap(), 42);
     }
 }

src/migrations/0001_initial_schema.down.sql 🔗

@@ -1,6 +0,0 @@
-DROP INDEX IF EXISTS idx_parent;
-DROP INDEX IF EXISTS idx_priority;
-DROP INDEX IF EXISTS idx_status;
-DROP TABLE IF EXISTS blockers;
-DROP TABLE IF EXISTS labels;
-DROP TABLE IF EXISTS tasks;

src/migrations/0001_initial_schema.up.sql 🔗

@@ -1,29 +0,0 @@
-CREATE TABLE IF NOT EXISTS tasks (
-    id          TEXT PRIMARY KEY,
-    title       TEXT NOT NULL,
-    description TEXT DEFAULT '',
-    type        TEXT DEFAULT 'task',
-    priority    INTEGER DEFAULT 2,
-    status      TEXT DEFAULT 'open',
-    parent      TEXT DEFAULT '',
-    created     TEXT NOT NULL,
-    updated     TEXT NOT NULL
-);
-
-CREATE TABLE IF NOT EXISTS labels (
-    task_id TEXT,
-    label   TEXT,
-    PRIMARY KEY (task_id, label),
-    FOREIGN KEY (task_id) REFERENCES tasks(id)
-);
-
-CREATE TABLE IF NOT EXISTS blockers (
-    task_id    TEXT,
-    blocker_id TEXT,
-    PRIMARY KEY (task_id, blocker_id),
-    FOREIGN KEY (task_id) REFERENCES tasks(id)
-);
-
-CREATE INDEX IF NOT EXISTS idx_status   ON tasks(status);
-CREATE INDEX IF NOT EXISTS idx_priority ON tasks(priority);
-CREATE INDEX IF NOT EXISTS idx_parent   ON tasks(parent);

src/migrations/0003_blocker_fk.down.sql 🔗

@@ -1,15 +0,0 @@
--- Revert to the original blockers table without the blocker_id FK.
-
-CREATE TABLE blockers_old (
-    task_id    TEXT,
-    blocker_id TEXT,
-    PRIMARY KEY (task_id, blocker_id),
-    FOREIGN KEY (task_id) REFERENCES tasks(id)
-);
-
-INSERT INTO blockers_old (task_id, blocker_id)
-    SELECT task_id, blocker_id FROM blockers;
-
-DROP TABLE blockers;
-
-ALTER TABLE blockers_old RENAME TO blockers;

src/migrations/0003_blocker_fk.up.sql 🔗

@@ -1,20 +0,0 @@
--- Add FOREIGN KEY (blocker_id) REFERENCES tasks(id) to the blockers table.
--- SQLite has no ALTER TABLE ADD CONSTRAINT, so we rebuild.
-
--- Drop dangling blocker_id rows that reference nonexistent tasks.
-DELETE FROM blockers WHERE blocker_id NOT IN (SELECT id FROM tasks);
-
-CREATE TABLE blockers_new (
-    task_id    TEXT,
-    blocker_id TEXT,
-    PRIMARY KEY (task_id, blocker_id),
-    FOREIGN KEY (task_id)    REFERENCES tasks(id),
-    FOREIGN KEY (blocker_id) REFERENCES tasks(id)
-);
-
-INSERT INTO blockers_new (task_id, blocker_id)
-    SELECT task_id, blocker_id FROM blockers;
-
-DROP TABLE blockers;
-
-ALTER TABLE blockers_new RENAME TO blockers;

src/migrations/0004_task_logs.up.sql 🔗

@@ -1,7 +0,0 @@
-CREATE TABLE task_logs (
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
-    task_id TEXT NOT NULL,
-    timestamp TEXT NOT NULL,
-    body TEXT NOT NULL,
-    FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE
-);

src/migrations/0005_cascade_fks.down.sql 🔗

@@ -1,30 +0,0 @@
--- Revert labels/blockers foreign keys to definitions without ON DELETE CASCADE.
-
-CREATE TABLE labels_old (
-    task_id TEXT,
-    label   TEXT,
-    PRIMARY KEY (task_id, label),
-    FOREIGN KEY (task_id) REFERENCES tasks(id)
-);
-
-INSERT INTO labels_old (task_id, label)
-    SELECT task_id, label FROM labels;
-
-DROP TABLE labels;
-
-ALTER TABLE labels_old RENAME TO labels;
-
-CREATE TABLE blockers_old (
-    task_id    TEXT,
-    blocker_id TEXT,
-    PRIMARY KEY (task_id, blocker_id),
-    FOREIGN KEY (task_id) REFERENCES tasks(id),
-    FOREIGN KEY (blocker_id) REFERENCES tasks(id)
-);
-
-INSERT INTO blockers_old (task_id, blocker_id)
-    SELECT task_id, blocker_id FROM blockers;
-
-DROP TABLE blockers;
-
-ALTER TABLE blockers_old RENAME TO blockers;

src/migrations/0005_cascade_fks.up.sql 🔗

@@ -1,34 +0,0 @@
--- Add ON DELETE CASCADE to labels/task_id and blockers/task_id+blocker_id.
--- SQLite has no ALTER TABLE ADD CONSTRAINT, so we rebuild both tables.
-
--- Drop dangling label rows before introducing stricter FK behavior.
-DELETE FROM labels WHERE task_id NOT IN (SELECT id FROM tasks);
-
-CREATE TABLE labels_new (
-    task_id TEXT,
-    label   TEXT,
-    PRIMARY KEY (task_id, label),
-    FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE
-);
-
-INSERT INTO labels_new (task_id, label)
-    SELECT task_id, label FROM labels;
-
-DROP TABLE labels;
-
-ALTER TABLE labels_new RENAME TO labels;
-
-CREATE TABLE blockers_new (
-    task_id    TEXT,
-    blocker_id TEXT,
-    PRIMARY KEY (task_id, blocker_id),
-    FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE,
-    FOREIGN KEY (blocker_id) REFERENCES tasks(id) ON DELETE CASCADE
-);
-
-INSERT INTO blockers_new (task_id, blocker_id)
-    SELECT task_id, blocker_id FROM blockers;
-
-DROP TABLE blockers;
-
-ALTER TABLE blockers_new RENAME TO blockers;