db.rs

   1use anyhow::{anyhow, bail, Context, Result};
   2use loro::{Container, ExportMode, LoroDoc, LoroMap, PeerID, ValueOrContainer};
   3use serde::{Deserialize, Serialize};
   4use serde_json::Value;
   5use std::collections::BTreeMap;
   6use std::fmt;
   7use std::fs::{self, File, OpenOptions};
   8use std::io::Write;
   9use std::path::{Path, PathBuf};
  10
  11use fs2::FileExt;
  12use ulid::Ulid;
  13
  14pub const PROJECT_ENV: &str = "TD_PROJECT";
  15
  16pub(crate) const PROJECTS_DIR: &str = "projects";
  17const CHANGES_DIR: &str = "changes";
  18const BINDINGS_FILE: &str = "bindings.json";
  19const BASE_FILE: &str = "base.loro";
  20const TMP_SUFFIX: &str = ".tmp";
  21use crate::migrate;
  22
  23/// Current UTC time in ISO 8601 format.
  24pub fn now_utc() -> String {
  25    chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string()
  26}
  27
  28/// Lifecycle state for a task.
  29#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
  30#[serde(rename_all = "snake_case")]
  31pub enum Status {
  32    Open,
  33    InProgress,
  34    Closed,
  35}
  36
  37impl Status {
  38    fn as_str(self) -> &'static str {
  39        match self {
  40            Status::Open => "open",
  41            Status::InProgress => "in_progress",
  42            Status::Closed => "closed",
  43        }
  44    }
  45
  46    fn parse(raw: &str) -> Result<Self> {
  47        match raw {
  48            "open" => Ok(Self::Open),
  49            "in_progress" => Ok(Self::InProgress),
  50            "closed" => Ok(Self::Closed),
  51            _ => bail!("invalid status '{raw}'"),
  52        }
  53    }
  54}
  55
  56/// Priority for task ordering.
  57#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
  58#[serde(rename_all = "snake_case")]
  59pub enum Priority {
  60    High,
  61    Medium,
  62    Low,
  63}
  64
  65impl Priority {
  66    fn as_str(self) -> &'static str {
  67        match self {
  68            Priority::High => "high",
  69            Priority::Medium => "medium",
  70            Priority::Low => "low",
  71        }
  72    }
  73
  74    fn parse(raw: &str) -> Result<Self> {
  75        match raw {
  76            "high" => Ok(Self::High),
  77            "medium" => Ok(Self::Medium),
  78            "low" => Ok(Self::Low),
  79            _ => bail!("invalid priority '{raw}'"),
  80        }
  81    }
  82
  83    pub fn score(self) -> i32 {
  84        match self {
  85            Priority::High => 1,
  86            Priority::Medium => 2,
  87            Priority::Low => 3,
  88        }
  89    }
  90}
  91
  92/// Estimated effort for a task.
  93#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
  94#[serde(rename_all = "snake_case")]
  95pub enum Effort {
  96    Low,
  97    Medium,
  98    High,
  99}
 100
 101impl Effort {
 102    fn as_str(self) -> &'static str {
 103        match self {
 104            Effort::Low => "low",
 105            Effort::Medium => "medium",
 106            Effort::High => "high",
 107        }
 108    }
 109
 110    fn parse(raw: &str) -> Result<Self> {
 111        match raw {
 112            "low" => Ok(Self::Low),
 113            "medium" => Ok(Self::Medium),
 114            "high" => Ok(Self::High),
 115            _ => bail!("invalid effort '{raw}'"),
 116        }
 117    }
 118
 119    pub fn score(self) -> i32 {
 120        match self {
 121            Effort::Low => 1,
 122            Effort::Medium => 2,
 123            Effort::High => 3,
 124        }
 125    }
 126}
 127
 128/// A stable task identifier backed by a ULID.
 129///
 130/// Serializes as the short display form (`td-XXXXXXX`) for user-facing
 131/// JSON. Use [`TaskId::as_str`] when the full ULID is needed (e.g.
 132/// for CRDT keys or export round-tripping).
 133#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
 134pub struct TaskId(String);
 135
 136impl Serialize for TaskId {
 137    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
 138        serializer.serialize_str(&self.short())
 139    }
 140}
 141
 142impl TaskId {
 143    pub fn new(id: Ulid) -> Self {
 144        Self(id.to_string())
 145    }
 146
 147    pub fn parse(raw: &str) -> Result<Self> {
 148        let id = Ulid::from_string(raw).with_context(|| format!("invalid task id '{raw}'"))?;
 149        Ok(Self::new(id))
 150    }
 151
 152    pub fn as_str(&self) -> &str {
 153        &self.0
 154    }
 155
 156    pub fn short(&self) -> String {
 157        format!("td-{}", &self.0[self.0.len() - 7..])
 158    }
 159
 160    /// Return a display-friendly short ID from a raw ULID string.
 161    pub fn display_id(raw: &str) -> String {
 162        let n = raw.len();
 163        if n > 7 {
 164            format!("td-{}", &raw[n - 7..])
 165        } else {
 166            format!("td-{raw}")
 167        }
 168    }
 169}
 170
 171impl fmt::Display for TaskId {
 172    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 173        write!(f, "{}", self.short())
 174    }
 175}
 176
 177/// A task log entry embedded in a task record.
 178#[derive(Debug, Clone, Serialize)]
 179pub struct LogEntry {
 180    pub id: TaskId,
 181    pub timestamp: String,
 182    pub message: String,
 183}
 184
 185/// Hydrated task data from the CRDT document.
 186#[derive(Debug, Clone, Serialize)]
 187pub struct Task {
 188    pub id: TaskId,
 189    pub title: String,
 190    pub description: String,
 191    #[serde(rename = "type")]
 192    pub task_type: String,
 193    pub priority: Priority,
 194    pub status: Status,
 195    pub effort: Effort,
 196    pub parent: Option<TaskId>,
 197    pub created_at: String,
 198    pub updated_at: String,
 199    pub deleted_at: Option<String>,
 200    pub labels: Vec<String>,
 201    pub blockers: Vec<TaskId>,
 202    pub logs: Vec<LogEntry>,
 203}
 204
 205impl Task {
 206    /// Serialize this task with full ULIDs instead of short display IDs.
 207    ///
 208    /// Used by `export` so that `import` can round-trip data losslessly —
 209    /// `import` needs the full ULID to recreate exact CRDT keys.
 210    pub fn to_export_value(&self) -> serde_json::Value {
 211        serde_json::json!({
 212            "id": self.id.as_str(),
 213            "title": self.title,
 214            "description": self.description,
 215            "type": self.task_type,
 216            "priority": self.priority,
 217            "status": self.status,
 218            "effort": self.effort,
 219            "parent": self.parent.as_ref().map(|p| p.as_str()),
 220            "created_at": self.created_at,
 221            "updated_at": self.updated_at,
 222            "deleted_at": self.deleted_at,
 223            "labels": self.labels,
 224            "blockers": self.blockers.iter().map(|b| b.as_str()).collect::<Vec<_>>(),
 225            "logs": self.logs.iter().map(|l| {
 226                serde_json::json!({
 227                    "id": l.id.as_str(),
 228                    "timestamp": l.timestamp,
 229                    "message": l.message,
 230                })
 231            }).collect::<Vec<_>>(),
 232        })
 233    }
 234}
 235
 236/// Result type for partitioning blockers by task state.
 237#[derive(Debug, Default, Clone, Serialize)]
 238pub struct BlockerPartition {
 239    pub open: Vec<TaskId>,
 240    pub resolved: Vec<TaskId>,
 241}
 242
 243#[derive(Debug, Default, Clone, Serialize, Deserialize)]
 244struct BindingsFile {
 245    #[serde(default)]
 246    bindings: BTreeMap<String, String>,
 247}
 248
 249/// Storage wrapper around one project's Loro document and disk layout.
 250#[derive(Debug, Clone)]
 251pub struct Store {
 252    root: PathBuf,
 253    project: String,
 254    doc: LoroDoc,
 255}
 256
 257impl Store {
 258    pub fn init(root: &Path, project: &str) -> Result<Self> {
 259        validate_project_name(project)?;
 260        let project_dir = project_dir(root, project);
 261        if project_dir.exists() {
 262            bail!("project '{project}' already exists");
 263        }
 264        fs::create_dir_all(project_dir.join(CHANGES_DIR))?;
 265
 266        let doc = LoroDoc::new();
 267        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
 268        doc.get_map("tasks");
 269
 270        let meta = doc.get_map("meta");
 271        meta.insert("schema_version", migrate::CURRENT_SCHEMA_VERSION as i64)?;
 272        meta.insert("project_id", Ulid::new().to_string())?;
 273        meta.insert("created_at", now_utc())?;
 274
 275        let snapshot = doc
 276            .export(ExportMode::Snapshot)
 277            .context("failed to export initial loro snapshot")?;
 278        atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;
 279
 280        Ok(Self {
 281            root: root.to_path_buf(),
 282            project: project.to_string(),
 283            doc,
 284        })
 285    }
 286
 287    pub fn open(root: &Path, project: &str) -> Result<Self> {
 288        validate_project_name(project)?;
 289        let project_dir = project_dir(root, project);
 290        let base_path = project_dir.join(BASE_FILE);
 291
 292        if !base_path.exists() {
 293            bail!("project '{project}' is not initialized. Run 'td project init {project}'");
 294        }
 295
 296        let base = fs::read(&base_path)
 297            .with_context(|| format!("failed to read loro snapshot '{}'", base_path.display()))?;
 298
 299        let doc = LoroDoc::from_snapshot(&base).context("failed to load loro snapshot")?;
 300        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
 301
 302        let mut deltas = collect_delta_paths(&project_dir)?;
 303        deltas.sort_by_key(|path| {
 304            path.file_stem()
 305                .and_then(|s| s.to_str())
 306                .and_then(|s| Ulid::from_string(s).ok())
 307        });
 308
 309        for delta_path in deltas {
 310            let bytes = fs::read(&delta_path)
 311                .with_context(|| format!("failed to read loro delta '{}'", delta_path.display()))?;
 312            if let Err(err) = doc.import(&bytes) {
 313                // Tolerate malformed or stale delta files as requested by design.
 314                eprintln!(
 315                    "warning: skipping unreadable delta '{}': {err}",
 316                    delta_path.display()
 317                );
 318            }
 319        }
 320
 321        // Apply any pending schema upgrades and persist the resulting delta
 322        // so subsequent opens don't repeat the work.
 323        let before_vv = doc.oplog_vv();
 324        let upgraded = migrate::ensure_current(&doc)?;
 325        if upgraded {
 326            doc.commit();
 327            let delta = doc
 328                .export(ExportMode::updates(&before_vv))
 329                .context("failed to export schema upgrade delta")?;
 330            let filename = format!("{}.loro", Ulid::new());
 331            let delta_path = project_dir.join(CHANGES_DIR).join(filename);
 332            atomic_write_file(&delta_path, &delta)?;
 333        }
 334
 335        Ok(Self {
 336            root: root.to_path_buf(),
 337            project: project.to_string(),
 338            doc,
 339        })
 340    }
 341
 342    /// Bootstrap a local project from peer-provided delta bytes.
 343    ///
 344    /// The incoming delta is imported into a fresh document, validated to
 345    /// ensure it carries `meta.project_id`, and then persisted as a base
 346    /// snapshot for future opens.
 347    pub fn bootstrap_from_peer(root: &Path, project: &str, delta: &[u8]) -> Result<Self> {
 348        validate_project_name(project)?;
 349        let project_dir = project_dir(root, project);
 350        if project_dir.exists() {
 351            bail!("project '{project}' already exists");
 352        }
 353        fs::create_dir_all(project_dir.join(CHANGES_DIR))?;
 354
 355        let doc = LoroDoc::new();
 356        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
 357        doc.import(delta)
 358            .context("failed to import bootstrap delta from peer")?;
 359        doc.commit();
 360
 361        read_project_id_from_doc(&doc)
 362            .context("bootstrap delta is missing required project identity")?;
 363
 364        // Upgrade the peer's document before snapshotting so the local
 365        // copy is always at CURRENT_SCHEMA_VERSION from the start.
 366        migrate::ensure_current(&doc)?;
 367        doc.commit();
 368
 369        let snapshot = doc
 370            .export(ExportMode::Snapshot)
 371            .context("failed to export bootstrap loro snapshot")?;
 372        atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;
 373
 374        Ok(Self {
 375            root: root.to_path_buf(),
 376            project: project.to_string(),
 377            doc,
 378        })
 379    }
 380
 381    pub fn root(&self) -> &Path {
 382        &self.root
 383    }
 384
 385    pub fn project_name(&self) -> &str {
 386        &self.project
 387    }
 388
 389    pub fn doc(&self) -> &LoroDoc {
 390        &self.doc
 391    }
 392
 393    /// Export all current state to a fresh base snapshot.
 394    /// Compact accumulated deltas into the base snapshot using a two-phase
 395    /// protocol that is safe against concurrent writers.
 396    ///
 397    /// **Phase 1** — rename `changes/` to `changes.compacting.<ulid>/`, then
 398    /// immediately create a fresh `changes/`.  Any concurrent `td` command
 399    /// that writes a delta after this point lands in the new `changes/` and is
 400    /// therefore never touched by this operation.
 401    ///
 402    /// **Phase 2** — write a fresh base snapshot from the in-memory document
 403    /// (which was loaded from both `base.loro` and every delta at `open` time),
 404    /// then remove the compacting directory.
 405    ///
 406    /// Any orphaned `changes.compacting.*` directories left by a previously
 407    /// crashed tidy are also removed: they were already merged into `self.doc`
 408    /// at open time, so the new snapshot includes their contents.
 409    ///
 410    /// Returns the number of delta files folded into the snapshot.
 411    pub fn tidy(&self) -> Result<usize> {
 412        let project_dir = project_dir(&self.root, &self.project);
 413        let changes_dir = project_dir.join(CHANGES_DIR);
 414
 415        // Phase 1: atomically hand off the current changes/ to a compacting
 416        // directory so new writers have a clean home immediately.
 417        let compacting_dir = project_dir.join(format!("changes.compacting.{}", Ulid::new()));
 418        if changes_dir.exists() {
 419            fs::rename(&changes_dir, &compacting_dir).with_context(|| {
 420                format!(
 421                    "failed to rename '{}' to '{}'",
 422                    changes_dir.display(),
 423                    compacting_dir.display()
 424                )
 425            })?;
 426        }
 427        fs::create_dir_all(&changes_dir).context("failed to create fresh changes/")?;
 428
 429        // Re-import every delta from the compacting directories.  self.doc
 430        // was populated at open() time, but a concurrent writer may have
 431        // appended a delta to changes/ between open() and the Phase 1
 432        // rename — that delta is now inside compacting_dir without being in
 433        // self.doc.  CRDT import is idempotent (deduplicates by OpID), so
 434        // re-importing already-known ops is harmless.
 435        let mut compacting_deltas = collect_delta_paths(&project_dir)?;
 436        compacting_deltas.sort_by_key(|path| {
 437            path.file_stem()
 438                .and_then(|s| s.to_str())
 439                .and_then(|s| Ulid::from_string(s).ok())
 440        });
 441        for delta_path in &compacting_deltas {
 442            if let Ok(bytes) = fs::read(delta_path) {
 443                if let Err(err) = self.doc.import(&bytes) {
 444                    eprintln!(
 445                        "warning: skipping unreadable delta '{}': {err}",
 446                        delta_path.display()
 447                    );
 448                }
 449            }
 450        }
 451
 452        // Phase 2: write the new base snapshot.  self.doc now holds the
 453        // full merged state including any concurrent deltas.
 454        let out = project_dir.join(BASE_FILE);
 455        let bytes = self
 456            .doc
 457            .export(ExportMode::Snapshot)
 458            .context("failed to export loro snapshot")?;
 459        atomic_write_file(&out, &bytes)?;
 460
 461        // Remove the compacting directory we created in phase 1 plus any
 462        // orphaned changes.compacting.* dirs from previously crashed tidies.
 463        let mut removed = 0usize;
 464        for entry in fs::read_dir(&project_dir)
 465            .with_context(|| format!("failed to read project dir '{}'", project_dir.display()))?
 466        {
 467            let path = entry?.path();
 468            if !path.is_dir() {
 469                continue;
 470            }
 471            let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
 472                continue;
 473            };
 474            if !name.starts_with("changes.compacting.") {
 475                continue;
 476            }
 477            // Count files before removing for the summary report.
 478            for file in fs::read_dir(&path)
 479                .with_context(|| format!("failed to read '{}'", path.display()))?
 480            {
 481                let fp = file?.path();
 482                if fp.is_file() {
 483                    removed += 1;
 484                }
 485            }
 486            fs::remove_dir_all(&path)
 487                .with_context(|| format!("failed to remove compacting dir '{}'", path.display()))?;
 488        }
 489
 490        Ok(removed)
 491    }
 492
 493    /// Apply a local mutation and persist only the resulting delta.
 494    pub fn apply_and_persist<F>(&self, mutator: F) -> Result<PathBuf>
 495    where
 496        F: FnOnce(&LoroDoc) -> Result<()>,
 497    {
 498        let before = self.doc.oplog_vv();
 499        mutator(&self.doc)?;
 500        self.doc.commit();
 501
 502        let delta = self
 503            .doc
 504            .export(ExportMode::updates(&before))
 505            .context("failed to export loro update delta")?;
 506
 507        let filename = format!("{}.loro", Ulid::new());
 508        let path = project_dir(&self.root, &self.project)
 509            .join(CHANGES_DIR)
 510            .join(filename);
 511        atomic_write_file(&path, &delta)?;
 512        Ok(path)
 513    }
 514
 515    /// Persist pre-built delta bytes (e.g. received from a peer) as a new
 516    /// change file without re-exporting from the doc.
 517    pub fn save_raw_delta(&self, bytes: &[u8]) -> Result<PathBuf> {
 518        let filename = format!("{}.loro", Ulid::new());
 519        let path = project_dir(&self.root, &self.project)
 520            .join(CHANGES_DIR)
 521            .join(filename);
 522        atomic_write_file(&path, bytes)?;
 523        Ok(path)
 524    }
 525
 526    /// Return hydrated tasks, excluding tombstones.
 527    pub fn list_tasks(&self) -> Result<Vec<Task>> {
 528        self.list_tasks_inner(false)
 529    }
 530
 531    /// Return hydrated tasks, including tombstoned rows.
 532    pub fn list_tasks_unfiltered(&self) -> Result<Vec<Task>> {
 533        self.list_tasks_inner(true)
 534    }
 535
 536    /// Find a task by exact ULID string.
 537    pub fn get_task(&self, id: &TaskId, include_deleted: bool) -> Result<Option<Task>> {
 538        let tasks = if include_deleted {
 539            self.list_tasks_unfiltered()?
 540        } else {
 541            self.list_tasks()?
 542        };
 543        Ok(tasks.into_iter().find(|task| task.id == *id))
 544    }
 545
 546    fn list_tasks_inner(&self, include_deleted: bool) -> Result<Vec<Task>> {
 547        let root = serde_json::to_value(self.doc.get_deep_value())?;
 548        let tasks_obj = root
 549            .get("tasks")
 550            .and_then(Value::as_object)
 551            .ok_or_else(|| anyhow!("missing root tasks map"))?;
 552
 553        let mut tasks = Vec::with_capacity(tasks_obj.len());
 554        for (task_id_raw, task_json) in tasks_obj {
 555            let task = hydrate_task(task_id_raw, task_json)?;
 556            if include_deleted || task.deleted_at.is_none() {
 557                tasks.push(task);
 558            }
 559        }
 560
 561        tasks.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));
 562        Ok(tasks)
 563    }
 564
 565    /// Return the stable project identity stored in `meta.project_id`.
 566    pub fn project_id(&self) -> Result<String> {
 567        read_project_id_from_doc(&self.doc)
 568    }
 569
 570    pub fn schema_version(&self) -> Result<u32> {
 571        migrate::read_schema_version(&self.doc)
 572    }
 573}
 574
 575/// Generate a new task ULID.
 576pub fn gen_id() -> TaskId {
 577    TaskId::new(Ulid::new())
 578}
 579
 580pub fn parse_status(s: &str) -> Result<Status> {
 581    Status::parse(s)
 582}
 583
 584pub fn parse_priority(s: &str) -> Result<Priority> {
 585    Priority::parse(s)
 586}
 587
 588pub fn parse_effort(s: &str) -> Result<Effort> {
 589    Effort::parse(s)
 590}
 591
 592pub fn status_label(s: Status) -> &'static str {
 593    s.as_str()
 594}
 595
 596pub fn priority_label(p: Priority) -> &'static str {
 597    p.as_str()
 598}
 599
 600pub fn effort_label(e: Effort) -> &'static str {
 601    e.as_str()
 602}
 603
 604pub fn data_root() -> Result<PathBuf> {
 605    let home = std::env::var("HOME").context("HOME is not set")?;
 606    Ok(PathBuf::from(home).join(".local").join("share").join("td"))
 607}
 608
 609pub fn init(cwd: &Path, project: &str) -> Result<Store> {
 610    let root = data_root()?;
 611    fs::create_dir_all(root.join(PROJECTS_DIR))?;
 612    let store = Store::init(&root, project)?;
 613    bind_project(cwd, project)?;
 614    Ok(store)
 615}
 616
 617pub fn use_project(cwd: &Path, project: &str) -> Result<()> {
 618    let root = data_root()?;
 619    validate_project_name(project)?;
 620    if !project_dir(&root, project).join(BASE_FILE).exists() {
 621        bail!("project '{project}' not found. Run 'td project list' to list known projects");
 622    }
 623    bind_project(cwd, project)
 624}
 625
 626pub fn open(start: &Path) -> Result<Store> {
 627    let root = data_root()?;
 628    let explicit = std::env::var(PROJECT_ENV).ok();
 629    let project = resolve_project_name(start, &root, explicit.as_deref())?.ok_or_else(|| {
 630        anyhow!(
 631            "no project selected. Use --project/TD_PROJECT, run 'td project bind <name>', or run 'td project init <name>'"
 632        )
 633    })?;
 634    Store::open(&root, &project)
 635}
 636
 637/// Open the project selected by `--project`/`TD_PROJECT`/bindings if one exists.
 638///
 639/// Returns `Ok(None)` when no project is selected by any mechanism.
 640pub fn try_open(start: &Path) -> Result<Option<Store>> {
 641    let root = data_root()?;
 642    let explicit = std::env::var(PROJECT_ENV).ok();
 643    let Some(project) = resolve_project_name(start, &root, explicit.as_deref())? else {
 644        return Ok(None);
 645    };
 646    Store::open(&root, &project).map(Some)
 647}
 648
 649/// Bootstrap a project from a peer delta and bind the current directory.
 650pub fn bootstrap_sync(cwd: &Path, project: &str, delta: &[u8]) -> Result<Store> {
 651    let root = data_root()?;
 652    fs::create_dir_all(root.join(PROJECTS_DIR))?;
 653    validate_project_name(project)?;
 654    let store = Store::bootstrap_from_peer(&root, project, delta)?;
 655    bind_project(cwd, project)?;
 656    Ok(store)
 657}
 658
 659/// Bootstrap a project from a peer delta using an explicit data root.
 660///
 661/// Unlike [`bootstrap_sync`], this function does not consult `HOME` and is
 662/// therefore safe to call from async contexts where `HOME` may vary by peer.
 663///
 664/// If `bind_cwd` is true, the given working directory is bound to the new
 665/// project. Pass false when bootstrapping from a SyncAll context to avoid
 666/// unexpectedly binding directories like the user's home.
 667///
 668/// Uses exclusive file locking to prevent race conditions when multiple
 669/// concurrent sync operations create projects or modify bindings.
 670pub fn bootstrap_sync_at(
 671    data_root: &Path,
 672    cwd: &Path,
 673    project: &str,
 674    delta: &[u8],
 675    bind_cwd: bool,
 676) -> Result<Store> {
 677    fs::create_dir_all(data_root.join(PROJECTS_DIR))?;
 678    validate_project_name(project)?;
 679
 680    // Exclusive lock prevents races when concurrent syncs create the same project
 681    // or modify bindings simultaneously.
 682    let lock_path = data_root.join(".bindings.lock");
 683    let lock_file = OpenOptions::new()
 684        .create(true)
 685        .truncate(false)
 686        .write(true)
 687        .open(&lock_path)
 688        .with_context(|| format!("failed to open lock file '{}'", lock_path.display()))?;
 689    lock_file
 690        .lock_exclusive()
 691        .context("failed to acquire exclusive lock on bindings")?;
 692
 693    // Now holding the lock: create project and optionally update bindings atomically.
 694    let store = Store::bootstrap_from_peer(data_root, project, delta)?;
 695
 696    if bind_cwd {
 697        let canonical = fs::canonicalize(cwd)
 698            .with_context(|| format!("failed to canonicalize '{}'", cwd.display()))?;
 699        let mut bindings = load_bindings(data_root)?;
 700        bindings
 701            .bindings
 702            .insert(canonical.to_string_lossy().to_string(), project.to_string());
 703        save_bindings(data_root, &bindings)?;
 704    }
 705
 706    // Lock is released when lock_file is dropped.
 707    Ok(store)
 708}
 709
 710pub fn list_projects() -> Result<Vec<String>> {
 711    let root = data_root()?;
 712    list_projects_in(&root)
 713}
 714
 715/// List project names rooted at an explicit data directory.
 716///
 717/// Unlike [`list_projects`], this does not consult `HOME` and is therefore
 718/// safe to call from async contexts where `HOME` may vary between peers.
 719pub(crate) fn list_projects_in(root: &Path) -> Result<Vec<String>> {
 720    let mut out = Vec::new();
 721    let dir = root.join(PROJECTS_DIR);
 722    if !dir.exists() {
 723        return Ok(out);
 724    }
 725
 726    for entry in fs::read_dir(dir)? {
 727        let path = entry?.path();
 728        if !path.is_dir() {
 729            continue;
 730        }
 731        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
 732            continue;
 733        };
 734        if path.join(BASE_FILE).exists() {
 735            out.push(name.to_string());
 736        }
 737    }
 738
 739    out.sort();
 740    Ok(out)
 741}
 742
 743pub fn resolve_task_id(store: &Store, raw: &str, include_deleted: bool) -> Result<TaskId> {
 744    let raw = raw.strip_prefix("td-").unwrap_or(raw);
 745
 746    if let Ok(id) = TaskId::parse(raw) {
 747        if store.get_task(&id, include_deleted)?.is_some() {
 748            return Ok(id);
 749        }
 750    }
 751
 752    let tasks = if include_deleted {
 753        store.list_tasks_unfiltered()?
 754    } else {
 755        store.list_tasks()?
 756    };
 757
 758    let upper = raw.to_ascii_uppercase();
 759    let matches: Vec<TaskId> = tasks
 760        .into_iter()
 761        .filter(|t| t.id.as_str().ends_with(&upper))
 762        .map(|t| t.id)
 763        .collect();
 764
 765    match matches.as_slice() {
 766        [] => bail!("task '{raw}' not found"),
 767        [id] => Ok(id.clone()),
 768        _ => bail!("task reference '{raw}' is ambiguous"),
 769    }
 770}
 771
 772pub fn partition_blockers(store: &Store, blockers: &[TaskId]) -> Result<BlockerPartition> {
 773    let mut out = BlockerPartition::default();
 774    for blocker in blockers {
 775        let Some(task) = store.get_task(blocker, true)? else {
 776            out.resolved.push(blocker.clone());
 777            continue;
 778        };
 779        if task.status == Status::Closed || task.deleted_at.is_some() {
 780            out.resolved.push(blocker.clone());
 781        } else {
 782            out.open.push(blocker.clone());
 783        }
 784    }
 785    Ok(out)
 786}
 787
 788pub fn insert_task_map(tasks: &LoroMap, task_id: &TaskId) -> Result<LoroMap> {
 789    tasks
 790        .insert_container(task_id.as_str(), LoroMap::new())
 791        .context("failed to create task map")
 792}
 793
 794pub fn get_task_map(tasks: &LoroMap, task_id: &TaskId) -> Result<Option<LoroMap>> {
 795    match tasks.get(task_id.as_str()) {
 796        Some(ValueOrContainer::Container(Container::Map(map))) => Ok(Some(map)),
 797        Some(_) => bail!("task '{}' has invalid container type", task_id.as_str()),
 798        None => Ok(None),
 799    }
 800}
 801
 802pub fn get_or_create_child_map(parent: &LoroMap, key: &str) -> Result<LoroMap> {
 803    parent
 804        .get_or_create_container(key, LoroMap::new())
 805        .with_context(|| format!("failed to get or create map key '{key}'"))
 806}
 807
 808fn bindings_path(root: &Path) -> PathBuf {
 809    root.join(BINDINGS_FILE)
 810}
 811
 812fn resolve_project_name(
 813    start: &Path,
 814    root: &Path,
 815    explicit: Option<&str>,
 816) -> Result<Option<String>> {
 817    if let Some(project) = explicit {
 818        validate_project_name(project)?;
 819        return Ok(Some(project.to_string()));
 820    }
 821
 822    let cwd = canonicalize_binding_path(start)?;
 823    let bindings = load_bindings(root)?;
 824
 825    let mut best: Option<(usize, String)> = None;
 826    for (raw_path, project) in bindings.bindings {
 827        let bound = PathBuf::from(raw_path);
 828        if is_prefix_path(&bound, &cwd) {
 829            let score = bound.components().count();
 830            match &best {
 831                Some((best_score, _)) if *best_score >= score => {}
 832                _ => best = Some((score, project)),
 833            }
 834        }
 835    }
 836
 837    if let Some((_, project)) = best {
 838        return Ok(Some(project));
 839    }
 840
 841    Ok(None)
 842}
 843
 844pub fn unbind_project(cwd: &Path) -> Result<()> {
 845    let root = data_root()?;
 846    let canonical = canonicalize_binding_path(cwd)?;
 847    let canonical_str = canonical.to_string_lossy().to_string();
 848
 849    let mut bindings = load_bindings(&root)?;
 850    if !bindings.bindings.contains_key(&canonical_str) {
 851        bail!("path '{}' is not bound to any project", canonical.display());
 852    }
 853    bindings.bindings.remove(&canonical_str);
 854    save_bindings(&root, &bindings)
 855}
 856
 857pub fn delete_project(name: &str) -> Result<()> {
 858    validate_project_name(name)?;
 859    let root = data_root()?;
 860    let proj_dir = project_dir(&root, name);
 861
 862    if !proj_dir.join(BASE_FILE).exists() {
 863        bail!("project '{name}' not found");
 864    }
 865
 866    fs::remove_dir_all(&proj_dir).with_context(|| {
 867        format!(
 868            "failed to remove project directory '{}'",
 869            proj_dir.display()
 870        )
 871    })?;
 872
 873    let mut bindings = load_bindings(&root)?;
 874    bindings.bindings.retain(|_, project| project != name);
 875    save_bindings(&root, &bindings)
 876}
 877
 878fn bind_project(cwd: &Path, project: &str) -> Result<()> {
 879    validate_project_name(project)?;
 880
 881    let root = data_root()?;
 882    fs::create_dir_all(&root)?;
 883
 884    let canonical = canonicalize_binding_path(cwd)?;
 885    let mut bindings = load_bindings(&root)?;
 886    bindings
 887        .bindings
 888        .insert(canonical.to_string_lossy().to_string(), project.to_string());
 889    save_bindings(&root, &bindings)
 890}
 891
 892fn load_bindings(root: &Path) -> Result<BindingsFile> {
 893    let path = bindings_path(root);
 894    if !path.exists() {
 895        return Ok(BindingsFile::default());
 896    }
 897    let content = fs::read_to_string(&path)
 898        .with_context(|| format!("failed reading bindings from '{}'", path.display()))?;
 899    serde_json::from_str(&content)
 900        .with_context(|| format!("invalid bindings file '{}'", path.display()))
 901}
 902
 903fn save_bindings(root: &Path, bindings: &BindingsFile) -> Result<()> {
 904    let path = bindings_path(root);
 905    let bytes = serde_json::to_vec_pretty(bindings)?;
 906    atomic_write_file(&path, &bytes)
 907}
 908
 909fn canonicalize_binding_path(path: &Path) -> Result<PathBuf> {
 910    fs::canonicalize(path).with_context(|| format!("failed to canonicalize '{}'", path.display()))
 911}
 912
 913fn is_prefix_path(prefix: &Path, target: &Path) -> bool {
 914    let mut prefix_components = prefix.components();
 915    let mut target_components = target.components();
 916
 917    loop {
 918        match (prefix_components.next(), target_components.next()) {
 919            (None, _) => return true,
 920            (Some(_), None) => return false,
 921            (Some(a), Some(b)) if a == b => continue,
 922            _ => return false,
 923        }
 924    }
 925}
 926
 927pub fn validate_project_name(name: &str) -> Result<()> {
 928    if name.is_empty() {
 929        bail!("project name cannot be empty");
 930    }
 931    if name.contains('/') || name.contains('\\') || name == "." || name == ".." {
 932        bail!("invalid project name '{name}'");
 933    }
 934    if name.chars().any(char::is_control) {
 935        bail!("invalid project name '{name}'");
 936    }
 937    Ok(())
 938}
 939
 940fn read_project_id_from_doc(doc: &LoroDoc) -> Result<String> {
 941    let root = serde_json::to_value(doc.get_deep_value())?;
 942    root.get("meta")
 943        .and_then(|m| m.get("project_id"))
 944        .and_then(Value::as_str)
 945        .map(str::to_owned)
 946        .ok_or_else(|| anyhow!("missing meta.project_id in project doc"))
 947}
 948
 949fn hydrate_task(task_id_raw: &str, value: &Value) -> Result<Task> {
 950    let obj = value
 951        .as_object()
 952        .ok_or_else(|| anyhow!("task '{task_id_raw}' is not an object"))?;
 953
 954    let id = TaskId::parse(task_id_raw)?;
 955
 956    let title = get_required_string(obj, "title")?;
 957    let description = get_required_string(obj, "description")?;
 958    let task_type = get_required_string(obj, "type")?;
 959    let status = Status::parse(&get_required_string(obj, "status")?)?;
 960    let priority = Priority::parse(&get_required_string(obj, "priority")?)?;
 961    let effort = Effort::parse(&get_required_string(obj, "effort")?)?;
 962    let parent = match obj.get("parent").and_then(Value::as_str) {
 963        Some("") | None => None,
 964        Some(raw) => Some(TaskId::parse(raw)?),
 965    };
 966
 967    let created_at = get_required_string(obj, "created_at")?;
 968    let updated_at = get_required_string(obj, "updated_at")?;
 969    let deleted_at = obj
 970        .get("deleted_at")
 971        .and_then(Value::as_str)
 972        .map(str::to_owned)
 973        .filter(|s| !s.is_empty());
 974
 975    let labels = obj
 976        .get("labels")
 977        .and_then(Value::as_object)
 978        .map(|m| m.keys().cloned().collect())
 979        .unwrap_or_default();
 980
 981    let blockers = obj
 982        .get("blockers")
 983        .and_then(Value::as_object)
 984        .map(|m| {
 985            m.keys()
 986                .map(|raw| TaskId::parse(raw))
 987                .collect::<Result<Vec<_>>>()
 988        })
 989        .transpose()?
 990        .unwrap_or_default();
 991
 992    let mut logs = obj
 993        .get("logs")
 994        .and_then(Value::as_object)
 995        .map(|logs| {
 996            logs.iter()
 997                .map(|(log_id_raw, payload)| {
 998                    let payload_obj = payload.as_object().ok_or_else(|| {
 999                        anyhow!("log '{log_id_raw}' on task '{task_id_raw}' is not an object")
1000                    })?;
1001                    Ok(LogEntry {
1002                        id: TaskId::parse(log_id_raw)?,
1003                        timestamp: get_required_string(payload_obj, "timestamp")?,
1004                        message: get_required_string(payload_obj, "message")?,
1005                    })
1006                })
1007                .collect::<Result<Vec<_>>>()
1008        })
1009        .transpose()?
1010        .unwrap_or_default();
1011
1012    logs.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));
1013
1014    Ok(Task {
1015        id,
1016        title,
1017        description,
1018        task_type,
1019        priority,
1020        status,
1021        effort,
1022        parent,
1023        created_at,
1024        updated_at,
1025        deleted_at,
1026        labels,
1027        blockers,
1028        logs,
1029    })
1030}
1031
1032fn get_required_string(map: &serde_json::Map<String, Value>, key: &str) -> Result<String> {
1033    map.get(key)
1034        .and_then(Value::as_str)
1035        .map(str::to_owned)
1036        .ok_or_else(|| anyhow!("missing or non-string key '{key}'"))
1037}
1038
1039fn collect_delta_paths(project_dir: &Path) -> Result<Vec<PathBuf>> {
1040    let mut paths = Vec::new();
1041    collect_changes_from_dir(&project_dir.join(CHANGES_DIR), &mut paths)?;
1042
1043    for entry in fs::read_dir(project_dir)? {
1044        let path = entry?.path();
1045        if !path.is_dir() {
1046            continue;
1047        }
1048        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
1049            continue;
1050        };
1051        if name.starts_with("changes.compacting.") {
1052            collect_changes_from_dir(&path, &mut paths)?;
1053        }
1054    }
1055
1056    Ok(paths)
1057}
1058
1059fn collect_changes_from_dir(dir: &Path, out: &mut Vec<PathBuf>) -> Result<()> {
1060    if !dir.exists() {
1061        return Ok(());
1062    }
1063
1064    for entry in fs::read_dir(dir)? {
1065        let path = entry?.path();
1066        if !path.is_file() {
1067            continue;
1068        }
1069
1070        let Some(filename) = path.file_name().and_then(|n| n.to_str()) else {
1071            continue;
1072        };
1073        if filename.ends_with(TMP_SUFFIX) || !filename.ends_with(".loro") {
1074            continue;
1075        }
1076
1077        let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
1078            continue;
1079        };
1080        if Ulid::from_string(stem).is_err() {
1081            continue;
1082        }
1083
1084        out.push(path);
1085    }
1086
1087    Ok(())
1088}
1089
1090fn project_dir(root: &Path, project: &str) -> PathBuf {
1091    root.join(PROJECTS_DIR).join(project)
1092}
1093
1094fn load_or_create_device_peer_id(root: &Path) -> Result<PeerID> {
1095    let path = root.join("device_id");
1096    if let Some(parent) = path.parent() {
1097        fs::create_dir_all(parent)?;
1098    }
1099
1100    let device_ulid = if path.exists() {
1101        let content = fs::read_to_string(&path)
1102            .with_context(|| format!("failed reading device id from '{}'", path.display()))?;
1103        Ulid::from_string(content.trim()).context("invalid persisted device id ULID")?
1104    } else {
1105        let id = Ulid::new();
1106        atomic_write_file(&path, id.to_string().as_bytes())?;
1107        id
1108    };
1109
1110    let raw: u128 = device_ulid.into();
1111    Ok((raw & u64::MAX as u128) as u64)
1112}
1113
1114fn atomic_write_file(path: &Path, bytes: &[u8]) -> Result<()> {
1115    let parent = path
1116        .parent()
1117        .ok_or_else(|| anyhow!("cannot atomically write root path '{}'", path.display()))?;
1118    fs::create_dir_all(parent)?;
1119
1120    let tmp_name = format!(
1121        "{}.{}{}",
1122        path.file_name().and_then(|n| n.to_str()).unwrap_or("write"),
1123        Ulid::new(),
1124        TMP_SUFFIX
1125    );
1126    let tmp_path = parent.join(tmp_name);
1127
1128    {
1129        let mut file = OpenOptions::new()
1130            .create_new(true)
1131            .write(true)
1132            .open(&tmp_path)
1133            .with_context(|| format!("failed to open temp file '{}'", tmp_path.display()))?;
1134        file.write_all(bytes)?;
1135        file.sync_all()?;
1136    }
1137
1138    fs::rename(&tmp_path, path).with_context(|| {
1139        format!(
1140            "failed to atomically rename '{}' to '{}'",
1141            tmp_path.display(),
1142            path.display()
1143        )
1144    })?;
1145
1146    sync_dir(parent)?;
1147    Ok(())
1148}
1149
1150fn sync_dir(path: &Path) -> Result<()> {
1151    let dir =
1152        File::open(path).with_context(|| format!("failed opening dir '{}'", path.display()))?;
1153    dir.sync_all()
1154        .with_context(|| format!("failed fsync on dir '{}'", path.display()))?;
1155    Ok(())
1156}