1// SPDX-FileCopyrightText: Amolith <amolith@secluded.site>
2//
3// SPDX-License-Identifier: AGPL-3.0-or-later
4
5package event
6
7import (
8 "context"
9 "encoding/binary"
10 "encoding/json"
11 "errors"
12 "fmt"
13 "strings"
14 "time"
15
16 "git.secluded.site/np/internal/db"
17 "git.secluded.site/np/internal/timeutil"
18)
19
// Sentinel errors returned by this package. Match them with errors.Is.
var (
	// ErrInvalidType is returned when an event's Type is not one of the
	// recognised kinds.
	ErrInvalidType = errors.New("event: invalid type")

	// ErrEmptyCommand is returned when an event is submitted without a
	// command string.
	ErrEmptyCommand = errors.New("event: command is required")
)
25
// Type names the kind of change an event describes. Its string value is what
// appears in the "type" field of a JSON-encoded Record.
type Type string

// Event kinds recognised by this package.
const (
	TypeGoalSet          Type = "goal_set"
	TypeGoalUpdated      Type = "goal_updated"
	TypeTaskAdded        Type = "task_added"
	TypeTaskUpdated      Type = "task_updated"
	TypeTaskStatusChange Type = "task_status_changed"
)

// Valid reports whether t equals one of the declared event kinds. The
// comparison is exact: case and surrounding whitespace matter.
func (t Type) Valid() bool {
	switch t {
	case TypeGoalSet, TypeGoalUpdated, TypeTaskAdded, TypeTaskUpdated, TypeTaskStatusChange:
		return true
	}
	return false
}
50
51// Record represents a stored event entry.
52type Record struct {
53 Seq uint64 `json:"seq"`
54 At time.Time `json:"at"`
55 Type Type `json:"type"`
56 Reason *string `json:"reason,omitempty"`
57 Command string `json:"cmd"`
58 Payload json.RawMessage `json:"payload"`
59}
60
61// HasReason reports whether a reason was supplied.
62func (r Record) HasReason() bool {
63 return r.Reason != nil && *r.Reason != ""
64}
65
66// UnmarshalPayload decodes the payload into dst.
67func (r Record) UnmarshalPayload(dst any) error {
68 if len(r.Payload) == 0 {
69 return errors.New("event: payload is empty")
70 }
71 return json.Unmarshal(r.Payload, dst)
72}
73
74// AppendInput captures the data necessary to append an event.
75type AppendInput struct {
76 Type Type
77 Command string
78 Reason string
79 Payload any
80 At time.Time
81}
82
// ListOptions narrows the set of events returned when listing.
type ListOptions struct {
	// After is an exclusive lower bound: events whose sequence number is
	// less than or equal to After are left out.
	After uint64

	// Limit caps how many events are returned. A value of zero or below
	// means no cap.
	Limit int
}
90
91// Store provides high-level helpers for working with events.
92type Store struct {
93 db *db.Database
94 clock timeutil.Clock
95}
96
97// NewStore constructs a Store. When clock is nil, a UTC system clock is used.
98func NewStore(database *db.Database, clock timeutil.Clock) *Store {
99 if clock == nil {
100 clock = timeutil.UTCClock{}
101 }
102 return &Store{
103 db: database,
104 clock: clock,
105 }
106}
107
108// WithTxn exposes transactional helpers for use within db.Update.
109func (s *Store) WithTxn(txn *db.Txn) TxnStore {
110 return TxnStore{
111 txn: txn,
112 clock: s.clock,
113 }
114}
115
116// Append records an event for sid.
117func (s *Store) Append(ctx context.Context, sid string, input AppendInput) (Record, error) {
118 var rec Record
119 err := s.db.Update(ctx, func(txn *db.Txn) error {
120 var err error
121 rec, err = appendRecord(txn, s.clock, sid, input)
122 return err
123 })
124 return rec, err
125}
126
127// List returns events for sid subject to opts.
128func (s *Store) List(ctx context.Context, sid string, opts ListOptions) ([]Record, error) {
129 var records []Record
130 err := s.db.View(ctx, func(txn *db.Txn) error {
131 var err error
132 records, err = listRecords(txn, sid, opts)
133 return err
134 })
135 return records, err
136}
137
138// LatestSequence returns the latest event sequence for sid.
139func (s *Store) LatestSequence(ctx context.Context, sid string) (uint64, error) {
140 var seq uint64
141 err := s.db.View(ctx, func(txn *db.Txn) error {
142 var err error
143 seq, err = latestSequence(txn, sid)
144 return err
145 })
146 return seq, err
147}
148
149// TxnStore wraps a db transaction for event operations.
150type TxnStore struct {
151 txn *db.Txn
152 clock timeutil.Clock
153}
154
155// Append records an event using the wrapped transaction.
156func (s TxnStore) Append(sid string, input AppendInput) (Record, error) {
157 return appendRecord(s.txn, s.clock, sid, input)
158}
159
160// List returns events for sid using opts.
161func (s TxnStore) List(sid string, opts ListOptions) ([]Record, error) {
162 return listRecords(s.txn, sid, opts)
163}
164
165// LatestSequence returns the most recent event sequence.
166func (s TxnStore) LatestSequence(sid string) (uint64, error) {
167 return latestSequence(s.txn, sid)
168}
169
170// Prefix returns the subscription prefix for events in sid.
171func Prefix(sid string) []byte {
172 return db.PrefixSessionEvents(sid)
173}
174
175// Key returns the event key for seq within sid.
176func Key(sid string, seq uint64) []byte {
177 return db.KeySessionEvent(sid, seq)
178}
179
180// SequenceCounterKey returns the key that stores the latest sequence for sid.
181func SequenceCounterKey(sid string) []byte {
182 return db.KeySessionEventSeq(sid)
183}
184
// nullPayload is the JSON literal null, kept as a ready-made payload for
// events that carry no data.
var nullPayload = json.RawMessage(`null`)
186
187func appendRecord(txn *db.Txn, clock timeutil.Clock, sid string, input AppendInput) (Record, error) {
188 if txn == nil {
189 return Record{}, errors.New("event: transaction is nil")
190 }
191 if !input.Type.Valid() {
192 return Record{}, ErrInvalidType
193 }
194
195 cmd := strings.TrimSpace(input.Command)
196 if cmd == "" {
197 return Record{}, ErrEmptyCommand
198 }
199
200 at := input.At
201 if clock == nil {
202 clock = timeutil.UTCClock{}
203 }
204 if at.IsZero() {
205 at = clock.Now()
206 }
207 at = timeutil.EnsureUTC(at)
208
209 payload, err := marshalPayload(input.Payload)
210 if err != nil {
211 return Record{}, err
212 }
213
214 seq, err := txn.IncrementUint64(db.KeySessionEventSeq(sid), 1)
215 if err != nil {
216 return Record{}, err
217 }
218
219 var reasonPtr *string
220 if trimmed := strings.TrimSpace(input.Reason); trimmed != "" {
221 reasonPtr = &trimmed
222 }
223
224 record := Record{
225 Seq: seq,
226 At: at,
227 Type: input.Type,
228 Reason: reasonPtr,
229 Command: cmd,
230 Payload: payload,
231 }
232
233 if err := txn.SetJSON(db.KeySessionEvent(sid, seq), record); err != nil {
234 return Record{}, err
235 }
236
237 return record, nil
238}
239
240func listRecords(txn *db.Txn, sid string, opts ListOptions) ([]Record, error) {
241 if txn == nil {
242 return nil, errors.New("event: transaction is nil")
243 }
244
245 var records []Record
246 err := txn.Iterate(db.IterateOptions{
247 Prefix: db.PrefixSessionEvents(sid),
248 PrefetchValues: true,
249 }, func(item db.Item) error {
250 var rec Record
251 if err := item.ValueJSON(&rec); err != nil {
252 return err
253 }
254
255 if rec.Seq <= opts.After {
256 return nil
257 }
258
259 records = append(records, rec)
260 if opts.Limit > 0 && len(records) >= opts.Limit {
261 return db.ErrTxnAborted
262 }
263
264 return nil
265 })
266 if err != nil && !errors.Is(err, db.ErrTxnAborted) {
267 return nil, err
268 }
269
270 return records, nil
271}
272
273func latestSequence(txn *db.Txn, sid string) (uint64, error) {
274 if txn == nil {
275 return 0, errors.New("event: transaction is nil")
276 }
277
278 key := db.KeySessionEventSeq(sid)
279 exists, err := txn.Exists(key)
280 if err != nil {
281 return 0, err
282 }
283 if !exists {
284 return 0, nil
285 }
286
287 value, err := txn.Get(key)
288 if err != nil {
289 return 0, err
290 }
291
292 if len(value) != 8 {
293 return 0, fmt.Errorf("event: corrupt sequence counter for session %q", sid)
294 }
295
296 return binary.BigEndian.Uint64(value), nil
297}
298
// marshalPayload encodes payload as JSON for storage in a Record.
//
// A nil payload yields the JSON literal null. Any other value is passed to
// json.Marshal, whose error is returned unchanged together with a nil result.
//
// Every call returns a slice the caller owns. The null result is deliberately
// a fresh slice rather than a shared package-level value: the bytes end up in
// Record.Payload, and an in-place edit by one caller must not change what
// later events store.
func marshalPayload(payload any) (json.RawMessage, error) {
	if payload == nil {
		return json.RawMessage("null"), nil
	}
	data, err := json.Marshal(payload)
	if err != nil {
		return nil, err
	}
	if len(data) == 0 {
		// Defensive: json.Marshal is not expected to succeed with empty
		// output, but never store a zero-length payload.
		return json.RawMessage("null"), nil
	}
	return json.RawMessage(data), nil
}