server.go

  1package server
  2
  3import (
  4	"context"
  5	"encoding/json"
  6	"fmt"
  7	"log/slog"
  8	"net"
  9	"net/http"
 10	"os"
 11	"os/exec"
 12	"os/signal"
 13	"path/filepath"
 14	"strings"
 15	"sync"
 16	"syscall"
 17	"time"
 18
 19	"tailscale.com/util/singleflight"
 20
 21	"shelley.exe.dev/claudetool"
 22	"shelley.exe.dev/db"
 23	"shelley.exe.dev/db/generated"
 24	"shelley.exe.dev/llm"
 25	"shelley.exe.dev/models"
 26	"shelley.exe.dev/ui"
 27)
 28
// APIMessage is the message format sent to clients.
// The pointer-typed fields (LlmData, UserData, UsageData, DisplayData and
// EndOfTurn) are tagged omitempty, so they are left out of the JSON when nil.
// TODO: We could maybe omit llm_data when display_data is available
type APIMessage struct {
	MessageID      string    `json:"message_id"`
	ConversationID string    `json:"conversation_id"`
	SequenceID     int64     `json:"sequence_id"`
	Type           string    `json:"type"`
	LlmData        *string   `json:"llm_data,omitempty"`
	UserData       *string   `json:"user_data,omitempty"`
	UsageData      *string   `json:"usage_data,omitempty"`
	CreatedAt      time.Time `json:"created_at"`
	DisplayData    *string   `json:"display_data,omitempty"`
	EndOfTurn      *bool     `json:"end_of_turn,omitempty"`
}
 43
// ConversationState represents the current state of a conversation.
// This is broadcast to all subscribers whenever the state changes.
// Working is always serialized; Model is dropped from the JSON when empty.
type ConversationState struct {
	ConversationID string `json:"conversation_id"`
	Working        bool   `json:"working"`
	Model          string `json:"model,omitempty"`
}
 51
// ConversationWithState combines a conversation with its working state.
// The generated.Conversation is embedded, and Working is serialized next to it
// under the "working" key.
type ConversationWithState struct {
	generated.Conversation
	Working bool `json:"working"`
}
 57
// StreamResponse represents the response format for conversation streaming.
// Messages and Conversation are always serialized; every other field is
// tagged omitempty and only appears in the JSON when it carries a value.
type StreamResponse struct {
	Messages          []APIMessage           `json:"messages"`
	Conversation      generated.Conversation `json:"conversation"`
	ConversationState *ConversationState     `json:"conversation_state,omitempty"`
	ContextWindowSize uint64                 `json:"context_window_size,omitempty"`
	// ConversationListUpdate is set when another conversation in the list changed
	ConversationListUpdate *ConversationListUpdate `json:"conversation_list_update,omitempty"`
	// Heartbeat indicates this is a heartbeat message (no new data, just keeping connection alive)
	Heartbeat bool `json:"heartbeat,omitempty"`
}
 69
// LLMProvider is an interface for getting LLM services.
// It is the type returned by NewLLMServiceManager and stored on Server as
// llmManager. Models are addressed by a string model ID.
type LLMProvider interface {
	GetService(modelID string) (llm.Service, error)
	GetAvailableModels() []string
	HasModel(modelID string) bool
	GetModelInfo(modelID string) *models.ModelInfo
	RefreshCustomModels() error
}
 78
 79// NewLLMServiceManager creates a new LLM service manager from config
 80func NewLLMServiceManager(cfg *LLMConfig) LLMProvider {
 81	// Convert LLMConfig to models.Config
 82	modelConfig := &models.Config{
 83		AnthropicAPIKey: cfg.AnthropicAPIKey,
 84		OpenAIAPIKey:    cfg.OpenAIAPIKey,
 85		GeminiAPIKey:    cfg.GeminiAPIKey,
 86		FireworksAPIKey: cfg.FireworksAPIKey,
 87		Gateway:         cfg.Gateway,
 88		Logger:          cfg.Logger,
 89		DB:              cfg.DB,
 90	}
 91
 92	manager, err := models.NewManager(modelConfig)
 93	if err != nil {
 94		// This shouldn't happen in practice, but handle it gracefully
 95		cfg.Logger.Error("Failed to create models manager", "error", err)
 96	}
 97
 98	return manager
 99}
100
101// toAPIMessages converts database messages to API messages.
102// When display_data is present (tool results), llm_data is omitted to save bandwidth
103// since the display_data contains all information needed for UI rendering.
104func toAPIMessages(messages []generated.Message) []APIMessage {
105	apiMessages := make([]APIMessage, len(messages))
106	for i, msg := range messages {
107		var endOfTurnPtr *bool
108		if msg.LlmData != nil && msg.Type == string(db.MessageTypeAgent) {
109			if endOfTurn, ok := extractEndOfTurn(*msg.LlmData); ok {
110				endOfTurnCopy := endOfTurn
111				endOfTurnPtr = &endOfTurnCopy
112			}
113		}
114
115		// TODO: Consider omitting llm_data when display_data is present to save bandwidth.
116		// The display_data contains all info needed for UI rendering of tool results,
117		// but the UI currently still uses llm_data for some checks.
118
119		apiMsg := APIMessage{
120			MessageID:      msg.MessageID,
121			ConversationID: msg.ConversationID,
122			SequenceID:     msg.SequenceID,
123			Type:           msg.Type,
124			LlmData:        msg.LlmData,
125			UserData:       msg.UserData,
126			UsageData:      msg.UsageData,
127			CreatedAt:      msg.CreatedAt,
128			DisplayData:    msg.DisplayData,
129			EndOfTurn:      endOfTurnPtr,
130		}
131		apiMessages[i] = apiMsg
132	}
133	return apiMessages
134}
135
136func extractEndOfTurn(raw string) (bool, bool) {
137	var message llm.Message
138	if err := json.Unmarshal([]byte(raw), &message); err != nil {
139		return false, false
140	}
141	return message.EndOfTurn, true
142}
143
144// calculateContextWindowSize returns the context window usage from the most recent message with non-zero usage.
145// Each API call's input tokens represent the full conversation history sent to the model,
146// so we only need the last message's tokens (not accumulated across all messages).
147// The total input includes regular input tokens plus cached tokens (both read and created).
148// Messages without usage data (user messages, tool messages, etc.) are skipped.
149func calculateContextWindowSize(messages []APIMessage) uint64 {
150	// Find the last message with non-zero usage data
151	for i := len(messages) - 1; i >= 0; i-- {
152		msg := messages[i]
153		if msg.UsageData == nil {
154			continue
155		}
156		var usage llm.Usage
157		if err := json.Unmarshal([]byte(*msg.UsageData), &usage); err != nil {
158			continue
159		}
160		ctxUsed := usage.ContextWindowUsed()
161		if ctxUsed == 0 {
162			continue
163		}
164		// Return total context window used: all input tokens + output tokens
165		// This represents the full context that would be sent for the next turn
166		return ctxUsed
167	}
168	return 0
169}
170
171// isAgentEndOfTurn checks if a message is an agent or error message with end_of_turn=true.
172// This indicates the agent loop has finished processing.
173func isAgentEndOfTurn(msg *generated.Message) bool {
174	if msg == nil {
175		return false
176	}
177	// Agent and error messages can have end_of_turn
178	if msg.Type != string(db.MessageTypeAgent) && msg.Type != string(db.MessageTypeError) {
179		return false
180	}
181	if msg.LlmData == nil {
182		return false
183	}
184	endOfTurn, ok := extractEndOfTurn(*msg.LlmData)
185	if !ok {
186		return false
187	}
188	return endOfTurn
189}
190
191// calculateContextWindowSizeFromMsg calculates context window usage from a single message.
192// Returns 0 if the message has no usage data (e.g., user messages), in which case
193// the client should keep its previous context window value.
194func calculateContextWindowSizeFromMsg(msg *generated.Message) uint64 {
195	if msg == nil || msg.UsageData == nil {
196		return 0
197	}
198	var usage llm.Usage
199	if err := json.Unmarshal([]byte(*msg.UsageData), &usage); err != nil {
200		return 0
201	}
202	return usage.ContextWindowUsed()
203}
204
// ConversationListUpdate represents an update to the conversation list.
// It is delivered to clients inside StreamResponse.ConversationListUpdate.
// Conversation and ConversationID are both tagged omitempty.
type ConversationListUpdate struct {
	Type           string                  `json:"type"` // "update", "delete"
	Conversation   *generated.Conversation `json:"conversation,omitempty"`
	ConversationID string                  `json:"conversation_id,omitempty"` // For deletes
}
211
// Server manages the HTTP API and active conversations.
// activeConversations maps a conversation ID to its live ConversationManager;
// the methods in this file lock mu around every access to that map.
// conversationGroup is used by getOrCreateConversationManager, keyed by
// conversation ID, when a manager is looked up or created.
type Server struct {
	db                  *db.DB
	llmManager          LLMProvider
	toolSetConfig       claudetool.ToolSetConfig
	activeConversations map[string]*ConversationManager
	mu                  sync.Mutex
	logger              *slog.Logger
	predictableOnly     bool
	terminalURL         string
	defaultModel        string
	links               []Link
	requireHeader       string
	conversationGroup   singleflight.Group[string, *ConversationManager]
	versionChecker      *VersionChecker
}
228
229// NewServer creates a new server instance
230func NewServer(database *db.DB, llmManager LLMProvider, toolSetConfig claudetool.ToolSetConfig, logger *slog.Logger, predictableOnly bool, terminalURL, defaultModel, requireHeader string, links []Link) *Server {
231	s := &Server{
232		db:                  database,
233		llmManager:          llmManager,
234		toolSetConfig:       toolSetConfig,
235		activeConversations: make(map[string]*ConversationManager),
236		logger:              logger,
237		predictableOnly:     predictableOnly,
238		terminalURL:         terminalURL,
239		defaultModel:        defaultModel,
240		requireHeader:       requireHeader,
241		links:               links,
242		versionChecker:      NewVersionChecker(),
243	}
244
245	// Set up subagent support
246	s.toolSetConfig.SubagentRunner = NewSubagentRunner(s)
247	s.toolSetConfig.SubagentDB = &db.SubagentDBAdapter{DB: database}
248
249	return s
250}
251
252// RegisterRoutes registers HTTP routes on the given mux
253func (s *Server) RegisterRoutes(mux *http.ServeMux) {
254	// API routes - wrap with gzip where beneficial
255	mux.Handle("/api/conversations", gzipHandler(http.HandlerFunc(s.handleConversations)))
256	mux.Handle("/api/conversations/archived", gzipHandler(http.HandlerFunc(s.handleArchivedConversations)))
257	mux.Handle("/api/conversations/new", http.HandlerFunc(s.handleNewConversation))           // Small response
258	mux.Handle("/api/conversations/continue", http.HandlerFunc(s.handleContinueConversation)) // Small response
259	mux.Handle("/api/conversation/", http.StripPrefix("/api/conversation", s.conversationMux()))
260	mux.Handle("/api/conversation-by-slug/", gzipHandler(http.HandlerFunc(s.handleConversationBySlug)))
261	mux.Handle("/api/validate-cwd", http.HandlerFunc(s.handleValidateCwd)) // Small response
262	mux.Handle("/api/list-directory", gzipHandler(http.HandlerFunc(s.handleListDirectory)))
263	mux.Handle("/api/create-directory", http.HandlerFunc(s.handleCreateDirectory))
264	mux.Handle("/api/git/diffs", gzipHandler(http.HandlerFunc(s.handleGitDiffs)))
265	mux.Handle("/api/git/diffs/", gzipHandler(http.HandlerFunc(s.handleGitDiffFiles)))
266	mux.Handle("/api/git/file-diff/", gzipHandler(http.HandlerFunc(s.handleGitFileDiff)))
267	mux.HandleFunc("/api/upload", s.handleUpload)                      // Binary uploads
268	mux.HandleFunc("/api/read", s.handleRead)                          // Serves images
269	mux.Handle("/api/write-file", http.HandlerFunc(s.handleWriteFile)) // Small response
270	mux.HandleFunc("/api/exec-ws", s.handleExecWS)                     // Websocket for shell commands
271
272	// Custom models API
273	mux.Handle("/api/custom-models", http.HandlerFunc(s.handleCustomModels))
274	mux.Handle("/api/custom-models/", http.HandlerFunc(s.handleCustomModel))
275	mux.Handle("/api/custom-models-test", http.HandlerFunc(s.handleTestModel))
276
277	// Models API (dynamic list refresh)
278	mux.Handle("/api/models", http.HandlerFunc(s.handleModels))
279
280	// Version endpoints
281	mux.Handle("GET /version", http.HandlerFunc(s.handleVersion))
282	mux.Handle("GET /version-check", http.HandlerFunc(s.handleVersionCheck))
283	mux.Handle("GET /version-changelog", http.HandlerFunc(s.handleVersionChangelog))
284	mux.Handle("POST /upgrade", http.HandlerFunc(s.handleUpgrade))
285	mux.Handle("POST /exit", http.HandlerFunc(s.handleExit))
286
287	// Debug endpoints
288	mux.Handle("GET /debug/llm_requests", http.HandlerFunc(s.handleDebugLLMRequests))
289	mux.Handle("GET /debug/llm_requests/api", http.HandlerFunc(s.handleDebugLLMRequestsAPI))
290	mux.Handle("GET /debug/llm_requests/{id}/request", http.HandlerFunc(s.handleDebugLLMRequestBody))
291	mux.Handle("GET /debug/llm_requests/{id}/request_full", http.HandlerFunc(s.handleDebugLLMRequestBodyFull))
292	mux.Handle("GET /debug/llm_requests/{id}/response", http.HandlerFunc(s.handleDebugLLMResponseBody))
293
294	// Serve embedded UI assets
295	mux.Handle("/", s.staticHandler(ui.Assets()))
296}
297
298// handleValidateCwd validates that a path exists and is a directory
299func (s *Server) handleValidateCwd(w http.ResponseWriter, r *http.Request) {
300	if r.Method != http.MethodGet {
301		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
302		return
303	}
304
305	path := r.URL.Query().Get("path")
306	if path == "" {
307		w.Header().Set("Content-Type", "application/json")
308		json.NewEncoder(w).Encode(map[string]interface{}{
309			"valid": false,
310			"error": "path is required",
311		})
312		return
313	}
314
315	info, err := os.Stat(path)
316	if err != nil {
317		w.Header().Set("Content-Type", "application/json")
318		if os.IsNotExist(err) {
319			json.NewEncoder(w).Encode(map[string]interface{}{
320				"valid": false,
321				"error": "directory does not exist",
322			})
323		} else {
324			json.NewEncoder(w).Encode(map[string]interface{}{
325				"valid": false,
326				"error": err.Error(),
327			})
328		}
329		return
330	}
331
332	if !info.IsDir() {
333		w.Header().Set("Content-Type", "application/json")
334		json.NewEncoder(w).Encode(map[string]interface{}{
335			"valid": false,
336			"error": "path is not a directory",
337		})
338		return
339	}
340
341	w.Header().Set("Content-Type", "application/json")
342	json.NewEncoder(w).Encode(map[string]interface{}{
343		"valid": true,
344	})
345}
346
// DirectoryEntry represents a single directory entry for the directory picker.
// GitHeadSubject is filled in by handleListDirectory when the entry is a git
// repository root and its HEAD subject could be read; it is omitted from the
// JSON when empty.
type DirectoryEntry struct {
	Name           string `json:"name"`
	IsDir          bool   `json:"is_dir"`
	GitHeadSubject string `json:"git_head_subject,omitempty"`
}
353
// ListDirectoryResponse is the response from the list-directory endpoint.
// Path is the cleaned directory that was listed. Parent is its parent
// directory, or "" when Path has no parent. GitHeadSubject is omitted from the
// JSON when empty.
type ListDirectoryResponse struct {
	Path           string           `json:"path"`
	Parent         string           `json:"parent"`
	Entries        []DirectoryEntry `json:"entries"`
	GitHeadSubject string           `json:"git_head_subject,omitempty"`
}
361
362// handleListDirectory lists the contents of a directory for the directory picker
363func (s *Server) handleListDirectory(w http.ResponseWriter, r *http.Request) {
364	if r.Method != http.MethodGet {
365		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
366		return
367	}
368
369	path := r.URL.Query().Get("path")
370	if path == "" {
371		// Default to home directory or root
372		homeDir, err := os.UserHomeDir()
373		if err != nil {
374			path = "/"
375		} else {
376			path = homeDir
377		}
378	}
379
380	// Clean and resolve the path
381	path = filepath.Clean(path)
382
383	// Verify path exists and is a directory
384	info, err := os.Stat(path)
385	if err != nil {
386		w.Header().Set("Content-Type", "application/json")
387		if os.IsNotExist(err) {
388			json.NewEncoder(w).Encode(map[string]interface{}{
389				"error": "directory does not exist",
390			})
391		} else if os.IsPermission(err) {
392			json.NewEncoder(w).Encode(map[string]interface{}{
393				"error": "permission denied",
394			})
395		} else {
396			json.NewEncoder(w).Encode(map[string]interface{}{
397				"error": err.Error(),
398			})
399		}
400		return
401	}
402
403	if !info.IsDir() {
404		w.Header().Set("Content-Type", "application/json")
405		json.NewEncoder(w).Encode(map[string]interface{}{
406			"error": "path is not a directory",
407		})
408		return
409	}
410
411	// Read directory contents
412	dirEntries, err := os.ReadDir(path)
413	if err != nil {
414		w.Header().Set("Content-Type", "application/json")
415		if os.IsPermission(err) {
416			json.NewEncoder(w).Encode(map[string]interface{}{
417				"error": "permission denied",
418			})
419		} else {
420			json.NewEncoder(w).Encode(map[string]interface{}{
421				"error": err.Error(),
422			})
423		}
424		return
425	}
426
427	// Build response with only directories (for directory picker)
428	var entries []DirectoryEntry
429	for _, entry := range dirEntries {
430		// Only include directories
431		if entry.IsDir() {
432			dirEntry := DirectoryEntry{
433				Name:  entry.Name(),
434				IsDir: true,
435			}
436
437			// Check if this is a git repo root and get HEAD commit subject
438			entryPath := filepath.Join(path, entry.Name())
439			if isGitRepo(entryPath) {
440				if subject := getGitHeadSubject(entryPath); subject != "" {
441					dirEntry.GitHeadSubject = subject
442				}
443			}
444
445			entries = append(entries, dirEntry)
446		}
447	}
448
449	// Calculate parent directory
450	parent := filepath.Dir(path)
451	if parent == path {
452		// At root, no parent
453		parent = ""
454	}
455
456	response := ListDirectoryResponse{
457		Path:    path,
458		Parent:  parent,
459		Entries: entries,
460	}
461
462	// Check if the current directory itself is a git repo
463	if isGitRepo(path) {
464		response.GitHeadSubject = getGitHeadSubject(path)
465	}
466
467	w.Header().Set("Content-Type", "application/json")
468	json.NewEncoder(w).Encode(response)
469}
470
// isGitRepo reports whether dirPath is the root of a git repository.
// It is true when dirPath/.git is a directory (a regular repository) or a
// regular file whose content starts with "gitdir:" (a worktree). Any error
// while inspecting or reading .git yields false.
func isGitRepo(dirPath string) bool {
	dotGit := filepath.Join(dirPath, ".git")

	info, err := os.Stat(dotGit)
	switch {
	case err != nil:
		return false
	case info.IsDir():
		return true // regular .git directory
	case !info.Mode().IsRegular():
		return false
	}

	// A worktree keeps a .git file that points at the real git directory.
	data, err := os.ReadFile(dotGit)
	return err == nil && strings.HasPrefix(string(data), "gitdir:")
}
493
// getGitHeadSubject runs `git log -1 --format=%s` inside repoPath and returns
// the trimmed output, i.e. the subject line of the HEAD commit.
// It returns an empty string when the command fails for any reason.
func getGitHeadSubject(repoPath string) string {
	gitLog := exec.Command("git", "log", "-1", "--format=%s")
	gitLog.Dir = repoPath

	raw, err := gitLog.Output()
	if err == nil {
		return strings.TrimSpace(string(raw))
	}
	return ""
}
505
506// handleCreateDirectory creates a new directory
507func (s *Server) handleCreateDirectory(w http.ResponseWriter, r *http.Request) {
508	if r.Method != http.MethodPost {
509		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
510		return
511	}
512
513	var req struct {
514		Path string `json:"path"`
515	}
516	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
517		w.Header().Set("Content-Type", "application/json")
518		json.NewEncoder(w).Encode(map[string]interface{}{
519			"error": "invalid request body",
520		})
521		return
522	}
523
524	if req.Path == "" {
525		w.Header().Set("Content-Type", "application/json")
526		json.NewEncoder(w).Encode(map[string]interface{}{
527			"error": "path is required",
528		})
529		return
530	}
531
532	// Clean the path
533	path := filepath.Clean(req.Path)
534
535	// Check if path already exists
536	if _, err := os.Stat(path); err == nil {
537		w.Header().Set("Content-Type", "application/json")
538		json.NewEncoder(w).Encode(map[string]interface{}{
539			"error": "path already exists",
540		})
541		return
542	}
543
544	// Verify parent directory exists
545	parentDir := filepath.Dir(path)
546	if _, err := os.Stat(parentDir); os.IsNotExist(err) {
547		w.Header().Set("Content-Type", "application/json")
548		json.NewEncoder(w).Encode(map[string]interface{}{
549			"error": "parent directory does not exist",
550		})
551		return
552	}
553
554	// Create the directory (only the final directory, not parents)
555	if err := os.Mkdir(path, 0o755); err != nil {
556		w.Header().Set("Content-Type", "application/json")
557		if os.IsPermission(err) {
558			json.NewEncoder(w).Encode(map[string]interface{}{
559				"error": "permission denied",
560			})
561		} else {
562			json.NewEncoder(w).Encode(map[string]interface{}{
563				"error": err.Error(),
564			})
565		}
566		return
567	}
568
569	w.Header().Set("Content-Type", "application/json")
570	json.NewEncoder(w).Encode(map[string]interface{}{
571		"path": path,
572	})
573}
574
575// getOrCreateConversationManager gets an existing conversation manager or creates a new one.
576func (s *Server) getOrCreateConversationManager(ctx context.Context, conversationID string) (*ConversationManager, error) {
577	manager, err, _ := s.conversationGroup.Do(conversationID, func() (*ConversationManager, error) {
578		s.mu.Lock()
579		defer s.mu.Unlock()
580		if manager, exists := s.activeConversations[conversationID]; exists {
581			manager.Touch()
582			return manager, nil
583		}
584
585		recordMessage := func(ctx context.Context, message llm.Message, usage llm.Usage) error {
586			return s.recordMessage(ctx, conversationID, message, usage)
587		}
588
589		onStateChange := func(state ConversationState) {
590			s.publishConversationState(state)
591		}
592
593		manager := NewConversationManager(conversationID, s.db, s.logger, s.toolSetConfig, recordMessage, onStateChange)
594		if err := manager.Hydrate(ctx); err != nil {
595			return nil, err
596		}
597
598		s.activeConversations[conversationID] = manager
599		return manager, nil
600	})
601	if err != nil {
602		return nil, err
603	}
604	return manager, nil
605}
606
607// ExtractDisplayData extracts display data from message content for storage
608func ExtractDisplayData(message llm.Message) interface{} {
609	// Build a map of tool_use_id to tool_name for lookups
610	toolNameMap := make(map[string]string)
611	for _, content := range message.Content {
612		if content.Type == llm.ContentTypeToolUse {
613			toolNameMap[content.ID] = content.ToolName
614		}
615	}
616
617	var displayData []any
618	for _, content := range message.Content {
619		if content.Type == llm.ContentTypeToolResult && content.Display != nil {
620			// Include tool name if we can find it
621			toolName := toolNameMap[content.ToolUseID]
622			displayData = append(displayData, map[string]any{
623				"tool_use_id": content.ToolUseID,
624				"tool_name":   toolName,
625				"display":     content.Display,
626			})
627		}
628	}
629
630	if len(displayData) > 0 {
631		return displayData
632	}
633	return nil
634}
635
636// recordMessage records a new message to the database and also notifies subscribers
637func (s *Server) recordMessage(ctx context.Context, conversationID string, message llm.Message, usage llm.Usage) error {
638	// Log message based on role
639	if message.Role == llm.MessageRoleUser {
640		s.logger.Info("User message", "conversation_id", conversationID, "content_items", len(message.Content))
641	} else if message.Role == llm.MessageRoleAssistant {
642		s.logger.Info("Agent message", "conversation_id", conversationID, "content_items", len(message.Content), "end_of_turn", message.EndOfTurn)
643	}
644
645	// Convert LLM message to database format
646	messageType, err := s.getMessageType(message)
647	if err != nil {
648		return fmt.Errorf("failed to determine message type: %w", err)
649	}
650
651	// Extract display data from content items
652	displayDataToStore := ExtractDisplayData(message)
653
654	// Create message
655	createdMsg, err := s.db.CreateMessage(ctx, db.CreateMessageParams{
656		ConversationID:      conversationID,
657		Type:                messageType,
658		LLMData:             message,
659		UserData:            nil,
660		UsageData:           usage,
661		DisplayData:         displayDataToStore,
662		ExcludedFromContext: message.ExcludedFromContext,
663	})
664	if err != nil {
665		return fmt.Errorf("failed to create message: %w", err)
666	}
667
668	// Update conversation's last updated timestamp for correct ordering
669	if err := s.db.QueriesTx(ctx, func(q *generated.Queries) error {
670		return q.UpdateConversationTimestamp(ctx, conversationID)
671	}); err != nil {
672		s.logger.Warn("Failed to update conversation timestamp", "conversationID", conversationID, "error", err)
673	}
674
675	// Touch active manager activity time if present
676	s.mu.Lock()
677	mgr, ok := s.activeConversations[conversationID]
678	if ok {
679		mgr.Touch()
680	}
681	s.mu.Unlock()
682
683	// Notify subscribers with only the new message - use WithoutCancel because
684	// the HTTP request context may be cancelled after the handler returns, but
685	// we still want the notification to complete so SSE clients see the message immediately
686	go s.notifySubscribersNewMessage(context.WithoutCancel(ctx), conversationID, createdMsg)
687
688	return nil
689}
690
691// getMessageType determines the message type from an LLM message
692func (s *Server) getMessageType(message llm.Message) (db.MessageType, error) {
693	// System-generated errors are stored as error type
694	if message.ErrorType != llm.ErrorTypeNone {
695		return db.MessageTypeError, nil
696	}
697
698	switch message.Role {
699	case llm.MessageRoleUser:
700		return db.MessageTypeUser, nil
701	case llm.MessageRoleAssistant:
702		return db.MessageTypeAgent, nil
703	default:
704		// For tool messages, check if it's a tool call or tool result
705		for _, content := range message.Content {
706			if content.Type == llm.ContentTypeToolUse {
707				return db.MessageTypeTool, nil
708			}
709			if content.Type == llm.ContentTypeToolResult {
710				return db.MessageTypeTool, nil
711			}
712		}
713		return db.MessageTypeAgent, nil
714	}
715}
716
717// convertToLLMMessage converts a database message to an LLM message
718func convertToLLMMessage(msg generated.Message) (llm.Message, error) {
719	var llmMsg llm.Message
720	if msg.LlmData == nil {
721		return llm.Message{}, fmt.Errorf("message has no LLM data")
722	}
723	if err := json.Unmarshal([]byte(*msg.LlmData), &llmMsg); err != nil {
724		return llm.Message{}, fmt.Errorf("failed to unmarshal LLM data: %w", err)
725	}
726	return llmMsg, nil
727}
728
729// notifySubscribers sends conversation metadata updates (e.g., slug changes) to subscribers.
730// This is used when only the conversation data changes, not the messages.
731// Uses Broadcast instead of Publish to avoid racing with message sequence IDs.
732func (s *Server) notifySubscribers(ctx context.Context, conversationID string) {
733	s.mu.Lock()
734	manager, exists := s.activeConversations[conversationID]
735	s.mu.Unlock()
736
737	if !exists {
738		return
739	}
740
741	// Get conversation data only (no messages needed for metadata-only updates)
742	var conversation generated.Conversation
743	err := s.db.Queries(ctx, func(q *generated.Queries) error {
744		var err error
745		conversation, err = q.GetConversation(ctx, conversationID)
746		return err
747	})
748	if err != nil {
749		s.logger.Error("Failed to get conversation data for notification", "conversationID", conversationID, "error", err)
750		return
751	}
752
753	// Broadcast conversation update with no new messages.
754	// Using Broadcast instead of Publish ensures this metadata-only update
755	// doesn't race with notifySubscribersNewMessage which uses Publish with sequence IDs.
756	streamData := StreamResponse{
757		Messages:     nil, // No new messages, just conversation update
758		Conversation: conversation,
759	}
760	manager.subpub.Broadcast(streamData)
761
762	// Also notify conversation list subscribers (e.g., slug change)
763	s.publishConversationListUpdate(ConversationListUpdate{
764		Type:         "update",
765		Conversation: &conversation,
766	})
767}
768
769// notifySubscribersNewMessage sends a single new message to all subscribers.
770// This is more efficient than re-sending all messages on each update.
771func (s *Server) notifySubscribersNewMessage(ctx context.Context, conversationID string, newMsg *generated.Message) {
772	s.mu.Lock()
773	manager, exists := s.activeConversations[conversationID]
774	s.mu.Unlock()
775
776	if !exists {
777		return
778	}
779
780	// Get conversation data for the response
781	var conversation generated.Conversation
782	err := s.db.Queries(ctx, func(q *generated.Queries) error {
783		var err error
784		conversation, err = q.GetConversation(ctx, conversationID)
785		return err
786	})
787	if err != nil {
788		s.logger.Error("Failed to get conversation data for notification", "conversationID", conversationID, "error", err)
789		return
790	}
791
792	// Convert the single new message to API format
793	apiMessages := toAPIMessages([]generated.Message{*newMsg})
794
795	// Update agent working state based on message type
796	if isAgentEndOfTurn(newMsg) {
797		manager.SetAgentWorking(false)
798	}
799
800	// Publish only the new message
801	streamData := StreamResponse{
802		Messages:     apiMessages,
803		Conversation: conversation,
804		// ContextWindowSize: 0 for messages without usage data (user/tool messages).
805		// With omitempty, 0 is omitted from JSON, so the UI keeps its cached value.
806		// Only agent messages have usage data, so context window updates when they arrive.
807		ContextWindowSize: calculateContextWindowSizeFromMsg(newMsg),
808	}
809	manager.subpub.Publish(newMsg.SequenceID, streamData)
810
811	// Also notify conversation list subscribers about the update (updated_at changed)
812	s.publishConversationListUpdate(ConversationListUpdate{
813		Type:         "update",
814		Conversation: &conversation,
815	})
816}
817
818// publishConversationListUpdate broadcasts a conversation list update to ALL active
819// conversation streams. This allows clients to receive updates about other conversations
820// while they're subscribed to their current conversation's stream.
821func (s *Server) publishConversationListUpdate(update ConversationListUpdate) {
822	s.mu.Lock()
823	defer s.mu.Unlock()
824
825	// Broadcast to all active conversation managers
826	for _, manager := range s.activeConversations {
827		streamData := StreamResponse{
828			ConversationListUpdate: &update,
829		}
830		manager.subpub.Broadcast(streamData)
831	}
832}
833
834// publishConversationState broadcasts a conversation state update to ALL active
835// conversation streams. This allows clients to see the working state of other conversations.
836func (s *Server) publishConversationState(state ConversationState) {
837	s.mu.Lock()
838	defer s.mu.Unlock()
839
840	// Broadcast to all active conversation managers
841	for _, manager := range s.activeConversations {
842		streamData := StreamResponse{
843			ConversationState: &state,
844		}
845		manager.subpub.Broadcast(streamData)
846	}
847}
848
849// getWorkingConversations returns a map of conversation IDs that are currently working.
850func (s *Server) getWorkingConversations() map[string]bool {
851	s.mu.Lock()
852	defer s.mu.Unlock()
853
854	working := make(map[string]bool)
855	for id, manager := range s.activeConversations {
856		if manager.IsAgentWorking() {
857			working[id] = true
858		}
859	}
860	return working
861}
862
863// IsAgentWorking returns whether the agent is currently working on the given conversation.
864// Returns false if the conversation doesn't have an active manager.
865func (s *Server) IsAgentWorking(conversationID string) bool {
866	s.mu.Lock()
867	manager, exists := s.activeConversations[conversationID]
868	s.mu.Unlock()
869	if !exists {
870		return false
871	}
872	return manager.IsAgentWorking()
873}
874
875// Cleanup removes inactive conversation managers
876func (s *Server) Cleanup() {
877	s.mu.Lock()
878	defer s.mu.Unlock()
879
880	now := time.Now()
881	for id, manager := range s.activeConversations {
882		// Remove managers that have been inactive for more than 30 minutes
883		manager.mu.Lock()
884		lastActivity := manager.lastActivity
885		manager.mu.Unlock()
886		if now.Sub(lastActivity) > 30*time.Minute {
887			manager.stopLoop()
888			delete(s.activeConversations, id)
889			s.logger.Debug("Cleaned up inactive conversation", "conversationID", id)
890		}
891	}
892}
893
894// Start starts the HTTP server and handles the complete lifecycle
895func (s *Server) Start(port string) error {
896	listener, err := net.Listen("tcp", ":"+port)
897	if err != nil {
898		s.logger.Error("Failed to create listener", "error", err, "port_info", getPortOwnerInfo(port))
899		return err
900	}
901	return s.StartWithListener(listener)
902}
903
904// StartWithListener starts the HTTP server using the provided listener.
905// This is useful for systemd socket activation where the listener is created externally.
906func (s *Server) StartWithListener(listener net.Listener) error {
907	// Set up HTTP server with routes and middleware
908	mux := http.NewServeMux()
909	s.RegisterRoutes(mux)
910
911	// Add middleware (applied in reverse order: last added = first executed)
912	handler := LoggerMiddleware(s.logger)(mux)
913	handler = CSRFMiddleware()(handler)
914	if s.requireHeader != "" {
915		handler = RequireHeaderMiddleware(s.requireHeader)(handler)
916	}
917
918	httpServer := &http.Server{
919		Handler: handler,
920	}
921
922	// Start cleanup routine
923	go func() {
924		ticker := time.NewTicker(5 * time.Minute)
925		defer ticker.Stop()
926		for range ticker.C {
927			s.Cleanup()
928		}
929	}()
930
931	// Get actual port from listener
932	actualPort := listener.Addr().(*net.TCPAddr).Port
933
934	// Start server in goroutine
935	serverErrCh := make(chan error, 1)
936	go func() {
937		s.logger.Info("Server starting", "port", actualPort, "url", fmt.Sprintf("http://localhost:%d", actualPort))
938		if err := httpServer.Serve(listener); err != nil && err != http.ErrServerClosed {
939			serverErrCh <- err
940		}
941	}()
942
943	// Wait for shutdown signal or server error
944	quit := make(chan os.Signal, 1)
945	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
946
947	select {
948	case err := <-serverErrCh:
949		s.logger.Error("Server failed", "error", err)
950		return err
951	case <-quit:
952		s.logger.Info("Shutting down server")
953	}
954
955	// Graceful shutdown
956	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
957	defer cancel()
958
959	if err := httpServer.Shutdown(ctx); err != nil {
960		s.logger.Error("Server forced to shutdown", "error", err)
961		return err
962	}
963
964	s.logger.Info("Server exited")
965	return nil
966}
967
// getPortOwnerInfo tries to identify what process is listening on a port by
// running lsof. It returns "pid=<pid> process=<command>" for the first lsof
// row that has at least two fields, or a parenthesised explanation when lsof
// fails, reports nothing, or prints output that cannot be parsed.
func getPortOwnerInfo(port string) string {
	out, err := exec.Command("lsof", "-i", ":"+port, "-sTCP:LISTEN", "-n", "-P").Output()
	if err != nil {
		return fmt.Sprintf("(unable to determine: %v)", err)
	}

	// lsof prints a header row first: COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
	rows := strings.Split(strings.TrimSpace(string(out)), "\n")
	if len(rows) < 2 {
		return "(no process found)"
	}

	for _, row := range rows[1:] {
		if cols := strings.Fields(row); len(cols) >= 2 {
			return fmt.Sprintf("pid=%s process=%s", cols[1], cols[0])
		}
	}
	return "(could not parse lsof output)"
}