app.go

  1// Package app wires together services, coordinates agents, and manages
  2// application lifecycle.
  3package app
  4
  5import (
  6	"context"
  7	"database/sql"
  8	"errors"
  9	"fmt"
 10	"io"
 11	"log/slog"
 12	"os"
 13	"strings"
 14	"sync"
 15	"time"
 16
 17	tea "charm.land/bubbletea/v2"
 18	"charm.land/fantasy"
 19	"charm.land/lipgloss/v2"
 20	"github.com/charmbracelet/crush/internal/agent"
 21	"github.com/charmbracelet/crush/internal/agent/tools/mcp"
 22	"github.com/charmbracelet/crush/internal/config"
 23	"github.com/charmbracelet/crush/internal/csync"
 24	"github.com/charmbracelet/crush/internal/db"
 25	"github.com/charmbracelet/crush/internal/format"
 26	"github.com/charmbracelet/crush/internal/history"
 27	"github.com/charmbracelet/crush/internal/log"
 28	"github.com/charmbracelet/crush/internal/lsp"
 29	"github.com/charmbracelet/crush/internal/message"
 30	"github.com/charmbracelet/crush/internal/permission"
 31	"github.com/charmbracelet/crush/internal/pubsub"
 32	"github.com/charmbracelet/crush/internal/session"
 33	"github.com/charmbracelet/crush/internal/shell"
 34	"github.com/charmbracelet/crush/internal/tui/components/anim"
 35	"github.com/charmbracelet/crush/internal/tui/styles"
 36	"github.com/charmbracelet/crush/internal/update"
 37	"github.com/charmbracelet/crush/internal/version"
 38	"github.com/charmbracelet/x/ansi"
 39	"github.com/charmbracelet/x/exp/charmtone"
 40	"github.com/charmbracelet/x/term"
 41)
 42
 43type App struct {
 44	Sessions    session.Service
 45	Messages    message.Service
 46	History     history.Service
 47	Permissions permission.Service
 48
 49	AgentCoordinator agent.Coordinator
 50
 51	LSPClients *csync.Map[string, *lsp.Client]
 52
 53	config *config.Config
 54
 55	serviceEventsWG *sync.WaitGroup
 56	eventsCtx       context.Context
 57	events          chan tea.Msg
 58	tuiWG           *sync.WaitGroup
 59
 60	// global context and cleanup functions
 61	globalCtx    context.Context
 62	cleanupFuncs []func() error
 63}
 64
 65// New initializes a new application instance.
 66func New(ctx context.Context, conn *sql.DB, cfg *config.Config) (*App, error) {
 67	q := db.New(conn)
 68	sessions := session.NewService(q)
 69	messages := message.NewService(q)
 70	files := history.NewService(q, conn)
 71	skipPermissionsRequests := cfg.Permissions != nil && cfg.Permissions.SkipRequests
 72	var allowedTools []string
 73	if cfg.Permissions != nil && cfg.Permissions.AllowedTools != nil {
 74		allowedTools = cfg.Permissions.AllowedTools
 75	}
 76
 77	app := &App{
 78		Sessions:    sessions,
 79		Messages:    messages,
 80		History:     files,
 81		Permissions: permission.NewPermissionService(cfg.WorkingDir(), skipPermissionsRequests, allowedTools),
 82		LSPClients:  csync.NewMap[string, *lsp.Client](),
 83
 84		globalCtx: ctx,
 85
 86		config: cfg,
 87
 88		events:          make(chan tea.Msg, 100),
 89		serviceEventsWG: &sync.WaitGroup{},
 90		tuiWG:           &sync.WaitGroup{},
 91	}
 92
 93	app.setupEvents()
 94
 95	// Initialize LSP clients in the background.
 96	app.initLSPClients(ctx)
 97
 98	// Check for updates in the background.
 99	go app.checkForUpdates(ctx)
100
101	go func() {
102		slog.Info("Initializing MCP clients")
103		mcp.Initialize(ctx, app.Permissions, cfg)
104	}()
105
106	// cleanup database upon app shutdown
107	app.cleanupFuncs = append(app.cleanupFuncs, conn.Close, mcp.Close)
108
109	// TODO: remove the concept of agent config, most likely.
110	if !cfg.IsConfigured() {
111		slog.Warn("No agent configuration found")
112		return app, nil
113	}
114	if err := app.InitCoderAgent(ctx); err != nil {
115		return nil, fmt.Errorf("failed to initialize coder agent: %w", err)
116	}
117	return app, nil
118}
119
120// Config returns the application configuration.
121func (app *App) Config() *config.Config {
122	return app.config
123}
124
125// RunNonInteractive runs the application in non-interactive mode with the
126// given prompt, printing to stdout.
127func (app *App) RunNonInteractive(ctx context.Context, output io.Writer, prompt string, quiet bool) error {
128	slog.Info("Running in non-interactive mode")
129
130	ctx, cancel := context.WithCancel(ctx)
131	defer cancel()
132
133	var (
134		spinner   *format.Spinner
135		stdoutTTY bool
136		stderrTTY bool
137		stdinTTY  bool
138	)
139
140	if f, ok := output.(*os.File); ok {
141		stdoutTTY = term.IsTerminal(f.Fd())
142	}
143	stderrTTY = term.IsTerminal(os.Stderr.Fd())
144	stdinTTY = term.IsTerminal(os.Stdin.Fd())
145
146	if !quiet && stderrTTY {
147		t := styles.CurrentTheme()
148
149		// Detect background color to set the appropriate color for the
150		// spinner's 'Generating...' text. Without this, that text would be
151		// unreadable in light terminals.
152		hasDarkBG := true
153		if f, ok := output.(*os.File); ok && stdinTTY && stdoutTTY {
154			hasDarkBG = lipgloss.HasDarkBackground(os.Stdin, f)
155		}
156		defaultFG := lipgloss.LightDark(hasDarkBG)(charmtone.Pepper, t.FgBase)
157
158		spinner = format.NewSpinner(ctx, cancel, anim.Settings{
159			Size:        10,
160			Label:       "Generating",
161			LabelColor:  defaultFG,
162			GradColorA:  t.Primary,
163			GradColorB:  t.Secondary,
164			CycleColors: true,
165		})
166		spinner.Start()
167	}
168
169	// Helper function to stop spinner once.
170	stopSpinner := func() {
171		if !quiet && spinner != nil {
172			spinner.Stop()
173			spinner = nil
174		}
175	}
176
177	// Wait for MCP initialization to complete before reading MCP tools.
178	if err := mcp.WaitForInit(ctx); err != nil {
179		return fmt.Errorf("failed to wait for MCP initialization: %w", err)
180	}
181
182	// force update of agent models before running so mcp tools are loaded
183	app.AgentCoordinator.UpdateModels(ctx)
184
185	defer stopSpinner()
186
187	const maxPromptLengthForTitle = 100
188	const titlePrefix = "Non-interactive: "
189	var titleSuffix string
190
191	if len(prompt) > maxPromptLengthForTitle {
192		titleSuffix = prompt[:maxPromptLengthForTitle] + "..."
193	} else {
194		titleSuffix = prompt
195	}
196	title := titlePrefix + titleSuffix
197
198	sess, err := app.Sessions.Create(ctx, title)
199	if err != nil {
200		return fmt.Errorf("failed to create session for non-interactive mode: %w", err)
201	}
202	slog.Info("Created session for non-interactive run", "session_id", sess.ID)
203
204	// Automatically approve all permission requests for this non-interactive
205	// session.
206	app.Permissions.AutoApproveSession(sess.ID)
207
208	type response struct {
209		result *fantasy.AgentResult
210		err    error
211	}
212	done := make(chan response, 1)
213
214	go func(ctx context.Context, sessionID, prompt string) {
215		result, err := app.AgentCoordinator.Run(ctx, sess.ID, prompt)
216		if err != nil {
217			done <- response{
218				err: fmt.Errorf("failed to start agent processing stream: %w", err),
219			}
220		}
221		done <- response{
222			result: result,
223		}
224	}(ctx, sess.ID, prompt)
225
226	messageEvents := app.Messages.Subscribe(ctx)
227	messageReadBytes := make(map[string]int)
228
229	defer func() {
230		if stderrTTY {
231			_, _ = fmt.Fprintf(os.Stderr, ansi.ResetProgressBar)
232		}
233
234		// Always print a newline at the end. If output is a TTY this will
235		// prevent the prompt from overwriting the last line of output.
236		_, _ = fmt.Fprintln(output)
237	}()
238
239	for {
240		if stderrTTY {
241			// HACK: Reinitialize the terminal progress bar on every iteration
242			// so it doesn't get hidden by the terminal due to inactivity.
243			_, _ = fmt.Fprintf(os.Stderr, ansi.SetIndeterminateProgressBar)
244		}
245
246		select {
247		case result := <-done:
248			stopSpinner()
249			if result.err != nil {
250				if errors.Is(result.err, context.Canceled) || errors.Is(result.err, agent.ErrRequestCancelled) {
251					slog.Info("Non-interactive: agent processing cancelled", "session_id", sess.ID)
252					return nil
253				}
254				return fmt.Errorf("agent processing failed: %w", result.err)
255			}
256			return nil
257
258		case event := <-messageEvents:
259			msg := event.Payload
260			if msg.SessionID == sess.ID && msg.Role == message.Assistant && len(msg.Parts) > 0 {
261				stopSpinner()
262
263				content := msg.Content().String()
264				readBytes := messageReadBytes[msg.ID]
265
266				if len(content) < readBytes {
267					slog.Error("Non-interactive: message content is shorter than read bytes", "message_length", len(content), "read_bytes", readBytes)
268					return fmt.Errorf("message content is shorter than read bytes: %d < %d", len(content), readBytes)
269				}
270
271				part := content[readBytes:]
272				// Trim leading whitespace. Sometimes the LLM includes leading
273				// formatting and intentation, which we don't want here.
274				if readBytes == 0 {
275					part = strings.TrimLeft(part, " \t")
276				}
277				fmt.Fprint(output, part)
278				messageReadBytes[msg.ID] = len(content)
279			}
280
281		case <-ctx.Done():
282			stopSpinner()
283			return ctx.Err()
284		}
285	}
286}
287
288func (app *App) UpdateAgentModel(ctx context.Context) error {
289	if app.AgentCoordinator == nil {
290		return fmt.Errorf("agent configuration is missing")
291	}
292	return app.AgentCoordinator.UpdateModels(ctx)
293}
294
295func (app *App) setupEvents() {
296	ctx, cancel := context.WithCancel(app.globalCtx)
297	app.eventsCtx = ctx
298	setupSubscriber(ctx, app.serviceEventsWG, "sessions", app.Sessions.Subscribe, app.events)
299	setupSubscriber(ctx, app.serviceEventsWG, "messages", app.Messages.Subscribe, app.events)
300	setupSubscriber(ctx, app.serviceEventsWG, "permissions", app.Permissions.Subscribe, app.events)
301	setupSubscriber(ctx, app.serviceEventsWG, "permissions-notifications", app.Permissions.SubscribeNotifications, app.events)
302	setupSubscriber(ctx, app.serviceEventsWG, "history", app.History.Subscribe, app.events)
303	setupSubscriber(ctx, app.serviceEventsWG, "mcp", mcp.SubscribeEvents, app.events)
304	setupSubscriber(ctx, app.serviceEventsWG, "lsp", SubscribeLSPEvents, app.events)
305	cleanupFunc := func() error {
306		cancel()
307		app.serviceEventsWG.Wait()
308		return nil
309	}
310	app.cleanupFuncs = append(app.cleanupFuncs, cleanupFunc)
311}
312
313func setupSubscriber[T any](
314	ctx context.Context,
315	wg *sync.WaitGroup,
316	name string,
317	subscriber func(context.Context) <-chan pubsub.Event[T],
318	outputCh chan<- tea.Msg,
319) {
320	wg.Go(func() {
321		subCh := subscriber(ctx)
322		for {
323			select {
324			case event, ok := <-subCh:
325				if !ok {
326					slog.Debug("subscription channel closed", "name", name)
327					return
328				}
329				var msg tea.Msg = event
330				select {
331				case outputCh <- msg:
332				case <-time.After(2 * time.Second):
333					slog.Warn("message dropped due to slow consumer", "name", name)
334				case <-ctx.Done():
335					slog.Debug("subscription cancelled", "name", name)
336					return
337				}
338			case <-ctx.Done():
339				slog.Debug("subscription cancelled", "name", name)
340				return
341			}
342		}
343	})
344}
345
346func (app *App) InitCoderAgent(ctx context.Context) error {
347	coderAgentCfg := app.config.Agents[config.AgentCoder]
348	if coderAgentCfg.ID == "" {
349		return fmt.Errorf("coder agent configuration is missing")
350	}
351	var err error
352	app.AgentCoordinator, err = agent.NewCoordinator(
353		ctx,
354		app.config,
355		app.Sessions,
356		app.Messages,
357		app.Permissions,
358		app.History,
359		app.LSPClients,
360	)
361	if err != nil {
362		slog.Error("Failed to create coder agent", "err", err)
363		return err
364	}
365	return nil
366}
367
368// Subscribe sends events to the TUI as tea.Msgs.
369func (app *App) Subscribe(program *tea.Program) {
370	defer log.RecoverPanic("app.Subscribe", func() {
371		slog.Info("TUI subscription panic: attempting graceful shutdown")
372		program.Quit()
373	})
374
375	app.tuiWG.Add(1)
376	tuiCtx, tuiCancel := context.WithCancel(app.globalCtx)
377	app.cleanupFuncs = append(app.cleanupFuncs, func() error {
378		slog.Debug("Cancelling TUI message handler")
379		tuiCancel()
380		app.tuiWG.Wait()
381		return nil
382	})
383	defer app.tuiWG.Done()
384
385	for {
386		select {
387		case <-tuiCtx.Done():
388			slog.Debug("TUI message handler shutting down")
389			return
390		case msg, ok := <-app.events:
391			if !ok {
392				slog.Debug("TUI message channel closed")
393				return
394			}
395			program.Send(msg)
396		}
397	}
398}
399
400// Shutdown performs a graceful shutdown of the application.
401func (app *App) Shutdown() {
402	start := time.Now()
403	defer func() { slog.Info("Shutdown took " + time.Since(start).String()) }()
404
405	// First, cancel all agents and wait for them to finish. This must complete
406	// before closing the DB so agents can finish writing their state.
407	if app.AgentCoordinator != nil {
408		app.AgentCoordinator.CancelAll()
409	}
410
411	// Now run remaining cleanup tasks in parallel.
412	var wg sync.WaitGroup
413
414	// Kill all background shells.
415	wg.Go(func() {
416		shell.GetBackgroundShellManager().KillAll()
417	})
418
419	// Shutdown all LSP clients.
420	shutdownCtx, cancel := context.WithTimeout(app.globalCtx, 5*time.Second)
421	defer cancel()
422	for name, client := range app.LSPClients.Seq2() {
423		wg.Go(func() {
424			if err := client.Close(shutdownCtx); err != nil &&
425				!errors.Is(err, io.EOF) &&
426				!errors.Is(err, context.Canceled) &&
427				err.Error() != "signal: killed" {
428				slog.Warn("Failed to shutdown LSP client", "name", name, "error", err)
429			}
430		})
431	}
432
433	// Call all cleanup functions.
434	for _, cleanup := range app.cleanupFuncs {
435		if cleanup != nil {
436			wg.Go(func() {
437				if err := cleanup(); err != nil {
438					slog.Error("Failed to cleanup app properly on shutdown", "error", err)
439				}
440			})
441		}
442	}
443	wg.Wait()
444}
445
446// checkForUpdates checks for available updates.
447func (app *App) checkForUpdates(ctx context.Context) {
448	checkCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
449	defer cancel()
450
451	info, err := update.Check(checkCtx, version.Version, update.Default)
452	if err != nil || !info.Available() {
453		return
454	}
455	app.events <- pubsub.UpdateAvailableMsg{
456		CurrentVersion: info.Current,
457		LatestVersion:  info.Latest,
458		IsDevelopment:  info.IsDevelopment(),
459	}
460}