server.go

   1package server
   2
   3import (
   4	"context"
   5	"encoding/json"
   6	"fmt"
   7	"log/slog"
   8	"net"
   9	"net/http"
  10	"os"
  11	"os/exec"
  12	"os/signal"
  13	"path/filepath"
  14	"sort"
  15	"strings"
  16	"sync"
  17	"syscall"
  18	"time"
  19
  20	"tailscale.com/util/singleflight"
  21
  22	"shelley.exe.dev/claudetool"
  23	"shelley.exe.dev/db"
  24	"shelley.exe.dev/db/generated"
  25	"shelley.exe.dev/llm"
  26	"shelley.exe.dev/models"
  27	"shelley.exe.dev/ui"
  28)
  29
// APIMessage is the message format sent to clients.
//
// toAPIMessages builds it by copying a generated.Message field-for-field and
// adding EndOfTurn, which is decoded from LlmData for agent messages.
// TODO: We could maybe omit llm_data when display_data is available
type APIMessage struct {
	MessageID      string `json:"message_id"`
	ConversationID string `json:"conversation_id"`
	// SequenceID orders messages within a conversation; it is also the key
	// notifySubscribersNewMessage publishes under.
	SequenceID int64  `json:"sequence_id"`
	Type       string `json:"type"`
	// LlmData is the JSON encoding of an llm.Message (see convertToLLMMessage).
	LlmData  *string `json:"llm_data,omitempty"`
	UserData *string `json:"user_data,omitempty"`
	// UsageData is the JSON encoding of an llm.Usage, when present.
	UsageData *string   `json:"usage_data,omitempty"`
	CreatedAt time.Time `json:"created_at"`
	// DisplayData presumably holds the JSON form of ExtractDisplayData's
	// result for tool results — confirm against db.CreateMessage.
	DisplayData *string `json:"display_data,omitempty"`
	// EndOfTurn is nil (and omitted) unless the message is an agent message
	// whose LlmData decodes successfully.
	EndOfTurn *bool `json:"end_of_turn,omitempty"`
}
  44
// ConversationState represents the current state of a conversation.
// This is broadcast to all subscribers whenever the state changes
// (see publishConversationState).
type ConversationState struct {
	// ConversationID identifies the conversation this state belongs to.
	ConversationID string `json:"conversation_id"`
	// Working reports whether the agent is currently processing this
	// conversation (cf. ConversationManager.SetAgentWorking).
	Working bool `json:"working"`
	// Model presumably names the model in use; omitted when empty.
	Model string `json:"model,omitempty"`
}
  52
// ConversationWithState combines a conversation with its working state.
//
// generated.Conversation is embedded without a JSON tag, so encoding/json
// promotes its exported fields to the top level of the object, next to
// "working". Working is presumably filled from getWorkingConversations —
// confirm at the call site.
type ConversationWithState struct {
	generated.Conversation
	Working bool `json:"working"`
}
  58
// StreamResponse represents one event on a conversation stream.
//
// Which fields are populated depends on the sender: new-message events set
// Messages and Conversation, metadata-only events set Conversation, state
// events set ConversationState, list events set ConversationListUpdate, and
// heartbeats set Heartbeat.
type StreamResponse struct {
	// Messages carries only the newly added messages, not the full history;
	// nil for events that carry no new messages.
	Messages     []APIMessage           `json:"messages"`
	Conversation generated.Conversation `json:"conversation"`
	// ConversationState is set for working-state changes of any conversation.
	ConversationState *ConversationState `json:"conversation_state,omitempty"`
	// ContextWindowSize is omitted when 0 (e.g. user/tool messages); the
	// client is expected to keep its previous value in that case.
	ContextWindowSize uint64 `json:"context_window_size,omitempty"`
	// ConversationListUpdate is set when another conversation in the list changed
	ConversationListUpdate *ConversationListUpdate `json:"conversation_list_update,omitempty"`
	// Heartbeat indicates this is a heartbeat message (no new data, just keeping connection alive)
	Heartbeat bool `json:"heartbeat,omitempty"`
}
  70
// LLMProvider is an interface for getting LLM services.
// NewLLMServiceManager returns the implementation built by the models package.
type LLMProvider interface {
	// GetService returns the service for modelID, or an error if it cannot
	// be provided.
	GetService(modelID string) (llm.Service, error)
	// GetAvailableModels presumably lists the model IDs that can be requested.
	GetAvailableModels() []string
	// HasModel reports whether modelID is known to the provider.
	HasModel(modelID string) bool
	// GetModelInfo returns metadata for modelID; presumably nil for unknown
	// models — confirm against the implementation.
	GetModelInfo(modelID string) *models.ModelInfo
	// RefreshCustomModels presumably reloads user-defined models (e.g. from
	// the configured DB) — confirm against the implementation.
	RefreshCustomModels() error
}
  79
  80// NewLLMServiceManager creates a new LLM service manager from config
  81func NewLLMServiceManager(cfg *LLMConfig) LLMProvider {
  82	// Convert LLMConfig to models.Config
  83	modelConfig := &models.Config{
  84		AnthropicAPIKey: cfg.AnthropicAPIKey,
  85		OpenAIAPIKey:    cfg.OpenAIAPIKey,
  86		GeminiAPIKey:    cfg.GeminiAPIKey,
  87		FireworksAPIKey: cfg.FireworksAPIKey,
  88		Gateway:         cfg.Gateway,
  89		Logger:          cfg.Logger,
  90		DB:              cfg.DB,
  91	}
  92
  93	manager, err := models.NewManager(modelConfig)
  94	if err != nil {
  95		// This shouldn't happen in practice, but handle it gracefully
  96		cfg.Logger.Error("Failed to create models manager", "error", err)
  97	}
  98
  99	return manager
 100}
 101
 102// toAPIMessages converts database messages to API messages.
 103// When display_data is present (tool results), llm_data is omitted to save bandwidth
 104// since the display_data contains all information needed for UI rendering.
 105func toAPIMessages(messages []generated.Message) []APIMessage {
 106	apiMessages := make([]APIMessage, len(messages))
 107	for i, msg := range messages {
 108		var endOfTurnPtr *bool
 109		if msg.LlmData != nil && msg.Type == string(db.MessageTypeAgent) {
 110			if endOfTurn, ok := extractEndOfTurn(*msg.LlmData); ok {
 111				endOfTurnCopy := endOfTurn
 112				endOfTurnPtr = &endOfTurnCopy
 113			}
 114		}
 115
 116		// TODO: Consider omitting llm_data when display_data is present to save bandwidth.
 117		// The display_data contains all info needed for UI rendering of tool results,
 118		// but the UI currently still uses llm_data for some checks.
 119
 120		apiMsg := APIMessage{
 121			MessageID:      msg.MessageID,
 122			ConversationID: msg.ConversationID,
 123			SequenceID:     msg.SequenceID,
 124			Type:           msg.Type,
 125			LlmData:        msg.LlmData,
 126			UserData:       msg.UserData,
 127			UsageData:      msg.UsageData,
 128			CreatedAt:      msg.CreatedAt,
 129			DisplayData:    msg.DisplayData,
 130			EndOfTurn:      endOfTurnPtr,
 131		}
 132		apiMessages[i] = apiMsg
 133	}
 134	return apiMessages
 135}
 136
 137func extractEndOfTurn(raw string) (bool, bool) {
 138	var message llm.Message
 139	if err := json.Unmarshal([]byte(raw), &message); err != nil {
 140		return false, false
 141	}
 142	return message.EndOfTurn, true
 143}
 144
 145// calculateContextWindowSize returns the context window usage from the most recent message with non-zero usage.
 146// Each API call's input tokens represent the full conversation history sent to the model,
 147// so we only need the last message's tokens (not accumulated across all messages).
 148// The total input includes regular input tokens plus cached tokens (both read and created).
 149// Messages without usage data (user messages, tool messages, etc.) are skipped.
 150func calculateContextWindowSize(messages []APIMessage) uint64 {
 151	// Find the last message with non-zero usage data
 152	for i := len(messages) - 1; i >= 0; i-- {
 153		msg := messages[i]
 154		if msg.UsageData == nil {
 155			continue
 156		}
 157		var usage llm.Usage
 158		if err := json.Unmarshal([]byte(*msg.UsageData), &usage); err != nil {
 159			continue
 160		}
 161		ctxUsed := usage.ContextWindowUsed()
 162		if ctxUsed == 0 {
 163			continue
 164		}
 165		// Return total context window used: all input tokens + output tokens
 166		// This represents the full context that would be sent for the next turn
 167		return ctxUsed
 168	}
 169	return 0
 170}
 171
 172// isAgentEndOfTurn checks if a message is an agent or error message with end_of_turn=true.
 173// This indicates the agent loop has finished processing.
 174func isAgentEndOfTurn(msg *generated.Message) bool {
 175	if msg == nil {
 176		return false
 177	}
 178	// Agent and error messages can have end_of_turn
 179	if msg.Type != string(db.MessageTypeAgent) && msg.Type != string(db.MessageTypeError) {
 180		return false
 181	}
 182	if msg.LlmData == nil {
 183		return false
 184	}
 185	endOfTurn, ok := extractEndOfTurn(*msg.LlmData)
 186	if !ok {
 187		return false
 188	}
 189	return endOfTurn
 190}
 191
 192// calculateContextWindowSizeFromMsg calculates context window usage from a single message.
 193// Returns 0 if the message has no usage data (e.g., user messages), in which case
 194// the client should keep its previous context window value.
 195func calculateContextWindowSizeFromMsg(msg *generated.Message) uint64 {
 196	if msg == nil || msg.UsageData == nil {
 197		return 0
 198	}
 199	var usage llm.Usage
 200	if err := json.Unmarshal([]byte(*msg.UsageData), &usage); err != nil {
 201		return 0
 202	}
 203	return usage.ContextWindowUsed()
 204}
 205
// ConversationListUpdate represents an update to the conversation list.
// It is delivered to every active conversation stream via
// publishConversationListUpdate.
type ConversationListUpdate struct {
	// Type is "update" or "delete".
	Type string `json:"type"` // "update", "delete"
	// Conversation carries the new conversation row for "update" events.
	Conversation *generated.Conversation `json:"conversation,omitempty"`
	// ConversationID identifies the removed conversation for "delete" events.
	ConversationID string `json:"conversation_id,omitempty"` // For deletes
}
 212
// Server manages the HTTP API and active conversations.
// Construct it with NewServer.
type Server struct {
	db         *db.DB
	llmManager LLMProvider
	// toolSetConfig is passed to every new ConversationManager; NewServer
	// fills in its subagent fields.
	toolSetConfig claudetool.ToolSetConfig
	// activeConversations maps a conversation ID to its live manager.
	// Guarded by mu.
	activeConversations map[string]*ConversationManager
	// mu guards activeConversations.
	mu              sync.Mutex
	logger          *slog.Logger
	predictableOnly bool
	terminalURL     string
	defaultModel    string
	links           []Link
	// requireHeader presumably names a header that requests must carry —
	// confirm where it is enforced.
	requireHeader string
	// conversationGroup collapses concurrent getOrCreateConversationManager
	// calls for the same conversation ID into one creation.
	conversationGroup singleflight.Group[string, *ConversationManager]
	versionChecker    *VersionChecker
}
 229
 230// NewServer creates a new server instance
 231func NewServer(database *db.DB, llmManager LLMProvider, toolSetConfig claudetool.ToolSetConfig, logger *slog.Logger, predictableOnly bool, terminalURL, defaultModel, requireHeader string, links []Link) *Server {
 232	s := &Server{
 233		db:                  database,
 234		llmManager:          llmManager,
 235		toolSetConfig:       toolSetConfig,
 236		activeConversations: make(map[string]*ConversationManager),
 237		logger:              logger,
 238		predictableOnly:     predictableOnly,
 239		terminalURL:         terminalURL,
 240		defaultModel:        defaultModel,
 241		requireHeader:       requireHeader,
 242		links:               links,
 243		versionChecker:      NewVersionChecker(),
 244	}
 245
 246	// Set up subagent support
 247	s.toolSetConfig.SubagentRunner = NewSubagentRunner(s)
 248	s.toolSetConfig.SubagentDB = &db.SubagentDBAdapter{DB: database}
 249
 250	return s
 251}
 252
// RegisterRoutes registers HTTP routes on the given mux.
//
// Patterns without a method prefix match every HTTP method, so those handlers
// check the method themselves where it matters (e.g. handleValidateCwd and
// handleListDirectory reject non-GET requests). Patterns such as "GET /version"
// and ".../{id}/..." use the method/wildcard syntax that net/http.ServeMux
// supports from Go 1.22 on.
//
// gzipHandler wraps endpoints whose responses can be large. Small JSON
// replies, binary uploads, image reads and the websocket endpoint are
// registered unwrapped.
func (s *Server) RegisterRoutes(mux *http.ServeMux) {
	// API routes - wrap with gzip where beneficial
	mux.Handle("/api/conversations", gzipHandler(http.HandlerFunc(s.handleConversations)))
	mux.Handle("/api/conversations/archived", gzipHandler(http.HandlerFunc(s.handleArchivedConversations)))
	mux.Handle("/api/conversations/new", http.HandlerFunc(s.handleNewConversation))           // Small response
	mux.Handle("/api/conversations/continue", http.HandlerFunc(s.handleContinueConversation)) // Small response
	// Per-conversation routes: the sub-mux sees paths with "/api/conversation"
	// stripped, i.e. starting at "/<id>...".
	mux.Handle("/api/conversation/", http.StripPrefix("/api/conversation", s.conversationMux()))
	mux.Handle("/api/conversation-by-slug/", gzipHandler(http.HandlerFunc(s.handleConversationBySlug)))
	mux.Handle("/api/validate-cwd", http.HandlerFunc(s.handleValidateCwd)) // Small response
	mux.Handle("/api/list-directory", gzipHandler(http.HandlerFunc(s.handleListDirectory)))
	mux.Handle("/api/create-directory", http.HandlerFunc(s.handleCreateDirectory))
	mux.Handle("/api/git/diffs", gzipHandler(http.HandlerFunc(s.handleGitDiffs)))
	mux.Handle("/api/git/diffs/", gzipHandler(http.HandlerFunc(s.handleGitDiffFiles)))
	mux.Handle("/api/git/file-diff/", gzipHandler(http.HandlerFunc(s.handleGitFileDiff)))
	mux.HandleFunc("/api/upload", s.handleUpload)                      // Binary uploads
	mux.HandleFunc("/api/read", s.handleRead)                          // Serves images
	mux.Handle("/api/write-file", http.HandlerFunc(s.handleWriteFile)) // Small response
	mux.HandleFunc("/api/exec-ws", s.handleExecWS)                     // Websocket for shell commands

	// Custom models API
	mux.Handle("/api/custom-models", http.HandlerFunc(s.handleCustomModels))
	mux.Handle("/api/custom-models/", http.HandlerFunc(s.handleCustomModel))
	mux.Handle("/api/custom-models-test", http.HandlerFunc(s.handleTestModel))

	// Models API (dynamic list refresh)
	mux.Handle("/api/models", http.HandlerFunc(s.handleModels))

	// Version endpoints
	mux.Handle("GET /version", http.HandlerFunc(s.handleVersion))
	mux.Handle("GET /version-check", http.HandlerFunc(s.handleVersionCheck))
	mux.Handle("GET /version-changelog", http.HandlerFunc(s.handleVersionChangelog))
	mux.Handle("POST /upgrade", http.HandlerFunc(s.handleUpgrade))
	mux.Handle("POST /exit", http.HandlerFunc(s.handleExit))

	// Debug endpoints
	mux.Handle("GET /debug/conversations", http.HandlerFunc(s.handleDebugConversationsPage))
	mux.Handle("GET /debug/llm_requests", http.HandlerFunc(s.handleDebugLLMRequests))
	mux.Handle("GET /debug/llm_requests/api", http.HandlerFunc(s.handleDebugLLMRequestsAPI))
	mux.Handle("GET /debug/llm_requests/{id}/request", http.HandlerFunc(s.handleDebugLLMRequestBody))
	mux.Handle("GET /debug/llm_requests/{id}/request_full", http.HandlerFunc(s.handleDebugLLMRequestBodyFull))
	mux.Handle("GET /debug/llm_requests/{id}/response", http.HandlerFunc(s.handleDebugLLMResponseBody))

	// Serve embedded UI assets. "/" is the catch-all: any path not matched
	// by a more specific pattern above falls through to it.
	mux.Handle("/", s.staticHandler(ui.Assets()))
}
 299
 300// handleValidateCwd validates that a path exists and is a directory
 301func (s *Server) handleValidateCwd(w http.ResponseWriter, r *http.Request) {
 302	if r.Method != http.MethodGet {
 303		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
 304		return
 305	}
 306
 307	path := r.URL.Query().Get("path")
 308	if path == "" {
 309		w.Header().Set("Content-Type", "application/json")
 310		json.NewEncoder(w).Encode(map[string]interface{}{
 311			"valid": false,
 312			"error": "path is required",
 313		})
 314		return
 315	}
 316
 317	info, err := os.Stat(path)
 318	if err != nil {
 319		w.Header().Set("Content-Type", "application/json")
 320		if os.IsNotExist(err) {
 321			json.NewEncoder(w).Encode(map[string]interface{}{
 322				"valid": false,
 323				"error": "directory does not exist",
 324			})
 325		} else {
 326			json.NewEncoder(w).Encode(map[string]interface{}{
 327				"valid": false,
 328				"error": err.Error(),
 329			})
 330		}
 331		return
 332	}
 333
 334	if !info.IsDir() {
 335		w.Header().Set("Content-Type", "application/json")
 336		json.NewEncoder(w).Encode(map[string]interface{}{
 337			"valid": false,
 338			"error": "path is not a directory",
 339		})
 340		return
 341	}
 342
 343	w.Header().Set("Content-Type", "application/json")
 344	json.NewEncoder(w).Encode(map[string]interface{}{
 345		"valid": true,
 346	})
 347}
 348
// DirectoryEntry represents a single directory entry for the directory picker.
type DirectoryEntry struct {
	Name string `json:"name"`
	// IsDir is always true today: handleListDirectory returns only directories.
	IsDir bool `json:"is_dir"`
	// GitHeadSubject is the HEAD commit subject when the entry is a git
	// repository root; omitted otherwise or when it cannot be read.
	GitHeadSubject string `json:"git_head_subject,omitempty"`
}
 355
// ListDirectoryResponse is the response from the list-directory endpoint.
type ListDirectoryResponse struct {
	// Path is the cleaned directory that was listed.
	Path string `json:"path"`
	// Parent is filepath.Dir(Path), or "" when Path is the filesystem root.
	Parent  string           `json:"parent"`
	Entries []DirectoryEntry `json:"entries"`
	// GitHeadSubject is the HEAD commit subject when Path itself is a git
	// repository root.
	GitHeadSubject string `json:"git_head_subject,omitempty"`
	// GitWorktreeRoot is the main repository root when Path is a linked git
	// worktree; omitted otherwise.
	GitWorktreeRoot string `json:"git_worktree_root,omitempty"`
}
 364
 365// handleListDirectory lists the contents of a directory for the directory picker
 366func (s *Server) handleListDirectory(w http.ResponseWriter, r *http.Request) {
 367	if r.Method != http.MethodGet {
 368		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
 369		return
 370	}
 371
 372	path := r.URL.Query().Get("path")
 373	if path == "" {
 374		// Default to home directory or root
 375		homeDir, err := os.UserHomeDir()
 376		if err != nil {
 377			path = "/"
 378		} else {
 379			path = homeDir
 380		}
 381	}
 382
 383	// Clean and resolve the path
 384	path = filepath.Clean(path)
 385
 386	// Verify path exists and is a directory
 387	info, err := os.Stat(path)
 388	if err != nil {
 389		w.Header().Set("Content-Type", "application/json")
 390		if os.IsNotExist(err) {
 391			json.NewEncoder(w).Encode(map[string]interface{}{
 392				"error": "directory does not exist",
 393			})
 394		} else if os.IsPermission(err) {
 395			json.NewEncoder(w).Encode(map[string]interface{}{
 396				"error": "permission denied",
 397			})
 398		} else {
 399			json.NewEncoder(w).Encode(map[string]interface{}{
 400				"error": err.Error(),
 401			})
 402		}
 403		return
 404	}
 405
 406	if !info.IsDir() {
 407		w.Header().Set("Content-Type", "application/json")
 408		json.NewEncoder(w).Encode(map[string]interface{}{
 409			"error": "path is not a directory",
 410		})
 411		return
 412	}
 413
 414	// Read directory contents
 415	dirEntries, err := os.ReadDir(path)
 416	if err != nil {
 417		w.Header().Set("Content-Type", "application/json")
 418		if os.IsPermission(err) {
 419			json.NewEncoder(w).Encode(map[string]interface{}{
 420				"error": "permission denied",
 421			})
 422		} else {
 423			json.NewEncoder(w).Encode(map[string]interface{}{
 424				"error": err.Error(),
 425			})
 426		}
 427		return
 428	}
 429
 430	// Build response with only directories (for directory picker)
 431	var entries []DirectoryEntry
 432	for _, entry := range dirEntries {
 433		// Only include directories
 434		if entry.IsDir() {
 435			dirEntry := DirectoryEntry{
 436				Name:  entry.Name(),
 437				IsDir: true,
 438			}
 439
 440			// Check if this is a git repo root and get HEAD commit subject
 441			entryPath := filepath.Join(path, entry.Name())
 442			if isGitRepo(entryPath) {
 443				if subject := getGitHeadSubject(entryPath); subject != "" {
 444					dirEntry.GitHeadSubject = subject
 445				}
 446			}
 447
 448			entries = append(entries, dirEntry)
 449		}
 450	}
 451
 452	// Sort entries: non-hidden first, then hidden (.*), alphabetically within each group
 453	sort.Slice(entries, func(i, j int) bool {
 454		iHidden := strings.HasPrefix(entries[i].Name, ".")
 455		jHidden := strings.HasPrefix(entries[j].Name, ".")
 456		if iHidden != jHidden {
 457			return !iHidden // non-hidden comes first
 458		}
 459		return entries[i].Name < entries[j].Name
 460	})
 461
 462	// Calculate parent directory
 463	parent := filepath.Dir(path)
 464	if parent == path {
 465		// At root, no parent
 466		parent = ""
 467	}
 468
 469	response := ListDirectoryResponse{
 470		Path:    path,
 471		Parent:  parent,
 472		Entries: entries,
 473	}
 474
 475	// Check if the current directory itself is a git repo
 476	if isGitRepo(path) {
 477		response.GitHeadSubject = getGitHeadSubject(path)
 478		if root := getGitWorktreeRoot(path); root != "" {
 479			response.GitWorktreeRoot = root
 480		}
 481	}
 482
 483	w.Header().Set("Content-Type", "application/json")
 484	json.NewEncoder(w).Encode(response)
 485}
 486
 487// getGitHeadSubject returns the subject line of HEAD commit for a git repository.
 488// Returns empty string if unable to get the subject.
 489// isGitRepo checks if the given path is a git repository root.
 490// Returns true for both regular repos (.git directory) and worktrees (.git file with gitdir:).
 491func isGitRepo(dirPath string) bool {
 492	gitPath := filepath.Join(dirPath, ".git")
 493	fi, err := os.Stat(gitPath)
 494	if err != nil {
 495		return false
 496	}
 497	if fi.IsDir() {
 498		return true // regular .git directory
 499	}
 500	if fi.Mode().IsRegular() {
 501		// Check if it's a worktree .git file
 502		content, err := os.ReadFile(gitPath)
 503		if err == nil && strings.HasPrefix(string(content), "gitdir:") {
 504			return true
 505		}
 506	}
 507	return false
 508}
 509
 510// getGitHeadSubject returns the subject line of HEAD commit for a git repository.
 511// Returns empty string if unable to get the subject.
 512func getGitHeadSubject(repoPath string) string {
 513	cmd := exec.Command("git", "log", "-1", "--format=%s")
 514	cmd.Dir = repoPath
 515	output, err := cmd.Output()
 516	if err != nil {
 517		return ""
 518	}
 519	return strings.TrimSpace(string(output))
 520}
 521
 522// getGitWorktreeRoot returns the main repository root if the given path is
 523// a git worktree (not the main repo itself). Returns "" otherwise.
 524func getGitWorktreeRoot(repoPath string) string {
 525	// Get the worktree's git dir and the common (main repo) git dir
 526	cmd := exec.Command("git", "rev-parse", "--git-dir", "--git-common-dir")
 527	cmd.Dir = repoPath
 528	output, err := cmd.Output()
 529	if err != nil {
 530		return ""
 531	}
 532	lines := strings.SplitN(strings.TrimSpace(string(output)), "\n", 2)
 533	if len(lines) != 2 {
 534		return ""
 535	}
 536	gitDir := lines[0]
 537	commonDir := lines[1]
 538
 539	// Resolve relative paths
 540	if !filepath.IsAbs(gitDir) {
 541		gitDir = filepath.Join(repoPath, gitDir)
 542	}
 543	if !filepath.IsAbs(commonDir) {
 544		commonDir = filepath.Join(repoPath, commonDir)
 545	}
 546	gitDir = filepath.Clean(gitDir)
 547	commonDir = filepath.Clean(commonDir)
 548
 549	// If they're the same, this is the main repo, not a worktree
 550	if gitDir == commonDir {
 551		return ""
 552	}
 553
 554	// The main repo root is the parent of the common .git dir
 555	return filepath.Dir(commonDir)
 556}
 557
 558// handleCreateDirectory creates a new directory
 559func (s *Server) handleCreateDirectory(w http.ResponseWriter, r *http.Request) {
 560	if r.Method != http.MethodPost {
 561		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
 562		return
 563	}
 564
 565	var req struct {
 566		Path string `json:"path"`
 567	}
 568	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
 569		w.Header().Set("Content-Type", "application/json")
 570		json.NewEncoder(w).Encode(map[string]interface{}{
 571			"error": "invalid request body",
 572		})
 573		return
 574	}
 575
 576	if req.Path == "" {
 577		w.Header().Set("Content-Type", "application/json")
 578		json.NewEncoder(w).Encode(map[string]interface{}{
 579			"error": "path is required",
 580		})
 581		return
 582	}
 583
 584	// Clean the path
 585	path := filepath.Clean(req.Path)
 586
 587	// Check if path already exists
 588	if _, err := os.Stat(path); err == nil {
 589		w.Header().Set("Content-Type", "application/json")
 590		json.NewEncoder(w).Encode(map[string]interface{}{
 591			"error": "path already exists",
 592		})
 593		return
 594	}
 595
 596	// Verify parent directory exists
 597	parentDir := filepath.Dir(path)
 598	if _, err := os.Stat(parentDir); os.IsNotExist(err) {
 599		w.Header().Set("Content-Type", "application/json")
 600		json.NewEncoder(w).Encode(map[string]interface{}{
 601			"error": "parent directory does not exist",
 602		})
 603		return
 604	}
 605
 606	// Create the directory (only the final directory, not parents)
 607	if err := os.Mkdir(path, 0o755); err != nil {
 608		w.Header().Set("Content-Type", "application/json")
 609		if os.IsPermission(err) {
 610			json.NewEncoder(w).Encode(map[string]interface{}{
 611				"error": "permission denied",
 612			})
 613		} else {
 614			json.NewEncoder(w).Encode(map[string]interface{}{
 615				"error": err.Error(),
 616			})
 617		}
 618		return
 619	}
 620
 621	w.Header().Set("Content-Type", "application/json")
 622	json.NewEncoder(w).Encode(map[string]interface{}{
 623		"path": path,
 624	})
 625}
 626
// getOrCreateConversationManager gets an existing conversation manager or creates a new one.
//
// Concurrent calls for the same conversationID are collapsed by
// s.conversationGroup, so at most one creation runs per ID at a time. An
// already-registered manager is Touch()ed and returned.
//
// A new manager is registered in s.activeConversations only if Hydrate(ctx)
// succeeds. Otherwise the Hydrate error is returned and nothing is registered.
// The new manager persists messages through s.recordMessage and reports state
// changes through s.publishConversationState.
//
// NOTE(review): s.mu is held for the whole closure, including Hydrate(ctx),
// which presumably does database I/O. Every other s.mu user in this file waits
// on it meanwhile. Hydrate must also never synchronously invoke onStateChange:
// publishConversationState locks s.mu, and sync.Mutex is not reentrant.
// NOTE(review): ctx belongs to whichever caller wins the singleflight. If it is
// cancelled, concurrent waiters for the same ID receive that error too.
func (s *Server) getOrCreateConversationManager(ctx context.Context, conversationID string) (*ConversationManager, error) {
	manager, err, _ := s.conversationGroup.Do(conversationID, func() (*ConversationManager, error) {
		s.mu.Lock()
		defer s.mu.Unlock()
		// Fast path: a manager is already active for this conversation.
		if manager, exists := s.activeConversations[conversationID]; exists {
			manager.Touch()
			return manager, nil
		}

		// Callbacks bind conversationID so the manager need not know about Server.
		recordMessage := func(ctx context.Context, message llm.Message, usage llm.Usage) error {
			return s.recordMessage(ctx, conversationID, message, usage)
		}

		onStateChange := func(state ConversationState) {
			s.publishConversationState(state)
		}

		manager := NewConversationManager(conversationID, s.db, s.logger, s.toolSetConfig, recordMessage, onStateChange)
		if err := manager.Hydrate(ctx); err != nil {
			return nil, err
		}

		s.activeConversations[conversationID] = manager
		return manager, nil
	})
	if err != nil {
		return nil, err
	}
	return manager, nil
}
 658
 659// ExtractDisplayData extracts display data from message content for storage
 660func ExtractDisplayData(message llm.Message) interface{} {
 661	// Build a map of tool_use_id to tool_name for lookups
 662	toolNameMap := make(map[string]string)
 663	for _, content := range message.Content {
 664		if content.Type == llm.ContentTypeToolUse {
 665			toolNameMap[content.ID] = content.ToolName
 666		}
 667	}
 668
 669	var displayData []any
 670	for _, content := range message.Content {
 671		if content.Type == llm.ContentTypeToolResult && content.Display != nil {
 672			// Include tool name if we can find it
 673			toolName := toolNameMap[content.ToolUseID]
 674			displayData = append(displayData, map[string]any{
 675				"tool_use_id": content.ToolUseID,
 676				"tool_name":   toolName,
 677				"display":     content.Display,
 678			})
 679		}
 680	}
 681
 682	if len(displayData) > 0 {
 683		return displayData
 684	}
 685	return nil
 686}
 687
 688// recordMessage records a new message to the database and also notifies subscribers
 689func (s *Server) recordMessage(ctx context.Context, conversationID string, message llm.Message, usage llm.Usage) error {
 690	// Log message based on role
 691	if message.Role == llm.MessageRoleUser {
 692		s.logger.Info("User message", "conversation_id", conversationID, "content_items", len(message.Content))
 693	} else if message.Role == llm.MessageRoleAssistant {
 694		s.logger.Info("Agent message", "conversation_id", conversationID, "content_items", len(message.Content), "end_of_turn", message.EndOfTurn)
 695	}
 696
 697	// Convert LLM message to database format
 698	messageType, err := s.getMessageType(message)
 699	if err != nil {
 700		return fmt.Errorf("failed to determine message type: %w", err)
 701	}
 702
 703	// Extract display data from content items
 704	displayDataToStore := ExtractDisplayData(message)
 705
 706	// Create message
 707	createdMsg, err := s.db.CreateMessage(ctx, db.CreateMessageParams{
 708		ConversationID:      conversationID,
 709		Type:                messageType,
 710		LLMData:             message,
 711		UserData:            nil,
 712		UsageData:           usage,
 713		DisplayData:         displayDataToStore,
 714		ExcludedFromContext: message.ExcludedFromContext,
 715	})
 716	if err != nil {
 717		return fmt.Errorf("failed to create message: %w", err)
 718	}
 719
 720	// Update conversation's last updated timestamp for correct ordering
 721	if err := s.db.QueriesTx(ctx, func(q *generated.Queries) error {
 722		return q.UpdateConversationTimestamp(ctx, conversationID)
 723	}); err != nil {
 724		s.logger.Warn("Failed to update conversation timestamp", "conversationID", conversationID, "error", err)
 725	}
 726
 727	// Touch active manager activity time if present
 728	s.mu.Lock()
 729	mgr, ok := s.activeConversations[conversationID]
 730	if ok {
 731		mgr.Touch()
 732	}
 733	s.mu.Unlock()
 734
 735	// Notify subscribers with only the new message - use WithoutCancel because
 736	// the HTTP request context may be cancelled after the handler returns, but
 737	// we still want the notification to complete so SSE clients see the message immediately
 738	go s.notifySubscribersNewMessage(context.WithoutCancel(ctx), conversationID, createdMsg)
 739
 740	return nil
 741}
 742
 743// getMessageType determines the message type from an LLM message
 744func (s *Server) getMessageType(message llm.Message) (db.MessageType, error) {
 745	// System-generated errors are stored as error type
 746	if message.ErrorType != llm.ErrorTypeNone {
 747		return db.MessageTypeError, nil
 748	}
 749
 750	switch message.Role {
 751	case llm.MessageRoleUser:
 752		return db.MessageTypeUser, nil
 753	case llm.MessageRoleAssistant:
 754		return db.MessageTypeAgent, nil
 755	default:
 756		// For tool messages, check if it's a tool call or tool result
 757		for _, content := range message.Content {
 758			if content.Type == llm.ContentTypeToolUse {
 759				return db.MessageTypeTool, nil
 760			}
 761			if content.Type == llm.ContentTypeToolResult {
 762				return db.MessageTypeTool, nil
 763			}
 764		}
 765		return db.MessageTypeAgent, nil
 766	}
 767}
 768
 769// convertToLLMMessage converts a database message to an LLM message
 770func convertToLLMMessage(msg generated.Message) (llm.Message, error) {
 771	var llmMsg llm.Message
 772	if msg.LlmData == nil {
 773		return llm.Message{}, fmt.Errorf("message has no LLM data")
 774	}
 775	if err := json.Unmarshal([]byte(*msg.LlmData), &llmMsg); err != nil {
 776		return llm.Message{}, fmt.Errorf("failed to unmarshal LLM data: %w", err)
 777	}
 778	return llmMsg, nil
 779}
 780
 781// notifySubscribers sends conversation metadata updates (e.g., slug changes) to subscribers.
 782// This is used when only the conversation data changes, not the messages.
 783// Uses Broadcast instead of Publish to avoid racing with message sequence IDs.
 784func (s *Server) notifySubscribers(ctx context.Context, conversationID string) {
 785	s.mu.Lock()
 786	manager, exists := s.activeConversations[conversationID]
 787	s.mu.Unlock()
 788
 789	if !exists {
 790		return
 791	}
 792
 793	// Get conversation data only (no messages needed for metadata-only updates)
 794	var conversation generated.Conversation
 795	err := s.db.Queries(ctx, func(q *generated.Queries) error {
 796		var err error
 797		conversation, err = q.GetConversation(ctx, conversationID)
 798		return err
 799	})
 800	if err != nil {
 801		s.logger.Error("Failed to get conversation data for notification", "conversationID", conversationID, "error", err)
 802		return
 803	}
 804
 805	// Broadcast conversation update with no new messages.
 806	// Using Broadcast instead of Publish ensures this metadata-only update
 807	// doesn't race with notifySubscribersNewMessage which uses Publish with sequence IDs.
 808	streamData := StreamResponse{
 809		Messages:     nil, // No new messages, just conversation update
 810		Conversation: conversation,
 811	}
 812	manager.subpub.Broadcast(streamData)
 813
 814	// Also notify conversation list subscribers (e.g., slug change)
 815	s.publishConversationListUpdate(ConversationListUpdate{
 816		Type:         "update",
 817		Conversation: &conversation,
 818	})
 819}
 820
 821// notifySubscribersNewMessage sends a single new message to all subscribers.
 822// This is more efficient than re-sending all messages on each update.
 823func (s *Server) notifySubscribersNewMessage(ctx context.Context, conversationID string, newMsg *generated.Message) {
 824	s.mu.Lock()
 825	manager, exists := s.activeConversations[conversationID]
 826	s.mu.Unlock()
 827
 828	if !exists {
 829		return
 830	}
 831
 832	// Get conversation data for the response
 833	var conversation generated.Conversation
 834	err := s.db.Queries(ctx, func(q *generated.Queries) error {
 835		var err error
 836		conversation, err = q.GetConversation(ctx, conversationID)
 837		return err
 838	})
 839	if err != nil {
 840		s.logger.Error("Failed to get conversation data for notification", "conversationID", conversationID, "error", err)
 841		return
 842	}
 843
 844	// Convert the single new message to API format
 845	apiMessages := toAPIMessages([]generated.Message{*newMsg})
 846
 847	// Update agent working state based on message type
 848	if isAgentEndOfTurn(newMsg) {
 849		manager.SetAgentWorking(false)
 850	}
 851
 852	// Publish only the new message
 853	streamData := StreamResponse{
 854		Messages:     apiMessages,
 855		Conversation: conversation,
 856		// ContextWindowSize: 0 for messages without usage data (user/tool messages).
 857		// With omitempty, 0 is omitted from JSON, so the UI keeps its cached value.
 858		// Only agent messages have usage data, so context window updates when they arrive.
 859		ContextWindowSize: calculateContextWindowSizeFromMsg(newMsg),
 860	}
 861	manager.subpub.Publish(newMsg.SequenceID, streamData)
 862
 863	// Also notify conversation list subscribers about the update (updated_at changed)
 864	s.publishConversationListUpdate(ConversationListUpdate{
 865		Type:         "update",
 866		Conversation: &conversation,
 867	})
 868}
 869
 870// publishConversationListUpdate broadcasts a conversation list update to ALL active
 871// conversation streams. This allows clients to receive updates about other conversations
 872// while they're subscribed to their current conversation's stream.
 873func (s *Server) publishConversationListUpdate(update ConversationListUpdate) {
 874	s.mu.Lock()
 875	defer s.mu.Unlock()
 876
 877	// Broadcast to all active conversation managers
 878	for _, manager := range s.activeConversations {
 879		streamData := StreamResponse{
 880			ConversationListUpdate: &update,
 881		}
 882		manager.subpub.Broadcast(streamData)
 883	}
 884}
 885
 886// publishConversationState broadcasts a conversation state update to ALL active
 887// conversation streams. This allows clients to see the working state of other conversations.
 888func (s *Server) publishConversationState(state ConversationState) {
 889	s.mu.Lock()
 890	defer s.mu.Unlock()
 891
 892	// Broadcast to all active conversation managers
 893	for _, manager := range s.activeConversations {
 894		streamData := StreamResponse{
 895			ConversationState: &state,
 896		}
 897		manager.subpub.Broadcast(streamData)
 898	}
 899}
 900
 901// getWorkingConversations returns a map of conversation IDs that are currently working.
 902func (s *Server) getWorkingConversations() map[string]bool {
 903	s.mu.Lock()
 904	defer s.mu.Unlock()
 905
 906	working := make(map[string]bool)
 907	for id, manager := range s.activeConversations {
 908		if manager.IsAgentWorking() {
 909			working[id] = true
 910		}
 911	}
 912	return working
 913}
 914
 915// IsAgentWorking returns whether the agent is currently working on the given conversation.
 916// Returns false if the conversation doesn't have an active manager.
 917func (s *Server) IsAgentWorking(conversationID string) bool {
 918	s.mu.Lock()
 919	manager, exists := s.activeConversations[conversationID]
 920	s.mu.Unlock()
 921	if !exists {
 922		return false
 923	}
 924	return manager.IsAgentWorking()
 925}
 926
 927// Cleanup removes inactive conversation managers
 928func (s *Server) Cleanup() {
 929	s.mu.Lock()
 930	defer s.mu.Unlock()
 931
 932	now := time.Now()
 933	for id, manager := range s.activeConversations {
 934		// Remove managers that have been inactive for more than 30 minutes
 935		manager.mu.Lock()
 936		lastActivity := manager.lastActivity
 937		manager.mu.Unlock()
 938		if now.Sub(lastActivity) > 30*time.Minute {
 939			manager.stopLoop()
 940			delete(s.activeConversations, id)
 941			s.logger.Debug("Cleaned up inactive conversation", "conversationID", id)
 942		}
 943	}
 944}
 945
 946// Start starts the HTTP server and handles the complete lifecycle
 947func (s *Server) Start(port string) error {
 948	listener, err := net.Listen("tcp", ":"+port)
 949	if err != nil {
 950		s.logger.Error("Failed to create listener", "error", err, "port_info", getPortOwnerInfo(port))
 951		return err
 952	}
 953	return s.StartWithListener(listener)
 954}
 955
 956// StartWithListener starts the HTTP server using the provided listener.
 957// This is useful for systemd socket activation where the listener is created externally.
 958func (s *Server) StartWithListener(listener net.Listener) error {
 959	// Set up HTTP server with routes and middleware
 960	mux := http.NewServeMux()
 961	s.RegisterRoutes(mux)
 962
 963	// Add middleware (applied in reverse order: last added = first executed)
 964	handler := LoggerMiddleware(s.logger)(mux)
 965	handler = CSRFMiddleware()(handler)
 966	if s.requireHeader != "" {
 967		handler = RequireHeaderMiddleware(s.requireHeader)(handler)
 968	}
 969
 970	httpServer := &http.Server{
 971		Handler: handler,
 972	}
 973
 974	// Start cleanup routine
 975	go func() {
 976		ticker := time.NewTicker(5 * time.Minute)
 977		defer ticker.Stop()
 978		for range ticker.C {
 979			s.Cleanup()
 980		}
 981	}()
 982
 983	// Get actual port from listener
 984	actualPort := listener.Addr().(*net.TCPAddr).Port
 985
 986	// Start server in goroutine
 987	serverErrCh := make(chan error, 1)
 988	go func() {
 989		s.logger.Info("Server starting", "port", actualPort, "url", fmt.Sprintf("http://localhost:%d", actualPort))
 990		if err := httpServer.Serve(listener); err != nil && err != http.ErrServerClosed {
 991			serverErrCh <- err
 992		}
 993	}()
 994
 995	// Wait for shutdown signal or server error
 996	quit := make(chan os.Signal, 1)
 997	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
 998
 999	select {
1000	case err := <-serverErrCh:
1001		s.logger.Error("Server failed", "error", err)
1002		return err
1003	case <-quit:
1004		s.logger.Info("Shutting down server")
1005	}
1006
1007	// Graceful shutdown
1008	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
1009	defer cancel()
1010
1011	if err := httpServer.Shutdown(ctx); err != nil {
1012		s.logger.Error("Server forced to shutdown", "error", err)
1013		return err
1014	}
1015
1016	s.logger.Info("Server exited")
1017	return nil
1018}
1019
1020// getPortOwnerInfo tries to identify what process is using a port.
1021// Returns a human-readable string with the PID and process name, or an error message.
1022func getPortOwnerInfo(port string) string {
1023	// Use lsof to find the process using the port
1024	cmd := exec.Command("lsof", "-i", ":"+port, "-sTCP:LISTEN", "-n", "-P")
1025	output, err := cmd.Output()
1026	if err != nil {
1027		return fmt.Sprintf("(unable to determine: %v)", err)
1028	}
1029
1030	lines := strings.Split(strings.TrimSpace(string(output)), "\n")
1031	if len(lines) < 2 {
1032		return "(no process found)"
1033	}
1034
1035	// Parse lsof output: COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
1036	// Skip the header line
1037	for _, line := range lines[1:] {
1038		fields := strings.Fields(line)
1039		if len(fields) >= 2 {
1040			command := fields[0]
1041			pid := fields[1]
1042			return fmt.Sprintf("pid=%s process=%s", pid, command)
1043		}
1044	}
1045
1046	return "(could not parse lsof output)"
1047}