// db.rs

  1use anyhow::{anyhow, bail, Context, Result};
  2use loro::{Container, ExportMode, LoroDoc, LoroMap, PeerID, ValueOrContainer};
  3use serde::{Deserialize, Serialize};
  4use serde_json::Value;
  5use std::collections::BTreeMap;
  6use std::fs::{self, File, OpenOptions};
  7use std::io::Write;
  8use std::path::{Path, PathBuf};
  9
 10use fs2::FileExt;
 11use ulid::Ulid;
 12
 13use crate::model::{now_utc, BlockerPartition, Effort, LogEntry, Priority, Status, Task, TaskId};
/// Environment variable that overrides automatic project selection.
pub const PROJECT_ENV: &str = "TD_PROJECT";

/// Subdirectory of the data root that holds one directory per project.
pub(crate) const PROJECTS_DIR: &str = "projects";
/// Per-project subdirectory where update deltas accumulate between tidies.
const CHANGES_DIR: &str = "changes";
/// JSON file in the data root mapping directory paths to project names.
const BINDINGS_FILE: &str = "bindings.json";
/// Per-project Loro snapshot with the compacted document state.
const BASE_FILE: &str = "base.loro";
/// Suffix of in-flight atomic writes; such files are never read as deltas.
const TMP_SUFFIX: &str = ".tmp";
use crate::migrate;
 22
/// On-disk shape of `bindings.json`: directory path -> project name.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct BindingsFile {
    // A missing "bindings" key deserializes to an empty map.
    #[serde(default)]
    bindings: BTreeMap<String, String>,
}
 28
/// Storage wrapper around one project's Loro document and disk layout.
#[derive(Debug, Clone)]
pub struct Store {
    // Data root; projects live under `root/projects/<name>`.
    root: PathBuf,
    // Project name, which is also the directory name under projects/.
    project: String,
    // In-memory CRDT document; persisted via base snapshots and deltas.
    doc: LoroDoc,
}
 36
impl Store {
    /// Create a brand-new project: a fresh Loro document seeded with the
    /// root `tasks` map and a `meta` map (schema version, project id,
    /// creation time), persisted as an on-disk base snapshot.
    pub fn init(root: &Path, project: &str) -> Result<Self> {
        validate_project_name(project)?;
        let project_dir = project_dir(root, project);
        if project_dir.exists() {
            bail!("project '{project}' already exists");
        }
        fs::create_dir_all(project_dir.join(CHANGES_DIR))?;

        let doc = LoroDoc::new();
        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
        // Materialize the root "tasks" container so it exists even when empty.
        doc.get_map("tasks");

        let meta = doc.get_map("meta");
        meta.insert("schema_version", migrate::CURRENT_SCHEMA_VERSION as i64)?;
        meta.insert("project_id", Ulid::new().to_string())?;
        meta.insert("created_at", now_utc())?;

        let snapshot = doc
            .export(ExportMode::Snapshot)
            .context("failed to export initial loro snapshot")?;
        atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;

        Ok(Self {
            root: root.to_path_buf(),
            project: project.to_string(),
            doc,
        })
    }

    /// Load an existing project: base snapshot first, then every delta
    /// file in ULID (creation-time) order, then any pending schema
    /// migration (whose result is persisted as a new delta).
    pub fn open(root: &Path, project: &str) -> Result<Self> {
        validate_project_name(project)?;
        let project_dir = project_dir(root, project);
        let base_path = project_dir.join(BASE_FILE);

        if !base_path.exists() {
            bail!("project '{project}' is not initialized. Run 'td project init {project}'");
        }

        let base = fs::read(&base_path)
            .with_context(|| format!("failed to read loro snapshot '{}'", base_path.display()))?;

        let doc = LoroDoc::from_snapshot(&base).context("failed to load loro snapshot")?;
        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;

        // Delta filenames are ULIDs (validated by collect_changes_from_dir),
        // so sorting by parsed stem applies them in creation order.
        let mut deltas = collect_delta_paths(&project_dir)?;
        deltas.sort_by_key(|path| {
            path.file_stem()
                .and_then(|s| s.to_str())
                .and_then(|s| Ulid::from_string(s).ok())
        });

        for delta_path in deltas {
            let bytes = fs::read(&delta_path)
                .with_context(|| format!("failed to read loro delta '{}'", delta_path.display()))?;
            if let Err(err) = doc.import(&bytes) {
                // Tolerate malformed or stale delta files as requested by design.
                eprintln!(
                    "warning: skipping unreadable delta '{}': {err}",
                    delta_path.display()
                );
            }
        }

        // Apply any pending schema upgrades and persist the resulting delta
        // so subsequent opens don't repeat the work.
        let before_vv = doc.oplog_vv();
        let upgraded = migrate::ensure_current(&doc)?;
        if upgraded {
            doc.commit();
            let delta = doc
                .export(ExportMode::updates(&before_vv))
                .context("failed to export schema upgrade delta")?;
            let filename = format!("{}.loro", Ulid::new());
            let delta_path = project_dir.join(CHANGES_DIR).join(filename);
            atomic_write_file(&delta_path, &delta)?;
        }

        Ok(Self {
            root: root.to_path_buf(),
            project: project.to_string(),
            doc,
        })
    }

    /// Bootstrap a local project from peer-provided delta bytes.
    ///
    /// The incoming delta is imported into a fresh document, validated to
    /// ensure it carries `meta.project_id`, and then persisted as a base
    /// snapshot for future opens.
    pub fn bootstrap_from_peer(root: &Path, project: &str, delta: &[u8]) -> Result<Self> {
        validate_project_name(project)?;
        let project_dir = project_dir(root, project);
        if project_dir.exists() {
            bail!("project '{project}' already exists");
        }
        fs::create_dir_all(project_dir.join(CHANGES_DIR))?;

        let doc = LoroDoc::new();
        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
        doc.import(delta)
            .context("failed to import bootstrap delta from peer")?;
        doc.commit();

        // Reject deltas that do not describe a complete project document.
        read_project_id_from_doc(&doc)
            .context("bootstrap delta is missing required project identity")?;

        // Upgrade the peer's document before snapshotting so the local
        // copy is always at CURRENT_SCHEMA_VERSION from the start.
        migrate::ensure_current(&doc)?;
        doc.commit();

        let snapshot = doc
            .export(ExportMode::Snapshot)
            .context("failed to export bootstrap loro snapshot")?;
        atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;

        Ok(Self {
            root: root.to_path_buf(),
            project: project.to_string(),
            doc,
        })
    }

    /// Data root this store was opened under.
    pub fn root(&self) -> &Path {
        &self.root
    }

    /// Name of the project this store serves.
    pub fn project_name(&self) -> &str {
        &self.project
    }

    /// Direct access to the underlying Loro document.
    pub fn doc(&self) -> &LoroDoc {
        &self.doc
    }

    /// Compact accumulated deltas into the base snapshot using a two-phase
    /// protocol that is safe against concurrent writers.
    ///
    /// **Phase 1** — rename `changes/` to `changes.compacting.<ulid>/`, then
    /// immediately create a fresh `changes/`.  Any concurrent `td` command
    /// that writes a delta after this point lands in the new `changes/` and is
    /// therefore never touched by this operation.
    ///
    /// **Phase 2** — write a fresh base snapshot from the in-memory document
    /// (which was loaded from both `base.loro` and every delta at `open` time),
    /// then remove the compacting directory.
    ///
    /// Any orphaned `changes.compacting.*` directories left by a previously
    /// crashed tidy are also removed: they were already merged into `self.doc`
    /// at open time, so the new snapshot includes their contents.
    ///
    /// Returns the number of delta files folded into the snapshot.
    pub fn tidy(&self) -> Result<usize> {
        let project_dir = project_dir(&self.root, &self.project);
        let changes_dir = project_dir.join(CHANGES_DIR);

        // Phase 1: atomically hand off the current changes/ to a compacting
        // directory so new writers have a clean home immediately.
        let compacting_dir = project_dir.join(format!("changes.compacting.{}", Ulid::new()));
        if changes_dir.exists() {
            fs::rename(&changes_dir, &compacting_dir).with_context(|| {
                format!(
                    "failed to rename '{}' to '{}'",
                    changes_dir.display(),
                    compacting_dir.display()
                )
            })?;
        }
        fs::create_dir_all(&changes_dir).context("failed to create fresh changes/")?;

        // Re-import every delta from the compacting directories.  self.doc
        // was populated at open() time, but a concurrent writer may have
        // appended a delta to changes/ between open() and the Phase 1
        // rename — that delta is now inside compacting_dir without being in
        // self.doc.  CRDT import is idempotent (deduplicates by OpID), so
        // re-importing already-known ops is harmless.
        let mut compacting_deltas = collect_delta_paths(&project_dir)?;
        compacting_deltas.sort_by_key(|path| {
            path.file_stem()
                .and_then(|s| s.to_str())
                .and_then(|s| Ulid::from_string(s).ok())
        });
        for delta_path in &compacting_deltas {
            if let Ok(bytes) = fs::read(delta_path) {
                if let Err(err) = self.doc.import(&bytes) {
                    eprintln!(
                        "warning: skipping unreadable delta '{}': {err}",
                        delta_path.display()
                    );
                }
            }
        }

        // Phase 2: write the new base snapshot.  self.doc now holds the
        // full merged state including any concurrent deltas.
        let out = project_dir.join(BASE_FILE);
        let bytes = self
            .doc
            .export(ExportMode::Snapshot)
            .context("failed to export loro snapshot")?;
        atomic_write_file(&out, &bytes)?;

        // Remove the compacting directory we created in phase 1 plus any
        // orphaned changes.compacting.* dirs from previously crashed tidies.
        let mut removed = 0usize;
        for entry in fs::read_dir(&project_dir)
            .with_context(|| format!("failed to read project dir '{}'", project_dir.display()))?
        {
            let path = entry?.path();
            if !path.is_dir() {
                continue;
            }
            let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
                continue;
            };
            if !name.starts_with("changes.compacting.") {
                continue;
            }
            // Count files before removing for the summary report.
            for file in fs::read_dir(&path)
                .with_context(|| format!("failed to read '{}'", path.display()))?
            {
                let fp = file?.path();
                if fp.is_file() {
                    removed += 1;
                }
            }
            fs::remove_dir_all(&path)
                .with_context(|| format!("failed to remove compacting dir '{}'", path.display()))?;
        }

        Ok(removed)
    }

    /// Apply a local mutation and persist only the resulting delta.
    ///
    /// Returns the path of the delta file written under `changes/`.
    pub fn apply_and_persist<F>(&self, mutator: F) -> Result<PathBuf>
    where
        F: FnOnce(&LoroDoc) -> Result<()>,
    {
        // Capture the version vector first so the export below contains
        // exactly the ops the mutator produced.
        let before = self.doc.oplog_vv();
        mutator(&self.doc)?;
        self.doc.commit();

        let delta = self
            .doc
            .export(ExportMode::updates(&before))
            .context("failed to export loro update delta")?;

        let filename = format!("{}.loro", Ulid::new());
        let path = project_dir(&self.root, &self.project)
            .join(CHANGES_DIR)
            .join(filename);
        atomic_write_file(&path, &delta)?;
        Ok(path)
    }

    /// Persist pre-built delta bytes (e.g. received from a peer) as a new
    /// change file without re-exporting from the doc.
    pub fn save_raw_delta(&self, bytes: &[u8]) -> Result<PathBuf> {
        let filename = format!("{}.loro", Ulid::new());
        let path = project_dir(&self.root, &self.project)
            .join(CHANGES_DIR)
            .join(filename);
        atomic_write_file(&path, bytes)?;
        Ok(path)
    }

    /// Return hydrated tasks, excluding tombstones.
    pub fn list_tasks(&self) -> Result<Vec<Task>> {
        self.list_tasks_inner(false)
    }

    /// Return hydrated tasks, including tombstoned rows.
    pub fn list_tasks_unfiltered(&self) -> Result<Vec<Task>> {
        self.list_tasks_inner(true)
    }

    /// Find a task by exact ULID string.
    pub fn get_task(&self, id: &TaskId, include_deleted: bool) -> Result<Option<Task>> {
        let tasks = if include_deleted {
            self.list_tasks_unfiltered()?
        } else {
            self.list_tasks()?
        };
        Ok(tasks.into_iter().find(|task| task.id == *id))
    }

    // Shared implementation: materialize the document as JSON and hydrate
    // every entry of the root "tasks" map, optionally dropping tombstones.
    fn list_tasks_inner(&self, include_deleted: bool) -> Result<Vec<Task>> {
        let root = serde_json::to_value(self.doc.get_deep_value())?;
        let tasks_obj = root
            .get("tasks")
            .and_then(Value::as_object)
            .ok_or_else(|| anyhow!("missing root tasks map"))?;

        let mut tasks = Vec::with_capacity(tasks_obj.len());
        for (task_id_raw, task_json) in tasks_obj {
            let task = hydrate_task(task_id_raw, task_json)?;
            if include_deleted || task.deleted_at.is_none() {
                tasks.push(task);
            }
        }

        // Task ids are ULIDs, so lexicographic order is creation order.
        tasks.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));
        Ok(tasks)
    }

    /// Return the stable project identity stored in `meta.project_id`.
    pub fn project_id(&self) -> Result<String> {
        read_project_id_from_doc(&self.doc)
    }

    /// Schema version currently recorded in the document's `meta` map.
    pub fn schema_version(&self) -> Result<u32> {
        migrate::read_schema_version(&self.doc)
    }
}
354
355pub fn data_root() -> Result<PathBuf> {
356    let home = std::env::var("HOME").context("HOME is not set")?;
357    Ok(PathBuf::from(home).join(".local").join("share").join("td"))
358}
359
360pub fn init(cwd: &Path, project: &str) -> Result<Store> {
361    let root = data_root()?;
362    fs::create_dir_all(root.join(PROJECTS_DIR))?;
363    let store = Store::init(&root, project)?;
364    bind_project(cwd, project)?;
365    Ok(store)
366}
367
368pub fn use_project(cwd: &Path, project: &str) -> Result<()> {
369    let root = data_root()?;
370    validate_project_name(project)?;
371    if !project_dir(&root, project).join(BASE_FILE).exists() {
372        bail!("project '{project}' not found. Run 'td project list' to list known projects");
373    }
374    bind_project(cwd, project)
375}
376
377pub fn open(start: &Path) -> Result<Store> {
378    let root = data_root()?;
379    let explicit = std::env::var(PROJECT_ENV).ok();
380    let project = resolve_project_name(start, &root, explicit.as_deref())?.ok_or_else(|| {
381        anyhow!(
382            "no project selected. Use --project/TD_PROJECT, run 'td project bind <name>', or run 'td project init <name>'"
383        )
384    })?;
385    Store::open(&root, &project)
386}
387
388/// Open the project selected by `--project`/`TD_PROJECT`/bindings if one exists.
389///
390/// Returns `Ok(None)` when no project is selected by any mechanism.
391pub fn try_open(start: &Path) -> Result<Option<Store>> {
392    let root = data_root()?;
393    let explicit = std::env::var(PROJECT_ENV).ok();
394    let Some(project) = resolve_project_name(start, &root, explicit.as_deref())? else {
395        return Ok(None);
396    };
397    Store::open(&root, &project).map(Some)
398}
399
400/// Bootstrap a project from a peer delta and bind the current directory.
401pub fn bootstrap_sync(cwd: &Path, project: &str, delta: &[u8]) -> Result<Store> {
402    let root = data_root()?;
403    fs::create_dir_all(root.join(PROJECTS_DIR))?;
404    validate_project_name(project)?;
405    let store = Store::bootstrap_from_peer(&root, project, delta)?;
406    bind_project(cwd, project)?;
407    Ok(store)
408}
409
410/// Bootstrap a project from a peer delta using an explicit data root.
411///
412/// Unlike [`bootstrap_sync`], this function does not consult `HOME` and is
413/// therefore safe to call from async contexts where `HOME` may vary by peer.
414///
415/// If `bind_cwd` is true, the given working directory is bound to the new
416/// project. Pass false when bootstrapping from a SyncAll context to avoid
417/// unexpectedly binding directories like the user's home.
418///
419/// Uses exclusive file locking to prevent race conditions when multiple
420/// concurrent sync operations create projects or modify bindings.
421pub fn bootstrap_sync_at(
422    data_root: &Path,
423    cwd: &Path,
424    project: &str,
425    delta: &[u8],
426    bind_cwd: bool,
427) -> Result<Store> {
428    fs::create_dir_all(data_root.join(PROJECTS_DIR))?;
429    validate_project_name(project)?;
430
431    // Exclusive lock prevents races when concurrent syncs create the same project
432    // or modify bindings simultaneously.
433    let lock_path = data_root.join(".bindings.lock");
434    let lock_file = OpenOptions::new()
435        .create(true)
436        .truncate(false)
437        .write(true)
438        .open(&lock_path)
439        .with_context(|| format!("failed to open lock file '{}'", lock_path.display()))?;
440    lock_file
441        .lock_exclusive()
442        .context("failed to acquire exclusive lock on bindings")?;
443
444    // Now holding the lock: create project and optionally update bindings atomically.
445    let store = Store::bootstrap_from_peer(data_root, project, delta)?;
446
447    if bind_cwd {
448        let canonical = fs::canonicalize(cwd)
449            .with_context(|| format!("failed to canonicalize '{}'", cwd.display()))?;
450        let mut bindings = load_bindings(data_root)?;
451        bindings
452            .bindings
453            .insert(canonical.to_string_lossy().to_string(), project.to_string());
454        save_bindings(data_root, &bindings)?;
455    }
456
457    // Lock is released when lock_file is dropped.
458    Ok(store)
459}
460
461pub fn list_projects() -> Result<Vec<String>> {
462    let root = data_root()?;
463    list_projects_in(&root)
464}
465
466/// List project names rooted at an explicit data directory.
467///
468/// Unlike [`list_projects`], this does not consult `HOME` and is therefore
469/// safe to call from async contexts where `HOME` may vary between peers.
470pub(crate) fn list_projects_in(root: &Path) -> Result<Vec<String>> {
471    let mut out = Vec::new();
472    let dir = root.join(PROJECTS_DIR);
473    if !dir.exists() {
474        return Ok(out);
475    }
476
477    for entry in fs::read_dir(dir)? {
478        let path = entry?.path();
479        if !path.is_dir() {
480            continue;
481        }
482        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
483            continue;
484        };
485        if path.join(BASE_FILE).exists() {
486            out.push(name.to_string());
487        }
488    }
489
490    out.sort();
491    Ok(out)
492}
493
494pub fn resolve_task_id(store: &Store, raw: &str, include_deleted: bool) -> Result<TaskId> {
495    let raw = raw.strip_prefix("td-").unwrap_or(raw);
496
497    if let Ok(id) = TaskId::parse(raw) {
498        if store.get_task(&id, include_deleted)?.is_some() {
499            return Ok(id);
500        }
501    }
502
503    let tasks = if include_deleted {
504        store.list_tasks_unfiltered()?
505    } else {
506        store.list_tasks()?
507    };
508
509    let upper = raw.to_ascii_uppercase();
510    let matches: Vec<TaskId> = tasks
511        .into_iter()
512        .filter(|t| t.id.as_str().ends_with(&upper))
513        .map(|t| t.id)
514        .collect();
515
516    match matches.as_slice() {
517        [] => bail!("task '{raw}' not found"),
518        [id] => Ok(id.clone()),
519        _ => bail!("task reference '{raw}' is ambiguous"),
520    }
521}
522
523pub fn partition_blockers(store: &Store, blockers: &[TaskId]) -> Result<BlockerPartition> {
524    let mut out = BlockerPartition::default();
525    for blocker in blockers {
526        let Some(task) = store.get_task(blocker, true)? else {
527            out.resolved.push(blocker.clone());
528            continue;
529        };
530        if task.status == Status::Closed || task.deleted_at.is_some() {
531            out.resolved.push(blocker.clone());
532        } else {
533            out.open.push(blocker.clone());
534        }
535    }
536    Ok(out)
537}
538
539pub fn insert_task_map(tasks: &LoroMap, task_id: &TaskId) -> Result<LoroMap> {
540    tasks
541        .insert_container(task_id.as_str(), LoroMap::new())
542        .context("failed to create task map")
543}
544
545pub fn get_task_map(tasks: &LoroMap, task_id: &TaskId) -> Result<Option<LoroMap>> {
546    match tasks.get(task_id.as_str()) {
547        Some(ValueOrContainer::Container(Container::Map(map))) => Ok(Some(map)),
548        Some(_) => bail!("task '{}' has invalid container type", task_id.as_str()),
549        None => Ok(None),
550    }
551}
552
553pub fn get_or_create_child_map(parent: &LoroMap, key: &str) -> Result<LoroMap> {
554    parent
555        .get_or_create_container(key, LoroMap::new())
556        .with_context(|| format!("failed to get or create map key '{key}'"))
557}
558
559fn bindings_path(root: &Path) -> PathBuf {
560    root.join(BINDINGS_FILE)
561}
562
563fn resolve_project_name(
564    start: &Path,
565    root: &Path,
566    explicit: Option<&str>,
567) -> Result<Option<String>> {
568    if let Some(project) = explicit {
569        validate_project_name(project)?;
570        return Ok(Some(project.to_string()));
571    }
572
573    let cwd = canonicalize_binding_path(start)?;
574    let bindings = load_bindings(root)?;
575
576    let mut best: Option<(usize, String)> = None;
577    for (raw_path, project) in bindings.bindings {
578        let bound = PathBuf::from(raw_path);
579        if is_prefix_path(&bound, &cwd) {
580            let score = bound.components().count();
581            match &best {
582                Some((best_score, _)) if *best_score >= score => {}
583                _ => best = Some((score, project)),
584            }
585        }
586    }
587
588    if let Some((_, project)) = best {
589        return Ok(Some(project));
590    }
591
592    Ok(None)
593}
594
595pub fn unbind_project(cwd: &Path) -> Result<()> {
596    let root = data_root()?;
597    let canonical = canonicalize_binding_path(cwd)?;
598    let canonical_str = canonical.to_string_lossy().to_string();
599
600    let mut bindings = load_bindings(&root)?;
601    if !bindings.bindings.contains_key(&canonical_str) {
602        bail!("path '{}' is not bound to any project", canonical.display());
603    }
604    bindings.bindings.remove(&canonical_str);
605    save_bindings(&root, &bindings)
606}
607
608pub fn delete_project(name: &str) -> Result<()> {
609    validate_project_name(name)?;
610    let root = data_root()?;
611    let proj_dir = project_dir(&root, name);
612
613    if !proj_dir.join(BASE_FILE).exists() {
614        bail!("project '{name}' not found");
615    }
616
617    fs::remove_dir_all(&proj_dir).with_context(|| {
618        format!(
619            "failed to remove project directory '{}'",
620            proj_dir.display()
621        )
622    })?;
623
624    let mut bindings = load_bindings(&root)?;
625    bindings.bindings.retain(|_, project| project != name);
626    save_bindings(&root, &bindings)
627}
628
629fn bind_project(cwd: &Path, project: &str) -> Result<()> {
630    validate_project_name(project)?;
631
632    let root = data_root()?;
633    fs::create_dir_all(&root)?;
634
635    let canonical = canonicalize_binding_path(cwd)?;
636    let mut bindings = load_bindings(&root)?;
637    bindings
638        .bindings
639        .insert(canonical.to_string_lossy().to_string(), project.to_string());
640    save_bindings(&root, &bindings)
641}
642
643fn load_bindings(root: &Path) -> Result<BindingsFile> {
644    let path = bindings_path(root);
645    if !path.exists() {
646        return Ok(BindingsFile::default());
647    }
648    let content = fs::read_to_string(&path)
649        .with_context(|| format!("failed reading bindings from '{}'", path.display()))?;
650    serde_json::from_str(&content)
651        .with_context(|| format!("invalid bindings file '{}'", path.display()))
652}
653
654fn save_bindings(root: &Path, bindings: &BindingsFile) -> Result<()> {
655    let path = bindings_path(root);
656    let bytes = serde_json::to_vec_pretty(bindings)?;
657    atomic_write_file(&path, &bytes)
658}
659
660fn canonicalize_binding_path(path: &Path) -> Result<PathBuf> {
661    fs::canonicalize(path).with_context(|| format!("failed to canonicalize '{}'", path.display()))
662}
663
/// True when every component of `prefix` matches the leading components of
/// `target` — whole components, not bytes, so `/a/b` is not a prefix of
/// `/a/bc`.
fn is_prefix_path(prefix: &Path, target: &Path) -> bool {
    // std's Path::starts_with performs exactly this whole-component
    // comparison; it replaces the previous hand-rolled component loop.
    target.starts_with(prefix)
}
677
678pub fn validate_project_name(name: &str) -> Result<()> {
679    if name.is_empty() {
680        bail!("project name cannot be empty");
681    }
682    if name.contains('/') || name.contains('\\') || name == "." || name == ".." {
683        bail!("invalid project name '{name}'");
684    }
685    if name.chars().any(char::is_control) {
686        bail!("invalid project name '{name}'");
687    }
688    Ok(())
689}
690
691fn read_project_id_from_doc(doc: &LoroDoc) -> Result<String> {
692    let root = serde_json::to_value(doc.get_deep_value())?;
693    root.get("meta")
694        .and_then(|m| m.get("project_id"))
695        .and_then(Value::as_str)
696        .map(str::to_owned)
697        .ok_or_else(|| anyhow!("missing meta.project_id in project doc"))
698}
699
/// Convert one `tasks.<id>` JSON object (from `get_deep_value`) into a
/// hydrated [`Task`], validating required fields and parsing enums/ids.
fn hydrate_task(task_id_raw: &str, value: &Value) -> Result<Task> {
    let obj = value
        .as_object()
        .ok_or_else(|| anyhow!("task '{task_id_raw}' is not an object"))?;

    let id = TaskId::parse(task_id_raw)?;

    let title = get_required_string(obj, "title")?;
    let description = get_required_string(obj, "description")?;
    let task_type = get_required_string(obj, "type")?;
    let status = Status::parse(&get_required_string(obj, "status")?)?;
    let priority = Priority::parse(&get_required_string(obj, "priority")?)?;
    let effort = Effort::parse(&get_required_string(obj, "effort")?)?;
    // Absent key and empty string both mean "no parent".
    let parent = match obj.get("parent").and_then(Value::as_str) {
        Some("") | None => None,
        Some(raw) => Some(TaskId::parse(raw)?),
    };

    let created_at = get_required_string(obj, "created_at")?;
    let updated_at = get_required_string(obj, "updated_at")?;
    // An empty deleted_at string means "not deleted" (tombstone cleared).
    let deleted_at = obj
        .get("deleted_at")
        .and_then(Value::as_str)
        .map(str::to_owned)
        .filter(|s| !s.is_empty());

    // Labels are stored as a map keyed by label name; values are ignored.
    let labels = obj
        .get("labels")
        .and_then(Value::as_object)
        .map(|m| m.keys().cloned().collect())
        .unwrap_or_default();

    // Blockers are stored as a map keyed by blocking task id.
    let blockers = obj
        .get("blockers")
        .and_then(Value::as_object)
        .map(|m| {
            m.keys()
                .map(|raw| TaskId::parse(raw))
                .collect::<Result<Vec<_>>>()
        })
        .transpose()?
        .unwrap_or_default();

    // Logs are stored as a map keyed by log-entry ULID with
    // timestamp/message payloads.
    let mut logs = obj
        .get("logs")
        .and_then(Value::as_object)
        .map(|logs| {
            logs.iter()
                .map(|(log_id_raw, payload)| {
                    let payload_obj = payload.as_object().ok_or_else(|| {
                        anyhow!("log '{log_id_raw}' on task '{task_id_raw}' is not an object")
                    })?;
                    Ok(LogEntry {
                        id: TaskId::parse(log_id_raw)?,
                        timestamp: get_required_string(payload_obj, "timestamp")?,
                        message: get_required_string(payload_obj, "message")?,
                    })
                })
                .collect::<Result<Vec<_>>>()
        })
        .transpose()?
        .unwrap_or_default();

    // Log ids are ULIDs, so lexicographic order is chronological.
    logs.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));

    Ok(Task {
        id,
        title,
        description,
        task_type,
        priority,
        status,
        effort,
        parent,
        created_at,
        updated_at,
        deleted_at,
        labels,
        blockers,
        logs,
    })
}
782
783fn get_required_string(map: &serde_json::Map<String, Value>, key: &str) -> Result<String> {
784    map.get(key)
785        .and_then(Value::as_str)
786        .map(str::to_owned)
787        .ok_or_else(|| anyhow!("missing or non-string key '{key}'"))
788}
789
790fn collect_delta_paths(project_dir: &Path) -> Result<Vec<PathBuf>> {
791    let mut paths = Vec::new();
792    collect_changes_from_dir(&project_dir.join(CHANGES_DIR), &mut paths)?;
793
794    for entry in fs::read_dir(project_dir)? {
795        let path = entry?.path();
796        if !path.is_dir() {
797            continue;
798        }
799        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
800            continue;
801        };
802        if name.starts_with("changes.compacting.") {
803            collect_changes_from_dir(&path, &mut paths)?;
804        }
805    }
806
807    Ok(paths)
808}
809
810fn collect_changes_from_dir(dir: &Path, out: &mut Vec<PathBuf>) -> Result<()> {
811    if !dir.exists() {
812        return Ok(());
813    }
814
815    for entry in fs::read_dir(dir)? {
816        let path = entry?.path();
817        if !path.is_file() {
818            continue;
819        }
820
821        let Some(filename) = path.file_name().and_then(|n| n.to_str()) else {
822            continue;
823        };
824        if filename.ends_with(TMP_SUFFIX) || !filename.ends_with(".loro") {
825            continue;
826        }
827
828        let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
829            continue;
830        };
831        if Ulid::from_string(stem).is_err() {
832            continue;
833        }
834
835        out.push(path);
836    }
837
838    Ok(())
839}
840
841fn project_dir(root: &Path, project: &str) -> PathBuf {
842    root.join(PROJECTS_DIR).join(project)
843}
844
845fn load_or_create_device_peer_id(root: &Path) -> Result<PeerID> {
846    let path = root.join("device_id");
847    if let Some(parent) = path.parent() {
848        fs::create_dir_all(parent)?;
849    }
850
851    let device_ulid = if path.exists() {
852        let content = fs::read_to_string(&path)
853            .with_context(|| format!("failed reading device id from '{}'", path.display()))?;
854        Ulid::from_string(content.trim()).context("invalid persisted device id ULID")?
855    } else {
856        let id = Ulid::new();
857        atomic_write_file(&path, id.to_string().as_bytes())?;
858        id
859    };
860
861    let raw: u128 = device_ulid.into();
862    Ok((raw & u64::MAX as u128) as u64)
863}
864
865fn atomic_write_file(path: &Path, bytes: &[u8]) -> Result<()> {
866    let parent = path
867        .parent()
868        .ok_or_else(|| anyhow!("cannot atomically write root path '{}'", path.display()))?;
869    fs::create_dir_all(parent)?;
870
871    let tmp_name = format!(
872        "{}.{}{}",
873        path.file_name().and_then(|n| n.to_str()).unwrap_or("write"),
874        Ulid::new(),
875        TMP_SUFFIX
876    );
877    let tmp_path = parent.join(tmp_name);
878
879    {
880        let mut file = OpenOptions::new()
881            .create_new(true)
882            .write(true)
883            .open(&tmp_path)
884            .with_context(|| format!("failed to open temp file '{}'", tmp_path.display()))?;
885        file.write_all(bytes)?;
886        file.sync_all()?;
887    }
888
889    fs::rename(&tmp_path, path).with_context(|| {
890        format!(
891            "failed to atomically rename '{}' to '{}'",
892            tmp_path.display(),
893            path.display()
894        )
895    })?;
896
897    sync_dir(parent)?;
898    Ok(())
899}
900
901fn sync_dir(path: &Path) -> Result<()> {
902    let dir =
903        File::open(path).with_context(|| format!("failed opening dir '{}'", path.display()))?;
904    dir.sync_all()
905        .with_context(|| format!("failed fsync on dir '{}'", path.display()))?;
906    Ok(())
907}