// db.rs — per-project task store: Loro CRDT document + on-disk snapshot/deltas.

  1use anyhow::{anyhow, bail, Context, Result};
  2use loro::{Container, ExportMode, LoroDoc, LoroMap, PeerID, ValueOrContainer};
  3use serde::{Deserialize, Serialize};
  4use serde_json::Value;
  5use std::collections::BTreeMap;
  6use std::fmt;
  7use std::fs::{self, File, OpenOptions};
  8use std::io::Write;
  9use std::path::{Path, PathBuf};
 10use ulid::Ulid;
 11
/// Environment variable that explicitly selects the active project.
pub const PROJECT_ENV: &str = "TD_PROJECT";

// On-disk layout under the data root (see `project_dir` / `bindings_path`).
const PROJECTS_DIR: &str = "projects"; // <root>/projects/<name>/
const CHANGES_DIR: &str = "changes"; // per-project delta files
const BINDINGS_FILE: &str = "bindings.json"; // directory-path -> project-name map
const BASE_FILE: &str = "base.loro"; // full CRDT snapshot for a project
const TMP_SUFFIX: &str = ".tmp"; // marks in-flight atomic writes; skipped by delta scans
const SCHEMA_VERSION: u32 = 1; // written to meta.schema_version at init
 20
 21/// Current UTC time in ISO 8601 format.
 22pub fn now_utc() -> String {
 23    chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string()
 24}
 25
/// Lifecycle state for a task.
///
/// Serialized in snake_case so the JSON form matches the strings stored in
/// the CRDT ("open", "in_progress", "closed").
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Status {
    /// Not yet started.
    Open,
    /// Actively being worked on.
    InProgress,
    /// Done; closed tasks no longer block others (see `partition_blockers`).
    Closed,
}
 34
 35impl Status {
 36    fn as_str(self) -> &'static str {
 37        match self {
 38            Status::Open => "open",
 39            Status::InProgress => "in_progress",
 40            Status::Closed => "closed",
 41        }
 42    }
 43
 44    fn parse(raw: &str) -> Result<Self> {
 45        match raw {
 46            "open" => Ok(Self::Open),
 47            "in_progress" => Ok(Self::InProgress),
 48            "closed" => Ok(Self::Closed),
 49            _ => bail!("invalid status '{raw}'"),
 50        }
 51    }
 52}
 53
/// Priority for task ordering.
///
/// Serialized in snake_case to match the stored strings
/// ("high", "medium", "low").
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Priority {
    /// Most urgent; lowest `score()` (1) so it sorts first.
    High,
    Medium,
    /// Least urgent; highest `score()` (3).
    Low,
}
 62
 63impl Priority {
 64    fn as_str(self) -> &'static str {
 65        match self {
 66            Priority::High => "high",
 67            Priority::Medium => "medium",
 68            Priority::Low => "low",
 69        }
 70    }
 71
 72    fn parse(raw: &str) -> Result<Self> {
 73        match raw {
 74            "high" => Ok(Self::High),
 75            "medium" => Ok(Self::Medium),
 76            "low" => Ok(Self::Low),
 77            _ => bail!("invalid priority '{raw}'"),
 78        }
 79    }
 80
 81    pub fn score(self) -> i32 {
 82        match self {
 83            Priority::High => 1,
 84            Priority::Medium => 2,
 85            Priority::Low => 3,
 86        }
 87    }
 88}
 89
/// Estimated effort for a task.
///
/// Serialized in snake_case to match the stored strings
/// ("low", "medium", "high").
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Effort {
    /// Cheapest; lowest `score()` (1).
    Low,
    Medium,
    /// Most expensive; highest `score()` (3).
    High,
}
 98
 99impl Effort {
100    fn as_str(self) -> &'static str {
101        match self {
102            Effort::Low => "low",
103            Effort::Medium => "medium",
104            Effort::High => "high",
105        }
106    }
107
108    fn parse(raw: &str) -> Result<Self> {
109        match raw {
110            "low" => Ok(Self::Low),
111            "medium" => Ok(Self::Medium),
112            "high" => Ok(Self::High),
113            _ => bail!("invalid effort '{raw}'"),
114        }
115    }
116
117    pub fn score(self) -> i32 {
118        match self {
119            Effort::Low => 1,
120            Effort::Medium => 2,
121            Effort::High => 3,
122        }
123    }
124}
125
/// A stable task identifier backed by a ULID.
///
/// Stored as the 26-character canonical ULID string; `Ord`/`PartialOrd`
/// therefore sort ids the same way the raw strings sort.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
#[serde(transparent)]
pub struct TaskId(String);
130
131impl TaskId {
132    pub fn new(id: Ulid) -> Self {
133        Self(id.to_string())
134    }
135
136    pub fn parse(raw: &str) -> Result<Self> {
137        let id = Ulid::from_string(raw).with_context(|| format!("invalid task id '{raw}'"))?;
138        Ok(Self::new(id))
139    }
140
141    pub fn as_str(&self) -> &str {
142        &self.0
143    }
144
145    pub fn short(&self) -> String {
146        self.0[self.0.len() - 7..].to_string()
147    }
148}
149
150impl fmt::Display for TaskId {
151    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
152        write!(f, "{}", self.short())
153    }
154}
155
/// A task log entry embedded in a task record.
#[derive(Debug, Clone, Serialize)]
pub struct LogEntry {
    // Log entries are keyed by their own ULID in the CRDT's `logs` map.
    pub id: TaskId,
    // ISO 8601 string (see `now_utc`).
    pub timestamp: String,
    pub message: String,
}
163
/// Hydrated task data from the CRDT document.
///
/// Built by `hydrate_task` from the document's deep JSON value; this is a
/// read-only projection, not the live CRDT containers.
#[derive(Debug, Clone, Serialize)]
pub struct Task {
    pub id: TaskId,
    pub title: String,
    pub description: String,
    // Free-form task kind; serialized under the JSON key "type".
    #[serde(rename = "type")]
    pub task_type: String,
    pub priority: Priority,
    pub status: Status,
    pub effort: Effort,
    // None when the stored value is absent or the empty string.
    pub parent: Option<TaskId>,
    pub created_at: String,
    pub updated_at: String,
    // Some(..) marks a tombstone; such tasks are hidden by `list_tasks`.
    pub deleted_at: Option<String>,
    // Stored as map keys in the CRDT (set semantics), hydrated to a Vec.
    pub labels: Vec<String>,
    // Ids of tasks that must close before this one unblocks; map keys in the CRDT.
    pub blockers: Vec<TaskId>,
    // Sorted by log id (ULID) for deterministic output.
    pub logs: Vec<LogEntry>,
}
183
/// Result type for partitioning blockers by task state.
///
/// Produced by `partition_blockers`: `open` holds blockers that still block,
/// `resolved` holds blockers that are closed, tombstoned, or missing.
#[derive(Debug, Default, Clone, Serialize)]
pub struct BlockerPartition {
    pub open: Vec<TaskId>,
    pub resolved: Vec<TaskId>,
}
190
/// On-disk JSON map from canonicalized directory paths to project names
/// (stored in `bindings.json` under the data root).
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct BindingsFile {
    // BTreeMap keeps serialization and iteration order deterministic.
    #[serde(default)]
    bindings: BTreeMap<String, String>,
}
196
/// Storage wrapper around one project's Loro document and disk layout.
#[derive(Debug, Clone)]
pub struct Store {
    root: PathBuf, // data root (e.g. ~/.local/share/td)
    project: String, // project name under <root>/projects/
    doc: LoroDoc, // in-memory CRDT state
}
204
impl Store {
    /// Create a brand-new project on disk and return a handle to it.
    ///
    /// Lays out `<root>/projects/<project>/{changes/, base.loro}`, seeds the
    /// CRDT with an empty `tasks` map plus `meta` identity fields, and
    /// persists the initial snapshot atomically.
    ///
    /// # Errors
    /// Fails when the name is invalid, the project directory already exists,
    /// or any filesystem / CRDT-export step fails.
    pub fn init(root: &Path, project: &str) -> Result<Self> {
        validate_project_name(project)?;
        let project_dir = project_dir(root, project);
        if project_dir.exists() {
            bail!("project '{project}' already exists");
        }
        fs::create_dir_all(project_dir.join(CHANGES_DIR))?;

        let doc = LoroDoc::new();
        // Stable per-device peer id so all ops from this machine share one origin.
        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
        doc.get_map("tasks");

        let meta = doc.get_map("meta");
        meta.insert("schema_version", SCHEMA_VERSION as i64)?;
        meta.insert("project_id", Ulid::new().to_string())?;
        meta.insert("created_at", now_utc())?;

        let snapshot = doc
            .export(ExportMode::Snapshot)
            .context("failed to export initial loro snapshot")?;
        atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;

        Ok(Self {
            root: root.to_path_buf(),
            project: project.to_string(),
            doc,
        })
    }

    /// Open an existing project: load the base snapshot, then import any
    /// pending delta files in ULID (filename) order.
    ///
    /// # Errors
    /// Fails when the project is uninitialized or the snapshot is unreadable.
    /// Individual unreadable deltas are skipped with a warning, not fatal.
    pub fn open(root: &Path, project: &str) -> Result<Self> {
        validate_project_name(project)?;
        let project_dir = project_dir(root, project);
        let base_path = project_dir.join(BASE_FILE);

        if !base_path.exists() {
            bail!("project '{project}' is not initialized. Run 'td init {project}'");
        }

        let base = fs::read(&base_path)
            .with_context(|| format!("failed to read loro snapshot '{}'", base_path.display()))?;

        let doc = LoroDoc::from_snapshot(&base).context("failed to load loro snapshot")?;
        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;

        // Delta files are named by ULID; sort on the parsed ULID stem so
        // imports happen in creation order. Non-ULID stems were already
        // filtered out by `collect_delta_paths`.
        let mut deltas = collect_delta_paths(&project_dir)?;
        deltas.sort_by_key(|path| {
            path.file_stem()
                .and_then(|s| s.to_str())
                .and_then(|s| Ulid::from_string(s).ok())
        });

        for delta_path in deltas {
            let bytes = fs::read(&delta_path)
                .with_context(|| format!("failed to read loro delta '{}'", delta_path.display()))?;
            if let Err(err) = doc.import(&bytes) {
                // Tolerate malformed or stale delta files as requested by design.
                eprintln!(
                    "warning: skipping unreadable delta '{}': {err}",
                    delta_path.display()
                );
            }
        }

        Ok(Self {
            root: root.to_path_buf(),
            project: project.to_string(),
            doc,
        })
    }

    /// Bootstrap a local project from peer-provided delta bytes.
    ///
    /// The incoming delta is imported into a fresh document, validated to
    /// ensure it carries `meta.project_id`, and then persisted as a base
    /// snapshot for future opens.
    ///
    /// # Errors
    /// Fails when the project already exists locally, the delta cannot be
    /// imported, or it lacks `meta.project_id`.
    pub fn bootstrap_from_peer(root: &Path, project: &str, delta: &[u8]) -> Result<Self> {
        validate_project_name(project)?;
        let project_dir = project_dir(root, project);
        if project_dir.exists() {
            bail!("project '{project}' already exists");
        }
        fs::create_dir_all(project_dir.join(CHANGES_DIR))?;

        let doc = LoroDoc::new();
        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
        doc.import(delta)
            .context("failed to import bootstrap delta from peer")?;
        doc.commit();

        // Reject deltas that don't carry a project identity; the returned id
        // itself is not needed here, only the validation.
        read_project_id_from_doc(&doc)
            .context("bootstrap delta is missing required project identity")?;

        let snapshot = doc
            .export(ExportMode::Snapshot)
            .context("failed to export bootstrap loro snapshot")?;
        atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;

        Ok(Self {
            root: root.to_path_buf(),
            project: project.to_string(),
            doc,
        })
    }

    /// Data root this store was opened under.
    pub fn root(&self) -> &Path {
        &self.root
    }

    /// Name of the project this store wraps.
    pub fn project_name(&self) -> &str {
        &self.project
    }

    /// Direct access to the underlying Loro document.
    pub fn doc(&self) -> &LoroDoc {
        &self.doc
    }

    /// Export all current state to a fresh base snapshot.
    pub fn write_snapshot(&self) -> Result<PathBuf> {
        let out = project_dir(&self.root, &self.project).join(BASE_FILE);
        let bytes = self
            .doc
            .export(ExportMode::Snapshot)
            .context("failed to export loro snapshot")?;
        atomic_write_file(&out, &bytes)?;
        Ok(out)
    }

    /// Delete persisted delta files after a fresh snapshot has been written.
    ///
    /// Returns the number of files removed. Call only after `write_snapshot`
    /// succeeded, or the deltas' changes would be lost on next open.
    pub fn purge_deltas(&self) -> Result<usize> {
        let project_dir = project_dir(&self.root, &self.project);
        let paths = collect_delta_paths(&project_dir)?;
        let mut removed = 0usize;
        for path in paths {
            fs::remove_file(&path)
                .with_context(|| format!("failed removing delta '{}'", path.display()))?;
            removed += 1;
        }
        Ok(removed)
    }

    /// Apply a local mutation and persist only the resulting delta.
    ///
    /// Captures the oplog version vector before the mutation, commits, then
    /// exports just the ops added since — written as a new ULID-named change
    /// file. Returns the path of that file.
    ///
    /// NOTE(review): if `mutator` errors after partially mutating the doc,
    /// the in-memory doc keeps those edits but nothing is persisted.
    pub fn apply_and_persist<F>(&self, mutator: F) -> Result<PathBuf>
    where
        F: FnOnce(&LoroDoc) -> Result<()>,
    {
        let before = self.doc.oplog_vv();
        mutator(&self.doc)?;
        self.doc.commit();

        let delta = self
            .doc
            .export(ExportMode::updates(&before))
            .context("failed to export loro update delta")?;

        let filename = format!("{}.loro", Ulid::new());
        let path = project_dir(&self.root, &self.project)
            .join(CHANGES_DIR)
            .join(filename);
        atomic_write_file(&path, &delta)?;
        Ok(path)
    }

    /// Persist pre-built delta bytes (e.g. received from a peer) as a new
    /// change file without re-exporting from the doc.
    pub fn save_raw_delta(&self, bytes: &[u8]) -> Result<PathBuf> {
        let filename = format!("{}.loro", Ulid::new());
        let path = project_dir(&self.root, &self.project)
            .join(CHANGES_DIR)
            .join(filename);
        atomic_write_file(&path, bytes)?;
        Ok(path)
    }

    /// Return hydrated tasks, excluding tombstones.
    pub fn list_tasks(&self) -> Result<Vec<Task>> {
        self.list_tasks_inner(false)
    }

    /// Return hydrated tasks, including tombstoned rows.
    pub fn list_tasks_unfiltered(&self) -> Result<Vec<Task>> {
        self.list_tasks_inner(true)
    }

    /// Find a task by exact ULID string.
    ///
    /// Linear scan over all hydrated tasks; fine for this store's scale.
    pub fn get_task(&self, id: &TaskId, include_deleted: bool) -> Result<Option<Task>> {
        let tasks = if include_deleted {
            self.list_tasks_unfiltered()?
        } else {
            self.list_tasks()?
        };
        Ok(tasks.into_iter().find(|task| task.id == *id))
    }

    /// Hydrate every entry of the root `tasks` map via the doc's deep JSON
    /// value, optionally filtering out tombstoned (deleted_at set) tasks.
    /// Output is sorted by id string for deterministic ordering.
    fn list_tasks_inner(&self, include_deleted: bool) -> Result<Vec<Task>> {
        let root = serde_json::to_value(self.doc.get_deep_value())?;
        let tasks_obj = root
            .get("tasks")
            .and_then(Value::as_object)
            .ok_or_else(|| anyhow!("missing root tasks map"))?;

        let mut tasks = Vec::with_capacity(tasks_obj.len());
        for (task_id_raw, task_json) in tasks_obj {
            let task = hydrate_task(task_id_raw, task_json)?;
            if include_deleted || task.deleted_at.is_none() {
                tasks.push(task);
            }
        }

        tasks.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));
        Ok(tasks)
    }

    /// Read `meta.schema_version` from the document.
    ///
    /// # Errors
    /// Fails when the meta map or the version field is missing/non-numeric.
    pub fn schema_version(&self) -> Result<u32> {
        let root = serde_json::to_value(self.doc.get_deep_value())?;
        let meta = root
            .get("meta")
            .and_then(Value::as_object)
            .ok_or_else(|| anyhow!("missing root meta map"))?;
        let n = meta
            .get("schema_version")
            .and_then(Value::as_u64)
            .ok_or_else(|| anyhow!("invalid or missing meta.schema_version"))?;
        Ok(n as u32)
    }
}
431
432/// Generate a new task ULID.
433pub fn gen_id() -> TaskId {
434    TaskId::new(Ulid::new())
435}
436
/// Parse a user-supplied status string ("open", "in_progress", "closed").
pub fn parse_status(s: &str) -> Result<Status> {
    Status::parse(s)
}

/// Parse a user-supplied priority string ("high", "medium", "low").
pub fn parse_priority(s: &str) -> Result<Priority> {
    Priority::parse(s)
}

/// Parse a user-supplied effort string ("low", "medium", "high").
pub fn parse_effort(s: &str) -> Result<Effort> {
    Effort::parse(s)
}

/// Stable string form of a status (inverse of `parse_status`).
pub fn status_label(s: Status) -> &'static str {
    s.as_str()
}

/// Stable string form of a priority (inverse of `parse_priority`).
pub fn priority_label(p: Priority) -> &'static str {
    p.as_str()
}

/// Stable string form of an effort (inverse of `parse_effort`).
pub fn effort_label(e: Effort) -> &'static str {
    e.as_str()
}
460
461pub fn data_root() -> Result<PathBuf> {
462    let home = std::env::var("HOME").context("HOME is not set")?;
463    Ok(PathBuf::from(home).join(".local").join("share").join("td"))
464}
465
/// Create a new project under the data root and bind `cwd` to it.
///
/// Ensures the `projects/` directory exists first; binding happens only
/// after the project was created successfully.
pub fn init(cwd: &Path, project: &str) -> Result<Store> {
    let root = data_root()?;
    fs::create_dir_all(root.join(PROJECTS_DIR))?;
    let store = Store::init(&root, project)?;
    bind_project(cwd, project)?;
    Ok(store)
}
473
474pub fn use_project(cwd: &Path, project: &str) -> Result<()> {
475    let root = data_root()?;
476    validate_project_name(project)?;
477    if !project_dir(&root, project).join(BASE_FILE).exists() {
478        bail!("project '{project}' not found. Run 'td projects' to list known projects");
479    }
480    bind_project(cwd, project)
481}
482
483pub fn open(start: &Path) -> Result<Store> {
484    let root = data_root()?;
485    let explicit = std::env::var(PROJECT_ENV).ok();
486    let project = resolve_project_name(start, &root, explicit.as_deref())?.ok_or_else(|| {
487        anyhow!(
488            "no project selected. Use --project/TD_PROJECT, run 'td use <name>', or run 'td init <name>'"
489        )
490    })?;
491    Store::open(&root, &project)
492}
493
494/// Open the project selected by `--project`/`TD_PROJECT`/bindings if one exists.
495///
496/// Returns `Ok(None)` when no project is selected by any mechanism.
497pub fn try_open(start: &Path) -> Result<Option<Store>> {
498    let root = data_root()?;
499    let explicit = std::env::var(PROJECT_ENV).ok();
500    let Some(project) = resolve_project_name(start, &root, explicit.as_deref())? else {
501        return Ok(None);
502    };
503    Store::open(&root, &project).map(Some)
504}
505
/// Bootstrap a project from a peer delta and bind the current directory.
///
/// Validates the name before creating anything, then delegates to
/// `Store::bootstrap_from_peer` and records the directory binding.
pub fn bootstrap_sync(cwd: &Path, project: &str, delta: &[u8]) -> Result<Store> {
    let root = data_root()?;
    fs::create_dir_all(root.join(PROJECTS_DIR))?;
    validate_project_name(project)?;
    let store = Store::bootstrap_from_peer(&root, project, delta)?;
    bind_project(cwd, project)?;
    Ok(store)
}
515
516pub fn list_projects() -> Result<Vec<String>> {
517    let root = data_root()?;
518    let mut out = Vec::new();
519    let dir = root.join(PROJECTS_DIR);
520    if !dir.exists() {
521        return Ok(out);
522    }
523
524    for entry in fs::read_dir(dir)? {
525        let path = entry?.path();
526        if !path.is_dir() {
527            continue;
528        }
529        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
530            continue;
531        };
532        if path.join(BASE_FILE).exists() {
533            out.push(name.to_string());
534        }
535    }
536
537    out.sort();
538    Ok(out)
539}
540
541pub fn resolve_task_id(store: &Store, raw: &str, include_deleted: bool) -> Result<TaskId> {
542    if let Ok(id) = TaskId::parse(raw) {
543        if store.get_task(&id, include_deleted)?.is_some() {
544            return Ok(id);
545        }
546    }
547
548    let tasks = if include_deleted {
549        store.list_tasks_unfiltered()?
550    } else {
551        store.list_tasks()?
552    };
553
554    let upper = raw.to_ascii_uppercase();
555    let matches: Vec<TaskId> = tasks
556        .into_iter()
557        .filter(|t| t.id.as_str().ends_with(&upper))
558        .map(|t| t.id)
559        .collect();
560
561    match matches.as_slice() {
562        [] => bail!("task '{raw}' not found"),
563        [id] => Ok(id.clone()),
564        _ => bail!("task reference '{raw}' is ambiguous"),
565    }
566}
567
568pub fn partition_blockers(store: &Store, blockers: &[TaskId]) -> Result<BlockerPartition> {
569    let mut out = BlockerPartition::default();
570    for blocker in blockers {
571        let Some(task) = store.get_task(blocker, true)? else {
572            out.resolved.push(blocker.clone());
573            continue;
574        };
575        if task.status == Status::Closed || task.deleted_at.is_some() {
576            out.resolved.push(blocker.clone());
577        } else {
578            out.open.push(blocker.clone());
579        }
580    }
581    Ok(out)
582}
583
/// Create a fresh, empty map container for `task_id` under the root tasks map.
pub fn insert_task_map(tasks: &LoroMap, task_id: &TaskId) -> Result<LoroMap> {
    tasks
        .insert_container(task_id.as_str(), LoroMap::new())
        .context("failed to create task map")
}
589
590pub fn get_task_map(tasks: &LoroMap, task_id: &TaskId) -> Result<Option<LoroMap>> {
591    match tasks.get(task_id.as_str()) {
592        Some(ValueOrContainer::Container(Container::Map(map))) => Ok(Some(map)),
593        Some(_) => bail!("task '{}' has invalid container type", task_id.as_str()),
594        None => Ok(None),
595    }
596}
597
/// Return the child map at `key`, creating an empty one if it is missing.
pub fn get_or_create_child_map(parent: &LoroMap, key: &str) -> Result<LoroMap> {
    parent
        .get_or_create_container(key, LoroMap::new())
        .with_context(|| format!("failed to get or create map key '{key}'"))
}
603
/// Location of the directory-to-project bindings file under the data root.
fn bindings_path(root: &Path) -> PathBuf {
    root.join(BINDINGS_FILE)
}
607
608fn resolve_project_name(
609    start: &Path,
610    root: &Path,
611    explicit: Option<&str>,
612) -> Result<Option<String>> {
613    if let Some(project) = explicit {
614        validate_project_name(project)?;
615        return Ok(Some(project.to_string()));
616    }
617
618    let cwd = canonicalize_binding_path(start)?;
619    let bindings = load_bindings(root)?;
620
621    let mut best: Option<(usize, String)> = None;
622    for (raw_path, project) in bindings.bindings {
623        let bound = PathBuf::from(raw_path);
624        if is_prefix_path(&bound, &cwd) {
625            let score = bound.components().count();
626            match &best {
627                Some((best_score, _)) if *best_score >= score => {}
628                _ => best = Some((score, project)),
629            }
630        }
631    }
632
633    if let Some((_, project)) = best {
634        return Ok(Some(project));
635    }
636
637    Ok(None)
638}
639
640fn bind_project(cwd: &Path, project: &str) -> Result<()> {
641    validate_project_name(project)?;
642
643    let root = data_root()?;
644    fs::create_dir_all(&root)?;
645
646    let canonical = canonicalize_binding_path(cwd)?;
647    let mut bindings = load_bindings(&root)?;
648    bindings
649        .bindings
650        .insert(canonical.to_string_lossy().to_string(), project.to_string());
651    save_bindings(&root, &bindings)
652}
653
654fn load_bindings(root: &Path) -> Result<BindingsFile> {
655    let path = bindings_path(root);
656    if !path.exists() {
657        return Ok(BindingsFile::default());
658    }
659    let content = fs::read_to_string(&path)
660        .with_context(|| format!("failed reading bindings from '{}'", path.display()))?;
661    serde_json::from_str(&content)
662        .with_context(|| format!("invalid bindings file '{}'", path.display()))
663}
664
665fn save_bindings(root: &Path, bindings: &BindingsFile) -> Result<()> {
666    let path = bindings_path(root);
667    let bytes = serde_json::to_vec_pretty(bindings)?;
668    atomic_write_file(&path, &bytes)
669}
670
/// Canonicalize a directory path for use as a bindings key.
/// Note: `fs::canonicalize` fails when the path does not exist.
fn canonicalize_binding_path(path: &Path) -> Result<PathBuf> {
    fs::canonicalize(path).with_context(|| format!("failed to canonicalize '{}'", path.display()))
}
674
/// Whether `prefix` is a whole-component prefix of `target`
/// (e.g. `/a/b` prefixes `/a/b/c` but not `/a/bc`).
///
/// The original hand-rolled component loop is exactly what the standard
/// library's `Path::starts_with` implements, so delegate to it.
fn is_prefix_path(prefix: &Path, target: &Path) -> bool {
    target.starts_with(prefix)
}
688
689fn validate_project_name(name: &str) -> Result<()> {
690    if name.is_empty() {
691        bail!("project name cannot be empty");
692    }
693    if name.contains('/') || name.contains('\\') || name == "." || name == ".." {
694        bail!("invalid project name '{name}'");
695    }
696    if name.chars().any(char::is_control) {
697        bail!("invalid project name '{name}'");
698    }
699    Ok(())
700}
701
/// Extract `meta.project_id` from a document's deep JSON value,
/// failing when it is absent or not a string.
fn read_project_id_from_doc(doc: &LoroDoc) -> Result<String> {
    let root = serde_json::to_value(doc.get_deep_value())?;
    root.get("meta")
        .and_then(|m| m.get("project_id"))
        .and_then(Value::as_str)
        .map(str::to_owned)
        .ok_or_else(|| anyhow!("missing meta.project_id in project doc"))
}
710
/// Convert one raw JSON task object (from the doc's deep value) into a `Task`.
///
/// `labels` and `blockers` are stored in the CRDT as maps whose KEYS carry
/// the values (set semantics); `logs` is a map keyed by log-entry ULID.
///
/// # Errors
/// Fails when the id is not a ULID, the value is not an object, or any
/// required field is missing or has the wrong type.
fn hydrate_task(task_id_raw: &str, value: &Value) -> Result<Task> {
    let obj = value
        .as_object()
        .ok_or_else(|| anyhow!("task '{task_id_raw}' is not an object"))?;

    let id = TaskId::parse(task_id_raw)?;

    let title = get_required_string(obj, "title")?;
    let description = get_required_string(obj, "description")?;
    let task_type = get_required_string(obj, "type")?;
    let status = Status::parse(&get_required_string(obj, "status")?)?;
    let priority = Priority::parse(&get_required_string(obj, "priority")?)?;
    let effort = Effort::parse(&get_required_string(obj, "effort")?)?;
    // An empty string is treated the same as an absent parent ("no parent").
    let parent = match obj.get("parent").and_then(Value::as_str) {
        Some("") | None => None,
        Some(raw) => Some(TaskId::parse(raw)?),
    };

    let created_at = get_required_string(obj, "created_at")?;
    let updated_at = get_required_string(obj, "updated_at")?;
    // Empty deleted_at also means "not deleted" (tombstone cleared).
    let deleted_at = obj
        .get("deleted_at")
        .and_then(Value::as_str)
        .map(str::to_owned)
        .filter(|s| !s.is_empty());

    // Labels: keys of the labels map; missing map -> no labels.
    let labels = obj
        .get("labels")
        .and_then(Value::as_object)
        .map(|m| m.keys().cloned().collect())
        .unwrap_or_default();

    // Blockers: keys of the blockers map, each parsed as a ULID.
    let blockers = obj
        .get("blockers")
        .and_then(Value::as_object)
        .map(|m| {
            m.keys()
                .map(|raw| TaskId::parse(raw))
                .collect::<Result<Vec<_>>>()
        })
        .transpose()?
        .unwrap_or_default();

    // Logs: map of log-id -> { timestamp, message }.
    let mut logs = obj
        .get("logs")
        .and_then(Value::as_object)
        .map(|logs| {
            logs.iter()
                .map(|(log_id_raw, payload)| {
                    let payload_obj = payload.as_object().ok_or_else(|| {
                        anyhow!("log '{log_id_raw}' on task '{task_id_raw}' is not an object")
                    })?;
                    Ok(LogEntry {
                        id: TaskId::parse(log_id_raw)?,
                        timestamp: get_required_string(payload_obj, "timestamp")?,
                        message: get_required_string(payload_obj, "message")?,
                    })
                })
                .collect::<Result<Vec<_>>>()
        })
        .transpose()?
        .unwrap_or_default();

    // Sort by log id string for deterministic output order.
    logs.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));

    Ok(Task {
        id,
        title,
        description,
        task_type,
        priority,
        status,
        effort,
        parent,
        created_at,
        updated_at,
        deleted_at,
        labels,
        blockers,
        logs,
    })
}
793
794fn get_required_string(map: &serde_json::Map<String, Value>, key: &str) -> Result<String> {
795    map.get(key)
796        .and_then(Value::as_str)
797        .map(str::to_owned)
798        .ok_or_else(|| anyhow!("missing or non-string key '{key}'"))
799}
800
801fn collect_delta_paths(project_dir: &Path) -> Result<Vec<PathBuf>> {
802    let mut paths = Vec::new();
803    collect_changes_from_dir(&project_dir.join(CHANGES_DIR), &mut paths)?;
804
805    for entry in fs::read_dir(project_dir)? {
806        let path = entry?.path();
807        if !path.is_dir() {
808            continue;
809        }
810        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
811            continue;
812        };
813        if name.starts_with("changes.compacting.") {
814            collect_changes_from_dir(&path, &mut paths)?;
815        }
816    }
817
818    Ok(paths)
819}
820
821fn collect_changes_from_dir(dir: &Path, out: &mut Vec<PathBuf>) -> Result<()> {
822    if !dir.exists() {
823        return Ok(());
824    }
825
826    for entry in fs::read_dir(dir)? {
827        let path = entry?.path();
828        if !path.is_file() {
829            continue;
830        }
831
832        let Some(filename) = path.file_name().and_then(|n| n.to_str()) else {
833            continue;
834        };
835        if filename.ends_with(TMP_SUFFIX) || !filename.ends_with(".loro") {
836            continue;
837        }
838
839        let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
840            continue;
841        };
842        if Ulid::from_string(stem).is_err() {
843            continue;
844        }
845
846        out.push(path);
847    }
848
849    Ok(())
850}
851
/// Directory holding one project's data: `<root>/projects/<project>`.
fn project_dir(root: &Path, project: &str) -> PathBuf {
    root.join(PROJECTS_DIR).join(project)
}
855
/// Load the per-device ULID from `<root>/device_id`, creating and persisting
/// a fresh one on first use, then fold it into a 64-bit Loro peer id.
///
/// Persisting the id keeps this machine's CRDT ops attributed to the same
/// peer across runs.
fn load_or_create_device_peer_id(root: &Path) -> Result<PeerID> {
    let path = root.join("device_id");
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)?;
    }

    let device_ulid = if path.exists() {
        let content = fs::read_to_string(&path)
            .with_context(|| format!("failed reading device id from '{}'", path.display()))?;
        // Trim to tolerate a trailing newline added by hand-editing.
        Ulid::from_string(content.trim()).context("invalid persisted device id ULID")?
    } else {
        let id = Ulid::new();
        // Atomic write so a crash can't leave a truncated device id behind.
        atomic_write_file(&path, id.to_string().as_bytes())?;
        id
    };

    // PeerID is 64-bit; keep only the ULID's low 64 bits.
    let raw: u128 = device_ulid.into();
    Ok((raw & u64::MAX as u128) as u64)
}
875
876fn atomic_write_file(path: &Path, bytes: &[u8]) -> Result<()> {
877    let parent = path
878        .parent()
879        .ok_or_else(|| anyhow!("cannot atomically write root path '{}'", path.display()))?;
880    fs::create_dir_all(parent)?;
881
882    let tmp_name = format!(
883        "{}.{}{}",
884        path.file_name().and_then(|n| n.to_str()).unwrap_or("write"),
885        Ulid::new(),
886        TMP_SUFFIX
887    );
888    let tmp_path = parent.join(tmp_name);
889
890    {
891        let mut file = OpenOptions::new()
892            .create_new(true)
893            .write(true)
894            .open(&tmp_path)
895            .with_context(|| format!("failed to open temp file '{}'", tmp_path.display()))?;
896        file.write_all(bytes)?;
897        file.sync_all()?;
898    }
899
900    fs::rename(&tmp_path, path).with_context(|| {
901        format!(
902            "failed to atomically rename '{}' to '{}'",
903            tmp_path.display(),
904            path.display()
905        )
906    })?;
907
908    sync_dir(parent)?;
909    Ok(())
910}
911
912fn sync_dir(path: &Path) -> Result<()> {
913    let dir =
914        File::open(path).with_context(|| format!("failed opening dir '{}'", path.display()))?;
915    dir.sync_all()
916        .with_context(|| format!("failed fsync on dir '{}'", path.display()))?;
917    Ok(())
918}