events.go

  1package server
  2
  3import (
  4	"encoding/json"
  5	"fmt"
  6	"log/slog"
  7
  8	"github.com/charmbracelet/crush/internal/agent/notify"
  9	"github.com/charmbracelet/crush/internal/agent/tools/mcp"
 10	"github.com/charmbracelet/crush/internal/app"
 11	"github.com/charmbracelet/crush/internal/backend"
 12	"github.com/charmbracelet/crush/internal/history"
 13	"github.com/charmbracelet/crush/internal/message"
 14	"github.com/charmbracelet/crush/internal/permission"
 15	"github.com/charmbracelet/crush/internal/proto"
 16	"github.com/charmbracelet/crush/internal/pubsub"
 17	"github.com/charmbracelet/crush/internal/session"
 18	"github.com/charmbracelet/crush/internal/skills"
 19)
 20
 21// wrapEvent converts a raw tea.Msg (a pubsub.Event[T] from the app
 22// event fan-in) into a pubsub.Payload envelope with the correct
 23// PayloadType discriminator and a proto-typed inner payload that has
 24// proper JSON tags. Returns nil if the event type is unrecognized.
 25func wrapEvent(ev any) *pubsub.Payload {
 26	switch e := ev.(type) {
 27	case pubsub.Event[app.LSPEvent]:
 28		return envelope(pubsub.PayloadTypeLSPEvent, pubsub.Event[proto.LSPEvent]{
 29			Type: e.Type,
 30			Payload: proto.LSPEvent{
 31				Type:            proto.LSPEventType(e.Payload.Type),
 32				Name:            e.Payload.Name,
 33				State:           e.Payload.State,
 34				Error:           e.Payload.Error,
 35				DiagnosticCount: e.Payload.DiagnosticCount,
 36			},
 37		})
 38	case pubsub.Event[mcp.Event]:
 39		return envelope(pubsub.PayloadTypeMCPEvent, pubsub.Event[proto.MCPEvent]{
 40			Type: e.Type,
 41			Payload: proto.MCPEvent{
 42				Type:      mcpEventTypeToProto(e.Payload.Type),
 43				Name:      e.Payload.Name,
 44				State:     proto.MCPState(e.Payload.State),
 45				Error:     e.Payload.Error,
 46				ToolCount: e.Payload.Counts.Tools,
 47			},
 48		})
 49	case pubsub.Event[permission.PermissionRequest]:
 50		return envelope(pubsub.PayloadTypePermissionRequest, pubsub.Event[proto.PermissionRequest]{
 51			Type: e.Type,
 52			Payload: proto.PermissionRequest{
 53				ID:          e.Payload.ID,
 54				SessionID:   e.Payload.SessionID,
 55				ToolCallID:  e.Payload.ToolCallID,
 56				ToolName:    e.Payload.ToolName,
 57				Description: e.Payload.Description,
 58				Action:      e.Payload.Action,
 59				Path:        e.Payload.Path,
 60				Params:      e.Payload.Params,
 61			},
 62		})
 63	case pubsub.Event[permission.PermissionNotification]:
 64		return envelope(pubsub.PayloadTypePermissionNotification, pubsub.Event[proto.PermissionNotification]{
 65			Type: e.Type,
 66			Payload: proto.PermissionNotification{
 67				ToolCallID: e.Payload.ToolCallID,
 68				Granted:    e.Payload.Granted,
 69				Denied:     e.Payload.Denied,
 70			},
 71		})
 72	case pubsub.Event[message.Message]:
 73		return envelope(pubsub.PayloadTypeMessage, pubsub.Event[proto.Message]{
 74			Type:    e.Type,
 75			Payload: messageToProto(e.Payload),
 76		})
 77	case pubsub.Event[session.Session]:
 78		return envelope(pubsub.PayloadTypeSession, pubsub.Event[proto.Session]{
 79			Type:    e.Type,
 80			Payload: sessionToProto(e.Payload),
 81		})
 82	case pubsub.Event[history.File]:
 83		return envelope(pubsub.PayloadTypeFile, pubsub.Event[proto.File]{
 84			Type:    e.Type,
 85			Payload: fileToProto(e.Payload),
 86		})
 87	case pubsub.Event[notify.Notification]:
 88		return envelope(pubsub.PayloadTypeAgentEvent, pubsub.Event[proto.AgentEvent]{
 89			Type: e.Type,
 90			Payload: proto.AgentEvent{
 91				SessionID:    e.Payload.SessionID,
 92				SessionTitle: e.Payload.SessionTitle,
 93				Type:         proto.AgentEventType(e.Payload.Type),
 94			},
 95		})
 96	case pubsub.Event[notify.RunComplete]:
 97		return envelope(pubsub.PayloadTypeRunComplete, pubsub.Event[proto.RunComplete]{
 98			Type: e.Type,
 99			Payload: proto.RunComplete{
100				SessionID: e.Payload.SessionID,
101				RunID:     e.Payload.RunID,
102				MessageID: e.Payload.MessageID,
103				Text:      e.Payload.Text,
104				Error:     e.Payload.Error,
105				Cancelled: e.Payload.Cancelled,
106			},
107		})
108	case pubsub.Event[proto.ConfigChanged]:
109		return envelope(pubsub.PayloadTypeConfigChanged, e)
110	case pubsub.Event[skills.Event]:
111		return envelope(pubsub.PayloadTypeSkillsEvent, pubsub.Event[proto.SkillsEvent]{
112			Type:    e.Type,
113			Payload: skillsEventToProto(e.Payload),
114		})
115	default:
116		slog.Warn("Unrecognized event type for SSE wrapping", "type", fmt.Sprintf("%T", ev))
117		return nil
118	}
119}
120
121// envelope marshals the inner event and wraps it in a pubsub.Payload.
122func envelope(payloadType pubsub.PayloadType, inner any) *pubsub.Payload {
123	raw, err := json.Marshal(inner)
124	if err != nil {
125		slog.Error("Failed to marshal event payload", "error", err)
126		return nil
127	}
128	return &pubsub.Payload{
129		Type:    payloadType,
130		Payload: raw,
131	}
132}
133
134func mcpEventTypeToProto(t mcp.EventType) proto.MCPEventType {
135	switch t {
136	case mcp.EventStateChanged:
137		return proto.MCPEventStateChanged
138	case mcp.EventToolsListChanged:
139		return proto.MCPEventToolsListChanged
140	case mcp.EventPromptsListChanged:
141		return proto.MCPEventPromptsListChanged
142	case mcp.EventResourcesListChanged:
143		return proto.MCPEventResourcesListChanged
144	default:
145		return proto.MCPEventStateChanged
146	}
147}
148
149func sessionToProto(s session.Session) proto.Session {
150	return proto.Session{
151		ID:               s.ID,
152		ParentSessionID:  s.ParentSessionID,
153		Title:            s.Title,
154		SummaryMessageID: s.SummaryMessageID,
155		MessageCount:     s.MessageCount,
156		PromptTokens:     s.PromptTokens,
157		CompletionTokens: s.CompletionTokens,
158		Cost:             s.Cost,
159		Todos:            todosToProto(s.Todos),
160		CreatedAt:        s.CreatedAt,
161		UpdatedAt:        s.UpdatedAt,
162	}
163}
164
165// isSessionBusy reports whether the given workspace has an in-flight
166// agent run for sessionID. It tolerates a nil workspace (treating it as
167// "not busy") so REST handlers can pass GetWorkspace's result through
168// unconditionally — the workspace lookup error is already surfaced by
169// the prior ListSessions/GetSession call when relevant.
170func isSessionBusy(ws *backend.Workspace, sessionID string) bool {
171	if ws == nil || ws.App == nil || ws.AgentCoordinator == nil {
172		return false
173	}
174	return ws.AgentCoordinator.IsSessionBusy(sessionID)
175}
176
177// attachedClients returns the number of clients currently viewing
178// sessionID in ws. Hold-only clients (streams == 0) do not contribute.
179// A nil workspace is treated as zero so handlers can pass GetWorkspace's
180// result through without an extra guard.
181func attachedClients(ws *backend.Workspace, sessionID string) int {
182	if ws == nil {
183		return 0
184	}
185	return ws.AttachedClientsForSession(sessionID)
186}
187
188func todosToProto(todos []session.Todo) []proto.Todo {
189	if len(todos) == 0 {
190		return nil
191	}
192	out := make([]proto.Todo, len(todos))
193	for i, t := range todos {
194		out[i] = proto.Todo{
195			Content:    t.Content,
196			Status:     string(t.Status),
197			ActiveForm: t.ActiveForm,
198		}
199	}
200	return out
201}
202
203func fileToProto(f history.File) proto.File {
204	return proto.File{
205		ID:        f.ID,
206		SessionID: f.SessionID,
207		Path:      f.Path,
208		Content:   f.Content,
209		Version:   f.Version,
210		CreatedAt: f.CreatedAt,
211		UpdatedAt: f.UpdatedAt,
212	}
213}
214
215func messageToProto(m message.Message) proto.Message {
216	msg := proto.Message{
217		ID:        m.ID,
218		SessionID: m.SessionID,
219		Role:      proto.MessageRole(m.Role),
220		Model:     m.Model,
221		Provider:  m.Provider,
222		CreatedAt: m.CreatedAt,
223		UpdatedAt: m.UpdatedAt,
224	}
225
226	for _, p := range m.Parts {
227		switch v := p.(type) {
228		case message.TextContent:
229			msg.Parts = append(msg.Parts, proto.TextContent{Text: v.Text})
230		case message.ReasoningContent:
231			msg.Parts = append(msg.Parts, proto.ReasoningContent{
232				Thinking:   v.Thinking,
233				Signature:  v.Signature,
234				StartedAt:  v.StartedAt,
235				FinishedAt: v.FinishedAt,
236			})
237		case message.ToolCall:
238			msg.Parts = append(msg.Parts, proto.ToolCall{
239				ID:       v.ID,
240				Name:     v.Name,
241				Input:    v.Input,
242				Finished: v.Finished,
243			})
244		case message.ToolResult:
245			msg.Parts = append(msg.Parts, proto.ToolResult{
246				ToolCallID: v.ToolCallID,
247				Name:       v.Name,
248				Content:    v.Content,
249				Data:       v.Data,
250				MIMEType:   v.MIMEType,
251				Metadata:   v.Metadata,
252				IsError:    v.IsError,
253			})
254		case message.Finish:
255			msg.Parts = append(msg.Parts, proto.Finish{
256				Reason:  proto.FinishReason(v.Reason),
257				Time:    v.Time,
258				Message: v.Message,
259				Details: v.Details,
260			})
261		case message.ImageURLContent:
262			msg.Parts = append(msg.Parts, proto.ImageURLContent{URL: v.URL, Detail: v.Detail})
263		case message.BinaryContent:
264			msg.Parts = append(msg.Parts, proto.BinaryContent{Path: v.Path, MIMEType: v.MIMEType, Data: v.Data})
265		}
266	}
267
268	return msg
269}
270
271// skillsEventToProto converts a skills.Event into its wire form. Errors
272// are flattened to strings because error does not round-trip over JSON.
273func skillsEventToProto(e skills.Event) proto.SkillsEvent {
274	if len(e.States) == 0 {
275		return proto.SkillsEvent{}
276	}
277	out := proto.SkillsEvent{States: make([]proto.SkillState, len(e.States))}
278	for i, s := range e.States {
279		entry := proto.SkillState{
280			Name:  s.Name,
281			Path:  s.Path,
282			State: proto.SkillDiscoveryState(s.State),
283		}
284		if s.Err != nil {
285			entry.Error = s.Err.Error()
286		}
287		out.States[i] = entry
288	}
289	return out
290}
291
292func messagesToProto(msgs []message.Message) []proto.Message {
293	out := make([]proto.Message, len(msgs))
294	for i, m := range msgs {
295		out[i] = messageToProto(m)
296	}
297	return out
298}