//! db.rs — project task database: Loro-backed storage, task hydration, and
//! path→project bindings.

  1use anyhow::{anyhow, bail, Context, Result};
  2use loro::{Container, ExportMode, LoroDoc, LoroMap, PeerID, ValueOrContainer};
  3use serde::{Deserialize, Serialize};
  4use serde_json::Value;
  5use std::collections::BTreeMap;
  6use std::fmt;
  7use std::fs::{self, File, OpenOptions};
  8use std::io::Write;
  9use std::path::{Path, PathBuf};
 10use ulid::Ulid;
 11
/// Environment variable that overrides project resolution from bindings.
pub const PROJECT_ENV: &str = "TD_PROJECT";

// Directory under the data root holding one subdirectory per project.
const PROJECTS_DIR: &str = "projects";
// Per-project subdirectory holding incremental Loro delta files.
const CHANGES_DIR: &str = "changes";
// JSON file mapping canonical filesystem paths to project names.
const BINDINGS_FILE: &str = "bindings.json";
// Per-project base Loro snapshot file.
const BASE_FILE: &str = "base.loro";
// Suffix for in-flight temp files; such files are skipped when scanning deltas.
const TMP_SUFFIX: &str = ".tmp";
// Schema version stamped into new documents' meta map.
const SCHEMA_VERSION: u32 = 1;
 20
 21/// Current UTC time in ISO 8601 format.
 22pub fn now_utc() -> String {
 23    chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string()
 24}
 25
/// Lifecycle state for a task.
///
/// Serialized in snake_case ("open", "in_progress", "closed"), matching the
/// strings produced/accepted by `as_str`/`parse`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Status {
    Open,
    InProgress,
    Closed,
}
 34
 35impl Status {
 36    fn as_str(self) -> &'static str {
 37        match self {
 38            Status::Open => "open",
 39            Status::InProgress => "in_progress",
 40            Status::Closed => "closed",
 41        }
 42    }
 43
 44    fn parse(raw: &str) -> Result<Self> {
 45        match raw {
 46            "open" => Ok(Self::Open),
 47            "in_progress" => Ok(Self::InProgress),
 48            "closed" => Ok(Self::Closed),
 49            _ => bail!("invalid status '{raw}'"),
 50        }
 51    }
 52}
 53
/// Priority for task ordering.
///
/// Declared high-to-low; `score()` maps these to 1..=3 so lower scores sort
/// as more urgent.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Priority {
    High,
    Medium,
    Low,
}
 62
 63impl Priority {
 64    fn as_str(self) -> &'static str {
 65        match self {
 66            Priority::High => "high",
 67            Priority::Medium => "medium",
 68            Priority::Low => "low",
 69        }
 70    }
 71
 72    fn parse(raw: &str) -> Result<Self> {
 73        match raw {
 74            "high" => Ok(Self::High),
 75            "medium" => Ok(Self::Medium),
 76            "low" => Ok(Self::Low),
 77            _ => bail!("invalid priority '{raw}'"),
 78        }
 79    }
 80
 81    pub fn score(self) -> i32 {
 82        match self {
 83            Priority::High => 1,
 84            Priority::Medium => 2,
 85            Priority::Low => 3,
 86        }
 87    }
 88}
 89
/// Estimated effort for a task.
///
/// Declared low-to-high; `score()` maps these to 1..=3 so lower scores mean
/// less effort.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum Effort {
    Low,
    Medium,
    High,
}
 98
 99impl Effort {
100    fn as_str(self) -> &'static str {
101        match self {
102            Effort::Low => "low",
103            Effort::Medium => "medium",
104            Effort::High => "high",
105        }
106    }
107
108    fn parse(raw: &str) -> Result<Self> {
109        match raw {
110            "low" => Ok(Self::Low),
111            "medium" => Ok(Self::Medium),
112            "high" => Ok(Self::High),
113            _ => bail!("invalid effort '{raw}'"),
114        }
115    }
116
117    pub fn score(self) -> i32 {
118        match self {
119            Effort::Low => 1,
120            Effort::Medium => 2,
121            Effort::High => 3,
122        }
123    }
124}
125
/// A stable task identifier backed by a ULID.
///
/// Stored as the canonical ULID string; the derived `Ord` therefore gives
/// lexicographic string ordering.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
#[serde(transparent)]
pub struct TaskId(String);
130
131impl TaskId {
132    pub fn new(id: Ulid) -> Self {
133        Self(id.to_string())
134    }
135
136    pub fn parse(raw: &str) -> Result<Self> {
137        let id = Ulid::from_string(raw).with_context(|| format!("invalid task id '{raw}'"))?;
138        Ok(Self::new(id))
139    }
140
141    pub fn as_str(&self) -> &str {
142        &self.0
143    }
144
145    pub fn short(&self) -> String {
146        self.0.chars().take(7).collect()
147    }
148}
149
150impl fmt::Display for TaskId {
151    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
152        write!(f, "{}", self.short())
153    }
154}
155
/// A task log entry embedded in a task record.
#[derive(Debug, Clone, Serialize)]
pub struct LogEntry {
    // Log id (a ULID); entries are sorted by this id during hydration.
    pub id: TaskId,
    // Timestamp string as stored in the document.
    pub timestamp: String,
    pub message: String,
}
163
/// Hydrated task data from the CRDT document.
#[derive(Debug, Clone, Serialize)]
pub struct Task {
    pub id: TaskId,
    pub title: String,
    pub description: String,
    // Serialized under the JSON key "type".
    #[serde(rename = "type")]
    pub task_type: String,
    pub priority: Priority,
    pub status: Status,
    pub effort: Effort,
    // Parent task id; an empty/missing stored value hydrates to None.
    pub parent: Option<TaskId>,
    pub created_at: String,
    pub updated_at: String,
    // Tombstone marker; tasks with Some(_) are hidden from default listings.
    pub deleted_at: Option<String>,
    // Label strings (stored as map keys in the document).
    pub labels: Vec<String>,
    // Ids of tasks that block this one (stored as map keys in the document).
    pub blockers: Vec<TaskId>,
    // Log entries, sorted by log id.
    pub logs: Vec<LogEntry>,
}
183
/// Result type for partitioning blockers by task state.
#[derive(Debug, Default, Clone, Serialize)]
pub struct BlockerPartition {
    // Blockers that still block: the task exists, is not closed, not deleted.
    pub open: Vec<TaskId>,
    // Blockers that no longer block: closed, deleted, or missing entirely.
    pub resolved: Vec<TaskId>,
}
190
// On-disk JSON schema of the bindings file: canonical path -> project name.
// BTreeMap keeps deterministic iteration and serialization order.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct BindingsFile {
    #[serde(default)]
    bindings: BTreeMap<String, String>,
}
196
/// Storage wrapper around one project's Loro document and disk layout.
#[derive(Debug, Clone)]
pub struct Store {
    // Data root directory holding all projects and shared files.
    root: PathBuf,
    // Project name; also the directory name under the projects dir.
    project: String,
    // In-memory CRDT document: base snapshot plus replayed deltas.
    doc: LoroDoc,
}
204
impl Store {
    /// Create a new project on disk: directory tree, a Loro document seeded
    /// with the `tasks` and `meta` root maps, and the initial base snapshot.
    ///
    /// Fails if the project directory already exists.
    pub fn init(root: &Path, project: &str) -> Result<Self> {
        validate_project_name(project)?;
        let project_dir = project_dir(root, project);
        if project_dir.exists() {
            bail!("project '{project}' already exists");
        }
        fs::create_dir_all(project_dir.join(CHANGES_DIR))?;

        let doc = LoroDoc::new();
        // Stable per-device peer id keeps op attribution consistent across runs.
        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;
        // Materialize the root "tasks" map so readers always find it.
        doc.get_map("tasks");

        let meta = doc.get_map("meta");
        meta.insert("schema_version", SCHEMA_VERSION as i64)?;
        meta.insert("project_id", Ulid::new().to_string())?;
        meta.insert("created_at", now_utc())?;

        let snapshot = doc
            .export(ExportMode::Snapshot)
            .context("failed to export initial loro snapshot")?;
        atomic_write_file(&project_dir.join(BASE_FILE), &snapshot)?;

        Ok(Self {
            root: root.to_path_buf(),
            project: project.to_string(),
            doc,
        })
    }

    /// Open an existing project: load the base snapshot, then replay every
    /// persisted delta file on top of it (oldest first).
    pub fn open(root: &Path, project: &str) -> Result<Self> {
        validate_project_name(project)?;
        let project_dir = project_dir(root, project);
        let base_path = project_dir.join(BASE_FILE);

        if !base_path.exists() {
            bail!("project '{project}' is not initialized. Run 'td init {project}'");
        }

        let base = fs::read(&base_path)
            .with_context(|| format!("failed to read loro snapshot '{}'", base_path.display()))?;

        let doc = LoroDoc::from_snapshot(&base).context("failed to load loro snapshot")?;
        doc.set_peer_id(load_or_create_device_peer_id(root)?)?;

        // Sort deltas by their ULID file stems so they replay in creation order.
        let mut deltas = collect_delta_paths(&project_dir)?;
        deltas.sort_by_key(|path| {
            path.file_stem()
                .and_then(|s| s.to_str())
                .and_then(|s| Ulid::from_string(s).ok())
        });

        for delta_path in deltas {
            let bytes = fs::read(&delta_path)
                .with_context(|| format!("failed to read loro delta '{}'", delta_path.display()))?;
            if let Err(err) = doc.import(&bytes) {
                // Tolerate malformed or stale delta files as requested by design.
                eprintln!(
                    "warning: skipping unreadable delta '{}': {err}",
                    delta_path.display()
                );
            }
        }

        Ok(Self {
            root: root.to_path_buf(),
            project: project.to_string(),
            doc,
        })
    }

    /// Data root directory this store was opened against.
    pub fn root(&self) -> &Path {
        &self.root
    }

    /// Name of the project this store is bound to.
    pub fn project_name(&self) -> &str {
        &self.project
    }

    /// Underlying Loro CRDT document.
    pub fn doc(&self) -> &LoroDoc {
        &self.doc
    }

    /// Export all current state to a fresh base snapshot.
    pub fn write_snapshot(&self) -> Result<PathBuf> {
        let out = project_dir(&self.root, &self.project).join(BASE_FILE);
        let bytes = self
            .doc
            .export(ExportMode::Snapshot)
            .context("failed to export loro snapshot")?;
        atomic_write_file(&out, &bytes)?;
        Ok(out)
    }

    /// Delete persisted delta files after a fresh snapshot has been written.
    ///
    /// Returns the number of files removed.
    pub fn purge_deltas(&self) -> Result<usize> {
        let project_dir = project_dir(&self.root, &self.project);
        let paths = collect_delta_paths(&project_dir)?;
        let mut removed = 0usize;
        for path in paths {
            fs::remove_file(&path)
                .with_context(|| format!("failed removing delta '{}'", path.display()))?;
            removed += 1;
        }
        Ok(removed)
    }

    /// Apply a local mutation and persist only the resulting delta.
    ///
    /// The oplog version vector is captured before the mutation so the export
    /// contains exactly the ops the mutator added. The delta file is named by
    /// a fresh ULID so filenames sort in creation order.
    pub fn apply_and_persist<F>(&self, mutator: F) -> Result<PathBuf>
    where
        F: FnOnce(&LoroDoc) -> Result<()>,
    {
        let before = self.doc.oplog_vv();
        mutator(&self.doc)?;
        self.doc.commit();

        let delta = self
            .doc
            .export(ExportMode::updates(&before))
            .context("failed to export loro update delta")?;

        let filename = format!("{}.loro", Ulid::new());
        let path = project_dir(&self.root, &self.project)
            .join(CHANGES_DIR)
            .join(filename);
        atomic_write_file(&path, &delta)?;
        Ok(path)
    }

    /// Persist pre-built delta bytes (e.g. received from a peer) as a new
    /// change file without re-exporting from the doc.
    pub fn save_raw_delta(&self, bytes: &[u8]) -> Result<PathBuf> {
        let filename = format!("{}.loro", Ulid::new());
        let path = project_dir(&self.root, &self.project)
            .join(CHANGES_DIR)
            .join(filename);
        atomic_write_file(&path, bytes)?;
        Ok(path)
    }

    /// Return hydrated tasks, excluding tombstones.
    pub fn list_tasks(&self) -> Result<Vec<Task>> {
        self.list_tasks_inner(false)
    }

    /// Return hydrated tasks, including tombstoned rows.
    pub fn list_tasks_unfiltered(&self) -> Result<Vec<Task>> {
        self.list_tasks_inner(true)
    }

    /// Find a task by exact ULID string.
    pub fn get_task(&self, id: &TaskId, include_deleted: bool) -> Result<Option<Task>> {
        let tasks = if include_deleted {
            self.list_tasks_unfiltered()?
        } else {
            self.list_tasks()?
        };
        Ok(tasks.into_iter().find(|task| task.id == *id))
    }

    /// Shared hydration path: project the whole document to JSON, rebuild
    /// every task, then sort by full id for a stable listing.
    fn list_tasks_inner(&self, include_deleted: bool) -> Result<Vec<Task>> {
        let root = serde_json::to_value(self.doc.get_deep_value())?;
        let tasks_obj = root
            .get("tasks")
            .and_then(Value::as_object)
            .ok_or_else(|| anyhow!("missing root tasks map"))?;

        let mut tasks = Vec::with_capacity(tasks_obj.len());
        for (task_id_raw, task_json) in tasks_obj {
            let task = hydrate_task(task_id_raw, task_json)?;
            // A task is "deleted" when it carries a non-empty deleted_at.
            if include_deleted || task.deleted_at.is_none() {
                tasks.push(task);
            }
        }

        tasks.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));
        Ok(tasks)
    }

    /// Read `meta.schema_version` from the document.
    pub fn schema_version(&self) -> Result<u32> {
        let root = serde_json::to_value(self.doc.get_deep_value())?;
        let meta = root
            .get("meta")
            .and_then(Value::as_object)
            .ok_or_else(|| anyhow!("missing root meta map"))?;
        let n = meta
            .get("schema_version")
            .and_then(Value::as_u64)
            .ok_or_else(|| anyhow!("invalid or missing meta.schema_version"))?;
        Ok(n as u32)
    }
}
397
398/// Generate a new task ULID.
399pub fn gen_id() -> TaskId {
400    TaskId::new(Ulid::new())
401}
402
/// Parse a status from its canonical string form ("open", "in_progress", "closed").
pub fn parse_status(s: &str) -> Result<Status> {
    Status::parse(s)
}
406
/// Parse a priority from its canonical string form ("high", "medium", "low").
pub fn parse_priority(s: &str) -> Result<Priority> {
    Priority::parse(s)
}
410
/// Parse an effort level from its canonical string form ("low", "medium", "high").
pub fn parse_effort(s: &str) -> Result<Effort> {
    Effort::parse(s)
}
414
/// Canonical string label for a status (inverse of [`parse_status`]).
pub fn status_label(s: Status) -> &'static str {
    s.as_str()
}
418
/// Canonical string label for a priority (inverse of [`parse_priority`]).
pub fn priority_label(p: Priority) -> &'static str {
    p.as_str()
}
422
/// Canonical string label for an effort level (inverse of [`parse_effort`]).
pub fn effort_label(e: Effort) -> &'static str {
    e.as_str()
}
426
427pub fn data_root() -> Result<PathBuf> {
428    let home = std::env::var("HOME").context("HOME is not set")?;
429    Ok(PathBuf::from(home).join(".local").join("share").join("td"))
430}
431
432pub fn init(cwd: &Path, project: &str) -> Result<Store> {
433    let root = data_root()?;
434    fs::create_dir_all(root.join(PROJECTS_DIR))?;
435    let store = Store::init(&root, project)?;
436    bind_project(cwd, project)?;
437    Ok(store)
438}
439
440pub fn use_project(cwd: &Path, project: &str) -> Result<()> {
441    let root = data_root()?;
442    validate_project_name(project)?;
443    if !project_dir(&root, project).join(BASE_FILE).exists() {
444        bail!("project '{project}' not found. Run 'td projects' to list known projects");
445    }
446    bind_project(cwd, project)
447}
448
449pub fn open(start: &Path) -> Result<Store> {
450    let root = data_root()?;
451    let explicit = std::env::var(PROJECT_ENV).ok();
452    let project = resolve_project_name(start, &root, explicit.as_deref())?;
453    Store::open(&root, &project)
454}
455
456pub fn list_projects() -> Result<Vec<String>> {
457    let root = data_root()?;
458    let mut out = Vec::new();
459    let dir = root.join(PROJECTS_DIR);
460    if !dir.exists() {
461        return Ok(out);
462    }
463
464    for entry in fs::read_dir(dir)? {
465        let path = entry?.path();
466        if !path.is_dir() {
467            continue;
468        }
469        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
470            continue;
471        };
472        if path.join(BASE_FILE).exists() {
473            out.push(name.to_string());
474        }
475    }
476
477    out.sort();
478    Ok(out)
479}
480
481pub fn resolve_task_id(store: &Store, raw: &str, include_deleted: bool) -> Result<TaskId> {
482    if let Ok(id) = TaskId::parse(raw) {
483        if store.get_task(&id, include_deleted)?.is_some() {
484            return Ok(id);
485        }
486    }
487
488    let tasks = if include_deleted {
489        store.list_tasks_unfiltered()?
490    } else {
491        store.list_tasks()?
492    };
493
494    let matches: Vec<TaskId> = tasks
495        .into_iter()
496        .filter(|t| t.id.as_str().starts_with(raw))
497        .map(|t| t.id)
498        .collect();
499
500    match matches.as_slice() {
501        [] => bail!("task '{raw}' not found"),
502        [id] => Ok(id.clone()),
503        _ => bail!("task reference '{raw}' is ambiguous"),
504    }
505}
506
507pub fn partition_blockers(store: &Store, blockers: &[TaskId]) -> Result<BlockerPartition> {
508    let mut out = BlockerPartition::default();
509    for blocker in blockers {
510        let Some(task) = store.get_task(blocker, true)? else {
511            out.resolved.push(blocker.clone());
512            continue;
513        };
514        if task.status == Status::Closed || task.deleted_at.is_some() {
515            out.resolved.push(blocker.clone());
516        } else {
517            out.open.push(blocker.clone());
518        }
519    }
520    Ok(out)
521}
522
/// Insert a fresh map container for `task_id` under the root `tasks` map.
pub fn insert_task_map(tasks: &LoroMap, task_id: &TaskId) -> Result<LoroMap> {
    tasks
        .insert_container(task_id.as_str(), LoroMap::new())
        .context("failed to create task map")
}
528
/// Look up a task's map container under the root `tasks` map.
///
/// Returns `Ok(None)` when the id is absent; errors when the key holds
/// something other than a map container.
pub fn get_task_map(tasks: &LoroMap, task_id: &TaskId) -> Result<Option<LoroMap>> {
    match tasks.get(task_id.as_str()) {
        Some(ValueOrContainer::Container(Container::Map(map))) => Ok(Some(map)),
        Some(_) => bail!("task '{}' has invalid container type", task_id.as_str()),
        None => Ok(None),
    }
}
536
537pub fn get_or_create_child_map(parent: &LoroMap, key: &str) -> Result<LoroMap> {
538    parent
539        .get_or_create_container(key, LoroMap::new())
540        .with_context(|| format!("failed to get or create map key '{key}'"))
541}
542
/// Path of the path→project bindings file under the data root.
fn bindings_path(root: &Path) -> PathBuf {
    root.join(BINDINGS_FILE)
}
546
/// Decide which project a command should act on.
///
/// An explicit name (CLI flag / TD_PROJECT) wins outright; otherwise the
/// binding whose path is the deepest ancestor of `start` is used.
fn resolve_project_name(start: &Path, root: &Path, explicit: Option<&str>) -> Result<String> {
    if let Some(project) = explicit {
        validate_project_name(project)?;
        return Ok(project.to_string());
    }

    let cwd = canonicalize_binding_path(start)?;
    let bindings = load_bindings(root)?;

    // Deepest match wins (most path components). `>=` keeps the incumbent on
    // ties, so among equally-deep bindings the first in BTreeMap key order wins.
    let mut best: Option<(usize, String)> = None;
    for (raw_path, project) in bindings.bindings {
        let bound = PathBuf::from(raw_path);
        if is_prefix_path(&bound, &cwd) {
            let score = bound.components().count();
            match &best {
                Some((best_score, _)) if *best_score >= score => {}
                _ => best = Some((score, project)),
            }
        }
    }

    if let Some((_, project)) = best {
        return Ok(project);
    }

    bail!(
        "no project selected. Use --project/TD_PROJECT, run 'td use <name>', or run 'td init <name>'"
    )
}
576
577fn bind_project(cwd: &Path, project: &str) -> Result<()> {
578    validate_project_name(project)?;
579
580    let root = data_root()?;
581    fs::create_dir_all(&root)?;
582
583    let canonical = canonicalize_binding_path(cwd)?;
584    let mut bindings = load_bindings(&root)?;
585    bindings
586        .bindings
587        .insert(canonical.to_string_lossy().to_string(), project.to_string());
588    save_bindings(&root, &bindings)
589}
590
591fn load_bindings(root: &Path) -> Result<BindingsFile> {
592    let path = bindings_path(root);
593    if !path.exists() {
594        return Ok(BindingsFile::default());
595    }
596    let content = fs::read_to_string(&path)
597        .with_context(|| format!("failed reading bindings from '{}'", path.display()))?;
598    serde_json::from_str(&content)
599        .with_context(|| format!("invalid bindings file '{}'", path.display()))
600}
601
602fn save_bindings(root: &Path, bindings: &BindingsFile) -> Result<()> {
603    let path = bindings_path(root);
604    let bytes = serde_json::to_vec_pretty(bindings)?;
605    atomic_write_file(&path, &bytes)
606}
607
/// Canonicalize a path for use as a bindings key.
///
/// `fs::canonicalize` resolves symlinks and requires the path to exist.
fn canonicalize_binding_path(path: &Path) -> Result<PathBuf> {
    fs::canonicalize(path).with_context(|| format!("failed to canonicalize '{}'", path.display()))
}
611
/// True when every component of `prefix` matches the leading components of
/// `target` (whole components only, so `/a/b` is not a prefix of `/a/bc`).
///
/// The standard library's `Path::starts_with` implements exactly this
/// component-wise comparison, including the empty-prefix → true case, so the
/// hand-rolled component loop is unnecessary.
fn is_prefix_path(prefix: &Path, target: &Path) -> bool {
    target.starts_with(prefix)
}
625
626fn validate_project_name(name: &str) -> Result<()> {
627    if name.is_empty() {
628        bail!("project name cannot be empty");
629    }
630    if name.contains('/') || name.contains('\\') || name == "." || name == ".." {
631        bail!("invalid project name '{name}'");
632    }
633    if name.chars().any(char::is_control) {
634        bail!("invalid project name '{name}'");
635    }
636    Ok(())
637}
638
/// Rebuild a `Task` from the JSON projection of its CRDT map.
///
/// `labels` and `blockers` are stored as maps keyed by the value of interest,
/// so only their keys are read; `logs` maps log ids to payload objects.
fn hydrate_task(task_id_raw: &str, value: &Value) -> Result<Task> {
    let obj = value
        .as_object()
        .ok_or_else(|| anyhow!("task '{task_id_raw}' is not an object"))?;

    let id = TaskId::parse(task_id_raw)?;

    let title = get_required_string(obj, "title")?;
    let description = get_required_string(obj, "description")?;
    let task_type = get_required_string(obj, "type")?;
    let status = Status::parse(&get_required_string(obj, "status")?)?;
    let priority = Priority::parse(&get_required_string(obj, "priority")?)?;
    let effort = Effort::parse(&get_required_string(obj, "effort")?)?;
    // An empty-string parent is treated the same as an absent parent field.
    let parent = match obj.get("parent").and_then(Value::as_str) {
        Some("") | None => None,
        Some(raw) => Some(TaskId::parse(raw)?),
    };

    let created_at = get_required_string(obj, "created_at")?;
    let updated_at = get_required_string(obj, "updated_at")?;
    // Empty string means "not deleted"; only a non-empty value is a tombstone.
    let deleted_at = obj
        .get("deleted_at")
        .and_then(Value::as_str)
        .map(str::to_owned)
        .filter(|s| !s.is_empty());

    // Label strings are the keys of the labels map.
    let labels = obj
        .get("labels")
        .and_then(Value::as_object)
        .map(|m| m.keys().cloned().collect())
        .unwrap_or_default();

    // Blocker ids are the keys of the blockers map.
    let blockers = obj
        .get("blockers")
        .and_then(Value::as_object)
        .map(|m| {
            m.keys()
                .map(|raw| TaskId::parse(raw))
                .collect::<Result<Vec<_>>>()
        })
        .transpose()?
        .unwrap_or_default();

    // Logs map log id -> { timestamp, message }.
    let mut logs = obj
        .get("logs")
        .and_then(Value::as_object)
        .map(|logs| {
            logs.iter()
                .map(|(log_id_raw, payload)| {
                    let payload_obj = payload.as_object().ok_or_else(|| {
                        anyhow!("log '{log_id_raw}' on task '{task_id_raw}' is not an object")
                    })?;
                    Ok(LogEntry {
                        id: TaskId::parse(log_id_raw)?,
                        timestamp: get_required_string(payload_obj, "timestamp")?,
                        message: get_required_string(payload_obj, "message")?,
                    })
                })
                .collect::<Result<Vec<_>>>()
        })
        .transpose()?
        .unwrap_or_default();

    // Stable ordering by log id string.
    logs.sort_by(|a, b| a.id.as_str().cmp(b.id.as_str()));

    Ok(Task {
        id,
        title,
        description,
        task_type,
        priority,
        status,
        effort,
        parent,
        created_at,
        updated_at,
        deleted_at,
        labels,
        blockers,
        logs,
    })
}
721
722fn get_required_string(map: &serde_json::Map<String, Value>, key: &str) -> Result<String> {
723    map.get(key)
724        .and_then(Value::as_str)
725        .map(str::to_owned)
726        .ok_or_else(|| anyhow!("missing or non-string key '{key}'"))
727}
728
729fn collect_delta_paths(project_dir: &Path) -> Result<Vec<PathBuf>> {
730    let mut paths = Vec::new();
731    collect_changes_from_dir(&project_dir.join(CHANGES_DIR), &mut paths)?;
732
733    for entry in fs::read_dir(project_dir)? {
734        let path = entry?.path();
735        if !path.is_dir() {
736            continue;
737        }
738        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
739            continue;
740        };
741        if name.starts_with("changes.compacting.") {
742            collect_changes_from_dir(&path, &mut paths)?;
743        }
744    }
745
746    Ok(paths)
747}
748
749fn collect_changes_from_dir(dir: &Path, out: &mut Vec<PathBuf>) -> Result<()> {
750    if !dir.exists() {
751        return Ok(());
752    }
753
754    for entry in fs::read_dir(dir)? {
755        let path = entry?.path();
756        if !path.is_file() {
757            continue;
758        }
759
760        let Some(filename) = path.file_name().and_then(|n| n.to_str()) else {
761            continue;
762        };
763        if filename.ends_with(TMP_SUFFIX) || !filename.ends_with(".loro") {
764            continue;
765        }
766
767        let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
768            continue;
769        };
770        if Ulid::from_string(stem).is_err() {
771            continue;
772        }
773
774        out.push(path);
775    }
776
777    Ok(())
778}
779
/// Directory for one project: `<root>/projects/<project>`.
fn project_dir(root: &Path, project: &str) -> PathBuf {
    root.join(PROJECTS_DIR).join(project)
}
783
784fn load_or_create_device_peer_id(root: &Path) -> Result<PeerID> {
785    let path = root.join("device_id");
786    if let Some(parent) = path.parent() {
787        fs::create_dir_all(parent)?;
788    }
789
790    let device_ulid = if path.exists() {
791        let content = fs::read_to_string(&path)
792            .with_context(|| format!("failed reading device id from '{}'", path.display()))?;
793        Ulid::from_string(content.trim()).context("invalid persisted device id ULID")?
794    } else {
795        let id = Ulid::new();
796        atomic_write_file(&path, id.to_string().as_bytes())?;
797        id
798    };
799
800    let raw: u128 = device_ulid.into();
801    Ok((raw & u64::MAX as u128) as u64)
802}
803
804fn atomic_write_file(path: &Path, bytes: &[u8]) -> Result<()> {
805    let parent = path
806        .parent()
807        .ok_or_else(|| anyhow!("cannot atomically write root path '{}'", path.display()))?;
808    fs::create_dir_all(parent)?;
809
810    let tmp_name = format!(
811        "{}.{}{}",
812        path.file_name().and_then(|n| n.to_str()).unwrap_or("write"),
813        Ulid::new(),
814        TMP_SUFFIX
815    );
816    let tmp_path = parent.join(tmp_name);
817
818    {
819        let mut file = OpenOptions::new()
820            .create_new(true)
821            .write(true)
822            .open(&tmp_path)
823            .with_context(|| format!("failed to open temp file '{}'", tmp_path.display()))?;
824        file.write_all(bytes)?;
825        file.sync_all()?;
826    }
827
828    fs::rename(&tmp_path, path).with_context(|| {
829        format!(
830            "failed to atomically rename '{}' to '{}'",
831            tmp_path.display(),
832            path.display()
833        )
834    })?;
835
836    sync_dir(parent)?;
837    Ok(())
838}
839
840fn sync_dir(path: &Path) -> Result<()> {
841    let dir =
842        File::open(path).with_context(|| format!("failed opening dir '{}'", path.display()))?;
843    dir.sync_all()
844        .with_context(|| format!("failed fsync on dir '{}'", path.display()))?;
845    Ok(())
846}