server.go

  1package server
  2
  3import (
  4	"context"
  5	"encoding/json"
  6	"fmt"
  7	"log/slog"
  8	"net"
  9	"net/http"
 10	"os"
 11	"os/exec"
 12	"os/signal"
 13	"path/filepath"
 14	"strings"
 15	"sync"
 16	"syscall"
 17	"time"
 18
 19	"tailscale.com/util/singleflight"
 20
 21	"shelley.exe.dev/claudetool"
 22	"shelley.exe.dev/db"
 23	"shelley.exe.dev/db/generated"
 24	"shelley.exe.dev/llm"
 25	"shelley.exe.dev/models"
 26	"shelley.exe.dev/ui"
 27)
 28
// APIMessage is the message format sent to clients.
//
// It mirrors the stored generated.Message columns (see toAPIMessages) plus
// EndOfTurn, which is derived from LlmData rather than stored. Pointer fields
// tagged `omitempty` are left out of the JSON entirely when nil.
//
// TODO: We could maybe omit llm_data when display_data is available
type APIMessage struct {
	MessageID      string    `json:"message_id"`
	ConversationID string    `json:"conversation_id"`
	SequenceID     int64     `json:"sequence_id"`
	Type           string    `json:"type"`
	LlmData        *string   `json:"llm_data,omitempty"`
	UserData       *string   `json:"user_data,omitempty"`
	UsageData      *string   `json:"usage_data,omitempty"`
	CreatedAt      time.Time `json:"created_at"`
	DisplayData    *string   `json:"display_data,omitempty"`
	EndOfTurn      *bool     `json:"end_of_turn,omitempty"`
}
 43
// ConversationState represents the current state of a conversation.
// This is broadcast to all subscribers whenever the state changes;
// publishConversationState fans it out to every active conversation stream, so
// clients also learn about conversations other than the one they are viewing.
//
// Working is the agent-busy flag (NOTE(review): presumably the same flag that
// ConversationManager.IsAgentWorking reports — it is set outside this file).
// Model is omitted from the JSON when empty.
type ConversationState struct {
	ConversationID string `json:"conversation_id"`
	Working        bool   `json:"working"`
	Model          string `json:"model,omitempty"`
}
 51
// ConversationWithState combines a conversation with its working state.
//
// The embedded generated.Conversation has no JSON tag, so encoding/json
// flattens its fields into the same object as "working". NOTE(review): this
// holds only if generated.Conversation has no custom MarshalJSON. If it does,
// the promoted method would marshal the whole value and drop "working".
type ConversationWithState struct {
	generated.Conversation
	Working bool `json:"working"`
}
 57
// StreamResponse represents the response format for conversation streaming.
//
// A single frame usually carries only some of its fields:
//   - Messages + Conversation (+ ContextWindowSize) when a newly recorded
//     message is published (notifySubscribersNewMessage);
//   - Conversation alone for metadata-only changes such as a slug update
//     (notifySubscribers);
//   - ConversationListUpdate or ConversationState, broadcast to every active
//     stream when some conversation's list entry or working state changes;
//   - Heartbeat when nothing changed and the connection is only kept alive.
//
// ContextWindowSize is omitted when 0, and clients then keep their previous value.
type StreamResponse struct {
	Messages          []APIMessage           `json:"messages"`
	Conversation      generated.Conversation `json:"conversation"`
	ConversationState *ConversationState     `json:"conversation_state,omitempty"`
	ContextWindowSize uint64                 `json:"context_window_size,omitempty"`
	// ConversationListUpdate is set when another conversation in the list changed
	ConversationListUpdate *ConversationListUpdate `json:"conversation_list_update,omitempty"`
	// Heartbeat indicates this is a heartbeat message (no new data, just keeping connection alive)
	Heartbeat bool `json:"heartbeat,omitempty"`
}
 69
// LLMProvider is an interface for getting LLM services.
//
// NewLLMServiceManager returns the models package's manager as an LLMProvider.
// The method descriptions below are inferred from the names and signatures only.
type LLMProvider interface {
	// GetService returns the llm.Service for modelID, or an error
	// (presumably when the model is unknown or cannot be constructed).
	GetService(modelID string) (llm.Service, error)
	// GetAvailableModels lists the model IDs the provider can serve.
	GetAvailableModels() []string
	// HasModel reports whether modelID is known to the provider.
	HasModel(modelID string) bool
	// GetModelInfo returns metadata for modelID (NOTE(review): presumably nil when unknown).
	GetModelInfo(modelID string) *models.ModelInfo
	// RefreshCustomModels reloads user-defined models (NOTE(review): source assumed to be the DB).
	RefreshCustomModels() error
}
 78
 79// NewLLMServiceManager creates a new LLM service manager from config
 80func NewLLMServiceManager(cfg *LLMConfig) LLMProvider {
 81	// Convert LLMConfig to models.Config
 82	modelConfig := &models.Config{
 83		AnthropicAPIKey: cfg.AnthropicAPIKey,
 84		OpenAIAPIKey:    cfg.OpenAIAPIKey,
 85		GeminiAPIKey:    cfg.GeminiAPIKey,
 86		FireworksAPIKey: cfg.FireworksAPIKey,
 87		Gateway:         cfg.Gateway,
 88		Logger:          cfg.Logger,
 89		DB:              cfg.DB,
 90	}
 91
 92	manager, err := models.NewManager(modelConfig)
 93	if err != nil {
 94		// This shouldn't happen in practice, but handle it gracefully
 95		cfg.Logger.Error("Failed to create models manager", "error", err)
 96	}
 97
 98	return manager
 99}
100
101// toAPIMessages converts database messages to API messages.
102// When display_data is present (tool results), llm_data is omitted to save bandwidth
103// since the display_data contains all information needed for UI rendering.
104func toAPIMessages(messages []generated.Message) []APIMessage {
105	apiMessages := make([]APIMessage, len(messages))
106	for i, msg := range messages {
107		var endOfTurnPtr *bool
108		if msg.LlmData != nil && msg.Type == string(db.MessageTypeAgent) {
109			if endOfTurn, ok := extractEndOfTurn(*msg.LlmData); ok {
110				endOfTurnCopy := endOfTurn
111				endOfTurnPtr = &endOfTurnCopy
112			}
113		}
114
115		// TODO: Consider omitting llm_data when display_data is present to save bandwidth.
116		// The display_data contains all info needed for UI rendering of tool results,
117		// but the UI currently still uses llm_data for some checks.
118
119		apiMsg := APIMessage{
120			MessageID:      msg.MessageID,
121			ConversationID: msg.ConversationID,
122			SequenceID:     msg.SequenceID,
123			Type:           msg.Type,
124			LlmData:        msg.LlmData,
125			UserData:       msg.UserData,
126			UsageData:      msg.UsageData,
127			CreatedAt:      msg.CreatedAt,
128			DisplayData:    msg.DisplayData,
129			EndOfTurn:      endOfTurnPtr,
130		}
131		apiMessages[i] = apiMsg
132	}
133	return apiMessages
134}
135
136func extractEndOfTurn(raw string) (bool, bool) {
137	var message llm.Message
138	if err := json.Unmarshal([]byte(raw), &message); err != nil {
139		return false, false
140	}
141	return message.EndOfTurn, true
142}
143
144// calculateContextWindowSize returns the context window usage from the most recent message with non-zero usage.
145// Each API call's input tokens represent the full conversation history sent to the model,
146// so we only need the last message's tokens (not accumulated across all messages).
147// The total input includes regular input tokens plus cached tokens (both read and created).
148// Messages without usage data (user messages, tool messages, etc.) are skipped.
149func calculateContextWindowSize(messages []APIMessage) uint64 {
150	// Find the last message with non-zero usage data
151	for i := len(messages) - 1; i >= 0; i-- {
152		msg := messages[i]
153		if msg.UsageData == nil {
154			continue
155		}
156		var usage llm.Usage
157		if err := json.Unmarshal([]byte(*msg.UsageData), &usage); err != nil {
158			continue
159		}
160		ctxUsed := usage.ContextWindowUsed()
161		if ctxUsed == 0 {
162			continue
163		}
164		// Return total context window used: all input tokens + output tokens
165		// This represents the full context that would be sent for the next turn
166		return ctxUsed
167	}
168	return 0
169}
170
171// isAgentEndOfTurn checks if a message is an agent or error message with end_of_turn=true.
172// This indicates the agent loop has finished processing.
173func isAgentEndOfTurn(msg *generated.Message) bool {
174	if msg == nil {
175		return false
176	}
177	// Agent and error messages can have end_of_turn
178	if msg.Type != string(db.MessageTypeAgent) && msg.Type != string(db.MessageTypeError) {
179		return false
180	}
181	if msg.LlmData == nil {
182		return false
183	}
184	endOfTurn, ok := extractEndOfTurn(*msg.LlmData)
185	if !ok {
186		return false
187	}
188	return endOfTurn
189}
190
191// calculateContextWindowSizeFromMsg calculates context window usage from a single message.
192// Returns 0 if the message has no usage data (e.g., user messages), in which case
193// the client should keep its previous context window value.
194func calculateContextWindowSizeFromMsg(msg *generated.Message) uint64 {
195	if msg == nil || msg.UsageData == nil {
196		return 0
197	}
198	var usage llm.Usage
199	if err := json.Unmarshal([]byte(*msg.UsageData), &usage); err != nil {
200		return 0
201	}
202	return usage.ContextWindowUsed()
203}
204
// ConversationListUpdate represents an update to the conversation list.
//
// For Type "update", Conversation holds the changed conversation. For Type
// "delete", ConversationID names the removed conversation. It is carried inside
// StreamResponse.ConversationListUpdate and broadcast to every active stream.
type ConversationListUpdate struct {
	Type           string                  `json:"type"` // "update", "delete"
	Conversation   *generated.Conversation `json:"conversation,omitempty"`
	ConversationID string                  `json:"conversation_id,omitempty"` // For deletes
}
211
// Server manages the HTTP API and active conversations.
//
// Fields:
//   - db: database handle; NewServer also wraps it as the subagent DB.
//   - llmManager: source of LLM services and model metadata.
//   - toolSetConfig: tool configuration handed to each new ConversationManager;
//     NewServer adds subagent support to it.
//   - activeConversations: in-memory managers keyed by conversation ID.
//   - mu: guards activeConversations. Cleanup takes it before a manager's own mu.
//   - logger: structured logger for the server.
//   - predictableOnly, terminalURL, defaultModel, links: settings passed to
//     NewServer; NOTE(review): they are not read in this file, consumers live elsewhere.
//   - requireHeader: when non-empty, StartWithListener wraps the handler in
//     RequireHeaderMiddleware with this value.
//   - conversationGroup: collapses concurrent getOrCreateConversationManager
//     calls for the same conversation ID.
//   - versionChecker: created by NewServer; presumably backs the /version* endpoints.
type Server struct {
	db                  *db.DB
	llmManager          LLMProvider
	toolSetConfig       claudetool.ToolSetConfig
	activeConversations map[string]*ConversationManager
	mu                  sync.Mutex
	logger              *slog.Logger
	predictableOnly     bool
	terminalURL         string
	defaultModel        string
	links               []Link
	requireHeader       string
	conversationGroup   singleflight.Group[string, *ConversationManager]
	versionChecker      *VersionChecker
}
228
229// NewServer creates a new server instance
230func NewServer(database *db.DB, llmManager LLMProvider, toolSetConfig claudetool.ToolSetConfig, logger *slog.Logger, predictableOnly bool, terminalURL, defaultModel, requireHeader string, links []Link) *Server {
231	s := &Server{
232		db:                  database,
233		llmManager:          llmManager,
234		toolSetConfig:       toolSetConfig,
235		activeConversations: make(map[string]*ConversationManager),
236		logger:              logger,
237		predictableOnly:     predictableOnly,
238		terminalURL:         terminalURL,
239		defaultModel:        defaultModel,
240		requireHeader:       requireHeader,
241		links:               links,
242		versionChecker:      NewVersionChecker(),
243	}
244
245	// Set up subagent support
246	s.toolSetConfig.SubagentRunner = NewSubagentRunner(s)
247	s.toolSetConfig.SubagentDB = &db.SubagentDBAdapter{DB: database}
248
249	return s
250}
251
252// RegisterRoutes registers HTTP routes on the given mux
253func (s *Server) RegisterRoutes(mux *http.ServeMux) {
254	// API routes - wrap with gzip where beneficial
255	mux.Handle("/api/conversations", gzipHandler(http.HandlerFunc(s.handleConversations)))
256	mux.Handle("/api/conversations/archived", gzipHandler(http.HandlerFunc(s.handleArchivedConversations)))
257	mux.Handle("/api/conversations/new", http.HandlerFunc(s.handleNewConversation))           // Small response
258	mux.Handle("/api/conversations/continue", http.HandlerFunc(s.handleContinueConversation)) // Small response
259	mux.Handle("/api/conversation/", http.StripPrefix("/api/conversation", s.conversationMux()))
260	mux.Handle("/api/conversation-by-slug/", gzipHandler(http.HandlerFunc(s.handleConversationBySlug)))
261	mux.Handle("/api/validate-cwd", http.HandlerFunc(s.handleValidateCwd)) // Small response
262	mux.Handle("/api/list-directory", gzipHandler(http.HandlerFunc(s.handleListDirectory)))
263	mux.Handle("/api/create-directory", http.HandlerFunc(s.handleCreateDirectory))
264	mux.Handle("/api/git/diffs", gzipHandler(http.HandlerFunc(s.handleGitDiffs)))
265	mux.Handle("/api/git/diffs/", gzipHandler(http.HandlerFunc(s.handleGitDiffFiles)))
266	mux.Handle("/api/git/file-diff/", gzipHandler(http.HandlerFunc(s.handleGitFileDiff)))
267	mux.HandleFunc("/api/upload", s.handleUpload)                      // Binary uploads
268	mux.HandleFunc("/api/read", s.handleRead)                          // Serves images
269	mux.Handle("/api/write-file", http.HandlerFunc(s.handleWriteFile)) // Small response
270	mux.HandleFunc("/api/exec-ws", s.handleExecWS)                     // Websocket for shell commands
271
272	// Custom models API
273	mux.Handle("/api/custom-models", http.HandlerFunc(s.handleCustomModels))
274	mux.Handle("/api/custom-models/", http.HandlerFunc(s.handleCustomModel))
275	mux.Handle("/api/custom-models-test", http.HandlerFunc(s.handleTestModel))
276
277	// Models API (dynamic list refresh)
278	mux.Handle("/api/models", http.HandlerFunc(s.handleModels))
279
280	// Version endpoints
281	mux.Handle("GET /version", http.HandlerFunc(s.handleVersion))
282	mux.Handle("GET /version-check", http.HandlerFunc(s.handleVersionCheck))
283	mux.Handle("GET /version-changelog", http.HandlerFunc(s.handleVersionChangelog))
284	mux.Handle("POST /upgrade", http.HandlerFunc(s.handleUpgrade))
285	mux.Handle("POST /exit", http.HandlerFunc(s.handleExit))
286
287	// Debug endpoints
288	mux.Handle("GET /debug/conversations", http.HandlerFunc(s.handleDebugConversationsPage))
289	mux.Handle("GET /debug/llm_requests", http.HandlerFunc(s.handleDebugLLMRequests))
290	mux.Handle("GET /debug/llm_requests/api", http.HandlerFunc(s.handleDebugLLMRequestsAPI))
291	mux.Handle("GET /debug/llm_requests/{id}/request", http.HandlerFunc(s.handleDebugLLMRequestBody))
292	mux.Handle("GET /debug/llm_requests/{id}/request_full", http.HandlerFunc(s.handleDebugLLMRequestBodyFull))
293	mux.Handle("GET /debug/llm_requests/{id}/response", http.HandlerFunc(s.handleDebugLLMResponseBody))
294
295	// Serve embedded UI assets
296	mux.Handle("/", s.staticHandler(ui.Assets()))
297}
298
299// handleValidateCwd validates that a path exists and is a directory
300func (s *Server) handleValidateCwd(w http.ResponseWriter, r *http.Request) {
301	if r.Method != http.MethodGet {
302		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
303		return
304	}
305
306	path := r.URL.Query().Get("path")
307	if path == "" {
308		w.Header().Set("Content-Type", "application/json")
309		json.NewEncoder(w).Encode(map[string]interface{}{
310			"valid": false,
311			"error": "path is required",
312		})
313		return
314	}
315
316	info, err := os.Stat(path)
317	if err != nil {
318		w.Header().Set("Content-Type", "application/json")
319		if os.IsNotExist(err) {
320			json.NewEncoder(w).Encode(map[string]interface{}{
321				"valid": false,
322				"error": "directory does not exist",
323			})
324		} else {
325			json.NewEncoder(w).Encode(map[string]interface{}{
326				"valid": false,
327				"error": err.Error(),
328			})
329		}
330		return
331	}
332
333	if !info.IsDir() {
334		w.Header().Set("Content-Type", "application/json")
335		json.NewEncoder(w).Encode(map[string]interface{}{
336			"valid": false,
337			"error": "path is not a directory",
338		})
339		return
340	}
341
342	w.Header().Set("Content-Type", "application/json")
343	json.NewEncoder(w).Encode(map[string]interface{}{
344		"valid": true,
345	})
346}
347
// DirectoryEntry represents a single directory entry for the directory picker.
//
// handleListDirectory lists only directories, so IsDir is always true in its
// responses. GitHeadSubject is the HEAD commit subject when the entry is a git
// repository root, and it is omitted from the JSON when empty.
type DirectoryEntry struct {
	Name           string `json:"name"`
	IsDir          bool   `json:"is_dir"`
	GitHeadSubject string `json:"git_head_subject,omitempty"`
}
354
// ListDirectoryResponse is the response from the list-directory endpoint.
//
// Path is the directory that was listed and Parent is its parent directory.
// Parent is "" when Path is the filesystem root. Entries holds the
// subdirectories of Path. GitHeadSubject is set when Path itself is a git
// repository root.
type ListDirectoryResponse struct {
	Path           string           `json:"path"`
	Parent         string           `json:"parent"`
	Entries        []DirectoryEntry `json:"entries"`
	GitHeadSubject string           `json:"git_head_subject,omitempty"`
}
362
363// handleListDirectory lists the contents of a directory for the directory picker
364func (s *Server) handleListDirectory(w http.ResponseWriter, r *http.Request) {
365	if r.Method != http.MethodGet {
366		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
367		return
368	}
369
370	path := r.URL.Query().Get("path")
371	if path == "" {
372		// Default to home directory or root
373		homeDir, err := os.UserHomeDir()
374		if err != nil {
375			path = "/"
376		} else {
377			path = homeDir
378		}
379	}
380
381	// Clean and resolve the path
382	path = filepath.Clean(path)
383
384	// Verify path exists and is a directory
385	info, err := os.Stat(path)
386	if err != nil {
387		w.Header().Set("Content-Type", "application/json")
388		if os.IsNotExist(err) {
389			json.NewEncoder(w).Encode(map[string]interface{}{
390				"error": "directory does not exist",
391			})
392		} else if os.IsPermission(err) {
393			json.NewEncoder(w).Encode(map[string]interface{}{
394				"error": "permission denied",
395			})
396		} else {
397			json.NewEncoder(w).Encode(map[string]interface{}{
398				"error": err.Error(),
399			})
400		}
401		return
402	}
403
404	if !info.IsDir() {
405		w.Header().Set("Content-Type", "application/json")
406		json.NewEncoder(w).Encode(map[string]interface{}{
407			"error": "path is not a directory",
408		})
409		return
410	}
411
412	// Read directory contents
413	dirEntries, err := os.ReadDir(path)
414	if err != nil {
415		w.Header().Set("Content-Type", "application/json")
416		if os.IsPermission(err) {
417			json.NewEncoder(w).Encode(map[string]interface{}{
418				"error": "permission denied",
419			})
420		} else {
421			json.NewEncoder(w).Encode(map[string]interface{}{
422				"error": err.Error(),
423			})
424		}
425		return
426	}
427
428	// Build response with only directories (for directory picker)
429	var entries []DirectoryEntry
430	for _, entry := range dirEntries {
431		// Only include directories
432		if entry.IsDir() {
433			dirEntry := DirectoryEntry{
434				Name:  entry.Name(),
435				IsDir: true,
436			}
437
438			// Check if this is a git repo root and get HEAD commit subject
439			entryPath := filepath.Join(path, entry.Name())
440			if isGitRepo(entryPath) {
441				if subject := getGitHeadSubject(entryPath); subject != "" {
442					dirEntry.GitHeadSubject = subject
443				}
444			}
445
446			entries = append(entries, dirEntry)
447		}
448	}
449
450	// Calculate parent directory
451	parent := filepath.Dir(path)
452	if parent == path {
453		// At root, no parent
454		parent = ""
455	}
456
457	response := ListDirectoryResponse{
458		Path:    path,
459		Parent:  parent,
460		Entries: entries,
461	}
462
463	// Check if the current directory itself is a git repo
464	if isGitRepo(path) {
465		response.GitHeadSubject = getGitHeadSubject(path)
466	}
467
468	w.Header().Set("Content-Type", "application/json")
469	json.NewEncoder(w).Encode(response)
470}
471
// isGitRepo reports whether dirPath is the root of a git repository.
//
// It returns true when dirPath/.git is a directory (a regular repository), or
// when it is a regular file whose contents start with "gitdir:" (the pointer
// file used by git worktrees). Any error while inspecting .git yields false.
func isGitRepo(dirPath string) bool {
	gitPath := filepath.Join(dirPath, ".git")
	fi, err := os.Stat(gitPath)
	if err != nil {
		return false
	}
	switch {
	case fi.IsDir():
		// Regular repository with a .git directory.
		return true
	case fi.Mode().IsRegular():
		// Worktree: .git is a small file pointing at the real git directory.
		content, err := os.ReadFile(gitPath)
		return err == nil && strings.HasPrefix(string(content), "gitdir:")
	default:
		return false
	}
}
494
// getGitHeadSubject returns the subject line of the HEAD commit of the git
// repository at repoPath.
//
// It returns "" when the subject cannot be determined: git is missing, the path
// is not a repository, there are no commits yet, or the command times out.
//
// The git invocation is bounded by a timeout because handleListDirectory calls
// this once per repository in a listing. A hung git (slow filesystem, lock
// contention) must not stall that HTTP request indefinitely.
func getGitHeadSubject(repoPath string) string {
	const gitTimeout = 5 * time.Second

	ctx, cancel := context.WithTimeout(context.Background(), gitTimeout)
	defer cancel()

	cmd := exec.CommandContext(ctx, "git", "log", "-1", "--format=%s")
	cmd.Dir = repoPath
	output, err := cmd.Output()
	if err != nil {
		return ""
	}
	return strings.TrimSpace(string(output))
}
506
507// handleCreateDirectory creates a new directory
508func (s *Server) handleCreateDirectory(w http.ResponseWriter, r *http.Request) {
509	if r.Method != http.MethodPost {
510		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
511		return
512	}
513
514	var req struct {
515		Path string `json:"path"`
516	}
517	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
518		w.Header().Set("Content-Type", "application/json")
519		json.NewEncoder(w).Encode(map[string]interface{}{
520			"error": "invalid request body",
521		})
522		return
523	}
524
525	if req.Path == "" {
526		w.Header().Set("Content-Type", "application/json")
527		json.NewEncoder(w).Encode(map[string]interface{}{
528			"error": "path is required",
529		})
530		return
531	}
532
533	// Clean the path
534	path := filepath.Clean(req.Path)
535
536	// Check if path already exists
537	if _, err := os.Stat(path); err == nil {
538		w.Header().Set("Content-Type", "application/json")
539		json.NewEncoder(w).Encode(map[string]interface{}{
540			"error": "path already exists",
541		})
542		return
543	}
544
545	// Verify parent directory exists
546	parentDir := filepath.Dir(path)
547	if _, err := os.Stat(parentDir); os.IsNotExist(err) {
548		w.Header().Set("Content-Type", "application/json")
549		json.NewEncoder(w).Encode(map[string]interface{}{
550			"error": "parent directory does not exist",
551		})
552		return
553	}
554
555	// Create the directory (only the final directory, not parents)
556	if err := os.Mkdir(path, 0o755); err != nil {
557		w.Header().Set("Content-Type", "application/json")
558		if os.IsPermission(err) {
559			json.NewEncoder(w).Encode(map[string]interface{}{
560				"error": "permission denied",
561			})
562		} else {
563			json.NewEncoder(w).Encode(map[string]interface{}{
564				"error": err.Error(),
565			})
566		}
567		return
568	}
569
570	w.Header().Set("Content-Type", "application/json")
571	json.NewEncoder(w).Encode(map[string]interface{}{
572		"path": path,
573	})
574}
575
576// getOrCreateConversationManager gets an existing conversation manager or creates a new one.
577func (s *Server) getOrCreateConversationManager(ctx context.Context, conversationID string) (*ConversationManager, error) {
578	manager, err, _ := s.conversationGroup.Do(conversationID, func() (*ConversationManager, error) {
579		s.mu.Lock()
580		defer s.mu.Unlock()
581		if manager, exists := s.activeConversations[conversationID]; exists {
582			manager.Touch()
583			return manager, nil
584		}
585
586		recordMessage := func(ctx context.Context, message llm.Message, usage llm.Usage) error {
587			return s.recordMessage(ctx, conversationID, message, usage)
588		}
589
590		onStateChange := func(state ConversationState) {
591			s.publishConversationState(state)
592		}
593
594		manager := NewConversationManager(conversationID, s.db, s.logger, s.toolSetConfig, recordMessage, onStateChange)
595		if err := manager.Hydrate(ctx); err != nil {
596			return nil, err
597		}
598
599		s.activeConversations[conversationID] = manager
600		return manager, nil
601	})
602	if err != nil {
603		return nil, err
604	}
605	return manager, nil
606}
607
608// ExtractDisplayData extracts display data from message content for storage
609func ExtractDisplayData(message llm.Message) interface{} {
610	// Build a map of tool_use_id to tool_name for lookups
611	toolNameMap := make(map[string]string)
612	for _, content := range message.Content {
613		if content.Type == llm.ContentTypeToolUse {
614			toolNameMap[content.ID] = content.ToolName
615		}
616	}
617
618	var displayData []any
619	for _, content := range message.Content {
620		if content.Type == llm.ContentTypeToolResult && content.Display != nil {
621			// Include tool name if we can find it
622			toolName := toolNameMap[content.ToolUseID]
623			displayData = append(displayData, map[string]any{
624				"tool_use_id": content.ToolUseID,
625				"tool_name":   toolName,
626				"display":     content.Display,
627			})
628		}
629	}
630
631	if len(displayData) > 0 {
632		return displayData
633	}
634	return nil
635}
636
637// recordMessage records a new message to the database and also notifies subscribers
638func (s *Server) recordMessage(ctx context.Context, conversationID string, message llm.Message, usage llm.Usage) error {
639	// Log message based on role
640	if message.Role == llm.MessageRoleUser {
641		s.logger.Info("User message", "conversation_id", conversationID, "content_items", len(message.Content))
642	} else if message.Role == llm.MessageRoleAssistant {
643		s.logger.Info("Agent message", "conversation_id", conversationID, "content_items", len(message.Content), "end_of_turn", message.EndOfTurn)
644	}
645
646	// Convert LLM message to database format
647	messageType, err := s.getMessageType(message)
648	if err != nil {
649		return fmt.Errorf("failed to determine message type: %w", err)
650	}
651
652	// Extract display data from content items
653	displayDataToStore := ExtractDisplayData(message)
654
655	// Create message
656	createdMsg, err := s.db.CreateMessage(ctx, db.CreateMessageParams{
657		ConversationID:      conversationID,
658		Type:                messageType,
659		LLMData:             message,
660		UserData:            nil,
661		UsageData:           usage,
662		DisplayData:         displayDataToStore,
663		ExcludedFromContext: message.ExcludedFromContext,
664	})
665	if err != nil {
666		return fmt.Errorf("failed to create message: %w", err)
667	}
668
669	// Update conversation's last updated timestamp for correct ordering
670	if err := s.db.QueriesTx(ctx, func(q *generated.Queries) error {
671		return q.UpdateConversationTimestamp(ctx, conversationID)
672	}); err != nil {
673		s.logger.Warn("Failed to update conversation timestamp", "conversationID", conversationID, "error", err)
674	}
675
676	// Touch active manager activity time if present
677	s.mu.Lock()
678	mgr, ok := s.activeConversations[conversationID]
679	if ok {
680		mgr.Touch()
681	}
682	s.mu.Unlock()
683
684	// Notify subscribers with only the new message - use WithoutCancel because
685	// the HTTP request context may be cancelled after the handler returns, but
686	// we still want the notification to complete so SSE clients see the message immediately
687	go s.notifySubscribersNewMessage(context.WithoutCancel(ctx), conversationID, createdMsg)
688
689	return nil
690}
691
692// getMessageType determines the message type from an LLM message
693func (s *Server) getMessageType(message llm.Message) (db.MessageType, error) {
694	// System-generated errors are stored as error type
695	if message.ErrorType != llm.ErrorTypeNone {
696		return db.MessageTypeError, nil
697	}
698
699	switch message.Role {
700	case llm.MessageRoleUser:
701		return db.MessageTypeUser, nil
702	case llm.MessageRoleAssistant:
703		return db.MessageTypeAgent, nil
704	default:
705		// For tool messages, check if it's a tool call or tool result
706		for _, content := range message.Content {
707			if content.Type == llm.ContentTypeToolUse {
708				return db.MessageTypeTool, nil
709			}
710			if content.Type == llm.ContentTypeToolResult {
711				return db.MessageTypeTool, nil
712			}
713		}
714		return db.MessageTypeAgent, nil
715	}
716}
717
718// convertToLLMMessage converts a database message to an LLM message
719func convertToLLMMessage(msg generated.Message) (llm.Message, error) {
720	var llmMsg llm.Message
721	if msg.LlmData == nil {
722		return llm.Message{}, fmt.Errorf("message has no LLM data")
723	}
724	if err := json.Unmarshal([]byte(*msg.LlmData), &llmMsg); err != nil {
725		return llm.Message{}, fmt.Errorf("failed to unmarshal LLM data: %w", err)
726	}
727	return llmMsg, nil
728}
729
730// notifySubscribers sends conversation metadata updates (e.g., slug changes) to subscribers.
731// This is used when only the conversation data changes, not the messages.
732// Uses Broadcast instead of Publish to avoid racing with message sequence IDs.
733func (s *Server) notifySubscribers(ctx context.Context, conversationID string) {
734	s.mu.Lock()
735	manager, exists := s.activeConversations[conversationID]
736	s.mu.Unlock()
737
738	if !exists {
739		return
740	}
741
742	// Get conversation data only (no messages needed for metadata-only updates)
743	var conversation generated.Conversation
744	err := s.db.Queries(ctx, func(q *generated.Queries) error {
745		var err error
746		conversation, err = q.GetConversation(ctx, conversationID)
747		return err
748	})
749	if err != nil {
750		s.logger.Error("Failed to get conversation data for notification", "conversationID", conversationID, "error", err)
751		return
752	}
753
754	// Broadcast conversation update with no new messages.
755	// Using Broadcast instead of Publish ensures this metadata-only update
756	// doesn't race with notifySubscribersNewMessage which uses Publish with sequence IDs.
757	streamData := StreamResponse{
758		Messages:     nil, // No new messages, just conversation update
759		Conversation: conversation,
760	}
761	manager.subpub.Broadcast(streamData)
762
763	// Also notify conversation list subscribers (e.g., slug change)
764	s.publishConversationListUpdate(ConversationListUpdate{
765		Type:         "update",
766		Conversation: &conversation,
767	})
768}
769
770// notifySubscribersNewMessage sends a single new message to all subscribers.
771// This is more efficient than re-sending all messages on each update.
772func (s *Server) notifySubscribersNewMessage(ctx context.Context, conversationID string, newMsg *generated.Message) {
773	s.mu.Lock()
774	manager, exists := s.activeConversations[conversationID]
775	s.mu.Unlock()
776
777	if !exists {
778		return
779	}
780
781	// Get conversation data for the response
782	var conversation generated.Conversation
783	err := s.db.Queries(ctx, func(q *generated.Queries) error {
784		var err error
785		conversation, err = q.GetConversation(ctx, conversationID)
786		return err
787	})
788	if err != nil {
789		s.logger.Error("Failed to get conversation data for notification", "conversationID", conversationID, "error", err)
790		return
791	}
792
793	// Convert the single new message to API format
794	apiMessages := toAPIMessages([]generated.Message{*newMsg})
795
796	// Update agent working state based on message type
797	if isAgentEndOfTurn(newMsg) {
798		manager.SetAgentWorking(false)
799	}
800
801	// Publish only the new message
802	streamData := StreamResponse{
803		Messages:     apiMessages,
804		Conversation: conversation,
805		// ContextWindowSize: 0 for messages without usage data (user/tool messages).
806		// With omitempty, 0 is omitted from JSON, so the UI keeps its cached value.
807		// Only agent messages have usage data, so context window updates when they arrive.
808		ContextWindowSize: calculateContextWindowSizeFromMsg(newMsg),
809	}
810	manager.subpub.Publish(newMsg.SequenceID, streamData)
811
812	// Also notify conversation list subscribers about the update (updated_at changed)
813	s.publishConversationListUpdate(ConversationListUpdate{
814		Type:         "update",
815		Conversation: &conversation,
816	})
817}
818
819// publishConversationListUpdate broadcasts a conversation list update to ALL active
820// conversation streams. This allows clients to receive updates about other conversations
821// while they're subscribed to their current conversation's stream.
822func (s *Server) publishConversationListUpdate(update ConversationListUpdate) {
823	s.mu.Lock()
824	defer s.mu.Unlock()
825
826	// Broadcast to all active conversation managers
827	for _, manager := range s.activeConversations {
828		streamData := StreamResponse{
829			ConversationListUpdate: &update,
830		}
831		manager.subpub.Broadcast(streamData)
832	}
833}
834
835// publishConversationState broadcasts a conversation state update to ALL active
836// conversation streams. This allows clients to see the working state of other conversations.
837func (s *Server) publishConversationState(state ConversationState) {
838	s.mu.Lock()
839	defer s.mu.Unlock()
840
841	// Broadcast to all active conversation managers
842	for _, manager := range s.activeConversations {
843		streamData := StreamResponse{
844			ConversationState: &state,
845		}
846		manager.subpub.Broadcast(streamData)
847	}
848}
849
850// getWorkingConversations returns a map of conversation IDs that are currently working.
851func (s *Server) getWorkingConversations() map[string]bool {
852	s.mu.Lock()
853	defer s.mu.Unlock()
854
855	working := make(map[string]bool)
856	for id, manager := range s.activeConversations {
857		if manager.IsAgentWorking() {
858			working[id] = true
859		}
860	}
861	return working
862}
863
864// IsAgentWorking returns whether the agent is currently working on the given conversation.
865// Returns false if the conversation doesn't have an active manager.
866func (s *Server) IsAgentWorking(conversationID string) bool {
867	s.mu.Lock()
868	manager, exists := s.activeConversations[conversationID]
869	s.mu.Unlock()
870	if !exists {
871		return false
872	}
873	return manager.IsAgentWorking()
874}
875
876// Cleanup removes inactive conversation managers
877func (s *Server) Cleanup() {
878	s.mu.Lock()
879	defer s.mu.Unlock()
880
881	now := time.Now()
882	for id, manager := range s.activeConversations {
883		// Remove managers that have been inactive for more than 30 minutes
884		manager.mu.Lock()
885		lastActivity := manager.lastActivity
886		manager.mu.Unlock()
887		if now.Sub(lastActivity) > 30*time.Minute {
888			manager.stopLoop()
889			delete(s.activeConversations, id)
890			s.logger.Debug("Cleaned up inactive conversation", "conversationID", id)
891		}
892	}
893}
894
895// Start starts the HTTP server and handles the complete lifecycle
896func (s *Server) Start(port string) error {
897	listener, err := net.Listen("tcp", ":"+port)
898	if err != nil {
899		s.logger.Error("Failed to create listener", "error", err, "port_info", getPortOwnerInfo(port))
900		return err
901	}
902	return s.StartWithListener(listener)
903}
904
905// StartWithListener starts the HTTP server using the provided listener.
906// This is useful for systemd socket activation where the listener is created externally.
907func (s *Server) StartWithListener(listener net.Listener) error {
908	// Set up HTTP server with routes and middleware
909	mux := http.NewServeMux()
910	s.RegisterRoutes(mux)
911
912	// Add middleware (applied in reverse order: last added = first executed)
913	handler := LoggerMiddleware(s.logger)(mux)
914	handler = CSRFMiddleware()(handler)
915	if s.requireHeader != "" {
916		handler = RequireHeaderMiddleware(s.requireHeader)(handler)
917	}
918
919	httpServer := &http.Server{
920		Handler: handler,
921	}
922
923	// Start cleanup routine
924	go func() {
925		ticker := time.NewTicker(5 * time.Minute)
926		defer ticker.Stop()
927		for range ticker.C {
928			s.Cleanup()
929		}
930	}()
931
932	// Get actual port from listener
933	actualPort := listener.Addr().(*net.TCPAddr).Port
934
935	// Start server in goroutine
936	serverErrCh := make(chan error, 1)
937	go func() {
938		s.logger.Info("Server starting", "port", actualPort, "url", fmt.Sprintf("http://localhost:%d", actualPort))
939		if err := httpServer.Serve(listener); err != nil && err != http.ErrServerClosed {
940			serverErrCh <- err
941		}
942	}()
943
944	// Wait for shutdown signal or server error
945	quit := make(chan os.Signal, 1)
946	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
947
948	select {
949	case err := <-serverErrCh:
950		s.logger.Error("Server failed", "error", err)
951		return err
952	case <-quit:
953		s.logger.Info("Shutting down server")
954	}
955
956	// Graceful shutdown
957	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
958	defer cancel()
959
960	if err := httpServer.Shutdown(ctx); err != nil {
961		s.logger.Error("Server forced to shutdown", "error", err)
962		return err
963	}
964
965	s.logger.Info("Server exited")
966	return nil
967}
968
// getPortOwnerInfo asks lsof which process is listening on TCP port `port`.
//
// It returns "pid=<pid> process=<command>" for the first listener lsof
// reports. Otherwise it returns a parenthesised diagnostic: lsof failed or is
// missing, no listener was found, or the output could not be parsed. The
// result is never empty.
func getPortOwnerInfo(port string) string {
	out, err := exec.Command("lsof", "-i", ":"+port, "-sTCP:LISTEN", "-n", "-P").Output()
	if err != nil {
		return fmt.Sprintf("(unable to determine: %v)", err)
	}

	rows := strings.Split(strings.TrimSpace(string(out)), "\n")
	if len(rows) < 2 {
		return "(no process found)"
	}

	// lsof columns: COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME.
	// rows[0] is the header; report the first data row with at least two columns.
	for _, row := range rows[1:] {
		if cols := strings.Fields(row); len(cols) >= 2 {
			return fmt.Sprintf("pid=%s process=%s", cols[1], cols[0])
		}
	}
	return "(could not parse lsof output)"
}