1package db
2
3import (
4 "context"
5 "database/sql"
6 "embed"
7 "fmt"
8 "log/slog"
9 "os"
10 "path/filepath"
11 "sync"
12 "testing"
13
14 "github.com/pressly/goose/v3"
15)
16
17var (
18 pragmas = map[string]string{
19 "foreign_keys": "ON",
20 "journal_mode": "WAL",
21 "page_size": "4096",
22 "cache_size": "-8000",
23 "synchronous": "NORMAL",
24 "secure_delete": "ON",
25 "busy_timeout": "30000",
26 }
27 gooseInitOnce sync.Once
28 gooseInitErr error
29)
30
31//go:embed migrations/*.sql
32var FS embed.FS
33
34func init() {
35 goose.SetBaseFS(FS)
36
37 if testing.Testing() {
38 goose.SetLogger(goose.NopLogger())
39 }
40}
41
42// connEntry holds a shared database connection, its reference count,
43// and the data-directory lock that gates access to this entry. The
44// lock is acquired exactly once when the entry is created and released
45// when the last reference is dropped, which lets the same process open
46// the same data directory concurrently while still blocking a second
47// crush process from racing the storage.
48type connEntry struct {
49 db *sql.DB
50 refCount int
51 lock *dataDirLock
52}
53
54var (
55 pool = make(map[string]*connEntry)
56 poolMu sync.Mutex
57)
58
59// ConnectOption configures a Connect call. Options are applied in
60// order; later options override earlier ones for the same field.
61type ConnectOption func(*connectOptions)
62
63// connectOptions holds the resolved configuration for a Connect call.
64type connectOptions struct {
65 lockDataDir bool
66}
67
68// WithDataDirLock toggles acquisition of the per-data-directory lock
69// for this Connect call. The lock is off by default so local-mode
70// invocations do not regress today's behavior; the server's
71// workspace-bootstrap path opts in. CRUSH_SKIP_DATADIR_LOCK still
72// bypasses acquisition even when this option is set.
73func WithDataDirLock(enable bool) ConnectOption {
74 return func(o *connectOptions) { o.lockDataDir = enable }
75}
76
77// Connect opens a SQLite database connection for the given data
78// directory and runs migrations. If a connection to the same database
79// file already exists, the existing connection is returned with its
80// reference count incremented. Callers must pair each Connect with a
81// [Release] when they no longer need the connection.
82func Connect(ctx context.Context, dataDir string, opts ...ConnectOption) (*sql.DB, error) {
83 if dataDir == "" {
84 return nil, fmt.Errorf("data.dir is not set")
85 }
86
87 var cfg connectOptions
88 for _, opt := range opts {
89 opt(&cfg)
90 }
91
92 dbPath := filepath.Join(dataDir, "crush.db")
93
94 // Resolve to an absolute path so that different relative paths to
95 // the same file share a single connection.
96 absPath, err := filepath.Abs(dbPath)
97 if err != nil {
98 absPath = dbPath
99 }
100
101 poolMu.Lock()
102 defer poolMu.Unlock()
103
104 if entry, ok := pool[absPath]; ok {
105 entry.refCount++
106 return entry.db, nil
107 }
108
109 // Take the per-data-directory lock before opening the database so
110 // we fail fast and with a clear error rather than racing another
111 // crush process on the same SQLite file. The lock is released when
112 // the matching Release call drops the refcount to zero. Ensuring
113 // the data directory exists is required because the lock file
114 // lives inside it. Locking is opt-in via WithDataDirLock so that
115 // local-mode invocations do not refuse a second crush against the
116 // same data dir until client/server becomes the default.
117 if err := os.MkdirAll(dataDir, 0o700); err != nil {
118 return nil, fmt.Errorf("failed to create data directory %q: %w", dataDir, err)
119 }
120 var lock *dataDirLock
121 if cfg.lockDataDir && !skipDataDirLock() {
122 lock, err = acquireDataDirLock(dataDir)
123 if err != nil {
124 return nil, err
125 }
126 }
127
128 conn, err := openDB(dbPath)
129 if err != nil {
130 if lock != nil {
131 lock.release()
132 }
133 return nil, err
134 }
135
136 // Serialize all access through a single connection. SQLite
137 // serializes writes at the file level anyway, and allowing multiple
138 // pool connections to interleave writes/checkpoints (especially
139 // under concurrent sub-agents) has caused WAL/header desync
140 // resulting in SQLITE_NOTADB (26) on the next open.
141 conn.SetMaxOpenConns(1)
142
143 releaseLock := func() {
144 if lock != nil {
145 lock.release()
146 }
147 }
148
149 if err = conn.PingContext(ctx); err != nil {
150 conn.Close()
151 releaseLock()
152 return nil, fmt.Errorf("failed to connect to database: %w", err)
153 }
154
155 if err := initGoose(); err != nil {
156 conn.Close()
157 releaseLock()
158 slog.Error("Failed to initialize goose", "error", err)
159 return nil, fmt.Errorf("failed to initialize goose: %w", err)
160 }
161
162 if err := goose.Up(conn, "migrations"); err != nil {
163 conn.Close()
164 releaseLock()
165 slog.Error("Failed to apply migrations", "error", err)
166 return nil, fmt.Errorf("failed to apply migrations: %w", err)
167 }
168
169 pool[absPath] = &connEntry{db: conn, refCount: 1, lock: lock}
170 return conn, nil
171}
172
173// Release decrements the reference count for the database at the given
174// data directory. When the count reaches zero the underlying connection
175// is closed and removed from the pool.
176func Release(dataDir string) error {
177 dbPath := filepath.Join(dataDir, "crush.db")
178 absPath, err := filepath.Abs(dbPath)
179 if err != nil {
180 absPath = dbPath
181 }
182
183 poolMu.Lock()
184 defer poolMu.Unlock()
185
186 entry, ok := pool[absPath]
187 if !ok {
188 return nil
189 }
190
191 entry.refCount--
192 if entry.refCount > 0 {
193 return nil
194 }
195
196 delete(pool, absPath)
197 closeErr := entry.db.Close()
198 if entry.lock != nil {
199 entry.lock.release()
200 }
201 return closeErr
202}
203
204// ResetPool closes all pooled connections and clears the pool. This is
205// intended for use in tests to ensure a clean state between test cases.
206func ResetPool() {
207 poolMu.Lock()
208 defer poolMu.Unlock()
209 for path, entry := range pool {
210 entry.db.Close()
211 if entry.lock != nil {
212 entry.lock.release()
213 }
214 delete(pool, path)
215 }
216}
217
218func initGoose() error {
219 gooseInitOnce.Do(func() {
220 goose.SetBaseFS(FS)
221 gooseInitErr = goose.SetDialect("sqlite3")
222 })
223
224 return gooseInitErr
225}