connect.go

  1package db
  2
  3import (
  4	"context"
  5	"database/sql"
  6	"embed"
  7	"fmt"
  8	"log/slog"
  9	"path/filepath"
 10	"sync"
 11	"testing"
 12
 13	"github.com/pressly/goose/v3"
 14)
 15
 16var (
 17	pragmas = map[string]string{
 18		"foreign_keys":  "ON",
 19		"journal_mode":  "WAL",
 20		"page_size":     "4096",
 21		"temp_store":    "MEMORY",
 22		"cache_size":    "-8000",
 23		"synchronous":   "NORMAL",
 24		"secure_delete": "ON",
 25		"busy_timeout":  "30000",
 26	}
 27	gooseInitOnce sync.Once
 28	gooseInitErr  error
 29)
 30
 31//go:embed migrations/*.sql
 32var FS embed.FS
 33
 34func init() {
 35	goose.SetBaseFS(FS)
 36
 37	if testing.Testing() {
 38		goose.SetLogger(goose.NopLogger())
 39	}
 40}
 41
 42// connEntry holds a shared database connection and its reference count.
 43type connEntry struct {
 44	db       *sql.DB
 45	refCount int
 46}
 47
 48var (
 49	pool   = make(map[string]*connEntry)
 50	poolMu sync.Mutex
 51)
 52
 53// Connect opens a SQLite database connection for the given data
 54// directory and runs migrations. If a connection to the same database
 55// file already exists, the existing connection is returned with its
 56// reference count incremented. Callers must pair each Connect with a
 57// [Release] when they no longer need the connection.
 58func Connect(ctx context.Context, dataDir string) (*sql.DB, error) {
 59	if dataDir == "" {
 60		return nil, fmt.Errorf("data.dir is not set")
 61	}
 62
 63	dbPath := filepath.Join(dataDir, "crush.db")
 64
 65	// Resolve to an absolute path so that different relative paths to
 66	// the same file share a single connection.
 67	absPath, err := filepath.Abs(dbPath)
 68	if err != nil {
 69		absPath = dbPath
 70	}
 71
 72	poolMu.Lock()
 73	defer poolMu.Unlock()
 74
 75	if entry, ok := pool[absPath]; ok {
 76		entry.refCount++
 77		return entry.db, nil
 78	}
 79
 80	conn, err := openDB(dbPath)
 81	if err != nil {
 82		return nil, err
 83	}
 84
 85	// Serialize all access through a single connection. SQLite
 86	// serializes writes at the file level anyway, and allowing multiple
 87	// pool connections to interleave writes/checkpoints (especially
 88	// under concurrent sub-agents) has caused WAL/header desync
 89	// resulting in SQLITE_NOTADB (26) on the next open.
 90	conn.SetMaxOpenConns(1)
 91
 92	if err = conn.PingContext(ctx); err != nil {
 93		conn.Close()
 94		return nil, fmt.Errorf("failed to connect to database: %w", err)
 95	}
 96
 97	if err := initGoose(); err != nil {
 98		conn.Close()
 99		slog.Error("Failed to initialize goose", "error", err)
100		return nil, fmt.Errorf("failed to initialize goose: %w", err)
101	}
102
103	if err := goose.Up(conn, "migrations"); err != nil {
104		conn.Close()
105		slog.Error("Failed to apply migrations", "error", err)
106		return nil, fmt.Errorf("failed to apply migrations: %w", err)
107	}
108
109	pool[absPath] = &connEntry{db: conn, refCount: 1}
110	return conn, nil
111}
112
113// Release decrements the reference count for the database at the given
114// data directory. When the count reaches zero the underlying connection
115// is closed and removed from the pool.
116func Release(dataDir string) error {
117	dbPath := filepath.Join(dataDir, "crush.db")
118	absPath, err := filepath.Abs(dbPath)
119	if err != nil {
120		absPath = dbPath
121	}
122
123	poolMu.Lock()
124	defer poolMu.Unlock()
125
126	entry, ok := pool[absPath]
127	if !ok {
128		return nil
129	}
130
131	entry.refCount--
132	if entry.refCount > 0 {
133		return nil
134	}
135
136	delete(pool, absPath)
137	return entry.db.Close()
138}
139
140// ResetPool closes all pooled connections and clears the pool. This is
141// intended for use in tests to ensure a clean state between test cases.
142func ResetPool() {
143	poolMu.Lock()
144	defer poolMu.Unlock()
145	for path, entry := range pool {
146		entry.db.Close()
147		delete(pool, path)
148	}
149}
150
151func initGoose() error {
152	gooseInitOnce.Do(func() {
153		goose.SetBaseFS(FS)
154		gooseInitErr = goose.SetDialect("sqlite3")
155	})
156
157	return gooseInitErr
158}