app.go

  1package app
  2
  3import (
  4	"context"
  5	"database/sql"
  6	"errors"
  7	"fmt"
  8	"log/slog"
  9	"maps"
 10	"sync"
 11	"time"
 12
 13	tea "github.com/charmbracelet/bubbletea/v2"
 14	"github.com/charmbracelet/crush/internal/config"
 15	"github.com/charmbracelet/crush/internal/csync"
 16	"github.com/charmbracelet/crush/internal/db"
 17	"github.com/charmbracelet/crush/internal/format"
 18	"github.com/charmbracelet/crush/internal/history"
 19	"github.com/charmbracelet/crush/internal/llm/agent"
 20	"github.com/charmbracelet/crush/internal/log"
 21	"github.com/charmbracelet/crush/internal/pubsub"
 22
 23	"github.com/charmbracelet/crush/internal/lsp"
 24	"github.com/charmbracelet/crush/internal/message"
 25	"github.com/charmbracelet/crush/internal/permission"
 26	"github.com/charmbracelet/crush/internal/session"
 27	"github.com/charmbracelet/crush/internal/update"
 28)
 29
 30type App struct {
 31	Sessions    session.Service
 32	Messages    message.Service
 33	History     history.Service
 34	Permissions permission.Service
 35
 36	CoderAgent agent.Service
 37
 38	LSPClients map[string]*lsp.Client
 39
 40	clientsMutex sync.RWMutex
 41
 42	watcherCancelFuncs *csync.Slice[context.CancelFunc]
 43	lspWatcherWG       sync.WaitGroup
 44
 45	config *config.Config
 46
 47	serviceEventsWG *sync.WaitGroup
 48	eventsCtx       context.Context
 49	events          chan tea.Msg
 50	tuiWG           *sync.WaitGroup
 51
 52	// global context and cleanup functions
 53	globalCtx    context.Context
 54	cleanupFuncs []func()
 55}
 56
 57// New initializes a new applcation instance.
 58func New(ctx context.Context, conn *sql.DB, cfg *config.Config) (*App, error) {
 59	q := db.New(conn)
 60	sessions := session.NewService(q)
 61	messages := message.NewService(q)
 62	files := history.NewService(q, conn)
 63	skipPermissionsRequests := cfg.Permissions != nil && cfg.Permissions.SkipRequests
 64	allowedTools := []string{}
 65	if cfg.Permissions != nil && cfg.Permissions.AllowedTools != nil {
 66		allowedTools = cfg.Permissions.AllowedTools
 67	}
 68
 69	app := &App{
 70		Sessions:    sessions,
 71		Messages:    messages,
 72		History:     files,
 73		Permissions: permission.NewPermissionService(cfg.WorkingDir(), skipPermissionsRequests, allowedTools),
 74		LSPClients:  make(map[string]*lsp.Client),
 75
 76		globalCtx: ctx,
 77
 78		config: cfg,
 79
 80		watcherCancelFuncs: csync.NewSlice[context.CancelFunc](),
 81
 82		events:          make(chan tea.Msg, 100),
 83		serviceEventsWG: &sync.WaitGroup{},
 84		tuiWG:           &sync.WaitGroup{},
 85	}
 86
 87	app.setupEvents()
 88
 89	// Initialize LSP clients in the background.
 90	app.initLSPClients(ctx)
 91
 92	// Check for updates in the background.
 93	go app.checkForUpdates(ctx)
 94
 95	// TODO: remove the concept of agent config, most likely.
 96	if cfg.IsConfigured() {
 97		if err := app.InitCoderAgent(); err != nil {
 98			return nil, fmt.Errorf("failed to initialize coder agent: %w", err)
 99		}
100	} else {
101		slog.Warn("No agent configuration found")
102	}
103	return app, nil
104}
105
106// Config returns the application configuration.
107func (app *App) Config() *config.Config {
108	return app.config
109}
110
111// RunNonInteractive handles the execution flow when a prompt is provided via
112// CLI flag.
113func (app *App) RunNonInteractive(ctx context.Context, prompt string, quiet bool) error {
114	slog.Info("Running in non-interactive mode")
115
116	ctx, cancel := context.WithCancel(ctx)
117	defer cancel()
118
119	// Start spinner if not in quiet mode.
120	var spinner *format.Spinner
121	if !quiet {
122		spinner = format.NewSpinner(ctx, cancel, "Generating")
123		spinner.Start()
124	}
125
126	// Helper function to stop spinner once.
127	stopSpinner := func() {
128		if !quiet && spinner != nil {
129			spinner.Stop()
130			spinner = nil
131		}
132	}
133	defer stopSpinner()
134
135	const maxPromptLengthForTitle = 100
136	titlePrefix := "Non-interactive: "
137	var titleSuffix string
138
139	if len(prompt) > maxPromptLengthForTitle {
140		titleSuffix = prompt[:maxPromptLengthForTitle] + "..."
141	} else {
142		titleSuffix = prompt
143	}
144	title := titlePrefix + titleSuffix
145
146	sess, err := app.Sessions.Create(ctx, title)
147	if err != nil {
148		return fmt.Errorf("failed to create session for non-interactive mode: %w", err)
149	}
150	slog.Info("Created session for non-interactive run", "session_id", sess.ID)
151
152	// Automatically approve all permission requests for this non-interactive session
153	app.Permissions.AutoApproveSession(sess.ID)
154
155	done, err := app.CoderAgent.Run(ctx, sess.ID, prompt)
156	if err != nil {
157		return fmt.Errorf("failed to start agent processing stream: %w", err)
158	}
159
160	messageEvents := app.Messages.Subscribe(ctx)
161	readBts := 0
162
163	for {
164		select {
165		case result := <-done:
166			stopSpinner()
167
168			if result.Error != nil {
169				if errors.Is(result.Error, context.Canceled) || errors.Is(result.Error, agent.ErrRequestCancelled) {
170					slog.Info("Non-interactive: agent processing cancelled", "session_id", sess.ID)
171					return nil
172				}
173				return fmt.Errorf("agent processing failed: %w", result.Error)
174			}
175
176			msgContent := result.Message.Content().String()
177			if len(msgContent) < readBts {
178				slog.Error("Non-interactive: message content is shorter than read bytes", "message_length", len(msgContent), "read_bytes", readBts)
179				return fmt.Errorf("message content is shorter than read bytes: %d < %d", len(msgContent), readBts)
180			}
181			fmt.Println(msgContent[readBts:])
182
183			slog.Info("Non-interactive: run completed", "session_id", sess.ID)
184			return nil
185
186		case event := <-messageEvents:
187			msg := event.Payload
188			if msg.SessionID == sess.ID && msg.Role == message.Assistant && len(msg.Parts) > 0 {
189				stopSpinner()
190				part := msg.Content().String()[readBts:]
191				fmt.Print(part)
192				readBts += len(part)
193			}
194
195		case <-ctx.Done():
196			stopSpinner()
197			return ctx.Err()
198		}
199	}
200}
201
202func (app *App) UpdateAgentModel() error {
203	return app.CoderAgent.UpdateModel()
204}
205
206func (app *App) setupEvents() {
207	ctx, cancel := context.WithCancel(app.globalCtx)
208	app.eventsCtx = ctx
209	setupSubscriber(ctx, app.serviceEventsWG, "sessions", app.Sessions.Subscribe, app.events)
210	setupSubscriber(ctx, app.serviceEventsWG, "messages", app.Messages.Subscribe, app.events)
211	setupSubscriber(ctx, app.serviceEventsWG, "permissions", app.Permissions.Subscribe, app.events)
212	setupSubscriber(ctx, app.serviceEventsWG, "permissions-notifications", app.Permissions.SubscribeNotifications, app.events)
213	setupSubscriber(ctx, app.serviceEventsWG, "history", app.History.Subscribe, app.events)
214	cleanupFunc := func() {
215		cancel()
216		app.serviceEventsWG.Wait()
217	}
218	app.cleanupFuncs = append(app.cleanupFuncs, cleanupFunc)
219}
220
221func setupSubscriber[T any](
222	ctx context.Context,
223	wg *sync.WaitGroup,
224	name string,
225	subscriber func(context.Context) <-chan pubsub.Event[T],
226	outputCh chan<- tea.Msg,
227) {
228	wg.Add(1)
229	go func() {
230		defer wg.Done()
231		subCh := subscriber(ctx)
232		for {
233			select {
234			case event, ok := <-subCh:
235				if !ok {
236					slog.Debug("subscription channel closed", "name", name)
237					return
238				}
239				var msg tea.Msg = event
240				select {
241				case outputCh <- msg:
242				case <-time.After(2 * time.Second):
243					slog.Warn("message dropped due to slow consumer", "name", name)
244				case <-ctx.Done():
245					slog.Debug("subscription cancelled", "name", name)
246					return
247				}
248			case <-ctx.Done():
249				slog.Debug("subscription cancelled", "name", name)
250				return
251			}
252		}
253	}()
254}
255
256func (app *App) InitCoderAgent() error {
257	coderAgentCfg := app.config.Agents["coder"]
258	if coderAgentCfg.ID == "" {
259		return fmt.Errorf("coder agent configuration is missing")
260	}
261	var err error
262	app.CoderAgent, err = agent.NewAgent(
263		app.globalCtx,
264		coderAgentCfg,
265		app.Permissions,
266		app.Sessions,
267		app.Messages,
268		app.History,
269		app.LSPClients,
270	)
271	if err != nil {
272		slog.Error("Failed to create coder agent", "err", err)
273		return err
274	}
275
276	// Add MCP client cleanup to shutdown process
277	app.cleanupFuncs = append(app.cleanupFuncs, agent.CloseMCPClients)
278
279	setupSubscriber(app.eventsCtx, app.serviceEventsWG, "coderAgent", app.CoderAgent.Subscribe, app.events)
280	return nil
281}
282
283// Subscribe sends events to the TUI as tea.Msgs.
284func (app *App) Subscribe(program *tea.Program) {
285	defer log.RecoverPanic("app.Subscribe", func() {
286		slog.Info("TUI subscription panic: attempting graceful shutdown")
287		program.Quit()
288	})
289
290	app.tuiWG.Add(1)
291	tuiCtx, tuiCancel := context.WithCancel(app.globalCtx)
292	app.cleanupFuncs = append(app.cleanupFuncs, func() {
293		slog.Debug("Cancelling TUI message handler")
294		tuiCancel()
295		app.tuiWG.Wait()
296	})
297	defer app.tuiWG.Done()
298
299	for {
300		select {
301		case <-tuiCtx.Done():
302			slog.Debug("TUI message handler shutting down")
303			return
304		case msg, ok := <-app.events:
305			if !ok {
306				slog.Debug("TUI message channel closed")
307				return
308			}
309			program.Send(msg)
310		}
311	}
312}
313
314// Shutdown performs a graceful shutdown of the application.
315func (app *App) Shutdown() {
316	if app.CoderAgent != nil {
317		app.CoderAgent.CancelAll()
318	}
319
320	for cancel := range app.watcherCancelFuncs.Seq() {
321		cancel()
322	}
323
324	// Wait for all LSP watchers to finish.
325	app.lspWatcherWG.Wait()
326
327	// Get all LSP clients.
328	app.clientsMutex.RLock()
329	clients := make(map[string]*lsp.Client, len(app.LSPClients))
330	maps.Copy(clients, app.LSPClients)
331	app.clientsMutex.RUnlock()
332
333	// Shutdown all LSP clients.
334	for name, client := range clients {
335		shutdownCtx, cancel := context.WithTimeout(app.globalCtx, 5*time.Second)
336		if err := client.Shutdown(shutdownCtx); err != nil {
337			slog.Error("Failed to shutdown LSP client", "name", name, "error", err)
338		}
339		cancel()
340	}
341
342	// Call call cleanup functions.
343	for _, cleanup := range app.cleanupFuncs {
344		if cleanup != nil {
345			cleanup()
346		}
347	}
348}
349
350// checkForUpdates checks for available updates in the background.
351func (app *App) checkForUpdates(ctx context.Context) {
352	// Use a timeout to avoid hanging indefinitely.
353	checkCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
354	defer cancel()
355
356	// Check for updates asynchronously.
357	updateCh := update.CheckForUpdateAsync(checkCtx, app.config.Options.DataDirectory)
358
359	select {
360	case info := <-updateCh:
361		if info != nil && info.Available {
362			// Send update notification through the event system.
363			app.events <- pubsub.UpdateAvailableMsg{
364				CurrentVersion: info.CurrentVersion,
365				LatestVersion:  info.LatestVersion,
366			}
367		}
368	case <-checkCtx.Done():
369		// Timeout or context cancelled.
370		return
371	}
372}