db.rs

  1use anyhow::{anyhow, bail, Context, Result};
  2use loro::{Container, ExportMode, LoroDoc, LoroMap, PeerID, ValueOrContainer};
  3use serde::{Deserialize, Serialize};
  4use serde_json::Value;
  5use std::collections::BTreeMap;
  6use std::fmt;
  7use std::fs::{self, File, OpenOptions};
  8use std::io::Write;
  9use std::path::{Path, PathBuf};
 10use ulid::Ulid;
 11
/// Environment variable that, when set, overrides project selection.
pub const PROJECT_ENV: &str = "TD_PROJECT";

// On-disk layout under the data root directory:
// subdirectory holding one directory per project.
const PROJECTS_DIR: &str = "projects";
// per-project subdirectory holding incremental delta files.
const CHANGES_DIR: &str = "changes";
// JSON file mapping bound directories to project names.
const BINDINGS_FILE: &str = "bindings.json";
// full Loro snapshot a project is loaded from.
const BASE_FILE: &str = "base.loro";
// suffix used for in-progress atomic writes (skipped when scanning deltas).
const TMP_SUFFIX: &str = ".tmp";
// document schema version written into `meta.schema_version`.
const SCHEMA_VERSION: u32 = 1;
 20
 21/// Current UTC time in ISO 8601 format.
 22pub fn now_utc() -> String {
 23    chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string()
 24}
 25
/// Lifecycle state for a task.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Status {
    // Serialized as "open".
    Open,
    // Serialized as "in_progress".
    InProgress,
    // Serialized as "closed"; closed tasks count as resolved blockers
    // (see `partition_blockers`).
    Closed,
}
 34
 35impl Status {
 36    fn as_str(self) -> &'static str {
 37        match self {
 38            Status::Open => "open",
 39            Status::InProgress => "in_progress",
 40            Status::Closed => "closed",
 41        }
 42    }
 43
 44    fn parse(raw: &str) -> Result<Self> {
 45        match raw {
 46            "open" => Ok(Self::Open),
 47            "in_progress" => Ok(Self::InProgress),
 48            "closed" => Ok(Self::Closed),
 49            _ => bail!("invalid status '{raw}'"),
 50        }
 51    }
 52}
 53
/// Priority for task ordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Priority {
    // Serialized as "high"; sorts first (score 1).
    High,
    // Serialized as "medium" (score 2).
    Medium,
    // Serialized as "low"; sorts last (score 3).
    Low,
}
 62
 63impl Priority {
 64    fn as_str(self) -> &'static str {
 65        match self {
 66            Priority::High => "high",
 67            Priority::Medium => "medium",
 68            Priority::Low => "low",
 69        }
 70    }
 71
 72    fn parse(raw: &str) -> Result<Self> {
 73        match raw {
 74            "high" => Ok(Self::High),
 75            "medium" => Ok(Self::Medium),
 76            "low" => Ok(Self::Low),
 77            _ => bail!("invalid priority '{raw}'"),
 78        }
 79    }
 80
 81    pub fn score(self) -> i32 {
 82        match self {
 83            Priority::High => 1,
 84            Priority::Medium => 2,
 85            Priority::Low => 3,
 86        }
 87    }
 88}
 89
/// Estimated effort for a task.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Effort {
    // Serialized as "low"; cheapest (score 1).
    Low,
    // Serialized as "medium" (score 2).
    Medium,
    // Serialized as "high"; most expensive (score 3).
    High,
}
 98
 99impl Effort {
100    fn as_str(self) -> &'static str {
101        match self {
102            Effort::Low => "low",
103            Effort::Medium => "medium",
104            Effort::High => "high",
105        }
106    }
107
108    fn parse(raw: &str) -> Result<Self> {
109        match raw {
110            "low" => Ok(Self::Low),
111            "medium" => Ok(Self::Medium),
112            "high" => Ok(Self::High),
113            _ => bail!("invalid effort '{raw}'"),
114        }
115    }
116
117    pub fn score(self) -> i32 {
118        match self {
119            Effort::Low => 1,
120            Effort::Medium => 2,
121            Effort::High => 3,
122        }
123    }
124}
125
/// A stable task identifier backed by a ULID.
///
/// The inner string is the ULID's canonical text form (produced by
/// `Ulid::to_string` in `TaskId::new`), so lexicographic ordering of
/// `TaskId`s follows ULID creation order.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
#[serde(transparent)]
pub struct TaskId(String);
130
131impl TaskId {
132    pub fn new(id: Ulid) -> Self {
133        Self(id.to_string())
134    }
135
136    pub fn parse(raw: &str) -> Result<Self> {
137        let id = Ulid::from_string(raw).with_context(|| format!("invalid task id '{raw}'"))?;
138        Ok(Self::new(id))
139    }
140
141    pub fn as_str(&self) -> &str {
142        &self.0
143    }
144
145    pub fn short(&self) -> String {
146        format!("td-{}", &self.0[self.0.len() - 7..])
147    }
148
149    /// Return a display-friendly short ID from a raw ULID string.
150    pub fn display_id(raw: &str) -> String {
151        let n = raw.len();
152        if n > 7 {
153            format!("td-{}", &raw[n - 7..])
154        } else {
155            format!("td-{raw}")
156        }
157    }
158}
159
160impl fmt::Display for TaskId {
161    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
162        write!(f, "{}", self.short())
163    }
164}
165
/// A task log entry embedded in a task record.
#[derive(Debug, Clone, Serialize)]
pub struct LogEntry {
    // ULID of the log entry itself (logs are sorted by this in `hydrate_task`).
    pub id: TaskId,
    // ISO 8601 timestamp string — presumably produced by `now_utc`; verify at call sites.
    pub timestamp: String,
    // Free-form log message.
    pub message: String,
}
173
/// Hydrated task data from the CRDT document.
#[derive(Debug, Clone, Serialize)]
pub struct Task {
    pub id: TaskId,
    pub title: String,
    pub description: String,
    // Free-form task kind string; serialized under the key "type".
    #[serde(rename = "type")]
    pub task_type: String,
    pub priority: Priority,
    pub status: Status,
    pub effort: Effort,
    // Optional parent task; an empty string in the doc hydrates to `None`.
    pub parent: Option<TaskId>,
    pub created_at: String,
    pub updated_at: String,
    // Tombstone marker: `Some(_)` means the task is soft-deleted and is
    // filtered out of `Store::list_tasks`.
    pub deleted_at: Option<String>,
    // Label names; hydrated from the keys of the task's "labels" map.
    pub labels: Vec<String>,
    // IDs of tasks blocking this one; hydrated from the "blockers" map keys.
    pub blockers: Vec<TaskId>,
    // Log entries, sorted by entry ULID (i.e. creation order).
    pub logs: Vec<LogEntry>,
}
193
/// Result type for partitioning blockers by task state.
#[derive(Debug, Default, Clone, Serialize)]
pub struct BlockerPartition {
    // Blockers that still block: task exists, not closed, not deleted.
    pub open: Vec<TaskId>,
    // Blockers that no longer block: closed, deleted, or unknown tasks.
    pub resolved: Vec<TaskId>,
}
200
/// On-disk shape of `bindings.json`: canonicalized directory path →
/// project name (see `bind_project` / `resolve_project_name`).
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct BindingsFile {
    #[serde(default)]
    bindings: BTreeMap<String, String>,
}
206
/// Storage wrapper around one project's Loro document and disk layout.
#[derive(Debug, Clone)]
pub struct Store {
    // Data root (see `data_root`), NOT the project directory itself.
    root: PathBuf,
    // Validated project name; the project lives at `root/projects/<name>`.
    project: String,
    // In-memory CRDT document for this project.
    doc: LoroDoc,
}
214
impl Store {
    /// Create a new project on disk with an empty task document.
    ///
    /// Refuses to overwrite an existing project directory. Seeds the doc
    /// with a root "tasks" map and a "meta" map (schema version, random
    /// project id, creation time), then persists the initial snapshot as
    /// `base.loro` so later `open` calls can load it.
    pub fn init(root: &Path, project: &str) -> Result<Self> {
        validate_project_name(project)?;
        let project_dir = project_dir(root, project);
        if project_dir.exists() {
            bail!("project '{project}' already exists");
        }
        fs::create_dir_all(project_dir.join(CHANGES_DIR))?;

        let doc = LoroDoc::new();
        // Persisted per-device peer id keeps this machine's CRDT edits
        // attributed to one stable peer across runs.
        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
        // Materialize the root "tasks" container even while empty.
        doc.get_map("tasks");

        let meta = doc.get_map("meta");
        meta.insert("schema_version", SCHEMA_VERSION as i64)?;
        meta.insert("project_id", Ulid::new().to_string())?;
        meta.insert("created_at", now_utc())?;

        let snapshot = doc
            .export(ExportMode::Snapshot)
            .context("failed to export initial loro snapshot")?;
        atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;

        Ok(Self {
            root: root.to_path_buf(),
            project: project.to_string(),
            doc,
        })
    }

    /// Open an existing project: load the base snapshot, then replay all
    /// persisted delta files on top of it.
    pub fn open(root: &Path, project: &str) -> Result<Self> {
        validate_project_name(project)?;
        let project_dir = project_dir(root, project);
        let base_path = project_dir.join(BASE_FILE);

        if !base_path.exists() {
            bail!("project '{project}' is not initialized. Run 'td init {project}'");
        }

        let base = fs::read(&base_path)
            .with_context(|| format!("failed to read loro snapshot '{}'", base_path.display()))?;

        let doc = LoroDoc::from_snapshot(&base).context("failed to load loro snapshot")?;
        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;

        // Replay deltas in ULID order (delta filenames are ULIDs, so this
        // approximates creation order). CRDT merging should be
        // order-independent, but a stable order keeps warnings reproducible.
        let mut deltas = collect_delta_paths(&project_dir)?;
        deltas.sort_by_key(|path| {
            path.file_stem()
                .and_then(|s| s.to_str())
                .and_then(|s| Ulid::from_string(s).ok())
        });

        for delta_path in deltas {
            let bytes = fs::read(&delta_path)
                .with_context(|| format!("failed to read loro delta '{}'", delta_path.display()))?;
            if let Err(err) = doc.import(&bytes) {
                // Tolerate malformed or stale delta files as requested by design.
                eprintln!(
                    "warning: skipping unreadable delta '{}': {err}",
                    delta_path.display()
                );
            }
        }

        Ok(Self {
            root: root.to_path_buf(),
            project: project.to_string(),
            doc,
        })
    }

    /// Bootstrap a local project from peer-provided delta bytes.
    ///
    /// The incoming delta is imported into a fresh document, validated to
    /// ensure it carries `meta.project_id`, and then persisted as a base
    /// snapshot for future opens.
    pub fn bootstrap_from_peer(root: &Path, project: &str, delta: &[u8]) -> Result<Self> {
        validate_project_name(project)?;
        let project_dir = project_dir(root, project);
        if project_dir.exists() {
            bail!("project '{project}' already exists");
        }
        fs::create_dir_all(project_dir.join(CHANGES_DIR))?;

        let doc = LoroDoc::new();
        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
        doc.import(delta)
            .context("failed to import bootstrap delta from peer")?;
        doc.commit();

        // Reject deltas that don't carry full project identity — a bare
        // update stream without `meta.project_id` is not a valid bootstrap.
        read_project_id_from_doc(&doc)
            .context("bootstrap delta is missing required project identity")?;

        let snapshot = doc
            .export(ExportMode::Snapshot)
            .context("failed to export bootstrap loro snapshot")?;
        atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;

        Ok(Self {
            root: root.to_path_buf(),
            project: project.to_string(),
            doc,
        })
    }

    /// Data root this store was opened from.
    pub fn root(&self) -> &Path {
        &self.root
    }

    /// Name of the project this store wraps.
    pub fn project_name(&self) -> &str {
        &self.project
    }

    /// Direct access to the underlying CRDT document.
    pub fn doc(&self) -> &LoroDoc {
        &self.doc
    }

    /// Export all current state to a fresh base snapshot.
    pub fn write_snapshot(&self) -> Result<PathBuf> {
        let out = project_dir(&self.root, &self.project).join(BASE_FILE);
        let bytes = self
            .doc
            .export(ExportMode::Snapshot)
            .context("failed to export loro snapshot")?;
        atomic_write_file(&out, &bytes)?;
        Ok(out)
    }

    /// Delete persisted delta files after a fresh snapshot has been written.
    ///
    /// Returns the number of files removed. Callers must have written a
    /// snapshot first or the deltas' changes would be lost on next open.
    pub fn purge_deltas(&self) -> Result<usize> {
        let project_dir = project_dir(&self.root, &self.project);
        let paths = collect_delta_paths(&project_dir)?;
        let mut removed = 0usize;
        for path in paths {
            fs::remove_file(&path)
                .with_context(|| format!("failed removing delta '{}'", path.display()))?;
            removed += 1;
        }
        Ok(removed)
    }

    /// Apply a local mutation and persist only the resulting delta.
    ///
    /// Captures the oplog version before the mutation so the exported
    /// update contains exactly the changes made by `mutator`. The delta
    /// is written to `changes/<fresh-ulid>.loro`; returns its path.
    pub fn apply_and_persist<F>(&self, mutator: F) -> Result<PathBuf>
    where
        F: FnOnce(&LoroDoc) -> Result<()>,
    {
        let before = self.doc.oplog_vv();
        mutator(&self.doc)?;
        self.doc.commit();

        let delta = self
            .doc
            .export(ExportMode::updates(&before))
            .context("failed to export loro update delta")?;

        let filename = format!("{}.loro", Ulid::new());
        let path = project_dir(&self.root, &self.project)
            .join(CHANGES_DIR)
            .join(filename);
        atomic_write_file(&path, &delta)?;
        Ok(path)
    }

    /// Persist pre-built delta bytes (e.g. received from a peer) as a new
    /// change file without re-exporting from the doc.
    pub fn save_raw_delta(&self, bytes: &[u8]) -> Result<PathBuf> {
        let filename = format!("{}.loro", Ulid::new());
        let path = project_dir(&self.root, &self.project)
            .join(CHANGES_DIR)
            .join(filename);
        atomic_write_file(&path, bytes)?;
        Ok(path)
    }

    /// Return hydrated tasks, excluding tombstones.
    pub fn list_tasks(&self) -> Result<Vec<Task>> {
        self.list_tasks_inner(false)
    }

    /// Return hydrated tasks, including tombstoned rows.
    pub fn list_tasks_unfiltered(&self) -> Result<Vec<Task>> {
        self.list_tasks_inner(true)
    }

    /// Find a task by exact ULID string.
    ///
    /// Note: this hydrates the full task list and scans it — fine for
    /// small projects, and keeps a single hydration code path.
    pub fn get_task(&self, id: &TaskId, include_deleted: bool) -> Result<Option<Task>> {
        let tasks = if include_deleted {
            self.list_tasks_unfiltered()?
        } else {
            self.list_tasks()?
        };
        Ok(tasks.into_iter().find(|task| task.id == *id))
    }

    /// Shared hydration path: dump the doc to JSON, hydrate each entry of
    /// the root "tasks" map, optionally filter tombstones, sort by ULID.
    fn list_tasks_inner(&self, include_deleted: bool) -> Result<Vec<Task>> {
        let root = serde_json::to_value(self.doc.get_deep_value())?;
        let tasks_obj = root
            .get("tasks")
            .and_then(Value::as_object)
            .ok_or_else(|| anyhow!("missing root tasks map"))?;

        let mut tasks = Vec::with_capacity(tasks_obj.len());
        for (task_id_raw, task_json) in tasks_obj {
            let task = hydrate_task(task_id_raw, task_json)?;
            if include_deleted || task.deleted_at.is_none() {
                tasks.push(task);
            }
        }

        // ULID order == creation order.
        tasks.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));
        Ok(tasks)
    }

    /// Read `meta.schema_version` from the document.
    pub fn schema_version(&self) -> Result<u32> {
        let root = serde_json::to_value(self.doc.get_deep_value())?;
        let meta = root
            .get("meta")
            .and_then(Value::as_object)
            .ok_or_else(|| anyhow!("missing root meta map"))?;
        let n = meta
            .get("schema_version")
            .and_then(Value::as_u64)
            .ok_or_else(|| anyhow!("invalid or missing meta.schema_version"))?;
        Ok(n as u32)
    }
}
441
442/// Generate a new task ULID.
443pub fn gen_id() -> TaskId {
444    TaskId::new(Ulid::new())
445}
446
// Public convenience wrappers: `Status::parse`/`as_str` and friends are
// private, so these re-export parsing and labelling for callers outside
// this module.

/// Parse a canonical status string ("open" | "in_progress" | "closed").
pub fn parse_status(s: &str) -> Result<Status> {
    Status::parse(s)
}

/// Parse a canonical priority string ("high" | "medium" | "low").
pub fn parse_priority(s: &str) -> Result<Priority> {
    Priority::parse(s)
}

/// Parse a canonical effort string ("low" | "medium" | "high").
pub fn parse_effort(s: &str) -> Result<Effort> {
    Effort::parse(s)
}

/// Canonical string for a status (inverse of `parse_status`).
pub fn status_label(s: Status) -> &'static str {
    s.as_str()
}

/// Canonical string for a priority (inverse of `parse_priority`).
pub fn priority_label(p: Priority) -> &'static str {
    p.as_str()
}

/// Canonical string for an effort (inverse of `parse_effort`).
pub fn effort_label(e: Effort) -> &'static str {
    e.as_str()
}
470
471pub fn data_root() -> Result<PathBuf> {
472    let home = std::env::var("HOME").context("HOME is not set")?;
473    Ok(PathBuf::from(home).join(".local").join("share").join("td"))
474}
475
476pub fn init(cwd: &Path, project: &str) -> Result<Store> {
477    let root = data_root()?;
478    fs::create_dir_all(root.join(PROJECTS_DIR))?;
479    let store = Store::init(&root, project)?;
480    bind_project(cwd, project)?;
481    Ok(store)
482}
483
484pub fn use_project(cwd: &Path, project: &str) -> Result<()> {
485    let root = data_root()?;
486    validate_project_name(project)?;
487    if !project_dir(&root, project).join(BASE_FILE).exists() {
488        bail!("project '{project}' not found. Run 'td projects' to list known projects");
489    }
490    bind_project(cwd, project)
491}
492
493pub fn open(start: &Path) -> Result<Store> {
494    let root = data_root()?;
495    let explicit = std::env::var(PROJECT_ENV).ok();
496    let project = resolve_project_name(start, &root, explicit.as_deref())?.ok_or_else(|| {
497        anyhow!(
498            "no project selected. Use --project/TD_PROJECT, run 'td use <name>', or run 'td init <name>'"
499        )
500    })?;
501    Store::open(&root, &project)
502}
503
504/// Open the project selected by `--project`/`TD_PROJECT`/bindings if one exists.
505///
506/// Returns `Ok(None)` when no project is selected by any mechanism.
507pub fn try_open(start: &Path) -> Result<Option<Store>> {
508    let root = data_root()?;
509    let explicit = std::env::var(PROJECT_ENV).ok();
510    let Some(project) = resolve_project_name(start, &root, explicit.as_deref())? else {
511        return Ok(None);
512    };
513    Store::open(&root, &project).map(Some)
514}
515
516/// Bootstrap a project from a peer delta and bind the current directory.
517pub fn bootstrap_sync(cwd: &Path, project: &str, delta: &[u8]) -> Result<Store> {
518    let root = data_root()?;
519    fs::create_dir_all(root.join(PROJECTS_DIR))?;
520    validate_project_name(project)?;
521    let store = Store::bootstrap_from_peer(&root, project, delta)?;
522    bind_project(cwd, project)?;
523    Ok(store)
524}
525
526pub fn list_projects() -> Result<Vec<String>> {
527    let root = data_root()?;
528    let mut out = Vec::new();
529    let dir = root.join(PROJECTS_DIR);
530    if !dir.exists() {
531        return Ok(out);
532    }
533
534    for entry in fs::read_dir(dir)? {
535        let path = entry?.path();
536        if !path.is_dir() {
537            continue;
538        }
539        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
540            continue;
541        };
542        if path.join(BASE_FILE).exists() {
543            out.push(name.to_string());
544        }
545    }
546
547    out.sort();
548    Ok(out)
549}
550
551pub fn resolve_task_id(store: &Store, raw: &str, include_deleted: bool) -> Result<TaskId> {
552    let raw = raw.strip_prefix("td-").unwrap_or(raw);
553
554    if let Ok(id) = TaskId::parse(raw) {
555        if store.get_task(&id, include_deleted)?.is_some() {
556            return Ok(id);
557        }
558    }
559
560    let tasks = if include_deleted {
561        store.list_tasks_unfiltered()?
562    } else {
563        store.list_tasks()?
564    };
565
566    let upper = raw.to_ascii_uppercase();
567    let matches: Vec<TaskId> = tasks
568        .into_iter()
569        .filter(|t| t.id.as_str().ends_with(&upper))
570        .map(|t| t.id)
571        .collect();
572
573    match matches.as_slice() {
574        [] => bail!("task '{raw}' not found"),
575        [id] => Ok(id.clone()),
576        _ => bail!("task reference '{raw}' is ambiguous"),
577    }
578}
579
580pub fn partition_blockers(store: &Store, blockers: &[TaskId]) -> Result<BlockerPartition> {
581    let mut out = BlockerPartition::default();
582    for blocker in blockers {
583        let Some(task) = store.get_task(blocker, true)? else {
584            out.resolved.push(blocker.clone());
585            continue;
586        };
587        if task.status == Status::Closed || task.deleted_at.is_some() {
588            out.resolved.push(blocker.clone());
589        } else {
590            out.open.push(blocker.clone());
591        }
592    }
593    Ok(out)
594}
595
596pub fn insert_task_map(tasks: &LoroMap, task_id: &TaskId) -> Result<LoroMap> {
597    tasks
598        .insert_container(task_id.as_str(), LoroMap::new())
599        .context("failed to create task map")
600}
601
602pub fn get_task_map(tasks: &LoroMap, task_id: &TaskId) -> Result<Option<LoroMap>> {
603    match tasks.get(task_id.as_str()) {
604        Some(ValueOrContainer::Container(Container::Map(map))) => Ok(Some(map)),
605        Some(_) => bail!("task '{}' has invalid container type", task_id.as_str()),
606        None => Ok(None),
607    }
608}
609
610pub fn get_or_create_child_map(parent: &LoroMap, key: &str) -> Result<LoroMap> {
611    parent
612        .get_or_create_container(key, LoroMap::new())
613        .with_context(|| format!("failed to get or create map key '{key}'"))
614}
615
616fn bindings_path(root: &Path) -> PathBuf {
617    root.join(BINDINGS_FILE)
618}
619
620fn resolve_project_name(
621    start: &Path,
622    root: &Path,
623    explicit: Option<&str>,
624) -> Result<Option<String>> {
625    if let Some(project) = explicit {
626        validate_project_name(project)?;
627        return Ok(Some(project.to_string()));
628    }
629
630    let cwd = canonicalize_binding_path(start)?;
631    let bindings = load_bindings(root)?;
632
633    let mut best: Option<(usize, String)> = None;
634    for (raw_path, project) in bindings.bindings {
635        let bound = PathBuf::from(raw_path);
636        if is_prefix_path(&bound, &cwd) {
637            let score = bound.components().count();
638            match &best {
639                Some((best_score, _)) if *best_score >= score => {}
640                _ => best = Some((score, project)),
641            }
642        }
643    }
644
645    if let Some((_, project)) = best {
646        return Ok(Some(project));
647    }
648
649    Ok(None)
650}
651
652fn bind_project(cwd: &Path, project: &str) -> Result<()> {
653    validate_project_name(project)?;
654
655    let root = data_root()?;
656    fs::create_dir_all(&root)?;
657
658    let canonical = canonicalize_binding_path(cwd)?;
659    let mut bindings = load_bindings(&root)?;
660    bindings
661        .bindings
662        .insert(canonical.to_string_lossy().to_string(), project.to_string());
663    save_bindings(&root, &bindings)
664}
665
666fn load_bindings(root: &Path) -> Result<BindingsFile> {
667    let path = bindings_path(root);
668    if !path.exists() {
669        return Ok(BindingsFile::default());
670    }
671    let content = fs::read_to_string(&path)
672        .with_context(|| format!("failed reading bindings from '{}'", path.display()))?;
673    serde_json::from_str(&content)
674        .with_context(|| format!("invalid bindings file '{}'", path.display()))
675}
676
677fn save_bindings(root: &Path, bindings: &BindingsFile) -> Result<()> {
678    let path = bindings_path(root);
679    let bytes = serde_json::to_vec_pretty(bindings)?;
680    atomic_write_file(&path, &bytes)
681}
682
683fn canonicalize_binding_path(path: &Path) -> Result<PathBuf> {
684    fs::canonicalize(path).with_context(|| format!("failed to canonicalize '{}'", path.display()))
685}
686
/// True when every component of `prefix` is a leading component of `target`.
///
/// Delegates to [`Path::starts_with`], which performs exactly this
/// whole-component prefix comparison (it never matches a partial file
/// name, so `/a/b` is not a prefix of `/a/bc`). This replaces a
/// hand-rolled component walk with the same semantics.
fn is_prefix_path(prefix: &Path, target: &Path) -> bool {
    target.starts_with(prefix)
}
700
701fn validate_project_name(name: &str) -> Result<()> {
702    if name.is_empty() {
703        bail!("project name cannot be empty");
704    }
705    if name.contains('/') || name.contains('\\') || name == "." || name == ".." {
706        bail!("invalid project name '{name}'");
707    }
708    if name.chars().any(char::is_control) {
709        bail!("invalid project name '{name}'");
710    }
711    Ok(())
712}
713
714fn read_project_id_from_doc(doc: &LoroDoc) -> Result<String> {
715    let root = serde_json::to_value(doc.get_deep_value())?;
716    root.get("meta")
717        .and_then(|m| m.get("project_id"))
718        .and_then(Value::as_str)
719        .map(str::to_owned)
720        .ok_or_else(|| anyhow!("missing meta.project_id in project doc"))
721}
722
/// Hydrate one task's JSON object (from the doc's deep value) into a
/// strongly-typed `Task`. Errors on missing/mistyped required fields or
/// invalid enum strings.
fn hydrate_task(task_id_raw: &str, value: &Value) -> Result<Task> {
    let obj = value
        .as_object()
        .ok_or_else(|| anyhow!("task '{task_id_raw}' is not an object"))?;

    let id = TaskId::parse(task_id_raw)?;

    let title = get_required_string(obj, "title")?;
    let description = get_required_string(obj, "description")?;
    let task_type = get_required_string(obj, "type")?;
    let status = Status::parse(&get_required_string(obj, "status")?)?;
    let priority = Priority::parse(&get_required_string(obj, "priority")?)?;
    let effort = Effort::parse(&get_required_string(obj, "effort")?)?;
    // Empty string doubles as "no parent" — presumably written that way so
    // clearing a parent is a plain overwrite in the CRDT; verify in writers.
    let parent = match obj.get("parent").and_then(Value::as_str) {
        Some("") | None => None,
        Some(raw) => Some(TaskId::parse(raw)?),
    };

    let created_at = get_required_string(obj, "created_at")?;
    let updated_at = get_required_string(obj, "updated_at")?;
    // Same empty-string-means-unset convention for the tombstone field.
    let deleted_at = obj
        .get("deleted_at")
        .and_then(Value::as_str)
        .map(str::to_owned)
        .filter(|s| !s.is_empty());

    // Labels are stored as a map and only the keys matter here; the values
    // are ignored (membership-set encoding).
    let labels = obj
        .get("labels")
        .and_then(Value::as_object)
        .map(|m| m.keys().cloned().collect())
        .unwrap_or_default();

    // Blockers use the same map-as-set encoding, keyed by blocker ULID.
    let blockers = obj
        .get("blockers")
        .and_then(Value::as_object)
        .map(|m| {
            m.keys()
                .map(|raw| TaskId::parse(raw))
                .collect::<Result<Vec<_>>>()
        })
        .transpose()?
        .unwrap_or_default();

    // Logs are keyed by their own ULID with a payload object per entry.
    let mut logs = obj
        .get("logs")
        .and_then(Value::as_object)
        .map(|logs| {
            logs.iter()
                .map(|(log_id_raw, payload)| {
                    let payload_obj = payload.as_object().ok_or_else(|| {
                        anyhow!("log '{log_id_raw}' on task '{task_id_raw}' is not an object")
                    })?;
                    Ok(LogEntry {
                        id: TaskId::parse(log_id_raw)?,
                        timestamp: get_required_string(payload_obj, "timestamp")?,
                        message: get_required_string(payload_obj, "message")?,
                    })
                })
                .collect::<Result<Vec<_>>>()
        })
        .transpose()?
        .unwrap_or_default();

    // ULID order approximates creation order for log entries.
    logs.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));

    Ok(Task {
        id,
        title,
        description,
        task_type,
        priority,
        status,
        effort,
        parent,
        created_at,
        updated_at,
        deleted_at,
        labels,
        blockers,
        logs,
    })
}
805
806fn get_required_string(map: &serde_json::Map<String, Value>, key: &str) -> Result<String> {
807    map.get(key)
808        .and_then(Value::as_str)
809        .map(str::to_owned)
810        .ok_or_else(|| anyhow!("missing or non-string key '{key}'"))
811}
812
813fn collect_delta_paths(project_dir: &Path) -> Result<Vec<PathBuf>> {
814    let mut paths = Vec::new();
815    collect_changes_from_dir(&project_dir.join(CHANGES_DIR), &mut paths)?;
816
817    for entry in fs::read_dir(project_dir)? {
818        let path = entry?.path();
819        if !path.is_dir() {
820            continue;
821        }
822        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
823            continue;
824        };
825        if name.starts_with("changes.compacting.") {
826            collect_changes_from_dir(&path, &mut paths)?;
827        }
828    }
829
830    Ok(paths)
831}
832
833fn collect_changes_from_dir(dir: &Path, out: &mut Vec<PathBuf>) -> Result<()> {
834    if !dir.exists() {
835        return Ok(());
836    }
837
838    for entry in fs::read_dir(dir)? {
839        let path = entry?.path();
840        if !path.is_file() {
841            continue;
842        }
843
844        let Some(filename) = path.file_name().and_then(|n| n.to_str()) else {
845            continue;
846        };
847        if filename.ends_with(TMP_SUFFIX) || !filename.ends_with(".loro") {
848            continue;
849        }
850
851        let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
852            continue;
853        };
854        if Ulid::from_string(stem).is_err() {
855            continue;
856        }
857
858        out.push(path);
859    }
860
861    Ok(())
862}
863
864fn project_dir(root: &Path, project: &str) -> PathBuf {
865    root.join(PROJECTS_DIR).join(project)
866}
867
868fn load_or_create_device_peer_id(root: &Path) -> Result<PeerID> {
869    let path = root.join("device_id");
870    if let Some(parent) = path.parent() {
871        fs::create_dir_all(parent)?;
872    }
873
874    let device_ulid = if path.exists() {
875        let content = fs::read_to_string(&path)
876            .with_context(|| format!("failed reading device id from '{}'", path.display()))?;
877        Ulid::from_string(content.trim()).context("invalid persisted device id ULID")?
878    } else {
879        let id = Ulid::new();
880        atomic_write_file(&path, id.to_string().as_bytes())?;
881        id
882    };
883
884    let raw: u128 = device_ulid.into();
885    Ok((raw & u64::MAX as u128) as u64)
886}
887
888fn atomic_write_file(path: &Path, bytes: &[u8]) -> Result<()> {
889    let parent = path
890        .parent()
891        .ok_or_else(|| anyhow!("cannot atomically write root path '{}'", path.display()))?;
892    fs::create_dir_all(parent)?;
893
894    let tmp_name = format!(
895        "{}.{}{}",
896        path.file_name().and_then(|n| n.to_str()).unwrap_or("write"),
897        Ulid::new(),
898        TMP_SUFFIX
899    );
900    let tmp_path = parent.join(tmp_name);
901
902    {
903        let mut file = OpenOptions::new()
904            .create_new(true)
905            .write(true)
906            .open(&tmp_path)
907            .with_context(|| format!("failed to open temp file '{}'", tmp_path.display()))?;
908        file.write_all(bytes)?;
909        file.sync_all()?;
910    }
911
912    fs::rename(&tmp_path, path).with_context(|| {
913        format!(
914            "failed to atomically rename '{}' to '{}'",
915            tmp_path.display(),
916            path.display()
917        )
918    })?;
919
920    sync_dir(parent)?;
921    Ok(())
922}
923
924fn sync_dir(path: &Path) -> Result<()> {
925    let dir =
926        File::open(path).with_context(|| format!("failed opening dir '{}'", path.display()))?;
927    dir.sync_all()
928        .with_context(|| format!("failed fsync on dir '{}'", path.display()))?;
929    Ok(())
930}