1package message
2
3import (
4 "context"
5 "encoding/json"
6
7 "github.com/cloudwego/eino/schema"
8 "github.com/google/uuid"
9 "github.com/kujtimiihoxha/termai/internal/db"
10 "github.com/kujtimiihoxha/termai/internal/pubsub"
11)
12
13type Message struct {
14 ID string
15 SessionID string
16 MessageData schema.Message
17
18 CreatedAt int64
19 UpdatedAt int64
20}
21
22type Service interface {
23 pubsub.Suscriber[Message]
24 Create(sessionID string, messageData schema.Message) (Message, error)
25 Get(id string) (Message, error)
26 List(sessionID string) ([]Message, error)
27 Delete(id string) error
28 DeleteSessionMessages(sessionID string) error
29}
30
31type service struct {
32 *pubsub.Broker[Message]
33 q db.Querier
34 ctx context.Context
35}
36
37func (s *service) Create(sessionID string, messageData schema.Message) (Message, error) {
38 messageDataJSON, err := json.Marshal(messageData)
39 if err != nil {
40 return Message{}, err
41 }
42 dbMessage, err := s.q.CreateMessage(s.ctx, db.CreateMessageParams{
43 ID: uuid.New().String(),
44 SessionID: sessionID,
45 MessageData: string(messageDataJSON),
46 })
47 if err != nil {
48 return Message{}, err
49 }
50 message := s.fromDBItem(dbMessage)
51 s.Publish(pubsub.CreatedEvent, message)
52 return message, nil
53}
54
55func (s *service) Delete(id string) error {
56 message, err := s.Get(id)
57 if err != nil {
58 return err
59 }
60 err = s.q.DeleteMessage(s.ctx, message.ID)
61 if err != nil {
62 return err
63 }
64 s.Publish(pubsub.DeletedEvent, message)
65 return nil
66}
67
68func (s *service) DeleteSessionMessages(sessionID string) error {
69 messages, err := s.List(sessionID)
70 if err != nil {
71 return err
72 }
73 for _, message := range messages {
74 if message.SessionID == sessionID {
75 err = s.Delete(message.ID)
76 if err != nil {
77 return err
78 }
79 }
80 }
81 return nil
82}
83
84func (s *service) Get(id string) (Message, error) {
85 dbMessage, err := s.q.GetMessage(s.ctx, id)
86 if err != nil {
87 return Message{}, err
88 }
89 return s.fromDBItem(dbMessage), nil
90}
91
92func (s *service) List(sessionID string) ([]Message, error) {
93 dbMessages, err := s.q.ListMessagesBySession(s.ctx, sessionID)
94 if err != nil {
95 return nil, err
96 }
97 messages := make([]Message, len(dbMessages))
98 for i, dbMessage := range dbMessages {
99 messages[i] = s.fromDBItem(dbMessage)
100 }
101 return messages, nil
102}
103
104func (s *service) fromDBItem(item db.Message) Message {
105 var messageData schema.Message
106 json.Unmarshal([]byte(item.MessageData), &messageData)
107 return Message{
108 ID: item.ID,
109 SessionID: item.SessionID,
110 MessageData: messageData,
111 CreatedAt: item.CreatedAt,
112 UpdatedAt: item.UpdatedAt,
113 }
114}
115
116func NewService(ctx context.Context, q db.Querier) Service {
117 return &service{
118 Broker: pubsub.NewBroker[Message](),
119 q: q,
120 ctx: ctx,
121 }
122}