1// Package app wires together services, coordinates agents, and manages
2// application lifecycle.
3package app
4
5import (
6 "context"
7 "database/sql"
8 "errors"
9 "fmt"
10 "io"
11 "log/slog"
12 "os"
13 "strings"
14 "sync"
15 "time"
16
17 tea "charm.land/bubbletea/v2"
18 "charm.land/fantasy"
19 "charm.land/lipgloss/v2"
20 "github.com/charmbracelet/crush/internal/agent"
21 "github.com/charmbracelet/crush/internal/agent/tools/mcp"
22 "github.com/charmbracelet/crush/internal/config"
23 "github.com/charmbracelet/crush/internal/csync"
24 "github.com/charmbracelet/crush/internal/db"
25 "github.com/charmbracelet/crush/internal/format"
26 "github.com/charmbracelet/crush/internal/history"
27 "github.com/charmbracelet/crush/internal/log"
28 "github.com/charmbracelet/crush/internal/lsp"
29 "github.com/charmbracelet/crush/internal/message"
30 "github.com/charmbracelet/crush/internal/permission"
31 "github.com/charmbracelet/crush/internal/pubsub"
32 "github.com/charmbracelet/crush/internal/session"
33 "github.com/charmbracelet/crush/internal/shell"
34 "github.com/charmbracelet/crush/internal/tui/components/anim"
35 "github.com/charmbracelet/crush/internal/tui/styles"
36 "github.com/charmbracelet/crush/internal/update"
37 "github.com/charmbracelet/crush/internal/version"
38 "github.com/charmbracelet/x/ansi"
39 "github.com/charmbracelet/x/exp/charmtone"
40 "github.com/charmbracelet/x/term"
41)
42
43// UpdateAvailableMsg is sent when a new version is available.
44type UpdateAvailableMsg struct {
45 CurrentVersion string
46 LatestVersion string
47 IsDevelopment bool
48}
49
50type App struct {
51 Sessions session.Service
52 Messages message.Service
53 History history.Service
54 Permissions permission.Service
55
56 AgentCoordinator agent.Coordinator
57
58 LSPClients *csync.Map[string, *lsp.Client]
59
60 config *config.Config
61
62 serviceEventsWG *sync.WaitGroup
63 eventsCtx context.Context
64 events chan tea.Msg
65 tuiWG *sync.WaitGroup
66
67 // global context and cleanup functions
68 globalCtx context.Context
69 cleanupFuncs []func() error
70}
71
72// New initializes a new application instance.
73func New(ctx context.Context, conn *sql.DB, cfg *config.Config) (*App, error) {
74 q := db.New(conn)
75 sessions := session.NewService(q)
76 messages := message.NewService(q)
77 files := history.NewService(q, conn)
78 skipPermissionsRequests := cfg.Permissions != nil && cfg.Permissions.SkipRequests
79 var allowedTools []string
80 if cfg.Permissions != nil && cfg.Permissions.AllowedTools != nil {
81 allowedTools = cfg.Permissions.AllowedTools
82 }
83
84 app := &App{
85 Sessions: sessions,
86 Messages: messages,
87 History: files,
88 Permissions: permission.NewPermissionService(cfg.WorkingDir(), skipPermissionsRequests, allowedTools),
89 LSPClients: csync.NewMap[string, *lsp.Client](),
90
91 globalCtx: ctx,
92
93 config: cfg,
94
95 events: make(chan tea.Msg, 100),
96 serviceEventsWG: &sync.WaitGroup{},
97 tuiWG: &sync.WaitGroup{},
98 }
99
100 app.setupEvents()
101
102 // Initialize LSP clients in the background.
103 app.initLSPClients(ctx)
104
105 // Check for updates in the background.
106 go app.checkForUpdates(ctx)
107
108 go func() {
109 slog.Info("Initializing MCP clients")
110 mcp.Initialize(ctx, app.Permissions, cfg)
111 }()
112
113 // cleanup database upon app shutdown
114 app.cleanupFuncs = append(app.cleanupFuncs, conn.Close, mcp.Close)
115
116 // TODO: remove the concept of agent config, most likely.
117 if !cfg.IsConfigured() {
118 slog.Warn("No agent configuration found")
119 return app, nil
120 }
121 if err := app.InitCoderAgent(ctx); err != nil {
122 return nil, fmt.Errorf("failed to initialize coder agent: %w", err)
123 }
124 return app, nil
125}
126
127// Config returns the application configuration.
128func (app *App) Config() *config.Config {
129 return app.config
130}
131
132// RunNonInteractive runs the application in non-interactive mode with the
133// given prompt, printing to stdout.
134func (app *App) RunNonInteractive(ctx context.Context, output io.Writer, prompt string, quiet bool) error {
135 slog.Info("Running in non-interactive mode")
136
137 ctx, cancel := context.WithCancel(ctx)
138 defer cancel()
139
140 var (
141 spinner *format.Spinner
142 stdoutTTY bool
143 stderrTTY bool
144 stdinTTY bool
145 )
146
147 if f, ok := output.(*os.File); ok {
148 stdoutTTY = term.IsTerminal(f.Fd())
149 }
150 stderrTTY = term.IsTerminal(os.Stderr.Fd())
151 stdinTTY = term.IsTerminal(os.Stdin.Fd())
152
153 if !quiet && stderrTTY {
154 t := styles.CurrentTheme()
155
156 // Detect background color to set the appropriate color for the
157 // spinner's 'Generating...' text. Without this, that text would be
158 // unreadable in light terminals.
159 hasDarkBG := true
160 if f, ok := output.(*os.File); ok && stdinTTY && stdoutTTY {
161 hasDarkBG = lipgloss.HasDarkBackground(os.Stdin, f)
162 }
163 defaultFG := lipgloss.LightDark(hasDarkBG)(charmtone.Pepper, t.FgBase)
164
165 spinner = format.NewSpinner(ctx, cancel, anim.Settings{
166 Size: 10,
167 Label: "Generating",
168 LabelColor: defaultFG,
169 GradColorA: t.Primary,
170 GradColorB: t.Secondary,
171 CycleColors: true,
172 })
173 spinner.Start()
174 }
175
176 // Helper function to stop spinner once.
177 stopSpinner := func() {
178 if !quiet && spinner != nil {
179 spinner.Stop()
180 spinner = nil
181 }
182 }
183
184 // Wait for MCP initialization to complete before reading MCP tools.
185 if err := mcp.WaitForInit(ctx); err != nil {
186 return fmt.Errorf("failed to wait for MCP initialization: %w", err)
187 }
188
189 // force update of agent models before running so mcp tools are loaded
190 app.AgentCoordinator.UpdateModels(ctx)
191
192 defer stopSpinner()
193
194 const maxPromptLengthForTitle = 100
195 const titlePrefix = "Non-interactive: "
196 var titleSuffix string
197
198 if len(prompt) > maxPromptLengthForTitle {
199 titleSuffix = prompt[:maxPromptLengthForTitle] + "..."
200 } else {
201 titleSuffix = prompt
202 }
203 title := titlePrefix + titleSuffix
204
205 sess, err := app.Sessions.Create(ctx, title)
206 if err != nil {
207 return fmt.Errorf("failed to create session for non-interactive mode: %w", err)
208 }
209 slog.Info("Created session for non-interactive run", "session_id", sess.ID)
210
211 // Automatically approve all permission requests for this non-interactive
212 // session.
213 app.Permissions.AutoApproveSession(sess.ID)
214
215 type response struct {
216 result *fantasy.AgentResult
217 err error
218 }
219 done := make(chan response, 1)
220
221 go func(ctx context.Context, sessionID, prompt string) {
222 result, err := app.AgentCoordinator.Run(ctx, sess.ID, prompt)
223 if err != nil {
224 done <- response{
225 err: fmt.Errorf("failed to start agent processing stream: %w", err),
226 }
227 }
228 done <- response{
229 result: result,
230 }
231 }(ctx, sess.ID, prompt)
232
233 messageEvents := app.Messages.Subscribe(ctx)
234 messageReadBytes := make(map[string]int)
235
236 defer func() {
237 if stderrTTY {
238 _, _ = fmt.Fprintf(os.Stderr, ansi.ResetProgressBar)
239 }
240
241 // Always print a newline at the end. If output is a TTY this will
242 // prevent the prompt from overwriting the last line of output.
243 _, _ = fmt.Fprintln(output)
244 }()
245
246 for {
247 if stderrTTY {
248 // HACK: Reinitialize the terminal progress bar on every iteration
249 // so it doesn't get hidden by the terminal due to inactivity.
250 _, _ = fmt.Fprintf(os.Stderr, ansi.SetIndeterminateProgressBar)
251 }
252
253 select {
254 case result := <-done:
255 stopSpinner()
256 if result.err != nil {
257 if errors.Is(result.err, context.Canceled) || errors.Is(result.err, agent.ErrRequestCancelled) {
258 slog.Info("Non-interactive: agent processing cancelled", "session_id", sess.ID)
259 return nil
260 }
261 return fmt.Errorf("agent processing failed: %w", result.err)
262 }
263 return nil
264
265 case event := <-messageEvents:
266 msg := event.Payload
267 if msg.SessionID == sess.ID && msg.Role == message.Assistant && len(msg.Parts) > 0 {
268 stopSpinner()
269
270 content := msg.Content().String()
271 readBytes := messageReadBytes[msg.ID]
272
273 if len(content) < readBytes {
274 slog.Error("Non-interactive: message content is shorter than read bytes", "message_length", len(content), "read_bytes", readBytes)
275 return fmt.Errorf("message content is shorter than read bytes: %d < %d", len(content), readBytes)
276 }
277
278 part := content[readBytes:]
279 // Trim leading whitespace. Sometimes the LLM includes leading
280 // formatting and intentation, which we don't want here.
281 if readBytes == 0 {
282 part = strings.TrimLeft(part, " \t")
283 }
284 fmt.Fprint(output, part)
285 messageReadBytes[msg.ID] = len(content)
286 }
287
288 case <-ctx.Done():
289 stopSpinner()
290 return ctx.Err()
291 }
292 }
293}
294
295func (app *App) UpdateAgentModel(ctx context.Context) error {
296 if app.AgentCoordinator == nil {
297 return fmt.Errorf("agent configuration is missing")
298 }
299 return app.AgentCoordinator.UpdateModels(ctx)
300}
301
302func (app *App) setupEvents() {
303 ctx, cancel := context.WithCancel(app.globalCtx)
304 app.eventsCtx = ctx
305 setupSubscriber(ctx, app.serviceEventsWG, "sessions", app.Sessions.Subscribe, app.events)
306 setupSubscriber(ctx, app.serviceEventsWG, "messages", app.Messages.Subscribe, app.events)
307 setupSubscriber(ctx, app.serviceEventsWG, "permissions", app.Permissions.Subscribe, app.events)
308 setupSubscriber(ctx, app.serviceEventsWG, "permissions-notifications", app.Permissions.SubscribeNotifications, app.events)
309 setupSubscriber(ctx, app.serviceEventsWG, "history", app.History.Subscribe, app.events)
310 setupSubscriber(ctx, app.serviceEventsWG, "mcp", mcp.SubscribeEvents, app.events)
311 setupSubscriber(ctx, app.serviceEventsWG, "lsp", SubscribeLSPEvents, app.events)
312 cleanupFunc := func() error {
313 cancel()
314 app.serviceEventsWG.Wait()
315 return nil
316 }
317 app.cleanupFuncs = append(app.cleanupFuncs, cleanupFunc)
318}
319
320func setupSubscriber[T any](
321 ctx context.Context,
322 wg *sync.WaitGroup,
323 name string,
324 subscriber func(context.Context) <-chan pubsub.Event[T],
325 outputCh chan<- tea.Msg,
326) {
327 wg.Go(func() {
328 subCh := subscriber(ctx)
329 for {
330 select {
331 case event, ok := <-subCh:
332 if !ok {
333 slog.Debug("subscription channel closed", "name", name)
334 return
335 }
336 var msg tea.Msg = event
337 select {
338 case outputCh <- msg:
339 case <-time.After(2 * time.Second):
340 slog.Warn("message dropped due to slow consumer", "name", name)
341 case <-ctx.Done():
342 slog.Debug("subscription cancelled", "name", name)
343 return
344 }
345 case <-ctx.Done():
346 slog.Debug("subscription cancelled", "name", name)
347 return
348 }
349 }
350 })
351}
352
353func (app *App) InitCoderAgent(ctx context.Context) error {
354 coderAgentCfg := app.config.Agents[config.AgentCoder]
355 if coderAgentCfg.ID == "" {
356 return fmt.Errorf("coder agent configuration is missing")
357 }
358 var err error
359 app.AgentCoordinator, err = agent.NewCoordinator(
360 ctx,
361 app.config,
362 app.Sessions,
363 app.Messages,
364 app.Permissions,
365 app.History,
366 app.LSPClients,
367 )
368 if err != nil {
369 slog.Error("Failed to create coder agent", "err", err)
370 return err
371 }
372 return nil
373}
374
375// Subscribe sends events to the TUI as tea.Msgs.
376func (app *App) Subscribe(program *tea.Program) {
377 defer log.RecoverPanic("app.Subscribe", func() {
378 slog.Info("TUI subscription panic: attempting graceful shutdown")
379 program.Quit()
380 })
381
382 app.tuiWG.Add(1)
383 tuiCtx, tuiCancel := context.WithCancel(app.globalCtx)
384 app.cleanupFuncs = append(app.cleanupFuncs, func() error {
385 slog.Debug("Cancelling TUI message handler")
386 tuiCancel()
387 app.tuiWG.Wait()
388 return nil
389 })
390 defer app.tuiWG.Done()
391
392 for {
393 select {
394 case <-tuiCtx.Done():
395 slog.Debug("TUI message handler shutting down")
396 return
397 case msg, ok := <-app.events:
398 if !ok {
399 slog.Debug("TUI message channel closed")
400 return
401 }
402 program.Send(msg)
403 }
404 }
405}
406
407// Shutdown performs a graceful shutdown of the application.
408func (app *App) Shutdown() {
409 start := time.Now()
410 defer func() { slog.Info("Shutdown took " + time.Since(start).String()) }()
411
412 // First, cancel all agents and wait for them to finish. This must complete
413 // before closing the DB so agents can finish writing their state.
414 if app.AgentCoordinator != nil {
415 app.AgentCoordinator.CancelAll()
416 }
417
418 // Now run remaining cleanup tasks in parallel.
419 var wg sync.WaitGroup
420
421 // Kill all background shells.
422 wg.Go(func() {
423 shell.GetBackgroundShellManager().KillAll()
424 })
425
426 // Shutdown all LSP clients.
427 shutdownCtx, cancel := context.WithTimeout(app.globalCtx, 5*time.Second)
428 defer cancel()
429 for name, client := range app.LSPClients.Seq2() {
430 wg.Go(func() {
431 if err := client.Close(shutdownCtx); err != nil &&
432 !errors.Is(err, io.EOF) &&
433 !errors.Is(err, context.Canceled) &&
434 err.Error() != "signal: killed" {
435 slog.Warn("Failed to shutdown LSP client", "name", name, "error", err)
436 }
437 })
438 }
439
440 // Call all cleanup functions.
441 for _, cleanup := range app.cleanupFuncs {
442 if cleanup != nil {
443 wg.Go(func() {
444 if err := cleanup(); err != nil {
445 slog.Error("Failed to cleanup app properly on shutdown", "error", err)
446 }
447 })
448 }
449 }
450 wg.Wait()
451}
452
453// checkForUpdates checks for available updates.
454func (app *App) checkForUpdates(ctx context.Context) {
455 checkCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
456 defer cancel()
457
458 info, err := update.Check(checkCtx, version.Version, update.Default)
459 if err != nil || !info.Available() {
460 return
461 }
462 app.events <- UpdateAvailableMsg{
463 CurrentVersion: info.Current,
464 LatestVersion: info.Latest,
465 IsDevelopment: info.IsDevelopment(),
466 }
467}