1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5
6use agent_settings::AgentSettings;
7use collections::HashSet;
8pub use connection::*;
9pub use diff::*;
10use language::language_settings::FormatOnSave;
11pub use mention::*;
12use project::lsp_store::{FormatTrigger, LspFormatTarget};
13use serde::{Deserialize, Serialize};
14use settings::Settings as _;
15use task::{Shell, ShellBuilder};
16pub use terminal::*;
17
18use action_log::ActionLog;
19use agent_client_protocol::{self as acp};
20use anyhow::{Context as _, Result, anyhow};
21use editor::Bias;
22use futures::{FutureExt, channel::oneshot, future::BoxFuture};
23use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
24use itertools::Itertools;
25use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
26use markdown::Markdown;
27use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
28use std::collections::HashMap;
29use std::error::Error;
30use std::fmt::{Formatter, Write};
31use std::ops::Range;
32use std::process::ExitStatus;
33use std::rc::Rc;
34use std::time::{Duration, Instant};
35use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
36use ui::App;
37use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
38use uuid::Uuid;
39
/// A user-authored message in the thread.
#[derive(Debug)]
pub struct UserMessage {
    /// Identifier of the message, when one is known.
    pub id: Option<UserMessageId>,
    /// Rendered content accumulated from every chunk appended to this message.
    pub content: ContentBlock,
    /// The raw protocol content blocks, in the order they were received.
    pub chunks: Vec<acp::ContentBlock>,
    /// Checkpoint attached to this message, if any.
    pub checkpoint: Option<Checkpoint>,
}
47
/// A git checkpoint attached to a [`UserMessage`].
#[derive(Debug)]
pub struct Checkpoint {
    git_checkpoint: GitStoreCheckpoint,
    /// When `true`, the message's markdown heading is rendered as `## User (checkpoint)`.
    pub show: bool,
}
53
54impl UserMessage {
55 fn to_markdown(&self, cx: &App) -> String {
56 let mut markdown = String::new();
57 if self
58 .checkpoint
59 .as_ref()
60 .is_some_and(|checkpoint| checkpoint.show)
61 {
62 writeln!(markdown, "## User (checkpoint)").unwrap();
63 } else {
64 writeln!(markdown, "## User").unwrap();
65 }
66 writeln!(markdown).unwrap();
67 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
68 writeln!(markdown).unwrap();
69 markdown
70 }
71}
72
/// A message produced by the agent, made of one or more chunks.
#[derive(Debug, PartialEq)]
pub struct AssistantMessage {
    /// Message and thought chunks, in the order they were received.
    pub chunks: Vec<AssistantMessageChunk>,
}
77
78impl AssistantMessage {
79 pub fn to_markdown(&self, cx: &App) -> String {
80 format!(
81 "## Assistant\n\n{}\n\n",
82 self.chunks
83 .iter()
84 .map(|chunk| chunk.to_markdown(cx))
85 .join("\n\n")
86 )
87 }
88}
89
/// One piece of an [`AssistantMessage`].
#[derive(Debug, PartialEq)]
pub enum AssistantMessageChunk {
    /// Regular message content.
    Message { block: ContentBlock },
    /// Thought content; rendered inside `<thinking>` tags in markdown output.
    Thought { block: ContentBlock },
}
95
96impl AssistantMessageChunk {
97 pub fn from_str(
98 chunk: &str,
99 language_registry: &Arc<LanguageRegistry>,
100 path_style: PathStyle,
101 cx: &mut App,
102 ) -> Self {
103 Self::Message {
104 block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
105 }
106 }
107
108 fn to_markdown(&self, cx: &App) -> String {
109 match self {
110 Self::Message { block } => block.to_markdown(cx).to_string(),
111 Self::Thought { block } => {
112 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
113 }
114 }
115 }
116}
117
/// A single entry in the thread: a user message, an assistant message, or a tool call.
#[derive(Debug)]
pub enum AgentThreadEntry {
    UserMessage(UserMessage),
    AssistantMessage(AssistantMessage),
    ToolCall(ToolCall),
}
124
125impl AgentThreadEntry {
126 pub fn to_markdown(&self, cx: &App) -> String {
127 match self {
128 Self::UserMessage(message) => message.to_markdown(cx),
129 Self::AssistantMessage(message) => message.to_markdown(cx),
130 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
131 }
132 }
133
134 pub fn user_message(&self) -> Option<&UserMessage> {
135 if let AgentThreadEntry::UserMessage(message) = self {
136 Some(message)
137 } else {
138 None
139 }
140 }
141
142 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
143 if let AgentThreadEntry::ToolCall(call) = self {
144 itertools::Either::Left(call.diffs())
145 } else {
146 itertools::Either::Right(std::iter::empty())
147 }
148 }
149
150 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
151 if let AgentThreadEntry::ToolCall(call) = self {
152 itertools::Either::Left(call.terminals())
153 } else {
154 itertools::Either::Right(std::iter::empty())
155 }
156 }
157
158 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
159 if let AgentThreadEntry::ToolCall(ToolCall {
160 locations,
161 resolved_locations,
162 ..
163 }) = self
164 {
165 Some((
166 locations.get(ix)?.clone(),
167 resolved_locations.get(ix)?.clone()?,
168 ))
169 } else {
170 None
171 }
172 }
173}
174
/// Thread-side state of a tool call reported by the agent.
#[derive(Debug)]
pub struct ToolCall {
    pub id: acp::ToolCallId,
    /// Markdown label built from the first line of the tool call's title.
    pub label: Entity<Markdown>,
    pub kind: acp::ToolKind,
    /// Content items produced by the tool call (content blocks, diffs, terminals).
    pub content: Vec<ToolCallContent>,
    pub status: ToolCallStatus,
    /// Locations reported by the agent for this tool call.
    pub locations: Vec<acp::ToolCallLocation>,
    /// Resolved counterparts of `locations`, looked up by the same index.
    pub resolved_locations: Vec<Option<AgentLocation>>,
    pub raw_input: Option<serde_json::Value>,
    pub raw_output: Option<serde_json::Value>,
}
187
188impl ToolCall {
189 fn from_acp(
190 tool_call: acp::ToolCall,
191 status: ToolCallStatus,
192 language_registry: Arc<LanguageRegistry>,
193 path_style: PathStyle,
194 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
195 cx: &mut App,
196 ) -> Result<Self> {
197 let title = if let Some((first_line, _)) = tool_call.title.split_once("\n") {
198 first_line.to_owned() + "…"
199 } else {
200 tool_call.title
201 };
202 let mut content = Vec::with_capacity(tool_call.content.len());
203 for item in tool_call.content {
204 content.push(ToolCallContent::from_acp(
205 item,
206 language_registry.clone(),
207 path_style,
208 terminals,
209 cx,
210 )?);
211 }
212
213 let result = Self {
214 id: tool_call.id,
215 label: cx
216 .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
217 kind: tool_call.kind,
218 content,
219 locations: tool_call.locations,
220 resolved_locations: Vec::default(),
221 status,
222 raw_input: tool_call.raw_input,
223 raw_output: tool_call.raw_output,
224 };
225 Ok(result)
226 }
227
228 fn update_fields(
229 &mut self,
230 fields: acp::ToolCallUpdateFields,
231 language_registry: Arc<LanguageRegistry>,
232 path_style: PathStyle,
233 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
234 cx: &mut App,
235 ) -> Result<()> {
236 let acp::ToolCallUpdateFields {
237 kind,
238 status,
239 title,
240 content,
241 locations,
242 raw_input,
243 raw_output,
244 } = fields;
245
246 if let Some(kind) = kind {
247 self.kind = kind;
248 }
249
250 if let Some(status) = status {
251 self.status = status.into();
252 }
253
254 if let Some(title) = title {
255 self.label.update(cx, |label, cx| {
256 if let Some((first_line, _)) = title.split_once("\n") {
257 label.replace(first_line.to_owned() + "…", cx)
258 } else {
259 label.replace(title, cx);
260 }
261 });
262 }
263
264 if let Some(content) = content {
265 let new_content_len = content.len();
266 let mut content = content.into_iter();
267
268 // Reuse existing content if we can
269 for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
270 old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
271 }
272 for new in content {
273 self.content.push(ToolCallContent::from_acp(
274 new,
275 language_registry.clone(),
276 path_style,
277 terminals,
278 cx,
279 )?)
280 }
281 self.content.truncate(new_content_len);
282 }
283
284 if let Some(locations) = locations {
285 self.locations = locations;
286 }
287
288 if let Some(raw_input) = raw_input {
289 self.raw_input = Some(raw_input);
290 }
291
292 if let Some(raw_output) = raw_output {
293 if self.content.is_empty()
294 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
295 {
296 self.content
297 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
298 markdown,
299 }));
300 }
301 self.raw_output = Some(raw_output);
302 }
303 Ok(())
304 }
305
306 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
307 self.content.iter().filter_map(|content| match content {
308 ToolCallContent::Diff(diff) => Some(diff),
309 ToolCallContent::ContentBlock(_) => None,
310 ToolCallContent::Terminal(_) => None,
311 })
312 }
313
314 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
315 self.content.iter().filter_map(|content| match content {
316 ToolCallContent::Terminal(terminal) => Some(terminal),
317 ToolCallContent::ContentBlock(_) => None,
318 ToolCallContent::Diff(_) => None,
319 })
320 }
321
322 fn to_markdown(&self, cx: &App) -> String {
323 let mut markdown = format!(
324 "**Tool Call: {}**\nStatus: {}\n\n",
325 self.label.read(cx).source(),
326 self.status
327 );
328 for content in &self.content {
329 markdown.push_str(content.to_markdown(cx).as_str());
330 markdown.push_str("\n\n");
331 }
332 markdown
333 }
334
335 async fn resolve_location(
336 location: acp::ToolCallLocation,
337 project: WeakEntity<Project>,
338 cx: &mut AsyncApp,
339 ) -> Option<ResolvedLocation> {
340 let buffer = project
341 .update(cx, |project, cx| {
342 project
343 .project_path_for_absolute_path(&location.path, cx)
344 .map(|path| project.open_buffer(path, cx))
345 })
346 .ok()??;
347 let buffer = buffer.await.log_err()?;
348 let position = buffer
349 .update(cx, |buffer, _| {
350 if let Some(row) = location.line {
351 let snapshot = buffer.snapshot();
352 let column = snapshot.indent_size_for_line(row).len;
353 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
354 snapshot.anchor_before(point)
355 } else {
356 Anchor::MIN
357 }
358 })
359 .ok()?;
360
361 Some(ResolvedLocation { buffer, position })
362 }
363
364 fn resolve_locations(
365 &self,
366 project: Entity<Project>,
367 cx: &mut App,
368 ) -> Task<Vec<Option<ResolvedLocation>>> {
369 let locations = self.locations.clone();
370 project.update(cx, |_, cx| {
371 cx.spawn(async move |project, cx| {
372 let mut new_locations = Vec::new();
373 for location in locations {
374 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
375 }
376 new_locations
377 })
378 })
379 }
380}
381
// A resolved tool call location. Kept as a separate type from `AgentLocation`
// so that we can hold a strong reference to the buffer for saving on the
// thread (`AgentLocation` only keeps a weak, downgraded handle).
#[derive(Clone, Debug, PartialEq, Eq)]
struct ResolvedLocation {
    buffer: Entity<Buffer>,
    position: Anchor,
}
389
390impl From<&ResolvedLocation> for AgentLocation {
391 fn from(value: &ResolvedLocation) -> Self {
392 Self {
393 buffer: value.buffer.downgrade(),
394 position: value.position,
395 }
396 }
397}
398
/// Lifecycle state of a [`ToolCall`] as tracked by the thread.
#[derive(Debug)]
pub enum ToolCallStatus {
    /// The tool call hasn't started running yet, but we start showing it to
    /// the user.
    Pending,
    /// The tool call is waiting for confirmation from the user.
    WaitingForConfirmation {
        /// The permission options offered to the user.
        options: Vec<acp::PermissionOption>,
        /// Channel used to send back the id of the chosen option.
        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
    },
    /// The tool call is currently running.
    InProgress,
    /// The tool call completed successfully.
    Completed,
    /// The tool call failed.
    Failed,
    /// The user rejected the tool call.
    Rejected,
    /// The user canceled generation so the tool call was canceled.
    Canceled,
}
420
421impl From<acp::ToolCallStatus> for ToolCallStatus {
422 fn from(status: acp::ToolCallStatus) -> Self {
423 match status {
424 acp::ToolCallStatus::Pending => Self::Pending,
425 acp::ToolCallStatus::InProgress => Self::InProgress,
426 acp::ToolCallStatus::Completed => Self::Completed,
427 acp::ToolCallStatus::Failed => Self::Failed,
428 }
429 }
430}
431
432impl Display for ToolCallStatus {
433 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
434 write!(
435 f,
436 "{}",
437 match self {
438 ToolCallStatus::Pending => "Pending",
439 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
440 ToolCallStatus::InProgress => "In Progress",
441 ToolCallStatus::Completed => "Completed",
442 ToolCallStatus::Failed => "Failed",
443 ToolCallStatus::Rejected => "Rejected",
444 ToolCallStatus::Canceled => "Canceled",
445 }
446 )
447 }
448}
449
/// Renderable content accumulated from one or more protocol content blocks.
#[derive(Debug, PartialEq, Clone)]
pub enum ContentBlock {
    /// Nothing has been appended yet.
    Empty,
    /// Markdown built from the appended blocks.
    Markdown { markdown: Entity<Markdown> },
    /// A lone resource link; converted to markdown once more content is appended.
    ResourceLink { resource_link: acp::ResourceLink },
}
456
457impl ContentBlock {
458 pub fn new(
459 block: acp::ContentBlock,
460 language_registry: &Arc<LanguageRegistry>,
461 path_style: PathStyle,
462 cx: &mut App,
463 ) -> Self {
464 let mut this = Self::Empty;
465 this.append(block, language_registry, path_style, cx);
466 this
467 }
468
469 pub fn new_combined(
470 blocks: impl IntoIterator<Item = acp::ContentBlock>,
471 language_registry: Arc<LanguageRegistry>,
472 path_style: PathStyle,
473 cx: &mut App,
474 ) -> Self {
475 let mut this = Self::Empty;
476 for block in blocks {
477 this.append(block, &language_registry, path_style, cx);
478 }
479 this
480 }
481
482 pub fn append(
483 &mut self,
484 block: acp::ContentBlock,
485 language_registry: &Arc<LanguageRegistry>,
486 path_style: PathStyle,
487 cx: &mut App,
488 ) {
489 if matches!(self, ContentBlock::Empty)
490 && let acp::ContentBlock::ResourceLink(resource_link) = block
491 {
492 *self = ContentBlock::ResourceLink { resource_link };
493 return;
494 }
495
496 let new_content = self.block_string_contents(block, path_style);
497
498 match self {
499 ContentBlock::Empty => {
500 *self = Self::create_markdown_block(new_content, language_registry, cx);
501 }
502 ContentBlock::Markdown { markdown } => {
503 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
504 }
505 ContentBlock::ResourceLink { resource_link } => {
506 let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
507 let combined = format!("{}\n{}", existing_content, new_content);
508
509 *self = Self::create_markdown_block(combined, language_registry, cx);
510 }
511 }
512 }
513
514 fn create_markdown_block(
515 content: String,
516 language_registry: &Arc<LanguageRegistry>,
517 cx: &mut App,
518 ) -> ContentBlock {
519 ContentBlock::Markdown {
520 markdown: cx
521 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
522 }
523 }
524
525 fn block_string_contents(&self, block: acp::ContentBlock, path_style: PathStyle) -> String {
526 match block {
527 acp::ContentBlock::Text(text_content) => text_content.text,
528 acp::ContentBlock::ResourceLink(resource_link) => {
529 Self::resource_link_md(&resource_link.uri, path_style)
530 }
531 acp::ContentBlock::Resource(acp::EmbeddedResource {
532 resource:
533 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
534 uri,
535 ..
536 }),
537 ..
538 }) => Self::resource_link_md(&uri, path_style),
539 acp::ContentBlock::Image(image) => Self::image_md(&image),
540 acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
541 }
542 }
543
544 fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
545 if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
546 uri.as_link().to_string()
547 } else {
548 uri.to_string()
549 }
550 }
551
552 fn image_md(_image: &acp::ImageContent) -> String {
553 "`Image`".into()
554 }
555
556 pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
557 match self {
558 ContentBlock::Empty => "",
559 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
560 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
561 }
562 }
563
564 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
565 match self {
566 ContentBlock::Empty => None,
567 ContentBlock::Markdown { markdown } => Some(markdown),
568 ContentBlock::ResourceLink { .. } => None,
569 }
570 }
571
572 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
573 match self {
574 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
575 _ => None,
576 }
577 }
578}
579
/// One item of content attached to a [`ToolCall`].
#[derive(Debug)]
pub enum ToolCallContent {
    ContentBlock(ContentBlock),
    Diff(Entity<Diff>),
    Terminal(Entity<Terminal>),
}
586
587impl ToolCallContent {
588 pub fn from_acp(
589 content: acp::ToolCallContent,
590 language_registry: Arc<LanguageRegistry>,
591 path_style: PathStyle,
592 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
593 cx: &mut App,
594 ) -> Result<Self> {
595 match content {
596 acp::ToolCallContent::Content { content } => Ok(Self::ContentBlock(ContentBlock::new(
597 content,
598 &language_registry,
599 path_style,
600 cx,
601 ))),
602 acp::ToolCallContent::Diff { diff } => Ok(Self::Diff(cx.new(|cx| {
603 Diff::finalized(
604 diff.path.to_string_lossy().into_owned(),
605 diff.old_text,
606 diff.new_text,
607 language_registry,
608 cx,
609 )
610 }))),
611 acp::ToolCallContent::Terminal { terminal_id } => terminals
612 .get(&terminal_id)
613 .cloned()
614 .map(Self::Terminal)
615 .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
616 }
617 }
618
619 pub fn update_from_acp(
620 &mut self,
621 new: acp::ToolCallContent,
622 language_registry: Arc<LanguageRegistry>,
623 path_style: PathStyle,
624 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
625 cx: &mut App,
626 ) -> Result<()> {
627 let needs_update = match (&self, &new) {
628 (Self::Diff(old_diff), acp::ToolCallContent::Diff { diff: new_diff }) => {
629 old_diff.read(cx).needs_update(
630 new_diff.old_text.as_deref().unwrap_or(""),
631 &new_diff.new_text,
632 cx,
633 )
634 }
635 _ => true,
636 };
637
638 if needs_update {
639 *self = Self::from_acp(new, language_registry, path_style, terminals, cx)?;
640 }
641 Ok(())
642 }
643
644 pub fn to_markdown(&self, cx: &App) -> String {
645 match self {
646 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
647 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
648 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
649 }
650 }
651}
652
/// An update targeting an existing tool call, identified by its id.
#[derive(Debug, PartialEq)]
pub enum ToolCallUpdate {
    /// A field update as received over the protocol.
    UpdateFields(acp::ToolCallUpdate),
    /// An update carrying a diff entity.
    UpdateDiff(ToolCallUpdateDiff),
    /// An update carrying a terminal entity.
    UpdateTerminal(ToolCallUpdateTerminal),
}
659
660impl ToolCallUpdate {
661 fn id(&self) -> &acp::ToolCallId {
662 match self {
663 Self::UpdateFields(update) => &update.id,
664 Self::UpdateDiff(diff) => &diff.id,
665 Self::UpdateTerminal(terminal) => &terminal.id,
666 }
667 }
668}
669
670impl From<acp::ToolCallUpdate> for ToolCallUpdate {
671 fn from(update: acp::ToolCallUpdate) -> Self {
672 Self::UpdateFields(update)
673 }
674}
675
676impl From<ToolCallUpdateDiff> for ToolCallUpdate {
677 fn from(diff: ToolCallUpdateDiff) -> Self {
678 Self::UpdateDiff(diff)
679 }
680}
681
/// A tool call update that carries a diff entity.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateDiff {
    /// Id of the tool call being updated.
    pub id: acp::ToolCallId,
    pub diff: Entity<Diff>,
}
687
688impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
689 fn from(terminal: ToolCallUpdateTerminal) -> Self {
690 Self::UpdateTerminal(terminal)
691 }
692}
693
/// A tool call update that carries a terminal entity.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateTerminal {
    /// Id of the tool call being updated.
    pub id: acp::ToolCallId,
    pub terminal: Entity<Terminal>,
}
699
/// The agent's plan: an ordered list of entries. Empty by default.
#[derive(Debug, Default)]
pub struct Plan {
    pub entries: Vec<PlanEntry>,
}
704
/// Summary of a [`Plan`], as computed by [`Plan::stats`].
#[derive(Debug)]
pub struct PlanStats<'a> {
    /// The first entry in the plan whose status is in progress, if any.
    pub in_progress_entry: Option<&'a PlanEntry>,
    /// Number of pending entries.
    pub pending: u32,
    /// Number of completed entries.
    pub completed: u32,
}
711
712impl Plan {
713 pub fn is_empty(&self) -> bool {
714 self.entries.is_empty()
715 }
716
717 pub fn stats(&self) -> PlanStats<'_> {
718 let mut stats = PlanStats {
719 in_progress_entry: None,
720 pending: 0,
721 completed: 0,
722 };
723
724 for entry in &self.entries {
725 match &entry.status {
726 acp::PlanEntryStatus::Pending => {
727 stats.pending += 1;
728 }
729 acp::PlanEntryStatus::InProgress => {
730 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
731 }
732 acp::PlanEntryStatus::Completed => {
733 stats.completed += 1;
734 }
735 }
736 }
737
738 stats
739 }
740}
741
/// A single entry of a [`Plan`].
#[derive(Debug)]
pub struct PlanEntry {
    /// The entry's text, held as a markdown entity.
    pub content: Entity<Markdown>,
    pub priority: acp::PlanEntryPriority,
    pub status: acp::PlanEntryStatus,
}
748
749impl PlanEntry {
750 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
751 Self {
752 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
753 priority: entry.priority,
754 status: entry.status,
755 }
756 }
757}
758
/// Token consumption of a thread relative to its limit.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TokenUsage {
    /// The token limit. `0` is treated as "unknown" by [`TokenUsage::ratio`].
    pub max_tokens: u64,
    /// Number of tokens used so far.
    pub used_tokens: u64,
}
764
765impl TokenUsage {
766 pub fn ratio(&self) -> TokenUsageRatio {
767 #[cfg(debug_assertions)]
768 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
769 .unwrap_or("0.8".to_string())
770 .parse()
771 .unwrap();
772 #[cfg(not(debug_assertions))]
773 let warning_threshold: f32 = 0.8;
774
775 // When the maximum is unknown because there is no selected model,
776 // avoid showing the token limit warning.
777 if self.max_tokens == 0 {
778 TokenUsageRatio::Normal
779 } else if self.used_tokens >= self.max_tokens {
780 TokenUsageRatio::Exceeded
781 } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
782 TokenUsageRatio::Warning
783 } else {
784 TokenUsageRatio::Normal
785 }
786 }
787}
788
/// Classification of token usage produced by [`TokenUsage::ratio`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TokenUsageRatio {
    /// Usage is below the warning threshold, or the limit is unknown.
    Normal,
    /// Usage has reached the warning threshold but not the limit.
    Warning,
    /// Usage has reached or passed the limit.
    Exceeded,
}
795
/// Information about a retry, carried by [`AcpThreadEvent::Retry`].
#[derive(Debug, Clone)]
pub struct RetryStatus {
    /// Message of the error that caused the retry.
    pub last_error: SharedString,
    /// NOTE(review): presumably the number of the current attempt — confirm whether 0- or 1-based.
    pub attempt: usize,
    pub max_attempts: usize,
    pub started_at: Instant,
    /// NOTE(review): presumably the delay before the next attempt — confirm against the producer.
    pub duration: Duration,
}
804
/// A conversation thread with an agent, tracking its entries, plan, token
/// usage, and the terminals created for it.
pub struct AcpThread {
    title: SharedString,
    /// User messages, assistant messages, and tool calls, in order.
    entries: Vec<AgentThreadEntry>,
    plan: Plan,
    project: Entity<Project>,
    action_log: Entity<ActionLog>,
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    /// While this is `Some`, `status()` reports `ThreadStatus::Generating`.
    send_task: Option<Task<()>>,
    connection: Rc<dyn AgentConnection>,
    session_id: acp::SessionId,
    token_usage: Option<TokenUsage>,
    prompt_capabilities: acp::PromptCapabilities,
    /// Task that keeps `prompt_capabilities` in sync with the watch channel
    /// passed to `new`.
    _observe_prompt_capabilities: Task<anyhow::Result<()>>,
    /// Terminals known to this thread, keyed by id.
    terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
    /// Output chunks received for terminals that have not been created yet;
    /// replayed when the matching `Created` event arrives.
    pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
    /// Exit statuses received for terminals that have not been created yet.
    pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
}
822
/// Events emitted by [`AcpThread`].
#[derive(Debug)]
pub enum AcpThreadEvent {
    /// A new entry was pushed onto the thread.
    NewEntry,
    /// The thread's title changed.
    TitleUpdated,
    /// The thread's token usage was replaced.
    TokenUsageUpdated,
    /// The entry at the given index was modified.
    EntryUpdated(usize),
    EntriesRemoved(Range<usize>),
    ToolAuthorizationRequired,
    Retry(RetryStatus),
    Stopped,
    Error,
    LoadError(LoadError),
    /// The prompt capabilities received a new value.
    PromptCapabilitiesUpdated,
    Refusal,
    /// The agent reported a new set of available commands.
    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
    /// The agent reported a new current session mode.
    ModeUpdated(acp::SessionModeId),
}

impl EventEmitter<AcpThreadEvent> for AcpThread {}
842
/// Events about terminals, handled by `AcpThread::on_terminal_provider_event`.
#[derive(Debug, Clone)]
pub enum TerminalProviderEvent {
    /// A terminal was created and should be registered with the thread.
    Created {
        terminal_id: acp::TerminalId,
        label: String,
        cwd: Option<PathBuf>,
        output_byte_limit: Option<u64>,
        terminal: Entity<::terminal::Terminal>,
    },
    /// Output bytes for a terminal; buffered if the terminal is not registered yet.
    Output {
        terminal_id: acp::TerminalId,
        data: Vec<u8>,
    },
    /// A terminal's title changed; ignored if the terminal is not registered.
    TitleChanged {
        terminal_id: acp::TerminalId,
        title: String,
    },
    /// A terminal exited; remembered if the terminal is not registered yet.
    Exit {
        terminal_id: acp::TerminalId,
        status: acp::TerminalExitStatus,
    },
}
865
/// Commands addressed to a terminal, identified by its id.
// NOTE(review): no consumer of these commands is visible in this part of the
// file; the per-variant semantics are inferred from names only.
#[derive(Debug, Clone)]
pub enum TerminalProviderCommand {
    WriteInput {
        terminal_id: acp::TerminalId,
        bytes: Vec<u8>,
    },
    Resize {
        terminal_id: acp::TerminalId,
        cols: u16,
        rows: u16,
    },
    Close {
        terminal_id: acp::TerminalId,
    },
}
881
882impl AcpThread {
883 pub fn on_terminal_provider_event(
884 &mut self,
885 event: TerminalProviderEvent,
886 cx: &mut Context<Self>,
887 ) {
888 match event {
889 TerminalProviderEvent::Created {
890 terminal_id,
891 label,
892 cwd,
893 output_byte_limit,
894 terminal,
895 } => {
896 let entity = self.register_terminal_created(
897 terminal_id.clone(),
898 label,
899 cwd,
900 output_byte_limit,
901 terminal,
902 cx,
903 );
904
905 if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
906 for data in chunks.drain(..) {
907 entity.update(cx, |term, cx| {
908 term.inner().update(cx, |inner, cx| {
909 inner.write_output(&data, cx);
910 })
911 });
912 }
913 }
914
915 if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
916 entity.update(cx, |_term, cx| {
917 cx.notify();
918 });
919 }
920
921 cx.notify();
922 }
923 TerminalProviderEvent::Output { terminal_id, data } => {
924 if let Some(entity) = self.terminals.get(&terminal_id) {
925 entity.update(cx, |term, cx| {
926 term.inner().update(cx, |inner, cx| {
927 inner.write_output(&data, cx);
928 })
929 });
930 } else {
931 self.pending_terminal_output
932 .entry(terminal_id)
933 .or_default()
934 .push(data);
935 }
936 }
937 TerminalProviderEvent::TitleChanged { terminal_id, title } => {
938 if let Some(entity) = self.terminals.get(&terminal_id) {
939 entity.update(cx, |term, cx| {
940 term.inner().update(cx, |inner, cx| {
941 inner.breadcrumb_text = title;
942 cx.emit(::terminal::Event::BreadcrumbsChanged);
943 })
944 });
945 }
946 }
947 TerminalProviderEvent::Exit {
948 terminal_id,
949 status,
950 } => {
951 if let Some(entity) = self.terminals.get(&terminal_id) {
952 entity.update(cx, |_term, cx| {
953 cx.notify();
954 });
955 } else {
956 self.pending_terminal_exit.insert(terminal_id, status);
957 }
958 }
959 }
960 }
961}
962
/// Whether the thread currently has a send task in flight.
#[derive(PartialEq, Eq, Debug)]
pub enum ThreadStatus {
    /// No send task is running.
    Idle,
    /// A send task is running.
    Generating,
}
968
/// Errors carried by [`AcpThreadEvent::LoadError`].
#[derive(Debug, Clone)]
pub enum LoadError {
    /// The version found at `command` is older than the minimum supported one.
    Unsupported {
        command: SharedString,
        current_version: SharedString,
        minimum_version: SharedString,
    },
    /// Installation failed; the payload is the failure message.
    FailedToInstall(SharedString),
    /// The server process exited with the given status.
    Exited {
        status: ExitStatus,
    },
    /// Any other failure, described by the message.
    Other(SharedString),
}
982
983impl Display for LoadError {
984 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
985 match self {
986 LoadError::Unsupported {
987 command: path,
988 current_version,
989 minimum_version,
990 } => {
991 write!(
992 f,
993 "version {current_version} from {path} is not supported (need at least {minimum_version})"
994 )
995 }
996 LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
997 LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
998 LoadError::Other(msg) => write!(f, "{msg}"),
999 }
1000 }
1001}
1002
1003impl Error for LoadError {}
1004
1005impl AcpThread {
1006 pub fn new(
1007 title: impl Into<SharedString>,
1008 connection: Rc<dyn AgentConnection>,
1009 project: Entity<Project>,
1010 action_log: Entity<ActionLog>,
1011 session_id: acp::SessionId,
1012 mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1013 cx: &mut Context<Self>,
1014 ) -> Self {
1015 let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1016 let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1017 loop {
1018 let caps = prompt_capabilities_rx.recv().await?;
1019 this.update(cx, |this, cx| {
1020 this.prompt_capabilities = caps;
1021 cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1022 })?;
1023 }
1024 });
1025
1026 Self {
1027 action_log,
1028 shared_buffers: Default::default(),
1029 entries: Default::default(),
1030 plan: Default::default(),
1031 title: title.into(),
1032 project,
1033 send_task: None,
1034 connection,
1035 session_id,
1036 token_usage: None,
1037 prompt_capabilities,
1038 _observe_prompt_capabilities: task,
1039 terminals: HashMap::default(),
1040 pending_terminal_output: HashMap::default(),
1041 pending_terminal_exit: HashMap::default(),
1042 }
1043 }
1044
    /// Returns a clone of the most recently observed prompt capabilities.
    pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
        self.prompt_capabilities.clone()
    }
1048
    /// Returns the agent connection this thread talks to.
    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
        &self.connection
    }
1052
    /// Returns the action log associated with this thread.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
1056
    /// Returns the project this thread operates on.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }
1060
    /// Returns a clone of the thread's title.
    pub fn title(&self) -> SharedString {
        self.title.clone()
    }
1064
    /// Returns all entries of the thread, in order.
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
1068
    /// Returns the id of the session this thread belongs to.
    pub fn session_id(&self) -> &acp::SessionId {
        &self.session_id
    }
1072
1073 pub fn status(&self) -> ThreadStatus {
1074 if self.send_task.is_some() {
1075 ThreadStatus::Generating
1076 } else {
1077 ThreadStatus::Idle
1078 }
1079 }
1080
    /// Returns the last recorded token usage, if any.
    pub fn token_usage(&self) -> Option<&TokenUsage> {
        self.token_usage.as_ref()
    }
1084
1085 pub fn has_pending_edit_tool_calls(&self) -> bool {
1086 for entry in self.entries.iter().rev() {
1087 match entry {
1088 AgentThreadEntry::UserMessage(_) => return false,
1089 AgentThreadEntry::ToolCall(
1090 call @ ToolCall {
1091 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1092 ..
1093 },
1094 ) if call.diffs().next().is_some() => {
1095 return true;
1096 }
1097 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1098 }
1099 }
1100
1101 false
1102 }
1103
1104 pub fn used_tools_since_last_user_message(&self) -> bool {
1105 for entry in self.entries.iter().rev() {
1106 match entry {
1107 AgentThreadEntry::UserMessage(..) => return false,
1108 AgentThreadEntry::AssistantMessage(..) => continue,
1109 AgentThreadEntry::ToolCall(..) => return true,
1110 }
1111 }
1112
1113 false
1114 }
1115
1116 pub fn handle_session_update(
1117 &mut self,
1118 update: acp::SessionUpdate,
1119 cx: &mut Context<Self>,
1120 ) -> Result<(), acp::Error> {
1121 match update {
1122 acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1123 self.push_user_content_block(None, content, cx);
1124 }
1125 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1126 self.push_assistant_content_block(content, false, cx);
1127 }
1128 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1129 self.push_assistant_content_block(content, true, cx);
1130 }
1131 acp::SessionUpdate::ToolCall(tool_call) => {
1132 self.upsert_tool_call(tool_call, cx)?;
1133 }
1134 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1135 self.update_tool_call(tool_call_update, cx)?;
1136 }
1137 acp::SessionUpdate::Plan(plan) => {
1138 self.update_plan(plan, cx);
1139 }
1140 acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1141 available_commands,
1142 ..
1143 }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1144 acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1145 current_mode_id,
1146 ..
1147 }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1148 }
1149 Ok(())
1150 }
1151
1152 pub fn push_user_content_block(
1153 &mut self,
1154 message_id: Option<UserMessageId>,
1155 chunk: acp::ContentBlock,
1156 cx: &mut Context<Self>,
1157 ) {
1158 let language_registry = self.project.read(cx).languages().clone();
1159 let path_style = self.project.read(cx).path_style(cx);
1160 let entries_len = self.entries.len();
1161
1162 if let Some(last_entry) = self.entries.last_mut()
1163 && let AgentThreadEntry::UserMessage(UserMessage {
1164 id,
1165 content,
1166 chunks,
1167 ..
1168 }) = last_entry
1169 {
1170 *id = message_id.or(id.take());
1171 content.append(chunk.clone(), &language_registry, path_style, cx);
1172 chunks.push(chunk);
1173 let idx = entries_len - 1;
1174 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1175 } else {
1176 let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1177 self.push_entry(
1178 AgentThreadEntry::UserMessage(UserMessage {
1179 id: message_id,
1180 content,
1181 chunks: vec![chunk],
1182 checkpoint: None,
1183 }),
1184 cx,
1185 );
1186 }
1187 }
1188
1189 pub fn push_assistant_content_block(
1190 &mut self,
1191 chunk: acp::ContentBlock,
1192 is_thought: bool,
1193 cx: &mut Context<Self>,
1194 ) {
1195 let language_registry = self.project.read(cx).languages().clone();
1196 let path_style = self.project.read(cx).path_style(cx);
1197 let entries_len = self.entries.len();
1198 if let Some(last_entry) = self.entries.last_mut()
1199 && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
1200 {
1201 let idx = entries_len - 1;
1202 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1203 match (chunks.last_mut(), is_thought) {
1204 (Some(AssistantMessageChunk::Message { block }), false)
1205 | (Some(AssistantMessageChunk::Thought { block }), true) => {
1206 block.append(chunk, &language_registry, path_style, cx)
1207 }
1208 _ => {
1209 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1210 if is_thought {
1211 chunks.push(AssistantMessageChunk::Thought { block })
1212 } else {
1213 chunks.push(AssistantMessageChunk::Message { block })
1214 }
1215 }
1216 }
1217 } else {
1218 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1219 let chunk = if is_thought {
1220 AssistantMessageChunk::Thought { block }
1221 } else {
1222 AssistantMessageChunk::Message { block }
1223 };
1224
1225 self.push_entry(
1226 AgentThreadEntry::AssistantMessage(AssistantMessage {
1227 chunks: vec![chunk],
1228 }),
1229 cx,
1230 );
1231 }
1232 }
1233
    /// Appends `entry` to the end of the thread and emits `AcpThreadEvent::NewEntry`.
    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
1238
1239 pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1240 self.connection.set_title(&self.session_id, cx).is_some()
1241 }
1242
1243 pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1244 if title != self.title {
1245 self.title = title.clone();
1246 cx.emit(AcpThreadEvent::TitleUpdated);
1247 if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1248 return set_title.run(title, cx);
1249 }
1250 }
1251 Task::ready(Ok(()))
1252 }
1253
    /// Replaces the stored token usage (`None` clears it) and emits
    /// `AcpThreadEvent::TokenUsageUpdated`.
    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
        self.token_usage = usage;
        cx.emit(AcpThreadEvent::TokenUsageUpdated);
    }
1258
    /// Emits `AcpThreadEvent::Retry` carrying `status`; no thread state is modified here.
    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::Retry(status));
    }
1262
1263 pub fn update_tool_call(
1264 &mut self,
1265 update: impl Into<ToolCallUpdate>,
1266 cx: &mut Context<Self>,
1267 ) -> Result<()> {
1268 let update = update.into();
1269 let languages = self.project.read(cx).languages().clone();
1270 let path_style = self.project.read(cx).path_style(cx);
1271
1272 let ix = match self.index_for_tool_call(update.id()) {
1273 Some(ix) => ix,
1274 None => {
1275 // Tool call not found - create a failed tool call entry
1276 let failed_tool_call = ToolCall {
1277 id: update.id().clone(),
1278 label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1279 kind: acp::ToolKind::Fetch,
1280 content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1281 acp::ContentBlock::Text(acp::TextContent {
1282 text: "Tool call not found".to_string(),
1283 annotations: None,
1284 meta: None,
1285 }),
1286 &languages,
1287 path_style,
1288 cx,
1289 ))],
1290 status: ToolCallStatus::Failed,
1291 locations: Vec::new(),
1292 resolved_locations: Vec::new(),
1293 raw_input: None,
1294 raw_output: None,
1295 };
1296 self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1297 return Ok(());
1298 }
1299 };
1300 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1301 unreachable!()
1302 };
1303
1304 match update {
1305 ToolCallUpdate::UpdateFields(update) => {
1306 let location_updated = update.fields.locations.is_some();
1307 call.update_fields(update.fields, languages, path_style, &self.terminals, cx)?;
1308 if location_updated {
1309 self.resolve_locations(update.id, cx);
1310 }
1311 }
1312 ToolCallUpdate::UpdateDiff(update) => {
1313 call.content.clear();
1314 call.content.push(ToolCallContent::Diff(update.diff));
1315 }
1316 ToolCallUpdate::UpdateTerminal(update) => {
1317 call.content.clear();
1318 call.content
1319 .push(ToolCallContent::Terminal(update.terminal));
1320 }
1321 }
1322
1323 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1324
1325 Ok(())
1326 }
1327
1328 /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1329 pub fn upsert_tool_call(
1330 &mut self,
1331 tool_call: acp::ToolCall,
1332 cx: &mut Context<Self>,
1333 ) -> Result<(), acp::Error> {
1334 let status = tool_call.status.into();
1335 self.upsert_tool_call_inner(tool_call.into(), status, cx)
1336 }
1337
1338 /// Fails if id does not match an existing entry.
1339 pub fn upsert_tool_call_inner(
1340 &mut self,
1341 update: acp::ToolCallUpdate,
1342 status: ToolCallStatus,
1343 cx: &mut Context<Self>,
1344 ) -> Result<(), acp::Error> {
1345 let language_registry = self.project.read(cx).languages().clone();
1346 let path_style = self.project.read(cx).path_style(cx);
1347 let id = update.id.clone();
1348
1349 if let Some(ix) = self.index_for_tool_call(&id) {
1350 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1351 unreachable!()
1352 };
1353
1354 call.update_fields(
1355 update.fields,
1356 language_registry,
1357 path_style,
1358 &self.terminals,
1359 cx,
1360 )?;
1361 call.status = status;
1362
1363 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1364 } else {
1365 let call = ToolCall::from_acp(
1366 update.try_into()?,
1367 status,
1368 language_registry,
1369 self.project.read(cx).path_style(cx),
1370 &self.terminals,
1371 cx,
1372 )?;
1373 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1374 };
1375
1376 self.resolve_locations(id, cx);
1377 Ok(())
1378 }
1379
1380 fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1381 self.entries
1382 .iter()
1383 .enumerate()
1384 .rev()
1385 .find_map(|(index, entry)| {
1386 if let AgentThreadEntry::ToolCall(tool_call) = entry
1387 && &tool_call.id == id
1388 {
1389 Some(index)
1390 } else {
1391 None
1392 }
1393 })
1394 }
1395
1396 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1397 // The tool call we are looking for is typically the last one, or very close to the end.
1398 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1399 self.entries
1400 .iter_mut()
1401 .enumerate()
1402 .rev()
1403 .find_map(|(index, tool_call)| {
1404 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1405 && &tool_call.id == id
1406 {
1407 Some((index, tool_call))
1408 } else {
1409 None
1410 }
1411 })
1412 }
1413
1414 pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1415 self.entries
1416 .iter()
1417 .enumerate()
1418 .rev()
1419 .find_map(|(index, tool_call)| {
1420 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1421 && &tool_call.id == id
1422 {
1423 Some((index, tool_call))
1424 } else {
1425 None
1426 }
1427 })
1428 }
1429
    /// Resolves the locations of the tool call identified by `id` in a detached
    /// background task and stores the result on the tool call.
    ///
    /// Does nothing if no tool call with that id exists. Once resolution finishes:
    /// - a snapshot of every resolved buffer is recorded in `shared_buffers`;
    /// - the project's agent location is moved to the last resolved location, unless the
    ///   agent is already in the same buffer on the same row at a later column;
    /// - `EntryUpdated` is emitted if the resolved locations differ from the stored ones.
    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
        let project = self.project.clone();
        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
            return;
        };
        let task = tool_call.resolve_locations(project, cx);
        cx.spawn(async move |this, cx| {
            let resolved_locations = task.await;

            this.update(cx, |this, cx| {
                let project = this.project.clone();

                // Remember a snapshot of every buffer the tool call pointed at.
                for location in resolved_locations.iter().flatten() {
                    this.shared_buffers
                        .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
                }
                // The tool call may have been removed while locations were being resolved.
                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
                    return;
                };

                if let Some(Some(location)) = resolved_locations.last() {
                    project.update(cx, |project, cx| {
                        let should_ignore = if let Some(agent_location) = project
                            .agent_location()
                            .filter(|agent_location| agent_location.buffer == location.buffer)
                        {
                            let snapshot = location.buffer.read(cx).snapshot();
                            let old_position = agent_location.position.to_point(&snapshot);
                            let new_position = location.position.to_point(&snapshot);

                            // ignore this so that when we get updates from the edit tool
                            // the position doesn't reset to the start of the line
                            old_position.row == new_position.row
                                && old_position.column > new_position.column
                        } else {
                            false
                        };
                        if !should_ignore {
                            project.set_agent_location(Some(location.into()), cx);
                        }
                    });
                }

                let resolved_locations = resolved_locations
                    .iter()
                    .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
                    .collect::<Vec<_>>();

                // Only notify observers when something actually changed.
                if tool_call.resolved_locations != resolved_locations {
                    tool_call.resolved_locations = resolved_locations;
                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
                }
            })
        })
        .detach();
    }
1486
1487 pub fn request_tool_call_authorization(
1488 &mut self,
1489 tool_call: acp::ToolCallUpdate,
1490 options: Vec<acp::PermissionOption>,
1491 respect_always_allow_setting: bool,
1492 cx: &mut Context<Self>,
1493 ) -> Result<BoxFuture<'static, acp::RequestPermissionOutcome>> {
1494 let (tx, rx) = oneshot::channel();
1495
1496 if respect_always_allow_setting && AgentSettings::get_global(cx).always_allow_tool_actions {
1497 // Don't use AllowAlways, because then if you were to turn off always_allow_tool_actions,
1498 // some tools would (incorrectly) continue to auto-accept.
1499 if let Some(allow_once_option) = options.iter().find_map(|option| {
1500 if matches!(option.kind, acp::PermissionOptionKind::AllowOnce) {
1501 Some(option.id.clone())
1502 } else {
1503 None
1504 }
1505 }) {
1506 self.upsert_tool_call_inner(tool_call, ToolCallStatus::Pending, cx)?;
1507 return Ok(async {
1508 acp::RequestPermissionOutcome::Selected {
1509 option_id: allow_once_option,
1510 }
1511 }
1512 .boxed());
1513 }
1514 }
1515
1516 let status = ToolCallStatus::WaitingForConfirmation {
1517 options,
1518 respond_tx: tx,
1519 };
1520
1521 self.upsert_tool_call_inner(tool_call, status, cx)?;
1522 cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1523
1524 let fut = async {
1525 match rx.await {
1526 Ok(option) => acp::RequestPermissionOutcome::Selected { option_id: option },
1527 Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1528 }
1529 }
1530 .boxed();
1531
1532 Ok(fut)
1533 }
1534
1535 pub fn authorize_tool_call(
1536 &mut self,
1537 id: acp::ToolCallId,
1538 option_id: acp::PermissionOptionId,
1539 option_kind: acp::PermissionOptionKind,
1540 cx: &mut Context<Self>,
1541 ) {
1542 let Some((ix, call)) = self.tool_call_mut(&id) else {
1543 return;
1544 };
1545
1546 let new_status = match option_kind {
1547 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1548 ToolCallStatus::Rejected
1549 }
1550 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1551 ToolCallStatus::InProgress
1552 }
1553 };
1554
1555 let curr_status = mem::replace(&mut call.status, new_status);
1556
1557 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1558 respond_tx.send(option_id).log_err();
1559 } else if cfg!(debug_assertions) {
1560 panic!("tried to authorize an already authorized tool call");
1561 }
1562
1563 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1564 }
1565
1566 pub fn first_tool_awaiting_confirmation(&self) -> Option<&ToolCall> {
1567 let mut first_tool_call = None;
1568
1569 for entry in self.entries.iter().rev() {
1570 match &entry {
1571 AgentThreadEntry::ToolCall(call) => {
1572 if let ToolCallStatus::WaitingForConfirmation { .. } = call.status {
1573 first_tool_call = Some(call);
1574 } else {
1575 continue;
1576 }
1577 }
1578 AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1579 // Reached the beginning of the turn.
1580 // If we had pending permission requests in the previous turn, they have been cancelled.
1581 break;
1582 }
1583 }
1584 }
1585
1586 first_tool_call
1587 }
1588
    /// Returns the thread's current plan.
    pub fn plan(&self) -> &Plan {
        &self.plan
    }
1592
1593 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1594 let new_entries_len = request.entries.len();
1595 let mut new_entries = request.entries.into_iter();
1596
1597 // Reuse existing markdown to prevent flickering
1598 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1599 let PlanEntry {
1600 content,
1601 priority,
1602 status,
1603 } = old;
1604 content.update(cx, |old, cx| {
1605 old.replace(new.content, cx);
1606 });
1607 *priority = new.priority;
1608 *status = new.status;
1609 }
1610 for new in new_entries {
1611 self.plan.entries.push(PlanEntry::from_acp(new, cx))
1612 }
1613 self.plan.entries.truncate(new_entries_len);
1614
1615 cx.notify();
1616 }
1617
1618 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1619 self.plan
1620 .entries
1621 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1622 cx.notify();
1623 }
1624
1625 #[cfg(any(test, feature = "test-support"))]
1626 pub fn send_raw(
1627 &mut self,
1628 message: &str,
1629 cx: &mut Context<Self>,
1630 ) -> BoxFuture<'static, Result<()>> {
1631 self.send(
1632 vec![acp::ContentBlock::Text(acp::TextContent {
1633 text: message.to_string(),
1634 annotations: None,
1635 meta: None,
1636 })],
1637 cx,
1638 )
1639 }
1640
    /// Sends `message` to the agent as a new user prompt and starts a turn.
    ///
    /// Inside the turn, in order: a user message entry is pushed, a git checkpoint is
    /// requested (a failure is logged and leaves the message without a checkpoint), the
    /// checkpoint is attached — hidden — to the last user message, and the prompt is
    /// forwarded to the connection. The returned future completes when the turn ends
    /// (see `run_turn`).
    pub fn send(
        &mut self,
        message: Vec<acp::ContentBlock>,
        cx: &mut Context<Self>,
    ) -> BoxFuture<'static, Result<()>> {
        // Rendered form of the whole message, shown in the user message entry.
        let block = ContentBlock::new_combined(
            message.clone(),
            self.project.read(cx).languages().clone(),
            self.project.read(cx).path_style(cx),
            cx,
        );
        let request = acp::PromptRequest {
            prompt: message.clone(),
            session_id: self.session_id.clone(),
            meta: None,
        };
        let git_store = self.project.read(cx).git_store().clone();

        // A message id is only assigned when the connection offers a truncate handle
        // for this session.
        let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
            Some(UserMessageId::new())
        } else {
            None
        };

        self.run_turn(cx, async move |this, cx| {
            this.update(cx, |this, cx| {
                this.push_entry(
                    AgentThreadEntry::UserMessage(UserMessage {
                        id: message_id.clone(),
                        content: block,
                        chunks: message,
                        checkpoint: None,
                    }),
                    cx,
                );
            })
            .ok();

            // Capture the state of the working tree before the agent starts working.
            let old_checkpoint = git_store
                .update(cx, |git, cx| git.checkpoint(cx))?
                .await
                .context("failed to get old checkpoint")
                .log_err();
            this.update(cx, |this, cx| {
                if let Some((_ix, message)) = this.last_user_message() {
                    // Stored hidden here; `update_last_checkpoint` later sets `show`
                    // based on a comparison with a newer checkpoint.
                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
                        git_checkpoint,
                        show: false,
                    });
                }
                this.connection.prompt(message_id, request, cx)
            })?
            .await
        })
    }
1696
1697 pub fn can_resume(&self, cx: &App) -> bool {
1698 self.connection.resume(&self.session_id, cx).is_some()
1699 }
1700
    /// Resumes the session as a new turn.
    ///
    /// # Errors
    /// The returned future fails with "resuming a session is not supported" when the
    /// connection provides no resume handle for this session.
    pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
        self.run_turn(cx, async move |this, cx| {
            this.update(cx, |this, cx| {
                this.connection
                    .resume(&this.session_id, cx)
                    .map(|resume| resume.run(cx))
            })?
            // `None` here means the connection has no resume handle.
            .context("resuming a session is not supported")?
            .await
        })
    }
1712
    /// Runs one agent turn driven by `f` and returns a future that completes when the
    /// turn has been fully processed.
    ///
    /// Completed plan entries are cleared and any turn already in flight is cancelled
    /// first; `f` only starts once that cancellation has finished. After `f` produces a
    /// response, the last user message's checkpoint is refreshed, the project's agent
    /// location is cleared, and the response is translated into events:
    /// - an error emits `Error` and is returned to the caller;
    /// - a refusal emits `Refusal`, truncating the thread back to before the last user
    ///   message unless a completed tool call with output followed that message;
    /// - every non-error outcome ends with `Stopped`.
    fn run_turn(
        &mut self,
        cx: &mut Context<Self>,
        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
    ) -> BoxFuture<'static, Result<()>> {
        self.clear_completed_plan_entries(cx);

        let (tx, rx) = oneshot::channel();
        let cancel_task = self.cancel(cx);

        // The send task owns the sender: if it is dropped before finishing, `rx`
        // resolves with a cancellation error instead of a response.
        self.send_task = Some(cx.spawn(async move |this, cx| {
            cancel_task.await;
            tx.send(f(this, cx).await).ok();
        }));

        cx.spawn(async move |this, cx| {
            let response = rx.await;

            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
                .await?;

            this.update(cx, |this, cx| {
                this.project
                    .update(cx, |project, cx| project.set_agent_location(None, cx));
                match response {
                    Ok(Err(e)) => {
                        this.send_task.take();
                        cx.emit(AcpThreadEvent::Error);
                        Err(e)
                    }
                    result => {
                        // Note: this pattern only matches a cancelled response whose
                        // `meta` is `None`.
                        let canceled = matches!(
                            result,
                            Ok(Ok(acp::PromptResponse {
                                stop_reason: acp::StopReason::Cancelled,
                                meta: None,
                            }))
                        );

                        // We only take the task if the current prompt wasn't canceled.
                        //
                        // This prompt may have been canceled because another one was sent
                        // while it was still generating. In these cases, dropping `send_task`
                        // would cause the next generation to be canceled.
                        if !canceled {
                            this.send_task.take();
                        }

                        // Handle refusal - distinguish between user prompt and tool call refusals
                        if let Ok(Ok(acp::PromptResponse {
                            stop_reason: acp::StopReason::Refusal,
                            meta: _,
                        })) = result
                        {
                            if let Some((user_msg_ix, _)) = this.last_user_message() {
                                // Check if there's a completed tool call with results after the last user message
                                // This indicates the refusal is in response to tool output, not the user's prompt
                                let has_completed_tool_call_after_user_msg =
                                    this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
                                        if let AgentThreadEntry::ToolCall(tool_call) = entry {
                                            // Check if the tool call has completed and has output
                                            matches!(tool_call.status, ToolCallStatus::Completed)
                                                && tool_call.raw_output.is_some()
                                        } else {
                                            false
                                        }
                                    });

                                if has_completed_tool_call_after_user_msg {
                                    // Refusal is due to tool output - don't truncate, just notify
                                    // The model refused based on what the tool returned
                                    cx.emit(AcpThreadEvent::Refusal);
                                } else {
                                    // User prompt was refused - truncate back to before the user message
                                    let range = user_msg_ix..this.entries.len();
                                    if range.start < range.end {
                                        this.entries.truncate(user_msg_ix);
                                        cx.emit(AcpThreadEvent::EntriesRemoved(range));
                                    }
                                    cx.emit(AcpThreadEvent::Refusal);
                                }
                            } else {
                                // No user message found, treat as general refusal
                                cx.emit(AcpThreadEvent::Refusal);
                            }
                        }

                        cx.emit(AcpThreadEvent::Stopped);
                        Ok(())
                    }
                }
            })?
        })
        .boxed()
    }
1808
1809 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1810 let Some(send_task) = self.send_task.take() else {
1811 return Task::ready(());
1812 };
1813
1814 for entry in self.entries.iter_mut() {
1815 if let AgentThreadEntry::ToolCall(call) = entry {
1816 let cancel = matches!(
1817 call.status,
1818 ToolCallStatus::Pending
1819 | ToolCallStatus::WaitingForConfirmation { .. }
1820 | ToolCallStatus::InProgress
1821 );
1822
1823 if cancel {
1824 call.status = ToolCallStatus::Canceled;
1825 }
1826 }
1827 }
1828
1829 self.connection.cancel(&self.session_id, cx);
1830
1831 // Wait for the send task to complete
1832 cx.foreground_executor().spawn(send_task)
1833 }
1834
1835 /// Restores the git working tree to the state at the given checkpoint (if one exists)
1836 pub fn restore_checkpoint(
1837 &mut self,
1838 id: UserMessageId,
1839 cx: &mut Context<Self>,
1840 ) -> Task<Result<()>> {
1841 let Some((_, message)) = self.user_message_mut(&id) else {
1842 return Task::ready(Err(anyhow!("message not found")));
1843 };
1844
1845 let checkpoint = message
1846 .checkpoint
1847 .as_ref()
1848 .map(|c| c.git_checkpoint.clone());
1849 let rewind = self.rewind(id.clone(), cx);
1850 let git_store = self.project.read(cx).git_store().clone();
1851
1852 cx.spawn(async move |_, cx| {
1853 rewind.await?;
1854 if let Some(checkpoint) = checkpoint {
1855 git_store
1856 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1857 .await?;
1858 }
1859
1860 Ok(())
1861 })
1862 }
1863
    /// Rewinds this thread to before the user message identified by `id`, removing that
    /// message and all subsequent entries while rejecting all edits tracked by the
    /// action log. Unlike `restore_checkpoint`, this method does not restore from git.
    ///
    /// # Errors
    /// Resolves to "not supported" when the connection provides no truncate handle for
    /// this session, and propagates a failure of the truncation itself.
    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
        let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
            return Task::ready(Err(anyhow!("not supported")));
        };

        cx.spawn(async move |this, cx| {
            // Truncate on the connection side first; local entries are only removed
            // once that succeeded.
            cx.update(|cx| truncate.run(id.clone(), cx))?.await?;
            this.update(cx, |this, cx| {
                if let Some((ix, _)) = this.user_message_mut(&id) {
                    let range = ix..this.entries.len();
                    this.entries.truncate(ix);
                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
                }
                this.action_log()
                    .update(cx, |action_log, cx| action_log.reject_all_edits(cx))
            })?
            .await;
            Ok(())
        })
    }
1887
    /// Decides whether the last user message's checkpoint should be shown.
    ///
    /// Takes a fresh git checkpoint and compares it with the one stored on the last user
    /// message; the stored checkpoint's `show` flag is set when the two differ, and
    /// `EntryUpdated` is emitted for that message. Resolves immediately when there is no
    /// user message or it has no checkpoint. A failure to take the new checkpoint is
    /// logged and leaves the flag untouched.
    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
        let git_store = self.project.read(cx).git_store().clone();

        let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
            if let Some(checkpoint) = message.checkpoint.as_ref() {
                checkpoint.git_checkpoint.clone()
            } else {
                return Task::ready(Ok(()));
            }
        } else {
            return Task::ready(Ok(()));
        };

        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
        cx.spawn(async move |this, cx| {
            let new_checkpoint = new_checkpoint
                .await
                .context("failed to get new checkpoint")
                .log_err();
            if let Some(new_checkpoint) = new_checkpoint {
                let equal = git_store
                    .update(cx, |git, cx| {
                        git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
                    })?
                    .await
                    // If the comparison itself fails, treat the checkpoints as equal
                    // so the checkpoint stays hidden.
                    .unwrap_or(true);
                this.update(cx, |this, cx| {
                    let (ix, message) = this.last_user_message().context("no user message")?;
                    let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
                    checkpoint.show = !equal;
                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
                    anyhow::Ok(())
                })??;
            }

            Ok(())
        })
    }
1926
1927 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1928 self.entries
1929 .iter_mut()
1930 .enumerate()
1931 .rev()
1932 .find_map(|(ix, entry)| {
1933 if let AgentThreadEntry::UserMessage(message) = entry {
1934 Some((ix, message))
1935 } else {
1936 None
1937 }
1938 })
1939 }
1940
1941 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1942 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1943 if let AgentThreadEntry::UserMessage(message) = entry {
1944 if message.id.as_ref() == Some(id) {
1945 Some((ix, message))
1946 } else {
1947 None
1948 }
1949 } else {
1950 None
1951 }
1952 })
1953 }
1954
    /// Reads text from the file at the absolute `path` on behalf of the agent.
    ///
    /// - `line`: 1-based first line to read; `None` (or `0`) starts at the first line.
    /// - `limit`: maximum number of lines to read; `None` means no limit.
    /// - `reuse_shared_snapshot`: when set and a snapshot of the buffer is already stored
    ///   in `shared_buffers`, that snapshot is read instead of the buffer's current
    ///   contents. Otherwise the read is reported to the action log and the current
    ///   snapshot is stored in `shared_buffers`.
    ///
    /// The project's agent location is moved to the start of the read range.
    ///
    /// # Errors
    /// - resource-not-found when `path` does not map to a project path;
    /// - invalid-params when the requested start line lies beyond the end of the file;
    /// - failures from opening the buffer or updating entities are propagated.
    pub fn read_text_file(
        &self,
        path: PathBuf,
        line: Option<u32>,
        limit: Option<u32>,
        reuse_shared_snapshot: bool,
        cx: &mut Context<Self>,
    ) -> Task<Result<String, acp::Error>> {
        // Args are 1-based, move to 0-based
        let line = line.unwrap_or_default().saturating_sub(1);
        let limit = limit.unwrap_or(u32::MAX);
        let project = self.project.clone();
        let action_log = self.action_log.clone();
        cx.spawn(async move |this, cx| {
            let load = project
                .update(cx, |project, cx| {
                    let path = project
                        .project_path_for_absolute_path(&path, cx)
                        .ok_or_else(|| {
                            acp::Error::resource_not_found(Some(path.display().to_string()))
                        })?;
                    Ok(project.open_buffer(path, cx))
                })
                .map_err(|e| acp::Error::internal_error().with_data(e.to_string()))
                .flatten()?;

            let buffer = load.await?;

            // Look up a previously shared snapshot only when the caller asked for it.
            let snapshot = if reuse_shared_snapshot {
                this.read_with(cx, |this, _| {
                    this.shared_buffers.get(&buffer.clone()).cloned()
                })
                .log_err()
                .flatten()
            } else {
                None
            };

            let snapshot = if let Some(snapshot) = snapshot {
                snapshot
            } else {
                // Fresh read: record it in the action log and remember the snapshot
                // that was handed out.
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_read(buffer.clone(), cx);
                })?;

                let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot())?;
                this.update(cx, |this, _| {
                    this.shared_buffers.insert(buffer.clone(), snapshot.clone());
                })?;
                snapshot
            };

            let max_point = snapshot.max_point();
            let start_position = Point::new(line, 0);

            if start_position > max_point {
                // The message reports the file's last position (row shown 1-based).
                return Err(acp::Error::invalid_params().with_data(format!(
                    "Attempting to read beyond the end of the file, line {}:{}",
                    max_point.row + 1,
                    max_point.column
                )));
            }

            let start = snapshot.anchor_before(start_position);
            // `saturating_add` keeps the default `u32::MAX` limit from overflowing.
            // NOTE(review): presumably an end point past the last line is clamped to the
            // end of the buffer by the snapshot — confirm.
            let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));

            project.update(cx, |project, cx| {
                project.set_agent_location(
                    Some(AgentLocation {
                        buffer: buffer.downgrade(),
                        position: start,
                    }),
                    cx,
                );
            })?;

            Ok(snapshot.text_for_range(start..end).collect::<String>())
        })
    }
2034
    /// Replaces the contents of the file at the absolute `path` with `content` on behalf
    /// of the agent and saves it.
    ///
    /// The edits are computed as a text diff against the snapshot stored in
    /// `shared_buffers` for this buffer, falling back to the buffer's current contents
    /// when none is stored. The project's agent location is moved to the end of the last
    /// edit, the read and the edit are reported to the action log, and — unless the
    /// buffer's `format_on_save` setting is `Off` — the buffer is formatted before it is
    /// saved (a formatting failure is logged, not returned).
    ///
    /// # Errors
    /// Fails with "invalid path" when `path` does not map to a project path, and
    /// propagates failures from opening or saving the buffer.
    pub fn write_text_file(
        &self,
        path: PathBuf,
        content: String,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let project = self.project.clone();
        let action_log = self.action_log.clone();
        cx.spawn(async move |this, cx| {
            let load = project.update(cx, |project, cx| {
                let path = project
                    .project_path_for_absolute_path(&path, cx)
                    .context("invalid path")?;
                anyhow::Ok(project.open_buffer(path, cx))
            });
            let buffer = load??.await?;
            // Diff against the snapshot recorded for this buffer when there is one.
            let snapshot = this.update(cx, |this, cx| {
                this.shared_buffers
                    .get(&buffer)
                    .cloned()
                    .unwrap_or_else(|| buffer.read(cx).snapshot())
            })?;
            // The diff is computed on the background executor.
            let edits = cx
                .background_executor()
                .spawn(async move {
                    let old_text = snapshot.text();
                    text_diff(old_text.as_str(), &content)
                        .into_iter()
                        .map(|(range, replacement)| {
                            (
                                snapshot.anchor_after(range.start)
                                    ..snapshot.anchor_before(range.end),
                                replacement,
                            )
                        })
                        .collect::<Vec<_>>()
                })
                .await;

            // Point the agent location at the end of the last edit, or at the start of
            // the buffer when nothing changed.
            project.update(cx, |project, cx| {
                project.set_agent_location(
                    Some(AgentLocation {
                        buffer: buffer.downgrade(),
                        position: edits
                            .last()
                            .map(|(range, _)| range.end)
                            .unwrap_or(Anchor::MIN),
                    }),
                    cx,
                );
            })?;

            let format_on_save = cx.update(|cx| {
                // Report a read before the edit and the edit itself afterwards.
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_read(buffer.clone(), cx);
                });

                let format_on_save = buffer.update(cx, |buffer, cx| {
                    buffer.edit(edits, None, cx);

                    let settings = language::language_settings::language_settings(
                        buffer.language().map(|l| l.name()),
                        buffer.file(),
                        cx,
                    );

                    settings.format_on_save != FormatOnSave::Off
                });
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_edited(buffer.clone(), cx);
                });
                format_on_save
            })?;

            if format_on_save {
                let format_task = project.update(cx, |project, cx| {
                    project.format(
                        HashSet::from_iter([buffer.clone()]),
                        LspFormatTarget::Buffers,
                        false,
                        FormatTrigger::Save,
                        cx,
                    )
                })?;
                format_task.await.log_err();

                // Formatting may have changed the buffer again; report that too.
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_edited(buffer.clone(), cx);
                })?;
            }

            project
                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
                .await
        })
    }
2131
    /// Spawns `command` with `args` in a new terminal and registers it on this thread
    /// under a freshly generated terminal id.
    ///
    /// - `extra_env`: variables applied on top of the directory environment of `cwd`
    ///   (an empty environment is used when `cwd` is `None` or no environment is
    ///   available); `PAGER` is set to an empty string before they are applied.
    /// - `cwd`: working directory for the command.
    /// - `output_byte_limit`: forwarded to the created `Terminal`.
    ///
    /// The command is wrapped by a shell builder using the remote client's default
    /// shell when one is available, otherwise the local default shell (preferring
    /// bash), with stdin redirected to `/dev/null`.
    ///
    /// # Errors
    /// Propagates failures from creating the terminal or updating entities.
    pub fn create_terminal(
        &self,
        command: String,
        args: Vec<String>,
        extra_env: Vec<acp::EnvVariable>,
        cwd: Option<PathBuf>,
        output_byte_limit: Option<u64>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Terminal>>> {
        let env = match &cwd {
            Some(dir) => self.project.update(cx, |project, cx| {
                project.environment().update(cx, |env, cx| {
                    env.directory_environment(dir.as_path().into(), cx)
                })
            }),
            None => Task::ready(None).shared(),
        };
        let env = cx.spawn(async move |_, _| {
            let mut env = env.await.unwrap_or_default();
            // Disables paging for `git` and hopefully other commands
            env.insert("PAGER".into(), "".into());
            // Caller-provided variables are inserted last, so they win on conflicts.
            for var in extra_env {
                env.insert(var.name, var.value);
            }
            env
        });

        let project = self.project.clone();
        let language_registry = project.read(cx).languages().clone();
        let is_windows = project.read(cx).path_style(cx).is_windows();

        let terminal_id = acp::TerminalId(Uuid::new_v4().to_string().into());
        let terminal_task = cx.spawn({
            let terminal_id = terminal_id.clone();
            async move |_this, cx| {
                let env = env.await;
                let shell = project
                    .update(cx, |project, cx| {
                        project
                            .remote_client()
                            .and_then(|r| r.read(cx).default_system_shell())
                    })?
                    .unwrap_or_else(|| get_default_system_shell_preferring_bash());
                let (task_command, task_args) =
                    ShellBuilder::new(&Shell::Program(shell), is_windows)
                        .redirect_stdin_to_dev_null()
                        .build(Some(command.clone()), &args);
                let terminal = project
                    .update(cx, |project, cx| {
                        project.create_terminal_task(
                            task::SpawnInTerminal {
                                command: Some(task_command),
                                args: task_args,
                                cwd: cwd.clone(),
                                env,
                                ..Default::default()
                            },
                            cx,
                        )
                    })?
                    .await?;

                cx.new(|cx| {
                    Terminal::new(
                        terminal_id,
                        // The label is the original command line, not the shell-wrapped one.
                        &format!("{} {}", command, args.join(" ")),
                        cwd,
                        output_byte_limit.map(|l| l as usize),
                        terminal,
                        language_registry,
                        cx,
                    )
                })
            }
        });

        // Register the terminal on the thread once it exists.
        cx.spawn(async move |this, cx| {
            let terminal = terminal_task.await?;
            this.update(cx, |this, _cx| {
                this.terminals.insert(terminal_id, terminal.clone());
                terminal
            })
        })
    }
2216
2217 pub fn kill_terminal(
2218 &mut self,
2219 terminal_id: acp::TerminalId,
2220 cx: &mut Context<Self>,
2221 ) -> Result<()> {
2222 self.terminals
2223 .get(&terminal_id)
2224 .context("Terminal not found")?
2225 .update(cx, |terminal, cx| {
2226 terminal.kill(cx);
2227 });
2228
2229 Ok(())
2230 }
2231
2232 pub fn release_terminal(
2233 &mut self,
2234 terminal_id: acp::TerminalId,
2235 cx: &mut Context<Self>,
2236 ) -> Result<()> {
2237 self.terminals
2238 .remove(&terminal_id)
2239 .context("Terminal not found")?
2240 .update(cx, |terminal, cx| {
2241 terminal.kill(cx);
2242 });
2243
2244 Ok(())
2245 }
2246
2247 pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2248 self.terminals
2249 .get(&terminal_id)
2250 .context("Terminal not found")
2251 .cloned()
2252 }
2253
2254 pub fn to_markdown(&self, cx: &App) -> String {
2255 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2256 }
2257
    /// Emits `AcpThreadEvent::LoadError` carrying `error`; no thread state is modified here.
    pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::LoadError(error));
    }
2261
2262 pub fn register_terminal_created(
2263 &mut self,
2264 terminal_id: acp::TerminalId,
2265 command_label: String,
2266 working_dir: Option<PathBuf>,
2267 output_byte_limit: Option<u64>,
2268 terminal: Entity<::terminal::Terminal>,
2269 cx: &mut Context<Self>,
2270 ) -> Entity<Terminal> {
2271 let language_registry = self.project.read(cx).languages().clone();
2272
2273 let entity = cx.new(|cx| {
2274 Terminal::new(
2275 terminal_id.clone(),
2276 &command_label,
2277 working_dir.clone(),
2278 output_byte_limit.map(|l| l as usize),
2279 terminal,
2280 language_registry,
2281 cx,
2282 )
2283 });
2284 self.terminals.insert(terminal_id.clone(), entity.clone());
2285 entity
2286 }
2287}
2288
2289fn markdown_for_raw_output(
2290 raw_output: &serde_json::Value,
2291 language_registry: &Arc<LanguageRegistry>,
2292 cx: &mut App,
2293) -> Option<Entity<Markdown>> {
2294 match raw_output {
2295 serde_json::Value::Null => None,
2296 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2297 Markdown::new(
2298 value.to_string().into(),
2299 Some(language_registry.clone()),
2300 None,
2301 cx,
2302 )
2303 })),
2304 serde_json::Value::Number(value) => Some(cx.new(|cx| {
2305 Markdown::new(
2306 value.to_string().into(),
2307 Some(language_registry.clone()),
2308 None,
2309 cx,
2310 )
2311 })),
2312 serde_json::Value::String(value) => Some(cx.new(|cx| {
2313 Markdown::new(
2314 value.clone().into(),
2315 Some(language_registry.clone()),
2316 None,
2317 cx,
2318 )
2319 })),
2320 value => Some(cx.new(|cx| {
2321 Markdown::new(
2322 format!("```json\n{}\n```", value).into(),
2323 Some(language_registry.clone()),
2324 None,
2325 cx,
2326 )
2327 })),
2328 }
2329}
2330
2331#[cfg(test)]
2332mod tests {
2333 use super::*;
2334 use anyhow::anyhow;
2335 use futures::{channel::mpsc, future::LocalBoxFuture, select};
2336 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2337 use indoc::indoc;
2338 use project::{FakeFs, Fs};
2339 use rand::{distr, prelude::*};
2340 use serde_json::json;
2341 use settings::SettingsStore;
2342 use smol::stream::StreamExt as _;
2343 use std::{
2344 any::Any,
2345 cell::RefCell,
2346 path::Path,
2347 rc::Rc,
2348 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2349 time::Duration,
2350 };
2351 use util::path;
2352
2353 fn init_test(cx: &mut TestAppContext) {
2354 env_logger::try_init().ok();
2355 cx.update(|cx| {
2356 let settings_store = SettingsStore::test(cx);
2357 cx.set_global(settings_store);
2358 Project::init_settings(cx);
2359 language::init(cx);
2360 });
2361 }
2362
2363 #[gpui::test]
2364 async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2365 init_test(cx);
2366
2367 let fs = FakeFs::new(cx.executor());
2368 let project = Project::test(fs, [], cx).await;
2369 let connection = Rc::new(FakeAgentConnection::new());
2370 let thread = cx
2371 .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2372 .await
2373 .unwrap();
2374
2375 let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
2376
2377 // Send Output BEFORE Created - should be buffered by acp_thread
2378 thread.update(cx, |thread, cx| {
2379 thread.on_terminal_provider_event(
2380 TerminalProviderEvent::Output {
2381 terminal_id: terminal_id.clone(),
2382 data: b"hello buffered".to_vec(),
2383 },
2384 cx,
2385 );
2386 });
2387
2388 // Create a display-only terminal and then send Created
2389 let lower = cx.new(|cx| {
2390 let builder = ::terminal::TerminalBuilder::new_display_only(
2391 ::terminal::terminal_settings::CursorShape::default(),
2392 ::terminal::terminal_settings::AlternateScroll::On,
2393 None,
2394 0,
2395 )
2396 .unwrap();
2397 builder.subscribe(cx)
2398 });
2399
2400 thread.update(cx, |thread, cx| {
2401 thread.on_terminal_provider_event(
2402 TerminalProviderEvent::Created {
2403 terminal_id: terminal_id.clone(),
2404 label: "Buffered Test".to_string(),
2405 cwd: None,
2406 output_byte_limit: None,
2407 terminal: lower.clone(),
2408 },
2409 cx,
2410 );
2411 });
2412
2413 // After Created, buffered Output should have been flushed into the renderer
2414 let content = thread.read_with(cx, |thread, cx| {
2415 let term = thread.terminal(terminal_id.clone()).unwrap();
2416 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2417 });
2418
2419 assert!(
2420 content.contains("hello buffered"),
2421 "expected buffered output to render, got: {content}"
2422 );
2423 }
2424
2425 #[gpui::test]
2426 async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2427 init_test(cx);
2428
2429 let fs = FakeFs::new(cx.executor());
2430 let project = Project::test(fs, [], cx).await;
2431 let connection = Rc::new(FakeAgentConnection::new());
2432 let thread = cx
2433 .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2434 .await
2435 .unwrap();
2436
2437 let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
2438
2439 // Send Output BEFORE Created
2440 thread.update(cx, |thread, cx| {
2441 thread.on_terminal_provider_event(
2442 TerminalProviderEvent::Output {
2443 terminal_id: terminal_id.clone(),
2444 data: b"pre-exit data".to_vec(),
2445 },
2446 cx,
2447 );
2448 });
2449
2450 // Send Exit BEFORE Created
2451 thread.update(cx, |thread, cx| {
2452 thread.on_terminal_provider_event(
2453 TerminalProviderEvent::Exit {
2454 terminal_id: terminal_id.clone(),
2455 status: acp::TerminalExitStatus {
2456 exit_code: Some(0),
2457 signal: None,
2458 meta: None,
2459 },
2460 },
2461 cx,
2462 );
2463 });
2464
2465 // Now create a display-only lower-level terminal and send Created
2466 let lower = cx.new(|cx| {
2467 let builder = ::terminal::TerminalBuilder::new_display_only(
2468 ::terminal::terminal_settings::CursorShape::default(),
2469 ::terminal::terminal_settings::AlternateScroll::On,
2470 None,
2471 0,
2472 )
2473 .unwrap();
2474 builder.subscribe(cx)
2475 });
2476
2477 thread.update(cx, |thread, cx| {
2478 thread.on_terminal_provider_event(
2479 TerminalProviderEvent::Created {
2480 terminal_id: terminal_id.clone(),
2481 label: "Buffered Exit Test".to_string(),
2482 cwd: None,
2483 output_byte_limit: None,
2484 terminal: lower.clone(),
2485 },
2486 cx,
2487 );
2488 });
2489
2490 // Output should be present after Created (flushed from buffer)
2491 let content = thread.read_with(cx, |thread, cx| {
2492 let term = thread.terminal(terminal_id.clone()).unwrap();
2493 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2494 });
2495
2496 assert!(
2497 content.contains("pre-exit data"),
2498 "expected pre-exit data to render, got: {content}"
2499 );
2500 }
2501
2502 #[gpui::test]
2503 async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
2504 init_test(cx);
2505
2506 let fs = FakeFs::new(cx.executor());
2507 let project = Project::test(fs, [], cx).await;
2508 let connection = Rc::new(FakeAgentConnection::new());
2509 let thread = cx
2510 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2511 .await
2512 .unwrap();
2513
2514 // Test creating a new user message
2515 thread.update(cx, |thread, cx| {
2516 thread.push_user_content_block(
2517 None,
2518 acp::ContentBlock::Text(acp::TextContent {
2519 annotations: None,
2520 text: "Hello, ".to_string(),
2521 meta: None,
2522 }),
2523 cx,
2524 );
2525 });
2526
2527 thread.update(cx, |thread, cx| {
2528 assert_eq!(thread.entries.len(), 1);
2529 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2530 assert_eq!(user_msg.id, None);
2531 assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
2532 } else {
2533 panic!("Expected UserMessage");
2534 }
2535 });
2536
2537 // Test appending to existing user message
2538 let message_1_id = UserMessageId::new();
2539 thread.update(cx, |thread, cx| {
2540 thread.push_user_content_block(
2541 Some(message_1_id.clone()),
2542 acp::ContentBlock::Text(acp::TextContent {
2543 annotations: None,
2544 text: "world!".to_string(),
2545 meta: None,
2546 }),
2547 cx,
2548 );
2549 });
2550
2551 thread.update(cx, |thread, cx| {
2552 assert_eq!(thread.entries.len(), 1);
2553 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2554 assert_eq!(user_msg.id, Some(message_1_id));
2555 assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
2556 } else {
2557 panic!("Expected UserMessage");
2558 }
2559 });
2560
2561 // Test creating new user message after assistant message
2562 thread.update(cx, |thread, cx| {
2563 thread.push_assistant_content_block(
2564 acp::ContentBlock::Text(acp::TextContent {
2565 annotations: None,
2566 text: "Assistant response".to_string(),
2567 meta: None,
2568 }),
2569 false,
2570 cx,
2571 );
2572 });
2573
2574 let message_2_id = UserMessageId::new();
2575 thread.update(cx, |thread, cx| {
2576 thread.push_user_content_block(
2577 Some(message_2_id.clone()),
2578 acp::ContentBlock::Text(acp::TextContent {
2579 annotations: None,
2580 text: "New user message".to_string(),
2581 meta: None,
2582 }),
2583 cx,
2584 );
2585 });
2586
2587 thread.update(cx, |thread, cx| {
2588 assert_eq!(thread.entries.len(), 3);
2589 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
2590 assert_eq!(user_msg.id, Some(message_2_id));
2591 assert_eq!(user_msg.content.to_markdown(cx), "New user message");
2592 } else {
2593 panic!("Expected UserMessage at index 2");
2594 }
2595 });
2596 }
2597
2598 #[gpui::test]
2599 async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
2600 init_test(cx);
2601
2602 let fs = FakeFs::new(cx.executor());
2603 let project = Project::test(fs, [], cx).await;
2604 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2605 |_, thread, mut cx| {
2606 async move {
2607 thread.update(&mut cx, |thread, cx| {
2608 thread
2609 .handle_session_update(
2610 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk {
2611 content: "Thinking ".into(),
2612 meta: None,
2613 }),
2614 cx,
2615 )
2616 .unwrap();
2617 thread
2618 .handle_session_update(
2619 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk {
2620 content: "hard!".into(),
2621 meta: None,
2622 }),
2623 cx,
2624 )
2625 .unwrap();
2626 })?;
2627 Ok(acp::PromptResponse {
2628 stop_reason: acp::StopReason::EndTurn,
2629 meta: None,
2630 })
2631 }
2632 .boxed_local()
2633 },
2634 ));
2635
2636 let thread = cx
2637 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2638 .await
2639 .unwrap();
2640
2641 thread
2642 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
2643 .await
2644 .unwrap();
2645
2646 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
2647 assert_eq!(
2648 output,
2649 indoc! {r#"
2650 ## User
2651
2652 Hello from Zed!
2653
2654 ## Assistant
2655
2656 <thinking>
2657 Thinking hard!
2658 </thinking>
2659
2660 "#}
2661 );
2662 }
2663
2664 #[gpui::test]
2665 async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
2666 init_test(cx);
2667
2668 let fs = FakeFs::new(cx.executor());
2669 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
2670 .await;
2671 let project = Project::test(fs.clone(), [], cx).await;
2672 let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
2673 let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
2674 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2675 move |_, thread, mut cx| {
2676 let read_file_tx = read_file_tx.clone();
2677 async move {
2678 let content = thread
2679 .update(&mut cx, |thread, cx| {
2680 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2681 })
2682 .unwrap()
2683 .await
2684 .unwrap();
2685 assert_eq!(content, "one\ntwo\nthree\n");
2686 read_file_tx.take().unwrap().send(()).unwrap();
2687 thread
2688 .update(&mut cx, |thread, cx| {
2689 thread.write_text_file(
2690 path!("/tmp/foo").into(),
2691 "one\ntwo\nthree\nfour\nfive\n".to_string(),
2692 cx,
2693 )
2694 })
2695 .unwrap()
2696 .await
2697 .unwrap();
2698 Ok(acp::PromptResponse {
2699 stop_reason: acp::StopReason::EndTurn,
2700 meta: None,
2701 })
2702 }
2703 .boxed_local()
2704 },
2705 ));
2706
2707 let (worktree, pathbuf) = project
2708 .update(cx, |project, cx| {
2709 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2710 })
2711 .await
2712 .unwrap();
2713 let buffer = project
2714 .update(cx, |project, cx| {
2715 project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
2716 })
2717 .await
2718 .unwrap();
2719
2720 let thread = cx
2721 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2722 .await
2723 .unwrap();
2724
2725 let request = thread.update(cx, |thread, cx| {
2726 thread.send_raw("Extend the count in /tmp/foo", cx)
2727 });
2728 read_file_rx.await.ok();
2729 buffer.update(cx, |buffer, cx| {
2730 buffer.edit([(0..0, "zero\n".to_string())], None, cx);
2731 });
2732 cx.run_until_parked();
2733 assert_eq!(
2734 buffer.read_with(cx, |buffer, _| buffer.text()),
2735 "zero\none\ntwo\nthree\nfour\nfive\n"
2736 );
2737 assert_eq!(
2738 String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
2739 "zero\none\ntwo\nthree\nfour\nfive\n"
2740 );
2741 request.await.unwrap();
2742 }
2743
2744 #[gpui::test]
2745 async fn test_reading_from_line(cx: &mut TestAppContext) {
2746 init_test(cx);
2747
2748 let fs = FakeFs::new(cx.executor());
2749 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
2750 .await;
2751 let project = Project::test(fs.clone(), [], cx).await;
2752 project
2753 .update(cx, |project, cx| {
2754 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2755 })
2756 .await
2757 .unwrap();
2758
2759 let connection = Rc::new(FakeAgentConnection::new());
2760
2761 let thread = cx
2762 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2763 .await
2764 .unwrap();
2765
2766 // Whole file
2767 let content = thread
2768 .update(cx, |thread, cx| {
2769 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2770 })
2771 .await
2772 .unwrap();
2773
2774 assert_eq!(content, "one\ntwo\nthree\nfour\n");
2775
2776 // Only start line
2777 let content = thread
2778 .update(cx, |thread, cx| {
2779 thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
2780 })
2781 .await
2782 .unwrap();
2783
2784 assert_eq!(content, "three\nfour\n");
2785
2786 // Only limit
2787 let content = thread
2788 .update(cx, |thread, cx| {
2789 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2790 })
2791 .await
2792 .unwrap();
2793
2794 assert_eq!(content, "one\ntwo\n");
2795
2796 // Range
2797 let content = thread
2798 .update(cx, |thread, cx| {
2799 thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
2800 })
2801 .await
2802 .unwrap();
2803
2804 assert_eq!(content, "two\nthree\n");
2805
2806 // Invalid
2807 let err = thread
2808 .update(cx, |thread, cx| {
2809 thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
2810 })
2811 .await
2812 .unwrap_err();
2813
2814 assert_eq!(
2815 err.to_string(),
2816 "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
2817 );
2818 }
2819
2820 #[gpui::test]
2821 async fn test_reading_empty_file(cx: &mut TestAppContext) {
2822 init_test(cx);
2823
2824 let fs = FakeFs::new(cx.executor());
2825 fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
2826 let project = Project::test(fs.clone(), [], cx).await;
2827 project
2828 .update(cx, |project, cx| {
2829 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2830 })
2831 .await
2832 .unwrap();
2833
2834 let connection = Rc::new(FakeAgentConnection::new());
2835
2836 let thread = cx
2837 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2838 .await
2839 .unwrap();
2840
2841 // Whole file
2842 let content = thread
2843 .update(cx, |thread, cx| {
2844 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2845 })
2846 .await
2847 .unwrap();
2848
2849 assert_eq!(content, "");
2850
2851 // Only start line
2852 let content = thread
2853 .update(cx, |thread, cx| {
2854 thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
2855 })
2856 .await
2857 .unwrap();
2858
2859 assert_eq!(content, "");
2860
2861 // Only limit
2862 let content = thread
2863 .update(cx, |thread, cx| {
2864 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2865 })
2866 .await
2867 .unwrap();
2868
2869 assert_eq!(content, "");
2870
2871 // Range
2872 let content = thread
2873 .update(cx, |thread, cx| {
2874 thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
2875 })
2876 .await
2877 .unwrap();
2878
2879 assert_eq!(content, "");
2880
2881 // Invalid
2882 let err = thread
2883 .update(cx, |thread, cx| {
2884 thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
2885 })
2886 .await
2887 .unwrap_err();
2888
2889 assert_eq!(
2890 err.to_string(),
2891 "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
2892 );
2893 }
2894 #[gpui::test]
2895 async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
2896 init_test(cx);
2897
2898 let fs = FakeFs::new(cx.executor());
2899 fs.insert_tree(path!("/tmp"), json!({})).await;
2900 let project = Project::test(fs.clone(), [], cx).await;
2901 project
2902 .update(cx, |project, cx| {
2903 project.find_or_create_worktree(path!("/tmp"), true, cx)
2904 })
2905 .await
2906 .unwrap();
2907
2908 let connection = Rc::new(FakeAgentConnection::new());
2909
2910 let thread = cx
2911 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2912 .await
2913 .unwrap();
2914
2915 // Out of project file
2916 let err = thread
2917 .update(cx, |thread, cx| {
2918 thread.read_text_file(path!("/foo").into(), None, None, false, cx)
2919 })
2920 .await
2921 .unwrap_err();
2922
2923 assert_eq!(err.code, acp::ErrorCode::RESOURCE_NOT_FOUND.code);
2924 }
2925
2926 #[gpui::test]
2927 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
2928 init_test(cx);
2929
2930 let fs = FakeFs::new(cx.executor());
2931 let project = Project::test(fs, [], cx).await;
2932 let id = acp::ToolCallId("test".into());
2933
2934 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2935 let id = id.clone();
2936 move |_, thread, mut cx| {
2937 let id = id.clone();
2938 async move {
2939 thread
2940 .update(&mut cx, |thread, cx| {
2941 thread.handle_session_update(
2942 acp::SessionUpdate::ToolCall(acp::ToolCall {
2943 id: id.clone(),
2944 title: "Label".into(),
2945 kind: acp::ToolKind::Fetch,
2946 status: acp::ToolCallStatus::InProgress,
2947 content: vec![],
2948 locations: vec![],
2949 raw_input: None,
2950 raw_output: None,
2951 meta: None,
2952 }),
2953 cx,
2954 )
2955 })
2956 .unwrap()
2957 .unwrap();
2958 Ok(acp::PromptResponse {
2959 stop_reason: acp::StopReason::EndTurn,
2960 meta: None,
2961 })
2962 }
2963 .boxed_local()
2964 }
2965 }));
2966
2967 let thread = cx
2968 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2969 .await
2970 .unwrap();
2971
2972 let request = thread.update(cx, |thread, cx| {
2973 thread.send_raw("Fetch https://example.com", cx)
2974 });
2975
2976 run_until_first_tool_call(&thread, cx).await;
2977
2978 thread.read_with(cx, |thread, _| {
2979 assert!(matches!(
2980 thread.entries[1],
2981 AgentThreadEntry::ToolCall(ToolCall {
2982 status: ToolCallStatus::InProgress,
2983 ..
2984 })
2985 ));
2986 });
2987
2988 thread.update(cx, |thread, cx| thread.cancel(cx)).await;
2989
2990 thread.read_with(cx, |thread, _| {
2991 assert!(matches!(
2992 &thread.entries[1],
2993 AgentThreadEntry::ToolCall(ToolCall {
2994 status: ToolCallStatus::Canceled,
2995 ..
2996 })
2997 ));
2998 });
2999
3000 thread
3001 .update(cx, |thread, cx| {
3002 thread.handle_session_update(
3003 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
3004 id,
3005 fields: acp::ToolCallUpdateFields {
3006 status: Some(acp::ToolCallStatus::Completed),
3007 ..Default::default()
3008 },
3009 meta: None,
3010 }),
3011 cx,
3012 )
3013 })
3014 .unwrap();
3015
3016 request.await.unwrap();
3017
3018 thread.read_with(cx, |thread, _| {
3019 assert!(matches!(
3020 thread.entries[1],
3021 AgentThreadEntry::ToolCall(ToolCall {
3022 status: ToolCallStatus::Completed,
3023 ..
3024 })
3025 ));
3026 });
3027 }
3028
3029 #[gpui::test]
3030 async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3031 init_test(cx);
3032 let fs = FakeFs::new(cx.background_executor.clone());
3033 fs.insert_tree(path!("/test"), json!({})).await;
3034 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3035
3036 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3037 move |_, thread, mut cx| {
3038 async move {
3039 thread
3040 .update(&mut cx, |thread, cx| {
3041 thread.handle_session_update(
3042 acp::SessionUpdate::ToolCall(acp::ToolCall {
3043 id: acp::ToolCallId("test".into()),
3044 title: "Label".into(),
3045 kind: acp::ToolKind::Edit,
3046 status: acp::ToolCallStatus::Completed,
3047 content: vec![acp::ToolCallContent::Diff {
3048 diff: acp::Diff {
3049 path: "/test/test.txt".into(),
3050 old_text: None,
3051 new_text: "foo".into(),
3052 meta: None,
3053 },
3054 }],
3055 locations: vec![],
3056 raw_input: None,
3057 raw_output: None,
3058 meta: None,
3059 }),
3060 cx,
3061 )
3062 })
3063 .unwrap()
3064 .unwrap();
3065 Ok(acp::PromptResponse {
3066 stop_reason: acp::StopReason::EndTurn,
3067 meta: None,
3068 })
3069 }
3070 .boxed_local()
3071 }
3072 }));
3073
3074 let thread = cx
3075 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3076 .await
3077 .unwrap();
3078
3079 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3080 .await
3081 .unwrap();
3082
3083 assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3084 }
3085
3086 #[gpui::test(iterations = 10)]
3087 async fn test_checkpoints(cx: &mut TestAppContext) {
3088 init_test(cx);
3089 let fs = FakeFs::new(cx.background_executor.clone());
3090 fs.insert_tree(
3091 path!("/test"),
3092 json!({
3093 ".git": {}
3094 }),
3095 )
3096 .await;
3097 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3098
3099 let simulate_changes = Arc::new(AtomicBool::new(true));
3100 let next_filename = Arc::new(AtomicUsize::new(0));
3101 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3102 let simulate_changes = simulate_changes.clone();
3103 let next_filename = next_filename.clone();
3104 let fs = fs.clone();
3105 move |request, thread, mut cx| {
3106 let fs = fs.clone();
3107 let simulate_changes = simulate_changes.clone();
3108 let next_filename = next_filename.clone();
3109 async move {
3110 if simulate_changes.load(SeqCst) {
3111 let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3112 fs.write(Path::new(&filename), b"").await?;
3113 }
3114
3115 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3116 panic!("expected text content block");
3117 };
3118 thread.update(&mut cx, |thread, cx| {
3119 thread
3120 .handle_session_update(
3121 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk {
3122 content: content.text.to_uppercase().into(),
3123 meta: None,
3124 }),
3125 cx,
3126 )
3127 .unwrap();
3128 })?;
3129 Ok(acp::PromptResponse {
3130 stop_reason: acp::StopReason::EndTurn,
3131 meta: None,
3132 })
3133 }
3134 .boxed_local()
3135 }
3136 }));
3137 let thread = cx
3138 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3139 .await
3140 .unwrap();
3141
3142 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3143 .await
3144 .unwrap();
3145 thread.read_with(cx, |thread, cx| {
3146 assert_eq!(
3147 thread.to_markdown(cx),
3148 indoc! {"
3149 ## User (checkpoint)
3150
3151 Lorem
3152
3153 ## Assistant
3154
3155 LOREM
3156
3157 "}
3158 );
3159 });
3160 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3161
3162 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3163 .await
3164 .unwrap();
3165 thread.read_with(cx, |thread, cx| {
3166 assert_eq!(
3167 thread.to_markdown(cx),
3168 indoc! {"
3169 ## User (checkpoint)
3170
3171 Lorem
3172
3173 ## Assistant
3174
3175 LOREM
3176
3177 ## User (checkpoint)
3178
3179 ipsum
3180
3181 ## Assistant
3182
3183 IPSUM
3184
3185 "}
3186 );
3187 });
3188 assert_eq!(
3189 fs.files(),
3190 vec![
3191 Path::new(path!("/test/file-0")),
3192 Path::new(path!("/test/file-1"))
3193 ]
3194 );
3195
3196 // Checkpoint isn't stored when there are no changes.
3197 simulate_changes.store(false, SeqCst);
3198 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3199 .await
3200 .unwrap();
3201 thread.read_with(cx, |thread, cx| {
3202 assert_eq!(
3203 thread.to_markdown(cx),
3204 indoc! {"
3205 ## User (checkpoint)
3206
3207 Lorem
3208
3209 ## Assistant
3210
3211 LOREM
3212
3213 ## User (checkpoint)
3214
3215 ipsum
3216
3217 ## Assistant
3218
3219 IPSUM
3220
3221 ## User
3222
3223 dolor
3224
3225 ## Assistant
3226
3227 DOLOR
3228
3229 "}
3230 );
3231 });
3232 assert_eq!(
3233 fs.files(),
3234 vec![
3235 Path::new(path!("/test/file-0")),
3236 Path::new(path!("/test/file-1"))
3237 ]
3238 );
3239
3240 // Rewinding the conversation truncates the history and restores the checkpoint.
3241 thread
3242 .update(cx, |thread, cx| {
3243 let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3244 panic!("unexpected entries {:?}", thread.entries)
3245 };
3246 thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3247 })
3248 .await
3249 .unwrap();
3250 thread.read_with(cx, |thread, cx| {
3251 assert_eq!(
3252 thread.to_markdown(cx),
3253 indoc! {"
3254 ## User (checkpoint)
3255
3256 Lorem
3257
3258 ## Assistant
3259
3260 LOREM
3261
3262 "}
3263 );
3264 });
3265 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3266 }
3267
3268 #[gpui::test]
3269 async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3270 use std::sync::atomic::AtomicUsize;
3271 init_test(cx);
3272
3273 let fs = FakeFs::new(cx.executor());
3274 let project = Project::test(fs, None, cx).await;
3275
3276 // Create a connection that simulates refusal after tool result
3277 let prompt_count = Arc::new(AtomicUsize::new(0));
3278 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3279 let prompt_count = prompt_count.clone();
3280 move |_request, thread, mut cx| {
3281 let count = prompt_count.fetch_add(1, SeqCst);
3282 async move {
3283 if count == 0 {
3284 // First prompt: Generate a tool call with result
3285 thread.update(&mut cx, |thread, cx| {
3286 thread
3287 .handle_session_update(
3288 acp::SessionUpdate::ToolCall(acp::ToolCall {
3289 id: acp::ToolCallId("tool1".into()),
3290 title: "Test Tool".into(),
3291 kind: acp::ToolKind::Fetch,
3292 status: acp::ToolCallStatus::Completed,
3293 content: vec![],
3294 locations: vec![],
3295 raw_input: Some(serde_json::json!({"query": "test"})),
3296 raw_output: Some(
3297 serde_json::json!({"result": "inappropriate content"}),
3298 ),
3299 meta: None,
3300 }),
3301 cx,
3302 )
3303 .unwrap();
3304 })?;
3305
3306 // Now return refusal because of the tool result
3307 Ok(acp::PromptResponse {
3308 stop_reason: acp::StopReason::Refusal,
3309 meta: None,
3310 })
3311 } else {
3312 Ok(acp::PromptResponse {
3313 stop_reason: acp::StopReason::EndTurn,
3314 meta: None,
3315 })
3316 }
3317 }
3318 .boxed_local()
3319 }
3320 }));
3321
3322 let thread = cx
3323 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3324 .await
3325 .unwrap();
3326
3327 // Track if we see a Refusal event
3328 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3329 let saw_refusal_event_captured = saw_refusal_event.clone();
3330 thread.update(cx, |_thread, cx| {
3331 cx.subscribe(
3332 &thread,
3333 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3334 if matches!(event, AcpThreadEvent::Refusal) {
3335 *saw_refusal_event_captured.lock().unwrap() = true;
3336 }
3337 },
3338 )
3339 .detach();
3340 });
3341
3342 // Send a user message - this will trigger tool call and then refusal
3343 let send_task = thread.update(cx, |thread, cx| {
3344 thread.send(
3345 vec![acp::ContentBlock::Text(acp::TextContent {
3346 text: "Hello".into(),
3347 annotations: None,
3348 meta: None,
3349 })],
3350 cx,
3351 )
3352 });
3353 cx.background_executor.spawn(send_task).detach();
3354 cx.run_until_parked();
3355
3356 // Verify that:
3357 // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3358 // 2. The user message was NOT truncated
3359 assert!(
3360 *saw_refusal_event.lock().unwrap(),
3361 "Refusal event should be emitted for tool result refusals"
3362 );
3363
3364 thread.read_with(cx, |thread, _| {
3365 let entries = thread.entries();
3366 assert!(entries.len() >= 2, "Should have user message and tool call");
3367
3368 // Verify user message is still there
3369 assert!(
3370 matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3371 "User message should not be truncated"
3372 );
3373
3374 // Verify tool call is there with result
3375 if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3376 assert!(
3377 tool_call.raw_output.is_some(),
3378 "Tool call should have output"
3379 );
3380 } else {
3381 panic!("Expected tool call at index 1");
3382 }
3383 });
3384 }
3385
3386 #[gpui::test]
3387 async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3388 init_test(cx);
3389
3390 let fs = FakeFs::new(cx.executor());
3391 let project = Project::test(fs, None, cx).await;
3392
3393 let refuse_next = Arc::new(AtomicBool::new(false));
3394 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3395 let refuse_next = refuse_next.clone();
3396 move |_request, _thread, _cx| {
3397 if refuse_next.load(SeqCst) {
3398 async move {
3399 Ok(acp::PromptResponse {
3400 stop_reason: acp::StopReason::Refusal,
3401 meta: None,
3402 })
3403 }
3404 .boxed_local()
3405 } else {
3406 async move {
3407 Ok(acp::PromptResponse {
3408 stop_reason: acp::StopReason::EndTurn,
3409 meta: None,
3410 })
3411 }
3412 .boxed_local()
3413 }
3414 }
3415 }));
3416
3417 let thread = cx
3418 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3419 .await
3420 .unwrap();
3421
3422 // Track if we see a Refusal event
3423 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3424 let saw_refusal_event_captured = saw_refusal_event.clone();
3425 thread.update(cx, |_thread, cx| {
3426 cx.subscribe(
3427 &thread,
3428 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3429 if matches!(event, AcpThreadEvent::Refusal) {
3430 *saw_refusal_event_captured.lock().unwrap() = true;
3431 }
3432 },
3433 )
3434 .detach();
3435 });
3436
3437 // Send a message that will be refused
3438 refuse_next.store(true, SeqCst);
3439 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3440 .await
3441 .unwrap();
3442
3443 // Verify that a Refusal event WAS emitted for user prompt refusal
3444 assert!(
3445 *saw_refusal_event.lock().unwrap(),
3446 "Refusal event should be emitted for user prompt refusals"
3447 );
3448
3449 // Verify the message was truncated (user prompt refusal)
3450 thread.read_with(cx, |thread, cx| {
3451 assert_eq!(thread.to_markdown(cx), "");
3452 });
3453 }
3454
3455 #[gpui::test]
3456 async fn test_refusal(cx: &mut TestAppContext) {
3457 init_test(cx);
3458 let fs = FakeFs::new(cx.background_executor.clone());
3459 fs.insert_tree(path!("/"), json!({})).await;
3460 let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
3461
3462 let refuse_next = Arc::new(AtomicBool::new(false));
3463 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3464 let refuse_next = refuse_next.clone();
3465 move |request, thread, mut cx| {
3466 let refuse_next = refuse_next.clone();
3467 async move {
3468 if refuse_next.load(SeqCst) {
3469 return Ok(acp::PromptResponse {
3470 stop_reason: acp::StopReason::Refusal,
3471 meta: None,
3472 });
3473 }
3474
3475 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3476 panic!("expected text content block");
3477 };
3478 thread.update(&mut cx, |thread, cx| {
3479 thread
3480 .handle_session_update(
3481 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk {
3482 content: content.text.to_uppercase().into(),
3483 meta: None,
3484 }),
3485 cx,
3486 )
3487 .unwrap();
3488 })?;
3489 Ok(acp::PromptResponse {
3490 stop_reason: acp::StopReason::EndTurn,
3491 meta: None,
3492 })
3493 }
3494 .boxed_local()
3495 }
3496 }));
3497 let thread = cx
3498 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3499 .await
3500 .unwrap();
3501
3502 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3503 .await
3504 .unwrap();
3505 thread.read_with(cx, |thread, cx| {
3506 assert_eq!(
3507 thread.to_markdown(cx),
3508 indoc! {"
3509 ## User
3510
3511 hello
3512
3513 ## Assistant
3514
3515 HELLO
3516
3517 "}
3518 );
3519 });
3520
3521 // Simulate refusing the second message. The message should be truncated
3522 // when a user prompt is refused.
3523 refuse_next.store(true, SeqCst);
3524 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
3525 .await
3526 .unwrap();
3527 thread.read_with(cx, |thread, cx| {
3528 assert_eq!(
3529 thread.to_markdown(cx),
3530 indoc! {"
3531 ## User
3532
3533 hello
3534
3535 ## Assistant
3536
3537 HELLO
3538
3539 "}
3540 );
3541 });
3542 }
3543
3544 async fn run_until_first_tool_call(
3545 thread: &Entity<AcpThread>,
3546 cx: &mut TestAppContext,
3547 ) -> usize {
3548 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
3549
3550 let subscription = cx.update(|cx| {
3551 cx.subscribe(thread, move |thread, _, cx| {
3552 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
3553 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
3554 return tx.try_send(ix).unwrap();
3555 }
3556 }
3557 })
3558 });
3559
3560 select! {
3561 _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
3562 panic!("Timeout waiting for tool call")
3563 }
3564 ix = rx.next().fuse() => {
3565 drop(subscription);
3566 ix.unwrap()
3567 }
3568 }
3569 }
3570
3571 #[derive(Clone, Default)]
3572 struct FakeAgentConnection {
3573 auth_methods: Vec<acp::AuthMethod>,
3574 sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
3575 on_user_message: Option<
3576 Rc<
3577 dyn Fn(
3578 acp::PromptRequest,
3579 WeakEntity<AcpThread>,
3580 AsyncApp,
3581 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3582 + 'static,
3583 >,
3584 >,
3585 }
3586
3587 impl FakeAgentConnection {
3588 fn new() -> Self {
3589 Self {
3590 auth_methods: Vec::new(),
3591 on_user_message: None,
3592 sessions: Arc::default(),
3593 }
3594 }
3595
3596 #[expect(unused)]
3597 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
3598 self.auth_methods = auth_methods;
3599 self
3600 }
3601
3602 fn on_user_message(
3603 mut self,
3604 handler: impl Fn(
3605 acp::PromptRequest,
3606 WeakEntity<AcpThread>,
3607 AsyncApp,
3608 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3609 + 'static,
3610 ) -> Self {
3611 self.on_user_message.replace(Rc::new(handler));
3612 self
3613 }
3614 }
3615
3616 impl AgentConnection for FakeAgentConnection {
3617 fn auth_methods(&self) -> &[acp::AuthMethod] {
3618 &self.auth_methods
3619 }
3620
3621 fn new_thread(
3622 self: Rc<Self>,
3623 project: Entity<Project>,
3624 _cwd: &Path,
3625 cx: &mut App,
3626 ) -> Task<gpui::Result<Entity<AcpThread>>> {
3627 let session_id = acp::SessionId(
3628 rand::rng()
3629 .sample_iter(&distr::Alphanumeric)
3630 .take(7)
3631 .map(char::from)
3632 .collect::<String>()
3633 .into(),
3634 );
3635 let action_log = cx.new(|_| ActionLog::new(project.clone()));
3636 let thread = cx.new(|cx| {
3637 AcpThread::new(
3638 "Test",
3639 self.clone(),
3640 project,
3641 action_log,
3642 session_id.clone(),
3643 watch::Receiver::constant(acp::PromptCapabilities {
3644 image: true,
3645 audio: true,
3646 embedded_context: true,
3647 meta: None,
3648 }),
3649 cx,
3650 )
3651 });
3652 self.sessions.lock().insert(session_id, thread.downgrade());
3653 Task::ready(Ok(thread))
3654 }
3655
3656 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
3657 if self.auth_methods().iter().any(|m| m.id == method) {
3658 Task::ready(Ok(()))
3659 } else {
3660 Task::ready(Err(anyhow!("Invalid Auth Method")))
3661 }
3662 }
3663
3664 fn prompt(
3665 &self,
3666 _id: Option<UserMessageId>,
3667 params: acp::PromptRequest,
3668 cx: &mut App,
3669 ) -> Task<gpui::Result<acp::PromptResponse>> {
3670 let sessions = self.sessions.lock();
3671 let thread = sessions.get(¶ms.session_id).unwrap();
3672 if let Some(handler) = &self.on_user_message {
3673 let handler = handler.clone();
3674 let thread = thread.clone();
3675 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
3676 } else {
3677 Task::ready(Ok(acp::PromptResponse {
3678 stop_reason: acp::StopReason::EndTurn,
3679 meta: None,
3680 }))
3681 }
3682 }
3683
3684 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
3685 let sessions = self.sessions.lock();
3686 let thread = sessions.get(session_id).unwrap().clone();
3687
3688 cx.spawn(async move |cx| {
3689 thread
3690 .update(cx, |thread, cx| thread.cancel(cx))
3691 .unwrap()
3692 .await
3693 })
3694 .detach();
3695 }
3696
3697 fn truncate(
3698 &self,
3699 session_id: &acp::SessionId,
3700 _cx: &App,
3701 ) -> Option<Rc<dyn AgentSessionTruncate>> {
3702 Some(Rc::new(FakeAgentSessionEditor {
3703 _session_id: session_id.clone(),
3704 }))
3705 }
3706
3707 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
3708 self
3709 }
3710 }
3711
/// Truncation handle handed out by `FakeAgentConnection::truncate`.
struct FakeAgentSessionEditor {
    /// Session the handle was created for; stored but not read in the code shown here.
    _session_id: acp::SessionId,
}
3715
// Truncation on the fake agent does nothing.
impl AgentSessionTruncate for FakeAgentSessionEditor {
    /// Ignores both arguments and returns a task that is already resolved to `Ok(())`.
    fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
        Task::ready(Ok(()))
    }
}
3721
3722 #[gpui::test]
3723 async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
3724 init_test(cx);
3725
3726 let fs = FakeFs::new(cx.executor());
3727 let project = Project::test(fs, [], cx).await;
3728 let connection = Rc::new(FakeAgentConnection::new());
3729 let thread = cx
3730 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3731 .await
3732 .unwrap();
3733
3734 // Try to update a tool call that doesn't exist
3735 let nonexistent_id = acp::ToolCallId("nonexistent-tool-call".into());
3736 thread.update(cx, |thread, cx| {
3737 let result = thread.handle_session_update(
3738 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
3739 id: nonexistent_id.clone(),
3740 fields: acp::ToolCallUpdateFields {
3741 status: Some(acp::ToolCallStatus::Completed),
3742 ..Default::default()
3743 },
3744 meta: None,
3745 }),
3746 cx,
3747 );
3748
3749 // The update should succeed (not return an error)
3750 assert!(result.is_ok());
3751
3752 // There should now be exactly one entry in the thread
3753 assert_eq!(thread.entries.len(), 1);
3754
3755 // The entry should be a failed tool call
3756 if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
3757 assert_eq!(tool_call.id, nonexistent_id);
3758 assert!(matches!(tool_call.status, ToolCallStatus::Failed));
3759 assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
3760
3761 // Check that the content contains the error message
3762 assert_eq!(tool_call.content.len(), 1);
3763 if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
3764 match content_block {
3765 ContentBlock::Markdown { markdown } => {
3766 let markdown_text = markdown.read(cx).source();
3767 assert!(markdown_text.contains("Tool call not found"));
3768 }
3769 ContentBlock::Empty => panic!("Expected markdown content, got empty"),
3770 ContentBlock::ResourceLink { .. } => {
3771 panic!("Expected markdown content, got resource link")
3772 }
3773 }
3774 } else {
3775 panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
3776 }
3777 } else {
3778 panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
3779 }
3780 });
3781 }
3782}