1// SPDX-FileCopyrightText: Amolith <amolith@secluded.site>
  2//
  3// SPDX-License-Identifier: AGPL-3.0-or-later
  4
  5package event
  6
  7import (
  8	"context"
  9	"encoding/binary"
 10	"encoding/json"
 11	"errors"
 12	"fmt"
 13	"strings"
 14	"time"
 15
 16	"git.secluded.site/np/internal/db"
 17	"git.secluded.site/np/internal/timeutil"
 18)
 19
 20// ErrInvalidType indicates that an event type was not recognised.
 21var ErrInvalidType = errors.New("event: invalid type")
 22
 23// ErrEmptyCommand indicates that a command string was not provided.
 24var ErrEmptyCommand = errors.New("event: command is required")
 25
 26// Type enumerates known event kinds.
 27type Type string
 28
 29const (
 30	TypeGoalSet          Type = "goal_set"
 31	TypeGoalUpdated      Type = "goal_updated"
 32	TypeTaskAdded        Type = "task_added"
 33	TypeTaskUpdated      Type = "task_updated"
 34	TypeTaskStatusChange Type = "task_status_changed"
 35)
 36
 37// Valid reports whether t is a recognised event type.
 38func (t Type) Valid() bool {
 39	switch t {
 40	case TypeGoalSet,
 41		TypeGoalUpdated,
 42		TypeTaskAdded,
 43		TypeTaskUpdated,
 44		TypeTaskStatusChange:
 45		return true
 46	default:
 47		return false
 48	}
 49}
 50
 51// Record represents a stored event entry.
 52type Record struct {
 53	Seq     uint64          `json:"seq"`
 54	At      time.Time       `json:"at"`
 55	Type    Type            `json:"type"`
 56	Reason  *string         `json:"reason,omitempty"`
 57	Command string          `json:"cmd"`
 58	Payload json.RawMessage `json:"payload"`
 59}
 60
 61// HasReason reports whether a reason was supplied.
 62func (r Record) HasReason() bool {
 63	return r.Reason != nil && *r.Reason != ""
 64}
 65
 66// UnmarshalPayload decodes the payload into dst.
 67func (r Record) UnmarshalPayload(dst any) error {
 68	if len(r.Payload) == 0 {
 69		return errors.New("event: payload is empty")
 70	}
 71	return json.Unmarshal(r.Payload, dst)
 72}
 73
// AppendInput captures the data necessary to append an event.
type AppendInput struct {
	// Type is the event kind. It must satisfy Type.Valid, otherwise appending
	// fails with ErrInvalidType.
	Type Type
	// Command is stored with surrounding whitespace trimmed. If nothing
	// remains after trimming, appending fails with ErrEmptyCommand.
	Command string
	// Reason is optional. It is trimmed, and an empty result is stored as no
	// reason at all.
	Reason string
	// Payload is encoded to JSON before storage. A nil Payload is stored as
	// the JSON literal null.
	Payload any
	// At is the event timestamp. When it is the zero time, the store's clock
	// supplies the current time instead. The value is passed through
	// timeutil.EnsureUTC before being stored.
	At time.Time
}
 82
// ListOptions controls which events a listing returns: a lower bound on the
// sequence number and a cap on the number of records.
type ListOptions struct {
	// After skips events with seq <= After.
	After uint64
	// Limit restricts the number of events returned. Zero or negative returns all.
	Limit int
}
 90
// Store provides high-level helpers for working with events. Each method runs
// in its own database transaction; use WithTxn to operate inside a transaction
// that is already open.
type Store struct {
	// db is the database whose Update and View calls the methods run in.
	db *db.Database
	// clock supplies the timestamp for events appended without an explicit At.
	clock timeutil.Clock
}
 96
 97// NewStore constructs a Store. When clock is nil, a UTC system clock is used.
 98func NewStore(database *db.Database, clock timeutil.Clock) *Store {
 99	if clock == nil {
100		clock = timeutil.UTCClock{}
101	}
102	return &Store{
103		db:    database,
104		clock: clock,
105	}
106}
107
108// WithTxn exposes transactional helpers for use within db.Update.
109func (s *Store) WithTxn(txn *db.Txn) TxnStore {
110	return TxnStore{
111		txn:   txn,
112		clock: s.clock,
113	}
114}
115
116// Append records an event for sid.
117func (s *Store) Append(ctx context.Context, sid string, input AppendInput) (Record, error) {
118	var rec Record
119	err := s.db.Update(ctx, func(txn *db.Txn) error {
120		var err error
121		rec, err = appendRecord(txn, s.clock, sid, input)
122		return err
123	})
124	return rec, err
125}
126
127// List returns events for sid subject to opts.
128func (s *Store) List(ctx context.Context, sid string, opts ListOptions) ([]Record, error) {
129	var records []Record
130	err := s.db.View(ctx, func(txn *db.Txn) error {
131		var err error
132		records, err = listRecords(txn, sid, opts)
133		return err
134	})
135	return records, err
136}
137
138// LatestSequence returns the latest event sequence for sid.
139func (s *Store) LatestSequence(ctx context.Context, sid string) (uint64, error) {
140	var seq uint64
141	err := s.db.View(ctx, func(txn *db.Txn) error {
142		var err error
143		seq, err = latestSequence(txn, sid)
144		return err
145	})
146	return seq, err
147}
148
// TxnStore wraps a db transaction for event operations. It offers the same
// operations as Store but runs them in the wrapped transaction instead of
// opening a new one.
type TxnStore struct {
	// txn is the transaction every operation runs in.
	txn *db.Txn
	// clock supplies the timestamp for events appended without an explicit At.
	clock timeutil.Clock
}
154
// Append records an event for sid using the wrapped transaction and returns
// the stored record. It delegates to appendRecord, so validation and
// defaulting match Store.Append.
func (s TxnStore) Append(sid string, input AppendInput) (Record, error) {
	return appendRecord(s.txn, s.clock, sid, input)
}
159
// List returns the events of sid that pass opts, read through the wrapped
// transaction. It delegates to listRecords.
func (s TxnStore) List(sid string, opts ListOptions) ([]Record, error) {
	return listRecords(s.txn, sid, opts)
}
164
// LatestSequence returns the most recent event sequence for sid as seen by the
// wrapped transaction. It delegates to latestSequence.
func (s TxnStore) LatestSequence(sid string) (uint64, error) {
	return latestSequence(s.txn, sid)
}
169
// Prefix returns the subscription prefix for events in sid. It is the value of
// db.PrefixSessionEvents(sid), the same key prefix the listing code iterates.
func Prefix(sid string) []byte {
	return db.PrefixSessionEvents(sid)
}
174
// Key returns the event key for seq within sid. It is the value of
// db.KeySessionEvent(sid, seq), the key an appended event is written under.
func Key(sid string, seq uint64) []byte {
	return db.KeySessionEvent(sid, seq)
}
179
// SequenceCounterKey returns the key that stores the latest sequence for sid.
// It is the value of db.KeySessionEventSeq(sid), the counter that appending
// increments and that the latest-sequence lookup reads.
func SequenceCounterKey(sid string) []byte {
	return db.KeySessionEventSeq(sid)
}
184
// nullPayload is the JSON literal null as raw bytes: the payload encoding of
// an event that carries no data. It is a shared package-level slice, so its
// bytes must never be modified.
var nullPayload = json.RawMessage("null")
186
187func appendRecord(txn *db.Txn, clock timeutil.Clock, sid string, input AppendInput) (Record, error) {
188	if txn == nil {
189		return Record{}, errors.New("event: transaction is nil")
190	}
191	if !input.Type.Valid() {
192		return Record{}, ErrInvalidType
193	}
194
195	cmd := strings.TrimSpace(input.Command)
196	if cmd == "" {
197		return Record{}, ErrEmptyCommand
198	}
199
200	at := input.At
201	if clock == nil {
202		clock = timeutil.UTCClock{}
203	}
204	if at.IsZero() {
205		at = clock.Now()
206	}
207	at = timeutil.EnsureUTC(at)
208
209	payload, err := marshalPayload(input.Payload)
210	if err != nil {
211		return Record{}, err
212	}
213
214	seq, err := txn.IncrementUint64(db.KeySessionEventSeq(sid), 1)
215	if err != nil {
216		return Record{}, err
217	}
218
219	var reasonPtr *string
220	if trimmed := strings.TrimSpace(input.Reason); trimmed != "" {
221		reasonPtr = &trimmed
222	}
223
224	record := Record{
225		Seq:     seq,
226		At:      at,
227		Type:    input.Type,
228		Reason:  reasonPtr,
229		Command: cmd,
230		Payload: payload,
231	}
232
233	if err := txn.SetJSON(db.KeySessionEvent(sid, seq), record); err != nil {
234		return Record{}, err
235	}
236
237	return record, nil
238}
239
240func listRecords(txn *db.Txn, sid string, opts ListOptions) ([]Record, error) {
241	if txn == nil {
242		return nil, errors.New("event: transaction is nil")
243	}
244
245	var records []Record
246	err := txn.Iterate(db.IterateOptions{
247		Prefix:         db.PrefixSessionEvents(sid),
248		PrefetchValues: true,
249	}, func(item db.Item) error {
250		var rec Record
251		if err := item.ValueJSON(&rec); err != nil {
252			return err
253		}
254
255		if rec.Seq <= opts.After {
256			return nil
257		}
258
259		records = append(records, rec)
260		if opts.Limit > 0 && len(records) >= opts.Limit {
261			return db.ErrTxnAborted
262		}
263
264		return nil
265	})
266	if err != nil && !errors.Is(err, db.ErrTxnAborted) {
267		return nil, err
268	}
269
270	return records, nil
271}
272
273func latestSequence(txn *db.Txn, sid string) (uint64, error) {
274	if txn == nil {
275		return 0, errors.New("event: transaction is nil")
276	}
277
278	key := db.KeySessionEventSeq(sid)
279	exists, err := txn.Exists(key)
280	if err != nil {
281		return 0, err
282	}
283	if !exists {
284		return 0, nil
285	}
286
287	value, err := txn.Get(key)
288	if err != nil {
289		return 0, err
290	}
291
292	if len(value) != 8 {
293		return 0, fmt.Errorf("event: corrupt sequence counter for session %q", sid)
294	}
295
296	return binary.BigEndian.Uint64(value), nil
297}
298
// marshalPayload encodes payload as JSON for storage in Record.Payload.
//
// A nil payload, or an encoding that produces no bytes, yields the JSON
// literal null. An encoding failure returns a nil message together with the
// error from json.Marshal.
//
// Every call returns a freshly allocated slice. The null case deliberately
// does not hand out a shared package-level slice: the result ends up in
// Record.Payload, and a caller modifying those bytes in place would otherwise
// corrupt the null payload of every event appended afterwards.
func marshalPayload(payload any) (json.RawMessage, error) {
	if payload == nil {
		return json.RawMessage("null"), nil
	}
	data, err := json.Marshal(payload)
	if err != nil {
		return nil, err
	}
	if len(data) == 0 {
		return json.RawMessage("null"), nil
	}
	return json.RawMessage(data), nil
}