1use anyhow::{anyhow, bail, Context, Result};
2use loro::{ExportMode, LoroDoc, PeerID};
3use serde::Serialize;
4use serde_json::Value;
5use std::fmt;
6use std::fs::{self, File, OpenOptions};
7use std::io::Write;
8use std::path::{Path, PathBuf};
9use ulid::Ulid;
10
11const TD_DIR: &str = ".td";
12const PROJECTS_DIR: &str = "projects";
13const CHANGES_DIR: &str = "changes";
14const BASE_FILE: &str = "base.loro";
15const TMP_SUFFIX: &str = ".tmp";
16const SCHEMA_VERSION: u32 = 1;
17
18/// Current UTC time in ISO 8601 format.
19pub fn now_utc() -> String {
20 chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string()
21}
22
23/// Lifecycle state for a task.
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
25#[serde(rename_all = "snake_case")]
26pub enum Status {
27 Open,
28 InProgress,
29 Closed,
30}
31
32impl Status {
33 fn as_str(self) -> &'static str {
34 match self {
35 Status::Open => "open",
36 Status::InProgress => "in_progress",
37 Status::Closed => "closed",
38 }
39 }
40
41 fn parse(raw: &str) -> Result<Self> {
42 match raw {
43 "open" => Ok(Self::Open),
44 "in_progress" => Ok(Self::InProgress),
45 "closed" => Ok(Self::Closed),
46 _ => bail!("invalid status '{raw}'"),
47 }
48 }
49}
50
51/// Priority for task ordering.
52#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
53#[serde(rename_all = "snake_case")]
54pub enum Priority {
55 High,
56 Medium,
57 Low,
58}
59
60impl Priority {
61 fn as_str(self) -> &'static str {
62 match self {
63 Priority::High => "high",
64 Priority::Medium => "medium",
65 Priority::Low => "low",
66 }
67 }
68
69 fn parse(raw: &str) -> Result<Self> {
70 match raw {
71 "high" => Ok(Self::High),
72 "medium" => Ok(Self::Medium),
73 "low" => Ok(Self::Low),
74 _ => bail!("invalid priority '{raw}'"),
75 }
76 }
77}
78
79/// Estimated effort for a task.
80#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
81#[serde(rename_all = "snake_case")]
82pub enum Effort {
83 Low,
84 Medium,
85 High,
86}
87
88impl Effort {
89 fn as_str(self) -> &'static str {
90 match self {
91 Effort::Low => "low",
92 Effort::Medium => "medium",
93 Effort::High => "high",
94 }
95 }
96
97 fn parse(raw: &str) -> Result<Self> {
98 match raw {
99 "low" => Ok(Self::Low),
100 "medium" => Ok(Self::Medium),
101 "high" => Ok(Self::High),
102 _ => bail!("invalid effort '{raw}'"),
103 }
104 }
105}
106
107/// A stable task identifier backed by a ULID.
108#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
109#[serde(transparent)]
110pub struct TaskId(String);
111
112impl TaskId {
113 pub fn new(id: Ulid) -> Self {
114 Self(id.to_string())
115 }
116
117 pub fn parse(raw: &str) -> Result<Self> {
118 let id = Ulid::from_string(raw).with_context(|| format!("invalid task id '{raw}'"))?;
119 Ok(Self::new(id))
120 }
121
122 pub fn as_str(&self) -> &str {
123 &self.0
124 }
125
126 pub fn short(&self) -> String {
127 self.0.chars().take(7).collect()
128 }
129}
130
131impl fmt::Display for TaskId {
132 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133 write!(f, "{}", self.short())
134 }
135}
136
137/// A task log entry embedded in a task record.
138#[derive(Debug, Clone, Serialize)]
139pub struct LogEntry {
140 pub id: TaskId,
141 pub timestamp: String,
142 pub message: String,
143}
144
145/// Hydrated task data from the CRDT document.
146#[derive(Debug, Clone, Serialize)]
147pub struct Task {
148 pub id: TaskId,
149 pub title: String,
150 pub description: String,
151 #[serde(rename = "type")]
152 pub task_type: String,
153 pub priority: Priority,
154 pub status: Status,
155 pub effort: Effort,
156 pub parent: Option<TaskId>,
157 pub created_at: String,
158 pub updated_at: String,
159 pub deleted_at: Option<String>,
160 pub labels: Vec<String>,
161 pub blockers: Vec<TaskId>,
162 pub logs: Vec<LogEntry>,
163}
164
165/// Result type for partitioning blockers by task state.
166#[derive(Debug, Default, Clone, Serialize)]
167pub struct BlockerPartition {
168 pub open: Vec<TaskId>,
169 pub resolved: Vec<TaskId>,
170}
171
172/// Storage wrapper around one project's Loro document and disk layout.
173#[derive(Debug, Clone)]
174pub struct Store {
175 root: PathBuf,
176 project: String,
177 doc: LoroDoc,
178}
179
180impl Store {
181 /// Create a new store rooted at the current project path.
182 pub fn init(root: &Path) -> Result<Self> {
183 let project = project_name(root)?;
184 let project_dir = project_dir(root, &project);
185 fs::create_dir_all(project_dir.join(CHANGES_DIR))?;
186
187 let doc = LoroDoc::new();
188 let peer_id = load_or_create_device_peer_id()?;
189 doc.set_peer_id(peer_id)?;
190
191 doc.get_map("tasks");
192 let meta = doc.get_map("meta");
193 meta.insert("schema_version", SCHEMA_VERSION as i64)?;
194 meta.insert("project_id", Ulid::new().to_string())?;
195 meta.insert("created_at", now_utc())?;
196
197 let snapshot = doc
198 .export(ExportMode::Snapshot)
199 .context("failed to export initial loro snapshot")?;
200 atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;
201
202 Ok(Self {
203 root: root.to_path_buf(),
204 project,
205 doc,
206 })
207 }
208
209 /// Open an existing store and replay deltas.
210 pub fn open(root: &Path) -> Result<Self> {
211 let project = project_name(root)?;
212 let project_dir = project_dir(root, &project);
213 let base_path = project_dir.join(BASE_FILE);
214
215 if !base_path.exists() {
216 bail!("not initialized. Run 'td init'");
217 }
218
219 let base = fs::read(&base_path)
220 .with_context(|| format!("failed to read loro snapshot '{}'", base_path.display()))?;
221
222 let doc = LoroDoc::from_snapshot(&base).context("failed to load loro snapshot")?;
223 doc.set_peer_id(load_or_create_device_peer_id()?)?;
224
225 let mut deltas = collect_delta_paths(&project_dir)?;
226 deltas.sort_by_key(|path| {
227 path.file_stem()
228 .and_then(|s| s.to_str())
229 .and_then(|s| Ulid::from_string(s).ok())
230 });
231
232 for delta_path in deltas {
233 let bytes = fs::read(&delta_path)
234 .with_context(|| format!("failed to read loro delta '{}'", delta_path.display()))?;
235 doc.import(&bytes).with_context(|| {
236 format!("failed to import loro delta '{}'", delta_path.display())
237 })?;
238 }
239
240 Ok(Self {
241 root: root.to_path_buf(),
242 project,
243 doc,
244 })
245 }
246
247 pub fn root(&self) -> &Path {
248 &self.root
249 }
250
251 pub fn project_name(&self) -> &str {
252 &self.project
253 }
254
255 pub fn doc(&self) -> &LoroDoc {
256 &self.doc
257 }
258
259 /// Export all current state to a fresh base snapshot.
260 pub fn write_snapshot(&self) -> Result<PathBuf> {
261 let out = project_dir(&self.root, &self.project).join(BASE_FILE);
262 let bytes = self
263 .doc
264 .export(ExportMode::Snapshot)
265 .context("failed to export loro snapshot")?;
266 atomic_write_file(&out, &bytes)?;
267 Ok(out)
268 }
269
270 /// Apply a local mutation and persist only the resulting delta.
271 pub fn apply_and_persist<F>(&self, mutator: F) -> Result<PathBuf>
272 where
273 F: FnOnce(&LoroDoc) -> Result<()>,
274 {
275 let before = self.doc.oplog_vv();
276 mutator(&self.doc)?;
277 self.doc.commit();
278
279 let delta = self
280 .doc
281 .export(ExportMode::updates(&before))
282 .context("failed to export loro update delta")?;
283
284 let filename = format!("{}.loro", Ulid::new());
285 let path = project_dir(&self.root, &self.project)
286 .join(CHANGES_DIR)
287 .join(filename);
288 atomic_write_file(&path, &delta)?;
289 Ok(path)
290 }
291
292 /// Return hydrated tasks, excluding tombstones.
293 pub fn list_tasks(&self) -> Result<Vec<Task>> {
294 self.list_tasks_inner(false)
295 }
296
297 /// Return hydrated tasks, including tombstoned rows.
298 pub fn list_tasks_unfiltered(&self) -> Result<Vec<Task>> {
299 self.list_tasks_inner(true)
300 }
301
302 /// Find a task by exact ULID string.
303 pub fn get_task(&self, id: &TaskId, include_deleted: bool) -> Result<Option<Task>> {
304 let tasks = if include_deleted {
305 self.list_tasks_unfiltered()?
306 } else {
307 self.list_tasks()?
308 };
309 Ok(tasks.into_iter().find(|task| task.id == *id))
310 }
311
312 fn list_tasks_inner(&self, include_deleted: bool) -> Result<Vec<Task>> {
313 let root = serde_json::to_value(self.doc.get_deep_value())?;
314 let tasks_obj = root
315 .get("tasks")
316 .and_then(Value::as_object)
317 .ok_or_else(|| anyhow!("missing root tasks map"))?;
318
319 let mut tasks = Vec::with_capacity(tasks_obj.len());
320 for (task_id_raw, task_json) in tasks_obj {
321 let task = hydrate_task(task_id_raw, task_json)?;
322 if include_deleted || task.deleted_at.is_none() {
323 tasks.push(task);
324 }
325 }
326
327 tasks.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));
328 Ok(tasks)
329 }
330
331 /// Return current schema version from root meta map.
332 pub fn schema_version(&self) -> Result<u32> {
333 let root = serde_json::to_value(self.doc.get_deep_value())?;
334 let meta = root
335 .get("meta")
336 .and_then(Value::as_object)
337 .ok_or_else(|| anyhow!("missing root meta map"))?;
338 let n = meta
339 .get("schema_version")
340 .and_then(Value::as_u64)
341 .ok_or_else(|| anyhow!("invalid or missing meta.schema_version"))?;
342 Ok(n as u32)
343 }
344}
345
346/// Generate a new task ULID.
347pub fn gen_id() -> TaskId {
348 TaskId::new(Ulid::new())
349}
350
351/// Parse a priority string value.
352pub fn parse_priority(s: &str) -> Result<Priority> {
353 Priority::parse(s)
354}
355
356/// Parse an effort string value.
357pub fn parse_effort(s: &str) -> Result<Effort> {
358 Effort::parse(s)
359}
360
361/// Convert a priority value to its storage label.
362pub fn priority_label(p: Priority) -> &'static str {
363 p.as_str()
364}
365
366/// Convert an effort value to its storage label.
367pub fn effort_label(e: Effort) -> &'static str {
368 e.as_str()
369}
370
371/// Walk up from `start` looking for a `.td/` directory.
372pub fn find_root(start: &Path) -> Result<PathBuf> {
373 let mut dir = start.to_path_buf();
374 loop {
375 if dir.join(TD_DIR).is_dir() {
376 return Ok(dir);
377 }
378 if !dir.pop() {
379 bail!("not initialized. Run 'td init'");
380 }
381 }
382}
383
384/// Return the path to the `.td/` directory under `root`.
385pub fn td_dir(root: &Path) -> PathBuf {
386 root.join(TD_DIR)
387}
388
389/// Initialize on-disk project storage and return the opened store.
390pub fn init(root: &Path) -> Result<Store> {
391 fs::create_dir_all(td_dir(root))?;
392 Store::init(root)
393}
394
395/// Open an existing project's storage.
396pub fn open(root: &Path) -> Result<Store> {
397 Store::open(root)
398}
399
400fn hydrate_task(task_id_raw: &str, value: &Value) -> Result<Task> {
401 let obj = value
402 .as_object()
403 .ok_or_else(|| anyhow!("task '{task_id_raw}' is not an object"))?;
404
405 let id = TaskId::parse(task_id_raw)?;
406
407 let title = get_required_string(obj, "title")?;
408 let description = get_required_string(obj, "description")?;
409 let task_type = get_required_string(obj, "type")?;
410 let status = Status::parse(&get_required_string(obj, "status")?)?;
411 let priority = Priority::parse(&get_required_string(obj, "priority")?)?;
412 let effort = Effort::parse(&get_required_string(obj, "effort")?)?;
413 let parent = match obj.get("parent").and_then(Value::as_str) {
414 Some("") | None => None,
415 Some(raw) => Some(TaskId::parse(raw)?),
416 };
417
418 let created_at = get_required_string(obj, "created_at")?;
419 let updated_at = get_required_string(obj, "updated_at")?;
420 let deleted_at = obj
421 .get("deleted_at")
422 .and_then(Value::as_str)
423 .map(str::to_owned)
424 .filter(|s| !s.is_empty());
425
426 let labels = obj
427 .get("labels")
428 .and_then(Value::as_object)
429 .map(|m| m.keys().cloned().collect())
430 .unwrap_or_else(Vec::new);
431
432 let blockers = obj
433 .get("blockers")
434 .and_then(Value::as_object)
435 .map(|m| {
436 m.keys()
437 .map(|raw| TaskId::parse(raw))
438 .collect::<Result<Vec<_>>>()
439 })
440 .transpose()?
441 .unwrap_or_else(Vec::new);
442
443 let mut logs = obj
444 .get("logs")
445 .and_then(Value::as_object)
446 .map(|logs| {
447 logs.iter()
448 .map(|(log_id_raw, payload)| {
449 let payload_obj = payload.as_object().ok_or_else(|| {
450 anyhow!("log '{log_id_raw}' on task '{task_id_raw}' is not an object")
451 })?;
452 Ok(LogEntry {
453 id: TaskId::parse(log_id_raw)?,
454 timestamp: get_required_string(payload_obj, "timestamp")?,
455 message: get_required_string(payload_obj, "message")?,
456 })
457 })
458 .collect::<Result<Vec<_>>>()
459 })
460 .transpose()?
461 .unwrap_or_else(Vec::new);
462
463 logs.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));
464
465 Ok(Task {
466 id,
467 title,
468 description,
469 task_type,
470 priority,
471 status,
472 effort,
473 parent,
474 created_at,
475 updated_at,
476 deleted_at,
477 labels,
478 blockers,
479 logs,
480 })
481}
482
483fn get_required_string(map: &serde_json::Map<String, Value>, key: &str) -> Result<String> {
484 map.get(key)
485 .and_then(Value::as_str)
486 .map(str::to_owned)
487 .ok_or_else(|| anyhow!("missing or non-string key '{key}'"))
488}
489
490fn collect_delta_paths(project_dir: &Path) -> Result<Vec<PathBuf>> {
491 let mut paths = Vec::new();
492
493 collect_changes_from_dir(&project_dir.join(CHANGES_DIR), &mut paths)?;
494
495 for entry in fs::read_dir(project_dir)? {
496 let entry = entry?;
497 let path = entry.path();
498 if !path.is_dir() {
499 continue;
500 }
501 let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
502 continue;
503 };
504 if name.starts_with("changes.compacting.") {
505 collect_changes_from_dir(&path, &mut paths)?;
506 }
507 }
508
509 Ok(paths)
510}
511
512fn collect_changes_from_dir(dir: &Path, out: &mut Vec<PathBuf>) -> Result<()> {
513 if !dir.exists() {
514 return Ok(());
515 }
516
517 for entry in fs::read_dir(dir)? {
518 let entry = entry?;
519 let path = entry.path();
520 if !path.is_file() {
521 continue;
522 }
523
524 let Some(filename) = path.file_name().and_then(|n| n.to_str()) else {
525 continue;
526 };
527 if filename.ends_with(TMP_SUFFIX) {
528 continue;
529 }
530 if !filename.ends_with(".loro") {
531 continue;
532 }
533
534 let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
535 continue;
536 };
537 if Ulid::from_string(stem).is_err() {
538 continue;
539 }
540
541 out.push(path);
542 }
543
544 Ok(())
545}
546
547fn project_name(root: &Path) -> Result<String> {
548 root.file_name()
549 .and_then(|n| n.to_str())
550 .map(str::to_owned)
551 .ok_or_else(|| {
552 anyhow!(
553 "could not infer project name from path '{}'",
554 root.display()
555 )
556 })
557}
558
559fn project_dir(root: &Path, project: &str) -> PathBuf {
560 td_dir(root).join(PROJECTS_DIR).join(project)
561}
562
563fn load_or_create_device_peer_id() -> Result<PeerID> {
564 let home = std::env::var("HOME").context("HOME is not set")?;
565 let path = PathBuf::from(home)
566 .join(".local")
567 .join("share")
568 .join("td")
569 .join("device_id");
570
571 if let Some(parent) = path.parent() {
572 fs::create_dir_all(parent)?;
573 }
574
575 let device_ulid = if path.exists() {
576 let content = fs::read_to_string(&path)
577 .with_context(|| format!("failed reading device id from '{}'", path.display()))?;
578 Ulid::from_string(content.trim()).context("invalid persisted device id ULID")?
579 } else {
580 let id = Ulid::new();
581 atomic_write_file(&path, id.to_string().as_bytes())?;
582 id
583 };
584
585 Ok((device_ulid.to_u128() & u64::MAX as u128) as u64)
586}
587
588fn atomic_write_file(path: &Path, bytes: &[u8]) -> Result<()> {
589 let parent = path
590 .parent()
591 .ok_or_else(|| anyhow!("cannot atomically write root path '{}'", path.display()))?;
592 fs::create_dir_all(parent)?;
593
594 let tmp_name = format!(
595 "{}.{}{}",
596 path.file_name().and_then(|n| n.to_str()).unwrap_or("write"),
597 Ulid::new(),
598 TMP_SUFFIX
599 );
600 let tmp_path = parent.join(tmp_name);
601
602 {
603 let mut file = OpenOptions::new()
604 .create_new(true)
605 .write(true)
606 .open(&tmp_path)
607 .with_context(|| format!("failed to open temp file '{}'", tmp_path.display()))?;
608 file.write_all(bytes)?;
609 file.sync_all()?;
610 }
611
612 fs::rename(&tmp_path, path).with_context(|| {
613 format!(
614 "failed to atomically rename '{}' to '{}'",
615 tmp_path.display(),
616 path.display()
617 )
618 })?;
619
620 sync_dir(parent)?;
621 Ok(())
622}
623
624fn sync_dir(path: &Path) -> Result<()> {
625 let dir =
626 File::open(path).with_context(|| format!("failed opening dir '{}'", path.display()))?;
627 dir.sync_all()
628 .with_context(|| format!("failed fsync on dir '{}'", path.display()))?;
629 Ok(())
630}