db.go

  1// SPDX-FileCopyrightText: Amolith <amolith@secluded.site>
  2//
  3// SPDX-License-Identifier: AGPL-3.0-or-later
  4
  5package db
  6
  7import (
  8	"context"
  9	"encoding/json"
 10	"errors"
 11	"fmt"
 12	"os"
 13	"time"
 14
 15	"github.com/dgraph-io/badger/v4"
 16)
 17
// Database wraps a Badger instance with higher-level helpers suitable for the
// nasin pali domain.
//
// Instances are created with Open and released with Close.
type Database struct {
	// opts holds the options the database was opened with, after defaults
	// were applied by Open.
	opts Options
	// badger is the underlying Badger handle. Close sets it to nil, which
	// View and Update use to detect a closed database.
	badger *badger.DB
}
 24
// Errors mapped from Badger, providing a consistent surface to callers.
var (
	// ErrClosed is returned by View and Update when the Database no longer
	// holds an open Badger handle.
	ErrClosed = errors.New("db: database is closed")
	// ErrReadOnly is returned by Update on a database opened read-only, and
	// by the mutating Txn methods when called on a read-only transaction.
	ErrReadOnly = errors.New("db: database opened read-only")
	// ErrKeyNotFound replaces badger.ErrKeyNotFound in errors returned from
	// View and Update.
	ErrKeyNotFound = errors.New("db: key not found")
	// ErrTxnAborted is the value returned by Txn.Abort. View, Update and
	// Txn.Iterate treat it as a request to stop and report nil to the caller.
	ErrTxnAborted = errors.New("db: transaction aborted by callback")
)
 32
 33// Open instantiates a Database using the provided options.
 34func Open(opts Options) (*Database, error) {
 35	finalOpts, err := opts.applyDefaults()
 36	if err != nil {
 37		return nil, err
 38	}
 39
 40	if err := ensureDir(finalOpts.Path); err != nil {
 41		return nil, fmt.Errorf("db: preparing data directory: %w", err)
 42	}
 43
 44	badgerOpts := badger.DefaultOptions(finalOpts.Path)
 45	badgerOpts.SyncWrites = finalOpts.SyncWrites
 46	badgerOpts.ReadOnly = finalOpts.ReadOnly
 47	badgerOpts.Logger = badgerLoggerAdapter{logger: finalOpts.Logger}
 48	if !finalOpts.ReadOnly {
 49		// ValueLogFileSize defaults to 1GB which is excessive for our workloads.
 50		// Drop it to 64 MiB to reduce disk footprint without sacrificing much
 51		// performance.
 52		badgerOpts.ValueLogFileSize = 64 << 20
 53	}
 54
 55	db, err := badger.Open(badgerOpts)
 56	if err != nil {
 57		return nil, fmt.Errorf("db: open badger: %w", err)
 58	}
 59
 60	return &Database{
 61		opts:   finalOpts,
 62		badger: db,
 63	}, nil
 64}
 65
 66// Close releases underlying resources.
 67func (db *Database) Close() error {
 68	if db.badger == nil {
 69		return nil
 70	}
 71	err := db.badger.Close()
 72	db.badger = nil
 73	return err
 74}
 75
// Path exposes the filesystem directory backing the database, as recorded in
// the options the database was opened with.
func (db *Database) Path() string {
	return db.opts.Path
}
 80
 81// View executes fn within a read-only transaction.
 82func (db *Database) View(ctx context.Context, fn func(*Txn) error) error {
 83	if ctx == nil {
 84		ctx = context.Background()
 85	}
 86	if db.badger == nil {
 87		return ErrClosed
 88	}
 89
 90	txn := db.badger.NewTransaction(false)
 91	defer txn.Discard()
 92
 93	if err := fn(&Txn{txn: txn, readonly: true}); err != nil {
 94		if errors.Is(err, badger.ErrKeyNotFound) {
 95			return ErrKeyNotFound
 96		}
 97		if errors.Is(err, ErrTxnAborted) {
 98			return nil
 99		}
100		return err
101	}
102
103	return ctx.Err()
104}
105
106// Update executes fn within a read-write transaction, retrying on conflicts
107// according to Options.MaxTxnRetries.
108func (db *Database) Update(ctx context.Context, fn func(*Txn) error) error {
109	if db.opts.ReadOnly {
110		return ErrReadOnly
111	}
112	if db.badger == nil {
113		return ErrClosed
114	}
115	if ctx == nil {
116		ctx = context.Background()
117	}
118
119	var attempt int
120	for {
121		if err := ctx.Err(); err != nil {
122			return err
123		}
124
125		txn := db.badger.NewTransaction(true)
126		callbackErr := fn(&Txn{txn: txn})
127		if callbackErr != nil {
128			txn.Discard()
129			if errors.Is(callbackErr, ErrTxnAborted) {
130				return nil
131			}
132			if errors.Is(callbackErr, badger.ErrKeyNotFound) {
133				return ErrKeyNotFound
134			}
135			return callbackErr
136		}
137
138		if err := txn.Commit(); err != nil {
139			txn.Discard()
140			if errors.Is(err, badger.ErrConflict) && attempt < db.opts.MaxTxnRetries {
141				backoff := db.opts.ConflictBackoff * time.Duration(1<<attempt)
142				select {
143				case <-ctx.Done():
144					return ctx.Err()
145				case <-time.After(backoff):
146					attempt++
147					continue
148				}
149			}
150
151			if errors.Is(err, badger.ErrKeyNotFound) {
152				return ErrKeyNotFound
153			}
154			return fmt.Errorf("db: commit transaction: %w", err)
155		}
156
157		txn.Discard()
158		return nil
159	}
160}
161
// Txn wraps badger.Txn and exposes helper methods.
type Txn struct {
	// txn is the underlying Badger transaction all helpers delegate to.
	txn *badger.Txn
	// readonly makes the mutating helpers (Set, SetJSON, Delete,
	// IncrementUint64) fail with ErrReadOnly. View sets it; Update leaves it
	// false.
	readonly bool
}
167
// Abort signals that the transaction should be rolled back without returning an
// error to callers.
//
// It only produces the ErrTxnAborted sentinel; the callback must return that
// value for it to take effect. View and Update recognise it and return nil.
func (t *Txn) Abort() error {
	return ErrTxnAborted
}
173
174// Get retrieves the value for key.
175func (t *Txn) Get(key []byte) ([]byte, error) {
176	item, err := t.txn.Get(key)
177	if err != nil {
178		return nil, err
179	}
180	return item.ValueCopy(nil)
181}
182
183// GetJSON retrieves the value at key and unmarshals it into dst.
184func (t *Txn) GetJSON(key []byte, dst any) error {
185	item, err := t.txn.Get(key)
186	if err != nil {
187		return err
188	}
189	return item.Value(func(val []byte) error {
190		return json.Unmarshal(val, dst)
191	})
192}
193
194// Exists reports whether a key exists.
195func (t *Txn) Exists(key []byte) (bool, error) {
196	_, err := t.txn.Get(key)
197	if err == nil {
198		return true, nil
199	}
200	if errors.Is(err, badger.ErrKeyNotFound) {
201		return false, nil
202	}
203	return false, err
204}
205
206// Set associates key with value.
207func (t *Txn) Set(key, value []byte) error {
208	if t.readonly {
209		return ErrReadOnly
210	}
211	return t.txn.Set(key, value)
212}
213
214// SetJSON marshals v as JSON and stores it at key.
215func (t *Txn) SetJSON(key []byte, v any) error {
216	if t.readonly {
217		return ErrReadOnly
218	}
219	data, err := json.Marshal(v)
220	if err != nil {
221		return err
222	}
223	return t.txn.Set(key, data)
224}
225
226// Delete removes key.
227func (t *Txn) Delete(key []byte) error {
228	if t.readonly {
229		return ErrReadOnly
230	}
231	return t.txn.Delete(key)
232}
233
234// IncrementUint64 increments a big-endian uint64 value stored at key by delta
235// and returns the resulting value. The stored representation is raw 8-byte big
236// endian, aligning with the schema's counter storage.
237func (t *Txn) IncrementUint64(key []byte, delta uint64) (uint64, error) {
238	if t.readonly {
239		return 0, ErrReadOnly
240	}
241
242	var current uint64
243	item, err := t.txn.Get(key)
244	switch {
245	case err == nil:
246		if err := item.Value(func(val []byte) error {
247			var convErr error
248			current, convErr = decodeUint64(val)
249			return convErr
250		}); err != nil {
251			return 0, err
252		}
253	case errors.Is(err, badger.ErrKeyNotFound):
254		current = 0
255	default:
256		return 0, err
257	}
258
259	next := current + delta
260	if err := t.txn.Set(key, encodeUint64(next)); err != nil {
261		return 0, err
262	}
263	return next, nil
264}
265
266// Iterate walks over keys using prefix iteration.
267func (t *Txn) Iterate(opts IterateOptions, fn func(Item) error) error {
268	iterOpts := badger.DefaultIteratorOptions
269	iterOpts.Reverse = opts.Reverse
270	iterOpts.PrefetchValues = opts.PrefetchValues
271	iterOpts.Prefix = opts.Prefix
272
273	it := t.txn.NewIterator(iterOpts)
274	defer it.Close()
275
276	if len(opts.Prefix) > 0 {
277		for it.Seek(opts.Prefix); it.ValidForPrefix(opts.Prefix); it.Next() {
278			if err := fn(Item{item: it.Item()}); err != nil {
279				if errors.Is(err, ErrTxnAborted) {
280					return nil
281				}
282				return err
283			}
284		}
285		return nil
286	}
287
288	for it.Rewind(); it.Valid(); it.Next() {
289		if err := fn(Item{item: it.Item()}); err != nil {
290			if errors.Is(err, ErrTxnAborted) {
291				return nil
292			}
293			return err
294		}
295	}
296	return nil
297}
298
// Item wraps a Badger item during iteration.
//
// Values of this type are handed to the callback of Txn.Iterate.
type Item struct {
	// item is the Badger item at the iterator's current position.
	item *badger.Item
}
303
// Key returns a copy of the item's key. A new slice is allocated on every
// call.
func (it Item) Key() []byte {
	return it.item.KeyCopy(nil)
}
308
309// KeyString returns the item's key as a string without additional allocation.
310func (it Item) KeyString() string {
311	return string(it.item.KeyCopy(nil))
312}
313
// Value returns a copy of the item's value, or the error Badger reports while
// reading it.
func (it Item) Value() ([]byte, error) {
	return it.item.ValueCopy(nil)
}
318
319// ValueJSON unmarshals the item's value into dst.
320func (it Item) ValueJSON(dst any) error {
321	return it.item.Value(func(val []byte) error {
322		return json.Unmarshal(val, dst)
323	})
324}
325
// IterateOptions configures Txn.Iterate.
type IterateOptions struct {
	// Prefix restricts iteration to keys that start with these bytes. An
	// empty Prefix iterates over every key.
	Prefix []byte
	// Reverse is copied to the Badger iterator's Reverse option.
	Reverse bool
	// PrefetchValues is copied to the Badger iterator's PrefetchValues
	// option, overriding Badger's default; the zero value therefore turns
	// prefetching off.
	PrefetchValues bool
}
332
// ensureDir creates path, along with any missing parent directories, using
// permission bits 0o755. It is used by Open to prepare the data directory.
func ensureDir(path string) error {
	return os.MkdirAll(path, 0o755)
}
336
// badgerLoggerAdapter is the value Open installs as Badger's logger. Each
// method forwards the format string and arguments to the wrapped Logger
// unchanged.
type badgerLoggerAdapter struct {
	// logger receives every message; the methods below call it without a nil
	// check.
	logger Logger
}

// Errorf forwards an error-level message to the wrapped Logger.
func (a badgerLoggerAdapter) Errorf(format string, args ...any) {
	a.logger.Errorf(format, args...)
}

// Warningf forwards a warning-level message to the wrapped Logger.
func (a badgerLoggerAdapter) Warningf(format string, args ...any) {
	a.logger.Warningf(format, args...)
}

// Infof forwards an info-level message to the wrapped Logger.
func (a badgerLoggerAdapter) Infof(format string, args ...any) {
	a.logger.Infof(format, args...)
}

// Debugf forwards a debug-level message to the wrapped Logger.
func (a badgerLoggerAdapter) Debugf(format string, args ...any) {
	a.logger.Debugf(format, args...)
}
356
357func encodeUint64(v uint64) []byte {
358	var b [8]byte
359	putUint64(b[:], v)
360	return b[:]
361}
362
363func decodeUint64(b []byte) (uint64, error) {
364	if len(b) != 8 {
365		return 0, fmt.Errorf("db: expected 8 bytes, got %d", len(b))
366	}
367	return readUint64(b), nil
368}