// db.rs

  1use anyhow::{anyhow, bail, Context, Result};
  2use loro::{ExportMode, LoroDoc, PeerID};
  3use serde::Serialize;
  4use serde_json::Value;
  5use std::fmt;
  6use std::fs::{self, File, OpenOptions};
  7use std::io::Write;
  8use std::path::{Path, PathBuf};
  9use ulid::Ulid;
 10
// Name of the per-repository metadata directory created at the project root.
const TD_DIR: &str = ".td";
// Subdirectory of `.td/` holding one directory per project.
const PROJECTS_DIR: &str = "projects";
// Per-project subdirectory holding incremental Loro delta files.
const CHANGES_DIR: &str = "changes";
// Filename of the full Loro snapshot a project is loaded from.
const BASE_FILE: &str = "base.loro";
// Suffix for in-progress temp files; such files are skipped when collecting deltas.
const TMP_SUFFIX: &str = ".tmp";
// Version stamp written into the document's `meta` map on init.
const SCHEMA_VERSION: u32 = 1;
 17
 18/// Current UTC time in ISO 8601 format.
 19pub fn now_utc() -> String {
 20    chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string()
 21}
 22
/// Lifecycle state for a task.
///
/// Stored and serialized as the snake_case labels `"open"`,
/// `"in_progress"`, and `"closed"` (see `Status::as_str`/`Status::parse`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Status {
    Open,
    InProgress,
    Closed,
}
 31
 32impl Status {
 33    fn as_str(self) -> &'static str {
 34        match self {
 35            Status::Open => "open",
 36            Status::InProgress => "in_progress",
 37            Status::Closed => "closed",
 38        }
 39    }
 40
 41    fn parse(raw: &str) -> Result<Self> {
 42        match raw {
 43            "open" => Ok(Self::Open),
 44            "in_progress" => Ok(Self::InProgress),
 45            "closed" => Ok(Self::Closed),
 46            _ => bail!("invalid status '{raw}'"),
 47        }
 48    }
 49}
 50
/// Priority for task ordering.
///
/// Stored and serialized as the snake_case labels `"high"`, `"medium"`,
/// and `"low"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Priority {
    High,
    Medium,
    Low,
}
 59
 60impl Priority {
 61    fn as_str(self) -> &'static str {
 62        match self {
 63            Priority::High => "high",
 64            Priority::Medium => "medium",
 65            Priority::Low => "low",
 66        }
 67    }
 68
 69    fn parse(raw: &str) -> Result<Self> {
 70        match raw {
 71            "high" => Ok(Self::High),
 72            "medium" => Ok(Self::Medium),
 73            "low" => Ok(Self::Low),
 74            _ => bail!("invalid priority '{raw}'"),
 75        }
 76    }
 77}
 78
/// Estimated effort for a task.
///
/// Stored and serialized as the snake_case labels `"low"`, `"medium"`,
/// and `"high"`. Variant order differs from [`Priority`] (ascending here).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Effort {
    Low,
    Medium,
    High,
}
 87
 88impl Effort {
 89    fn as_str(self) -> &'static str {
 90        match self {
 91            Effort::Low => "low",
 92            Effort::Medium => "medium",
 93            Effort::High => "high",
 94        }
 95    }
 96
 97    fn parse(raw: &str) -> Result<Self> {
 98        match raw {
 99            "low" => Ok(Self::Low),
100            "medium" => Ok(Self::Medium),
101            "high" => Ok(Self::High),
102            _ => bail!("invalid effort '{raw}'"),
103        }
104    }
105}
106
/// A stable task identifier backed by a ULID.
///
/// Serialized transparently as its ULID string; the inner string is only
/// ever produced from a validated [`Ulid`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
#[serde(transparent)]
pub struct TaskId(String);
111
112impl TaskId {
113    pub fn new(id: Ulid) -> Self {
114        Self(id.to_string())
115    }
116
117    pub fn parse(raw: &str) -> Result<Self> {
118        let id = Ulid::from_string(raw).with_context(|| format!("invalid task id '{raw}'"))?;
119        Ok(Self::new(id))
120    }
121
122    pub fn as_str(&self) -> &str {
123        &self.0
124    }
125
126    pub fn short(&self) -> String {
127        self.0.chars().take(7).collect()
128    }
129}
130
131impl fmt::Display for TaskId {
132    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133        write!(f, "{}", self.short())
134    }
135}
136
/// A task log entry embedded in a task record.
#[derive(Debug, Clone, Serialize)]
pub struct LogEntry {
    /// ULID of the log entry itself (also its key in the task's `logs` map).
    pub id: TaskId,
    /// Creation time of the entry as stored (string timestamp).
    pub timestamp: String,
    /// Free-form log message.
    pub message: String,
}
144
/// Hydrated task data from the CRDT document.
///
/// A plain read-model built by `hydrate_task`; mutations go through the
/// Loro document, not through this struct.
#[derive(Debug, Clone, Serialize)]
pub struct Task {
    pub id: TaskId,
    pub title: String,
    pub description: String,
    /// Free-form task category string; serialized under the key `type`.
    #[serde(rename = "type")]
    pub task_type: String,
    pub priority: Priority,
    pub status: Status,
    pub effort: Effort,
    /// Optional parent task; `None` when the stored value is absent or "".
    pub parent: Option<TaskId>,
    pub created_at: String,
    pub updated_at: String,
    /// Tombstone timestamp; `Some` means the task is soft-deleted.
    pub deleted_at: Option<String>,
    /// Labels, taken from the keys of the task's `labels` map.
    pub labels: Vec<String>,
    /// Ids of blocking tasks, taken from the keys of the `blockers` map.
    pub blockers: Vec<TaskId>,
    /// Log entries, sorted by their ULID string.
    pub logs: Vec<LogEntry>,
}
164
/// Result type for partitioning blockers by task state.
///
/// NOTE(review): no call sites are visible in this file — presumably
/// `open` holds blockers whose tasks are still outstanding and `resolved`
/// those whose tasks are closed/deleted; confirm against the caller.
#[derive(Debug, Default, Clone, Serialize)]
pub struct BlockerPartition {
    pub open: Vec<TaskId>,
    pub resolved: Vec<TaskId>,
}
171
/// Storage wrapper around one project's Loro document and disk layout.
///
/// `root` is the directory containing `.td/`; `project` is derived from
/// the root's basename. `doc` is the in-memory CRDT state — Loro exposes
/// mutation through shared references, which is why `&self` methods on
/// this type can still apply edits.
#[derive(Debug, Clone)]
pub struct Store {
    root: PathBuf,
    project: String,
    doc: LoroDoc,
}
179
impl Store {
    /// Create a new store rooted at the current project path.
    ///
    /// Lays out `.td/projects/<project>/changes/`, seeds a fresh Loro
    /// document (empty `tasks` map; `meta` with schema version, a new
    /// project ULID, and creation time), and writes it as the base
    /// snapshot. NOTE(review): an existing `base.loro` is overwritten
    /// here — confirm callers guard against re-initializing a project.
    pub fn init(root: &Path) -> Result<Self> {
        let project = project_name(root)?;
        let project_dir = project_dir(root, &project);
        fs::create_dir_all(project_dir.join(CHANGES_DIR))?;

        let doc = LoroDoc::new();
        // One stable peer id per device keeps CRDT authorship consistent
        // across sessions.
        let peer_id = load_or_create_device_peer_id()?;
        doc.set_peer_id(peer_id)?;

        // Materialize the root containers up front; `tasks` starts empty.
        doc.get_map("tasks");
        let meta = doc.get_map("meta");
        meta.insert("schema_version", SCHEMA_VERSION as i64)?;
        meta.insert("project_id", Ulid::new().to_string())?;
        meta.insert("created_at", now_utc())?;

        let snapshot = doc
            .export(ExportMode::Snapshot)
            .context("failed to export initial loro snapshot")?;
        atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;

        Ok(Self {
            root: root.to_path_buf(),
            project,
            doc,
        })
    }

    /// Open an existing store and replay deltas.
    ///
    /// Loads the base snapshot, then imports every pending delta file on
    /// top of it, oldest first.
    pub fn open(root: &Path) -> Result<Self> {
        let project = project_name(root)?;
        let project_dir = project_dir(root, &project);
        let base_path = project_dir.join(BASE_FILE);

        if !base_path.exists() {
            bail!("not initialized. Run 'td init'");
        }

        let base = fs::read(&base_path)
            .with_context(|| format!("failed to read loro snapshot '{}'", base_path.display()))?;

        let doc = LoroDoc::from_snapshot(&base).context("failed to load loro snapshot")?;
        doc.set_peer_id(load_or_create_device_peer_id()?)?;

        // Delta filenames are ULIDs, so sorting by the parsed ULID is
        // chronological. `collect_delta_paths` already dropped files whose
        // stems don't parse, so the `None` sort key never occurs here.
        let mut deltas = collect_delta_paths(&project_dir)?;
        deltas.sort_by_key(|path| {
            path.file_stem()
                .and_then(|s| s.to_str())
                .and_then(|s| Ulid::from_string(s).ok())
        });

        for delta_path in deltas {
            let bytes = fs::read(&delta_path)
                .with_context(|| format!("failed to read loro delta '{}'", delta_path.display()))?;
            doc.import(&bytes).with_context(|| {
                format!("failed to import loro delta '{}'", delta_path.display())
            })?;
        }

        Ok(Self {
            root: root.to_path_buf(),
            project,
            doc,
        })
    }

    /// Project root directory (the one containing `.td/`).
    pub fn root(&self) -> &Path {
        &self.root
    }

    /// Project name derived from the root directory's basename.
    pub fn project_name(&self) -> &str {
        &self.project
    }

    /// Direct access to the underlying Loro document.
    pub fn doc(&self) -> &LoroDoc {
        &self.doc
    }

    /// Export all current state to a fresh base snapshot.
    ///
    /// Returns the snapshot path. Existing delta files are not removed
    /// here; compaction is handled elsewhere.
    pub fn write_snapshot(&self) -> Result<PathBuf> {
        let out = project_dir(&self.root, &self.project).join(BASE_FILE);
        let bytes = self
            .doc
            .export(ExportMode::Snapshot)
            .context("failed to export loro snapshot")?;
        atomic_write_file(&out, &bytes)?;
        Ok(out)
    }

    /// Apply a local mutation and persist only the resulting delta.
    ///
    /// Captures the oplog version vector before `mutator` runs, commits,
    /// exports just the updates since that version, and writes them to a
    /// new ULID-named file under `changes/`. Returns the delta path.
    /// NOTE(review): if `mutator` errors after partially editing the doc,
    /// those in-memory edits remain unpersisted — confirm callers treat
    /// that as fatal for the process.
    pub fn apply_and_persist<F>(&self, mutator: F) -> Result<PathBuf>
    where
        F: FnOnce(&LoroDoc) -> Result<()>,
    {
        let before = self.doc.oplog_vv();
        mutator(&self.doc)?;
        self.doc.commit();

        let delta = self
            .doc
            .export(ExportMode::updates(&before))
            .context("failed to export loro update delta")?;

        // ULID filename doubles as the replay ordering key in `open`.
        let filename = format!("{}.loro", Ulid::new());
        let path = project_dir(&self.root, &self.project)
            .join(CHANGES_DIR)
            .join(filename);
        atomic_write_file(&path, &delta)?;
        Ok(path)
    }

    /// Return hydrated tasks, excluding tombstones.
    pub fn list_tasks(&self) -> Result<Vec<Task>> {
        self.list_tasks_inner(false)
    }

    /// Return hydrated tasks, including tombstoned rows.
    pub fn list_tasks_unfiltered(&self) -> Result<Vec<Task>> {
        self.list_tasks_inner(true)
    }

    /// Find a task by exact ULID string.
    ///
    /// Returns `Ok(None)` when no task matches. Implemented as a scan over
    /// the full hydrated list.
    pub fn get_task(&self, id: &TaskId, include_deleted: bool) -> Result<Option<Task>> {
        let tasks = if include_deleted {
            self.list_tasks_unfiltered()?
        } else {
            self.list_tasks()?
        };
        Ok(tasks.into_iter().find(|task| task.id == *id))
    }

    // Shared implementation for the two list methods: hydrate every entry
    // of the root `tasks` map, optionally drop tombstones, sort by id.
    fn list_tasks_inner(&self, include_deleted: bool) -> Result<Vec<Task>> {
        // Round-trip through serde_json::Value to get a uniform JSON view
        // of the CRDT state.
        let root = serde_json::to_value(self.doc.get_deep_value())?;
        let tasks_obj = root
            .get("tasks")
            .and_then(Value::as_object)
            .ok_or_else(|| anyhow!("missing root tasks map"))?;

        let mut tasks = Vec::with_capacity(tasks_obj.len());
        for (task_id_raw, task_json) in tasks_obj {
            let task = hydrate_task(task_id_raw, task_json)?;
            if include_deleted || task.deleted_at.is_none() {
                tasks.push(task);
            }
        }

        // ULIDs sort lexicographically by creation time.
        tasks.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));
        Ok(tasks)
    }

    /// Return current schema version from root meta map.
    ///
    /// Errors if `meta` or `meta.schema_version` is missing or non-numeric.
    pub fn schema_version(&self) -> Result<u32> {
        let root = serde_json::to_value(self.doc.get_deep_value())?;
        let meta = root
            .get("meta")
            .and_then(Value::as_object)
            .ok_or_else(|| anyhow!("missing root meta map"))?;
        let n = meta
            .get("schema_version")
            .and_then(Value::as_u64)
            .ok_or_else(|| anyhow!("invalid or missing meta.schema_version"))?;
        Ok(n as u32)
    }
}
345
346/// Generate a new task ULID.
347pub fn gen_id() -> TaskId {
348    TaskId::new(Ulid::new())
349}
350
/// Parse a priority string value.
///
/// Accepts the storage labels `"high"`, `"medium"`, `"low"`; anything
/// else errors. Thin public wrapper over the private [`Priority::parse`].
pub fn parse_priority(s: &str) -> Result<Priority> {
    Priority::parse(s)
}
355
/// Parse an effort string value.
///
/// Accepts the storage labels `"low"`, `"medium"`, `"high"`; anything
/// else errors. Thin public wrapper over the private [`Effort::parse`].
pub fn parse_effort(s: &str) -> Result<Effort> {
    Effort::parse(s)
}
360
/// Convert a priority value to its storage label.
///
/// Public face of the private [`Priority::as_str`].
pub fn priority_label(p: Priority) -> &'static str {
    p.as_str()
}
365
/// Convert an effort value to its storage label.
///
/// Public face of the private [`Effort::as_str`].
pub fn effort_label(e: Effort) -> &'static str {
    e.as_str()
}
370
371/// Walk up from `start` looking for a `.td/` directory.
372pub fn find_root(start: &Path) -> Result<PathBuf> {
373    let mut dir = start.to_path_buf();
374    loop {
375        if dir.join(TD_DIR).is_dir() {
376            return Ok(dir);
377        }
378        if !dir.pop() {
379            bail!("not initialized. Run 'td init'");
380        }
381    }
382}
383
384/// Return the path to the `.td/` directory under `root`.
385pub fn td_dir(root: &Path) -> PathBuf {
386    root.join(TD_DIR)
387}
388
389/// Initialize on-disk project storage and return the opened store.
390pub fn init(root: &Path) -> Result<Store> {
391    fs::create_dir_all(td_dir(root))?;
392    Store::init(root)
393}
394
/// Open an existing project's storage.
///
/// Thin convenience wrapper over [`Store::open`]; errors if the project
/// was never initialized.
pub fn open(root: &Path) -> Result<Store> {
    Store::open(root)
}
399
400fn hydrate_task(task_id_raw: &str, value: &Value) -> Result<Task> {
401    let obj = value
402        .as_object()
403        .ok_or_else(|| anyhow!("task '{task_id_raw}' is not an object"))?;
404
405    let id = TaskId::parse(task_id_raw)?;
406
407    let title = get_required_string(obj, "title")?;
408    let description = get_required_string(obj, "description")?;
409    let task_type = get_required_string(obj, "type")?;
410    let status = Status::parse(&get_required_string(obj, "status")?)?;
411    let priority = Priority::parse(&get_required_string(obj, "priority")?)?;
412    let effort = Effort::parse(&get_required_string(obj, "effort")?)?;
413    let parent = match obj.get("parent").and_then(Value::as_str) {
414        Some("") | None => None,
415        Some(raw) => Some(TaskId::parse(raw)?),
416    };
417
418    let created_at = get_required_string(obj, "created_at")?;
419    let updated_at = get_required_string(obj, "updated_at")?;
420    let deleted_at = obj
421        .get("deleted_at")
422        .and_then(Value::as_str)
423        .map(str::to_owned)
424        .filter(|s| !s.is_empty());
425
426    let labels = obj
427        .get("labels")
428        .and_then(Value::as_object)
429        .map(|m| m.keys().cloned().collect())
430        .unwrap_or_else(Vec::new);
431
432    let blockers = obj
433        .get("blockers")
434        .and_then(Value::as_object)
435        .map(|m| {
436            m.keys()
437                .map(|raw| TaskId::parse(raw))
438                .collect::<Result<Vec<_>>>()
439        })
440        .transpose()?
441        .unwrap_or_else(Vec::new);
442
443    let mut logs = obj
444        .get("logs")
445        .and_then(Value::as_object)
446        .map(|logs| {
447            logs.iter()
448                .map(|(log_id_raw, payload)| {
449                    let payload_obj = payload.as_object().ok_or_else(|| {
450                        anyhow!("log '{log_id_raw}' on task '{task_id_raw}' is not an object")
451                    })?;
452                    Ok(LogEntry {
453                        id: TaskId::parse(log_id_raw)?,
454                        timestamp: get_required_string(payload_obj, "timestamp")?,
455                        message: get_required_string(payload_obj, "message")?,
456                    })
457                })
458                .collect::<Result<Vec<_>>>()
459        })
460        .transpose()?
461        .unwrap_or_else(Vec::new);
462
463    logs.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));
464
465    Ok(Task {
466        id,
467        title,
468        description,
469        task_type,
470        priority,
471        status,
472        effort,
473        parent,
474        created_at,
475        updated_at,
476        deleted_at,
477        labels,
478        blockers,
479        logs,
480    })
481}
482
483fn get_required_string(map: &serde_json::Map<String, Value>, key: &str) -> Result<String> {
484    map.get(key)
485        .and_then(Value::as_str)
486        .map(str::to_owned)
487        .ok_or_else(|| anyhow!("missing or non-string key '{key}'"))
488}
489
490fn collect_delta_paths(project_dir: &Path) -> Result<Vec<PathBuf>> {
491    let mut paths = Vec::new();
492
493    collect_changes_from_dir(&project_dir.join(CHANGES_DIR), &mut paths)?;
494
495    for entry in fs::read_dir(project_dir)? {
496        let entry = entry?;
497        let path = entry.path();
498        if !path.is_dir() {
499            continue;
500        }
501        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
502            continue;
503        };
504        if name.starts_with("changes.compacting.") {
505            collect_changes_from_dir(&path, &mut paths)?;
506        }
507    }
508
509    Ok(paths)
510}
511
512fn collect_changes_from_dir(dir: &Path, out: &mut Vec<PathBuf>) -> Result<()> {
513    if !dir.exists() {
514        return Ok(());
515    }
516
517    for entry in fs::read_dir(dir)? {
518        let entry = entry?;
519        let path = entry.path();
520        if !path.is_file() {
521            continue;
522        }
523
524        let Some(filename) = path.file_name().and_then(|n| n.to_str()) else {
525            continue;
526        };
527        if filename.ends_with(TMP_SUFFIX) {
528            continue;
529        }
530        if !filename.ends_with(".loro") {
531            continue;
532        }
533
534        let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
535            continue;
536        };
537        if Ulid::from_string(stem).is_err() {
538            continue;
539        }
540
541        out.push(path);
542    }
543
544    Ok(())
545}
546
547fn project_name(root: &Path) -> Result<String> {
548    root.file_name()
549        .and_then(|n| n.to_str())
550        .map(str::to_owned)
551        .ok_or_else(|| {
552            anyhow!(
553                "could not infer project name from path '{}'",
554                root.display()
555            )
556        })
557}
558
559fn project_dir(root: &Path, project: &str) -> PathBuf {
560    td_dir(root).join(PROJECTS_DIR).join(project)
561}
562
563fn load_or_create_device_peer_id() -> Result<PeerID> {
564    let home = std::env::var("HOME").context("HOME is not set")?;
565    let path = PathBuf::from(home)
566        .join(".local")
567        .join("share")
568        .join("td")
569        .join("device_id");
570
571    if let Some(parent) = path.parent() {
572        fs::create_dir_all(parent)?;
573    }
574
575    let device_ulid = if path.exists() {
576        let content = fs::read_to_string(&path)
577            .with_context(|| format!("failed reading device id from '{}'", path.display()))?;
578        Ulid::from_string(content.trim()).context("invalid persisted device id ULID")?
579    } else {
580        let id = Ulid::new();
581        atomic_write_file(&path, id.to_string().as_bytes())?;
582        id
583    };
584
585    Ok((device_ulid.to_u128() & u64::MAX as u128) as u64)
586}
587
588fn atomic_write_file(path: &Path, bytes: &[u8]) -> Result<()> {
589    let parent = path
590        .parent()
591        .ok_or_else(|| anyhow!("cannot atomically write root path '{}'", path.display()))?;
592    fs::create_dir_all(parent)?;
593
594    let tmp_name = format!(
595        "{}.{}{}",
596        path.file_name().and_then(|n| n.to_str()).unwrap_or("write"),
597        Ulid::new(),
598        TMP_SUFFIX
599    );
600    let tmp_path = parent.join(tmp_name);
601
602    {
603        let mut file = OpenOptions::new()
604            .create_new(true)
605            .write(true)
606            .open(&tmp_path)
607            .with_context(|| format!("failed to open temp file '{}'", tmp_path.display()))?;
608        file.write_all(bytes)?;
609        file.sync_all()?;
610    }
611
612    fs::rename(&tmp_path, path).with_context(|| {
613        format!(
614            "failed to atomically rename '{}' to '{}'",
615            tmp_path.display(),
616            path.display()
617        )
618    })?;
619
620    sync_dir(parent)?;
621    Ok(())
622}
623
624fn sync_dir(path: &Path) -> Result<()> {
625    let dir =
626        File::open(path).with_context(|| format!("failed opening dir '{}'", path.display()))?;
627    dir.sync_all()
628        .with_context(|| format!("failed fsync on dir '{}'", path.display()))?;
629    Ok(())
630}