From bbf53dca72d1ea29bc6769bc968cfadbaaa5cb6a Mon Sep 17 00:00:00 2001 From: Christian Rocha Date: Mon, 13 Apr 2026 11:53:22 -0400 Subject: [PATCH] fix(agent): buffer tool calls Wait until streaming has finished before dispatching tool calls to avoid getting tool results without a matching call: essentially orphans. --- agent.go | 12 ++++++++++-- providers/anthropic/anthropic_test.go | 3 ++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/agent.go b/agent.go index 34d4f8a1fb6efa1c6d0028bba905b1d52896ed1e..f0b981be6c57d6d3308dafc9e73541e6b4017471 100644 --- a/agent.go +++ b/agent.go @@ -1248,6 +1248,7 @@ func (a *agent) processStepStream(ctx context.Context, stream StreamResponse, op parallel bool } toolChan := make(chan toolExecutionRequest, 10) + var pendingDispatches []toolExecutionRequest var toolExecutionWg sync.WaitGroup var toolStateMu sync.Mutex toolResults := make([]ToolResultContent, 0) @@ -1475,8 +1476,9 @@ func (a *agent) processStepStream(ctx context.Context, stream StreamResponse, op isParallel = tool.Info().Parallel } - // Send tool call to execution channel - toolChan <- toolExecutionRequest{toolCall: validatedToolCall, parallel: isParallel} + // Buffer dispatch until stream is fully consumed so that all + // OnToolCall callbacks complete before any tool result is written. + pendingDispatches = append(pendingDispatches, toolExecutionRequest{toolCall: validatedToolCall, parallel: isParallel}) // Clean up active tool call delete(activeToolCalls, part.ID) @@ -1534,6 +1536,12 @@ func (a *agent) processStepStream(ctx context.Context, stream StreamResponse, op } } + // Dispatch all buffered tool calls now that the complete set is known and + // every OnToolCall callback has been called. + for _, req := range pendingDispatches { + toolChan <- req + } + // Close the tool execution channel and wait for all executions to complete close(toolChan) toolExecutionWg.Wait() diff --git a/providers/anthropic/anthropic_test.go b/providers/anthropic/anthropic_test.go index 4387a34faf7fe8900383e26859de13d306505664..d447954cbf1d78819e6091aedfa3da6af8b742fc 100644 --- a/providers/anthropic/anthropic_test.go +++ b/providers/anthropic/anthropic_test.go @@ -1574,7 +1574,8 @@ func TestComputerUseToolJSON(t *testing.T) { } _, err := computerUseToolJSON(pdt) require.Error(t, err) - require.Contains(t, err.Error(), "tool_version arg is missing") }) + require.Contains(t, err.Error(), "tool_version arg is missing") + }) t.Run("returns error for unsupported version", func(t *testing.T) { t.Parallel()