1package db
2
3import (
4 "context"
5 "database/sql"
6 "embed"
7 "fmt"
8 "log/slog"
9 "path/filepath"
10 "sync"
11 "testing"
12
13 "github.com/pressly/goose/v3"
14)
15
// pragmas maps SQLite PRAGMA names to the values this package wants set.
// NOTE(review): presumably applied by openDB when a connection is
// opened; confirm against its definition.
var pragmas = map[string]string{
	"busy_timeout":  "30000",
	"cache_size":    "-8000",
	"foreign_keys":  "ON",
	"journal_mode":  "WAL",
	"page_size":     "4096",
	"secure_delete": "ON",
	"synchronous":   "NORMAL",
}

// gooseInitOnce guards the one-time goose setup done by initGoose.
var gooseInitOnce sync.Once

// gooseInitErr holds the result of that one-time setup so every later
// initGoose call can report it.
var gooseInitErr error
29
// FS is the embedded filesystem holding the SQL files matched by
// migrations/*.sql. It is handed to goose as its base filesystem.
//
//go:embed migrations/*.sql
var FS embed.FS
32
33func init() {
34 goose.SetBaseFS(FS)
35
36 if testing.Testing() {
37 goose.SetLogger(goose.NopLogger())
38 }
39}
40
// connEntry holds a shared database connection and its reference count.
type connEntry struct {
	// db is the shared handle returned to every Connect caller for
	// this database file.
	db *sql.DB
	// refCount is the number of outstanding Connect calls that have
	// not yet been matched by a Release.
	refCount int
}
46
47var (
48 pool = make(map[string]*connEntry)
49 poolMu sync.Mutex
50)
51
52// Connect opens a SQLite database connection for the given data
53// directory and runs migrations. If a connection to the same database
54// file already exists, the existing connection is returned with its
55// reference count incremented. Callers must pair each Connect with a
56// [Release] when they no longer need the connection.
57func Connect(ctx context.Context, dataDir string) (*sql.DB, error) {
58 if dataDir == "" {
59 return nil, fmt.Errorf("data.dir is not set")
60 }
61
62 dbPath := filepath.Join(dataDir, "crush.db")
63
64 // Resolve to an absolute path so that different relative paths to
65 // the same file share a single connection.
66 absPath, err := filepath.Abs(dbPath)
67 if err != nil {
68 absPath = dbPath
69 }
70
71 poolMu.Lock()
72 defer poolMu.Unlock()
73
74 if entry, ok := pool[absPath]; ok {
75 entry.refCount++
76 return entry.db, nil
77 }
78
79 conn, err := openDB(dbPath)
80 if err != nil {
81 return nil, err
82 }
83
84 // Serialize all access through a single connection. SQLite
85 // serializes writes at the file level anyway, and allowing multiple
86 // pool connections to interleave writes/checkpoints (especially
87 // under concurrent sub-agents) has caused WAL/header desync
88 // resulting in SQLITE_NOTADB (26) on the next open.
89 conn.SetMaxOpenConns(1)
90
91 if err = conn.PingContext(ctx); err != nil {
92 conn.Close()
93 return nil, fmt.Errorf("failed to connect to database: %w", err)
94 }
95
96 if err := initGoose(); err != nil {
97 conn.Close()
98 slog.Error("Failed to initialize goose", "error", err)
99 return nil, fmt.Errorf("failed to initialize goose: %w", err)
100 }
101
102 if err := goose.Up(conn, "migrations"); err != nil {
103 conn.Close()
104 slog.Error("Failed to apply migrations", "error", err)
105 return nil, fmt.Errorf("failed to apply migrations: %w", err)
106 }
107
108 pool[absPath] = &connEntry{db: conn, refCount: 1}
109 return conn, nil
110}
111
112// Release decrements the reference count for the database at the given
113// data directory. When the count reaches zero the underlying connection
114// is closed and removed from the pool.
115func Release(dataDir string) error {
116 dbPath := filepath.Join(dataDir, "crush.db")
117 absPath, err := filepath.Abs(dbPath)
118 if err != nil {
119 absPath = dbPath
120 }
121
122 poolMu.Lock()
123 defer poolMu.Unlock()
124
125 entry, ok := pool[absPath]
126 if !ok {
127 return nil
128 }
129
130 entry.refCount--
131 if entry.refCount > 0 {
132 return nil
133 }
134
135 delete(pool, absPath)
136 return entry.db.Close()
137}
138
139// ResetPool closes all pooled connections and clears the pool. This is
140// intended for use in tests to ensure a clean state between test cases.
141func ResetPool() {
142 poolMu.Lock()
143 defer poolMu.Unlock()
144 for path, entry := range pool {
145 entry.db.Close()
146 delete(pool, path)
147 }
148}
149
150func initGoose() error {
151 gooseInitOnce.Do(func() {
152 goose.SetBaseFS(FS)
153 gooseInitErr = goose.SetDialect("sqlite3")
154 })
155
156 return gooseInitErr
157}