1package server
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "net"
9 "net/http"
10 "os"
11 "os/exec"
12 "os/signal"
13 "path/filepath"
14 "strings"
15 "sync"
16 "syscall"
17 "time"
18
19 "tailscale.com/util/singleflight"
20
21 "shelley.exe.dev/claudetool"
22 "shelley.exe.dev/db"
23 "shelley.exe.dev/db/generated"
24 "shelley.exe.dev/llm"
25 "shelley.exe.dev/models"
26 "shelley.exe.dev/ui"
27)
28
// APIMessage is the message format sent to clients.
// It is built from a stored generated.Message by toAPIMessages, which copies
// the stored fields through and adds the derived EndOfTurn flag.
// TODO: We could maybe omit llm_data when display_data is available
type APIMessage struct {
	MessageID      string `json:"message_id"`
	ConversationID string `json:"conversation_id"`
	SequenceID     int64  `json:"sequence_id"`
	Type           string `json:"type"`
	// LlmData is the JSON-encoded llm.Message, when the stored message has one.
	LlmData  *string `json:"llm_data,omitempty"`
	UserData *string `json:"user_data,omitempty"`
	// UsageData is the JSON-encoded llm.Usage, when the stored message has one.
	UsageData   *string   `json:"usage_data,omitempty"`
	CreatedAt   time.Time `json:"created_at"`
	DisplayData *string   `json:"display_data,omitempty"`
	// EndOfTurn is only populated for agent messages whose LlmData could be
	// decoded; it is nil (and omitted from JSON) otherwise.
	EndOfTurn *bool `json:"end_of_turn,omitempty"`
}
43
// ConversationState represents the current state of a conversation.
// This is broadcast to all subscribers whenever the state changes.
type ConversationState struct {
	// ConversationID identifies the conversation this state belongs to.
	ConversationID string `json:"conversation_id"`
	// Working is true while the agent is working on the conversation.
	Working bool `json:"working"`
	// Model is omitted from JSON when empty.
	Model string `json:"model,omitempty"`
}
51
// ConversationWithState combines a conversation with its working state.
// The conversation is embedded, so its fields are serialized at the top level
// of the JSON object alongside "working".
type ConversationWithState struct {
	generated.Conversation
	Working bool `json:"working"`
}
57
// StreamResponse represents the response format for conversation streaming.
// A single value may carry new messages, a conversation metadata update, a
// state update, a conversation-list update, or just a heartbeat; fields that
// do not apply are left at their zero value.
type StreamResponse struct {
	// Messages holds the new messages for this update; nil for metadata-only updates.
	Messages     []APIMessage           `json:"messages"`
	Conversation generated.Conversation `json:"conversation"`
	// ConversationState is set when a conversation's working state is being broadcast.
	ConversationState *ConversationState `json:"conversation_state,omitempty"`
	// ContextWindowSize is omitted from JSON when 0, which lets the client
	// keep its previously known value.
	ContextWindowSize uint64 `json:"context_window_size,omitempty"`
	// ConversationListUpdate is set when another conversation in the list changed
	ConversationListUpdate *ConversationListUpdate `json:"conversation_list_update,omitempty"`
	// Heartbeat indicates this is a heartbeat message (no new data, just keeping connection alive)
	Heartbeat bool `json:"heartbeat,omitempty"`
}
69
// LLMProvider is an interface for getting LLM services.
// NewLLMServiceManager returns the manager produced by models.NewManager as
// an LLMProvider.
type LLMProvider interface {
	// GetService returns the LLM service for modelID, or an error.
	GetService(modelID string) (llm.Service, error)
	// GetAvailableModels returns the IDs of the models that can be used.
	GetAvailableModels() []string
	// HasModel reports whether modelID is known to the provider.
	HasModel(modelID string) bool
	// GetModelInfo returns metadata for modelID.
	// NOTE(review): presumably nil for unknown models — confirm against the implementation.
	GetModelInfo(modelID string) *models.ModelInfo
	// RefreshCustomModels reloads custom model definitions.
	// NOTE(review): source of the definitions is not visible here — confirm.
	RefreshCustomModels() error
}
78
79// NewLLMServiceManager creates a new LLM service manager from config
80func NewLLMServiceManager(cfg *LLMConfig) LLMProvider {
81 // Convert LLMConfig to models.Config
82 modelConfig := &models.Config{
83 AnthropicAPIKey: cfg.AnthropicAPIKey,
84 OpenAIAPIKey: cfg.OpenAIAPIKey,
85 GeminiAPIKey: cfg.GeminiAPIKey,
86 FireworksAPIKey: cfg.FireworksAPIKey,
87 Gateway: cfg.Gateway,
88 Logger: cfg.Logger,
89 DB: cfg.DB,
90 }
91
92 manager, err := models.NewManager(modelConfig)
93 if err != nil {
94 // This shouldn't happen in practice, but handle it gracefully
95 cfg.Logger.Error("Failed to create models manager", "error", err)
96 }
97
98 return manager
99}
100
101// toAPIMessages converts database messages to API messages.
102// When display_data is present (tool results), llm_data is omitted to save bandwidth
103// since the display_data contains all information needed for UI rendering.
104func toAPIMessages(messages []generated.Message) []APIMessage {
105 apiMessages := make([]APIMessage, len(messages))
106 for i, msg := range messages {
107 var endOfTurnPtr *bool
108 if msg.LlmData != nil && msg.Type == string(db.MessageTypeAgent) {
109 if endOfTurn, ok := extractEndOfTurn(*msg.LlmData); ok {
110 endOfTurnCopy := endOfTurn
111 endOfTurnPtr = &endOfTurnCopy
112 }
113 }
114
115 // TODO: Consider omitting llm_data when display_data is present to save bandwidth.
116 // The display_data contains all info needed for UI rendering of tool results,
117 // but the UI currently still uses llm_data for some checks.
118
119 apiMsg := APIMessage{
120 MessageID: msg.MessageID,
121 ConversationID: msg.ConversationID,
122 SequenceID: msg.SequenceID,
123 Type: msg.Type,
124 LlmData: msg.LlmData,
125 UserData: msg.UserData,
126 UsageData: msg.UsageData,
127 CreatedAt: msg.CreatedAt,
128 DisplayData: msg.DisplayData,
129 EndOfTurn: endOfTurnPtr,
130 }
131 apiMessages[i] = apiMsg
132 }
133 return apiMessages
134}
135
136func extractEndOfTurn(raw string) (bool, bool) {
137 var message llm.Message
138 if err := json.Unmarshal([]byte(raw), &message); err != nil {
139 return false, false
140 }
141 return message.EndOfTurn, true
142}
143
144// calculateContextWindowSize returns the context window usage from the most recent message with non-zero usage.
145// Each API call's input tokens represent the full conversation history sent to the model,
146// so we only need the last message's tokens (not accumulated across all messages).
147// The total input includes regular input tokens plus cached tokens (both read and created).
148// Messages without usage data (user messages, tool messages, etc.) are skipped.
149func calculateContextWindowSize(messages []APIMessage) uint64 {
150 // Find the last message with non-zero usage data
151 for i := len(messages) - 1; i >= 0; i-- {
152 msg := messages[i]
153 if msg.UsageData == nil {
154 continue
155 }
156 var usage llm.Usage
157 if err := json.Unmarshal([]byte(*msg.UsageData), &usage); err != nil {
158 continue
159 }
160 ctxUsed := usage.ContextWindowUsed()
161 if ctxUsed == 0 {
162 continue
163 }
164 // Return total context window used: all input tokens + output tokens
165 // This represents the full context that would be sent for the next turn
166 return ctxUsed
167 }
168 return 0
169}
170
171// isAgentEndOfTurn checks if a message is an agent or error message with end_of_turn=true.
172// This indicates the agent loop has finished processing.
173func isAgentEndOfTurn(msg *generated.Message) bool {
174 if msg == nil {
175 return false
176 }
177 // Agent and error messages can have end_of_turn
178 if msg.Type != string(db.MessageTypeAgent) && msg.Type != string(db.MessageTypeError) {
179 return false
180 }
181 if msg.LlmData == nil {
182 return false
183 }
184 endOfTurn, ok := extractEndOfTurn(*msg.LlmData)
185 if !ok {
186 return false
187 }
188 return endOfTurn
189}
190
191// calculateContextWindowSizeFromMsg calculates context window usage from a single message.
192// Returns 0 if the message has no usage data (e.g., user messages), in which case
193// the client should keep its previous context window value.
194func calculateContextWindowSizeFromMsg(msg *generated.Message) uint64 {
195 if msg == nil || msg.UsageData == nil {
196 return 0
197 }
198 var usage llm.Usage
199 if err := json.Unmarshal([]byte(*msg.UsageData), &usage); err != nil {
200 return 0
201 }
202 return usage.ContextWindowUsed()
203}
204
// ConversationListUpdate represents an update to the conversation list.
// It is delivered to clients inside StreamResponse.ConversationListUpdate.
type ConversationListUpdate struct {
	// Type is the kind of change: "update" or "delete".
	Type string `json:"type"` // "update", "delete"
	// Conversation carries the conversation's current data; the senders in
	// this file set it for "update".
	Conversation *generated.Conversation `json:"conversation,omitempty"`
	// ConversationID identifies the removed conversation.
	ConversationID string `json:"conversation_id,omitempty"` // For deletes
}
211
// Server manages the HTTP API and active conversations.
// It contains a sync.Mutex and must not be copied; use it through the
// pointer returned by NewServer.
type Server struct {
	db         *db.DB
	llmManager LLMProvider
	// toolSetConfig is the caller-supplied tool configuration; NewServer
	// fills in its SubagentRunner and SubagentDB fields.
	toolSetConfig claudetool.ToolSetConfig
	// activeConversations maps conversation ID to its live manager.
	// All reads and writes are performed while holding mu.
	activeConversations map[string]*ConversationManager
	// mu guards activeConversations.
	mu              sync.Mutex
	logger          *slog.Logger
	predictableOnly bool
	terminalURL     string
	defaultModel    string
	links           []Link
	// requireHeader, when non-empty, makes StartWithListener wrap the
	// handler chain in RequireHeaderMiddleware.
	requireHeader string
	// conversationGroup keys manager lookup/creation by conversation ID
	// (see getOrCreateConversationManager).
	conversationGroup singleflight.Group[string, *ConversationManager]
	versionChecker    *VersionChecker
}
228
229// NewServer creates a new server instance
230func NewServer(database *db.DB, llmManager LLMProvider, toolSetConfig claudetool.ToolSetConfig, logger *slog.Logger, predictableOnly bool, terminalURL, defaultModel, requireHeader string, links []Link) *Server {
231 s := &Server{
232 db: database,
233 llmManager: llmManager,
234 toolSetConfig: toolSetConfig,
235 activeConversations: make(map[string]*ConversationManager),
236 logger: logger,
237 predictableOnly: predictableOnly,
238 terminalURL: terminalURL,
239 defaultModel: defaultModel,
240 requireHeader: requireHeader,
241 links: links,
242 versionChecker: NewVersionChecker(),
243 }
244
245 // Set up subagent support
246 s.toolSetConfig.SubagentRunner = NewSubagentRunner(s)
247 s.toolSetConfig.SubagentDB = &db.SubagentDBAdapter{DB: database}
248
249 return s
250}
251
252// RegisterRoutes registers HTTP routes on the given mux
253func (s *Server) RegisterRoutes(mux *http.ServeMux) {
254 // API routes - wrap with gzip where beneficial
255 mux.Handle("/api/conversations", gzipHandler(http.HandlerFunc(s.handleConversations)))
256 mux.Handle("/api/conversations/archived", gzipHandler(http.HandlerFunc(s.handleArchivedConversations)))
257 mux.Handle("/api/conversations/new", http.HandlerFunc(s.handleNewConversation)) // Small response
258 mux.Handle("/api/conversations/continue", http.HandlerFunc(s.handleContinueConversation)) // Small response
259 mux.Handle("/api/conversation/", http.StripPrefix("/api/conversation", s.conversationMux()))
260 mux.Handle("/api/conversation-by-slug/", gzipHandler(http.HandlerFunc(s.handleConversationBySlug)))
261 mux.Handle("/api/validate-cwd", http.HandlerFunc(s.handleValidateCwd)) // Small response
262 mux.Handle("/api/list-directory", gzipHandler(http.HandlerFunc(s.handleListDirectory)))
263 mux.Handle("/api/create-directory", http.HandlerFunc(s.handleCreateDirectory))
264 mux.Handle("/api/git/diffs", gzipHandler(http.HandlerFunc(s.handleGitDiffs)))
265 mux.Handle("/api/git/diffs/", gzipHandler(http.HandlerFunc(s.handleGitDiffFiles)))
266 mux.Handle("/api/git/file-diff/", gzipHandler(http.HandlerFunc(s.handleGitFileDiff)))
267 mux.HandleFunc("/api/upload", s.handleUpload) // Binary uploads
268 mux.HandleFunc("/api/read", s.handleRead) // Serves images
269 mux.Handle("/api/write-file", http.HandlerFunc(s.handleWriteFile)) // Small response
270 mux.HandleFunc("/api/exec-ws", s.handleExecWS) // Websocket for shell commands
271
272 // Custom models API
273 mux.Handle("/api/custom-models", http.HandlerFunc(s.handleCustomModels))
274 mux.Handle("/api/custom-models/", http.HandlerFunc(s.handleCustomModel))
275 mux.Handle("/api/custom-models-test", http.HandlerFunc(s.handleTestModel))
276
277 // Models API (dynamic list refresh)
278 mux.Handle("/api/models", http.HandlerFunc(s.handleModels))
279
280 // Version endpoints
281 mux.Handle("GET /version", http.HandlerFunc(s.handleVersion))
282 mux.Handle("GET /version-check", http.HandlerFunc(s.handleVersionCheck))
283 mux.Handle("GET /version-changelog", http.HandlerFunc(s.handleVersionChangelog))
284 mux.Handle("POST /upgrade", http.HandlerFunc(s.handleUpgrade))
285 mux.Handle("POST /exit", http.HandlerFunc(s.handleExit))
286
287 // Debug endpoints
288 mux.Handle("GET /debug/conversations", http.HandlerFunc(s.handleDebugConversationsPage))
289 mux.Handle("GET /debug/llm_requests", http.HandlerFunc(s.handleDebugLLMRequests))
290 mux.Handle("GET /debug/llm_requests/api", http.HandlerFunc(s.handleDebugLLMRequestsAPI))
291 mux.Handle("GET /debug/llm_requests/{id}/request", http.HandlerFunc(s.handleDebugLLMRequestBody))
292 mux.Handle("GET /debug/llm_requests/{id}/request_full", http.HandlerFunc(s.handleDebugLLMRequestBodyFull))
293 mux.Handle("GET /debug/llm_requests/{id}/response", http.HandlerFunc(s.handleDebugLLMResponseBody))
294
295 // Serve embedded UI assets
296 mux.Handle("/", s.staticHandler(ui.Assets()))
297}
298
299// handleValidateCwd validates that a path exists and is a directory
300func (s *Server) handleValidateCwd(w http.ResponseWriter, r *http.Request) {
301 if r.Method != http.MethodGet {
302 http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
303 return
304 }
305
306 path := r.URL.Query().Get("path")
307 if path == "" {
308 w.Header().Set("Content-Type", "application/json")
309 json.NewEncoder(w).Encode(map[string]interface{}{
310 "valid": false,
311 "error": "path is required",
312 })
313 return
314 }
315
316 info, err := os.Stat(path)
317 if err != nil {
318 w.Header().Set("Content-Type", "application/json")
319 if os.IsNotExist(err) {
320 json.NewEncoder(w).Encode(map[string]interface{}{
321 "valid": false,
322 "error": "directory does not exist",
323 })
324 } else {
325 json.NewEncoder(w).Encode(map[string]interface{}{
326 "valid": false,
327 "error": err.Error(),
328 })
329 }
330 return
331 }
332
333 if !info.IsDir() {
334 w.Header().Set("Content-Type", "application/json")
335 json.NewEncoder(w).Encode(map[string]interface{}{
336 "valid": false,
337 "error": "path is not a directory",
338 })
339 return
340 }
341
342 w.Header().Set("Content-Type", "application/json")
343 json.NewEncoder(w).Encode(map[string]interface{}{
344 "valid": true,
345 })
346}
347
// DirectoryEntry represents a single directory entry for the directory picker
type DirectoryEntry struct {
	// Name is the entry's base name (not a full path).
	Name string `json:"name"`
	// IsDir is always true for entries produced by handleListDirectory,
	// which only lists directories.
	IsDir bool `json:"is_dir"`
	// GitHeadSubject is the subject line of the HEAD commit when the entry
	// is a git repository root; empty (and omitted) otherwise.
	GitHeadSubject string `json:"git_head_subject,omitempty"`
}
354
// ListDirectoryResponse is the response from the list-directory endpoint
type ListDirectoryResponse struct {
	// Path is the cleaned path of the directory that was listed.
	Path string `json:"path"`
	// Parent is the parent directory of Path, or "" when Path is the root.
	Parent string `json:"parent"`
	// Entries lists the subdirectories of Path.
	Entries []DirectoryEntry `json:"entries"`
	// GitHeadSubject is the subject line of the HEAD commit when Path itself
	// is a git repository root; empty (and omitted) otherwise.
	GitHeadSubject string `json:"git_head_subject,omitempty"`
}
362
363// handleListDirectory lists the contents of a directory for the directory picker
364func (s *Server) handleListDirectory(w http.ResponseWriter, r *http.Request) {
365 if r.Method != http.MethodGet {
366 http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
367 return
368 }
369
370 path := r.URL.Query().Get("path")
371 if path == "" {
372 // Default to home directory or root
373 homeDir, err := os.UserHomeDir()
374 if err != nil {
375 path = "/"
376 } else {
377 path = homeDir
378 }
379 }
380
381 // Clean and resolve the path
382 path = filepath.Clean(path)
383
384 // Verify path exists and is a directory
385 info, err := os.Stat(path)
386 if err != nil {
387 w.Header().Set("Content-Type", "application/json")
388 if os.IsNotExist(err) {
389 json.NewEncoder(w).Encode(map[string]interface{}{
390 "error": "directory does not exist",
391 })
392 } else if os.IsPermission(err) {
393 json.NewEncoder(w).Encode(map[string]interface{}{
394 "error": "permission denied",
395 })
396 } else {
397 json.NewEncoder(w).Encode(map[string]interface{}{
398 "error": err.Error(),
399 })
400 }
401 return
402 }
403
404 if !info.IsDir() {
405 w.Header().Set("Content-Type", "application/json")
406 json.NewEncoder(w).Encode(map[string]interface{}{
407 "error": "path is not a directory",
408 })
409 return
410 }
411
412 // Read directory contents
413 dirEntries, err := os.ReadDir(path)
414 if err != nil {
415 w.Header().Set("Content-Type", "application/json")
416 if os.IsPermission(err) {
417 json.NewEncoder(w).Encode(map[string]interface{}{
418 "error": "permission denied",
419 })
420 } else {
421 json.NewEncoder(w).Encode(map[string]interface{}{
422 "error": err.Error(),
423 })
424 }
425 return
426 }
427
428 // Build response with only directories (for directory picker)
429 var entries []DirectoryEntry
430 for _, entry := range dirEntries {
431 // Only include directories
432 if entry.IsDir() {
433 dirEntry := DirectoryEntry{
434 Name: entry.Name(),
435 IsDir: true,
436 }
437
438 // Check if this is a git repo root and get HEAD commit subject
439 entryPath := filepath.Join(path, entry.Name())
440 if isGitRepo(entryPath) {
441 if subject := getGitHeadSubject(entryPath); subject != "" {
442 dirEntry.GitHeadSubject = subject
443 }
444 }
445
446 entries = append(entries, dirEntry)
447 }
448 }
449
450 // Calculate parent directory
451 parent := filepath.Dir(path)
452 if parent == path {
453 // At root, no parent
454 parent = ""
455 }
456
457 response := ListDirectoryResponse{
458 Path: path,
459 Parent: parent,
460 Entries: entries,
461 }
462
463 // Check if the current directory itself is a git repo
464 if isGitRepo(path) {
465 response.GitHeadSubject = getGitHeadSubject(path)
466 }
467
468 w.Header().Set("Content-Type", "application/json")
469 json.NewEncoder(w).Encode(response)
470}
471
// isGitRepo checks if the given path is a git repository root.
// Returns true for both regular repos (.git directory) and worktrees (.git file with gitdir:).
//
// Any failure to inspect dirPath/.git (missing, unreadable, ...) yields false.
func isGitRepo(dirPath string) bool {
	gitPath := filepath.Join(dirPath, ".git")
	fi, err := os.Stat(gitPath)
	if err != nil {
		return false
	}

	switch {
	case fi.IsDir():
		// regular .git directory
		return true
	case fi.Mode().IsRegular():
		// A worktree has a .git file whose content starts with "gitdir:".
		content, err := os.ReadFile(gitPath)
		return err == nil && strings.HasPrefix(string(content), "gitdir:")
	default:
		return false
	}
}
494
// getGitHeadSubject runs `git log -1 --format=%s` inside repoPath and returns
// the whitespace-trimmed output, i.e. the subject line of the HEAD commit.
// If the command cannot be run or exits with an error, it returns "".
func getGitHeadSubject(repoPath string) string {
	gitLog := exec.Command("git", "log", "-1", "--format=%s")
	gitLog.Dir = repoPath

	if out, err := gitLog.Output(); err == nil {
		return strings.TrimSpace(string(out))
	}
	return ""
}
506
507// handleCreateDirectory creates a new directory
508func (s *Server) handleCreateDirectory(w http.ResponseWriter, r *http.Request) {
509 if r.Method != http.MethodPost {
510 http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
511 return
512 }
513
514 var req struct {
515 Path string `json:"path"`
516 }
517 if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
518 w.Header().Set("Content-Type", "application/json")
519 json.NewEncoder(w).Encode(map[string]interface{}{
520 "error": "invalid request body",
521 })
522 return
523 }
524
525 if req.Path == "" {
526 w.Header().Set("Content-Type", "application/json")
527 json.NewEncoder(w).Encode(map[string]interface{}{
528 "error": "path is required",
529 })
530 return
531 }
532
533 // Clean the path
534 path := filepath.Clean(req.Path)
535
536 // Check if path already exists
537 if _, err := os.Stat(path); err == nil {
538 w.Header().Set("Content-Type", "application/json")
539 json.NewEncoder(w).Encode(map[string]interface{}{
540 "error": "path already exists",
541 })
542 return
543 }
544
545 // Verify parent directory exists
546 parentDir := filepath.Dir(path)
547 if _, err := os.Stat(parentDir); os.IsNotExist(err) {
548 w.Header().Set("Content-Type", "application/json")
549 json.NewEncoder(w).Encode(map[string]interface{}{
550 "error": "parent directory does not exist",
551 })
552 return
553 }
554
555 // Create the directory (only the final directory, not parents)
556 if err := os.Mkdir(path, 0o755); err != nil {
557 w.Header().Set("Content-Type", "application/json")
558 if os.IsPermission(err) {
559 json.NewEncoder(w).Encode(map[string]interface{}{
560 "error": "permission denied",
561 })
562 } else {
563 json.NewEncoder(w).Encode(map[string]interface{}{
564 "error": err.Error(),
565 })
566 }
567 return
568 }
569
570 w.Header().Set("Content-Type", "application/json")
571 json.NewEncoder(w).Encode(map[string]interface{}{
572 "path": path,
573 })
574}
575
576// getOrCreateConversationManager gets an existing conversation manager or creates a new one.
577func (s *Server) getOrCreateConversationManager(ctx context.Context, conversationID string) (*ConversationManager, error) {
578 manager, err, _ := s.conversationGroup.Do(conversationID, func() (*ConversationManager, error) {
579 s.mu.Lock()
580 defer s.mu.Unlock()
581 if manager, exists := s.activeConversations[conversationID]; exists {
582 manager.Touch()
583 return manager, nil
584 }
585
586 recordMessage := func(ctx context.Context, message llm.Message, usage llm.Usage) error {
587 return s.recordMessage(ctx, conversationID, message, usage)
588 }
589
590 onStateChange := func(state ConversationState) {
591 s.publishConversationState(state)
592 }
593
594 manager := NewConversationManager(conversationID, s.db, s.logger, s.toolSetConfig, recordMessage, onStateChange)
595 if err := manager.Hydrate(ctx); err != nil {
596 return nil, err
597 }
598
599 s.activeConversations[conversationID] = manager
600 return manager, nil
601 })
602 if err != nil {
603 return nil, err
604 }
605 return manager, nil
606}
607
608// ExtractDisplayData extracts display data from message content for storage
609func ExtractDisplayData(message llm.Message) interface{} {
610 // Build a map of tool_use_id to tool_name for lookups
611 toolNameMap := make(map[string]string)
612 for _, content := range message.Content {
613 if content.Type == llm.ContentTypeToolUse {
614 toolNameMap[content.ID] = content.ToolName
615 }
616 }
617
618 var displayData []any
619 for _, content := range message.Content {
620 if content.Type == llm.ContentTypeToolResult && content.Display != nil {
621 // Include tool name if we can find it
622 toolName := toolNameMap[content.ToolUseID]
623 displayData = append(displayData, map[string]any{
624 "tool_use_id": content.ToolUseID,
625 "tool_name": toolName,
626 "display": content.Display,
627 })
628 }
629 }
630
631 if len(displayData) > 0 {
632 return displayData
633 }
634 return nil
635}
636
637// recordMessage records a new message to the database and also notifies subscribers
638func (s *Server) recordMessage(ctx context.Context, conversationID string, message llm.Message, usage llm.Usage) error {
639 // Log message based on role
640 if message.Role == llm.MessageRoleUser {
641 s.logger.Info("User message", "conversation_id", conversationID, "content_items", len(message.Content))
642 } else if message.Role == llm.MessageRoleAssistant {
643 s.logger.Info("Agent message", "conversation_id", conversationID, "content_items", len(message.Content), "end_of_turn", message.EndOfTurn)
644 }
645
646 // Convert LLM message to database format
647 messageType, err := s.getMessageType(message)
648 if err != nil {
649 return fmt.Errorf("failed to determine message type: %w", err)
650 }
651
652 // Extract display data from content items
653 displayDataToStore := ExtractDisplayData(message)
654
655 // Create message
656 createdMsg, err := s.db.CreateMessage(ctx, db.CreateMessageParams{
657 ConversationID: conversationID,
658 Type: messageType,
659 LLMData: message,
660 UserData: nil,
661 UsageData: usage,
662 DisplayData: displayDataToStore,
663 ExcludedFromContext: message.ExcludedFromContext,
664 })
665 if err != nil {
666 return fmt.Errorf("failed to create message: %w", err)
667 }
668
669 // Update conversation's last updated timestamp for correct ordering
670 if err := s.db.QueriesTx(ctx, func(q *generated.Queries) error {
671 return q.UpdateConversationTimestamp(ctx, conversationID)
672 }); err != nil {
673 s.logger.Warn("Failed to update conversation timestamp", "conversationID", conversationID, "error", err)
674 }
675
676 // Touch active manager activity time if present
677 s.mu.Lock()
678 mgr, ok := s.activeConversations[conversationID]
679 if ok {
680 mgr.Touch()
681 }
682 s.mu.Unlock()
683
684 // Notify subscribers with only the new message - use WithoutCancel because
685 // the HTTP request context may be cancelled after the handler returns, but
686 // we still want the notification to complete so SSE clients see the message immediately
687 go s.notifySubscribersNewMessage(context.WithoutCancel(ctx), conversationID, createdMsg)
688
689 return nil
690}
691
692// getMessageType determines the message type from an LLM message
693func (s *Server) getMessageType(message llm.Message) (db.MessageType, error) {
694 // System-generated errors are stored as error type
695 if message.ErrorType != llm.ErrorTypeNone {
696 return db.MessageTypeError, nil
697 }
698
699 switch message.Role {
700 case llm.MessageRoleUser:
701 return db.MessageTypeUser, nil
702 case llm.MessageRoleAssistant:
703 return db.MessageTypeAgent, nil
704 default:
705 // For tool messages, check if it's a tool call or tool result
706 for _, content := range message.Content {
707 if content.Type == llm.ContentTypeToolUse {
708 return db.MessageTypeTool, nil
709 }
710 if content.Type == llm.ContentTypeToolResult {
711 return db.MessageTypeTool, nil
712 }
713 }
714 return db.MessageTypeAgent, nil
715 }
716}
717
718// convertToLLMMessage converts a database message to an LLM message
719func convertToLLMMessage(msg generated.Message) (llm.Message, error) {
720 var llmMsg llm.Message
721 if msg.LlmData == nil {
722 return llm.Message{}, fmt.Errorf("message has no LLM data")
723 }
724 if err := json.Unmarshal([]byte(*msg.LlmData), &llmMsg); err != nil {
725 return llm.Message{}, fmt.Errorf("failed to unmarshal LLM data: %w", err)
726 }
727 return llmMsg, nil
728}
729
730// notifySubscribers sends conversation metadata updates (e.g., slug changes) to subscribers.
731// This is used when only the conversation data changes, not the messages.
732// Uses Broadcast instead of Publish to avoid racing with message sequence IDs.
733func (s *Server) notifySubscribers(ctx context.Context, conversationID string) {
734 s.mu.Lock()
735 manager, exists := s.activeConversations[conversationID]
736 s.mu.Unlock()
737
738 if !exists {
739 return
740 }
741
742 // Get conversation data only (no messages needed for metadata-only updates)
743 var conversation generated.Conversation
744 err := s.db.Queries(ctx, func(q *generated.Queries) error {
745 var err error
746 conversation, err = q.GetConversation(ctx, conversationID)
747 return err
748 })
749 if err != nil {
750 s.logger.Error("Failed to get conversation data for notification", "conversationID", conversationID, "error", err)
751 return
752 }
753
754 // Broadcast conversation update with no new messages.
755 // Using Broadcast instead of Publish ensures this metadata-only update
756 // doesn't race with notifySubscribersNewMessage which uses Publish with sequence IDs.
757 streamData := StreamResponse{
758 Messages: nil, // No new messages, just conversation update
759 Conversation: conversation,
760 }
761 manager.subpub.Broadcast(streamData)
762
763 // Also notify conversation list subscribers (e.g., slug change)
764 s.publishConversationListUpdate(ConversationListUpdate{
765 Type: "update",
766 Conversation: &conversation,
767 })
768}
769
770// notifySubscribersNewMessage sends a single new message to all subscribers.
771// This is more efficient than re-sending all messages on each update.
772func (s *Server) notifySubscribersNewMessage(ctx context.Context, conversationID string, newMsg *generated.Message) {
773 s.mu.Lock()
774 manager, exists := s.activeConversations[conversationID]
775 s.mu.Unlock()
776
777 if !exists {
778 return
779 }
780
781 // Get conversation data for the response
782 var conversation generated.Conversation
783 err := s.db.Queries(ctx, func(q *generated.Queries) error {
784 var err error
785 conversation, err = q.GetConversation(ctx, conversationID)
786 return err
787 })
788 if err != nil {
789 s.logger.Error("Failed to get conversation data for notification", "conversationID", conversationID, "error", err)
790 return
791 }
792
793 // Convert the single new message to API format
794 apiMessages := toAPIMessages([]generated.Message{*newMsg})
795
796 // Update agent working state based on message type
797 if isAgentEndOfTurn(newMsg) {
798 manager.SetAgentWorking(false)
799 }
800
801 // Publish only the new message
802 streamData := StreamResponse{
803 Messages: apiMessages,
804 Conversation: conversation,
805 // ContextWindowSize: 0 for messages without usage data (user/tool messages).
806 // With omitempty, 0 is omitted from JSON, so the UI keeps its cached value.
807 // Only agent messages have usage data, so context window updates when they arrive.
808 ContextWindowSize: calculateContextWindowSizeFromMsg(newMsg),
809 }
810 manager.subpub.Publish(newMsg.SequenceID, streamData)
811
812 // Also notify conversation list subscribers about the update (updated_at changed)
813 s.publishConversationListUpdate(ConversationListUpdate{
814 Type: "update",
815 Conversation: &conversation,
816 })
817}
818
819// publishConversationListUpdate broadcasts a conversation list update to ALL active
820// conversation streams. This allows clients to receive updates about other conversations
821// while they're subscribed to their current conversation's stream.
822func (s *Server) publishConversationListUpdate(update ConversationListUpdate) {
823 s.mu.Lock()
824 defer s.mu.Unlock()
825
826 // Broadcast to all active conversation managers
827 for _, manager := range s.activeConversations {
828 streamData := StreamResponse{
829 ConversationListUpdate: &update,
830 }
831 manager.subpub.Broadcast(streamData)
832 }
833}
834
835// publishConversationState broadcasts a conversation state update to ALL active
836// conversation streams. This allows clients to see the working state of other conversations.
837func (s *Server) publishConversationState(state ConversationState) {
838 s.mu.Lock()
839 defer s.mu.Unlock()
840
841 // Broadcast to all active conversation managers
842 for _, manager := range s.activeConversations {
843 streamData := StreamResponse{
844 ConversationState: &state,
845 }
846 manager.subpub.Broadcast(streamData)
847 }
848}
849
850// getWorkingConversations returns a map of conversation IDs that are currently working.
851func (s *Server) getWorkingConversations() map[string]bool {
852 s.mu.Lock()
853 defer s.mu.Unlock()
854
855 working := make(map[string]bool)
856 for id, manager := range s.activeConversations {
857 if manager.IsAgentWorking() {
858 working[id] = true
859 }
860 }
861 return working
862}
863
864// IsAgentWorking returns whether the agent is currently working on the given conversation.
865// Returns false if the conversation doesn't have an active manager.
866func (s *Server) IsAgentWorking(conversationID string) bool {
867 s.mu.Lock()
868 manager, exists := s.activeConversations[conversationID]
869 s.mu.Unlock()
870 if !exists {
871 return false
872 }
873 return manager.IsAgentWorking()
874}
875
876// Cleanup removes inactive conversation managers
877func (s *Server) Cleanup() {
878 s.mu.Lock()
879 defer s.mu.Unlock()
880
881 now := time.Now()
882 for id, manager := range s.activeConversations {
883 // Remove managers that have been inactive for more than 30 minutes
884 manager.mu.Lock()
885 lastActivity := manager.lastActivity
886 manager.mu.Unlock()
887 if now.Sub(lastActivity) > 30*time.Minute {
888 manager.stopLoop()
889 delete(s.activeConversations, id)
890 s.logger.Debug("Cleaned up inactive conversation", "conversationID", id)
891 }
892 }
893}
894
895// Start starts the HTTP server and handles the complete lifecycle
896func (s *Server) Start(port string) error {
897 listener, err := net.Listen("tcp", ":"+port)
898 if err != nil {
899 s.logger.Error("Failed to create listener", "error", err, "port_info", getPortOwnerInfo(port))
900 return err
901 }
902 return s.StartWithListener(listener)
903}
904
905// StartWithListener starts the HTTP server using the provided listener.
906// This is useful for systemd socket activation where the listener is created externally.
907func (s *Server) StartWithListener(listener net.Listener) error {
908 // Set up HTTP server with routes and middleware
909 mux := http.NewServeMux()
910 s.RegisterRoutes(mux)
911
912 // Add middleware (applied in reverse order: last added = first executed)
913 handler := LoggerMiddleware(s.logger)(mux)
914 handler = CSRFMiddleware()(handler)
915 if s.requireHeader != "" {
916 handler = RequireHeaderMiddleware(s.requireHeader)(handler)
917 }
918
919 httpServer := &http.Server{
920 Handler: handler,
921 }
922
923 // Start cleanup routine
924 go func() {
925 ticker := time.NewTicker(5 * time.Minute)
926 defer ticker.Stop()
927 for range ticker.C {
928 s.Cleanup()
929 }
930 }()
931
932 // Get actual port from listener
933 actualPort := listener.Addr().(*net.TCPAddr).Port
934
935 // Start server in goroutine
936 serverErrCh := make(chan error, 1)
937 go func() {
938 s.logger.Info("Server starting", "port", actualPort, "url", fmt.Sprintf("http://localhost:%d", actualPort))
939 if err := httpServer.Serve(listener); err != nil && err != http.ErrServerClosed {
940 serverErrCh <- err
941 }
942 }()
943
944 // Wait for shutdown signal or server error
945 quit := make(chan os.Signal, 1)
946 signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
947
948 select {
949 case err := <-serverErrCh:
950 s.logger.Error("Server failed", "error", err)
951 return err
952 case <-quit:
953 s.logger.Info("Shutting down server")
954 }
955
956 // Graceful shutdown
957 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
958 defer cancel()
959
960 if err := httpServer.Shutdown(ctx); err != nil {
961 s.logger.Error("Server forced to shutdown", "error", err)
962 return err
963 }
964
965 s.logger.Info("Server exited")
966 return nil
967}
968
// getPortOwnerInfo tries to identify which process is listening on a TCP
// port by running lsof.
//
// The result is a human-readable string: "pid=<pid> process=<command>" for
// the first process lsof reports, or a parenthesized explanation when lsof
// fails, reports nothing, or produces output that cannot be parsed.
func getPortOwnerInfo(port string) string {
	out, err := exec.Command("lsof", "-i", ":"+port, "-sTCP:LISTEN", "-n", "-P").Output()
	if err != nil {
		return fmt.Sprintf("(unable to determine: %v)", err)
	}

	// lsof prints a header row followed by one row per match:
	// COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
	rows := strings.Split(strings.TrimSpace(string(out)), "\n")
	if len(rows) < 2 {
		return "(no process found)"
	}

	for i := 1; i < len(rows); i++ {
		cols := strings.Fields(rows[i])
		if len(cols) < 2 {
			continue
		}
		return fmt.Sprintf("pid=%s process=%s", cols[1], cols[0])
	}

	return "(could not parse lsof output)"
}