message.go

  1package message
  2
  3import (
  4	"context"
  5	"encoding/json"
  6
  7	"github.com/cloudwego/eino/schema"
  8	"github.com/google/uuid"
  9	"github.com/kujtimiihoxha/termai/internal/db"
 10	"github.com/kujtimiihoxha/termai/internal/pubsub"
 11)
 12
 13type Message struct {
 14	ID          string
 15	SessionID   string
 16	MessageData schema.Message
 17
 18	CreatedAt int64
 19	UpdatedAt int64
 20}
 21
 22type Service interface {
 23	pubsub.Suscriber[Message]
 24	Create(sessionID string, messageData schema.Message) (Message, error)
 25	Get(id string) (Message, error)
 26	List(sessionID string) ([]Message, error)
 27	Delete(id string) error
 28	DeleteSessionMessages(sessionID string) error
 29}
 30
 31type service struct {
 32	*pubsub.Broker[Message]
 33	q   db.Querier
 34	ctx context.Context
 35}
 36
 37func (s *service) Create(sessionID string, messageData schema.Message) (Message, error) {
 38	messageDataJSON, err := json.Marshal(messageData)
 39	if err != nil {
 40		return Message{}, err
 41	}
 42	dbMessage, err := s.q.CreateMessage(s.ctx, db.CreateMessageParams{
 43		ID:          uuid.New().String(),
 44		SessionID:   sessionID,
 45		MessageData: string(messageDataJSON),
 46	})
 47	if err != nil {
 48		return Message{}, err
 49	}
 50	message := s.fromDBItem(dbMessage)
 51	s.Publish(pubsub.CreatedEvent, message)
 52	return message, nil
 53}
 54
 55func (s *service) Delete(id string) error {
 56	message, err := s.Get(id)
 57	if err != nil {
 58		return err
 59	}
 60	err = s.q.DeleteMessage(s.ctx, message.ID)
 61	if err != nil {
 62		return err
 63	}
 64	s.Publish(pubsub.DeletedEvent, message)
 65	return nil
 66}
 67
 68func (s *service) DeleteSessionMessages(sessionID string) error {
 69	messages, err := s.List(sessionID)
 70	if err != nil {
 71		return err
 72	}
 73	for _, message := range messages {
 74		if message.SessionID == sessionID {
 75			err = s.Delete(message.ID)
 76			if err != nil {
 77				return err
 78			}
 79		}
 80	}
 81	return nil
 82}
 83
 84func (s *service) Get(id string) (Message, error) {
 85	dbMessage, err := s.q.GetMessage(s.ctx, id)
 86	if err != nil {
 87		return Message{}, err
 88	}
 89	return s.fromDBItem(dbMessage), nil
 90}
 91
 92func (s *service) List(sessionID string) ([]Message, error) {
 93	dbMessages, err := s.q.ListMessagesBySession(s.ctx, sessionID)
 94	if err != nil {
 95		return nil, err
 96	}
 97	messages := make([]Message, len(dbMessages))
 98	for i, dbMessage := range dbMessages {
 99		messages[i] = s.fromDBItem(dbMessage)
100	}
101	return messages, nil
102}
103
104func (s *service) fromDBItem(item db.Message) Message {
105	var messageData schema.Message
106	json.Unmarshal([]byte(item.MessageData), &messageData)
107	return Message{
108		ID:          item.ID,
109		SessionID:   item.SessionID,
110		MessageData: messageData,
111		CreatedAt:   item.CreatedAt,
112		UpdatedAt:   item.UpdatedAt,
113	}
114}
115
116func NewService(ctx context.Context, q db.Querier) Service {
117	return &service{
118		Broker: pubsub.NewBroker[Message](),
119		q:      q,
120		ctx:    ctx,
121	}
122}