1package server
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "net"
9 "net/http"
10 "os"
11 "os/exec"
12 "os/signal"
13 "path/filepath"
14 "strings"
15 "sync"
16 "syscall"
17 "time"
18
19 "tailscale.com/util/singleflight"
20
21 "shelley.exe.dev/claudetool"
22 "shelley.exe.dev/db"
23 "shelley.exe.dev/db/generated"
24 "shelley.exe.dev/llm"
25 "shelley.exe.dev/models"
26 "shelley.exe.dev/ui"
27)
28
type (
	// APIMessage is the JSON shape of one stored conversation message as it is
	// sent to clients. Optional payloads are string pointers so that absent
	// values disappear from the encoded object (omitempty).
	//
	// TODO: We could maybe omit llm_data when display_data is available.
	APIMessage struct {
		// Identity and ordering within the conversation.
		MessageID      string `json:"message_id"`
		ConversationID string `json:"conversation_id"`
		SequenceID     int64  `json:"sequence_id"`
		// Type is the stored message type (e.g. agent, user, tool, error).
		Type string `json:"type"`
		// Raw JSON payloads exactly as stored in the database.
		LlmData   *string `json:"llm_data,omitempty"`
		UserData  *string `json:"user_data,omitempty"`
		UsageData *string `json:"usage_data,omitempty"`
		CreatedAt time.Time `json:"created_at"`
		// DisplayData holds UI-oriented tool result data, when present.
		DisplayData *string `json:"display_data,omitempty"`
		// EndOfTurn is only populated when it could be decoded from LlmData.
		EndOfTurn *bool `json:"end_of_turn,omitempty"`
	}

	// ConversationState is the live status of one conversation. A fresh value
	// is broadcast to every subscriber each time the status changes.
	ConversationState struct {
		ConversationID string `json:"conversation_id"`
		// Working reports whether the agent is currently processing a turn.
		Working bool   `json:"working"`
		Model   string `json:"model,omitempty"`
	}
)
51
52// ConversationWithState combines a conversation with its working state.
53type ConversationWithState struct {
54 generated.Conversation
55 Working bool `json:"working"`
56}
57
58// StreamResponse represents the response format for conversation streaming
59type StreamResponse struct {
60 Messages []APIMessage `json:"messages"`
61 Conversation generated.Conversation `json:"conversation"`
62 ConversationState *ConversationState `json:"conversation_state,omitempty"`
63 ContextWindowSize uint64 `json:"context_window_size,omitempty"`
64 // ConversationListUpdate is set when another conversation in the list changed
65 ConversationListUpdate *ConversationListUpdate `json:"conversation_list_update,omitempty"`
66 // Heartbeat indicates this is a heartbeat message (no new data, just keeping connection alive)
67 Heartbeat bool `json:"heartbeat,omitempty"`
68}
69
70// LLMProvider is an interface for getting LLM services
71type LLMProvider interface {
72 GetService(modelID string) (llm.Service, error)
73 GetAvailableModels() []string
74 HasModel(modelID string) bool
75 GetModelInfo(modelID string) *models.ModelInfo
76 RefreshCustomModels() error
77}
78
79// NewLLMServiceManager creates a new LLM service manager from config
80func NewLLMServiceManager(cfg *LLMConfig) LLMProvider {
81 // Convert LLMConfig to models.Config
82 modelConfig := &models.Config{
83 AnthropicAPIKey: cfg.AnthropicAPIKey,
84 OpenAIAPIKey: cfg.OpenAIAPIKey,
85 GeminiAPIKey: cfg.GeminiAPIKey,
86 FireworksAPIKey: cfg.FireworksAPIKey,
87 Gateway: cfg.Gateway,
88 Logger: cfg.Logger,
89 DB: cfg.DB,
90 }
91
92 manager, err := models.NewManager(modelConfig)
93 if err != nil {
94 // This shouldn't happen in practice, but handle it gracefully
95 cfg.Logger.Error("Failed to create models manager", "error", err)
96 }
97
98 return manager
99}
100
101// toAPIMessages converts database messages to API messages.
102// When display_data is present (tool results), llm_data is omitted to save bandwidth
103// since the display_data contains all information needed for UI rendering.
104func toAPIMessages(messages []generated.Message) []APIMessage {
105 apiMessages := make([]APIMessage, len(messages))
106 for i, msg := range messages {
107 var endOfTurnPtr *bool
108 if msg.LlmData != nil && msg.Type == string(db.MessageTypeAgent) {
109 if endOfTurn, ok := extractEndOfTurn(*msg.LlmData); ok {
110 endOfTurnCopy := endOfTurn
111 endOfTurnPtr = &endOfTurnCopy
112 }
113 }
114
115 // TODO: Consider omitting llm_data when display_data is present to save bandwidth.
116 // The display_data contains all info needed for UI rendering of tool results,
117 // but the UI currently still uses llm_data for some checks.
118
119 apiMsg := APIMessage{
120 MessageID: msg.MessageID,
121 ConversationID: msg.ConversationID,
122 SequenceID: msg.SequenceID,
123 Type: msg.Type,
124 LlmData: msg.LlmData,
125 UserData: msg.UserData,
126 UsageData: msg.UsageData,
127 CreatedAt: msg.CreatedAt,
128 DisplayData: msg.DisplayData,
129 EndOfTurn: endOfTurnPtr,
130 }
131 apiMessages[i] = apiMsg
132 }
133 return apiMessages
134}
135
136func extractEndOfTurn(raw string) (bool, bool) {
137 var message llm.Message
138 if err := json.Unmarshal([]byte(raw), &message); err != nil {
139 return false, false
140 }
141 return message.EndOfTurn, true
142}
143
144// calculateContextWindowSize returns the context window usage from the most recent message with non-zero usage.
145// Each API call's input tokens represent the full conversation history sent to the model,
146// so we only need the last message's tokens (not accumulated across all messages).
147// The total input includes regular input tokens plus cached tokens (both read and created).
148// Messages without usage data (user messages, tool messages, etc.) are skipped.
149func calculateContextWindowSize(messages []APIMessage) uint64 {
150 // Find the last message with non-zero usage data
151 for i := len(messages) - 1; i >= 0; i-- {
152 msg := messages[i]
153 if msg.UsageData == nil {
154 continue
155 }
156 var usage llm.Usage
157 if err := json.Unmarshal([]byte(*msg.UsageData), &usage); err != nil {
158 continue
159 }
160 ctxUsed := usage.ContextWindowUsed()
161 if ctxUsed == 0 {
162 continue
163 }
164 // Return total context window used: all input tokens + output tokens
165 // This represents the full context that would be sent for the next turn
166 return ctxUsed
167 }
168 return 0
169}
170
171// isAgentEndOfTurn checks if a message is an agent or error message with end_of_turn=true.
172// This indicates the agent loop has finished processing.
173func isAgentEndOfTurn(msg *generated.Message) bool {
174 if msg == nil {
175 return false
176 }
177 // Agent and error messages can have end_of_turn
178 if msg.Type != string(db.MessageTypeAgent) && msg.Type != string(db.MessageTypeError) {
179 return false
180 }
181 if msg.LlmData == nil {
182 return false
183 }
184 endOfTurn, ok := extractEndOfTurn(*msg.LlmData)
185 if !ok {
186 return false
187 }
188 return endOfTurn
189}
190
191// calculateContextWindowSizeFromMsg calculates context window usage from a single message.
192// Returns 0 if the message has no usage data (e.g., user messages), in which case
193// the client should keep its previous context window value.
194func calculateContextWindowSizeFromMsg(msg *generated.Message) uint64 {
195 if msg == nil || msg.UsageData == nil {
196 return 0
197 }
198 var usage llm.Usage
199 if err := json.Unmarshal([]byte(*msg.UsageData), &usage); err != nil {
200 return 0
201 }
202 return usage.ContextWindowUsed()
203}
204
// ConversationListUpdate describes a change to a single entry of the
// conversation list. publishConversationListUpdate wraps it in a
// StreamResponse and broadcasts it on every active conversation stream, so a
// client watching one conversation also learns about changes to the others.
type ConversationListUpdate struct {
	Type           string                  `json:"type"`                      // "update", "delete"
	Conversation   *generated.Conversation `json:"conversation,omitempty"`    // the updated row; set by the "update" notifications in this file
	ConversationID string                  `json:"conversation_id,omitempty"` // For deletes
}
211
// Server manages the HTTP API and active conversations.
//
// It holds the dependencies passed to NewServer plus an in-memory set of
// ConversationManagers keyed by conversation ID. Managers are created on
// demand by getOrCreateConversationManager and evicted by Cleanup after a
// period of inactivity.
type Server struct {
	db            *db.DB
	llmManager    LLMProvider
	toolSetConfig claudetool.ToolSetConfig // caller's config; NewServer adds SubagentRunner and SubagentDB
	// activeConversations maps conversation ID to its live manager. Every
	// access in this file holds mu.
	activeConversations map[string]*ConversationManager
	mu                  sync.Mutex // guards activeConversations
	logger              *slog.Logger
	predictableOnly     bool
	terminalURL         string
	defaultModel        string
	links               []Link
	requireHeader       string // when non-empty, StartWithListener wraps the handler with RequireHeaderMiddleware(requireHeader)
	// conversationGroup collapses concurrent getOrCreateConversationManager
	// calls for the same conversation ID into one.
	conversationGroup singleflight.Group[string, *ConversationManager]
	versionChecker    *VersionChecker
}
228
229// NewServer creates a new server instance
230func NewServer(database *db.DB, llmManager LLMProvider, toolSetConfig claudetool.ToolSetConfig, logger *slog.Logger, predictableOnly bool, terminalURL, defaultModel, requireHeader string, links []Link) *Server {
231 s := &Server{
232 db: database,
233 llmManager: llmManager,
234 toolSetConfig: toolSetConfig,
235 activeConversations: make(map[string]*ConversationManager),
236 logger: logger,
237 predictableOnly: predictableOnly,
238 terminalURL: terminalURL,
239 defaultModel: defaultModel,
240 requireHeader: requireHeader,
241 links: links,
242 versionChecker: NewVersionChecker(),
243 }
244
245 // Set up subagent support
246 s.toolSetConfig.SubagentRunner = NewSubagentRunner(s)
247 s.toolSetConfig.SubagentDB = &db.SubagentDBAdapter{DB: database}
248
249 return s
250}
251
252// RegisterRoutes registers HTTP routes on the given mux
253func (s *Server) RegisterRoutes(mux *http.ServeMux) {
254 // API routes - wrap with gzip where beneficial
255 mux.Handle("/api/conversations", gzipHandler(http.HandlerFunc(s.handleConversations)))
256 mux.Handle("/api/conversations/archived", gzipHandler(http.HandlerFunc(s.handleArchivedConversations)))
257 mux.Handle("/api/conversations/new", http.HandlerFunc(s.handleNewConversation)) // Small response
258 mux.Handle("/api/conversations/continue", http.HandlerFunc(s.handleContinueConversation)) // Small response
259 mux.Handle("/api/conversation/", http.StripPrefix("/api/conversation", s.conversationMux()))
260 mux.Handle("/api/conversation-by-slug/", gzipHandler(http.HandlerFunc(s.handleConversationBySlug)))
261 mux.Handle("/api/validate-cwd", http.HandlerFunc(s.handleValidateCwd)) // Small response
262 mux.Handle("/api/list-directory", gzipHandler(http.HandlerFunc(s.handleListDirectory)))
263 mux.Handle("/api/create-directory", http.HandlerFunc(s.handleCreateDirectory))
264 mux.Handle("/api/git/diffs", gzipHandler(http.HandlerFunc(s.handleGitDiffs)))
265 mux.Handle("/api/git/diffs/", gzipHandler(http.HandlerFunc(s.handleGitDiffFiles)))
266 mux.Handle("/api/git/file-diff/", gzipHandler(http.HandlerFunc(s.handleGitFileDiff)))
267 mux.HandleFunc("/api/upload", s.handleUpload) // Binary uploads
268 mux.HandleFunc("/api/read", s.handleRead) // Serves images
269 mux.Handle("/api/write-file", http.HandlerFunc(s.handleWriteFile)) // Small response
270 mux.HandleFunc("/api/exec-ws", s.handleExecWS) // Websocket for shell commands
271
272 // Custom models API
273 mux.Handle("/api/custom-models", http.HandlerFunc(s.handleCustomModels))
274 mux.Handle("/api/custom-models/", http.HandlerFunc(s.handleCustomModel))
275 mux.Handle("/api/custom-models-test", http.HandlerFunc(s.handleTestModel))
276
277 // Models API (dynamic list refresh)
278 mux.Handle("/api/models", http.HandlerFunc(s.handleModels))
279
280 // Version endpoints
281 mux.Handle("GET /version", http.HandlerFunc(s.handleVersion))
282 mux.Handle("GET /version-check", http.HandlerFunc(s.handleVersionCheck))
283 mux.Handle("GET /version-changelog", http.HandlerFunc(s.handleVersionChangelog))
284 mux.Handle("POST /upgrade", http.HandlerFunc(s.handleUpgrade))
285 mux.Handle("POST /exit", http.HandlerFunc(s.handleExit))
286
287 // Debug endpoints
288 mux.Handle("GET /debug/llm_requests", http.HandlerFunc(s.handleDebugLLMRequests))
289 mux.Handle("GET /debug/llm_requests/api", http.HandlerFunc(s.handleDebugLLMRequestsAPI))
290 mux.Handle("GET /debug/llm_requests/{id}/request", http.HandlerFunc(s.handleDebugLLMRequestBody))
291 mux.Handle("GET /debug/llm_requests/{id}/request_full", http.HandlerFunc(s.handleDebugLLMRequestBodyFull))
292 mux.Handle("GET /debug/llm_requests/{id}/response", http.HandlerFunc(s.handleDebugLLMResponseBody))
293
294 // Serve embedded UI assets
295 mux.Handle("/", s.staticHandler(ui.Assets()))
296}
297
298// handleValidateCwd validates that a path exists and is a directory
299func (s *Server) handleValidateCwd(w http.ResponseWriter, r *http.Request) {
300 if r.Method != http.MethodGet {
301 http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
302 return
303 }
304
305 path := r.URL.Query().Get("path")
306 if path == "" {
307 w.Header().Set("Content-Type", "application/json")
308 json.NewEncoder(w).Encode(map[string]interface{}{
309 "valid": false,
310 "error": "path is required",
311 })
312 return
313 }
314
315 info, err := os.Stat(path)
316 if err != nil {
317 w.Header().Set("Content-Type", "application/json")
318 if os.IsNotExist(err) {
319 json.NewEncoder(w).Encode(map[string]interface{}{
320 "valid": false,
321 "error": "directory does not exist",
322 })
323 } else {
324 json.NewEncoder(w).Encode(map[string]interface{}{
325 "valid": false,
326 "error": err.Error(),
327 })
328 }
329 return
330 }
331
332 if !info.IsDir() {
333 w.Header().Set("Content-Type", "application/json")
334 json.NewEncoder(w).Encode(map[string]interface{}{
335 "valid": false,
336 "error": "path is not a directory",
337 })
338 return
339 }
340
341 w.Header().Set("Content-Type", "application/json")
342 json.NewEncoder(w).Encode(map[string]interface{}{
343 "valid": true,
344 })
345}
346
type (
	// DirectoryEntry is one row in the directory picker.
	DirectoryEntry struct {
		Name  string `json:"name"`
		IsDir bool   `json:"is_dir"`
		// GitHeadSubject is the HEAD commit subject when the entry is the
		// root of a git repository; omitted otherwise.
		GitHeadSubject string `json:"git_head_subject,omitempty"`
	}

	// ListDirectoryResponse is the JSON body returned by the list-directory
	// endpoint.
	ListDirectoryResponse struct {
		// Path is the directory that was listed.
		Path string `json:"path"`
		// Parent is the enclosing directory, or "" at the filesystem root.
		Parent  string           `json:"parent"`
		Entries []DirectoryEntry `json:"entries"`
		// GitHeadSubject describes Path itself when it is a repository root.
		GitHeadSubject string `json:"git_head_subject,omitempty"`
	}
)
361
362// handleListDirectory lists the contents of a directory for the directory picker
363func (s *Server) handleListDirectory(w http.ResponseWriter, r *http.Request) {
364 if r.Method != http.MethodGet {
365 http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
366 return
367 }
368
369 path := r.URL.Query().Get("path")
370 if path == "" {
371 // Default to home directory or root
372 homeDir, err := os.UserHomeDir()
373 if err != nil {
374 path = "/"
375 } else {
376 path = homeDir
377 }
378 }
379
380 // Clean and resolve the path
381 path = filepath.Clean(path)
382
383 // Verify path exists and is a directory
384 info, err := os.Stat(path)
385 if err != nil {
386 w.Header().Set("Content-Type", "application/json")
387 if os.IsNotExist(err) {
388 json.NewEncoder(w).Encode(map[string]interface{}{
389 "error": "directory does not exist",
390 })
391 } else if os.IsPermission(err) {
392 json.NewEncoder(w).Encode(map[string]interface{}{
393 "error": "permission denied",
394 })
395 } else {
396 json.NewEncoder(w).Encode(map[string]interface{}{
397 "error": err.Error(),
398 })
399 }
400 return
401 }
402
403 if !info.IsDir() {
404 w.Header().Set("Content-Type", "application/json")
405 json.NewEncoder(w).Encode(map[string]interface{}{
406 "error": "path is not a directory",
407 })
408 return
409 }
410
411 // Read directory contents
412 dirEntries, err := os.ReadDir(path)
413 if err != nil {
414 w.Header().Set("Content-Type", "application/json")
415 if os.IsPermission(err) {
416 json.NewEncoder(w).Encode(map[string]interface{}{
417 "error": "permission denied",
418 })
419 } else {
420 json.NewEncoder(w).Encode(map[string]interface{}{
421 "error": err.Error(),
422 })
423 }
424 return
425 }
426
427 // Build response with only directories (for directory picker)
428 var entries []DirectoryEntry
429 for _, entry := range dirEntries {
430 // Only include directories
431 if entry.IsDir() {
432 dirEntry := DirectoryEntry{
433 Name: entry.Name(),
434 IsDir: true,
435 }
436
437 // Check if this is a git repo root and get HEAD commit subject
438 entryPath := filepath.Join(path, entry.Name())
439 if isGitRepo(entryPath) {
440 if subject := getGitHeadSubject(entryPath); subject != "" {
441 dirEntry.GitHeadSubject = subject
442 }
443 }
444
445 entries = append(entries, dirEntry)
446 }
447 }
448
449 // Calculate parent directory
450 parent := filepath.Dir(path)
451 if parent == path {
452 // At root, no parent
453 parent = ""
454 }
455
456 response := ListDirectoryResponse{
457 Path: path,
458 Parent: parent,
459 Entries: entries,
460 }
461
462 // Check if the current directory itself is a git repo
463 if isGitRepo(path) {
464 response.GitHeadSubject = getGitHeadSubject(path)
465 }
466
467 w.Header().Set("Content-Type", "application/json")
468 json.NewEncoder(w).Encode(response)
469}
470
// isGitRepo reports whether dirPath is the root of a git working tree, i.e.
// whether dirPath/.git exists and is either
//   - a directory (an ordinary repository), or
//   - a regular file whose contents start with "gitdir:" (e.g. a linked
//     worktree or submodule checkout, where .git points at the real git dir).
//
// Any error while inspecting .git is treated as "not a repository".
func isGitRepo(dirPath string) bool {
	gitPath := filepath.Join(dirPath, ".git")
	fi, err := os.Stat(gitPath)
	if err != nil {
		return false
	}
	if fi.IsDir() {
		return true // regular .git directory
	}
	if !fi.Mode().IsRegular() {
		return false
	}
	// Worktree-style .git file: "gitdir: <path>".
	content, err := os.ReadFile(gitPath)
	return err == nil && strings.HasPrefix(string(content), "gitdir:")
}
493
// getGitHeadSubject returns the subject line of the HEAD commit of the git
// repository at repoPath. It returns "" when that cannot be determined: not a
// repository, no commits yet, git missing, or git not finishing in time.
//
// It runs `git log -1 --format=%s` with repoPath as the working directory.
// The command is bounded by a timeout because handleListDirectory calls this
// once per repository it lists, and a hung git would otherwise block the HTTP
// request indefinitely.
func getGitHeadSubject(repoPath string) string {
	const timeout = 5 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	cmd := exec.CommandContext(ctx, "git", "log", "-1", "--format=%s")
	cmd.Dir = repoPath
	output, err := cmd.Output()
	if err != nil {
		return ""
	}
	return strings.TrimSpace(string(output))
}
505
506// handleCreateDirectory creates a new directory
507func (s *Server) handleCreateDirectory(w http.ResponseWriter, r *http.Request) {
508 if r.Method != http.MethodPost {
509 http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
510 return
511 }
512
513 var req struct {
514 Path string `json:"path"`
515 }
516 if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
517 w.Header().Set("Content-Type", "application/json")
518 json.NewEncoder(w).Encode(map[string]interface{}{
519 "error": "invalid request body",
520 })
521 return
522 }
523
524 if req.Path == "" {
525 w.Header().Set("Content-Type", "application/json")
526 json.NewEncoder(w).Encode(map[string]interface{}{
527 "error": "path is required",
528 })
529 return
530 }
531
532 // Clean the path
533 path := filepath.Clean(req.Path)
534
535 // Check if path already exists
536 if _, err := os.Stat(path); err == nil {
537 w.Header().Set("Content-Type", "application/json")
538 json.NewEncoder(w).Encode(map[string]interface{}{
539 "error": "path already exists",
540 })
541 return
542 }
543
544 // Verify parent directory exists
545 parentDir := filepath.Dir(path)
546 if _, err := os.Stat(parentDir); os.IsNotExist(err) {
547 w.Header().Set("Content-Type", "application/json")
548 json.NewEncoder(w).Encode(map[string]interface{}{
549 "error": "parent directory does not exist",
550 })
551 return
552 }
553
554 // Create the directory (only the final directory, not parents)
555 if err := os.Mkdir(path, 0o755); err != nil {
556 w.Header().Set("Content-Type", "application/json")
557 if os.IsPermission(err) {
558 json.NewEncoder(w).Encode(map[string]interface{}{
559 "error": "permission denied",
560 })
561 } else {
562 json.NewEncoder(w).Encode(map[string]interface{}{
563 "error": err.Error(),
564 })
565 }
566 return
567 }
568
569 w.Header().Set("Content-Type", "application/json")
570 json.NewEncoder(w).Encode(map[string]interface{}{
571 "path": path,
572 })
573}
574
// getOrCreateConversationManager returns the active ConversationManager for
// conversationID, creating and hydrating one if none is registered.
//
// Concurrent calls for the same ID are collapsed by s.conversationGroup
// (singleflight), so at most one creation per ID is in flight. An existing
// manager is touched before it is returned. A new manager:
//   - records messages via s.recordMessage,
//   - reports state changes via s.publishConversationState,
//   - is hydrated with ctx,
//   - is added to s.activeConversations only if Hydrate succeeds.
//
// NOTE(review): s.mu is held across manager.Hydrate, which presumably does DB
// I/O. Every other s.mu user (recordMessage, the publish* helpers, Cleanup,
// ...) waits until hydration finishes.
//
// NOTE(review): onStateChange calls publishConversationState, which locks
// s.mu. If Hydrate ever triggered it synchronously, this would deadlock.
//
// NOTE(review): because calls are deduplicated, ctx is whichever caller
// arrived first. If that ctx is cancelled, the waiting callers share the
// error.
func (s *Server) getOrCreateConversationManager(ctx context.Context, conversationID string) (*ConversationManager, error) {
	manager, err, _ := s.conversationGroup.Do(conversationID, func() (*ConversationManager, error) {
		s.mu.Lock()
		defer s.mu.Unlock()
		if manager, exists := s.activeConversations[conversationID]; exists {
			manager.Touch()
			return manager, nil
		}

		// Persist messages produced for this conversation.
		recordMessage := func(ctx context.Context, message llm.Message, usage llm.Usage) error {
			return s.recordMessage(ctx, conversationID, message, usage)
		}

		// Fan working-state changes out to every active stream.
		onStateChange := func(state ConversationState) {
			s.publishConversationState(state)
		}

		manager := NewConversationManager(conversationID, s.db, s.logger, s.toolSetConfig, recordMessage, onStateChange)
		// Register only after a successful Hydrate so a failed load is retried
		// on the next call instead of being cached.
		if err := manager.Hydrate(ctx); err != nil {
			return nil, err
		}

		s.activeConversations[conversationID] = manager
		return manager, nil
	})
	if err != nil {
		return nil, err
	}
	return manager, nil
}
606
607// ExtractDisplayData extracts display data from message content for storage
608func ExtractDisplayData(message llm.Message) interface{} {
609 // Build a map of tool_use_id to tool_name for lookups
610 toolNameMap := make(map[string]string)
611 for _, content := range message.Content {
612 if content.Type == llm.ContentTypeToolUse {
613 toolNameMap[content.ID] = content.ToolName
614 }
615 }
616
617 var displayData []any
618 for _, content := range message.Content {
619 if content.Type == llm.ContentTypeToolResult && content.Display != nil {
620 // Include tool name if we can find it
621 toolName := toolNameMap[content.ToolUseID]
622 displayData = append(displayData, map[string]any{
623 "tool_use_id": content.ToolUseID,
624 "tool_name": toolName,
625 "display": content.Display,
626 })
627 }
628 }
629
630 if len(displayData) > 0 {
631 return displayData
632 }
633 return nil
634}
635
636// recordMessage records a new message to the database and also notifies subscribers
637func (s *Server) recordMessage(ctx context.Context, conversationID string, message llm.Message, usage llm.Usage) error {
638 // Log message based on role
639 if message.Role == llm.MessageRoleUser {
640 s.logger.Info("User message", "conversation_id", conversationID, "content_items", len(message.Content))
641 } else if message.Role == llm.MessageRoleAssistant {
642 s.logger.Info("Agent message", "conversation_id", conversationID, "content_items", len(message.Content), "end_of_turn", message.EndOfTurn)
643 }
644
645 // Convert LLM message to database format
646 messageType, err := s.getMessageType(message)
647 if err != nil {
648 return fmt.Errorf("failed to determine message type: %w", err)
649 }
650
651 // Extract display data from content items
652 displayDataToStore := ExtractDisplayData(message)
653
654 // Create message
655 createdMsg, err := s.db.CreateMessage(ctx, db.CreateMessageParams{
656 ConversationID: conversationID,
657 Type: messageType,
658 LLMData: message,
659 UserData: nil,
660 UsageData: usage,
661 DisplayData: displayDataToStore,
662 ExcludedFromContext: message.ExcludedFromContext,
663 })
664 if err != nil {
665 return fmt.Errorf("failed to create message: %w", err)
666 }
667
668 // Update conversation's last updated timestamp for correct ordering
669 if err := s.db.QueriesTx(ctx, func(q *generated.Queries) error {
670 return q.UpdateConversationTimestamp(ctx, conversationID)
671 }); err != nil {
672 s.logger.Warn("Failed to update conversation timestamp", "conversationID", conversationID, "error", err)
673 }
674
675 // Touch active manager activity time if present
676 s.mu.Lock()
677 mgr, ok := s.activeConversations[conversationID]
678 if ok {
679 mgr.Touch()
680 }
681 s.mu.Unlock()
682
683 // Notify subscribers with only the new message - use WithoutCancel because
684 // the HTTP request context may be cancelled after the handler returns, but
685 // we still want the notification to complete so SSE clients see the message immediately
686 go s.notifySubscribersNewMessage(context.WithoutCancel(ctx), conversationID, createdMsg)
687
688 return nil
689}
690
691// getMessageType determines the message type from an LLM message
692func (s *Server) getMessageType(message llm.Message) (db.MessageType, error) {
693 // System-generated errors are stored as error type
694 if message.ErrorType != llm.ErrorTypeNone {
695 return db.MessageTypeError, nil
696 }
697
698 switch message.Role {
699 case llm.MessageRoleUser:
700 return db.MessageTypeUser, nil
701 case llm.MessageRoleAssistant:
702 return db.MessageTypeAgent, nil
703 default:
704 // For tool messages, check if it's a tool call or tool result
705 for _, content := range message.Content {
706 if content.Type == llm.ContentTypeToolUse {
707 return db.MessageTypeTool, nil
708 }
709 if content.Type == llm.ContentTypeToolResult {
710 return db.MessageTypeTool, nil
711 }
712 }
713 return db.MessageTypeAgent, nil
714 }
715}
716
717// convertToLLMMessage converts a database message to an LLM message
718func convertToLLMMessage(msg generated.Message) (llm.Message, error) {
719 var llmMsg llm.Message
720 if msg.LlmData == nil {
721 return llm.Message{}, fmt.Errorf("message has no LLM data")
722 }
723 if err := json.Unmarshal([]byte(*msg.LlmData), &llmMsg); err != nil {
724 return llm.Message{}, fmt.Errorf("failed to unmarshal LLM data: %w", err)
725 }
726 return llmMsg, nil
727}
728
729// notifySubscribers sends conversation metadata updates (e.g., slug changes) to subscribers.
730// This is used when only the conversation data changes, not the messages.
731// Uses Broadcast instead of Publish to avoid racing with message sequence IDs.
732func (s *Server) notifySubscribers(ctx context.Context, conversationID string) {
733 s.mu.Lock()
734 manager, exists := s.activeConversations[conversationID]
735 s.mu.Unlock()
736
737 if !exists {
738 return
739 }
740
741 // Get conversation data only (no messages needed for metadata-only updates)
742 var conversation generated.Conversation
743 err := s.db.Queries(ctx, func(q *generated.Queries) error {
744 var err error
745 conversation, err = q.GetConversation(ctx, conversationID)
746 return err
747 })
748 if err != nil {
749 s.logger.Error("Failed to get conversation data for notification", "conversationID", conversationID, "error", err)
750 return
751 }
752
753 // Broadcast conversation update with no new messages.
754 // Using Broadcast instead of Publish ensures this metadata-only update
755 // doesn't race with notifySubscribersNewMessage which uses Publish with sequence IDs.
756 streamData := StreamResponse{
757 Messages: nil, // No new messages, just conversation update
758 Conversation: conversation,
759 }
760 manager.subpub.Broadcast(streamData)
761
762 // Also notify conversation list subscribers (e.g., slug change)
763 s.publishConversationListUpdate(ConversationListUpdate{
764 Type: "update",
765 Conversation: &conversation,
766 })
767}
768
769// notifySubscribersNewMessage sends a single new message to all subscribers.
770// This is more efficient than re-sending all messages on each update.
771func (s *Server) notifySubscribersNewMessage(ctx context.Context, conversationID string, newMsg *generated.Message) {
772 s.mu.Lock()
773 manager, exists := s.activeConversations[conversationID]
774 s.mu.Unlock()
775
776 if !exists {
777 return
778 }
779
780 // Get conversation data for the response
781 var conversation generated.Conversation
782 err := s.db.Queries(ctx, func(q *generated.Queries) error {
783 var err error
784 conversation, err = q.GetConversation(ctx, conversationID)
785 return err
786 })
787 if err != nil {
788 s.logger.Error("Failed to get conversation data for notification", "conversationID", conversationID, "error", err)
789 return
790 }
791
792 // Convert the single new message to API format
793 apiMessages := toAPIMessages([]generated.Message{*newMsg})
794
795 // Update agent working state based on message type
796 if isAgentEndOfTurn(newMsg) {
797 manager.SetAgentWorking(false)
798 }
799
800 // Publish only the new message
801 streamData := StreamResponse{
802 Messages: apiMessages,
803 Conversation: conversation,
804 // ContextWindowSize: 0 for messages without usage data (user/tool messages).
805 // With omitempty, 0 is omitted from JSON, so the UI keeps its cached value.
806 // Only agent messages have usage data, so context window updates when they arrive.
807 ContextWindowSize: calculateContextWindowSizeFromMsg(newMsg),
808 }
809 manager.subpub.Publish(newMsg.SequenceID, streamData)
810
811 // Also notify conversation list subscribers about the update (updated_at changed)
812 s.publishConversationListUpdate(ConversationListUpdate{
813 Type: "update",
814 Conversation: &conversation,
815 })
816}
817
818// publishConversationListUpdate broadcasts a conversation list update to ALL active
819// conversation streams. This allows clients to receive updates about other conversations
820// while they're subscribed to their current conversation's stream.
821func (s *Server) publishConversationListUpdate(update ConversationListUpdate) {
822 s.mu.Lock()
823 defer s.mu.Unlock()
824
825 // Broadcast to all active conversation managers
826 for _, manager := range s.activeConversations {
827 streamData := StreamResponse{
828 ConversationListUpdate: &update,
829 }
830 manager.subpub.Broadcast(streamData)
831 }
832}
833
834// publishConversationState broadcasts a conversation state update to ALL active
835// conversation streams. This allows clients to see the working state of other conversations.
836func (s *Server) publishConversationState(state ConversationState) {
837 s.mu.Lock()
838 defer s.mu.Unlock()
839
840 // Broadcast to all active conversation managers
841 for _, manager := range s.activeConversations {
842 streamData := StreamResponse{
843 ConversationState: &state,
844 }
845 manager.subpub.Broadcast(streamData)
846 }
847}
848
849// getWorkingConversations returns a map of conversation IDs that are currently working.
850func (s *Server) getWorkingConversations() map[string]bool {
851 s.mu.Lock()
852 defer s.mu.Unlock()
853
854 working := make(map[string]bool)
855 for id, manager := range s.activeConversations {
856 if manager.IsAgentWorking() {
857 working[id] = true
858 }
859 }
860 return working
861}
862
863// IsAgentWorking returns whether the agent is currently working on the given conversation.
864// Returns false if the conversation doesn't have an active manager.
865func (s *Server) IsAgentWorking(conversationID string) bool {
866 s.mu.Lock()
867 manager, exists := s.activeConversations[conversationID]
868 s.mu.Unlock()
869 if !exists {
870 return false
871 }
872 return manager.IsAgentWorking()
873}
874
875// Cleanup removes inactive conversation managers
876func (s *Server) Cleanup() {
877 s.mu.Lock()
878 defer s.mu.Unlock()
879
880 now := time.Now()
881 for id, manager := range s.activeConversations {
882 // Remove managers that have been inactive for more than 30 minutes
883 manager.mu.Lock()
884 lastActivity := manager.lastActivity
885 manager.mu.Unlock()
886 if now.Sub(lastActivity) > 30*time.Minute {
887 manager.stopLoop()
888 delete(s.activeConversations, id)
889 s.logger.Debug("Cleaned up inactive conversation", "conversationID", id)
890 }
891 }
892}
893
894// Start starts the HTTP server and handles the complete lifecycle
895func (s *Server) Start(port string) error {
896 listener, err := net.Listen("tcp", ":"+port)
897 if err != nil {
898 s.logger.Error("Failed to create listener", "error", err, "port_info", getPortOwnerInfo(port))
899 return err
900 }
901 return s.StartWithListener(listener)
902}
903
904// StartWithListener starts the HTTP server using the provided listener.
905// This is useful for systemd socket activation where the listener is created externally.
906func (s *Server) StartWithListener(listener net.Listener) error {
907 // Set up HTTP server with routes and middleware
908 mux := http.NewServeMux()
909 s.RegisterRoutes(mux)
910
911 // Add middleware (applied in reverse order: last added = first executed)
912 handler := LoggerMiddleware(s.logger)(mux)
913 handler = CSRFMiddleware()(handler)
914 if s.requireHeader != "" {
915 handler = RequireHeaderMiddleware(s.requireHeader)(handler)
916 }
917
918 httpServer := &http.Server{
919 Handler: handler,
920 }
921
922 // Start cleanup routine
923 go func() {
924 ticker := time.NewTicker(5 * time.Minute)
925 defer ticker.Stop()
926 for range ticker.C {
927 s.Cleanup()
928 }
929 }()
930
931 // Get actual port from listener
932 actualPort := listener.Addr().(*net.TCPAddr).Port
933
934 // Start server in goroutine
935 serverErrCh := make(chan error, 1)
936 go func() {
937 s.logger.Info("Server starting", "port", actualPort, "url", fmt.Sprintf("http://localhost:%d", actualPort))
938 if err := httpServer.Serve(listener); err != nil && err != http.ErrServerClosed {
939 serverErrCh <- err
940 }
941 }()
942
943 // Wait for shutdown signal or server error
944 quit := make(chan os.Signal, 1)
945 signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
946
947 select {
948 case err := <-serverErrCh:
949 s.logger.Error("Server failed", "error", err)
950 return err
951 case <-quit:
952 s.logger.Info("Shutting down server")
953 }
954
955 // Graceful shutdown
956 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
957 defer cancel()
958
959 if err := httpServer.Shutdown(ctx); err != nil {
960 s.logger.Error("Server forced to shutdown", "error", err)
961 return err
962 }
963
964 s.logger.Info("Server exited")
965 return nil
966}
967
// getPortOwnerInfo tries to identify which process is listening on TCP port,
// for use in the "Failed to create listener" log message.
//
// It runs `lsof -i :PORT -sTCP:LISTEN -n -P` under a short timeout. It never
// fails; the result is always a human-readable string:
//   - "pid=<PID> process=<COMMAND>" for the first listed process,
//   - "(no process found)" when lsof reports nothing (lsof exits non-zero
//     with empty output in that case),
//   - "(unable to determine: <reason>)" for any other failure.
func getPortOwnerInfo(port string) string {
	// lsof can block on unresponsive (e.g. network) filesystems; a diagnostic
	// must not hang startup error reporting.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	cmd := exec.CommandContext(ctx, "lsof", "-i", ":"+port, "-sTCP:LISTEN", "-n", "-P")
	output, err := cmd.Output()
	if err != nil {
		if ctx.Err() != nil {
			return "(unable to determine: lsof timed out)"
		}
		// lsof ran but matched nothing: exit status 1, no output.
		if _, exited := err.(*exec.ExitError); exited && len(strings.TrimSpace(string(output))) == 0 {
			return "(no process found)"
		}
		return fmt.Sprintf("(unable to determine: %v)", err)
	}

	lines := strings.Split(strings.TrimSpace(string(output)), "\n")
	if len(lines) < 2 {
		return "(no process found)"
	}

	// Columns: COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME.
	// lines[0] is the header.
	for _, line := range lines[1:] {
		if fields := strings.Fields(line); len(fields) >= 2 {
			return fmt.Sprintf("pid=%s process=%s", fields[1], fields[0])
		}
	}

	return "(could not parse lsof output)"
}