diff --git a/internal/app/app.go b/internal/app/app.go
index c8f6fe75ed2db719fa7ace6d9507f46fd2b441f3..a64c16b79c74f077fb921f167430ff3c01b86cf3 100644
--- a/internal/app/app.go
+++ b/internal/app/app.go
@@ -78,7 +78,7 @@ func New(ctx context.Context, conn *sql.DB, cfg *config.Config) (*App, error) {
 		watcherCancelFuncs: csync.NewSlice[context.CancelFunc](),
-		events:             make(chan tea.Msg, 100),
+		events:             make(chan tea.Msg, 256),
 		serviceEventsWG:    &sync.WaitGroup{},
 		tuiWG:              &sync.WaitGroup{},
 	}
diff --git a/internal/config/config.go b/internal/config/config.go
index 7fef3b11d9b08f60d1ee9554bed27fd142536f7a..8665249a87167932292de06bb9e5c4908d0fdb08 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -143,6 +143,7 @@ type Options struct {
 	DebugLSP             bool   `json:"debug_lsp,omitempty" jsonschema:"description=Enable debug logging for LSP servers,default=false"`
 	DisableAutoSummarize bool   `json:"disable_auto_summarize,omitempty" jsonschema:"description=Disable automatic conversation summarization,default=false"`
 	DataDirectory        string `json:"data_directory,omitempty" jsonschema:"description=Directory for storing application data (relative to working directory),default=.crush,example=.crush"` // Relative to the cwd
+	MaxMessages          int    `json:"max_messages,omitempty" jsonschema:"description=Maximum number of messages to keep in context (sliding window),default=50,minimum=10,maximum=200"`
 }
 
 type MCPs map[string]MCPConfig
diff --git a/internal/config/load.go b/internal/config/load.go
index 12defe528334dba0a0c93463310ffbd3d9226a56..a4aabbdf008d426181f0fc8fb8d90a0aa49959c0 100644
--- a/internal/config/load.go
+++ b/internal/config/load.go
@@ -315,6 +315,9 @@ func (c *Config) setDefaults(workingDir string) {
 	if c.Options.DataDirectory == "" {
 		c.Options.DataDirectory = filepath.Join(workingDir, defaultDataDirectory)
 	}
+	if c.Options.MaxMessages < 10 {
+		c.Options.MaxMessages = 50
+	}
 	if c.Providers == nil {
 		c.Providers = csync.NewMap[string, ProviderConfig]()
 	}
diff --git a/internal/llm/agent/agent.go b/internal/llm/agent/agent.go
index bffb0592966b566b9129fd77637969ea96dda244..0541ec7d42c35cc05c6dc2129f85225613a51daa 100644
--- a/internal/llm/agent/agent.go
+++ b/internal/llm/agent/agent.go
@@ -355,8 +355,11 @@ func (a *agent) Run(ctx context.Context, sessionID string, content string, attac
 	}
 
 	genCtx, cancel := context.WithCancel(ctx)
+	defer cancel() // Ensure cancel is always called
 	a.activeRequests.Set(sessionID, cancel)
+	defer a.activeRequests.Del(sessionID) // Clean up on exit
+
 	go func() {
 		slog.Debug("Request started", "sessionID", sessionID)
 		defer log.RecoverPanic("agent.Run", func() {
@@ -371,8 +374,6 @@ func (a *agent) Run(ctx context.Context, sessionID string, content string, attac
 			slog.Error(result.Error.Error())
 		}
 		slog.Debug("Request completed", "sessionID", sessionID)
-		a.activeRequests.Del(sessionID)
-		cancel()
 		a.Publish(pubsub.CreatedEvent, result)
 		events <- result
 		close(events)
@@ -387,12 +388,23 @@ func (a *agent) processGeneration(ctx context.Context, sessionID, content string
 	if err != nil {
 		return a.err(fmt.Errorf("failed to list messages: %w", err))
 	}
+
+	// sliding window to limit message history
+	maxMessagesInContext := cfg.Options.MaxMessages
+	if maxMessagesInContext > 0 && len(msgs) > maxMessagesInContext {
+		// Keep the first message (usually system/context) and the last N-1 messages
+		msgs = append(msgs[:1], msgs[len(msgs)-maxMessagesInContext+1:]...)
+	}
+
 	if len(msgs) == 0 {
+		// Use a context with timeout for title generation
+		titleCtx, titleCancel := context.WithTimeout(context.Background(), 30*time.Second)
 		go func() {
+			defer titleCancel()
 			defer log.RecoverPanic("agent.Run", func() {
 				slog.Error("panic while generating title")
 			})
-			titleErr := a.generateTitle(context.Background(), sessionID, content)
+			titleErr := a.generateTitle(titleCtx, sessionID, content)
 			if titleErr != nil && !errors.Is(titleErr, context.Canceled) && !errors.Is(titleErr, context.DeadlineExceeded) {
 				slog.Error("failed to generate title", "error", titleErr)
 			}
diff --git a/internal/lsp/client.go b/internal/lsp/client.go
index a6b9fcbb4caea4992fb2dbce6ddc6e75066c9da7..4db28c43ac74e4fa1a083b919bcd240ec9d6d04f 100644
--- a/internal/lsp/client.go
+++ b/internal/lsp/client.go
@@ -62,6 +62,12 @@ type Client struct {
 
 	// Server state
 	serverState atomic.Value
+
+	// Shutdown tracking
+	shutdownOnce       sync.Once
+	shutdownChan       chan struct{}
+	stderrDone         chan struct{}
+	messageHandlerDone chan struct{}
 }
 
 // NewClient creates a new LSP client.
@@ -98,6 +104,9 @@ func NewClient(ctx context.Context, name string, config config.LSPConfig) (*Clie
 		serverRequestHandlers: make(map[string]ServerRequestHandler),
 		diagnostics:           make(map[protocol.DocumentURI][]protocol.Diagnostic),
 		openFiles:             make(map[string]*OpenFileInfo),
+		shutdownChan:          make(chan struct{}),
+		stderrDone:            make(chan struct{}),
+		messageHandlerDone:    make(chan struct{}),
 	}
 
 	// Initialize server state
@@ -110,9 +119,15 @@ func NewClient(ctx context.Context, name string, config config.LSPConfig) (*Clie
 
 	// Handle stderr in a separate goroutine
 	go func() {
+		defer close(client.stderrDone)
 		scanner := bufio.NewScanner(stderr)
 		for scanner.Scan() {
-			slog.Error("LSP Server", "err", scanner.Text())
+			select {
+			case <-client.shutdownChan:
+				return
+			default:
+				slog.Error("LSP Server", "err", scanner.Text())
+			}
 		}
 		if err := scanner.Err(); err != nil {
 			slog.Error("Error reading", "err", err)
@@ -121,6 +136,7 @@ func NewClient(ctx context.Context, name string, config config.LSPConfig) (*Clie
 
 	// Start message handling loop
 	go func() {
+		defer close(client.messageHandlerDone)
 		defer log.RecoverPanic("LSP-message-handler", func() {
 			slog.Error("LSP message handler crashed, LSP functionality may be impaired")
 		})
diff --git a/internal/lsp/methods.go b/internal/lsp/methods.go
index afd087c1b86d5242e845e419c47234de11ce467f..b37304e8859774f3ffd89555c43738035a606fe1 100644
--- a/internal/lsp/methods.go
+++ b/internal/lsp/methods.go
@@ -3,6 +3,8 @@ package lsp
 
 import (
 	"context"
+	"log/slog"
+	"time"
 
 	"github.com/charmbracelet/crush/internal/lsp/protocol"
 )
@@ -239,10 +241,56 @@ func (c *Client) Initialize(ctx context.Context, params protocol.ParamInitialize
 	return result, err
 }
 
-// Shutdown sends a shutdown request to the LSP server.
+// Shutdown sends a shutdown request to the LSP server and cleans up resources.
 // A shutdown request is sent from the client to the server. It is sent once when the client decides to shutdown the server. The only notification that is sent after a shutdown request is the exit event.
 func (c *Client) Shutdown(ctx context.Context) error {
-	return c.Call(ctx, "shutdown", nil, nil)
+	var shutdownErr error
+	c.shutdownOnce.Do(func() {
+		// Signal shutdown to goroutines
+		close(c.shutdownChan)
+
+		// Send shutdown request to server
+		shutdownErr = c.Call(ctx, "shutdown", nil, nil)
+
+		// Clean up handlers map to prevent memory leaks
+		c.handlersMu.Lock()
+		for id, ch := range c.handlers {
+			close(ch)
+			delete(c.handlers, id)
+		}
+		c.handlersMu.Unlock()
+
+		// Clean up open files map
+		c.openFilesMu.Lock()
+		for uri := range c.openFiles {
+			delete(c.openFiles, uri)
+		}
+		c.openFilesMu.Unlock()
+
+		// Clean up diagnostics map
+		c.diagnosticsMu.Lock()
+		for uri := range c.diagnostics {
+			delete(c.diagnostics, uri)
+		}
+		c.diagnosticsMu.Unlock()
+
+		// Wait for goroutines to finish with timeout
+		done := make(chan struct{})
+		go func() {
+			<-c.stderrDone
+			<-c.messageHandlerDone
+			close(done)
+		}()
+
+		select {
+		case <-done:
+			// Goroutines finished cleanly
+		case <-time.After(2 * time.Second):
+			// Timeout waiting for goroutines
+			slog.Warn("Timeout waiting for LSP goroutines to finish", "name", c.name)
+		}
+	})
+	return shutdownErr
 }
 
 // WillSaveWaitUntil sends a textDocument/willSaveWaitUntil request to the LSP server.
diff --git a/internal/pubsub/broker.go b/internal/pubsub/broker.go
index 80948d3d515a4fb5dad0d4dc36adbbff4e502993..e59fe31fe49cebb6e9897d60e9ebe6030b524a93 100644
--- a/internal/pubsub/broker.go
+++ b/internal/pubsub/broker.go
@@ -2,10 +2,11 @@ package pubsub
 
 import (
 	"context"
+	"log/slog"
 	"sync"
 )
 
-const bufferSize = 64
+const bufferSize = 256
 
 type Broker[T any] struct {
 	subs map[chan Event[T]]struct{}
@@ -111,8 +112,11 @@ func (b *Broker[T]) Publish(t EventType, payload T) {
 		select {
 		case sub <- event:
 		default:
-			// Channel is full, subscriber is slow - skip this event
-			// This prevents blocking the publisher
+			// Channel is full, subscriber is slow
+			// Log this for debugging but don't block
+			if b.GetSubscriberCount() > 0 {
+				slog.Debug("Dropping event for slow subscriber", "eventType", t)
+			}
 		}
 	}
 }
diff --git a/internal/tui/components/chat/chat.go b/internal/tui/components/chat/chat.go
index 8688f7e24c94290c74ae4344499acff61b43ac39..baed025a74186f92218c41a7dbd048f555d557e5 100644
--- a/internal/tui/components/chat/chat.go
+++ b/internal/tui/components/chat/chat.go
@@ -575,7 +575,7 @@ func (m *messageListCmp) buildToolResultMap(messages []message.Message) map[stri
 
 // convertMessagesToUI converts database messages to UI components.
 func (m *messageListCmp) convertMessagesToUI(sessionMessages []message.Message, toolResultMap map[string]message.ToolResult) []list.Item {
-	uiMessages := make([]list.Item, 0)
+	uiMessages := make([]list.Item, 0, len(sessionMessages)*2) // Pre-allocate with reasonable capacity
 
 	for _, msg := range sessionMessages {
 		switch msg.Role {
@@ -595,7 +595,7 @@ func (m *messageListCmp) convertMessagesToUI(sessionMessages []message.Message,
 
 // convertAssistantMessage converts an assistant message and its tool calls to UI components.
 func (m *messageListCmp) convertAssistantMessage(msg message.Message, toolResultMap map[string]message.ToolResult) []list.Item {
-	var uiMessages []list.Item
+	uiMessages := make([]list.Item, 0)
 
 	// Add assistant message if it should be displayed
 	if m.shouldShowAssistantMessage(msg) {
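Note on the sliding-window change in internal/llm/agent/agent.go: the append expression prunes the history in place, keeping the first message plus the most recent MaxMessages-1 (the knob is exposed to users as the max_messages option, which setDefaults clamps to 50 when set below 10). A minimal standalone sketch of the slice arithmetic follows; the helper name `window` is hypothetical and strings stand in for message.Message values.

    package main

    import "fmt"

    // window mirrors the pruning in processGeneration:
    //   msgs = append(msgs[:1], msgs[len(msgs)-maxMessagesInContext+1:]...)
    // It keeps element 0 plus the last limit-1 elements.
    func window(msgs []string, limit int) []string {
        if limit <= 0 || len(msgs) <= limit {
            return msgs
        }
        // The overlapping append is safe: copy moves elements leftward
        // with memmove-like semantics, so no element is corrupted.
        return append(msgs[:1], msgs[len(msgs)-limit+1:]...)
    }

    func main() {
        msgs := []string{"sys", "u1", "a1", "u2", "a2", "u3"}
        fmt.Println(window(msgs, 4)) // [sys u2 a2 u3]
    }

One caveat worth knowing when reading the diff: because the result reuses the backing array of msgs, the original slice contents are overwritten, which is fine here since the pruned slice replaces msgs immediately.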
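Note on the bounded wait in Client.Shutdown (internal/lsp/methods.go): it is an instance of the common "wait for goroutines with a timeout" pattern, closing a done channel once all workers have signalled completion and racing that against time.After. A self-contained sketch under assumed names (`waitAllWithTimeout` is hypothetical; the 2-second budget mirrors the diff):

    package main

    import (
        "fmt"
        "time"
    )

    // waitAllWithTimeout blocks until every channel is closed or the timeout
    // fires, mirroring the stderrDone/messageHandlerDone wait in Shutdown.
    // It returns false on timeout.
    func waitAllWithTimeout(timeout time.Duration, chans ...chan struct{}) bool {
        done := make(chan struct{})
        go func() {
            for _, ch := range chans {
                <-ch // blocks until the channel is closed
            }
            close(done)
        }()
        select {
        case <-done:
            return true
        case <-time.After(timeout):
            return false
        }
    }

    func main() {
        stderrDone := make(chan struct{})
        handlerDone := make(chan struct{})
        go func() {
            // Simulate the client goroutines finishing their cleanup.
            close(stderrDone)
            close(handlerDone)
        }()
        fmt.Println(waitAllWithTimeout(2*time.Second, stderrDone, handlerDone)) // true
    }

As in the diff, the design trades strictness for liveness: on timeout the collector goroutine keeps waiting in the background until the channels eventually close, which is acceptable at shutdown but would leak if used on a hot path.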