1package db
2
3import (
4 "context"
5 "database/sql"
6 "embed"
7 "fmt"
8 "log/slog"
9 "os"
10 "path/filepath"
11 "sync"
12 "testing"
13
14 "github.com/pressly/goose/v3"
15)
16
// pragmas holds SQLite PRAGMA name/value pairs (e.g. WAL journaling, a
// 30000 ms busy timeout, foreign-key enforcement). NOTE(review): the
// code that applies these lives outside this chunk (presumably openDB) —
// confirm. Map iteration order is unspecified, so the literal order
// below carries no meaning.
var pragmas = map[string]string{
	"busy_timeout":  "30000",
	"cache_size":    "-8000",
	"foreign_keys":  "ON",
	"journal_mode":  "WAL",
	"page_size":     "4096",
	"secure_delete": "ON",
	"synchronous":   "NORMAL",
	"temp_store":    "MEMORY",
}

// gooseInitOnce and gooseInitErr memoize the one-time goose setup done
// by initGoose: the Once guards the setup, and the error records the
// outcome returned to every subsequent caller.
var (
	gooseInitOnce sync.Once
	gooseInitErr  error
)
31
// FS embeds the SQL migration files matched by migrations/*.sql. It is
// installed as goose's base filesystem (see init and initGoose), which
// is what lets goose find the "migrations" directory when Connect runs
// migrations.
//go:embed migrations/*.sql
var FS embed.FS
34
35func init() {
36 goose.SetBaseFS(FS)
37
38 if testing.Testing() {
39 goose.SetLogger(goose.NopLogger())
40 }
41}
42
43// connEntry holds a shared database connection, its reference count,
44// and the data-directory lock that gates access to this entry. The
45// lock is acquired exactly once when the entry is created and released
46// when the last reference is dropped, which lets the same process open
47// the same data directory concurrently while still blocking a second
48// crush process from racing the storage.
49type connEntry struct {
50 db *sql.DB
51 refCount int
52 lock *dataDirLock
53}
54
55var (
56 pool = make(map[string]*connEntry)
57 poolMu sync.Mutex
58)
59
// ConnectOption customizes a single Connect call. Options run in the
// order they are passed, so when two options set the same field the
// later one wins.
type ConnectOption func(*connectOptions)

// connectOptions is the configuration Connect builds by applying each
// ConnectOption, in order, to a zero value.
type connectOptions struct {
	// lockDataDir requests the per-data-directory lock.
	lockDataDir bool
}

// WithDataDirLock controls whether Connect acquires the per-data-directory
// lock. Locking is off by default so local-mode invocations keep today's
// behavior; the server's workspace-bootstrap path opts in. Setting
// CRUSH_SKIP_DATADIR_LOCK still bypasses acquisition even when enable is
// true.
func WithDataDirLock(enable bool) ConnectOption {
	setLock := func(o *connectOptions) {
		o.lockDataDir = enable
	}
	return setLock
}
77
78// Connect opens a SQLite database connection for the given data
79// directory and runs migrations. If a connection to the same database
80// file already exists, the existing connection is returned with its
81// reference count incremented. Callers must pair each Connect with a
82// [Release] when they no longer need the connection.
83func Connect(ctx context.Context, dataDir string, opts ...ConnectOption) (*sql.DB, error) {
84 if dataDir == "" {
85 return nil, fmt.Errorf("data.dir is not set")
86 }
87
88 var cfg connectOptions
89 for _, opt := range opts {
90 opt(&cfg)
91 }
92
93 dbPath := filepath.Join(dataDir, "crush.db")
94
95 // Resolve to an absolute path so that different relative paths to
96 // the same file share a single connection.
97 absPath, err := filepath.Abs(dbPath)
98 if err != nil {
99 absPath = dbPath
100 }
101
102 poolMu.Lock()
103 defer poolMu.Unlock()
104
105 if entry, ok := pool[absPath]; ok {
106 entry.refCount++
107 return entry.db, nil
108 }
109
110 // Take the per-data-directory lock before opening the database so
111 // we fail fast and with a clear error rather than racing another
112 // crush process on the same SQLite file. The lock is released when
113 // the matching Release call drops the refcount to zero. Ensuring
114 // the data directory exists is required because the lock file
115 // lives inside it. Locking is opt-in via WithDataDirLock so that
116 // local-mode invocations do not refuse a second crush against the
117 // same data dir until client/server becomes the default.
118 if err := os.MkdirAll(dataDir, 0o700); err != nil {
119 return nil, fmt.Errorf("failed to create data directory %q: %w", dataDir, err)
120 }
121 var lock *dataDirLock
122 if cfg.lockDataDir && !skipDataDirLock() {
123 lock, err = acquireDataDirLock(dataDir)
124 if err != nil {
125 return nil, err
126 }
127 }
128
129 conn, err := openDB(dbPath)
130 if err != nil {
131 if lock != nil {
132 lock.release()
133 }
134 return nil, err
135 }
136
137 // Serialize all access through a single connection. SQLite
138 // serializes writes at the file level anyway, and allowing multiple
139 // pool connections to interleave writes/checkpoints (especially
140 // under concurrent sub-agents) has caused WAL/header desync
141 // resulting in SQLITE_NOTADB (26) on the next open.
142 conn.SetMaxOpenConns(1)
143
144 releaseLock := func() {
145 if lock != nil {
146 lock.release()
147 }
148 }
149
150 if err = conn.PingContext(ctx); err != nil {
151 conn.Close()
152 releaseLock()
153 return nil, fmt.Errorf("failed to connect to database: %w", err)
154 }
155
156 if err := initGoose(); err != nil {
157 conn.Close()
158 releaseLock()
159 slog.Error("Failed to initialize goose", "error", err)
160 return nil, fmt.Errorf("failed to initialize goose: %w", err)
161 }
162
163 if err := goose.Up(conn, "migrations"); err != nil {
164 conn.Close()
165 releaseLock()
166 slog.Error("Failed to apply migrations", "error", err)
167 return nil, fmt.Errorf("failed to apply migrations: %w", err)
168 }
169
170 pool[absPath] = &connEntry{db: conn, refCount: 1, lock: lock}
171 return conn, nil
172}
173
174// Release decrements the reference count for the database at the given
175// data directory. When the count reaches zero the underlying connection
176// is closed and removed from the pool.
177func Release(dataDir string) error {
178 dbPath := filepath.Join(dataDir, "crush.db")
179 absPath, err := filepath.Abs(dbPath)
180 if err != nil {
181 absPath = dbPath
182 }
183
184 poolMu.Lock()
185 defer poolMu.Unlock()
186
187 entry, ok := pool[absPath]
188 if !ok {
189 return nil
190 }
191
192 entry.refCount--
193 if entry.refCount > 0 {
194 return nil
195 }
196
197 delete(pool, absPath)
198 closeErr := entry.db.Close()
199 if entry.lock != nil {
200 entry.lock.release()
201 }
202 return closeErr
203}
204
205// ResetPool closes all pooled connections and clears the pool. This is
206// intended for use in tests to ensure a clean state between test cases.
207func ResetPool() {
208 poolMu.Lock()
209 defer poolMu.Unlock()
210 for path, entry := range pool {
211 entry.db.Close()
212 if entry.lock != nil {
213 entry.lock.release()
214 }
215 delete(pool, path)
216 }
217}
218
219func initGoose() error {
220 gooseInitOnce.Do(func() {
221 goose.SetBaseFS(FS)
222 gooseInitErr = goose.SetDialect("sqlite3")
223 })
224
225 return gooseInitErr
226}