From a42f6e4afda793450af70fbe07fc79af2e6f008f Mon Sep 17 00:00:00 2001 From: Philip Zeyliger Date: Mon, 12 Jan 2026 20:15:14 -0800 Subject: [PATCH] shelley: fix race condition between message and metadata notifications MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Prompt: This test is flaky. [...] I think the issue is that slug updates aren't messages. I think we send conversation updates (see 3dbcb107dbbb6753c7af811d535b559c794a674b) over the same stream but without message ids. Fix it. Run the test a lot to make sure it's not flaky. notifySubscribers (for metadata like slug changes) was using Publish with sequence IDs, which could race with notifySubscribersNewMessage. If the metadata update ran first, it would consume the sequence slot and the actual message would be skipped. Changed notifySubscribers to use Broadcast instead, which doesn't interfere with message sequence tracking. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- server/server.go | 25 ++++---------------- server/sse_immediacy_test.go | 44 ++++++++++++++++++++---------------- 2 files changed, 30 insertions(+), 39 deletions(-) diff --git a/server/server.go b/server/server.go index ea91cefa55239a162aa9d1df2bb096eef8cc2bf3..b7e32c6e36ef95eaf8836154c085dd3c3f99736a 100644 --- a/server/server.go +++ b/server/server.go @@ -614,6 +614,7 @@ func convertToLLMMessage(msg generated.Message) (llm.Message, error) { // notifySubscribers sends conversation metadata updates (e.g., slug changes) to subscribers. // This is used when only the conversation data changes, not the messages. +// Uses Broadcast instead of Publish to avoid racing with message sequence IDs. func (s *Server) notifySubscribers(ctx context.Context, conversationID string) { s.mu.Lock() manager, exists := s.activeConversations[conversationID] @@ -635,30 +636,14 @@ func (s *Server) notifySubscribers(ctx context.Context, conversationID string) { return } - // For conversation-only updates, we need to get the latest sequence ID - // to properly notify subscribers, but we send an empty message list - var latestSequenceID int64 - err = s.db.Queries(ctx, func(q *generated.Queries) error { - messages, err := q.ListMessages(ctx, conversationID) - if err != nil { - return err - } - if len(messages) > 0 { - latestSequenceID = messages[len(messages)-1].SequenceID - } - return nil - }) - if err != nil { - s.logger.Error("Failed to get latest sequence ID", "conversationID", conversationID, "error", err) - return - } - - // Publish conversation update with no new messages + // Broadcast conversation update with no new messages. + // Using Broadcast instead of Publish ensures this metadata-only update + // doesn't race with notifySubscribersNewMessage which uses Publish with sequence IDs. streamData := StreamResponse{ Messages: nil, // No new messages, just conversation update Conversation: conversation, } - manager.subpub.Publish(latestSequenceID, streamData) + manager.subpub.Broadcast(streamData) // Also notify conversation list subscribers (e.g., slug change) s.publishConversationListUpdate(ConversationListUpdate{ diff --git a/server/sse_immediacy_test.go b/server/sse_immediacy_test.go index ddfdc39846e557a4e7d664988f46fd482666f215..432aa4ec9d67857f5089cc7e8de383cc2b850e2c 100644 --- a/server/sse_immediacy_test.go +++ b/server/sse_immediacy_test.go @@ -379,31 +379,37 @@ func TestSSEUserMessageWithExistingConnection(t *testing.T) { // We should receive an update with the user message within 500ms // (well before the 5 second LLM delay) - select { - case update := <-updates: - // Check that the update contains the user message - foundUserMsg := false - for _, msg := range update.Messages { - if msg.Type == string(db.MessageTypeUser) && msg.LlmData != nil { - var llmMsg llm.Message - if err := json.Unmarshal([]byte(*msg.LlmData), &llmMsg); err == nil { - for _, content := range llmMsg.Content { - if content.Type == llm.ContentTypeText && strings.Contains(content.Text, "delay: 5") { - foundUserMsg = true - break + // Note: We may receive other updates first (e.g., ConversationListUpdate for slug changes), + // so we need to keep checking until we find the user message or timeout. + deadline := time.Now().Add(500 * time.Millisecond) + foundUserMsg := false + + for time.Now().Before(deadline) && !foundUserMsg { + select { + case update := <-updates: + // Check if this update contains the user message + for _, msg := range update.Messages { + if msg.Type == string(db.MessageTypeUser) && msg.LlmData != nil { + var llmMsg llm.Message + if err := json.Unmarshal([]byte(*msg.LlmData), &llmMsg); err == nil { + for _, content := range llmMsg.Content { + if content.Type == llm.ContentTypeText && strings.Contains(content.Text, "delay: 5") { + foundUserMsg = true + break + } } } } } + case <-time.After(50 * time.Millisecond): + // Keep waiting } - if !foundUserMsg { - t.Error("received update but it didn't contain the user message") - t.Logf("update had %d messages", len(update.Messages)) - } else { - t.Log("SUCCESS: received user message via subpub immediately") - } - case <-time.After(500 * time.Millisecond): + } + + if !foundUserMsg { t.Error("BUG: did not receive subpub update with user message within 500ms") t.Log("This means notifySubscribers is failing or not being called after user message is recorded") + } else { + t.Log("SUCCESS: received user message via subpub immediately") } }