1use crate::{ContextServerRegistry, SystemPromptTemplate, Template, Templates};
2use acp_thread::{MentionUri, UserMessageId};
3use action_log::ActionLog;
4use agent_client_protocol as acp;
5use agent_settings::{AgentProfileId, AgentSettings, CompletionMode};
6use anyhow::{Context as _, Result, anyhow};
7use assistant_tool::adapt_schema_to_format;
8use cloud_llm_client::{CompletionIntent, CompletionRequestStatus};
9use collections::IndexMap;
10use fs::Fs;
11use futures::{
12 channel::{mpsc, oneshot},
13 stream::FuturesUnordered,
14};
15use gpui::{App, Context, Entity, SharedString, Task};
16use language_model::{
17 LanguageModel, LanguageModelCompletionEvent, LanguageModelImage, LanguageModelProviderId,
18 LanguageModelRequest, LanguageModelRequestMessage, LanguageModelRequestTool,
19 LanguageModelToolResult, LanguageModelToolResultContent, LanguageModelToolSchemaFormat,
20 LanguageModelToolUse, LanguageModelToolUseId, Role, StopReason,
21};
22use project::Project;
23use prompt_store::ProjectContext;
24use schemars::{JsonSchema, Schema};
25use serde::{Deserialize, Serialize};
26use settings::{Settings, update_settings_file};
27use smol::stream::StreamExt;
28use std::{collections::BTreeMap, path::Path, sync::Arc};
29use std::{fmt::Write, ops::Range};
30use util::{ResultExt, markdown::MarkdownCodeBlock};
31use uuid::Uuid;
32
33#[derive(
34 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize, JsonSchema,
35)]
36pub struct ThreadId(Arc<str>);
37
38impl ThreadId {
39 pub fn new() -> Self {
40 Self(Uuid::new_v4().to_string().into())
41 }
42}
43
44impl std::fmt::Display for ThreadId {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 write!(f, "{}", self.0)
47 }
48}
49
50impl From<&str> for ThreadId {
51 fn from(value: &str) -> Self {
52 Self(value.into())
53 }
54}
55
56/// The ID of the user prompt that initiated a request.
57///
58/// This equates to the user physically submitting a message to the model (e.g., by pressing the Enter key).
59#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
60pub struct PromptId(Arc<str>);
61
62impl PromptId {
63 pub fn new() -> Self {
64 Self(Uuid::new_v4().to_string().into())
65 }
66}
67
68impl std::fmt::Display for PromptId {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 write!(f, "{}", self.0)
71 }
72}
73
74#[derive(Debug, Clone, PartialEq, Eq)]
75pub enum Message {
76 User(UserMessage),
77 Agent(AgentMessage),
78 Resume,
79}
80
81impl Message {
82 pub fn as_agent_message(&self) -> Option<&AgentMessage> {
83 match self {
84 Message::Agent(agent_message) => Some(agent_message),
85 _ => None,
86 }
87 }
88
89 pub fn to_markdown(&self) -> String {
90 match self {
91 Message::User(message) => message.to_markdown(),
92 Message::Agent(message) => message.to_markdown(),
93 Message::Resume => "[resumed after tool use limit was reached]".into(),
94 }
95 }
96}
97
98#[derive(Debug, Clone, PartialEq, Eq)]
99pub struct UserMessage {
100 pub id: UserMessageId,
101 pub content: Vec<UserMessageContent>,
102}
103
104#[derive(Debug, Clone, PartialEq, Eq)]
105pub enum UserMessageContent {
106 Text(String),
107 Mention { uri: MentionUri, content: String },
108 Image(LanguageModelImage),
109}
110
111impl UserMessage {
112 pub fn to_markdown(&self) -> String {
113 let mut markdown = String::from("## User\n\n");
114
115 for content in &self.content {
116 match content {
117 UserMessageContent::Text(text) => {
118 markdown.push_str(text);
119 markdown.push('\n');
120 }
121 UserMessageContent::Image(_) => {
122 markdown.push_str("<image />\n");
123 }
124 UserMessageContent::Mention { uri, content } => {
125 if !content.is_empty() {
126 let _ = write!(&mut markdown, "{}\n\n{}\n", uri.as_link(), content);
127 } else {
128 let _ = write!(&mut markdown, "{}\n", uri.as_link());
129 }
130 }
131 }
132 }
133
134 markdown
135 }
136
137 fn to_request(&self) -> LanguageModelRequestMessage {
138 let mut message = LanguageModelRequestMessage {
139 role: Role::User,
140 content: Vec::with_capacity(self.content.len()),
141 cache: false,
142 };
143
144 const OPEN_CONTEXT: &str = "<context>\n\
145 The following items were attached by the user. \
146 They are up-to-date and don't need to be re-read.\n\n";
147
148 const OPEN_FILES_TAG: &str = "<files>";
149 const OPEN_DIRECTORIES_TAG: &str = "<directories>";
150 const OPEN_SYMBOLS_TAG: &str = "<symbols>";
151 const OPEN_THREADS_TAG: &str = "<threads>";
152 const OPEN_FETCH_TAG: &str = "<fetched_urls>";
153 const OPEN_RULES_TAG: &str =
154 "<rules>\nThe user has specified the following rules that should be applied:\n";
155
156 let mut file_context = OPEN_FILES_TAG.to_string();
157 let mut directory_context = OPEN_DIRECTORIES_TAG.to_string();
158 let mut symbol_context = OPEN_SYMBOLS_TAG.to_string();
159 let mut thread_context = OPEN_THREADS_TAG.to_string();
160 let mut fetch_context = OPEN_FETCH_TAG.to_string();
161 let mut rules_context = OPEN_RULES_TAG.to_string();
162
163 for chunk in &self.content {
164 let chunk = match chunk {
165 UserMessageContent::Text(text) => {
166 language_model::MessageContent::Text(text.clone())
167 }
168 UserMessageContent::Image(value) => {
169 language_model::MessageContent::Image(value.clone())
170 }
171 UserMessageContent::Mention { uri, content } => {
172 match uri {
173 MentionUri::File { abs_path } => {
174 write!(
175 &mut symbol_context,
176 "\n{}",
177 MarkdownCodeBlock {
178 tag: &codeblock_tag(abs_path, None),
179 text: &content.to_string(),
180 }
181 )
182 .ok();
183 }
184 MentionUri::Directory { .. } => {
185 write!(&mut directory_context, "\n{}\n", content).ok();
186 }
187 MentionUri::Symbol {
188 path, line_range, ..
189 }
190 | MentionUri::Selection {
191 path, line_range, ..
192 } => {
193 write!(
194 &mut rules_context,
195 "\n{}",
196 MarkdownCodeBlock {
197 tag: &codeblock_tag(path, Some(line_range)),
198 text: content
199 }
200 )
201 .ok();
202 }
203 MentionUri::Thread { .. } => {
204 write!(&mut thread_context, "\n{}\n", content).ok();
205 }
206 MentionUri::TextThread { .. } => {
207 write!(&mut thread_context, "\n{}\n", content).ok();
208 }
209 MentionUri::Rule { .. } => {
210 write!(
211 &mut rules_context,
212 "\n{}",
213 MarkdownCodeBlock {
214 tag: "",
215 text: content
216 }
217 )
218 .ok();
219 }
220 MentionUri::Fetch { url } => {
221 write!(&mut fetch_context, "\nFetch: {}\n\n{}", url, content).ok();
222 }
223 }
224
225 language_model::MessageContent::Text(uri.as_link().to_string())
226 }
227 };
228
229 message.content.push(chunk);
230 }
231
232 let len_before_context = message.content.len();
233
234 if file_context.len() > OPEN_FILES_TAG.len() {
235 file_context.push_str("</files>\n");
236 message
237 .content
238 .push(language_model::MessageContent::Text(file_context));
239 }
240
241 if directory_context.len() > OPEN_DIRECTORIES_TAG.len() {
242 directory_context.push_str("</directories>\n");
243 message
244 .content
245 .push(language_model::MessageContent::Text(directory_context));
246 }
247
248 if symbol_context.len() > OPEN_SYMBOLS_TAG.len() {
249 symbol_context.push_str("</symbols>\n");
250 message
251 .content
252 .push(language_model::MessageContent::Text(symbol_context));
253 }
254
255 if thread_context.len() > OPEN_THREADS_TAG.len() {
256 thread_context.push_str("</threads>\n");
257 message
258 .content
259 .push(language_model::MessageContent::Text(thread_context));
260 }
261
262 if fetch_context.len() > OPEN_FETCH_TAG.len() {
263 fetch_context.push_str("</fetched_urls>\n");
264 message
265 .content
266 .push(language_model::MessageContent::Text(fetch_context));
267 }
268
269 if rules_context.len() > OPEN_RULES_TAG.len() {
270 rules_context.push_str("</user_rules>\n");
271 message
272 .content
273 .push(language_model::MessageContent::Text(rules_context));
274 }
275
276 if message.content.len() > len_before_context {
277 message.content.insert(
278 len_before_context,
279 language_model::MessageContent::Text(OPEN_CONTEXT.into()),
280 );
281 message
282 .content
283 .push(language_model::MessageContent::Text("</context>".into()));
284 }
285
286 message
287 }
288}
289
/// Builds the info string placed after the opening fence of a code block.
///
/// The result is `"<extension> <path>"` (the extension and its trailing space
/// are omitted when the path has no UTF-8 extension), optionally followed by
/// `:<line>` when the range's start and end are equal, or `:<start>-<end>`
/// otherwise. Both bounds are printed incremented by one.
fn codeblock_tag(full_path: &Path, line_range: Option<&Range<u32>>) -> String {
    let mut tag = match full_path.extension().and_then(|ext| ext.to_str()) {
        Some(extension) => format!("{extension} "),
        None => String::new(),
    };

    // Writing into a `String` cannot fail, so the results are ignored.
    let _ = write!(tag, "{}", full_path.display());

    match line_range {
        Some(range) if range.start == range.end => {
            let _ = write!(tag, ":{}", range.start + 1);
        }
        Some(range) => {
            let _ = write!(tag, ":{}-{}", range.start + 1, range.end + 1);
        }
        None => {}
    }

    tag
}
309
310impl AgentMessage {
311 pub fn to_markdown(&self) -> String {
312 let mut markdown = String::from("## Assistant\n\n");
313
314 for content in &self.content {
315 match content {
316 AgentMessageContent::Text(text) => {
317 markdown.push_str(text);
318 markdown.push('\n');
319 }
320 AgentMessageContent::Thinking { text, .. } => {
321 markdown.push_str("<think>");
322 markdown.push_str(text);
323 markdown.push_str("</think>\n");
324 }
325 AgentMessageContent::RedactedThinking(_) => {
326 markdown.push_str("<redacted_thinking />\n")
327 }
328 AgentMessageContent::Image(_) => {
329 markdown.push_str("<image />\n");
330 }
331 AgentMessageContent::ToolUse(tool_use) => {
332 markdown.push_str(&format!(
333 "**Tool Use**: {} (ID: {})\n",
334 tool_use.name, tool_use.id
335 ));
336 markdown.push_str(&format!(
337 "{}\n",
338 MarkdownCodeBlock {
339 tag: "json",
340 text: &format!("{:#}", tool_use.input)
341 }
342 ));
343 }
344 }
345 }
346
347 for tool_result in self.tool_results.values() {
348 markdown.push_str(&format!(
349 "**Tool Result**: {} (ID: {})\n\n",
350 tool_result.tool_name, tool_result.tool_use_id
351 ));
352 if tool_result.is_error {
353 markdown.push_str("**ERROR:**\n");
354 }
355
356 match &tool_result.content {
357 LanguageModelToolResultContent::Text(text) => {
358 writeln!(markdown, "{text}\n").ok();
359 }
360 LanguageModelToolResultContent::Image(_) => {
361 writeln!(markdown, "<image />\n").ok();
362 }
363 }
364
365 if let Some(output) = tool_result.output.as_ref() {
366 writeln!(
367 markdown,
368 "**Debug Output**:\n\n```json\n{}\n```\n",
369 serde_json::to_string_pretty(output).unwrap()
370 )
371 .unwrap();
372 }
373 }
374
375 markdown
376 }
377
378 pub fn to_request(&self) -> Vec<LanguageModelRequestMessage> {
379 let mut assistant_message = LanguageModelRequestMessage {
380 role: Role::Assistant,
381 content: Vec::with_capacity(self.content.len()),
382 cache: false,
383 };
384 for chunk in &self.content {
385 let chunk = match chunk {
386 AgentMessageContent::Text(text) => {
387 language_model::MessageContent::Text(text.clone())
388 }
389 AgentMessageContent::Thinking { text, signature } => {
390 language_model::MessageContent::Thinking {
391 text: text.clone(),
392 signature: signature.clone(),
393 }
394 }
395 AgentMessageContent::RedactedThinking(value) => {
396 language_model::MessageContent::RedactedThinking(value.clone())
397 }
398 AgentMessageContent::ToolUse(value) => {
399 language_model::MessageContent::ToolUse(value.clone())
400 }
401 AgentMessageContent::Image(value) => {
402 language_model::MessageContent::Image(value.clone())
403 }
404 };
405 assistant_message.content.push(chunk);
406 }
407
408 let mut user_message = LanguageModelRequestMessage {
409 role: Role::User,
410 content: Vec::new(),
411 cache: false,
412 };
413
414 for tool_result in self.tool_results.values() {
415 user_message
416 .content
417 .push(language_model::MessageContent::ToolResult(
418 tool_result.clone(),
419 ));
420 }
421
422 let mut messages = Vec::new();
423 if !assistant_message.content.is_empty() {
424 messages.push(assistant_message);
425 }
426 if !user_message.content.is_empty() {
427 messages.push(user_message);
428 }
429 messages
430 }
431}
432
433#[derive(Default, Debug, Clone, PartialEq, Eq)]
434pub struct AgentMessage {
435 pub content: Vec<AgentMessageContent>,
436 pub tool_results: IndexMap<LanguageModelToolUseId, LanguageModelToolResult>,
437}
438
439#[derive(Debug, Clone, PartialEq, Eq)]
440pub enum AgentMessageContent {
441 Text(String),
442 Thinking {
443 text: String,
444 signature: Option<String>,
445 },
446 RedactedThinking(String),
447 Image(LanguageModelImage),
448 ToolUse(LanguageModelToolUse),
449}
450
451#[derive(Debug)]
452pub enum AgentResponseEvent {
453 Text(String),
454 Thinking(String),
455 ToolCall(acp::ToolCall),
456 ToolCallUpdate(acp_thread::ToolCallUpdate),
457 ToolCallAuthorization(ToolCallAuthorization),
458 Stop(acp::StopReason),
459}
460
461#[derive(Debug)]
462pub struct ToolCallAuthorization {
463 pub tool_call: acp::ToolCallUpdate,
464 pub options: Vec<acp::PermissionOption>,
465 pub response: oneshot::Sender<acp::PermissionOptionId>,
466}
467
468pub struct Thread {
469 id: ThreadId,
470 prompt_id: PromptId,
471 messages: Vec<Message>,
472 completion_mode: CompletionMode,
473 /// Holds the task that handles agent interaction until the end of the turn.
474 /// Survives across multiple requests as the model performs tool calls and
475 /// we run tools, report their results.
476 running_turn: Option<RunningTurn>,
477 pending_message: Option<AgentMessage>,
478 tools: BTreeMap<SharedString, Arc<dyn AnyAgentTool>>,
479 tool_use_limit_reached: bool,
480 context_server_registry: Entity<ContextServerRegistry>,
481 profile_id: AgentProfileId,
482 project_context: Entity<ProjectContext>,
483 templates: Arc<Templates>,
484 model: Option<Arc<dyn LanguageModel>>,
485 project: Entity<Project>,
486 action_log: Entity<ActionLog>,
487}
488
489impl Thread {
490 pub fn new(
491 project: Entity<Project>,
492 project_context: Entity<ProjectContext>,
493 context_server_registry: Entity<ContextServerRegistry>,
494 action_log: Entity<ActionLog>,
495 templates: Arc<Templates>,
496 model: Option<Arc<dyn LanguageModel>>,
497 cx: &mut Context<Self>,
498 ) -> Self {
499 let profile_id = AgentSettings::get_global(cx).default_profile.clone();
500 Self {
501 id: ThreadId::new(),
502 prompt_id: PromptId::new(),
503 messages: Vec::new(),
504 completion_mode: CompletionMode::Normal,
505 running_turn: None,
506 pending_message: None,
507 tools: BTreeMap::default(),
508 tool_use_limit_reached: false,
509 context_server_registry,
510 profile_id,
511 project_context,
512 templates,
513 model,
514 project,
515 action_log,
516 }
517 }
518
519 pub fn project(&self) -> &Entity<Project> {
520 &self.project
521 }
522
523 pub fn project_context(&self) -> &Entity<ProjectContext> {
524 &self.project_context
525 }
526
527 pub fn action_log(&self) -> &Entity<ActionLog> {
528 &self.action_log
529 }
530
531 pub fn model(&self) -> Option<&Arc<dyn LanguageModel>> {
532 self.model.as_ref()
533 }
534
535 pub fn set_model(&mut self, model: Arc<dyn LanguageModel>) {
536 self.model = Some(model);
537 }
538
539 pub fn completion_mode(&self) -> CompletionMode {
540 self.completion_mode
541 }
542
543 pub fn set_completion_mode(&mut self, mode: CompletionMode) {
544 self.completion_mode = mode;
545 }
546
547 #[cfg(any(test, feature = "test-support"))]
548 pub fn last_message(&self) -> Option<Message> {
549 if let Some(message) = self.pending_message.clone() {
550 Some(Message::Agent(message))
551 } else {
552 self.messages.last().cloned()
553 }
554 }
555
556 pub fn add_tool(&mut self, tool: impl AgentTool) {
557 self.tools.insert(tool.name(), tool.erase());
558 }
559
560 pub fn remove_tool(&mut self, name: &str) -> bool {
561 self.tools.remove(name).is_some()
562 }
563
564 pub fn profile(&self) -> &AgentProfileId {
565 &self.profile_id
566 }
567
568 pub fn set_profile(&mut self, profile_id: AgentProfileId) {
569 self.profile_id = profile_id;
570 }
571
572 pub fn cancel(&mut self) {
573 if let Some(running_turn) = self.running_turn.take() {
574 running_turn.cancel();
575 }
576 self.flush_pending_message();
577 }
578
579 pub fn truncate(&mut self, message_id: UserMessageId) -> Result<()> {
580 self.cancel();
581 let Some(position) = self.messages.iter().position(
582 |msg| matches!(msg, Message::User(UserMessage { id, .. }) if id == &message_id),
583 ) else {
584 return Err(anyhow!("Message not found"));
585 };
586 self.messages.truncate(position);
587 Ok(())
588 }
589
590 pub fn resume(
591 &mut self,
592 cx: &mut Context<Self>,
593 ) -> Result<mpsc::UnboundedReceiver<Result<AgentResponseEvent>>> {
594 anyhow::ensure!(self.model.is_some(), "Model not set");
595 anyhow::ensure!(
596 self.tool_use_limit_reached,
597 "can only resume after tool use limit is reached"
598 );
599
600 self.messages.push(Message::Resume);
601 cx.notify();
602
603 log::info!("Total messages in thread: {}", self.messages.len());
604 self.run_turn(cx)
605 }
606
607 /// Sending a message results in the model streaming a response, which could include tool calls.
608 /// After calling tools, the model will stops and waits for any outstanding tool calls to be completed and their results sent.
609 /// The returned channel will report all the occurrences in which the model stops before erroring or ending its turn.
610 pub fn send<T>(
611 &mut self,
612 id: UserMessageId,
613 content: impl IntoIterator<Item = T>,
614 cx: &mut Context<Self>,
615 ) -> Result<mpsc::UnboundedReceiver<Result<AgentResponseEvent>>>
616 where
617 T: Into<UserMessageContent>,
618 {
619 let model = self.model().context("No language model configured")?;
620
621 log::info!("Thread::send called with model: {:?}", model.name());
622 self.advance_prompt_id();
623
624 let content = content.into_iter().map(Into::into).collect::<Vec<_>>();
625 log::debug!("Thread::send content: {:?}", content);
626
627 self.messages
628 .push(Message::User(UserMessage { id, content }));
629 cx.notify();
630
631 log::info!("Total messages in thread: {}", self.messages.len());
632 self.run_turn(cx)
633 }
634
635 fn run_turn(
636 &mut self,
637 cx: &mut Context<Self>,
638 ) -> Result<mpsc::UnboundedReceiver<Result<AgentResponseEvent>>> {
639 self.cancel();
640
641 let model = self
642 .model()
643 .cloned()
644 .context("No language model configured")?;
645 let (events_tx, events_rx) = mpsc::unbounded::<Result<AgentResponseEvent>>();
646 let event_stream = AgentResponseEventStream(events_tx);
647 let message_ix = self.messages.len().saturating_sub(1);
648 self.tool_use_limit_reached = false;
649 self.running_turn = Some(RunningTurn {
650 event_stream: event_stream.clone(),
651 _task: cx.spawn(async move |this, cx| {
652 log::info!("Starting agent turn execution");
653 let turn_result: Result<()> = async {
654 let mut completion_intent = CompletionIntent::UserPrompt;
655 loop {
656 log::debug!(
657 "Building completion request with intent: {:?}",
658 completion_intent
659 );
660 let request = this.update(cx, |this, cx| {
661 this.build_completion_request(completion_intent, cx)
662 })??;
663
664 log::info!("Calling model.stream_completion");
665 let mut events = model.stream_completion(request, cx).await?;
666 log::debug!("Stream completion started successfully");
667
668 let mut tool_use_limit_reached = false;
669 let mut tool_uses = FuturesUnordered::new();
670 while let Some(event) = events.next().await {
671 match event? {
672 LanguageModelCompletionEvent::StatusUpdate(
673 CompletionRequestStatus::ToolUseLimitReached,
674 ) => {
675 tool_use_limit_reached = true;
676 }
677 LanguageModelCompletionEvent::Stop(reason) => {
678 event_stream.send_stop(reason);
679 if reason == StopReason::Refusal {
680 this.update(cx, |this, _cx| {
681 this.flush_pending_message();
682 this.messages.truncate(message_ix);
683 })?;
684 return Ok(());
685 }
686 }
687 event => {
688 log::trace!("Received completion event: {:?}", event);
689 this.update(cx, |this, cx| {
690 tool_uses.extend(this.handle_streamed_completion_event(
691 event,
692 &event_stream,
693 cx,
694 ));
695 })
696 .ok();
697 }
698 }
699 }
700
701 let used_tools = tool_uses.is_empty();
702 while let Some(tool_result) = tool_uses.next().await {
703 log::info!("Tool finished {:?}", tool_result);
704
705 event_stream.update_tool_call_fields(
706 &tool_result.tool_use_id,
707 acp::ToolCallUpdateFields {
708 status: Some(if tool_result.is_error {
709 acp::ToolCallStatus::Failed
710 } else {
711 acp::ToolCallStatus::Completed
712 }),
713 raw_output: tool_result.output.clone(),
714 ..Default::default()
715 },
716 );
717 this.update(cx, |this, _cx| {
718 this.pending_message()
719 .tool_results
720 .insert(tool_result.tool_use_id.clone(), tool_result);
721 })
722 .ok();
723 }
724
725 if tool_use_limit_reached {
726 log::info!("Tool use limit reached, completing turn");
727 this.update(cx, |this, _cx| this.tool_use_limit_reached = true)?;
728 return Err(language_model::ToolUseLimitReachedError.into());
729 } else if used_tools {
730 log::info!("No tool uses found, completing turn");
731 return Ok(());
732 } else {
733 this.update(cx, |this, _| this.flush_pending_message())?;
734 completion_intent = CompletionIntent::ToolResults;
735 }
736 }
737 }
738 .await;
739
740 if let Err(error) = turn_result {
741 log::error!("Turn execution failed: {:?}", error);
742 event_stream.send_error(error);
743 } else {
744 log::info!("Turn execution completed successfully");
745 }
746
747 this.update(cx, |this, _| {
748 this.flush_pending_message();
749 this.running_turn.take();
750 })
751 .ok();
752 }),
753 });
754 Ok(events_rx)
755 }
756
757 pub fn build_system_message(&self, cx: &App) -> LanguageModelRequestMessage {
758 log::debug!("Building system message");
759 let prompt = SystemPromptTemplate {
760 project: &self.project_context.read(cx),
761 available_tools: self.tools.keys().cloned().collect(),
762 }
763 .render(&self.templates)
764 .context("failed to build system prompt")
765 .expect("Invalid template");
766 log::debug!("System message built");
767 LanguageModelRequestMessage {
768 role: Role::System,
769 content: vec![prompt.into()],
770 cache: true,
771 }
772 }
773
774 /// A helper method that's called on every streamed completion event.
775 /// Returns an optional tool result task, which the main agentic loop in
776 /// send will send back to the model when it resolves.
777 fn handle_streamed_completion_event(
778 &mut self,
779 event: LanguageModelCompletionEvent,
780 event_stream: &AgentResponseEventStream,
781 cx: &mut Context<Self>,
782 ) -> Option<Task<LanguageModelToolResult>> {
783 log::trace!("Handling streamed completion event: {:?}", event);
784 use LanguageModelCompletionEvent::*;
785
786 match event {
787 StartMessage { .. } => {
788 self.flush_pending_message();
789 self.pending_message = Some(AgentMessage::default());
790 }
791 Text(new_text) => self.handle_text_event(new_text, event_stream, cx),
792 Thinking { text, signature } => {
793 self.handle_thinking_event(text, signature, event_stream, cx)
794 }
795 RedactedThinking { data } => self.handle_redacted_thinking_event(data, cx),
796 ToolUse(tool_use) => {
797 return self.handle_tool_use_event(tool_use, event_stream, cx);
798 }
799 ToolUseJsonParseError {
800 id,
801 tool_name,
802 raw_input,
803 json_parse_error,
804 } => {
805 return Some(Task::ready(self.handle_tool_use_json_parse_error_event(
806 id,
807 tool_name,
808 raw_input,
809 json_parse_error,
810 )));
811 }
812 UsageUpdate(_) | StatusUpdate(_) => {}
813 Stop(_) => unreachable!(),
814 }
815
816 None
817 }
818
819 fn handle_text_event(
820 &mut self,
821 new_text: String,
822 event_stream: &AgentResponseEventStream,
823 cx: &mut Context<Self>,
824 ) {
825 event_stream.send_text(&new_text);
826
827 let last_message = self.pending_message();
828 if let Some(AgentMessageContent::Text(text)) = last_message.content.last_mut() {
829 text.push_str(&new_text);
830 } else {
831 last_message
832 .content
833 .push(AgentMessageContent::Text(new_text));
834 }
835
836 cx.notify();
837 }
838
839 fn handle_thinking_event(
840 &mut self,
841 new_text: String,
842 new_signature: Option<String>,
843 event_stream: &AgentResponseEventStream,
844 cx: &mut Context<Self>,
845 ) {
846 event_stream.send_thinking(&new_text);
847
848 let last_message = self.pending_message();
849 if let Some(AgentMessageContent::Thinking { text, signature }) =
850 last_message.content.last_mut()
851 {
852 text.push_str(&new_text);
853 *signature = new_signature.or(signature.take());
854 } else {
855 last_message.content.push(AgentMessageContent::Thinking {
856 text: new_text,
857 signature: new_signature,
858 });
859 }
860
861 cx.notify();
862 }
863
864 fn handle_redacted_thinking_event(&mut self, data: String, cx: &mut Context<Self>) {
865 let last_message = self.pending_message();
866 last_message
867 .content
868 .push(AgentMessageContent::RedactedThinking(data));
869 cx.notify();
870 }
871
872 fn handle_tool_use_event(
873 &mut self,
874 tool_use: LanguageModelToolUse,
875 event_stream: &AgentResponseEventStream,
876 cx: &mut Context<Self>,
877 ) -> Option<Task<LanguageModelToolResult>> {
878 cx.notify();
879
880 let tool = self.tools.get(tool_use.name.as_ref()).cloned();
881 let mut title = SharedString::from(&tool_use.name);
882 let mut kind = acp::ToolKind::Other;
883 if let Some(tool) = tool.as_ref() {
884 title = tool.initial_title(tool_use.input.clone());
885 kind = tool.kind();
886 }
887
888 // Ensure the last message ends in the current tool use
889 let last_message = self.pending_message();
890 let push_new_tool_use = last_message.content.last_mut().map_or(true, |content| {
891 if let AgentMessageContent::ToolUse(last_tool_use) = content {
892 if last_tool_use.id == tool_use.id {
893 *last_tool_use = tool_use.clone();
894 false
895 } else {
896 true
897 }
898 } else {
899 true
900 }
901 });
902
903 if push_new_tool_use {
904 event_stream.send_tool_call(&tool_use.id, title, kind, tool_use.input.clone());
905 last_message
906 .content
907 .push(AgentMessageContent::ToolUse(tool_use.clone()));
908 } else {
909 event_stream.update_tool_call_fields(
910 &tool_use.id,
911 acp::ToolCallUpdateFields {
912 title: Some(title.into()),
913 kind: Some(kind),
914 raw_input: Some(tool_use.input.clone()),
915 ..Default::default()
916 },
917 );
918 }
919
920 if !tool_use.is_input_complete {
921 return None;
922 }
923
924 let Some(tool) = tool else {
925 let content = format!("No tool named {} exists", tool_use.name);
926 return Some(Task::ready(LanguageModelToolResult {
927 content: LanguageModelToolResultContent::Text(Arc::from(content)),
928 tool_use_id: tool_use.id,
929 tool_name: tool_use.name,
930 is_error: true,
931 output: None,
932 }));
933 };
934
935 let fs = self.project.read(cx).fs().clone();
936 let tool_event_stream =
937 ToolCallEventStream::new(tool_use.id.clone(), event_stream.clone(), Some(fs));
938 tool_event_stream.update_fields(acp::ToolCallUpdateFields {
939 status: Some(acp::ToolCallStatus::InProgress),
940 ..Default::default()
941 });
942 let supports_images = self.model().map_or(false, |model| model.supports_images());
943 let tool_result = tool.run(tool_use.input, tool_event_stream, cx);
944 log::info!("Running tool {}", tool_use.name);
945 Some(cx.foreground_executor().spawn(async move {
946 let tool_result = tool_result.await.and_then(|output| {
947 if let LanguageModelToolResultContent::Image(_) = &output.llm_output {
948 if !supports_images {
949 return Err(anyhow!(
950 "Attempted to read an image, but this model doesn't support it.",
951 ));
952 }
953 }
954 Ok(output)
955 });
956
957 match tool_result {
958 Ok(output) => LanguageModelToolResult {
959 tool_use_id: tool_use.id,
960 tool_name: tool_use.name,
961 is_error: false,
962 content: output.llm_output,
963 output: Some(output.raw_output),
964 },
965 Err(error) => LanguageModelToolResult {
966 tool_use_id: tool_use.id,
967 tool_name: tool_use.name,
968 is_error: true,
969 content: LanguageModelToolResultContent::Text(Arc::from(error.to_string())),
970 output: None,
971 },
972 }
973 }))
974 }
975
976 fn handle_tool_use_json_parse_error_event(
977 &mut self,
978 tool_use_id: LanguageModelToolUseId,
979 tool_name: Arc<str>,
980 raw_input: Arc<str>,
981 json_parse_error: String,
982 ) -> LanguageModelToolResult {
983 let tool_output = format!("Error parsing input JSON: {json_parse_error}");
984 LanguageModelToolResult {
985 tool_use_id,
986 tool_name,
987 is_error: true,
988 content: LanguageModelToolResultContent::Text(tool_output.into()),
989 output: Some(serde_json::Value::String(raw_input.to_string())),
990 }
991 }
992
993 fn pending_message(&mut self) -> &mut AgentMessage {
994 self.pending_message.get_or_insert_default()
995 }
996
997 fn flush_pending_message(&mut self) {
998 let Some(mut message) = self.pending_message.take() else {
999 return;
1000 };
1001
1002 for content in &message.content {
1003 let AgentMessageContent::ToolUse(tool_use) = content else {
1004 continue;
1005 };
1006
1007 if !message.tool_results.contains_key(&tool_use.id) {
1008 message.tool_results.insert(
1009 tool_use.id.clone(),
1010 LanguageModelToolResult {
1011 tool_use_id: tool_use.id.clone(),
1012 tool_name: tool_use.name.clone(),
1013 is_error: true,
1014 content: LanguageModelToolResultContent::Text(
1015 "Tool canceled by user".into(),
1016 ),
1017 output: None,
1018 },
1019 );
1020 }
1021 }
1022
1023 self.messages.push(Message::Agent(message));
1024 }
1025
1026 pub(crate) fn build_completion_request(
1027 &self,
1028 completion_intent: CompletionIntent,
1029 cx: &mut App,
1030 ) -> Result<LanguageModelRequest> {
1031 let model = self.model().context("No language model configured")?;
1032
1033 log::debug!("Building completion request");
1034 log::debug!("Completion intent: {:?}", completion_intent);
1035 log::debug!("Completion mode: {:?}", self.completion_mode);
1036
1037 let messages = self.build_request_messages(cx);
1038 log::info!("Request will include {} messages", messages.len());
1039
1040 let tools = if let Some(tools) = self.tools(cx).log_err() {
1041 tools
1042 .filter_map(|tool| {
1043 let tool_name = tool.name().to_string();
1044 log::trace!("Including tool: {}", tool_name);
1045 Some(LanguageModelRequestTool {
1046 name: tool_name,
1047 description: tool.description().to_string(),
1048 input_schema: tool.input_schema(model.tool_input_format()).log_err()?,
1049 })
1050 })
1051 .collect()
1052 } else {
1053 Vec::new()
1054 };
1055
1056 log::info!("Request includes {} tools", tools.len());
1057
1058 let request = LanguageModelRequest {
1059 thread_id: Some(self.id.to_string()),
1060 prompt_id: Some(self.prompt_id.to_string()),
1061 intent: Some(completion_intent),
1062 mode: Some(self.completion_mode.into()),
1063 messages,
1064 tools,
1065 tool_choice: None,
1066 stop: Vec::new(),
1067 temperature: AgentSettings::temperature_for_model(model, cx),
1068 thinking_allowed: true,
1069 };
1070
1071 log::debug!("Completion request built successfully");
1072 Ok(request)
1073 }
1074
1075 fn tools<'a>(&'a self, cx: &'a App) -> Result<impl Iterator<Item = &'a Arc<dyn AnyAgentTool>>> {
1076 let model = self.model().context("No language model configured")?;
1077
1078 let profile = AgentSettings::get_global(cx)
1079 .profiles
1080 .get(&self.profile_id)
1081 .context("profile not found")?;
1082 let provider_id = model.provider_id();
1083
1084 Ok(self
1085 .tools
1086 .iter()
1087 .filter(move |(_, tool)| tool.supported_provider(&provider_id))
1088 .filter_map(|(tool_name, tool)| {
1089 if profile.is_tool_enabled(tool_name) {
1090 Some(tool)
1091 } else {
1092 None
1093 }
1094 })
1095 .chain(self.context_server_registry.read(cx).servers().flat_map(
1096 |(server_id, tools)| {
1097 tools.iter().filter_map(|(tool_name, tool)| {
1098 if profile.is_context_server_tool_enabled(&server_id.0, tool_name) {
1099 Some(tool)
1100 } else {
1101 None
1102 }
1103 })
1104 },
1105 )))
1106 }
1107
1108 fn build_request_messages(&self, cx: &App) -> Vec<LanguageModelRequestMessage> {
1109 log::trace!(
1110 "Building request messages from {} thread messages",
1111 self.messages.len()
1112 );
1113 let mut messages = vec![self.build_system_message(cx)];
1114 for message in &self.messages {
1115 match message {
1116 Message::User(message) => messages.push(message.to_request()),
1117 Message::Agent(message) => messages.extend(message.to_request()),
1118 Message::Resume => messages.push(LanguageModelRequestMessage {
1119 role: Role::User,
1120 content: vec!["Continue where you left off".into()],
1121 cache: false,
1122 }),
1123 }
1124 }
1125
1126 if let Some(message) = self.pending_message.as_ref() {
1127 messages.extend(message.to_request());
1128 }
1129
1130 if let Some(last_user_message) = messages
1131 .iter_mut()
1132 .rev()
1133 .find(|message| message.role == Role::User)
1134 {
1135 last_user_message.cache = true;
1136 }
1137
1138 messages
1139 }
1140
1141 pub fn to_markdown(&self) -> String {
1142 let mut markdown = String::new();
1143 for (ix, message) in self.messages.iter().enumerate() {
1144 if ix > 0 {
1145 markdown.push('\n');
1146 }
1147 markdown.push_str(&message.to_markdown());
1148 }
1149
1150 if let Some(message) = self.pending_message.as_ref() {
1151 markdown.push('\n');
1152 markdown.push_str(&message.to_markdown());
1153 }
1154
1155 markdown
1156 }
1157
1158 fn advance_prompt_id(&mut self) {
1159 self.prompt_id = PromptId::new();
1160 }
1161}
1162
1163struct RunningTurn {
1164 /// Holds the task that handles agent interaction until the end of the turn.
1165 /// Survives across multiple requests as the model performs tool calls and
1166 /// we run tools, report their results.
1167 _task: Task<()>,
1168 /// The current event stream for the running turn. Used to report a final
1169 /// cancellation event if we cancel the turn.
1170 event_stream: AgentResponseEventStream,
1171}
1172
1173impl RunningTurn {
1174 fn cancel(self) {
1175 log::debug!("Cancelling in progress turn");
1176 self.event_stream.send_canceled();
1177 }
1178}
1179
1180pub trait AgentTool
1181where
1182 Self: 'static + Sized,
1183{
1184 type Input: for<'de> Deserialize<'de> + Serialize + JsonSchema;
1185 type Output: for<'de> Deserialize<'de> + Serialize + Into<LanguageModelToolResultContent>;
1186
1187 fn name(&self) -> SharedString;
1188
1189 fn description(&self) -> SharedString {
1190 let schema = schemars::schema_for!(Self::Input);
1191 SharedString::new(
1192 schema
1193 .get("description")
1194 .and_then(|description| description.as_str())
1195 .unwrap_or_default(),
1196 )
1197 }
1198
1199 fn kind(&self) -> acp::ToolKind;
1200
1201 /// The initial tool title to display. Can be updated during the tool run.
1202 fn initial_title(&self, input: Result<Self::Input, serde_json::Value>) -> SharedString;
1203
1204 /// Returns the JSON schema that describes the tool's input.
1205 fn input_schema(&self) -> Schema {
1206 schemars::schema_for!(Self::Input)
1207 }
1208
1209 /// Some tools rely on a provider for the underlying billing or other reasons.
1210 /// Allow the tool to check if they are compatible, or should be filtered out.
1211 fn supported_provider(&self, _provider: &LanguageModelProviderId) -> bool {
1212 true
1213 }
1214
1215 /// Runs the tool with the provided input.
1216 fn run(
1217 self: Arc<Self>,
1218 input: Self::Input,
1219 event_stream: ToolCallEventStream,
1220 cx: &mut App,
1221 ) -> Task<Result<Self::Output>>;
1222
1223 fn erase(self) -> Arc<dyn AnyAgentTool> {
1224 Arc::new(Erased(Arc::new(self)))
1225 }
1226}
1227
/// Wrapper around a tool value; `Erased<Arc<T>>` implements [`AnyAgentTool`]
/// for every `T: AgentTool` and is what `AgentTool::erase` constructs.
pub struct Erased<T>(T);
1229
1230pub struct AgentToolOutput {
1231 pub llm_output: LanguageModelToolResultContent,
1232 pub raw_output: serde_json::Value,
1233}
1234
1235pub trait AnyAgentTool {
1236 fn name(&self) -> SharedString;
1237 fn description(&self) -> SharedString;
1238 fn kind(&self) -> acp::ToolKind;
1239 fn initial_title(&self, input: serde_json::Value) -> SharedString;
1240 fn input_schema(&self, format: LanguageModelToolSchemaFormat) -> Result<serde_json::Value>;
1241 fn supported_provider(&self, _provider: &LanguageModelProviderId) -> bool {
1242 true
1243 }
1244 fn run(
1245 self: Arc<Self>,
1246 input: serde_json::Value,
1247 event_stream: ToolCallEventStream,
1248 cx: &mut App,
1249 ) -> Task<Result<AgentToolOutput>>;
1250}
1251
1252impl<T> AnyAgentTool for Erased<Arc<T>>
1253where
1254 T: AgentTool,
1255{
1256 fn name(&self) -> SharedString {
1257 self.0.name()
1258 }
1259
1260 fn description(&self) -> SharedString {
1261 self.0.description()
1262 }
1263
1264 fn kind(&self) -> agent_client_protocol::ToolKind {
1265 self.0.kind()
1266 }
1267
1268 fn initial_title(&self, input: serde_json::Value) -> SharedString {
1269 let parsed_input = serde_json::from_value(input.clone()).map_err(|_| input);
1270 self.0.initial_title(parsed_input)
1271 }
1272
1273 fn input_schema(&self, format: LanguageModelToolSchemaFormat) -> Result<serde_json::Value> {
1274 let mut json = serde_json::to_value(self.0.input_schema())?;
1275 adapt_schema_to_format(&mut json, format)?;
1276 Ok(json)
1277 }
1278
1279 fn supported_provider(&self, provider: &LanguageModelProviderId) -> bool {
1280 self.0.supported_provider(provider)
1281 }
1282
1283 fn run(
1284 self: Arc<Self>,
1285 input: serde_json::Value,
1286 event_stream: ToolCallEventStream,
1287 cx: &mut App,
1288 ) -> Task<Result<AgentToolOutput>> {
1289 cx.spawn(async move |cx| {
1290 let input = serde_json::from_value(input)?;
1291 let output = cx
1292 .update(|cx| self.0.clone().run(input, event_stream, cx))?
1293 .await?;
1294 let raw_output = serde_json::to_value(&output)?;
1295 Ok(AgentToolOutput {
1296 llm_output: output.into(),
1297 raw_output,
1298 })
1299 })
1300 }
1301}
1302
1303#[derive(Clone)]
1304struct AgentResponseEventStream(mpsc::UnboundedSender<Result<AgentResponseEvent>>);
1305
1306impl AgentResponseEventStream {
1307 fn send_text(&self, text: &str) {
1308 self.0
1309 .unbounded_send(Ok(AgentResponseEvent::Text(text.to_string())))
1310 .ok();
1311 }
1312
1313 fn send_thinking(&self, text: &str) {
1314 self.0
1315 .unbounded_send(Ok(AgentResponseEvent::Thinking(text.to_string())))
1316 .ok();
1317 }
1318
1319 fn send_tool_call(
1320 &self,
1321 id: &LanguageModelToolUseId,
1322 title: SharedString,
1323 kind: acp::ToolKind,
1324 input: serde_json::Value,
1325 ) {
1326 self.0
1327 .unbounded_send(Ok(AgentResponseEvent::ToolCall(Self::initial_tool_call(
1328 id,
1329 title.to_string(),
1330 kind,
1331 input,
1332 ))))
1333 .ok();
1334 }
1335
1336 fn initial_tool_call(
1337 id: &LanguageModelToolUseId,
1338 title: String,
1339 kind: acp::ToolKind,
1340 input: serde_json::Value,
1341 ) -> acp::ToolCall {
1342 acp::ToolCall {
1343 id: acp::ToolCallId(id.to_string().into()),
1344 title,
1345 kind,
1346 status: acp::ToolCallStatus::Pending,
1347 content: vec![],
1348 locations: vec![],
1349 raw_input: Some(input),
1350 raw_output: None,
1351 }
1352 }
1353
1354 fn update_tool_call_fields(
1355 &self,
1356 tool_use_id: &LanguageModelToolUseId,
1357 fields: acp::ToolCallUpdateFields,
1358 ) {
1359 self.0
1360 .unbounded_send(Ok(AgentResponseEvent::ToolCallUpdate(
1361 acp::ToolCallUpdate {
1362 id: acp::ToolCallId(tool_use_id.to_string().into()),
1363 fields,
1364 }
1365 .into(),
1366 )))
1367 .ok();
1368 }
1369
1370 fn send_stop(&self, reason: StopReason) {
1371 match reason {
1372 StopReason::EndTurn => {
1373 self.0
1374 .unbounded_send(Ok(AgentResponseEvent::Stop(acp::StopReason::EndTurn)))
1375 .ok();
1376 }
1377 StopReason::MaxTokens => {
1378 self.0
1379 .unbounded_send(Ok(AgentResponseEvent::Stop(acp::StopReason::MaxTokens)))
1380 .ok();
1381 }
1382 StopReason::Refusal => {
1383 self.0
1384 .unbounded_send(Ok(AgentResponseEvent::Stop(acp::StopReason::Refusal)))
1385 .ok();
1386 }
1387 StopReason::ToolUse => {}
1388 }
1389 }
1390
1391 fn send_canceled(&self) {
1392 self.0
1393 .unbounded_send(Ok(AgentResponseEvent::Stop(acp::StopReason::Canceled)))
1394 .ok();
1395 }
1396
1397 fn send_error(&self, error: impl Into<anyhow::Error>) {
1398 self.0.unbounded_send(Err(error.into())).ok();
1399 }
1400}
1401
1402#[derive(Clone)]
1403pub struct ToolCallEventStream {
1404 tool_use_id: LanguageModelToolUseId,
1405 stream: AgentResponseEventStream,
1406 fs: Option<Arc<dyn Fs>>,
1407}
1408
1409impl ToolCallEventStream {
1410 #[cfg(test)]
1411 pub fn test() -> (Self, ToolCallEventStreamReceiver) {
1412 let (events_tx, events_rx) = mpsc::unbounded::<Result<AgentResponseEvent>>();
1413
1414 let stream =
1415 ToolCallEventStream::new("test_id".into(), AgentResponseEventStream(events_tx), None);
1416
1417 (stream, ToolCallEventStreamReceiver(events_rx))
1418 }
1419
1420 fn new(
1421 tool_use_id: LanguageModelToolUseId,
1422 stream: AgentResponseEventStream,
1423 fs: Option<Arc<dyn Fs>>,
1424 ) -> Self {
1425 Self {
1426 tool_use_id,
1427 stream,
1428 fs,
1429 }
1430 }
1431
1432 pub fn update_fields(&self, fields: acp::ToolCallUpdateFields) {
1433 self.stream
1434 .update_tool_call_fields(&self.tool_use_id, fields);
1435 }
1436
1437 pub fn update_diff(&self, diff: Entity<acp_thread::Diff>) {
1438 self.stream
1439 .0
1440 .unbounded_send(Ok(AgentResponseEvent::ToolCallUpdate(
1441 acp_thread::ToolCallUpdateDiff {
1442 id: acp::ToolCallId(self.tool_use_id.to_string().into()),
1443 diff,
1444 }
1445 .into(),
1446 )))
1447 .ok();
1448 }
1449
1450 pub fn update_terminal(&self, terminal: Entity<acp_thread::Terminal>) {
1451 self.stream
1452 .0
1453 .unbounded_send(Ok(AgentResponseEvent::ToolCallUpdate(
1454 acp_thread::ToolCallUpdateTerminal {
1455 id: acp::ToolCallId(self.tool_use_id.to_string().into()),
1456 terminal,
1457 }
1458 .into(),
1459 )))
1460 .ok();
1461 }
1462
1463 pub fn authorize(&self, title: impl Into<String>, cx: &mut App) -> Task<Result<()>> {
1464 if agent_settings::AgentSettings::get_global(cx).always_allow_tool_actions {
1465 return Task::ready(Ok(()));
1466 }
1467
1468 let (response_tx, response_rx) = oneshot::channel();
1469 self.stream
1470 .0
1471 .unbounded_send(Ok(AgentResponseEvent::ToolCallAuthorization(
1472 ToolCallAuthorization {
1473 tool_call: acp::ToolCallUpdate {
1474 id: acp::ToolCallId(self.tool_use_id.to_string().into()),
1475 fields: acp::ToolCallUpdateFields {
1476 title: Some(title.into()),
1477 ..Default::default()
1478 },
1479 },
1480 options: vec![
1481 acp::PermissionOption {
1482 id: acp::PermissionOptionId("always_allow".into()),
1483 name: "Always Allow".into(),
1484 kind: acp::PermissionOptionKind::AllowAlways,
1485 },
1486 acp::PermissionOption {
1487 id: acp::PermissionOptionId("allow".into()),
1488 name: "Allow".into(),
1489 kind: acp::PermissionOptionKind::AllowOnce,
1490 },
1491 acp::PermissionOption {
1492 id: acp::PermissionOptionId("deny".into()),
1493 name: "Deny".into(),
1494 kind: acp::PermissionOptionKind::RejectOnce,
1495 },
1496 ],
1497 response: response_tx,
1498 },
1499 )))
1500 .ok();
1501 let fs = self.fs.clone();
1502 cx.spawn(async move |cx| match response_rx.await?.0.as_ref() {
1503 "always_allow" => {
1504 if let Some(fs) = fs.clone() {
1505 cx.update(|cx| {
1506 update_settings_file::<AgentSettings>(fs, cx, |settings, _| {
1507 settings.set_always_allow_tool_actions(true);
1508 });
1509 })?;
1510 }
1511
1512 Ok(())
1513 }
1514 "allow" => Ok(()),
1515 _ => Err(anyhow!("Permission to run tool denied by user")),
1516 })
1517 }
1518}
1519
/// Test-only receiving half for the events sent through a
/// [`ToolCallEventStream`] created with `ToolCallEventStream::test`.
#[cfg(test)]
pub struct ToolCallEventStreamReceiver(mpsc::UnboundedReceiver<Result<AgentResponseEvent>>);

#[cfg(test)]
impl ToolCallEventStreamReceiver {
    /// Waits for the next event and returns it if it is a tool call
    /// authorization.
    ///
    /// # Panics
    ///
    /// Panics if the next item is anything else, including the end of the stream.
    pub async fn expect_authorization(&mut self) -> ToolCallAuthorization {
        match self.0.next().await {
            Some(Ok(AgentResponseEvent::ToolCallAuthorization(authorization))) => authorization,
            event => panic!("Expected ToolCallAuthorization but got: {:?}", event),
        }
    }

    /// Waits for the next event and returns its terminal if it is a terminal
    /// update of a tool call.
    ///
    /// # Panics
    ///
    /// Panics if the next item is anything else, including the end of the stream.
    pub async fn expect_terminal(&mut self) -> Entity<acp_thread::Terminal> {
        match self.0.next().await {
            Some(Ok(AgentResponseEvent::ToolCallUpdate(
                acp_thread::ToolCallUpdate::UpdateTerminal(terminal_update),
            ))) => terminal_update.terminal,
            event => panic!("Expected terminal but got: {:?}", event),
        }
    }
}
1546
/// Gives shared access to the wrapped receiver.
#[cfg(test)]
impl std::ops::Deref for ToolCallEventStreamReceiver {
    type Target = mpsc::UnboundedReceiver<Result<AgentResponseEvent>>;

    fn deref(&self) -> &Self::Target {
        let Self(receiver) = self;
        receiver
    }
}
1555
/// Gives mutable access to the wrapped receiver.
#[cfg(test)]
impl std::ops::DerefMut for ToolCallEventStreamReceiver {
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self(receiver) = self;
        receiver
    }
}
1562
1563impl From<&str> for UserMessageContent {
1564 fn from(text: &str) -> Self {
1565 Self::Text(text.into())
1566 }
1567}
1568
1569impl From<acp::ContentBlock> for UserMessageContent {
1570 fn from(value: acp::ContentBlock) -> Self {
1571 match value {
1572 acp::ContentBlock::Text(text_content) => Self::Text(text_content.text),
1573 acp::ContentBlock::Image(image_content) => Self::Image(convert_image(image_content)),
1574 acp::ContentBlock::Audio(_) => {
1575 // TODO
1576 Self::Text("[audio]".to_string())
1577 }
1578 acp::ContentBlock::ResourceLink(resource_link) => {
1579 match MentionUri::parse(&resource_link.uri) {
1580 Ok(uri) => Self::Mention {
1581 uri,
1582 content: String::new(),
1583 },
1584 Err(err) => {
1585 log::error!("Failed to parse mention link: {}", err);
1586 Self::Text(format!("[{}]({})", resource_link.name, resource_link.uri))
1587 }
1588 }
1589 }
1590 acp::ContentBlock::Resource(resource) => match resource.resource {
1591 acp::EmbeddedResourceResource::TextResourceContents(resource) => {
1592 match MentionUri::parse(&resource.uri) {
1593 Ok(uri) => Self::Mention {
1594 uri,
1595 content: resource.text,
1596 },
1597 Err(err) => {
1598 log::error!("Failed to parse mention link: {}", err);
1599 Self::Text(
1600 MarkdownCodeBlock {
1601 tag: &resource.uri,
1602 text: &resource.text,
1603 }
1604 .to_string(),
1605 )
1606 }
1607 }
1608 }
1609 acp::EmbeddedResourceResource::BlobResourceContents(_) => {
1610 // TODO
1611 Self::Text("[blob]".to_string())
1612 }
1613 },
1614 }
1615 }
1616}
1617
1618fn convert_image(image_content: acp::ImageContent) -> LanguageModelImage {
1619 LanguageModelImage {
1620 source: image_content.data.into(),
1621 // TODO: make this optional?
1622 size: gpui::Size::new(0.into(), 0.into()),
1623 }
1624}