1use anyhow::{anyhow, bail, Context, Result};
2use loro::{Container, ExportMode, LoroDoc, LoroMap, PeerID, ValueOrContainer};
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5use std::collections::BTreeMap;
6use std::fmt;
7use std::fs::{self, File, OpenOptions};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10use ulid::Ulid;
11
/// Name of the environment variable consulted for an explicitly selected project.
pub const PROJECT_ENV: &str = "TD_PROJECT";

/// Directory under the data root that holds one sub-directory per project.
const PROJECTS_DIR: &str = "projects";
/// Per-project directory that holds persisted delta files.
const CHANGES_DIR: &str = "changes";
/// File under the data root that stores directory-to-project bindings.
const BINDINGS_FILE: &str = "bindings.json";
/// Per-project file that stores the base snapshot.
const BASE_FILE: &str = "base.loro";
/// Suffix of temporary files produced while writing atomically; such files are
/// skipped when delta files are collected.
const TMP_SUFFIX: &str = ".tmp";
/// Schema version recorded in the `meta` map of newly initialized projects.
const SCHEMA_VERSION: u32 = 1;
20
21/// Current UTC time in ISO 8601 format.
22pub fn now_utc() -> String {
23 chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string()
24}
25
26/// Lifecycle state for a task.
27#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
28#[serde(rename_all = "snake_case")]
29pub enum Status {
30 Open,
31 InProgress,
32 Closed,
33}
34
35impl Status {
36 fn as_str(self) -> &'static str {
37 match self {
38 Status::Open => "open",
39 Status::InProgress => "in_progress",
40 Status::Closed => "closed",
41 }
42 }
43
44 fn parse(raw: &str) -> Result<Self> {
45 match raw {
46 "open" => Ok(Self::Open),
47 "in_progress" => Ok(Self::InProgress),
48 "closed" => Ok(Self::Closed),
49 _ => bail!("invalid status '{raw}'"),
50 }
51 }
52}
53
54/// Priority for task ordering.
55#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
56#[serde(rename_all = "snake_case")]
57pub enum Priority {
58 High,
59 Medium,
60 Low,
61}
62
63impl Priority {
64 fn as_str(self) -> &'static str {
65 match self {
66 Priority::High => "high",
67 Priority::Medium => "medium",
68 Priority::Low => "low",
69 }
70 }
71
72 fn parse(raw: &str) -> Result<Self> {
73 match raw {
74 "high" => Ok(Self::High),
75 "medium" => Ok(Self::Medium),
76 "low" => Ok(Self::Low),
77 _ => bail!("invalid priority '{raw}'"),
78 }
79 }
80
81 pub fn score(self) -> i32 {
82 match self {
83 Priority::High => 1,
84 Priority::Medium => 2,
85 Priority::Low => 3,
86 }
87 }
88}
89
90/// Estimated effort for a task.
91#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
92#[serde(rename_all = "snake_case")]
93pub enum Effort {
94 Low,
95 Medium,
96 High,
97}
98
99impl Effort {
100 fn as_str(self) -> &'static str {
101 match self {
102 Effort::Low => "low",
103 Effort::Medium => "medium",
104 Effort::High => "high",
105 }
106 }
107
108 fn parse(raw: &str) -> Result<Self> {
109 match raw {
110 "low" => Ok(Self::Low),
111 "medium" => Ok(Self::Medium),
112 "high" => Ok(Self::High),
113 _ => bail!("invalid effort '{raw}'"),
114 }
115 }
116
117 pub fn score(self) -> i32 {
118 match self {
119 Effort::Low => 1,
120 Effort::Medium => 2,
121 Effort::High => 3,
122 }
123 }
124}
125
126/// A stable task identifier backed by a ULID.
127#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
128#[serde(transparent)]
129pub struct TaskId(String);
130
131impl TaskId {
132 pub fn new(id: Ulid) -> Self {
133 Self(id.to_string())
134 }
135
136 pub fn parse(raw: &str) -> Result<Self> {
137 let id = Ulid::from_string(raw).with_context(|| format!("invalid task id '{raw}'"))?;
138 Ok(Self::new(id))
139 }
140
141 pub fn as_str(&self) -> &str {
142 &self.0
143 }
144
145 pub fn short(&self) -> String {
146 self.0[self.0.len() - 7..].to_string()
147 }
148}
149
150impl fmt::Display for TaskId {
151 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
152 write!(f, "{}", self.short())
153 }
154}
155
/// A task log entry embedded in a task record.
#[derive(Debug, Clone, Serialize)]
pub struct LogEntry {
    /// Identifier of the entry; `hydrate_task` parses it from the entry's key
    /// in the task's `logs` map and sorts entries by it.
    pub id: TaskId,
    /// Value of the entry's `timestamp` field, kept as stored.
    pub timestamp: String,
    /// Value of the entry's `message` field.
    pub message: String,
}
163
/// Hydrated task data from the CRDT document.
///
/// Instances are built by `hydrate_task` from the JSON view of one entry of
/// the document's `tasks` map.
#[derive(Debug, Clone, Serialize)]
pub struct Task {
    /// Identifier parsed from the task's key in the `tasks` map.
    pub id: TaskId,
    /// Required `title` field.
    pub title: String,
    /// Required `description` field.
    pub description: String,
    /// Required `type` field; serialized under the name `type`.
    #[serde(rename = "type")]
    pub task_type: String,
    /// Required `priority` field.
    pub priority: Priority,
    /// Required `status` field.
    pub status: Status,
    /// Required `effort` field.
    pub effort: Effort,
    /// Parent task, or `None` when the field is absent, not a string, or empty.
    pub parent: Option<TaskId>,
    /// Required `created_at` field, kept as stored.
    pub created_at: String,
    /// Required `updated_at` field, kept as stored.
    pub updated_at: String,
    /// Tombstone marker; `None` when the field is absent, not a string, or empty.
    pub deleted_at: Option<String>,
    /// Keys of the task's `labels` map (empty when the map is absent).
    pub labels: Vec<String>,
    /// Keys of the task's `blockers` map parsed as task ids (empty when absent).
    pub blockers: Vec<TaskId>,
    /// Entries of the task's `logs` map, sorted by entry id.
    pub logs: Vec<LogEntry>,
}
183
/// Result type for partitioning blockers by task state.
///
/// Produced by `partition_blockers`.
#[derive(Debug, Default, Clone, Serialize)]
pub struct BlockerPartition {
    /// Blockers whose task exists, is not closed, and is not tombstoned.
    pub open: Vec<TaskId>,
    /// Blockers whose task is closed, tombstoned, or no longer present.
    pub resolved: Vec<TaskId>,
}
190
/// On-disk shape of the bindings file.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct BindingsFile {
    /// Maps a canonicalized directory path (as text) to the project name bound
    /// to it. Defaults to an empty map when the key is missing from the file.
    #[serde(default)]
    bindings: BTreeMap<String, String>,
}
196
/// Storage wrapper around one project's Loro document and disk layout.
#[derive(Debug, Clone)]
pub struct Store {
    /// Data root under which the `projects` directory lives.
    root: PathBuf,
    /// Name of the project this store is bound to.
    project: String,
    /// In-memory Loro document holding the project's state.
    doc: LoroDoc,
}
204
205impl Store {
206 pub fn init(root: &Path, project: &str) -> Result<Self> {
207 validate_project_name(project)?;
208 let project_dir = project_dir(root, project);
209 if project_dir.exists() {
210 bail!("project '{project}' already exists");
211 }
212 fs::create_dir_all(project_dir.join(CHANGES_DIR))?;
213
214 let doc = LoroDoc::new();
215 doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
216 doc.get_map("tasks");
217
218 let meta = doc.get_map("meta");
219 meta.insert("schema_version", SCHEMA_VERSION as i64)?;
220 meta.insert("project_id", Ulid::new().to_string())?;
221 meta.insert("created_at", now_utc())?;
222
223 let snapshot = doc
224 .export(ExportMode::Snapshot)
225 .context("failed to export initial loro snapshot")?;
226 atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;
227
228 Ok(Self {
229 root: root.to_path_buf(),
230 project: project.to_string(),
231 doc,
232 })
233 }
234
235 pub fn open(root: &Path, project: &str) -> Result<Self> {
236 validate_project_name(project)?;
237 let project_dir = project_dir(root, project);
238 let base_path = project_dir.join(BASE_FILE);
239
240 if !base_path.exists() {
241 bail!("project '{project}' is not initialized. Run 'td init {project}'");
242 }
243
244 let base = fs::read(&base_path)
245 .with_context(|| format!("failed to read loro snapshot '{}'", base_path.display()))?;
246
247 let doc = LoroDoc::from_snapshot(&base).context("failed to load loro snapshot")?;
248 doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
249
250 let mut deltas = collect_delta_paths(&project_dir)?;
251 deltas.sort_by_key(|path| {
252 path.file_stem()
253 .and_then(|s| s.to_str())
254 .and_then(|s| Ulid::from_string(s).ok())
255 });
256
257 for delta_path in deltas {
258 let bytes = fs::read(&delta_path)
259 .with_context(|| format!("failed to read loro delta '{}'", delta_path.display()))?;
260 if let Err(err) = doc.import(&bytes) {
261 // Tolerate malformed or stale delta files as requested by design.
262 eprintln!(
263 "warning: skipping unreadable delta '{}': {err}",
264 delta_path.display()
265 );
266 }
267 }
268
269 Ok(Self {
270 root: root.to_path_buf(),
271 project: project.to_string(),
272 doc,
273 })
274 }
275
276 /// Bootstrap a local project from peer-provided delta bytes.
277 ///
278 /// The incoming delta is imported into a fresh document, validated to
279 /// ensure it carries `meta.project_id`, and then persisted as a base
280 /// snapshot for future opens.
281 pub fn bootstrap_from_peer(root: &Path, project: &str, delta: &[u8]) -> Result<Self> {
282 validate_project_name(project)?;
283 let project_dir = project_dir(root, project);
284 if project_dir.exists() {
285 bail!("project '{project}' already exists");
286 }
287 fs::create_dir_all(project_dir.join(CHANGES_DIR))?;
288
289 let doc = LoroDoc::new();
290 doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
291 doc.import(delta)
292 .context("failed to import bootstrap delta from peer")?;
293 doc.commit();
294
295 read_project_id_from_doc(&doc)
296 .context("bootstrap delta is missing required project identity")?;
297
298 let snapshot = doc
299 .export(ExportMode::Snapshot)
300 .context("failed to export bootstrap loro snapshot")?;
301 atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;
302
303 Ok(Self {
304 root: root.to_path_buf(),
305 project: project.to_string(),
306 doc,
307 })
308 }
309
310 pub fn root(&self) -> &Path {
311 &self.root
312 }
313
314 pub fn project_name(&self) -> &str {
315 &self.project
316 }
317
318 pub fn doc(&self) -> &LoroDoc {
319 &self.doc
320 }
321
322 /// Export all current state to a fresh base snapshot.
323 pub fn write_snapshot(&self) -> Result<PathBuf> {
324 let out = project_dir(&self.root, &self.project).join(BASE_FILE);
325 let bytes = self
326 .doc
327 .export(ExportMode::Snapshot)
328 .context("failed to export loro snapshot")?;
329 atomic_write_file(&out, &bytes)?;
330 Ok(out)
331 }
332
333 /// Delete persisted delta files after a fresh snapshot has been written.
334 pub fn purge_deltas(&self) -> Result<usize> {
335 let project_dir = project_dir(&self.root, &self.project);
336 let paths = collect_delta_paths(&project_dir)?;
337 let mut removed = 0usize;
338 for path in paths {
339 fs::remove_file(&path)
340 .with_context(|| format!("failed removing delta '{}'", path.display()))?;
341 removed += 1;
342 }
343 Ok(removed)
344 }
345
346 /// Apply a local mutation and persist only the resulting delta.
347 pub fn apply_and_persist<F>(&self, mutator: F) -> Result<PathBuf>
348 where
349 F: FnOnce(&LoroDoc) -> Result<()>,
350 {
351 let before = self.doc.oplog_vv();
352 mutator(&self.doc)?;
353 self.doc.commit();
354
355 let delta = self
356 .doc
357 .export(ExportMode::updates(&before))
358 .context("failed to export loro update delta")?;
359
360 let filename = format!("{}.loro", Ulid::new());
361 let path = project_dir(&self.root, &self.project)
362 .join(CHANGES_DIR)
363 .join(filename);
364 atomic_write_file(&path, &delta)?;
365 Ok(path)
366 }
367
368 /// Persist pre-built delta bytes (e.g. received from a peer) as a new
369 /// change file without re-exporting from the doc.
370 pub fn save_raw_delta(&self, bytes: &[u8]) -> Result<PathBuf> {
371 let filename = format!("{}.loro", Ulid::new());
372 let path = project_dir(&self.root, &self.project)
373 .join(CHANGES_DIR)
374 .join(filename);
375 atomic_write_file(&path, bytes)?;
376 Ok(path)
377 }
378
379 /// Return hydrated tasks, excluding tombstones.
380 pub fn list_tasks(&self) -> Result<Vec<Task>> {
381 self.list_tasks_inner(false)
382 }
383
384 /// Return hydrated tasks, including tombstoned rows.
385 pub fn list_tasks_unfiltered(&self) -> Result<Vec<Task>> {
386 self.list_tasks_inner(true)
387 }
388
389 /// Find a task by exact ULID string.
390 pub fn get_task(&self, id: &TaskId, include_deleted: bool) -> Result<Option<Task>> {
391 let tasks = if include_deleted {
392 self.list_tasks_unfiltered()?
393 } else {
394 self.list_tasks()?
395 };
396 Ok(tasks.into_iter().find(|task| task.id == *id))
397 }
398
399 fn list_tasks_inner(&self, include_deleted: bool) -> Result<Vec<Task>> {
400 let root = serde_json::to_value(self.doc.get_deep_value())?;
401 let tasks_obj = root
402 .get("tasks")
403 .and_then(Value::as_object)
404 .ok_or_else(|| anyhow!("missing root tasks map"))?;
405
406 let mut tasks = Vec::with_capacity(tasks_obj.len());
407 for (task_id_raw, task_json) in tasks_obj {
408 let task = hydrate_task(task_id_raw, task_json)?;
409 if include_deleted || task.deleted_at.is_none() {
410 tasks.push(task);
411 }
412 }
413
414 tasks.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));
415 Ok(tasks)
416 }
417
418 pub fn schema_version(&self) -> Result<u32> {
419 let root = serde_json::to_value(self.doc.get_deep_value())?;
420 let meta = root
421 .get("meta")
422 .and_then(Value::as_object)
423 .ok_or_else(|| anyhow!("missing root meta map"))?;
424 let n = meta
425 .get("schema_version")
426 .and_then(Value::as_u64)
427 .ok_or_else(|| anyhow!("invalid or missing meta.schema_version"))?;
428 Ok(n as u32)
429 }
430}
431
/// Generates a task id from a freshly created ULID.
pub fn gen_id() -> TaskId {
    TaskId::new(Ulid::new())
}
436
/// Parses a status from its textual form (`open`, `in_progress`, `closed`).
///
/// Public entry point for the private `Status::parse`; returns an error for
/// any other input.
pub fn parse_status(s: &str) -> Result<Status> {
    Status::parse(s)
}
440
/// Parses a priority from its textual form (`high`, `medium`, `low`).
///
/// Public entry point for the private `Priority::parse`; returns an error for
/// any other input.
pub fn parse_priority(s: &str) -> Result<Priority> {
    Priority::parse(s)
}
444
/// Parses an effort level from its textual form (`low`, `medium`, `high`).
///
/// Public entry point for the private `Effort::parse`; returns an error for
/// any other input.
pub fn parse_effort(s: &str) -> Result<Effort> {
    Effort::parse(s)
}
448
/// Returns the textual form of a status, as accepted by `parse_status`.
pub fn status_label(s: Status) -> &'static str {
    s.as_str()
}
452
/// Returns the textual form of a priority, as accepted by `parse_priority`.
pub fn priority_label(p: Priority) -> &'static str {
    p.as_str()
}
456
/// Returns the textual form of an effort level, as accepted by `parse_effort`.
pub fn effort_label(e: Effort) -> &'static str {
    e.as_str()
}
460
461pub fn data_root() -> Result<PathBuf> {
462 let home = std::env::var("HOME").context("HOME is not set")?;
463 Ok(PathBuf::from(home).join(".local").join("share").join("td"))
464}
465
466pub fn init(cwd: &Path, project: &str) -> Result<Store> {
467 let root = data_root()?;
468 fs::create_dir_all(root.join(PROJECTS_DIR))?;
469 let store = Store::init(&root, project)?;
470 bind_project(cwd, project)?;
471 Ok(store)
472}
473
474pub fn use_project(cwd: &Path, project: &str) -> Result<()> {
475 let root = data_root()?;
476 validate_project_name(project)?;
477 if !project_dir(&root, project).join(BASE_FILE).exists() {
478 bail!("project '{project}' not found. Run 'td projects' to list known projects");
479 }
480 bind_project(cwd, project)
481}
482
483pub fn open(start: &Path) -> Result<Store> {
484 let root = data_root()?;
485 let explicit = std::env::var(PROJECT_ENV).ok();
486 let project = resolve_project_name(start, &root, explicit.as_deref())?.ok_or_else(|| {
487 anyhow!(
488 "no project selected. Use --project/TD_PROJECT, run 'td use <name>', or run 'td init <name>'"
489 )
490 })?;
491 Store::open(&root, &project)
492}
493
494/// Open the project selected by `--project`/`TD_PROJECT`/bindings if one exists.
495///
496/// Returns `Ok(None)` when no project is selected by any mechanism.
497pub fn try_open(start: &Path) -> Result<Option<Store>> {
498 let root = data_root()?;
499 let explicit = std::env::var(PROJECT_ENV).ok();
500 let Some(project) = resolve_project_name(start, &root, explicit.as_deref())? else {
501 return Ok(None);
502 };
503 Store::open(&root, &project).map(Some)
504}
505
506/// Bootstrap a project from a peer delta and bind the current directory.
507pub fn bootstrap_sync(cwd: &Path, project: &str, delta: &[u8]) -> Result<Store> {
508 let root = data_root()?;
509 fs::create_dir_all(root.join(PROJECTS_DIR))?;
510 validate_project_name(project)?;
511 let store = Store::bootstrap_from_peer(&root, project, delta)?;
512 bind_project(cwd, project)?;
513 Ok(store)
514}
515
516pub fn list_projects() -> Result<Vec<String>> {
517 let root = data_root()?;
518 let mut out = Vec::new();
519 let dir = root.join(PROJECTS_DIR);
520 if !dir.exists() {
521 return Ok(out);
522 }
523
524 for entry in fs::read_dir(dir)? {
525 let path = entry?.path();
526 if !path.is_dir() {
527 continue;
528 }
529 let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
530 continue;
531 };
532 if path.join(BASE_FILE).exists() {
533 out.push(name.to_string());
534 }
535 }
536
537 out.sort();
538 Ok(out)
539}
540
541pub fn resolve_task_id(store: &Store, raw: &str, include_deleted: bool) -> Result<TaskId> {
542 if let Ok(id) = TaskId::parse(raw) {
543 if store.get_task(&id, include_deleted)?.is_some() {
544 return Ok(id);
545 }
546 }
547
548 let tasks = if include_deleted {
549 store.list_tasks_unfiltered()?
550 } else {
551 store.list_tasks()?
552 };
553
554 let upper = raw.to_ascii_uppercase();
555 let matches: Vec<TaskId> = tasks
556 .into_iter()
557 .filter(|t| t.id.as_str().ends_with(&upper))
558 .map(|t| t.id)
559 .collect();
560
561 match matches.as_slice() {
562 [] => bail!("task '{raw}' not found"),
563 [id] => Ok(id.clone()),
564 _ => bail!("task reference '{raw}' is ambiguous"),
565 }
566}
567
568pub fn partition_blockers(store: &Store, blockers: &[TaskId]) -> Result<BlockerPartition> {
569 let mut out = BlockerPartition::default();
570 for blocker in blockers {
571 let Some(task) = store.get_task(blocker, true)? else {
572 out.resolved.push(blocker.clone());
573 continue;
574 };
575 if task.status == Status::Closed || task.deleted_at.is_some() {
576 out.resolved.push(blocker.clone());
577 } else {
578 out.open.push(blocker.clone());
579 }
580 }
581 Ok(out)
582}
583
/// Inserts a new, empty map container for `task_id` into `tasks`, keyed by
/// the full id string, and returns the attached container.
///
/// # Errors
/// Fails with "failed to create task map" when the container cannot be inserted.
pub fn insert_task_map(tasks: &LoroMap, task_id: &TaskId) -> Result<LoroMap> {
    tasks
        .insert_container(task_id.as_str(), LoroMap::new())
        .context("failed to create task map")
}
589
590pub fn get_task_map(tasks: &LoroMap, task_id: &TaskId) -> Result<Option<LoroMap>> {
591 match tasks.get(task_id.as_str()) {
592 Some(ValueOrContainer::Container(Container::Map(map))) => Ok(Some(map)),
593 Some(_) => bail!("task '{}' has invalid container type", task_id.as_str()),
594 None => Ok(None),
595 }
596}
597
/// Returns the map container stored under `key` in `parent`, creating it when
/// it does not exist yet.
///
/// # Errors
/// Fails with a message naming `key` when the underlying call fails.
pub fn get_or_create_child_map(parent: &LoroMap, key: &str) -> Result<LoroMap> {
    parent
        .get_or_create_container(key, LoroMap::new())
        .with_context(|| format!("failed to get or create map key '{key}'"))
}
603
/// Path of the bindings file directly under the data root.
fn bindings_path(root: &Path) -> PathBuf {
    root.join(BINDINGS_FILE)
}
607
608fn resolve_project_name(
609 start: &Path,
610 root: &Path,
611 explicit: Option<&str>,
612) -> Result<Option<String>> {
613 if let Some(project) = explicit {
614 validate_project_name(project)?;
615 return Ok(Some(project.to_string()));
616 }
617
618 let cwd = canonicalize_binding_path(start)?;
619 let bindings = load_bindings(root)?;
620
621 let mut best: Option<(usize, String)> = None;
622 for (raw_path, project) in bindings.bindings {
623 let bound = PathBuf::from(raw_path);
624 if is_prefix_path(&bound, &cwd) {
625 let score = bound.components().count();
626 match &best {
627 Some((best_score, _)) if *best_score >= score => {}
628 _ => best = Some((score, project)),
629 }
630 }
631 }
632
633 if let Some((_, project)) = best {
634 return Ok(Some(project));
635 }
636
637 Ok(None)
638}
639
640fn bind_project(cwd: &Path, project: &str) -> Result<()> {
641 validate_project_name(project)?;
642
643 let root = data_root()?;
644 fs::create_dir_all(&root)?;
645
646 let canonical = canonicalize_binding_path(cwd)?;
647 let mut bindings = load_bindings(&root)?;
648 bindings
649 .bindings
650 .insert(canonical.to_string_lossy().to_string(), project.to_string());
651 save_bindings(&root, &bindings)
652}
653
654fn load_bindings(root: &Path) -> Result<BindingsFile> {
655 let path = bindings_path(root);
656 if !path.exists() {
657 return Ok(BindingsFile::default());
658 }
659 let content = fs::read_to_string(&path)
660 .with_context(|| format!("failed reading bindings from '{}'", path.display()))?;
661 serde_json::from_str(&content)
662 .with_context(|| format!("invalid bindings file '{}'", path.display()))
663}
664
665fn save_bindings(root: &Path, bindings: &BindingsFile) -> Result<()> {
666 let path = bindings_path(root);
667 let bytes = serde_json::to_vec_pretty(bindings)?;
668 atomic_write_file(&path, &bytes)
669}
670
/// Canonicalizes `path` via `fs::canonicalize` so it can be used as a binding key.
///
/// # Errors
/// Fails with a message naming `path` when canonicalization fails.
fn canonicalize_binding_path(path: &Path) -> Result<PathBuf> {
    fs::canonicalize(path).with_context(|| format!("failed to canonicalize '{}'", path.display()))
}
674
/// Reports whether `prefix` is a component-wise prefix of `target`.
///
/// Only whole path components are compared, so `/a/b` is a prefix of
/// `/a/b/c` and of `/a/b` itself, but not of `/a/bc`. An empty `prefix`
/// matches every target.
fn is_prefix_path(prefix: &Path, target: &Path) -> bool {
    // `Path::starts_with` performs exactly this component-wise comparison.
    target.starts_with(prefix)
}
688
689fn validate_project_name(name: &str) -> Result<()> {
690 if name.is_empty() {
691 bail!("project name cannot be empty");
692 }
693 if name.contains('/') || name.contains('\\') || name == "." || name == ".." {
694 bail!("invalid project name '{name}'");
695 }
696 if name.chars().any(char::is_control) {
697 bail!("invalid project name '{name}'");
698 }
699 Ok(())
700}
701
702fn read_project_id_from_doc(doc: &LoroDoc) -> Result<String> {
703 let root = serde_json::to_value(doc.get_deep_value())?;
704 root.get("meta")
705 .and_then(|m| m.get("project_id"))
706 .and_then(Value::as_str)
707 .map(str::to_owned)
708 .ok_or_else(|| anyhow!("missing meta.project_id in project doc"))
709}
710
711fn hydrate_task(task_id_raw: &str, value: &Value) -> Result<Task> {
712 let obj = value
713 .as_object()
714 .ok_or_else(|| anyhow!("task '{task_id_raw}' is not an object"))?;
715
716 let id = TaskId::parse(task_id_raw)?;
717
718 let title = get_required_string(obj, "title")?;
719 let description = get_required_string(obj, "description")?;
720 let task_type = get_required_string(obj, "type")?;
721 let status = Status::parse(&get_required_string(obj, "status")?)?;
722 let priority = Priority::parse(&get_required_string(obj, "priority")?)?;
723 let effort = Effort::parse(&get_required_string(obj, "effort")?)?;
724 let parent = match obj.get("parent").and_then(Value::as_str) {
725 Some("") | None => None,
726 Some(raw) => Some(TaskId::parse(raw)?),
727 };
728
729 let created_at = get_required_string(obj, "created_at")?;
730 let updated_at = get_required_string(obj, "updated_at")?;
731 let deleted_at = obj
732 .get("deleted_at")
733 .and_then(Value::as_str)
734 .map(str::to_owned)
735 .filter(|s| !s.is_empty());
736
737 let labels = obj
738 .get("labels")
739 .and_then(Value::as_object)
740 .map(|m| m.keys().cloned().collect())
741 .unwrap_or_default();
742
743 let blockers = obj
744 .get("blockers")
745 .and_then(Value::as_object)
746 .map(|m| {
747 m.keys()
748 .map(|raw| TaskId::parse(raw))
749 .collect::<Result<Vec<_>>>()
750 })
751 .transpose()?
752 .unwrap_or_default();
753
754 let mut logs = obj
755 .get("logs")
756 .and_then(Value::as_object)
757 .map(|logs| {
758 logs.iter()
759 .map(|(log_id_raw, payload)| {
760 let payload_obj = payload.as_object().ok_or_else(|| {
761 anyhow!("log '{log_id_raw}' on task '{task_id_raw}' is not an object")
762 })?;
763 Ok(LogEntry {
764 id: TaskId::parse(log_id_raw)?,
765 timestamp: get_required_string(payload_obj, "timestamp")?,
766 message: get_required_string(payload_obj, "message")?,
767 })
768 })
769 .collect::<Result<Vec<_>>>()
770 })
771 .transpose()?
772 .unwrap_or_default();
773
774 logs.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));
775
776 Ok(Task {
777 id,
778 title,
779 description,
780 task_type,
781 priority,
782 status,
783 effort,
784 parent,
785 created_at,
786 updated_at,
787 deleted_at,
788 labels,
789 blockers,
790 logs,
791 })
792}
793
794fn get_required_string(map: &serde_json::Map<String, Value>, key: &str) -> Result<String> {
795 map.get(key)
796 .and_then(Value::as_str)
797 .map(str::to_owned)
798 .ok_or_else(|| anyhow!("missing or non-string key '{key}'"))
799}
800
801fn collect_delta_paths(project_dir: &Path) -> Result<Vec<PathBuf>> {
802 let mut paths = Vec::new();
803 collect_changes_from_dir(&project_dir.join(CHANGES_DIR), &mut paths)?;
804
805 for entry in fs::read_dir(project_dir)? {
806 let path = entry?.path();
807 if !path.is_dir() {
808 continue;
809 }
810 let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
811 continue;
812 };
813 if name.starts_with("changes.compacting.") {
814 collect_changes_from_dir(&path, &mut paths)?;
815 }
816 }
817
818 Ok(paths)
819}
820
821fn collect_changes_from_dir(dir: &Path, out: &mut Vec<PathBuf>) -> Result<()> {
822 if !dir.exists() {
823 return Ok(());
824 }
825
826 for entry in fs::read_dir(dir)? {
827 let path = entry?.path();
828 if !path.is_file() {
829 continue;
830 }
831
832 let Some(filename) = path.file_name().and_then(|n| n.to_str()) else {
833 continue;
834 };
835 if filename.ends_with(TMP_SUFFIX) || !filename.ends_with(".loro") {
836 continue;
837 }
838
839 let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
840 continue;
841 };
842 if Ulid::from_string(stem).is_err() {
843 continue;
844 }
845
846 out.push(path);
847 }
848
849 Ok(())
850}
851
/// Directory of `project` inside the projects directory under `root`.
///
/// Purely computes the path; nothing is created or checked on disk.
fn project_dir(root: &Path, project: &str) -> PathBuf {
    root.join(PROJECTS_DIR).join(project)
}
855
856fn load_or_create_device_peer_id(root: &Path) -> Result<PeerID> {
857 let path = root.join("device_id");
858 if let Some(parent) = path.parent() {
859 fs::create_dir_all(parent)?;
860 }
861
862 let device_ulid = if path.exists() {
863 let content = fs::read_to_string(&path)
864 .with_context(|| format!("failed reading device id from '{}'", path.display()))?;
865 Ulid::from_string(content.trim()).context("invalid persisted device id ULID")?
866 } else {
867 let id = Ulid::new();
868 atomic_write_file(&path, id.to_string().as_bytes())?;
869 id
870 };
871
872 let raw: u128 = device_ulid.into();
873 Ok((raw & u64::MAX as u128) as u64)
874}
875
876fn atomic_write_file(path: &Path, bytes: &[u8]) -> Result<()> {
877 let parent = path
878 .parent()
879 .ok_or_else(|| anyhow!("cannot atomically write root path '{}'", path.display()))?;
880 fs::create_dir_all(parent)?;
881
882 let tmp_name = format!(
883 "{}.{}{}",
884 path.file_name().and_then(|n| n.to_str()).unwrap_or("write"),
885 Ulid::new(),
886 TMP_SUFFIX
887 );
888 let tmp_path = parent.join(tmp_name);
889
890 {
891 let mut file = OpenOptions::new()
892 .create_new(true)
893 .write(true)
894 .open(&tmp_path)
895 .with_context(|| format!("failed to open temp file '{}'", tmp_path.display()))?;
896 file.write_all(bytes)?;
897 file.sync_all()?;
898 }
899
900 fs::rename(&tmp_path, path).with_context(|| {
901 format!(
902 "failed to atomically rename '{}' to '{}'",
903 tmp_path.display(),
904 path.display()
905 )
906 })?;
907
908 sync_dir(parent)?;
909 Ok(())
910}
911
912fn sync_dir(path: &Path) -> Result<()> {
913 let dir =
914 File::open(path).with_context(|| format!("failed opening dir '{}'", path.display()))?;
915 dir.sync_all()
916 .with_context(|| format!("failed fsync on dir '{}'", path.display()))?;
917 Ok(())
918}