1package server
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "net"
9 "net/http"
10 "os"
11 "os/exec"
12 "os/signal"
13 "path/filepath"
14 "sort"
15 "strings"
16 "sync"
17 "syscall"
18 "time"
19
20 "tailscale.com/util/singleflight"
21
22 "shelley.exe.dev/claudetool"
23 "shelley.exe.dev/db"
24 "shelley.exe.dev/db/generated"
25 "shelley.exe.dev/llm"
26 "shelley.exe.dev/models"
27 "shelley.exe.dev/ui"
28)
29
// APIMessage is the message format sent to clients.
//
// toAPIMessages builds it from a generated.Message by copying the stored
// fields through unchanged and adding EndOfTurn, which is decoded from
// LlmData for agent messages only.
// TODO: We could maybe omit llm_data when display_data is available
type APIMessage struct {
	MessageID      string `json:"message_id"`
	ConversationID string `json:"conversation_id"`
	// SequenceID is the per-message sequence number; notifySubscribersNewMessage
	// uses it as the Publish key for the conversation stream.
	SequenceID int64  `json:"sequence_id"`
	Type       string `json:"type"`
	// LlmData is the stored JSON that decodes as an llm.Message.
	LlmData  *string `json:"llm_data,omitempty"`
	UserData *string `json:"user_data,omitempty"`
	// UsageData is the stored JSON that decodes as an llm.Usage.
	UsageData *string   `json:"usage_data,omitempty"`
	CreatedAt time.Time `json:"created_at"`
	// DisplayData holds the tool-result display payloads produced by
	// ExtractDisplayData (presumably stored as JSON — confirm in db package).
	DisplayData *string `json:"display_data,omitempty"`
	// EndOfTurn is set only for agent messages whose LlmData decodes cleanly.
	EndOfTurn *bool `json:"end_of_turn,omitempty"`
}
44
// ConversationState represents the current state of a conversation.
// This is broadcast to all subscribers whenever the state changes;
// publishConversationState sends it to the stream of every active
// conversation, not just the one it describes.
type ConversationState struct {
	ConversationID string `json:"conversation_id"`
	// Working reports whether the agent is currently processing the
	// conversation (presumably mirrors ConversationManager's working flag — confirm).
	Working bool   `json:"working"`
	Model   string `json:"model,omitempty"`
}
52
// ConversationWithState combines a conversation with its working state.
// The embedded generated.Conversation fields are serialized inline, with
// "working" added alongside them.
type ConversationWithState struct {
	generated.Conversation
	// Working reports whether an agent is currently running for this conversation.
	Working bool `json:"working"`
}
58
// StreamResponse represents the response format for conversation streaming.
//
// Within this file each kind of event fills only the fields relevant to it:
//   - notifySubscribersNewMessage: Messages (just the new message),
//     Conversation and, when the message carries usage, ContextWindowSize.
//   - notifySubscribers: Conversation only (metadata change such as a slug).
//   - publishConversationListUpdate: ConversationListUpdate only.
//   - publishConversationState: ConversationState only.
//
// Messages has no omitempty and encodes as null when unset; Conversation is a
// non-pointer without omitempty, so events that do not set it still carry a
// zero-valued "conversation" object.
type StreamResponse struct {
	Messages          []APIMessage           `json:"messages"`
	Conversation      generated.Conversation `json:"conversation"`
	ConversationState *ConversationState     `json:"conversation_state,omitempty"`
	// ContextWindowSize is omitted when 0, in which case the client keeps its
	// previously cached value.
	ContextWindowSize uint64 `json:"context_window_size,omitempty"`
	// ConversationListUpdate is set when another conversation in the list changed
	ConversationListUpdate *ConversationListUpdate `json:"conversation_list_update,omitempty"`
	// Heartbeat indicates this is a heartbeat message (no new data, just keeping connection alive)
	Heartbeat bool `json:"heartbeat,omitempty"`
}
70
// LLMProvider is an interface for getting LLM services.
// NewLLMServiceManager returns the implementation built by models.NewManager.
type LLMProvider interface {
	// GetService returns the llm.Service for modelID, or an error.
	GetService(modelID string) (llm.Service, error)
	// GetAvailableModels presumably lists the model IDs callers may request — confirm.
	GetAvailableModels() []string
	// HasModel presumably reports whether modelID is known — confirm.
	HasModel(modelID string) bool
	// GetModelInfo returns metadata for modelID (NOTE(review): nil-when-unknown
	// behavior not visible here — confirm).
	GetModelInfo(modelID string) *models.ModelInfo
	// RefreshCustomModels reloads custom model definitions (presumably from the
	// DB passed to NewLLMServiceManager — confirm).
	RefreshCustomModels() error
}
79
80// NewLLMServiceManager creates a new LLM service manager from config
81func NewLLMServiceManager(cfg *LLMConfig) LLMProvider {
82 // Convert LLMConfig to models.Config
83 modelConfig := &models.Config{
84 AnthropicAPIKey: cfg.AnthropicAPIKey,
85 OpenAIAPIKey: cfg.OpenAIAPIKey,
86 GeminiAPIKey: cfg.GeminiAPIKey,
87 FireworksAPIKey: cfg.FireworksAPIKey,
88 Gateway: cfg.Gateway,
89 Logger: cfg.Logger,
90 DB: cfg.DB,
91 }
92
93 manager, err := models.NewManager(modelConfig)
94 if err != nil {
95 // This shouldn't happen in practice, but handle it gracefully
96 cfg.Logger.Error("Failed to create models manager", "error", err)
97 }
98
99 return manager
100}
101
102// toAPIMessages converts database messages to API messages.
103// When display_data is present (tool results), llm_data is omitted to save bandwidth
104// since the display_data contains all information needed for UI rendering.
105func toAPIMessages(messages []generated.Message) []APIMessage {
106 apiMessages := make([]APIMessage, len(messages))
107 for i, msg := range messages {
108 var endOfTurnPtr *bool
109 if msg.LlmData != nil && msg.Type == string(db.MessageTypeAgent) {
110 if endOfTurn, ok := extractEndOfTurn(*msg.LlmData); ok {
111 endOfTurnCopy := endOfTurn
112 endOfTurnPtr = &endOfTurnCopy
113 }
114 }
115
116 // TODO: Consider omitting llm_data when display_data is present to save bandwidth.
117 // The display_data contains all info needed for UI rendering of tool results,
118 // but the UI currently still uses llm_data for some checks.
119
120 apiMsg := APIMessage{
121 MessageID: msg.MessageID,
122 ConversationID: msg.ConversationID,
123 SequenceID: msg.SequenceID,
124 Type: msg.Type,
125 LlmData: msg.LlmData,
126 UserData: msg.UserData,
127 UsageData: msg.UsageData,
128 CreatedAt: msg.CreatedAt,
129 DisplayData: msg.DisplayData,
130 EndOfTurn: endOfTurnPtr,
131 }
132 apiMessages[i] = apiMsg
133 }
134 return apiMessages
135}
136
137func extractEndOfTurn(raw string) (bool, bool) {
138 var message llm.Message
139 if err := json.Unmarshal([]byte(raw), &message); err != nil {
140 return false, false
141 }
142 return message.EndOfTurn, true
143}
144
145// calculateContextWindowSize returns the context window usage from the most recent message with non-zero usage.
146// Each API call's input tokens represent the full conversation history sent to the model,
147// so we only need the last message's tokens (not accumulated across all messages).
148// The total input includes regular input tokens plus cached tokens (both read and created).
149// Messages without usage data (user messages, tool messages, etc.) are skipped.
150func calculateContextWindowSize(messages []APIMessage) uint64 {
151 // Find the last message with non-zero usage data
152 for i := len(messages) - 1; i >= 0; i-- {
153 msg := messages[i]
154 if msg.UsageData == nil {
155 continue
156 }
157 var usage llm.Usage
158 if err := json.Unmarshal([]byte(*msg.UsageData), &usage); err != nil {
159 continue
160 }
161 ctxUsed := usage.ContextWindowUsed()
162 if ctxUsed == 0 {
163 continue
164 }
165 // Return total context window used: all input tokens + output tokens
166 // This represents the full context that would be sent for the next turn
167 return ctxUsed
168 }
169 return 0
170}
171
172// isAgentEndOfTurn checks if a message is an agent or error message with end_of_turn=true.
173// This indicates the agent loop has finished processing.
174func isAgentEndOfTurn(msg *generated.Message) bool {
175 if msg == nil {
176 return false
177 }
178 // Agent and error messages can have end_of_turn
179 if msg.Type != string(db.MessageTypeAgent) && msg.Type != string(db.MessageTypeError) {
180 return false
181 }
182 if msg.LlmData == nil {
183 return false
184 }
185 endOfTurn, ok := extractEndOfTurn(*msg.LlmData)
186 if !ok {
187 return false
188 }
189 return endOfTurn
190}
191
192// calculateContextWindowSizeFromMsg calculates context window usage from a single message.
193// Returns 0 if the message has no usage data (e.g., user messages), in which case
194// the client should keep its previous context window value.
195func calculateContextWindowSizeFromMsg(msg *generated.Message) uint64 {
196 if msg == nil || msg.UsageData == nil {
197 return 0
198 }
199 var usage llm.Usage
200 if err := json.Unmarshal([]byte(*msg.UsageData), &usage); err != nil {
201 return 0
202 }
203 return usage.ContextWindowUsed()
204}
205
// ConversationListUpdate represents an update to the conversation list.
// It is delivered inside StreamResponse to every active conversation stream
// by publishConversationListUpdate.
type ConversationListUpdate struct {
	// Type is the kind of change: "update" or "delete".
	Type string `json:"type"` // "update", "delete"
	// Conversation is the current conversation row, set for updates.
	Conversation *generated.Conversation `json:"conversation,omitempty"`
	// ConversationID identifies the removed conversation, set for deletes.
	ConversationID string `json:"conversation_id,omitempty"` // For deletes
}
212
// Server manages the HTTP API and active conversations.
// Construct it with NewServer; the zero value is not usable (activeConversations is nil).
type Server struct {
	db         *db.DB
	llmManager LLMProvider
	// toolSetConfig is handed to every new ConversationManager; NewServer
	// fills in its SubagentRunner and SubagentDB.
	toolSetConfig claudetool.ToolSetConfig
	// activeConversations maps conversation ID to its live manager.
	// Every access in this file is made while holding mu.
	activeConversations map[string]*ConversationManager
	// mu guards activeConversations.
	mu              sync.Mutex
	logger          *slog.Logger
	predictableOnly bool
	terminalURL     string
	defaultModel    string
	links           []Link
	// requireHeader, when non-empty, makes StartWithListener install
	// RequireHeaderMiddleware with this value.
	requireHeader string
	// conversationGroup collapses concurrent getOrCreateConversationManager
	// calls for the same conversation ID into a single creation.
	conversationGroup singleflight.Group[string, *ConversationManager]
	versionChecker    *VersionChecker
}
229
230// NewServer creates a new server instance
231func NewServer(database *db.DB, llmManager LLMProvider, toolSetConfig claudetool.ToolSetConfig, logger *slog.Logger, predictableOnly bool, terminalURL, defaultModel, requireHeader string, links []Link) *Server {
232 s := &Server{
233 db: database,
234 llmManager: llmManager,
235 toolSetConfig: toolSetConfig,
236 activeConversations: make(map[string]*ConversationManager),
237 logger: logger,
238 predictableOnly: predictableOnly,
239 terminalURL: terminalURL,
240 defaultModel: defaultModel,
241 requireHeader: requireHeader,
242 links: links,
243 versionChecker: NewVersionChecker(),
244 }
245
246 // Set up subagent support
247 s.toolSetConfig.SubagentRunner = NewSubagentRunner(s)
248 s.toolSetConfig.SubagentDB = &db.SubagentDBAdapter{DB: database}
249
250 return s
251}
252
253// RegisterRoutes registers HTTP routes on the given mux
254func (s *Server) RegisterRoutes(mux *http.ServeMux) {
255 // API routes - wrap with gzip where beneficial
256 mux.Handle("/api/conversations", gzipHandler(http.HandlerFunc(s.handleConversations)))
257 mux.Handle("/api/conversations/archived", gzipHandler(http.HandlerFunc(s.handleArchivedConversations)))
258 mux.Handle("/api/conversations/new", http.HandlerFunc(s.handleNewConversation)) // Small response
259 mux.Handle("/api/conversations/continue", http.HandlerFunc(s.handleContinueConversation)) // Small response
260 mux.Handle("/api/conversation/", http.StripPrefix("/api/conversation", s.conversationMux()))
261 mux.Handle("/api/conversation-by-slug/", gzipHandler(http.HandlerFunc(s.handleConversationBySlug)))
262 mux.Handle("/api/validate-cwd", http.HandlerFunc(s.handleValidateCwd)) // Small response
263 mux.Handle("/api/list-directory", gzipHandler(http.HandlerFunc(s.handleListDirectory)))
264 mux.Handle("/api/create-directory", http.HandlerFunc(s.handleCreateDirectory))
265 mux.Handle("/api/git/diffs", gzipHandler(http.HandlerFunc(s.handleGitDiffs)))
266 mux.Handle("/api/git/diffs/", gzipHandler(http.HandlerFunc(s.handleGitDiffFiles)))
267 mux.Handle("/api/git/file-diff/", gzipHandler(http.HandlerFunc(s.handleGitFileDiff)))
268 mux.HandleFunc("/api/upload", s.handleUpload) // Binary uploads
269 mux.HandleFunc("/api/read", s.handleRead) // Serves images
270 mux.Handle("/api/write-file", http.HandlerFunc(s.handleWriteFile)) // Small response
271 mux.HandleFunc("/api/exec-ws", s.handleExecWS) // Websocket for shell commands
272
273 // Custom models API
274 mux.Handle("/api/custom-models", http.HandlerFunc(s.handleCustomModels))
275 mux.Handle("/api/custom-models/", http.HandlerFunc(s.handleCustomModel))
276 mux.Handle("/api/custom-models-test", http.HandlerFunc(s.handleTestModel))
277
278 // Models API (dynamic list refresh)
279 mux.Handle("/api/models", http.HandlerFunc(s.handleModels))
280
281 // Version endpoints
282 mux.Handle("GET /version", http.HandlerFunc(s.handleVersion))
283 mux.Handle("GET /version-check", http.HandlerFunc(s.handleVersionCheck))
284 mux.Handle("GET /version-changelog", http.HandlerFunc(s.handleVersionChangelog))
285 mux.Handle("POST /upgrade", http.HandlerFunc(s.handleUpgrade))
286 mux.Handle("POST /exit", http.HandlerFunc(s.handleExit))
287
288 // Debug endpoints
289 mux.Handle("GET /debug/conversations", http.HandlerFunc(s.handleDebugConversationsPage))
290 mux.Handle("GET /debug/llm_requests", http.HandlerFunc(s.handleDebugLLMRequests))
291 mux.Handle("GET /debug/llm_requests/api", http.HandlerFunc(s.handleDebugLLMRequestsAPI))
292 mux.Handle("GET /debug/llm_requests/{id}/request", http.HandlerFunc(s.handleDebugLLMRequestBody))
293 mux.Handle("GET /debug/llm_requests/{id}/request_full", http.HandlerFunc(s.handleDebugLLMRequestBodyFull))
294 mux.Handle("GET /debug/llm_requests/{id}/response", http.HandlerFunc(s.handleDebugLLMResponseBody))
295
296 // Serve embedded UI assets
297 mux.Handle("/", s.staticHandler(ui.Assets()))
298}
299
300// handleValidateCwd validates that a path exists and is a directory
301func (s *Server) handleValidateCwd(w http.ResponseWriter, r *http.Request) {
302 if r.Method != http.MethodGet {
303 http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
304 return
305 }
306
307 path := r.URL.Query().Get("path")
308 if path == "" {
309 w.Header().Set("Content-Type", "application/json")
310 json.NewEncoder(w).Encode(map[string]interface{}{
311 "valid": false,
312 "error": "path is required",
313 })
314 return
315 }
316
317 info, err := os.Stat(path)
318 if err != nil {
319 w.Header().Set("Content-Type", "application/json")
320 if os.IsNotExist(err) {
321 json.NewEncoder(w).Encode(map[string]interface{}{
322 "valid": false,
323 "error": "directory does not exist",
324 })
325 } else {
326 json.NewEncoder(w).Encode(map[string]interface{}{
327 "valid": false,
328 "error": err.Error(),
329 })
330 }
331 return
332 }
333
334 if !info.IsDir() {
335 w.Header().Set("Content-Type", "application/json")
336 json.NewEncoder(w).Encode(map[string]interface{}{
337 "valid": false,
338 "error": "path is not a directory",
339 })
340 return
341 }
342
343 w.Header().Set("Content-Type", "application/json")
344 json.NewEncoder(w).Encode(map[string]interface{}{
345 "valid": true,
346 })
347}
348
// DirectoryEntry represents a single directory entry for the directory picker.
type DirectoryEntry struct {
	Name string `json:"name"`
	// IsDir is always true as produced by handleListDirectory, which lists only directories.
	IsDir bool `json:"is_dir"`
	// GitHeadSubject is the HEAD commit subject when the entry is a git
	// repository root; empty otherwise.
	GitHeadSubject string `json:"git_head_subject,omitempty"`
}
355
// ListDirectoryResponse is the response from the list-directory endpoint.
type ListDirectoryResponse struct {
	// Path is the cleaned directory path that was listed.
	Path string `json:"path"`
	// Parent is the parent directory, or "" when Path is the filesystem root.
	Parent string `json:"parent"`
	// Entries holds the subdirectories of Path, non-hidden first, each group
	// sorted by name.
	Entries []DirectoryEntry `json:"entries"`
	// GitHeadSubject is set when Path itself is a git repository root.
	GitHeadSubject string `json:"git_head_subject,omitempty"`
	// GitWorktreeRoot is the main repository root when Path is a linked git worktree.
	GitWorktreeRoot string `json:"git_worktree_root,omitempty"`
}
364
365// handleListDirectory lists the contents of a directory for the directory picker
366func (s *Server) handleListDirectory(w http.ResponseWriter, r *http.Request) {
367 if r.Method != http.MethodGet {
368 http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
369 return
370 }
371
372 path := r.URL.Query().Get("path")
373 if path == "" {
374 // Default to home directory or root
375 homeDir, err := os.UserHomeDir()
376 if err != nil {
377 path = "/"
378 } else {
379 path = homeDir
380 }
381 }
382
383 // Clean and resolve the path
384 path = filepath.Clean(path)
385
386 // Verify path exists and is a directory
387 info, err := os.Stat(path)
388 if err != nil {
389 w.Header().Set("Content-Type", "application/json")
390 if os.IsNotExist(err) {
391 json.NewEncoder(w).Encode(map[string]interface{}{
392 "error": "directory does not exist",
393 })
394 } else if os.IsPermission(err) {
395 json.NewEncoder(w).Encode(map[string]interface{}{
396 "error": "permission denied",
397 })
398 } else {
399 json.NewEncoder(w).Encode(map[string]interface{}{
400 "error": err.Error(),
401 })
402 }
403 return
404 }
405
406 if !info.IsDir() {
407 w.Header().Set("Content-Type", "application/json")
408 json.NewEncoder(w).Encode(map[string]interface{}{
409 "error": "path is not a directory",
410 })
411 return
412 }
413
414 // Read directory contents
415 dirEntries, err := os.ReadDir(path)
416 if err != nil {
417 w.Header().Set("Content-Type", "application/json")
418 if os.IsPermission(err) {
419 json.NewEncoder(w).Encode(map[string]interface{}{
420 "error": "permission denied",
421 })
422 } else {
423 json.NewEncoder(w).Encode(map[string]interface{}{
424 "error": err.Error(),
425 })
426 }
427 return
428 }
429
430 // Build response with only directories (for directory picker)
431 var entries []DirectoryEntry
432 for _, entry := range dirEntries {
433 // Only include directories
434 if entry.IsDir() {
435 dirEntry := DirectoryEntry{
436 Name: entry.Name(),
437 IsDir: true,
438 }
439
440 // Check if this is a git repo root and get HEAD commit subject
441 entryPath := filepath.Join(path, entry.Name())
442 if isGitRepo(entryPath) {
443 if subject := getGitHeadSubject(entryPath); subject != "" {
444 dirEntry.GitHeadSubject = subject
445 }
446 }
447
448 entries = append(entries, dirEntry)
449 }
450 }
451
452 // Sort entries: non-hidden first, then hidden (.*), alphabetically within each group
453 sort.Slice(entries, func(i, j int) bool {
454 iHidden := strings.HasPrefix(entries[i].Name, ".")
455 jHidden := strings.HasPrefix(entries[j].Name, ".")
456 if iHidden != jHidden {
457 return !iHidden // non-hidden comes first
458 }
459 return entries[i].Name < entries[j].Name
460 })
461
462 // Calculate parent directory
463 parent := filepath.Dir(path)
464 if parent == path {
465 // At root, no parent
466 parent = ""
467 }
468
469 response := ListDirectoryResponse{
470 Path: path,
471 Parent: parent,
472 Entries: entries,
473 }
474
475 // Check if the current directory itself is a git repo
476 if isGitRepo(path) {
477 response.GitHeadSubject = getGitHeadSubject(path)
478 if root := getGitWorktreeRoot(path); root != "" {
479 response.GitWorktreeRoot = root
480 }
481 }
482
483 w.Header().Set("Content-Type", "application/json")
484 json.NewEncoder(w).Encode(response)
485}
486
// isGitRepo reports whether dirPath is the root of a git working tree.
//
// It returns true when dirPath contains either a ".git" directory (a regular
// repository) or a regular ".git" file whose content starts with "gitdir:"
// (a linked worktree). Any stat or read error yields false.
func isGitRepo(dirPath string) bool {
	gitPath := filepath.Join(dirPath, ".git")
	fi, err := os.Stat(gitPath)
	switch {
	case err != nil:
		return false
	case fi.IsDir():
		return true
	case fi.Mode().IsRegular():
		content, readErr := os.ReadFile(gitPath)
		return readErr == nil && strings.HasPrefix(string(content), "gitdir:")
	default:
		return false
	}
}
509
// getGitHeadSubject returns the subject line of the HEAD commit for the git
// repository at repoPath. It returns "" if the git command fails for any reason
// or does not finish within the timeout.
//
// The subprocess is bounded because this runs synchronously inside HTTP
// handlers (once per repository subdirectory in a directory listing), and an
// unbounded git call would stall the whole request.
func getGitHeadSubject(repoPath string) string {
	const timeout = 5 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	cmd := exec.CommandContext(ctx, "git", "log", "-1", "--format=%s")
	cmd.Dir = repoPath
	output, err := cmd.Output()
	if err != nil {
		return ""
	}
	return strings.TrimSpace(string(output))
}
521
// getGitWorktreeRoot returns the main repository root if repoPath is a linked
// git worktree (not the main repository itself). It returns "" otherwise, and
// also when the git command fails or does not finish within the timeout.
//
// The subprocess is bounded because this runs synchronously inside an HTTP
// handler.
func getGitWorktreeRoot(repoPath string) string {
	const timeout = 5 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// Ask for this checkout's own git dir and the common (main repo) git dir.
	cmd := exec.CommandContext(ctx, "git", "rev-parse", "--git-dir", "--git-common-dir")
	cmd.Dir = repoPath
	output, err := cmd.Output()
	if err != nil {
		return ""
	}
	lines := strings.SplitN(strings.TrimSpace(string(output)), "\n", 2)
	if len(lines) != 2 {
		return ""
	}

	// git may print either path relative to repoPath; normalize both.
	resolve := func(p string) string {
		if !filepath.IsAbs(p) {
			p = filepath.Join(repoPath, p)
		}
		return filepath.Clean(p)
	}
	gitDir := resolve(lines[0])
	commonDir := resolve(lines[1])

	// Identical dirs mean this is the main repo, not a worktree.
	if gitDir == commonDir {
		return ""
	}

	// The main repo root is the parent of the common .git dir.
	return filepath.Dir(commonDir)
}
557
558// handleCreateDirectory creates a new directory
559func (s *Server) handleCreateDirectory(w http.ResponseWriter, r *http.Request) {
560 if r.Method != http.MethodPost {
561 http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
562 return
563 }
564
565 var req struct {
566 Path string `json:"path"`
567 }
568 if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
569 w.Header().Set("Content-Type", "application/json")
570 json.NewEncoder(w).Encode(map[string]interface{}{
571 "error": "invalid request body",
572 })
573 return
574 }
575
576 if req.Path == "" {
577 w.Header().Set("Content-Type", "application/json")
578 json.NewEncoder(w).Encode(map[string]interface{}{
579 "error": "path is required",
580 })
581 return
582 }
583
584 // Clean the path
585 path := filepath.Clean(req.Path)
586
587 // Check if path already exists
588 if _, err := os.Stat(path); err == nil {
589 w.Header().Set("Content-Type", "application/json")
590 json.NewEncoder(w).Encode(map[string]interface{}{
591 "error": "path already exists",
592 })
593 return
594 }
595
596 // Verify parent directory exists
597 parentDir := filepath.Dir(path)
598 if _, err := os.Stat(parentDir); os.IsNotExist(err) {
599 w.Header().Set("Content-Type", "application/json")
600 json.NewEncoder(w).Encode(map[string]interface{}{
601 "error": "parent directory does not exist",
602 })
603 return
604 }
605
606 // Create the directory (only the final directory, not parents)
607 if err := os.Mkdir(path, 0o755); err != nil {
608 w.Header().Set("Content-Type", "application/json")
609 if os.IsPermission(err) {
610 json.NewEncoder(w).Encode(map[string]interface{}{
611 "error": "permission denied",
612 })
613 } else {
614 json.NewEncoder(w).Encode(map[string]interface{}{
615 "error": err.Error(),
616 })
617 }
618 return
619 }
620
621 w.Header().Set("Content-Type", "application/json")
622 json.NewEncoder(w).Encode(map[string]interface{}{
623 "path": path,
624 })
625}
626
// getOrCreateConversationManager gets an existing conversation manager or creates a new one.
//
// Concurrent calls for the same conversationID are collapsed by
// s.conversationGroup, so only one creation runs per ID at a time; callers that
// join an in-flight call receive its result, including its error. An existing
// manager has Touch called on it (presumably refreshing the lastActivity that
// Cleanup checks). A new manager is wired to record messages through
// s.recordMessage and to broadcast state changes through
// s.publishConversationState. It is hydrated with ctx, and is registered in
// s.activeConversations only if hydration succeeds.
//
// NOTE(review): s.mu is held for the whole creation, including
// manager.Hydrate(ctx), so every other s.mu user (recordMessage, broadcasts,
// Cleanup) blocks while hydration runs. If Hydrate can invoke onStateChange
// synchronously, publishConversationState would re-acquire s.mu and deadlock —
// confirm Hydrate does not do this.
// NOTE(review): the ctx of whichever caller starts the flight is the one given
// to Hydrate; if it is cancelled, joined callers get that error too.
func (s *Server) getOrCreateConversationManager(ctx context.Context, conversationID string) (*ConversationManager, error) {
	manager, err, _ := s.conversationGroup.Do(conversationID, func() (*ConversationManager, error) {
		s.mu.Lock()
		defer s.mu.Unlock()
		if manager, exists := s.activeConversations[conversationID]; exists {
			manager.Touch()
			return manager, nil
		}

		// Messages produced by the manager are persisted and fanned out by the server.
		recordMessage := func(ctx context.Context, message llm.Message, usage llm.Usage) error {
			return s.recordMessage(ctx, conversationID, message, usage)
		}

		// State changes are broadcast to every active conversation stream.
		onStateChange := func(state ConversationState) {
			s.publishConversationState(state)
		}

		manager := NewConversationManager(conversationID, s.db, s.logger, s.toolSetConfig, recordMessage, onStateChange)
		if err := manager.Hydrate(ctx); err != nil {
			return nil, err
		}

		s.activeConversations[conversationID] = manager
		return manager, nil
	})
	if err != nil {
		return nil, err
	}
	return manager, nil
}
658
659// ExtractDisplayData extracts display data from message content for storage
660func ExtractDisplayData(message llm.Message) interface{} {
661 // Build a map of tool_use_id to tool_name for lookups
662 toolNameMap := make(map[string]string)
663 for _, content := range message.Content {
664 if content.Type == llm.ContentTypeToolUse {
665 toolNameMap[content.ID] = content.ToolName
666 }
667 }
668
669 var displayData []any
670 for _, content := range message.Content {
671 if content.Type == llm.ContentTypeToolResult && content.Display != nil {
672 // Include tool name if we can find it
673 toolName := toolNameMap[content.ToolUseID]
674 displayData = append(displayData, map[string]any{
675 "tool_use_id": content.ToolUseID,
676 "tool_name": toolName,
677 "display": content.Display,
678 })
679 }
680 }
681
682 if len(displayData) > 0 {
683 return displayData
684 }
685 return nil
686}
687
688// recordMessage records a new message to the database and also notifies subscribers
689func (s *Server) recordMessage(ctx context.Context, conversationID string, message llm.Message, usage llm.Usage) error {
690 // Log message based on role
691 if message.Role == llm.MessageRoleUser {
692 s.logger.Info("User message", "conversation_id", conversationID, "content_items", len(message.Content))
693 } else if message.Role == llm.MessageRoleAssistant {
694 s.logger.Info("Agent message", "conversation_id", conversationID, "content_items", len(message.Content), "end_of_turn", message.EndOfTurn)
695 }
696
697 // Convert LLM message to database format
698 messageType, err := s.getMessageType(message)
699 if err != nil {
700 return fmt.Errorf("failed to determine message type: %w", err)
701 }
702
703 // Extract display data from content items
704 displayDataToStore := ExtractDisplayData(message)
705
706 // Create message
707 createdMsg, err := s.db.CreateMessage(ctx, db.CreateMessageParams{
708 ConversationID: conversationID,
709 Type: messageType,
710 LLMData: message,
711 UserData: nil,
712 UsageData: usage,
713 DisplayData: displayDataToStore,
714 ExcludedFromContext: message.ExcludedFromContext,
715 })
716 if err != nil {
717 return fmt.Errorf("failed to create message: %w", err)
718 }
719
720 // Update conversation's last updated timestamp for correct ordering
721 if err := s.db.QueriesTx(ctx, func(q *generated.Queries) error {
722 return q.UpdateConversationTimestamp(ctx, conversationID)
723 }); err != nil {
724 s.logger.Warn("Failed to update conversation timestamp", "conversationID", conversationID, "error", err)
725 }
726
727 // Touch active manager activity time if present
728 s.mu.Lock()
729 mgr, ok := s.activeConversations[conversationID]
730 if ok {
731 mgr.Touch()
732 }
733 s.mu.Unlock()
734
735 // Notify subscribers with only the new message - use WithoutCancel because
736 // the HTTP request context may be cancelled after the handler returns, but
737 // we still want the notification to complete so SSE clients see the message immediately
738 go s.notifySubscribersNewMessage(context.WithoutCancel(ctx), conversationID, createdMsg)
739
740 return nil
741}
742
743// getMessageType determines the message type from an LLM message
744func (s *Server) getMessageType(message llm.Message) (db.MessageType, error) {
745 // System-generated errors are stored as error type
746 if message.ErrorType != llm.ErrorTypeNone {
747 return db.MessageTypeError, nil
748 }
749
750 switch message.Role {
751 case llm.MessageRoleUser:
752 return db.MessageTypeUser, nil
753 case llm.MessageRoleAssistant:
754 return db.MessageTypeAgent, nil
755 default:
756 // For tool messages, check if it's a tool call or tool result
757 for _, content := range message.Content {
758 if content.Type == llm.ContentTypeToolUse {
759 return db.MessageTypeTool, nil
760 }
761 if content.Type == llm.ContentTypeToolResult {
762 return db.MessageTypeTool, nil
763 }
764 }
765 return db.MessageTypeAgent, nil
766 }
767}
768
769// convertToLLMMessage converts a database message to an LLM message
770func convertToLLMMessage(msg generated.Message) (llm.Message, error) {
771 var llmMsg llm.Message
772 if msg.LlmData == nil {
773 return llm.Message{}, fmt.Errorf("message has no LLM data")
774 }
775 if err := json.Unmarshal([]byte(*msg.LlmData), &llmMsg); err != nil {
776 return llm.Message{}, fmt.Errorf("failed to unmarshal LLM data: %w", err)
777 }
778 return llmMsg, nil
779}
780
781// notifySubscribers sends conversation metadata updates (e.g., slug changes) to subscribers.
782// This is used when only the conversation data changes, not the messages.
783// Uses Broadcast instead of Publish to avoid racing with message sequence IDs.
784func (s *Server) notifySubscribers(ctx context.Context, conversationID string) {
785 s.mu.Lock()
786 manager, exists := s.activeConversations[conversationID]
787 s.mu.Unlock()
788
789 if !exists {
790 return
791 }
792
793 // Get conversation data only (no messages needed for metadata-only updates)
794 var conversation generated.Conversation
795 err := s.db.Queries(ctx, func(q *generated.Queries) error {
796 var err error
797 conversation, err = q.GetConversation(ctx, conversationID)
798 return err
799 })
800 if err != nil {
801 s.logger.Error("Failed to get conversation data for notification", "conversationID", conversationID, "error", err)
802 return
803 }
804
805 // Broadcast conversation update with no new messages.
806 // Using Broadcast instead of Publish ensures this metadata-only update
807 // doesn't race with notifySubscribersNewMessage which uses Publish with sequence IDs.
808 streamData := StreamResponse{
809 Messages: nil, // No new messages, just conversation update
810 Conversation: conversation,
811 }
812 manager.subpub.Broadcast(streamData)
813
814 // Also notify conversation list subscribers (e.g., slug change)
815 s.publishConversationListUpdate(ConversationListUpdate{
816 Type: "update",
817 Conversation: &conversation,
818 })
819}
820
821// notifySubscribersNewMessage sends a single new message to all subscribers.
822// This is more efficient than re-sending all messages on each update.
823func (s *Server) notifySubscribersNewMessage(ctx context.Context, conversationID string, newMsg *generated.Message) {
824 s.mu.Lock()
825 manager, exists := s.activeConversations[conversationID]
826 s.mu.Unlock()
827
828 if !exists {
829 return
830 }
831
832 // Get conversation data for the response
833 var conversation generated.Conversation
834 err := s.db.Queries(ctx, func(q *generated.Queries) error {
835 var err error
836 conversation, err = q.GetConversation(ctx, conversationID)
837 return err
838 })
839 if err != nil {
840 s.logger.Error("Failed to get conversation data for notification", "conversationID", conversationID, "error", err)
841 return
842 }
843
844 // Convert the single new message to API format
845 apiMessages := toAPIMessages([]generated.Message{*newMsg})
846
847 // Update agent working state based on message type
848 if isAgentEndOfTurn(newMsg) {
849 manager.SetAgentWorking(false)
850 }
851
852 // Publish only the new message
853 streamData := StreamResponse{
854 Messages: apiMessages,
855 Conversation: conversation,
856 // ContextWindowSize: 0 for messages without usage data (user/tool messages).
857 // With omitempty, 0 is omitted from JSON, so the UI keeps its cached value.
858 // Only agent messages have usage data, so context window updates when they arrive.
859 ContextWindowSize: calculateContextWindowSizeFromMsg(newMsg),
860 }
861 manager.subpub.Publish(newMsg.SequenceID, streamData)
862
863 // Also notify conversation list subscribers about the update (updated_at changed)
864 s.publishConversationListUpdate(ConversationListUpdate{
865 Type: "update",
866 Conversation: &conversation,
867 })
868}
869
870// publishConversationListUpdate broadcasts a conversation list update to ALL active
871// conversation streams. This allows clients to receive updates about other conversations
872// while they're subscribed to their current conversation's stream.
873func (s *Server) publishConversationListUpdate(update ConversationListUpdate) {
874 s.mu.Lock()
875 defer s.mu.Unlock()
876
877 // Broadcast to all active conversation managers
878 for _, manager := range s.activeConversations {
879 streamData := StreamResponse{
880 ConversationListUpdate: &update,
881 }
882 manager.subpub.Broadcast(streamData)
883 }
884}
885
886// publishConversationState broadcasts a conversation state update to ALL active
887// conversation streams. This allows clients to see the working state of other conversations.
888func (s *Server) publishConversationState(state ConversationState) {
889 s.mu.Lock()
890 defer s.mu.Unlock()
891
892 // Broadcast to all active conversation managers
893 for _, manager := range s.activeConversations {
894 streamData := StreamResponse{
895 ConversationState: &state,
896 }
897 manager.subpub.Broadcast(streamData)
898 }
899}
900
901// getWorkingConversations returns a map of conversation IDs that are currently working.
902func (s *Server) getWorkingConversations() map[string]bool {
903 s.mu.Lock()
904 defer s.mu.Unlock()
905
906 working := make(map[string]bool)
907 for id, manager := range s.activeConversations {
908 if manager.IsAgentWorking() {
909 working[id] = true
910 }
911 }
912 return working
913}
914
915// IsAgentWorking returns whether the agent is currently working on the given conversation.
916// Returns false if the conversation doesn't have an active manager.
917func (s *Server) IsAgentWorking(conversationID string) bool {
918 s.mu.Lock()
919 manager, exists := s.activeConversations[conversationID]
920 s.mu.Unlock()
921 if !exists {
922 return false
923 }
924 return manager.IsAgentWorking()
925}
926
927// Cleanup removes inactive conversation managers
928func (s *Server) Cleanup() {
929 s.mu.Lock()
930 defer s.mu.Unlock()
931
932 now := time.Now()
933 for id, manager := range s.activeConversations {
934 // Remove managers that have been inactive for more than 30 minutes
935 manager.mu.Lock()
936 lastActivity := manager.lastActivity
937 manager.mu.Unlock()
938 if now.Sub(lastActivity) > 30*time.Minute {
939 manager.stopLoop()
940 delete(s.activeConversations, id)
941 s.logger.Debug("Cleaned up inactive conversation", "conversationID", id)
942 }
943 }
944}
945
946// Start starts the HTTP server and handles the complete lifecycle
947func (s *Server) Start(port string) error {
948 listener, err := net.Listen("tcp", ":"+port)
949 if err != nil {
950 s.logger.Error("Failed to create listener", "error", err, "port_info", getPortOwnerInfo(port))
951 return err
952 }
953 return s.StartWithListener(listener)
954}
955
956// StartWithListener starts the HTTP server using the provided listener.
957// This is useful for systemd socket activation where the listener is created externally.
958func (s *Server) StartWithListener(listener net.Listener) error {
959 // Set up HTTP server with routes and middleware
960 mux := http.NewServeMux()
961 s.RegisterRoutes(mux)
962
963 // Add middleware (applied in reverse order: last added = first executed)
964 handler := LoggerMiddleware(s.logger)(mux)
965 handler = CSRFMiddleware()(handler)
966 if s.requireHeader != "" {
967 handler = RequireHeaderMiddleware(s.requireHeader)(handler)
968 }
969
970 httpServer := &http.Server{
971 Handler: handler,
972 }
973
974 // Start cleanup routine
975 go func() {
976 ticker := time.NewTicker(5 * time.Minute)
977 defer ticker.Stop()
978 for range ticker.C {
979 s.Cleanup()
980 }
981 }()
982
983 // Get actual port from listener
984 actualPort := listener.Addr().(*net.TCPAddr).Port
985
986 // Start server in goroutine
987 serverErrCh := make(chan error, 1)
988 go func() {
989 s.logger.Info("Server starting", "port", actualPort, "url", fmt.Sprintf("http://localhost:%d", actualPort))
990 if err := httpServer.Serve(listener); err != nil && err != http.ErrServerClosed {
991 serverErrCh <- err
992 }
993 }()
994
995 // Wait for shutdown signal or server error
996 quit := make(chan os.Signal, 1)
997 signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
998
999 select {
1000 case err := <-serverErrCh:
1001 s.logger.Error("Server failed", "error", err)
1002 return err
1003 case <-quit:
1004 s.logger.Info("Shutting down server")
1005 }
1006
1007 // Graceful shutdown
1008 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
1009 defer cancel()
1010
1011 if err := httpServer.Shutdown(ctx); err != nil {
1012 s.logger.Error("Server forced to shutdown", "error", err)
1013 return err
1014 }
1015
1016 s.logger.Info("Server exited")
1017 return nil
1018}
1019
// getPortOwnerInfo tries to identify which process is listening on TCP port,
// using lsof. It is best-effort and only used to enrich an error log.
//
// It returns one of the following:
//   - "pid=<pid> process=<command>" for the first listener lsof reports.
//   - "(no process found)" when lsof finds no listener.
//   - "(unable to determine: ...)" when lsof is unavailable, fails, or times out.
//   - "(could not parse lsof output)" when no data line has at least two fields.
func getPortOwnerInfo(port string) string {
	// lsof can block on unresponsive mounts; don't let a diagnostic hang startup.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	cmd := exec.CommandContext(ctx, "lsof", "-i", ":"+port, "-sTCP:LISTEN", "-n", "-P")
	output, err := cmd.Output()
	if err != nil {
		// lsof exits with status 1 and prints nothing when no open file
		// matches; treat that silent exit as "nothing is listening" rather
		// than as a failure. Any stdout/stderr text means a real problem.
		if exitErr, ok := err.(*exec.ExitError); ok && exitErr.ExitCode() == 1 &&
			strings.TrimSpace(string(output)) == "" &&
			strings.TrimSpace(string(exitErr.Stderr)) == "" {
			return "(no process found)"
		}
		return fmt.Sprintf("(unable to determine: %v)", err)
	}

	lines := strings.Split(strings.TrimSpace(string(output)), "\n")
	if len(lines) < 2 {
		return "(no process found)"
	}

	// lsof columns: COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME.
	// Skip the header line and report the first parseable row.
	for _, line := range lines[1:] {
		fields := strings.Fields(line)
		if len(fields) >= 2 {
			return fmt.Sprintf("pid=%s process=%s", fields[1], fields[0])
		}
	}

	return "(could not parse lsof output)"
}