connect.go

  1package db
  2
  3import (
  4	"context"
  5	"database/sql"
  6	"embed"
  7	"fmt"
  8	"log/slog"
  9	"os"
 10	"path/filepath"
 11	"sync"
 12	"testing"
 13
 14	"github.com/pressly/goose/v3"
 15)
 16
 17var (
 18	pragmas = map[string]string{
 19		"foreign_keys":  "ON",
 20		"journal_mode":  "WAL",
 21		"page_size":     "4096",
 22		"temp_store":    "MEMORY",
 23		"cache_size":    "-8000",
 24		"synchronous":   "NORMAL",
 25		"secure_delete": "ON",
 26		"busy_timeout":  "30000",
 27	}
 28	gooseInitOnce sync.Once
 29	gooseInitErr  error
 30)
 31
// FS is the embedded filesystem holding the SQL files matched by
// migrations/*.sql. It is handed to goose as its base filesystem.
//
//go:embed migrations/*.sql
var FS embed.FS
 34
 35func init() {
 36	goose.SetBaseFS(FS)
 37
 38	if testing.Testing() {
 39		goose.SetLogger(goose.NopLogger())
 40	}
 41}
 42
// connEntry holds a shared database connection, its reference count,
// and the data-directory lock that gates access to this entry. The
// lock is acquired exactly once when the entry is created and released
// when the last reference is dropped, which lets the same process open
// the same data directory concurrently while still blocking a second
// crush process from racing the storage.
type connEntry struct {
	db       *sql.DB      // connection returned to every Connect caller for this database file
	refCount int          // Connect calls not yet balanced by a Release
	lock     *dataDirLock // lock taken when the entry was created; nil if locking was off or skipped
}
 54
// pool and poolMu form the package-level registry of shared
// connections used by Connect, Release and ResetPool.
var (
	pool   = make(map[string]*connEntry) // keyed by the absolute path of the database file
	poolMu sync.Mutex                    // held while pool or an entry's refCount is read or changed
)
 59
 60// ConnectOption configures a Connect call. Options are applied in
 61// order; later options override earlier ones for the same field.
 62type ConnectOption func(*connectOptions)
 63
 64// connectOptions holds the resolved configuration for a Connect call.
 65type connectOptions struct {
 66	lockDataDir bool
 67}
 68
 69// WithDataDirLock toggles acquisition of the per-data-directory lock
 70// for this Connect call. The lock is off by default so local-mode
 71// invocations do not regress today's behavior; the server's
 72// workspace-bootstrap path opts in. CRUSH_SKIP_DATADIR_LOCK still
 73// bypasses acquisition even when this option is set.
 74func WithDataDirLock(enable bool) ConnectOption {
 75	return func(o *connectOptions) { o.lockDataDir = enable }
 76}
 77
 78// Connect opens a SQLite database connection for the given data
 79// directory and runs migrations. If a connection to the same database
 80// file already exists, the existing connection is returned with its
 81// reference count incremented. Callers must pair each Connect with a
 82// [Release] when they no longer need the connection.
 83func Connect(ctx context.Context, dataDir string, opts ...ConnectOption) (*sql.DB, error) {
 84	if dataDir == "" {
 85		return nil, fmt.Errorf("data.dir is not set")
 86	}
 87
 88	var cfg connectOptions
 89	for _, opt := range opts {
 90		opt(&cfg)
 91	}
 92
 93	dbPath := filepath.Join(dataDir, "crush.db")
 94
 95	// Resolve to an absolute path so that different relative paths to
 96	// the same file share a single connection.
 97	absPath, err := filepath.Abs(dbPath)
 98	if err != nil {
 99		absPath = dbPath
100	}
101
102	poolMu.Lock()
103	defer poolMu.Unlock()
104
105	if entry, ok := pool[absPath]; ok {
106		entry.refCount++
107		return entry.db, nil
108	}
109
110	// Take the per-data-directory lock before opening the database so
111	// we fail fast and with a clear error rather than racing another
112	// crush process on the same SQLite file. The lock is released when
113	// the matching Release call drops the refcount to zero. Ensuring
114	// the data directory exists is required because the lock file
115	// lives inside it. Locking is opt-in via WithDataDirLock so that
116	// local-mode invocations do not refuse a second crush against the
117	// same data dir until client/server becomes the default.
118	if err := os.MkdirAll(dataDir, 0o700); err != nil {
119		return nil, fmt.Errorf("failed to create data directory %q: %w", dataDir, err)
120	}
121	var lock *dataDirLock
122	if cfg.lockDataDir && !skipDataDirLock() {
123		lock, err = acquireDataDirLock(dataDir)
124		if err != nil {
125			return nil, err
126		}
127	}
128
129	conn, err := openDB(dbPath)
130	if err != nil {
131		if lock != nil {
132			lock.release()
133		}
134		return nil, err
135	}
136
137	// Serialize all access through a single connection. SQLite
138	// serializes writes at the file level anyway, and allowing multiple
139	// pool connections to interleave writes/checkpoints (especially
140	// under concurrent sub-agents) has caused WAL/header desync
141	// resulting in SQLITE_NOTADB (26) on the next open.
142	conn.SetMaxOpenConns(1)
143
144	releaseLock := func() {
145		if lock != nil {
146			lock.release()
147		}
148	}
149
150	if err = conn.PingContext(ctx); err != nil {
151		conn.Close()
152		releaseLock()
153		return nil, fmt.Errorf("failed to connect to database: %w", err)
154	}
155
156	if err := initGoose(); err != nil {
157		conn.Close()
158		releaseLock()
159		slog.Error("Failed to initialize goose", "error", err)
160		return nil, fmt.Errorf("failed to initialize goose: %w", err)
161	}
162
163	if err := goose.Up(conn, "migrations"); err != nil {
164		conn.Close()
165		releaseLock()
166		slog.Error("Failed to apply migrations", "error", err)
167		return nil, fmt.Errorf("failed to apply migrations: %w", err)
168	}
169
170	pool[absPath] = &connEntry{db: conn, refCount: 1, lock: lock}
171	return conn, nil
172}
173
174// Release decrements the reference count for the database at the given
175// data directory. When the count reaches zero the underlying connection
176// is closed and removed from the pool.
177func Release(dataDir string) error {
178	dbPath := filepath.Join(dataDir, "crush.db")
179	absPath, err := filepath.Abs(dbPath)
180	if err != nil {
181		absPath = dbPath
182	}
183
184	poolMu.Lock()
185	defer poolMu.Unlock()
186
187	entry, ok := pool[absPath]
188	if !ok {
189		return nil
190	}
191
192	entry.refCount--
193	if entry.refCount > 0 {
194		return nil
195	}
196
197	delete(pool, absPath)
198	closeErr := entry.db.Close()
199	if entry.lock != nil {
200		entry.lock.release()
201	}
202	return closeErr
203}
204
205// ResetPool closes all pooled connections and clears the pool. This is
206// intended for use in tests to ensure a clean state between test cases.
207func ResetPool() {
208	poolMu.Lock()
209	defer poolMu.Unlock()
210	for path, entry := range pool {
211		entry.db.Close()
212		if entry.lock != nil {
213			entry.lock.release()
214		}
215		delete(pool, path)
216	}
217}
218
219func initGoose() error {
220	gooseInitOnce.Do(func() {
221		goose.SetBaseFS(FS)
222		gooseInitErr = goose.SetDialect("sqlite3")
223	})
224
225	return gooseInitErr
226}