connect.go

  1package db
  2
  3import (
  4	"context"
  5	"database/sql"
  6	"embed"
  7	"fmt"
  8	"log/slog"
  9	"os"
 10	"path/filepath"
 11	"sync"
 12	"testing"
 13
 14	"github.com/pressly/goose/v3"
 15)
 16
 17var (
 18	pragmas = map[string]string{
 19		"foreign_keys":  "ON",
 20		"journal_mode":  "WAL",
 21		"page_size":     "4096",
 22		"cache_size":    "-8000",
 23		"synchronous":   "NORMAL",
 24		"secure_delete": "ON",
 25		"busy_timeout":  "30000",
 26	}
 27	gooseInitOnce sync.Once
 28	gooseInitErr  error
 29)
 30
 31//go:embed migrations/*.sql
 32var FS embed.FS
 33
 34func init() {
 35	goose.SetBaseFS(FS)
 36
 37	if testing.Testing() {
 38		goose.SetLogger(goose.NopLogger())
 39	}
 40}
 41
 42// connEntry holds a shared database connection, its reference count,
 43// and the data-directory lock that gates access to this entry. The
 44// lock is acquired exactly once when the entry is created and released
 45// when the last reference is dropped, which lets the same process open
 46// the same data directory concurrently while still blocking a second
 47// crush process from racing the storage.
 48type connEntry struct {
 49	db       *sql.DB
 50	refCount int
 51	lock     *dataDirLock
 52}
 53
 54var (
 55	pool   = make(map[string]*connEntry)
 56	poolMu sync.Mutex
 57)
 58
 59// Connect opens a SQLite database connection for the given data
 60// directory and runs migrations. If a connection to the same database
 61// file already exists, the existing connection is returned with its
 62// reference count incremented. Callers must pair each Connect with a
 63// [Release] when they no longer need the connection.
 64func Connect(ctx context.Context, dataDir string) (*sql.DB, error) {
 65	if dataDir == "" {
 66		return nil, fmt.Errorf("data.dir is not set")
 67	}
 68
 69	dbPath := filepath.Join(dataDir, "crush.db")
 70
 71	// Resolve to an absolute path so that different relative paths to
 72	// the same file share a single connection.
 73	absPath, err := filepath.Abs(dbPath)
 74	if err != nil {
 75		absPath = dbPath
 76	}
 77
 78	poolMu.Lock()
 79	defer poolMu.Unlock()
 80
 81	if entry, ok := pool[absPath]; ok {
 82		entry.refCount++
 83		return entry.db, nil
 84	}
 85
 86	// Take the per-data-directory lock before opening the database so
 87	// we fail fast and with a clear error rather than racing another
 88	// crush process on the same SQLite file. The lock is released when
 89	// the matching Release call drops the refcount to zero. Ensuring
 90	// the data directory exists is required because the lock file
 91	// lives inside it.
 92	if err := os.MkdirAll(dataDir, 0o700); err != nil {
 93		return nil, fmt.Errorf("failed to create data directory %q: %w", dataDir, err)
 94	}
 95	lock, err := acquireDataDirLock(dataDir)
 96	if err != nil {
 97		return nil, err
 98	}
 99
100	conn, err := openDB(dbPath)
101	if err != nil {
102		lock.release()
103		return nil, err
104	}
105
106	// Serialize all access through a single connection. SQLite
107	// serializes writes at the file level anyway, and allowing multiple
108	// pool connections to interleave writes/checkpoints (especially
109	// under concurrent sub-agents) has caused WAL/header desync
110	// resulting in SQLITE_NOTADB (26) on the next open.
111	conn.SetMaxOpenConns(1)
112
113	if err = conn.PingContext(ctx); err != nil {
114		conn.Close()
115		lock.release()
116		return nil, fmt.Errorf("failed to connect to database: %w", err)
117	}
118
119	if err := initGoose(); err != nil {
120		conn.Close()
121		lock.release()
122		slog.Error("Failed to initialize goose", "error", err)
123		return nil, fmt.Errorf("failed to initialize goose: %w", err)
124	}
125
126	if err := goose.Up(conn, "migrations"); err != nil {
127		conn.Close()
128		lock.release()
129		slog.Error("Failed to apply migrations", "error", err)
130		return nil, fmt.Errorf("failed to apply migrations: %w", err)
131	}
132
133	pool[absPath] = &connEntry{db: conn, refCount: 1, lock: lock}
134	return conn, nil
135}
136
137// Release decrements the reference count for the database at the given
138// data directory. When the count reaches zero the underlying connection
139// is closed and removed from the pool.
140func Release(dataDir string) error {
141	dbPath := filepath.Join(dataDir, "crush.db")
142	absPath, err := filepath.Abs(dbPath)
143	if err != nil {
144		absPath = dbPath
145	}
146
147	poolMu.Lock()
148	defer poolMu.Unlock()
149
150	entry, ok := pool[absPath]
151	if !ok {
152		return nil
153	}
154
155	entry.refCount--
156	if entry.refCount > 0 {
157		return nil
158	}
159
160	delete(pool, absPath)
161	closeErr := entry.db.Close()
162	if entry.lock != nil {
163		entry.lock.release()
164	}
165	return closeErr
166}
167
168// ResetPool closes all pooled connections and clears the pool. This is
169// intended for use in tests to ensure a clean state between test cases.
170func ResetPool() {
171	poolMu.Lock()
172	defer poolMu.Unlock()
173	for path, entry := range pool {
174		entry.db.Close()
175		if entry.lock != nil {
176			entry.lock.release()
177		}
178		delete(pool, path)
179	}
180}
181
182func initGoose() error {
183	gooseInitOnce.Do(func() {
184		goose.SetBaseFS(FS)
185		gooseInitErr = goose.SetDialect("sqlite3")
186	})
187
188	return gooseInitErr
189}