connect.go

  1package db
  2
  3import (
  4	"context"
  5	"database/sql"
  6	"embed"
  7	"fmt"
  8	"log/slog"
  9	"path/filepath"
 10	"sync"
 11	"testing"
 12
 13	"github.com/pressly/goose/v3"
 14)
 15
 16var (
 17	pragmas = map[string]string{
 18		"foreign_keys":  "ON",
 19		"journal_mode":  "WAL",
 20		"page_size":     "4096",
 21		"cache_size":    "-8000",
 22		"synchronous":   "NORMAL",
 23		"secure_delete": "ON",
 24		"busy_timeout":  "30000",
 25	}
 26	gooseInitOnce sync.Once
 27	gooseInitErr  error
 28)
 29
// FS is the embedded filesystem containing the SQL files matched by
// migrations/*.sql. It is registered with goose as its base filesystem.
//
//go:embed migrations/*.sql
var FS embed.FS
 32
 33func init() {
 34	goose.SetBaseFS(FS)
 35
 36	if testing.Testing() {
 37		goose.SetLogger(goose.NopLogger())
 38	}
 39}
 40
// connEntry holds a shared database connection and its reference count.
type connEntry struct {
	// db is the connection handed out to every Connect call that
	// resolves to the same database file.
	db       *sql.DB
	// refCount is incremented by Connect and decremented by Release; the
	// connection is closed once it is no longer positive.
	refCount int
}
 46
var (
	// pool maps a database file path (absolute whenever it could be
	// resolved) to its shared connection entry.
	pool   = make(map[string]*connEntry)
	// poolMu guards pool and the reference counts of its entries; Connect,
	// Release and ResetPool all hold it while touching the map.
	poolMu sync.Mutex
)
 51
 52// Connect opens a SQLite database connection for the given data
 53// directory and runs migrations. If a connection to the same database
 54// file already exists, the existing connection is returned with its
 55// reference count incremented. Callers must pair each Connect with a
 56// [Release] when they no longer need the connection.
 57func Connect(ctx context.Context, dataDir string) (*sql.DB, error) {
 58	if dataDir == "" {
 59		return nil, fmt.Errorf("data.dir is not set")
 60	}
 61
 62	dbPath := filepath.Join(dataDir, "crush.db")
 63
 64	// Resolve to an absolute path so that different relative paths to
 65	// the same file share a single connection.
 66	absPath, err := filepath.Abs(dbPath)
 67	if err != nil {
 68		absPath = dbPath
 69	}
 70
 71	poolMu.Lock()
 72	defer poolMu.Unlock()
 73
 74	if entry, ok := pool[absPath]; ok {
 75		entry.refCount++
 76		return entry.db, nil
 77	}
 78
 79	conn, err := openDB(dbPath)
 80	if err != nil {
 81		return nil, err
 82	}
 83
 84	// Serialize all access through a single connection. SQLite
 85	// serializes writes at the file level anyway, and allowing multiple
 86	// pool connections to interleave writes/checkpoints (especially
 87	// under concurrent sub-agents) has caused WAL/header desync
 88	// resulting in SQLITE_NOTADB (26) on the next open.
 89	conn.SetMaxOpenConns(1)
 90
 91	if err = conn.PingContext(ctx); err != nil {
 92		conn.Close()
 93		return nil, fmt.Errorf("failed to connect to database: %w", err)
 94	}
 95
 96	if err := initGoose(); err != nil {
 97		conn.Close()
 98		slog.Error("Failed to initialize goose", "error", err)
 99		return nil, fmt.Errorf("failed to initialize goose: %w", err)
100	}
101
102	if err := goose.Up(conn, "migrations"); err != nil {
103		conn.Close()
104		slog.Error("Failed to apply migrations", "error", err)
105		return nil, fmt.Errorf("failed to apply migrations: %w", err)
106	}
107
108	pool[absPath] = &connEntry{db: conn, refCount: 1}
109	return conn, nil
110}
111
112// Release decrements the reference count for the database at the given
113// data directory. When the count reaches zero the underlying connection
114// is closed and removed from the pool.
115func Release(dataDir string) error {
116	dbPath := filepath.Join(dataDir, "crush.db")
117	absPath, err := filepath.Abs(dbPath)
118	if err != nil {
119		absPath = dbPath
120	}
121
122	poolMu.Lock()
123	defer poolMu.Unlock()
124
125	entry, ok := pool[absPath]
126	if !ok {
127		return nil
128	}
129
130	entry.refCount--
131	if entry.refCount > 0 {
132		return nil
133	}
134
135	delete(pool, absPath)
136	return entry.db.Close()
137}
138
139// ResetPool closes all pooled connections and clears the pool. This is
140// intended for use in tests to ensure a clean state between test cases.
141func ResetPool() {
142	poolMu.Lock()
143	defer poolMu.Unlock()
144	for path, entry := range pool {
145		entry.db.Close()
146		delete(pool, path)
147	}
148}
149
150func initGoose() error {
151	gooseInitOnce.Do(func() {
152		goose.SetBaseFS(FS)
153		gooseInitErr = goose.SetDialect("sqlite3")
154	})
155
156	return gooseInitErr
157}