1package db
2
3import (
4 "context"
5 "database/sql"
6 "embed"
7 "fmt"
8 "log/slog"
9 "os"
10 "path/filepath"
11 "sync"
12 "testing"
13
14 "github.com/pressly/goose/v3"
15)
16
// pragmas maps SQLite PRAGMA names to the values configured for the
// database.
// NOTE(review): nothing in this part of the file reads the map;
// presumably openDB applies it when a connection is opened — confirm.
var pragmas = map[string]string{
	"foreign_keys":  "ON",
	"journal_mode":  "WAL",
	"page_size":     "4096",
	"cache_size":    "-8000",
	"synchronous":   "NORMAL",
	"secure_delete": "ON",
	"busy_timeout":  "30000",
}

// gooseInitOnce and gooseInitErr back initGoose: the Once makes the
// goose setup run a single time per process, and the error keeps the
// outcome of that one attempt so every caller observes the same result.
var (
	gooseInitOnce sync.Once
	gooseInitErr  error
)
30
31//go:embed migrations/*.sql
32var FS embed.FS
33
34func init() {
35 goose.SetBaseFS(FS)
36
37 if testing.Testing() {
38 goose.SetLogger(goose.NopLogger())
39 }
40}
41
42// connEntry holds a shared database connection, its reference count,
43// and the data-directory lock that gates access to this entry. The
44// lock is acquired exactly once when the entry is created and released
45// when the last reference is dropped, which lets the same process open
46// the same data directory concurrently while still blocking a second
47// crush process from racing the storage.
48type connEntry struct {
49 db *sql.DB
50 refCount int
51 lock *dataDirLock
52}
53
54var (
55 pool = make(map[string]*connEntry)
56 poolMu sync.Mutex
57)
58
59// Connect opens a SQLite database connection for the given data
60// directory and runs migrations. If a connection to the same database
61// file already exists, the existing connection is returned with its
62// reference count incremented. Callers must pair each Connect with a
63// [Release] when they no longer need the connection.
64func Connect(ctx context.Context, dataDir string) (*sql.DB, error) {
65 if dataDir == "" {
66 return nil, fmt.Errorf("data.dir is not set")
67 }
68
69 dbPath := filepath.Join(dataDir, "crush.db")
70
71 // Resolve to an absolute path so that different relative paths to
72 // the same file share a single connection.
73 absPath, err := filepath.Abs(dbPath)
74 if err != nil {
75 absPath = dbPath
76 }
77
78 poolMu.Lock()
79 defer poolMu.Unlock()
80
81 if entry, ok := pool[absPath]; ok {
82 entry.refCount++
83 return entry.db, nil
84 }
85
86 // Take the per-data-directory lock before opening the database so
87 // we fail fast and with a clear error rather than racing another
88 // crush process on the same SQLite file. The lock is released when
89 // the matching Release call drops the refcount to zero. Ensuring
90 // the data directory exists is required because the lock file
91 // lives inside it.
92 if err := os.MkdirAll(dataDir, 0o700); err != nil {
93 return nil, fmt.Errorf("failed to create data directory %q: %w", dataDir, err)
94 }
95 lock, err := acquireDataDirLock(dataDir)
96 if err != nil {
97 return nil, err
98 }
99
100 conn, err := openDB(dbPath)
101 if err != nil {
102 lock.release()
103 return nil, err
104 }
105
106 // Serialize all access through a single connection. SQLite
107 // serializes writes at the file level anyway, and allowing multiple
108 // pool connections to interleave writes/checkpoints (especially
109 // under concurrent sub-agents) has caused WAL/header desync
110 // resulting in SQLITE_NOTADB (26) on the next open.
111 conn.SetMaxOpenConns(1)
112
113 if err = conn.PingContext(ctx); err != nil {
114 conn.Close()
115 lock.release()
116 return nil, fmt.Errorf("failed to connect to database: %w", err)
117 }
118
119 if err := initGoose(); err != nil {
120 conn.Close()
121 lock.release()
122 slog.Error("Failed to initialize goose", "error", err)
123 return nil, fmt.Errorf("failed to initialize goose: %w", err)
124 }
125
126 if err := goose.Up(conn, "migrations"); err != nil {
127 conn.Close()
128 lock.release()
129 slog.Error("Failed to apply migrations", "error", err)
130 return nil, fmt.Errorf("failed to apply migrations: %w", err)
131 }
132
133 pool[absPath] = &connEntry{db: conn, refCount: 1, lock: lock}
134 return conn, nil
135}
136
137// Release decrements the reference count for the database at the given
138// data directory. When the count reaches zero the underlying connection
139// is closed and removed from the pool.
140func Release(dataDir string) error {
141 dbPath := filepath.Join(dataDir, "crush.db")
142 absPath, err := filepath.Abs(dbPath)
143 if err != nil {
144 absPath = dbPath
145 }
146
147 poolMu.Lock()
148 defer poolMu.Unlock()
149
150 entry, ok := pool[absPath]
151 if !ok {
152 return nil
153 }
154
155 entry.refCount--
156 if entry.refCount > 0 {
157 return nil
158 }
159
160 delete(pool, absPath)
161 closeErr := entry.db.Close()
162 if entry.lock != nil {
163 entry.lock.release()
164 }
165 return closeErr
166}
167
168// ResetPool closes all pooled connections and clears the pool. This is
169// intended for use in tests to ensure a clean state between test cases.
170func ResetPool() {
171 poolMu.Lock()
172 defer poolMu.Unlock()
173 for path, entry := range pool {
174 entry.db.Close()
175 if entry.lock != nil {
176 entry.lock.release()
177 }
178 delete(pool, path)
179 }
180}
181
182func initGoose() error {
183 gooseInitOnce.Do(func() {
184 goose.SetBaseFS(FS)
185 gooseInitErr = goose.SetDialect("sqlite3")
186 })
187
188 return gooseInitErr
189}