@@ -614,6 +614,7 @@ func convertToLLMMessage(msg generated.Message) (llm.Message, error) {
// notifySubscribers sends conversation metadata updates (e.g., slug changes) to subscribers.
// This is used when only the conversation data changes, not the messages.
+// Uses Broadcast instead of Publish to avoid racing with message sequence IDs.
func (s *Server) notifySubscribers(ctx context.Context, conversationID string) {
s.mu.Lock()
manager, exists := s.activeConversations[conversationID]
@@ -635,30 +636,14 @@ func (s *Server) notifySubscribers(ctx context.Context, conversationID string) {
return
}
- // For conversation-only updates, we need to get the latest sequence ID
- // to properly notify subscribers, but we send an empty message list
- var latestSequenceID int64
- err = s.db.Queries(ctx, func(q *generated.Queries) error {
- messages, err := q.ListMessages(ctx, conversationID)
- if err != nil {
- return err
- }
- if len(messages) > 0 {
- latestSequenceID = messages[len(messages)-1].SequenceID
- }
- return nil
- })
- if err != nil {
- s.logger.Error("Failed to get latest sequence ID", "conversationID", conversationID, "error", err)
- return
- }
-
- // Publish conversation update with no new messages
+ // Broadcast conversation update with no new messages.
+ // Using Broadcast instead of Publish ensures this metadata-only update
+ // doesn't race with notifySubscribersNewMessage which uses Publish with sequence IDs.
streamData := StreamResponse{
Messages: nil, // No new messages, just conversation update
Conversation: conversation,
}
- manager.subpub.Publish(latestSequenceID, streamData)
+ manager.subpub.Broadcast(streamData)
// Also notify conversation list subscribers (e.g., slug change)
s.publishConversationListUpdate(ConversationListUpdate{
@@ -379,31 +379,37 @@ func TestSSEUserMessageWithExistingConnection(t *testing.T) {
// We should receive an update with the user message within 500ms
// (well before the 5 second LLM delay)
- select {
- case update := <-updates:
- // Check that the update contains the user message
- foundUserMsg := false
- for _, msg := range update.Messages {
- if msg.Type == string(db.MessageTypeUser) && msg.LlmData != nil {
- var llmMsg llm.Message
- if err := json.Unmarshal([]byte(*msg.LlmData), &llmMsg); err == nil {
- for _, content := range llmMsg.Content {
- if content.Type == llm.ContentTypeText && strings.Contains(content.Text, "delay: 5") {
- foundUserMsg = true
- break
+ // Note: We may receive other updates first (e.g., ConversationListUpdate for slug changes),
+ // so we need to keep checking until we find the user message or timeout.
+ deadline := time.Now().Add(500 * time.Millisecond)
+ foundUserMsg := false
+
+ for time.Now().Before(deadline) && !foundUserMsg {
+ select {
+ case update := <-updates:
+ // Check if this update contains the user message
+ for _, msg := range update.Messages {
+ if msg.Type == string(db.MessageTypeUser) && msg.LlmData != nil {
+ var llmMsg llm.Message
+ if err := json.Unmarshal([]byte(*msg.LlmData), &llmMsg); err == nil {
+ for _, content := range llmMsg.Content {
+ if content.Type == llm.ContentTypeText && strings.Contains(content.Text, "delay: 5") {
+ foundUserMsg = true
+ break
+ }
}
}
}
}
+ case <-time.After(50 * time.Millisecond):
+ // Keep waiting
}
- if !foundUserMsg {
- t.Error("received update but it didn't contain the user message")
- t.Logf("update had %d messages", len(update.Messages))
- } else {
- t.Log("SUCCESS: received user message via subpub immediately")
- }
- case <-time.After(500 * time.Millisecond):
+ }
+
+ if !foundUserMsg {
t.Error("BUG: did not receive subpub update with user message within 500ms")
t.Log("This means notifySubscribers is failing or not being called after user message is recorded")
+ } else {
+ t.Log("SUCCESS: received user message via subpub immediately")
}
}