server.go

  1package server
  2
  3import (
  4	"context"
  5	"encoding/json"
  6	"fmt"
  7	"log/slog"
  8	"net"
  9	"net/http"
 10	"os"
 11	"os/exec"
 12	"os/signal"
 13	"path/filepath"
 14	"strings"
 15	"sync"
 16	"syscall"
 17	"time"
 18
 19	"tailscale.com/util/singleflight"
 20
 21	"shelley.exe.dev/claudetool"
 22	"shelley.exe.dev/db"
 23	"shelley.exe.dev/db/generated"
 24	"shelley.exe.dev/llm"
 25	"shelley.exe.dev/models"
 26	"shelley.exe.dev/ui"
 27)
 28
// APIMessage is the message format sent to clients.
//
// It mirrors one stored message row (see toAPIMessages): the *string payload
// fields (llm_data, user_data, usage_data, display_data) are copied as-is
// from the database row and are omitted from JSON when nil. EndOfTurn is not
// stored separately; toAPIMessages derives it from llm_data for agent
// messages and leaves it nil (omitted) otherwise.
// TODO: We could maybe omit llm_data when display_data is available
type APIMessage struct {
	MessageID      string    `json:"message_id"`
	ConversationID string    `json:"conversation_id"`
	SequenceID     int64     `json:"sequence_id"`
	Type           string    `json:"type"`
	LlmData        *string   `json:"llm_data,omitempty"`
	UserData       *string   `json:"user_data,omitempty"`
	UsageData      *string   `json:"usage_data,omitempty"`
	CreatedAt      time.Time `json:"created_at"`
	DisplayData    *string   `json:"display_data,omitempty"`
	EndOfTurn      *bool     `json:"end_of_turn,omitempty"`
}
 43
// ConversationState represents the current state of a conversation.
// This is broadcast to all subscribers whenever the state changes.
//
// It reaches clients as StreamResponse.ConversationState. The server fans it
// out to every active conversation stream (publishConversationState), so a
// client watching one conversation also learns the working state of others.
// Model is omitted from JSON when empty.
type ConversationState struct {
	ConversationID string `json:"conversation_id"`
	Working        bool   `json:"working"`
	Model          string `json:"model,omitempty"`
}
 51
// ConversationWithState combines a conversation with its working state.
//
// generated.Conversation is embedded, so encoding/json flattens its fields
// into the same JSON object, next to "working".
//
// NOTE(review): this flattening only holds if generated.Conversation has no
// custom MarshalJSON. A promoted MarshalJSON would take over encoding and
// drop "working". Confirm against the generated type.
type ConversationWithState struct {
	generated.Conversation
	Working bool `json:"working"`
}
 57
// StreamResponse represents the response format for conversation streaming.
//
// One value is sent per stream event, and different events fill different
// fields:
//   - new message: Messages holds just that message, plus Conversation and,
//     when the message carries usage data, ContextWindowSize;
//   - metadata change (e.g. slug): Conversation only, Messages nil;
//   - ConversationState: working-state change of any conversation;
//   - ConversationListUpdate: a change to another conversation in the list.
//
// ContextWindowSize is omitted when 0, so clients keep their previous value.
type StreamResponse struct {
	Messages          []APIMessage           `json:"messages"`
	Conversation      generated.Conversation `json:"conversation"`
	ConversationState *ConversationState     `json:"conversation_state,omitempty"`
	ContextWindowSize uint64                 `json:"context_window_size,omitempty"`
	// ConversationListUpdate is set when another conversation in the list changed
	ConversationListUpdate *ConversationListUpdate `json:"conversation_list_update,omitempty"`
}
 67
// LLMProvider is an interface for getting LLM services.
//
// NewLLMServiceManager returns the models package's manager as the
// implementation used by the server. The per-method notes below are inferred
// from the method names and should be confirmed against models.Manager:
//   - GetService: the llm.Service to use for modelID.
//   - GetAvailableModels: the model IDs currently usable.
//   - HasModel: whether modelID is known.
//   - GetModelInfo: metadata for modelID (presumably nil when unknown).
//   - RefreshCustomModels: reloads user-defined models (presumably from the DB).
type LLMProvider interface {
	GetService(modelID string) (llm.Service, error)
	GetAvailableModels() []string
	HasModel(modelID string) bool
	GetModelInfo(modelID string) *models.ModelInfo
	RefreshCustomModels() error
}
 76
 77// NewLLMServiceManager creates a new LLM service manager from config
 78func NewLLMServiceManager(cfg *LLMConfig) LLMProvider {
 79	// Convert LLMConfig to models.Config
 80	modelConfig := &models.Config{
 81		AnthropicAPIKey: cfg.AnthropicAPIKey,
 82		OpenAIAPIKey:    cfg.OpenAIAPIKey,
 83		GeminiAPIKey:    cfg.GeminiAPIKey,
 84		FireworksAPIKey: cfg.FireworksAPIKey,
 85		Gateway:         cfg.Gateway,
 86		Logger:          cfg.Logger,
 87		DB:              cfg.DB,
 88	}
 89
 90	manager, err := models.NewManager(modelConfig)
 91	if err != nil {
 92		// This shouldn't happen in practice, but handle it gracefully
 93		cfg.Logger.Error("Failed to create models manager", "error", err)
 94	}
 95
 96	return manager
 97}
 98
 99// toAPIMessages converts database messages to API messages.
100// When display_data is present (tool results), llm_data is omitted to save bandwidth
101// since the display_data contains all information needed for UI rendering.
102func toAPIMessages(messages []generated.Message) []APIMessage {
103	apiMessages := make([]APIMessage, len(messages))
104	for i, msg := range messages {
105		var endOfTurnPtr *bool
106		if msg.LlmData != nil && msg.Type == string(db.MessageTypeAgent) {
107			if endOfTurn, ok := extractEndOfTurn(*msg.LlmData); ok {
108				endOfTurnCopy := endOfTurn
109				endOfTurnPtr = &endOfTurnCopy
110			}
111		}
112
113		// TODO: Consider omitting llm_data when display_data is present to save bandwidth.
114		// The display_data contains all info needed for UI rendering of tool results,
115		// but the UI currently still uses llm_data for some checks.
116
117		apiMsg := APIMessage{
118			MessageID:      msg.MessageID,
119			ConversationID: msg.ConversationID,
120			SequenceID:     msg.SequenceID,
121			Type:           msg.Type,
122			LlmData:        msg.LlmData,
123			UserData:       msg.UserData,
124			UsageData:      msg.UsageData,
125			CreatedAt:      msg.CreatedAt,
126			DisplayData:    msg.DisplayData,
127			EndOfTurn:      endOfTurnPtr,
128		}
129		apiMessages[i] = apiMsg
130	}
131	return apiMessages
132}
133
134func extractEndOfTurn(raw string) (bool, bool) {
135	var message llm.Message
136	if err := json.Unmarshal([]byte(raw), &message); err != nil {
137		return false, false
138	}
139	return message.EndOfTurn, true
140}
141
142// calculateContextWindowSize returns the context window usage from the most recent message with non-zero usage.
143// Each API call's input tokens represent the full conversation history sent to the model,
144// so we only need the last message's tokens (not accumulated across all messages).
145// The total input includes regular input tokens plus cached tokens (both read and created).
146// Messages without usage data (user messages, tool messages, etc.) are skipped.
147func calculateContextWindowSize(messages []APIMessage) uint64 {
148	// Find the last message with non-zero usage data
149	for i := len(messages) - 1; i >= 0; i-- {
150		msg := messages[i]
151		if msg.UsageData == nil {
152			continue
153		}
154		var usage llm.Usage
155		if err := json.Unmarshal([]byte(*msg.UsageData), &usage); err != nil {
156			continue
157		}
158		ctxUsed := usage.ContextWindowUsed()
159		if ctxUsed == 0 {
160			continue
161		}
162		// Return total context window used: all input tokens + output tokens
163		// This represents the full context that would be sent for the next turn
164		return ctxUsed
165	}
166	return 0
167}
168
169// isAgentEndOfTurn checks if a message is an agent or error message with end_of_turn=true.
170// This indicates the agent loop has finished processing.
171func isAgentEndOfTurn(msg *generated.Message) bool {
172	if msg == nil {
173		return false
174	}
175	// Agent and error messages can have end_of_turn
176	if msg.Type != string(db.MessageTypeAgent) && msg.Type != string(db.MessageTypeError) {
177		return false
178	}
179	if msg.LlmData == nil {
180		return false
181	}
182	endOfTurn, ok := extractEndOfTurn(*msg.LlmData)
183	if !ok {
184		return false
185	}
186	return endOfTurn
187}
188
189// calculateContextWindowSizeFromMsg calculates context window usage from a single message.
190// Returns 0 if the message has no usage data (e.g., user messages), in which case
191// the client should keep its previous context window value.
192func calculateContextWindowSizeFromMsg(msg *generated.Message) uint64 {
193	if msg == nil || msg.UsageData == nil {
194		return 0
195	}
196	var usage llm.Usage
197	if err := json.Unmarshal([]byte(*msg.UsageData), &usage); err != nil {
198		return 0
199	}
200	return usage.ContextWindowUsed()
201}
202
// ConversationListUpdate represents an update to the conversation list.
//
// It is delivered as StreamResponse.ConversationListUpdate on every active
// conversation stream (see publishConversationListUpdate). Type is "update"
// or "delete":
//   - "update" events built in this file carry the full Conversation.
//   - "delete" events identify the conversation by ConversationID.
//
// Both payload fields are omitted from JSON when empty.
type ConversationListUpdate struct {
	Type           string                  `json:"type"` // "update", "delete"
	Conversation   *generated.Conversation `json:"conversation,omitempty"`
	ConversationID string                  `json:"conversation_id,omitempty"` // For deletes
}
209
// Server manages the HTTP API and active conversations.
//
// Field notes:
//   - activeConversations maps conversation ID to its live manager. Every
//     access in this file happens while holding mu. Entries are added by
//     getOrCreateConversationManager and reaped by Cleanup.
//   - conversationGroup collapses concurrent manager creation for the same
//     conversation ID.
//   - toolSetConfig is the server's own copy; NewServer adds subagent wiring
//     to it.
//   - requireHeader, when non-empty, enables RequireHeaderMiddleware in
//     StartWithListener.
//
// Construct with NewServer.
type Server struct {
	db                  *db.DB
	llmManager          LLMProvider
	toolSetConfig       claudetool.ToolSetConfig
	activeConversations map[string]*ConversationManager
	mu                  sync.Mutex
	logger              *slog.Logger
	predictableOnly     bool
	terminalURL         string
	defaultModel        string
	links               []Link
	requireHeader       string
	conversationGroup   singleflight.Group[string, *ConversationManager]
	versionChecker      *VersionChecker
}
226
227// NewServer creates a new server instance
228func NewServer(database *db.DB, llmManager LLMProvider, toolSetConfig claudetool.ToolSetConfig, logger *slog.Logger, predictableOnly bool, terminalURL, defaultModel, requireHeader string, links []Link) *Server {
229	s := &Server{
230		db:                  database,
231		llmManager:          llmManager,
232		toolSetConfig:       toolSetConfig,
233		activeConversations: make(map[string]*ConversationManager),
234		logger:              logger,
235		predictableOnly:     predictableOnly,
236		terminalURL:         terminalURL,
237		defaultModel:        defaultModel,
238		requireHeader:       requireHeader,
239		links:               links,
240		versionChecker:      NewVersionChecker(),
241	}
242
243	// Set up subagent support
244	s.toolSetConfig.SubagentRunner = NewSubagentRunner(s)
245	s.toolSetConfig.SubagentDB = &db.SubagentDBAdapter{DB: database}
246
247	return s
248}
249
250// RegisterRoutes registers HTTP routes on the given mux
251func (s *Server) RegisterRoutes(mux *http.ServeMux) {
252	// API routes - wrap with gzip where beneficial
253	mux.Handle("/api/conversations", gzipHandler(http.HandlerFunc(s.handleConversations)))
254	mux.Handle("/api/conversations/archived", gzipHandler(http.HandlerFunc(s.handleArchivedConversations)))
255	mux.Handle("/api/conversations/new", http.HandlerFunc(s.handleNewConversation))           // Small response
256	mux.Handle("/api/conversations/continue", http.HandlerFunc(s.handleContinueConversation)) // Small response
257	mux.Handle("/api/conversation/", http.StripPrefix("/api/conversation", s.conversationMux()))
258	mux.Handle("/api/conversation-by-slug/", gzipHandler(http.HandlerFunc(s.handleConversationBySlug)))
259	mux.Handle("/api/validate-cwd", http.HandlerFunc(s.handleValidateCwd)) // Small response
260	mux.Handle("/api/list-directory", gzipHandler(http.HandlerFunc(s.handleListDirectory)))
261	mux.Handle("/api/create-directory", http.HandlerFunc(s.handleCreateDirectory))
262	mux.Handle("/api/git/diffs", gzipHandler(http.HandlerFunc(s.handleGitDiffs)))
263	mux.Handle("/api/git/diffs/", gzipHandler(http.HandlerFunc(s.handleGitDiffFiles)))
264	mux.Handle("/api/git/file-diff/", gzipHandler(http.HandlerFunc(s.handleGitFileDiff)))
265	mux.HandleFunc("/api/upload", s.handleUpload)                      // Binary uploads
266	mux.HandleFunc("/api/read", s.handleRead)                          // Serves images
267	mux.Handle("/api/write-file", http.HandlerFunc(s.handleWriteFile)) // Small response
268	mux.HandleFunc("/api/exec-ws", s.handleExecWS)                     // Websocket for shell commands
269
270	// Custom models API
271	mux.Handle("/api/custom-models", http.HandlerFunc(s.handleCustomModels))
272	mux.Handle("/api/custom-models/", http.HandlerFunc(s.handleCustomModel))
273	mux.Handle("/api/custom-models-test", http.HandlerFunc(s.handleTestModel))
274
275	// Models API (dynamic list refresh)
276	mux.Handle("/api/models", http.HandlerFunc(s.handleModels))
277
278	// Version endpoints
279	mux.Handle("GET /version", http.HandlerFunc(s.handleVersion))
280	mux.Handle("GET /version-check", http.HandlerFunc(s.handleVersionCheck))
281	mux.Handle("GET /version-changelog", http.HandlerFunc(s.handleVersionChangelog))
282	mux.Handle("POST /upgrade", http.HandlerFunc(s.handleUpgrade))
283	mux.Handle("POST /exit", http.HandlerFunc(s.handleExit))
284
285	// Debug endpoints
286	mux.Handle("GET /debug/llm_requests", http.HandlerFunc(s.handleDebugLLMRequests))
287	mux.Handle("GET /debug/llm_requests/api", http.HandlerFunc(s.handleDebugLLMRequestsAPI))
288	mux.Handle("GET /debug/llm_requests/{id}/request", http.HandlerFunc(s.handleDebugLLMRequestBody))
289	mux.Handle("GET /debug/llm_requests/{id}/request_full", http.HandlerFunc(s.handleDebugLLMRequestBodyFull))
290	mux.Handle("GET /debug/llm_requests/{id}/response", http.HandlerFunc(s.handleDebugLLMResponseBody))
291
292	// Serve embedded UI assets
293	mux.Handle("/", s.staticHandler(ui.Assets()))
294}
295
296// handleValidateCwd validates that a path exists and is a directory
297func (s *Server) handleValidateCwd(w http.ResponseWriter, r *http.Request) {
298	if r.Method != http.MethodGet {
299		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
300		return
301	}
302
303	path := r.URL.Query().Get("path")
304	if path == "" {
305		w.Header().Set("Content-Type", "application/json")
306		json.NewEncoder(w).Encode(map[string]interface{}{
307			"valid": false,
308			"error": "path is required",
309		})
310		return
311	}
312
313	info, err := os.Stat(path)
314	if err != nil {
315		w.Header().Set("Content-Type", "application/json")
316		if os.IsNotExist(err) {
317			json.NewEncoder(w).Encode(map[string]interface{}{
318				"valid": false,
319				"error": "directory does not exist",
320			})
321		} else {
322			json.NewEncoder(w).Encode(map[string]interface{}{
323				"valid": false,
324				"error": err.Error(),
325			})
326		}
327		return
328	}
329
330	if !info.IsDir() {
331		w.Header().Set("Content-Type", "application/json")
332		json.NewEncoder(w).Encode(map[string]interface{}{
333			"valid": false,
334			"error": "path is not a directory",
335		})
336		return
337	}
338
339	w.Header().Set("Content-Type", "application/json")
340	json.NewEncoder(w).Encode(map[string]interface{}{
341		"valid": true,
342	})
343}
344
// DirectoryEntry represents a single directory entry for the directory picker.
//
// handleListDirectory only emits directories, so IsDir is always true in its
// responses. GitHeadSubject is the subject line of the entry's HEAD commit
// when the entry is a git repository root. It is omitted from JSON when
// empty, i.e. when the entry is not a repo root or git could not report a
// subject.
type DirectoryEntry struct {
	Name           string `json:"name"`
	IsDir          bool   `json:"is_dir"`
	GitHeadSubject string `json:"git_head_subject,omitempty"`
}
351
// ListDirectoryResponse is the response from the list-directory endpoint.
//
// Fields:
//   - Path: the cleaned directory that was listed.
//   - Parent: its parent directory, or "" when Path is the filesystem root.
//   - Entries: the subdirectories of Path.
//   - GitHeadSubject: the HEAD commit subject when Path itself is a git
//     repository root; omitted from JSON when empty.
type ListDirectoryResponse struct {
	Path           string           `json:"path"`
	Parent         string           `json:"parent"`
	Entries        []DirectoryEntry `json:"entries"`
	GitHeadSubject string           `json:"git_head_subject,omitempty"`
}
359
360// handleListDirectory lists the contents of a directory for the directory picker
361func (s *Server) handleListDirectory(w http.ResponseWriter, r *http.Request) {
362	if r.Method != http.MethodGet {
363		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
364		return
365	}
366
367	path := r.URL.Query().Get("path")
368	if path == "" {
369		// Default to home directory or root
370		homeDir, err := os.UserHomeDir()
371		if err != nil {
372			path = "/"
373		} else {
374			path = homeDir
375		}
376	}
377
378	// Clean and resolve the path
379	path = filepath.Clean(path)
380
381	// Verify path exists and is a directory
382	info, err := os.Stat(path)
383	if err != nil {
384		w.Header().Set("Content-Type", "application/json")
385		if os.IsNotExist(err) {
386			json.NewEncoder(w).Encode(map[string]interface{}{
387				"error": "directory does not exist",
388			})
389		} else if os.IsPermission(err) {
390			json.NewEncoder(w).Encode(map[string]interface{}{
391				"error": "permission denied",
392			})
393		} else {
394			json.NewEncoder(w).Encode(map[string]interface{}{
395				"error": err.Error(),
396			})
397		}
398		return
399	}
400
401	if !info.IsDir() {
402		w.Header().Set("Content-Type", "application/json")
403		json.NewEncoder(w).Encode(map[string]interface{}{
404			"error": "path is not a directory",
405		})
406		return
407	}
408
409	// Read directory contents
410	dirEntries, err := os.ReadDir(path)
411	if err != nil {
412		w.Header().Set("Content-Type", "application/json")
413		if os.IsPermission(err) {
414			json.NewEncoder(w).Encode(map[string]interface{}{
415				"error": "permission denied",
416			})
417		} else {
418			json.NewEncoder(w).Encode(map[string]interface{}{
419				"error": err.Error(),
420			})
421		}
422		return
423	}
424
425	// Build response with only directories (for directory picker)
426	var entries []DirectoryEntry
427	for _, entry := range dirEntries {
428		// Only include directories
429		if entry.IsDir() {
430			dirEntry := DirectoryEntry{
431				Name:  entry.Name(),
432				IsDir: true,
433			}
434
435			// Check if this is a git repo root and get HEAD commit subject
436			entryPath := filepath.Join(path, entry.Name())
437			if isGitRepo(entryPath) {
438				if subject := getGitHeadSubject(entryPath); subject != "" {
439					dirEntry.GitHeadSubject = subject
440				}
441			}
442
443			entries = append(entries, dirEntry)
444		}
445	}
446
447	// Calculate parent directory
448	parent := filepath.Dir(path)
449	if parent == path {
450		// At root, no parent
451		parent = ""
452	}
453
454	response := ListDirectoryResponse{
455		Path:    path,
456		Parent:  parent,
457		Entries: entries,
458	}
459
460	// Check if the current directory itself is a git repo
461	if isGitRepo(path) {
462		response.GitHeadSubject = getGitHeadSubject(path)
463	}
464
465	w.Header().Set("Content-Type", "application/json")
466	json.NewEncoder(w).Encode(response)
467}
468
// isGitRepo reports whether dirPath is the root of a git working tree.
//
// It returns true when dirPath contains either of:
//   - a ".git" directory (a regular repository), or
//   - a regular ".git" file whose contents start with "gitdir:" (a
//     worktree-style pointer file).
//
// Any error while inspecting ".git" (missing, unreadable, etc.) is treated as
// "not a repository".
func isGitRepo(dirPath string) bool {
	gitPath := filepath.Join(dirPath, ".git")
	fi, err := os.Stat(gitPath)
	if err != nil {
		return false
	}
	if fi.IsDir() {
		return true // regular .git directory
	}
	if fi.Mode().IsRegular() {
		// A worktree's .git is a small text file: "gitdir: <path>".
		content, err := os.ReadFile(gitPath)
		if err == nil && strings.HasPrefix(string(content), "gitdir:") {
			return true
		}
	}
	return false
}
491
// getGitHeadSubject returns the subject line of the HEAD commit of the git
// repository at repoPath, with surrounding whitespace trimmed.
//
// It runs `git log -1 --format=%s` with repoPath as the working directory.
// It returns "" if git is unavailable, exits with an error (e.g. repoPath
// does not exist or the repo has no commits), or takes longer than 5
// seconds.
func getGitHeadSubject(repoPath string) string {
	// handleListDirectory runs this once per listed subdirectory, so a single
	// slow or wedged git process must not stall the whole request. The
	// timeout kills the subprocess instead of waiting on it forever.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	cmd := exec.CommandContext(ctx, "git", "log", "-1", "--format=%s")
	cmd.Dir = repoPath
	output, err := cmd.Output()
	if err != nil {
		return ""
	}
	return strings.TrimSpace(string(output))
}
503
504// handleCreateDirectory creates a new directory
505func (s *Server) handleCreateDirectory(w http.ResponseWriter, r *http.Request) {
506	if r.Method != http.MethodPost {
507		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
508		return
509	}
510
511	var req struct {
512		Path string `json:"path"`
513	}
514	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
515		w.Header().Set("Content-Type", "application/json")
516		json.NewEncoder(w).Encode(map[string]interface{}{
517			"error": "invalid request body",
518		})
519		return
520	}
521
522	if req.Path == "" {
523		w.Header().Set("Content-Type", "application/json")
524		json.NewEncoder(w).Encode(map[string]interface{}{
525			"error": "path is required",
526		})
527		return
528	}
529
530	// Clean the path
531	path := filepath.Clean(req.Path)
532
533	// Check if path already exists
534	if _, err := os.Stat(path); err == nil {
535		w.Header().Set("Content-Type", "application/json")
536		json.NewEncoder(w).Encode(map[string]interface{}{
537			"error": "path already exists",
538		})
539		return
540	}
541
542	// Verify parent directory exists
543	parentDir := filepath.Dir(path)
544	if _, err := os.Stat(parentDir); os.IsNotExist(err) {
545		w.Header().Set("Content-Type", "application/json")
546		json.NewEncoder(w).Encode(map[string]interface{}{
547			"error": "parent directory does not exist",
548		})
549		return
550	}
551
552	// Create the directory (only the final directory, not parents)
553	if err := os.Mkdir(path, 0o755); err != nil {
554		w.Header().Set("Content-Type", "application/json")
555		if os.IsPermission(err) {
556			json.NewEncoder(w).Encode(map[string]interface{}{
557				"error": "permission denied",
558			})
559		} else {
560			json.NewEncoder(w).Encode(map[string]interface{}{
561				"error": err.Error(),
562			})
563		}
564		return
565	}
566
567	w.Header().Set("Content-Type", "application/json")
568	json.NewEncoder(w).Encode(map[string]interface{}{
569		"path": path,
570	})
571}
572
573// getOrCreateConversationManager gets an existing conversation manager or creates a new one.
574func (s *Server) getOrCreateConversationManager(ctx context.Context, conversationID string) (*ConversationManager, error) {
575	manager, err, _ := s.conversationGroup.Do(conversationID, func() (*ConversationManager, error) {
576		s.mu.Lock()
577		defer s.mu.Unlock()
578		if manager, exists := s.activeConversations[conversationID]; exists {
579			manager.Touch()
580			return manager, nil
581		}
582
583		recordMessage := func(ctx context.Context, message llm.Message, usage llm.Usage) error {
584			return s.recordMessage(ctx, conversationID, message, usage)
585		}
586
587		onStateChange := func(state ConversationState) {
588			s.publishConversationState(state)
589		}
590
591		manager := NewConversationManager(conversationID, s.db, s.logger, s.toolSetConfig, recordMessage, onStateChange)
592		if err := manager.Hydrate(ctx); err != nil {
593			return nil, err
594		}
595
596		s.activeConversations[conversationID] = manager
597		return manager, nil
598	})
599	if err != nil {
600		return nil, err
601	}
602	return manager, nil
603}
604
605// ExtractDisplayData extracts display data from message content for storage
606func ExtractDisplayData(message llm.Message) interface{} {
607	// Build a map of tool_use_id to tool_name for lookups
608	toolNameMap := make(map[string]string)
609	for _, content := range message.Content {
610		if content.Type == llm.ContentTypeToolUse {
611			toolNameMap[content.ID] = content.ToolName
612		}
613	}
614
615	var displayData []any
616	for _, content := range message.Content {
617		if content.Type == llm.ContentTypeToolResult && content.Display != nil {
618			// Include tool name if we can find it
619			toolName := toolNameMap[content.ToolUseID]
620			displayData = append(displayData, map[string]any{
621				"tool_use_id": content.ToolUseID,
622				"tool_name":   toolName,
623				"display":     content.Display,
624			})
625		}
626	}
627
628	if len(displayData) > 0 {
629		return displayData
630	}
631	return nil
632}
633
634// recordMessage records a new message to the database and also notifies subscribers
635func (s *Server) recordMessage(ctx context.Context, conversationID string, message llm.Message, usage llm.Usage) error {
636	// Log message based on role
637	if message.Role == llm.MessageRoleUser {
638		s.logger.Info("User message", "conversation_id", conversationID, "content_items", len(message.Content))
639	} else if message.Role == llm.MessageRoleAssistant {
640		s.logger.Info("Agent message", "conversation_id", conversationID, "content_items", len(message.Content), "end_of_turn", message.EndOfTurn)
641	}
642
643	// Convert LLM message to database format
644	messageType, err := s.getMessageType(message)
645	if err != nil {
646		return fmt.Errorf("failed to determine message type: %w", err)
647	}
648
649	// Extract display data from content items
650	displayDataToStore := ExtractDisplayData(message)
651
652	// Create message
653	createdMsg, err := s.db.CreateMessage(ctx, db.CreateMessageParams{
654		ConversationID:      conversationID,
655		Type:                messageType,
656		LLMData:             message,
657		UserData:            nil,
658		UsageData:           usage,
659		DisplayData:         displayDataToStore,
660		ExcludedFromContext: message.ExcludedFromContext,
661	})
662	if err != nil {
663		return fmt.Errorf("failed to create message: %w", err)
664	}
665
666	// Update conversation's last updated timestamp for correct ordering
667	if err := s.db.QueriesTx(ctx, func(q *generated.Queries) error {
668		return q.UpdateConversationTimestamp(ctx, conversationID)
669	}); err != nil {
670		s.logger.Warn("Failed to update conversation timestamp", "conversationID", conversationID, "error", err)
671	}
672
673	// Touch active manager activity time if present
674	s.mu.Lock()
675	mgr, ok := s.activeConversations[conversationID]
676	if ok {
677		mgr.Touch()
678	}
679	s.mu.Unlock()
680
681	// Notify subscribers with only the new message - use WithoutCancel because
682	// the HTTP request context may be cancelled after the handler returns, but
683	// we still want the notification to complete so SSE clients see the message immediately
684	go s.notifySubscribersNewMessage(context.WithoutCancel(ctx), conversationID, createdMsg)
685
686	return nil
687}
688
689// getMessageType determines the message type from an LLM message
690func (s *Server) getMessageType(message llm.Message) (db.MessageType, error) {
691	// System-generated errors are stored as error type
692	if message.ErrorType != llm.ErrorTypeNone {
693		return db.MessageTypeError, nil
694	}
695
696	switch message.Role {
697	case llm.MessageRoleUser:
698		return db.MessageTypeUser, nil
699	case llm.MessageRoleAssistant:
700		return db.MessageTypeAgent, nil
701	default:
702		// For tool messages, check if it's a tool call or tool result
703		for _, content := range message.Content {
704			if content.Type == llm.ContentTypeToolUse {
705				return db.MessageTypeTool, nil
706			}
707			if content.Type == llm.ContentTypeToolResult {
708				return db.MessageTypeTool, nil
709			}
710		}
711		return db.MessageTypeAgent, nil
712	}
713}
714
715// convertToLLMMessage converts a database message to an LLM message
716func convertToLLMMessage(msg generated.Message) (llm.Message, error) {
717	var llmMsg llm.Message
718	if msg.LlmData == nil {
719		return llm.Message{}, fmt.Errorf("message has no LLM data")
720	}
721	if err := json.Unmarshal([]byte(*msg.LlmData), &llmMsg); err != nil {
722		return llm.Message{}, fmt.Errorf("failed to unmarshal LLM data: %w", err)
723	}
724	return llmMsg, nil
725}
726
727// notifySubscribers sends conversation metadata updates (e.g., slug changes) to subscribers.
728// This is used when only the conversation data changes, not the messages.
729// Uses Broadcast instead of Publish to avoid racing with message sequence IDs.
730func (s *Server) notifySubscribers(ctx context.Context, conversationID string) {
731	s.mu.Lock()
732	manager, exists := s.activeConversations[conversationID]
733	s.mu.Unlock()
734
735	if !exists {
736		return
737	}
738
739	// Get conversation data only (no messages needed for metadata-only updates)
740	var conversation generated.Conversation
741	err := s.db.Queries(ctx, func(q *generated.Queries) error {
742		var err error
743		conversation, err = q.GetConversation(ctx, conversationID)
744		return err
745	})
746	if err != nil {
747		s.logger.Error("Failed to get conversation data for notification", "conversationID", conversationID, "error", err)
748		return
749	}
750
751	// Broadcast conversation update with no new messages.
752	// Using Broadcast instead of Publish ensures this metadata-only update
753	// doesn't race with notifySubscribersNewMessage which uses Publish with sequence IDs.
754	streamData := StreamResponse{
755		Messages:     nil, // No new messages, just conversation update
756		Conversation: conversation,
757	}
758	manager.subpub.Broadcast(streamData)
759
760	// Also notify conversation list subscribers (e.g., slug change)
761	s.publishConversationListUpdate(ConversationListUpdate{
762		Type:         "update",
763		Conversation: &conversation,
764	})
765}
766
767// notifySubscribersNewMessage sends a single new message to all subscribers.
768// This is more efficient than re-sending all messages on each update.
769func (s *Server) notifySubscribersNewMessage(ctx context.Context, conversationID string, newMsg *generated.Message) {
770	s.mu.Lock()
771	manager, exists := s.activeConversations[conversationID]
772	s.mu.Unlock()
773
774	if !exists {
775		return
776	}
777
778	// Get conversation data for the response
779	var conversation generated.Conversation
780	err := s.db.Queries(ctx, func(q *generated.Queries) error {
781		var err error
782		conversation, err = q.GetConversation(ctx, conversationID)
783		return err
784	})
785	if err != nil {
786		s.logger.Error("Failed to get conversation data for notification", "conversationID", conversationID, "error", err)
787		return
788	}
789
790	// Convert the single new message to API format
791	apiMessages := toAPIMessages([]generated.Message{*newMsg})
792
793	// Update agent working state based on message type
794	if isAgentEndOfTurn(newMsg) {
795		manager.SetAgentWorking(false)
796	}
797
798	// Publish only the new message
799	streamData := StreamResponse{
800		Messages:     apiMessages,
801		Conversation: conversation,
802		// ContextWindowSize: 0 for messages without usage data (user/tool messages).
803		// With omitempty, 0 is omitted from JSON, so the UI keeps its cached value.
804		// Only agent messages have usage data, so context window updates when they arrive.
805		ContextWindowSize: calculateContextWindowSizeFromMsg(newMsg),
806	}
807	manager.subpub.Publish(newMsg.SequenceID, streamData)
808
809	// Also notify conversation list subscribers about the update (updated_at changed)
810	s.publishConversationListUpdate(ConversationListUpdate{
811		Type:         "update",
812		Conversation: &conversation,
813	})
814}
815
816// publishConversationListUpdate broadcasts a conversation list update to ALL active
817// conversation streams. This allows clients to receive updates about other conversations
818// while they're subscribed to their current conversation's stream.
819func (s *Server) publishConversationListUpdate(update ConversationListUpdate) {
820	s.mu.Lock()
821	defer s.mu.Unlock()
822
823	// Broadcast to all active conversation managers
824	for _, manager := range s.activeConversations {
825		streamData := StreamResponse{
826			ConversationListUpdate: &update,
827		}
828		manager.subpub.Broadcast(streamData)
829	}
830}
831
832// publishConversationState broadcasts a conversation state update to ALL active
833// conversation streams. This allows clients to see the working state of other conversations.
834func (s *Server) publishConversationState(state ConversationState) {
835	s.mu.Lock()
836	defer s.mu.Unlock()
837
838	// Broadcast to all active conversation managers
839	for _, manager := range s.activeConversations {
840		streamData := StreamResponse{
841			ConversationState: &state,
842		}
843		manager.subpub.Broadcast(streamData)
844	}
845}
846
847// getWorkingConversations returns a map of conversation IDs that are currently working.
848func (s *Server) getWorkingConversations() map[string]bool {
849	s.mu.Lock()
850	defer s.mu.Unlock()
851
852	working := make(map[string]bool)
853	for id, manager := range s.activeConversations {
854		if manager.IsAgentWorking() {
855			working[id] = true
856		}
857	}
858	return working
859}
860
861// IsAgentWorking returns whether the agent is currently working on the given conversation.
862// Returns false if the conversation doesn't have an active manager.
863func (s *Server) IsAgentWorking(conversationID string) bool {
864	s.mu.Lock()
865	manager, exists := s.activeConversations[conversationID]
866	s.mu.Unlock()
867	if !exists {
868		return false
869	}
870	return manager.IsAgentWorking()
871}
872
// Cleanup removes inactive conversation managers.
//
// A manager is inactive when its lastActivity timestamp is more than 30
// minutes old; lastActivity is read under the manager's own mutex. Each
// inactive manager has its loop stopped and is dropped from
// activeConversations. StartWithListener calls this every five minutes.
//
// NOTE(review):
//   - s.mu is held while stopLoop runs. If stopLoop waits for the manager's
//     loop to exit, and that loop can call back into the Server (e.g.
//     recordMessage, which locks s.mu), this would deadlock. Confirm that
//     stopLoop does not block on such work.
//   - Cleanup does not consult IsAgentWorking, so a manager whose
//     lastActivity has not been refreshed for 30 minutes is stopped even if
//     a turn is in progress.
func (s *Server) Cleanup() {
	s.mu.Lock()
	defer s.mu.Unlock()

	now := time.Now()
	// Deleting map entries during a range over that map is permitted in Go.
	for id, manager := range s.activeConversations {
		// Remove managers that have been inactive for more than 30 minutes
		manager.mu.Lock()
		lastActivity := manager.lastActivity
		manager.mu.Unlock()
		if now.Sub(lastActivity) > 30*time.Minute {
			manager.stopLoop()
			delete(s.activeConversations, id)
			s.logger.Debug("Cleaned up inactive conversation", "conversationID", id)
		}
	}
}
891
892// Start starts the HTTP server and handles the complete lifecycle
893func (s *Server) Start(port string) error {
894	listener, err := net.Listen("tcp", ":"+port)
895	if err != nil {
896		s.logger.Error("Failed to create listener", "error", err, "port_info", getPortOwnerInfo(port))
897		return err
898	}
899	return s.StartWithListener(listener)
900}
901
902// StartWithListener starts the HTTP server using the provided listener.
903// This is useful for systemd socket activation where the listener is created externally.
904func (s *Server) StartWithListener(listener net.Listener) error {
905	// Set up HTTP server with routes and middleware
906	mux := http.NewServeMux()
907	s.RegisterRoutes(mux)
908
909	// Add middleware (applied in reverse order: last added = first executed)
910	handler := LoggerMiddleware(s.logger)(mux)
911	handler = CSRFMiddleware()(handler)
912	if s.requireHeader != "" {
913		handler = RequireHeaderMiddleware(s.requireHeader)(handler)
914	}
915
916	httpServer := &http.Server{
917		Handler: handler,
918	}
919
920	// Start cleanup routine
921	go func() {
922		ticker := time.NewTicker(5 * time.Minute)
923		defer ticker.Stop()
924		for range ticker.C {
925			s.Cleanup()
926		}
927	}()
928
929	// Get actual port from listener
930	actualPort := listener.Addr().(*net.TCPAddr).Port
931
932	// Start server in goroutine
933	serverErrCh := make(chan error, 1)
934	go func() {
935		s.logger.Info("Server starting", "port", actualPort, "url", fmt.Sprintf("http://localhost:%d", actualPort))
936		if err := httpServer.Serve(listener); err != nil && err != http.ErrServerClosed {
937			serverErrCh <- err
938		}
939	}()
940
941	// Wait for shutdown signal or server error
942	quit := make(chan os.Signal, 1)
943	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
944
945	select {
946	case err := <-serverErrCh:
947		s.logger.Error("Server failed", "error", err)
948		return err
949	case <-quit:
950		s.logger.Info("Shutting down server")
951	}
952
953	// Graceful shutdown
954	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
955	defer cancel()
956
957	if err := httpServer.Shutdown(ctx); err != nil {
958		s.logger.Error("Server forced to shutdown", "error", err)
959		return err
960	}
961
962	s.logger.Info("Server exited")
963	return nil
964}
965
// getPortOwnerInfo tries to identify what process is listening on a TCP port.
//
// It runs `lsof -i :<port> -sTCP:LISTEN -n -P` and reports the first data
// row as "pid=<PID> process=<COMMAND>". It returns a parenthesised
// diagnostic instead when:
//   - lsof fails (this includes "no match", which lsof reports through its
//     exit status);
//   - lsof prints only a header;
//   - no row has at least two columns.
func getPortOwnerInfo(port string) string {
	out, err := exec.Command("lsof", "-i", ":"+port, "-sTCP:LISTEN", "-n", "-P").Output()
	if err != nil {
		return fmt.Sprintf("(unable to determine: %v)", err)
	}

	rows := strings.Split(strings.TrimSpace(string(out)), "\n")
	if len(rows) < 2 {
		return "(no process found)"
	}

	// rows[0] is the header: COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
	for i := 1; i < len(rows); i++ {
		if cols := strings.Fields(rows[i]); len(cols) >= 2 {
			return fmt.Sprintf("pid=%s process=%s", cols[1], cols[0])
		}
	}

	return "(could not parse lsof output)"
}