1// SPDX-FileCopyrightText: Amolith <amolith@secluded.site>
2//
3// SPDX-License-Identifier: AGPL-3.0-or-later
4
5package db
6
7import (
8 "context"
9 "encoding/json"
10 "errors"
11 "fmt"
12 "os"
13 "time"
14
15 "github.com/dgraph-io/badger/v4"
16)
17
18// Database wraps a Badger instance with higher-level helpers suitable for the
19// nasin pali domain.
20type Database struct {
21 opts Options
22 badger *badger.DB
23}
24
// Sentinel errors returned by this package, giving callers a stable surface
// that does not depend on Badger's own error values. Compare with errors.Is.
var (
	// ErrClosed is returned by View and Update once the database handle has
	// been released.
	ErrClosed = errors.New("db: database is closed")
	// ErrKeyNotFound replaces badger.ErrKeyNotFound in errors returned from
	// View and Update.
	ErrKeyNotFound = errors.New("db: key not found")
	// ErrReadOnly is returned when a write is attempted on a read-only
	// database or inside a read-only transaction.
	ErrReadOnly = errors.New("db: database opened read-only")
	// ErrTxnAborted is the value a callback returns (see Txn.Abort) to stop
	// early without surfacing an error to the caller.
	ErrTxnAborted = errors.New("db: transaction aborted by callback")
)
32
33// Open instantiates a Database using the provided options.
34func Open(opts Options) (*Database, error) {
35 finalOpts, err := opts.applyDefaults()
36 if err != nil {
37 return nil, err
38 }
39
40 if err := ensureDir(finalOpts.Path); err != nil {
41 return nil, fmt.Errorf("db: preparing data directory: %w", err)
42 }
43
44 badgerOpts := badger.DefaultOptions(finalOpts.Path)
45 badgerOpts.SyncWrites = finalOpts.SyncWrites
46 badgerOpts.ReadOnly = finalOpts.ReadOnly
47 badgerOpts.Logger = badgerLoggerAdapter{logger: finalOpts.Logger}
48 if !finalOpts.ReadOnly {
49 // ValueLogFileSize defaults to 1GB which is excessive for our workloads.
50 // Drop it to 64 MiB to reduce disk footprint without sacrificing much
51 // performance.
52 badgerOpts.ValueLogFileSize = 64 << 20
53 }
54
55 db, err := badger.Open(badgerOpts)
56 if err != nil {
57 return nil, fmt.Errorf("db: open badger: %w", err)
58 }
59
60 return &Database{
61 opts: finalOpts,
62 badger: db,
63 }, nil
64}
65
66// Close releases underlying resources.
67func (db *Database) Close() error {
68 if db.badger == nil {
69 return nil
70 }
71 err := db.badger.Close()
72 db.badger = nil
73 return err
74}
75
76// Path exposes the filesystem directory backing the database.
77func (db *Database) Path() string {
78 return db.opts.Path
79}
80
81// View executes fn within a read-only transaction.
82func (db *Database) View(ctx context.Context, fn func(*Txn) error) error {
83 if db.badger == nil {
84 return ErrClosed
85 }
86
87 txn := db.badger.NewTransaction(false)
88 defer txn.Discard()
89
90 if err := fn(&Txn{txn: txn, readonly: true}); err != nil {
91 if errors.Is(err, badger.ErrKeyNotFound) {
92 return ErrKeyNotFound
93 }
94 if errors.Is(err, ErrTxnAborted) {
95 return nil
96 }
97 return err
98 }
99
100 return nil
101}
102
103// Update executes fn within a read-write transaction, retrying on conflicts
104// according to Options.MaxTxnRetries.
105func (db *Database) Update(ctx context.Context, fn func(*Txn) error) error {
106 if db.opts.ReadOnly {
107 return ErrReadOnly
108 }
109 if db.badger == nil {
110 return ErrClosed
111 }
112 if ctx == nil {
113 ctx = context.Background()
114 }
115
116 var attempt int
117 for {
118 if err := ctx.Err(); err != nil {
119 return err
120 }
121
122 txn := db.badger.NewTransaction(true)
123 callbackErr := fn(&Txn{txn: txn})
124 if callbackErr != nil {
125 txn.Discard()
126 if errors.Is(callbackErr, ErrTxnAborted) {
127 return nil
128 }
129 if errors.Is(callbackErr, badger.ErrKeyNotFound) {
130 return ErrKeyNotFound
131 }
132 return callbackErr
133 }
134
135 if err := txn.Commit(); err != nil {
136 txn.Discard()
137 if errors.Is(err, badger.ErrConflict) && attempt < db.opts.MaxTxnRetries {
138 backoff := db.opts.ConflictBackoff * time.Duration(1<<attempt)
139 select {
140 case <-ctx.Done():
141 return ctx.Err()
142 case <-time.After(backoff):
143 attempt++
144 continue
145 }
146 }
147
148 if errors.Is(err, badger.ErrKeyNotFound) {
149 return ErrKeyNotFound
150 }
151 return fmt.Errorf("db: commit transaction: %w", err)
152 }
153
154 return nil
155 }
156}
157
158// Txn wraps badger.Txn and exposes helper methods.
159type Txn struct {
160 txn *badger.Txn
161 readonly bool
162}
163
164// Abort signals that the transaction should be rolled back without returning an
165// error to callers.
166func (t *Txn) Abort() error {
167 return ErrTxnAborted
168}
169
170// Get retrieves the value for key.
171func (t *Txn) Get(key []byte) ([]byte, error) {
172 item, err := t.txn.Get(key)
173 if err != nil {
174 return nil, err
175 }
176 return item.ValueCopy(nil)
177}
178
179// GetJSON retrieves the value at key and unmarshals it into dst.
180func (t *Txn) GetJSON(key []byte, dst any) error {
181 item, err := t.txn.Get(key)
182 if err != nil {
183 return err
184 }
185 return item.Value(func(val []byte) error {
186 return json.Unmarshal(val, dst)
187 })
188}
189
190// Exists reports whether a key exists.
191func (t *Txn) Exists(key []byte) (bool, error) {
192 _, err := t.txn.Get(key)
193 if err == nil {
194 return true, nil
195 }
196 if errors.Is(err, badger.ErrKeyNotFound) {
197 return false, nil
198 }
199 return false, err
200}
201
202// Set associates key with value.
203func (t *Txn) Set(key, value []byte) error {
204 if t.readonly {
205 return ErrReadOnly
206 }
207 return t.txn.Set(key, value)
208}
209
210// SetJSON marshals v as JSON and stores it at key.
211func (t *Txn) SetJSON(key []byte, v any) error {
212 if t.readonly {
213 return ErrReadOnly
214 }
215 data, err := json.Marshal(v)
216 if err != nil {
217 return err
218 }
219 return t.txn.Set(key, data)
220}
221
222// Delete removes key.
223func (t *Txn) Delete(key []byte) error {
224 if t.readonly {
225 return ErrReadOnly
226 }
227 return t.txn.Delete(key)
228}
229
230// IncrementUint64 increments a big-endian uint64 value stored at key by delta
231// and returns the resulting value. The stored representation is raw 8-byte big
232// endian, aligning with the schema's counter storage.
233func (t *Txn) IncrementUint64(key []byte, delta uint64) (uint64, error) {
234 if t.readonly {
235 return 0, ErrReadOnly
236 }
237
238 var current uint64
239 item, err := t.txn.Get(key)
240 switch {
241 case err == nil:
242 if err := item.Value(func(val []byte) error {
243 var convErr error
244 current, convErr = decodeUint64(val)
245 return convErr
246 }); err != nil {
247 return 0, err
248 }
249 case errors.Is(err, badger.ErrKeyNotFound):
250 current = 0
251 default:
252 return 0, err
253 }
254
255 next := current + delta
256 if err := t.txn.Set(key, encodeUint64(next)); err != nil {
257 return 0, err
258 }
259 return next, nil
260}
261
262// Iterate walks over keys using prefix iteration.
263func (t *Txn) Iterate(opts IterateOptions, fn func(Item) error) error {
264 iterOpts := badger.DefaultIteratorOptions
265 iterOpts.Reverse = opts.Reverse
266 iterOpts.PrefetchValues = opts.PrefetchValues
267 iterOpts.Prefix = opts.Prefix
268
269 it := t.txn.NewIterator(iterOpts)
270 defer it.Close()
271
272 if len(opts.Prefix) > 0 {
273 if opts.Reverse {
274 // Seek to the end of the prefix range by appending 0xFF
275 seekKey := append(opts.Prefix, 0xFF)
276 it.Seek(seekKey)
277 } else {
278 it.Seek(opts.Prefix)
279 }
280
281 for ; it.ValidForPrefix(opts.Prefix); it.Next() {
282 if err := fn(Item{item: it.Item()}); err != nil {
283 if errors.Is(err, ErrTxnAborted) {
284 return nil
285 }
286 return err
287 }
288 }
289 return nil
290 }
291
292 for it.Rewind(); it.Valid(); it.Next() {
293 if err := fn(Item{item: it.Item()}); err != nil {
294 if errors.Is(err, ErrTxnAborted) {
295 return nil
296 }
297 return err
298 }
299 }
300 return nil
301}
302
303// Item wraps a Badger item during iteration.
304type Item struct {
305 item *badger.Item
306}
307
308// Key returns a copy of the item's key.
309func (it Item) Key() []byte {
310 return it.item.KeyCopy(nil)
311}
312
313// KeyString returns the item's key as a string (allocates).
314func (it Item) KeyString() string {
315 return string(it.item.KeyCopy(nil))
316}
317
318// Value returns a copy of the item's value.
319func (it Item) Value() ([]byte, error) {
320 return it.item.ValueCopy(nil)
321}
322
323// ValueJSON unmarshals the item's value into dst.
324func (it Item) ValueJSON(dst any) error {
325 return it.item.Value(func(val []byte) error {
326 return json.Unmarshal(val, dst)
327 })
328}
329
// IterateOptions controls how Txn.Iterate walks the keyspace. The zero value
// visits every key in forward order without prefetching values.
type IterateOptions struct {
	// Prefix restricts iteration to keys that start with these bytes. When
	// empty, every key is visited.
	Prefix []byte
	// Reverse walks keys in descending instead of ascending order.
	Reverse bool
	// PrefetchValues is forwarded to Badger's iterator option of the same
	// name.
	PrefetchValues bool
}
336
// ensureDir creates path along with any missing parent directories, using
// mode 0o755. A directory that already exists is not an error.
func ensureDir(path string) error {
	const dirPerm os.FileMode = 0o755
	if err := os.MkdirAll(path, dirPerm); err != nil {
		return err
	}
	return nil
}
340
341type badgerLoggerAdapter struct {
342 logger Logger
343}
344
345func (a badgerLoggerAdapter) Errorf(format string, args ...any) {
346 a.logger.Errorf(format, args...)
347}
348
349func (a badgerLoggerAdapter) Warningf(format string, args ...any) {
350 a.logger.Warningf(format, args...)
351}
352
353func (a badgerLoggerAdapter) Infof(format string, args ...any) {
354 a.logger.Infof(format, args...)
355}
356
357func (a badgerLoggerAdapter) Debugf(format string, args ...any) {
358 a.logger.Debugf(format, args...)
359}
360
361func encodeUint64(v uint64) []byte {
362 var b [8]byte
363 putUint64(b[:], v)
364 return b[:]
365}
366
367func decodeUint64(b []byte) (uint64, error) {
368 if len(b) != 8 {
369 return 0, fmt.Errorf("db: expected 8 bytes, got %d", len(b))
370 }
371 return readUint64(b), nil
372}