1use anyhow::{anyhow, bail, Context, Result};
2use loro::{Container, ExportMode, LoroDoc, LoroMap, PeerID, ValueOrContainer};
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5use std::collections::BTreeMap;
6use std::fmt;
7use std::fs::{self, File, OpenOptions};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10
11use fs2::FileExt;
12use ulid::Ulid;
13
14pub const PROJECT_ENV: &str = "TD_PROJECT";
15
16pub(crate) const PROJECTS_DIR: &str = "projects";
17const CHANGES_DIR: &str = "changes";
18const BINDINGS_FILE: &str = "bindings.json";
19const BASE_FILE: &str = "base.loro";
20const TMP_SUFFIX: &str = ".tmp";
21use crate::migrate;
22
23/// Current UTC time in ISO 8601 format.
24pub fn now_utc() -> String {
25 chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string()
26}
27
28/// Lifecycle state for a task.
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
30#[serde(rename_all = "snake_case")]
31pub enum Status {
32 Open,
33 InProgress,
34 Closed,
35}
36
37impl Status {
38 fn as_str(self) -> &'static str {
39 match self {
40 Status::Open => "open",
41 Status::InProgress => "in_progress",
42 Status::Closed => "closed",
43 }
44 }
45
46 fn parse(raw: &str) -> Result<Self> {
47 match raw {
48 "open" => Ok(Self::Open),
49 "in_progress" => Ok(Self::InProgress),
50 "closed" => Ok(Self::Closed),
51 _ => bail!("invalid status '{raw}'"),
52 }
53 }
54}
55
56/// Priority for task ordering.
57#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
58#[serde(rename_all = "snake_case")]
59pub enum Priority {
60 High,
61 Medium,
62 Low,
63}
64
65impl Priority {
66 fn as_str(self) -> &'static str {
67 match self {
68 Priority::High => "high",
69 Priority::Medium => "medium",
70 Priority::Low => "low",
71 }
72 }
73
74 fn parse(raw: &str) -> Result<Self> {
75 match raw {
76 "high" => Ok(Self::High),
77 "medium" => Ok(Self::Medium),
78 "low" => Ok(Self::Low),
79 _ => bail!("invalid priority '{raw}'"),
80 }
81 }
82
83 pub fn score(self) -> i32 {
84 match self {
85 Priority::High => 1,
86 Priority::Medium => 2,
87 Priority::Low => 3,
88 }
89 }
90}
91
92/// Estimated effort for a task.
93#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
94#[serde(rename_all = "snake_case")]
95pub enum Effort {
96 Low,
97 Medium,
98 High,
99}
100
101impl Effort {
102 fn as_str(self) -> &'static str {
103 match self {
104 Effort::Low => "low",
105 Effort::Medium => "medium",
106 Effort::High => "high",
107 }
108 }
109
110 fn parse(raw: &str) -> Result<Self> {
111 match raw {
112 "low" => Ok(Self::Low),
113 "medium" => Ok(Self::Medium),
114 "high" => Ok(Self::High),
115 _ => bail!("invalid effort '{raw}'"),
116 }
117 }
118
119 pub fn score(self) -> i32 {
120 match self {
121 Effort::Low => 1,
122 Effort::Medium => 2,
123 Effort::High => 3,
124 }
125 }
126}
127
128/// A stable task identifier backed by a ULID.
129#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
130#[serde(transparent)]
131pub struct TaskId(String);
132
133impl TaskId {
134 pub fn new(id: Ulid) -> Self {
135 Self(id.to_string())
136 }
137
138 pub fn parse(raw: &str) -> Result<Self> {
139 let id = Ulid::from_string(raw).with_context(|| format!("invalid task id '{raw}'"))?;
140 Ok(Self::new(id))
141 }
142
143 pub fn as_str(&self) -> &str {
144 &self.0
145 }
146
147 pub fn short(&self) -> String {
148 format!("td-{}", &self.0[self.0.len() - 7..])
149 }
150
151 /// Return a display-friendly short ID from a raw ULID string.
152 pub fn display_id(raw: &str) -> String {
153 let n = raw.len();
154 if n > 7 {
155 format!("td-{}", &raw[n - 7..])
156 } else {
157 format!("td-{raw}")
158 }
159 }
160}
161
162impl fmt::Display for TaskId {
163 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
164 write!(f, "{}", self.short())
165 }
166}
167
168/// A task log entry embedded in a task record.
169#[derive(Debug, Clone, Serialize)]
170pub struct LogEntry {
171 pub id: TaskId,
172 pub timestamp: String,
173 pub message: String,
174}
175
176/// Hydrated task data from the CRDT document.
177#[derive(Debug, Clone, Serialize)]
178pub struct Task {
179 pub id: TaskId,
180 pub title: String,
181 pub description: String,
182 #[serde(rename = "type")]
183 pub task_type: String,
184 pub priority: Priority,
185 pub status: Status,
186 pub effort: Effort,
187 pub parent: Option<TaskId>,
188 pub created_at: String,
189 pub updated_at: String,
190 pub deleted_at: Option<String>,
191 pub labels: Vec<String>,
192 pub blockers: Vec<TaskId>,
193 pub logs: Vec<LogEntry>,
194}
195
196/// Result type for partitioning blockers by task state.
197#[derive(Debug, Default, Clone, Serialize)]
198pub struct BlockerPartition {
199 pub open: Vec<TaskId>,
200 pub resolved: Vec<TaskId>,
201}
202
203#[derive(Debug, Default, Clone, Serialize, Deserialize)]
204struct BindingsFile {
205 #[serde(default)]
206 bindings: BTreeMap<String, String>,
207}
208
209/// Storage wrapper around one project's Loro document and disk layout.
210#[derive(Debug, Clone)]
211pub struct Store {
212 root: PathBuf,
213 project: String,
214 doc: LoroDoc,
215}
216
217impl Store {
218 pub fn init(root: &Path, project: &str) -> Result<Self> {
219 validate_project_name(project)?;
220 let project_dir = project_dir(root, project);
221 if project_dir.exists() {
222 bail!("project '{project}' already exists");
223 }
224 fs::create_dir_all(project_dir.join(CHANGES_DIR))?;
225
226 let doc = LoroDoc::new();
227 doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
228 doc.get_map("tasks");
229
230 let meta = doc.get_map("meta");
231 meta.insert("schema_version", migrate::CURRENT_SCHEMA_VERSION as i64)?;
232 meta.insert("project_id", Ulid::new().to_string())?;
233 meta.insert("created_at", now_utc())?;
234
235 let snapshot = doc
236 .export(ExportMode::Snapshot)
237 .context("failed to export initial loro snapshot")?;
238 atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;
239
240 Ok(Self {
241 root: root.to_path_buf(),
242 project: project.to_string(),
243 doc,
244 })
245 }
246
247 pub fn open(root: &Path, project: &str) -> Result<Self> {
248 validate_project_name(project)?;
249 let project_dir = project_dir(root, project);
250 let base_path = project_dir.join(BASE_FILE);
251
252 if !base_path.exists() {
253 bail!("project '{project}' is not initialized. Run 'td project init {project}'");
254 }
255
256 let base = fs::read(&base_path)
257 .with_context(|| format!("failed to read loro snapshot '{}'", base_path.display()))?;
258
259 let doc = LoroDoc::from_snapshot(&base).context("failed to load loro snapshot")?;
260 doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
261
262 let mut deltas = collect_delta_paths(&project_dir)?;
263 deltas.sort_by_key(|path| {
264 path.file_stem()
265 .and_then(|s| s.to_str())
266 .and_then(|s| Ulid::from_string(s).ok())
267 });
268
269 for delta_path in deltas {
270 let bytes = fs::read(&delta_path)
271 .with_context(|| format!("failed to read loro delta '{}'", delta_path.display()))?;
272 if let Err(err) = doc.import(&bytes) {
273 // Tolerate malformed or stale delta files as requested by design.
274 eprintln!(
275 "warning: skipping unreadable delta '{}': {err}",
276 delta_path.display()
277 );
278 }
279 }
280
281 // Apply any pending schema upgrades and persist the resulting delta
282 // so subsequent opens don't repeat the work.
283 let before_vv = doc.oplog_vv();
284 let upgraded = migrate::ensure_current(&doc)?;
285 if upgraded {
286 doc.commit();
287 let delta = doc
288 .export(ExportMode::updates(&before_vv))
289 .context("failed to export schema upgrade delta")?;
290 let filename = format!("{}.loro", Ulid::new());
291 let delta_path = project_dir.join(CHANGES_DIR).join(filename);
292 atomic_write_file(&delta_path, &delta)?;
293 }
294
295 Ok(Self {
296 root: root.to_path_buf(),
297 project: project.to_string(),
298 doc,
299 })
300 }
301
302 /// Bootstrap a local project from peer-provided delta bytes.
303 ///
304 /// The incoming delta is imported into a fresh document, validated to
305 /// ensure it carries `meta.project_id`, and then persisted as a base
306 /// snapshot for future opens.
307 pub fn bootstrap_from_peer(root: &Path, project: &str, delta: &[u8]) -> Result<Self> {
308 validate_project_name(project)?;
309 let project_dir = project_dir(root, project);
310 if project_dir.exists() {
311 bail!("project '{project}' already exists");
312 }
313 fs::create_dir_all(project_dir.join(CHANGES_DIR))?;
314
315 let doc = LoroDoc::new();
316 doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
317 doc.import(delta)
318 .context("failed to import bootstrap delta from peer")?;
319 doc.commit();
320
321 read_project_id_from_doc(&doc)
322 .context("bootstrap delta is missing required project identity")?;
323
324 // Upgrade the peer's document before snapshotting so the local
325 // copy is always at CURRENT_SCHEMA_VERSION from the start.
326 migrate::ensure_current(&doc)?;
327 doc.commit();
328
329 let snapshot = doc
330 .export(ExportMode::Snapshot)
331 .context("failed to export bootstrap loro snapshot")?;
332 atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;
333
334 Ok(Self {
335 root: root.to_path_buf(),
336 project: project.to_string(),
337 doc,
338 })
339 }
340
341 pub fn root(&self) -> &Path {
342 &self.root
343 }
344
345 pub fn project_name(&self) -> &str {
346 &self.project
347 }
348
349 pub fn doc(&self) -> &LoroDoc {
350 &self.doc
351 }
352
353 /// Export all current state to a fresh base snapshot.
354 /// Compact accumulated deltas into the base snapshot using a two-phase
355 /// protocol that is safe against concurrent writers.
356 ///
357 /// **Phase 1** — rename `changes/` to `changes.compacting.<ulid>/`, then
358 /// immediately create a fresh `changes/`. Any concurrent `td` command
359 /// that writes a delta after this point lands in the new `changes/` and is
360 /// therefore never touched by this operation.
361 ///
362 /// **Phase 2** — write a fresh base snapshot from the in-memory document
363 /// (which was loaded from both `base.loro` and every delta at `open` time),
364 /// then remove the compacting directory.
365 ///
366 /// Any orphaned `changes.compacting.*` directories left by a previously
367 /// crashed tidy are also removed: they were already merged into `self.doc`
368 /// at open time, so the new snapshot includes their contents.
369 ///
370 /// Returns the number of delta files folded into the snapshot.
371 pub fn tidy(&self) -> Result<usize> {
372 let project_dir = project_dir(&self.root, &self.project);
373 let changes_dir = project_dir.join(CHANGES_DIR);
374
375 // Phase 1: atomically hand off the current changes/ to a compacting
376 // directory so new writers have a clean home immediately.
377 let compacting_dir = project_dir.join(format!("changes.compacting.{}", Ulid::new()));
378 if changes_dir.exists() {
379 fs::rename(&changes_dir, &compacting_dir).with_context(|| {
380 format!(
381 "failed to rename '{}' to '{}'",
382 changes_dir.display(),
383 compacting_dir.display()
384 )
385 })?;
386 }
387 fs::create_dir_all(&changes_dir).context("failed to create fresh changes/")?;
388
389 // Re-import every delta from the compacting directories. self.doc
390 // was populated at open() time, but a concurrent writer may have
391 // appended a delta to changes/ between open() and the Phase 1
392 // rename — that delta is now inside compacting_dir without being in
393 // self.doc. CRDT import is idempotent (deduplicates by OpID), so
394 // re-importing already-known ops is harmless.
395 let mut compacting_deltas = collect_delta_paths(&project_dir)?;
396 compacting_deltas.sort_by_key(|path| {
397 path.file_stem()
398 .and_then(|s| s.to_str())
399 .and_then(|s| Ulid::from_string(s).ok())
400 });
401 for delta_path in &compacting_deltas {
402 if let Ok(bytes) = fs::read(delta_path) {
403 if let Err(err) = self.doc.import(&bytes) {
404 eprintln!(
405 "warning: skipping unreadable delta '{}': {err}",
406 delta_path.display()
407 );
408 }
409 }
410 }
411
412 // Phase 2: write the new base snapshot. self.doc now holds the
413 // full merged state including any concurrent deltas.
414 let out = project_dir.join(BASE_FILE);
415 let bytes = self
416 .doc
417 .export(ExportMode::Snapshot)
418 .context("failed to export loro snapshot")?;
419 atomic_write_file(&out, &bytes)?;
420
421 // Remove the compacting directory we created in phase 1 plus any
422 // orphaned changes.compacting.* dirs from previously crashed tidies.
423 let mut removed = 0usize;
424 for entry in fs::read_dir(&project_dir)
425 .with_context(|| format!("failed to read project dir '{}'", project_dir.display()))?
426 {
427 let path = entry?.path();
428 if !path.is_dir() {
429 continue;
430 }
431 let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
432 continue;
433 };
434 if !name.starts_with("changes.compacting.") {
435 continue;
436 }
437 // Count files before removing for the summary report.
438 for file in fs::read_dir(&path)
439 .with_context(|| format!("failed to read '{}'", path.display()))?
440 {
441 let fp = file?.path();
442 if fp.is_file() {
443 removed += 1;
444 }
445 }
446 fs::remove_dir_all(&path)
447 .with_context(|| format!("failed to remove compacting dir '{}'", path.display()))?;
448 }
449
450 Ok(removed)
451 }
452
453 /// Apply a local mutation and persist only the resulting delta.
454 pub fn apply_and_persist<F>(&self, mutator: F) -> Result<PathBuf>
455 where
456 F: FnOnce(&LoroDoc) -> Result<()>,
457 {
458 let before = self.doc.oplog_vv();
459 mutator(&self.doc)?;
460 self.doc.commit();
461
462 let delta = self
463 .doc
464 .export(ExportMode::updates(&before))
465 .context("failed to export loro update delta")?;
466
467 let filename = format!("{}.loro", Ulid::new());
468 let path = project_dir(&self.root, &self.project)
469 .join(CHANGES_DIR)
470 .join(filename);
471 atomic_write_file(&path, &delta)?;
472 Ok(path)
473 }
474
475 /// Persist pre-built delta bytes (e.g. received from a peer) as a new
476 /// change file without re-exporting from the doc.
477 pub fn save_raw_delta(&self, bytes: &[u8]) -> Result<PathBuf> {
478 let filename = format!("{}.loro", Ulid::new());
479 let path = project_dir(&self.root, &self.project)
480 .join(CHANGES_DIR)
481 .join(filename);
482 atomic_write_file(&path, bytes)?;
483 Ok(path)
484 }
485
486 /// Return hydrated tasks, excluding tombstones.
487 pub fn list_tasks(&self) -> Result<Vec<Task>> {
488 self.list_tasks_inner(false)
489 }
490
491 /// Return hydrated tasks, including tombstoned rows.
492 pub fn list_tasks_unfiltered(&self) -> Result<Vec<Task>> {
493 self.list_tasks_inner(true)
494 }
495
496 /// Find a task by exact ULID string.
497 pub fn get_task(&self, id: &TaskId, include_deleted: bool) -> Result<Option<Task>> {
498 let tasks = if include_deleted {
499 self.list_tasks_unfiltered()?
500 } else {
501 self.list_tasks()?
502 };
503 Ok(tasks.into_iter().find(|task| task.id == *id))
504 }
505
506 fn list_tasks_inner(&self, include_deleted: bool) -> Result<Vec<Task>> {
507 let root = serde_json::to_value(self.doc.get_deep_value())?;
508 let tasks_obj = root
509 .get("tasks")
510 .and_then(Value::as_object)
511 .ok_or_else(|| anyhow!("missing root tasks map"))?;
512
513 let mut tasks = Vec::with_capacity(tasks_obj.len());
514 for (task_id_raw, task_json) in tasks_obj {
515 let task = hydrate_task(task_id_raw, task_json)?;
516 if include_deleted || task.deleted_at.is_none() {
517 tasks.push(task);
518 }
519 }
520
521 tasks.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));
522 Ok(tasks)
523 }
524
525 /// Return the stable project identity stored in `meta.project_id`.
526 pub fn project_id(&self) -> Result<String> {
527 read_project_id_from_doc(&self.doc)
528 }
529
530 pub fn schema_version(&self) -> Result<u32> {
531 migrate::read_schema_version(&self.doc)
532 }
533}
534
535/// Generate a new task ULID.
536pub fn gen_id() -> TaskId {
537 TaskId::new(Ulid::new())
538}
539
540pub fn parse_status(s: &str) -> Result<Status> {
541 Status::parse(s)
542}
543
544pub fn parse_priority(s: &str) -> Result<Priority> {
545 Priority::parse(s)
546}
547
548pub fn parse_effort(s: &str) -> Result<Effort> {
549 Effort::parse(s)
550}
551
552pub fn status_label(s: Status) -> &'static str {
553 s.as_str()
554}
555
556pub fn priority_label(p: Priority) -> &'static str {
557 p.as_str()
558}
559
560pub fn effort_label(e: Effort) -> &'static str {
561 e.as_str()
562}
563
564pub fn data_root() -> Result<PathBuf> {
565 let home = std::env::var("HOME").context("HOME is not set")?;
566 Ok(PathBuf::from(home).join(".local").join("share").join("td"))
567}
568
569pub fn init(cwd: &Path, project: &str) -> Result<Store> {
570 let root = data_root()?;
571 fs::create_dir_all(root.join(PROJECTS_DIR))?;
572 let store = Store::init(&root, project)?;
573 bind_project(cwd, project)?;
574 Ok(store)
575}
576
577pub fn use_project(cwd: &Path, project: &str) -> Result<()> {
578 let root = data_root()?;
579 validate_project_name(project)?;
580 if !project_dir(&root, project).join(BASE_FILE).exists() {
581 bail!("project '{project}' not found. Run 'td project list' to list known projects");
582 }
583 bind_project(cwd, project)
584}
585
586pub fn open(start: &Path) -> Result<Store> {
587 let root = data_root()?;
588 let explicit = std::env::var(PROJECT_ENV).ok();
589 let project = resolve_project_name(start, &root, explicit.as_deref())?.ok_or_else(|| {
590 anyhow!(
591 "no project selected. Use --project/TD_PROJECT, run 'td project bind <name>', or run 'td project init <name>'"
592 )
593 })?;
594 Store::open(&root, &project)
595}
596
597/// Open the project selected by `--project`/`TD_PROJECT`/bindings if one exists.
598///
599/// Returns `Ok(None)` when no project is selected by any mechanism.
600pub fn try_open(start: &Path) -> Result<Option<Store>> {
601 let root = data_root()?;
602 let explicit = std::env::var(PROJECT_ENV).ok();
603 let Some(project) = resolve_project_name(start, &root, explicit.as_deref())? else {
604 return Ok(None);
605 };
606 Store::open(&root, &project).map(Some)
607}
608
609/// Bootstrap a project from a peer delta and bind the current directory.
610pub fn bootstrap_sync(cwd: &Path, project: &str, delta: &[u8]) -> Result<Store> {
611 let root = data_root()?;
612 fs::create_dir_all(root.join(PROJECTS_DIR))?;
613 validate_project_name(project)?;
614 let store = Store::bootstrap_from_peer(&root, project, delta)?;
615 bind_project(cwd, project)?;
616 Ok(store)
617}
618
619/// Bootstrap a project from a peer delta using an explicit data root.
620///
621/// Unlike [`bootstrap_sync`], this function does not consult `HOME` and is
622/// therefore safe to call from async contexts where `HOME` may vary by peer.
623///
624/// If `bind_cwd` is true, the given working directory is bound to the new
625/// project. Pass false when bootstrapping from a SyncAll context to avoid
626/// unexpectedly binding directories like the user's home.
627///
628/// Uses exclusive file locking to prevent race conditions when multiple
629/// concurrent sync operations create projects or modify bindings.
630pub fn bootstrap_sync_at(
631 data_root: &Path,
632 cwd: &Path,
633 project: &str,
634 delta: &[u8],
635 bind_cwd: bool,
636) -> Result<Store> {
637 fs::create_dir_all(data_root.join(PROJECTS_DIR))?;
638 validate_project_name(project)?;
639
640 // Exclusive lock prevents races when concurrent syncs create the same project
641 // or modify bindings simultaneously.
642 let lock_path = data_root.join(".bindings.lock");
643 let lock_file = OpenOptions::new()
644 .create(true)
645 .truncate(false)
646 .write(true)
647 .open(&lock_path)
648 .with_context(|| format!("failed to open lock file '{}'", lock_path.display()))?;
649 lock_file
650 .lock_exclusive()
651 .context("failed to acquire exclusive lock on bindings")?;
652
653 // Now holding the lock: create project and optionally update bindings atomically.
654 let store = Store::bootstrap_from_peer(data_root, project, delta)?;
655
656 if bind_cwd {
657 let canonical = fs::canonicalize(cwd)
658 .with_context(|| format!("failed to canonicalize '{}'", cwd.display()))?;
659 let mut bindings = load_bindings(data_root)?;
660 bindings
661 .bindings
662 .insert(canonical.to_string_lossy().to_string(), project.to_string());
663 save_bindings(data_root, &bindings)?;
664 }
665
666 // Lock is released when lock_file is dropped.
667 Ok(store)
668}
669
670pub fn list_projects() -> Result<Vec<String>> {
671 let root = data_root()?;
672 list_projects_in(&root)
673}
674
675/// List project names rooted at an explicit data directory.
676///
677/// Unlike [`list_projects`], this does not consult `HOME` and is therefore
678/// safe to call from async contexts where `HOME` may vary between peers.
679pub(crate) fn list_projects_in(root: &Path) -> Result<Vec<String>> {
680 let mut out = Vec::new();
681 let dir = root.join(PROJECTS_DIR);
682 if !dir.exists() {
683 return Ok(out);
684 }
685
686 for entry in fs::read_dir(dir)? {
687 let path = entry?.path();
688 if !path.is_dir() {
689 continue;
690 }
691 let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
692 continue;
693 };
694 if path.join(BASE_FILE).exists() {
695 out.push(name.to_string());
696 }
697 }
698
699 out.sort();
700 Ok(out)
701}
702
703pub fn resolve_task_id(store: &Store, raw: &str, include_deleted: bool) -> Result<TaskId> {
704 let raw = raw.strip_prefix("td-").unwrap_or(raw);
705
706 if let Ok(id) = TaskId::parse(raw) {
707 if store.get_task(&id, include_deleted)?.is_some() {
708 return Ok(id);
709 }
710 }
711
712 let tasks = if include_deleted {
713 store.list_tasks_unfiltered()?
714 } else {
715 store.list_tasks()?
716 };
717
718 let upper = raw.to_ascii_uppercase();
719 let matches: Vec<TaskId> = tasks
720 .into_iter()
721 .filter(|t| t.id.as_str().ends_with(&upper))
722 .map(|t| t.id)
723 .collect();
724
725 match matches.as_slice() {
726 [] => bail!("task '{raw}' not found"),
727 [id] => Ok(id.clone()),
728 _ => bail!("task reference '{raw}' is ambiguous"),
729 }
730}
731
732pub fn partition_blockers(store: &Store, blockers: &[TaskId]) -> Result<BlockerPartition> {
733 let mut out = BlockerPartition::default();
734 for blocker in blockers {
735 let Some(task) = store.get_task(blocker, true)? else {
736 out.resolved.push(blocker.clone());
737 continue;
738 };
739 if task.status == Status::Closed || task.deleted_at.is_some() {
740 out.resolved.push(blocker.clone());
741 } else {
742 out.open.push(blocker.clone());
743 }
744 }
745 Ok(out)
746}
747
748pub fn insert_task_map(tasks: &LoroMap, task_id: &TaskId) -> Result<LoroMap> {
749 tasks
750 .insert_container(task_id.as_str(), LoroMap::new())
751 .context("failed to create task map")
752}
753
754pub fn get_task_map(tasks: &LoroMap, task_id: &TaskId) -> Result<Option<LoroMap>> {
755 match tasks.get(task_id.as_str()) {
756 Some(ValueOrContainer::Container(Container::Map(map))) => Ok(Some(map)),
757 Some(_) => bail!("task '{}' has invalid container type", task_id.as_str()),
758 None => Ok(None),
759 }
760}
761
762pub fn get_or_create_child_map(parent: &LoroMap, key: &str) -> Result<LoroMap> {
763 parent
764 .get_or_create_container(key, LoroMap::new())
765 .with_context(|| format!("failed to get or create map key '{key}'"))
766}
767
768fn bindings_path(root: &Path) -> PathBuf {
769 root.join(BINDINGS_FILE)
770}
771
772fn resolve_project_name(
773 start: &Path,
774 root: &Path,
775 explicit: Option<&str>,
776) -> Result<Option<String>> {
777 if let Some(project) = explicit {
778 validate_project_name(project)?;
779 return Ok(Some(project.to_string()));
780 }
781
782 let cwd = canonicalize_binding_path(start)?;
783 let bindings = load_bindings(root)?;
784
785 let mut best: Option<(usize, String)> = None;
786 for (raw_path, project) in bindings.bindings {
787 let bound = PathBuf::from(raw_path);
788 if is_prefix_path(&bound, &cwd) {
789 let score = bound.components().count();
790 match &best {
791 Some((best_score, _)) if *best_score >= score => {}
792 _ => best = Some((score, project)),
793 }
794 }
795 }
796
797 if let Some((_, project)) = best {
798 return Ok(Some(project));
799 }
800
801 Ok(None)
802}
803
804pub fn unbind_project(cwd: &Path) -> Result<()> {
805 let root = data_root()?;
806 let canonical = canonicalize_binding_path(cwd)?;
807 let canonical_str = canonical.to_string_lossy().to_string();
808
809 let mut bindings = load_bindings(&root)?;
810 if !bindings.bindings.contains_key(&canonical_str) {
811 bail!("path '{}' is not bound to any project", canonical.display());
812 }
813 bindings.bindings.remove(&canonical_str);
814 save_bindings(&root, &bindings)
815}
816
817pub fn delete_project(name: &str) -> Result<()> {
818 validate_project_name(name)?;
819 let root = data_root()?;
820 let proj_dir = project_dir(&root, name);
821
822 if !proj_dir.join(BASE_FILE).exists() {
823 bail!("project '{name}' not found");
824 }
825
826 fs::remove_dir_all(&proj_dir).with_context(|| {
827 format!(
828 "failed to remove project directory '{}'",
829 proj_dir.display()
830 )
831 })?;
832
833 let mut bindings = load_bindings(&root)?;
834 bindings.bindings.retain(|_, project| project != name);
835 save_bindings(&root, &bindings)
836}
837
838fn bind_project(cwd: &Path, project: &str) -> Result<()> {
839 validate_project_name(project)?;
840
841 let root = data_root()?;
842 fs::create_dir_all(&root)?;
843
844 let canonical = canonicalize_binding_path(cwd)?;
845 let mut bindings = load_bindings(&root)?;
846 bindings
847 .bindings
848 .insert(canonical.to_string_lossy().to_string(), project.to_string());
849 save_bindings(&root, &bindings)
850}
851
852fn load_bindings(root: &Path) -> Result<BindingsFile> {
853 let path = bindings_path(root);
854 if !path.exists() {
855 return Ok(BindingsFile::default());
856 }
857 let content = fs::read_to_string(&path)
858 .with_context(|| format!("failed reading bindings from '{}'", path.display()))?;
859 serde_json::from_str(&content)
860 .with_context(|| format!("invalid bindings file '{}'", path.display()))
861}
862
863fn save_bindings(root: &Path, bindings: &BindingsFile) -> Result<()> {
864 let path = bindings_path(root);
865 let bytes = serde_json::to_vec_pretty(bindings)?;
866 atomic_write_file(&path, &bytes)
867}
868
869fn canonicalize_binding_path(path: &Path) -> Result<PathBuf> {
870 fs::canonicalize(path).with_context(|| format!("failed to canonicalize '{}'", path.display()))
871}
872
/// Returns true when every component of `prefix` matches the leading
/// components of `target`.
///
/// Comparison is by whole path component, so `/a/b` is a prefix of `/a/b/c`
/// and of `/a/b` itself, but not of `/a/bc`. An empty `prefix` matches any
/// target.
fn is_prefix_path(prefix: &Path, target: &Path) -> bool {
    // `Path::starts_with` performs exactly this component-wise comparison.
    target.starts_with(prefix)
}
886
887pub fn validate_project_name(name: &str) -> Result<()> {
888 if name.is_empty() {
889 bail!("project name cannot be empty");
890 }
891 if name.contains('/') || name.contains('\\') || name == "." || name == ".." {
892 bail!("invalid project name '{name}'");
893 }
894 if name.chars().any(char::is_control) {
895 bail!("invalid project name '{name}'");
896 }
897 Ok(())
898}
899
900fn read_project_id_from_doc(doc: &LoroDoc) -> Result<String> {
901 let root = serde_json::to_value(doc.get_deep_value())?;
902 root.get("meta")
903 .and_then(|m| m.get("project_id"))
904 .and_then(Value::as_str)
905 .map(str::to_owned)
906 .ok_or_else(|| anyhow!("missing meta.project_id in project doc"))
907}
908
909fn hydrate_task(task_id_raw: &str, value: &Value) -> Result<Task> {
910 let obj = value
911 .as_object()
912 .ok_or_else(|| anyhow!("task '{task_id_raw}' is not an object"))?;
913
914 let id = TaskId::parse(task_id_raw)?;
915
916 let title = get_required_string(obj, "title")?;
917 let description = get_required_string(obj, "description")?;
918 let task_type = get_required_string(obj, "type")?;
919 let status = Status::parse(&get_required_string(obj, "status")?)?;
920 let priority = Priority::parse(&get_required_string(obj, "priority")?)?;
921 let effort = Effort::parse(&get_required_string(obj, "effort")?)?;
922 let parent = match obj.get("parent").and_then(Value::as_str) {
923 Some("") | None => None,
924 Some(raw) => Some(TaskId::parse(raw)?),
925 };
926
927 let created_at = get_required_string(obj, "created_at")?;
928 let updated_at = get_required_string(obj, "updated_at")?;
929 let deleted_at = obj
930 .get("deleted_at")
931 .and_then(Value::as_str)
932 .map(str::to_owned)
933 .filter(|s| !s.is_empty());
934
935 let labels = obj
936 .get("labels")
937 .and_then(Value::as_object)
938 .map(|m| m.keys().cloned().collect())
939 .unwrap_or_default();
940
941 let blockers = obj
942 .get("blockers")
943 .and_then(Value::as_object)
944 .map(|m| {
945 m.keys()
946 .map(|raw| TaskId::parse(raw))
947 .collect::<Result<Vec<_>>>()
948 })
949 .transpose()?
950 .unwrap_or_default();
951
952 let mut logs = obj
953 .get("logs")
954 .and_then(Value::as_object)
955 .map(|logs| {
956 logs.iter()
957 .map(|(log_id_raw, payload)| {
958 let payload_obj = payload.as_object().ok_or_else(|| {
959 anyhow!("log '{log_id_raw}' on task '{task_id_raw}' is not an object")
960 })?;
961 Ok(LogEntry {
962 id: TaskId::parse(log_id_raw)?,
963 timestamp: get_required_string(payload_obj, "timestamp")?,
964 message: get_required_string(payload_obj, "message")?,
965 })
966 })
967 .collect::<Result<Vec<_>>>()
968 })
969 .transpose()?
970 .unwrap_or_default();
971
972 logs.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));
973
974 Ok(Task {
975 id,
976 title,
977 description,
978 task_type,
979 priority,
980 status,
981 effort,
982 parent,
983 created_at,
984 updated_at,
985 deleted_at,
986 labels,
987 blockers,
988 logs,
989 })
990}
991
992fn get_required_string(map: &serde_json::Map<String, Value>, key: &str) -> Result<String> {
993 map.get(key)
994 .and_then(Value::as_str)
995 .map(str::to_owned)
996 .ok_or_else(|| anyhow!("missing or non-string key '{key}'"))
997}
998
999fn collect_delta_paths(project_dir: &Path) -> Result<Vec<PathBuf>> {
1000 let mut paths = Vec::new();
1001 collect_changes_from_dir(&project_dir.join(CHANGES_DIR), &mut paths)?;
1002
1003 for entry in fs::read_dir(project_dir)? {
1004 let path = entry?.path();
1005 if !path.is_dir() {
1006 continue;
1007 }
1008 let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
1009 continue;
1010 };
1011 if name.starts_with("changes.compacting.") {
1012 collect_changes_from_dir(&path, &mut paths)?;
1013 }
1014 }
1015
1016 Ok(paths)
1017}
1018
1019fn collect_changes_from_dir(dir: &Path, out: &mut Vec<PathBuf>) -> Result<()> {
1020 if !dir.exists() {
1021 return Ok(());
1022 }
1023
1024 for entry in fs::read_dir(dir)? {
1025 let path = entry?.path();
1026 if !path.is_file() {
1027 continue;
1028 }
1029
1030 let Some(filename) = path.file_name().and_then(|n| n.to_str()) else {
1031 continue;
1032 };
1033 if filename.ends_with(TMP_SUFFIX) || !filename.ends_with(".loro") {
1034 continue;
1035 }
1036
1037 let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
1038 continue;
1039 };
1040 if Ulid::from_string(stem).is_err() {
1041 continue;
1042 }
1043
1044 out.push(path);
1045 }
1046
1047 Ok(())
1048}
1049
1050fn project_dir(root: &Path, project: &str) -> PathBuf {
1051 root.join(PROJECTS_DIR).join(project)
1052}
1053
1054fn load_or_create_device_peer_id(root: &Path) -> Result<PeerID> {
1055 let path = root.join("device_id");
1056 if let Some(parent) = path.parent() {
1057 fs::create_dir_all(parent)?;
1058 }
1059
1060 let device_ulid = if path.exists() {
1061 let content = fs::read_to_string(&path)
1062 .with_context(|| format!("failed reading device id from '{}'", path.display()))?;
1063 Ulid::from_string(content.trim()).context("invalid persisted device id ULID")?
1064 } else {
1065 let id = Ulid::new();
1066 atomic_write_file(&path, id.to_string().as_bytes())?;
1067 id
1068 };
1069
1070 let raw: u128 = device_ulid.into();
1071 Ok((raw & u64::MAX as u128) as u64)
1072}
1073
1074fn atomic_write_file(path: &Path, bytes: &[u8]) -> Result<()> {
1075 let parent = path
1076 .parent()
1077 .ok_or_else(|| anyhow!("cannot atomically write root path '{}'", path.display()))?;
1078 fs::create_dir_all(parent)?;
1079
1080 let tmp_name = format!(
1081 "{}.{}{}",
1082 path.file_name().and_then(|n| n.to_str()).unwrap_or("write"),
1083 Ulid::new(),
1084 TMP_SUFFIX
1085 );
1086 let tmp_path = parent.join(tmp_name);
1087
1088 {
1089 let mut file = OpenOptions::new()
1090 .create_new(true)
1091 .write(true)
1092 .open(&tmp_path)
1093 .with_context(|| format!("failed to open temp file '{}'", tmp_path.display()))?;
1094 file.write_all(bytes)?;
1095 file.sync_all()?;
1096 }
1097
1098 fs::rename(&tmp_path, path).with_context(|| {
1099 format!(
1100 "failed to atomically rename '{}' to '{}'",
1101 tmp_path.display(),
1102 path.display()
1103 )
1104 })?;
1105
1106 sync_dir(parent)?;
1107 Ok(())
1108}
1109
1110fn sync_dir(path: &Path) -> Result<()> {
1111 let dir =
1112 File::open(path).with_context(|| format!("failed opening dir '{}'", path.display()))?;
1113 dir.sync_all()
1114 .with_context(|| format!("failed fsync on dir '{}'", path.display()))?;
1115 Ok(())
1116}