diff --git a/internal/llm/agent/agent.go b/internal/llm/agent/agent.go index 44efba31835aa4d68a79538fd637f1eff43cbb3e..dfc8cbc3f2d0030d0ba0df2f9d33fe75cbb5599c 100644 --- a/internal/llm/agent/agent.go +++ b/internal/llm/agent/agent.go @@ -26,6 +26,8 @@ import ( "github.com/charmbracelet/crush/internal/shell" ) +const streamChunkTimeout = 80 * time.Second + type AgentEventType string const ( @@ -553,16 +555,25 @@ func (a *agent) streamAndHandleEvents(ctx context.Context, sessionID string, msg ctx = context.WithValue(ctx, tools.MessageIDContextKey, assistantMsg.ID) // Process each event in the stream. - for event := range eventChan { - if processErr := a.processEvent(ctx, sessionID, &assistantMsg, event); processErr != nil { - if errors.Is(processErr, context.Canceled) { - a.finishMessage(context.Background(), &assistantMsg, message.FinishReasonCanceled, "Request cancelled", "") - } else { - a.finishMessage(ctx, &assistantMsg, message.FinishReasonError, "API Error", processErr.Error()) +loop: + for { + select { + case event, ok := <-eventChan: + if !ok { + break loop } - return assistantMsg, nil, processErr - } - if ctx.Err() != nil { + if processErr := a.processEvent(ctx, sessionID, &assistantMsg, event); processErr != nil { + if errors.Is(processErr, context.Canceled) { + a.finishMessage(context.Background(), &assistantMsg, message.FinishReasonCanceled, "Request cancelled", "") + } else { + a.finishMessage(ctx, &assistantMsg, message.FinishReasonError, "API Error", processErr.Error()) + } + return assistantMsg, nil, processErr + } + case <-time.After(streamChunkTimeout): + a.finishMessage(ctx, &assistantMsg, message.FinishReasonError, "Stream timeout", "No chunk received within timeout") + return assistantMsg, nil, fmt.Errorf("stream chunk timeout") + case <-ctx.Done(): a.finishMessage(context.Background(), &assistantMsg, message.FinishReasonCanceled, "Request cancelled", "") return assistantMsg, nil, ctx.Err() }