connect.go

  1package db
  2
  3import (
  4	"context"
  5	"database/sql"
  6	"embed"
  7	"fmt"
  8	"log/slog"
  9	"os"
 10	"path/filepath"
 11	"sync"
 12	"testing"
 13
 14	"github.com/pressly/goose/v3"
 15)
 16
 17var (
 18	pragmas = map[string]string{
 19		"foreign_keys":  "ON",
 20		"journal_mode":  "WAL",
 21		"page_size":     "4096",
 22		"cache_size":    "-8000",
 23		"synchronous":   "NORMAL",
 24		"secure_delete": "ON",
 25		"busy_timeout":  "30000",
 26	}
 27	gooseInitOnce sync.Once
 28	gooseInitErr  error
 29)
 30
// FS embeds the SQL migration files matching migrations/*.sql into the
// binary. It is installed as goose's base filesystem (in init and again
// in initGoose), and Connect applies migrations from its "migrations"
// directory via goose.Up.
//
//go:embed migrations/*.sql
var FS embed.FS
 33
 34func init() {
 35	goose.SetBaseFS(FS)
 36
 37	if testing.Testing() {
 38		goose.SetLogger(goose.NopLogger())
 39	}
 40}
 41
 42// connEntry holds a shared database connection, its reference count,
 43// and the data-directory lock that gates access to this entry. The
 44// lock is acquired exactly once when the entry is created and released
 45// when the last reference is dropped, which lets the same process open
 46// the same data directory concurrently while still blocking a second
 47// crush process from racing the storage.
 48type connEntry struct {
 49	db       *sql.DB
 50	refCount int
 51	lock     *dataDirLock
 52}
 53
 54var (
 55	pool   = make(map[string]*connEntry)
 56	poolMu sync.Mutex
 57)
 58
 59// ConnectOption configures a Connect call. Options are applied in
 60// order; later options override earlier ones for the same field.
 61type ConnectOption func(*connectOptions)
 62
 63// connectOptions holds the resolved configuration for a Connect call.
 64type connectOptions struct {
 65	lockDataDir bool
 66}
 67
 68// WithDataDirLock toggles acquisition of the per-data-directory lock
 69// for this Connect call. The lock is off by default so local-mode
 70// invocations do not regress today's behavior; the server's
 71// workspace-bootstrap path opts in. CRUSH_SKIP_DATADIR_LOCK still
 72// bypasses acquisition even when this option is set.
 73func WithDataDirLock(enable bool) ConnectOption {
 74	return func(o *connectOptions) { o.lockDataDir = enable }
 75}
 76
 77// Connect opens a SQLite database connection for the given data
 78// directory and runs migrations. If a connection to the same database
 79// file already exists, the existing connection is returned with its
 80// reference count incremented. Callers must pair each Connect with a
 81// [Release] when they no longer need the connection.
 82func Connect(ctx context.Context, dataDir string, opts ...ConnectOption) (*sql.DB, error) {
 83	if dataDir == "" {
 84		return nil, fmt.Errorf("data.dir is not set")
 85	}
 86
 87	var cfg connectOptions
 88	for _, opt := range opts {
 89		opt(&cfg)
 90	}
 91
 92	dbPath := filepath.Join(dataDir, "crush.db")
 93
 94	// Resolve to an absolute path so that different relative paths to
 95	// the same file share a single connection.
 96	absPath, err := filepath.Abs(dbPath)
 97	if err != nil {
 98		absPath = dbPath
 99	}
100
101	poolMu.Lock()
102	defer poolMu.Unlock()
103
104	if entry, ok := pool[absPath]; ok {
105		entry.refCount++
106		return entry.db, nil
107	}
108
109	// Take the per-data-directory lock before opening the database so
110	// we fail fast and with a clear error rather than racing another
111	// crush process on the same SQLite file. The lock is released when
112	// the matching Release call drops the refcount to zero. Ensuring
113	// the data directory exists is required because the lock file
114	// lives inside it. Locking is opt-in via WithDataDirLock so that
115	// local-mode invocations do not refuse a second crush against the
116	// same data dir until client/server becomes the default.
117	if err := os.MkdirAll(dataDir, 0o700); err != nil {
118		return nil, fmt.Errorf("failed to create data directory %q: %w", dataDir, err)
119	}
120	var lock *dataDirLock
121	if cfg.lockDataDir && !skipDataDirLock() {
122		lock, err = acquireDataDirLock(dataDir)
123		if err != nil {
124			return nil, err
125		}
126	}
127
128	conn, err := openDB(dbPath)
129	if err != nil {
130		if lock != nil {
131			lock.release()
132		}
133		return nil, err
134	}
135
136	// Serialize all access through a single connection. SQLite
137	// serializes writes at the file level anyway, and allowing multiple
138	// pool connections to interleave writes/checkpoints (especially
139	// under concurrent sub-agents) has caused WAL/header desync
140	// resulting in SQLITE_NOTADB (26) on the next open.
141	conn.SetMaxOpenConns(1)
142
143	releaseLock := func() {
144		if lock != nil {
145			lock.release()
146		}
147	}
148
149	if err = conn.PingContext(ctx); err != nil {
150		conn.Close()
151		releaseLock()
152		return nil, fmt.Errorf("failed to connect to database: %w", err)
153	}
154
155	if err := initGoose(); err != nil {
156		conn.Close()
157		releaseLock()
158		slog.Error("Failed to initialize goose", "error", err)
159		return nil, fmt.Errorf("failed to initialize goose: %w", err)
160	}
161
162	if err := goose.Up(conn, "migrations"); err != nil {
163		conn.Close()
164		releaseLock()
165		slog.Error("Failed to apply migrations", "error", err)
166		return nil, fmt.Errorf("failed to apply migrations: %w", err)
167	}
168
169	pool[absPath] = &connEntry{db: conn, refCount: 1, lock: lock}
170	return conn, nil
171}
172
173// Release decrements the reference count for the database at the given
174// data directory. When the count reaches zero the underlying connection
175// is closed and removed from the pool.
176func Release(dataDir string) error {
177	dbPath := filepath.Join(dataDir, "crush.db")
178	absPath, err := filepath.Abs(dbPath)
179	if err != nil {
180		absPath = dbPath
181	}
182
183	poolMu.Lock()
184	defer poolMu.Unlock()
185
186	entry, ok := pool[absPath]
187	if !ok {
188		return nil
189	}
190
191	entry.refCount--
192	if entry.refCount > 0 {
193		return nil
194	}
195
196	delete(pool, absPath)
197	closeErr := entry.db.Close()
198	if entry.lock != nil {
199		entry.lock.release()
200	}
201	return closeErr
202}
203
204// ResetPool closes all pooled connections and clears the pool. This is
205// intended for use in tests to ensure a clean state between test cases.
206func ResetPool() {
207	poolMu.Lock()
208	defer poolMu.Unlock()
209	for path, entry := range pool {
210		entry.db.Close()
211		if entry.lock != nil {
212			entry.lock.release()
213		}
214		delete(pool, path)
215	}
216}
217
218func initGoose() error {
219	gooseInitOnce.Do(func() {
220		goose.SetBaseFS(FS)
221		gooseInitErr = goose.SetDialect("sqlite3")
222	})
223
224	return gooseInitErr
225}