1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5
6use ::terminal::terminal_settings::TerminalSettings;
7use agent_settings::AgentSettings;
8use collections::HashSet;
9pub use connection::*;
10pub use diff::*;
11use language::language_settings::FormatOnSave;
12pub use mention::*;
13use project::lsp_store::{FormatTrigger, LspFormatTarget};
14use serde::{Deserialize, Serialize};
15use settings::{Settings as _, SettingsLocation};
16use task::{Shell, ShellBuilder};
17pub use terminal::*;
18
19use action_log::ActionLog;
20use agent_client_protocol::{self as acp};
21use anyhow::{Context as _, Result, anyhow};
22use editor::Bias;
23use futures::{FutureExt, channel::oneshot, future::BoxFuture};
24use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
25use itertools::Itertools;
26use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
27use markdown::Markdown;
28use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
29use std::collections::HashMap;
30use std::error::Error;
31use std::fmt::{Formatter, Write};
32use std::ops::Range;
33use std::process::ExitStatus;
34use std::rc::Rc;
35use std::time::{Duration, Instant};
36use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
37use ui::App;
38use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
39use uuid::Uuid;
40
41#[derive(Debug)]
42pub struct UserMessage {
43 pub id: Option<UserMessageId>,
44 pub content: ContentBlock,
45 pub chunks: Vec<acp::ContentBlock>,
46 pub checkpoint: Option<Checkpoint>,
47}
48
49#[derive(Debug)]
50pub struct Checkpoint {
51 git_checkpoint: GitStoreCheckpoint,
52 pub show: bool,
53}
54
55impl UserMessage {
56 fn to_markdown(&self, cx: &App) -> String {
57 let mut markdown = String::new();
58 if self
59 .checkpoint
60 .as_ref()
61 .is_some_and(|checkpoint| checkpoint.show)
62 {
63 writeln!(markdown, "## User (checkpoint)").unwrap();
64 } else {
65 writeln!(markdown, "## User").unwrap();
66 }
67 writeln!(markdown).unwrap();
68 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
69 writeln!(markdown).unwrap();
70 markdown
71 }
72}
73
74#[derive(Debug, PartialEq)]
75pub struct AssistantMessage {
76 pub chunks: Vec<AssistantMessageChunk>,
77}
78
79impl AssistantMessage {
80 pub fn to_markdown(&self, cx: &App) -> String {
81 format!(
82 "## Assistant\n\n{}\n\n",
83 self.chunks
84 .iter()
85 .map(|chunk| chunk.to_markdown(cx))
86 .join("\n\n")
87 )
88 }
89}
90
91#[derive(Debug, PartialEq)]
92pub enum AssistantMessageChunk {
93 Message { block: ContentBlock },
94 Thought { block: ContentBlock },
95}
96
97impl AssistantMessageChunk {
98 pub fn from_str(
99 chunk: &str,
100 language_registry: &Arc<LanguageRegistry>,
101 path_style: PathStyle,
102 cx: &mut App,
103 ) -> Self {
104 Self::Message {
105 block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
106 }
107 }
108
109 fn to_markdown(&self, cx: &App) -> String {
110 match self {
111 Self::Message { block } => block.to_markdown(cx).to_string(),
112 Self::Thought { block } => {
113 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
114 }
115 }
116 }
117}
118
119#[derive(Debug)]
120pub enum AgentThreadEntry {
121 UserMessage(UserMessage),
122 AssistantMessage(AssistantMessage),
123 ToolCall(ToolCall),
124}
125
126impl AgentThreadEntry {
127 pub fn to_markdown(&self, cx: &App) -> String {
128 match self {
129 Self::UserMessage(message) => message.to_markdown(cx),
130 Self::AssistantMessage(message) => message.to_markdown(cx),
131 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
132 }
133 }
134
135 pub fn user_message(&self) -> Option<&UserMessage> {
136 if let AgentThreadEntry::UserMessage(message) = self {
137 Some(message)
138 } else {
139 None
140 }
141 }
142
143 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
144 if let AgentThreadEntry::ToolCall(call) = self {
145 itertools::Either::Left(call.diffs())
146 } else {
147 itertools::Either::Right(std::iter::empty())
148 }
149 }
150
151 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
152 if let AgentThreadEntry::ToolCall(call) = self {
153 itertools::Either::Left(call.terminals())
154 } else {
155 itertools::Either::Right(std::iter::empty())
156 }
157 }
158
159 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
160 if let AgentThreadEntry::ToolCall(ToolCall {
161 locations,
162 resolved_locations,
163 ..
164 }) = self
165 {
166 Some((
167 locations.get(ix)?.clone(),
168 resolved_locations.get(ix)?.clone()?,
169 ))
170 } else {
171 None
172 }
173 }
174}
175
176#[derive(Debug)]
177pub struct ToolCall {
178 pub id: acp::ToolCallId,
179 pub label: Entity<Markdown>,
180 pub kind: acp::ToolKind,
181 pub content: Vec<ToolCallContent>,
182 pub status: ToolCallStatus,
183 pub locations: Vec<acp::ToolCallLocation>,
184 pub resolved_locations: Vec<Option<AgentLocation>>,
185 pub raw_input: Option<serde_json::Value>,
186 pub raw_output: Option<serde_json::Value>,
187}
188
189impl ToolCall {
190 fn from_acp(
191 tool_call: acp::ToolCall,
192 status: ToolCallStatus,
193 language_registry: Arc<LanguageRegistry>,
194 path_style: PathStyle,
195 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
196 cx: &mut App,
197 ) -> Result<Self> {
198 let title = if let Some((first_line, _)) = tool_call.title.split_once("\n") {
199 first_line.to_owned() + "…"
200 } else {
201 tool_call.title
202 };
203 let mut content = Vec::with_capacity(tool_call.content.len());
204 for item in tool_call.content {
205 content.push(ToolCallContent::from_acp(
206 item,
207 language_registry.clone(),
208 path_style,
209 terminals,
210 cx,
211 )?);
212 }
213
214 let result = Self {
215 id: tool_call.id,
216 label: cx
217 .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
218 kind: tool_call.kind,
219 content,
220 locations: tool_call.locations,
221 resolved_locations: Vec::default(),
222 status,
223 raw_input: tool_call.raw_input,
224 raw_output: tool_call.raw_output,
225 };
226 Ok(result)
227 }
228
229 fn update_fields(
230 &mut self,
231 fields: acp::ToolCallUpdateFields,
232 language_registry: Arc<LanguageRegistry>,
233 path_style: PathStyle,
234 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
235 cx: &mut App,
236 ) -> Result<()> {
237 let acp::ToolCallUpdateFields {
238 kind,
239 status,
240 title,
241 content,
242 locations,
243 raw_input,
244 raw_output,
245 } = fields;
246
247 if let Some(kind) = kind {
248 self.kind = kind;
249 }
250
251 if let Some(status) = status {
252 self.status = status.into();
253 }
254
255 if let Some(title) = title {
256 self.label.update(cx, |label, cx| {
257 if let Some((first_line, _)) = title.split_once("\n") {
258 label.replace(first_line.to_owned() + "…", cx)
259 } else {
260 label.replace(title, cx);
261 }
262 });
263 }
264
265 if let Some(content) = content {
266 let new_content_len = content.len();
267 let mut content = content.into_iter();
268
269 // Reuse existing content if we can
270 for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
271 old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
272 }
273 for new in content {
274 self.content.push(ToolCallContent::from_acp(
275 new,
276 language_registry.clone(),
277 path_style,
278 terminals,
279 cx,
280 )?)
281 }
282 self.content.truncate(new_content_len);
283 }
284
285 if let Some(locations) = locations {
286 self.locations = locations;
287 }
288
289 if let Some(raw_input) = raw_input {
290 self.raw_input = Some(raw_input);
291 }
292
293 if let Some(raw_output) = raw_output {
294 if self.content.is_empty()
295 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
296 {
297 self.content
298 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
299 markdown,
300 }));
301 }
302 self.raw_output = Some(raw_output);
303 }
304 Ok(())
305 }
306
307 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
308 self.content.iter().filter_map(|content| match content {
309 ToolCallContent::Diff(diff) => Some(diff),
310 ToolCallContent::ContentBlock(_) => None,
311 ToolCallContent::Terminal(_) => None,
312 })
313 }
314
315 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
316 self.content.iter().filter_map(|content| match content {
317 ToolCallContent::Terminal(terminal) => Some(terminal),
318 ToolCallContent::ContentBlock(_) => None,
319 ToolCallContent::Diff(_) => None,
320 })
321 }
322
323 fn to_markdown(&self, cx: &App) -> String {
324 let mut markdown = format!(
325 "**Tool Call: {}**\nStatus: {}\n\n",
326 self.label.read(cx).source(),
327 self.status
328 );
329 for content in &self.content {
330 markdown.push_str(content.to_markdown(cx).as_str());
331 markdown.push_str("\n\n");
332 }
333 markdown
334 }
335
336 async fn resolve_location(
337 location: acp::ToolCallLocation,
338 project: WeakEntity<Project>,
339 cx: &mut AsyncApp,
340 ) -> Option<ResolvedLocation> {
341 let buffer = project
342 .update(cx, |project, cx| {
343 project
344 .project_path_for_absolute_path(&location.path, cx)
345 .map(|path| project.open_buffer(path, cx))
346 })
347 .ok()??;
348 let buffer = buffer.await.log_err()?;
349 let position = buffer
350 .update(cx, |buffer, _| {
351 if let Some(row) = location.line {
352 let snapshot = buffer.snapshot();
353 let column = snapshot.indent_size_for_line(row).len;
354 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
355 snapshot.anchor_before(point)
356 } else {
357 Anchor::MIN
358 }
359 })
360 .ok()?;
361
362 Some(ResolvedLocation { buffer, position })
363 }
364
365 fn resolve_locations(
366 &self,
367 project: Entity<Project>,
368 cx: &mut App,
369 ) -> Task<Vec<Option<ResolvedLocation>>> {
370 let locations = self.locations.clone();
371 project.update(cx, |_, cx| {
372 cx.spawn(async move |project, cx| {
373 let mut new_locations = Vec::new();
374 for location in locations {
375 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
376 }
377 new_locations
378 })
379 })
380 }
381}
382
383// Separate so we can hold a strong reference to the buffer
384// for saving on the thread
385#[derive(Clone, Debug, PartialEq, Eq)]
386struct ResolvedLocation {
387 buffer: Entity<Buffer>,
388 position: Anchor,
389}
390
391impl From<&ResolvedLocation> for AgentLocation {
392 fn from(value: &ResolvedLocation) -> Self {
393 Self {
394 buffer: value.buffer.downgrade(),
395 position: value.position,
396 }
397 }
398}
399
400#[derive(Debug)]
401pub enum ToolCallStatus {
402 /// The tool call hasn't started running yet, but we start showing it to
403 /// the user.
404 Pending,
405 /// The tool call is waiting for confirmation from the user.
406 WaitingForConfirmation {
407 options: Vec<acp::PermissionOption>,
408 respond_tx: oneshot::Sender<acp::PermissionOptionId>,
409 },
410 /// The tool call is currently running.
411 InProgress,
412 /// The tool call completed successfully.
413 Completed,
414 /// The tool call failed.
415 Failed,
416 /// The user rejected the tool call.
417 Rejected,
418 /// The user canceled generation so the tool call was canceled.
419 Canceled,
420}
421
422impl From<acp::ToolCallStatus> for ToolCallStatus {
423 fn from(status: acp::ToolCallStatus) -> Self {
424 match status {
425 acp::ToolCallStatus::Pending => Self::Pending,
426 acp::ToolCallStatus::InProgress => Self::InProgress,
427 acp::ToolCallStatus::Completed => Self::Completed,
428 acp::ToolCallStatus::Failed => Self::Failed,
429 }
430 }
431}
432
433impl Display for ToolCallStatus {
434 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
435 write!(
436 f,
437 "{}",
438 match self {
439 ToolCallStatus::Pending => "Pending",
440 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
441 ToolCallStatus::InProgress => "In Progress",
442 ToolCallStatus::Completed => "Completed",
443 ToolCallStatus::Failed => "Failed",
444 ToolCallStatus::Rejected => "Rejected",
445 ToolCallStatus::Canceled => "Canceled",
446 }
447 )
448 }
449}
450
451#[derive(Debug, PartialEq, Clone)]
452pub enum ContentBlock {
453 Empty,
454 Markdown { markdown: Entity<Markdown> },
455 ResourceLink { resource_link: acp::ResourceLink },
456}
457
458impl ContentBlock {
459 pub fn new(
460 block: acp::ContentBlock,
461 language_registry: &Arc<LanguageRegistry>,
462 path_style: PathStyle,
463 cx: &mut App,
464 ) -> Self {
465 let mut this = Self::Empty;
466 this.append(block, language_registry, path_style, cx);
467 this
468 }
469
470 pub fn new_combined(
471 blocks: impl IntoIterator<Item = acp::ContentBlock>,
472 language_registry: Arc<LanguageRegistry>,
473 path_style: PathStyle,
474 cx: &mut App,
475 ) -> Self {
476 let mut this = Self::Empty;
477 for block in blocks {
478 this.append(block, &language_registry, path_style, cx);
479 }
480 this
481 }
482
483 pub fn append(
484 &mut self,
485 block: acp::ContentBlock,
486 language_registry: &Arc<LanguageRegistry>,
487 path_style: PathStyle,
488 cx: &mut App,
489 ) {
490 if matches!(self, ContentBlock::Empty)
491 && let acp::ContentBlock::ResourceLink(resource_link) = block
492 {
493 *self = ContentBlock::ResourceLink { resource_link };
494 return;
495 }
496
497 let new_content = self.block_string_contents(block, path_style);
498
499 match self {
500 ContentBlock::Empty => {
501 *self = Self::create_markdown_block(new_content, language_registry, cx);
502 }
503 ContentBlock::Markdown { markdown } => {
504 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
505 }
506 ContentBlock::ResourceLink { resource_link } => {
507 let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
508 let combined = format!("{}\n{}", existing_content, new_content);
509
510 *self = Self::create_markdown_block(combined, language_registry, cx);
511 }
512 }
513 }
514
515 fn create_markdown_block(
516 content: String,
517 language_registry: &Arc<LanguageRegistry>,
518 cx: &mut App,
519 ) -> ContentBlock {
520 ContentBlock::Markdown {
521 markdown: cx
522 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
523 }
524 }
525
526 fn block_string_contents(&self, block: acp::ContentBlock, path_style: PathStyle) -> String {
527 match block {
528 acp::ContentBlock::Text(text_content) => text_content.text,
529 acp::ContentBlock::ResourceLink(resource_link) => {
530 Self::resource_link_md(&resource_link.uri, path_style)
531 }
532 acp::ContentBlock::Resource(acp::EmbeddedResource {
533 resource:
534 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
535 uri,
536 ..
537 }),
538 ..
539 }) => Self::resource_link_md(&uri, path_style),
540 acp::ContentBlock::Image(image) => Self::image_md(&image),
541 acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
542 }
543 }
544
545 fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
546 if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
547 uri.as_link().to_string()
548 } else {
549 uri.to_string()
550 }
551 }
552
553 fn image_md(_image: &acp::ImageContent) -> String {
554 "`Image`".into()
555 }
556
557 pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
558 match self {
559 ContentBlock::Empty => "",
560 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
561 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
562 }
563 }
564
565 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
566 match self {
567 ContentBlock::Empty => None,
568 ContentBlock::Markdown { markdown } => Some(markdown),
569 ContentBlock::ResourceLink { .. } => None,
570 }
571 }
572
573 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
574 match self {
575 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
576 _ => None,
577 }
578 }
579}
580
581#[derive(Debug)]
582pub enum ToolCallContent {
583 ContentBlock(ContentBlock),
584 Diff(Entity<Diff>),
585 Terminal(Entity<Terminal>),
586}
587
588impl ToolCallContent {
589 pub fn from_acp(
590 content: acp::ToolCallContent,
591 language_registry: Arc<LanguageRegistry>,
592 path_style: PathStyle,
593 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
594 cx: &mut App,
595 ) -> Result<Self> {
596 match content {
597 acp::ToolCallContent::Content { content } => Ok(Self::ContentBlock(ContentBlock::new(
598 content,
599 &language_registry,
600 path_style,
601 cx,
602 ))),
603 acp::ToolCallContent::Diff { diff } => Ok(Self::Diff(cx.new(|cx| {
604 Diff::finalized(
605 diff.path.to_string_lossy().into_owned(),
606 diff.old_text,
607 diff.new_text,
608 language_registry,
609 cx,
610 )
611 }))),
612 acp::ToolCallContent::Terminal { terminal_id } => terminals
613 .get(&terminal_id)
614 .cloned()
615 .map(Self::Terminal)
616 .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
617 }
618 }
619
620 pub fn update_from_acp(
621 &mut self,
622 new: acp::ToolCallContent,
623 language_registry: Arc<LanguageRegistry>,
624 path_style: PathStyle,
625 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
626 cx: &mut App,
627 ) -> Result<()> {
628 let needs_update = match (&self, &new) {
629 (Self::Diff(old_diff), acp::ToolCallContent::Diff { diff: new_diff }) => {
630 old_diff.read(cx).needs_update(
631 new_diff.old_text.as_deref().unwrap_or(""),
632 &new_diff.new_text,
633 cx,
634 )
635 }
636 _ => true,
637 };
638
639 if needs_update {
640 *self = Self::from_acp(new, language_registry, path_style, terminals, cx)?;
641 }
642 Ok(())
643 }
644
645 pub fn to_markdown(&self, cx: &App) -> String {
646 match self {
647 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
648 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
649 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
650 }
651 }
652}
653
654#[derive(Debug, PartialEq)]
655pub enum ToolCallUpdate {
656 UpdateFields(acp::ToolCallUpdate),
657 UpdateDiff(ToolCallUpdateDiff),
658 UpdateTerminal(ToolCallUpdateTerminal),
659}
660
661impl ToolCallUpdate {
662 fn id(&self) -> &acp::ToolCallId {
663 match self {
664 Self::UpdateFields(update) => &update.id,
665 Self::UpdateDiff(diff) => &diff.id,
666 Self::UpdateTerminal(terminal) => &terminal.id,
667 }
668 }
669}
670
671impl From<acp::ToolCallUpdate> for ToolCallUpdate {
672 fn from(update: acp::ToolCallUpdate) -> Self {
673 Self::UpdateFields(update)
674 }
675}
676
677impl From<ToolCallUpdateDiff> for ToolCallUpdate {
678 fn from(diff: ToolCallUpdateDiff) -> Self {
679 Self::UpdateDiff(diff)
680 }
681}
682
683#[derive(Debug, PartialEq)]
684pub struct ToolCallUpdateDiff {
685 pub id: acp::ToolCallId,
686 pub diff: Entity<Diff>,
687}
688
689impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
690 fn from(terminal: ToolCallUpdateTerminal) -> Self {
691 Self::UpdateTerminal(terminal)
692 }
693}
694
695#[derive(Debug, PartialEq)]
696pub struct ToolCallUpdateTerminal {
697 pub id: acp::ToolCallId,
698 pub terminal: Entity<Terminal>,
699}
700
701#[derive(Debug, Default)]
702pub struct Plan {
703 pub entries: Vec<PlanEntry>,
704}
705
706#[derive(Debug)]
707pub struct PlanStats<'a> {
708 pub in_progress_entry: Option<&'a PlanEntry>,
709 pub pending: u32,
710 pub completed: u32,
711}
712
713impl Plan {
714 pub fn is_empty(&self) -> bool {
715 self.entries.is_empty()
716 }
717
718 pub fn stats(&self) -> PlanStats<'_> {
719 let mut stats = PlanStats {
720 in_progress_entry: None,
721 pending: 0,
722 completed: 0,
723 };
724
725 for entry in &self.entries {
726 match &entry.status {
727 acp::PlanEntryStatus::Pending => {
728 stats.pending += 1;
729 }
730 acp::PlanEntryStatus::InProgress => {
731 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
732 }
733 acp::PlanEntryStatus::Completed => {
734 stats.completed += 1;
735 }
736 }
737 }
738
739 stats
740 }
741}
742
743#[derive(Debug)]
744pub struct PlanEntry {
745 pub content: Entity<Markdown>,
746 pub priority: acp::PlanEntryPriority,
747 pub status: acp::PlanEntryStatus,
748}
749
750impl PlanEntry {
751 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
752 Self {
753 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
754 priority: entry.priority,
755 status: entry.status,
756 }
757 }
758}
759
760#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
761pub struct TokenUsage {
762 pub max_tokens: u64,
763 pub used_tokens: u64,
764}
765
766impl TokenUsage {
767 pub fn ratio(&self) -> TokenUsageRatio {
768 #[cfg(debug_assertions)]
769 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
770 .unwrap_or("0.8".to_string())
771 .parse()
772 .unwrap();
773 #[cfg(not(debug_assertions))]
774 let warning_threshold: f32 = 0.8;
775
776 // When the maximum is unknown because there is no selected model,
777 // avoid showing the token limit warning.
778 if self.max_tokens == 0 {
779 TokenUsageRatio::Normal
780 } else if self.used_tokens >= self.max_tokens {
781 TokenUsageRatio::Exceeded
782 } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
783 TokenUsageRatio::Warning
784 } else {
785 TokenUsageRatio::Normal
786 }
787 }
788}
789
790#[derive(Debug, Clone, PartialEq, Eq)]
791pub enum TokenUsageRatio {
792 Normal,
793 Warning,
794 Exceeded,
795}
796
797#[derive(Debug, Clone)]
798pub struct RetryStatus {
799 pub last_error: SharedString,
800 pub attempt: usize,
801 pub max_attempts: usize,
802 pub started_at: Instant,
803 pub duration: Duration,
804}
805
806pub struct AcpThread {
807 title: SharedString,
808 entries: Vec<AgentThreadEntry>,
809 plan: Plan,
810 project: Entity<Project>,
811 action_log: Entity<ActionLog>,
812 shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
813 send_task: Option<Task<()>>,
814 connection: Rc<dyn AgentConnection>,
815 session_id: acp::SessionId,
816 token_usage: Option<TokenUsage>,
817 prompt_capabilities: acp::PromptCapabilities,
818 _observe_prompt_capabilities: Task<anyhow::Result<()>>,
819 terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
820 pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
821 pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
822}
823
824#[derive(Debug)]
825pub enum AcpThreadEvent {
826 NewEntry,
827 TitleUpdated,
828 TokenUsageUpdated,
829 EntryUpdated(usize),
830 EntriesRemoved(Range<usize>),
831 ToolAuthorizationRequired,
832 Retry(RetryStatus),
833 Stopped,
834 Error,
835 LoadError(LoadError),
836 PromptCapabilitiesUpdated,
837 Refusal,
838 AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
839 ModeUpdated(acp::SessionModeId),
840}
841
842impl EventEmitter<AcpThreadEvent> for AcpThread {}
843
844#[derive(Debug, Clone)]
845pub enum TerminalProviderEvent {
846 Created {
847 terminal_id: acp::TerminalId,
848 label: String,
849 cwd: Option<PathBuf>,
850 output_byte_limit: Option<u64>,
851 terminal: Entity<::terminal::Terminal>,
852 },
853 Output {
854 terminal_id: acp::TerminalId,
855 data: Vec<u8>,
856 },
857 TitleChanged {
858 terminal_id: acp::TerminalId,
859 title: String,
860 },
861 Exit {
862 terminal_id: acp::TerminalId,
863 status: acp::TerminalExitStatus,
864 },
865}
866
867#[derive(Debug, Clone)]
868pub enum TerminalProviderCommand {
869 WriteInput {
870 terminal_id: acp::TerminalId,
871 bytes: Vec<u8>,
872 },
873 Resize {
874 terminal_id: acp::TerminalId,
875 cols: u16,
876 rows: u16,
877 },
878 Close {
879 terminal_id: acp::TerminalId,
880 },
881}
882
883impl AcpThread {
884 pub fn on_terminal_provider_event(
885 &mut self,
886 event: TerminalProviderEvent,
887 cx: &mut Context<Self>,
888 ) {
889 match event {
890 TerminalProviderEvent::Created {
891 terminal_id,
892 label,
893 cwd,
894 output_byte_limit,
895 terminal,
896 } => {
897 let entity = self.register_terminal_created(
898 terminal_id.clone(),
899 label,
900 cwd,
901 output_byte_limit,
902 terminal,
903 cx,
904 );
905
906 if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
907 for data in chunks.drain(..) {
908 entity.update(cx, |term, cx| {
909 term.inner().update(cx, |inner, cx| {
910 inner.write_output(&data, cx);
911 })
912 });
913 }
914 }
915
916 if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
917 entity.update(cx, |_term, cx| {
918 cx.notify();
919 });
920 }
921
922 cx.notify();
923 }
924 TerminalProviderEvent::Output { terminal_id, data } => {
925 if let Some(entity) = self.terminals.get(&terminal_id) {
926 entity.update(cx, |term, cx| {
927 term.inner().update(cx, |inner, cx| {
928 inner.write_output(&data, cx);
929 })
930 });
931 } else {
932 self.pending_terminal_output
933 .entry(terminal_id)
934 .or_default()
935 .push(data);
936 }
937 }
938 TerminalProviderEvent::TitleChanged { terminal_id, title } => {
939 if let Some(entity) = self.terminals.get(&terminal_id) {
940 entity.update(cx, |term, cx| {
941 term.inner().update(cx, |inner, cx| {
942 inner.breadcrumb_text = title;
943 cx.emit(::terminal::Event::BreadcrumbsChanged);
944 })
945 });
946 }
947 }
948 TerminalProviderEvent::Exit {
949 terminal_id,
950 status,
951 } => {
952 if let Some(entity) = self.terminals.get(&terminal_id) {
953 entity.update(cx, |_term, cx| {
954 cx.notify();
955 });
956 } else {
957 self.pending_terminal_exit.insert(terminal_id, status);
958 }
959 }
960 }
961 }
962}
963
964#[derive(PartialEq, Eq, Debug)]
965pub enum ThreadStatus {
966 Idle,
967 Generating,
968}
969
970#[derive(Debug, Clone)]
971pub enum LoadError {
972 Unsupported {
973 command: SharedString,
974 current_version: SharedString,
975 minimum_version: SharedString,
976 },
977 FailedToInstall(SharedString),
978 Exited {
979 status: ExitStatus,
980 },
981 Other(SharedString),
982}
983
984impl Display for LoadError {
985 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
986 match self {
987 LoadError::Unsupported {
988 command: path,
989 current_version,
990 minimum_version,
991 } => {
992 write!(
993 f,
994 "version {current_version} from {path} is not supported (need at least {minimum_version})"
995 )
996 }
997 LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
998 LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
999 LoadError::Other(msg) => write!(f, "{msg}"),
1000 }
1001 }
1002}
1003
1004impl Error for LoadError {}
1005
1006impl AcpThread {
1007 pub fn new(
1008 title: impl Into<SharedString>,
1009 connection: Rc<dyn AgentConnection>,
1010 project: Entity<Project>,
1011 action_log: Entity<ActionLog>,
1012 session_id: acp::SessionId,
1013 mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1014 cx: &mut Context<Self>,
1015 ) -> Self {
1016 let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1017 let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1018 loop {
1019 let caps = prompt_capabilities_rx.recv().await?;
1020 this.update(cx, |this, cx| {
1021 this.prompt_capabilities = caps;
1022 cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1023 })?;
1024 }
1025 });
1026
1027 Self {
1028 action_log,
1029 shared_buffers: Default::default(),
1030 entries: Default::default(),
1031 plan: Default::default(),
1032 title: title.into(),
1033 project,
1034 send_task: None,
1035 connection,
1036 session_id,
1037 token_usage: None,
1038 prompt_capabilities,
1039 _observe_prompt_capabilities: task,
1040 terminals: HashMap::default(),
1041 pending_terminal_output: HashMap::default(),
1042 pending_terminal_exit: HashMap::default(),
1043 }
1044 }
1045
1046 pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
1047 self.prompt_capabilities.clone()
1048 }
1049
1050 pub fn connection(&self) -> &Rc<dyn AgentConnection> {
1051 &self.connection
1052 }
1053
1054 pub fn action_log(&self) -> &Entity<ActionLog> {
1055 &self.action_log
1056 }
1057
1058 pub fn project(&self) -> &Entity<Project> {
1059 &self.project
1060 }
1061
1062 pub fn title(&self) -> SharedString {
1063 self.title.clone()
1064 }
1065
1066 pub fn entries(&self) -> &[AgentThreadEntry] {
1067 &self.entries
1068 }
1069
1070 pub fn session_id(&self) -> &acp::SessionId {
1071 &self.session_id
1072 }
1073
1074 pub fn status(&self) -> ThreadStatus {
1075 if self.send_task.is_some() {
1076 ThreadStatus::Generating
1077 } else {
1078 ThreadStatus::Idle
1079 }
1080 }
1081
1082 pub fn token_usage(&self) -> Option<&TokenUsage> {
1083 self.token_usage.as_ref()
1084 }
1085
1086 pub fn has_pending_edit_tool_calls(&self) -> bool {
1087 for entry in self.entries.iter().rev() {
1088 match entry {
1089 AgentThreadEntry::UserMessage(_) => return false,
1090 AgentThreadEntry::ToolCall(
1091 call @ ToolCall {
1092 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1093 ..
1094 },
1095 ) if call.diffs().next().is_some() => {
1096 return true;
1097 }
1098 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1099 }
1100 }
1101
1102 false
1103 }
1104
1105 pub fn used_tools_since_last_user_message(&self) -> bool {
1106 for entry in self.entries.iter().rev() {
1107 match entry {
1108 AgentThreadEntry::UserMessage(..) => return false,
1109 AgentThreadEntry::AssistantMessage(..) => continue,
1110 AgentThreadEntry::ToolCall(..) => return true,
1111 }
1112 }
1113
1114 false
1115 }
1116
1117 pub fn handle_session_update(
1118 &mut self,
1119 update: acp::SessionUpdate,
1120 cx: &mut Context<Self>,
1121 ) -> Result<(), acp::Error> {
1122 match update {
1123 acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1124 self.push_user_content_block(None, content, cx);
1125 }
1126 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1127 self.push_assistant_content_block(content, false, cx);
1128 }
1129 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1130 self.push_assistant_content_block(content, true, cx);
1131 }
1132 acp::SessionUpdate::ToolCall(tool_call) => {
1133 self.upsert_tool_call(tool_call, cx)?;
1134 }
1135 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1136 self.update_tool_call(tool_call_update, cx)?;
1137 }
1138 acp::SessionUpdate::Plan(plan) => {
1139 self.update_plan(plan, cx);
1140 }
1141 acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1142 available_commands,
1143 ..
1144 }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1145 acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1146 current_mode_id,
1147 ..
1148 }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1149 }
1150 Ok(())
1151 }
1152
1153 pub fn push_user_content_block(
1154 &mut self,
1155 message_id: Option<UserMessageId>,
1156 chunk: acp::ContentBlock,
1157 cx: &mut Context<Self>,
1158 ) {
1159 let language_registry = self.project.read(cx).languages().clone();
1160 let path_style = self.project.read(cx).path_style(cx);
1161 let entries_len = self.entries.len();
1162
1163 if let Some(last_entry) = self.entries.last_mut()
1164 && let AgentThreadEntry::UserMessage(UserMessage {
1165 id,
1166 content,
1167 chunks,
1168 ..
1169 }) = last_entry
1170 {
1171 *id = message_id.or(id.take());
1172 content.append(chunk.clone(), &language_registry, path_style, cx);
1173 chunks.push(chunk);
1174 let idx = entries_len - 1;
1175 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1176 } else {
1177 let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1178 self.push_entry(
1179 AgentThreadEntry::UserMessage(UserMessage {
1180 id: message_id,
1181 content,
1182 chunks: vec![chunk],
1183 checkpoint: None,
1184 }),
1185 cx,
1186 );
1187 }
1188 }
1189
1190 pub fn push_assistant_content_block(
1191 &mut self,
1192 chunk: acp::ContentBlock,
1193 is_thought: bool,
1194 cx: &mut Context<Self>,
1195 ) {
1196 let language_registry = self.project.read(cx).languages().clone();
1197 let path_style = self.project.read(cx).path_style(cx);
1198 let entries_len = self.entries.len();
1199 if let Some(last_entry) = self.entries.last_mut()
1200 && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
1201 {
1202 let idx = entries_len - 1;
1203 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1204 match (chunks.last_mut(), is_thought) {
1205 (Some(AssistantMessageChunk::Message { block }), false)
1206 | (Some(AssistantMessageChunk::Thought { block }), true) => {
1207 block.append(chunk, &language_registry, path_style, cx)
1208 }
1209 _ => {
1210 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1211 if is_thought {
1212 chunks.push(AssistantMessageChunk::Thought { block })
1213 } else {
1214 chunks.push(AssistantMessageChunk::Message { block })
1215 }
1216 }
1217 }
1218 } else {
1219 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1220 let chunk = if is_thought {
1221 AssistantMessageChunk::Thought { block }
1222 } else {
1223 AssistantMessageChunk::Message { block }
1224 };
1225
1226 self.push_entry(
1227 AgentThreadEntry::AssistantMessage(AssistantMessage {
1228 chunks: vec![chunk],
1229 }),
1230 cx,
1231 );
1232 }
1233 }
1234
1235 fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
1236 self.entries.push(entry);
1237 cx.emit(AcpThreadEvent::NewEntry);
1238 }
1239
1240 pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1241 self.connection.set_title(&self.session_id, cx).is_some()
1242 }
1243
1244 pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1245 if title != self.title {
1246 self.title = title.clone();
1247 cx.emit(AcpThreadEvent::TitleUpdated);
1248 if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1249 return set_title.run(title, cx);
1250 }
1251 }
1252 Task::ready(Ok(()))
1253 }
1254
1255 pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1256 self.token_usage = usage;
1257 cx.emit(AcpThreadEvent::TokenUsageUpdated);
1258 }
1259
1260 pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1261 cx.emit(AcpThreadEvent::Retry(status));
1262 }
1263
1264 pub fn update_tool_call(
1265 &mut self,
1266 update: impl Into<ToolCallUpdate>,
1267 cx: &mut Context<Self>,
1268 ) -> Result<()> {
1269 let update = update.into();
1270 let languages = self.project.read(cx).languages().clone();
1271 let path_style = self.project.read(cx).path_style(cx);
1272
1273 let ix = match self.index_for_tool_call(update.id()) {
1274 Some(ix) => ix,
1275 None => {
1276 // Tool call not found - create a failed tool call entry
1277 let failed_tool_call = ToolCall {
1278 id: update.id().clone(),
1279 label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1280 kind: acp::ToolKind::Fetch,
1281 content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1282 acp::ContentBlock::Text(acp::TextContent {
1283 text: "Tool call not found".to_string(),
1284 annotations: None,
1285 meta: None,
1286 }),
1287 &languages,
1288 path_style,
1289 cx,
1290 ))],
1291 status: ToolCallStatus::Failed,
1292 locations: Vec::new(),
1293 resolved_locations: Vec::new(),
1294 raw_input: None,
1295 raw_output: None,
1296 };
1297 self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1298 return Ok(());
1299 }
1300 };
1301 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1302 unreachable!()
1303 };
1304
1305 match update {
1306 ToolCallUpdate::UpdateFields(update) => {
1307 let location_updated = update.fields.locations.is_some();
1308 call.update_fields(update.fields, languages, path_style, &self.terminals, cx)?;
1309 if location_updated {
1310 self.resolve_locations(update.id, cx);
1311 }
1312 }
1313 ToolCallUpdate::UpdateDiff(update) => {
1314 call.content.clear();
1315 call.content.push(ToolCallContent::Diff(update.diff));
1316 }
1317 ToolCallUpdate::UpdateTerminal(update) => {
1318 call.content.clear();
1319 call.content
1320 .push(ToolCallContent::Terminal(update.terminal));
1321 }
1322 }
1323
1324 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1325
1326 Ok(())
1327 }
1328
1329 /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1330 pub fn upsert_tool_call(
1331 &mut self,
1332 tool_call: acp::ToolCall,
1333 cx: &mut Context<Self>,
1334 ) -> Result<(), acp::Error> {
1335 let status = tool_call.status.into();
1336 self.upsert_tool_call_inner(tool_call.into(), status, cx)
1337 }
1338
1339 /// Fails if id does not match an existing entry.
1340 pub fn upsert_tool_call_inner(
1341 &mut self,
1342 update: acp::ToolCallUpdate,
1343 status: ToolCallStatus,
1344 cx: &mut Context<Self>,
1345 ) -> Result<(), acp::Error> {
1346 let language_registry = self.project.read(cx).languages().clone();
1347 let path_style = self.project.read(cx).path_style(cx);
1348 let id = update.id.clone();
1349
1350 if let Some(ix) = self.index_for_tool_call(&id) {
1351 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1352 unreachable!()
1353 };
1354
1355 call.update_fields(
1356 update.fields,
1357 language_registry,
1358 path_style,
1359 &self.terminals,
1360 cx,
1361 )?;
1362 call.status = status;
1363
1364 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1365 } else {
1366 let call = ToolCall::from_acp(
1367 update.try_into()?,
1368 status,
1369 language_registry,
1370 self.project.read(cx).path_style(cx),
1371 &self.terminals,
1372 cx,
1373 )?;
1374 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1375 };
1376
1377 self.resolve_locations(id, cx);
1378 Ok(())
1379 }
1380
1381 fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1382 self.entries
1383 .iter()
1384 .enumerate()
1385 .rev()
1386 .find_map(|(index, entry)| {
1387 if let AgentThreadEntry::ToolCall(tool_call) = entry
1388 && &tool_call.id == id
1389 {
1390 Some(index)
1391 } else {
1392 None
1393 }
1394 })
1395 }
1396
1397 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1398 // The tool call we are looking for is typically the last one, or very close to the end.
1399 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1400 self.entries
1401 .iter_mut()
1402 .enumerate()
1403 .rev()
1404 .find_map(|(index, tool_call)| {
1405 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1406 && &tool_call.id == id
1407 {
1408 Some((index, tool_call))
1409 } else {
1410 None
1411 }
1412 })
1413 }
1414
1415 pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1416 self.entries
1417 .iter()
1418 .enumerate()
1419 .rev()
1420 .find_map(|(index, tool_call)| {
1421 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1422 && &tool_call.id == id
1423 {
1424 Some((index, tool_call))
1425 } else {
1426 None
1427 }
1428 })
1429 }
1430
1431 pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1432 let project = self.project.clone();
1433 let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1434 return;
1435 };
1436 let task = tool_call.resolve_locations(project, cx);
1437 cx.spawn(async move |this, cx| {
1438 let resolved_locations = task.await;
1439
1440 this.update(cx, |this, cx| {
1441 let project = this.project.clone();
1442
1443 for location in resolved_locations.iter().flatten() {
1444 this.shared_buffers
1445 .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
1446 }
1447 let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1448 return;
1449 };
1450
1451 if let Some(Some(location)) = resolved_locations.last() {
1452 project.update(cx, |project, cx| {
1453 let should_ignore = if let Some(agent_location) = project
1454 .agent_location()
1455 .filter(|agent_location| agent_location.buffer == location.buffer)
1456 {
1457 let snapshot = location.buffer.read(cx).snapshot();
1458 let old_position = agent_location.position.to_point(&snapshot);
1459 let new_position = location.position.to_point(&snapshot);
1460
1461 // ignore this so that when we get updates from the edit tool
1462 // the position doesn't reset to the startof line
1463 old_position.row == new_position.row
1464 && old_position.column > new_position.column
1465 } else {
1466 false
1467 };
1468 if !should_ignore {
1469 project.set_agent_location(Some(location.into()), cx);
1470 }
1471 });
1472 }
1473
1474 let resolved_locations = resolved_locations
1475 .iter()
1476 .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
1477 .collect::<Vec<_>>();
1478
1479 if tool_call.resolved_locations != resolved_locations {
1480 tool_call.resolved_locations = resolved_locations;
1481 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1482 }
1483 })
1484 })
1485 .detach();
1486 }
1487
1488 pub fn request_tool_call_authorization(
1489 &mut self,
1490 tool_call: acp::ToolCallUpdate,
1491 options: Vec<acp::PermissionOption>,
1492 respect_always_allow_setting: bool,
1493 cx: &mut Context<Self>,
1494 ) -> Result<BoxFuture<'static, acp::RequestPermissionOutcome>> {
1495 let (tx, rx) = oneshot::channel();
1496
1497 if respect_always_allow_setting && AgentSettings::get_global(cx).always_allow_tool_actions {
1498 // Don't use AllowAlways, because then if you were to turn off always_allow_tool_actions,
1499 // some tools would (incorrectly) continue to auto-accept.
1500 if let Some(allow_once_option) = options.iter().find_map(|option| {
1501 if matches!(option.kind, acp::PermissionOptionKind::AllowOnce) {
1502 Some(option.id.clone())
1503 } else {
1504 None
1505 }
1506 }) {
1507 self.upsert_tool_call_inner(tool_call, ToolCallStatus::Pending, cx)?;
1508 return Ok(async {
1509 acp::RequestPermissionOutcome::Selected {
1510 option_id: allow_once_option,
1511 }
1512 }
1513 .boxed());
1514 }
1515 }
1516
1517 let status = ToolCallStatus::WaitingForConfirmation {
1518 options,
1519 respond_tx: tx,
1520 };
1521
1522 self.upsert_tool_call_inner(tool_call, status, cx)?;
1523 cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1524
1525 let fut = async {
1526 match rx.await {
1527 Ok(option) => acp::RequestPermissionOutcome::Selected { option_id: option },
1528 Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1529 }
1530 }
1531 .boxed();
1532
1533 Ok(fut)
1534 }
1535
1536 pub fn authorize_tool_call(
1537 &mut self,
1538 id: acp::ToolCallId,
1539 option_id: acp::PermissionOptionId,
1540 option_kind: acp::PermissionOptionKind,
1541 cx: &mut Context<Self>,
1542 ) {
1543 let Some((ix, call)) = self.tool_call_mut(&id) else {
1544 return;
1545 };
1546
1547 let new_status = match option_kind {
1548 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1549 ToolCallStatus::Rejected
1550 }
1551 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1552 ToolCallStatus::InProgress
1553 }
1554 };
1555
1556 let curr_status = mem::replace(&mut call.status, new_status);
1557
1558 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1559 respond_tx.send(option_id).log_err();
1560 } else if cfg!(debug_assertions) {
1561 panic!("tried to authorize an already authorized tool call");
1562 }
1563
1564 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1565 }
1566
1567 pub fn first_tool_awaiting_confirmation(&self) -> Option<&ToolCall> {
1568 let mut first_tool_call = None;
1569
1570 for entry in self.entries.iter().rev() {
1571 match &entry {
1572 AgentThreadEntry::ToolCall(call) => {
1573 if let ToolCallStatus::WaitingForConfirmation { .. } = call.status {
1574 first_tool_call = Some(call);
1575 } else {
1576 continue;
1577 }
1578 }
1579 AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1580 // Reached the beginning of the turn.
1581 // If we had pending permission requests in the previous turn, they have been cancelled.
1582 break;
1583 }
1584 }
1585 }
1586
1587 first_tool_call
1588 }
1589
1590 pub fn plan(&self) -> &Plan {
1591 &self.plan
1592 }
1593
1594 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1595 let new_entries_len = request.entries.len();
1596 let mut new_entries = request.entries.into_iter();
1597
1598 // Reuse existing markdown to prevent flickering
1599 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1600 let PlanEntry {
1601 content,
1602 priority,
1603 status,
1604 } = old;
1605 content.update(cx, |old, cx| {
1606 old.replace(new.content, cx);
1607 });
1608 *priority = new.priority;
1609 *status = new.status;
1610 }
1611 for new in new_entries {
1612 self.plan.entries.push(PlanEntry::from_acp(new, cx))
1613 }
1614 self.plan.entries.truncate(new_entries_len);
1615
1616 cx.notify();
1617 }
1618
1619 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1620 self.plan
1621 .entries
1622 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1623 cx.notify();
1624 }
1625
1626 #[cfg(any(test, feature = "test-support"))]
1627 pub fn send_raw(
1628 &mut self,
1629 message: &str,
1630 cx: &mut Context<Self>,
1631 ) -> BoxFuture<'static, Result<()>> {
1632 self.send(
1633 vec![acp::ContentBlock::Text(acp::TextContent {
1634 text: message.to_string(),
1635 annotations: None,
1636 meta: None,
1637 })],
1638 cx,
1639 )
1640 }
1641
1642 pub fn send(
1643 &mut self,
1644 message: Vec<acp::ContentBlock>,
1645 cx: &mut Context<Self>,
1646 ) -> BoxFuture<'static, Result<()>> {
1647 let block = ContentBlock::new_combined(
1648 message.clone(),
1649 self.project.read(cx).languages().clone(),
1650 self.project.read(cx).path_style(cx),
1651 cx,
1652 );
1653 let request = acp::PromptRequest {
1654 prompt: message.clone(),
1655 session_id: self.session_id.clone(),
1656 meta: None,
1657 };
1658 let git_store = self.project.read(cx).git_store().clone();
1659
1660 let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
1661 Some(UserMessageId::new())
1662 } else {
1663 None
1664 };
1665
1666 self.run_turn(cx, async move |this, cx| {
1667 this.update(cx, |this, cx| {
1668 this.push_entry(
1669 AgentThreadEntry::UserMessage(UserMessage {
1670 id: message_id.clone(),
1671 content: block,
1672 chunks: message,
1673 checkpoint: None,
1674 }),
1675 cx,
1676 );
1677 })
1678 .ok();
1679
1680 let old_checkpoint = git_store
1681 .update(cx, |git, cx| git.checkpoint(cx))?
1682 .await
1683 .context("failed to get old checkpoint")
1684 .log_err();
1685 this.update(cx, |this, cx| {
1686 if let Some((_ix, message)) = this.last_user_message() {
1687 message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1688 git_checkpoint,
1689 show: false,
1690 });
1691 }
1692 this.connection.prompt(message_id, request, cx)
1693 })?
1694 .await
1695 })
1696 }
1697
1698 pub fn can_resume(&self, cx: &App) -> bool {
1699 self.connection.resume(&self.session_id, cx).is_some()
1700 }
1701
1702 pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1703 self.run_turn(cx, async move |this, cx| {
1704 this.update(cx, |this, cx| {
1705 this.connection
1706 .resume(&this.session_id, cx)
1707 .map(|resume| resume.run(cx))
1708 })?
1709 .context("resuming a session is not supported")?
1710 .await
1711 })
1712 }
1713
1714 fn run_turn(
1715 &mut self,
1716 cx: &mut Context<Self>,
1717 f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1718 ) -> BoxFuture<'static, Result<()>> {
1719 self.clear_completed_plan_entries(cx);
1720
1721 let (tx, rx) = oneshot::channel();
1722 let cancel_task = self.cancel(cx);
1723
1724 self.send_task = Some(cx.spawn(async move |this, cx| {
1725 cancel_task.await;
1726 tx.send(f(this, cx).await).ok();
1727 }));
1728
1729 cx.spawn(async move |this, cx| {
1730 let response = rx.await;
1731
1732 this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1733 .await?;
1734
1735 this.update(cx, |this, cx| {
1736 this.project
1737 .update(cx, |project, cx| project.set_agent_location(None, cx));
1738 match response {
1739 Ok(Err(e)) => {
1740 this.send_task.take();
1741 cx.emit(AcpThreadEvent::Error);
1742 Err(e)
1743 }
1744 result => {
1745 let canceled = matches!(
1746 result,
1747 Ok(Ok(acp::PromptResponse {
1748 stop_reason: acp::StopReason::Cancelled,
1749 meta: None,
1750 }))
1751 );
1752
1753 // We only take the task if the current prompt wasn't canceled.
1754 //
1755 // This prompt may have been canceled because another one was sent
1756 // while it was still generating. In these cases, dropping `send_task`
1757 // would cause the next generation to be canceled.
1758 if !canceled {
1759 this.send_task.take();
1760 }
1761
1762 // Handle refusal - distinguish between user prompt and tool call refusals
1763 if let Ok(Ok(acp::PromptResponse {
1764 stop_reason: acp::StopReason::Refusal,
1765 meta: _,
1766 })) = result
1767 {
1768 if let Some((user_msg_ix, _)) = this.last_user_message() {
1769 // Check if there's a completed tool call with results after the last user message
1770 // This indicates the refusal is in response to tool output, not the user's prompt
1771 let has_completed_tool_call_after_user_msg =
1772 this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
1773 if let AgentThreadEntry::ToolCall(tool_call) = entry {
1774 // Check if the tool call has completed and has output
1775 matches!(tool_call.status, ToolCallStatus::Completed)
1776 && tool_call.raw_output.is_some()
1777 } else {
1778 false
1779 }
1780 });
1781
1782 if has_completed_tool_call_after_user_msg {
1783 // Refusal is due to tool output - don't truncate, just notify
1784 // The model refused based on what the tool returned
1785 cx.emit(AcpThreadEvent::Refusal);
1786 } else {
1787 // User prompt was refused - truncate back to before the user message
1788 let range = user_msg_ix..this.entries.len();
1789 if range.start < range.end {
1790 this.entries.truncate(user_msg_ix);
1791 cx.emit(AcpThreadEvent::EntriesRemoved(range));
1792 }
1793 cx.emit(AcpThreadEvent::Refusal);
1794 }
1795 } else {
1796 // No user message found, treat as general refusal
1797 cx.emit(AcpThreadEvent::Refusal);
1798 }
1799 }
1800
1801 cx.emit(AcpThreadEvent::Stopped);
1802 Ok(())
1803 }
1804 }
1805 })?
1806 })
1807 .boxed()
1808 }
1809
1810 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1811 let Some(send_task) = self.send_task.take() else {
1812 return Task::ready(());
1813 };
1814
1815 for entry in self.entries.iter_mut() {
1816 if let AgentThreadEntry::ToolCall(call) = entry {
1817 let cancel = matches!(
1818 call.status,
1819 ToolCallStatus::Pending
1820 | ToolCallStatus::WaitingForConfirmation { .. }
1821 | ToolCallStatus::InProgress
1822 );
1823
1824 if cancel {
1825 call.status = ToolCallStatus::Canceled;
1826 }
1827 }
1828 }
1829
1830 self.connection.cancel(&self.session_id, cx);
1831
1832 // Wait for the send task to complete
1833 cx.foreground_executor().spawn(send_task)
1834 }
1835
1836 /// Restores the git working tree to the state at the given checkpoint (if one exists)
1837 pub fn restore_checkpoint(
1838 &mut self,
1839 id: UserMessageId,
1840 cx: &mut Context<Self>,
1841 ) -> Task<Result<()>> {
1842 let Some((_, message)) = self.user_message_mut(&id) else {
1843 return Task::ready(Err(anyhow!("message not found")));
1844 };
1845
1846 let checkpoint = message
1847 .checkpoint
1848 .as_ref()
1849 .map(|c| c.git_checkpoint.clone());
1850 let rewind = self.rewind(id.clone(), cx);
1851 let git_store = self.project.read(cx).git_store().clone();
1852
1853 cx.spawn(async move |_, cx| {
1854 rewind.await?;
1855 if let Some(checkpoint) = checkpoint {
1856 git_store
1857 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1858 .await?;
1859 }
1860
1861 Ok(())
1862 })
1863 }
1864
1865 /// Rewinds this thread to before the entry at `index`, removing it and all
1866 /// subsequent entries while rejecting any action_log changes made from that point.
1867 /// Unlike `restore_checkpoint`, this method does not restore from git.
1868 pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1869 let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
1870 return Task::ready(Err(anyhow!("not supported")));
1871 };
1872
1873 cx.spawn(async move |this, cx| {
1874 cx.update(|cx| truncate.run(id.clone(), cx))?.await?;
1875 this.update(cx, |this, cx| {
1876 if let Some((ix, _)) = this.user_message_mut(&id) {
1877 let range = ix..this.entries.len();
1878 this.entries.truncate(ix);
1879 cx.emit(AcpThreadEvent::EntriesRemoved(range));
1880 }
1881 this.action_log()
1882 .update(cx, |action_log, cx| action_log.reject_all_edits(cx))
1883 })?
1884 .await;
1885 Ok(())
1886 })
1887 }
1888
1889 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1890 let git_store = self.project.read(cx).git_store().clone();
1891
1892 let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1893 if let Some(checkpoint) = message.checkpoint.as_ref() {
1894 checkpoint.git_checkpoint.clone()
1895 } else {
1896 return Task::ready(Ok(()));
1897 }
1898 } else {
1899 return Task::ready(Ok(()));
1900 };
1901
1902 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1903 cx.spawn(async move |this, cx| {
1904 let new_checkpoint = new_checkpoint
1905 .await
1906 .context("failed to get new checkpoint")
1907 .log_err();
1908 if let Some(new_checkpoint) = new_checkpoint {
1909 let equal = git_store
1910 .update(cx, |git, cx| {
1911 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1912 })?
1913 .await
1914 .unwrap_or(true);
1915 this.update(cx, |this, cx| {
1916 let (ix, message) = this.last_user_message().context("no user message")?;
1917 let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1918 checkpoint.show = !equal;
1919 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1920 anyhow::Ok(())
1921 })??;
1922 }
1923
1924 Ok(())
1925 })
1926 }
1927
1928 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1929 self.entries
1930 .iter_mut()
1931 .enumerate()
1932 .rev()
1933 .find_map(|(ix, entry)| {
1934 if let AgentThreadEntry::UserMessage(message) = entry {
1935 Some((ix, message))
1936 } else {
1937 None
1938 }
1939 })
1940 }
1941
1942 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1943 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1944 if let AgentThreadEntry::UserMessage(message) = entry {
1945 if message.id.as_ref() == Some(id) {
1946 Some((ix, message))
1947 } else {
1948 None
1949 }
1950 } else {
1951 None
1952 }
1953 })
1954 }
1955
1956 pub fn read_text_file(
1957 &self,
1958 path: PathBuf,
1959 line: Option<u32>,
1960 limit: Option<u32>,
1961 reuse_shared_snapshot: bool,
1962 cx: &mut Context<Self>,
1963 ) -> Task<Result<String, acp::Error>> {
1964 // Args are 1-based, move to 0-based
1965 let line = line.unwrap_or_default().saturating_sub(1);
1966 let limit = limit.unwrap_or(u32::MAX);
1967 let project = self.project.clone();
1968 let action_log = self.action_log.clone();
1969 cx.spawn(async move |this, cx| {
1970 let load = project
1971 .update(cx, |project, cx| {
1972 let path = project
1973 .project_path_for_absolute_path(&path, cx)
1974 .ok_or_else(|| {
1975 acp::Error::resource_not_found(Some(path.display().to_string()))
1976 })?;
1977 Ok(project.open_buffer(path, cx))
1978 })
1979 .map_err(|e| acp::Error::internal_error().with_data(e.to_string()))
1980 .flatten()?;
1981
1982 let buffer = load.await?;
1983
1984 let snapshot = if reuse_shared_snapshot {
1985 this.read_with(cx, |this, _| {
1986 this.shared_buffers.get(&buffer.clone()).cloned()
1987 })
1988 .log_err()
1989 .flatten()
1990 } else {
1991 None
1992 };
1993
1994 let snapshot = if let Some(snapshot) = snapshot {
1995 snapshot
1996 } else {
1997 action_log.update(cx, |action_log, cx| {
1998 action_log.buffer_read(buffer.clone(), cx);
1999 })?;
2000
2001 let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot())?;
2002 this.update(cx, |this, _| {
2003 this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2004 })?;
2005 snapshot
2006 };
2007
2008 let max_point = snapshot.max_point();
2009 let start_position = Point::new(line, 0);
2010
2011 if start_position > max_point {
2012 return Err(acp::Error::invalid_params().with_data(format!(
2013 "Attempting to read beyond the end of the file, line {}:{}",
2014 max_point.row + 1,
2015 max_point.column
2016 )));
2017 }
2018
2019 let start = snapshot.anchor_before(start_position);
2020 let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2021
2022 project.update(cx, |project, cx| {
2023 project.set_agent_location(
2024 Some(AgentLocation {
2025 buffer: buffer.downgrade(),
2026 position: start,
2027 }),
2028 cx,
2029 );
2030 })?;
2031
2032 Ok(snapshot.text_for_range(start..end).collect::<String>())
2033 })
2034 }
2035
2036 pub fn write_text_file(
2037 &self,
2038 path: PathBuf,
2039 content: String,
2040 cx: &mut Context<Self>,
2041 ) -> Task<Result<()>> {
2042 let project = self.project.clone();
2043 let action_log = self.action_log.clone();
2044 cx.spawn(async move |this, cx| {
2045 let load = project.update(cx, |project, cx| {
2046 let path = project
2047 .project_path_for_absolute_path(&path, cx)
2048 .context("invalid path")?;
2049 anyhow::Ok(project.open_buffer(path, cx))
2050 });
2051 let buffer = load??.await?;
2052 let snapshot = this.update(cx, |this, cx| {
2053 this.shared_buffers
2054 .get(&buffer)
2055 .cloned()
2056 .unwrap_or_else(|| buffer.read(cx).snapshot())
2057 })?;
2058 let edits = cx
2059 .background_executor()
2060 .spawn(async move {
2061 let old_text = snapshot.text();
2062 text_diff(old_text.as_str(), &content)
2063 .into_iter()
2064 .map(|(range, replacement)| {
2065 (
2066 snapshot.anchor_after(range.start)
2067 ..snapshot.anchor_before(range.end),
2068 replacement,
2069 )
2070 })
2071 .collect::<Vec<_>>()
2072 })
2073 .await;
2074
2075 project.update(cx, |project, cx| {
2076 project.set_agent_location(
2077 Some(AgentLocation {
2078 buffer: buffer.downgrade(),
2079 position: edits
2080 .last()
2081 .map(|(range, _)| range.end)
2082 .unwrap_or(Anchor::MIN),
2083 }),
2084 cx,
2085 );
2086 })?;
2087
2088 let format_on_save = cx.update(|cx| {
2089 action_log.update(cx, |action_log, cx| {
2090 action_log.buffer_read(buffer.clone(), cx);
2091 });
2092
2093 let format_on_save = buffer.update(cx, |buffer, cx| {
2094 buffer.edit(edits, None, cx);
2095
2096 let settings = language::language_settings::language_settings(
2097 buffer.language().map(|l| l.name()),
2098 buffer.file(),
2099 cx,
2100 );
2101
2102 settings.format_on_save != FormatOnSave::Off
2103 });
2104 action_log.update(cx, |action_log, cx| {
2105 action_log.buffer_edited(buffer.clone(), cx);
2106 });
2107 format_on_save
2108 })?;
2109
2110 if format_on_save {
2111 let format_task = project.update(cx, |project, cx| {
2112 project.format(
2113 HashSet::from_iter([buffer.clone()]),
2114 LspFormatTarget::Buffers,
2115 false,
2116 FormatTrigger::Save,
2117 cx,
2118 )
2119 })?;
2120 format_task.await.log_err();
2121
2122 action_log.update(cx, |action_log, cx| {
2123 action_log.buffer_edited(buffer.clone(), cx);
2124 })?;
2125 }
2126
2127 project
2128 .update(cx, |project, cx| project.save_buffer(buffer, cx))?
2129 .await
2130 })
2131 }
2132
2133 pub fn create_terminal(
2134 &self,
2135 command: String,
2136 args: Vec<String>,
2137 extra_env: Vec<acp::EnvVariable>,
2138 cwd: Option<PathBuf>,
2139 output_byte_limit: Option<u64>,
2140 cx: &mut Context<Self>,
2141 ) -> Task<Result<Entity<Terminal>>> {
2142 let env = match &cwd {
2143 Some(dir) => self.project.update(cx, |project, cx| {
2144 let worktree = project.find_worktree(dir.as_path(), cx);
2145 let shell = TerminalSettings::get(
2146 worktree.as_ref().map(|(worktree, path)| SettingsLocation {
2147 worktree_id: worktree.read(cx).id(),
2148 path: &path,
2149 }),
2150 cx,
2151 )
2152 .shell
2153 .clone();
2154 project.directory_environment(&shell, dir.as_path().into(), cx)
2155 }),
2156 None => Task::ready(None).shared(),
2157 };
2158 let env = cx.spawn(async move |_, _| {
2159 let mut env = env.await.unwrap_or_default();
2160 // Disables paging for `git` and hopefully other commands
2161 env.insert("PAGER".into(), "".into());
2162 for var in extra_env {
2163 env.insert(var.name, var.value);
2164 }
2165 env
2166 });
2167
2168 let project = self.project.clone();
2169 let language_registry = project.read(cx).languages().clone();
2170 let is_windows = project.read(cx).path_style(cx).is_windows();
2171
2172 let terminal_id = acp::TerminalId(Uuid::new_v4().to_string().into());
2173 let terminal_task = cx.spawn({
2174 let terminal_id = terminal_id.clone();
2175 async move |_this, cx| {
2176 let env = env.await;
2177 let shell = project
2178 .update(cx, |project, cx| {
2179 project
2180 .remote_client()
2181 .and_then(|r| r.read(cx).default_system_shell())
2182 })?
2183 .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2184 let (task_command, task_args) =
2185 ShellBuilder::new(&Shell::Program(shell), is_windows)
2186 .redirect_stdin_to_dev_null()
2187 .build(Some(command.clone()), &args);
2188 let terminal = project
2189 .update(cx, |project, cx| {
2190 project.create_terminal_task(
2191 task::SpawnInTerminal {
2192 command: Some(task_command),
2193 args: task_args,
2194 cwd: cwd.clone(),
2195 env,
2196 ..Default::default()
2197 },
2198 cx,
2199 )
2200 })?
2201 .await?;
2202
2203 cx.new(|cx| {
2204 Terminal::new(
2205 terminal_id,
2206 &format!("{} {}", command, args.join(" ")),
2207 cwd,
2208 output_byte_limit.map(|l| l as usize),
2209 terminal,
2210 language_registry,
2211 cx,
2212 )
2213 })
2214 }
2215 });
2216
2217 cx.spawn(async move |this, cx| {
2218 let terminal = terminal_task.await?;
2219 this.update(cx, |this, _cx| {
2220 this.terminals.insert(terminal_id, terminal.clone());
2221 terminal
2222 })
2223 })
2224 }
2225
2226 pub fn kill_terminal(
2227 &mut self,
2228 terminal_id: acp::TerminalId,
2229 cx: &mut Context<Self>,
2230 ) -> Result<()> {
2231 self.terminals
2232 .get(&terminal_id)
2233 .context("Terminal not found")?
2234 .update(cx, |terminal, cx| {
2235 terminal.kill(cx);
2236 });
2237
2238 Ok(())
2239 }
2240
2241 pub fn release_terminal(
2242 &mut self,
2243 terminal_id: acp::TerminalId,
2244 cx: &mut Context<Self>,
2245 ) -> Result<()> {
2246 self.terminals
2247 .remove(&terminal_id)
2248 .context("Terminal not found")?
2249 .update(cx, |terminal, cx| {
2250 terminal.kill(cx);
2251 });
2252
2253 Ok(())
2254 }
2255
2256 pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2257 self.terminals
2258 .get(&terminal_id)
2259 .context("Terminal not found")
2260 .cloned()
2261 }
2262
2263 pub fn to_markdown(&self, cx: &App) -> String {
2264 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2265 }
2266
2267 pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2268 cx.emit(AcpThreadEvent::LoadError(error));
2269 }
2270
2271 pub fn register_terminal_created(
2272 &mut self,
2273 terminal_id: acp::TerminalId,
2274 command_label: String,
2275 working_dir: Option<PathBuf>,
2276 output_byte_limit: Option<u64>,
2277 terminal: Entity<::terminal::Terminal>,
2278 cx: &mut Context<Self>,
2279 ) -> Entity<Terminal> {
2280 let language_registry = self.project.read(cx).languages().clone();
2281
2282 let entity = cx.new(|cx| {
2283 Terminal::new(
2284 terminal_id.clone(),
2285 &command_label,
2286 working_dir.clone(),
2287 output_byte_limit.map(|l| l as usize),
2288 terminal,
2289 language_registry,
2290 cx,
2291 )
2292 });
2293 self.terminals.insert(terminal_id.clone(), entity.clone());
2294 entity
2295 }
2296}
2297
2298fn markdown_for_raw_output(
2299 raw_output: &serde_json::Value,
2300 language_registry: &Arc<LanguageRegistry>,
2301 cx: &mut App,
2302) -> Option<Entity<Markdown>> {
2303 match raw_output {
2304 serde_json::Value::Null => None,
2305 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2306 Markdown::new(
2307 value.to_string().into(),
2308 Some(language_registry.clone()),
2309 None,
2310 cx,
2311 )
2312 })),
2313 serde_json::Value::Number(value) => Some(cx.new(|cx| {
2314 Markdown::new(
2315 value.to_string().into(),
2316 Some(language_registry.clone()),
2317 None,
2318 cx,
2319 )
2320 })),
2321 serde_json::Value::String(value) => Some(cx.new(|cx| {
2322 Markdown::new(
2323 value.clone().into(),
2324 Some(language_registry.clone()),
2325 None,
2326 cx,
2327 )
2328 })),
2329 value => Some(cx.new(|cx| {
2330 Markdown::new(
2331 format!("```json\n{}\n```", value).into(),
2332 Some(language_registry.clone()),
2333 None,
2334 cx,
2335 )
2336 })),
2337 }
2338}
2339
2340#[cfg(test)]
2341mod tests {
2342 use super::*;
2343 use anyhow::anyhow;
2344 use futures::{channel::mpsc, future::LocalBoxFuture, select};
2345 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2346 use indoc::indoc;
2347 use project::{FakeFs, Fs};
2348 use rand::{distr, prelude::*};
2349 use serde_json::json;
2350 use settings::SettingsStore;
2351 use smol::stream::StreamExt as _;
2352 use std::{
2353 any::Any,
2354 cell::RefCell,
2355 path::Path,
2356 rc::Rc,
2357 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2358 time::Duration,
2359 };
2360 use util::path;
2361
2362 fn init_test(cx: &mut TestAppContext) {
2363 env_logger::try_init().ok();
2364 cx.update(|cx| {
2365 let settings_store = SettingsStore::test(cx);
2366 cx.set_global(settings_store);
2367 Project::init_settings(cx);
2368 language::init(cx);
2369 });
2370 }
2371
2372 #[gpui::test]
2373 async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2374 init_test(cx);
2375
2376 let fs = FakeFs::new(cx.executor());
2377 let project = Project::test(fs, [], cx).await;
2378 let connection = Rc::new(FakeAgentConnection::new());
2379 let thread = cx
2380 .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2381 .await
2382 .unwrap();
2383
2384 let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
2385
2386 // Send Output BEFORE Created - should be buffered by acp_thread
2387 thread.update(cx, |thread, cx| {
2388 thread.on_terminal_provider_event(
2389 TerminalProviderEvent::Output {
2390 terminal_id: terminal_id.clone(),
2391 data: b"hello buffered".to_vec(),
2392 },
2393 cx,
2394 );
2395 });
2396
2397 // Create a display-only terminal and then send Created
2398 let lower = cx.new(|cx| {
2399 let builder = ::terminal::TerminalBuilder::new_display_only(
2400 ::terminal::terminal_settings::CursorShape::default(),
2401 ::terminal::terminal_settings::AlternateScroll::On,
2402 None,
2403 0,
2404 )
2405 .unwrap();
2406 builder.subscribe(cx)
2407 });
2408
2409 thread.update(cx, |thread, cx| {
2410 thread.on_terminal_provider_event(
2411 TerminalProviderEvent::Created {
2412 terminal_id: terminal_id.clone(),
2413 label: "Buffered Test".to_string(),
2414 cwd: None,
2415 output_byte_limit: None,
2416 terminal: lower.clone(),
2417 },
2418 cx,
2419 );
2420 });
2421
2422 // After Created, buffered Output should have been flushed into the renderer
2423 let content = thread.read_with(cx, |thread, cx| {
2424 let term = thread.terminal(terminal_id.clone()).unwrap();
2425 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2426 });
2427
2428 assert!(
2429 content.contains("hello buffered"),
2430 "expected buffered output to render, got: {content}"
2431 );
2432 }
2433
2434 #[gpui::test]
2435 async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2436 init_test(cx);
2437
2438 let fs = FakeFs::new(cx.executor());
2439 let project = Project::test(fs, [], cx).await;
2440 let connection = Rc::new(FakeAgentConnection::new());
2441 let thread = cx
2442 .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2443 .await
2444 .unwrap();
2445
2446 let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
2447
2448 // Send Output BEFORE Created
2449 thread.update(cx, |thread, cx| {
2450 thread.on_terminal_provider_event(
2451 TerminalProviderEvent::Output {
2452 terminal_id: terminal_id.clone(),
2453 data: b"pre-exit data".to_vec(),
2454 },
2455 cx,
2456 );
2457 });
2458
2459 // Send Exit BEFORE Created
2460 thread.update(cx, |thread, cx| {
2461 thread.on_terminal_provider_event(
2462 TerminalProviderEvent::Exit {
2463 terminal_id: terminal_id.clone(),
2464 status: acp::TerminalExitStatus {
2465 exit_code: Some(0),
2466 signal: None,
2467 meta: None,
2468 },
2469 },
2470 cx,
2471 );
2472 });
2473
2474 // Now create a display-only lower-level terminal and send Created
2475 let lower = cx.new(|cx| {
2476 let builder = ::terminal::TerminalBuilder::new_display_only(
2477 ::terminal::terminal_settings::CursorShape::default(),
2478 ::terminal::terminal_settings::AlternateScroll::On,
2479 None,
2480 0,
2481 )
2482 .unwrap();
2483 builder.subscribe(cx)
2484 });
2485
2486 thread.update(cx, |thread, cx| {
2487 thread.on_terminal_provider_event(
2488 TerminalProviderEvent::Created {
2489 terminal_id: terminal_id.clone(),
2490 label: "Buffered Exit Test".to_string(),
2491 cwd: None,
2492 output_byte_limit: None,
2493 terminal: lower.clone(),
2494 },
2495 cx,
2496 );
2497 });
2498
2499 // Output should be present after Created (flushed from buffer)
2500 let content = thread.read_with(cx, |thread, cx| {
2501 let term = thread.terminal(terminal_id.clone()).unwrap();
2502 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2503 });
2504
2505 assert!(
2506 content.contains("pre-exit data"),
2507 "expected pre-exit data to render, got: {content}"
2508 );
2509 }
2510
2511 #[gpui::test]
2512 async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
2513 init_test(cx);
2514
2515 let fs = FakeFs::new(cx.executor());
2516 let project = Project::test(fs, [], cx).await;
2517 let connection = Rc::new(FakeAgentConnection::new());
2518 let thread = cx
2519 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2520 .await
2521 .unwrap();
2522
2523 // Test creating a new user message
2524 thread.update(cx, |thread, cx| {
2525 thread.push_user_content_block(
2526 None,
2527 acp::ContentBlock::Text(acp::TextContent {
2528 annotations: None,
2529 text: "Hello, ".to_string(),
2530 meta: None,
2531 }),
2532 cx,
2533 );
2534 });
2535
2536 thread.update(cx, |thread, cx| {
2537 assert_eq!(thread.entries.len(), 1);
2538 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2539 assert_eq!(user_msg.id, None);
2540 assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
2541 } else {
2542 panic!("Expected UserMessage");
2543 }
2544 });
2545
2546 // Test appending to existing user message
2547 let message_1_id = UserMessageId::new();
2548 thread.update(cx, |thread, cx| {
2549 thread.push_user_content_block(
2550 Some(message_1_id.clone()),
2551 acp::ContentBlock::Text(acp::TextContent {
2552 annotations: None,
2553 text: "world!".to_string(),
2554 meta: None,
2555 }),
2556 cx,
2557 );
2558 });
2559
2560 thread.update(cx, |thread, cx| {
2561 assert_eq!(thread.entries.len(), 1);
2562 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2563 assert_eq!(user_msg.id, Some(message_1_id));
2564 assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
2565 } else {
2566 panic!("Expected UserMessage");
2567 }
2568 });
2569
2570 // Test creating new user message after assistant message
2571 thread.update(cx, |thread, cx| {
2572 thread.push_assistant_content_block(
2573 acp::ContentBlock::Text(acp::TextContent {
2574 annotations: None,
2575 text: "Assistant response".to_string(),
2576 meta: None,
2577 }),
2578 false,
2579 cx,
2580 );
2581 });
2582
2583 let message_2_id = UserMessageId::new();
2584 thread.update(cx, |thread, cx| {
2585 thread.push_user_content_block(
2586 Some(message_2_id.clone()),
2587 acp::ContentBlock::Text(acp::TextContent {
2588 annotations: None,
2589 text: "New user message".to_string(),
2590 meta: None,
2591 }),
2592 cx,
2593 );
2594 });
2595
2596 thread.update(cx, |thread, cx| {
2597 assert_eq!(thread.entries.len(), 3);
2598 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
2599 assert_eq!(user_msg.id, Some(message_2_id));
2600 assert_eq!(user_msg.content.to_markdown(cx), "New user message");
2601 } else {
2602 panic!("Expected UserMessage at index 2");
2603 }
2604 });
2605 }
2606
2607 #[gpui::test]
2608 async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
2609 init_test(cx);
2610
2611 let fs = FakeFs::new(cx.executor());
2612 let project = Project::test(fs, [], cx).await;
2613 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2614 |_, thread, mut cx| {
2615 async move {
2616 thread.update(&mut cx, |thread, cx| {
2617 thread
2618 .handle_session_update(
2619 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk {
2620 content: "Thinking ".into(),
2621 meta: None,
2622 }),
2623 cx,
2624 )
2625 .unwrap();
2626 thread
2627 .handle_session_update(
2628 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk {
2629 content: "hard!".into(),
2630 meta: None,
2631 }),
2632 cx,
2633 )
2634 .unwrap();
2635 })?;
2636 Ok(acp::PromptResponse {
2637 stop_reason: acp::StopReason::EndTurn,
2638 meta: None,
2639 })
2640 }
2641 .boxed_local()
2642 },
2643 ));
2644
2645 let thread = cx
2646 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2647 .await
2648 .unwrap();
2649
2650 thread
2651 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
2652 .await
2653 .unwrap();
2654
2655 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
2656 assert_eq!(
2657 output,
2658 indoc! {r#"
2659 ## User
2660
2661 Hello from Zed!
2662
2663 ## Assistant
2664
2665 <thinking>
2666 Thinking hard!
2667 </thinking>
2668
2669 "#}
2670 );
2671 }
2672
2673 #[gpui::test]
2674 async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
2675 init_test(cx);
2676
2677 let fs = FakeFs::new(cx.executor());
2678 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
2679 .await;
2680 let project = Project::test(fs.clone(), [], cx).await;
2681 let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
2682 let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
2683 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2684 move |_, thread, mut cx| {
2685 let read_file_tx = read_file_tx.clone();
2686 async move {
2687 let content = thread
2688 .update(&mut cx, |thread, cx| {
2689 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2690 })
2691 .unwrap()
2692 .await
2693 .unwrap();
2694 assert_eq!(content, "one\ntwo\nthree\n");
2695 read_file_tx.take().unwrap().send(()).unwrap();
2696 thread
2697 .update(&mut cx, |thread, cx| {
2698 thread.write_text_file(
2699 path!("/tmp/foo").into(),
2700 "one\ntwo\nthree\nfour\nfive\n".to_string(),
2701 cx,
2702 )
2703 })
2704 .unwrap()
2705 .await
2706 .unwrap();
2707 Ok(acp::PromptResponse {
2708 stop_reason: acp::StopReason::EndTurn,
2709 meta: None,
2710 })
2711 }
2712 .boxed_local()
2713 },
2714 ));
2715
2716 let (worktree, pathbuf) = project
2717 .update(cx, |project, cx| {
2718 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2719 })
2720 .await
2721 .unwrap();
2722 let buffer = project
2723 .update(cx, |project, cx| {
2724 project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
2725 })
2726 .await
2727 .unwrap();
2728
2729 let thread = cx
2730 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2731 .await
2732 .unwrap();
2733
2734 let request = thread.update(cx, |thread, cx| {
2735 thread.send_raw("Extend the count in /tmp/foo", cx)
2736 });
2737 read_file_rx.await.ok();
2738 buffer.update(cx, |buffer, cx| {
2739 buffer.edit([(0..0, "zero\n".to_string())], None, cx);
2740 });
2741 cx.run_until_parked();
2742 assert_eq!(
2743 buffer.read_with(cx, |buffer, _| buffer.text()),
2744 "zero\none\ntwo\nthree\nfour\nfive\n"
2745 );
2746 assert_eq!(
2747 String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
2748 "zero\none\ntwo\nthree\nfour\nfive\n"
2749 );
2750 request.await.unwrap();
2751 }
2752
2753 #[gpui::test]
2754 async fn test_reading_from_line(cx: &mut TestAppContext) {
2755 init_test(cx);
2756
2757 let fs = FakeFs::new(cx.executor());
2758 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
2759 .await;
2760 let project = Project::test(fs.clone(), [], cx).await;
2761 project
2762 .update(cx, |project, cx| {
2763 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2764 })
2765 .await
2766 .unwrap();
2767
2768 let connection = Rc::new(FakeAgentConnection::new());
2769
2770 let thread = cx
2771 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2772 .await
2773 .unwrap();
2774
2775 // Whole file
2776 let content = thread
2777 .update(cx, |thread, cx| {
2778 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2779 })
2780 .await
2781 .unwrap();
2782
2783 assert_eq!(content, "one\ntwo\nthree\nfour\n");
2784
2785 // Only start line
2786 let content = thread
2787 .update(cx, |thread, cx| {
2788 thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
2789 })
2790 .await
2791 .unwrap();
2792
2793 assert_eq!(content, "three\nfour\n");
2794
2795 // Only limit
2796 let content = thread
2797 .update(cx, |thread, cx| {
2798 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2799 })
2800 .await
2801 .unwrap();
2802
2803 assert_eq!(content, "one\ntwo\n");
2804
2805 // Range
2806 let content = thread
2807 .update(cx, |thread, cx| {
2808 thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
2809 })
2810 .await
2811 .unwrap();
2812
2813 assert_eq!(content, "two\nthree\n");
2814
2815 // Invalid
2816 let err = thread
2817 .update(cx, |thread, cx| {
2818 thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
2819 })
2820 .await
2821 .unwrap_err();
2822
2823 assert_eq!(
2824 err.to_string(),
2825 "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
2826 );
2827 }
2828
2829 #[gpui::test]
2830 async fn test_reading_empty_file(cx: &mut TestAppContext) {
2831 init_test(cx);
2832
2833 let fs = FakeFs::new(cx.executor());
2834 fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
2835 let project = Project::test(fs.clone(), [], cx).await;
2836 project
2837 .update(cx, |project, cx| {
2838 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2839 })
2840 .await
2841 .unwrap();
2842
2843 let connection = Rc::new(FakeAgentConnection::new());
2844
2845 let thread = cx
2846 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2847 .await
2848 .unwrap();
2849
2850 // Whole file
2851 let content = thread
2852 .update(cx, |thread, cx| {
2853 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2854 })
2855 .await
2856 .unwrap();
2857
2858 assert_eq!(content, "");
2859
2860 // Only start line
2861 let content = thread
2862 .update(cx, |thread, cx| {
2863 thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
2864 })
2865 .await
2866 .unwrap();
2867
2868 assert_eq!(content, "");
2869
2870 // Only limit
2871 let content = thread
2872 .update(cx, |thread, cx| {
2873 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2874 })
2875 .await
2876 .unwrap();
2877
2878 assert_eq!(content, "");
2879
2880 // Range
2881 let content = thread
2882 .update(cx, |thread, cx| {
2883 thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
2884 })
2885 .await
2886 .unwrap();
2887
2888 assert_eq!(content, "");
2889
2890 // Invalid
2891 let err = thread
2892 .update(cx, |thread, cx| {
2893 thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
2894 })
2895 .await
2896 .unwrap_err();
2897
2898 assert_eq!(
2899 err.to_string(),
2900 "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
2901 );
2902 }
2903 #[gpui::test]
2904 async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
2905 init_test(cx);
2906
2907 let fs = FakeFs::new(cx.executor());
2908 fs.insert_tree(path!("/tmp"), json!({})).await;
2909 let project = Project::test(fs.clone(), [], cx).await;
2910 project
2911 .update(cx, |project, cx| {
2912 project.find_or_create_worktree(path!("/tmp"), true, cx)
2913 })
2914 .await
2915 .unwrap();
2916
2917 let connection = Rc::new(FakeAgentConnection::new());
2918
2919 let thread = cx
2920 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2921 .await
2922 .unwrap();
2923
2924 // Out of project file
2925 let err = thread
2926 .update(cx, |thread, cx| {
2927 thread.read_text_file(path!("/foo").into(), None, None, false, cx)
2928 })
2929 .await
2930 .unwrap_err();
2931
2932 assert_eq!(err.code, acp::ErrorCode::RESOURCE_NOT_FOUND.code);
2933 }
2934
2935 #[gpui::test]
2936 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
2937 init_test(cx);
2938
2939 let fs = FakeFs::new(cx.executor());
2940 let project = Project::test(fs, [], cx).await;
2941 let id = acp::ToolCallId("test".into());
2942
2943 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2944 let id = id.clone();
2945 move |_, thread, mut cx| {
2946 let id = id.clone();
2947 async move {
2948 thread
2949 .update(&mut cx, |thread, cx| {
2950 thread.handle_session_update(
2951 acp::SessionUpdate::ToolCall(acp::ToolCall {
2952 id: id.clone(),
2953 title: "Label".into(),
2954 kind: acp::ToolKind::Fetch,
2955 status: acp::ToolCallStatus::InProgress,
2956 content: vec![],
2957 locations: vec![],
2958 raw_input: None,
2959 raw_output: None,
2960 meta: None,
2961 }),
2962 cx,
2963 )
2964 })
2965 .unwrap()
2966 .unwrap();
2967 Ok(acp::PromptResponse {
2968 stop_reason: acp::StopReason::EndTurn,
2969 meta: None,
2970 })
2971 }
2972 .boxed_local()
2973 }
2974 }));
2975
2976 let thread = cx
2977 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2978 .await
2979 .unwrap();
2980
2981 let request = thread.update(cx, |thread, cx| {
2982 thread.send_raw("Fetch https://example.com", cx)
2983 });
2984
2985 run_until_first_tool_call(&thread, cx).await;
2986
2987 thread.read_with(cx, |thread, _| {
2988 assert!(matches!(
2989 thread.entries[1],
2990 AgentThreadEntry::ToolCall(ToolCall {
2991 status: ToolCallStatus::InProgress,
2992 ..
2993 })
2994 ));
2995 });
2996
2997 thread.update(cx, |thread, cx| thread.cancel(cx)).await;
2998
2999 thread.read_with(cx, |thread, _| {
3000 assert!(matches!(
3001 &thread.entries[1],
3002 AgentThreadEntry::ToolCall(ToolCall {
3003 status: ToolCallStatus::Canceled,
3004 ..
3005 })
3006 ));
3007 });
3008
3009 thread
3010 .update(cx, |thread, cx| {
3011 thread.handle_session_update(
3012 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
3013 id,
3014 fields: acp::ToolCallUpdateFields {
3015 status: Some(acp::ToolCallStatus::Completed),
3016 ..Default::default()
3017 },
3018 meta: None,
3019 }),
3020 cx,
3021 )
3022 })
3023 .unwrap();
3024
3025 request.await.unwrap();
3026
3027 thread.read_with(cx, |thread, _| {
3028 assert!(matches!(
3029 thread.entries[1],
3030 AgentThreadEntry::ToolCall(ToolCall {
3031 status: ToolCallStatus::Completed,
3032 ..
3033 })
3034 ));
3035 });
3036 }
3037
3038 #[gpui::test]
3039 async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3040 init_test(cx);
3041 let fs = FakeFs::new(cx.background_executor.clone());
3042 fs.insert_tree(path!("/test"), json!({})).await;
3043 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3044
3045 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3046 move |_, thread, mut cx| {
3047 async move {
3048 thread
3049 .update(&mut cx, |thread, cx| {
3050 thread.handle_session_update(
3051 acp::SessionUpdate::ToolCall(acp::ToolCall {
3052 id: acp::ToolCallId("test".into()),
3053 title: "Label".into(),
3054 kind: acp::ToolKind::Edit,
3055 status: acp::ToolCallStatus::Completed,
3056 content: vec![acp::ToolCallContent::Diff {
3057 diff: acp::Diff {
3058 path: "/test/test.txt".into(),
3059 old_text: None,
3060 new_text: "foo".into(),
3061 meta: None,
3062 },
3063 }],
3064 locations: vec![],
3065 raw_input: None,
3066 raw_output: None,
3067 meta: None,
3068 }),
3069 cx,
3070 )
3071 })
3072 .unwrap()
3073 .unwrap();
3074 Ok(acp::PromptResponse {
3075 stop_reason: acp::StopReason::EndTurn,
3076 meta: None,
3077 })
3078 }
3079 .boxed_local()
3080 }
3081 }));
3082
3083 let thread = cx
3084 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3085 .await
3086 .unwrap();
3087
3088 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3089 .await
3090 .unwrap();
3091
3092 assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3093 }
3094
3095 #[gpui::test(iterations = 10)]
3096 async fn test_checkpoints(cx: &mut TestAppContext) {
3097 init_test(cx);
3098 let fs = FakeFs::new(cx.background_executor.clone());
3099 fs.insert_tree(
3100 path!("/test"),
3101 json!({
3102 ".git": {}
3103 }),
3104 )
3105 .await;
3106 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3107
3108 let simulate_changes = Arc::new(AtomicBool::new(true));
3109 let next_filename = Arc::new(AtomicUsize::new(0));
3110 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3111 let simulate_changes = simulate_changes.clone();
3112 let next_filename = next_filename.clone();
3113 let fs = fs.clone();
3114 move |request, thread, mut cx| {
3115 let fs = fs.clone();
3116 let simulate_changes = simulate_changes.clone();
3117 let next_filename = next_filename.clone();
3118 async move {
3119 if simulate_changes.load(SeqCst) {
3120 let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3121 fs.write(Path::new(&filename), b"").await?;
3122 }
3123
3124 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3125 panic!("expected text content block");
3126 };
3127 thread.update(&mut cx, |thread, cx| {
3128 thread
3129 .handle_session_update(
3130 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk {
3131 content: content.text.to_uppercase().into(),
3132 meta: None,
3133 }),
3134 cx,
3135 )
3136 .unwrap();
3137 })?;
3138 Ok(acp::PromptResponse {
3139 stop_reason: acp::StopReason::EndTurn,
3140 meta: None,
3141 })
3142 }
3143 .boxed_local()
3144 }
3145 }));
3146 let thread = cx
3147 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3148 .await
3149 .unwrap();
3150
3151 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3152 .await
3153 .unwrap();
3154 thread.read_with(cx, |thread, cx| {
3155 assert_eq!(
3156 thread.to_markdown(cx),
3157 indoc! {"
3158 ## User (checkpoint)
3159
3160 Lorem
3161
3162 ## Assistant
3163
3164 LOREM
3165
3166 "}
3167 );
3168 });
3169 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3170
3171 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3172 .await
3173 .unwrap();
3174 thread.read_with(cx, |thread, cx| {
3175 assert_eq!(
3176 thread.to_markdown(cx),
3177 indoc! {"
3178 ## User (checkpoint)
3179
3180 Lorem
3181
3182 ## Assistant
3183
3184 LOREM
3185
3186 ## User (checkpoint)
3187
3188 ipsum
3189
3190 ## Assistant
3191
3192 IPSUM
3193
3194 "}
3195 );
3196 });
3197 assert_eq!(
3198 fs.files(),
3199 vec![
3200 Path::new(path!("/test/file-0")),
3201 Path::new(path!("/test/file-1"))
3202 ]
3203 );
3204
3205 // Checkpoint isn't stored when there are no changes.
3206 simulate_changes.store(false, SeqCst);
3207 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3208 .await
3209 .unwrap();
3210 thread.read_with(cx, |thread, cx| {
3211 assert_eq!(
3212 thread.to_markdown(cx),
3213 indoc! {"
3214 ## User (checkpoint)
3215
3216 Lorem
3217
3218 ## Assistant
3219
3220 LOREM
3221
3222 ## User (checkpoint)
3223
3224 ipsum
3225
3226 ## Assistant
3227
3228 IPSUM
3229
3230 ## User
3231
3232 dolor
3233
3234 ## Assistant
3235
3236 DOLOR
3237
3238 "}
3239 );
3240 });
3241 assert_eq!(
3242 fs.files(),
3243 vec![
3244 Path::new(path!("/test/file-0")),
3245 Path::new(path!("/test/file-1"))
3246 ]
3247 );
3248
3249 // Rewinding the conversation truncates the history and restores the checkpoint.
3250 thread
3251 .update(cx, |thread, cx| {
3252 let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3253 panic!("unexpected entries {:?}", thread.entries)
3254 };
3255 thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3256 })
3257 .await
3258 .unwrap();
3259 thread.read_with(cx, |thread, cx| {
3260 assert_eq!(
3261 thread.to_markdown(cx),
3262 indoc! {"
3263 ## User (checkpoint)
3264
3265 Lorem
3266
3267 ## Assistant
3268
3269 LOREM
3270
3271 "}
3272 );
3273 });
3274 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3275 }
3276
3277 #[gpui::test]
3278 async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3279 use std::sync::atomic::AtomicUsize;
3280 init_test(cx);
3281
3282 let fs = FakeFs::new(cx.executor());
3283 let project = Project::test(fs, None, cx).await;
3284
3285 // Create a connection that simulates refusal after tool result
3286 let prompt_count = Arc::new(AtomicUsize::new(0));
3287 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3288 let prompt_count = prompt_count.clone();
3289 move |_request, thread, mut cx| {
3290 let count = prompt_count.fetch_add(1, SeqCst);
3291 async move {
3292 if count == 0 {
3293 // First prompt: Generate a tool call with result
3294 thread.update(&mut cx, |thread, cx| {
3295 thread
3296 .handle_session_update(
3297 acp::SessionUpdate::ToolCall(acp::ToolCall {
3298 id: acp::ToolCallId("tool1".into()),
3299 title: "Test Tool".into(),
3300 kind: acp::ToolKind::Fetch,
3301 status: acp::ToolCallStatus::Completed,
3302 content: vec![],
3303 locations: vec![],
3304 raw_input: Some(serde_json::json!({"query": "test"})),
3305 raw_output: Some(
3306 serde_json::json!({"result": "inappropriate content"}),
3307 ),
3308 meta: None,
3309 }),
3310 cx,
3311 )
3312 .unwrap();
3313 })?;
3314
3315 // Now return refusal because of the tool result
3316 Ok(acp::PromptResponse {
3317 stop_reason: acp::StopReason::Refusal,
3318 meta: None,
3319 })
3320 } else {
3321 Ok(acp::PromptResponse {
3322 stop_reason: acp::StopReason::EndTurn,
3323 meta: None,
3324 })
3325 }
3326 }
3327 .boxed_local()
3328 }
3329 }));
3330
3331 let thread = cx
3332 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3333 .await
3334 .unwrap();
3335
3336 // Track if we see a Refusal event
3337 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3338 let saw_refusal_event_captured = saw_refusal_event.clone();
3339 thread.update(cx, |_thread, cx| {
3340 cx.subscribe(
3341 &thread,
3342 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3343 if matches!(event, AcpThreadEvent::Refusal) {
3344 *saw_refusal_event_captured.lock().unwrap() = true;
3345 }
3346 },
3347 )
3348 .detach();
3349 });
3350
3351 // Send a user message - this will trigger tool call and then refusal
3352 let send_task = thread.update(cx, |thread, cx| {
3353 thread.send(
3354 vec![acp::ContentBlock::Text(acp::TextContent {
3355 text: "Hello".into(),
3356 annotations: None,
3357 meta: None,
3358 })],
3359 cx,
3360 )
3361 });
3362 cx.background_executor.spawn(send_task).detach();
3363 cx.run_until_parked();
3364
3365 // Verify that:
3366 // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3367 // 2. The user message was NOT truncated
3368 assert!(
3369 *saw_refusal_event.lock().unwrap(),
3370 "Refusal event should be emitted for tool result refusals"
3371 );
3372
3373 thread.read_with(cx, |thread, _| {
3374 let entries = thread.entries();
3375 assert!(entries.len() >= 2, "Should have user message and tool call");
3376
3377 // Verify user message is still there
3378 assert!(
3379 matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3380 "User message should not be truncated"
3381 );
3382
3383 // Verify tool call is there with result
3384 if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3385 assert!(
3386 tool_call.raw_output.is_some(),
3387 "Tool call should have output"
3388 );
3389 } else {
3390 panic!("Expected tool call at index 1");
3391 }
3392 });
3393 }
3394
3395 #[gpui::test]
3396 async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3397 init_test(cx);
3398
3399 let fs = FakeFs::new(cx.executor());
3400 let project = Project::test(fs, None, cx).await;
3401
3402 let refuse_next = Arc::new(AtomicBool::new(false));
3403 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3404 let refuse_next = refuse_next.clone();
3405 move |_request, _thread, _cx| {
3406 if refuse_next.load(SeqCst) {
3407 async move {
3408 Ok(acp::PromptResponse {
3409 stop_reason: acp::StopReason::Refusal,
3410 meta: None,
3411 })
3412 }
3413 .boxed_local()
3414 } else {
3415 async move {
3416 Ok(acp::PromptResponse {
3417 stop_reason: acp::StopReason::EndTurn,
3418 meta: None,
3419 })
3420 }
3421 .boxed_local()
3422 }
3423 }
3424 }));
3425
3426 let thread = cx
3427 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3428 .await
3429 .unwrap();
3430
3431 // Track if we see a Refusal event
3432 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3433 let saw_refusal_event_captured = saw_refusal_event.clone();
3434 thread.update(cx, |_thread, cx| {
3435 cx.subscribe(
3436 &thread,
3437 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3438 if matches!(event, AcpThreadEvent::Refusal) {
3439 *saw_refusal_event_captured.lock().unwrap() = true;
3440 }
3441 },
3442 )
3443 .detach();
3444 });
3445
3446 // Send a message that will be refused
3447 refuse_next.store(true, SeqCst);
3448 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3449 .await
3450 .unwrap();
3451
3452 // Verify that a Refusal event WAS emitted for user prompt refusal
3453 assert!(
3454 *saw_refusal_event.lock().unwrap(),
3455 "Refusal event should be emitted for user prompt refusals"
3456 );
3457
3458 // Verify the message was truncated (user prompt refusal)
3459 thread.read_with(cx, |thread, cx| {
3460 assert_eq!(thread.to_markdown(cx), "");
3461 });
3462 }
3463
3464 #[gpui::test]
3465 async fn test_refusal(cx: &mut TestAppContext) {
3466 init_test(cx);
3467 let fs = FakeFs::new(cx.background_executor.clone());
3468 fs.insert_tree(path!("/"), json!({})).await;
3469 let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
3470
3471 let refuse_next = Arc::new(AtomicBool::new(false));
3472 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3473 let refuse_next = refuse_next.clone();
3474 move |request, thread, mut cx| {
3475 let refuse_next = refuse_next.clone();
3476 async move {
3477 if refuse_next.load(SeqCst) {
3478 return Ok(acp::PromptResponse {
3479 stop_reason: acp::StopReason::Refusal,
3480 meta: None,
3481 });
3482 }
3483
3484 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3485 panic!("expected text content block");
3486 };
3487 thread.update(&mut cx, |thread, cx| {
3488 thread
3489 .handle_session_update(
3490 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk {
3491 content: content.text.to_uppercase().into(),
3492 meta: None,
3493 }),
3494 cx,
3495 )
3496 .unwrap();
3497 })?;
3498 Ok(acp::PromptResponse {
3499 stop_reason: acp::StopReason::EndTurn,
3500 meta: None,
3501 })
3502 }
3503 .boxed_local()
3504 }
3505 }));
3506 let thread = cx
3507 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3508 .await
3509 .unwrap();
3510
3511 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3512 .await
3513 .unwrap();
3514 thread.read_with(cx, |thread, cx| {
3515 assert_eq!(
3516 thread.to_markdown(cx),
3517 indoc! {"
3518 ## User
3519
3520 hello
3521
3522 ## Assistant
3523
3524 HELLO
3525
3526 "}
3527 );
3528 });
3529
3530 // Simulate refusing the second message. The message should be truncated
3531 // when a user prompt is refused.
3532 refuse_next.store(true, SeqCst);
3533 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
3534 .await
3535 .unwrap();
3536 thread.read_with(cx, |thread, cx| {
3537 assert_eq!(
3538 thread.to_markdown(cx),
3539 indoc! {"
3540 ## User
3541
3542 hello
3543
3544 ## Assistant
3545
3546 HELLO
3547
3548 "}
3549 );
3550 });
3551 }
3552
3553 async fn run_until_first_tool_call(
3554 thread: &Entity<AcpThread>,
3555 cx: &mut TestAppContext,
3556 ) -> usize {
3557 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
3558
3559 let subscription = cx.update(|cx| {
3560 cx.subscribe(thread, move |thread, _, cx| {
3561 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
3562 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
3563 return tx.try_send(ix).unwrap();
3564 }
3565 }
3566 })
3567 });
3568
3569 select! {
3570 _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
3571 panic!("Timeout waiting for tool call")
3572 }
3573 ix = rx.next().fuse() => {
3574 drop(subscription);
3575 ix.unwrap()
3576 }
3577 }
3578 }
3579
3580 #[derive(Clone, Default)]
3581 struct FakeAgentConnection {
3582 auth_methods: Vec<acp::AuthMethod>,
3583 sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
3584 on_user_message: Option<
3585 Rc<
3586 dyn Fn(
3587 acp::PromptRequest,
3588 WeakEntity<AcpThread>,
3589 AsyncApp,
3590 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3591 + 'static,
3592 >,
3593 >,
3594 }
3595
3596 impl FakeAgentConnection {
3597 fn new() -> Self {
3598 Self {
3599 auth_methods: Vec::new(),
3600 on_user_message: None,
3601 sessions: Arc::default(),
3602 }
3603 }
3604
3605 #[expect(unused)]
3606 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
3607 self.auth_methods = auth_methods;
3608 self
3609 }
3610
3611 fn on_user_message(
3612 mut self,
3613 handler: impl Fn(
3614 acp::PromptRequest,
3615 WeakEntity<AcpThread>,
3616 AsyncApp,
3617 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3618 + 'static,
3619 ) -> Self {
3620 self.on_user_message.replace(Rc::new(handler));
3621 self
3622 }
3623 }
3624
3625 impl AgentConnection for FakeAgentConnection {
3626 fn auth_methods(&self) -> &[acp::AuthMethod] {
3627 &self.auth_methods
3628 }
3629
3630 fn new_thread(
3631 self: Rc<Self>,
3632 project: Entity<Project>,
3633 _cwd: &Path,
3634 cx: &mut App,
3635 ) -> Task<gpui::Result<Entity<AcpThread>>> {
3636 let session_id = acp::SessionId(
3637 rand::rng()
3638 .sample_iter(&distr::Alphanumeric)
3639 .take(7)
3640 .map(char::from)
3641 .collect::<String>()
3642 .into(),
3643 );
3644 let action_log = cx.new(|_| ActionLog::new(project.clone()));
3645 let thread = cx.new(|cx| {
3646 AcpThread::new(
3647 "Test",
3648 self.clone(),
3649 project,
3650 action_log,
3651 session_id.clone(),
3652 watch::Receiver::constant(acp::PromptCapabilities {
3653 image: true,
3654 audio: true,
3655 embedded_context: true,
3656 meta: None,
3657 }),
3658 cx,
3659 )
3660 });
3661 self.sessions.lock().insert(session_id, thread.downgrade());
3662 Task::ready(Ok(thread))
3663 }
3664
3665 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
3666 if self.auth_methods().iter().any(|m| m.id == method) {
3667 Task::ready(Ok(()))
3668 } else {
3669 Task::ready(Err(anyhow!("Invalid Auth Method")))
3670 }
3671 }
3672
3673 fn prompt(
3674 &self,
3675 _id: Option<UserMessageId>,
3676 params: acp::PromptRequest,
3677 cx: &mut App,
3678 ) -> Task<gpui::Result<acp::PromptResponse>> {
3679 let sessions = self.sessions.lock();
3680 let thread = sessions.get(¶ms.session_id).unwrap();
3681 if let Some(handler) = &self.on_user_message {
3682 let handler = handler.clone();
3683 let thread = thread.clone();
3684 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
3685 } else {
3686 Task::ready(Ok(acp::PromptResponse {
3687 stop_reason: acp::StopReason::EndTurn,
3688 meta: None,
3689 }))
3690 }
3691 }
3692
3693 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
3694 let sessions = self.sessions.lock();
3695 let thread = sessions.get(session_id).unwrap().clone();
3696
3697 cx.spawn(async move |cx| {
3698 thread
3699 .update(cx, |thread, cx| thread.cancel(cx))
3700 .unwrap()
3701 .await
3702 })
3703 .detach();
3704 }
3705
3706 fn truncate(
3707 &self,
3708 session_id: &acp::SessionId,
3709 _cx: &App,
3710 ) -> Option<Rc<dyn AgentSessionTruncate>> {
3711 Some(Rc::new(FakeAgentSessionEditor {
3712 _session_id: session_id.clone(),
3713 }))
3714 }
3715
3716 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
3717 self
3718 }
3719 }
3720
3721 struct FakeAgentSessionEditor {
3722 _session_id: acp::SessionId,
3723 }
3724
3725 impl AgentSessionTruncate for FakeAgentSessionEditor {
3726 fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
3727 Task::ready(Ok(()))
3728 }
3729 }
3730
3731 #[gpui::test]
3732 async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
3733 init_test(cx);
3734
3735 let fs = FakeFs::new(cx.executor());
3736 let project = Project::test(fs, [], cx).await;
3737 let connection = Rc::new(FakeAgentConnection::new());
3738 let thread = cx
3739 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3740 .await
3741 .unwrap();
3742
3743 // Try to update a tool call that doesn't exist
3744 let nonexistent_id = acp::ToolCallId("nonexistent-tool-call".into());
3745 thread.update(cx, |thread, cx| {
3746 let result = thread.handle_session_update(
3747 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
3748 id: nonexistent_id.clone(),
3749 fields: acp::ToolCallUpdateFields {
3750 status: Some(acp::ToolCallStatus::Completed),
3751 ..Default::default()
3752 },
3753 meta: None,
3754 }),
3755 cx,
3756 );
3757
3758 // The update should succeed (not return an error)
3759 assert!(result.is_ok());
3760
3761 // There should now be exactly one entry in the thread
3762 assert_eq!(thread.entries.len(), 1);
3763
3764 // The entry should be a failed tool call
3765 if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
3766 assert_eq!(tool_call.id, nonexistent_id);
3767 assert!(matches!(tool_call.status, ToolCallStatus::Failed));
3768 assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
3769
3770 // Check that the content contains the error message
3771 assert_eq!(tool_call.content.len(), 1);
3772 if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
3773 match content_block {
3774 ContentBlock::Markdown { markdown } => {
3775 let markdown_text = markdown.read(cx).source();
3776 assert!(markdown_text.contains("Tool call not found"));
3777 }
3778 ContentBlock::Empty => panic!("Expected markdown content, got empty"),
3779 ContentBlock::ResourceLink { .. } => {
3780 panic!("Expected markdown content, got resource link")
3781 }
3782 }
3783 } else {
3784 panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
3785 }
3786 } else {
3787 panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
3788 }
3789 });
3790 }
3791}