1use anyhow::{anyhow, bail, Context, Result};
2use loro::{Container, ExportMode, LoroDoc, LoroMap, PeerID, ValueOrContainer};
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5use std::collections::BTreeMap;
6use std::fmt;
7use std::fs::{self, File, OpenOptions};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10
11use fs2::FileExt;
12use ulid::Ulid;
13
14pub const PROJECT_ENV: &str = "TD_PROJECT";
15
16pub(crate) const PROJECTS_DIR: &str = "projects";
17const CHANGES_DIR: &str = "changes";
18const BINDINGS_FILE: &str = "bindings.json";
19const BASE_FILE: &str = "base.loro";
20const TMP_SUFFIX: &str = ".tmp";
21use crate::migrate;
22
23/// Current UTC time in ISO 8601 format.
24pub fn now_utc() -> String {
25 chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string()
26}
27
/// Lifecycle state for a task.
///
/// Serialized in `snake_case` (e.g. `in_progress`), matching the string
/// values stored in the CRDT document (see `Status::as_str`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Status {
    Open,
    InProgress,
    Closed,
}
36
37impl Status {
38 fn as_str(self) -> &'static str {
39 match self {
40 Status::Open => "open",
41 Status::InProgress => "in_progress",
42 Status::Closed => "closed",
43 }
44 }
45
46 fn parse(raw: &str) -> Result<Self> {
47 match raw {
48 "open" => Ok(Self::Open),
49 "in_progress" => Ok(Self::InProgress),
50 "closed" => Ok(Self::Closed),
51 _ => bail!("invalid status '{raw}'"),
52 }
53 }
54}
55
/// Priority for task ordering.
///
/// Serialized in `snake_case`; `score()` maps `High` to the smallest
/// value so higher priority sorts first.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Priority {
    High,
    Medium,
    Low,
}
64
65impl Priority {
66 fn as_str(self) -> &'static str {
67 match self {
68 Priority::High => "high",
69 Priority::Medium => "medium",
70 Priority::Low => "low",
71 }
72 }
73
74 fn parse(raw: &str) -> Result<Self> {
75 match raw {
76 "high" => Ok(Self::High),
77 "medium" => Ok(Self::Medium),
78 "low" => Ok(Self::Low),
79 _ => bail!("invalid priority '{raw}'"),
80 }
81 }
82
83 pub fn score(self) -> i32 {
84 match self {
85 Priority::High => 1,
86 Priority::Medium => 2,
87 Priority::Low => 3,
88 }
89 }
90}
91
/// Estimated effort for a task.
///
/// Serialized in `snake_case`; `score()` maps `Low` to the smallest
/// value so cheaper tasks sort first.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Effort {
    Low,
    Medium,
    High,
}
100
101impl Effort {
102 fn as_str(self) -> &'static str {
103 match self {
104 Effort::Low => "low",
105 Effort::Medium => "medium",
106 Effort::High => "high",
107 }
108 }
109
110 fn parse(raw: &str) -> Result<Self> {
111 match raw {
112 "low" => Ok(Self::Low),
113 "medium" => Ok(Self::Medium),
114 "high" => Ok(Self::High),
115 _ => bail!("invalid effort '{raw}'"),
116 }
117 }
118
119 pub fn score(self) -> i32 {
120 match self {
121 Effort::Low => 1,
122 Effort::Medium => 2,
123 Effort::High => 3,
124 }
125 }
126}
127
/// A stable task identifier backed by a ULID.
///
/// Serializes as the short display form (`td-XXXXXXX`) for user-facing
/// JSON. Use [`TaskId::as_str`] when the full ULID is needed (e.g.
/// for CRDT keys or export round-tripping).
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct TaskId(String); // invariant: the canonical ULID string form (via `Ulid::to_string`)
135
136impl Serialize for TaskId {
137 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
138 serializer.serialize_str(&self.short())
139 }
140}
141
142impl TaskId {
143 pub fn new(id: Ulid) -> Self {
144 Self(id.to_string())
145 }
146
147 pub fn parse(raw: &str) -> Result<Self> {
148 let id = Ulid::from_string(raw).with_context(|| format!("invalid task id '{raw}'"))?;
149 Ok(Self::new(id))
150 }
151
152 pub fn as_str(&self) -> &str {
153 &self.0
154 }
155
156 pub fn short(&self) -> String {
157 format!("td-{}", &self.0[self.0.len() - 7..])
158 }
159
160 /// Return a display-friendly short ID from a raw ULID string.
161 pub fn display_id(raw: &str) -> String {
162 let n = raw.len();
163 if n > 7 {
164 format!("td-{}", &raw[n - 7..])
165 } else {
166 format!("td-{raw}")
167 }
168 }
169}
170
171impl fmt::Display for TaskId {
172 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
173 write!(f, "{}", self.short())
174 }
175}
176
/// A task log entry embedded in a task record.
#[derive(Debug, Clone, Serialize)]
pub struct LogEntry {
    /// ULID of the log entry itself (log entries sort by this id).
    pub id: TaskId,
    /// ISO 8601 timestamp string.
    pub timestamp: String,
    /// Free-form log text.
    pub message: String,
}
184
/// Hydrated task data from the CRDT document.
#[derive(Debug, Clone, Serialize)]
pub struct Task {
    pub id: TaskId,
    pub title: String,
    pub description: String,
    /// Serialized as `"type"`; free-form category string.
    #[serde(rename = "type")]
    pub task_type: String,
    pub priority: Priority,
    pub status: Status,
    pub effort: Effort,
    /// Optional parent task id (subtask relationship).
    pub parent: Option<TaskId>,
    pub created_at: String,
    pub updated_at: String,
    /// `Some(..)` marks a tombstoned (soft-deleted) task.
    pub deleted_at: Option<String>,
    /// Label names (keys of the task's `labels` map).
    pub labels: Vec<String>,
    /// Ids of tasks blocking this one (keys of the `blockers` map).
    pub blockers: Vec<TaskId>,
    /// Log entries, sorted by id.
    pub logs: Vec<LogEntry>,
}
204
205impl Task {
206 /// Serialize this task with full ULIDs instead of short display IDs.
207 ///
208 /// Used by `export` so that `import` can round-trip data losslessly —
209 /// `import` needs the full ULID to recreate exact CRDT keys.
210 pub fn to_export_value(&self) -> serde_json::Value {
211 serde_json::json!({
212 "id": self.id.as_str(),
213 "title": self.title,
214 "description": self.description,
215 "type": self.task_type,
216 "priority": self.priority,
217 "status": self.status,
218 "effort": self.effort,
219 "parent": self.parent.as_ref().map(|p| p.as_str()),
220 "created_at": self.created_at,
221 "updated_at": self.updated_at,
222 "deleted_at": self.deleted_at,
223 "labels": self.labels,
224 "blockers": self.blockers.iter().map(|b| b.as_str()).collect::<Vec<_>>(),
225 "logs": self.logs.iter().map(|l| {
226 serde_json::json!({
227 "id": l.id.as_str(),
228 "timestamp": l.timestamp,
229 "message": l.message,
230 })
231 }).collect::<Vec<_>>(),
232 })
233 }
234}
235
/// Result type for partitioning blockers by task state.
#[derive(Debug, Default, Clone, Serialize)]
pub struct BlockerPartition {
    /// Blockers that still block: task exists, not closed, not deleted.
    pub open: Vec<TaskId>,
    /// Blockers that no longer block: closed, deleted, or unknown tasks.
    pub resolved: Vec<TaskId>,
}
242
/// On-disk shape of `bindings.json`: canonicalized directory path -> project name.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct BindingsFile {
    #[serde(default)]
    bindings: BTreeMap<String, String>,
}
248
/// Storage wrapper around one project's Loro document and disk layout.
#[derive(Debug, Clone)]
pub struct Store {
    // Data root directory (contains projects/, bindings.json, device_id).
    root: PathBuf,
    // Project name; also the directory name under projects/.
    project: String,
    // In-memory CRDT document, hydrated from base.loro plus deltas.
    doc: LoroDoc,
}
256
impl Store {
    /// Create a brand-new project on disk.
    ///
    /// Fails if the project directory already exists. Seeds the document
    /// with an empty `tasks` map and a `meta` map (schema version, stable
    /// project id, creation timestamp), then persists it as `base.loro`.
    pub fn init(root: &Path, project: &str) -> Result<Self> {
        validate_project_name(project)?;
        let project_dir = project_dir(root, project);
        if project_dir.exists() {
            bail!("project '{project}' already exists");
        }
        fs::create_dir_all(project_dir.join(CHANGES_DIR))?;

        let doc = LoroDoc::new();
        // Peer id is stable per device so edits from the same machine share one peer.
        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
        doc.get_map("tasks");

        let meta = doc.get_map("meta");
        meta.insert("schema_version", migrate::CURRENT_SCHEMA_VERSION as i64)?;
        meta.insert("project_id", Ulid::new().to_string())?;
        meta.insert("created_at", now_utc())?;

        let snapshot = doc
            .export(ExportMode::Snapshot)
            .context("failed to export initial loro snapshot")?;
        atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;

        Ok(Self {
            root: root.to_path_buf(),
            project: project.to_string(),
            doc,
        })
    }

    /// Open an existing project: load `base.loro`, replay every delta file
    /// (sorted by ULID filename, i.e. roughly creation order), then run any
    /// pending schema migrations and persist the migration as a new delta.
    pub fn open(root: &Path, project: &str) -> Result<Self> {
        validate_project_name(project)?;
        let project_dir = project_dir(root, project);
        let base_path = project_dir.join(BASE_FILE);

        if !base_path.exists() {
            bail!("project '{project}' is not initialized. Run 'td project init {project}'");
        }

        let base = fs::read(&base_path)
            .with_context(|| format!("failed to read loro snapshot '{}'", base_path.display()))?;

        let doc = LoroDoc::from_snapshot(&base).context("failed to load loro snapshot")?;
        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;

        let mut deltas = collect_delta_paths(&project_dir)?;
        deltas.sort_by_key(|path| {
            path.file_stem()
                .and_then(|s| s.to_str())
                .and_then(|s| Ulid::from_string(s).ok())
        });

        for delta_path in deltas {
            let bytes = fs::read(&delta_path)
                .with_context(|| format!("failed to read loro delta '{}'", delta_path.display()))?;
            if let Err(err) = doc.import(&bytes) {
                // Tolerate malformed or stale delta files as requested by design.
                eprintln!(
                    "warning: skipping unreadable delta '{}': {err}",
                    delta_path.display()
                );
            }
        }

        // Apply any pending schema upgrades and persist the resulting delta
        // so subsequent opens don't repeat the work.
        let before_vv = doc.oplog_vv();
        let upgraded = migrate::ensure_current(&doc)?;
        if upgraded {
            doc.commit();
            let delta = doc
                .export(ExportMode::updates(&before_vv))
                .context("failed to export schema upgrade delta")?;
            let filename = format!("{}.loro", Ulid::new());
            let delta_path = project_dir.join(CHANGES_DIR).join(filename);
            atomic_write_file(&delta_path, &delta)?;
        }

        Ok(Self {
            root: root.to_path_buf(),
            project: project.to_string(),
            doc,
        })
    }

    /// Bootstrap a local project from peer-provided delta bytes.
    ///
    /// The incoming delta is imported into a fresh document, validated to
    /// ensure it carries `meta.project_id`, and then persisted as a base
    /// snapshot for future opens.
    pub fn bootstrap_from_peer(root: &Path, project: &str, delta: &[u8]) -> Result<Self> {
        validate_project_name(project)?;
        let project_dir = project_dir(root, project);
        if project_dir.exists() {
            bail!("project '{project}' already exists");
        }
        fs::create_dir_all(project_dir.join(CHANGES_DIR))?;

        let doc = LoroDoc::new();
        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
        doc.import(delta)
            .context("failed to import bootstrap delta from peer")?;
        doc.commit();

        // Reject payloads that do not describe a real project document.
        read_project_id_from_doc(&doc)
            .context("bootstrap delta is missing required project identity")?;

        // Upgrade the peer's document before snapshotting so the local
        // copy is always at CURRENT_SCHEMA_VERSION from the start.
        migrate::ensure_current(&doc)?;
        doc.commit();

        let snapshot = doc
            .export(ExportMode::Snapshot)
            .context("failed to export bootstrap loro snapshot")?;
        atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;

        Ok(Self {
            root: root.to_path_buf(),
            project: project.to_string(),
            doc,
        })
    }

    /// Data root this store was opened from.
    pub fn root(&self) -> &Path {
        &self.root
    }

    /// Name of the project this store wraps.
    pub fn project_name(&self) -> &str {
        &self.project
    }

    /// Direct access to the underlying CRDT document.
    pub fn doc(&self) -> &LoroDoc {
        &self.doc
    }

    /// Export all current state to a fresh base snapshot.
    /// Compact accumulated deltas into the base snapshot using a two-phase
    /// protocol that is safe against concurrent writers.
    ///
    /// **Phase 1** — rename `changes/` to `changes.compacting.<ulid>/`, then
    /// immediately create a fresh `changes/`. Any concurrent `td` command
    /// that writes a delta after this point lands in the new `changes/` and is
    /// therefore never touched by this operation.
    ///
    /// **Phase 2** — write a fresh base snapshot from the in-memory document
    /// (which was loaded from both `base.loro` and every delta at `open` time),
    /// then remove the compacting directory.
    ///
    /// Any orphaned `changes.compacting.*` directories left by a previously
    /// crashed tidy are also removed: they were already merged into `self.doc`
    /// at open time, so the new snapshot includes their contents.
    ///
    /// Returns the number of delta files folded into the snapshot.
    pub fn tidy(&self) -> Result<usize> {
        let project_dir = project_dir(&self.root, &self.project);
        let changes_dir = project_dir.join(CHANGES_DIR);

        // Phase 1: atomically hand off the current changes/ to a compacting
        // directory so new writers have a clean home immediately.
        let compacting_dir = project_dir.join(format!("changes.compacting.{}", Ulid::new()));
        if changes_dir.exists() {
            fs::rename(&changes_dir, &compacting_dir).with_context(|| {
                format!(
                    "failed to rename '{}' to '{}'",
                    changes_dir.display(),
                    compacting_dir.display()
                )
            })?;
        }
        fs::create_dir_all(&changes_dir).context("failed to create fresh changes/")?;

        // Re-import every delta from the compacting directories. self.doc
        // was populated at open() time, but a concurrent writer may have
        // appended a delta to changes/ between open() and the Phase 1
        // rename — that delta is now inside compacting_dir without being in
        // self.doc. CRDT import is idempotent (deduplicates by OpID), so
        // re-importing already-known ops is harmless.
        let mut compacting_deltas = collect_delta_paths(&project_dir)?;
        compacting_deltas.sort_by_key(|path| {
            path.file_stem()
                .and_then(|s| s.to_str())
                .and_then(|s| Ulid::from_string(s).ok())
        });
        for delta_path in &compacting_deltas {
            if let Ok(bytes) = fs::read(delta_path) {
                if let Err(err) = self.doc.import(&bytes) {
                    eprintln!(
                        "warning: skipping unreadable delta '{}': {err}",
                        delta_path.display()
                    );
                }
            }
        }

        // Phase 2: write the new base snapshot. self.doc now holds the
        // full merged state including any concurrent deltas.
        let out = project_dir.join(BASE_FILE);
        let bytes = self
            .doc
            .export(ExportMode::Snapshot)
            .context("failed to export loro snapshot")?;
        atomic_write_file(&out, &bytes)?;

        // Remove the compacting directory we created in phase 1 plus any
        // orphaned changes.compacting.* dirs from previously crashed tidies.
        let mut removed = 0usize;
        for entry in fs::read_dir(&project_dir)
            .with_context(|| format!("failed to read project dir '{}'", project_dir.display()))?
        {
            let path = entry?.path();
            if !path.is_dir() {
                continue;
            }
            let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
                continue;
            };
            if !name.starts_with("changes.compacting.") {
                continue;
            }
            // Count files before removing for the summary report.
            for file in fs::read_dir(&path)
                .with_context(|| format!("failed to read '{}'", path.display()))?
            {
                let fp = file?.path();
                if fp.is_file() {
                    removed += 1;
                }
            }
            fs::remove_dir_all(&path)
                .with_context(|| format!("failed to remove compacting dir '{}'", path.display()))?;
        }

        Ok(removed)
    }

    /// Apply a local mutation and persist only the resulting delta.
    ///
    /// Returns the path of the newly written change file.
    pub fn apply_and_persist<F>(&self, mutator: F) -> Result<PathBuf>
    where
        F: FnOnce(&LoroDoc) -> Result<()>,
    {
        // Capture the version vector first so the export contains exactly
        // the ops produced by `mutator`.
        let before = self.doc.oplog_vv();
        mutator(&self.doc)?;
        self.doc.commit();

        let delta = self
            .doc
            .export(ExportMode::updates(&before))
            .context("failed to export loro update delta")?;

        let filename = format!("{}.loro", Ulid::new());
        let path = project_dir(&self.root, &self.project)
            .join(CHANGES_DIR)
            .join(filename);
        atomic_write_file(&path, &delta)?;
        Ok(path)
    }

    /// Persist pre-built delta bytes (e.g. received from a peer) as a new
    /// change file without re-exporting from the doc.
    pub fn save_raw_delta(&self, bytes: &[u8]) -> Result<PathBuf> {
        let filename = format!("{}.loro", Ulid::new());
        let path = project_dir(&self.root, &self.project)
            .join(CHANGES_DIR)
            .join(filename);
        atomic_write_file(&path, bytes)?;
        Ok(path)
    }

    /// Return hydrated tasks, excluding tombstones.
    pub fn list_tasks(&self) -> Result<Vec<Task>> {
        self.list_tasks_inner(false)
    }

    /// Return hydrated tasks, including tombstoned rows.
    pub fn list_tasks_unfiltered(&self) -> Result<Vec<Task>> {
        self.list_tasks_inner(true)
    }

    /// Find a task by exact ULID string.
    pub fn get_task(&self, id: &TaskId, include_deleted: bool) -> Result<Option<Task>> {
        let tasks = if include_deleted {
            self.list_tasks_unfiltered()?
        } else {
            self.list_tasks()?
        };
        Ok(tasks.into_iter().find(|task| task.id == *id))
    }

    /// Hydrate every entry of the root `tasks` map, optionally filtering
    /// out tombstoned rows, sorted by full ULID (i.e. creation order).
    fn list_tasks_inner(&self, include_deleted: bool) -> Result<Vec<Task>> {
        let root = serde_json::to_value(self.doc.get_deep_value())?;
        let tasks_obj = root
            .get("tasks")
            .and_then(Value::as_object)
            .ok_or_else(|| anyhow!("missing root tasks map"))?;

        let mut tasks = Vec::with_capacity(tasks_obj.len());
        for (task_id_raw, task_json) in tasks_obj {
            let task = hydrate_task(task_id_raw, task_json)?;
            if include_deleted || task.deleted_at.is_none() {
                tasks.push(task);
            }
        }

        tasks.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));
        Ok(tasks)
    }

    /// Return the stable project identity stored in `meta.project_id`.
    pub fn project_id(&self) -> Result<String> {
        read_project_id_from_doc(&self.doc)
    }

    /// Schema version currently recorded in the document's `meta` map.
    pub fn schema_version(&self) -> Result<u32> {
        migrate::read_schema_version(&self.doc)
    }
}
574
575/// Generate a new task ULID.
576pub fn gen_id() -> TaskId {
577 TaskId::new(Ulid::new())
578}
579
/// Parse a user-supplied status string (see `Status::parse`).
pub fn parse_status(s: &str) -> Result<Status> {
    Status::parse(s)
}
583
/// Parse a user-supplied priority string (see `Priority::parse`).
pub fn parse_priority(s: &str) -> Result<Priority> {
    Priority::parse(s)
}
587
/// Parse a user-supplied effort string (see `Effort::parse`).
pub fn parse_effort(s: &str) -> Result<Effort> {
    Effort::parse(s)
}
591
/// Public accessor for a status's canonical string form.
pub fn status_label(s: Status) -> &'static str {
    s.as_str()
}
595
/// Public accessor for a priority's canonical string form.
pub fn priority_label(p: Priority) -> &'static str {
    p.as_str()
}
599
/// Public accessor for an effort's canonical string form.
pub fn effort_label(e: Effort) -> &'static str {
    e.as_str()
}
603
604pub fn data_root() -> Result<PathBuf> {
605 let home = std::env::var("HOME").context("HOME is not set")?;
606 Ok(PathBuf::from(home).join(".local").join("share").join("td"))
607}
608
/// Create project `project` under the default data root and bind `cwd` to it.
pub fn init(cwd: &Path, project: &str) -> Result<Store> {
    let root = data_root()?;
    fs::create_dir_all(root.join(PROJECTS_DIR))?;
    let store = Store::init(&root, project)?;
    bind_project(cwd, project)?;
    Ok(store)
}
616
617pub fn use_project(cwd: &Path, project: &str) -> Result<()> {
618 let root = data_root()?;
619 validate_project_name(project)?;
620 if !project_dir(&root, project).join(BASE_FILE).exists() {
621 bail!("project '{project}' not found. Run 'td project list' to list known projects");
622 }
623 bind_project(cwd, project)
624}
625
/// Open the project selected for `start` (via `TD_PROJECT` or directory
/// bindings), erroring with guidance when none is selected.
pub fn open(start: &Path) -> Result<Store> {
    let root = data_root()?;
    // TD_PROJECT, when set, overrides any directory binding.
    let explicit = std::env::var(PROJECT_ENV).ok();
    let project = resolve_project_name(start, &root, explicit.as_deref())?.ok_or_else(|| {
        anyhow!(
            "no project selected. Use --project/TD_PROJECT, run 'td project bind <name>', or run 'td project init <name>'"
        )
    })?;
    Store::open(&root, &project)
}
636
/// Open the project selected by `--project`/`TD_PROJECT`/bindings if one exists.
///
/// Returns `Ok(None)` when no project is selected by any mechanism.
pub fn try_open(start: &Path) -> Result<Option<Store>> {
    let root = data_root()?;
    // TD_PROJECT, when set, overrides any directory binding.
    let explicit = std::env::var(PROJECT_ENV).ok();
    let Some(project) = resolve_project_name(start, &root, explicit.as_deref())? else {
        return Ok(None);
    };
    Store::open(&root, &project).map(Some)
}
648
/// Bootstrap a project from a peer delta and bind the current directory.
pub fn bootstrap_sync(cwd: &Path, project: &str, delta: &[u8]) -> Result<Store> {
    let root = data_root()?;
    // Ensure projects/ exists before Store::bootstrap_from_peer creates the project dir.
    fs::create_dir_all(root.join(PROJECTS_DIR))?;
    validate_project_name(project)?;
    let store = Store::bootstrap_from_peer(&root, project, delta)?;
    bind_project(cwd, project)?;
    Ok(store)
}
658
659/// Bootstrap a project from a peer delta using an explicit data root.
660///
661/// Unlike [`bootstrap_sync`], this function does not consult `HOME` and is
662/// therefore safe to call from async contexts where `HOME` may vary by peer.
663///
664/// If `bind_cwd` is true, the given working directory is bound to the new
665/// project. Pass false when bootstrapping from a SyncAll context to avoid
666/// unexpectedly binding directories like the user's home.
667///
668/// Uses exclusive file locking to prevent race conditions when multiple
669/// concurrent sync operations create projects or modify bindings.
670pub fn bootstrap_sync_at(
671 data_root: &Path,
672 cwd: &Path,
673 project: &str,
674 delta: &[u8],
675 bind_cwd: bool,
676) -> Result<Store> {
677 fs::create_dir_all(data_root.join(PROJECTS_DIR))?;
678 validate_project_name(project)?;
679
680 // Exclusive lock prevents races when concurrent syncs create the same project
681 // or modify bindings simultaneously.
682 let lock_path = data_root.join(".bindings.lock");
683 let lock_file = OpenOptions::new()
684 .create(true)
685 .truncate(false)
686 .write(true)
687 .open(&lock_path)
688 .with_context(|| format!("failed to open lock file '{}'", lock_path.display()))?;
689 lock_file
690 .lock_exclusive()
691 .context("failed to acquire exclusive lock on bindings")?;
692
693 // Now holding the lock: create project and optionally update bindings atomically.
694 let store = Store::bootstrap_from_peer(data_root, project, delta)?;
695
696 if bind_cwd {
697 let canonical = fs::canonicalize(cwd)
698 .with_context(|| format!("failed to canonicalize '{}'", cwd.display()))?;
699 let mut bindings = load_bindings(data_root)?;
700 bindings
701 .bindings
702 .insert(canonical.to_string_lossy().to_string(), project.to_string());
703 save_bindings(data_root, &bindings)?;
704 }
705
706 // Lock is released when lock_file is dropped.
707 Ok(store)
708}
709
/// List project names under the default (`HOME`-derived) data root.
pub fn list_projects() -> Result<Vec<String>> {
    let root = data_root()?;
    list_projects_in(&root)
}
714
715/// List project names rooted at an explicit data directory.
716///
717/// Unlike [`list_projects`], this does not consult `HOME` and is therefore
718/// safe to call from async contexts where `HOME` may vary between peers.
719pub(crate) fn list_projects_in(root: &Path) -> Result<Vec<String>> {
720 let mut out = Vec::new();
721 let dir = root.join(PROJECTS_DIR);
722 if !dir.exists() {
723 return Ok(out);
724 }
725
726 for entry in fs::read_dir(dir)? {
727 let path = entry?.path();
728 if !path.is_dir() {
729 continue;
730 }
731 let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
732 continue;
733 };
734 if path.join(BASE_FILE).exists() {
735 out.push(name.to_string());
736 }
737 }
738
739 out.sort();
740 Ok(out)
741}
742
/// Resolve user input (`td-XXXXXXX`, a full ULID, or a bare suffix) to a
/// known task id.
///
/// Tries an exact full-ULID match first, then falls back to a
/// case-insensitive suffix match against all known task ids. Errors when
/// nothing matches or the suffix matches more than one task.
pub fn resolve_task_id(store: &Store, raw: &str, include_deleted: bool) -> Result<TaskId> {
    // Accept the display form by stripping the "td-" prefix when present.
    let raw = raw.strip_prefix("td-").unwrap_or(raw);

    // Fast path: the input is already a full ULID of an existing task.
    if let Ok(id) = TaskId::parse(raw) {
        if store.get_task(&id, include_deleted)?.is_some() {
            return Ok(id);
        }
    }

    let tasks = if include_deleted {
        store.list_tasks_unfiltered()?
    } else {
        store.list_tasks()?
    };

    // ULID strings are uppercase, so uppercase the suffix before comparing.
    let upper = raw.to_ascii_uppercase();
    let matches: Vec<TaskId> = tasks
        .into_iter()
        .filter(|t| t.id.as_str().ends_with(&upper))
        .map(|t| t.id)
        .collect();

    match matches.as_slice() {
        [] => bail!("task '{raw}' not found"),
        [id] => Ok(id.clone()),
        _ => bail!("task reference '{raw}' is ambiguous"),
    }
}
771
772pub fn partition_blockers(store: &Store, blockers: &[TaskId]) -> Result<BlockerPartition> {
773 let mut out = BlockerPartition::default();
774 for blocker in blockers {
775 let Some(task) = store.get_task(blocker, true)? else {
776 out.resolved.push(blocker.clone());
777 continue;
778 };
779 if task.status == Status::Closed || task.deleted_at.is_some() {
780 out.resolved.push(blocker.clone());
781 } else {
782 out.open.push(blocker.clone());
783 }
784 }
785 Ok(out)
786}
787
/// Create a fresh, empty map container for `task_id` under the root `tasks` map.
pub fn insert_task_map(tasks: &LoroMap, task_id: &TaskId) -> Result<LoroMap> {
    tasks
        .insert_container(task_id.as_str(), LoroMap::new())
        .context("failed to create task map")
}
793
/// Look up the task sub-map for `task_id` under the root `tasks` map.
///
/// Returns `Ok(None)` when the key is absent; errors when the key holds
/// something other than a map container.
pub fn get_task_map(tasks: &LoroMap, task_id: &TaskId) -> Result<Option<LoroMap>> {
    match tasks.get(task_id.as_str()) {
        Some(ValueOrContainer::Container(Container::Map(map))) => Ok(Some(map)),
        Some(_) => bail!("task '{}' has invalid container type", task_id.as_str()),
        None => Ok(None),
    }
}
801
/// Fetch the child map at `key`, creating an empty one if it is missing.
pub fn get_or_create_child_map(parent: &LoroMap, key: &str) -> Result<LoroMap> {
    parent
        .get_or_create_container(key, LoroMap::new())
        .with_context(|| format!("failed to get or create map key '{key}'"))
}
807
/// Path of the global bindings file (`bindings.json`) under the data root.
fn bindings_path(root: &Path) -> PathBuf {
    root.join(BINDINGS_FILE)
}
811
812fn resolve_project_name(
813 start: &Path,
814 root: &Path,
815 explicit: Option<&str>,
816) -> Result<Option<String>> {
817 if let Some(project) = explicit {
818 validate_project_name(project)?;
819 return Ok(Some(project.to_string()));
820 }
821
822 let cwd = canonicalize_binding_path(start)?;
823 let bindings = load_bindings(root)?;
824
825 let mut best: Option<(usize, String)> = None;
826 for (raw_path, project) in bindings.bindings {
827 let bound = PathBuf::from(raw_path);
828 if is_prefix_path(&bound, &cwd) {
829 let score = bound.components().count();
830 match &best {
831 Some((best_score, _)) if *best_score >= score => {}
832 _ => best = Some((score, project)),
833 }
834 }
835 }
836
837 if let Some((_, project)) = best {
838 return Ok(Some(project));
839 }
840
841 Ok(None)
842}
843
844pub fn unbind_project(cwd: &Path) -> Result<()> {
845 let root = data_root()?;
846 let canonical = canonicalize_binding_path(cwd)?;
847 let canonical_str = canonical.to_string_lossy().to_string();
848
849 let mut bindings = load_bindings(&root)?;
850 if !bindings.bindings.contains_key(&canonical_str) {
851 bail!("path '{}' is not bound to any project", canonical.display());
852 }
853 bindings.bindings.remove(&canonical_str);
854 save_bindings(&root, &bindings)
855}
856
/// Permanently delete a project's on-disk data and drop any directory
/// bindings that pointed at it.
pub fn delete_project(name: &str) -> Result<()> {
    validate_project_name(name)?;
    let root = data_root()?;
    let proj_dir = project_dir(&root, name);

    if !proj_dir.join(BASE_FILE).exists() {
        bail!("project '{name}' not found");
    }

    fs::remove_dir_all(&proj_dir).with_context(|| {
        format!(
            "failed to remove project directory '{}'",
            proj_dir.display()
        )
    })?;

    // Clean up bindings only after the data is gone, so a failed removal
    // leaves the bindings intact and pointing at still-existing data.
    let mut bindings = load_bindings(&root)?;
    bindings.bindings.retain(|_, project| project != name);
    save_bindings(&root, &bindings)
}
877
/// Bind the canonicalized form of `cwd` to `project` in the global
/// bindings file, overwriting any existing binding for that path.
fn bind_project(cwd: &Path, project: &str) -> Result<()> {
    validate_project_name(project)?;

    let root = data_root()?;
    fs::create_dir_all(&root)?;

    let canonical = canonicalize_binding_path(cwd)?;
    let mut bindings = load_bindings(&root)?;
    bindings
        .bindings
        .insert(canonical.to_string_lossy().to_string(), project.to_string());
    save_bindings(&root, &bindings)
}
891
/// Load the bindings file, returning an empty default when it does not
/// exist yet. A present-but-invalid file is an error, not a silent reset.
fn load_bindings(root: &Path) -> Result<BindingsFile> {
    let path = bindings_path(root);
    if !path.exists() {
        return Ok(BindingsFile::default());
    }
    let content = fs::read_to_string(&path)
        .with_context(|| format!("failed reading bindings from '{}'", path.display()))?;
    serde_json::from_str(&content)
        .with_context(|| format!("invalid bindings file '{}'", path.display()))
}
902
/// Atomically persist the bindings file as pretty-printed JSON.
fn save_bindings(root: &Path, bindings: &BindingsFile) -> Result<()> {
    let path = bindings_path(root);
    let bytes = serde_json::to_vec_pretty(bindings)?;
    atomic_write_file(&path, &bytes)
}
908
/// Canonicalize a path for use as a bindings key (resolves symlinks and
/// relative components, so equivalent paths map to the same key).
fn canonicalize_binding_path(path: &Path) -> Result<PathBuf> {
    fs::canonicalize(path).with_context(|| format!("failed to canonicalize '{}'", path.display()))
}
912
/// True when every component of `prefix` matches the leading components of
/// `target` (an empty prefix matches everything).
///
/// The hand-rolled component loop this replaces implemented exactly the
/// semantics of [`Path::starts_with`], which also compares whole
/// components rather than raw string prefixes (so `/a/b` is not a prefix
/// of `/a/bc`).
fn is_prefix_path(prefix: &Path, target: &Path) -> bool {
    target.starts_with(prefix)
}
926
927pub fn validate_project_name(name: &str) -> Result<()> {
928 if name.is_empty() {
929 bail!("project name cannot be empty");
930 }
931 if name.contains('/') || name.contains('\\') || name == "." || name == ".." {
932 bail!("invalid project name '{name}'");
933 }
934 if name.chars().any(char::is_control) {
935 bail!("invalid project name '{name}'");
936 }
937 Ok(())
938}
939
/// Extract the stable `meta.project_id` string from a document, erroring
/// when the field is absent or not a string.
fn read_project_id_from_doc(doc: &LoroDoc) -> Result<String> {
    let root = serde_json::to_value(doc.get_deep_value())?;
    root.get("meta")
        .and_then(|m| m.get("project_id"))
        .and_then(Value::as_str)
        .map(str::to_owned)
        .ok_or_else(|| anyhow!("missing meta.project_id in project doc"))
}
948
/// Build a [`Task`] from the JSON projection of one CRDT task entry.
///
/// `task_id_raw` is the full ULID key under the root `tasks` map; `value`
/// is that entry's deep value. Labels and blockers are stored as maps, so
/// only their keys are read; logs are maps of id -> {timestamp, message}.
fn hydrate_task(task_id_raw: &str, value: &Value) -> Result<Task> {
    let obj = value
        .as_object()
        .ok_or_else(|| anyhow!("task '{task_id_raw}' is not an object"))?;

    let id = TaskId::parse(task_id_raw)?;

    let title = get_required_string(obj, "title")?;
    let description = get_required_string(obj, "description")?;
    let task_type = get_required_string(obj, "type")?;
    let status = Status::parse(&get_required_string(obj, "status")?)?;
    let priority = Priority::parse(&get_required_string(obj, "priority")?)?;
    let effort = Effort::parse(&get_required_string(obj, "effort")?)?;
    // An empty-string parent is treated the same as an absent parent.
    let parent = match obj.get("parent").and_then(Value::as_str) {
        Some("") | None => None,
        Some(raw) => Some(TaskId::parse(raw)?),
    };

    let created_at = get_required_string(obj, "created_at")?;
    let updated_at = get_required_string(obj, "updated_at")?;
    // Empty deleted_at means "not deleted" (tombstone cleared).
    let deleted_at = obj
        .get("deleted_at")
        .and_then(Value::as_str)
        .map(str::to_owned)
        .filter(|s| !s.is_empty());

    let labels = obj
        .get("labels")
        .and_then(Value::as_object)
        .map(|m| m.keys().cloned().collect())
        .unwrap_or_default();

    let blockers = obj
        .get("blockers")
        .and_then(Value::as_object)
        .map(|m| {
            m.keys()
                .map(|raw| TaskId::parse(raw))
                .collect::<Result<Vec<_>>>()
        })
        .transpose()?
        .unwrap_or_default();

    let mut logs = obj
        .get("logs")
        .and_then(Value::as_object)
        .map(|logs| {
            logs.iter()
                .map(|(log_id_raw, payload)| {
                    let payload_obj = payload.as_object().ok_or_else(|| {
                        anyhow!("log '{log_id_raw}' on task '{task_id_raw}' is not an object")
                    })?;
                    Ok(LogEntry {
                        id: TaskId::parse(log_id_raw)?,
                        timestamp: get_required_string(payload_obj, "timestamp")?,
                        message: get_required_string(payload_obj, "message")?,
                    })
                })
                .collect::<Result<Vec<_>>>()
        })
        .transpose()?
        .unwrap_or_default();

    // ULIDs sort lexicographically by creation time, so this orders logs
    // chronologically.
    logs.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));

    Ok(Task {
        id,
        title,
        description,
        task_type,
        priority,
        status,
        effort,
        parent,
        created_at,
        updated_at,
        deleted_at,
        labels,
        blockers,
        logs,
    })
}
1031
1032fn get_required_string(map: &serde_json::Map<String, Value>, key: &str) -> Result<String> {
1033 map.get(key)
1034 .and_then(Value::as_str)
1035 .map(str::to_owned)
1036 .ok_or_else(|| anyhow!("missing or non-string key '{key}'"))
1037}
1038
/// Gather every delta file for a project: the live `changes/` directory
/// plus any `changes.compacting.*` directories left by an in-flight or
/// crashed tidy. Order is unspecified; callers sort before importing.
fn collect_delta_paths(project_dir: &Path) -> Result<Vec<PathBuf>> {
    let mut paths = Vec::new();
    collect_changes_from_dir(&project_dir.join(CHANGES_DIR), &mut paths)?;

    for entry in fs::read_dir(project_dir)? {
        let path = entry?.path();
        if !path.is_dir() {
            continue;
        }
        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
            continue;
        };
        if name.starts_with("changes.compacting.") {
            collect_changes_from_dir(&path, &mut paths)?;
        }
    }

    Ok(paths)
}
1058
/// Append all well-formed delta files from `dir` to `out`.
///
/// Only files named `<ULID>.loro` count; temp files and anything with a
/// non-ULID stem are skipped. A missing directory is not an error.
fn collect_changes_from_dir(dir: &Path, out: &mut Vec<PathBuf>) -> Result<()> {
    if !dir.exists() {
        return Ok(());
    }

    for entry in fs::read_dir(dir)? {
        let path = entry?.path();
        if !path.is_file() {
            continue;
        }

        let Some(filename) = path.file_name().and_then(|n| n.to_str()) else {
            continue;
        };
        // Skip in-flight atomic-write temp files and non-delta files.
        if filename.ends_with(TMP_SUFFIX) || !filename.ends_with(".loro") {
            continue;
        }

        // The stem must be a valid ULID so open()/tidy() can sort by it.
        let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
            continue;
        };
        if Ulid::from_string(stem).is_err() {
            continue;
        }

        out.push(path);
    }

    Ok(())
}
1089
1090fn project_dir(root: &Path, project: &str) -> PathBuf {
1091 root.join(PROJECTS_DIR).join(project)
1092}
1093
/// Load this device's stable peer id, creating and persisting one on first
/// use.
///
/// The id lives in `<root>/device_id` as a ULID string; the Loro peer id
/// is derived from its low 64 bits (within the ULID's random section).
fn load_or_create_device_peer_id(root: &Path) -> Result<PeerID> {
    let path = root.join("device_id");
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)?;
    }

    let device_ulid = if path.exists() {
        let content = fs::read_to_string(&path)
            .with_context(|| format!("failed reading device id from '{}'", path.display()))?;
        Ulid::from_string(content.trim()).context("invalid persisted device id ULID")?
    } else {
        let id = Ulid::new();
        atomic_write_file(&path, id.to_string().as_bytes())?;
        id
    };

    // PeerID is u64; truncate the 128-bit ULID to its low 64 bits.
    let raw: u128 = device_ulid.into();
    Ok((raw & u64::MAX as u128) as u64)
}
1113
/// Durably write `bytes` to `path` via a uniquely-named temp file in the
/// same directory, fsync, atomic rename, then directory fsync.
///
/// Readers never observe a partially written file; a crash leaves at most
/// a stray `*.tmp` file (which delta collection ignores).
fn atomic_write_file(path: &Path, bytes: &[u8]) -> Result<()> {
    let parent = path
        .parent()
        .ok_or_else(|| anyhow!("cannot atomically write root path '{}'", path.display()))?;
    fs::create_dir_all(parent)?;

    // Unique temp name (ULID suffix) so concurrent writers never collide.
    let tmp_name = format!(
        "{}.{}{}",
        path.file_name().and_then(|n| n.to_str()).unwrap_or("write"),
        Ulid::new(),
        TMP_SUFFIX
    );
    let tmp_path = parent.join(tmp_name);

    {
        let mut file = OpenOptions::new()
            .create_new(true)
            .write(true)
            .open(&tmp_path)
            .with_context(|| format!("failed to open temp file '{}'", tmp_path.display()))?;
        file.write_all(bytes)?;
        // Flush file contents to disk before the rename makes them visible.
        file.sync_all()?;
    }

    fs::rename(&tmp_path, path).with_context(|| {
        format!(
            "failed to atomically rename '{}' to '{}'",
            tmp_path.display(),
            path.display()
        )
    })?;

    // Persist the directory entry for the rename itself.
    sync_dir(parent)?;
    Ok(())
}
1149
1150fn sync_dir(path: &Path) -> Result<()> {
1151 let dir =
1152 File::open(path).with_context(|| format!("failed opening dir '{}'", path.display()))?;
1153 dir.sync_all()
1154 .with_context(|| format!("failed fsync on dir '{}'", path.display()))?;
1155 Ok(())
1156}