events.go

  1package server
  2
  3import (
  4	"encoding/json"
  5	"errors"
  6	"fmt"
  7	"log/slog"
  8
  9	"github.com/charmbracelet/crush/internal/agent/notify"
 10	"github.com/charmbracelet/crush/internal/agent/tools/mcp"
 11	"github.com/charmbracelet/crush/internal/app"
 12	"github.com/charmbracelet/crush/internal/backend"
 13	"github.com/charmbracelet/crush/internal/history"
 14	"github.com/charmbracelet/crush/internal/message"
 15	"github.com/charmbracelet/crush/internal/permission"
 16	"github.com/charmbracelet/crush/internal/proto"
 17	"github.com/charmbracelet/crush/internal/pubsub"
 18	"github.com/charmbracelet/crush/internal/session"
 19	"github.com/charmbracelet/crush/internal/skills"
 20)
 21
 22// wrapEvent converts a raw tea.Msg (a pubsub.Event[T] from the app
 23// event fan-in) into a pubsub.Payload envelope with the correct
 24// PayloadType discriminator and a proto-typed inner payload that has
 25// proper JSON tags. Returns nil if the event type is unrecognized.
 26func wrapEvent(ev any) *pubsub.Payload {
 27	switch e := ev.(type) {
 28	case pubsub.Event[app.LSPEvent]:
 29		return envelope(pubsub.PayloadTypeLSPEvent, pubsub.Event[proto.LSPEvent]{
 30			Type: e.Type,
 31			Payload: proto.LSPEvent{
 32				Type:            proto.LSPEventType(e.Payload.Type),
 33				Name:            e.Payload.Name,
 34				State:           e.Payload.State,
 35				Error:           e.Payload.Error,
 36				DiagnosticCount: e.Payload.DiagnosticCount,
 37			},
 38		})
 39	case pubsub.Event[mcp.Event]:
 40		return envelope(pubsub.PayloadTypeMCPEvent, pubsub.Event[proto.MCPEvent]{
 41			Type: e.Type,
 42			Payload: proto.MCPEvent{
 43				Type:      mcpEventTypeToProto(e.Payload.Type),
 44				Name:      e.Payload.Name,
 45				State:     proto.MCPState(e.Payload.State),
 46				Error:     e.Payload.Error,
 47				ToolCount: e.Payload.Counts.Tools,
 48			},
 49		})
 50	case pubsub.Event[permission.PermissionRequest]:
 51		return envelope(pubsub.PayloadTypePermissionRequest, pubsub.Event[proto.PermissionRequest]{
 52			Type: e.Type,
 53			Payload: proto.PermissionRequest{
 54				ID:          e.Payload.ID,
 55				SessionID:   e.Payload.SessionID,
 56				ToolCallID:  e.Payload.ToolCallID,
 57				ToolName:    e.Payload.ToolName,
 58				Description: e.Payload.Description,
 59				Action:      e.Payload.Action,
 60				Path:        e.Payload.Path,
 61				Params:      e.Payload.Params,
 62			},
 63		})
 64	case pubsub.Event[permission.PermissionNotification]:
 65		return envelope(pubsub.PayloadTypePermissionNotification, pubsub.Event[proto.PermissionNotification]{
 66			Type: e.Type,
 67			Payload: proto.PermissionNotification{
 68				ToolCallID: e.Payload.ToolCallID,
 69				Granted:    e.Payload.Granted,
 70				Denied:     e.Payload.Denied,
 71			},
 72		})
 73	case pubsub.Event[message.Message]:
 74		return envelope(pubsub.PayloadTypeMessage, pubsub.Event[proto.Message]{
 75			Type:    e.Type,
 76			Payload: messageToProto(e.Payload),
 77		})
 78	case pubsub.Event[session.Session]:
 79		return envelope(pubsub.PayloadTypeSession, pubsub.Event[proto.Session]{
 80			Type:    e.Type,
 81			Payload: sessionToProto(e.Payload),
 82		})
 83	case pubsub.Event[history.File]:
 84		return envelope(pubsub.PayloadTypeFile, pubsub.Event[proto.File]{
 85			Type:    e.Type,
 86			Payload: fileToProto(e.Payload),
 87		})
 88	case pubsub.Event[notify.Notification]:
 89		payload := proto.AgentEvent{
 90			SessionID:    e.Payload.SessionID,
 91			SessionTitle: e.Payload.SessionTitle,
 92			RunID:        e.Payload.RunID,
 93			Type:         proto.AgentEventType(e.Payload.Type),
 94		}
 95		if e.Payload.Type == notify.TypeAgentError {
 96			payload.Type = proto.AgentEventTypeError
 97			payload.Error = errors.New(e.Payload.Message)
 98		}
 99		return envelope(pubsub.PayloadTypeAgentEvent, pubsub.Event[proto.AgentEvent]{
100			Type:    e.Type,
101			Payload: payload,
102		})
103	case pubsub.Event[notify.RunComplete]:
104		return envelope(pubsub.PayloadTypeRunComplete, pubsub.Event[proto.RunComplete]{
105			Type: e.Type,
106			Payload: proto.RunComplete{
107				SessionID: e.Payload.SessionID,
108				RunID:     e.Payload.RunID,
109				MessageID: e.Payload.MessageID,
110				Text:      e.Payload.Text,
111				Error:     e.Payload.Error,
112				Cancelled: e.Payload.Cancelled,
113			},
114		})
115	case pubsub.Event[proto.ConfigChanged]:
116		return envelope(pubsub.PayloadTypeConfigChanged, e)
117	case pubsub.Event[skills.Event]:
118		return envelope(pubsub.PayloadTypeSkillsEvent, pubsub.Event[proto.SkillsEvent]{
119			Type:    e.Type,
120			Payload: skillsEventToProto(e.Payload),
121		})
122	default:
123		slog.Warn("Unrecognized event type for SSE wrapping", "type", fmt.Sprintf("%T", ev))
124		return nil
125	}
126}
127
128// envelope marshals the inner event and wraps it in a pubsub.Payload.
129func envelope(payloadType pubsub.PayloadType, inner any) *pubsub.Payload {
130	raw, err := json.Marshal(inner)
131	if err != nil {
132		slog.Error("Failed to marshal event payload", "error", err)
133		return nil
134	}
135	return &pubsub.Payload{
136		Type:    payloadType,
137		Payload: raw,
138	}
139}
140
141func mcpEventTypeToProto(t mcp.EventType) proto.MCPEventType {
142	switch t {
143	case mcp.EventStateChanged:
144		return proto.MCPEventStateChanged
145	case mcp.EventToolsListChanged:
146		return proto.MCPEventToolsListChanged
147	case mcp.EventPromptsListChanged:
148		return proto.MCPEventPromptsListChanged
149	case mcp.EventResourcesListChanged:
150		return proto.MCPEventResourcesListChanged
151	default:
152		return proto.MCPEventStateChanged
153	}
154}
155
156func sessionToProto(s session.Session) proto.Session {
157	return proto.Session{
158		ID:               s.ID,
159		ParentSessionID:  s.ParentSessionID,
160		Title:            s.Title,
161		SummaryMessageID: s.SummaryMessageID,
162		MessageCount:     s.MessageCount,
163		PromptTokens:     s.PromptTokens,
164		CompletionTokens: s.CompletionTokens,
165		Cost:             s.Cost,
166		Todos:            todosToProto(s.Todos),
167		CreatedAt:        s.CreatedAt,
168		UpdatedAt:        s.UpdatedAt,
169	}
170}
171
172// isSessionBusy reports whether the given workspace has an in-flight
173// agent run for sessionID. It tolerates a nil workspace (treating it as
174// "not busy") so REST handlers can pass GetWorkspace's result through
175// unconditionally — the workspace lookup error is already surfaced by
176// the prior ListSessions/GetSession call when relevant.
177func isSessionBusy(ws *backend.Workspace, sessionID string) bool {
178	if ws == nil || ws.App == nil || ws.AgentCoordinator == nil {
179		return false
180	}
181	return ws.AgentCoordinator.IsSessionBusy(sessionID)
182}
183
184// attachedClients returns the number of clients currently viewing
185// sessionID in ws. Hold-only clients (streams == 0) do not contribute.
186// A nil workspace is treated as zero so handlers can pass GetWorkspace's
187// result through without an extra guard.
188func attachedClients(ws *backend.Workspace, sessionID string) int {
189	if ws == nil {
190		return 0
191	}
192	return ws.AttachedClientsForSession(sessionID)
193}
194
195func todosToProto(todos []session.Todo) []proto.Todo {
196	if len(todos) == 0 {
197		return nil
198	}
199	out := make([]proto.Todo, len(todos))
200	for i, t := range todos {
201		out[i] = proto.Todo{
202			Content:    t.Content,
203			Status:     string(t.Status),
204			ActiveForm: t.ActiveForm,
205		}
206	}
207	return out
208}
209
210func fileToProto(f history.File) proto.File {
211	return proto.File{
212		ID:        f.ID,
213		SessionID: f.SessionID,
214		Path:      f.Path,
215		Content:   f.Content,
216		Version:   f.Version,
217		CreatedAt: f.CreatedAt,
218		UpdatedAt: f.UpdatedAt,
219	}
220}
221
222func messageToProto(m message.Message) proto.Message {
223	msg := proto.Message{
224		ID:        m.ID,
225		SessionID: m.SessionID,
226		Role:      proto.MessageRole(m.Role),
227		Model:     m.Model,
228		Provider:  m.Provider,
229		CreatedAt: m.CreatedAt,
230		UpdatedAt: m.UpdatedAt,
231	}
232
233	for _, p := range m.Parts {
234		switch v := p.(type) {
235		case message.TextContent:
236			msg.Parts = append(msg.Parts, proto.TextContent{Text: v.Text})
237		case message.ReasoningContent:
238			msg.Parts = append(msg.Parts, proto.ReasoningContent{
239				Thinking:   v.Thinking,
240				Signature:  v.Signature,
241				StartedAt:  v.StartedAt,
242				FinishedAt: v.FinishedAt,
243			})
244		case message.ToolCall:
245			msg.Parts = append(msg.Parts, proto.ToolCall{
246				ID:       v.ID,
247				Name:     v.Name,
248				Input:    v.Input,
249				Finished: v.Finished,
250			})
251		case message.ToolResult:
252			msg.Parts = append(msg.Parts, proto.ToolResult{
253				ToolCallID: v.ToolCallID,
254				Name:       v.Name,
255				Content:    v.Content,
256				Data:       v.Data,
257				MIMEType:   v.MIMEType,
258				Metadata:   v.Metadata,
259				IsError:    v.IsError,
260			})
261		case message.Finish:
262			msg.Parts = append(msg.Parts, proto.Finish{
263				Reason:  proto.FinishReason(v.Reason),
264				Time:    v.Time,
265				Message: v.Message,
266				Details: v.Details,
267			})
268		case message.ImageURLContent:
269			msg.Parts = append(msg.Parts, proto.ImageURLContent{URL: v.URL, Detail: v.Detail})
270		case message.BinaryContent:
271			msg.Parts = append(msg.Parts, proto.BinaryContent{Path: v.Path, MIMEType: v.MIMEType, Data: v.Data})
272		}
273	}
274
275	return msg
276}
277
278// skillsEventToProto converts a skills.Event into its wire form. Errors
279// are flattened to strings because error does not round-trip over JSON.
280func skillsEventToProto(e skills.Event) proto.SkillsEvent {
281	if len(e.States) == 0 {
282		return proto.SkillsEvent{}
283	}
284	out := proto.SkillsEvent{States: make([]proto.SkillState, len(e.States))}
285	for i, s := range e.States {
286		entry := proto.SkillState{
287			Name:  s.Name,
288			Path:  s.Path,
289			State: proto.SkillDiscoveryState(s.State),
290		}
291		if s.Err != nil {
292			entry.Error = s.Err.Error()
293		}
294		out.States[i] = entry
295	}
296	return out
297}
298
299func messagesToProto(msgs []message.Message) []proto.Message {
300	out := make([]proto.Message, len(msgs))
301	for i, m := range msgs {
302		out[i] = messageToProto(m)
303	}
304	return out
305}