package anthropic

import (
	"cmp"
	"context"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"maps"
	"strings"

	"github.com/anthropics/anthropic-sdk-go"
	"github.com/anthropics/anthropic-sdk-go/option"
	"github.com/anthropics/anthropic-sdk-go/packages/param"
	"github.com/charmbracelet/ai/ai"
)

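// options holds the configuration collected from [Option] values before the
// provider is constructed.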
type options struct {
	baseURL string
	apiKey  string
	name    string
	headers map[string]string
	client  option.HTTPClient
}

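// provider implements [ai.Provider] on top of the official Anthropic Go SDK.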
type provider struct {
	options options
}

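// Option configures the provider created by [New].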
type Option = func(*options)

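// New creates an Anthropic [ai.Provider]. Unless overridden with options, it
// targets https://api.anthropic.com under the provider name "anthropic".
// A typical setup (the model ID shown is illustrative; any Anthropic model
// ID works):
//
//	p := anthropic.New(anthropic.WithAPIKey(os.Getenv("ANTHROPIC_API_KEY")))
//	model, err := p.LanguageModel("claude-3-5-sonnet-latest")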
func New(opts ...Option) ai.Provider {
	options := options{
		headers: map[string]string{},
	}
	for _, o := range opts {
		o(&options)
	}

	options.baseURL = cmp.Or(options.baseURL, "https://api.anthropic.com")
	options.name = cmp.Or(options.name, "anthropic")

	return &provider{options: options}
}

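// WithBaseURL overrides the API base URL, e.g. for proxies or
// Anthropic-compatible gateways.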
func WithBaseURL(baseURL string) Option {
	return func(o *options) {
		o.baseURL = baseURL
	}
}

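// WithAPIKey sets the API key used to authenticate requests.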
func WithAPIKey(apiKey string) Option {
	return func(o *options) {
		o.apiKey = apiKey
	}
}

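// WithName overrides the provider name reported by the models it creates.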
func WithName(name string) Option {
	return func(o *options) {
		o.name = name
	}
}

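// WithHeaders merges the given headers into every outgoing request.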
func WithHeaders(headers map[string]string) Option {
	return func(o *options) {
		maps.Copy(o.headers, headers)
	}
}

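// WithHTTPClient replaces the HTTP client used by the underlying SDK.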
func WithHTTPClient(client option.HTTPClient) Option {
	return func(o *options) {
		o.client = client
	}
}

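// LanguageModel implements ai.Provider. It builds an SDK client from the
// provider options and returns a model bound to the given model ID.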
func (a *provider) LanguageModel(modelID string) (ai.LanguageModel, error) {
	anthropicClientOptions := []option.RequestOption{}
	if a.options.apiKey != "" {
		anthropicClientOptions = append(anthropicClientOptions, option.WithAPIKey(a.options.apiKey))
	}
	if a.options.baseURL != "" {
		anthropicClientOptions = append(anthropicClientOptions, option.WithBaseURL(a.options.baseURL))
	}

	for key, value := range a.options.headers {
		anthropicClientOptions = append(anthropicClientOptions, option.WithHeader(key, value))
	}

	if a.options.client != nil {
		anthropicClientOptions = append(anthropicClientOptions, option.WithHTTPClient(a.options.client))
	}
	return languageModel{
		modelID:  modelID,
		provider: fmt.Sprintf("%s.messages", a.options.name),
		options:  a.options,
		client:   anthropic.NewClient(anthropicClientOptions...),
	}, nil
}

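// languageModel implements [ai.LanguageModel] using Anthropic's Messages API.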
type languageModel struct {
	provider string
	modelID  string
	client   anthropic.Client
	options  options
}

// Model implements ai.LanguageModel.
func (a languageModel) Model() string {
	return a.modelID
}

// Provider implements ai.LanguageModel.
func (a languageModel) Provider() string {
	return a.provider
}

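// prepareParams translates an [ai.Call] into Anthropic message parameters,
// returning warnings for settings the API does not support.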
func (a languageModel) prepareParams(call ai.Call) (*anthropic.MessageNewParams, []ai.CallWarning, error) {
	params := &anthropic.MessageNewParams{}
	providerOptions := &ProviderOptions{}
	if v, ok := call.ProviderOptions["anthropic"]; ok {
		err := ai.ParseOptions(v, providerOptions)
		if err != nil {
			return nil, nil, err
		}
	}
	sendReasoning := true
	if providerOptions.SendReasoning != nil {
		sendReasoning = *providerOptions.SendReasoning
	}
	systemBlocks, messages, warnings := toPrompt(call.Prompt, sendReasoning)

	if call.FrequencyPenalty != nil {
		warnings = append(warnings, ai.CallWarning{
			Type:    ai.CallWarningTypeUnsupportedSetting,
			Setting: "FrequencyPenalty",
		})
	}
	if call.PresencePenalty != nil {
		warnings = append(warnings, ai.CallWarning{
			Type:    ai.CallWarningTypeUnsupportedSetting,
			Setting: "PresencePenalty",
		})
	}

	params.System = systemBlocks
	params.Messages = messages
	params.Model = anthropic.Model(a.modelID)
	params.MaxTokens = 4096

	if call.MaxOutputTokens != nil {
		params.MaxTokens = *call.MaxOutputTokens
	}

	if call.Temperature != nil {
		params.Temperature = param.NewOpt(*call.Temperature)
	}
	if call.TopK != nil {
		params.TopK = param.NewOpt(*call.TopK)
	}
	if call.TopP != nil {
		params.TopP = param.NewOpt(*call.TopP)
	}

	isThinking := false
	var thinkingBudget int64
	if providerOptions.Thinking != nil {
		isThinking = true
		thinkingBudget = providerOptions.Thinking.BudgetTokens
	}
	if isThinking {
		if thinkingBudget == 0 {
			return nil, nil, ai.NewUnsupportedFunctionalityError("thinking requires a budget", "")
		}
		params.Thinking = anthropic.ThinkingConfigParamOfEnabled(thinkingBudget)
		// Sampling parameters are incompatible with extended thinking, so
		// clear them and surface warnings instead.
		if call.Temperature != nil {
			params.Temperature = param.Opt[float64]{}
			warnings = append(warnings, ai.CallWarning{
				Type:    ai.CallWarningTypeUnsupportedSetting,
				Setting: "temperature",
				Details: "temperature is not supported when thinking is enabled",
			})
		}
		if call.TopP != nil {
			params.TopP = param.Opt[float64]{}
			warnings = append(warnings, ai.CallWarning{
				Type:    ai.CallWarningTypeUnsupportedSetting,
				Setting: "TopP",
				Details: "TopP is not supported when thinking is enabled",
			})
		}
		if call.TopK != nil {
			params.TopK = param.Opt[int64]{}
			warnings = append(warnings, ai.CallWarning{
				Type:    ai.CallWarningTypeUnsupportedSetting,
				Setting: "TopK",
				Details: "TopK is not supported when thinking is enabled",
			})
		}
		// Reserve room for thinking tokens on top of the requested output budget.
		params.MaxTokens += thinkingBudget
	}

	if len(call.Tools) > 0 {
		disableParallelToolUse := false
		if providerOptions.DisableParallelToolUse != nil {
			disableParallelToolUse = *providerOptions.DisableParallelToolUse
		}
		tools, toolChoice, toolWarnings := toTools(call.Tools, call.ToolChoice, disableParallelToolUse)
		params.Tools = tools
		if toolChoice != nil {
			params.ToolChoice = *toolChoice
		}
		warnings = append(warnings, toolWarnings...)
	}

	return params, warnings, nil
}

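// getCacheControl extracts Anthropic cache-control settings from provider
// options, accepting both "cache_control" and "cacheControl" keys.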
func getCacheControl(providerOptions ai.ProviderOptions) *CacheControlProviderOptions {
	anthropicOptions, ok := providerOptions["anthropic"]
	if !ok {
		return nil
	}
	cacheControl, ok := anthropicOptions["cache_control"]
	if !ok {
		cacheControl, ok = anthropicOptions["cacheControl"]
	}
	if !ok {
		return nil
	}
	cc, ok := cacheControl.(map[string]any)
	if !ok {
		return nil
	}
	cacheControlOption := &CacheControlProviderOptions{}
	if err := ai.ParseOptions(cc, cacheControlOption); err != nil {
		return nil
	}
	return cacheControlOption
}

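// getReasoningMetadata extracts Anthropic reasoning metadata (thinking
// signature or redacted data) from provider options.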
func getReasoningMetadata(providerOptions ai.ProviderOptions) *ReasoningMetadata {
	if anthropicOptions, ok := providerOptions["anthropic"]; ok {
		reasoningMetadata := &ReasoningMetadata{}
		if err := ai.ParseOptions(anthropicOptions, reasoningMetadata); err == nil {
			return reasoningMetadata
		}
	}
	return nil
}

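// messageBlock is a run of consecutive prompt messages that map to a single
// Anthropic message role.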
type messageBlock struct {
	Role     ai.MessageRole
	Messages []ai.Message
}

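// groupIntoBlocks coalesces consecutive same-role messages into blocks. Tool
// messages are grouped under the user role, since Anthropic expects tool
// results to be sent back inside user messages.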
func groupIntoBlocks(prompt ai.Prompt) []*messageBlock {
	var blocks []*messageBlock

	var currentBlock *messageBlock

	for _, msg := range prompt {
		var role ai.MessageRole
		switch msg.Role {
		case ai.MessageRoleSystem, ai.MessageRoleUser, ai.MessageRoleAssistant:
			role = msg.Role
		case ai.MessageRoleTool:
			role = ai.MessageRoleUser
		default:
			continue
		}
		if currentBlock == nil || currentBlock.Role != role {
			currentBlock = &messageBlock{
				Role:     role,
				Messages: []ai.Message{},
			}
			blocks = append(blocks, currentBlock)
		}
		currentBlock.Messages = append(currentBlock.Messages, msg)
	}
	return blocks
}

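// toTools converts the generic tool definitions and tool choice into their
// Anthropic equivalents, warning about tools the provider cannot handle.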
func toTools(tools []ai.Tool, toolChoice *ai.ToolChoice, disableParallelToolCalls bool) (anthropicTools []anthropic.ToolUnionParam, anthropicToolChoice *anthropic.ToolChoiceUnionParam, warnings []ai.CallWarning) {
	for _, tool := range tools {
		if tool.GetType() == ai.ToolTypeFunction {
			ft, ok := tool.(ai.FunctionTool)
			if !ok {
				continue
			}
			required := []string{}
			var properties any
			if props, ok := ft.InputSchema["properties"]; ok {
				properties = props
			}
			if req, ok := ft.InputSchema["required"]; ok {
				switch reqArr := req.(type) {
				case []string:
					required = reqArr
				case []any:
					// JSON-decoded schemas arrive as []any rather than []string.
					for _, r := range reqArr {
						if s, ok := r.(string); ok {
							required = append(required, s)
						}
					}
				}
			}
			cacheControl := getCacheControl(ft.ProviderOptions)

			anthropicTool := anthropic.ToolParam{
				Name:        ft.Name,
				Description: anthropic.String(ft.Description),
				InputSchema: anthropic.ToolInputSchemaParam{
					Properties: properties,
					Required:   required,
				},
			}
			if cacheControl != nil {
				anthropicTool.CacheControl = anthropic.NewCacheControlEphemeralParam()
			}
			anthropicTools = append(anthropicTools, anthropic.ToolUnionParam{OfTool: &anthropicTool})
			continue
		}
		// TODO: handle provider tool calls
		warnings = append(warnings, ai.CallWarning{
			Type:    ai.CallWarningTypeUnsupportedTool,
			Tool:    tool,
			Message: "tool is not supported",
		})
	}
	if toolChoice == nil {
		if disableParallelToolCalls {
			anthropicToolChoice = &anthropic.ToolChoiceUnionParam{
				OfAuto: &anthropic.ToolChoiceAutoParam{
					Type:                   "auto",
					DisableParallelToolUse: param.NewOpt(disableParallelToolCalls),
				},
			}
		}
		return anthropicTools, anthropicToolChoice, warnings
	}

	switch *toolChoice {
	case ai.ToolChoiceAuto:
		anthropicToolChoice = &anthropic.ToolChoiceUnionParam{
			OfAuto: &anthropic.ToolChoiceAutoParam{
				Type:                   "auto",
				DisableParallelToolUse: param.NewOpt(disableParallelToolCalls),
			},
		}
	case ai.ToolChoiceRequired:
		anthropicToolChoice = &anthropic.ToolChoiceUnionParam{
			OfAny: &anthropic.ToolChoiceAnyParam{
				Type:                   "any",
				DisableParallelToolUse: param.NewOpt(disableParallelToolCalls),
			},
		}
	case ai.ToolChoiceNone:
		return anthropicTools, anthropicToolChoice, warnings
	default:
		// Any other value is treated as the name of a specific tool to force.
		anthropicToolChoice = &anthropic.ToolChoiceUnionParam{
			OfTool: &anthropic.ToolChoiceToolParam{
				Type:                   "tool",
				Name:                   string(*toolChoice),
				DisableParallelToolUse: param.NewOpt(disableParallelToolCalls),
			},
		}
	}
	return anthropicTools, anthropicToolChoice, warnings
}

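// toPrompt converts the prompt into Anthropic system blocks and messages.
// When sendReasoningData is false, assistant reasoning parts are dropped with
// a warning instead of being sent back to the API.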
func toPrompt(prompt ai.Prompt, sendReasoningData bool) ([]anthropic.TextBlockParam, []anthropic.MessageParam, []ai.CallWarning) {
	var systemBlocks []anthropic.TextBlockParam
	var messages []anthropic.MessageParam
	var warnings []ai.CallWarning

	blocks := groupIntoBlocks(prompt)
	finishedSystemBlock := false
	for _, block := range blocks {
		switch block.Role {
		case ai.MessageRoleSystem:
			if finishedSystemBlock {
				// Skip system blocks that are separated from the first one by
				// user/assistant messages.
				// TODO: see if we need to send an error here?
				continue
			}
			finishedSystemBlock = true
			for _, msg := range block.Messages {
				for _, part := range msg.Content {
					cacheControl := getCacheControl(part.Options())
					text, ok := ai.AsMessagePart[ai.TextPart](part)
					if !ok {
						continue
					}
					textBlock := anthropic.TextBlockParam{
						Text: text.Text,
					}
					if cacheControl != nil {
						textBlock.CacheControl = anthropic.NewCacheControlEphemeralParam()
					}
					systemBlocks = append(systemBlocks, textBlock)
				}
			}

		case ai.MessageRoleUser:
			var anthropicContent []anthropic.ContentBlockParamUnion
			for _, msg := range block.Messages {
				if msg.Role == ai.MessageRoleUser {
					for i, part := range msg.Content {
						isLastPart := i == len(msg.Content)-1
						cacheControl := getCacheControl(part.Options())
						if cacheControl == nil && isLastPart {
							cacheControl = getCacheControl(msg.ProviderOptions)
						}
						switch part.GetType() {
						case ai.ContentTypeText:
							text, ok := ai.AsMessagePart[ai.TextPart](part)
							if !ok {
								continue
							}
							textBlock := &anthropic.TextBlockParam{
								Text: text.Text,
							}
							if cacheControl != nil {
								textBlock.CacheControl = anthropic.NewCacheControlEphemeralParam()
							}
							anthropicContent = append(anthropicContent, anthropic.ContentBlockParamUnion{
								OfText: textBlock,
							})
						case ai.ContentTypeFile:
							file, ok := ai.AsMessagePart[ai.FilePart](part)
							if !ok {
								continue
							}
							// TODO: handle other file types
							if !strings.HasPrefix(file.MediaType, "image/") {
								continue
							}

							base64Encoded := base64.StdEncoding.EncodeToString(file.Data)
							imageBlock := anthropic.NewImageBlockBase64(file.MediaType, base64Encoded)
							if cacheControl != nil {
								imageBlock.OfImage.CacheControl = anthropic.NewCacheControlEphemeralParam()
							}
							anthropicContent = append(anthropicContent, imageBlock)
						}
					}
				} else if msg.Role == ai.MessageRoleTool {
					for i, part := range msg.Content {
						isLastPart := i == len(msg.Content)-1
						cacheControl := getCacheControl(part.Options())
						if cacheControl == nil && isLastPart {
							cacheControl = getCacheControl(msg.ProviderOptions)
						}
						result, ok := ai.AsMessagePart[ai.ToolResultPart](part)
						if !ok {
							continue
						}
						toolResultBlock := anthropic.ToolResultBlockParam{
							ToolUseID: result.ToolCallID,
						}
						switch result.Output.GetType() {
						case ai.ToolResultContentTypeText:
							content, ok := ai.AsToolResultOutputType[ai.ToolResultOutputContentText](result.Output)
							if !ok {
								continue
							}
							toolResultBlock.Content = []anthropic.ToolResultBlockParamContentUnion{
								{
									OfText: &anthropic.TextBlockParam{
										Text: content.Text,
									},
								},
							}
						case ai.ToolResultContentTypeMedia:
							content, ok := ai.AsToolResultOutputType[ai.ToolResultOutputContentMedia](result.Output)
							if !ok {
								continue
							}
							toolResultBlock.Content = []anthropic.ToolResultBlockParamContentUnion{
								{
									OfImage: anthropic.NewImageBlockBase64(content.MediaType, content.Data).OfImage,
								},
							}
						case ai.ToolResultContentTypeError:
							content, ok := ai.AsToolResultOutputType[ai.ToolResultOutputContentError](result.Output)
							if !ok {
								continue
							}
							toolResultBlock.Content = []anthropic.ToolResultBlockParamContentUnion{
								{
									OfText: &anthropic.TextBlockParam{
										Text: content.Error.Error(),
									},
								},
							}
							toolResultBlock.IsError = param.NewOpt(true)
						}
						if cacheControl != nil {
							toolResultBlock.CacheControl = anthropic.NewCacheControlEphemeralParam()
						}
						anthropicContent = append(anthropicContent, anthropic.ContentBlockParamUnion{
							OfToolResult: &toolResultBlock,
						})
					}
				}
			}
			messages = append(messages, anthropic.NewUserMessage(anthropicContent...))
		case ai.MessageRoleAssistant:
			var anthropicContent []anthropic.ContentBlockParamUnion
			for _, msg := range block.Messages {
				for i, part := range msg.Content {
					isLastPart := i == len(msg.Content)-1
					cacheControl := getCacheControl(part.Options())
					if cacheControl == nil && isLastPart {
						cacheControl = getCacheControl(msg.ProviderOptions)
					}
					switch part.GetType() {
					case ai.ContentTypeText:
						text, ok := ai.AsMessagePart[ai.TextPart](part)
						if !ok {
							continue
						}
						textBlock := &anthropic.TextBlockParam{
							Text: text.Text,
						}
						if cacheControl != nil {
							textBlock.CacheControl = anthropic.NewCacheControlEphemeralParam()
						}
						anthropicContent = append(anthropicContent, anthropic.ContentBlockParamUnion{
							OfText: textBlock,
						})
					case ai.ContentTypeReasoning:
						reasoning, ok := ai.AsMessagePart[ai.ReasoningPart](part)
						if !ok {
							continue
						}
						if !sendReasoningData {
							warnings = append(warnings, ai.CallWarning{
								Type:    "other",
								Message: "sending reasoning content is disabled for this model",
							})
							continue
						}
						reasoningMetadata := getReasoningMetadata(part.Options())
						if reasoningMetadata == nil {
							warnings = append(warnings, ai.CallWarning{
								Type:    "other",
								Message: "unsupported reasoning metadata",
							})
							continue
						}

						if reasoningMetadata.Signature != "" {
							anthropicContent = append(anthropicContent, anthropic.NewThinkingBlock(reasoningMetadata.Signature, reasoning.Text))
						} else if reasoningMetadata.RedactedData != "" {
							anthropicContent = append(anthropicContent, anthropic.NewRedactedThinkingBlock(reasoningMetadata.RedactedData))
						} else {
							warnings = append(warnings, ai.CallWarning{
								Type:    "other",
								Message: "unsupported reasoning metadata",
							})
							continue
						}
					case ai.ContentTypeToolCall:
						toolCall, ok := ai.AsMessagePart[ai.ToolCallPart](part)
						if !ok {
							continue
						}
						if toolCall.ProviderExecuted {
							// TODO: implement provider executed call
							continue
						}

						var inputMap map[string]any
						err := json.Unmarshal([]byte(toolCall.Input), &inputMap)
						if err != nil {
							continue
						}
						toolUseBlock := anthropic.NewToolUseBlock(toolCall.ToolCallID, inputMap, toolCall.ToolName)
						if cacheControl != nil {
							toolUseBlock.OfToolUse.CacheControl = anthropic.NewCacheControlEphemeralParam()
						}
						anthropicContent = append(anthropicContent, toolUseBlock)
					case ai.ContentTypeToolResult:
						// TODO: implement provider executed tool result
					}
				}
			}
			messages = append(messages, anthropic.NewAssistantMessage(anthropicContent...))
		}
	}
	return systemBlocks, messages, warnings
}

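// handleError converts SDK errors into [ai.APICallError] values carrying the
// request/response dumps and response headers; other errors pass through.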
func (a languageModel) handleError(err error) error {
	var apiErr *anthropic.Error
	if errors.As(err, &apiErr) {
		requestDump := apiErr.DumpRequest(true)
		responseDump := apiErr.DumpResponse(true)
		headers := map[string]string{}
		for k, h := range apiErr.Response.Header {
			if len(h) == 0 {
				continue
			}
			// Keep the last value for headers that appear more than once.
			headers[strings.ToLower(k)] = h[len(h)-1]
		}
		return ai.NewAPICallError(
			apiErr.Error(),
			apiErr.Request.URL.String(),
			string(requestDump),
			apiErr.StatusCode,
			headers,
			string(responseDump),
			apiErr,
			false,
		)
	}
	return err
}

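// mapFinishReason maps Anthropic stop reasons onto [ai.FinishReason] values.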
func mapFinishReason(finishReason string) ai.FinishReason {
	switch finishReason {
	case "end_turn", "pause_turn", "stop_sequence":
		return ai.FinishReasonStop
	case "max_tokens":
		return ai.FinishReasonLength
	case "tool_use":
		return ai.FinishReasonToolCalls
	default:
		return ai.FinishReasonUnknown
	}
}

// Generate implements ai.LanguageModel.
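// It performs a single, non-streaming Messages API call and converts the
// response blocks into [ai.Content] values.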
func (a languageModel) Generate(ctx context.Context, call ai.Call) (*ai.Response, error) {
	params, warnings, err := a.prepareParams(call)
	if err != nil {
		return nil, err
	}
	response, err := a.client.Messages.New(ctx, *params)
	if err != nil {
		return nil, a.handleError(err)
	}

	var content []ai.Content
	for _, block := range response.Content {
		switch block.Type {
		case "text":
			text, ok := block.AsAny().(anthropic.TextBlock)
			if !ok {
				continue
			}
			content = append(content, ai.TextContent{
				Text: text.Text,
			})
		case "thinking":
			reasoning, ok := block.AsAny().(anthropic.ThinkingBlock)
			if !ok {
				continue
			}
			content = append(content, ai.ReasoningContent{
				Text: reasoning.Thinking,
				ProviderMetadata: map[string]map[string]any{
					"anthropic": {
						"signature": reasoning.Signature,
					},
				},
			})
		case "redacted_thinking":
			reasoning, ok := block.AsAny().(anthropic.RedactedThinkingBlock)
			if !ok {
				continue
			}
			content = append(content, ai.ReasoningContent{
				Text: "",
				ProviderMetadata: map[string]map[string]any{
					"anthropic": {
						"redacted_data": reasoning.Data,
					},
				},
			})
		case "tool_use":
			toolUse, ok := block.AsAny().(anthropic.ToolUseBlock)
			if !ok {
				continue
			}
			content = append(content, ai.ToolCallContent{
				ToolCallID:       toolUse.ID,
				ToolName:         toolUse.Name,
				Input:            string(toolUse.Input),
				ProviderExecuted: false,
			})
		}
	}

	return &ai.Response{
		Content: content,
		Usage: ai.Usage{
			InputTokens:         response.Usage.InputTokens,
			OutputTokens:        response.Usage.OutputTokens,
			TotalTokens:         response.Usage.InputTokens + response.Usage.OutputTokens,
			CacheCreationTokens: response.Usage.CacheCreationInputTokens,
			CacheReadTokens:     response.Usage.CacheReadInputTokens,
		},
		FinishReason: mapFinishReason(string(response.StopReason)),
		ProviderMetadata: ai.ProviderMetadata{
			"anthropic": make(map[string]any),
		},
		Warnings: warnings,
	}, nil
}

// Stream implements ai.LanguageModel.
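// It opens a streaming Messages API request and yields [ai.StreamPart] values
// as events arrive, using the SDK's message accumulator to recover complete
// content blocks at block boundaries.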
func (a languageModel) Stream(ctx context.Context, call ai.Call) (ai.StreamResponse, error) {
	params, warnings, err := a.prepareParams(call)
	if err != nil {
		return nil, err
	}

	stream := a.client.Messages.NewStreaming(ctx, *params)
	acc := anthropic.Message{}
	return func(yield func(ai.StreamPart) bool) {
		if len(warnings) > 0 {
			if !yield(ai.StreamPart{
				Type:     ai.StreamPartTypeWarnings,
				Warnings: warnings,
			}) {
				return
			}
		}

		for stream.Next() {
			chunk := stream.Current()
			_ = acc.Accumulate(chunk)
			switch chunk.Type {
			case "content_block_start":
				contentBlockType := chunk.ContentBlock.Type
				switch contentBlockType {
				case "text":
					if !yield(ai.StreamPart{
						Type: ai.StreamPartTypeTextStart,
						ID:   fmt.Sprintf("%d", chunk.Index),
					}) {
						return
					}
				case "thinking":
					if !yield(ai.StreamPart{
						Type: ai.StreamPartTypeReasoningStart,
						ID:   fmt.Sprintf("%d", chunk.Index),
					}) {
						return
					}
				case "redacted_thinking":
					if !yield(ai.StreamPart{
						Type: ai.StreamPartTypeReasoningStart,
						ID:   fmt.Sprintf("%d", chunk.Index),
						ProviderMetadata: ai.ProviderMetadata{
							"anthropic": {
								"redacted_data": chunk.ContentBlock.Data,
							},
						},
					}) {
						return
					}
				case "tool_use":
					if !yield(ai.StreamPart{
						Type:          ai.StreamPartTypeToolInputStart,
						ID:            chunk.ContentBlock.ID,
						ToolCallName:  chunk.ContentBlock.Name,
						ToolCallInput: "",
					}) {
						return
					}
				}
			case "content_block_stop":
				if len(acc.Content)-1 < int(chunk.Index) {
					continue
				}
				contentBlock := acc.Content[int(chunk.Index)]
				switch contentBlock.Type {
				case "text":
					if !yield(ai.StreamPart{
						Type: ai.StreamPartTypeTextEnd,
						ID:   fmt.Sprintf("%d", chunk.Index),
					}) {
						return
					}
				case "thinking":
					if !yield(ai.StreamPart{
						Type: ai.StreamPartTypeReasoningEnd,
						ID:   fmt.Sprintf("%d", chunk.Index),
					}) {
						return
					}
				case "tool_use":
					if !yield(ai.StreamPart{
						Type: ai.StreamPartTypeToolInputEnd,
						ID:   contentBlock.ID,
					}) {
						return
					}
					if !yield(ai.StreamPart{
						Type:          ai.StreamPartTypeToolCall,
						ID:            contentBlock.ID,
						ToolCallName:  contentBlock.Name,
						ToolCallInput: string(contentBlock.Input),
					}) {
						return
					}
				}
			case "content_block_delta":
				switch chunk.Delta.Type {
				case "text_delta":
					if !yield(ai.StreamPart{
						Type:  ai.StreamPartTypeTextDelta,
						ID:    fmt.Sprintf("%d", chunk.Index),
						Delta: chunk.Delta.Text,
					}) {
						return
					}
				case "thinking_delta":
					if !yield(ai.StreamPart{
						Type:  ai.StreamPartTypeReasoningDelta,
						ID:    fmt.Sprintf("%d", chunk.Index),
						Delta: chunk.Delta.Thinking,
					}) {
						return
					}
				case "signature_delta":
					if !yield(ai.StreamPart{
						Type: ai.StreamPartTypeReasoningDelta,
						ID:   fmt.Sprintf("%d", chunk.Index),
						ProviderMetadata: ai.ProviderMetadata{
							"anthropic": {
								"signature": chunk.Delta.Signature,
							},
						},
					}) {
						return
					}
				case "input_json_delta":
					if len(acc.Content)-1 < int(chunk.Index) {
						continue
					}
					contentBlock := acc.Content[int(chunk.Index)]
					if !yield(ai.StreamPart{
						Type:          ai.StreamPartTypeToolInputDelta,
						ID:            contentBlock.ID,
						ToolCallInput: chunk.Delta.PartialJSON,
					}) {
						return
					}
				}
			case "message_stop":
				// The finish part is emitted after the stream drains.
			}
		}

		if err := stream.Err(); err != nil && !errors.Is(err, io.EOF) {
			yield(ai.StreamPart{
				Type:  ai.StreamPartTypeError,
				Error: a.handleError(err),
			})
			return
		}
		yield(ai.StreamPart{
			Type:         ai.StreamPartTypeFinish,
			ID:           acc.ID,
			FinishReason: mapFinishReason(string(acc.StopReason)),
			Usage: ai.Usage{
				InputTokens:         acc.Usage.InputTokens,
				OutputTokens:        acc.Usage.OutputTokens,
				TotalTokens:         acc.Usage.InputTokens + acc.Usage.OutputTokens,
				CacheCreationTokens: acc.Usage.CacheCreationInputTokens,
				CacheReadTokens:     acc.Usage.CacheReadInputTokens,
			},
			ProviderMetadata: ai.ProviderMetadata{
				"anthropic": make(map[string]any),
			},
		})
	}, nil
}