1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5
6use agent_settings::AgentSettings;
7use collections::HashSet;
8pub use connection::*;
9pub use diff::*;
10use language::language_settings::FormatOnSave;
11pub use mention::*;
12use project::lsp_store::{FormatTrigger, LspFormatTarget};
13use serde::{Deserialize, Serialize};
14use settings::Settings as _;
15use task::{Shell, ShellBuilder};
16pub use terminal::*;
17
18use action_log::ActionLog;
19use agent_client_protocol::{self as acp};
20use anyhow::{Context as _, Result, anyhow};
21use editor::Bias;
22use futures::{FutureExt, channel::oneshot, future::BoxFuture};
23use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
24use itertools::Itertools;
25use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
26use markdown::Markdown;
27use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
28use std::collections::HashMap;
29use std::error::Error;
30use std::fmt::{Formatter, Write};
31use std::ops::Range;
32use std::process::ExitStatus;
33use std::rc::Rc;
34use std::time::{Duration, Instant};
35use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
36use ui::App;
37use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
38use uuid::Uuid;
39
40#[derive(Debug)]
41pub struct UserMessage<T> {
42 pub id: Option<UserMessageId>,
43 pub content: ContentBlock,
44 pub chunks: Vec<acp::ContentBlock<T>>,
45 pub checkpoint: Option<Checkpoint>,
46}
47
48#[derive(Debug)]
49pub struct Checkpoint {
50 git_checkpoint: GitStoreCheckpoint,
51 pub show: bool,
52}
53
54impl<T> UserMessage<T> {
55 fn to_markdown(&self, cx: &App) -> String {
56 let mut markdown = String::new();
57 if self
58 .checkpoint
59 .as_ref()
60 .is_some_and(|checkpoint| checkpoint.show)
61 {
62 writeln!(markdown, "## User (checkpoint)").unwrap();
63 } else {
64 writeln!(markdown, "## User").unwrap();
65 }
66 writeln!(markdown).unwrap();
67 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
68 writeln!(markdown).unwrap();
69 markdown
70 }
71}
72
73#[derive(Debug, PartialEq)]
74pub struct AssistantMessage {
75 pub chunks: Vec<AssistantMessageChunk>,
76}
77
78impl AssistantMessage {
79 pub fn to_markdown(&self, cx: &App) -> String {
80 format!(
81 "## Assistant\n\n{}\n\n",
82 self.chunks
83 .iter()
84 .map(|chunk| chunk.to_markdown(cx))
85 .join("\n\n")
86 )
87 }
88}
89
90#[derive(Debug, PartialEq)]
91pub enum AssistantMessageChunk {
92 Message { block: ContentBlock },
93 Thought { block: ContentBlock },
94}
95
96impl AssistantMessageChunk {
97 pub fn from_str(
98 chunk: &str,
99 language_registry: &Arc<LanguageRegistry>,
100 path_style: PathStyle,
101 cx: &mut App,
102 ) -> Self {
103 Self::Message {
104 block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
105 }
106 }
107
108 fn to_markdown(&self, cx: &App) -> String {
109 match self {
110 Self::Message { block } => block.to_markdown(cx).to_string(),
111 Self::Thought { block } => {
112 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
113 }
114 }
115 }
116}
117
118#[derive(Debug)]
119pub enum AgentThreadEntry<T> {
120 UserMessage(UserMessage<T>),
121 AssistantMessage(AssistantMessage),
122 ToolCall(ToolCall),
123}
124
125impl<T> AgentThreadEntry<T> {
126 pub fn to_markdown(&self, cx: &App) -> String {
127 match self {
128 Self::UserMessage(message) => message.to_markdown(cx),
129 Self::AssistantMessage(message) => message.to_markdown(cx),
130 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
131 }
132 }
133
134 pub fn user_message(&self) -> Option<&UserMessage<T>> {
135 if let AgentThreadEntry::UserMessage(message) = self {
136 Some(message)
137 } else {
138 None
139 }
140 }
141
142 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
143 if let AgentThreadEntry::ToolCall(call) = self {
144 itertools::Either::Left(call.diffs())
145 } else {
146 itertools::Either::Right(std::iter::empty())
147 }
148 }
149
150 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
151 if let AgentThreadEntry::ToolCall(call) = self {
152 itertools::Either::Left(call.terminals())
153 } else {
154 itertools::Either::Right(std::iter::empty())
155 }
156 }
157
158 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
159 if let AgentThreadEntry::ToolCall(ToolCall {
160 locations,
161 resolved_locations,
162 ..
163 }) = self
164 {
165 Some((
166 locations.get(ix)?.clone(),
167 resolved_locations.get(ix)?.clone()?,
168 ))
169 } else {
170 None
171 }
172 }
173}
174
175#[derive(Debug)]
176pub struct ToolCall {
177 pub id: acp::ToolCallId,
178 pub label: Entity<Markdown>,
179 pub kind: acp::ToolKind,
180 pub content: Vec<ToolCallContent>,
181 pub status: ToolCallStatus,
182 pub locations: Vec<acp::ToolCallLocation>,
183 pub resolved_locations: Vec<Option<AgentLocation>>,
184 pub raw_input: Option<serde_json::Value>,
185 pub raw_output: Option<serde_json::Value>,
186}
187
188impl ToolCall {
189 fn from_acp(
190 tool_call: acp::ToolCall,
191 status: ToolCallStatus,
192 language_registry: Arc<LanguageRegistry>,
193 path_style: PathStyle,
194 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
195 cx: &mut App,
196 ) -> Result<Self> {
197 let title = if let Some((first_line, _)) = tool_call.title.split_once("\n") {
198 first_line.to_owned() + "…"
199 } else {
200 tool_call.title
201 };
202 let mut content = Vec::with_capacity(tool_call.content.len());
203 for item in tool_call.content {
204 content.push(ToolCallContent::from_acp(
205 item,
206 language_registry.clone(),
207 path_style,
208 terminals,
209 cx,
210 )?);
211 }
212
213 let result = Self {
214 id: tool_call.id,
215 label: cx
216 .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
217 kind: tool_call.kind,
218 content,
219 locations: tool_call.locations,
220 resolved_locations: Vec::default(),
221 status,
222 raw_input: tool_call.raw_input,
223 raw_output: tool_call.raw_output,
224 };
225 Ok(result)
226 }
227
228 fn update_fields(
229 &mut self,
230 fields: acp::ToolCallUpdateFields,
231 language_registry: Arc<LanguageRegistry>,
232 path_style: PathStyle,
233 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
234 cx: &mut App,
235 ) -> Result<()> {
236 let acp::ToolCallUpdateFields {
237 kind,
238 status,
239 title,
240 content,
241 locations,
242 raw_input,
243 raw_output,
244 } = fields;
245
246 if let Some(kind) = kind {
247 self.kind = kind;
248 }
249
250 if let Some(status) = status {
251 self.status = status.into();
252 }
253
254 if let Some(title) = title {
255 self.label.update(cx, |label, cx| {
256 if let Some((first_line, _)) = title.split_once("\n") {
257 label.replace(first_line.to_owned() + "…", cx)
258 } else {
259 label.replace(title, cx);
260 }
261 });
262 }
263
264 if let Some(content) = content {
265 let new_content_len = content.len();
266 let mut content = content.into_iter();
267
268 // Reuse existing content if we can
269 for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
270 old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
271 }
272 for new in content {
273 self.content.push(ToolCallContent::from_acp(
274 new,
275 language_registry.clone(),
276 path_style,
277 terminals,
278 cx,
279 )?)
280 }
281 self.content.truncate(new_content_len);
282 }
283
284 if let Some(locations) = locations {
285 self.locations = locations;
286 }
287
288 if let Some(raw_input) = raw_input {
289 self.raw_input = Some(raw_input);
290 }
291
292 if let Some(raw_output) = raw_output {
293 if self.content.is_empty()
294 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
295 {
296 self.content
297 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
298 markdown,
299 }));
300 }
301 self.raw_output = Some(raw_output);
302 }
303 Ok(())
304 }
305
306 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
307 self.content.iter().filter_map(|content| match content {
308 ToolCallContent::Diff(diff) => Some(diff),
309 ToolCallContent::ContentBlock(_) => None,
310 ToolCallContent::Terminal(_) => None,
311 })
312 }
313
314 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
315 self.content.iter().filter_map(|content| match content {
316 ToolCallContent::Terminal(terminal) => Some(terminal),
317 ToolCallContent::ContentBlock(_) => None,
318 ToolCallContent::Diff(_) => None,
319 })
320 }
321
322 fn to_markdown(&self, cx: &App) -> String {
323 let mut markdown = format!(
324 "**Tool Call: {}**\nStatus: {}\n\n",
325 self.label.read(cx).source(),
326 self.status
327 );
328 for content in &self.content {
329 markdown.push_str(content.to_markdown(cx).as_str());
330 markdown.push_str("\n\n");
331 }
332 markdown
333 }
334
335 async fn resolve_location(
336 location: acp::ToolCallLocation,
337 project: WeakEntity<Project>,
338 cx: &mut AsyncApp,
339 ) -> Option<ResolvedLocation> {
340 let buffer = project
341 .update(cx, |project, cx| {
342 project
343 .project_path_for_absolute_path(&location.path, cx)
344 .map(|path| project.open_buffer(path, cx))
345 })
346 .ok()??;
347 let buffer = buffer.await.log_err()?;
348 let position = buffer
349 .update(cx, |buffer, _| {
350 if let Some(row) = location.line {
351 let snapshot = buffer.snapshot();
352 let column = snapshot.indent_size_for_line(row).len;
353 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
354 snapshot.anchor_before(point)
355 } else {
356 Anchor::MIN
357 }
358 })
359 .ok()?;
360
361 Some(ResolvedLocation { buffer, position })
362 }
363
364 fn resolve_locations(
365 &self,
366 project: Entity<Project>,
367 cx: &mut App,
368 ) -> Task<Vec<Option<ResolvedLocation>>> {
369 let locations = self.locations.clone();
370 project.update(cx, |_, cx| {
371 cx.spawn(async move |project, cx| {
372 let mut new_locations = Vec::new();
373 for location in locations {
374 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
375 }
376 new_locations
377 })
378 })
379 }
380}
381
382// Separate so we can hold a strong reference to the buffer
383// for saving on the thread
384#[derive(Clone, Debug, PartialEq, Eq)]
385struct ResolvedLocation {
386 buffer: Entity<Buffer>,
387 position: Anchor,
388}
389
390impl From<&ResolvedLocation> for AgentLocation {
391 fn from(value: &ResolvedLocation) -> Self {
392 Self {
393 buffer: value.buffer.downgrade(),
394 position: value.position,
395 }
396 }
397}
398
399#[derive(Debug)]
400pub enum ToolCallStatus {
401 /// The tool call hasn't started running yet, but we start showing it to
402 /// the user.
403 Pending,
404 /// The tool call is waiting for confirmation from the user.
405 WaitingForConfirmation {
406 options: Vec<acp::PermissionOption>,
407 respond_tx: oneshot::Sender<acp::PermissionOptionId>,
408 },
409 /// The tool call is currently running.
410 InProgress,
411 /// The tool call completed successfully.
412 Completed,
413 /// The tool call failed.
414 Failed,
415 /// The user rejected the tool call.
416 Rejected,
417 /// The user canceled generation so the tool call was canceled.
418 Canceled,
419}
420
421impl From<acp::ToolCallStatus> for ToolCallStatus {
422 fn from(status: acp::ToolCallStatus) -> Self {
423 match status {
424 acp::ToolCallStatus::Pending => Self::Pending,
425 acp::ToolCallStatus::InProgress => Self::InProgress,
426 acp::ToolCallStatus::Completed => Self::Completed,
427 acp::ToolCallStatus::Failed => Self::Failed,
428 }
429 }
430}
431
432impl Display for ToolCallStatus {
433 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
434 write!(
435 f,
436 "{}",
437 match self {
438 ToolCallStatus::Pending => "Pending",
439 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
440 ToolCallStatus::InProgress => "In Progress",
441 ToolCallStatus::Completed => "Completed",
442 ToolCallStatus::Failed => "Failed",
443 ToolCallStatus::Rejected => "Rejected",
444 ToolCallStatus::Canceled => "Canceled",
445 }
446 )
447 }
448}
449
450#[derive(Debug, PartialEq, Clone)]
451pub enum ContentBlock {
452 Empty,
453 Markdown { markdown: Entity<Markdown> },
454 ResourceLink { resource_link: acp::ResourceLink },
455}
456
457impl ContentBlock {
458 pub fn new(
459 block: acp::ContentBlock,
460 language_registry: &Arc<LanguageRegistry>,
461 path_style: PathStyle,
462 cx: &mut App,
463 ) -> Self {
464 let mut this = Self::Empty;
465 this.append(block, language_registry, path_style, cx);
466 this
467 }
468
469 pub fn new_combined(
470 blocks: impl IntoIterator<Item = acp::ContentBlock>,
471 language_registry: Arc<LanguageRegistry>,
472 path_style: PathStyle,
473 cx: &mut App,
474 ) -> Self {
475 let mut this = Self::Empty;
476 for block in blocks {
477 this.append(block, &language_registry, path_style, cx);
478 }
479 this
480 }
481
482 pub fn append(
483 &mut self,
484 block: acp::ContentBlock,
485 language_registry: &Arc<LanguageRegistry>,
486 path_style: PathStyle,
487 cx: &mut App,
488 ) {
489 if matches!(self, ContentBlock::Empty)
490 && let acp::ContentBlock::ResourceLink(resource_link) = block
491 {
492 *self = ContentBlock::ResourceLink { resource_link };
493 return;
494 }
495
496 let new_content = self.block_string_contents(block, path_style);
497
498 match self {
499 ContentBlock::Empty => {
500 *self = Self::create_markdown_block(new_content, language_registry, cx);
501 }
502 ContentBlock::Markdown { markdown } => {
503 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
504 }
505 ContentBlock::ResourceLink { resource_link } => {
506 let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
507 let combined = format!("{}\n{}", existing_content, new_content);
508
509 *self = Self::create_markdown_block(combined, language_registry, cx);
510 }
511 }
512 }
513
514 fn create_markdown_block(
515 content: String,
516 language_registry: &Arc<LanguageRegistry>,
517 cx: &mut App,
518 ) -> ContentBlock {
519 ContentBlock::Markdown {
520 markdown: cx
521 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
522 }
523 }
524
525 fn block_string_contents(&self, block: acp::ContentBlock, path_style: PathStyle) -> String {
526 match block {
527 acp::ContentBlock::Text(text_content) => text_content.text,
528 acp::ContentBlock::ResourceLink(resource_link) => {
529 Self::resource_link_md(&resource_link.uri, path_style)
530 }
531 acp::ContentBlock::Resource(acp::EmbeddedResource {
532 resource:
533 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
534 uri,
535 ..
536 }),
537 ..
538 }) => Self::resource_link_md(&uri, path_style),
539 acp::ContentBlock::Image(image) => Self::image_md(&image),
540 acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
541 }
542 }
543
544 fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
545 if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
546 uri.as_link().to_string()
547 } else {
548 uri.to_string()
549 }
550 }
551
552 fn image_md(_image: &acp::ImageContent) -> String {
553 "`Image`".into()
554 }
555
556 pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
557 match self {
558 ContentBlock::Empty => "",
559 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
560 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
561 }
562 }
563
564 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
565 match self {
566 ContentBlock::Empty => None,
567 ContentBlock::Markdown { markdown } => Some(markdown),
568 ContentBlock::ResourceLink { .. } => None,
569 }
570 }
571
572 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
573 match self {
574 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
575 _ => None,
576 }
577 }
578}
579
580#[derive(Debug)]
581pub enum ToolCallContent {
582 ContentBlock(ContentBlock),
583 Diff(Entity<Diff>),
584 Terminal(Entity<Terminal>),
585}
586
587impl ToolCallContent {
588 pub fn from_acp(
589 content: acp::ToolCallContent,
590 language_registry: Arc<LanguageRegistry>,
591 path_style: PathStyle,
592 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
593 cx: &mut App,
594 ) -> Result<Self> {
595 match content {
596 acp::ToolCallContent::Content { content } => Ok(Self::ContentBlock(ContentBlock::new(
597 content,
598 &language_registry,
599 path_style,
600 cx,
601 ))),
602 acp::ToolCallContent::Diff { diff } => Ok(Self::Diff(cx.new(|cx| {
603 Diff::finalized(
604 diff.path.to_string_lossy().into_owned(),
605 diff.old_text,
606 diff.new_text,
607 language_registry,
608 cx,
609 )
610 }))),
611 acp::ToolCallContent::Terminal { terminal_id } => terminals
612 .get(&terminal_id)
613 .cloned()
614 .map(Self::Terminal)
615 .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
616 }
617 }
618
619 pub fn update_from_acp(
620 &mut self,
621 new: acp::ToolCallContent,
622 language_registry: Arc<LanguageRegistry>,
623 path_style: PathStyle,
624 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
625 cx: &mut App,
626 ) -> Result<()> {
627 let needs_update = match (&self, &new) {
628 (Self::Diff(old_diff), acp::ToolCallContent::Diff { diff: new_diff }) => {
629 old_diff.read(cx).needs_update(
630 new_diff.old_text.as_deref().unwrap_or(""),
631 &new_diff.new_text,
632 cx,
633 )
634 }
635 _ => true,
636 };
637
638 if needs_update {
639 *self = Self::from_acp(new, language_registry, path_style, terminals, cx)?;
640 }
641 Ok(())
642 }
643
644 pub fn to_markdown(&self, cx: &App) -> String {
645 match self {
646 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
647 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
648 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
649 }
650 }
651}
652
653#[derive(Debug, PartialEq)]
654pub enum ToolCallUpdate {
655 UpdateFields(acp::ToolCallUpdate),
656 UpdateDiff(ToolCallUpdateDiff),
657 UpdateTerminal(ToolCallUpdateTerminal),
658}
659
660impl ToolCallUpdate {
661 fn id(&self) -> &acp::ToolCallId {
662 match self {
663 Self::UpdateFields(update) => &update.id,
664 Self::UpdateDiff(diff) => &diff.id,
665 Self::UpdateTerminal(terminal) => &terminal.id,
666 }
667 }
668}
669
670impl From<acp::ToolCallUpdate> for ToolCallUpdate {
671 fn from(update: acp::ToolCallUpdate) -> Self {
672 Self::UpdateFields(update)
673 }
674}
675
676impl From<ToolCallUpdateDiff> for ToolCallUpdate {
677 fn from(diff: ToolCallUpdateDiff) -> Self {
678 Self::UpdateDiff(diff)
679 }
680}
681
682#[derive(Debug, PartialEq)]
683pub struct ToolCallUpdateDiff {
684 pub id: acp::ToolCallId,
685 pub diff: Entity<Diff>,
686}
687
688impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
689 fn from(terminal: ToolCallUpdateTerminal) -> Self {
690 Self::UpdateTerminal(terminal)
691 }
692}
693
694#[derive(Debug, PartialEq)]
695pub struct ToolCallUpdateTerminal {
696 pub id: acp::ToolCallId,
697 pub terminal: Entity<Terminal>,
698}
699
700#[derive(Debug, Default)]
701pub struct Plan {
702 pub entries: Vec<PlanEntry>,
703}
704
705#[derive(Debug)]
706pub struct PlanStats<'a> {
707 pub in_progress_entry: Option<&'a PlanEntry>,
708 pub pending: u32,
709 pub completed: u32,
710}
711
712impl Plan {
713 pub fn is_empty(&self) -> bool {
714 self.entries.is_empty()
715 }
716
717 pub fn stats(&self) -> PlanStats<'_> {
718 let mut stats = PlanStats {
719 in_progress_entry: None,
720 pending: 0,
721 completed: 0,
722 };
723
724 for entry in &self.entries {
725 match &entry.status {
726 acp::PlanEntryStatus::Pending => {
727 stats.pending += 1;
728 }
729 acp::PlanEntryStatus::InProgress => {
730 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
731 }
732 acp::PlanEntryStatus::Completed => {
733 stats.completed += 1;
734 }
735 }
736 }
737
738 stats
739 }
740}
741
742#[derive(Debug)]
743pub struct PlanEntry {
744 pub content: Entity<Markdown>,
745 pub priority: acp::PlanEntryPriority,
746 pub status: acp::PlanEntryStatus,
747}
748
749impl PlanEntry {
750 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
751 Self {
752 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
753 priority: entry.priority,
754 status: entry.status,
755 }
756 }
757}
758
759#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
760pub struct TokenUsage {
761 pub max_tokens: u64,
762 pub used_tokens: u64,
763}
764
765impl TokenUsage {
766 pub fn ratio(&self) -> TokenUsageRatio {
767 #[cfg(debug_assertions)]
768 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
769 .unwrap_or("0.8".to_string())
770 .parse()
771 .unwrap();
772 #[cfg(not(debug_assertions))]
773 let warning_threshold: f32 = 0.8;
774
775 // When the maximum is unknown because there is no selected model,
776 // avoid showing the token limit warning.
777 if self.max_tokens == 0 {
778 TokenUsageRatio::Normal
779 } else if self.used_tokens >= self.max_tokens {
780 TokenUsageRatio::Exceeded
781 } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
782 TokenUsageRatio::Warning
783 } else {
784 TokenUsageRatio::Normal
785 }
786 }
787}
788
789#[derive(Debug, Clone, PartialEq, Eq)]
790pub enum TokenUsageRatio {
791 Normal,
792 Warning,
793 Exceeded,
794}
795
796#[derive(Debug, Clone)]
797pub struct RetryStatus {
798 pub last_error: SharedString,
799 pub attempt: usize,
800 pub max_attempts: usize,
801 pub started_at: Instant,
802 pub duration: Duration,
803}
804
805pub struct AnchoredText;
806
807pub struct AcpThread<T = SharedString> {
808 title: T,
809 entries: Vec<AgentThreadEntry<AnchoredText>>,
810 plan: Plan,
811 project: Entity<Project>,
812 action_log: Entity<ActionLog>,
813 shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
814 send_task: Option<Task<()>>,
815 connection: Rc<dyn AgentConnection>,
816 session_id: acp::SessionId,
817 token_usage: Option<TokenUsage>,
818 prompt_capabilities: acp::PromptCapabilities,
819 _observe_prompt_capabilities: Task<anyhow::Result<()>>,
820 terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
821 pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
822 pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
823}
824
825#[derive(Debug)]
826pub enum AcpThreadEvent {
827 NewEntry,
828 TitleUpdated,
829 TokenUsageUpdated,
830 EntryUpdated(usize),
831 EntriesRemoved(Range<usize>),
832 ToolAuthorizationRequired,
833 Retry(RetryStatus),
834 Stopped,
835 Error,
836 LoadError(LoadError),
837 PromptCapabilitiesUpdated,
838 Refusal,
839 AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
840 ModeUpdated(acp::SessionModeId),
841}
842
843impl EventEmitter<AcpThreadEvent> for AcpThread {}
844
845#[derive(Debug, Clone)]
846pub enum TerminalProviderEvent {
847 Created {
848 terminal_id: acp::TerminalId,
849 label: String,
850 cwd: Option<PathBuf>,
851 output_byte_limit: Option<u64>,
852 terminal: Entity<::terminal::Terminal>,
853 },
854 Output {
855 terminal_id: acp::TerminalId,
856 data: Vec<u8>,
857 },
858 TitleChanged {
859 terminal_id: acp::TerminalId,
860 title: String,
861 },
862 Exit {
863 terminal_id: acp::TerminalId,
864 status: acp::TerminalExitStatus,
865 },
866}
867
868#[derive(Debug, Clone)]
869pub enum TerminalProviderCommand {
870 WriteInput {
871 terminal_id: acp::TerminalId,
872 bytes: Vec<u8>,
873 },
874 Resize {
875 terminal_id: acp::TerminalId,
876 cols: u16,
877 rows: u16,
878 },
879 Close {
880 terminal_id: acp::TerminalId,
881 },
882}
883
884impl AcpThread {
885 pub fn on_terminal_provider_event(
886 &mut self,
887 event: TerminalProviderEvent,
888 cx: &mut Context<Self>,
889 ) {
890 match event {
891 TerminalProviderEvent::Created {
892 terminal_id,
893 label,
894 cwd,
895 output_byte_limit,
896 terminal,
897 } => {
898 let entity = self.register_terminal_created(
899 terminal_id.clone(),
900 label,
901 cwd,
902 output_byte_limit,
903 terminal,
904 cx,
905 );
906
907 if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
908 for data in chunks.drain(..) {
909 entity.update(cx, |term, cx| {
910 term.inner().update(cx, |inner, cx| {
911 inner.write_output(&data, cx);
912 })
913 });
914 }
915 }
916
917 if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
918 entity.update(cx, |_term, cx| {
919 cx.notify();
920 });
921 }
922
923 cx.notify();
924 }
925 TerminalProviderEvent::Output { terminal_id, data } => {
926 if let Some(entity) = self.terminals.get(&terminal_id) {
927 entity.update(cx, |term, cx| {
928 term.inner().update(cx, |inner, cx| {
929 inner.write_output(&data, cx);
930 })
931 });
932 } else {
933 self.pending_terminal_output
934 .entry(terminal_id)
935 .or_default()
936 .push(data);
937 }
938 }
939 TerminalProviderEvent::TitleChanged { terminal_id, title } => {
940 if let Some(entity) = self.terminals.get(&terminal_id) {
941 entity.update(cx, |term, cx| {
942 term.inner().update(cx, |inner, cx| {
943 inner.breadcrumb_text = title;
944 cx.emit(::terminal::Event::BreadcrumbsChanged);
945 })
946 });
947 }
948 }
949 TerminalProviderEvent::Exit {
950 terminal_id,
951 status,
952 } => {
953 if let Some(entity) = self.terminals.get(&terminal_id) {
954 entity.update(cx, |_term, cx| {
955 cx.notify();
956 });
957 } else {
958 self.pending_terminal_exit.insert(terminal_id, status);
959 }
960 }
961 }
962 }
963}
964
965#[derive(PartialEq, Eq, Debug)]
966pub enum ThreadStatus {
967 Idle,
968 Generating,
969}
970
971#[derive(Debug, Clone)]
972pub enum LoadError {
973 Unsupported {
974 command: SharedString,
975 current_version: SharedString,
976 minimum_version: SharedString,
977 },
978 FailedToInstall(SharedString),
979 Exited {
980 status: ExitStatus,
981 },
982 Other(SharedString),
983}
984
985impl Display for LoadError {
986 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
987 match self {
988 LoadError::Unsupported {
989 command: path,
990 current_version,
991 minimum_version,
992 } => {
993 write!(
994 f,
995 "version {current_version} from {path} is not supported (need at least {minimum_version})"
996 )
997 }
998 LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
999 LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1000 LoadError::Other(msg) => write!(f, "{msg}"),
1001 }
1002 }
1003}
1004
1005impl Error for LoadError {}
1006
1007impl<T> AcpThread<T> {
1008 pub fn new(
1009 title: impl Into<SharedString>,
1010 connection: Rc<dyn AgentConnection>,
1011 project: Entity<Project>,
1012 action_log: Entity<ActionLog>,
1013 session_id: acp::SessionId,
1014 mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1015 cx: &mut Context<Self>,
1016 ) -> Self {
1017 let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1018 let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1019 loop {
1020 let caps = prompt_capabilities_rx.recv().await?;
1021 this.update(cx, |this, cx| {
1022 this.prompt_capabilities = caps;
1023 cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1024 })?;
1025 }
1026 });
1027
1028 Self {
1029 action_log,
1030 shared_buffers: Default::default(),
1031 entries: Default::default(),
1032 plan: Default::default(),
1033 title: title.into(),
1034 project,
1035 send_task: None,
1036 connection,
1037 session_id,
1038 token_usage: None,
1039 prompt_capabilities,
1040 _observe_prompt_capabilities: task,
1041 terminals: HashMap::default(),
1042 pending_terminal_output: HashMap::default(),
1043 pending_terminal_exit: HashMap::default(),
1044 }
1045 }
1046
1047 pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
1048 self.prompt_capabilities.clone()
1049 }
1050
1051 pub fn connection(&self) -> &Rc<dyn AgentConnection> {
1052 &self.connection
1053 }
1054
1055 pub fn action_log(&self) -> &Entity<ActionLog> {
1056 &self.action_log
1057 }
1058
1059 pub fn project(&self) -> &Entity<Project> {
1060 &self.project
1061 }
1062
1063 pub fn title(&self) -> SharedString {
1064 self.title.clone()
1065 }
1066
1067 pub fn entries(&self) -> &[AgentThreadEntry] {
1068 &self.entries
1069 }
1070
1071 pub fn session_id(&self) -> &acp::SessionId {
1072 &self.session_id
1073 }
1074
1075 pub fn status(&self) -> ThreadStatus {
1076 if self.send_task.is_some() {
1077 ThreadStatus::Generating
1078 } else {
1079 ThreadStatus::Idle
1080 }
1081 }
1082
1083 pub fn token_usage(&self) -> Option<&TokenUsage> {
1084 self.token_usage.as_ref()
1085 }
1086
1087 pub fn has_pending_edit_tool_calls(&self) -> bool {
1088 for entry in self.entries.iter().rev() {
1089 match entry {
1090 AgentThreadEntry::UserMessage(_) => return false,
1091 AgentThreadEntry::ToolCall(
1092 call @ ToolCall {
1093 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1094 ..
1095 },
1096 ) if call.diffs().next().is_some() => {
1097 return true;
1098 }
1099 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1100 }
1101 }
1102
1103 false
1104 }
1105
1106 pub fn used_tools_since_last_user_message(&self) -> bool {
1107 for entry in self.entries.iter().rev() {
1108 match entry {
1109 AgentThreadEntry::UserMessage(..) => return false,
1110 AgentThreadEntry::AssistantMessage(..) => continue,
1111 AgentThreadEntry::ToolCall(..) => return true,
1112 }
1113 }
1114
1115 false
1116 }
1117
1118 pub fn handle_session_update(
1119 &mut self,
1120 update: acp::SessionUpdate,
1121 cx: &mut Context<Self>,
1122 ) -> Result<(), acp::Error> {
1123 match update {
1124 acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1125 self.push_user_content_block(None, content, cx);
1126 }
1127 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1128 self.push_assistant_content_block(content, false, cx);
1129 }
1130 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1131 self.push_assistant_content_block(content, true, cx);
1132 }
1133 acp::SessionUpdate::ToolCall(tool_call) => {
1134 self.upsert_tool_call(tool_call, cx)?;
1135 }
1136 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1137 self.update_tool_call(tool_call_update, cx)?;
1138 }
1139 acp::SessionUpdate::Plan(plan) => {
1140 self.update_plan(plan, cx);
1141 }
1142 acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1143 available_commands,
1144 ..
1145 }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1146 acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1147 current_mode_id,
1148 ..
1149 }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1150 }
1151 Ok(())
1152 }
1153
1154 pub fn push_user_content_block(
1155 &mut self,
1156 message_id: Option<UserMessageId>,
1157 chunk: acp::ContentBlock<T>,
1158 cx: &mut Context<Self>,
1159 ) {
1160 let language_registry = self.project.read(cx).languages().clone();
1161 let path_style = self.project.read(cx).path_style(cx);
1162 let entries_len = self.entries.len();
1163
1164 if let Some(last_entry) = self.entries.last_mut()
1165 && let AgentThreadEntry::UserMessage(UserMessage {
1166 id,
1167 content,
1168 chunks,
1169 ..
1170 }) = last_entry
1171 {
1172 *id = message_id.or(id.take());
1173 content.append(chunk.clone(), &language_registry, path_style, cx);
1174 chunks.push(chunk);
1175 let idx = entries_len - 1;
1176 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1177 } else {
1178 let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1179 self.push_entry(
1180 AgentThreadEntry::UserMessage(UserMessage {
1181 id: message_id,
1182 content,
1183 chunks: vec![chunk],
1184 checkpoint: None,
1185 }),
1186 cx,
1187 );
1188 }
1189 }
1190
1191 pub fn push_assistant_content_block(
1192 &mut self,
1193 chunk: acp::ContentBlock,
1194 is_thought: bool,
1195 cx: &mut Context<Self>,
1196 ) {
1197 let language_registry = self.project.read(cx).languages().clone();
1198 let path_style = self.project.read(cx).path_style(cx);
1199 let entries_len = self.entries.len();
1200 if let Some(last_entry) = self.entries.last_mut()
1201 && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
1202 {
1203 let idx = entries_len - 1;
1204 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1205 match (chunks.last_mut(), is_thought) {
1206 (Some(AssistantMessageChunk::Message { block }), false)
1207 | (Some(AssistantMessageChunk::Thought { block }), true) => {
1208 block.append(chunk, &language_registry, path_style, cx)
1209 }
1210 _ => {
1211 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1212 if is_thought {
1213 chunks.push(AssistantMessageChunk::Thought { block })
1214 } else {
1215 chunks.push(AssistantMessageChunk::Message { block })
1216 }
1217 }
1218 }
1219 } else {
1220 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1221 let chunk = if is_thought {
1222 AssistantMessageChunk::Thought { block }
1223 } else {
1224 AssistantMessageChunk::Message { block }
1225 };
1226
1227 self.push_entry(
1228 AgentThreadEntry::AssistantMessage(AssistantMessage {
1229 chunks: vec![chunk],
1230 }),
1231 cx,
1232 );
1233 }
1234 }
1235
1236 fn push_entry(&mut self, entry: AgentThreadEntry<T>, cx: &mut Context<Self>) {
1237 self.entries.push(entry);
1238 cx.emit(AcpThreadEvent::NewEntry);
1239 }
1240
1241 pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1242 self.connection.set_title(&self.session_id, cx).is_some()
1243 }
1244
1245 pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1246 if title != self.title {
1247 self.title = title.clone();
1248 cx.emit(AcpThreadEvent::TitleUpdated);
1249 if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1250 return set_title.run(title, cx);
1251 }
1252 }
1253 Task::ready(Ok(()))
1254 }
1255
1256 pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1257 self.token_usage = usage;
1258 cx.emit(AcpThreadEvent::TokenUsageUpdated);
1259 }
1260
1261 pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1262 cx.emit(AcpThreadEvent::Retry(status));
1263 }
1264
1265 pub fn update_tool_call(
1266 &mut self,
1267 update: impl Into<ToolCallUpdate>,
1268 cx: &mut Context<Self>,
1269 ) -> Result<()> {
1270 let update = update.into();
1271 let languages = self.project.read(cx).languages().clone();
1272 let path_style = self.project.read(cx).path_style(cx);
1273
1274 let ix = match self.index_for_tool_call(update.id()) {
1275 Some(ix) => ix,
1276 None => {
1277 // Tool call not found - create a failed tool call entry
1278 let failed_tool_call = ToolCall {
1279 id: update.id().clone(),
1280 label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1281 kind: acp::ToolKind::Fetch,
1282 content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1283 acp::ContentBlock::Text(acp::TextContent {
1284 text: "Tool call not found".to_string(),
1285 annotations: None,
1286 meta: None,
1287 }),
1288 &languages,
1289 path_style,
1290 cx,
1291 ))],
1292 status: ToolCallStatus::Failed,
1293 locations: Vec::new(),
1294 resolved_locations: Vec::new(),
1295 raw_input: None,
1296 raw_output: None,
1297 };
1298 self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1299 return Ok(());
1300 }
1301 };
1302 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1303 unreachable!()
1304 };
1305
1306 match update {
1307 ToolCallUpdate::UpdateFields(update) => {
1308 let location_updated = update.fields.locations.is_some();
1309 call.update_fields(update.fields, languages, path_style, &self.terminals, cx)?;
1310 if location_updated {
1311 self.resolve_locations(update.id, cx);
1312 }
1313 }
1314 ToolCallUpdate::UpdateDiff(update) => {
1315 call.content.clear();
1316 call.content.push(ToolCallContent::Diff(update.diff));
1317 }
1318 ToolCallUpdate::UpdateTerminal(update) => {
1319 call.content.clear();
1320 call.content
1321 .push(ToolCallContent::Terminal(update.terminal));
1322 }
1323 }
1324
1325 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1326
1327 Ok(())
1328 }
1329
1330 /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1331 pub fn upsert_tool_call(
1332 &mut self,
1333 tool_call: acp::ToolCall,
1334 cx: &mut Context<Self>,
1335 ) -> Result<(), acp::Error> {
1336 let status = tool_call.status.into();
1337 self.upsert_tool_call_inner(tool_call.into(), status, cx)
1338 }
1339
1340 /// Fails if id does not match an existing entry.
1341 pub fn upsert_tool_call_inner(
1342 &mut self,
1343 update: acp::ToolCallUpdate,
1344 status: ToolCallStatus,
1345 cx: &mut Context<Self>,
1346 ) -> Result<(), acp::Error> {
1347 let language_registry = self.project.read(cx).languages().clone();
1348 let path_style = self.project.read(cx).path_style(cx);
1349 let id = update.id.clone();
1350
1351 if let Some(ix) = self.index_for_tool_call(&id) {
1352 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1353 unreachable!()
1354 };
1355
1356 call.update_fields(
1357 update.fields,
1358 language_registry,
1359 path_style,
1360 &self.terminals,
1361 cx,
1362 )?;
1363 call.status = status;
1364
1365 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1366 } else {
1367 let call = ToolCall::from_acp(
1368 update.try_into()?,
1369 status,
1370 language_registry,
1371 self.project.read(cx).path_style(cx),
1372 &self.terminals,
1373 cx,
1374 )?;
1375 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1376 };
1377
1378 self.resolve_locations(id, cx);
1379 Ok(())
1380 }
1381
1382 fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1383 self.entries
1384 .iter()
1385 .enumerate()
1386 .rev()
1387 .find_map(|(index, entry)| {
1388 if let AgentThreadEntry::ToolCall(tool_call) = entry
1389 && &tool_call.id == id
1390 {
1391 Some(index)
1392 } else {
1393 None
1394 }
1395 })
1396 }
1397
1398 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1399 // The tool call we are looking for is typically the last one, or very close to the end.
1400 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1401 self.entries
1402 .iter_mut()
1403 .enumerate()
1404 .rev()
1405 .find_map(|(index, tool_call)| {
1406 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1407 && &tool_call.id == id
1408 {
1409 Some((index, tool_call))
1410 } else {
1411 None
1412 }
1413 })
1414 }
1415
1416 pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1417 self.entries
1418 .iter()
1419 .enumerate()
1420 .rev()
1421 .find_map(|(index, tool_call)| {
1422 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1423 && &tool_call.id == id
1424 {
1425 Some((index, tool_call))
1426 } else {
1427 None
1428 }
1429 })
1430 }
1431
1432 pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1433 let project = self.project.clone();
1434 let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1435 return;
1436 };
1437 let task = tool_call.resolve_locations(project, cx);
1438 cx.spawn(async move |this, cx| {
1439 let resolved_locations = task.await;
1440
1441 this.update(cx, |this, cx| {
1442 let project = this.project.clone();
1443
1444 for location in resolved_locations.iter().flatten() {
1445 this.shared_buffers
1446 .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
1447 }
1448 let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1449 return;
1450 };
1451
1452 if let Some(Some(location)) = resolved_locations.last() {
1453 project.update(cx, |project, cx| {
1454 let should_ignore = if let Some(agent_location) = project
1455 .agent_location()
1456 .filter(|agent_location| agent_location.buffer == location.buffer)
1457 {
1458 let snapshot = location.buffer.read(cx).snapshot();
1459 let old_position = agent_location.position.to_point(&snapshot);
1460 let new_position = location.position.to_point(&snapshot);
1461
1462 // ignore this so that when we get updates from the edit tool
1463 // the position doesn't reset to the startof line
1464 old_position.row == new_position.row
1465 && old_position.column > new_position.column
1466 } else {
1467 false
1468 };
1469 if !should_ignore {
1470 project.set_agent_location(Some(location.into()), cx);
1471 }
1472 });
1473 }
1474
1475 let resolved_locations = resolved_locations
1476 .iter()
1477 .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
1478 .collect::<Vec<_>>();
1479
1480 if tool_call.resolved_locations != resolved_locations {
1481 tool_call.resolved_locations = resolved_locations;
1482 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1483 }
1484 })
1485 })
1486 .detach();
1487 }
1488
1489 pub fn request_tool_call_authorization(
1490 &mut self,
1491 tool_call: acp::ToolCallUpdate,
1492 options: Vec<acp::PermissionOption>,
1493 respect_always_allow_setting: bool,
1494 cx: &mut Context<Self>,
1495 ) -> Result<BoxFuture<'static, acp::RequestPermissionOutcome>> {
1496 let (tx, rx) = oneshot::channel();
1497
1498 if respect_always_allow_setting && AgentSettings::get_global(cx).always_allow_tool_actions {
1499 // Don't use AllowAlways, because then if you were to turn off always_allow_tool_actions,
1500 // some tools would (incorrectly) continue to auto-accept.
1501 if let Some(allow_once_option) = options.iter().find_map(|option| {
1502 if matches!(option.kind, acp::PermissionOptionKind::AllowOnce) {
1503 Some(option.id.clone())
1504 } else {
1505 None
1506 }
1507 }) {
1508 self.upsert_tool_call_inner(tool_call, ToolCallStatus::Pending, cx)?;
1509 return Ok(async {
1510 acp::RequestPermissionOutcome::Selected {
1511 option_id: allow_once_option,
1512 }
1513 }
1514 .boxed());
1515 }
1516 }
1517
1518 let status = ToolCallStatus::WaitingForConfirmation {
1519 options,
1520 respond_tx: tx,
1521 };
1522
1523 self.upsert_tool_call_inner(tool_call, status, cx)?;
1524 cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1525
1526 let fut = async {
1527 match rx.await {
1528 Ok(option) => acp::RequestPermissionOutcome::Selected { option_id: option },
1529 Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1530 }
1531 }
1532 .boxed();
1533
1534 Ok(fut)
1535 }
1536
1537 pub fn authorize_tool_call(
1538 &mut self,
1539 id: acp::ToolCallId,
1540 option_id: acp::PermissionOptionId,
1541 option_kind: acp::PermissionOptionKind,
1542 cx: &mut Context<Self>,
1543 ) {
1544 let Some((ix, call)) = self.tool_call_mut(&id) else {
1545 return;
1546 };
1547
1548 let new_status = match option_kind {
1549 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1550 ToolCallStatus::Rejected
1551 }
1552 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1553 ToolCallStatus::InProgress
1554 }
1555 };
1556
1557 let curr_status = mem::replace(&mut call.status, new_status);
1558
1559 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1560 respond_tx.send(option_id).log_err();
1561 } else if cfg!(debug_assertions) {
1562 panic!("tried to authorize an already authorized tool call");
1563 }
1564
1565 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1566 }
1567
1568 pub fn first_tool_awaiting_confirmation(&self) -> Option<&ToolCall> {
1569 let mut first_tool_call = None;
1570
1571 for entry in self.entries.iter().rev() {
1572 match &entry {
1573 AgentThreadEntry::ToolCall(call) => {
1574 if let ToolCallStatus::WaitingForConfirmation { .. } = call.status {
1575 first_tool_call = Some(call);
1576 } else {
1577 continue;
1578 }
1579 }
1580 AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1581 // Reached the beginning of the turn.
1582 // If we had pending permission requests in the previous turn, they have been cancelled.
1583 break;
1584 }
1585 }
1586 }
1587
1588 first_tool_call
1589 }
1590
1591 pub fn plan(&self) -> &Plan {
1592 &self.plan
1593 }
1594
1595 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1596 let new_entries_len = request.entries.len();
1597 let mut new_entries = request.entries.into_iter();
1598
1599 // Reuse existing markdown to prevent flickering
1600 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1601 let PlanEntry {
1602 content,
1603 priority,
1604 status,
1605 } = old;
1606 content.update(cx, |old, cx| {
1607 old.replace(new.content, cx);
1608 });
1609 *priority = new.priority;
1610 *status = new.status;
1611 }
1612 for new in new_entries {
1613 self.plan.entries.push(PlanEntry::from_acp(new, cx))
1614 }
1615 self.plan.entries.truncate(new_entries_len);
1616
1617 cx.notify();
1618 }
1619
1620 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1621 self.plan
1622 .entries
1623 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1624 cx.notify();
1625 }
1626
1627 #[cfg(any(test, feature = "test-support"))]
1628 pub fn send_raw(
1629 &mut self,
1630 message: &str,
1631 cx: &mut Context<Self>,
1632 ) -> BoxFuture<'static, Result<()>> {
1633 self.send(
1634 vec![acp::ContentBlock::Text(acp::TextContent {
1635 text: message.to_string(),
1636 annotations: None,
1637 meta: None,
1638 })],
1639 cx,
1640 )
1641 }
1642
1643 pub fn send(
1644 &mut self,
1645 message: Vec<acp::ContentBlock>,
1646 cx: &mut Context<Self>,
1647 ) -> BoxFuture<'static, Result<()>> {
1648 let block = ContentBlock::new_combined(
1649 message.clone(),
1650 self.project.read(cx).languages().clone(),
1651 self.project.read(cx).path_style(cx),
1652 cx,
1653 );
1654 let request = acp::PromptRequest {
1655 prompt: message.clone(),
1656 session_id: self.session_id.clone(),
1657 meta: None,
1658 };
1659 let git_store = self.project.read(cx).git_store().clone();
1660
1661 let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
1662 Some(UserMessageId::new())
1663 } else {
1664 None
1665 };
1666
1667 self.run_turn(cx, async move |this, cx| {
1668 this.update(cx, |this, cx| {
1669 this.push_entry(
1670 AgentThreadEntry::UserMessage(UserMessage {
1671 id: message_id.clone(),
1672 content: block,
1673 chunks: message,
1674 checkpoint: None,
1675 }),
1676 cx,
1677 );
1678 })
1679 .ok();
1680
1681 let old_checkpoint = git_store
1682 .update(cx, |git, cx| git.checkpoint(cx))?
1683 .await
1684 .context("failed to get old checkpoint")
1685 .log_err();
1686 this.update(cx, |this, cx| {
1687 if let Some((_ix, message)) = this.last_user_message() {
1688 message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1689 git_checkpoint,
1690 show: false,
1691 });
1692 }
1693 this.connection.prompt(message_id, request, cx)
1694 })?
1695 .await
1696 })
1697 }
1698
1699 pub fn can_resume(&self, cx: &App) -> bool {
1700 self.connection.resume(&self.session_id, cx).is_some()
1701 }
1702
1703 pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1704 self.run_turn(cx, async move |this, cx| {
1705 this.update(cx, |this, cx| {
1706 this.connection
1707 .resume(&this.session_id, cx)
1708 .map(|resume| resume.run(cx))
1709 })?
1710 .context("resuming a session is not supported")?
1711 .await
1712 })
1713 }
1714
1715 fn run_turn(
1716 &mut self,
1717 cx: &mut Context<Self>,
1718 f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1719 ) -> BoxFuture<'static, Result<()>> {
1720 self.clear_completed_plan_entries(cx);
1721
1722 let (tx, rx) = oneshot::channel();
1723 let cancel_task = self.cancel(cx);
1724
1725 self.send_task = Some(cx.spawn(async move |this, cx| {
1726 cancel_task.await;
1727 tx.send(f(this, cx).await).ok();
1728 }));
1729
1730 cx.spawn(async move |this, cx| {
1731 let response = rx.await;
1732
1733 this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1734 .await?;
1735
1736 this.update(cx, |this, cx| {
1737 this.project
1738 .update(cx, |project, cx| project.set_agent_location(None, cx));
1739 match response {
1740 Ok(Err(e)) => {
1741 this.send_task.take();
1742 cx.emit(AcpThreadEvent::Error);
1743 Err(e)
1744 }
1745 result => {
1746 let canceled = matches!(
1747 result,
1748 Ok(Ok(acp::PromptResponse {
1749 stop_reason: acp::StopReason::Cancelled,
1750 meta: None,
1751 }))
1752 );
1753
1754 // We only take the task if the current prompt wasn't canceled.
1755 //
1756 // This prompt may have been canceled because another one was sent
1757 // while it was still generating. In these cases, dropping `send_task`
1758 // would cause the next generation to be canceled.
1759 if !canceled {
1760 this.send_task.take();
1761 }
1762
1763 // Handle refusal - distinguish between user prompt and tool call refusals
1764 if let Ok(Ok(acp::PromptResponse {
1765 stop_reason: acp::StopReason::Refusal,
1766 meta: _,
1767 })) = result
1768 {
1769 if let Some((user_msg_ix, _)) = this.last_user_message() {
1770 // Check if there's a completed tool call with results after the last user message
1771 // This indicates the refusal is in response to tool output, not the user's prompt
1772 let has_completed_tool_call_after_user_msg =
1773 this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
1774 if let AgentThreadEntry::ToolCall(tool_call) = entry {
1775 // Check if the tool call has completed and has output
1776 matches!(tool_call.status, ToolCallStatus::Completed)
1777 && tool_call.raw_output.is_some()
1778 } else {
1779 false
1780 }
1781 });
1782
1783 if has_completed_tool_call_after_user_msg {
1784 // Refusal is due to tool output - don't truncate, just notify
1785 // The model refused based on what the tool returned
1786 cx.emit(AcpThreadEvent::Refusal);
1787 } else {
1788 // User prompt was refused - truncate back to before the user message
1789 let range = user_msg_ix..this.entries.len();
1790 if range.start < range.end {
1791 this.entries.truncate(user_msg_ix);
1792 cx.emit(AcpThreadEvent::EntriesRemoved(range));
1793 }
1794 cx.emit(AcpThreadEvent::Refusal);
1795 }
1796 } else {
1797 // No user message found, treat as general refusal
1798 cx.emit(AcpThreadEvent::Refusal);
1799 }
1800 }
1801
1802 cx.emit(AcpThreadEvent::Stopped);
1803 Ok(())
1804 }
1805 }
1806 })?
1807 })
1808 .boxed()
1809 }
1810
1811 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1812 let Some(send_task) = self.send_task.take() else {
1813 return Task::ready(());
1814 };
1815
1816 for entry in self.entries.iter_mut() {
1817 if let AgentThreadEntry::ToolCall(call) = entry {
1818 let cancel = matches!(
1819 call.status,
1820 ToolCallStatus::Pending
1821 | ToolCallStatus::WaitingForConfirmation { .. }
1822 | ToolCallStatus::InProgress
1823 );
1824
1825 if cancel {
1826 call.status = ToolCallStatus::Canceled;
1827 }
1828 }
1829 }
1830
1831 self.connection.cancel(&self.session_id, cx);
1832
1833 // Wait for the send task to complete
1834 cx.foreground_executor().spawn(send_task)
1835 }
1836
1837 /// Restores the git working tree to the state at the given checkpoint (if one exists)
1838 pub fn restore_checkpoint(
1839 &mut self,
1840 id: UserMessageId,
1841 cx: &mut Context<Self>,
1842 ) -> Task<Result<()>> {
1843 let Some((_, message)) = self.user_message_mut(&id) else {
1844 return Task::ready(Err(anyhow!("message not found")));
1845 };
1846
1847 let checkpoint = message
1848 .checkpoint
1849 .as_ref()
1850 .map(|c| c.git_checkpoint.clone());
1851 let rewind = self.rewind(id.clone(), cx);
1852 let git_store = self.project.read(cx).git_store().clone();
1853
1854 cx.spawn(async move |_, cx| {
1855 rewind.await?;
1856 if let Some(checkpoint) = checkpoint {
1857 git_store
1858 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1859 .await?;
1860 }
1861
1862 Ok(())
1863 })
1864 }
1865
1866 /// Rewinds this thread to before the entry at `index`, removing it and all
1867 /// subsequent entries while rejecting any action_log changes made from that point.
1868 /// Unlike `restore_checkpoint`, this method does not restore from git.
1869 pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1870 let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
1871 return Task::ready(Err(anyhow!("not supported")));
1872 };
1873
1874 cx.spawn(async move |this, cx| {
1875 cx.update(|cx| truncate.run(id.clone(), cx))?.await?;
1876 this.update(cx, |this, cx| {
1877 if let Some((ix, _)) = this.user_message_mut(&id) {
1878 let range = ix..this.entries.len();
1879 this.entries.truncate(ix);
1880 cx.emit(AcpThreadEvent::EntriesRemoved(range));
1881 }
1882 this.action_log()
1883 .update(cx, |action_log, cx| action_log.reject_all_edits(cx))
1884 })?
1885 .await;
1886 Ok(())
1887 })
1888 }
1889
1890 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1891 let git_store = self.project.read(cx).git_store().clone();
1892
1893 let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1894 if let Some(checkpoint) = message.checkpoint.as_ref() {
1895 checkpoint.git_checkpoint.clone()
1896 } else {
1897 return Task::ready(Ok(()));
1898 }
1899 } else {
1900 return Task::ready(Ok(()));
1901 };
1902
1903 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1904 cx.spawn(async move |this, cx| {
1905 let new_checkpoint = new_checkpoint
1906 .await
1907 .context("failed to get new checkpoint")
1908 .log_err();
1909 if let Some(new_checkpoint) = new_checkpoint {
1910 let equal = git_store
1911 .update(cx, |git, cx| {
1912 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1913 })?
1914 .await
1915 .unwrap_or(true);
1916 this.update(cx, |this, cx| {
1917 let (ix, message) = this.last_user_message().context("no user message")?;
1918 let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1919 checkpoint.show = !equal;
1920 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1921 anyhow::Ok(())
1922 })??;
1923 }
1924
1925 Ok(())
1926 })
1927 }
1928
1929 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage<T>)> {
1930 self.entries
1931 .iter_mut()
1932 .enumerate()
1933 .rev()
1934 .find_map(|(ix, entry)| {
1935 if let AgentThreadEntry::UserMessage(message) = entry {
1936 Some((ix, message))
1937 } else {
1938 None
1939 }
1940 })
1941 }
1942
1943 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1944 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1945 if let AgentThreadEntry::UserMessage(message) = entry {
1946 if message.id.as_ref() == Some(id) {
1947 Some((ix, message))
1948 } else {
1949 None
1950 }
1951 } else {
1952 None
1953 }
1954 })
1955 }
1956
1957 pub fn read_text_file(
1958 &self,
1959 path: PathBuf,
1960 line: Option<u32>,
1961 limit: Option<u32>,
1962 reuse_shared_snapshot: bool,
1963 cx: &mut Context<Self>,
1964 ) -> Task<Result<String, acp::Error>> {
1965 // Args are 1-based, move to 0-based
1966 let line = line.unwrap_or_default().saturating_sub(1);
1967 let limit = limit.unwrap_or(u32::MAX);
1968 let project = self.project.clone();
1969 let action_log = self.action_log.clone();
1970 cx.spawn(async move |this, cx| {
1971 let load = project
1972 .update(cx, |project, cx| {
1973 let path = project
1974 .project_path_for_absolute_path(&path, cx)
1975 .ok_or_else(|| {
1976 acp::Error::resource_not_found(Some(path.display().to_string()))
1977 })?;
1978 Ok(project.open_buffer(path, cx))
1979 })
1980 .map_err(|e| acp::Error::internal_error().with_data(e.to_string()))
1981 .flatten()?;
1982
1983 let buffer = load.await?;
1984
1985 let snapshot = if reuse_shared_snapshot {
1986 this.read_with(cx, |this, _| {
1987 this.shared_buffers.get(&buffer.clone()).cloned()
1988 })
1989 .log_err()
1990 .flatten()
1991 } else {
1992 None
1993 };
1994
1995 let snapshot = if let Some(snapshot) = snapshot {
1996 snapshot
1997 } else {
1998 action_log.update(cx, |action_log, cx| {
1999 action_log.buffer_read(buffer.clone(), cx);
2000 })?;
2001
2002 let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot())?;
2003 this.update(cx, |this, _| {
2004 this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2005 })?;
2006 snapshot
2007 };
2008
2009 let max_point = snapshot.max_point();
2010 let start_position = Point::new(line, 0);
2011
2012 if start_position > max_point {
2013 return Err(acp::Error::invalid_params().with_data(format!(
2014 "Attempting to read beyond the end of the file, line {}:{}",
2015 max_point.row + 1,
2016 max_point.column
2017 )));
2018 }
2019
2020 let start = snapshot.anchor_before(start_position);
2021 let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2022
2023 project.update(cx, |project, cx| {
2024 project.set_agent_location(
2025 Some(AgentLocation {
2026 buffer: buffer.downgrade(),
2027 position: start,
2028 }),
2029 cx,
2030 );
2031 })?;
2032
2033 Ok(snapshot.text_for_range(start..end).collect::<String>())
2034 })
2035 }
2036
2037 pub fn write_text_file(
2038 &self,
2039 path: PathBuf,
2040 content: String,
2041 cx: &mut Context<Self>,
2042 ) -> Task<Result<()>> {
2043 let project = self.project.clone();
2044 let action_log = self.action_log.clone();
2045 cx.spawn(async move |this, cx| {
2046 let load = project.update(cx, |project, cx| {
2047 let path = project
2048 .project_path_for_absolute_path(&path, cx)
2049 .context("invalid path")?;
2050 anyhow::Ok(project.open_buffer(path, cx))
2051 });
2052 let buffer = load??.await?;
2053 let snapshot = this.update(cx, |this, cx| {
2054 this.shared_buffers
2055 .get(&buffer)
2056 .cloned()
2057 .unwrap_or_else(|| buffer.read(cx).snapshot())
2058 })?;
2059 let edits = cx
2060 .background_executor()
2061 .spawn(async move {
2062 let old_text = snapshot.text();
2063 text_diff(old_text.as_str(), &content)
2064 .into_iter()
2065 .map(|(range, replacement)| {
2066 (
2067 snapshot.anchor_after(range.start)
2068 ..snapshot.anchor_before(range.end),
2069 replacement,
2070 )
2071 })
2072 .collect::<Vec<_>>()
2073 })
2074 .await;
2075
2076 project.update(cx, |project, cx| {
2077 project.set_agent_location(
2078 Some(AgentLocation {
2079 buffer: buffer.downgrade(),
2080 position: edits
2081 .last()
2082 .map(|(range, _)| range.end)
2083 .unwrap_or(Anchor::MIN),
2084 }),
2085 cx,
2086 );
2087 })?;
2088
2089 let format_on_save = cx.update(|cx| {
2090 action_log.update(cx, |action_log, cx| {
2091 action_log.buffer_read(buffer.clone(), cx);
2092 });
2093
2094 let format_on_save = buffer.update(cx, |buffer, cx| {
2095 buffer.edit(edits, None, cx);
2096
2097 let settings = language::language_settings::language_settings(
2098 buffer.language().map(|l| l.name()),
2099 buffer.file(),
2100 cx,
2101 );
2102
2103 settings.format_on_save != FormatOnSave::Off
2104 });
2105 action_log.update(cx, |action_log, cx| {
2106 action_log.buffer_edited(buffer.clone(), cx);
2107 });
2108 format_on_save
2109 })?;
2110
2111 if format_on_save {
2112 let format_task = project.update(cx, |project, cx| {
2113 project.format(
2114 HashSet::from_iter([buffer.clone()]),
2115 LspFormatTarget::Buffers,
2116 false,
2117 FormatTrigger::Save,
2118 cx,
2119 )
2120 })?;
2121 format_task.await.log_err();
2122
2123 action_log.update(cx, |action_log, cx| {
2124 action_log.buffer_edited(buffer.clone(), cx);
2125 })?;
2126 }
2127
2128 project
2129 .update(cx, |project, cx| project.save_buffer(buffer, cx))?
2130 .await
2131 })
2132 }
2133
2134 pub fn create_terminal(
2135 &self,
2136 command: String,
2137 args: Vec<String>,
2138 extra_env: Vec<acp::EnvVariable>,
2139 cwd: Option<PathBuf>,
2140 output_byte_limit: Option<u64>,
2141 cx: &mut Context<Self>,
2142 ) -> Task<Result<Entity<Terminal>>> {
2143 let env = match &cwd {
2144 Some(dir) => self.project.update(cx, |project, cx| {
2145 project.environment().update(cx, |env, cx| {
2146 env.directory_environment(dir.as_path().into(), cx)
2147 })
2148 }),
2149 None => Task::ready(None).shared(),
2150 };
2151 let env = cx.spawn(async move |_, _| {
2152 let mut env = env.await.unwrap_or_default();
2153 // Disables paging for `git` and hopefully other commands
2154 env.insert("PAGER".into(), "".into());
2155 for var in extra_env {
2156 env.insert(var.name, var.value);
2157 }
2158 env
2159 });
2160
2161 let project = self.project.clone();
2162 let language_registry = project.read(cx).languages().clone();
2163 let is_windows = project.read(cx).path_style(cx).is_windows();
2164
2165 let terminal_id = acp::TerminalId(Uuid::new_v4().to_string().into());
2166 let terminal_task = cx.spawn({
2167 let terminal_id = terminal_id.clone();
2168 async move |_this, cx| {
2169 let env = env.await;
2170 let shell = project
2171 .update(cx, |project, cx| {
2172 project
2173 .remote_client()
2174 .and_then(|r| r.read(cx).default_system_shell())
2175 })?
2176 .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2177 let (task_command, task_args) =
2178 ShellBuilder::new(&Shell::Program(shell), is_windows)
2179 .redirect_stdin_to_dev_null()
2180 .build(Some(command.clone()), &args);
2181 let terminal = project
2182 .update(cx, |project, cx| {
2183 project.create_terminal_task(
2184 task::SpawnInTerminal {
2185 command: Some(task_command),
2186 args: task_args,
2187 cwd: cwd.clone(),
2188 env,
2189 ..Default::default()
2190 },
2191 cx,
2192 )
2193 })?
2194 .await?;
2195
2196 cx.new(|cx| {
2197 Terminal::new(
2198 terminal_id,
2199 &format!("{} {}", command, args.join(" ")),
2200 cwd,
2201 output_byte_limit.map(|l| l as usize),
2202 terminal,
2203 language_registry,
2204 cx,
2205 )
2206 })
2207 }
2208 });
2209
2210 cx.spawn(async move |this, cx| {
2211 let terminal = terminal_task.await?;
2212 this.update(cx, |this, _cx| {
2213 this.terminals.insert(terminal_id, terminal.clone());
2214 terminal
2215 })
2216 })
2217 }
2218
2219 pub fn kill_terminal(
2220 &mut self,
2221 terminal_id: acp::TerminalId,
2222 cx: &mut Context<Self>,
2223 ) -> Result<()> {
2224 self.terminals
2225 .get(&terminal_id)
2226 .context("Terminal not found")?
2227 .update(cx, |terminal, cx| {
2228 terminal.kill(cx);
2229 });
2230
2231 Ok(())
2232 }
2233
2234 pub fn release_terminal(
2235 &mut self,
2236 terminal_id: acp::TerminalId,
2237 cx: &mut Context<Self>,
2238 ) -> Result<()> {
2239 self.terminals
2240 .remove(&terminal_id)
2241 .context("Terminal not found")?
2242 .update(cx, |terminal, cx| {
2243 terminal.kill(cx);
2244 });
2245
2246 Ok(())
2247 }
2248
2249 pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2250 self.terminals
2251 .get(&terminal_id)
2252 .context("Terminal not found")
2253 .cloned()
2254 }
2255
2256 pub fn to_markdown(&self, cx: &App) -> String {
2257 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2258 }
2259
2260 pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2261 cx.emit(AcpThreadEvent::LoadError(error));
2262 }
2263
2264 pub fn register_terminal_created(
2265 &mut self,
2266 terminal_id: acp::TerminalId,
2267 command_label: String,
2268 working_dir: Option<PathBuf>,
2269 output_byte_limit: Option<u64>,
2270 terminal: Entity<::terminal::Terminal>,
2271 cx: &mut Context<Self>,
2272 ) -> Entity<Terminal> {
2273 let language_registry = self.project.read(cx).languages().clone();
2274
2275 let entity = cx.new(|cx| {
2276 Terminal::new(
2277 terminal_id.clone(),
2278 &command_label,
2279 working_dir.clone(),
2280 output_byte_limit.map(|l| l as usize),
2281 terminal,
2282 language_registry,
2283 cx,
2284 )
2285 });
2286 self.terminals.insert(terminal_id.clone(), entity.clone());
2287 entity
2288 }
2289}
2290
2291fn markdown_for_raw_output(
2292 raw_output: &serde_json::Value,
2293 language_registry: &Arc<LanguageRegistry>,
2294 cx: &mut App,
2295) -> Option<Entity<Markdown>> {
2296 match raw_output {
2297 serde_json::Value::Null => None,
2298 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2299 Markdown::new(
2300 value.to_string().into(),
2301 Some(language_registry.clone()),
2302 None,
2303 cx,
2304 )
2305 })),
2306 serde_json::Value::Number(value) => Some(cx.new(|cx| {
2307 Markdown::new(
2308 value.to_string().into(),
2309 Some(language_registry.clone()),
2310 None,
2311 cx,
2312 )
2313 })),
2314 serde_json::Value::String(value) => Some(cx.new(|cx| {
2315 Markdown::new(
2316 value.clone().into(),
2317 Some(language_registry.clone()),
2318 None,
2319 cx,
2320 )
2321 })),
2322 value => Some(cx.new(|cx| {
2323 Markdown::new(
2324 format!("```json\n{}\n```", value).into(),
2325 Some(language_registry.clone()),
2326 None,
2327 cx,
2328 )
2329 })),
2330 }
2331}
2332
2333#[cfg(test)]
2334mod tests {
2335 use super::*;
2336 use anyhow::anyhow;
2337 use futures::{channel::mpsc, future::LocalBoxFuture, select};
2338 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2339 use indoc::indoc;
2340 use project::{FakeFs, Fs};
2341 use rand::{distr, prelude::*};
2342 use serde_json::json;
2343 use settings::SettingsStore;
2344 use smol::stream::StreamExt as _;
2345 use std::{
2346 any::Any,
2347 cell::RefCell,
2348 path::Path,
2349 rc::Rc,
2350 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2351 time::Duration,
2352 };
2353 use util::path;
2354
2355 fn init_test(cx: &mut TestAppContext) {
2356 env_logger::try_init().ok();
2357 cx.update(|cx| {
2358 let settings_store = SettingsStore::test(cx);
2359 cx.set_global(settings_store);
2360 Project::init_settings(cx);
2361 language::init(cx);
2362 });
2363 }
2364
2365 #[gpui::test]
2366 async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2367 init_test(cx);
2368
2369 let fs = FakeFs::new(cx.executor());
2370 let project = Project::test(fs, [], cx).await;
2371 let connection = Rc::new(FakeAgentConnection::new());
2372 let thread = cx
2373 .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2374 .await
2375 .unwrap();
2376
2377 let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
2378
2379 // Send Output BEFORE Created - should be buffered by acp_thread
2380 thread.update(cx, |thread, cx| {
2381 thread.on_terminal_provider_event(
2382 TerminalProviderEvent::Output {
2383 terminal_id: terminal_id.clone(),
2384 data: b"hello buffered".to_vec(),
2385 },
2386 cx,
2387 );
2388 });
2389
2390 // Create a display-only terminal and then send Created
2391 let lower = cx.new(|cx| {
2392 let builder = ::terminal::TerminalBuilder::new_display_only(
2393 ::terminal::terminal_settings::CursorShape::default(),
2394 ::terminal::terminal_settings::AlternateScroll::On,
2395 None,
2396 0,
2397 )
2398 .unwrap();
2399 builder.subscribe(cx)
2400 });
2401
2402 thread.update(cx, |thread, cx| {
2403 thread.on_terminal_provider_event(
2404 TerminalProviderEvent::Created {
2405 terminal_id: terminal_id.clone(),
2406 label: "Buffered Test".to_string(),
2407 cwd: None,
2408 output_byte_limit: None,
2409 terminal: lower.clone(),
2410 },
2411 cx,
2412 );
2413 });
2414
2415 // After Created, buffered Output should have been flushed into the renderer
2416 let content = thread.read_with(cx, |thread, cx| {
2417 let term = thread.terminal(terminal_id.clone()).unwrap();
2418 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2419 });
2420
2421 assert!(
2422 content.contains("hello buffered"),
2423 "expected buffered output to render, got: {content}"
2424 );
2425 }
2426
2427 #[gpui::test]
2428 async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2429 init_test(cx);
2430
2431 let fs = FakeFs::new(cx.executor());
2432 let project = Project::test(fs, [], cx).await;
2433 let connection = Rc::new(FakeAgentConnection::new());
2434 let thread = cx
2435 .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2436 .await
2437 .unwrap();
2438
2439 let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
2440
2441 // Send Output BEFORE Created
2442 thread.update(cx, |thread, cx| {
2443 thread.on_terminal_provider_event(
2444 TerminalProviderEvent::Output {
2445 terminal_id: terminal_id.clone(),
2446 data: b"pre-exit data".to_vec(),
2447 },
2448 cx,
2449 );
2450 });
2451
2452 // Send Exit BEFORE Created
2453 thread.update(cx, |thread, cx| {
2454 thread.on_terminal_provider_event(
2455 TerminalProviderEvent::Exit {
2456 terminal_id: terminal_id.clone(),
2457 status: acp::TerminalExitStatus {
2458 exit_code: Some(0),
2459 signal: None,
2460 meta: None,
2461 },
2462 },
2463 cx,
2464 );
2465 });
2466
2467 // Now create a display-only lower-level terminal and send Created
2468 let lower = cx.new(|cx| {
2469 let builder = ::terminal::TerminalBuilder::new_display_only(
2470 ::terminal::terminal_settings::CursorShape::default(),
2471 ::terminal::terminal_settings::AlternateScroll::On,
2472 None,
2473 0,
2474 )
2475 .unwrap();
2476 builder.subscribe(cx)
2477 });
2478
2479 thread.update(cx, |thread, cx| {
2480 thread.on_terminal_provider_event(
2481 TerminalProviderEvent::Created {
2482 terminal_id: terminal_id.clone(),
2483 label: "Buffered Exit Test".to_string(),
2484 cwd: None,
2485 output_byte_limit: None,
2486 terminal: lower.clone(),
2487 },
2488 cx,
2489 );
2490 });
2491
2492 // Output should be present after Created (flushed from buffer)
2493 let content = thread.read_with(cx, |thread, cx| {
2494 let term = thread.terminal(terminal_id.clone()).unwrap();
2495 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2496 });
2497
2498 assert!(
2499 content.contains("pre-exit data"),
2500 "expected pre-exit data to render, got: {content}"
2501 );
2502 }
2503
2504 #[gpui::test]
2505 async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
2506 init_test(cx);
2507
2508 let fs = FakeFs::new(cx.executor());
2509 let project = Project::test(fs, [], cx).await;
2510 let connection = Rc::new(FakeAgentConnection::new());
2511 let thread = cx
2512 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2513 .await
2514 .unwrap();
2515
2516 // Test creating a new user message
2517 thread.update(cx, |thread, cx| {
2518 thread.push_user_content_block(
2519 None,
2520 acp::ContentBlock::Text(acp::TextContent {
2521 annotations: None,
2522 text: "Hello, ".to_string(),
2523 meta: None,
2524 }),
2525 cx,
2526 );
2527 });
2528
2529 thread.update(cx, |thread, cx| {
2530 assert_eq!(thread.entries.len(), 1);
2531 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2532 assert_eq!(user_msg.id, None);
2533 assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
2534 } else {
2535 panic!("Expected UserMessage");
2536 }
2537 });
2538
2539 // Test appending to existing user message
2540 let message_1_id = UserMessageId::new();
2541 thread.update(cx, |thread, cx| {
2542 thread.push_user_content_block(
2543 Some(message_1_id.clone()),
2544 acp::ContentBlock::Text(acp::TextContent {
2545 annotations: None,
2546 text: "world!".to_string(),
2547 meta: None,
2548 }),
2549 cx,
2550 );
2551 });
2552
2553 thread.update(cx, |thread, cx| {
2554 assert_eq!(thread.entries.len(), 1);
2555 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2556 assert_eq!(user_msg.id, Some(message_1_id));
2557 assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
2558 } else {
2559 panic!("Expected UserMessage");
2560 }
2561 });
2562
2563 // Test creating new user message after assistant message
2564 thread.update(cx, |thread, cx| {
2565 thread.push_assistant_content_block(
2566 acp::ContentBlock::Text(acp::TextContent {
2567 annotations: None,
2568 text: "Assistant response".to_string(),
2569 meta: None,
2570 }),
2571 false,
2572 cx,
2573 );
2574 });
2575
2576 let message_2_id = UserMessageId::new();
2577 thread.update(cx, |thread, cx| {
2578 thread.push_user_content_block(
2579 Some(message_2_id.clone()),
2580 acp::ContentBlock::Text(acp::TextContent {
2581 annotations: None,
2582 text: "New user message".to_string(),
2583 meta: None,
2584 }),
2585 cx,
2586 );
2587 });
2588
2589 thread.update(cx, |thread, cx| {
2590 assert_eq!(thread.entries.len(), 3);
2591 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
2592 assert_eq!(user_msg.id, Some(message_2_id));
2593 assert_eq!(user_msg.content.to_markdown(cx), "New user message");
2594 } else {
2595 panic!("Expected UserMessage at index 2");
2596 }
2597 });
2598 }
2599
2600 #[gpui::test]
2601 async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
2602 init_test(cx);
2603
2604 let fs = FakeFs::new(cx.executor());
2605 let project = Project::test(fs, [], cx).await;
2606 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2607 |_, thread, mut cx| {
2608 async move {
2609 thread.update(&mut cx, |thread, cx| {
2610 thread
2611 .handle_session_update(
2612 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk {
2613 content: "Thinking ".into(),
2614 meta: None,
2615 }),
2616 cx,
2617 )
2618 .unwrap();
2619 thread
2620 .handle_session_update(
2621 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk {
2622 content: "hard!".into(),
2623 meta: None,
2624 }),
2625 cx,
2626 )
2627 .unwrap();
2628 })?;
2629 Ok(acp::PromptResponse {
2630 stop_reason: acp::StopReason::EndTurn,
2631 meta: None,
2632 })
2633 }
2634 .boxed_local()
2635 },
2636 ));
2637
2638 let thread = cx
2639 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2640 .await
2641 .unwrap();
2642
2643 thread
2644 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
2645 .await
2646 .unwrap();
2647
2648 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
2649 assert_eq!(
2650 output,
2651 indoc! {r#"
2652 ## User
2653
2654 Hello from Zed!
2655
2656 ## Assistant
2657
2658 <thinking>
2659 Thinking hard!
2660 </thinking>
2661
2662 "#}
2663 );
2664 }
2665
2666 #[gpui::test]
2667 async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
2668 init_test(cx);
2669
2670 let fs = FakeFs::new(cx.executor());
2671 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
2672 .await;
2673 let project = Project::test(fs.clone(), [], cx).await;
2674 let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
2675 let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
2676 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2677 move |_, thread, mut cx| {
2678 let read_file_tx = read_file_tx.clone();
2679 async move {
2680 let content = thread
2681 .update(&mut cx, |thread, cx| {
2682 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2683 })
2684 .unwrap()
2685 .await
2686 .unwrap();
2687 assert_eq!(content, "one\ntwo\nthree\n");
2688 read_file_tx.take().unwrap().send(()).unwrap();
2689 thread
2690 .update(&mut cx, |thread, cx| {
2691 thread.write_text_file(
2692 path!("/tmp/foo").into(),
2693 "one\ntwo\nthree\nfour\nfive\n".to_string(),
2694 cx,
2695 )
2696 })
2697 .unwrap()
2698 .await
2699 .unwrap();
2700 Ok(acp::PromptResponse {
2701 stop_reason: acp::StopReason::EndTurn,
2702 meta: None,
2703 })
2704 }
2705 .boxed_local()
2706 },
2707 ));
2708
2709 let (worktree, pathbuf) = project
2710 .update(cx, |project, cx| {
2711 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2712 })
2713 .await
2714 .unwrap();
2715 let buffer = project
2716 .update(cx, |project, cx| {
2717 project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
2718 })
2719 .await
2720 .unwrap();
2721
2722 let thread = cx
2723 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2724 .await
2725 .unwrap();
2726
2727 let request = thread.update(cx, |thread, cx| {
2728 thread.send_raw("Extend the count in /tmp/foo", cx)
2729 });
2730 read_file_rx.await.ok();
2731 buffer.update(cx, |buffer, cx| {
2732 buffer.edit([(0..0, "zero\n".to_string())], None, cx);
2733 });
2734 cx.run_until_parked();
2735 assert_eq!(
2736 buffer.read_with(cx, |buffer, _| buffer.text()),
2737 "zero\none\ntwo\nthree\nfour\nfive\n"
2738 );
2739 assert_eq!(
2740 String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
2741 "zero\none\ntwo\nthree\nfour\nfive\n"
2742 );
2743 request.await.unwrap();
2744 }
2745
2746 #[gpui::test]
2747 async fn test_reading_from_line(cx: &mut TestAppContext) {
2748 init_test(cx);
2749
2750 let fs = FakeFs::new(cx.executor());
2751 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
2752 .await;
2753 let project = Project::test(fs.clone(), [], cx).await;
2754 project
2755 .update(cx, |project, cx| {
2756 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2757 })
2758 .await
2759 .unwrap();
2760
2761 let connection = Rc::new(FakeAgentConnection::new());
2762
2763 let thread = cx
2764 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2765 .await
2766 .unwrap();
2767
2768 // Whole file
2769 let content = thread
2770 .update(cx, |thread, cx| {
2771 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2772 })
2773 .await
2774 .unwrap();
2775
2776 assert_eq!(content, "one\ntwo\nthree\nfour\n");
2777
2778 // Only start line
2779 let content = thread
2780 .update(cx, |thread, cx| {
2781 thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
2782 })
2783 .await
2784 .unwrap();
2785
2786 assert_eq!(content, "three\nfour\n");
2787
2788 // Only limit
2789 let content = thread
2790 .update(cx, |thread, cx| {
2791 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2792 })
2793 .await
2794 .unwrap();
2795
2796 assert_eq!(content, "one\ntwo\n");
2797
2798 // Range
2799 let content = thread
2800 .update(cx, |thread, cx| {
2801 thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
2802 })
2803 .await
2804 .unwrap();
2805
2806 assert_eq!(content, "two\nthree\n");
2807
2808 // Invalid
2809 let err = thread
2810 .update(cx, |thread, cx| {
2811 thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
2812 })
2813 .await
2814 .unwrap_err();
2815
2816 assert_eq!(
2817 err.to_string(),
2818 "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
2819 );
2820 }
2821
2822 #[gpui::test]
2823 async fn test_reading_empty_file(cx: &mut TestAppContext) {
2824 init_test(cx);
2825
2826 let fs = FakeFs::new(cx.executor());
2827 fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
2828 let project = Project::test(fs.clone(), [], cx).await;
2829 project
2830 .update(cx, |project, cx| {
2831 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2832 })
2833 .await
2834 .unwrap();
2835
2836 let connection = Rc::new(FakeAgentConnection::new());
2837
2838 let thread = cx
2839 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2840 .await
2841 .unwrap();
2842
2843 // Whole file
2844 let content = thread
2845 .update(cx, |thread, cx| {
2846 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2847 })
2848 .await
2849 .unwrap();
2850
2851 assert_eq!(content, "");
2852
2853 // Only start line
2854 let content = thread
2855 .update(cx, |thread, cx| {
2856 thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
2857 })
2858 .await
2859 .unwrap();
2860
2861 assert_eq!(content, "");
2862
2863 // Only limit
2864 let content = thread
2865 .update(cx, |thread, cx| {
2866 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2867 })
2868 .await
2869 .unwrap();
2870
2871 assert_eq!(content, "");
2872
2873 // Range
2874 let content = thread
2875 .update(cx, |thread, cx| {
2876 thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
2877 })
2878 .await
2879 .unwrap();
2880
2881 assert_eq!(content, "");
2882
2883 // Invalid
2884 let err = thread
2885 .update(cx, |thread, cx| {
2886 thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
2887 })
2888 .await
2889 .unwrap_err();
2890
2891 assert_eq!(
2892 err.to_string(),
2893 "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
2894 );
2895 }
2896 #[gpui::test]
2897 async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
2898 init_test(cx);
2899
2900 let fs = FakeFs::new(cx.executor());
2901 fs.insert_tree(path!("/tmp"), json!({})).await;
2902 let project = Project::test(fs.clone(), [], cx).await;
2903 project
2904 .update(cx, |project, cx| {
2905 project.find_or_create_worktree(path!("/tmp"), true, cx)
2906 })
2907 .await
2908 .unwrap();
2909
2910 let connection = Rc::new(FakeAgentConnection::new());
2911
2912 let thread = cx
2913 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2914 .await
2915 .unwrap();
2916
2917 // Out of project file
2918 let err = thread
2919 .update(cx, |thread, cx| {
2920 thread.read_text_file(path!("/foo").into(), None, None, false, cx)
2921 })
2922 .await
2923 .unwrap_err();
2924
2925 assert_eq!(err.code, acp::ErrorCode::RESOURCE_NOT_FOUND.code);
2926 }
2927
2928 #[gpui::test]
2929 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
2930 init_test(cx);
2931
2932 let fs = FakeFs::new(cx.executor());
2933 let project = Project::test(fs, [], cx).await;
2934 let id = acp::ToolCallId("test".into());
2935
2936 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2937 let id = id.clone();
2938 move |_, thread, mut cx| {
2939 let id = id.clone();
2940 async move {
2941 thread
2942 .update(&mut cx, |thread, cx| {
2943 thread.handle_session_update(
2944 acp::SessionUpdate::ToolCall(acp::ToolCall {
2945 id: id.clone(),
2946 title: "Label".into(),
2947 kind: acp::ToolKind::Fetch,
2948 status: acp::ToolCallStatus::InProgress,
2949 content: vec![],
2950 locations: vec![],
2951 raw_input: None,
2952 raw_output: None,
2953 meta: None,
2954 }),
2955 cx,
2956 )
2957 })
2958 .unwrap()
2959 .unwrap();
2960 Ok(acp::PromptResponse {
2961 stop_reason: acp::StopReason::EndTurn,
2962 meta: None,
2963 })
2964 }
2965 .boxed_local()
2966 }
2967 }));
2968
2969 let thread = cx
2970 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2971 .await
2972 .unwrap();
2973
2974 let request = thread.update(cx, |thread, cx| {
2975 thread.send_raw("Fetch https://example.com", cx)
2976 });
2977
2978 run_until_first_tool_call(&thread, cx).await;
2979
2980 thread.read_with(cx, |thread, _| {
2981 assert!(matches!(
2982 thread.entries[1],
2983 AgentThreadEntry::ToolCall(ToolCall {
2984 status: ToolCallStatus::InProgress,
2985 ..
2986 })
2987 ));
2988 });
2989
2990 thread.update(cx, |thread, cx| thread.cancel(cx)).await;
2991
2992 thread.read_with(cx, |thread, _| {
2993 assert!(matches!(
2994 &thread.entries[1],
2995 AgentThreadEntry::ToolCall(ToolCall {
2996 status: ToolCallStatus::Canceled,
2997 ..
2998 })
2999 ));
3000 });
3001
3002 thread
3003 .update(cx, |thread, cx| {
3004 thread.handle_session_update(
3005 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
3006 id,
3007 fields: acp::ToolCallUpdateFields {
3008 status: Some(acp::ToolCallStatus::Completed),
3009 ..Default::default()
3010 },
3011 meta: None,
3012 }),
3013 cx,
3014 )
3015 })
3016 .unwrap();
3017
3018 request.await.unwrap();
3019
3020 thread.read_with(cx, |thread, _| {
3021 assert!(matches!(
3022 thread.entries[1],
3023 AgentThreadEntry::ToolCall(ToolCall {
3024 status: ToolCallStatus::Completed,
3025 ..
3026 })
3027 ));
3028 });
3029 }
3030
3031 #[gpui::test]
3032 async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3033 init_test(cx);
3034 let fs = FakeFs::new(cx.background_executor.clone());
3035 fs.insert_tree(path!("/test"), json!({})).await;
3036 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3037
3038 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3039 move |_, thread, mut cx| {
3040 async move {
3041 thread
3042 .update(&mut cx, |thread, cx| {
3043 thread.handle_session_update(
3044 acp::SessionUpdate::ToolCall(acp::ToolCall {
3045 id: acp::ToolCallId("test".into()),
3046 title: "Label".into(),
3047 kind: acp::ToolKind::Edit,
3048 status: acp::ToolCallStatus::Completed,
3049 content: vec![acp::ToolCallContent::Diff {
3050 diff: acp::Diff {
3051 path: "/test/test.txt".into(),
3052 old_text: None,
3053 new_text: "foo".into(),
3054 meta: None,
3055 },
3056 }],
3057 locations: vec![],
3058 raw_input: None,
3059 raw_output: None,
3060 meta: None,
3061 }),
3062 cx,
3063 )
3064 })
3065 .unwrap()
3066 .unwrap();
3067 Ok(acp::PromptResponse {
3068 stop_reason: acp::StopReason::EndTurn,
3069 meta: None,
3070 })
3071 }
3072 .boxed_local()
3073 }
3074 }));
3075
3076 let thread = cx
3077 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3078 .await
3079 .unwrap();
3080
3081 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3082 .await
3083 .unwrap();
3084
3085 assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3086 }
3087
3088 #[gpui::test(iterations = 10)]
3089 async fn test_checkpoints(cx: &mut TestAppContext) {
3090 init_test(cx);
3091 let fs = FakeFs::new(cx.background_executor.clone());
3092 fs.insert_tree(
3093 path!("/test"),
3094 json!({
3095 ".git": {}
3096 }),
3097 )
3098 .await;
3099 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3100
3101 let simulate_changes = Arc::new(AtomicBool::new(true));
3102 let next_filename = Arc::new(AtomicUsize::new(0));
3103 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3104 let simulate_changes = simulate_changes.clone();
3105 let next_filename = next_filename.clone();
3106 let fs = fs.clone();
3107 move |request, thread, mut cx| {
3108 let fs = fs.clone();
3109 let simulate_changes = simulate_changes.clone();
3110 let next_filename = next_filename.clone();
3111 async move {
3112 if simulate_changes.load(SeqCst) {
3113 let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3114 fs.write(Path::new(&filename), b"").await?;
3115 }
3116
3117 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3118 panic!("expected text content block");
3119 };
3120 thread.update(&mut cx, |thread, cx| {
3121 thread
3122 .handle_session_update(
3123 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk {
3124 content: content.text.to_uppercase().into(),
3125 meta: None,
3126 }),
3127 cx,
3128 )
3129 .unwrap();
3130 })?;
3131 Ok(acp::PromptResponse {
3132 stop_reason: acp::StopReason::EndTurn,
3133 meta: None,
3134 })
3135 }
3136 .boxed_local()
3137 }
3138 }));
3139 let thread = cx
3140 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3141 .await
3142 .unwrap();
3143
3144 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3145 .await
3146 .unwrap();
3147 thread.read_with(cx, |thread, cx| {
3148 assert_eq!(
3149 thread.to_markdown(cx),
3150 indoc! {"
3151 ## User (checkpoint)
3152
3153 Lorem
3154
3155 ## Assistant
3156
3157 LOREM
3158
3159 "}
3160 );
3161 });
3162 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3163
3164 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3165 .await
3166 .unwrap();
3167 thread.read_with(cx, |thread, cx| {
3168 assert_eq!(
3169 thread.to_markdown(cx),
3170 indoc! {"
3171 ## User (checkpoint)
3172
3173 Lorem
3174
3175 ## Assistant
3176
3177 LOREM
3178
3179 ## User (checkpoint)
3180
3181 ipsum
3182
3183 ## Assistant
3184
3185 IPSUM
3186
3187 "}
3188 );
3189 });
3190 assert_eq!(
3191 fs.files(),
3192 vec![
3193 Path::new(path!("/test/file-0")),
3194 Path::new(path!("/test/file-1"))
3195 ]
3196 );
3197
3198 // Checkpoint isn't stored when there are no changes.
3199 simulate_changes.store(false, SeqCst);
3200 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3201 .await
3202 .unwrap();
3203 thread.read_with(cx, |thread, cx| {
3204 assert_eq!(
3205 thread.to_markdown(cx),
3206 indoc! {"
3207 ## User (checkpoint)
3208
3209 Lorem
3210
3211 ## Assistant
3212
3213 LOREM
3214
3215 ## User (checkpoint)
3216
3217 ipsum
3218
3219 ## Assistant
3220
3221 IPSUM
3222
3223 ## User
3224
3225 dolor
3226
3227 ## Assistant
3228
3229 DOLOR
3230
3231 "}
3232 );
3233 });
3234 assert_eq!(
3235 fs.files(),
3236 vec![
3237 Path::new(path!("/test/file-0")),
3238 Path::new(path!("/test/file-1"))
3239 ]
3240 );
3241
3242 // Rewinding the conversation truncates the history and restores the checkpoint.
3243 thread
3244 .update(cx, |thread, cx| {
3245 let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3246 panic!("unexpected entries {:?}", thread.entries)
3247 };
3248 thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3249 })
3250 .await
3251 .unwrap();
3252 thread.read_with(cx, |thread, cx| {
3253 assert_eq!(
3254 thread.to_markdown(cx),
3255 indoc! {"
3256 ## User (checkpoint)
3257
3258 Lorem
3259
3260 ## Assistant
3261
3262 LOREM
3263
3264 "}
3265 );
3266 });
3267 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3268 }
3269
3270 #[gpui::test]
3271 async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3272 use std::sync::atomic::AtomicUsize;
3273 init_test(cx);
3274
3275 let fs = FakeFs::new(cx.executor());
3276 let project = Project::test(fs, None, cx).await;
3277
3278 // Create a connection that simulates refusal after tool result
3279 let prompt_count = Arc::new(AtomicUsize::new(0));
3280 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3281 let prompt_count = prompt_count.clone();
3282 move |_request, thread, mut cx| {
3283 let count = prompt_count.fetch_add(1, SeqCst);
3284 async move {
3285 if count == 0 {
3286 // First prompt: Generate a tool call with result
3287 thread.update(&mut cx, |thread, cx| {
3288 thread
3289 .handle_session_update(
3290 acp::SessionUpdate::ToolCall(acp::ToolCall {
3291 id: acp::ToolCallId("tool1".into()),
3292 title: "Test Tool".into(),
3293 kind: acp::ToolKind::Fetch,
3294 status: acp::ToolCallStatus::Completed,
3295 content: vec![],
3296 locations: vec![],
3297 raw_input: Some(serde_json::json!({"query": "test"})),
3298 raw_output: Some(
3299 serde_json::json!({"result": "inappropriate content"}),
3300 ),
3301 meta: None,
3302 }),
3303 cx,
3304 )
3305 .unwrap();
3306 })?;
3307
3308 // Now return refusal because of the tool result
3309 Ok(acp::PromptResponse {
3310 stop_reason: acp::StopReason::Refusal,
3311 meta: None,
3312 })
3313 } else {
3314 Ok(acp::PromptResponse {
3315 stop_reason: acp::StopReason::EndTurn,
3316 meta: None,
3317 })
3318 }
3319 }
3320 .boxed_local()
3321 }
3322 }));
3323
3324 let thread = cx
3325 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3326 .await
3327 .unwrap();
3328
3329 // Track if we see a Refusal event
3330 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3331 let saw_refusal_event_captured = saw_refusal_event.clone();
3332 thread.update(cx, |_thread, cx| {
3333 cx.subscribe(
3334 &thread,
3335 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3336 if matches!(event, AcpThreadEvent::Refusal) {
3337 *saw_refusal_event_captured.lock().unwrap() = true;
3338 }
3339 },
3340 )
3341 .detach();
3342 });
3343
3344 // Send a user message - this will trigger tool call and then refusal
3345 let send_task = thread.update(cx, |thread, cx| {
3346 thread.send(
3347 vec![acp::ContentBlock::Text(acp::TextContent {
3348 text: "Hello".into(),
3349 annotations: None,
3350 meta: None,
3351 })],
3352 cx,
3353 )
3354 });
3355 cx.background_executor.spawn(send_task).detach();
3356 cx.run_until_parked();
3357
3358 // Verify that:
3359 // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3360 // 2. The user message was NOT truncated
3361 assert!(
3362 *saw_refusal_event.lock().unwrap(),
3363 "Refusal event should be emitted for tool result refusals"
3364 );
3365
3366 thread.read_with(cx, |thread, _| {
3367 let entries = thread.entries();
3368 assert!(entries.len() >= 2, "Should have user message and tool call");
3369
3370 // Verify user message is still there
3371 assert!(
3372 matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3373 "User message should not be truncated"
3374 );
3375
3376 // Verify tool call is there with result
3377 if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3378 assert!(
3379 tool_call.raw_output.is_some(),
3380 "Tool call should have output"
3381 );
3382 } else {
3383 panic!("Expected tool call at index 1");
3384 }
3385 });
3386 }
3387
3388 #[gpui::test]
3389 async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3390 init_test(cx);
3391
3392 let fs = FakeFs::new(cx.executor());
3393 let project = Project::test(fs, None, cx).await;
3394
3395 let refuse_next = Arc::new(AtomicBool::new(false));
3396 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3397 let refuse_next = refuse_next.clone();
3398 move |_request, _thread, _cx| {
3399 if refuse_next.load(SeqCst) {
3400 async move {
3401 Ok(acp::PromptResponse {
3402 stop_reason: acp::StopReason::Refusal,
3403 meta: None,
3404 })
3405 }
3406 .boxed_local()
3407 } else {
3408 async move {
3409 Ok(acp::PromptResponse {
3410 stop_reason: acp::StopReason::EndTurn,
3411 meta: None,
3412 })
3413 }
3414 .boxed_local()
3415 }
3416 }
3417 }));
3418
3419 let thread = cx
3420 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3421 .await
3422 .unwrap();
3423
3424 // Track if we see a Refusal event
3425 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3426 let saw_refusal_event_captured = saw_refusal_event.clone();
3427 thread.update(cx, |_thread, cx| {
3428 cx.subscribe(
3429 &thread,
3430 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3431 if matches!(event, AcpThreadEvent::Refusal) {
3432 *saw_refusal_event_captured.lock().unwrap() = true;
3433 }
3434 },
3435 )
3436 .detach();
3437 });
3438
3439 // Send a message that will be refused
3440 refuse_next.store(true, SeqCst);
3441 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3442 .await
3443 .unwrap();
3444
3445 // Verify that a Refusal event WAS emitted for user prompt refusal
3446 assert!(
3447 *saw_refusal_event.lock().unwrap(),
3448 "Refusal event should be emitted for user prompt refusals"
3449 );
3450
3451 // Verify the message was truncated (user prompt refusal)
3452 thread.read_with(cx, |thread, cx| {
3453 assert_eq!(thread.to_markdown(cx), "");
3454 });
3455 }
3456
3457 #[gpui::test]
3458 async fn test_refusal(cx: &mut TestAppContext) {
3459 init_test(cx);
3460 let fs = FakeFs::new(cx.background_executor.clone());
3461 fs.insert_tree(path!("/"), json!({})).await;
3462 let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
3463
3464 let refuse_next = Arc::new(AtomicBool::new(false));
3465 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3466 let refuse_next = refuse_next.clone();
3467 move |request, thread, mut cx| {
3468 let refuse_next = refuse_next.clone();
3469 async move {
3470 if refuse_next.load(SeqCst) {
3471 return Ok(acp::PromptResponse {
3472 stop_reason: acp::StopReason::Refusal,
3473 meta: None,
3474 });
3475 }
3476
3477 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3478 panic!("expected text content block");
3479 };
3480 thread.update(&mut cx, |thread, cx| {
3481 thread
3482 .handle_session_update(
3483 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk {
3484 content: content.text.to_uppercase().into(),
3485 meta: None,
3486 }),
3487 cx,
3488 )
3489 .unwrap();
3490 })?;
3491 Ok(acp::PromptResponse {
3492 stop_reason: acp::StopReason::EndTurn,
3493 meta: None,
3494 })
3495 }
3496 .boxed_local()
3497 }
3498 }));
3499 let thread = cx
3500 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3501 .await
3502 .unwrap();
3503
3504 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3505 .await
3506 .unwrap();
3507 thread.read_with(cx, |thread, cx| {
3508 assert_eq!(
3509 thread.to_markdown(cx),
3510 indoc! {"
3511 ## User
3512
3513 hello
3514
3515 ## Assistant
3516
3517 HELLO
3518
3519 "}
3520 );
3521 });
3522
3523 // Simulate refusing the second message. The message should be truncated
3524 // when a user prompt is refused.
3525 refuse_next.store(true, SeqCst);
3526 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
3527 .await
3528 .unwrap();
3529 thread.read_with(cx, |thread, cx| {
3530 assert_eq!(
3531 thread.to_markdown(cx),
3532 indoc! {"
3533 ## User
3534
3535 hello
3536
3537 ## Assistant
3538
3539 HELLO
3540
3541 "}
3542 );
3543 });
3544 }
3545
3546 async fn run_until_first_tool_call(
3547 thread: &Entity<AcpThread>,
3548 cx: &mut TestAppContext,
3549 ) -> usize {
3550 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
3551
3552 let subscription = cx.update(|cx| {
3553 cx.subscribe(thread, move |thread, _, cx| {
3554 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
3555 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
3556 return tx.try_send(ix).unwrap();
3557 }
3558 }
3559 })
3560 });
3561
3562 select! {
3563 _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
3564 panic!("Timeout waiting for tool call")
3565 }
3566 ix = rx.next().fuse() => {
3567 drop(subscription);
3568 ix.unwrap()
3569 }
3570 }
3571 }
3572
3573 #[derive(Clone, Default)]
3574 struct FakeAgentConnection {
3575 auth_methods: Vec<acp::AuthMethod>,
3576 sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
3577 on_user_message: Option<
3578 Rc<
3579 dyn Fn(
3580 acp::PromptRequest,
3581 WeakEntity<AcpThread>,
3582 AsyncApp,
3583 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3584 + 'static,
3585 >,
3586 >,
3587 }
3588
3589 impl FakeAgentConnection {
3590 fn new() -> Self {
3591 Self {
3592 auth_methods: Vec::new(),
3593 on_user_message: None,
3594 sessions: Arc::default(),
3595 }
3596 }
3597
3598 #[expect(unused)]
3599 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
3600 self.auth_methods = auth_methods;
3601 self
3602 }
3603
3604 fn on_user_message(
3605 mut self,
3606 handler: impl Fn(
3607 acp::PromptRequest,
3608 WeakEntity<AcpThread>,
3609 AsyncApp,
3610 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3611 + 'static,
3612 ) -> Self {
3613 self.on_user_message.replace(Rc::new(handler));
3614 self
3615 }
3616 }
3617
3618 impl AgentConnection for FakeAgentConnection {
3619 fn auth_methods(&self) -> &[acp::AuthMethod] {
3620 &self.auth_methods
3621 }
3622
3623 fn new_thread(
3624 self: Rc<Self>,
3625 project: Entity<Project>,
3626 _cwd: &Path,
3627 cx: &mut App,
3628 ) -> Task<gpui::Result<Entity<AcpThread>>> {
3629 let session_id = acp::SessionId(
3630 rand::rng()
3631 .sample_iter(&distr::Alphanumeric)
3632 .take(7)
3633 .map(char::from)
3634 .collect::<String>()
3635 .into(),
3636 );
3637 let action_log = cx.new(|_| ActionLog::new(project.clone()));
3638 let thread = cx.new(|cx| {
3639 AcpThread::new(
3640 "Test",
3641 self.clone(),
3642 project,
3643 action_log,
3644 session_id.clone(),
3645 watch::Receiver::constant(acp::PromptCapabilities {
3646 image: true,
3647 audio: true,
3648 embedded_context: true,
3649 meta: None,
3650 }),
3651 cx,
3652 )
3653 });
3654 self.sessions.lock().insert(session_id, thread.downgrade());
3655 Task::ready(Ok(thread))
3656 }
3657
3658 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
3659 if self.auth_methods().iter().any(|m| m.id == method) {
3660 Task::ready(Ok(()))
3661 } else {
3662 Task::ready(Err(anyhow!("Invalid Auth Method")))
3663 }
3664 }
3665
3666 fn prompt(
3667 &self,
3668 _id: Option<UserMessageId>,
3669 params: acp::PromptRequest,
3670 cx: &mut App,
3671 ) -> Task<gpui::Result<acp::PromptResponse>> {
3672 let sessions = self.sessions.lock();
3673 let thread = sessions.get(¶ms.session_id).unwrap();
3674 if let Some(handler) = &self.on_user_message {
3675 let handler = handler.clone();
3676 let thread = thread.clone();
3677 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
3678 } else {
3679 Task::ready(Ok(acp::PromptResponse {
3680 stop_reason: acp::StopReason::EndTurn,
3681 meta: None,
3682 }))
3683 }
3684 }
3685
3686 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
3687 let sessions = self.sessions.lock();
3688 let thread = sessions.get(session_id).unwrap().clone();
3689
3690 cx.spawn(async move |cx| {
3691 thread
3692 .update(cx, |thread, cx| thread.cancel(cx))
3693 .unwrap()
3694 .await
3695 })
3696 .detach();
3697 }
3698
3699 fn truncate(
3700 &self,
3701 session_id: &acp::SessionId,
3702 _cx: &App,
3703 ) -> Option<Rc<dyn AgentSessionTruncate>> {
3704 Some(Rc::new(FakeAgentSessionEditor {
3705 _session_id: session_id.clone(),
3706 }))
3707 }
3708
3709 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
3710 self
3711 }
3712 }
3713
3714 struct FakeAgentSessionEditor {
3715 _session_id: acp::SessionId,
3716 }
3717
3718 impl AgentSessionTruncate for FakeAgentSessionEditor {
3719 fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
3720 Task::ready(Ok(()))
3721 }
3722 }
3723
3724 #[gpui::test]
3725 async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
3726 init_test(cx);
3727
3728 let fs = FakeFs::new(cx.executor());
3729 let project = Project::test(fs, [], cx).await;
3730 let connection = Rc::new(FakeAgentConnection::new());
3731 let thread = cx
3732 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3733 .await
3734 .unwrap();
3735
3736 // Try to update a tool call that doesn't exist
3737 let nonexistent_id = acp::ToolCallId("nonexistent-tool-call".into());
3738 thread.update(cx, |thread, cx| {
3739 let result = thread.handle_session_update(
3740 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
3741 id: nonexistent_id.clone(),
3742 fields: acp::ToolCallUpdateFields {
3743 status: Some(acp::ToolCallStatus::Completed),
3744 ..Default::default()
3745 },
3746 meta: None,
3747 }),
3748 cx,
3749 );
3750
3751 // The update should succeed (not return an error)
3752 assert!(result.is_ok());
3753
3754 // There should now be exactly one entry in the thread
3755 assert_eq!(thread.entries.len(), 1);
3756
3757 // The entry should be a failed tool call
3758 if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
3759 assert_eq!(tool_call.id, nonexistent_id);
3760 assert!(matches!(tool_call.status, ToolCallStatus::Failed));
3761 assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
3762
3763 // Check that the content contains the error message
3764 assert_eq!(tool_call.content.len(), 1);
3765 if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
3766 match content_block {
3767 ContentBlock::Markdown { markdown } => {
3768 let markdown_text = markdown.read(cx).source();
3769 assert!(markdown_text.contains("Tool call not found"));
3770 }
3771 ContentBlock::Empty => panic!("Expected markdown content, got empty"),
3772 ContentBlock::ResourceLink { .. } => {
3773 panic!("Expected markdown content, got resource link")
3774 }
3775 }
3776 } else {
3777 panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
3778 }
3779 } else {
3780 panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
3781 }
3782 });
3783 }
3784}