// db.rs

   1use anyhow::{anyhow, bail, Context, Result};
   2use loro::{Container, ExportMode, LoroDoc, LoroMap, PeerID, ValueOrContainer};
   3use serde::{Deserialize, Serialize};
   4use serde_json::Value;
   5use std::collections::BTreeMap;
   6use std::fmt;
   7use std::fs::{self, File, OpenOptions};
   8use std::io::Write;
   9use std::path::{Path, PathBuf};
  10
  11use fs2::FileExt;
  12use ulid::Ulid;
  13
/// Environment variable that selects the active project, overriding bindings.
pub const PROJECT_ENV: &str = "TD_PROJECT";

/// Subdirectory of the data root holding one directory per project.
pub(crate) const PROJECTS_DIR: &str = "projects";
/// Per-project subdirectory holding append-only Loro delta files.
const CHANGES_DIR: &str = "changes";
/// JSON file mapping canonical directory paths to project names.
const BINDINGS_FILE: &str = "bindings.json";
/// Per-project base snapshot of the Loro document.
const BASE_FILE: &str = "base.loro";
/// Suffix for in-progress temp files (NOTE(review): its consumer — presumably
/// `atomic_write_file` — is not visible in this chunk; confirm).
const TMP_SUFFIX: &str = ".tmp";
  21use crate::migrate;
  22
  23/// Current UTC time in ISO 8601 format.
  24pub fn now_utc() -> String {
  25    chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string()
  26}
  27
  28/// Lifecycle state for a task.
  29#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
  30#[serde(rename_all = "snake_case")]
  31pub enum Status {
  32    Open,
  33    InProgress,
  34    Closed,
  35}
  36
  37impl Status {
  38    fn as_str(self) -> &'static str {
  39        match self {
  40            Status::Open => "open",
  41            Status::InProgress => "in_progress",
  42            Status::Closed => "closed",
  43        }
  44    }
  45
  46    fn parse(raw: &str) -> Result<Self> {
  47        match raw {
  48            "open" => Ok(Self::Open),
  49            "in_progress" => Ok(Self::InProgress),
  50            "closed" => Ok(Self::Closed),
  51            _ => bail!("invalid status '{raw}'"),
  52        }
  53    }
  54}
  55
  56/// Priority for task ordering.
  57#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
  58#[serde(rename_all = "snake_case")]
  59pub enum Priority {
  60    High,
  61    Medium,
  62    Low,
  63}
  64
  65impl Priority {
  66    fn as_str(self) -> &'static str {
  67        match self {
  68            Priority::High => "high",
  69            Priority::Medium => "medium",
  70            Priority::Low => "low",
  71        }
  72    }
  73
  74    fn parse(raw: &str) -> Result<Self> {
  75        match raw {
  76            "high" => Ok(Self::High),
  77            "medium" => Ok(Self::Medium),
  78            "low" => Ok(Self::Low),
  79            _ => bail!("invalid priority '{raw}'"),
  80        }
  81    }
  82
  83    pub fn score(self) -> i32 {
  84        match self {
  85            Priority::High => 1,
  86            Priority::Medium => 2,
  87            Priority::Low => 3,
  88        }
  89    }
  90}
  91
  92/// Estimated effort for a task.
  93#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
  94#[serde(rename_all = "snake_case")]
  95pub enum Effort {
  96    Low,
  97    Medium,
  98    High,
  99}
 100
 101impl Effort {
 102    fn as_str(self) -> &'static str {
 103        match self {
 104            Effort::Low => "low",
 105            Effort::Medium => "medium",
 106            Effort::High => "high",
 107        }
 108    }
 109
 110    fn parse(raw: &str) -> Result<Self> {
 111        match raw {
 112            "low" => Ok(Self::Low),
 113            "medium" => Ok(Self::Medium),
 114            "high" => Ok(Self::High),
 115            _ => bail!("invalid effort '{raw}'"),
 116        }
 117    }
 118
 119    pub fn score(self) -> i32 {
 120        match self {
 121            Effort::Low => 1,
 122            Effort::Medium => 2,
 123            Effort::High => 3,
 124        }
 125    }
 126}
 127
 128/// A stable task identifier backed by a ULID.
 129#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
 130#[serde(transparent)]
 131pub struct TaskId(String);
 132
 133impl TaskId {
 134    pub fn new(id: Ulid) -> Self {
 135        Self(id.to_string())
 136    }
 137
 138    pub fn parse(raw: &str) -> Result<Self> {
 139        let id = Ulid::from_string(raw).with_context(|| format!("invalid task id '{raw}'"))?;
 140        Ok(Self::new(id))
 141    }
 142
 143    pub fn as_str(&self) -> &str {
 144        &self.0
 145    }
 146
 147    pub fn short(&self) -> String {
 148        format!("td-{}", &self.0[self.0.len() - 7..])
 149    }
 150
 151    /// Return a display-friendly short ID from a raw ULID string.
 152    pub fn display_id(raw: &str) -> String {
 153        let n = raw.len();
 154        if n > 7 {
 155            format!("td-{}", &raw[n - 7..])
 156        } else {
 157            format!("td-{raw}")
 158        }
 159    }
 160}
 161
 162impl fmt::Display for TaskId {
 163    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 164        write!(f, "{}", self.short())
 165    }
 166}
 167
/// A task log entry embedded in a task record.
#[derive(Debug, Clone, Serialize)]
pub struct LogEntry {
    // Identifier for this entry.
    pub id: TaskId,
    // Creation time; presumably produced by `now_utc` — confirm at the writer.
    pub timestamp: String,
    // Free-form log text.
    pub message: String,
}
 175
/// Hydrated task data from the CRDT document.
#[derive(Debug, Clone, Serialize)]
pub struct Task {
    pub id: TaskId,
    pub title: String,
    pub description: String,
    // Task category; serialized under the key "type".
    #[serde(rename = "type")]
    pub task_type: String,
    pub priority: Priority,
    pub status: Status,
    pub effort: Effort,
    // Parent task, if any (an empty string in storage hydrates to None).
    pub parent: Option<TaskId>,
    pub created_at: String,
    pub updated_at: String,
    // Tombstone marker: Some(..) means the task is soft-deleted.
    pub deleted_at: Option<String>,
    // Label names (stored as map keys in the CRDT).
    pub labels: Vec<String>,
    // Tasks that block this one; see `partition_blockers` for resolution.
    pub blockers: Vec<TaskId>,
    pub logs: Vec<LogEntry>,
}
 195
/// Result type for partitioning blockers by task state.
#[derive(Debug, Default, Clone, Serialize)]
pub struct BlockerPartition {
    // Blockers that still block: the task exists, is not closed, not deleted.
    pub open: Vec<TaskId>,
    // Blockers that no longer block: closed, soft-deleted, or missing tasks.
    pub resolved: Vec<TaskId>,
}

/// On-disk shape of `bindings.json`: canonical directory path → project name.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct BindingsFile {
    #[serde(default)]
    bindings: BTreeMap<String, String>,
}
 208
/// Storage wrapper around one project's Loro document and disk layout.
#[derive(Debug, Clone)]
pub struct Store {
    // Data root that contains `projects/` and the bindings file.
    root: PathBuf,
    // Project name; doubles as the directory name under `projects/`.
    project: String,
    // In-memory CRDT document hydrated from base.loro plus deltas.
    doc: LoroDoc,
}
 216
 217impl Store {
    /// Create a new project on disk and return a store for it.
    ///
    /// Lays out `projects/<name>/changes/`, seeds the Loro document with the
    /// root `tasks` map plus `meta` (schema version, project id, creation
    /// time), and persists the initial snapshot as `base.loro`.
    ///
    /// # Errors
    /// Fails when the name is invalid, the project already exists, or any
    /// filesystem / CRDT operation fails.
    pub fn init(root: &Path, project: &str) -> Result<Self> {
        validate_project_name(project)?;
        let project_dir = project_dir(root, project);
        if project_dir.exists() {
            bail!("project '{project}' already exists");
        }
        fs::create_dir_all(project_dir.join(CHANGES_DIR))?;

        let doc = LoroDoc::new();
        // Reuse (or create on first use) the per-device peer id.
        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
        // Materialize the root tasks map so readers can rely on its presence.
        doc.get_map("tasks");

        let meta = doc.get_map("meta");
        meta.insert("schema_version", migrate::CURRENT_SCHEMA_VERSION as i64)?;
        meta.insert("project_id", Ulid::new().to_string())?;
        meta.insert("created_at", now_utc())?;

        let snapshot = doc
            .export(ExportMode::Snapshot)
            .context("failed to export initial loro snapshot")?;
        atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;

        Ok(Self {
            root: root.to_path_buf(),
            project: project.to_string(),
            doc,
        })
    }
 246
    /// Open an existing project from disk.
    ///
    /// Loads `base.loro`, replays every delta file in `changes/` in ULID
    /// filename order, then applies any pending schema migrations and
    /// persists the migration as a fresh delta.
    ///
    /// # Errors
    /// Fails when the project is missing, the snapshot cannot be read, or a
    /// migration step fails. Individual deltas that fail to import are
    /// skipped with a warning instead of aborting.
    pub fn open(root: &Path, project: &str) -> Result<Self> {
        validate_project_name(project)?;
        let project_dir = project_dir(root, project);
        let base_path = project_dir.join(BASE_FILE);

        if !base_path.exists() {
            bail!("project '{project}' is not initialized. Run 'td project init {project}'");
        }

        let base = fs::read(&base_path)
            .with_context(|| format!("failed to read loro snapshot '{}'", base_path.display()))?;

        let doc = LoroDoc::from_snapshot(&base).context("failed to load loro snapshot")?;
        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;

        // Sort deltas by their ULID filename; stems that are not valid ULIDs
        // sort first because their key is None.
        let mut deltas = collect_delta_paths(&project_dir)?;
        deltas.sort_by_key(|path| {
            path.file_stem()
                .and_then(|s| s.to_str())
                .and_then(|s| Ulid::from_string(s).ok())
        });

        for delta_path in deltas {
            let bytes = fs::read(&delta_path)
                .with_context(|| format!("failed to read loro delta '{}'", delta_path.display()))?;
            if let Err(err) = doc.import(&bytes) {
                // Tolerate malformed or stale delta files as requested by design.
                eprintln!(
                    "warning: skipping unreadable delta '{}': {err}",
                    delta_path.display()
                );
            }
        }

        // Apply any pending schema upgrades and persist the resulting delta
        // so subsequent opens don't repeat the work.
        let before_vv = doc.oplog_vv();
        let upgraded = migrate::ensure_current(&doc)?;
        if upgraded {
            doc.commit();
            let delta = doc
                .export(ExportMode::updates(&before_vv))
                .context("failed to export schema upgrade delta")?;
            let filename = format!("{}.loro", Ulid::new());
            let delta_path = project_dir.join(CHANGES_DIR).join(filename);
            atomic_write_file(&delta_path, &delta)?;
        }

        Ok(Self {
            root: root.to_path_buf(),
            project: project.to_string(),
            doc,
        })
    }
 301
    /// Bootstrap a local project from peer-provided delta bytes.
    ///
    /// The incoming delta is imported into a fresh document, validated to
    /// ensure it carries `meta.project_id`, and then persisted as a base
    /// snapshot for future opens.
    ///
    /// # Errors
    /// Fails when the project already exists locally, the delta cannot be
    /// imported, it lacks `meta.project_id`, a migration fails, or the
    /// snapshot cannot be written.
    pub fn bootstrap_from_peer(root: &Path, project: &str, delta: &[u8]) -> Result<Self> {
        validate_project_name(project)?;
        let project_dir = project_dir(root, project);
        if project_dir.exists() {
            bail!("project '{project}' already exists");
        }
        fs::create_dir_all(project_dir.join(CHANGES_DIR))?;

        let doc = LoroDoc::new();
        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
        doc.import(delta)
            .context("failed to import bootstrap delta from peer")?;
        doc.commit();

        // Reject deltas that don't describe a complete project.
        read_project_id_from_doc(&doc)
            .context("bootstrap delta is missing required project identity")?;

        // Upgrade the peer's document before snapshotting so the local
        // copy is always at CURRENT_SCHEMA_VERSION from the start.
        migrate::ensure_current(&doc)?;
        doc.commit();

        let snapshot = doc
            .export(ExportMode::Snapshot)
            .context("failed to export bootstrap loro snapshot")?;
        atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;

        Ok(Self {
            root: root.to_path_buf(),
            project: project.to_string(),
            doc,
        })
    }
 340
    /// Data root this store was opened against.
    pub fn root(&self) -> &Path {
        &self.root
    }

    /// Name of the project this store manages.
    pub fn project_name(&self) -> &str {
        &self.project
    }

    /// Direct access to the underlying CRDT document.
    pub fn doc(&self) -> &LoroDoc {
        &self.doc
    }
 352
    /// Export all current state to a fresh base snapshot.
    /// Compact accumulated deltas into the base snapshot using a two-phase
    /// protocol that is safe against concurrent writers.
    ///
    /// **Phase 1** — rename `changes/` to `changes.compacting.<ulid>/`, then
    /// immediately create a fresh `changes/`.  Any concurrent `td` command
    /// that writes a delta after this point lands in the new `changes/` and is
    /// therefore never touched by this operation.
    ///
    /// **Phase 2** — write a fresh base snapshot from the in-memory document
    /// (which was loaded from both `base.loro` and every delta at `open` time),
    /// then remove the compacting directory.
    ///
    /// Any orphaned `changes.compacting.*` directories left by a previously
    /// crashed tidy are also removed: they were already merged into `self.doc`
    /// at open time, so the new snapshot includes their contents.
    ///
    /// Returns the number of delta files folded into the snapshot.
    pub fn tidy(&self) -> Result<usize> {
        let project_dir = project_dir(&self.root, &self.project);
        let changes_dir = project_dir.join(CHANGES_DIR);

        // Phase 1: atomically hand off the current changes/ to a compacting
        // directory so new writers have a clean home immediately.
        let compacting_dir = project_dir.join(format!("changes.compacting.{}", Ulid::new()));
        if changes_dir.exists() {
            fs::rename(&changes_dir, &compacting_dir).with_context(|| {
                format!(
                    "failed to rename '{}' to '{}'",
                    changes_dir.display(),
                    compacting_dir.display()
                )
            })?;
        }
        fs::create_dir_all(&changes_dir).context("failed to create fresh changes/")?;

        // Re-import every delta from the compacting directories.  self.doc
        // was populated at open() time, but a concurrent writer may have
        // appended a delta to changes/ between open() and the Phase 1
        // rename — that delta is now inside compacting_dir without being in
        // self.doc.  CRDT import is idempotent (deduplicates by OpID), so
        // re-importing already-known ops is harmless.
        //
        // NOTE(review): this passes project_dir — the same argument open()
        // uses. Confirm collect_delta_paths (not visible in this file chunk)
        // also scans changes.compacting.* directories; if it only reads the
        // now freshly-emptied changes/, the concurrent-writer delta described
        // above would not actually be re-imported here.
        let mut compacting_deltas = collect_delta_paths(&project_dir)?;
        compacting_deltas.sort_by_key(|path| {
            path.file_stem()
                .and_then(|s| s.to_str())
                .and_then(|s| Ulid::from_string(s).ok())
        });
        for delta_path in &compacting_deltas {
            if let Ok(bytes) = fs::read(delta_path) {
                if let Err(err) = self.doc.import(&bytes) {
                    eprintln!(
                        "warning: skipping unreadable delta '{}': {err}",
                        delta_path.display()
                    );
                }
            }
        }

        // Phase 2: write the new base snapshot.  self.doc now holds the
        // full merged state including any concurrent deltas.
        let out = project_dir.join(BASE_FILE);
        let bytes = self
            .doc
            .export(ExportMode::Snapshot)
            .context("failed to export loro snapshot")?;
        atomic_write_file(&out, &bytes)?;

        // Remove the compacting directory we created in phase 1 plus any
        // orphaned changes.compacting.* dirs from previously crashed tidies.
        let mut removed = 0usize;
        for entry in fs::read_dir(&project_dir)
            .with_context(|| format!("failed to read project dir '{}'", project_dir.display()))?
        {
            let path = entry?.path();
            if !path.is_dir() {
                continue;
            }
            let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
                continue;
            };
            if !name.starts_with("changes.compacting.") {
                continue;
            }
            // Count files before removing for the summary report.
            for file in fs::read_dir(&path)
                .with_context(|| format!("failed to read '{}'", path.display()))?
            {
                let fp = file?.path();
                if fp.is_file() {
                    removed += 1;
                }
            }
            fs::remove_dir_all(&path)
                .with_context(|| format!("failed to remove compacting dir '{}'", path.display()))?;
        }

        Ok(removed)
    }
 452
    /// Apply a local mutation and persist only the resulting delta.
    ///
    /// Captures the oplog version vector before `mutator` runs, commits, then
    /// exports exactly the ops added since that point into a new ULID-named
    /// file under `changes/`. Returns the path of the written delta file.
    ///
    /// # Errors
    /// Fails when the mutator fails, the delta cannot be exported, or the
    /// delta file cannot be written.
    ///
    /// NOTE(review): if `mutator` fails partway, any ops it already applied
    /// remain in the in-memory doc but are never persisted — confirm callers
    /// discard the store (or exit) on error.
    pub fn apply_and_persist<F>(&self, mutator: F) -> Result<PathBuf>
    where
        F: FnOnce(&LoroDoc) -> Result<()>,
    {
        let before = self.doc.oplog_vv();
        mutator(&self.doc)?;
        self.doc.commit();

        // Export only the ops added after `before`, i.e. this mutation.
        let delta = self
            .doc
            .export(ExportMode::updates(&before))
            .context("failed to export loro update delta")?;

        // ULID filenames are unique and sort in creation order.
        let filename = format!("{}.loro", Ulid::new());
        let path = project_dir(&self.root, &self.project)
            .join(CHANGES_DIR)
            .join(filename);
        atomic_write_file(&path, &delta)?;
        Ok(path)
    }
 474
 475    /// Persist pre-built delta bytes (e.g. received from a peer) as a new
 476    /// change file without re-exporting from the doc.
 477    pub fn save_raw_delta(&self, bytes: &[u8]) -> Result<PathBuf> {
 478        let filename = format!("{}.loro", Ulid::new());
 479        let path = project_dir(&self.root, &self.project)
 480            .join(CHANGES_DIR)
 481            .join(filename);
 482        atomic_write_file(&path, bytes)?;
 483        Ok(path)
 484    }
 485
    /// Return hydrated tasks, excluding tombstones. Sorted by full ULID.
    pub fn list_tasks(&self) -> Result<Vec<Task>> {
        self.list_tasks_inner(false)
    }

    /// Return hydrated tasks, including tombstoned rows. Sorted by full ULID.
    pub fn list_tasks_unfiltered(&self) -> Result<Vec<Task>> {
        self.list_tasks_inner(true)
    }
 495
 496    /// Find a task by exact ULID string.
 497    pub fn get_task(&self, id: &TaskId, include_deleted: bool) -> Result<Option<Task>> {
 498        let tasks = if include_deleted {
 499            self.list_tasks_unfiltered()?
 500        } else {
 501            self.list_tasks()?
 502        };
 503        Ok(tasks.into_iter().find(|task| task.id == *id))
 504    }
 505
 506    fn list_tasks_inner(&self, include_deleted: bool) -> Result<Vec<Task>> {
 507        let root = serde_json::to_value(self.doc.get_deep_value())?;
 508        let tasks_obj = root
 509            .get("tasks")
 510            .and_then(Value::as_object)
 511            .ok_or_else(|| anyhow!("missing root tasks map"))?;
 512
 513        let mut tasks = Vec::with_capacity(tasks_obj.len());
 514        for (task_id_raw, task_json) in tasks_obj {
 515            let task = hydrate_task(task_id_raw, task_json)?;
 516            if include_deleted || task.deleted_at.is_none() {
 517                tasks.push(task);
 518            }
 519        }
 520
 521        tasks.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));
 522        Ok(tasks)
 523    }
 524
    /// Return the stable project identity stored in `meta.project_id`.
    pub fn project_id(&self) -> Result<String> {
        read_project_id_from_doc(&self.doc)
    }

    /// Schema version currently recorded in the document's meta map.
    pub fn schema_version(&self) -> Result<u32> {
        migrate::read_schema_version(&self.doc)
    }
 533}
 534
/// Generate a new task ULID.
pub fn gen_id() -> TaskId {
    TaskId::new(Ulid::new())
}

/// Parse a status string ("open" / "in_progress" / "closed").
pub fn parse_status(s: &str) -> Result<Status> {
    Status::parse(s)
}

/// Parse a priority string ("high" / "medium" / "low").
pub fn parse_priority(s: &str) -> Result<Priority> {
    Priority::parse(s)
}

/// Parse an effort string ("low" / "medium" / "high").
pub fn parse_effort(s: &str) -> Result<Effort> {
    Effort::parse(s)
}

/// Canonical string for a status value.
pub fn status_label(s: Status) -> &'static str {
    s.as_str()
}

/// Canonical string for a priority value.
pub fn priority_label(p: Priority) -> &'static str {
    p.as_str()
}

/// Canonical string for an effort value.
pub fn effort_label(e: Effort) -> &'static str {
    e.as_str()
}
 563
/// Root directory for all td data: `$HOME/.local/share/td`.
///
/// NOTE(review): resolves via `HOME`, so this is effectively Unix-only —
/// confirm Windows support is out of scope.
pub fn data_root() -> Result<PathBuf> {
    let home = std::env::var("HOME").context("HOME is not set")?;
    Ok(PathBuf::from(home).join(".local").join("share").join("td"))
}
 568
/// Create a project under the default data root and bind `cwd` to it.
///
/// # Errors
/// Fails when the project already exists, creation fails, or binding the
/// directory fails.
pub fn init(cwd: &Path, project: &str) -> Result<Store> {
    let root = data_root()?;
    fs::create_dir_all(root.join(PROJECTS_DIR))?;
    let store = Store::init(&root, project)?;
    bind_project(cwd, project)?;
    Ok(store)
}
 576
/// Bind the current directory to an already-initialized project.
///
/// # Errors
/// Fails when the name is invalid or the project has no base snapshot.
pub fn use_project(cwd: &Path, project: &str) -> Result<()> {
    let root = data_root()?;
    validate_project_name(project)?;
    if !project_dir(&root, project).join(BASE_FILE).exists() {
        bail!("project '{project}' not found. Run 'td project list' to list known projects");
    }
    bind_project(cwd, project)
}
 585
/// Open the store selected by `TD_PROJECT` or the directory bindings.
///
/// # Errors
/// Fails when no project is selected by any mechanism, or opening fails.
pub fn open(start: &Path) -> Result<Store> {
    let root = data_root()?;
    let explicit = std::env::var(PROJECT_ENV).ok();
    let project = resolve_project_name(start, &root, explicit.as_deref())?.ok_or_else(|| {
        anyhow!(
            "no project selected. Use --project/TD_PROJECT, run 'td project bind <name>', or run 'td project init <name>'"
        )
    })?;
    Store::open(&root, &project)
}
 596
/// Open the project selected by `--project`/`TD_PROJECT`/bindings if one exists.
///
/// Returns `Ok(None)` when no project is selected by any mechanism.
///
/// # Errors
/// Fails when resolution itself fails or the selected project cannot be
/// opened.
pub fn try_open(start: &Path) -> Result<Option<Store>> {
    let root = data_root()?;
    let explicit = std::env::var(PROJECT_ENV).ok();
    let Some(project) = resolve_project_name(start, &root, explicit.as_deref())? else {
        return Ok(None);
    };
    Store::open(&root, &project).map(Some)
}
 608
/// Bootstrap a project from a peer delta and bind the current directory.
///
/// # Errors
/// Fails when the project already exists locally, the delta is invalid, or
/// binding the directory fails.
pub fn bootstrap_sync(cwd: &Path, project: &str, delta: &[u8]) -> Result<Store> {
    let root = data_root()?;
    fs::create_dir_all(root.join(PROJECTS_DIR))?;
    validate_project_name(project)?;
    let store = Store::bootstrap_from_peer(&root, project, delta)?;
    bind_project(cwd, project)?;
    Ok(store)
}
 618
/// Bootstrap a project from a peer delta using an explicit data root.
///
/// Unlike [`bootstrap_sync`], this function does not consult `HOME` and is
/// therefore safe to call from async contexts where `HOME` may vary by peer.
///
/// If `bind_cwd` is true, the given working directory is bound to the new
/// project. Pass false when bootstrapping from a SyncAll context to avoid
/// unexpectedly binding directories like the user's home.
///
/// Uses exclusive file locking to prevent race conditions when multiple
/// concurrent sync operations create projects or modify bindings.
///
/// # Errors
/// Fails when the lock cannot be acquired, the project already exists, the
/// delta is invalid, or the bindings cannot be updated.
pub fn bootstrap_sync_at(
    data_root: &Path,
    cwd: &Path,
    project: &str,
    delta: &[u8],
    bind_cwd: bool,
) -> Result<Store> {
    fs::create_dir_all(data_root.join(PROJECTS_DIR))?;
    validate_project_name(project)?;

    // Exclusive lock prevents races when concurrent syncs create the same project
    // or modify bindings simultaneously.
    let lock_path = data_root.join(".bindings.lock");
    let lock_file = OpenOptions::new()
        .create(true)
        .truncate(false)
        .write(true)
        .open(&lock_path)
        .with_context(|| format!("failed to open lock file '{}'", lock_path.display()))?;
    lock_file
        .lock_exclusive()
        .context("failed to acquire exclusive lock on bindings")?;

    // Now holding the lock: create project and optionally update bindings atomically.
    let store = Store::bootstrap_from_peer(data_root, project, delta)?;

    if bind_cwd {
        let canonical = fs::canonicalize(cwd)
            .with_context(|| format!("failed to canonicalize '{}'", cwd.display()))?;
        let mut bindings = load_bindings(data_root)?;
        bindings
            .bindings
            .insert(canonical.to_string_lossy().to_string(), project.to_string());
        save_bindings(data_root, &bindings)?;
    }

    // Lock is released when lock_file is dropped.
    Ok(store)
}
 669
/// List project names under the default (`HOME`-derived) data root.
pub fn list_projects() -> Result<Vec<String>> {
    let root = data_root()?;
    list_projects_in(&root)
}
 674
 675/// List project names rooted at an explicit data directory.
 676///
 677/// Unlike [`list_projects`], this does not consult `HOME` and is therefore
 678/// safe to call from async contexts where `HOME` may vary between peers.
 679pub(crate) fn list_projects_in(root: &Path) -> Result<Vec<String>> {
 680    let mut out = Vec::new();
 681    let dir = root.join(PROJECTS_DIR);
 682    if !dir.exists() {
 683        return Ok(out);
 684    }
 685
 686    for entry in fs::read_dir(dir)? {
 687        let path = entry?.path();
 688        if !path.is_dir() {
 689            continue;
 690        }
 691        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
 692            continue;
 693        };
 694        if path.join(BASE_FILE).exists() {
 695            out.push(name.to_string());
 696        }
 697    }
 698
 699    out.sort();
 700    Ok(out)
 701}
 702
 703pub fn resolve_task_id(store: &Store, raw: &str, include_deleted: bool) -> Result<TaskId> {
 704    let raw = raw.strip_prefix("td-").unwrap_or(raw);
 705
 706    if let Ok(id) = TaskId::parse(raw) {
 707        if store.get_task(&id, include_deleted)?.is_some() {
 708            return Ok(id);
 709        }
 710    }
 711
 712    let tasks = if include_deleted {
 713        store.list_tasks_unfiltered()?
 714    } else {
 715        store.list_tasks()?
 716    };
 717
 718    let upper = raw.to_ascii_uppercase();
 719    let matches: Vec<TaskId> = tasks
 720        .into_iter()
 721        .filter(|t| t.id.as_str().ends_with(&upper))
 722        .map(|t| t.id)
 723        .collect();
 724
 725    match matches.as_slice() {
 726        [] => bail!("task '{raw}' not found"),
 727        [id] => Ok(id.clone()),
 728        _ => bail!("task reference '{raw}' is ambiguous"),
 729    }
 730}
 731
 732pub fn partition_blockers(store: &Store, blockers: &[TaskId]) -> Result<BlockerPartition> {
 733    let mut out = BlockerPartition::default();
 734    for blocker in blockers {
 735        let Some(task) = store.get_task(blocker, true)? else {
 736            out.resolved.push(blocker.clone());
 737            continue;
 738        };
 739        if task.status == Status::Closed || task.deleted_at.is_some() {
 740            out.resolved.push(blocker.clone());
 741        } else {
 742            out.open.push(blocker.clone());
 743        }
 744    }
 745    Ok(out)
 746}
 747
/// Create a fresh child map for `task_id` inside the root tasks map.
pub fn insert_task_map(tasks: &LoroMap, task_id: &TaskId) -> Result<LoroMap> {
    tasks
        .insert_container(task_id.as_str(), LoroMap::new())
        .context("failed to create task map")
}

/// Fetch the child map for `task_id`, if present.
///
/// # Errors
/// Fails when the key exists but holds something other than a map container.
pub fn get_task_map(tasks: &LoroMap, task_id: &TaskId) -> Result<Option<LoroMap>> {
    match tasks.get(task_id.as_str()) {
        Some(ValueOrContainer::Container(Container::Map(map))) => Ok(Some(map)),
        Some(_) => bail!("task '{}' has invalid container type", task_id.as_str()),
        None => Ok(None),
    }
}

/// Fetch `key` as a child map of `parent`, creating it when absent.
pub fn get_or_create_child_map(parent: &LoroMap, key: &str) -> Result<LoroMap> {
    parent
        .get_or_create_container(key, LoroMap::new())
        .with_context(|| format!("failed to get or create map key '{key}'"))
}
 767
/// Path of the bindings file inside the data root.
fn bindings_path(root: &Path) -> PathBuf {
    root.join(BINDINGS_FILE)
}
 771
 772fn resolve_project_name(
 773    start: &Path,
 774    root: &Path,
 775    explicit: Option<&str>,
 776) -> Result<Option<String>> {
 777    if let Some(project) = explicit {
 778        validate_project_name(project)?;
 779        return Ok(Some(project.to_string()));
 780    }
 781
 782    let cwd = canonicalize_binding_path(start)?;
 783    let bindings = load_bindings(root)?;
 784
 785    let mut best: Option<(usize, String)> = None;
 786    for (raw_path, project) in bindings.bindings {
 787        let bound = PathBuf::from(raw_path);
 788        if is_prefix_path(&bound, &cwd) {
 789            let score = bound.components().count();
 790            match &best {
 791                Some((best_score, _)) if *best_score >= score => {}
 792                _ => best = Some((score, project)),
 793            }
 794        }
 795    }
 796
 797    if let Some((_, project)) = best {
 798        return Ok(Some(project));
 799    }
 800
 801    Ok(None)
 802}
 803
/// Remove the binding for `cwd` (after canonicalization).
///
/// # Errors
/// Fails when the path cannot be canonicalized or is not bound.
pub fn unbind_project(cwd: &Path) -> Result<()> {
    let root = data_root()?;
    let canonical = canonicalize_binding_path(cwd)?;
    let canonical_str = canonical.to_string_lossy().to_string();

    let mut bindings = load_bindings(&root)?;
    if !bindings.bindings.contains_key(&canonical_str) {
        bail!("path '{}' is not bound to any project", canonical.display());
    }
    bindings.bindings.remove(&canonical_str);
    save_bindings(&root, &bindings)
}
 816
/// Permanently delete a project's on-disk data and every binding to it.
///
/// # Errors
/// Fails when the project does not exist, removal fails, or the bindings
/// file cannot be rewritten.
pub fn delete_project(name: &str) -> Result<()> {
    validate_project_name(name)?;
    let root = data_root()?;
    let proj_dir = project_dir(&root, name);

    if !proj_dir.join(BASE_FILE).exists() {
        bail!("project '{name}' not found");
    }

    fs::remove_dir_all(&proj_dir).with_context(|| {
        format!(
            "failed to remove project directory '{}'",
            proj_dir.display()
        )
    })?;

    // Drop every directory binding that pointed at the deleted project.
    let mut bindings = load_bindings(&root)?;
    bindings.bindings.retain(|_, project| project != name);
    save_bindings(&root, &bindings)
}
 837
/// Bind the canonicalized `cwd` to `project`, replacing any existing binding
/// for that path.
fn bind_project(cwd: &Path, project: &str) -> Result<()> {
    validate_project_name(project)?;

    let root = data_root()?;
    fs::create_dir_all(&root)?;

    let canonical = canonicalize_binding_path(cwd)?;
    let mut bindings = load_bindings(&root)?;
    bindings
        .bindings
        .insert(canonical.to_string_lossy().to_string(), project.to_string());
    save_bindings(&root, &bindings)
}
 851
 852fn load_bindings(root: &Path) -> Result<BindingsFile> {
 853    let path = bindings_path(root);
 854    if !path.exists() {
 855        return Ok(BindingsFile::default());
 856    }
 857    let content = fs::read_to_string(&path)
 858        .with_context(|| format!("failed reading bindings from '{}'", path.display()))?;
 859    serde_json::from_str(&content)
 860        .with_context(|| format!("invalid bindings file '{}'", path.display()))
 861}
 862
/// Atomically write the bindings file as pretty-printed JSON.
fn save_bindings(root: &Path, bindings: &BindingsFile) -> Result<()> {
    let path = bindings_path(root);
    let bytes = serde_json::to_vec_pretty(bindings)?;
    atomic_write_file(&path, &bytes)
}
 868
/// Canonicalize a path for use as a bindings key (resolves symlinks; the
/// path must exist on disk).
fn canonicalize_binding_path(path: &Path) -> Result<PathBuf> {
    fs::canonicalize(path).with_context(|| format!("failed to canonicalize '{}'", path.display()))
}
 872
 873fn is_prefix_path(prefix: &Path, target: &Path) -> bool {
 874    let mut prefix_components = prefix.components();
 875    let mut target_components = target.components();
 876
 877    loop {
 878        match (prefix_components.next(), target_components.next()) {
 879            (None, _) => return true,
 880            (Some(_), None) => return false,
 881            (Some(a), Some(b)) if a == b => continue,
 882            _ => return false,
 883        }
 884    }
 885}
 886
 887pub fn validate_project_name(name: &str) -> Result<()> {
 888    if name.is_empty() {
 889        bail!("project name cannot be empty");
 890    }
 891    if name.contains('/') || name.contains('\\') || name == "." || name == ".." {
 892        bail!("invalid project name '{name}'");
 893    }
 894    if name.chars().any(char::is_control) {
 895        bail!("invalid project name '{name}'");
 896    }
 897    Ok(())
 898}
 899
 900fn read_project_id_from_doc(doc: &LoroDoc) -> Result<String> {
 901    let root = serde_json::to_value(doc.get_deep_value())?;
 902    root.get("meta")
 903        .and_then(|m| m.get("project_id"))
 904        .and_then(Value::as_str)
 905        .map(str::to_owned)
 906        .ok_or_else(|| anyhow!("missing meta.project_id in project doc"))
 907}
 908
/// Reconstruct a `Task` from the JSON value stored under `task_id_raw`
/// in the project document.
///
/// Conventions encoded here:
/// - `labels`, `blockers`, and `logs` are stored as maps whose keys are
///   the set members / entry ids; `labels` and `blockers` values are
///   ignored entirely.
/// - An empty string for `parent` or `deleted_at` is normalized to `None`.
///
/// # Errors
/// Fails when `value` is not an object, a required string field is
/// missing or non-string, an embedded id fails `TaskId::parse`, or a
/// status/priority/effort field holds an unknown value.
fn hydrate_task(task_id_raw: &str, value: &Value) -> Result<Task> {
    let obj = value
        .as_object()
        .ok_or_else(|| anyhow!("task '{task_id_raw}' is not an object"))?;

    // The map key the task is stored under doubles as its id.
    let id = TaskId::parse(task_id_raw)?;

    let title = get_required_string(obj, "title")?;
    let description = get_required_string(obj, "description")?;
    let task_type = get_required_string(obj, "type")?;
    let status = Status::parse(&get_required_string(obj, "status")?)?;
    let priority = Priority::parse(&get_required_string(obj, "priority")?)?;
    let effort = Effort::parse(&get_required_string(obj, "effort")?)?;
    // Absent and empty-string parent both mean "no parent".
    let parent = match obj.get("parent").and_then(Value::as_str) {
        Some("") | None => None,
        Some(raw) => Some(TaskId::parse(raw)?),
    };

    let created_at = get_required_string(obj, "created_at")?;
    let updated_at = get_required_string(obj, "updated_at")?;
    // Soft-delete marker; empty string counts as "not deleted".
    let deleted_at = obj
        .get("deleted_at")
        .and_then(Value::as_str)
        .map(str::to_owned)
        .filter(|s| !s.is_empty());

    // Labels are the keys of the `labels` map; values are ignored.
    let labels = obj
        .get("labels")
        .and_then(Value::as_object)
        .map(|m| m.keys().cloned().collect())
        .unwrap_or_default();

    // Blockers are the keys of the `blockers` map, each a task id.
    let blockers = obj
        .get("blockers")
        .and_then(Value::as_object)
        .map(|m| {
            m.keys()
                .map(|raw| TaskId::parse(raw))
                .collect::<Result<Vec<_>>>()
        })
        .transpose()?
        .unwrap_or_default();

    // Each log entry lives under its own id key with a payload object
    // carrying `timestamp` and `message`.
    let mut logs = obj
        .get("logs")
        .and_then(Value::as_object)
        .map(|logs| {
            logs.iter()
                .map(|(log_id_raw, payload)| {
                    let payload_obj = payload.as_object().ok_or_else(|| {
                        anyhow!("log '{log_id_raw}' on task '{task_id_raw}' is not an object")
                    })?;
                    Ok(LogEntry {
                        id: TaskId::parse(log_id_raw)?,
                        timestamp: get_required_string(payload_obj, "timestamp")?,
                        message: get_required_string(payload_obj, "message")?,
                    })
                })
                .collect::<Result<Vec<_>>>()
        })
        .transpose()?
        .unwrap_or_default();

    // ULID ids are lexicographically sortable by creation time, so a
    // string sort yields chronological log order.
    logs.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));

    Ok(Task {
        id,
        title,
        description,
        task_type,
        priority,
        status,
        effort,
        parent,
        created_at,
        updated_at,
        deleted_at,
        labels,
        blockers,
        logs,
    })
}
 991
 992fn get_required_string(map: &serde_json::Map<String, Value>, key: &str) -> Result<String> {
 993    map.get(key)
 994        .and_then(Value::as_str)
 995        .map(str::to_owned)
 996        .ok_or_else(|| anyhow!("missing or non-string key '{key}'"))
 997}
 998
 999fn collect_delta_paths(project_dir: &Path) -> Result<Vec<PathBuf>> {
1000    let mut paths = Vec::new();
1001    collect_changes_from_dir(&project_dir.join(CHANGES_DIR), &mut paths)?;
1002
1003    for entry in fs::read_dir(project_dir)? {
1004        let path = entry?.path();
1005        if !path.is_dir() {
1006            continue;
1007        }
1008        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
1009            continue;
1010        };
1011        if name.starts_with("changes.compacting.") {
1012            collect_changes_from_dir(&path, &mut paths)?;
1013        }
1014    }
1015
1016    Ok(paths)
1017}
1018
1019fn collect_changes_from_dir(dir: &Path, out: &mut Vec<PathBuf>) -> Result<()> {
1020    if !dir.exists() {
1021        return Ok(());
1022    }
1023
1024    for entry in fs::read_dir(dir)? {
1025        let path = entry?.path();
1026        if !path.is_file() {
1027            continue;
1028        }
1029
1030        let Some(filename) = path.file_name().and_then(|n| n.to_str()) else {
1031            continue;
1032        };
1033        if filename.ends_with(TMP_SUFFIX) || !filename.ends_with(".loro") {
1034            continue;
1035        }
1036
1037        let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
1038            continue;
1039        };
1040        if Ulid::from_string(stem).is_err() {
1041            continue;
1042        }
1043
1044        out.push(path);
1045    }
1046
1047    Ok(())
1048}
1049
1050fn project_dir(root: &Path, project: &str) -> PathBuf {
1051    root.join(PROJECTS_DIR).join(project)
1052}
1053
1054fn load_or_create_device_peer_id(root: &Path) -> Result<PeerID> {
1055    let path = root.join("device_id");
1056    if let Some(parent) = path.parent() {
1057        fs::create_dir_all(parent)?;
1058    }
1059
1060    let device_ulid = if path.exists() {
1061        let content = fs::read_to_string(&path)
1062            .with_context(|| format!("failed reading device id from '{}'", path.display()))?;
1063        Ulid::from_string(content.trim()).context("invalid persisted device id ULID")?
1064    } else {
1065        let id = Ulid::new();
1066        atomic_write_file(&path, id.to_string().as_bytes())?;
1067        id
1068    };
1069
1070    let raw: u128 = device_ulid.into();
1071    Ok((raw & u64::MAX as u128) as u64)
1072}
1073
1074fn atomic_write_file(path: &Path, bytes: &[u8]) -> Result<()> {
1075    let parent = path
1076        .parent()
1077        .ok_or_else(|| anyhow!("cannot atomically write root path '{}'", path.display()))?;
1078    fs::create_dir_all(parent)?;
1079
1080    let tmp_name = format!(
1081        "{}.{}{}",
1082        path.file_name().and_then(|n| n.to_str()).unwrap_or("write"),
1083        Ulid::new(),
1084        TMP_SUFFIX
1085    );
1086    let tmp_path = parent.join(tmp_name);
1087
1088    {
1089        let mut file = OpenOptions::new()
1090            .create_new(true)
1091            .write(true)
1092            .open(&tmp_path)
1093            .with_context(|| format!("failed to open temp file '{}'", tmp_path.display()))?;
1094        file.write_all(bytes)?;
1095        file.sync_all()?;
1096    }
1097
1098    fs::rename(&tmp_path, path).with_context(|| {
1099        format!(
1100            "failed to atomically rename '{}' to '{}'",
1101            tmp_path.display(),
1102            path.display()
1103        )
1104    })?;
1105
1106    sync_dir(parent)?;
1107    Ok(())
1108}
1109
1110fn sync_dir(path: &Path) -> Result<()> {
1111    let dir =
1112        File::open(path).with_context(|| format!("failed opening dir '{}'", path.display()))?;
1113    dir.sync_all()
1114        .with_context(|| format!("failed fsync on dir '{}'", path.display()))?;
1115    Ok(())
1116}