agent.go

   1package fantasy
   2
   3import (
   4	"cmp"
   5	"context"
   6	"encoding/json"
   7	"errors"
   8	"fmt"
   9	"maps"
  10	"slices"
  11	"sync"
  12
  13	"charm.land/fantasy/schema"
  14)
  15
// StepResult represents the result of a single step in an agent execution.
//
// It embeds the step's Response, so fields such as Content, FinishReason and
// Usage are accessible directly on the step. Messages holds the messages
// associated with the step; Generate fills it with the messages built from the
// step's content by toResponseMessages.
type StepResult struct {
	Response
	Messages []Message
}
  21
// stepExecutionResult encapsulates the result of executing a step with stream processing.
//
// StepResult is the completed step. ShouldContinue tells Stream whether another
// step may follow; Stream stops as soon as it is false or a stop condition is met.
type stepExecutionResult struct {
	StepResult     StepResult
	ShouldContinue bool
}
  27
// StopCondition defines a function that determines when an agent should stop executing.
// It is evaluated after each step with all steps completed so far and returns
// true to end the agent loop.
type StopCondition = func(steps []StepResult) bool
  30
  31// StepCountIs returns a stop condition that stops after the specified number of steps.
  32func StepCountIs(stepCount int) StopCondition {
  33	return func(steps []StepResult) bool {
  34		return len(steps) >= stepCount
  35	}
  36}
  37
  38// HasToolCall returns a stop condition that stops when the specified tool is called in the last step.
  39func HasToolCall(toolName string) StopCondition {
  40	return func(steps []StepResult) bool {
  41		if len(steps) == 0 {
  42			return false
  43		}
  44		lastStep := steps[len(steps)-1]
  45		toolCalls := lastStep.Content.ToolCalls()
  46		for _, toolCall := range toolCalls {
  47			if toolCall.ToolName == toolName {
  48				return true
  49			}
  50		}
  51		return false
  52	}
  53}
  54
  55// HasContent returns a stop condition that stops when the specified content type appears in the last step.
  56func HasContent(contentType ContentType) StopCondition {
  57	return func(steps []StepResult) bool {
  58		if len(steps) == 0 {
  59			return false
  60		}
  61		lastStep := steps[len(steps)-1]
  62		for _, content := range lastStep.Content {
  63			if content.GetType() == contentType {
  64				return true
  65			}
  66		}
  67		return false
  68	}
  69}
  70
  71// FinishReasonIs returns a stop condition that stops when the specified finish reason occurs.
  72func FinishReasonIs(reason FinishReason) StopCondition {
  73	return func(steps []StepResult) bool {
  74		if len(steps) == 0 {
  75			return false
  76		}
  77		lastStep := steps[len(steps)-1]
  78		return lastStep.FinishReason == reason
  79	}
  80}
  81
  82// MaxTokensUsed returns a stop condition that stops when total token usage exceeds the specified limit.
  83func MaxTokensUsed(maxTokens int64) StopCondition {
  84	return func(steps []StepResult) bool {
  85		var totalTokens int64
  86		for _, step := range steps {
  87			totalTokens += step.Usage.TotalTokens
  88		}
  89		return totalTokens >= maxTokens
  90	}
  91}
  92
// PrepareStepFunctionOptions contains the options for preparing a step in an agent execution.
//
// Steps are the steps completed so far, StepNumber is the zero-based index of
// the step about to run, Model is the agent's configured model, and Messages
// are the messages that will be sent to the model unless the prepare function
// overrides them.
type PrepareStepFunctionOptions struct {
	Steps      []StepResult
	StepNumber int
	Model      LanguageModel
	Messages   []Message
}
 100
// PrepareStepResult contains the result of preparing a step in an agent execution.
//
// Every field is an optional override applied to the upcoming step only:
//   - Model, when non-nil, replaces the model used for the step.
//   - Messages, when non-nil, replaces the step's input messages.
//   - System, when non-nil, replaces the system prompt.
//   - ToolChoice, when non-nil, replaces the default ToolChoiceAuto.
//   - ActiveTools, when non-empty, replaces the call's active tool names.
//   - DisableAllTools, when true, sends no tools to the model.
//   - Tools, when non-nil, replaces the agent's tool set.
type PrepareStepResult struct {
	Model           LanguageModel
	Messages        []Message
	System          *string
	ToolChoice      *ToolChoice
	ActiveTools     []string
	DisableAllTools bool
	Tools           []AgentTool
}
 111
// ToolCallRepairOptions contains the options for repairing a tool call.
//
// OriginalToolCall is the call that failed validation and ValidationError is
// the reason it failed. AvailableTools, SystemPrompt and Messages describe the
// step in which the call was produced.
type ToolCallRepairOptions struct {
	OriginalToolCall ToolCallContent
	ValidationError  error
	AvailableTools   []AgentTool
	SystemPrompt     string
	Messages         []Message
}
 120
type (
	// PrepareStepFunction defines a function that prepares a step in an agent execution.
	// It runs before each model call. The returned context replaces the context
	// used for the rest of the run, and a non-nil error aborts the run.
	PrepareStepFunction = func(ctx context.Context, options PrepareStepFunctionOptions) (context.Context, PrepareStepResult, error)

	// OnStepFinishedFunction defines a function that is called when a step finishes.
	OnStepFinishedFunction = func(step StepResult)

	// RepairToolCallFunction defines a function that repairs a tool call.
	// It is invoked for a tool call that failed validation. The repaired call is
	// used only if the function returns a non-nil call with a nil error and that
	// call itself passes validation; otherwise the original call is marked invalid.
	RepairToolCallFunction = func(ctx context.Context, options ToolCallRepairOptions) (*ToolCallContent, error)
)
 131
// agentSettings is the agent-wide configuration built by NewAgent from its
// AgentOption values.
//
// The sampling parameters, maxRetries, stopWhen, prepareStep, repairToolCall,
// onRetry and providerOptions act as defaults that prepareCall applies when the
// corresponding per-call value is not set. systemPrompt, tools, model and
// userAgent are read directly by the agent loop.
type agentSettings struct {
	systemPrompt     string
	maxOutputTokens  *int64
	temperature      *float64
	topP             *float64
	topK             *int64
	presencePenalty  *float64
	frequencyPenalty *float64
	headers          map[string]string
	userAgent        string
	providerOptions  ProviderOptions

	// TODO: add support for provider tools
	tools      []AgentTool
	maxRetries *int

	model LanguageModel

	stopWhen       []StopCondition
	prepareStep    PrepareStepFunction
	repairToolCall RepairToolCallFunction
	onRetry        OnRetryCallback
}
 155
// AgentCall represents a call to an agent.
//
// Pointer-typed fields left nil, an empty StopWhen, and nil PrepareStep,
// RepairToolCall and OnRetry fall back to the agent-level settings (see
// prepareCall). ProviderOptions are merged on top of the agent-level provider
// options, with the call's entries taking precedence.
//
// NOTE(review): only some fields carry JSON tags, and the func-typed fields have
// no `json:"-"` tag — confirm whether this type is meant to be (un)marshaled.
type AgentCall struct {
	Prompt           string     `json:"prompt"`
	Files            []FilePart `json:"files"`
	Messages         []Message  `json:"messages"`
	MaxOutputTokens  *int64
	Temperature      *float64 `json:"temperature"`
	TopP             *float64 `json:"top_p"`
	TopK             *int64   `json:"top_k"`
	PresencePenalty  *float64 `json:"presence_penalty"`
	FrequencyPenalty *float64 `json:"frequency_penalty"`
	ActiveTools      []string `json:"active_tools"`
	ProviderOptions  ProviderOptions
	OnRetry          OnRetryCallback
	MaxRetries       *int

	StopWhen       []StopCondition
	PrepareStep    PrepareStepFunction
	RepairToolCall RepairToolCallFunction
}
 176
// Agent-level callbacks.
//
// Stream discards the error values returned by OnAgentFinishFunc,
// OnStepStartFunc and OnStepFinishFunc; they do not abort the run.
type (
	// OnAgentStartFunc is called when agent starts.
	OnAgentStartFunc func()

	// OnAgentFinishFunc is called when agent finishes.
	OnAgentFinishFunc func(result *AgentResult) error

	// OnStepStartFunc is called when a step starts.
	// stepNumber is the zero-based index of the step.
	OnStepStartFunc func(stepNumber int) error

	// OnStepFinishFunc is called when a step finishes.
	OnStepFinishFunc func(stepResult StepResult) error

	// OnFinishFunc is called when entire agent completes.
	OnFinishFunc func(result *AgentResult)

	// OnErrorFunc is called when an error occurs.
	OnErrorFunc func(error)
)
 197
// Stream part callbacks - called for each corresponding stream part type.
//
// Each callback returns an error. NOTE(review): the code that dispatches these
// callbacks is outside this part of the file, so how a non-nil error is handled
// should be confirmed there.
type (
	// OnChunkFunc is called for each stream part (catch-all).
	OnChunkFunc func(StreamPart) error

	// OnWarningsFunc is called for warnings.
	OnWarningsFunc func(warnings []CallWarning) error

	// OnTextStartFunc is called when text starts.
	OnTextStartFunc func(id string) error

	// OnTextDeltaFunc is called for text deltas.
	OnTextDeltaFunc func(id, text string) error

	// OnTextEndFunc is called when text ends.
	OnTextEndFunc func(id string) error

	// OnReasoningStartFunc is called when reasoning starts.
	OnReasoningStartFunc func(id string, reasoning ReasoningContent) error

	// OnReasoningDeltaFunc is called for reasoning deltas.
	OnReasoningDeltaFunc func(id, text string) error

	// OnReasoningEndFunc is called when reasoning ends.
	OnReasoningEndFunc func(id string, reasoning ReasoningContent) error

	// OnToolInputStartFunc is called when tool input starts.
	OnToolInputStartFunc func(id, toolName string) error

	// OnToolInputDeltaFunc is called for tool input deltas.
	OnToolInputDeltaFunc func(id, delta string) error

	// OnToolInputEndFunc is called when tool input ends.
	OnToolInputEndFunc func(id string) error

	// OnToolCallFunc is called when tool call is complete.
	OnToolCallFunc func(toolCall ToolCallContent) error

	// OnToolResultFunc is called when tool execution completes.
	OnToolResultFunc func(result ToolResultContent) error

	// OnSourceFunc is called for source references.
	OnSourceFunc func(source SourceContent) error

	// OnStreamFinishFunc is called when stream finishes.
	OnStreamFinishFunc func(usage Usage, finishReason FinishReason, providerMetadata ProviderMetadata) error
)
 245
// AgentStreamCall represents a streaming call to an agent.
//
// The request fields mirror AgentCall and receive the same agent-level defaults
// (Stream converts them to an AgentCall and runs prepareCall). The callback
// fields are all optional; nil callbacks are skipped.
//
// NOTE(review): Headers is not read by the visible part of Stream — confirm
// whether it is consumed elsewhere.
type AgentStreamCall struct {
	Prompt           string     `json:"prompt"`
	Files            []FilePart `json:"files"`
	Messages         []Message  `json:"messages"`
	MaxOutputTokens  *int64
	Temperature      *float64 `json:"temperature"`
	TopP             *float64 `json:"top_p"`
	TopK             *int64   `json:"top_k"`
	PresencePenalty  *float64 `json:"presence_penalty"`
	FrequencyPenalty *float64 `json:"frequency_penalty"`
	ActiveTools      []string `json:"active_tools"`
	Headers          map[string]string
	ProviderOptions  ProviderOptions
	OnRetry          OnRetryCallback
	MaxRetries       *int

	StopWhen       []StopCondition
	PrepareStep    PrepareStepFunction
	RepairToolCall RepairToolCallFunction

	// Agent-level callbacks
	OnAgentStart  OnAgentStartFunc  // Called when agent starts
	OnAgentFinish OnAgentFinishFunc // Called when agent finishes
	OnStepStart   OnStepStartFunc   // Called when a step starts
	OnStepFinish  OnStepFinishFunc  // Called when a step finishes
	OnFinish      OnFinishFunc      // Called when entire agent completes
	OnError       OnErrorFunc       // Called when an error occurs

	// Stream part callbacks - called for each corresponding stream part type
	OnChunk          OnChunkFunc          // Called for each stream part (catch-all)
	OnWarnings       OnWarningsFunc       // Called for warnings
	OnTextStart      OnTextStartFunc      // Called when text starts
	OnTextDelta      OnTextDeltaFunc      // Called for text deltas
	OnTextEnd        OnTextEndFunc        // Called when text ends
	OnReasoningStart OnReasoningStartFunc // Called when reasoning starts
	OnReasoningDelta OnReasoningDeltaFunc // Called for reasoning deltas
	OnReasoningEnd   OnReasoningEndFunc   // Called when reasoning ends
	OnToolInputStart OnToolInputStartFunc // Called when tool input starts
	OnToolInputDelta OnToolInputDeltaFunc // Called for tool input deltas
	OnToolInputEnd   OnToolInputEndFunc   // Called when tool input ends
	OnToolCall       OnToolCallFunc       // Called when tool call is complete
	OnToolResult     OnToolResultFunc     // Called when tool execution completes
	OnSource         OnSourceFunc         // Called for source references
	OnStreamFinish   OnStreamFinishFunc   // Called when stream finishes
}
 292
// AgentResult represents the result of an agent execution.
//
// Steps lists every executed step in order, Response is the response of the
// last step, and TotalUsage is the usage accumulated across all steps.
type AgentResult struct {
	Steps []StepResult
	// Final response
	Response   Response
	TotalUsage Usage
}
 300
// Agent represents an AI agent that can generate responses and stream responses.
//
// Generate runs the agent loop with non-streaming model calls; Stream runs the
// same loop with streaming model calls and reports progress through the
// callbacks on AgentStreamCall. Both return the aggregated AgentResult.
type Agent interface {
	Generate(context.Context, AgentCall) (*AgentResult, error)
	Stream(context.Context, AgentStreamCall) (*AgentResult, error)
}
 306
// AgentOption defines a function that configures agent settings.
// NewAgent applies options in the order given, so later options can override
// earlier ones.
type AgentOption = func(*agentSettings)
 309
// agent is the Agent implementation returned by NewAgent. It holds the settings
// assembled from the model and options passed to NewAgent.
type agent struct {
	settings agentSettings
}
 313
 314// NewAgent creates a new agent with the given language model and options.
 315func NewAgent(model LanguageModel, opts ...AgentOption) Agent {
 316	settings := agentSettings{
 317		model: model,
 318	}
 319	for _, o := range opts {
 320		o(&settings)
 321	}
 322	return &agent{
 323		settings: settings,
 324	}
 325}
 326
 327func (a *agent) prepareCall(call AgentCall) AgentCall {
 328	call.MaxOutputTokens = cmp.Or(call.MaxOutputTokens, a.settings.maxOutputTokens)
 329	call.Temperature = cmp.Or(call.Temperature, a.settings.temperature)
 330	call.TopP = cmp.Or(call.TopP, a.settings.topP)
 331	call.TopK = cmp.Or(call.TopK, a.settings.topK)
 332	call.PresencePenalty = cmp.Or(call.PresencePenalty, a.settings.presencePenalty)
 333	call.FrequencyPenalty = cmp.Or(call.FrequencyPenalty, a.settings.frequencyPenalty)
 334	call.MaxRetries = cmp.Or(call.MaxRetries, a.settings.maxRetries)
 335
 336	if len(call.StopWhen) == 0 && len(a.settings.stopWhen) > 0 {
 337		call.StopWhen = a.settings.stopWhen
 338	}
 339	if call.PrepareStep == nil && a.settings.prepareStep != nil {
 340		call.PrepareStep = a.settings.prepareStep
 341	}
 342	if call.RepairToolCall == nil && a.settings.repairToolCall != nil {
 343		call.RepairToolCall = a.settings.repairToolCall
 344	}
 345	if call.OnRetry == nil && a.settings.onRetry != nil {
 346		call.OnRetry = a.settings.onRetry
 347	}
 348
 349	providerOptions := ProviderOptions{}
 350	if a.settings.providerOptions != nil {
 351		maps.Copy(providerOptions, a.settings.providerOptions)
 352	}
 353	if call.ProviderOptions != nil {
 354		maps.Copy(providerOptions, call.ProviderOptions)
 355	}
 356	call.ProviderOptions = providerOptions
 357
 358	headers := map[string]string{}
 359
 360	if a.settings.headers != nil {
 361		maps.Copy(headers, a.settings.headers)
 362	}
 363
 364	return call
 365}
 366
 367// Generate implements Agent.
 368func (a *agent) Generate(ctx context.Context, opts AgentCall) (*AgentResult, error) {
 369	opts = a.prepareCall(opts)
 370	initialPrompt, err := a.createPrompt(a.settings.systemPrompt, opts.Prompt, opts.Messages, opts.Files...)
 371	if err != nil {
 372		return nil, err
 373	}
 374	var responseMessages []Message
 375	var steps []StepResult
 376
 377	for {
 378		stepInputMessages := append(initialPrompt, responseMessages...)
 379		stepModel := a.settings.model
 380		stepSystemPrompt := a.settings.systemPrompt
 381		stepActiveTools := opts.ActiveTools
 382		stepToolChoice := ToolChoiceAuto
 383		disableAllTools := false
 384		stepTools := a.settings.tools
 385		if opts.PrepareStep != nil {
 386			updatedCtx, prepared, err := opts.PrepareStep(ctx, PrepareStepFunctionOptions{
 387				Model:      stepModel,
 388				Steps:      steps,
 389				StepNumber: len(steps),
 390				Messages:   stepInputMessages,
 391			})
 392			if err != nil {
 393				return nil, err
 394			}
 395
 396			ctx = updatedCtx
 397
 398			// Apply prepared step modifications
 399			if prepared.Messages != nil {
 400				stepInputMessages = prepared.Messages
 401			}
 402			if prepared.Model != nil {
 403				stepModel = prepared.Model
 404			}
 405			if prepared.System != nil {
 406				stepSystemPrompt = *prepared.System
 407			}
 408			if prepared.ToolChoice != nil {
 409				stepToolChoice = *prepared.ToolChoice
 410			}
 411			if len(prepared.ActiveTools) > 0 {
 412				stepActiveTools = prepared.ActiveTools
 413			}
 414			disableAllTools = prepared.DisableAllTools
 415			if prepared.Tools != nil {
 416				stepTools = prepared.Tools
 417			}
 418		}
 419
 420		// Recreate prompt with potentially modified system prompt
 421		if stepSystemPrompt != a.settings.systemPrompt {
 422			stepPrompt, err := a.createPrompt(stepSystemPrompt, opts.Prompt, opts.Messages, opts.Files...)
 423			if err != nil {
 424				return nil, err
 425			}
 426			// Replace system message part, keep the rest
 427			if len(stepInputMessages) > 0 && len(stepPrompt) > 0 {
 428				stepInputMessages[0] = stepPrompt[0] // Replace system message
 429			}
 430		}
 431
 432		preparedTools := a.prepareTools(stepTools, stepActiveTools, disableAllTools)
 433
 434		retryOptions := DefaultRetryOptions()
 435		if opts.MaxRetries != nil {
 436			retryOptions.MaxRetries = *opts.MaxRetries
 437		}
 438		retryOptions.OnRetry = opts.OnRetry
 439		retry := RetryWithExponentialBackoffRespectingRetryHeaders[*Response](retryOptions)
 440
 441		result, err := retry(ctx, func() (*Response, error) {
 442			return stepModel.Generate(ctx, Call{
 443				Prompt:           stepInputMessages,
 444				MaxOutputTokens:  opts.MaxOutputTokens,
 445				Temperature:      opts.Temperature,
 446				TopP:             opts.TopP,
 447				TopK:             opts.TopK,
 448				PresencePenalty:  opts.PresencePenalty,
 449				FrequencyPenalty: opts.FrequencyPenalty,
 450				Tools:            preparedTools,
 451				ToolChoice:       &stepToolChoice,
 452				UserAgent:        a.settings.userAgent,
 453				ProviderOptions:  opts.ProviderOptions,
 454			})
 455		})
 456		if err != nil {
 457			return nil, err
 458		}
 459
 460		var stepToolCalls []ToolCallContent
 461		for _, content := range result.Content {
 462			if content.GetType() == ContentTypeToolCall {
 463				toolCall, ok := AsContentType[ToolCallContent](content)
 464				if !ok {
 465					continue
 466				}
 467
 468				// Validate and potentially repair the tool call
 469				validatedToolCall := a.validateAndRepairToolCall(ctx, toolCall, stepTools, stepSystemPrompt, stepInputMessages, a.settings.repairToolCall)
 470				stepToolCalls = append(stepToolCalls, validatedToolCall)
 471			}
 472		}
 473
 474		toolResults, err := a.executeTools(ctx, stepTools, stepToolCalls, nil)
 475
 476		// Build step content with validated tool calls and tool results
 477		stepContent := []Content{}
 478		toolCallIndex := 0
 479		for _, content := range result.Content {
 480			if content.GetType() == ContentTypeToolCall {
 481				// Replace with validated tool call
 482				if toolCallIndex < len(stepToolCalls) {
 483					stepContent = append(stepContent, stepToolCalls[toolCallIndex])
 484					toolCallIndex++
 485				}
 486			} else {
 487				// Keep other content as-is
 488				stepContent = append(stepContent, content)
 489			}
 490		}
 491		// Add tool results
 492		for _, result := range toolResults {
 493			stepContent = append(stepContent, result)
 494		}
 495		currentStepMessages := toResponseMessages(stepContent)
 496		responseMessages = append(responseMessages, currentStepMessages...)
 497
 498		stepResult := StepResult{
 499			Response: Response{
 500				Content:          stepContent,
 501				FinishReason:     result.FinishReason,
 502				Usage:            result.Usage,
 503				Warnings:         result.Warnings,
 504				ProviderMetadata: result.ProviderMetadata,
 505			},
 506			Messages: currentStepMessages,
 507		}
 508		steps = append(steps, stepResult)
 509		shouldStop := isStopConditionMet(opts.StopWhen, steps)
 510
 511		if shouldStop || err != nil || len(stepToolCalls) == 0 || result.FinishReason != FinishReasonToolCalls {
 512			break
 513		}
 514	}
 515
 516	totalUsage := Usage{}
 517
 518	for _, step := range steps {
 519		usage := step.Usage
 520		totalUsage.InputTokens += usage.InputTokens
 521		totalUsage.OutputTokens += usage.OutputTokens
 522		totalUsage.ReasoningTokens += usage.ReasoningTokens
 523		totalUsage.CacheCreationTokens += usage.CacheCreationTokens
 524		totalUsage.CacheReadTokens += usage.CacheReadTokens
 525		totalUsage.TotalTokens += usage.TotalTokens
 526	}
 527
 528	agentResult := &AgentResult{
 529		Steps:      steps,
 530		Response:   steps[len(steps)-1].Response,
 531		TotalUsage: totalUsage,
 532	}
 533	return agentResult, nil
 534}
 535
 536func isStopConditionMet(conditions []StopCondition, steps []StepResult) bool {
 537	if len(conditions) == 0 {
 538		return false
 539	}
 540
 541	for _, condition := range conditions {
 542		if condition(steps) {
 543			return true
 544		}
 545	}
 546	return false
 547}
 548
 549func toResponseMessages(content []Content) []Message {
 550	var assistantParts []MessagePart
 551	var toolParts []MessagePart
 552
 553	for _, c := range content {
 554		switch c.GetType() {
 555		case ContentTypeText:
 556			text, ok := AsContentType[TextContent](c)
 557			if !ok {
 558				continue
 559			}
 560			assistantParts = append(assistantParts, TextPart{
 561				Text:            text.Text,
 562				ProviderOptions: ProviderOptions(text.ProviderMetadata),
 563			})
 564		case ContentTypeReasoning:
 565			reasoning, ok := AsContentType[ReasoningContent](c)
 566			if !ok {
 567				continue
 568			}
 569			assistantParts = append(assistantParts, ReasoningPart{
 570				Text:            reasoning.Text,
 571				ProviderOptions: ProviderOptions(reasoning.ProviderMetadata),
 572			})
 573		case ContentTypeToolCall:
 574			toolCall, ok := AsContentType[ToolCallContent](c)
 575			if !ok {
 576				continue
 577			}
 578			assistantParts = append(assistantParts, ToolCallPart{
 579				ToolCallID:       toolCall.ToolCallID,
 580				ToolName:         toolCall.ToolName,
 581				Input:            toolCall.Input,
 582				ProviderExecuted: toolCall.ProviderExecuted,
 583				ProviderOptions:  ProviderOptions(toolCall.ProviderMetadata),
 584			})
 585		case ContentTypeFile:
 586			file, ok := AsContentType[FileContent](c)
 587			if !ok {
 588				continue
 589			}
 590			assistantParts = append(assistantParts, FilePart{
 591				Data:            file.Data,
 592				MediaType:       file.MediaType,
 593				ProviderOptions: ProviderOptions(file.ProviderMetadata),
 594			})
 595		case ContentTypeSource:
 596			// Sources are metadata about references used to generate the response.
 597			// They don't need to be included in the conversation messages.
 598			continue
 599		case ContentTypeToolResult:
 600			result, ok := AsContentType[ToolResultContent](c)
 601			if !ok {
 602				continue
 603			}
 604			toolParts = append(toolParts, ToolResultPart{
 605				ToolCallID:      result.ToolCallID,
 606				Output:          result.Result,
 607				ProviderOptions: ProviderOptions(result.ProviderMetadata),
 608			})
 609		}
 610	}
 611
 612	var messages []Message
 613	if len(assistantParts) > 0 {
 614		messages = append(messages, Message{
 615			Role:    MessageRoleAssistant,
 616			Content: assistantParts,
 617		})
 618	}
 619	if len(toolParts) > 0 {
 620		messages = append(messages, Message{
 621			Role:    MessageRoleTool,
 622			Content: toolParts,
 623		})
 624	}
 625	return messages
 626}
 627
 628func (a *agent) executeTools(ctx context.Context, allTools []AgentTool, toolCalls []ToolCallContent, toolResultCallback func(result ToolResultContent) error) ([]ToolResultContent, error) {
 629	if len(toolCalls) == 0 {
 630		return nil, nil
 631	}
 632
 633	// Create a map for quick tool lookup
 634	toolMap := make(map[string]AgentTool)
 635	for _, tool := range allTools {
 636		toolMap[tool.Info().Name] = tool
 637	}
 638
 639	// Execute all tool calls sequentially in order
 640	results := make([]ToolResultContent, 0, len(toolCalls))
 641
 642	for _, toolCall := range toolCalls {
 643		result, isCriticalError := a.executeSingleTool(ctx, toolMap, toolCall, toolResultCallback)
 644		results = append(results, result)
 645		if isCriticalError {
 646			if errorResult, ok := result.Result.(ToolResultOutputContentError); ok && errorResult.Error != nil {
 647				return nil, errorResult.Error
 648			}
 649		}
 650	}
 651
 652	return results, nil
 653}
 654
 655// executeSingleTool executes a single tool and returns its result and a critical error flag.
 656func (a *agent) executeSingleTool(ctx context.Context, toolMap map[string]AgentTool, toolCall ToolCallContent, toolResultCallback func(result ToolResultContent) error) (ToolResultContent, bool) {
 657	result := ToolResultContent{
 658		ToolCallID:       toolCall.ToolCallID,
 659		ToolName:         toolCall.ToolName,
 660		ProviderExecuted: false,
 661	}
 662
 663	// Skip invalid tool calls - create error result (not critical)
 664	if toolCall.Invalid {
 665		result.Result = ToolResultOutputContentError{
 666			Error: toolCall.ValidationError,
 667		}
 668		if toolResultCallback != nil {
 669			_ = toolResultCallback(result)
 670		}
 671		return result, false
 672	}
 673
 674	tool, exists := toolMap[toolCall.ToolName]
 675	if !exists {
 676		result.Result = ToolResultOutputContentError{
 677			Error: errors.New("Error: Tool not found: " + toolCall.ToolName),
 678		}
 679		if toolResultCallback != nil {
 680			_ = toolResultCallback(result)
 681		}
 682		return result, false
 683	}
 684
 685	// Execute the tool
 686	toolResult, err := tool.Run(ctx, ToolCall{
 687		ID:    toolCall.ToolCallID,
 688		Name:  toolCall.ToolName,
 689		Input: toolCall.Input,
 690	})
 691	if err != nil {
 692		result.Result = ToolResultOutputContentError{
 693			Error: err,
 694		}
 695		result.ClientMetadata = toolResult.Metadata
 696		if toolResultCallback != nil {
 697			_ = toolResultCallback(result)
 698		}
 699		return result, true
 700	}
 701
 702	result.ClientMetadata = toolResult.Metadata
 703	if toolResult.IsError {
 704		result.Result = ToolResultOutputContentError{
 705			Error: errors.New(toolResult.Content),
 706		}
 707	} else if toolResult.Type == "image" || toolResult.Type == "media" {
 708		result.Result = ToolResultOutputContentMedia{
 709			Data:      string(toolResult.Data),
 710			MediaType: toolResult.MediaType,
 711			Text:      toolResult.Content,
 712		}
 713	} else {
 714		result.Result = ToolResultOutputContentText{
 715			Text: toolResult.Content,
 716		}
 717	}
 718	if toolResultCallback != nil {
 719		_ = toolResultCallback(result)
 720	}
 721	return result, false
 722}
 723
 724// Stream implements Agent.
 725func (a *agent) Stream(ctx context.Context, opts AgentStreamCall) (*AgentResult, error) {
 726	// Convert AgentStreamCall to AgentCall for preparation
 727	call := AgentCall{
 728		Prompt:           opts.Prompt,
 729		Files:            opts.Files,
 730		Messages:         opts.Messages,
 731		MaxOutputTokens:  opts.MaxOutputTokens,
 732		Temperature:      opts.Temperature,
 733		TopP:             opts.TopP,
 734		TopK:             opts.TopK,
 735		PresencePenalty:  opts.PresencePenalty,
 736		FrequencyPenalty: opts.FrequencyPenalty,
 737		ActiveTools:      opts.ActiveTools,
 738		ProviderOptions:  opts.ProviderOptions,
 739		MaxRetries:       opts.MaxRetries,
 740		OnRetry:          opts.OnRetry,
 741		StopWhen:         opts.StopWhen,
 742		PrepareStep:      opts.PrepareStep,
 743		RepairToolCall:   opts.RepairToolCall,
 744	}
 745
 746	call = a.prepareCall(call)
 747
 748	initialPrompt, err := a.createPrompt(a.settings.systemPrompt, call.Prompt, call.Messages, call.Files...)
 749	if err != nil {
 750		return nil, err
 751	}
 752
 753	var responseMessages []Message
 754	var steps []StepResult
 755	var totalUsage Usage
 756
 757	// Start agent stream
 758	if opts.OnAgentStart != nil {
 759		opts.OnAgentStart()
 760	}
 761
 762	for stepNumber := 0; ; stepNumber++ {
 763		stepInputMessages := append(initialPrompt, responseMessages...)
 764		stepModel := a.settings.model
 765		stepSystemPrompt := a.settings.systemPrompt
 766		stepActiveTools := call.ActiveTools
 767		stepToolChoice := ToolChoiceAuto
 768		disableAllTools := false
 769		stepTools := a.settings.tools
 770		// Apply step preparation if provided
 771		if call.PrepareStep != nil {
 772			updatedCtx, prepared, err := call.PrepareStep(ctx, PrepareStepFunctionOptions{
 773				Model:      stepModel,
 774				Steps:      steps,
 775				StepNumber: stepNumber,
 776				Messages:   stepInputMessages,
 777			})
 778			if err != nil {
 779				return nil, err
 780			}
 781
 782			ctx = updatedCtx
 783
 784			if prepared.Messages != nil {
 785				stepInputMessages = prepared.Messages
 786			}
 787			if prepared.Model != nil {
 788				stepModel = prepared.Model
 789			}
 790			if prepared.System != nil {
 791				stepSystemPrompt = *prepared.System
 792			}
 793			if prepared.ToolChoice != nil {
 794				stepToolChoice = *prepared.ToolChoice
 795			}
 796			if len(prepared.ActiveTools) > 0 {
 797				stepActiveTools = prepared.ActiveTools
 798			}
 799			disableAllTools = prepared.DisableAllTools
 800			if prepared.Tools != nil {
 801				stepTools = prepared.Tools
 802			}
 803		}
 804
 805		// Recreate prompt with potentially modified system prompt
 806		if stepSystemPrompt != a.settings.systemPrompt {
 807			stepPrompt, err := a.createPrompt(stepSystemPrompt, call.Prompt, call.Messages, call.Files...)
 808			if err != nil {
 809				return nil, err
 810			}
 811			if len(stepInputMessages) > 0 && len(stepPrompt) > 0 {
 812				stepInputMessages[0] = stepPrompt[0]
 813			}
 814		}
 815
 816		preparedTools := a.prepareTools(stepTools, stepActiveTools, disableAllTools)
 817
 818		// Start step stream
 819		if opts.OnStepStart != nil {
 820			_ = opts.OnStepStart(stepNumber)
 821		}
 822
 823		// Create streaming call
 824		streamCall := Call{
 825			Prompt:           stepInputMessages,
 826			MaxOutputTokens:  call.MaxOutputTokens,
 827			Temperature:      call.Temperature,
 828			TopP:             call.TopP,
 829			TopK:             call.TopK,
 830			PresencePenalty:  call.PresencePenalty,
 831			FrequencyPenalty: call.FrequencyPenalty,
 832			Tools:            preparedTools,
 833			ToolChoice:       &stepToolChoice,
 834			UserAgent:        a.settings.userAgent,
 835			ProviderOptions:  call.ProviderOptions,
 836		}
 837
 838		// Execute step with retry logic wrapping both stream creation and processing
 839		retryOptions := DefaultRetryOptions()
 840		if call.MaxRetries != nil {
 841			retryOptions.MaxRetries = *call.MaxRetries
 842		}
 843		retryOptions.OnRetry = call.OnRetry
 844		retry := RetryWithExponentialBackoffRespectingRetryHeaders[stepExecutionResult](retryOptions)
 845
 846		result, err := retry(ctx, func() (stepExecutionResult, error) {
 847			// Create the stream
 848			stream, err := stepModel.Stream(ctx, streamCall)
 849			if err != nil {
 850				return stepExecutionResult{}, err
 851			}
 852
 853			// Process the stream
 854			result, err := a.processStepStream(ctx, stream, opts, steps, stepTools)
 855			if err != nil {
 856				return stepExecutionResult{}, err
 857			}
 858
 859			return result, nil
 860		})
 861		if err != nil {
 862			if opts.OnError != nil {
 863				opts.OnError(err)
 864			}
 865			return nil, err
 866		}
 867
 868		steps = append(steps, result.StepResult)
 869		totalUsage = addUsage(totalUsage, result.StepResult.Usage)
 870
 871		// Call step finished callback
 872		if opts.OnStepFinish != nil {
 873			_ = opts.OnStepFinish(result.StepResult)
 874		}
 875
 876		// Add step messages to response messages
 877		stepMessages := toResponseMessages(result.StepResult.Content)
 878		responseMessages = append(responseMessages, stepMessages...)
 879
 880		// Check stop conditions
 881		shouldStop := isStopConditionMet(call.StopWhen, steps)
 882		if shouldStop || !result.ShouldContinue {
 883			break
 884		}
 885	}
 886
 887	// Finish agent stream
 888	agentResult := &AgentResult{
 889		Steps:      steps,
 890		Response:   steps[len(steps)-1].Response,
 891		TotalUsage: totalUsage,
 892	}
 893
 894	if opts.OnFinish != nil {
 895		opts.OnFinish(agentResult)
 896	}
 897
 898	if opts.OnAgentFinish != nil {
 899		_ = opts.OnAgentFinish(agentResult)
 900	}
 901
 902	return agentResult, nil
 903}
 904
 905func (a *agent) prepareTools(tools []AgentTool, activeTools []string, disableAllTools bool) []Tool {
 906	preparedTools := make([]Tool, 0, len(tools))
 907
 908	// If explicitly disabling all tools, return no tools
 909	if disableAllTools {
 910		return preparedTools
 911	}
 912
 913	for _, tool := range tools {
 914		// If activeTools has items, only include tools in the list
 915		// If activeTools is empty, include all tools
 916		if len(activeTools) > 0 && !slices.Contains(activeTools, tool.Info().Name) {
 917			continue
 918		}
 919		info := tool.Info()
 920		inputSchema := map[string]any{
 921			"type":       "object",
 922			"properties": info.Parameters,
 923			"required":   info.Required,
 924		}
 925		schema.Normalize(inputSchema)
 926		preparedTools = append(preparedTools, FunctionTool{
 927			Name:            info.Name,
 928			Description:     info.Description,
 929			InputSchema:     inputSchema,
 930			ProviderOptions: tool.ProviderOptions(),
 931		})
 932	}
 933	return preparedTools
 934}
 935
 936// validateAndRepairToolCall validates a tool call and attempts repair if validation fails.
 937func (a *agent) validateAndRepairToolCall(ctx context.Context, toolCall ToolCallContent, availableTools []AgentTool, systemPrompt string, messages []Message, repairFunc RepairToolCallFunction) ToolCallContent {
 938	if err := a.validateToolCall(toolCall, availableTools); err == nil {
 939		return toolCall
 940	} else { //nolint: revive
 941		if repairFunc != nil {
 942			repairOptions := ToolCallRepairOptions{
 943				OriginalToolCall: toolCall,
 944				ValidationError:  err,
 945				AvailableTools:   availableTools,
 946				SystemPrompt:     systemPrompt,
 947				Messages:         messages,
 948			}
 949
 950			if repairedToolCall, repairErr := repairFunc(ctx, repairOptions); repairErr == nil && repairedToolCall != nil {
 951				if validateErr := a.validateToolCall(*repairedToolCall, availableTools); validateErr == nil {
 952					return *repairedToolCall
 953				}
 954			}
 955		}
 956
 957		invalidToolCall := toolCall
 958		invalidToolCall.Invalid = true
 959		invalidToolCall.ValidationError = err
 960		return invalidToolCall
 961	}
 962}
 963
 964// validateToolCall validates a tool call against available tools and their schemas.
 965func (a *agent) validateToolCall(toolCall ToolCallContent, availableTools []AgentTool) error {
 966	var tool AgentTool
 967	for _, t := range availableTools {
 968		if t.Info().Name == toolCall.ToolName {
 969			tool = t
 970			break
 971		}
 972	}
 973
 974	if tool == nil {
 975		return fmt.Errorf("tool not found: %s", toolCall.ToolName)
 976	}
 977
 978	// Validate JSON parsing
 979	var input map[string]any
 980	if err := json.Unmarshal([]byte(toolCall.Input), &input); err != nil {
 981		return fmt.Errorf("invalid JSON input: %w", err)
 982	}
 983
 984	// Basic schema validation (check required fields)
 985	// TODO: more robust schema validation using JSON Schema or similar
 986	toolInfo := tool.Info()
 987	for _, required := range toolInfo.Required {
 988		if _, exists := input[required]; !exists {
 989			return fmt.Errorf("missing required parameter: %s", required)
 990		}
 991	}
 992	return nil
 993}
 994
 995func (a *agent) createPrompt(system, prompt string, messages []Message, files ...FilePart) (Prompt, error) {
 996	if prompt == "" {
 997		return nil, &Error{Title: "invalid argument", Message: "prompt can't be empty"}
 998	}
 999
1000	var preparedPrompt Prompt
1001
1002	if system != "" {
1003		preparedPrompt = append(preparedPrompt, NewSystemMessage(system))
1004	}
1005	preparedPrompt = append(preparedPrompt, messages...)
1006	preparedPrompt = append(preparedPrompt, NewUserMessage(prompt, files...))
1007	return preparedPrompt, nil
1008}
1009
1010// WithSystemPrompt sets the system prompt for the agent.
1011func WithSystemPrompt(prompt string) AgentOption {
1012	return func(s *agentSettings) {
1013		s.systemPrompt = prompt
1014	}
1015}
1016
1017// WithMaxOutputTokens sets the maximum output tokens for the agent.
1018func WithMaxOutputTokens(tokens int64) AgentOption {
1019	return func(s *agentSettings) {
1020		s.maxOutputTokens = &tokens
1021	}
1022}
1023
1024// WithTemperature sets the temperature for the agent.
1025func WithTemperature(temp float64) AgentOption {
1026	return func(s *agentSettings) {
1027		s.temperature = &temp
1028	}
1029}
1030
1031// WithTopP sets the top-p value for the agent.
1032func WithTopP(topP float64) AgentOption {
1033	return func(s *agentSettings) {
1034		s.topP = &topP
1035	}
1036}
1037
1038// WithTopK sets the top-k value for the agent.
1039func WithTopK(topK int64) AgentOption {
1040	return func(s *agentSettings) {
1041		s.topK = &topK
1042	}
1043}
1044
1045// WithPresencePenalty sets the presence penalty for the agent.
1046func WithPresencePenalty(penalty float64) AgentOption {
1047	return func(s *agentSettings) {
1048		s.presencePenalty = &penalty
1049	}
1050}
1051
1052// WithFrequencyPenalty sets the frequency penalty for the agent.
1053func WithFrequencyPenalty(penalty float64) AgentOption {
1054	return func(s *agentSettings) {
1055		s.frequencyPenalty = &penalty
1056	}
1057}
1058
1059// WithTools sets the tools for the agent.
1060func WithTools(tools ...AgentTool) AgentOption {
1061	return func(s *agentSettings) {
1062		s.tools = append(s.tools, tools...)
1063	}
1064}
1065
1066// WithStopConditions sets the stop conditions for the agent.
1067func WithStopConditions(conditions ...StopCondition) AgentOption {
1068	return func(s *agentSettings) {
1069		s.stopWhen = append(s.stopWhen, conditions...)
1070	}
1071}
1072
1073// WithPrepareStep sets the prepare step function for the agent.
1074func WithPrepareStep(fn PrepareStepFunction) AgentOption {
1075	return func(s *agentSettings) {
1076		s.prepareStep = fn
1077	}
1078}
1079
1080// WithRepairToolCall sets the repair tool call function for the agent.
1081func WithRepairToolCall(fn RepairToolCallFunction) AgentOption {
1082	return func(s *agentSettings) {
1083		s.repairToolCall = fn
1084	}
1085}
1086
1087// WithMaxRetries sets the maximum number of retries for the agent.
1088func WithMaxRetries(maxRetries int) AgentOption {
1089	return func(s *agentSettings) {
1090		s.maxRetries = &maxRetries
1091	}
1092}
1093
1094// WithOnRetry sets the retry callback for the agent.
1095func WithOnRetry(callback OnRetryCallback) AgentOption {
1096	return func(s *agentSettings) {
1097		s.onRetry = callback
1098	}
1099}
1100
1101// processStepStream processes a single step's stream and returns the step result.
1102func (a *agent) processStepStream(ctx context.Context, stream StreamResponse, opts AgentStreamCall, _ []StepResult, stepTools []AgentTool) (stepExecutionResult, error) {
1103	var stepContent []Content
1104	var stepToolCalls []ToolCallContent
1105	var stepUsage Usage
1106	stepFinishReason := FinishReasonUnknown
1107	var stepWarnings []CallWarning
1108	var stepProviderMetadata ProviderMetadata
1109
1110	activeToolCalls := make(map[string]*ToolCallContent)
1111	activeTextContent := make(map[string]string)
1112	type reasoningContent struct {
1113		content string
1114		options ProviderMetadata
1115	}
1116	activeReasoningContent := make(map[string]reasoningContent)
1117
1118	// Set up concurrent tool execution
1119	type toolExecutionRequest struct {
1120		toolCall ToolCallContent
1121		parallel bool
1122	}
1123	toolChan := make(chan toolExecutionRequest, 10)
1124	var toolExecutionWg sync.WaitGroup
1125	var toolStateMu sync.Mutex
1126	toolResults := make([]ToolResultContent, 0)
1127	var toolExecutionErr error
1128
1129	// Create a map for quick tool lookup
1130	toolMap := make(map[string]AgentTool)
1131	for _, tool := range stepTools {
1132		toolMap[tool.Info().Name] = tool
1133	}
1134
1135	// Semaphores for controlling parallelism
1136	parallelSem := make(chan struct{}, 5)
1137	var sequentialMu sync.Mutex
1138
1139	// Single coordinator goroutine that dispatches tools
1140	toolExecutionWg.Go(func() {
1141		for req := range toolChan {
1142			if req.parallel {
1143				parallelSem <- struct{}{}
1144				toolExecutionWg.Go(func() {
1145					defer func() { <-parallelSem }()
1146					result, isCriticalError := a.executeSingleTool(ctx, toolMap, req.toolCall, opts.OnToolResult)
1147					toolStateMu.Lock()
1148					toolResults = append(toolResults, result)
1149					if isCriticalError && toolExecutionErr == nil {
1150						if errorResult, ok := result.Result.(ToolResultOutputContentError); ok && errorResult.Error != nil {
1151							toolExecutionErr = errorResult.Error
1152						}
1153					}
1154					toolStateMu.Unlock()
1155				})
1156			} else {
1157				sequentialMu.Lock()
1158				result, isCriticalError := a.executeSingleTool(ctx, toolMap, req.toolCall, opts.OnToolResult)
1159				toolStateMu.Lock()
1160				toolResults = append(toolResults, result)
1161				if isCriticalError && toolExecutionErr == nil {
1162					if errorResult, ok := result.Result.(ToolResultOutputContentError); ok && errorResult.Error != nil {
1163						toolExecutionErr = errorResult.Error
1164					}
1165				}
1166				toolStateMu.Unlock()
1167				sequentialMu.Unlock()
1168			}
1169		}
1170	})
1171
1172	// Process stream parts
1173	for part := range stream {
1174		// Forward all parts to chunk callback
1175		if opts.OnChunk != nil {
1176			err := opts.OnChunk(part)
1177			if err != nil {
1178				return stepExecutionResult{}, err
1179			}
1180		}
1181
1182		switch part.Type {
1183		case StreamPartTypeWarnings:
1184			stepWarnings = part.Warnings
1185			if opts.OnWarnings != nil {
1186				err := opts.OnWarnings(part.Warnings)
1187				if err != nil {
1188					return stepExecutionResult{}, err
1189				}
1190			}
1191
1192		case StreamPartTypeTextStart:
1193			activeTextContent[part.ID] = ""
1194			if opts.OnTextStart != nil {
1195				err := opts.OnTextStart(part.ID)
1196				if err != nil {
1197					return stepExecutionResult{}, err
1198				}
1199			}
1200
1201		case StreamPartTypeTextDelta:
1202			if _, exists := activeTextContent[part.ID]; exists {
1203				activeTextContent[part.ID] += part.Delta
1204			}
1205			if opts.OnTextDelta != nil {
1206				err := opts.OnTextDelta(part.ID, part.Delta)
1207				if err != nil {
1208					return stepExecutionResult{}, err
1209				}
1210			}
1211
1212		case StreamPartTypeTextEnd:
1213			if text, exists := activeTextContent[part.ID]; exists {
1214				stepContent = append(stepContent, TextContent{
1215					Text:             text,
1216					ProviderMetadata: part.ProviderMetadata,
1217				})
1218				delete(activeTextContent, part.ID)
1219			}
1220			if opts.OnTextEnd != nil {
1221				err := opts.OnTextEnd(part.ID)
1222				if err != nil {
1223					return stepExecutionResult{}, err
1224				}
1225			}
1226
1227		case StreamPartTypeReasoningStart:
1228			activeReasoningContent[part.ID] = reasoningContent{content: part.Delta, options: part.ProviderMetadata}
1229			if opts.OnReasoningStart != nil {
1230				content := ReasoningContent{
1231					Text:             part.Delta,
1232					ProviderMetadata: part.ProviderMetadata,
1233				}
1234				err := opts.OnReasoningStart(part.ID, content)
1235				if err != nil {
1236					return stepExecutionResult{}, err
1237				}
1238			}
1239
1240		case StreamPartTypeReasoningDelta:
1241			if active, exists := activeReasoningContent[part.ID]; exists {
1242				active.content += part.Delta
1243				active.options = part.ProviderMetadata
1244				activeReasoningContent[part.ID] = active
1245			}
1246			if opts.OnReasoningDelta != nil {
1247				err := opts.OnReasoningDelta(part.ID, part.Delta)
1248				if err != nil {
1249					return stepExecutionResult{}, err
1250				}
1251			}
1252
1253		case StreamPartTypeReasoningEnd:
1254			if active, exists := activeReasoningContent[part.ID]; exists {
1255				if part.ProviderMetadata != nil {
1256					active.options = part.ProviderMetadata
1257				}
1258				content := ReasoningContent{
1259					Text:             active.content,
1260					ProviderMetadata: active.options,
1261				}
1262				stepContent = append(stepContent, content)
1263				if opts.OnReasoningEnd != nil {
1264					err := opts.OnReasoningEnd(part.ID, content)
1265					if err != nil {
1266						return stepExecutionResult{}, err
1267					}
1268				}
1269				delete(activeReasoningContent, part.ID)
1270			}
1271
1272		case StreamPartTypeToolInputStart:
1273			activeToolCalls[part.ID] = &ToolCallContent{
1274				ToolCallID:       part.ID,
1275				ToolName:         part.ToolCallName,
1276				Input:            "",
1277				ProviderExecuted: part.ProviderExecuted,
1278			}
1279			if opts.OnToolInputStart != nil {
1280				err := opts.OnToolInputStart(part.ID, part.ToolCallName)
1281				if err != nil {
1282					return stepExecutionResult{}, err
1283				}
1284			}
1285
1286		case StreamPartTypeToolInputDelta:
1287			if toolCall, exists := activeToolCalls[part.ID]; exists {
1288				toolCall.Input += part.Delta
1289			}
1290			if opts.OnToolInputDelta != nil {
1291				err := opts.OnToolInputDelta(part.ID, part.Delta)
1292				if err != nil {
1293					return stepExecutionResult{}, err
1294				}
1295			}
1296
1297		case StreamPartTypeToolInputEnd:
1298			if opts.OnToolInputEnd != nil {
1299				err := opts.OnToolInputEnd(part.ID)
1300				if err != nil {
1301					return stepExecutionResult{}, err
1302				}
1303			}
1304
1305		case StreamPartTypeToolCall:
1306			toolCall := ToolCallContent{
1307				ToolCallID:       part.ID,
1308				ToolName:         part.ToolCallName,
1309				Input:            part.ToolCallInput,
1310				ProviderExecuted: part.ProviderExecuted,
1311				ProviderMetadata: part.ProviderMetadata,
1312			}
1313
1314			// Validate and potentially repair the tool call
1315			validatedToolCall := a.validateAndRepairToolCall(ctx, toolCall, stepTools, a.settings.systemPrompt, nil, opts.RepairToolCall)
1316			stepToolCalls = append(stepToolCalls, validatedToolCall)
1317			stepContent = append(stepContent, validatedToolCall)
1318
1319			if opts.OnToolCall != nil {
1320				err := opts.OnToolCall(validatedToolCall)
1321				if err != nil {
1322					return stepExecutionResult{}, err
1323				}
1324			}
1325
1326			// Determine if tool can run in parallel
1327			isParallel := false
1328			if tool, exists := toolMap[validatedToolCall.ToolName]; exists {
1329				isParallel = tool.Info().Parallel
1330			}
1331
1332			// Send tool call to execution channel
1333			toolChan <- toolExecutionRequest{toolCall: validatedToolCall, parallel: isParallel}
1334
1335			// Clean up active tool call
1336			delete(activeToolCalls, part.ID)
1337
1338		case StreamPartTypeSource:
1339			sourceContent := SourceContent{
1340				SourceType:       part.SourceType,
1341				ID:               part.ID,
1342				URL:              part.URL,
1343				Title:            part.Title,
1344				ProviderMetadata: part.ProviderMetadata,
1345			}
1346			stepContent = append(stepContent, sourceContent)
1347			if opts.OnSource != nil {
1348				err := opts.OnSource(sourceContent)
1349				if err != nil {
1350					return stepExecutionResult{}, err
1351				}
1352			}
1353
1354		case StreamPartTypeFinish:
1355			stepUsage = part.Usage
1356			stepFinishReason = part.FinishReason
1357			stepProviderMetadata = part.ProviderMetadata
1358			if opts.OnStreamFinish != nil {
1359				err := opts.OnStreamFinish(part.Usage, part.FinishReason, part.ProviderMetadata)
1360				if err != nil {
1361					return stepExecutionResult{}, err
1362				}
1363			}
1364
1365		case StreamPartTypeError:
1366			return stepExecutionResult{}, part.Error
1367		}
1368	}
1369
1370	// Close the tool execution channel and wait for all executions to complete
1371	close(toolChan)
1372	toolExecutionWg.Wait()
1373
1374	// Check for tool execution errors
1375	if toolExecutionErr != nil {
1376		return stepExecutionResult{}, toolExecutionErr
1377	}
1378
1379	// Add tool results to content if any
1380	if len(toolResults) > 0 {
1381		for _, result := range toolResults {
1382			stepContent = append(stepContent, result)
1383		}
1384	}
1385
1386	stepResult := StepResult{
1387		Response: Response{
1388			Content:          stepContent,
1389			FinishReason:     stepFinishReason,
1390			Usage:            stepUsage,
1391			Warnings:         stepWarnings,
1392			ProviderMetadata: stepProviderMetadata,
1393		},
1394		Messages: toResponseMessages(stepContent),
1395	}
1396
1397	// Determine if we should continue (has tool calls and not stopped)
1398	shouldContinue := len(stepToolCalls) > 0 && stepFinishReason == FinishReasonToolCalls
1399
1400	return stepExecutionResult{
1401		StepResult:     stepResult,
1402		ShouldContinue: shouldContinue,
1403	}, nil
1404}
1405
1406func addUsage(a, b Usage) Usage {
1407	return Usage{
1408		InputTokens:         a.InputTokens + b.InputTokens,
1409		OutputTokens:        a.OutputTokens + b.OutputTokens,
1410		TotalTokens:         a.TotalTokens + b.TotalTokens,
1411		ReasoningTokens:     a.ReasoningTokens + b.ReasoningTokens,
1412		CacheCreationTokens: a.CacheCreationTokens + b.CacheCreationTokens,
1413		CacheReadTokens:     a.CacheReadTokens + b.CacheReadTokens,
1414	}
1415}
1416
1417// WithHeaders sets the headers for the agent.
1418func WithHeaders(headers map[string]string) AgentOption {
1419	return func(s *agentSettings) {
1420		s.headers = headers
1421	}
1422}
1423
1424// WithUserAgent sets the User-Agent header for the agent. This overrides any
1425// provider-level User-Agent setting.
1426func WithUserAgent(ua string) AgentOption {
1427	return func(s *agentSettings) {
1428		s.userAgent = ua
1429	}
1430}
1431
1432// WithProviderOptions sets the provider options for the agent.
1433func WithProviderOptions(providerOptions ProviderOptions) AgentOption {
1434	return func(s *agentSettings) {
1435		s.providerOptions = providerOptions
1436	}
1437}