1use anyhow::{anyhow, bail, Context, Result};
2use loro::{Container, ExportMode, LoroDoc, LoroMap, PeerID, ValueOrContainer};
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5use std::collections::BTreeMap;
6use std::fs::{self, File, OpenOptions};
7use std::io::Write;
8use std::path::{Path, PathBuf};
9
10use fs2::FileExt;
11use ulid::Ulid;
12
13use crate::model::{now_utc, BlockerPartition, Effort, LogEntry, Priority, Status, Task, TaskId};
14pub const PROJECT_ENV: &str = "TD_PROJECT";
15
16pub(crate) const PROJECTS_DIR: &str = "projects";
17const CHANGES_DIR: &str = "changes";
18const BINDINGS_FILE: &str = "bindings.json";
19const BASE_FILE: &str = "base.loro";
20const TMP_SUFFIX: &str = ".tmp";
21use crate::migrate;
22
23#[derive(Debug, Default, Clone, Serialize, Deserialize)]
24struct BindingsFile {
25 #[serde(default)]
26 bindings: BTreeMap<String, String>,
27}
28
29/// Storage wrapper around one project's Loro document and disk layout.
30#[derive(Debug, Clone)]
31pub struct Store {
32 root: PathBuf,
33 project: String,
34 doc: LoroDoc,
35}
36
37impl Store {
38 pub fn init(root: &Path, project: &str) -> Result<Self> {
39 validate_project_name(project)?;
40 let project_dir = project_dir(root, project);
41 if project_dir.exists() {
42 bail!("project '{project}' already exists");
43 }
44 fs::create_dir_all(project_dir.join(CHANGES_DIR))?;
45
46 let doc = LoroDoc::new();
47 doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
48 doc.get_map("tasks");
49
50 let meta = doc.get_map("meta");
51 meta.insert("schema_version", migrate::CURRENT_SCHEMA_VERSION as i64)?;
52 meta.insert("project_id", Ulid::new().to_string())?;
53 meta.insert("created_at", now_utc())?;
54
55 let snapshot = doc
56 .export(ExportMode::Snapshot)
57 .context("failed to export initial loro snapshot")?;
58 atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;
59
60 Ok(Self {
61 root: root.to_path_buf(),
62 project: project.to_string(),
63 doc,
64 })
65 }
66
67 pub fn open(root: &Path, project: &str) -> Result<Self> {
68 validate_project_name(project)?;
69 let project_dir = project_dir(root, project);
70 let base_path = project_dir.join(BASE_FILE);
71
72 if !base_path.exists() {
73 bail!("project '{project}' is not initialized. Run 'td project init {project}'");
74 }
75
76 let base = fs::read(&base_path)
77 .with_context(|| format!("failed to read loro snapshot '{}'", base_path.display()))?;
78
79 let doc = LoroDoc::from_snapshot(&base).context("failed to load loro snapshot")?;
80 doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
81
82 let mut deltas = collect_delta_paths(&project_dir)?;
83 deltas.sort_by_key(|path| {
84 path.file_stem()
85 .and_then(|s| s.to_str())
86 .and_then(|s| Ulid::from_string(s).ok())
87 });
88
89 for delta_path in deltas {
90 let bytes = fs::read(&delta_path)
91 .with_context(|| format!("failed to read loro delta '{}'", delta_path.display()))?;
92 if let Err(err) = doc.import(&bytes) {
93 // Tolerate malformed or stale delta files as requested by design.
94 eprintln!(
95 "warning: skipping unreadable delta '{}': {err}",
96 delta_path.display()
97 );
98 }
99 }
100
101 // Apply any pending schema upgrades and persist the resulting delta
102 // so subsequent opens don't repeat the work.
103 let before_vv = doc.oplog_vv();
104 let upgraded = migrate::ensure_current(&doc)?;
105 if upgraded {
106 doc.commit();
107 let delta = doc
108 .export(ExportMode::updates(&before_vv))
109 .context("failed to export schema upgrade delta")?;
110 let filename = format!("{}.loro", Ulid::new());
111 let delta_path = project_dir.join(CHANGES_DIR).join(filename);
112 atomic_write_file(&delta_path, &delta)?;
113 }
114
115 Ok(Self {
116 root: root.to_path_buf(),
117 project: project.to_string(),
118 doc,
119 })
120 }
121
122 /// Bootstrap a local project from peer-provided delta bytes.
123 ///
124 /// The incoming delta is imported into a fresh document, validated to
125 /// ensure it carries `meta.project_id`, and then persisted as a base
126 /// snapshot for future opens.
127 pub fn bootstrap_from_peer(root: &Path, project: &str, delta: &[u8]) -> Result<Self> {
128 validate_project_name(project)?;
129 let project_dir = project_dir(root, project);
130 if project_dir.exists() {
131 bail!("project '{project}' already exists");
132 }
133 fs::create_dir_all(project_dir.join(CHANGES_DIR))?;
134
135 let doc = LoroDoc::new();
136 doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
137 doc.import(delta)
138 .context("failed to import bootstrap delta from peer")?;
139 doc.commit();
140
141 read_project_id_from_doc(&doc)
142 .context("bootstrap delta is missing required project identity")?;
143
144 // Upgrade the peer's document before snapshotting so the local
145 // copy is always at CURRENT_SCHEMA_VERSION from the start.
146 migrate::ensure_current(&doc)?;
147 doc.commit();
148
149 let snapshot = doc
150 .export(ExportMode::Snapshot)
151 .context("failed to export bootstrap loro snapshot")?;
152 atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;
153
154 Ok(Self {
155 root: root.to_path_buf(),
156 project: project.to_string(),
157 doc,
158 })
159 }
160
161 pub fn root(&self) -> &Path {
162 &self.root
163 }
164
165 pub fn project_name(&self) -> &str {
166 &self.project
167 }
168
169 pub fn doc(&self) -> &LoroDoc {
170 &self.doc
171 }
172
173 /// Export all current state to a fresh base snapshot.
174 /// Compact accumulated deltas into the base snapshot using a two-phase
175 /// protocol that is safe against concurrent writers.
176 ///
177 /// **Phase 1** — rename `changes/` to `changes.compacting.<ulid>/`, then
178 /// immediately create a fresh `changes/`. Any concurrent `td` command
179 /// that writes a delta after this point lands in the new `changes/` and is
180 /// therefore never touched by this operation.
181 ///
182 /// **Phase 2** — write a fresh base snapshot from the in-memory document
183 /// (which was loaded from both `base.loro` and every delta at `open` time),
184 /// then remove the compacting directory.
185 ///
186 /// Any orphaned `changes.compacting.*` directories left by a previously
187 /// crashed tidy are also removed: they were already merged into `self.doc`
188 /// at open time, so the new snapshot includes their contents.
189 ///
190 /// Returns the number of delta files folded into the snapshot.
191 pub fn tidy(&self) -> Result<usize> {
192 let project_dir = project_dir(&self.root, &self.project);
193 let changes_dir = project_dir.join(CHANGES_DIR);
194
195 // Phase 1: atomically hand off the current changes/ to a compacting
196 // directory so new writers have a clean home immediately.
197 let compacting_dir = project_dir.join(format!("changes.compacting.{}", Ulid::new()));
198 if changes_dir.exists() {
199 fs::rename(&changes_dir, &compacting_dir).with_context(|| {
200 format!(
201 "failed to rename '{}' to '{}'",
202 changes_dir.display(),
203 compacting_dir.display()
204 )
205 })?;
206 }
207 fs::create_dir_all(&changes_dir).context("failed to create fresh changes/")?;
208
209 // Re-import every delta from the compacting directories. self.doc
210 // was populated at open() time, but a concurrent writer may have
211 // appended a delta to changes/ between open() and the Phase 1
212 // rename — that delta is now inside compacting_dir without being in
213 // self.doc. CRDT import is idempotent (deduplicates by OpID), so
214 // re-importing already-known ops is harmless.
215 let mut compacting_deltas = collect_delta_paths(&project_dir)?;
216 compacting_deltas.sort_by_key(|path| {
217 path.file_stem()
218 .and_then(|s| s.to_str())
219 .and_then(|s| Ulid::from_string(s).ok())
220 });
221 for delta_path in &compacting_deltas {
222 if let Ok(bytes) = fs::read(delta_path) {
223 if let Err(err) = self.doc.import(&bytes) {
224 eprintln!(
225 "warning: skipping unreadable delta '{}': {err}",
226 delta_path.display()
227 );
228 }
229 }
230 }
231
232 // Phase 2: write the new base snapshot. self.doc now holds the
233 // full merged state including any concurrent deltas.
234 let out = project_dir.join(BASE_FILE);
235 let bytes = self
236 .doc
237 .export(ExportMode::Snapshot)
238 .context("failed to export loro snapshot")?;
239 atomic_write_file(&out, &bytes)?;
240
241 // Remove the compacting directory we created in phase 1 plus any
242 // orphaned changes.compacting.* dirs from previously crashed tidies.
243 let mut removed = 0usize;
244 for entry in fs::read_dir(&project_dir)
245 .with_context(|| format!("failed to read project dir '{}'", project_dir.display()))?
246 {
247 let path = entry?.path();
248 if !path.is_dir() {
249 continue;
250 }
251 let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
252 continue;
253 };
254 if !name.starts_with("changes.compacting.") {
255 continue;
256 }
257 // Count files before removing for the summary report.
258 for file in fs::read_dir(&path)
259 .with_context(|| format!("failed to read '{}'", path.display()))?
260 {
261 let fp = file?.path();
262 if fp.is_file() {
263 removed += 1;
264 }
265 }
266 fs::remove_dir_all(&path)
267 .with_context(|| format!("failed to remove compacting dir '{}'", path.display()))?;
268 }
269
270 Ok(removed)
271 }
272
273 /// Apply a local mutation and persist only the resulting delta.
274 pub fn apply_and_persist<F>(&self, mutator: F) -> Result<PathBuf>
275 where
276 F: FnOnce(&LoroDoc) -> Result<()>,
277 {
278 let before = self.doc.oplog_vv();
279 mutator(&self.doc)?;
280 self.doc.commit();
281
282 let delta = self
283 .doc
284 .export(ExportMode::updates(&before))
285 .context("failed to export loro update delta")?;
286
287 let filename = format!("{}.loro", Ulid::new());
288 let path = project_dir(&self.root, &self.project)
289 .join(CHANGES_DIR)
290 .join(filename);
291 atomic_write_file(&path, &delta)?;
292 Ok(path)
293 }
294
295 /// Persist pre-built delta bytes (e.g. received from a peer) as a new
296 /// change file without re-exporting from the doc.
297 pub fn save_raw_delta(&self, bytes: &[u8]) -> Result<PathBuf> {
298 let filename = format!("{}.loro", Ulid::new());
299 let path = project_dir(&self.root, &self.project)
300 .join(CHANGES_DIR)
301 .join(filename);
302 atomic_write_file(&path, bytes)?;
303 Ok(path)
304 }
305
306 /// Return hydrated tasks, excluding tombstones.
307 pub fn list_tasks(&self) -> Result<Vec<Task>> {
308 self.list_tasks_inner(false)
309 }
310
311 /// Return hydrated tasks, including tombstoned rows.
312 pub fn list_tasks_unfiltered(&self) -> Result<Vec<Task>> {
313 self.list_tasks_inner(true)
314 }
315
316 /// Find a task by exact ULID string.
317 pub fn get_task(&self, id: &TaskId, include_deleted: bool) -> Result<Option<Task>> {
318 let tasks = if include_deleted {
319 self.list_tasks_unfiltered()?
320 } else {
321 self.list_tasks()?
322 };
323 Ok(tasks.into_iter().find(|task| task.id == *id))
324 }
325
326 fn list_tasks_inner(&self, include_deleted: bool) -> Result<Vec<Task>> {
327 let root = serde_json::to_value(self.doc.get_deep_value())?;
328 let tasks_obj = root
329 .get("tasks")
330 .and_then(Value::as_object)
331 .ok_or_else(|| anyhow!("missing root tasks map"))?;
332
333 let mut tasks = Vec::with_capacity(tasks_obj.len());
334 for (task_id_raw, task_json) in tasks_obj {
335 let task = hydrate_task(task_id_raw, task_json)?;
336 if include_deleted || task.deleted_at.is_none() {
337 tasks.push(task);
338 }
339 }
340
341 tasks.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));
342 Ok(tasks)
343 }
344
345 /// Return the stable project identity stored in `meta.project_id`.
346 pub fn project_id(&self) -> Result<String> {
347 read_project_id_from_doc(&self.doc)
348 }
349
350 pub fn schema_version(&self) -> Result<u32> {
351 migrate::read_schema_version(&self.doc)
352 }
353}
354
355pub fn data_root() -> Result<PathBuf> {
356 let home = std::env::var("HOME").context("HOME is not set")?;
357 Ok(PathBuf::from(home).join(".local").join("share").join("td"))
358}
359
360pub fn init(cwd: &Path, project: &str) -> Result<Store> {
361 let root = data_root()?;
362 fs::create_dir_all(root.join(PROJECTS_DIR))?;
363 let store = Store::init(&root, project)?;
364 bind_project(cwd, project)?;
365 Ok(store)
366}
367
368pub fn use_project(cwd: &Path, project: &str) -> Result<()> {
369 let root = data_root()?;
370 validate_project_name(project)?;
371 if !project_dir(&root, project).join(BASE_FILE).exists() {
372 bail!("project '{project}' not found. Run 'td project list' to list known projects");
373 }
374 bind_project(cwd, project)
375}
376
377pub fn open(start: &Path) -> Result<Store> {
378 let root = data_root()?;
379 let explicit = std::env::var(PROJECT_ENV).ok();
380 let project = resolve_project_name(start, &root, explicit.as_deref())?.ok_or_else(|| {
381 anyhow!(
382 "no project selected. Use --project/TD_PROJECT, run 'td project bind <name>', or run 'td project init <name>'"
383 )
384 })?;
385 Store::open(&root, &project)
386}
387
388/// Open the project selected by `--project`/`TD_PROJECT`/bindings if one exists.
389///
390/// Returns `Ok(None)` when no project is selected by any mechanism.
391pub fn try_open(start: &Path) -> Result<Option<Store>> {
392 let root = data_root()?;
393 let explicit = std::env::var(PROJECT_ENV).ok();
394 let Some(project) = resolve_project_name(start, &root, explicit.as_deref())? else {
395 return Ok(None);
396 };
397 Store::open(&root, &project).map(Some)
398}
399
400/// Bootstrap a project from a peer delta and bind the current directory.
401pub fn bootstrap_sync(cwd: &Path, project: &str, delta: &[u8]) -> Result<Store> {
402 let root = data_root()?;
403 fs::create_dir_all(root.join(PROJECTS_DIR))?;
404 validate_project_name(project)?;
405 let store = Store::bootstrap_from_peer(&root, project, delta)?;
406 bind_project(cwd, project)?;
407 Ok(store)
408}
409
410/// Bootstrap a project from a peer delta using an explicit data root.
411///
412/// Unlike [`bootstrap_sync`], this function does not consult `HOME` and is
413/// therefore safe to call from async contexts where `HOME` may vary by peer.
414///
415/// If `bind_cwd` is true, the given working directory is bound to the new
416/// project. Pass false when bootstrapping from a SyncAll context to avoid
417/// unexpectedly binding directories like the user's home.
418///
419/// Uses exclusive file locking to prevent race conditions when multiple
420/// concurrent sync operations create projects or modify bindings.
421pub fn bootstrap_sync_at(
422 data_root: &Path,
423 cwd: &Path,
424 project: &str,
425 delta: &[u8],
426 bind_cwd: bool,
427) -> Result<Store> {
428 fs::create_dir_all(data_root.join(PROJECTS_DIR))?;
429 validate_project_name(project)?;
430
431 // Exclusive lock prevents races when concurrent syncs create the same project
432 // or modify bindings simultaneously.
433 let lock_path = data_root.join(".bindings.lock");
434 let lock_file = OpenOptions::new()
435 .create(true)
436 .truncate(false)
437 .write(true)
438 .open(&lock_path)
439 .with_context(|| format!("failed to open lock file '{}'", lock_path.display()))?;
440 lock_file
441 .lock_exclusive()
442 .context("failed to acquire exclusive lock on bindings")?;
443
444 // Now holding the lock: create project and optionally update bindings atomically.
445 let store = Store::bootstrap_from_peer(data_root, project, delta)?;
446
447 if bind_cwd {
448 let canonical = fs::canonicalize(cwd)
449 .with_context(|| format!("failed to canonicalize '{}'", cwd.display()))?;
450 let mut bindings = load_bindings(data_root)?;
451 bindings
452 .bindings
453 .insert(canonical.to_string_lossy().to_string(), project.to_string());
454 save_bindings(data_root, &bindings)?;
455 }
456
457 // Lock is released when lock_file is dropped.
458 Ok(store)
459}
460
461pub fn list_projects() -> Result<Vec<String>> {
462 let root = data_root()?;
463 list_projects_in(&root)
464}
465
466/// List project names rooted at an explicit data directory.
467///
468/// Unlike [`list_projects`], this does not consult `HOME` and is therefore
469/// safe to call from async contexts where `HOME` may vary between peers.
470pub(crate) fn list_projects_in(root: &Path) -> Result<Vec<String>> {
471 let mut out = Vec::new();
472 let dir = root.join(PROJECTS_DIR);
473 if !dir.exists() {
474 return Ok(out);
475 }
476
477 for entry in fs::read_dir(dir)? {
478 let path = entry?.path();
479 if !path.is_dir() {
480 continue;
481 }
482 let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
483 continue;
484 };
485 if path.join(BASE_FILE).exists() {
486 out.push(name.to_string());
487 }
488 }
489
490 out.sort();
491 Ok(out)
492}
493
494pub fn resolve_task_id(store: &Store, raw: &str, include_deleted: bool) -> Result<TaskId> {
495 let raw = raw.strip_prefix("td-").unwrap_or(raw);
496
497 if let Ok(id) = TaskId::parse(raw) {
498 if store.get_task(&id, include_deleted)?.is_some() {
499 return Ok(id);
500 }
501 }
502
503 let tasks = if include_deleted {
504 store.list_tasks_unfiltered()?
505 } else {
506 store.list_tasks()?
507 };
508
509 let upper = raw.to_ascii_uppercase();
510 let matches: Vec<TaskId> = tasks
511 .into_iter()
512 .filter(|t| t.id.as_str().ends_with(&upper))
513 .map(|t| t.id)
514 .collect();
515
516 match matches.as_slice() {
517 [] => bail!("task '{raw}' not found"),
518 [id] => Ok(id.clone()),
519 _ => bail!("task reference '{raw}' is ambiguous"),
520 }
521}
522
523pub fn partition_blockers(store: &Store, blockers: &[TaskId]) -> Result<BlockerPartition> {
524 let mut out = BlockerPartition::default();
525 for blocker in blockers {
526 let Some(task) = store.get_task(blocker, true)? else {
527 out.resolved.push(blocker.clone());
528 continue;
529 };
530 if task.status == Status::Closed || task.deleted_at.is_some() {
531 out.resolved.push(blocker.clone());
532 } else {
533 out.open.push(blocker.clone());
534 }
535 }
536 Ok(out)
537}
538
539pub fn insert_task_map(tasks: &LoroMap, task_id: &TaskId) -> Result<LoroMap> {
540 tasks
541 .insert_container(task_id.as_str(), LoroMap::new())
542 .context("failed to create task map")
543}
544
545pub fn get_task_map(tasks: &LoroMap, task_id: &TaskId) -> Result<Option<LoroMap>> {
546 match tasks.get(task_id.as_str()) {
547 Some(ValueOrContainer::Container(Container::Map(map))) => Ok(Some(map)),
548 Some(_) => bail!("task '{}' has invalid container type", task_id.as_str()),
549 None => Ok(None),
550 }
551}
552
553pub fn get_or_create_child_map(parent: &LoroMap, key: &str) -> Result<LoroMap> {
554 parent
555 .get_or_create_container(key, LoroMap::new())
556 .with_context(|| format!("failed to get or create map key '{key}'"))
557}
558
559fn bindings_path(root: &Path) -> PathBuf {
560 root.join(BINDINGS_FILE)
561}
562
563fn resolve_project_name(
564 start: &Path,
565 root: &Path,
566 explicit: Option<&str>,
567) -> Result<Option<String>> {
568 if let Some(project) = explicit {
569 validate_project_name(project)?;
570 return Ok(Some(project.to_string()));
571 }
572
573 let cwd = canonicalize_binding_path(start)?;
574 let bindings = load_bindings(root)?;
575
576 let mut best: Option<(usize, String)> = None;
577 for (raw_path, project) in bindings.bindings {
578 let bound = PathBuf::from(raw_path);
579 if is_prefix_path(&bound, &cwd) {
580 let score = bound.components().count();
581 match &best {
582 Some((best_score, _)) if *best_score >= score => {}
583 _ => best = Some((score, project)),
584 }
585 }
586 }
587
588 if let Some((_, project)) = best {
589 return Ok(Some(project));
590 }
591
592 Ok(None)
593}
594
595pub fn unbind_project(cwd: &Path) -> Result<()> {
596 let root = data_root()?;
597 let canonical = canonicalize_binding_path(cwd)?;
598 let canonical_str = canonical.to_string_lossy().to_string();
599
600 let mut bindings = load_bindings(&root)?;
601 if !bindings.bindings.contains_key(&canonical_str) {
602 bail!("path '{}' is not bound to any project", canonical.display());
603 }
604 bindings.bindings.remove(&canonical_str);
605 save_bindings(&root, &bindings)
606}
607
608pub fn delete_project(name: &str) -> Result<()> {
609 validate_project_name(name)?;
610 let root = data_root()?;
611 let proj_dir = project_dir(&root, name);
612
613 if !proj_dir.join(BASE_FILE).exists() {
614 bail!("project '{name}' not found");
615 }
616
617 fs::remove_dir_all(&proj_dir).with_context(|| {
618 format!(
619 "failed to remove project directory '{}'",
620 proj_dir.display()
621 )
622 })?;
623
624 let mut bindings = load_bindings(&root)?;
625 bindings.bindings.retain(|_, project| project != name);
626 save_bindings(&root, &bindings)
627}
628
629fn bind_project(cwd: &Path, project: &str) -> Result<()> {
630 validate_project_name(project)?;
631
632 let root = data_root()?;
633 fs::create_dir_all(&root)?;
634
635 let canonical = canonicalize_binding_path(cwd)?;
636 let mut bindings = load_bindings(&root)?;
637 bindings
638 .bindings
639 .insert(canonical.to_string_lossy().to_string(), project.to_string());
640 save_bindings(&root, &bindings)
641}
642
643fn load_bindings(root: &Path) -> Result<BindingsFile> {
644 let path = bindings_path(root);
645 if !path.exists() {
646 return Ok(BindingsFile::default());
647 }
648 let content = fs::read_to_string(&path)
649 .with_context(|| format!("failed reading bindings from '{}'", path.display()))?;
650 serde_json::from_str(&content)
651 .with_context(|| format!("invalid bindings file '{}'", path.display()))
652}
653
654fn save_bindings(root: &Path, bindings: &BindingsFile) -> Result<()> {
655 let path = bindings_path(root);
656 let bytes = serde_json::to_vec_pretty(bindings)?;
657 atomic_write_file(&path, &bytes)
658}
659
660fn canonicalize_binding_path(path: &Path) -> Result<PathBuf> {
661 fs::canonicalize(path).with_context(|| format!("failed to canonicalize '{}'", path.display()))
662}
663
664fn is_prefix_path(prefix: &Path, target: &Path) -> bool {
665 let mut prefix_components = prefix.components();
666 let mut target_components = target.components();
667
668 loop {
669 match (prefix_components.next(), target_components.next()) {
670 (None, _) => return true,
671 (Some(_), None) => return false,
672 (Some(a), Some(b)) if a == b => continue,
673 _ => return false,
674 }
675 }
676}
677
678pub fn validate_project_name(name: &str) -> Result<()> {
679 if name.is_empty() {
680 bail!("project name cannot be empty");
681 }
682 if name.contains('/') || name.contains('\\') || name == "." || name == ".." {
683 bail!("invalid project name '{name}'");
684 }
685 if name.chars().any(char::is_control) {
686 bail!("invalid project name '{name}'");
687 }
688 Ok(())
689}
690
691fn read_project_id_from_doc(doc: &LoroDoc) -> Result<String> {
692 let root = serde_json::to_value(doc.get_deep_value())?;
693 root.get("meta")
694 .and_then(|m| m.get("project_id"))
695 .and_then(Value::as_str)
696 .map(str::to_owned)
697 .ok_or_else(|| anyhow!("missing meta.project_id in project doc"))
698}
699
700fn hydrate_task(task_id_raw: &str, value: &Value) -> Result<Task> {
701 let obj = value
702 .as_object()
703 .ok_or_else(|| anyhow!("task '{task_id_raw}' is not an object"))?;
704
705 let id = TaskId::parse(task_id_raw)?;
706
707 let title = get_required_string(obj, "title")?;
708 let description = get_required_string(obj, "description")?;
709 let task_type = get_required_string(obj, "type")?;
710 let status = Status::parse(&get_required_string(obj, "status")?)?;
711 let priority = Priority::parse(&get_required_string(obj, "priority")?)?;
712 let effort = Effort::parse(&get_required_string(obj, "effort")?)?;
713 let parent = match obj.get("parent").and_then(Value::as_str) {
714 Some("") | None => None,
715 Some(raw) => Some(TaskId::parse(raw)?),
716 };
717
718 let created_at = get_required_string(obj, "created_at")?;
719 let updated_at = get_required_string(obj, "updated_at")?;
720 let deleted_at = obj
721 .get("deleted_at")
722 .and_then(Value::as_str)
723 .map(str::to_owned)
724 .filter(|s| !s.is_empty());
725
726 let labels = obj
727 .get("labels")
728 .and_then(Value::as_object)
729 .map(|m| m.keys().cloned().collect())
730 .unwrap_or_default();
731
732 let blockers = obj
733 .get("blockers")
734 .and_then(Value::as_object)
735 .map(|m| {
736 m.keys()
737 .map(|raw| TaskId::parse(raw))
738 .collect::<Result<Vec<_>>>()
739 })
740 .transpose()?
741 .unwrap_or_default();
742
743 let mut logs = obj
744 .get("logs")
745 .and_then(Value::as_object)
746 .map(|logs| {
747 logs.iter()
748 .map(|(log_id_raw, payload)| {
749 let payload_obj = payload.as_object().ok_or_else(|| {
750 anyhow!("log '{log_id_raw}' on task '{task_id_raw}' is not an object")
751 })?;
752 Ok(LogEntry {
753 id: TaskId::parse(log_id_raw)?,
754 timestamp: get_required_string(payload_obj, "timestamp")?,
755 message: get_required_string(payload_obj, "message")?,
756 })
757 })
758 .collect::<Result<Vec<_>>>()
759 })
760 .transpose()?
761 .unwrap_or_default();
762
763 logs.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));
764
765 Ok(Task {
766 id,
767 title,
768 description,
769 task_type,
770 priority,
771 status,
772 effort,
773 parent,
774 created_at,
775 updated_at,
776 deleted_at,
777 labels,
778 blockers,
779 logs,
780 })
781}
782
783fn get_required_string(map: &serde_json::Map<String, Value>, key: &str) -> Result<String> {
784 map.get(key)
785 .and_then(Value::as_str)
786 .map(str::to_owned)
787 .ok_or_else(|| anyhow!("missing or non-string key '{key}'"))
788}
789
790fn collect_delta_paths(project_dir: &Path) -> Result<Vec<PathBuf>> {
791 let mut paths = Vec::new();
792 collect_changes_from_dir(&project_dir.join(CHANGES_DIR), &mut paths)?;
793
794 for entry in fs::read_dir(project_dir)? {
795 let path = entry?.path();
796 if !path.is_dir() {
797 continue;
798 }
799 let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
800 continue;
801 };
802 if name.starts_with("changes.compacting.") {
803 collect_changes_from_dir(&path, &mut paths)?;
804 }
805 }
806
807 Ok(paths)
808}
809
810fn collect_changes_from_dir(dir: &Path, out: &mut Vec<PathBuf>) -> Result<()> {
811 if !dir.exists() {
812 return Ok(());
813 }
814
815 for entry in fs::read_dir(dir)? {
816 let path = entry?.path();
817 if !path.is_file() {
818 continue;
819 }
820
821 let Some(filename) = path.file_name().and_then(|n| n.to_str()) else {
822 continue;
823 };
824 if filename.ends_with(TMP_SUFFIX) || !filename.ends_with(".loro") {
825 continue;
826 }
827
828 let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
829 continue;
830 };
831 if Ulid::from_string(stem).is_err() {
832 continue;
833 }
834
835 out.push(path);
836 }
837
838 Ok(())
839}
840
841fn project_dir(root: &Path, project: &str) -> PathBuf {
842 root.join(PROJECTS_DIR).join(project)
843}
844
845fn load_or_create_device_peer_id(root: &Path) -> Result<PeerID> {
846 let path = root.join("device_id");
847 if let Some(parent) = path.parent() {
848 fs::create_dir_all(parent)?;
849 }
850
851 let device_ulid = if path.exists() {
852 let content = fs::read_to_string(&path)
853 .with_context(|| format!("failed reading device id from '{}'", path.display()))?;
854 Ulid::from_string(content.trim()).context("invalid persisted device id ULID")?
855 } else {
856 let id = Ulid::new();
857 atomic_write_file(&path, id.to_string().as_bytes())?;
858 id
859 };
860
861 let raw: u128 = device_ulid.into();
862 Ok((raw & u64::MAX as u128) as u64)
863}
864
865fn atomic_write_file(path: &Path, bytes: &[u8]) -> Result<()> {
866 let parent = path
867 .parent()
868 .ok_or_else(|| anyhow!("cannot atomically write root path '{}'", path.display()))?;
869 fs::create_dir_all(parent)?;
870
871 let tmp_name = format!(
872 "{}.{}{}",
873 path.file_name().and_then(|n| n.to_str()).unwrap_or("write"),
874 Ulid::new(),
875 TMP_SUFFIX
876 );
877 let tmp_path = parent.join(tmp_name);
878
879 {
880 let mut file = OpenOptions::new()
881 .create_new(true)
882 .write(true)
883 .open(&tmp_path)
884 .with_context(|| format!("failed to open temp file '{}'", tmp_path.display()))?;
885 file.write_all(bytes)?;
886 file.sync_all()?;
887 }
888
889 fs::rename(&tmp_path, path).with_context(|| {
890 format!(
891 "failed to atomically rename '{}' to '{}'",
892 tmp_path.display(),
893 path.display()
894 )
895 })?;
896
897 sync_dir(parent)?;
898 Ok(())
899}
900
901fn sync_dir(path: &Path) -> Result<()> {
902 let dir =
903 File::open(path).with_context(|| format!("failed opening dir '{}'", path.display()))?;
904 dir.sync_all()
905 .with_context(|| format!("failed fsync on dir '{}'", path.display()))?;
906 Ok(())
907}