1package server
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "net"
9 "net/http"
10 "os"
11 "os/exec"
12 "os/signal"
13 "path/filepath"
14 "strings"
15 "sync"
16 "syscall"
17 "time"
18
19 "tailscale.com/util/singleflight"
20
21 "shelley.exe.dev/claudetool"
22 "shelley.exe.dev/db"
23 "shelley.exe.dev/db/generated"
24 "shelley.exe.dev/llm"
25 "shelley.exe.dev/models"
26 "shelley.exe.dev/ui"
27)
28
// APIMessage is the message format sent to clients.
// The *Data fields carry the stored payloads as strings and are omitted from
// the JSON output when nil.
// TODO: We could maybe omit llm_data when display_data is available
type APIMessage struct {
	MessageID      string  `json:"message_id"`
	ConversationID string  `json:"conversation_id"`
	SequenceID     int64   `json:"sequence_id"`
	Type           string  `json:"type"`
	LlmData        *string `json:"llm_data,omitempty"`
	UserData       *string `json:"user_data,omitempty"`
	// UsageData is decoded as llm.Usage by calculateContextWindowSize.
	UsageData   *string   `json:"usage_data,omitempty"`
	CreatedAt   time.Time `json:"created_at"`
	DisplayData *string   `json:"display_data,omitempty"`
	// EndOfTurn is nil unless toAPIMessages could decode it from an agent
	// message's llm_data.
	EndOfTurn *bool `json:"end_of_turn,omitempty"`
}
43
// ConversationState represents the current state of a conversation.
// This is broadcast to all subscribers whenever the state changes
// (see publishConversationState).
type ConversationState struct {
	ConversationID string `json:"conversation_id"`
	Working        bool   `json:"working"`
	// Model is left out of the JSON output when empty.
	Model string `json:"model,omitempty"`
}
51
// ConversationWithState combines a conversation with its working state.
// The conversation is embedded, so its fields sit alongside "working"
// in the same struct.
type ConversationWithState struct {
	generated.Conversation
	Working bool `json:"working"`
}
57
// StreamResponse represents the response format for conversation streaming.
// The same envelope is used for new messages, metadata-only conversation
// updates, conversation state changes and conversation list updates; fields
// that do not apply to a given event are left at their zero value.
type StreamResponse struct {
	// Messages is nil for events that carry no new messages.
	Messages          []APIMessage           `json:"messages"`
	Conversation      generated.Conversation `json:"conversation"`
	ConversationState *ConversationState     `json:"conversation_state,omitempty"`
	// ContextWindowSize is omitted from the JSON output when zero.
	ContextWindowSize uint64 `json:"context_window_size,omitempty"`
	// ConversationListUpdate is set when another conversation in the list changed
	ConversationListUpdate *ConversationListUpdate `json:"conversation_list_update,omitempty"`
}
67
// LLMProvider is an interface for getting LLM services.
// NewLLMServiceManager returns the models manager as an implementation.
type LLMProvider interface {
	// GetService returns the llm.Service for modelID, or an error.
	GetService(modelID string) (llm.Service, error)
	// GetAvailableModels returns model identifiers as strings.
	GetAvailableModels() []string
	// HasModel reports whether modelID is known to the provider.
	HasModel(modelID string) bool
	// GetModelInfo returns metadata for modelID.
	// NOTE(review): presumably nil for unknown models — confirm against the implementation.
	GetModelInfo(modelID string) *models.ModelInfo
	// RefreshCustomModels reloads custom model definitions.
	// NOTE(review): source of the definitions is not visible here — confirm.
	RefreshCustomModels() error
}
76
77// NewLLMServiceManager creates a new LLM service manager from config
78func NewLLMServiceManager(cfg *LLMConfig) LLMProvider {
79 // Convert LLMConfig to models.Config
80 modelConfig := &models.Config{
81 AnthropicAPIKey: cfg.AnthropicAPIKey,
82 OpenAIAPIKey: cfg.OpenAIAPIKey,
83 GeminiAPIKey: cfg.GeminiAPIKey,
84 FireworksAPIKey: cfg.FireworksAPIKey,
85 Gateway: cfg.Gateway,
86 Logger: cfg.Logger,
87 DB: cfg.DB,
88 }
89
90 manager, err := models.NewManager(modelConfig)
91 if err != nil {
92 // This shouldn't happen in practice, but handle it gracefully
93 cfg.Logger.Error("Failed to create models manager", "error", err)
94 }
95
96 return manager
97}
98
99// toAPIMessages converts database messages to API messages.
100// When display_data is present (tool results), llm_data is omitted to save bandwidth
101// since the display_data contains all information needed for UI rendering.
102func toAPIMessages(messages []generated.Message) []APIMessage {
103 apiMessages := make([]APIMessage, len(messages))
104 for i, msg := range messages {
105 var endOfTurnPtr *bool
106 if msg.LlmData != nil && msg.Type == string(db.MessageTypeAgent) {
107 if endOfTurn, ok := extractEndOfTurn(*msg.LlmData); ok {
108 endOfTurnCopy := endOfTurn
109 endOfTurnPtr = &endOfTurnCopy
110 }
111 }
112
113 // TODO: Consider omitting llm_data when display_data is present to save bandwidth.
114 // The display_data contains all info needed for UI rendering of tool results,
115 // but the UI currently still uses llm_data for some checks.
116
117 apiMsg := APIMessage{
118 MessageID: msg.MessageID,
119 ConversationID: msg.ConversationID,
120 SequenceID: msg.SequenceID,
121 Type: msg.Type,
122 LlmData: msg.LlmData,
123 UserData: msg.UserData,
124 UsageData: msg.UsageData,
125 CreatedAt: msg.CreatedAt,
126 DisplayData: msg.DisplayData,
127 EndOfTurn: endOfTurnPtr,
128 }
129 apiMessages[i] = apiMsg
130 }
131 return apiMessages
132}
133
134func extractEndOfTurn(raw string) (bool, bool) {
135 var message llm.Message
136 if err := json.Unmarshal([]byte(raw), &message); err != nil {
137 return false, false
138 }
139 return message.EndOfTurn, true
140}
141
142// calculateContextWindowSize returns the context window usage from the most recent message with non-zero usage.
143// Each API call's input tokens represent the full conversation history sent to the model,
144// so we only need the last message's tokens (not accumulated across all messages).
145// The total input includes regular input tokens plus cached tokens (both read and created).
146// Messages without usage data (user messages, tool messages, etc.) are skipped.
147func calculateContextWindowSize(messages []APIMessage) uint64 {
148 // Find the last message with non-zero usage data
149 for i := len(messages) - 1; i >= 0; i-- {
150 msg := messages[i]
151 if msg.UsageData == nil {
152 continue
153 }
154 var usage llm.Usage
155 if err := json.Unmarshal([]byte(*msg.UsageData), &usage); err != nil {
156 continue
157 }
158 ctxUsed := usage.ContextWindowUsed()
159 if ctxUsed == 0 {
160 continue
161 }
162 // Return total context window used: all input tokens + output tokens
163 // This represents the full context that would be sent for the next turn
164 return ctxUsed
165 }
166 return 0
167}
168
169// isAgentEndOfTurn checks if a message is an agent or error message with end_of_turn=true.
170// This indicates the agent loop has finished processing.
171func isAgentEndOfTurn(msg *generated.Message) bool {
172 if msg == nil {
173 return false
174 }
175 // Agent and error messages can have end_of_turn
176 if msg.Type != string(db.MessageTypeAgent) && msg.Type != string(db.MessageTypeError) {
177 return false
178 }
179 if msg.LlmData == nil {
180 return false
181 }
182 endOfTurn, ok := extractEndOfTurn(*msg.LlmData)
183 if !ok {
184 return false
185 }
186 return endOfTurn
187}
188
189// calculateContextWindowSizeFromMsg calculates context window usage from a single message.
190// Returns 0 if the message has no usage data (e.g., user messages), in which case
191// the client should keep its previous context window value.
192func calculateContextWindowSizeFromMsg(msg *generated.Message) uint64 {
193 if msg == nil || msg.UsageData == nil {
194 return 0
195 }
196 var usage llm.Usage
197 if err := json.Unmarshal([]byte(*msg.UsageData), &usage); err != nil {
198 return 0
199 }
200 return usage.ContextWindowUsed()
201}
202
// ConversationListUpdate represents an update to the conversation list.
// It is delivered to clients inside StreamResponse.ConversationListUpdate.
type ConversationListUpdate struct {
	Type string `json:"type"` // "update", "delete"
	// Conversation is the updated conversation; the notify helpers in this
	// file set it for "update" events.
	Conversation   *generated.Conversation `json:"conversation,omitempty"`
	ConversationID string                  `json:"conversation_id,omitempty"` // For deletes
}
209
// Server manages the HTTP API and active conversations.
// Construct it with NewServer; the map of active conversations must be
// initialised before use.
type Server struct {
	db         *db.DB
	llmManager LLMProvider
	// toolSetConfig is the tool configuration handed to every conversation
	// manager; NewServer fills in its SubagentRunner and SubagentDB fields.
	toolSetConfig claudetool.ToolSetConfig
	// activeConversations maps conversation ID to its in-memory manager.
	// Every access in this file happens while holding mu.
	activeConversations map[string]*ConversationManager
	mu                  sync.Mutex
	logger              *slog.Logger
	predictableOnly     bool
	terminalURL         string
	defaultModel        string
	links               []Link
	// requireHeader, when non-empty, makes StartWithListener install
	// RequireHeaderMiddleware with this value.
	requireHeader string
	// conversationGroup collapses concurrent getOrCreateConversationManager
	// calls for the same conversation ID into one execution.
	conversationGroup singleflight.Group[string, *ConversationManager]
	versionChecker    *VersionChecker
}
226
227// NewServer creates a new server instance
228func NewServer(database *db.DB, llmManager LLMProvider, toolSetConfig claudetool.ToolSetConfig, logger *slog.Logger, predictableOnly bool, terminalURL, defaultModel, requireHeader string, links []Link) *Server {
229 s := &Server{
230 db: database,
231 llmManager: llmManager,
232 toolSetConfig: toolSetConfig,
233 activeConversations: make(map[string]*ConversationManager),
234 logger: logger,
235 predictableOnly: predictableOnly,
236 terminalURL: terminalURL,
237 defaultModel: defaultModel,
238 requireHeader: requireHeader,
239 links: links,
240 versionChecker: NewVersionChecker(),
241 }
242
243 // Set up subagent support
244 s.toolSetConfig.SubagentRunner = NewSubagentRunner(s)
245 s.toolSetConfig.SubagentDB = &db.SubagentDBAdapter{DB: database}
246
247 return s
248}
249
250// RegisterRoutes registers HTTP routes on the given mux
251func (s *Server) RegisterRoutes(mux *http.ServeMux) {
252 // API routes - wrap with gzip where beneficial
253 mux.Handle("/api/conversations", gzipHandler(http.HandlerFunc(s.handleConversations)))
254 mux.Handle("/api/conversations/archived", gzipHandler(http.HandlerFunc(s.handleArchivedConversations)))
255 mux.Handle("/api/conversations/new", http.HandlerFunc(s.handleNewConversation)) // Small response
256 mux.Handle("/api/conversations/continue", http.HandlerFunc(s.handleContinueConversation)) // Small response
257 mux.Handle("/api/conversation/", http.StripPrefix("/api/conversation", s.conversationMux()))
258 mux.Handle("/api/conversation-by-slug/", gzipHandler(http.HandlerFunc(s.handleConversationBySlug)))
259 mux.Handle("/api/validate-cwd", http.HandlerFunc(s.handleValidateCwd)) // Small response
260 mux.Handle("/api/list-directory", gzipHandler(http.HandlerFunc(s.handleListDirectory)))
261 mux.Handle("/api/create-directory", http.HandlerFunc(s.handleCreateDirectory))
262 mux.Handle("/api/git/diffs", gzipHandler(http.HandlerFunc(s.handleGitDiffs)))
263 mux.Handle("/api/git/diffs/", gzipHandler(http.HandlerFunc(s.handleGitDiffFiles)))
264 mux.Handle("/api/git/file-diff/", gzipHandler(http.HandlerFunc(s.handleGitFileDiff)))
265 mux.HandleFunc("/api/upload", s.handleUpload) // Binary uploads
266 mux.HandleFunc("/api/read", s.handleRead) // Serves images
267 mux.Handle("/api/write-file", http.HandlerFunc(s.handleWriteFile)) // Small response
268 mux.HandleFunc("/api/exec-ws", s.handleExecWS) // Websocket for shell commands
269
270 // Custom models API
271 mux.Handle("/api/custom-models", http.HandlerFunc(s.handleCustomModels))
272 mux.Handle("/api/custom-models/", http.HandlerFunc(s.handleCustomModel))
273 mux.Handle("/api/custom-models-test", http.HandlerFunc(s.handleTestModel))
274
275 // Models API (dynamic list refresh)
276 mux.Handle("/api/models", http.HandlerFunc(s.handleModels))
277
278 // Version endpoints
279 mux.Handle("GET /version", http.HandlerFunc(s.handleVersion))
280 mux.Handle("GET /version-check", http.HandlerFunc(s.handleVersionCheck))
281 mux.Handle("GET /version-changelog", http.HandlerFunc(s.handleVersionChangelog))
282 mux.Handle("POST /upgrade", http.HandlerFunc(s.handleUpgrade))
283 mux.Handle("POST /exit", http.HandlerFunc(s.handleExit))
284
285 // Debug endpoints
286 mux.Handle("GET /debug/llm_requests", http.HandlerFunc(s.handleDebugLLMRequests))
287 mux.Handle("GET /debug/llm_requests/api", http.HandlerFunc(s.handleDebugLLMRequestsAPI))
288 mux.Handle("GET /debug/llm_requests/{id}/request", http.HandlerFunc(s.handleDebugLLMRequestBody))
289 mux.Handle("GET /debug/llm_requests/{id}/request_full", http.HandlerFunc(s.handleDebugLLMRequestBodyFull))
290 mux.Handle("GET /debug/llm_requests/{id}/response", http.HandlerFunc(s.handleDebugLLMResponseBody))
291
292 // Serve embedded UI assets
293 mux.Handle("/", s.staticHandler(ui.Assets()))
294}
295
296// handleValidateCwd validates that a path exists and is a directory
297func (s *Server) handleValidateCwd(w http.ResponseWriter, r *http.Request) {
298 if r.Method != http.MethodGet {
299 http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
300 return
301 }
302
303 path := r.URL.Query().Get("path")
304 if path == "" {
305 w.Header().Set("Content-Type", "application/json")
306 json.NewEncoder(w).Encode(map[string]interface{}{
307 "valid": false,
308 "error": "path is required",
309 })
310 return
311 }
312
313 info, err := os.Stat(path)
314 if err != nil {
315 w.Header().Set("Content-Type", "application/json")
316 if os.IsNotExist(err) {
317 json.NewEncoder(w).Encode(map[string]interface{}{
318 "valid": false,
319 "error": "directory does not exist",
320 })
321 } else {
322 json.NewEncoder(w).Encode(map[string]interface{}{
323 "valid": false,
324 "error": err.Error(),
325 })
326 }
327 return
328 }
329
330 if !info.IsDir() {
331 w.Header().Set("Content-Type", "application/json")
332 json.NewEncoder(w).Encode(map[string]interface{}{
333 "valid": false,
334 "error": "path is not a directory",
335 })
336 return
337 }
338
339 w.Header().Set("Content-Type", "application/json")
340 json.NewEncoder(w).Encode(map[string]interface{}{
341 "valid": true,
342 })
343}
344
// DirectoryEntry represents a single directory entry for the directory picker.
type DirectoryEntry struct {
	Name  string `json:"name"`
	IsDir bool   `json:"is_dir"`
	// GitHeadSubject is the HEAD commit subject when the entry is a git
	// repository root; empty (and omitted from JSON) otherwise.
	GitHeadSubject string `json:"git_head_subject,omitempty"`
}
351
// ListDirectoryResponse is the response from the list-directory endpoint.
type ListDirectoryResponse struct {
	// Path is the cleaned directory path that was listed.
	Path string `json:"path"`
	// Parent is the parent directory of Path, or "" when Path is the root.
	Parent  string           `json:"parent"`
	Entries []DirectoryEntry `json:"entries"`
	// GitHeadSubject is the HEAD commit subject when Path itself is a git
	// repository root.
	GitHeadSubject string `json:"git_head_subject,omitempty"`
}
359
360// handleListDirectory lists the contents of a directory for the directory picker
361func (s *Server) handleListDirectory(w http.ResponseWriter, r *http.Request) {
362 if r.Method != http.MethodGet {
363 http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
364 return
365 }
366
367 path := r.URL.Query().Get("path")
368 if path == "" {
369 // Default to home directory or root
370 homeDir, err := os.UserHomeDir()
371 if err != nil {
372 path = "/"
373 } else {
374 path = homeDir
375 }
376 }
377
378 // Clean and resolve the path
379 path = filepath.Clean(path)
380
381 // Verify path exists and is a directory
382 info, err := os.Stat(path)
383 if err != nil {
384 w.Header().Set("Content-Type", "application/json")
385 if os.IsNotExist(err) {
386 json.NewEncoder(w).Encode(map[string]interface{}{
387 "error": "directory does not exist",
388 })
389 } else if os.IsPermission(err) {
390 json.NewEncoder(w).Encode(map[string]interface{}{
391 "error": "permission denied",
392 })
393 } else {
394 json.NewEncoder(w).Encode(map[string]interface{}{
395 "error": err.Error(),
396 })
397 }
398 return
399 }
400
401 if !info.IsDir() {
402 w.Header().Set("Content-Type", "application/json")
403 json.NewEncoder(w).Encode(map[string]interface{}{
404 "error": "path is not a directory",
405 })
406 return
407 }
408
409 // Read directory contents
410 dirEntries, err := os.ReadDir(path)
411 if err != nil {
412 w.Header().Set("Content-Type", "application/json")
413 if os.IsPermission(err) {
414 json.NewEncoder(w).Encode(map[string]interface{}{
415 "error": "permission denied",
416 })
417 } else {
418 json.NewEncoder(w).Encode(map[string]interface{}{
419 "error": err.Error(),
420 })
421 }
422 return
423 }
424
425 // Build response with only directories (for directory picker)
426 var entries []DirectoryEntry
427 for _, entry := range dirEntries {
428 // Only include directories
429 if entry.IsDir() {
430 dirEntry := DirectoryEntry{
431 Name: entry.Name(),
432 IsDir: true,
433 }
434
435 // Check if this is a git repo root and get HEAD commit subject
436 entryPath := filepath.Join(path, entry.Name())
437 if isGitRepo(entryPath) {
438 if subject := getGitHeadSubject(entryPath); subject != "" {
439 dirEntry.GitHeadSubject = subject
440 }
441 }
442
443 entries = append(entries, dirEntry)
444 }
445 }
446
447 // Calculate parent directory
448 parent := filepath.Dir(path)
449 if parent == path {
450 // At root, no parent
451 parent = ""
452 }
453
454 response := ListDirectoryResponse{
455 Path: path,
456 Parent: parent,
457 Entries: entries,
458 }
459
460 // Check if the current directory itself is a git repo
461 if isGitRepo(path) {
462 response.GitHeadSubject = getGitHeadSubject(path)
463 }
464
465 w.Header().Set("Content-Type", "application/json")
466 json.NewEncoder(w).Encode(response)
467}
468
// isGitRepo checks if the given path is a git repository root.
//
// It returns true when dirPath contains a ".git" directory (regular
// repository) or a regular ".git" file whose content starts with "gitdir:"
// (worktree). Any stat or read failure yields false.
func isGitRepo(dirPath string) bool {
	dotGit := filepath.Join(dirPath, ".git")

	info, statErr := os.Stat(dotGit)
	switch {
	case statErr != nil:
		return false
	case info.IsDir():
		// regular .git directory
		return true
	case !info.Mode().IsRegular():
		return false
	}

	// A regular file only counts when it is a worktree pointer.
	data, readErr := os.ReadFile(dotGit)
	return readErr == nil && strings.HasPrefix(string(data), "gitdir:")
}
491
// getGitHeadSubject returns the subject line of the HEAD commit of the git
// repository at repoPath, with surrounding whitespace trimmed.
//
// It runs `git log -1 --format=%s` with repoPath as the working directory and
// returns the empty string if the command fails for any reason.
func getGitHeadSubject(repoPath string) string {
	gitLog := exec.Command("git", "log", "-1", "--format=%s")
	gitLog.Dir = repoPath

	if out, err := gitLog.Output(); err == nil {
		return strings.TrimSpace(string(out))
	}
	return ""
}
503
504// handleCreateDirectory creates a new directory
505func (s *Server) handleCreateDirectory(w http.ResponseWriter, r *http.Request) {
506 if r.Method != http.MethodPost {
507 http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
508 return
509 }
510
511 var req struct {
512 Path string `json:"path"`
513 }
514 if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
515 w.Header().Set("Content-Type", "application/json")
516 json.NewEncoder(w).Encode(map[string]interface{}{
517 "error": "invalid request body",
518 })
519 return
520 }
521
522 if req.Path == "" {
523 w.Header().Set("Content-Type", "application/json")
524 json.NewEncoder(w).Encode(map[string]interface{}{
525 "error": "path is required",
526 })
527 return
528 }
529
530 // Clean the path
531 path := filepath.Clean(req.Path)
532
533 // Check if path already exists
534 if _, err := os.Stat(path); err == nil {
535 w.Header().Set("Content-Type", "application/json")
536 json.NewEncoder(w).Encode(map[string]interface{}{
537 "error": "path already exists",
538 })
539 return
540 }
541
542 // Verify parent directory exists
543 parentDir := filepath.Dir(path)
544 if _, err := os.Stat(parentDir); os.IsNotExist(err) {
545 w.Header().Set("Content-Type", "application/json")
546 json.NewEncoder(w).Encode(map[string]interface{}{
547 "error": "parent directory does not exist",
548 })
549 return
550 }
551
552 // Create the directory (only the final directory, not parents)
553 if err := os.Mkdir(path, 0o755); err != nil {
554 w.Header().Set("Content-Type", "application/json")
555 if os.IsPermission(err) {
556 json.NewEncoder(w).Encode(map[string]interface{}{
557 "error": "permission denied",
558 })
559 } else {
560 json.NewEncoder(w).Encode(map[string]interface{}{
561 "error": err.Error(),
562 })
563 }
564 return
565 }
566
567 w.Header().Set("Content-Type", "application/json")
568 json.NewEncoder(w).Encode(map[string]interface{}{
569 "path": path,
570 })
571}
572
573// getOrCreateConversationManager gets an existing conversation manager or creates a new one.
574func (s *Server) getOrCreateConversationManager(ctx context.Context, conversationID string) (*ConversationManager, error) {
575 manager, err, _ := s.conversationGroup.Do(conversationID, func() (*ConversationManager, error) {
576 s.mu.Lock()
577 defer s.mu.Unlock()
578 if manager, exists := s.activeConversations[conversationID]; exists {
579 manager.Touch()
580 return manager, nil
581 }
582
583 recordMessage := func(ctx context.Context, message llm.Message, usage llm.Usage) error {
584 return s.recordMessage(ctx, conversationID, message, usage)
585 }
586
587 onStateChange := func(state ConversationState) {
588 s.publishConversationState(state)
589 }
590
591 manager := NewConversationManager(conversationID, s.db, s.logger, s.toolSetConfig, recordMessage, onStateChange)
592 if err := manager.Hydrate(ctx); err != nil {
593 return nil, err
594 }
595
596 s.activeConversations[conversationID] = manager
597 return manager, nil
598 })
599 if err != nil {
600 return nil, err
601 }
602 return manager, nil
603}
604
605// ExtractDisplayData extracts display data from message content for storage
606func ExtractDisplayData(message llm.Message) interface{} {
607 // Build a map of tool_use_id to tool_name for lookups
608 toolNameMap := make(map[string]string)
609 for _, content := range message.Content {
610 if content.Type == llm.ContentTypeToolUse {
611 toolNameMap[content.ID] = content.ToolName
612 }
613 }
614
615 var displayData []any
616 for _, content := range message.Content {
617 if content.Type == llm.ContentTypeToolResult && content.Display != nil {
618 // Include tool name if we can find it
619 toolName := toolNameMap[content.ToolUseID]
620 displayData = append(displayData, map[string]any{
621 "tool_use_id": content.ToolUseID,
622 "tool_name": toolName,
623 "display": content.Display,
624 })
625 }
626 }
627
628 if len(displayData) > 0 {
629 return displayData
630 }
631 return nil
632}
633
634// recordMessage records a new message to the database and also notifies subscribers
635func (s *Server) recordMessage(ctx context.Context, conversationID string, message llm.Message, usage llm.Usage) error {
636 // Log message based on role
637 if message.Role == llm.MessageRoleUser {
638 s.logger.Info("User message", "conversation_id", conversationID, "content_items", len(message.Content))
639 } else if message.Role == llm.MessageRoleAssistant {
640 s.logger.Info("Agent message", "conversation_id", conversationID, "content_items", len(message.Content), "end_of_turn", message.EndOfTurn)
641 }
642
643 // Convert LLM message to database format
644 messageType, err := s.getMessageType(message)
645 if err != nil {
646 return fmt.Errorf("failed to determine message type: %w", err)
647 }
648
649 // Extract display data from content items
650 displayDataToStore := ExtractDisplayData(message)
651
652 // Create message
653 createdMsg, err := s.db.CreateMessage(ctx, db.CreateMessageParams{
654 ConversationID: conversationID,
655 Type: messageType,
656 LLMData: message,
657 UserData: nil,
658 UsageData: usage,
659 DisplayData: displayDataToStore,
660 ExcludedFromContext: message.ExcludedFromContext,
661 })
662 if err != nil {
663 return fmt.Errorf("failed to create message: %w", err)
664 }
665
666 // Update conversation's last updated timestamp for correct ordering
667 if err := s.db.QueriesTx(ctx, func(q *generated.Queries) error {
668 return q.UpdateConversationTimestamp(ctx, conversationID)
669 }); err != nil {
670 s.logger.Warn("Failed to update conversation timestamp", "conversationID", conversationID, "error", err)
671 }
672
673 // Touch active manager activity time if present
674 s.mu.Lock()
675 mgr, ok := s.activeConversations[conversationID]
676 if ok {
677 mgr.Touch()
678 }
679 s.mu.Unlock()
680
681 // Notify subscribers with only the new message - use WithoutCancel because
682 // the HTTP request context may be cancelled after the handler returns, but
683 // we still want the notification to complete so SSE clients see the message immediately
684 go s.notifySubscribersNewMessage(context.WithoutCancel(ctx), conversationID, createdMsg)
685
686 return nil
687}
688
689// getMessageType determines the message type from an LLM message
690func (s *Server) getMessageType(message llm.Message) (db.MessageType, error) {
691 // System-generated errors are stored as error type
692 if message.ErrorType != llm.ErrorTypeNone {
693 return db.MessageTypeError, nil
694 }
695
696 switch message.Role {
697 case llm.MessageRoleUser:
698 return db.MessageTypeUser, nil
699 case llm.MessageRoleAssistant:
700 return db.MessageTypeAgent, nil
701 default:
702 // For tool messages, check if it's a tool call or tool result
703 for _, content := range message.Content {
704 if content.Type == llm.ContentTypeToolUse {
705 return db.MessageTypeTool, nil
706 }
707 if content.Type == llm.ContentTypeToolResult {
708 return db.MessageTypeTool, nil
709 }
710 }
711 return db.MessageTypeAgent, nil
712 }
713}
714
715// convertToLLMMessage converts a database message to an LLM message
716func convertToLLMMessage(msg generated.Message) (llm.Message, error) {
717 var llmMsg llm.Message
718 if msg.LlmData == nil {
719 return llm.Message{}, fmt.Errorf("message has no LLM data")
720 }
721 if err := json.Unmarshal([]byte(*msg.LlmData), &llmMsg); err != nil {
722 return llm.Message{}, fmt.Errorf("failed to unmarshal LLM data: %w", err)
723 }
724 return llmMsg, nil
725}
726
727// notifySubscribers sends conversation metadata updates (e.g., slug changes) to subscribers.
728// This is used when only the conversation data changes, not the messages.
729// Uses Broadcast instead of Publish to avoid racing with message sequence IDs.
730func (s *Server) notifySubscribers(ctx context.Context, conversationID string) {
731 s.mu.Lock()
732 manager, exists := s.activeConversations[conversationID]
733 s.mu.Unlock()
734
735 if !exists {
736 return
737 }
738
739 // Get conversation data only (no messages needed for metadata-only updates)
740 var conversation generated.Conversation
741 err := s.db.Queries(ctx, func(q *generated.Queries) error {
742 var err error
743 conversation, err = q.GetConversation(ctx, conversationID)
744 return err
745 })
746 if err != nil {
747 s.logger.Error("Failed to get conversation data for notification", "conversationID", conversationID, "error", err)
748 return
749 }
750
751 // Broadcast conversation update with no new messages.
752 // Using Broadcast instead of Publish ensures this metadata-only update
753 // doesn't race with notifySubscribersNewMessage which uses Publish with sequence IDs.
754 streamData := StreamResponse{
755 Messages: nil, // No new messages, just conversation update
756 Conversation: conversation,
757 }
758 manager.subpub.Broadcast(streamData)
759
760 // Also notify conversation list subscribers (e.g., slug change)
761 s.publishConversationListUpdate(ConversationListUpdate{
762 Type: "update",
763 Conversation: &conversation,
764 })
765}
766
767// notifySubscribersNewMessage sends a single new message to all subscribers.
768// This is more efficient than re-sending all messages on each update.
769func (s *Server) notifySubscribersNewMessage(ctx context.Context, conversationID string, newMsg *generated.Message) {
770 s.mu.Lock()
771 manager, exists := s.activeConversations[conversationID]
772 s.mu.Unlock()
773
774 if !exists {
775 return
776 }
777
778 // Get conversation data for the response
779 var conversation generated.Conversation
780 err := s.db.Queries(ctx, func(q *generated.Queries) error {
781 var err error
782 conversation, err = q.GetConversation(ctx, conversationID)
783 return err
784 })
785 if err != nil {
786 s.logger.Error("Failed to get conversation data for notification", "conversationID", conversationID, "error", err)
787 return
788 }
789
790 // Convert the single new message to API format
791 apiMessages := toAPIMessages([]generated.Message{*newMsg})
792
793 // Update agent working state based on message type
794 if isAgentEndOfTurn(newMsg) {
795 manager.SetAgentWorking(false)
796 }
797
798 // Publish only the new message
799 streamData := StreamResponse{
800 Messages: apiMessages,
801 Conversation: conversation,
802 // ContextWindowSize: 0 for messages without usage data (user/tool messages).
803 // With omitempty, 0 is omitted from JSON, so the UI keeps its cached value.
804 // Only agent messages have usage data, so context window updates when they arrive.
805 ContextWindowSize: calculateContextWindowSizeFromMsg(newMsg),
806 }
807 manager.subpub.Publish(newMsg.SequenceID, streamData)
808
809 // Also notify conversation list subscribers about the update (updated_at changed)
810 s.publishConversationListUpdate(ConversationListUpdate{
811 Type: "update",
812 Conversation: &conversation,
813 })
814}
815
816// publishConversationListUpdate broadcasts a conversation list update to ALL active
817// conversation streams. This allows clients to receive updates about other conversations
818// while they're subscribed to their current conversation's stream.
819func (s *Server) publishConversationListUpdate(update ConversationListUpdate) {
820 s.mu.Lock()
821 defer s.mu.Unlock()
822
823 // Broadcast to all active conversation managers
824 for _, manager := range s.activeConversations {
825 streamData := StreamResponse{
826 ConversationListUpdate: &update,
827 }
828 manager.subpub.Broadcast(streamData)
829 }
830}
831
832// publishConversationState broadcasts a conversation state update to ALL active
833// conversation streams. This allows clients to see the working state of other conversations.
834func (s *Server) publishConversationState(state ConversationState) {
835 s.mu.Lock()
836 defer s.mu.Unlock()
837
838 // Broadcast to all active conversation managers
839 for _, manager := range s.activeConversations {
840 streamData := StreamResponse{
841 ConversationState: &state,
842 }
843 manager.subpub.Broadcast(streamData)
844 }
845}
846
847// getWorkingConversations returns a map of conversation IDs that are currently working.
848func (s *Server) getWorkingConversations() map[string]bool {
849 s.mu.Lock()
850 defer s.mu.Unlock()
851
852 working := make(map[string]bool)
853 for id, manager := range s.activeConversations {
854 if manager.IsAgentWorking() {
855 working[id] = true
856 }
857 }
858 return working
859}
860
861// IsAgentWorking returns whether the agent is currently working on the given conversation.
862// Returns false if the conversation doesn't have an active manager.
863func (s *Server) IsAgentWorking(conversationID string) bool {
864 s.mu.Lock()
865 manager, exists := s.activeConversations[conversationID]
866 s.mu.Unlock()
867 if !exists {
868 return false
869 }
870 return manager.IsAgentWorking()
871}
872
873// Cleanup removes inactive conversation managers
874func (s *Server) Cleanup() {
875 s.mu.Lock()
876 defer s.mu.Unlock()
877
878 now := time.Now()
879 for id, manager := range s.activeConversations {
880 // Remove managers that have been inactive for more than 30 minutes
881 manager.mu.Lock()
882 lastActivity := manager.lastActivity
883 manager.mu.Unlock()
884 if now.Sub(lastActivity) > 30*time.Minute {
885 manager.stopLoop()
886 delete(s.activeConversations, id)
887 s.logger.Debug("Cleaned up inactive conversation", "conversationID", id)
888 }
889 }
890}
891
892// Start starts the HTTP server and handles the complete lifecycle
893func (s *Server) Start(port string) error {
894 listener, err := net.Listen("tcp", ":"+port)
895 if err != nil {
896 s.logger.Error("Failed to create listener", "error", err, "port_info", getPortOwnerInfo(port))
897 return err
898 }
899 return s.StartWithListener(listener)
900}
901
902// StartWithListener starts the HTTP server using the provided listener.
903// This is useful for systemd socket activation where the listener is created externally.
904func (s *Server) StartWithListener(listener net.Listener) error {
905 // Set up HTTP server with routes and middleware
906 mux := http.NewServeMux()
907 s.RegisterRoutes(mux)
908
909 // Add middleware (applied in reverse order: last added = first executed)
910 handler := LoggerMiddleware(s.logger)(mux)
911 handler = CSRFMiddleware()(handler)
912 if s.requireHeader != "" {
913 handler = RequireHeaderMiddleware(s.requireHeader)(handler)
914 }
915
916 httpServer := &http.Server{
917 Handler: handler,
918 }
919
920 // Start cleanup routine
921 go func() {
922 ticker := time.NewTicker(5 * time.Minute)
923 defer ticker.Stop()
924 for range ticker.C {
925 s.Cleanup()
926 }
927 }()
928
929 // Get actual port from listener
930 actualPort := listener.Addr().(*net.TCPAddr).Port
931
932 // Start server in goroutine
933 serverErrCh := make(chan error, 1)
934 go func() {
935 s.logger.Info("Server starting", "port", actualPort, "url", fmt.Sprintf("http://localhost:%d", actualPort))
936 if err := httpServer.Serve(listener); err != nil && err != http.ErrServerClosed {
937 serverErrCh <- err
938 }
939 }()
940
941 // Wait for shutdown signal or server error
942 quit := make(chan os.Signal, 1)
943 signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
944
945 select {
946 case err := <-serverErrCh:
947 s.logger.Error("Server failed", "error", err)
948 return err
949 case <-quit:
950 s.logger.Info("Shutting down server")
951 }
952
953 // Graceful shutdown
954 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
955 defer cancel()
956
957 if err := httpServer.Shutdown(ctx); err != nil {
958 s.logger.Error("Server forced to shutdown", "error", err)
959 return err
960 }
961
962 s.logger.Info("Server exited")
963 return nil
964}
965
// getPortOwnerInfo tries to identify what process is listening on a TCP port.
//
// It runs `lsof -i :<port> -sTCP:LISTEN -n -P` and returns "pid=<pid>
// process=<command>" for the first data row with at least two columns. When
// lsof fails, prints no data rows, or prints nothing parseable, a
// parenthesised explanation is returned instead. The result is meant for log
// output only.
func getPortOwnerInfo(port string) string {
	out, err := exec.Command("lsof", "-i", ":"+port, "-sTCP:LISTEN", "-n", "-P").Output()
	if err != nil {
		return fmt.Sprintf("(unable to determine: %v)", err)
	}

	// Expected layout: COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME,
	// with a header row first.
	rows := strings.Split(strings.TrimSpace(string(out)), "\n")
	if len(rows) < 2 {
		return "(no process found)"
	}

	for idx := 1; idx < len(rows); idx++ {
		cols := strings.Fields(rows[idx])
		if len(cols) < 2 {
			continue
		}
		return fmt.Sprintf("pid=%s process=%s", cols[1], cols[0])
	}

	return "(could not parse lsof output)"
}