diff --git a/server/server.go b/server/server.go index ea91cefa55239a162aa9d1df2bb096eef8cc2bf3..b7e32c6e36ef95eaf8836154c085dd3c3f99736a 100644 --- a/server/server.go +++ b/server/server.go @@ -614,6 +614,7 @@ func convertToLLMMessage(msg generated.Message) (llm.Message, error) { // notifySubscribers sends conversation metadata updates (e.g., slug changes) to subscribers. // This is used when only the conversation data changes, not the messages. +// Uses Broadcast instead of Publish to avoid racing with message sequence IDs. func (s *Server) notifySubscribers(ctx context.Context, conversationID string) { s.mu.Lock() manager, exists := s.activeConversations[conversationID] @@ -635,30 +636,14 @@ func (s *Server) notifySubscribers(ctx context.Context, conversationID string) { return } - // For conversation-only updates, we need to get the latest sequence ID - // to properly notify subscribers, but we send an empty message list - var latestSequenceID int64 - err = s.db.Queries(ctx, func(q *generated.Queries) error { - messages, err := q.ListMessages(ctx, conversationID) - if err != nil { - return err - } - if len(messages) > 0 { - latestSequenceID = messages[len(messages)-1].SequenceID - } - return nil - }) - if err != nil { - s.logger.Error("Failed to get latest sequence ID", "conversationID", conversationID, "error", err) - return - } - - // Publish conversation update with no new messages + // Broadcast conversation update with no new messages. + // Using Broadcast instead of Publish ensures this metadata-only update + // doesn't race with notifySubscribersNewMessage which uses Publish with sequence IDs. streamData := StreamResponse{ Messages: nil, // No new messages, just conversation update Conversation: conversation, } - manager.subpub.Publish(latestSequenceID, streamData) + manager.subpub.Broadcast(streamData) // Also notify conversation list subscribers (e.g., slug change) s.publishConversationListUpdate(ConversationListUpdate{ diff --git a/server/sse_immediacy_test.go b/server/sse_immediacy_test.go index ddfdc39846e557a4e7d664988f46fd482666f215..432aa4ec9d67857f5089cc7e8de383cc2b850e2c 100644 --- a/server/sse_immediacy_test.go +++ b/server/sse_immediacy_test.go @@ -379,31 +379,37 @@ func TestSSEUserMessageWithExistingConnection(t *testing.T) { // We should receive an update with the user message within 500ms // (well before the 5 second LLM delay) - select { - case update := <-updates: - // Check that the update contains the user message - foundUserMsg := false - for _, msg := range update.Messages { - if msg.Type == string(db.MessageTypeUser) && msg.LlmData != nil { - var llmMsg llm.Message - if err := json.Unmarshal([]byte(*msg.LlmData), &llmMsg); err == nil { - for _, content := range llmMsg.Content { - if content.Type == llm.ContentTypeText && strings.Contains(content.Text, "delay: 5") { - foundUserMsg = true - break + // Note: We may receive other updates first (e.g., ConversationListUpdate for slug changes), + // so we need to keep checking until we find the user message or timeout. + deadline := time.Now().Add(500 * time.Millisecond) + foundUserMsg := false + + for time.Now().Before(deadline) && !foundUserMsg { + select { + case update := <-updates: + // Check if this update contains the user message + for _, msg := range update.Messages { + if msg.Type == string(db.MessageTypeUser) && msg.LlmData != nil { + var llmMsg llm.Message + if err := json.Unmarshal([]byte(*msg.LlmData), &llmMsg); err == nil { + for _, content := range llmMsg.Content { + if content.Type == llm.ContentTypeText && strings.Contains(content.Text, "delay: 5") { + foundUserMsg = true + break + } } } } } + case <-time.After(50 * time.Millisecond): + // Keep waiting } - if !foundUserMsg { - t.Error("received update but it didn't contain the user message") - t.Logf("update had %d messages", len(update.Messages)) - } else { - t.Log("SUCCESS: received user message via subpub immediately") - } - case <-time.After(500 * time.Millisecond): + } + + if !foundUserMsg { t.Error("BUG: did not receive subpub update with user message within 500ms") t.Log("This means notifySubscribers is failing or not being called after user message is recorded") + } else { + t.Log("SUCCESS: received user message via subpub immediately") } }