1package agent
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "strings"
8 "sync"
9
10 "github.com/opencode-ai/opencode/internal/config"
11 "github.com/opencode-ai/opencode/internal/llm/models"
12 "github.com/opencode-ai/opencode/internal/llm/prompt"
13 "github.com/opencode-ai/opencode/internal/llm/provider"
14 "github.com/opencode-ai/opencode/internal/llm/tools"
15 "github.com/opencode-ai/opencode/internal/logging"
16 "github.com/opencode-ai/opencode/internal/message"
17 "github.com/opencode-ai/opencode/internal/permission"
18 "github.com/opencode-ai/opencode/internal/pubsub"
19 "github.com/opencode-ai/opencode/internal/session"
20)
21
// Sentinel errors surfaced by the agent service.
var (
	// ErrRequestCancelled is reported in the final event of a request whose
	// response stream ended because its context was cancelled.
	ErrRequestCancelled = errors.New("request cancelled by user")

	// ErrSessionBusy is returned when a new request is submitted for a session
	// that already has one in flight.
	ErrSessionBusy = errors.New("session is currently processing another request")
)
27
// AgentEventType names the kind of event an agent emits.
type AgentEventType string

// Event kinds carried in AgentEvent.Type.
const (
	// AgentEventTypeError marks an event whose Error field is set.
	AgentEventTypeError AgentEventType = "error"
	// AgentEventTypeResponse marks the final assistant response of a request.
	AgentEventTypeResponse AgentEventType = "response"
	// AgentEventTypeSummarize marks a progress or completion event of a
	// summarization run.
	AgentEventTypeSummarize AgentEventType = "summarize"
)
35
36type AgentEvent struct {
37 Type AgentEventType
38 Message message.Message
39 Error error
40
41 // When summarizing
42 SessionID string
43 Progress string
44 Done bool
45}
46
47type Service interface {
48 pubsub.Suscriber[AgentEvent]
49 Model() models.Model
50 Run(ctx context.Context, sessionID string, content string, attachments ...message.Attachment) (<-chan AgentEvent, error)
51 Cancel(sessionID string)
52 IsSessionBusy(sessionID string) bool
53 IsBusy() bool
54 Update(agentName config.AgentName, modelID models.ModelID) (models.Model, error)
55 Summarize(ctx context.Context, sessionID string) error
56}
57
58type agent struct {
59 *pubsub.Broker[AgentEvent]
60 sessions session.Service
61 messages message.Service
62
63 tools []tools.BaseTool
64 provider provider.Provider
65
66 titleProvider provider.Provider
67 summarizeProvider provider.Provider
68
69 activeRequests sync.Map
70}
71
72func NewAgent(
73 agentName config.AgentName,
74 sessions session.Service,
75 messages message.Service,
76 agentTools []tools.BaseTool,
77) (Service, error) {
78 agentProvider, err := createAgentProvider(agentName)
79 if err != nil {
80 return nil, err
81 }
82 var titleProvider provider.Provider
83 // Only generate titles for the coder agent
84 if agentName == config.AgentCoder {
85 titleProvider, err = createAgentProvider(config.AgentTitle)
86 if err != nil {
87 return nil, err
88 }
89 }
90 var summarizeProvider provider.Provider
91 if agentName == config.AgentCoder {
92 summarizeProvider, err = createAgentProvider(config.AgentSummarizer)
93 if err != nil {
94 return nil, err
95 }
96 }
97
98 agent := &agent{
99 Broker: pubsub.NewBroker[AgentEvent](),
100 provider: agentProvider,
101 messages: messages,
102 sessions: sessions,
103 tools: agentTools,
104 titleProvider: titleProvider,
105 summarizeProvider: summarizeProvider,
106 activeRequests: sync.Map{},
107 }
108
109 return agent, nil
110}
111
112func (a *agent) Model() models.Model {
113 return a.provider.Model()
114}
115
116func (a *agent) Cancel(sessionID string) {
117 // Cancel regular requests
118 if cancelFunc, exists := a.activeRequests.LoadAndDelete(sessionID); exists {
119 if cancel, ok := cancelFunc.(context.CancelFunc); ok {
120 logging.InfoPersist(fmt.Sprintf("Request cancellation initiated for session: %s", sessionID))
121 cancel()
122 }
123 }
124
125 // Also check for summarize requests
126 if cancelFunc, exists := a.activeRequests.LoadAndDelete(sessionID + "-summarize"); exists {
127 if cancel, ok := cancelFunc.(context.CancelFunc); ok {
128 logging.InfoPersist(fmt.Sprintf("Summarize cancellation initiated for session: %s", sessionID))
129 cancel()
130 }
131 }
132}
133
134func (a *agent) IsBusy() bool {
135 busy := false
136 a.activeRequests.Range(func(key, value interface{}) bool {
137 if cancelFunc, ok := value.(context.CancelFunc); ok {
138 if cancelFunc != nil {
139 busy = true
140 return false // Stop iterating
141 }
142 }
143 return true // Continue iterating
144 })
145 return busy
146}
147
148func (a *agent) IsSessionBusy(sessionID string) bool {
149 _, busy := a.activeRequests.Load(sessionID)
150 return busy
151}
152
153func (a *agent) generateTitle(ctx context.Context, sessionID string, content string) error {
154 if content == "" {
155 return nil
156 }
157 if a.titleProvider == nil {
158 return nil
159 }
160 session, err := a.sessions.Get(ctx, sessionID)
161 if err != nil {
162 return err
163 }
164 parts := []message.ContentPart{message.TextContent{Text: content}}
165 response, err := a.titleProvider.SendMessages(
166 ctx,
167 []message.Message{
168 {
169 Role: message.User,
170 Parts: parts,
171 },
172 },
173 make([]tools.BaseTool, 0),
174 )
175 if err != nil {
176 return err
177 }
178
179 title := strings.TrimSpace(strings.ReplaceAll(response.Content, "\n", " "))
180 if title == "" {
181 return nil
182 }
183
184 session.Title = title
185 _, err = a.sessions.Save(ctx, session)
186 return err
187}
188
189func (a *agent) err(err error) AgentEvent {
190 return AgentEvent{
191 Type: AgentEventTypeError,
192 Error: err,
193 }
194}
195
196func (a *agent) Run(ctx context.Context, sessionID string, content string, attachments ...message.Attachment) (<-chan AgentEvent, error) {
197 if !a.provider.Model().SupportsAttachments && attachments != nil {
198 attachments = nil
199 }
200 events := make(chan AgentEvent)
201 if a.IsSessionBusy(sessionID) {
202 return nil, ErrSessionBusy
203 }
204
205 genCtx, cancel := context.WithCancel(ctx)
206
207 a.activeRequests.Store(sessionID, cancel)
208 go func() {
209 logging.Debug("Request started", "sessionID", sessionID)
210 defer logging.RecoverPanic("agent.Run", func() {
211 events <- a.err(fmt.Errorf("panic while running the agent"))
212 })
213 var attachmentParts []message.ContentPart
214 for _, attachment := range attachments {
215 attachmentParts = append(attachmentParts, message.BinaryContent{Path: attachment.FilePath, MIMEType: attachment.MimeType, Data: attachment.Content})
216 }
217 result := a.processGeneration(genCtx, sessionID, content, attachmentParts)
218 if result.Error != nil && !errors.Is(result.Error, ErrRequestCancelled) && !errors.Is(result.Error, context.Canceled) {
219 logging.ErrorPersist(result.Error.Error())
220 }
221 logging.Debug("Request completed", "sessionID", sessionID)
222 a.activeRequests.Delete(sessionID)
223 cancel()
224 a.Publish(pubsub.CreatedEvent, result)
225 events <- result
226 close(events)
227 }()
228 return events, nil
229}
230
231func (a *agent) processGeneration(ctx context.Context, sessionID, content string, attachmentParts []message.ContentPart) AgentEvent {
232 // List existing messages; if none, start title generation asynchronously.
233 msgs, err := a.messages.List(ctx, sessionID)
234 if err != nil {
235 return a.err(fmt.Errorf("failed to list messages: %w", err))
236 }
237 if len(msgs) == 0 {
238 go func() {
239 defer logging.RecoverPanic("agent.Run", func() {
240 logging.ErrorPersist("panic while generating title")
241 })
242 titleErr := a.generateTitle(context.Background(), sessionID, content)
243 if titleErr != nil {
244 logging.ErrorPersist(fmt.Sprintf("failed to generate title: %v", titleErr))
245 }
246 }()
247 }
248
249 userMsg, err := a.createUserMessage(ctx, sessionID, content, attachmentParts)
250 if err != nil {
251 return a.err(fmt.Errorf("failed to create user message: %w", err))
252 }
253 // Append the new user message to the conversation history.
254 msgHistory := append(msgs, userMsg)
255
256 for {
257 // Check for cancellation before each iteration
258 select {
259 case <-ctx.Done():
260 return a.err(ctx.Err())
261 default:
262 // Continue processing
263 }
264 agentMessage, toolResults, err := a.streamAndHandleEvents(ctx, sessionID, msgHistory)
265 if err != nil {
266 if errors.Is(err, context.Canceled) {
267 agentMessage.AddFinish(message.FinishReasonCanceled)
268 a.messages.Update(context.Background(), agentMessage)
269 return a.err(ErrRequestCancelled)
270 }
271 return a.err(fmt.Errorf("failed to process events: %w", err))
272 }
273 logging.Info("Result", "message", agentMessage.FinishReason(), "toolResults", toolResults)
274 if (agentMessage.FinishReason() == message.FinishReasonToolUse) && toolResults != nil {
275 // We are not done, we need to respond with the tool response
276 msgHistory = append(msgHistory, agentMessage, *toolResults)
277 continue
278 }
279 return AgentEvent{
280 Type: AgentEventTypeResponse,
281 Message: agentMessage,
282 Done: true,
283 }
284 }
285}
286
287func (a *agent) createUserMessage(ctx context.Context, sessionID, content string, attachmentParts []message.ContentPart) (message.Message, error) {
288 parts := []message.ContentPart{message.TextContent{Text: content}}
289 parts = append(parts, attachmentParts...)
290 return a.messages.Create(ctx, sessionID, message.CreateMessageParams{
291 Role: message.User,
292 Parts: parts,
293 })
294}
295
296func (a *agent) streamAndHandleEvents(ctx context.Context, sessionID string, msgHistory []message.Message) (message.Message, *message.Message, error) {
297 eventChan := a.provider.StreamResponse(ctx, msgHistory, a.tools)
298
299 assistantMsg, err := a.messages.Create(ctx, sessionID, message.CreateMessageParams{
300 Role: message.Assistant,
301 Parts: []message.ContentPart{},
302 Model: a.provider.Model().ID,
303 })
304 if err != nil {
305 return assistantMsg, nil, fmt.Errorf("failed to create assistant message: %w", err)
306 }
307
308 // Add the session and message ID into the context if needed by tools.
309 ctx = context.WithValue(ctx, tools.MessageIDContextKey, assistantMsg.ID)
310 ctx = context.WithValue(ctx, tools.SessionIDContextKey, sessionID)
311
312 // Process each event in the stream.
313 for event := range eventChan {
314 if processErr := a.processEvent(ctx, sessionID, &assistantMsg, event); processErr != nil {
315 a.finishMessage(ctx, &assistantMsg, message.FinishReasonCanceled)
316 return assistantMsg, nil, processErr
317 }
318 if ctx.Err() != nil {
319 a.finishMessage(context.Background(), &assistantMsg, message.FinishReasonCanceled)
320 return assistantMsg, nil, ctx.Err()
321 }
322 }
323
324 toolResults := make([]message.ToolResult, len(assistantMsg.ToolCalls()))
325 toolCalls := assistantMsg.ToolCalls()
326 for i, toolCall := range toolCalls {
327 select {
328 case <-ctx.Done():
329 a.finishMessage(context.Background(), &assistantMsg, message.FinishReasonCanceled)
330 // Make all future tool calls cancelled
331 for j := i; j < len(toolCalls); j++ {
332 toolResults[j] = message.ToolResult{
333 ToolCallID: toolCalls[j].ID,
334 Content: "Tool execution canceled by user",
335 IsError: true,
336 }
337 }
338 goto out
339 default:
340 // Continue processing
341 var tool tools.BaseTool
342 for _, availableTools := range a.tools {
343 if availableTools.Info().Name == toolCall.Name {
344 tool = availableTools
345 }
346 }
347
348 // Tool not found
349 if tool == nil {
350 toolResults[i] = message.ToolResult{
351 ToolCallID: toolCall.ID,
352 Content: fmt.Sprintf("Tool not found: %s", toolCall.Name),
353 IsError: true,
354 }
355 continue
356 }
357 toolResult, toolErr := tool.Run(ctx, tools.ToolCall{
358 ID: toolCall.ID,
359 Name: toolCall.Name,
360 Input: toolCall.Input,
361 })
362 if toolErr != nil {
363 if errors.Is(toolErr, permission.ErrorPermissionDenied) {
364 toolResults[i] = message.ToolResult{
365 ToolCallID: toolCall.ID,
366 Content: "Permission denied",
367 IsError: true,
368 }
369 for j := i + 1; j < len(toolCalls); j++ {
370 toolResults[j] = message.ToolResult{
371 ToolCallID: toolCalls[j].ID,
372 Content: "Tool execution canceled by user",
373 IsError: true,
374 }
375 }
376 a.finishMessage(ctx, &assistantMsg, message.FinishReasonPermissionDenied)
377 break
378 }
379 }
380 toolResults[i] = message.ToolResult{
381 ToolCallID: toolCall.ID,
382 Content: toolResult.Content,
383 Metadata: toolResult.Metadata,
384 IsError: toolResult.IsError,
385 }
386 }
387 }
388out:
389 if len(toolResults) == 0 {
390 return assistantMsg, nil, nil
391 }
392 parts := make([]message.ContentPart, 0)
393 for _, tr := range toolResults {
394 parts = append(parts, tr)
395 }
396 msg, err := a.messages.Create(context.Background(), assistantMsg.SessionID, message.CreateMessageParams{
397 Role: message.Tool,
398 Parts: parts,
399 })
400 if err != nil {
401 return assistantMsg, nil, fmt.Errorf("failed to create cancelled tool message: %w", err)
402 }
403
404 return assistantMsg, &msg, err
405}
406
407func (a *agent) finishMessage(ctx context.Context, msg *message.Message, finishReson message.FinishReason) {
408 msg.AddFinish(finishReson)
409 _ = a.messages.Update(ctx, *msg)
410}
411
412func (a *agent) processEvent(ctx context.Context, sessionID string, assistantMsg *message.Message, event provider.ProviderEvent) error {
413 select {
414 case <-ctx.Done():
415 return ctx.Err()
416 default:
417 // Continue processing.
418 }
419
420 switch event.Type {
421 case provider.EventThinkingDelta:
422 assistantMsg.AppendReasoningContent(event.Content)
423 return a.messages.Update(ctx, *assistantMsg)
424 case provider.EventContentDelta:
425 assistantMsg.AppendContent(event.Content)
426 return a.messages.Update(ctx, *assistantMsg)
427 case provider.EventToolUseStart:
428 assistantMsg.AddToolCall(*event.ToolCall)
429 return a.messages.Update(ctx, *assistantMsg)
430 // TODO: see how to handle this
431 // case provider.EventToolUseDelta:
432 // tm := time.Unix(assistantMsg.UpdatedAt, 0)
433 // assistantMsg.AppendToolCallInput(event.ToolCall.ID, event.ToolCall.Input)
434 // if time.Since(tm) > 1000*time.Millisecond {
435 // err := a.messages.Update(ctx, *assistantMsg)
436 // assistantMsg.UpdatedAt = time.Now().Unix()
437 // return err
438 // }
439 case provider.EventToolUseStop:
440 assistantMsg.FinishToolCall(event.ToolCall.ID)
441 return a.messages.Update(ctx, *assistantMsg)
442 case provider.EventError:
443 if errors.Is(event.Error, context.Canceled) {
444 logging.InfoPersist(fmt.Sprintf("Event processing canceled for session: %s", sessionID))
445 return context.Canceled
446 }
447 logging.ErrorPersist(event.Error.Error())
448 return event.Error
449 case provider.EventComplete:
450 assistantMsg.SetToolCalls(event.Response.ToolCalls)
451 assistantMsg.AddFinish(event.Response.FinishReason)
452 if err := a.messages.Update(ctx, *assistantMsg); err != nil {
453 return fmt.Errorf("failed to update message: %w", err)
454 }
455 return a.TrackUsage(ctx, sessionID, a.provider.Model(), event.Response.Usage)
456 }
457
458 return nil
459}
460
461func (a *agent) TrackUsage(ctx context.Context, sessionID string, model models.Model, usage provider.TokenUsage) error {
462 sess, err := a.sessions.Get(ctx, sessionID)
463 if err != nil {
464 return fmt.Errorf("failed to get session: %w", err)
465 }
466
467 cost := model.CostPer1MInCached/1e6*float64(usage.CacheCreationTokens) +
468 model.CostPer1MOutCached/1e6*float64(usage.CacheReadTokens) +
469 model.CostPer1MIn/1e6*float64(usage.InputTokens) +
470 model.CostPer1MOut/1e6*float64(usage.OutputTokens)
471
472 sess.Cost += cost
473 sess.CompletionTokens = usage.OutputTokens + usage.CacheReadTokens
474 sess.PromptTokens = usage.InputTokens + usage.CacheCreationTokens
475
476 _, err = a.sessions.Save(ctx, sess)
477 if err != nil {
478 return fmt.Errorf("failed to save session: %w", err)
479 }
480 return nil
481}
482
483func (a *agent) Update(agentName config.AgentName, modelID models.ModelID) (models.Model, error) {
484 if a.IsBusy() {
485 return models.Model{}, fmt.Errorf("cannot change model while processing requests")
486 }
487
488 if err := config.UpdateAgentModel(agentName, modelID); err != nil {
489 return models.Model{}, fmt.Errorf("failed to update config: %w", err)
490 }
491
492 provider, err := createAgentProvider(agentName)
493 if err != nil {
494 return models.Model{}, fmt.Errorf("failed to create provider for model %s: %w", modelID, err)
495 }
496
497 a.provider = provider
498
499 return a.provider.Model(), nil
500}
501
502func (a *agent) Summarize(ctx context.Context, sessionID string) error {
503 if a.summarizeProvider == nil {
504 return fmt.Errorf("summarize provider not available")
505 }
506
507 // Check if session is busy
508 if a.IsSessionBusy(sessionID) {
509 return ErrSessionBusy
510 }
511
512 // Create a new context with cancellation
513 summarizeCtx, cancel := context.WithCancel(ctx)
514
515 // Store the cancel function in activeRequests to allow cancellation
516 a.activeRequests.Store(sessionID+"-summarize", cancel)
517
518 go func() {
519 defer a.activeRequests.Delete(sessionID + "-summarize")
520 defer cancel()
521 event := AgentEvent{
522 Type: AgentEventTypeSummarize,
523 Progress: "Starting summarization...",
524 }
525
526 a.Publish(pubsub.CreatedEvent, event)
527 // Get all messages from the session
528 msgs, err := a.messages.List(summarizeCtx, sessionID)
529 if err != nil {
530 event = AgentEvent{
531 Type: AgentEventTypeError,
532 Error: fmt.Errorf("failed to list messages: %w", err),
533 Done: true,
534 }
535 a.Publish(pubsub.CreatedEvent, event)
536 return
537 }
538
539 if len(msgs) == 0 {
540 event = AgentEvent{
541 Type: AgentEventTypeError,
542 Error: fmt.Errorf("no messages to summarize"),
543 Done: true,
544 }
545 a.Publish(pubsub.CreatedEvent, event)
546 return
547 }
548
549 event = AgentEvent{
550 Type: AgentEventTypeSummarize,
551 Progress: "Analyzing conversation...",
552 }
553 a.Publish(pubsub.CreatedEvent, event)
554
555 // Add a system message to guide the summarization
556 summarizePrompt := "Provide a detailed but concise summary of our conversation above. Focus on information that would be helpful for continuing the conversation, including what we did, what we're doing, which files we're working on, and what we're going to do next."
557
558 // Create a new message with the summarize prompt
559 promptMsg := message.Message{
560 Role: message.User,
561 Parts: []message.ContentPart{message.TextContent{Text: summarizePrompt}},
562 }
563
564 // Append the prompt to the messages
565 msgsWithPrompt := append(msgs, promptMsg)
566
567 event = AgentEvent{
568 Type: AgentEventTypeSummarize,
569 Progress: "Generating summary...",
570 }
571
572 a.Publish(pubsub.CreatedEvent, event)
573
574 // Send the messages to the summarize provider
575 response, err := a.summarizeProvider.SendMessages(
576 summarizeCtx,
577 msgsWithPrompt,
578 make([]tools.BaseTool, 0),
579 )
580 if err != nil {
581 event = AgentEvent{
582 Type: AgentEventTypeError,
583 Error: fmt.Errorf("failed to summarize: %w", err),
584 Done: true,
585 }
586 a.Publish(pubsub.CreatedEvent, event)
587 return
588 }
589
590 summary := strings.TrimSpace(response.Content)
591 if summary == "" {
592 event = AgentEvent{
593 Type: AgentEventTypeError,
594 Error: fmt.Errorf("empty summary returned"),
595 Done: true,
596 }
597 a.Publish(pubsub.CreatedEvent, event)
598 return
599 }
600 event = AgentEvent{
601 Type: AgentEventTypeSummarize,
602 Progress: "Creating new session...",
603 }
604
605 a.Publish(pubsub.CreatedEvent, event)
606 oldSession, err := a.sessions.Get(summarizeCtx, sessionID)
607 if err != nil {
608 event = AgentEvent{
609 Type: AgentEventTypeError,
610 Error: fmt.Errorf("failed to get session: %w", err),
611 Done: true,
612 }
613
614 a.Publish(pubsub.CreatedEvent, event)
615 return
616 }
617 // Create a new session with the summary
618 newSession, err := a.sessions.Create(summarizeCtx, oldSession.Title+" - Continuation")
619 if err != nil {
620 event = AgentEvent{
621 Type: AgentEventTypeError,
622 Error: fmt.Errorf("failed to create new session: %w", err),
623 Done: true,
624 }
625 a.Publish(pubsub.CreatedEvent, event)
626 return
627 }
628
629 // Create a message in the new session with the summary
630 _, err = a.messages.Create(summarizeCtx, newSession.ID, message.CreateMessageParams{
631 Role: message.Assistant,
632 Parts: []message.ContentPart{message.TextContent{Text: summary}},
633 Model: a.summarizeProvider.Model().ID,
634 })
635 if err != nil {
636 event = AgentEvent{
637 Type: AgentEventTypeError,
638 Error: fmt.Errorf("failed to create summary message: %w", err),
639 Done: true,
640 }
641
642 a.Publish(pubsub.CreatedEvent, event)
643 return
644 }
645 event = AgentEvent{
646 Type: AgentEventTypeSummarize,
647 SessionID: newSession.ID,
648 Progress: "Summary complete",
649 Done: true,
650 }
651 a.Publish(pubsub.CreatedEvent, event)
652 // Send final success event with the new session ID
653 }()
654
655 return nil
656}
657
658func createAgentProvider(agentName config.AgentName) (provider.Provider, error) {
659 cfg := config.Get()
660 agentConfig, ok := cfg.Agents[agentName]
661 if !ok {
662 return nil, fmt.Errorf("agent %s not found", agentName)
663 }
664 model, ok := models.SupportedModels[agentConfig.Model]
665 if !ok {
666 return nil, fmt.Errorf("model %s not supported", agentConfig.Model)
667 }
668
669 providerCfg, ok := cfg.Providers[model.Provider]
670 if !ok {
671 return nil, fmt.Errorf("provider %s not supported", model.Provider)
672 }
673 if providerCfg.Disabled {
674 return nil, fmt.Errorf("provider %s is not enabled", model.Provider)
675 }
676 maxTokens := model.DefaultMaxTokens
677 if agentConfig.MaxTokens > 0 {
678 maxTokens = agentConfig.MaxTokens
679 }
680 opts := []provider.ProviderClientOption{
681 provider.WithAPIKey(providerCfg.APIKey),
682 provider.WithModel(model),
683 provider.WithSystemMessage(prompt.GetAgentPrompt(agentName, model.Provider)),
684 provider.WithMaxTokens(maxTokens),
685 }
686 if model.Provider == models.ProviderOpenAI && model.CanReason {
687 opts = append(
688 opts,
689 provider.WithOpenAIOptions(
690 provider.WithReasoningEffort(agentConfig.ReasoningEffort),
691 ),
692 )
693 } else if model.Provider == models.ProviderAnthropic && model.CanReason && agentName == config.AgentCoder {
694 opts = append(
695 opts,
696 provider.WithAnthropicOptions(
697 provider.WithAnthropicShouldThinkFn(provider.DefaultShouldThinkFn),
698 ),
699 )
700 }
701 agentProvider, err := provider.NewProvider(
702 model.Provider,
703 opts...,
704 )
705 if err != nil {
706 return nil, fmt.Errorf("could not create provider: %v", err)
707 }
708
709 return agentProvider, nil
710}