db.rs

   1use anyhow::{anyhow, bail, Context, Result};
   2use loro::{Container, ExportMode, LoroDoc, LoroMap, PeerID, ValueOrContainer};
   3use serde::{Deserialize, Serialize};
   4use serde_json::Value;
   5use std::collections::BTreeMap;
   6use std::fmt;
   7use std::fs::{self, File, OpenOptions};
   8use std::io::Write;
   9use std::path::{Path, PathBuf};
  10use ulid::Ulid;
  11
  12pub const PROJECT_ENV: &str = "TD_PROJECT";
  13
  14const PROJECTS_DIR: &str = "projects";
  15const CHANGES_DIR: &str = "changes";
  16const BINDINGS_FILE: &str = "bindings.json";
  17const BASE_FILE: &str = "base.loro";
  18const TMP_SUFFIX: &str = ".tmp";
  19const SCHEMA_VERSION: u32 = 1;
  20
  21/// Current UTC time in ISO 8601 format.
  22pub fn now_utc() -> String {
  23    chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string()
  24}
  25
/// Lifecycle state for a task.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Status {
    /// Not yet started.
    Open,
    /// Actively being worked on; serialized as `in_progress`.
    InProgress,
    /// Finished or otherwise resolved.
    Closed,
}
  34
  35impl Status {
  36    fn as_str(self) -> &'static str {
  37        match self {
  38            Status::Open => "open",
  39            Status::InProgress => "in_progress",
  40            Status::Closed => "closed",
  41        }
  42    }
  43
  44    fn parse(raw: &str) -> Result<Self> {
  45        match raw {
  46            "open" => Ok(Self::Open),
  47            "in_progress" => Ok(Self::InProgress),
  48            "closed" => Ok(Self::Closed),
  49            _ => bail!("invalid status '{raw}'"),
  50        }
  51    }
  52}
  53
/// Priority for task ordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Priority {
    /// Most urgent; lowest `score()` value so it sorts first.
    High,
    /// Default middle priority.
    Medium,
    /// Least urgent.
    Low,
}
  62
  63impl Priority {
  64    fn as_str(self) -> &'static str {
  65        match self {
  66            Priority::High => "high",
  67            Priority::Medium => "medium",
  68            Priority::Low => "low",
  69        }
  70    }
  71
  72    fn parse(raw: &str) -> Result<Self> {
  73        match raw {
  74            "high" => Ok(Self::High),
  75            "medium" => Ok(Self::Medium),
  76            "low" => Ok(Self::Low),
  77            _ => bail!("invalid priority '{raw}'"),
  78        }
  79    }
  80
  81    pub fn score(self) -> i32 {
  82        match self {
  83            Priority::High => 1,
  84            Priority::Medium => 2,
  85            Priority::Low => 3,
  86        }
  87    }
  88}
  89
/// Estimated effort for a task.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Effort {
    /// Smallest amount of work; lowest `score()`.
    Low,
    /// Moderate amount of work.
    Medium,
    /// Largest amount of work.
    High,
}
  98
  99impl Effort {
 100    fn as_str(self) -> &'static str {
 101        match self {
 102            Effort::Low => "low",
 103            Effort::Medium => "medium",
 104            Effort::High => "high",
 105        }
 106    }
 107
 108    fn parse(raw: &str) -> Result<Self> {
 109        match raw {
 110            "low" => Ok(Self::Low),
 111            "medium" => Ok(Self::Medium),
 112            "high" => Ok(Self::High),
 113            _ => bail!("invalid effort '{raw}'"),
 114        }
 115    }
 116
 117    pub fn score(self) -> i32 {
 118        match self {
 119            Effort::Low => 1,
 120            Effort::Medium => 2,
 121            Effort::High => 3,
 122        }
 123    }
 124}
 125
/// A stable task identifier backed by a ULID.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
#[serde(transparent)]
pub struct TaskId(String); // canonical ULID string form (26 ASCII chars)
 130
 131impl TaskId {
 132    pub fn new(id: Ulid) -> Self {
 133        Self(id.to_string())
 134    }
 135
 136    pub fn parse(raw: &str) -> Result<Self> {
 137        let id = Ulid::from_string(raw).with_context(|| format!("invalid task id '{raw}'"))?;
 138        Ok(Self::new(id))
 139    }
 140
 141    pub fn as_str(&self) -> &str {
 142        &self.0
 143    }
 144
 145    pub fn short(&self) -> String {
 146        format!("td-{}", &self.0[self.0.len() - 7..])
 147    }
 148
 149    /// Return a display-friendly short ID from a raw ULID string.
 150    pub fn display_id(raw: &str) -> String {
 151        let n = raw.len();
 152        if n > 7 {
 153            format!("td-{}", &raw[n - 7..])
 154        } else {
 155            format!("td-{raw}")
 156        }
 157    }
 158}
 159
 160impl fmt::Display for TaskId {
 161    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 162        write!(f, "{}", self.short())
 163    }
 164}
 165
/// A task log entry embedded in a task record.
#[derive(Debug, Clone, Serialize)]
pub struct LogEntry {
    /// ULID of the log entry itself; entries are sorted by this id when hydrated.
    pub id: TaskId,
    /// ISO 8601 UTC creation time.
    pub timestamp: String,
    /// Free-form log text.
    pub message: String,
}
 173
/// Hydrated task data from the CRDT document.
#[derive(Debug, Clone, Serialize)]
pub struct Task {
    /// Stable ULID identifier (the key in the root `tasks` map).
    pub id: TaskId,
    pub title: String,
    pub description: String,
    /// Free-form task kind; serialized under the JSON key `type`.
    #[serde(rename = "type")]
    pub task_type: String,
    pub priority: Priority,
    pub status: Status,
    pub effort: Effort,
    /// Optional parent task; `None` when absent or stored as an empty string.
    pub parent: Option<TaskId>,
    /// ISO 8601 UTC timestamps.
    pub created_at: String,
    pub updated_at: String,
    /// Tombstone marker: `Some(_)` means the task is soft-deleted.
    pub deleted_at: Option<String>,
    /// Label names (stored as map keys in the doc).
    pub labels: Vec<String>,
    /// IDs of tasks that block this one (stored as map keys in the doc).
    pub blockers: Vec<TaskId>,
    /// Log entries, sorted by their ULID.
    pub logs: Vec<LogEntry>,
}
 193
/// Result type for partitioning blockers by task state.
#[derive(Debug, Default, Clone, Serialize)]
pub struct BlockerPartition {
    /// Blockers that still block: the task exists, is not closed, not deleted.
    pub open: Vec<TaskId>,
    /// Blockers that no longer block: missing, closed, or soft-deleted tasks.
    pub resolved: Vec<TaskId>,
}
 200
/// On-disk shape of `bindings.json`: maps canonicalized directory paths to
/// project names so a working directory can select its project implicitly.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct BindingsFile {
    // Key: canonical absolute path string; value: project name.
    #[serde(default)]
    bindings: BTreeMap<String, String>,
}
 206
/// Storage wrapper around one project's Loro document and disk layout.
#[derive(Debug, Clone)]
pub struct Store {
    /// Data root directory (parent of `projects/` and `bindings.json`).
    root: PathBuf,
    /// Project name; directory name under `projects/`.
    project: String,
    /// In-memory CRDT document, loaded from the base snapshot plus deltas.
    doc: LoroDoc,
}
 214
impl Store {
    /// Create a brand-new project on disk.
    ///
    /// Lays out `projects/<name>/{changes/, base.loro}`, seeds the doc with
    /// the root `tasks` map plus a `meta` map (schema version, project id,
    /// creation time), and atomically persists the initial snapshot.
    ///
    /// Fails if the project directory already exists or the name is invalid.
    pub fn init(root: &Path, project: &str) -> Result<Self> {
        validate_project_name(project)?;
        let project_dir = project_dir(root, project);
        if project_dir.exists() {
            bail!("project '{project}' already exists");
        }
        fs::create_dir_all(project_dir.join(CHANGES_DIR))?;

        let doc = LoroDoc::new();
        // Stable per-device peer id (helper defined elsewhere in this file).
        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
        // Materialize the root "tasks" container so it exists in the snapshot.
        doc.get_map("tasks");

        let meta = doc.get_map("meta");
        meta.insert("schema_version", SCHEMA_VERSION as i64)?;
        meta.insert("project_id", Ulid::new().to_string())?;
        meta.insert("created_at", now_utc())?;

        let snapshot = doc
            .export(ExportMode::Snapshot)
            .context("failed to export initial loro snapshot")?;
        atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;

        Ok(Self {
            root: root.to_path_buf(),
            project: project.to_string(),
            doc,
        })
    }

    /// Open an existing project: load `base.loro`, then replay every delta
    /// file (including any left behind in `changes.compacting.*` dirs) in
    /// ULID-filename order. Unreadable deltas are skipped with a warning.
    pub fn open(root: &Path, project: &str) -> Result<Self> {
        validate_project_name(project)?;
        let project_dir = project_dir(root, project);
        let base_path = project_dir.join(BASE_FILE);

        if !base_path.exists() {
            bail!("project '{project}' is not initialized. Run 'td project init {project}'");
        }

        let base = fs::read(&base_path)
            .with_context(|| format!("failed to read loro snapshot '{}'", base_path.display()))?;

        let doc = LoroDoc::from_snapshot(&base).context("failed to load loro snapshot")?;
        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;

        // ULID filenames encode creation time, so sorting by parsed ULID
        // replays deltas oldest-first. Non-ULID stems (already filtered out
        // by collect_delta_paths) would sort as None, i.e. first.
        let mut deltas = collect_delta_paths(&project_dir)?;
        deltas.sort_by_key(|path| {
            path.file_stem()
                .and_then(|s| s.to_str())
                .and_then(|s| Ulid::from_string(s).ok())
        });

        for delta_path in deltas {
            let bytes = fs::read(&delta_path)
                .with_context(|| format!("failed to read loro delta '{}'", delta_path.display()))?;
            if let Err(err) = doc.import(&bytes) {
                // Tolerate malformed or stale delta files as requested by design.
                eprintln!(
                    "warning: skipping unreadable delta '{}': {err}",
                    delta_path.display()
                );
            }
        }

        Ok(Self {
            root: root.to_path_buf(),
            project: project.to_string(),
            doc,
        })
    }

    /// Bootstrap a local project from peer-provided delta bytes.
    ///
    /// The incoming delta is imported into a fresh document, validated to
    /// ensure it carries `meta.project_id`, and then persisted as a base
    /// snapshot for future opens.
    pub fn bootstrap_from_peer(root: &Path, project: &str, delta: &[u8]) -> Result<Self> {
        validate_project_name(project)?;
        let project_dir = project_dir(root, project);
        if project_dir.exists() {
            bail!("project '{project}' already exists");
        }
        fs::create_dir_all(project_dir.join(CHANGES_DIR))?;

        let doc = LoroDoc::new();
        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
        doc.import(delta)
            .context("failed to import bootstrap delta from peer")?;
        doc.commit();

        // Reject deltas that do not carry a project identity; the returned
        // id itself is unused here.
        read_project_id_from_doc(&doc)
            .context("bootstrap delta is missing required project identity")?;

        let snapshot = doc
            .export(ExportMode::Snapshot)
            .context("failed to export bootstrap loro snapshot")?;
        atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;

        Ok(Self {
            root: root.to_path_buf(),
            project: project.to_string(),
            doc,
        })
    }

    /// Data root directory this store was opened from.
    pub fn root(&self) -> &Path {
        &self.root
    }

    /// Name of the project this store wraps.
    pub fn project_name(&self) -> &str {
        &self.project
    }

    /// Direct access to the underlying CRDT document.
    pub fn doc(&self) -> &LoroDoc {
        &self.doc
    }

    /// Compact accumulated deltas into the base snapshot using a two-phase
    /// protocol that is safe against concurrent writers.
    ///
    /// **Phase 1** — rename `changes/` to `changes.compacting.<ulid>/`, then
    /// immediately create a fresh `changes/`.  Any concurrent `td` command
    /// that writes a delta after this point lands in the new `changes/` and is
    /// therefore never touched by this operation.
    ///
    /// **Phase 2** — write a fresh base snapshot from the in-memory document
    /// (which was loaded from both `base.loro` and every delta at `open` time),
    /// then remove the compacting directory.
    ///
    /// Any orphaned `changes.compacting.*` directories left by a previously
    /// crashed tidy are also removed: they were already merged into `self.doc`
    /// at open time, so the new snapshot includes their contents.
    ///
    /// Returns the number of delta files folded into the snapshot.
    pub fn tidy(&self) -> Result<usize> {
        let project_dir = project_dir(&self.root, &self.project);
        let changes_dir = project_dir.join(CHANGES_DIR);

        // Phase 1: atomically hand off the current changes/ to a compacting
        // directory so new writers have a clean home immediately.
        let compacting_dir = project_dir.join(format!("changes.compacting.{}", Ulid::new()));
        if changes_dir.exists() {
            fs::rename(&changes_dir, &compacting_dir).with_context(|| {
                format!(
                    "failed to rename '{}' to '{}'",
                    changes_dir.display(),
                    compacting_dir.display()
                )
            })?;
        }
        fs::create_dir_all(&changes_dir).context("failed to create fresh changes/")?;

        // Re-import every delta from the compacting directories.  self.doc
        // was populated at open() time, but a concurrent writer may have
        // appended a delta to changes/ between open() and the Phase 1
        // rename — that delta is now inside compacting_dir without being in
        // self.doc.  CRDT import is idempotent (deduplicates by OpID), so
        // re-importing already-known ops is harmless.
        let mut compacting_deltas = collect_delta_paths(&project_dir)?;
        compacting_deltas.sort_by_key(|path| {
            path.file_stem()
                .and_then(|s| s.to_str())
                .and_then(|s| Ulid::from_string(s).ok())
        });
        for delta_path in &compacting_deltas {
            if let Ok(bytes) = fs::read(delta_path) {
                if let Err(err) = self.doc.import(&bytes) {
                    eprintln!(
                        "warning: skipping unreadable delta '{}': {err}",
                        delta_path.display()
                    );
                }
            }
        }

        // Phase 2: write the new base snapshot.  self.doc now holds the
        // full merged state including any concurrent deltas.
        let out = project_dir.join(BASE_FILE);
        let bytes = self
            .doc
            .export(ExportMode::Snapshot)
            .context("failed to export loro snapshot")?;
        atomic_write_file(&out, &bytes)?;

        // Remove the compacting directory we created in phase 1 plus any
        // orphaned changes.compacting.* dirs from previously crashed tidies.
        let mut removed = 0usize;
        for entry in fs::read_dir(&project_dir)
            .with_context(|| format!("failed to read project dir '{}'", project_dir.display()))?
        {
            let path = entry?.path();
            if !path.is_dir() {
                continue;
            }
            let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
                continue;
            };
            if !name.starts_with("changes.compacting.") {
                continue;
            }
            // Count files before removing for the summary report.
            for file in fs::read_dir(&path)
                .with_context(|| format!("failed to read '{}'", path.display()))?
            {
                let fp = file?.path();
                if fp.is_file() {
                    removed += 1;
                }
            }
            fs::remove_dir_all(&path)
                .with_context(|| format!("failed to remove compacting dir '{}'", path.display()))?;
        }

        Ok(removed)
    }

    /// Apply a local mutation and persist only the resulting delta.
    ///
    /// Records the oplog version vector before the mutation so the export
    /// contains exactly the ops produced by `mutator`. The delta lands in
    /// `changes/<ulid>.loro`; the path is returned.
    pub fn apply_and_persist<F>(&self, mutator: F) -> Result<PathBuf>
    where
        F: FnOnce(&LoroDoc) -> Result<()>,
    {
        let before = self.doc.oplog_vv();
        mutator(&self.doc)?;
        self.doc.commit();

        let delta = self
            .doc
            .export(ExportMode::updates(&before))
            .context("failed to export loro update delta")?;

        let filename = format!("{}.loro", Ulid::new());
        let path = project_dir(&self.root, &self.project)
            .join(CHANGES_DIR)
            .join(filename);
        atomic_write_file(&path, &delta)?;
        Ok(path)
    }

    /// Persist pre-built delta bytes (e.g. received from a peer) as a new
    /// change file without re-exporting from the doc.
    pub fn save_raw_delta(&self, bytes: &[u8]) -> Result<PathBuf> {
        let filename = format!("{}.loro", Ulid::new());
        let path = project_dir(&self.root, &self.project)
            .join(CHANGES_DIR)
            .join(filename);
        atomic_write_file(&path, bytes)?;
        Ok(path)
    }

    /// Return hydrated tasks, excluding tombstones.
    pub fn list_tasks(&self) -> Result<Vec<Task>> {
        self.list_tasks_inner(false)
    }

    /// Return hydrated tasks, including tombstoned rows.
    pub fn list_tasks_unfiltered(&self) -> Result<Vec<Task>> {
        self.list_tasks_inner(true)
    }

    /// Find a task by exact ULID string.
    pub fn get_task(&self, id: &TaskId, include_deleted: bool) -> Result<Option<Task>> {
        let tasks = if include_deleted {
            self.list_tasks_unfiltered()?
        } else {
            self.list_tasks()?
        };
        Ok(tasks.into_iter().find(|task| task.id == *id))
    }

    /// Hydrate every task from the doc's JSON projection, optionally keeping
    /// soft-deleted rows, sorted by ULID (i.e. creation order).
    fn list_tasks_inner(&self, include_deleted: bool) -> Result<Vec<Task>> {
        let root = serde_json::to_value(self.doc.get_deep_value())?;
        let tasks_obj = root
            .get("tasks")
            .and_then(Value::as_object)
            .ok_or_else(|| anyhow!("missing root tasks map"))?;

        let mut tasks = Vec::with_capacity(tasks_obj.len());
        for (task_id_raw, task_json) in tasks_obj {
            let task = hydrate_task(task_id_raw, task_json)?;
            if include_deleted || task.deleted_at.is_none() {
                tasks.push(task);
            }
        }

        tasks.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));
        Ok(tasks)
    }

    /// Read `meta.schema_version` from the document.
    ///
    /// # Errors
    /// Fails when the meta map or the version field is missing or non-numeric.
    pub fn schema_version(&self) -> Result<u32> {
        let root = serde_json::to_value(self.doc.get_deep_value())?;
        let meta = root
            .get("meta")
            .and_then(Value::as_object)
            .ok_or_else(|| anyhow!("missing root meta map"))?;
        let n = meta
            .get("schema_version")
            .and_then(Value::as_u64)
            .ok_or_else(|| anyhow!("invalid or missing meta.schema_version"))?;
        Ok(n as u32)
    }
}
 517
 518/// Generate a new task ULID.
 519pub fn gen_id() -> TaskId {
 520    TaskId::new(Ulid::new())
 521}
 522
/// Parse a status string (`open` / `in_progress` / `closed`).
pub fn parse_status(s: &str) -> Result<Status> {
    Status::parse(s)
}

/// Parse a priority string (`high` / `medium` / `low`).
pub fn parse_priority(s: &str) -> Result<Priority> {
    Priority::parse(s)
}

/// Parse an effort string (`low` / `medium` / `high`).
pub fn parse_effort(s: &str) -> Result<Effort> {
    Effort::parse(s)
}

/// Canonical string label for a status.
pub fn status_label(s: Status) -> &'static str {
    s.as_str()
}

/// Canonical string label for a priority.
pub fn priority_label(p: Priority) -> &'static str {
    p.as_str()
}

/// Canonical string label for an effort.
pub fn effort_label(e: Effort) -> &'static str {
    e.as_str()
}
 546
 547pub fn data_root() -> Result<PathBuf> {
 548    let home = std::env::var("HOME").context("HOME is not set")?;
 549    Ok(PathBuf::from(home).join(".local").join("share").join("td"))
 550}
 551
 552pub fn init(cwd: &Path, project: &str) -> Result<Store> {
 553    let root = data_root()?;
 554    fs::create_dir_all(root.join(PROJECTS_DIR))?;
 555    let store = Store::init(&root, project)?;
 556    bind_project(cwd, project)?;
 557    Ok(store)
 558}
 559
 560pub fn use_project(cwd: &Path, project: &str) -> Result<()> {
 561    let root = data_root()?;
 562    validate_project_name(project)?;
 563    if !project_dir(&root, project).join(BASE_FILE).exists() {
 564        bail!("project '{project}' not found. Run 'td project list' to list known projects");
 565    }
 566    bind_project(cwd, project)
 567}
 568
 569pub fn open(start: &Path) -> Result<Store> {
 570    let root = data_root()?;
 571    let explicit = std::env::var(PROJECT_ENV).ok();
 572    let project = resolve_project_name(start, &root, explicit.as_deref())?.ok_or_else(|| {
 573        anyhow!(
 574            "no project selected. Use --project/TD_PROJECT, run 'td project bind <name>', or run 'td project init <name>'"
 575        )
 576    })?;
 577    Store::open(&root, &project)
 578}
 579
 580/// Open the project selected by `--project`/`TD_PROJECT`/bindings if one exists.
 581///
 582/// Returns `Ok(None)` when no project is selected by any mechanism.
 583pub fn try_open(start: &Path) -> Result<Option<Store>> {
 584    let root = data_root()?;
 585    let explicit = std::env::var(PROJECT_ENV).ok();
 586    let Some(project) = resolve_project_name(start, &root, explicit.as_deref())? else {
 587        return Ok(None);
 588    };
 589    Store::open(&root, &project).map(Some)
 590}
 591
 592/// Bootstrap a project from a peer delta and bind the current directory.
 593pub fn bootstrap_sync(cwd: &Path, project: &str, delta: &[u8]) -> Result<Store> {
 594    let root = data_root()?;
 595    fs::create_dir_all(root.join(PROJECTS_DIR))?;
 596    validate_project_name(project)?;
 597    let store = Store::bootstrap_from_peer(&root, project, delta)?;
 598    bind_project(cwd, project)?;
 599    Ok(store)
 600}
 601
 602pub fn list_projects() -> Result<Vec<String>> {
 603    let root = data_root()?;
 604    let mut out = Vec::new();
 605    let dir = root.join(PROJECTS_DIR);
 606    if !dir.exists() {
 607        return Ok(out);
 608    }
 609
 610    for entry in fs::read_dir(dir)? {
 611        let path = entry?.path();
 612        if !path.is_dir() {
 613            continue;
 614        }
 615        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
 616            continue;
 617        };
 618        if path.join(BASE_FILE).exists() {
 619            out.push(name.to_string());
 620        }
 621    }
 622
 623    out.sort();
 624    Ok(out)
 625}
 626
 627pub fn resolve_task_id(store: &Store, raw: &str, include_deleted: bool) -> Result<TaskId> {
 628    let raw = raw.strip_prefix("td-").unwrap_or(raw);
 629
 630    if let Ok(id) = TaskId::parse(raw) {
 631        if store.get_task(&id, include_deleted)?.is_some() {
 632            return Ok(id);
 633        }
 634    }
 635
 636    let tasks = if include_deleted {
 637        store.list_tasks_unfiltered()?
 638    } else {
 639        store.list_tasks()?
 640    };
 641
 642    let upper = raw.to_ascii_uppercase();
 643    let matches: Vec<TaskId> = tasks
 644        .into_iter()
 645        .filter(|t| t.id.as_str().ends_with(&upper))
 646        .map(|t| t.id)
 647        .collect();
 648
 649    match matches.as_slice() {
 650        [] => bail!("task '{raw}' not found"),
 651        [id] => Ok(id.clone()),
 652        _ => bail!("task reference '{raw}' is ambiguous"),
 653    }
 654}
 655
 656pub fn partition_blockers(store: &Store, blockers: &[TaskId]) -> Result<BlockerPartition> {
 657    let mut out = BlockerPartition::default();
 658    for blocker in blockers {
 659        let Some(task) = store.get_task(blocker, true)? else {
 660            out.resolved.push(blocker.clone());
 661            continue;
 662        };
 663        if task.status == Status::Closed || task.deleted_at.is_some() {
 664            out.resolved.push(blocker.clone());
 665        } else {
 666            out.open.push(blocker.clone());
 667        }
 668    }
 669    Ok(out)
 670}
 671
 672pub fn insert_task_map(tasks: &LoroMap, task_id: &TaskId) -> Result<LoroMap> {
 673    tasks
 674        .insert_container(task_id.as_str(), LoroMap::new())
 675        .context("failed to create task map")
 676}
 677
 678pub fn get_task_map(tasks: &LoroMap, task_id: &TaskId) -> Result<Option<LoroMap>> {
 679    match tasks.get(task_id.as_str()) {
 680        Some(ValueOrContainer::Container(Container::Map(map))) => Ok(Some(map)),
 681        Some(_) => bail!("task '{}' has invalid container type", task_id.as_str()),
 682        None => Ok(None),
 683    }
 684}
 685
 686pub fn get_or_create_child_map(parent: &LoroMap, key: &str) -> Result<LoroMap> {
 687    parent
 688        .get_or_create_container(key, LoroMap::new())
 689        .with_context(|| format!("failed to get or create map key '{key}'"))
 690}
 691
 692fn bindings_path(root: &Path) -> PathBuf {
 693    root.join(BINDINGS_FILE)
 694}
 695
 696fn resolve_project_name(
 697    start: &Path,
 698    root: &Path,
 699    explicit: Option<&str>,
 700) -> Result<Option<String>> {
 701    if let Some(project) = explicit {
 702        validate_project_name(project)?;
 703        return Ok(Some(project.to_string()));
 704    }
 705
 706    let cwd = canonicalize_binding_path(start)?;
 707    let bindings = load_bindings(root)?;
 708
 709    let mut best: Option<(usize, String)> = None;
 710    for (raw_path, project) in bindings.bindings {
 711        let bound = PathBuf::from(raw_path);
 712        if is_prefix_path(&bound, &cwd) {
 713            let score = bound.components().count();
 714            match &best {
 715                Some((best_score, _)) if *best_score >= score => {}
 716                _ => best = Some((score, project)),
 717            }
 718        }
 719    }
 720
 721    if let Some((_, project)) = best {
 722        return Ok(Some(project));
 723    }
 724
 725    Ok(None)
 726}
 727
 728pub fn unbind_project(cwd: &Path) -> Result<()> {
 729    let root = data_root()?;
 730    let canonical = canonicalize_binding_path(cwd)?;
 731    let canonical_str = canonical.to_string_lossy().to_string();
 732
 733    let mut bindings = load_bindings(&root)?;
 734    if !bindings.bindings.contains_key(&canonical_str) {
 735        bail!("path '{}' is not bound to any project", canonical.display());
 736    }
 737    bindings.bindings.remove(&canonical_str);
 738    save_bindings(&root, &bindings)
 739}
 740
 741pub fn delete_project(name: &str) -> Result<()> {
 742    validate_project_name(name)?;
 743    let root = data_root()?;
 744    let proj_dir = project_dir(&root, name);
 745
 746    if !proj_dir.join(BASE_FILE).exists() {
 747        bail!("project '{name}' not found");
 748    }
 749
 750    fs::remove_dir_all(&proj_dir).with_context(|| {
 751        format!(
 752            "failed to remove project directory '{}'",
 753            proj_dir.display()
 754        )
 755    })?;
 756
 757    let mut bindings = load_bindings(&root)?;
 758    bindings.bindings.retain(|_, project| project != name);
 759    save_bindings(&root, &bindings)
 760}
 761
 762fn bind_project(cwd: &Path, project: &str) -> Result<()> {
 763    validate_project_name(project)?;
 764
 765    let root = data_root()?;
 766    fs::create_dir_all(&root)?;
 767
 768    let canonical = canonicalize_binding_path(cwd)?;
 769    let mut bindings = load_bindings(&root)?;
 770    bindings
 771        .bindings
 772        .insert(canonical.to_string_lossy().to_string(), project.to_string());
 773    save_bindings(&root, &bindings)
 774}
 775
 776fn load_bindings(root: &Path) -> Result<BindingsFile> {
 777    let path = bindings_path(root);
 778    if !path.exists() {
 779        return Ok(BindingsFile::default());
 780    }
 781    let content = fs::read_to_string(&path)
 782        .with_context(|| format!("failed reading bindings from '{}'", path.display()))?;
 783    serde_json::from_str(&content)
 784        .with_context(|| format!("invalid bindings file '{}'", path.display()))
 785}
 786
 787fn save_bindings(root: &Path, bindings: &BindingsFile) -> Result<()> {
 788    let path = bindings_path(root);
 789    let bytes = serde_json::to_vec_pretty(bindings)?;
 790    atomic_write_file(&path, &bytes)
 791}
 792
 793fn canonicalize_binding_path(path: &Path) -> Result<PathBuf> {
 794    fs::canonicalize(path).with_context(|| format!("failed to canonicalize '{}'", path.display()))
 795}
 796
 797fn is_prefix_path(prefix: &Path, target: &Path) -> bool {
 798    let mut prefix_components = prefix.components();
 799    let mut target_components = target.components();
 800
 801    loop {
 802        match (prefix_components.next(), target_components.next()) {
 803            (None, _) => return true,
 804            (Some(_), None) => return false,
 805            (Some(a), Some(b)) if a == b => continue,
 806            _ => return false,
 807        }
 808    }
 809}
 810
 811fn validate_project_name(name: &str) -> Result<()> {
 812    if name.is_empty() {
 813        bail!("project name cannot be empty");
 814    }
 815    if name.contains('/') || name.contains('\\') || name == "." || name == ".." {
 816        bail!("invalid project name '{name}'");
 817    }
 818    if name.chars().any(char::is_control) {
 819        bail!("invalid project name '{name}'");
 820    }
 821    Ok(())
 822}
 823
 824fn read_project_id_from_doc(doc: &LoroDoc) -> Result<String> {
 825    let root = serde_json::to_value(doc.get_deep_value())?;
 826    root.get("meta")
 827        .and_then(|m| m.get("project_id"))
 828        .and_then(Value::as_str)
 829        .map(str::to_owned)
 830        .ok_or_else(|| anyhow!("missing meta.project_id in project doc"))
 831}
 832
/// Convert one task's JSON projection (from `LoroDoc::get_deep_value`) into
/// a hydrated [`Task`], validating every required field.
///
/// # Errors
/// Fails when the value is not an object, a required string field is
/// missing/mistyped, or any embedded id fails ULID parsing.
fn hydrate_task(task_id_raw: &str, value: &Value) -> Result<Task> {
    let obj = value
        .as_object()
        .ok_or_else(|| anyhow!("task '{task_id_raw}' is not an object"))?;

    let id = TaskId::parse(task_id_raw)?;

    let title = get_required_string(obj, "title")?;
    let description = get_required_string(obj, "description")?;
    let task_type = get_required_string(obj, "type")?;
    let status = Status::parse(&get_required_string(obj, "status")?)?;
    let priority = Priority::parse(&get_required_string(obj, "priority")?)?;
    let effort = Effort::parse(&get_required_string(obj, "effort")?)?;
    // An absent key and an empty string both mean "no parent".
    let parent = match obj.get("parent").and_then(Value::as_str) {
        Some("") | None => None,
        Some(raw) => Some(TaskId::parse(raw)?),
    };

    let created_at = get_required_string(obj, "created_at")?;
    let updated_at = get_required_string(obj, "updated_at")?;
    // An empty deleted_at is treated the same as absent (not deleted).
    let deleted_at = obj
        .get("deleted_at")
        .and_then(Value::as_str)
        .map(str::to_owned)
        .filter(|s| !s.is_empty());

    // Labels are stored as a map whose keys are the label names.
    let labels = obj
        .get("labels")
        .and_then(Value::as_object)
        .map(|m| m.keys().cloned().collect())
        .unwrap_or_default();

    // Blockers likewise: a map keyed by blocker task ULIDs.
    let blockers = obj
        .get("blockers")
        .and_then(Value::as_object)
        .map(|m| {
            m.keys()
                .map(|raw| TaskId::parse(raw))
                .collect::<Result<Vec<_>>>()
        })
        .transpose()?
        .unwrap_or_default();

    // Logs: map of log ULID -> { timestamp, message }.
    let mut logs = obj
        .get("logs")
        .and_then(Value::as_object)
        .map(|logs| {
            logs.iter()
                .map(|(log_id_raw, payload)| {
                    let payload_obj = payload.as_object().ok_or_else(|| {
                        anyhow!("log '{log_id_raw}' on task '{task_id_raw}' is not an object")
                    })?;
                    Ok(LogEntry {
                        id: TaskId::parse(log_id_raw)?,
                        timestamp: get_required_string(payload_obj, "timestamp")?,
                        message: get_required_string(payload_obj, "message")?,
                    })
                })
                .collect::<Result<Vec<_>>>()
        })
        .transpose()?
        .unwrap_or_default();

    // Order log entries by their ULID string.
    logs.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));

    Ok(Task {
        id,
        title,
        description,
        task_type,
        priority,
        status,
        effort,
        parent,
        created_at,
        updated_at,
        deleted_at,
        labels,
        blockers,
        logs,
    })
}
 915
 916fn get_required_string(map: &serde_json::Map<String, Value>, key: &str) -> Result<String> {
 917    map.get(key)
 918        .and_then(Value::as_str)
 919        .map(str::to_owned)
 920        .ok_or_else(|| anyhow!("missing or non-string key '{key}'"))
 921}
 922
 923fn collect_delta_paths(project_dir: &Path) -> Result<Vec<PathBuf>> {
 924    let mut paths = Vec::new();
 925    collect_changes_from_dir(&project_dir.join(CHANGES_DIR), &mut paths)?;
 926
 927    for entry in fs::read_dir(project_dir)? {
 928        let path = entry?.path();
 929        if !path.is_dir() {
 930            continue;
 931        }
 932        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
 933            continue;
 934        };
 935        if name.starts_with("changes.compacting.") {
 936            collect_changes_from_dir(&path, &mut paths)?;
 937        }
 938    }
 939
 940    Ok(paths)
 941}
 942
 943fn collect_changes_from_dir(dir: &Path, out: &mut Vec<PathBuf>) -> Result<()> {
 944    if !dir.exists() {
 945        return Ok(());
 946    }
 947
 948    for entry in fs::read_dir(dir)? {
 949        let path = entry?.path();
 950        if !path.is_file() {
 951            continue;
 952        }
 953
 954        let Some(filename) = path.file_name().and_then(|n| n.to_str()) else {
 955            continue;
 956        };
 957        if filename.ends_with(TMP_SUFFIX) || !filename.ends_with(".loro") {
 958            continue;
 959        }
 960
 961        let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
 962            continue;
 963        };
 964        if Ulid::from_string(stem).is_err() {
 965            continue;
 966        }
 967
 968        out.push(path);
 969    }
 970
 971    Ok(())
 972}
 973
 974fn project_dir(root: &Path, project: &str) -> PathBuf {
 975    root.join(PROJECTS_DIR).join(project)
 976}
 977
 978fn load_or_create_device_peer_id(root: &Path) -> Result<PeerID> {
 979    let path = root.join("device_id");
 980    if let Some(parent) = path.parent() {
 981        fs::create_dir_all(parent)?;
 982    }
 983
 984    let device_ulid = if path.exists() {
 985        let content = fs::read_to_string(&path)
 986            .with_context(|| format!("failed reading device id from '{}'", path.display()))?;
 987        Ulid::from_string(content.trim()).context("invalid persisted device id ULID")?
 988    } else {
 989        let id = Ulid::new();
 990        atomic_write_file(&path, id.to_string().as_bytes())?;
 991        id
 992    };
 993
 994    let raw: u128 = device_ulid.into();
 995    Ok((raw & u64::MAX as u128) as u64)
 996}
 997
 998fn atomic_write_file(path: &Path, bytes: &[u8]) -> Result<()> {
 999    let parent = path
1000        .parent()
1001        .ok_or_else(|| anyhow!("cannot atomically write root path '{}'", path.display()))?;
1002    fs::create_dir_all(parent)?;
1003
1004    let tmp_name = format!(
1005        "{}.{}{}",
1006        path.file_name().and_then(|n| n.to_str()).unwrap_or("write"),
1007        Ulid::new(),
1008        TMP_SUFFIX
1009    );
1010    let tmp_path = parent.join(tmp_name);
1011
1012    {
1013        let mut file = OpenOptions::new()
1014            .create_new(true)
1015            .write(true)
1016            .open(&tmp_path)
1017            .with_context(|| format!("failed to open temp file '{}'", tmp_path.display()))?;
1018        file.write_all(bytes)?;
1019        file.sync_all()?;
1020    }
1021
1022    fs::rename(&tmp_path, path).with_context(|| {
1023        format!(
1024            "failed to atomically rename '{}' to '{}'",
1025            tmp_path.display(),
1026            path.display()
1027        )
1028    })?;
1029
1030    sync_dir(parent)?;
1031    Ok(())
1032}
1033
1034fn sync_dir(path: &Path) -> Result<()> {
1035    let dir =
1036        File::open(path).with_context(|| format!("failed opening dir '{}'", path.display()))?;
1037    dir.sync_all()
1038        .with_context(|| format!("failed fsync on dir '{}'", path.display()))?;
1039    Ok(())
1040}