1package conversation
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "log/slog"
9 "maps"
10 "math/rand/v2"
11 "slices"
12 "strings"
13 "sync"
14 "time"
15
16 "github.com/oklog/ulid/v2"
17 "github.com/richardlehane/crock32"
18 "shelley.exe.dev/llm"
19 "sketch.dev/skribe"
20)
21
// Listener receives callbacks as a Convo sends requests, receives responses,
// and runs tools.
type Listener interface {
	// TODO: Content is leaking an anthropic API; should we avoid it?
	// TODO: Where should we include start/end time and usage?

	// OnToolCall is invoked by Convo.ToolResultContents for each tool use in a
	// response, before the tool is started. content identifies the tool use
	// and carries its start time.
	OnToolCall(ctx context.Context, convo *Convo, toolCallID, toolName string, toolInput json.RawMessage, content llm.Content)
	// OnToolResult is invoked by Convo.ToolResultContents with the outcome of a
	// tool call. On success, result points to the text of the tool's first
	// output content (an empty string if it produced none) and err is nil.
	// On failure, result is nil and err is non-nil.
	OnToolResult(ctx context.Context, convo *Convo, toolCallID, toolName string, toolInput json.RawMessage, content llm.Content, result *string, err error)
	// OnRequest is invoked by Convo.SendMessage just before the request
	// identified by requestID is handed to the LLM service.
	OnRequest(ctx context.Context, convo *Convo, requestID string, msg *llm.Message)
	// OnResponse is invoked by Convo.SendMessage after the service call
	// returns. msg is nil if the call failed.
	OnResponse(ctx context.Context, convo *Convo, requestID string, msg *llm.Response)
}
30
31type NoopListener struct{}
32
33func (n *NoopListener) OnToolCall(ctx context.Context, convo *Convo, id, toolName string, toolInput json.RawMessage, content llm.Content) {
34}
35
36func (n *NoopListener) OnToolResult(ctx context.Context, convo *Convo, id, toolName string, toolInput json.RawMessage, content llm.Content, result *string, err error) {
37}
38
39func (n *NoopListener) OnResponse(ctx context.Context, convo *Convo, id string, msg *llm.Response) {
40}
41func (n *NoopListener) OnRequest(ctx context.Context, convo *Convo, id string, msg *llm.Message) {}
42
// ErrDoNotRespond is a sentinel error a tool can return to suppress its result:
// when a tool's output error matches it (per errors.Is), ToolResultContents
// emits no tool result and makes no OnToolResult callback for that call.
var ErrDoNotRespond = errors.New("do not respond")
44
// A Convo is a managed conversation with Claude.
// It automatically manages the state of the conversation,
// including appending messages sent/received,
// calling tools and sending their results,
// tracking usage, etc.
//
// Exported fields must not be altered concurrently with calling any method on Convo.
// Typical usage is to configure a Convo once before using it.
type Convo struct {
	// ID is a unique ID for the conversation.
	ID string
	// Ctx is the context for the entire conversation.
	Ctx context.Context
	// Service is the LLM service to use.
	Service llm.Service
	// Tools are the tools available during the conversation.
	Tools []*llm.Tool
	// SystemPrompt is the system prompt for the conversation.
	SystemPrompt string
	// PromptCaching indicates whether to use Anthropic's prompt caching.
	// See https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching#continuing-a-multi-turn-conversation
	// for the documentation. At request send time, we set the cache_control field on the
	// last message. We also cache the system prompt.
	// Default: true (as set by New).
	PromptCaching bool
	// ToolUseOnly indicates whether Claude may only use tools during this conversation.
	// TODO: add more fine-grained control over tool use?
	ToolUseOnly bool
	// Parent is the parent conversation, if any.
	// It is non-nil for "subagent" calls.
	// It is set automatically when calling SubConvo,
	// and usually should not be set manually.
	Parent *Convo
	// Budget is the budget for this conversation (and all sub-conversations).
	// The Conversation DOES NOT automatically enforce the budget.
	// It is up to the caller to call OverBudget() as appropriate.
	Budget Budget
	// Hidden indicates that the output of this conversation should be hidden in the UI.
	// This is useful for subconversations that can generate noisy, uninteresting output.
	Hidden bool
	// ExtraData is extra data to make available to all tool calls.
	ExtraData map[string]any

	// messages tracks the messages so far in the conversation.
	// SendMessage appends the sent message and the response after each
	// successful service call.
	messages []llm.Message

	// Listener receives messages being sent.
	Listener Listener

	// toolUseCancelMu guards toolUseCancel.
	toolUseCancelMu sync.Mutex
	// toolUseCancel maps a tool_use ID to the cancel function of its running
	// tool call. Entries are added by newToolUseContext and removed by
	// CancelToolUse.
	toolUseCancel map[string]context.CancelCauseFunc

	// Protects usage. This is used for subconversations (that share part of CumulativeUsage) as well.
	// It is a pointer because sub-conversations share their parent's mutex.
	mu *sync.Mutex
	// usage tracks usage for this conversation and all sub-conversations.
	usage *CumulativeUsage
	// lastUsage tracks the usage from the most recent API call.
	lastUsage llm.Usage
}
104
105// newConvoID generates a new 8-byte random id.
106// The uniqueness/collision requirements here are very low.
107// They are not global identifiers,
108// just enough to distinguish different convos in a single session.
109func newConvoID() string {
110 u1 := rand.Uint32()
111 s := crock32.Encode(uint64(u1))
112 if len(s) < 7 {
113 s += strings.Repeat("0", 7-len(s))
114 }
115 return s[:3] + "-" + s[3:]
116}
117
118// New creates a new conversation with Claude with sensible defaults.
119// ctx is the context for the entire conversation.
120func New(ctx context.Context, srv llm.Service, usage *CumulativeUsage) *Convo {
121 id := newConvoID()
122 if usage == nil {
123 usage = newUsage()
124 }
125 return &Convo{
126 Ctx: skribe.ContextWithAttr(ctx, slog.String("convo_id", id)),
127 Service: srv,
128 PromptCaching: true,
129 usage: usage,
130 Listener: &NoopListener{},
131 ID: id,
132 toolUseCancel: map[string]context.CancelCauseFunc{},
133 mu: &sync.Mutex{},
134 }
135}
136
137// SubConvo creates a sub-conversation with the same configuration as the parent conversation.
138// (This propagates context for cancellation, HTTP client, API key, etc.)
139// The sub-conversation shares no messages with the parent conversation.
140// It does not inherit tools from the parent conversation.
141func (c *Convo) SubConvo() *Convo {
142 id := newConvoID()
143 return &Convo{
144 Ctx: skribe.ContextWithAttr(c.Ctx, slog.String("convo_id", id), slog.String("parent_convo_id", c.ID)),
145 Service: c.Service,
146 PromptCaching: c.PromptCaching,
147 Parent: c,
148 // For convenience, sub-convo usage shares tool uses map with parent,
149 // all other fields separate, propagated in AddResponse
150 usage: newUsageWithSharedToolUses(c.usage),
151 mu: c.mu,
152 Listener: c.Listener,
153 ID: id,
154 toolUseCancel: map[string]context.CancelCauseFunc{},
155 // Do not copy Budget. Each budget is independent,
156 // and OverBudget checks whether any ancestor is over budget.
157 }
158}
159
160func (c *Convo) SubConvoWithHistory() *Convo {
161 id := newConvoID()
162 return &Convo{
163 Ctx: skribe.ContextWithAttr(c.Ctx, slog.String("convo_id", id), slog.String("parent_convo_id", c.ID)),
164 Service: c.Service,
165 PromptCaching: c.PromptCaching,
166 Parent: c,
167 // For convenience, sub-convo usage shares tool uses map with parent,
168 // all other fields separate, propagated in AddResponse
169 usage: newUsageWithSharedToolUses(c.usage),
170 mu: c.mu,
171 Listener: c.Listener,
172 ID: id,
173 // Do not copy Budget. Each budget is independent,
174 // and OverBudget checks whether any ancestor is over budget.
175 messages: slices.Clone(c.messages),
176 }
177}
178
179// Depth reports how many "sub-conversations" deep this conversation is.
180// That it, it walks up parents until it finds a root.
181func (c *Convo) Depth() int {
182 x := c
183 var depth int
184 for x.Parent != nil {
185 x = x.Parent
186 depth++
187 }
188 return depth
189}
190
191// SendUserTextMessage sends a text message to the LLM in this conversation.
192// otherContents contains additional contents to send with the message, usually tool results.
193func (c *Convo) SendUserTextMessage(s string, otherContents ...llm.Content) (*llm.Response, error) {
194 contents := slices.Clone(otherContents)
195 if s != "" {
196 contents = append(contents, llm.Content{Type: llm.ContentTypeText, Text: s})
197 }
198 msg := llm.Message{
199 Role: llm.MessageRoleUser,
200 Content: contents,
201 }
202 return c.SendMessage(msg)
203}
204
205func (c *Convo) messageRequest(msg llm.Message) *llm.Request {
206 system := []llm.SystemContent{}
207 if c.SystemPrompt != "" {
208 d := llm.SystemContent{Type: "text", Text: c.SystemPrompt}
209 if c.PromptCaching {
210 d.Cache = true
211 }
212 system = []llm.SystemContent{d}
213 }
214
215 // Claude is happy to return an empty response in response to our Done() call,
216 // and, if so, you'll see something like:
217 // API request failed with status 400 Bad Request
218 // {"type":"error","error": {"type":"invalid_request_error",
219 // "message":"messages.5: all messages must have non-empty content except for the optional final assistant message"}}
220 // So, we filter out those empty messages.
221 var nonEmptyMessages []llm.Message
222 for _, m := range c.messages {
223 if len(m.Content) > 0 {
224 nonEmptyMessages = append(nonEmptyMessages, m)
225 }
226 }
227
228 // Also validate the new message being sent - don't add it if empty
229 messagesToSend := nonEmptyMessages
230 if len(msg.Content) > 0 {
231 messagesToSend = append(messagesToSend, msg)
232 }
233
234 mr := &llm.Request{
235 Messages: messagesToSend,
236 System: system,
237 Tools: c.Tools,
238 }
239 if c.ToolUseOnly {
240 mr.ToolChoice = &llm.ToolChoice{Type: llm.ToolChoiceTypeAny}
241 }
242 return mr
243}
244
245func (c *Convo) findTool(name string) (*llm.Tool, error) {
246 for _, tool := range c.Tools {
247 if tool.Name == name {
248 return tool, nil
249 }
250 }
251 return nil, fmt.Errorf("tool %q not found", name)
252}
253
254// insertMissingToolResults adds error results for tool uses that were requested
255// but not included in the message, which can happen in error paths like "out of budget."
256// We only insert these if there were no tool responses at all, since an incorrect
257// number of tool results would be a programmer error. Mutates inputs.
258func (c *Convo) insertMissingToolResults(mr *llm.Request, msg *llm.Message) {
259 if len(mr.Messages) < 2 {
260 return
261 }
262 prev := mr.Messages[len(mr.Messages)-2]
263 var toolUsePrev int
264 for _, c := range prev.Content {
265 if c.Type == llm.ContentTypeToolUse {
266 toolUsePrev++
267 }
268 }
269 if toolUsePrev == 0 {
270 return
271 }
272 var toolUseCurrent int
273 for _, c := range msg.Content {
274 if c.Type == llm.ContentTypeToolResult {
275 toolUseCurrent++
276 }
277 }
278 if toolUseCurrent != 0 {
279 return
280 }
281 var prefix []llm.Content
282 for _, part := range prev.Content {
283 if part.Type != llm.ContentTypeToolUse {
284 continue
285 }
286 content := llm.Content{
287 Type: llm.ContentTypeToolResult,
288 ToolUseID: part.ID,
289 ToolError: true,
290 ToolResult: []llm.Content{{
291 Type: llm.ContentTypeText,
292 Text: "not executed; retry possible",
293 }},
294 }
295 prefix = append(prefix, content)
296 }
297 msg.Content = append(prefix, msg.Content...)
298 mr.Messages[len(mr.Messages)-1].Content = msg.Content
299 slog.DebugContext(c.Ctx, "inserted missing tool results")
300}
301
302// SendMessage sends a message to Claude.
303// The conversation records (internally) all messages succesfully sent and received.
304func (c *Convo) SendMessage(msg llm.Message) (*llm.Response, error) {
305 id := ulid.Make().String()
306 mr := c.messageRequest(msg)
307 var lastMessage *llm.Message
308 if c.PromptCaching {
309 lastMessage = &mr.Messages[len(mr.Messages)-1]
310 if len(lastMessage.Content) > 0 {
311 lastMessage.Content[len(lastMessage.Content)-1].Cache = true
312 }
313 }
314 defer func() {
315 if lastMessage == nil {
316 return
317 }
318 if len(lastMessage.Content) > 0 {
319 lastMessage.Content[len(lastMessage.Content)-1].Cache = false
320 }
321 }()
322 c.insertMissingToolResults(mr, &msg)
323 c.Listener.OnRequest(c.Ctx, c, id, &msg)
324
325 startTime := time.Now()
326 resp, err := c.Service.Do(c.Ctx, mr)
327 if resp != nil {
328 resp.StartTime = &startTime
329 endTime := time.Now()
330 resp.EndTime = &endTime
331 }
332
333 if err != nil {
334 c.Listener.OnResponse(c.Ctx, c, id, nil)
335 return nil, err
336 }
337 c.messages = append(c.messages, msg, resp.ToMessage())
338 // Propagate usage to all ancestors (including us).
339 for x := c; x != nil; x = x.Parent {
340 x.usage.Add(resp.Usage)
341 // Store the most recent usage (only on the current conversation, not ancestors)
342 if x == c {
343 x.lastUsage = resp.Usage
344 }
345 }
346 c.Listener.OnResponse(c.Ctx, c, id, resp)
347 return resp, err
348}
349
// toolCallInfoKeyType is the unexported type of the context key under which
// a ToolCallInfo is stored, so the key cannot collide with keys from other packages.
type toolCallInfoKeyType string

// toolCallInfoKey is the context key for the ToolCallInfo of a tool call.
// ToolResultContents sets it; ToolCallInfoFromContext reads it.
var toolCallInfoKey toolCallInfoKeyType
353
// ToolCallInfo describes the tool call a context belongs to.
// ToolResultContents attaches one to the context it passes to each tool.
type ToolCallInfo struct {
	// ToolUseID is the ID of the tool use being executed.
	ToolUseID string
}
357
358func ToolCallInfoFromContext(ctx context.Context) ToolCallInfo {
359 v := ctx.Value(toolCallInfoKey)
360 i, _ := v.(ToolCallInfo)
361 return i
362}
363
364func (c *Convo) ToolResultCancelContents(resp *llm.Response) ([]llm.Content, error) {
365 if resp.StopReason != llm.StopReasonToolUse {
366 return nil, nil
367 }
368 var toolResults []llm.Content
369
370 for _, part := range resp.Content {
371 if part.Type != llm.ContentTypeToolUse {
372 continue
373 }
374 c.incrementToolUse(part.ToolName)
375
376 content := llm.Content{
377 Type: llm.ContentTypeToolResult,
378 ToolUseID: part.ID,
379 }
380
381 content.ToolError = true
382 content.ToolResult = []llm.Content{{
383 Type: llm.ContentTypeText,
384 Text: "user canceled this tool_use",
385 }}
386 toolResults = append(toolResults, content)
387 }
388 return toolResults, nil
389}
390
// GetID returns the conversation ID (the ID field).
func (c *Convo) GetID() string {
	return c.ID
}
395
396func (c *Convo) CancelToolUse(toolUseID string, err error) error {
397 c.toolUseCancelMu.Lock()
398 defer c.toolUseCancelMu.Unlock()
399 cancel, ok := c.toolUseCancel[toolUseID]
400 if !ok {
401 return fmt.Errorf("cannot cancel %s: no cancel function registered for this tool_use_id. All I have is %+v", toolUseID, c.toolUseCancel)
402 }
403 delete(c.toolUseCancel, toolUseID)
404 cancel(err)
405 return nil
406}
407
408func (c *Convo) newToolUseContext(ctx context.Context, toolUseID string) (context.Context, context.CancelFunc) {
409 c.toolUseCancelMu.Lock()
410 defer c.toolUseCancelMu.Unlock()
411 ctx, cancel := context.WithCancelCause(ctx)
412 c.toolUseCancel[toolUseID] = cancel
413 return ctx, func() { c.CancelToolUse(toolUseID, nil) }
414}
415
// ToolResultContents runs all tool uses requested by the response and returns their results.
// Cancelling ctx will cancel any running tool calls.
// The boolean return value indicates whether any of the executed tools should end the turn.
//
// It returns (nil, false, nil) if resp did not stop for tool use.
// The tools run concurrently, one goroutine per tool use, and the results are
// returned in the order the goroutines deliver them, which need not be the
// order of the tool uses in resp. A tool whose output error is ErrDoNotRespond
// contributes no result. If ctx has an error once all tools have finished,
// the results are discarded and that error is returned.
func (c *Convo) ToolResultContents(ctx context.Context, resp *llm.Response) ([]llm.Content, bool, error) {
	if resp.StopReason != llm.StopReasonToolUse {
		return nil, false, nil
	}
	// Extract all tool calls from the response, call the tools, and gather the results.
	var wg sync.WaitGroup
	// Buffered for one result per content part, so no goroutine blocks on send.
	toolResultC := make(chan llm.Content, len(resp.Content))

	endsTurn := false
	for _, part := range resp.Content {
		if part.Type != llm.ContentTypeToolUse {
			continue
		}
		// Unknown tools are ignored here; the goroutine below reports the lookup error.
		tool, err := c.findTool(part.ToolName)
		if err == nil && tool.EndsTurn {
			endsTurn = true
		}
		c.incrementToolUse(part.ToolName)
		startTime := time.Now()

		c.Listener.OnToolCall(ctx, c, part.ID, part.ToolName, part.ToolInput, llm.Content{
			Type:             llm.ContentTypeToolUse,
			ToolUseID:        part.ID,
			ToolUseStartTime: &startTime,
		})

		wg.Add(1)
		// NOTE(review): the goroutine captures part and startTime; this assumes
		// per-iteration loop variables (Go 1.22+) — confirm the module's Go version.
		go func() {
			defer wg.Done()

			content := llm.Content{
				Type:             llm.ContentTypeToolResult,
				ToolUseID:        part.ID,
				ToolUseStartTime: &startTime,
			}
			// sendErr reports a failed tool call: it notifies the listener and
			// delivers an error tool result whose text is the error message.
			sendErr := func(err error) {
				// Record end time
				endTime := time.Now()
				content.ToolUseEndTime = &endTime

				content.ToolError = true
				content.ToolResult = []llm.Content{{
					Type: llm.ContentTypeText,
					Text: err.Error(),
				}}
				c.Listener.OnToolResult(ctx, c, part.ID, part.ToolName, part.ToolInput, content, nil, err)
				toolResultC <- content
			}
			// sendRes reports a successful tool call: it notifies the listener
			// (passing the text of the first output content) and delivers the result.
			sendRes := func(toolOut llm.ToolOut) {
				// Record end time
				endTime := time.Now()
				content.ToolUseEndTime = &endTime

				content.ToolResult = toolOut.LLMContent
				content.Display = toolOut.Display
				var firstText string
				if len(toolOut.LLMContent) > 0 {
					firstText = toolOut.LLMContent[0].Text
				}
				c.Listener.OnToolResult(ctx, c, part.ID, part.ToolName, part.ToolInput, content, &firstText, nil)
				toolResultC <- content
			}

			tool, err := c.findTool(part.ToolName)
			if err != nil {
				sendErr(err)
				return
			}
			// Create a new context for just this tool_use call, and register its
			// cancel function so that it can be canceled individually.
			toolUseCtx, cancel := c.newToolUseContext(ctx, part.ID)
			defer cancel()
			// TODO: move this into newToolUseContext?
			toolUseCtx = context.WithValue(toolUseCtx, toolCallInfoKey, ToolCallInfo{ToolUseID: part.ID})
			toolOut := tool.Run(toolUseCtx, part.ToolInput)
			// The tool asked for no result to be sent at all; this takes
			// precedence over cancellation and over any other error.
			if errors.Is(toolOut.Error, ErrDoNotRespond) {
				return
			}
			// A canceled call reports the cancellation cause rather than
			// whatever the tool returned.
			if toolUseCtx.Err() != nil {
				sendErr(context.Cause(toolUseCtx))
				return
			}

			if toolOut.Error != nil {
				sendErr(toolOut.Error)
				return
			}
			sendRes(toolOut)
		}()
	}
	// All senders are done after Wait, so closing and draining is safe.
	wg.Wait()
	close(toolResultC)
	var toolResults []llm.Content
	for toolResult := range toolResultC {
		toolResults = append(toolResults, toolResult)
	}
	if ctx.Err() != nil {
		return nil, false, ctx.Err()
	}
	return toolResults, endsTurn, nil
}
520
521func (c *Convo) incrementToolUse(name string) {
522 c.mu.Lock()
523 defer c.mu.Unlock()
524
525 c.usage.ToolUses[name]++
526}
527
// CumulativeUsage represents cumulative usage across a Convo, including all sub-conversations.
type CumulativeUsage struct {
	StartTime                time.Time      `json:"start_time"`
	Responses                uint64         `json:"messages"` // count of responses
	InputTokens              uint64         `json:"input_tokens"`
	OutputTokens             uint64         `json:"output_tokens"`
	CacheReadInputTokens     uint64         `json:"cache_read_input_tokens"`
	CacheCreationInputTokens uint64         `json:"cache_creation_input_tokens"`
	TotalCostUSD             float64        `json:"total_cost_usd"`
	ToolUses                 map[string]int `json:"tool_uses"` // tool name -> number of uses
}

// newUsage returns an empty usage record that starts now
// and has its own, empty tool-use map.
func newUsage() *CumulativeUsage {
	u := new(CumulativeUsage)
	u.ToolUses = map[string]int{}
	u.StartTime = time.Now()
	return u
}

// newUsageWithSharedToolUses returns an empty usage record that starts now
// and uses the very same tool-use map as parent, so tool uses counted
// through either record are visible through both.
func newUsageWithSharedToolUses(parent *CumulativeUsage) *CumulativeUsage {
	u := new(CumulativeUsage)
	u.ToolUses = parent.ToolUses
	u.StartTime = time.Now()
	return u
}

// Clone returns a copy of u whose tool-use map is independent of u's.
func (u *CumulativeUsage) Clone() CumulativeUsage {
	dup := *u
	dup.ToolUses = maps.Clone(dup.ToolUses)
	return dup
}
553
554func (c *Convo) CumulativeUsage() CumulativeUsage {
555 if c == nil {
556 return CumulativeUsage{}
557 }
558 c.mu.Lock()
559 defer c.mu.Unlock()
560 return c.usage.Clone()
561}
562
563// LastUsage returns the usage from the most recent API call
564func (c *Convo) LastUsage() llm.Usage {
565 if c == nil {
566 return llm.Usage{}
567 }
568 c.mu.Lock()
569 defer c.mu.Unlock()
570 return c.lastUsage
571}
572
573func (u *CumulativeUsage) WallTime() time.Duration {
574 return time.Since(u.StartTime)
575}
576
577func (u *CumulativeUsage) DollarsPerHour() float64 {
578 hours := u.WallTime().Hours()
579 // Prevent division by very small numbers that could cause issues
580 if hours < 1e-6 {
581 return 0
582 }
583 return u.TotalCostUSD / hours
584}
585
586func (u *CumulativeUsage) Add(usage llm.Usage) {
587 u.Responses++
588 u.InputTokens += usage.InputTokens
589 u.OutputTokens += usage.OutputTokens
590 u.CacheReadInputTokens += usage.CacheReadInputTokens
591 u.CacheCreationInputTokens += usage.CacheCreationInputTokens
592 u.TotalCostUSD += usage.CostUSD
593}
594
595// TotalInputTokens returns the grand total cumulative input tokens in u.
596func (u *CumulativeUsage) TotalInputTokens() uint64 {
597 return u.InputTokens + u.CacheReadInputTokens + u.CacheCreationInputTokens
598}
599
600// Attr returns the cumulative usage as a slog.Attr with key "usage".
601func (u CumulativeUsage) Attr() slog.Attr {
602 elapsed := time.Since(u.StartTime)
603 return slog.Group("usage",
604 slog.Duration("wall_time", elapsed),
605 slog.Uint64("responses", u.Responses),
606 slog.Uint64("input_tokens", u.InputTokens),
607 slog.Uint64("output_tokens", u.OutputTokens),
608 slog.Uint64("cache_read_input_tokens", u.CacheReadInputTokens),
609 slog.Uint64("cache_creation_input_tokens", u.CacheCreationInputTokens),
610 slog.Float64("total_cost_usd", u.TotalCostUSD),
611 slog.Float64("dollars_per_hour", u.TotalCostUSD/elapsed.Hours()),
612 slog.Any("tool_uses", maps.Clone(u.ToolUses)),
613 )
614}
615
// A Budget represents the maximum amount of resources that may be spent on a conversation.
// Note that the default (zero) budget is unlimited.
type Budget struct {
	// MaxDollars is the spending limit in US dollars; it is only checked
	// when it is greater than zero.
	MaxDollars float64 // if > 0, max dollars that may be spent
}
621
622// OverBudget returns an error if the convo (or any of its parents) has exceeded its budget.
623// TODO: document parent vs sub budgets, multiple errors, etc, once we know the desired behavior.
624func (c *Convo) OverBudget() error {
625 for x := c; x != nil; x = x.Parent {
626 if err := x.overBudget(); err != nil {
627 return err
628 }
629 }
630 return nil
631}
632
633// ResetBudget sets the budget to the passed in budget and
634// adjusts it by what's been used so far.
635func (c *Convo) ResetBudget(budget Budget) {
636 c.Budget = budget
637 if c.Budget.MaxDollars > 0 {
638 c.Budget.MaxDollars += c.CumulativeUsage().TotalCostUSD
639 }
640}
641
642func (c *Convo) overBudget() error {
643 usage := c.CumulativeUsage()
644 // TODO: stop before we exceed the budget instead of after?
645 var err error
646 cont := "Continuing to chat will reset the budget."
647 if c.Budget.MaxDollars > 0 && usage.TotalCostUSD >= c.Budget.MaxDollars {
648 err = errors.Join(err, fmt.Errorf("$%.2f spent, budget is $%.2f. %s", usage.TotalCostUSD, c.Budget.MaxDollars, cont))
649 }
650 return err
651}
652
// DebugJSON returns the conversation history as JSON for debugging purposes.
// The recorded messages are marshaled as an indented JSON array; any
// marshaling error is returned as is.
func (c *Convo) DebugJSON() ([]byte, error) {
	return json.MarshalIndent(c.messages, "", " ")
}