diff --git a/server/conversation_list_stream_test.go b/server/conversation_list_stream_test.go new file mode 100644 index 0000000000000000000000000000000000000000..322c6cf41254e9dc616f96a35063986c5835944a --- /dev/null +++ b/server/conversation_list_stream_test.go @@ -0,0 +1,385 @@ +package server + +import ( + "context" + "encoding/json" + "log/slog" + "net/http" + "net/http/httptest" + "os" + "strings" + "testing" + "time" + + "shelley.exe.dev/claudetool" + "shelley.exe.dev/loop" +) + +// TestConversationStreamReceivesListUpdateForNewConversation tests that when subscribed +// to one conversation's stream, we receive updates about new conversations. +func TestConversationStreamReceivesListUpdateForNewConversation(t *testing.T) { + database, cleanup := setupTestDB(t) + defer cleanup() + + predictableService := loop.NewPredictableService() + llmManager := &testLLMManager{service: predictableService} + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn})) + server := NewServer(database, llmManager, claudetool.ToolSetConfig{}, logger, true, "", "predictable", "", nil) + + // Create a conversation to subscribe to + conversation, err := database.CreateConversation(context.Background(), nil, true, nil) + if err != nil { + t.Fatalf("failed to create conversation: %v", err) + } + + // Get or create conversation manager to ensure the conversation is active + _, err = server.getOrCreateConversationManager(context.Background(), conversation.ConversationID) + if err != nil { + t.Fatalf("failed to get conversation manager: %v", err) + } + + // Start the conversation stream + sseCtx, sseCancel := context.WithCancel(context.Background()) + defer sseCancel() + + sseRecorder := newFlusherRecorder() + sseReq := httptest.NewRequest("GET", "/api/conversation/"+conversation.ConversationID+"/stream", nil) + sseReq = sseReq.WithContext(sseCtx) + + sseStarted := make(chan struct{}) + sseDone := make(chan struct{}) + go func() { + close(sseStarted) + server.handleStreamConversation(sseRecorder, sseReq, conversation.ConversationID) + close(sseDone) + }() + + <-sseStarted + + // Wait for the initial event + select { + case <-sseRecorder.flushed: + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting for initial SSE event") + } + + // Create another conversation via the API + chatReq := ChatRequest{ + Message: "hello", + Model: "predictable", + } + chatBody, _ := json.Marshal(chatReq) + req := httptest.NewRequest("POST", "/api/conversations/new", strings.NewReader(string(chatBody))) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + + server.handleNewConversation(w, req) + if w.Code != http.StatusCreated { + t.Fatalf("expected status 201, got %d: %s", w.Code, w.Body.String()) + } + + var resp struct { + ConversationID string `json:"conversation_id"` + } + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("failed to parse response: %v", err) + } + + // Wait for the conversation list update to come through the existing stream + deadline := time.Now().Add(2 * time.Second) + var receivedUpdate bool + for time.Now().Before(deadline) && !receivedUpdate { + select { + case <-sseRecorder.flushed: + chunks := sseRecorder.getChunks() + for _, chunk := range chunks { + // Check for conversation_list_update with the new conversation ID + if strings.Contains(chunk, "conversation_list_update") && strings.Contains(chunk, resp.ConversationID) { + receivedUpdate = true + break + } + } + case <-time.After(100 * time.Millisecond): + } + } + + if !receivedUpdate { + t.Error("did not receive conversation list update for new conversation") + chunks := sseRecorder.getChunks() + t.Logf("SSE chunks received: %v", chunks) + } + + sseCancel() + <-sseDone +} + +// TestConversationStreamReceivesListUpdateForRename tests that when subscribed +// to one conversation's stream, we receive updates when another conversation is renamed. +func TestConversationStreamReceivesListUpdateForRename(t *testing.T) { + database, cleanup := setupTestDB(t) + defer cleanup() + + predictableService := loop.NewPredictableService() + llmManager := &testLLMManager{service: predictableService} + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn})) + server := NewServer(database, llmManager, claudetool.ToolSetConfig{}, logger, true, "", "predictable", "", nil) + + // Create two conversations + conv1, err := database.CreateConversation(context.Background(), nil, true, nil) + if err != nil { + t.Fatalf("failed to create conversation 1: %v", err) + } + conv2, err := database.CreateConversation(context.Background(), nil, true, nil) + if err != nil { + t.Fatalf("failed to create conversation 2: %v", err) + } + + // Get or create conversation manager for conv1 (the one we'll subscribe to) + _, err = server.getOrCreateConversationManager(context.Background(), conv1.ConversationID) + if err != nil { + t.Fatalf("failed to get conversation manager: %v", err) + } + + // Start the conversation stream for conv1 + sseCtx, sseCancel := context.WithCancel(context.Background()) + defer sseCancel() + + sseRecorder := newFlusherRecorder() + sseReq := httptest.NewRequest("GET", "/api/conversation/"+conv1.ConversationID+"/stream", nil) + sseReq = sseReq.WithContext(sseCtx) + + sseStarted := make(chan struct{}) + sseDone := make(chan struct{}) + go func() { + close(sseStarted) + server.handleStreamConversation(sseRecorder, sseReq, conv1.ConversationID) + close(sseDone) + }() + + <-sseStarted + + // Wait for the initial event + select { + case <-sseRecorder.flushed: + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting for initial SSE event") + } + + // Rename conv2 + renameReq := RenameRequest{Slug: "test-slug-rename"} + renameBody, _ := json.Marshal(renameReq) + req := httptest.NewRequest("POST", "/api/conversation/"+conv2.ConversationID+"/rename", strings.NewReader(string(renameBody))) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + + server.handleRenameConversation(w, req, conv2.ConversationID) + if w.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String()) + } + + // Wait for the conversation list update with the new slug + deadline := time.Now().Add(2 * time.Second) + var receivedUpdate bool + for time.Now().Before(deadline) && !receivedUpdate { + select { + case <-sseRecorder.flushed: + chunks := sseRecorder.getChunks() + for _, chunk := range chunks { + if strings.Contains(chunk, "conversation_list_update") && strings.Contains(chunk, "test-slug-rename") { + receivedUpdate = true + break + } + } + case <-time.After(100 * time.Millisecond): + } + } + + if !receivedUpdate { + t.Error("did not receive conversation list update for slug change") + chunks := sseRecorder.getChunks() + t.Logf("SSE chunks received: %v", chunks) + } + + sseCancel() + <-sseDone +} + +// TestConversationStreamReceivesListUpdateForDelete tests that when subscribed +// to one conversation's stream, we receive updates when another conversation is deleted. +func TestConversationStreamReceivesListUpdateForDelete(t *testing.T) { + database, cleanup := setupTestDB(t) + defer cleanup() + + predictableService := loop.NewPredictableService() + llmManager := &testLLMManager{service: predictableService} + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn})) + server := NewServer(database, llmManager, claudetool.ToolSetConfig{}, logger, true, "", "predictable", "", nil) + + // Create two conversations + conv1, err := database.CreateConversation(context.Background(), nil, true, nil) + if err != nil { + t.Fatalf("failed to create conversation 1: %v", err) + } + conv2, err := database.CreateConversation(context.Background(), nil, true, nil) + if err != nil { + t.Fatalf("failed to create conversation 2: %v", err) + } + + // Get or create conversation manager for conv1 + _, err = server.getOrCreateConversationManager(context.Background(), conv1.ConversationID) + if err != nil { + t.Fatalf("failed to get conversation manager: %v", err) + } + + // Start the conversation stream for conv1 + sseCtx, sseCancel := context.WithCancel(context.Background()) + defer sseCancel() + + sseRecorder := newFlusherRecorder() + sseReq := httptest.NewRequest("GET", "/api/conversation/"+conv1.ConversationID+"/stream", nil) + sseReq = sseReq.WithContext(sseCtx) + + sseStarted := make(chan struct{}) + sseDone := make(chan struct{}) + go func() { + close(sseStarted) + server.handleStreamConversation(sseRecorder, sseReq, conv1.ConversationID) + close(sseDone) + }() + + <-sseStarted + + // Wait for the initial event + select { + case <-sseRecorder.flushed: + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting for initial SSE event") + } + + // Delete conv2 + req := httptest.NewRequest("POST", "/api/conversation/"+conv2.ConversationID+"/delete", nil) + w := httptest.NewRecorder() + + server.handleDeleteConversation(w, req, conv2.ConversationID) + if w.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String()) + } + + // Wait for the delete update + deadline := time.Now().Add(2 * time.Second) + var receivedUpdate bool + for time.Now().Before(deadline) && !receivedUpdate { + select { + case <-sseRecorder.flushed: + chunks := sseRecorder.getChunks() + for _, chunk := range chunks { + if strings.Contains(chunk, "conversation_list_update") && + strings.Contains(chunk, `"type":"delete"`) && + strings.Contains(chunk, conv2.ConversationID) { + receivedUpdate = true + break + } + } + case <-time.After(100 * time.Millisecond): + } + } + + if !receivedUpdate { + t.Error("did not receive conversation list delete update") + chunks := sseRecorder.getChunks() + t.Logf("SSE chunks received: %v", chunks) + } + + sseCancel() + <-sseDone +} + +// TestConversationStreamReceivesListUpdateForArchive tests that when subscribed +// to one conversation's stream, we receive updates when another conversation is archived. +func TestConversationStreamReceivesListUpdateForArchive(t *testing.T) { + database, cleanup := setupTestDB(t) + defer cleanup() + + predictableService := loop.NewPredictableService() + llmManager := &testLLMManager{service: predictableService} + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn})) + server := NewServer(database, llmManager, claudetool.ToolSetConfig{}, logger, true, "", "predictable", "", nil) + + // Create two conversations + conv1, err := database.CreateConversation(context.Background(), nil, true, nil) + if err != nil { + t.Fatalf("failed to create conversation 1: %v", err) + } + conv2, err := database.CreateConversation(context.Background(), nil, true, nil) + if err != nil { + t.Fatalf("failed to create conversation 2: %v", err) + } + + // Get or create conversation manager for conv1 + _, err = server.getOrCreateConversationManager(context.Background(), conv1.ConversationID) + if err != nil { + t.Fatalf("failed to get conversation manager: %v", err) + } + + // Start the conversation stream for conv1 + sseCtx, sseCancel := context.WithCancel(context.Background()) + defer sseCancel() + + sseRecorder := newFlusherRecorder() + sseReq := httptest.NewRequest("GET", "/api/conversation/"+conv1.ConversationID+"/stream", nil) + sseReq = sseReq.WithContext(sseCtx) + + sseStarted := make(chan struct{}) + sseDone := make(chan struct{}) + go func() { + close(sseStarted) + server.handleStreamConversation(sseRecorder, sseReq, conv1.ConversationID) + close(sseDone) + }() + + <-sseStarted + + // Wait for the initial event + select { + case <-sseRecorder.flushed: + case <-time.After(2 * time.Second): + t.Fatal("timeout waiting for initial SSE event") + } + + // Archive conv2 + req := httptest.NewRequest("POST", "/api/conversation/"+conv2.ConversationID+"/archive", nil) + w := httptest.NewRecorder() + + server.handleArchiveConversation(w, req, conv2.ConversationID) + if w.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String()) + } + + // Wait for the archive update + deadline := time.Now().Add(2 * time.Second) + var receivedUpdate bool + for time.Now().Before(deadline) && !receivedUpdate { + select { + case <-sseRecorder.flushed: + chunks := sseRecorder.getChunks() + for _, chunk := range chunks { + if strings.Contains(chunk, "conversation_list_update") && + strings.Contains(chunk, conv2.ConversationID) && + strings.Contains(chunk, `"archived":true`) { + receivedUpdate = true + break + } + } + case <-time.After(100 * time.Millisecond): + } + } + + if !receivedUpdate { + t.Error("did not receive conversation list archive update") + chunks := sseRecorder.getChunks() + t.Logf("SSE chunks received: %v", chunks) + } + + sseCancel() + <-sseDone +} diff --git a/server/handlers.go b/server/handlers.go index 0367add6da16b8bc854492e357bdefb4a8ea7f2b..dc3cd7e525f4a135cc3b69a857e2ac02862fd132 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -731,6 +731,12 @@ func (s *Server) handleNewConversation(w http.ResponseWriter, r *http.Request) { } conversationID := conversation.ConversationID + // Notify conversation list subscribers about the new conversation + go s.publishConversationListUpdate(ConversationListUpdate{ + Type: "update", + Conversation: conversation, + }) + // Get or create conversation manager manager, err := s.getOrCreateConversationManager(ctx, conversationID) if err != nil { @@ -1104,6 +1110,12 @@ func (s *Server) handleArchiveConversation(w http.ResponseWriter, r *http.Reques return } + // Notify conversation list subscribers + go s.publishConversationListUpdate(ConversationListUpdate{ + Type: "update", + Conversation: conversation, + }) + w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(conversation) } @@ -1123,6 +1135,12 @@ func (s *Server) handleUnarchiveConversation(w http.ResponseWriter, r *http.Requ return } + // Notify conversation list subscribers + go s.publishConversationListUpdate(ConversationListUpdate{ + Type: "update", + Conversation: conversation, + }) + w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(conversation) } @@ -1141,6 +1159,12 @@ func (s *Server) handleDeleteConversation(w http.ResponseWriter, r *http.Request return } + // Notify conversation list subscribers about the deletion + go s.publishConversationListUpdate(ConversationListUpdate{ + Type: "delete", + ConversationID: conversationID, + }) + w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]string{"status": "deleted"}) } @@ -1208,6 +1232,12 @@ func (s *Server) handleRenameConversation(w http.ResponseWriter, r *http.Request return } + // Notify conversation list subscribers + go s.publishConversationListUpdate(ConversationListUpdate{ + Type: "update", + Conversation: conversation, + }) + w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(conversation) } diff --git a/server/server.go b/server/server.go index 3ddce141a1e8794c917e5c1052042a69a12b886f..ea91cefa55239a162aa9d1df2bb096eef8cc2bf3 100644 --- a/server/server.go +++ b/server/server.go @@ -47,6 +47,8 @@ type StreamResponse struct { Conversation generated.Conversation `json:"conversation"` AgentWorking bool `json:"agent_working"` ContextWindowSize uint64 `json:"context_window_size,omitempty"` + // ConversationListUpdate is set when another conversation in the list changed + ConversationListUpdate *ConversationListUpdate `json:"conversation_list_update,omitempty"` } // LLMProvider is an interface for getting LLM services @@ -234,6 +236,13 @@ func calculateContextWindowSizeFromMsg(msg *generated.Message) uint64 { return usage.ContextWindowUsed() } +// ConversationListUpdate represents an update to the conversation list +type ConversationListUpdate struct { + Type string `json:"type"` // "update", "delete" + Conversation *generated.Conversation `json:"conversation,omitempty"` + ConversationID string `json:"conversation_id,omitempty"` // For deletes +} + // Server manages the HTTP API and active conversations type Server struct { db *db.DB @@ -650,6 +659,12 @@ func (s *Server) notifySubscribers(ctx context.Context, conversationID string) { Conversation: conversation, } manager.subpub.Publish(latestSequenceID, streamData) + + // Also notify conversation list subscribers (e.g., slug change) + s.publishConversationListUpdate(ConversationListUpdate{ + Type: "update", + Conversation: &conversation, + }) } // notifySubscribersNewMessage sends a single new message to all subscribers. @@ -689,6 +704,28 @@ func (s *Server) notifySubscribersNewMessage(ctx context.Context, conversationID ContextWindowSize: calculateContextWindowSizeFromMsg(newMsg), } manager.subpub.Publish(newMsg.SequenceID, streamData) + + // Also notify conversation list subscribers about the update (updated_at changed) + s.publishConversationListUpdate(ConversationListUpdate{ + Type: "update", + Conversation: &conversation, + }) +} + +// publishConversationListUpdate broadcasts a conversation list update to ALL active +// conversation streams. This allows clients to receive updates about other conversations +// while they're subscribed to their current conversation's stream. +func (s *Server) publishConversationListUpdate(update ConversationListUpdate) { + s.mu.Lock() + defer s.mu.Unlock() + + // Broadcast to all active conversation managers + for _, manager := range s.activeConversations { + streamData := StreamResponse{ + ConversationListUpdate: &update, + } + manager.subpub.Broadcast(streamData) + } } // Cleanup removes inactive conversation managers diff --git a/server/sse_immediacy_test.go b/server/sse_immediacy_test.go index f94025cf1969f4c8f029ed068b2854e26dd713c0..ddfdc39846e557a4e7d664988f46fd482666f215 100644 --- a/server/sse_immediacy_test.go +++ b/server/sse_immediacy_test.go @@ -20,7 +20,7 @@ import ( ) // flusherRecorder wraps httptest.ResponseRecorder to implement http.Flusher -// and provide immediate access to written data +// and provide immediate access to written data in a thread-safe manner type flusherRecorder struct { *httptest.ResponseRecorder mu sync.Mutex @@ -35,6 +35,13 @@ func newFlusherRecorder() *flusherRecorder { } } +// Write overrides ResponseRecorder.Write to provide thread-safe access +func (f *flusherRecorder) Write(p []byte) (int, error) { + f.mu.Lock() + defer f.mu.Unlock() + return f.ResponseRecorder.Write(p) +} + func (f *flusherRecorder) Flush() { f.mu.Lock() body := f.Body.String() @@ -55,6 +62,13 @@ func (f *flusherRecorder) getChunks() []string { return result } +// getString returns the current body contents in a thread-safe manner +func (f *flusherRecorder) getString() string { + f.mu.Lock() + defer f.mu.Unlock() + return f.Body.String() +} + // TestSSEUserMessageAppearsImmediately tests that when a user sends a message, // the message appears in the SSE stream immediately, before the LLM responds. func TestSSEUserMessageAppearsImmediately(t *testing.T) { @@ -126,13 +140,13 @@ func TestSSEUserMessageAppearsImmediately(t *testing.T) { select { case <-sseRecorder.flushed: // Check if user message is now in the stream - body := sseRecorder.Body.String() + body := sseRecorder.getString() if containsUserMessage(body, "delay: 3") { userMessageFound = true } case <-time.After(50 * time.Millisecond): // Also check current body - body := sseRecorder.Body.String() + body := sseRecorder.getString() if containsUserMessage(body, "delay: 3") { userMessageFound = true } @@ -145,7 +159,7 @@ func TestSSEUserMessageAppearsImmediately(t *testing.T) { if !userMessageFound { t.Errorf("BUG: user message did not appear in SSE stream within 500ms (LLM has 3s delay)") t.Log("This likely means notifySubscribers is not being called immediately after recording the user message") - t.Logf("SSE body so far: %s", sseRecorder.Body.String()) + t.Logf("SSE body so far: %s", sseRecorder.getString()) } else { t.Log("SUCCESS: user message appeared in SSE stream immediately") } diff --git a/subpub/subpub.go b/subpub/subpub.go index 91ff2562c4c30644eeb6e9e4327daddcde98f628..96927eda2a51714effb001a93414b917e1abf3f9 100644 --- a/subpub/subpub.go +++ b/subpub/subpub.go @@ -106,3 +106,30 @@ func (sp *SubPub[K]) Publish(idx int64, message K) { } sp.subscribers = remaining } + +// Broadcast sends a message to ALL subscribers regardless of their current index. +// This is used for out-of-band notifications like conversation list updates. +func (sp *SubPub[K]) Broadcast(message K) { + sp.mu.Lock() + defer sp.mu.Unlock() + + remaining := sp.subscribers[:0] + for _, sub := range sp.subscribers { + select { + case <-sub.ctx.Done(): + close(sub.ch) + continue + default: + } + + select { + case sub.ch <- message: + remaining = append(remaining, sub) + default: + // Channel full, disconnect + close(sub.ch) + sub.cancel() + } + } + sp.subscribers = remaining +} diff --git a/ui/src/App.tsx b/ui/src/App.tsx index 53519b1b83181435dfab3268f223886d2ce7bc9c..610ace62b7e8ba3b0aca046588e864d31b510440 100644 --- a/ui/src/App.tsx +++ b/ui/src/App.tsx @@ -1,7 +1,7 @@ import React, { useState, useEffect, useCallback, useRef } from "react"; import ChatInterface from "./components/ChatInterface"; import ConversationDrawer from "./components/ConversationDrawer"; -import { Conversation } from "./types"; +import { Conversation, ConversationListUpdate } from "./types"; import { api } from "./services/api"; // Check if a slug is a generated ID (format: cXXXX where X is alphanumeric) @@ -102,6 +102,39 @@ function App() { loadConversations(); }, []); + // Handle conversation list updates from the message stream + const handleConversationListUpdate = useCallback((update: ConversationListUpdate) => { + if (update.type === "update" && update.conversation) { + setConversations((prev) => { + // Check if this conversation already exists + const existingIndex = prev.findIndex( + (c) => c.conversation_id === update.conversation!.conversation_id, + ); + + if (existingIndex >= 0) { + // Update existing conversation + const updated = [...prev]; + updated[existingIndex] = update.conversation!; + // Re-sort by updated_at descending + updated.sort( + (a, b) => new Date(b.updated_at).getTime() - new Date(a.updated_at).getTime(), + ); + return updated; + } else { + // Add new conversation at the appropriate position + const updated = [update.conversation!, ...prev]; + // Sort by updated_at descending + updated.sort( + (a, b) => new Date(b.updated_at).getTime() - new Date(a.updated_at).getTime(), + ); + return updated; + } + }); + } else if (update.type === "delete" && update.conversation_id) { + setConversations((prev) => prev.filter((c) => c.conversation_id !== update.conversation_id)); + } + }, []); + // Update page title and URL when conversation changes useEffect(() => { const currentConv = conversations.find( @@ -254,6 +287,7 @@ function App() { onNewConversation={startNewConversation} currentConversation={currentConversation} onConversationUpdate={updateConversation} + onConversationListUpdate={handleConversationListUpdate} onFirstMessage={handleFirstMessage} mostRecentCwd={mostRecentCwd} isDrawerCollapsed={drawerCollapsed} diff --git a/ui/src/components/ChatInterface.tsx b/ui/src/components/ChatInterface.tsx index 1f60c0d2c53afa3cf04a3087fc111816532aba12..5a2dba3d6f2ea502f3fbc8eba49c6972c39f1c28 100644 --- a/ui/src/components/ChatInterface.tsx +++ b/ui/src/components/ChatInterface.tsx @@ -1,5 +1,11 @@ import React, { useState, useEffect, useRef } from "react"; -import { Message, Conversation, StreamResponse, LLMContent } from "../types"; +import { + Message, + Conversation, + StreamResponse, + LLMContent, + ConversationListUpdate, +} from "../types"; import { api } from "../services/api"; import { ThemeMode, getStoredTheme, setStoredTheme, applyTheme } from "../services/theme"; import MessageComponent from "./Message"; @@ -352,6 +358,7 @@ interface ChatInterfaceProps { onNewConversation: () => void; currentConversation?: Conversation; onConversationUpdate?: (conversation: Conversation) => void; + onConversationListUpdate?: (update: ConversationListUpdate) => void; onFirstMessage?: (message: string, model: string, cwd?: string) => Promise; mostRecentCwd?: string | null; isDrawerCollapsed?: boolean; @@ -364,6 +371,7 @@ function ChatInterface({ onNewConversation, currentConversation, onConversationUpdate, + onConversationListUpdate, onFirstMessage, mostRecentCwd, isDrawerCollapsed, @@ -613,10 +621,15 @@ function ChatInterface({ } // Update conversation data if provided - if (onConversationUpdate) { + if (onConversationUpdate && streamResponse.conversation) { onConversationUpdate(streamResponse.conversation); } + // Handle conversation list updates (for other conversations) + if (onConversationListUpdate && streamResponse.conversation_list_update) { + onConversationListUpdate(streamResponse.conversation_list_update); + } + if (typeof streamResponse.agent_working === "boolean") { setAgentWorking(streamResponse.agent_working); } diff --git a/ui/src/types.ts b/ui/src/types.ts index 739bd41cb45685be617d7a60f44eb122918314f6..5585cc322dfa735bc35aae0ba9a7a018acd121e2 100644 --- a/ui/src/types.ts +++ b/ui/src/types.ts @@ -60,6 +60,7 @@ export interface ChatRequest { export interface StreamResponse extends Omit { messages: Message[]; context_window_size?: number; + conversation_list_update?: ConversationListUpdate; } // Link represents a custom link that can be added to the UI @@ -123,3 +124,10 @@ export interface DiffComment { filePath: string; diffId: string; } + +// Conversation list streaming update +export interface ConversationListUpdate { + type: "update" | "delete"; + conversation?: Conversation; + conversation_id?: string; // For deletes +}