@@ -26,6 +26,8 @@ import (
"github.com/charmbracelet/crush/internal/shell"
)
+const streamChunkTimeout = 80 * time.Second
+
type AgentEventType string
const (
@@ -553,16 +555,25 @@ func (a *agent) streamAndHandleEvents(ctx context.Context, sessionID string, msg
ctx = context.WithValue(ctx, tools.MessageIDContextKey, assistantMsg.ID)
// Process each event in the stream.
- for event := range eventChan {
- if processErr := a.processEvent(ctx, sessionID, &assistantMsg, event); processErr != nil {
- if errors.Is(processErr, context.Canceled) {
- a.finishMessage(context.Background(), &assistantMsg, message.FinishReasonCanceled, "Request cancelled", "")
- } else {
- a.finishMessage(ctx, &assistantMsg, message.FinishReasonError, "API Error", processErr.Error())
+loop:
+ for {
+ select {
+ case event, ok := <-eventChan:
+ if !ok {
+ break loop
}
- return assistantMsg, nil, processErr
- }
- if ctx.Err() != nil {
+ if processErr := a.processEvent(ctx, sessionID, &assistantMsg, event); processErr != nil {
+ if errors.Is(processErr, context.Canceled) {
+ a.finishMessage(context.Background(), &assistantMsg, message.FinishReasonCanceled, "Request cancelled", "")
+ } else {
+ a.finishMessage(ctx, &assistantMsg, message.FinishReasonError, "API Error", processErr.Error())
+ }
+ return assistantMsg, nil, processErr
+ }
+ case <-time.After(streamChunkTimeout):
+ a.finishMessage(ctx, &assistantMsg, message.FinishReasonError, "Stream timeout", "No chunk received within timeout")
+ return assistantMsg, nil, fmt.Errorf("stream chunk timeout")
+ case <-ctx.Done():
a.finishMessage(context.Background(), &assistantMsg, message.FinishReasonCanceled, "Request cancelled", "")
return assistantMsg, nil, ctx.Err()
}