1// SPDX-FileCopyrightText: Amolith <amolith@secluded.site>
  2//
  3// SPDX-License-Identifier: AGPL-3.0-or-later
  4
  5package db
  6
  7import (
  8	"context"
  9	"encoding/json"
 10	"errors"
 11	"fmt"
 12	"os"
 13	"time"
 14
 15	"github.com/dgraph-io/badger/v4"
 16)
 17
// Database wraps a Badger instance with higher-level helpers suitable for the
// nasin pali domain.
//
// Values are created by Open. After Close the Badger handle is cleared and
// View/Update report ErrClosed instead of touching it.
type Database struct {
	// opts holds the options produced by Options.applyDefaults in Open.
	opts   Options
	// badger is the open Badger handle; Close sets it to nil.
	badger *badger.DB
}
 24
// Errors mapped from Badger, providing a consistent surface to callers.
var (
	// ErrClosed is returned by View and Update when the Badger handle is nil,
	// which is the state Close leaves the Database in.
	ErrClosed      = errors.New("db: database is closed")
	// ErrReadOnly is returned by Update when Options.ReadOnly is set, and by
	// the mutating Txn helpers when called on a read-only transaction.
	ErrReadOnly    = errors.New("db: database opened read-only")
	// ErrKeyNotFound is returned by View and Update in place of
	// badger.ErrKeyNotFound.
	ErrKeyNotFound = errors.New("db: key not found")
	// ErrTxnAborted is the value returned by Txn.Abort. View, Update and
	// Txn.Iterate translate it into a nil return.
	ErrTxnAborted  = errors.New("db: transaction aborted by callback")
)
 32
 33// Open instantiates a Database using the provided options.
 34func Open(opts Options) (*Database, error) {
 35	finalOpts, err := opts.applyDefaults()
 36	if err != nil {
 37		return nil, err
 38	}
 39
 40	if err := ensureDir(finalOpts.Path); err != nil {
 41		return nil, fmt.Errorf("db: preparing data directory: %w", err)
 42	}
 43
 44	badgerOpts := badger.DefaultOptions(finalOpts.Path)
 45	badgerOpts.SyncWrites = finalOpts.SyncWrites
 46	badgerOpts.ReadOnly = finalOpts.ReadOnly
 47	badgerOpts.Logger = badgerLoggerAdapter{logger: finalOpts.Logger}
 48	if !finalOpts.ReadOnly {
 49		// ValueLogFileSize defaults to 1GB which is excessive for our workloads.
 50		// Drop it to 64 MiB to reduce disk footprint without sacrificing much
 51		// performance.
 52		badgerOpts.ValueLogFileSize = 64 << 20
 53	}
 54
 55	db, err := badger.Open(badgerOpts)
 56	if err != nil {
 57		return nil, fmt.Errorf("db: open badger: %w", err)
 58	}
 59
 60	return &Database{
 61		opts:   finalOpts,
 62		badger: db,
 63	}, nil
 64}
 65
 66// Close releases underlying resources.
 67func (db *Database) Close() error {
 68	if db.badger == nil {
 69		return nil
 70	}
 71	err := db.badger.Close()
 72	db.badger = nil
 73	return err
 74}
 75
// Path exposes the filesystem directory backing the database, i.e. the Path
// field of the options the Database was opened with.
func (db *Database) Path() string {
	return db.opts.Path
}
 80
 81// View executes fn within a read-only transaction.
 82func (db *Database) View(ctx context.Context, fn func(*Txn) error) error {
 83	if db.badger == nil {
 84		return ErrClosed
 85	}
 86
 87	txn := db.badger.NewTransaction(false)
 88	defer txn.Discard()
 89
 90	if err := fn(&Txn{txn: txn, readonly: true}); err != nil {
 91		if errors.Is(err, badger.ErrKeyNotFound) {
 92			return ErrKeyNotFound
 93		}
 94		if errors.Is(err, ErrTxnAborted) {
 95			return nil
 96		}
 97		return err
 98	}
 99
100	return nil
101}
102
103// Update executes fn within a read-write transaction, retrying on conflicts
104// according to Options.MaxTxnRetries.
105func (db *Database) Update(ctx context.Context, fn func(*Txn) error) error {
106	if db.opts.ReadOnly {
107		return ErrReadOnly
108	}
109	if db.badger == nil {
110		return ErrClosed
111	}
112	if ctx == nil {
113		ctx = context.Background()
114	}
115
116	var attempt int
117	for {
118		if err := ctx.Err(); err != nil {
119			return err
120		}
121
122		txn := db.badger.NewTransaction(true)
123		callbackErr := fn(&Txn{txn: txn})
124		if callbackErr != nil {
125			txn.Discard()
126			if errors.Is(callbackErr, ErrTxnAborted) {
127				return nil
128			}
129			if errors.Is(callbackErr, badger.ErrKeyNotFound) {
130				return ErrKeyNotFound
131			}
132			return callbackErr
133		}
134
135		if err := txn.Commit(); err != nil {
136			txn.Discard()
137			if errors.Is(err, badger.ErrConflict) && attempt < db.opts.MaxTxnRetries {
138				backoff := db.opts.ConflictBackoff * time.Duration(1<<attempt)
139				select {
140				case <-ctx.Done():
141					return ctx.Err()
142				case <-time.After(backoff):
143					attempt++
144					continue
145				}
146			}
147
148			if errors.Is(err, badger.ErrKeyNotFound) {
149				return ErrKeyNotFound
150			}
151			return fmt.Errorf("db: commit transaction: %w", err)
152		}
153
154		return nil
155	}
156}
157
// Txn wraps badger.Txn and exposes helper methods.
//
// Instances are handed to the callbacks of Database.View and Database.Update.
type Txn struct {
	// txn is the underlying Badger transaction.
	txn      *badger.Txn
	// readonly is set by View; when true the mutating helpers return
	// ErrReadOnly without touching txn.
	readonly bool
}
163
// Abort signals that the transaction should be rolled back without returning an
// error to callers.
//
// It always returns ErrTxnAborted and has no other effect; the callback must
// return that value (e.g. `return txn.Abort()`) for View or Update to discard
// the transaction and report nil.
func (t *Txn) Abort() error {
	return ErrTxnAborted
}
169
170// Get retrieves the value for key.
171func (t *Txn) Get(key []byte) ([]byte, error) {
172	item, err := t.txn.Get(key)
173	if err != nil {
174		return nil, err
175	}
176	return item.ValueCopy(nil)
177}
178
179// GetJSON retrieves the value at key and unmarshals it into dst.
180func (t *Txn) GetJSON(key []byte, dst any) error {
181	item, err := t.txn.Get(key)
182	if err != nil {
183		return err
184	}
185	return item.Value(func(val []byte) error {
186		return json.Unmarshal(val, dst)
187	})
188}
189
190// Exists reports whether a key exists.
191func (t *Txn) Exists(key []byte) (bool, error) {
192	_, err := t.txn.Get(key)
193	if err == nil {
194		return true, nil
195	}
196	if errors.Is(err, badger.ErrKeyNotFound) {
197		return false, nil
198	}
199	return false, err
200}
201
202// Set associates key with value.
203func (t *Txn) Set(key, value []byte) error {
204	if t.readonly {
205		return ErrReadOnly
206	}
207	return t.txn.Set(key, value)
208}
209
210// SetJSON marshals v as JSON and stores it at key.
211func (t *Txn) SetJSON(key []byte, v any) error {
212	if t.readonly {
213		return ErrReadOnly
214	}
215	data, err := json.Marshal(v)
216	if err != nil {
217		return err
218	}
219	return t.txn.Set(key, data)
220}
221
222// Delete removes key.
223func (t *Txn) Delete(key []byte) error {
224	if t.readonly {
225		return ErrReadOnly
226	}
227	return t.txn.Delete(key)
228}
229
230// IncrementUint64 increments a big-endian uint64 value stored at key by delta
231// and returns the resulting value. The stored representation is raw 8-byte big
232// endian, aligning with the schema's counter storage.
233func (t *Txn) IncrementUint64(key []byte, delta uint64) (uint64, error) {
234	if t.readonly {
235		return 0, ErrReadOnly
236	}
237
238	var current uint64
239	item, err := t.txn.Get(key)
240	switch {
241	case err == nil:
242		if err := item.Value(func(val []byte) error {
243			var convErr error
244			current, convErr = decodeUint64(val)
245			return convErr
246		}); err != nil {
247			return 0, err
248		}
249	case errors.Is(err, badger.ErrKeyNotFound):
250		current = 0
251	default:
252		return 0, err
253	}
254
255	next := current + delta
256	if err := t.txn.Set(key, encodeUint64(next)); err != nil {
257		return 0, err
258	}
259	return next, nil
260}
261
262// Iterate walks over keys using prefix iteration.
263func (t *Txn) Iterate(opts IterateOptions, fn func(Item) error) error {
264	iterOpts := badger.DefaultIteratorOptions
265	iterOpts.Reverse = opts.Reverse
266	iterOpts.PrefetchValues = opts.PrefetchValues
267	iterOpts.Prefix = opts.Prefix
268
269	it := t.txn.NewIterator(iterOpts)
270	defer it.Close()
271
272	if len(opts.Prefix) > 0 {
273		for it.Seek(opts.Prefix); it.ValidForPrefix(opts.Prefix); it.Next() {
274			if err := fn(Item{item: it.Item()}); err != nil {
275				if errors.Is(err, ErrTxnAborted) {
276					return nil
277				}
278				return err
279			}
280		}
281		return nil
282	}
283
284	for it.Rewind(); it.Valid(); it.Next() {
285		if err := fn(Item{item: it.Item()}); err != nil {
286			if errors.Is(err, ErrTxnAborted) {
287				return nil
288			}
289			return err
290		}
291	}
292	return nil
293}
294
// Item wraps a Badger item during iteration. Values of this type are created
// by Txn.Iterate and passed to its callback.
type Item struct {
	// item is the iterator's current Badger item.
	item *badger.Item
}
299
// Key returns a copy of the item's key. The copy is freshly allocated on every
// call (KeyCopy is given a nil destination).
func (it Item) Key() []byte {
	return it.item.KeyCopy(nil)
}
304
305// KeyString returns the item's key as a string (allocates).
306func (it Item) KeyString() string {
307	return string(it.item.KeyCopy(nil))
308}
309
// Value returns a copy of the item's value, or the error reported by Badger's
// ValueCopy. The copy is freshly allocated on every call (ValueCopy is given a
// nil destination).
func (it Item) Value() ([]byte, error) {
	return it.item.ValueCopy(nil)
}
314
315// ValueJSON unmarshals the item's value into dst.
316func (it Item) ValueJSON(dst any) error {
317	return it.item.Value(func(val []byte) error {
318		return json.Unmarshal(val, dst)
319	})
320}
321
// IterateOptions configures Txn.Iterate.
type IterateOptions struct {
	// Prefix restricts iteration to keys that start with these bytes. When
	// empty, Iterate walks every key.
	Prefix         []byte
	// Reverse is copied to the Badger iterator's Reverse option.
	Reverse        bool
	// PrefetchValues is copied to the Badger iterator's PrefetchValues option,
	// overriding Badger's default; the zero value therefore disables
	// prefetching.
	PrefetchValues bool
}
328
// ensureDir creates path, including any missing parents, with mode 0o755
// (subject to the process umask). It succeeds without changes when the
// directory already exists and returns the os.MkdirAll error otherwise.
func ensureDir(path string) error {
	const dirPerm os.FileMode = 0o755

	if err := os.MkdirAll(path, dirPerm); err != nil {
		return err
	}
	return nil
}
332
// badgerLoggerAdapter forwards Badger's log calls to the package's Logger.
// Open installs it as the Badger Logger option.
//
// NOTE(review): every method dereferences logger unconditionally, so this
// assumes Options.applyDefaults always supplies a non-nil Logger — confirm.
type badgerLoggerAdapter struct {
	// logger receives every forwarded call.
	logger Logger
}

// Errorf forwards to the wrapped Logger's Errorf.
func (a badgerLoggerAdapter) Errorf(format string, args ...any) {
	a.logger.Errorf(format, args...)
}

// Warningf forwards to the wrapped Logger's Warningf.
func (a badgerLoggerAdapter) Warningf(format string, args ...any) {
	a.logger.Warningf(format, args...)
}

// Infof forwards to the wrapped Logger's Infof.
func (a badgerLoggerAdapter) Infof(format string, args ...any) {
	a.logger.Infof(format, args...)
}

// Debugf forwards to the wrapped Logger's Debugf.
func (a badgerLoggerAdapter) Debugf(format string, args ...any) {
	a.logger.Debugf(format, args...)
}
352
353func encodeUint64(v uint64) []byte {
354	var b [8]byte
355	putUint64(b[:], v)
356	return b[:]
357}
358
359func decodeUint64(b []byte) (uint64, error) {
360	if len(b) != 8 {
361		return 0, fmt.Errorf("db: expected 8 bytes, got %d", len(b))
362	}
363	return readUint64(b), nil
364}