@@ -1248,6 +1248,7 @@ func (a *agent) processStepStream(ctx context.Context, stream StreamResponse, op
parallel bool
}
toolChan := make(chan toolExecutionRequest, 10)
+ var pendingDispatches []toolExecutionRequest
var toolExecutionWg sync.WaitGroup
var toolStateMu sync.Mutex
toolResults := make([]ToolResultContent, 0)
@@ -1475,8 +1476,9 @@ func (a *agent) processStepStream(ctx context.Context, stream StreamResponse, op
isParallel = tool.Info().Parallel
}
- // Send tool call to execution channel
- toolChan <- toolExecutionRequest{toolCall: validatedToolCall, parallel: isParallel}
+ // Buffer dispatch until stream is fully consumed so that all
+ // OnToolCall callbacks complete before any tool result is written.
+ pendingDispatches = append(pendingDispatches, toolExecutionRequest{toolCall: validatedToolCall, parallel: isParallel})
// Clean up active tool call
delete(activeToolCalls, part.ID)
@@ -1534,6 +1536,12 @@ func (a *agent) processStepStream(ctx context.Context, stream StreamResponse, op
}
}
+ // Dispatch all buffered tool calls now that the complete set is known and
+ // every OnToolCall callback has been called.
+ for _, req := range pendingDispatches {
+ toolChan <- req
+ }
+
// Close the tool execution channel and wait for all executions to complete
close(toolChan)
toolExecutionWg.Wait()
@@ -1574,7 +1574,8 @@ func TestComputerUseToolJSON(t *testing.T) {
}
_, err := computerUseToolJSON(pdt)
require.Error(t, err)
- require.Contains(t, err.Error(), "tool_version arg is missing") })
+ require.Contains(t, err.Error(), "tool_version arg is missing")
+ })
t.Run("returns error for unsupported version", func(t *testing.T) {
t.Parallel()