perf: memory improvements (sliding-window for messages and channel buffer exhaustion)

Raphael Amorim created

Change summary

internal/app/app.go                  |  2 
internal/config/config.go            |  1 
internal/config/load.go              |  3 +
internal/llm/agent/agent.go          | 18 ++++++++-
internal/lsp/client.go               | 18 +++++++++
internal/lsp/methods.go              | 52 ++++++++++++++++++++++++++++-
internal/pubsub/broker.go            | 10 ++++-
internal/tui/components/chat/chat.go |  4 +-
8 files changed, 96 insertions(+), 12 deletions(-)

Detailed changes

internal/app/app.go 🔗

@@ -78,7 +78,7 @@ func New(ctx context.Context, conn *sql.DB, cfg *config.Config) (*App, error) {
 
 		watcherCancelFuncs: csync.NewSlice[context.CancelFunc](),
 
-		events:          make(chan tea.Msg, 100),
+		events:          make(chan tea.Msg, 256),
 		serviceEventsWG: &sync.WaitGroup{},
 		tuiWG:           &sync.WaitGroup{},
 	}

internal/config/config.go 🔗

@@ -143,6 +143,7 @@ type Options struct {
 	DebugLSP             bool        `json:"debug_lsp,omitempty" jsonschema:"description=Enable debug logging for LSP servers,default=false"`
 	DisableAutoSummarize bool        `json:"disable_auto_summarize,omitempty" jsonschema:"description=Disable automatic conversation summarization,default=false"`
 	DataDirectory        string      `json:"data_directory,omitempty" jsonschema:"description=Directory for storing application data (relative to working directory),default=.crush,example=.crush"` // Relative to the cwd
+	MaxMessages          int         `json:"max_messages,omitempty" jsonschema:"description=Maximum number of messages to keep in context (sliding window),default=50,minimum=10,maximum=200"`
 }
 
 type MCPs map[string]MCPConfig

internal/config/load.go 🔗

@@ -315,6 +315,9 @@ func (c *Config) setDefaults(workingDir string) {
 	if c.Options.DataDirectory == "" {
 		c.Options.DataDirectory = filepath.Join(workingDir, defaultDataDirectory)
 	}
+	if c.Options.MaxMessages < 10 {
+		c.Options.MaxMessages = 50
+	}
 	if c.Providers == nil {
 		c.Providers = csync.NewMap[string, ProviderConfig]()
 	}

internal/llm/agent/agent.go 🔗

@@ -355,8 +355,11 @@ func (a *agent) Run(ctx context.Context, sessionID string, content string, attac
 	}
 
 	genCtx, cancel := context.WithCancel(ctx)
+	defer cancel() // Ensure cancel is always called
 
 	a.activeRequests.Set(sessionID, cancel)
+	defer a.activeRequests.Del(sessionID) // Clean up on exit
+
 	go func() {
 		slog.Debug("Request started", "sessionID", sessionID)
 		defer log.RecoverPanic("agent.Run", func() {
@@ -371,8 +374,6 @@ func (a *agent) Run(ctx context.Context, sessionID string, content string, attac
 			slog.Error(result.Error.Error())
 		}
 		slog.Debug("Request completed", "sessionID", sessionID)
-		a.activeRequests.Del(sessionID)
-		cancel()
 		a.Publish(pubsub.CreatedEvent, result)
 		events <- result
 		close(events)
@@ -387,12 +388,23 @@ func (a *agent) processGeneration(ctx context.Context, sessionID, content string
 	if err != nil {
 		return a.err(fmt.Errorf("failed to list messages: %w", err))
 	}
+
+	// sliding window to limit message history
+	maxMessagesInContext := cfg.Options.MaxMessages
+	if maxMessagesInContext > 0 && len(msgs) > maxMessagesInContext {
+		// Keep the first message (usually system/context) and the last N-1 messages
+		msgs = append(msgs[:1], msgs[len(msgs)-maxMessagesInContext+1:]...)
+	}
+
 	if len(msgs) == 0 {
+		// Use a context with timeout for title generation
+		titleCtx, titleCancel := context.WithTimeout(context.Background(), 30*time.Second)
 		go func() {
+			defer titleCancel()
 			defer log.RecoverPanic("agent.Run", func() {
 				slog.Error("panic while generating title")
 			})
-			titleErr := a.generateTitle(context.Background(), sessionID, content)
+			titleErr := a.generateTitle(titleCtx, sessionID, content)
 			if titleErr != nil && !errors.Is(titleErr, context.Canceled) && !errors.Is(titleErr, context.DeadlineExceeded) {
 				slog.Error("failed to generate title", "error", titleErr)
 			}

internal/lsp/client.go 🔗

@@ -62,6 +62,12 @@ type Client struct {
 
 	// Server state
 	serverState atomic.Value
+
+	// Shutdown tracking
+	shutdownOnce       sync.Once
+	shutdownChan       chan struct{}
+	stderrDone         chan struct{}
+	messageHandlerDone chan struct{}
 }
 
 // NewClient creates a new LSP client.
@@ -98,6 +104,9 @@ func NewClient(ctx context.Context, name string, config config.LSPConfig) (*Clie
 		serverRequestHandlers: make(map[string]ServerRequestHandler),
 		diagnostics:           make(map[protocol.DocumentURI][]protocol.Diagnostic),
 		openFiles:             make(map[string]*OpenFileInfo),
+		shutdownChan:          make(chan struct{}),
+		stderrDone:            make(chan struct{}),
+		messageHandlerDone:    make(chan struct{}),
 	}
 
 	// Initialize server state
@@ -110,9 +119,15 @@ func NewClient(ctx context.Context, name string, config config.LSPConfig) (*Clie
 
 	// Handle stderr in a separate goroutine
 	go func() {
+		defer close(client.stderrDone)
 		scanner := bufio.NewScanner(stderr)
 		for scanner.Scan() {
-			slog.Error("LSP Server", "err", scanner.Text())
+			select {
+			case <-client.shutdownChan:
+				return
+			default:
+				slog.Error("LSP Server", "err", scanner.Text())
+			}
 		}
 		if err := scanner.Err(); err != nil {
 			slog.Error("Error reading", "err", err)
@@ -121,6 +136,7 @@ func NewClient(ctx context.Context, name string, config config.LSPConfig) (*Clie
 
 	// Start message handling loop
 	go func() {
+		defer close(client.messageHandlerDone)
 		defer log.RecoverPanic("LSP-message-handler", func() {
 			slog.Error("LSP message handler crashed, LSP functionality may be impaired")
 		})

internal/lsp/methods.go 🔗

@@ -3,6 +3,8 @@ package lsp
 
 import (
 	"context"
+	"log/slog"
+	"time"
 
 	"github.com/charmbracelet/crush/internal/lsp/protocol"
 )
@@ -239,10 +241,56 @@ func (c *Client) Initialize(ctx context.Context, params protocol.ParamInitialize
 	return result, err
 }
 
-// Shutdown sends a shutdown request to the LSP server.
+// Shutdown sends a shutdown request to the LSP server and cleans up resources.
 // A shutdown request is sent from the client to the server. It is sent once when the client decides to shutdown the server. The only notification that is sent after a shutdown request is the exit event.
 func (c *Client) Shutdown(ctx context.Context) error {
-	return c.Call(ctx, "shutdown", nil, nil)
+	var shutdownErr error
+	c.shutdownOnce.Do(func() {
+		// Signal shutdown to goroutines
+		close(c.shutdownChan)
+
+		// Send shutdown request to server
+		shutdownErr = c.Call(ctx, "shutdown", nil, nil)
+
+		// Clean up handlers map to prevent memory leaks
+		c.handlersMu.Lock()
+		for id, ch := range c.handlers {
+			close(ch)
+			delete(c.handlers, id)
+		}
+		c.handlersMu.Unlock()
+
+		// Clean up open files map
+		c.openFilesMu.Lock()
+		for uri := range c.openFiles {
+			delete(c.openFiles, uri)
+		}
+		c.openFilesMu.Unlock()
+
+		// Clean up diagnostics map
+		c.diagnosticsMu.Lock()
+		for uri := range c.diagnostics {
+			delete(c.diagnostics, uri)
+		}
+		c.diagnosticsMu.Unlock()
+
+		// Wait for goroutines to finish with timeout
+		done := make(chan struct{})
+		go func() {
+			<-c.stderrDone
+			<-c.messageHandlerDone
+			close(done)
+		}()
+
+		select {
+		case <-done:
+			// Goroutines finished cleanly
+		case <-time.After(2 * time.Second):
+			// Timeout waiting for goroutines
+			slog.Warn("Timeout waiting for LSP goroutines to finish", "name", c.name)
+		}
+	})
+	return shutdownErr
 }
 
 // WillSaveWaitUntil sends a textDocument/willSaveWaitUntil request to the LSP server.

internal/pubsub/broker.go 🔗

@@ -2,10 +2,11 @@ package pubsub
 
 import (
 	"context"
+	"log/slog"
 	"sync"
 )
 
-const bufferSize = 64
+const bufferSize = 256
 
 type Broker[T any] struct {
 	subs      map[chan Event[T]]struct{}
@@ -111,8 +112,11 @@ func (b *Broker[T]) Publish(t EventType, payload T) {
 		select {
 		case sub <- event:
 		default:
-			// Channel is full, subscriber is slow - skip this event
-			// This prevents blocking the publisher
+			// Channel is full, subscriber is slow
+			// Log this for debugging but don't block
+			if b.GetSubscriberCount() > 0 {
+				slog.Debug("Dropping event for slow subscriber", "eventType", t)
+			}
 		}
 	}
 }

internal/tui/components/chat/chat.go 🔗

@@ -575,7 +575,7 @@ func (m *messageListCmp) buildToolResultMap(messages []message.Message) map[stri
 
 // convertMessagesToUI converts database messages to UI components.
 func (m *messageListCmp) convertMessagesToUI(sessionMessages []message.Message, toolResultMap map[string]message.ToolResult) []list.Item {
-	uiMessages := make([]list.Item, 0)
+	uiMessages := make([]list.Item, 0, len(sessionMessages)*2) // Pre-allocate with reasonable capacity
 
 	for _, msg := range sessionMessages {
 		switch msg.Role {
@@ -595,7 +595,7 @@ func (m *messageListCmp) convertMessagesToUI(sessionMessages []message.Message,
 
 // convertAssistantMessage converts an assistant message and its tool calls to UI components.
 func (m *messageListCmp) convertAssistantMessage(msg message.Message, toolResultMap map[string]message.ToolResult) []list.Item {
-	var uiMessages []list.Item
+	uiMessages := make([]list.Item, 0)
 
 	// Add assistant message if it should be displayed
 	if m.shouldShowAssistantMessage(msg) {