1// Package app wires together services, coordinates agents, and manages
2// application lifecycle.
3package app
4
5import (
6 "context"
7 "database/sql"
8 "errors"
9 "fmt"
10 "io"
11 "log/slog"
12 "os"
13 "sync"
14 "time"
15
16 tea "charm.land/bubbletea/v2"
17 "charm.land/fantasy"
18 "charm.land/lipgloss/v2"
19 "git.secluded.site/crush/internal/agent"
20 "git.secluded.site/crush/internal/agent/tools/mcp"
21 "git.secluded.site/crush/internal/config"
22 "git.secluded.site/crush/internal/csync"
23 "git.secluded.site/crush/internal/db"
24 "git.secluded.site/crush/internal/format"
25 "git.secluded.site/crush/internal/history"
26 "git.secluded.site/crush/internal/log"
27 "git.secluded.site/crush/internal/lsp"
28 "git.secluded.site/crush/internal/message"
29 "git.secluded.site/crush/internal/notification"
30 "git.secluded.site/crush/internal/permission"
31 "git.secluded.site/crush/internal/pubsub"
32 "git.secluded.site/crush/internal/session"
33 "git.secluded.site/crush/internal/term"
34 "git.secluded.site/crush/internal/tui/components/anim"
35 "git.secluded.site/crush/internal/tui/styles"
36 "github.com/charmbracelet/x/ansi"
37 "github.com/charmbracelet/x/exp/charmtone"
38)
39
40type App struct {
41 Sessions session.Service
42 Messages message.Service
43 History history.Service
44 Permissions permission.Service
45
46 AgentCoordinator agent.Coordinator
47 Notifier *notification.Notifier
48
49 LSPClients *csync.Map[string, *lsp.Client]
50
51 config *config.Config
52
53 serviceEventsWG *sync.WaitGroup
54 eventsCtx context.Context
55 events chan tea.Msg
56 tuiWG *sync.WaitGroup
57
58 // global context and cleanup functions
59 globalCtx context.Context
60 cleanupFuncs []func() error
61}
62
63// New initializes a new applcation instance.
64func New(ctx context.Context, conn *sql.DB, cfg *config.Config) (*App, error) {
65 q := db.New(conn)
66 sessions := session.NewService(q)
67 messages := message.NewService(q)
68 files := history.NewService(q, conn)
69 skipPermissionsRequests := cfg.Permissions != nil && cfg.Permissions.SkipRequests
70 allowedTools := []string{}
71 if cfg.Permissions != nil && cfg.Permissions.AllowedTools != nil {
72 allowedTools = cfg.Permissions.AllowedTools
73 }
74
75 // Enable notifications by default, disable if configured.
76 notificationsEnabled := cfg.Options == nil || !cfg.Options.DisableNotifications
77
78 notifier := notification.New(notificationsEnabled)
79 permissions := permission.NewPermissionService(ctx, cfg.WorkingDir(), skipPermissionsRequests, allowedTools, notifier)
80
81 app := &App{
82 Sessions: sessions,
83 Messages: messages,
84 History: files,
85 Permissions: permissions,
86 Notifier: notifier,
87 LSPClients: csync.NewMap[string, *lsp.Client](),
88
89 globalCtx: ctx,
90
91 config: cfg,
92
93 events: make(chan tea.Msg, 100),
94 serviceEventsWG: &sync.WaitGroup{},
95 tuiWG: &sync.WaitGroup{},
96 }
97
98 app.setupEvents()
99
100 // Initialize LSP clients in the background.
101 app.initLSPClients(ctx)
102
103 go func() {
104 slog.Info("Initializing MCP clients")
105 mcp.Initialize(ctx, app.Permissions, cfg)
106 }()
107
108 // cleanup database upon app shutdown
109 app.cleanupFuncs = append(app.cleanupFuncs, conn.Close, mcp.Close)
110
111 // TODO: remove the concept of agent config, most likely.
112 if !cfg.IsConfigured() {
113 slog.Warn("No agent configuration found")
114 return app, nil
115 }
116 if err := app.InitCoderAgent(ctx); err != nil {
117 return nil, fmt.Errorf("failed to initialize coder agent: %w", err)
118 }
119 return app, nil
120}
121
122// Config returns the application configuration.
123func (app *App) Config() *config.Config {
124 return app.config
125}
126
127// RunNonInteractive runs the application in non-interactive mode with the
128// given prompt, printing to stdout.
129func (app *App) RunNonInteractive(ctx context.Context, output io.Writer, prompt string, quiet bool) error {
130 slog.Info("Running in non-interactive mode")
131
132 ctx, cancel := context.WithCancel(ctx)
133 defer cancel()
134
135 var spinner *format.Spinner
136 if !quiet {
137 t := styles.CurrentTheme()
138
139 // Detect background color to set the appropriate color for the
140 // spinner's 'Generating...' text. Without this, that text would be
141 // unreadable in light terminals.
142 hasDarkBG := true
143 if f, ok := output.(*os.File); ok {
144 hasDarkBG = lipgloss.HasDarkBackground(os.Stdin, f)
145 }
146 defaultFG := lipgloss.LightDark(hasDarkBG)(charmtone.Pepper, t.FgBase)
147
148 spinner = format.NewSpinner(ctx, cancel, anim.Settings{
149 Size: 10,
150 Label: "Generating",
151 LabelColor: defaultFG,
152 GradColorA: t.Primary,
153 GradColorB: t.Secondary,
154 CycleColors: true,
155 })
156 spinner.Start()
157 }
158
159 // Helper function to stop spinner once.
160 stopSpinner := func() {
161 if !quiet && spinner != nil {
162 spinner.Stop()
163 spinner = nil
164 }
165 }
166 defer stopSpinner()
167
168 const maxPromptLengthForTitle = 100
169 const titlePrefix = "Non-interactive: "
170 var titleSuffix string
171
172 if len(prompt) > maxPromptLengthForTitle {
173 titleSuffix = prompt[:maxPromptLengthForTitle] + "..."
174 } else {
175 titleSuffix = prompt
176 }
177 title := titlePrefix + titleSuffix
178
179 sess, err := app.Sessions.Create(ctx, title)
180 if err != nil {
181 return fmt.Errorf("failed to create session for non-interactive mode: %w", err)
182 }
183 slog.Info("Created session for non-interactive run", "session_id", sess.ID)
184
185 // Automatically approve all permission requests for this non-interactive
186 // session.
187 app.Permissions.AutoApproveSession(sess.ID)
188
189 type response struct {
190 result *fantasy.AgentResult
191 err error
192 }
193 done := make(chan response, 1)
194
195 go func(ctx context.Context, sessionID, prompt string) {
196 result, err := app.AgentCoordinator.Run(ctx, sess.ID, prompt)
197 if err != nil {
198 done <- response{
199 err: fmt.Errorf("failed to start agent processing stream: %w", err),
200 }
201 }
202 done <- response{
203 result: result,
204 }
205 }(ctx, sess.ID, prompt)
206
207 messageEvents := app.Messages.Subscribe(ctx)
208 messageReadBytes := make(map[string]int)
209 supportsProgressBar := term.SupportsProgressBar()
210
211 defer func() {
212 if supportsProgressBar {
213 _, _ = fmt.Fprintf(os.Stderr, ansi.ResetProgressBar)
214 }
215
216 // Always print a newline at the end. If output is a TTY this will
217 // prevent the prompt from overwriting the last line of output.
218 _, _ = fmt.Fprintln(output)
219 }()
220
221 for {
222 if supportsProgressBar {
223 // HACK: Reinitialize the terminal progress bar on every iteration so
224 // it doesn't get hidden by the terminal due to inactivity.
225 _, _ = fmt.Fprintf(os.Stderr, ansi.SetIndeterminateProgressBar)
226 }
227
228 select {
229 case result := <-done:
230 stopSpinner()
231 if result.err != nil {
232 if errors.Is(result.err, context.Canceled) || errors.Is(result.err, agent.ErrRequestCancelled) {
233 slog.Info("Non-interactive: agent processing cancelled", "session_id", sess.ID)
234 return nil
235 }
236 return fmt.Errorf("agent processing failed: %w", result.err)
237 }
238 return nil
239
240 case event := <-messageEvents:
241 msg := event.Payload
242 if msg.SessionID == sess.ID && msg.Role == message.Assistant && len(msg.Parts) > 0 {
243 stopSpinner()
244
245 content := msg.Content().String()
246 readBytes := messageReadBytes[msg.ID]
247
248 if len(content) < readBytes {
249 slog.Error("Non-interactive: message content is shorter than read bytes", "message_length", len(content), "read_bytes", readBytes)
250 return fmt.Errorf("message content is shorter than read bytes: %d < %d", len(content), readBytes)
251 }
252
253 part := content[readBytes:]
254 fmt.Fprint(output, part)
255 messageReadBytes[msg.ID] = len(content)
256 }
257
258 case <-ctx.Done():
259 stopSpinner()
260 return ctx.Err()
261 }
262 }
263}
264
265func (app *App) UpdateAgentModel(ctx context.Context) error {
266 return app.AgentCoordinator.UpdateModels(ctx)
267}
268
269func (app *App) setupEvents() {
270 ctx, cancel := context.WithCancel(app.globalCtx)
271 app.eventsCtx = ctx
272 setupSubscriber(ctx, app.serviceEventsWG, "sessions", app.Sessions.Subscribe, app.events)
273 setupSubscriber(ctx, app.serviceEventsWG, "messages", app.Messages.Subscribe, app.events)
274 setupSubscriber(ctx, app.serviceEventsWG, "permissions", app.Permissions.Subscribe, app.events)
275 setupSubscriber(ctx, app.serviceEventsWG, "permissions-notifications", app.Permissions.SubscribeNotifications, app.events)
276 setupSubscriber(ctx, app.serviceEventsWG, "history", app.History.Subscribe, app.events)
277 setupSubscriber(ctx, app.serviceEventsWG, "mcp", mcp.SubscribeEvents, app.events)
278 setupSubscriber(ctx, app.serviceEventsWG, "lsp", SubscribeLSPEvents, app.events)
279 cleanupFunc := func() error {
280 cancel()
281 app.serviceEventsWG.Wait()
282 return nil
283 }
284 app.cleanupFuncs = append(app.cleanupFuncs, cleanupFunc)
285}
286
287func setupSubscriber[T any](
288 ctx context.Context,
289 wg *sync.WaitGroup,
290 name string,
291 subscriber func(context.Context) <-chan pubsub.Event[T],
292 outputCh chan<- tea.Msg,
293) {
294 wg.Go(func() {
295 subCh := subscriber(ctx)
296 for {
297 select {
298 case event, ok := <-subCh:
299 if !ok {
300 slog.Debug("subscription channel closed", "name", name)
301 return
302 }
303 var msg tea.Msg = event
304 select {
305 case outputCh <- msg:
306 case <-time.After(2 * time.Second):
307 slog.Warn("message dropped due to slow consumer", "name", name)
308 case <-ctx.Done():
309 slog.Debug("subscription cancelled", "name", name)
310 return
311 }
312 case <-ctx.Done():
313 slog.Debug("subscription cancelled", "name", name)
314 return
315 }
316 }
317 })
318}
319
320func (app *App) InitCoderAgent(ctx context.Context) error {
321 coderAgentCfg := app.config.Agents[config.AgentCoder]
322 if coderAgentCfg.ID == "" {
323 return fmt.Errorf("coder agent configuration is missing")
324 }
325 var err error
326 app.AgentCoordinator, err = agent.NewCoordinator(
327 ctx,
328 app.config,
329 app.Sessions,
330 app.Messages,
331 app.Permissions,
332 app.History,
333 app.LSPClients,
334 app.Notifier,
335 )
336 if err != nil {
337 slog.Error("Failed to create coder agent", "err", err)
338 return err
339 }
340 return nil
341}
342
343// Subscribe sends events to the TUI as tea.Msgs.
344func (app *App) Subscribe(program *tea.Program) {
345 defer log.RecoverPanic("app.Subscribe", func() {
346 slog.Info("TUI subscription panic: attempting graceful shutdown")
347 program.Quit()
348 })
349
350 app.tuiWG.Add(1)
351 tuiCtx, tuiCancel := context.WithCancel(app.globalCtx)
352 app.cleanupFuncs = append(app.cleanupFuncs, func() error {
353 slog.Debug("Cancelling TUI message handler")
354 tuiCancel()
355 app.tuiWG.Wait()
356 return nil
357 })
358 defer app.tuiWG.Done()
359
360 for {
361 select {
362 case <-tuiCtx.Done():
363 slog.Debug("TUI message handler shutting down")
364 return
365 case msg, ok := <-app.events:
366 if !ok {
367 slog.Debug("TUI message channel closed")
368 return
369 }
370 program.Send(msg)
371 }
372 }
373}
374
375// Shutdown performs a graceful shutdown of the application.
376func (app *App) Shutdown() {
377 if app.AgentCoordinator != nil {
378 app.AgentCoordinator.CancelAll()
379 }
380
381 // Shutdown all LSP clients.
382 for name, client := range app.LSPClients.Seq2() {
383 shutdownCtx, cancel := context.WithTimeout(app.globalCtx, 5*time.Second)
384 if err := client.Close(shutdownCtx); err != nil {
385 slog.Error("Failed to shutdown LSP client", "name", name, "error", err)
386 }
387 cancel()
388 }
389
390 // Call call cleanup functions.
391 for _, cleanup := range app.cleanupFuncs {
392 if cleanup != nil {
393 if err := cleanup(); err != nil {
394 slog.Error("Failed to cleanup app properly on shutdown", "error", err)
395 }
396 }
397 }
398}