1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5
6use ::terminal::terminal_settings::TerminalSettings;
7use agent_settings::AgentSettings;
8use collections::HashSet;
9pub use connection::*;
10pub use diff::*;
11use language::language_settings::FormatOnSave;
12pub use mention::*;
13use project::lsp_store::{FormatTrigger, LspFormatTarget};
14use serde::{Deserialize, Serialize};
15use settings::Settings as _;
16use task::{Shell, ShellBuilder};
17pub use terminal::*;
18
19use action_log::ActionLog;
20use agent_client_protocol::{self as acp};
21use anyhow::{Context as _, Result, anyhow};
22use editor::Bias;
23use futures::{FutureExt, channel::oneshot, future::BoxFuture};
24use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
25use itertools::Itertools;
26use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
27use markdown::Markdown;
28use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
29use std::collections::HashMap;
30use std::error::Error;
31use std::fmt::{Formatter, Write};
32use std::ops::Range;
33use std::process::ExitStatus;
34use std::rc::Rc;
35use std::time::{Duration, Instant};
36use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
37use ui::App;
38use util::{ResultExt, get_default_system_shell};
39use uuid::Uuid;
40
/// A message authored by the user, stored as one entry of the thread.
#[derive(Debug)]
pub struct UserMessage {
    /// Identifier of the message, when one has been supplied.
    pub id: Option<UserMessageId>,
    /// Display content accumulated from every chunk appended to this message.
    pub content: ContentBlock,
    /// The raw protocol content blocks of the message, in the order they were pushed.
    pub chunks: Vec<acp::ContentBlock>,
    /// Checkpoint attached to this message, if any.
    pub checkpoint: Option<Checkpoint>,
}
48
/// A git checkpoint attached to a [`UserMessage`].
#[derive(Debug)]
pub struct Checkpoint {
    /// The underlying checkpoint handle from the git store.
    git_checkpoint: GitStoreCheckpoint,
    /// When `true`, the markdown export of the owning message is headed
    /// `## User (checkpoint)` instead of `## User`.
    pub show: bool,
}
54
55impl UserMessage {
56 fn to_markdown(&self, cx: &App) -> String {
57 let mut markdown = String::new();
58 if self
59 .checkpoint
60 .as_ref()
61 .is_some_and(|checkpoint| checkpoint.show)
62 {
63 writeln!(markdown, "## User (checkpoint)").unwrap();
64 } else {
65 writeln!(markdown, "## User").unwrap();
66 }
67 writeln!(markdown).unwrap();
68 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
69 writeln!(markdown).unwrap();
70 markdown
71 }
72}
73
/// A message produced by the agent, stored as one entry of the thread.
#[derive(Debug, PartialEq)]
pub struct AssistantMessage {
    /// The message and thought chunks that make up this message, in order.
    pub chunks: Vec<AssistantMessageChunk>,
}
78
79impl AssistantMessage {
80 pub fn to_markdown(&self, cx: &App) -> String {
81 format!(
82 "## Assistant\n\n{}\n\n",
83 self.chunks
84 .iter()
85 .map(|chunk| chunk.to_markdown(cx))
86 .join("\n\n")
87 )
88 }
89}
90
/// One contiguous piece of an [`AssistantMessage`].
#[derive(Debug, PartialEq)]
pub enum AssistantMessageChunk {
    /// Regular message content.
    Message { block: ContentBlock },
    /// Thought content; exported to markdown wrapped in `<thinking>` tags.
    Thought { block: ContentBlock },
}
96
97impl AssistantMessageChunk {
98 pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
99 Self::Message {
100 block: ContentBlock::new(chunk.into(), language_registry, cx),
101 }
102 }
103
104 fn to_markdown(&self, cx: &App) -> String {
105 match self {
106 Self::Message { block } => block.to_markdown(cx).to_string(),
107 Self::Thought { block } => {
108 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
109 }
110 }
111 }
112}
113
/// A single entry in a thread's transcript.
#[derive(Debug)]
pub enum AgentThreadEntry {
    /// A message written by the user.
    UserMessage(UserMessage),
    /// A message produced by the agent.
    AssistantMessage(AssistantMessage),
    /// A tool invocation reported by the agent.
    ToolCall(ToolCall),
}
120
121impl AgentThreadEntry {
122 pub fn to_markdown(&self, cx: &App) -> String {
123 match self {
124 Self::UserMessage(message) => message.to_markdown(cx),
125 Self::AssistantMessage(message) => message.to_markdown(cx),
126 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
127 }
128 }
129
130 pub fn user_message(&self) -> Option<&UserMessage> {
131 if let AgentThreadEntry::UserMessage(message) = self {
132 Some(message)
133 } else {
134 None
135 }
136 }
137
138 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
139 if let AgentThreadEntry::ToolCall(call) = self {
140 itertools::Either::Left(call.diffs())
141 } else {
142 itertools::Either::Right(std::iter::empty())
143 }
144 }
145
146 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
147 if let AgentThreadEntry::ToolCall(call) = self {
148 itertools::Either::Left(call.terminals())
149 } else {
150 itertools::Either::Right(std::iter::empty())
151 }
152 }
153
154 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
155 if let AgentThreadEntry::ToolCall(ToolCall {
156 locations,
157 resolved_locations,
158 ..
159 }) = self
160 {
161 Some((
162 locations.get(ix)?.clone(),
163 resolved_locations.get(ix)?.clone()?,
164 ))
165 } else {
166 None
167 }
168 }
169}
170
/// A tool invocation reported by the agent, together with its current state.
#[derive(Debug)]
pub struct ToolCall {
    /// Protocol identifier of the tool call.
    pub id: acp::ToolCallId,
    /// Markdown label built from the tool call's title.
    pub label: Entity<Markdown>,
    /// The kind of tool, as reported by the protocol.
    pub kind: acp::ToolKind,
    /// Content produced by the tool call (content blocks, diffs, terminals).
    pub content: Vec<ToolCallContent>,
    /// Current lifecycle status of the call.
    pub status: ToolCallStatus,
    /// Locations reported for the tool call.
    pub locations: Vec<acp::ToolCallLocation>,
    /// Resolved counterparts of `locations`, looked up by the same index;
    /// an element is `None` when that location has no resolved value.
    pub resolved_locations: Vec<Option<AgentLocation>>,
    /// Raw JSON input of the tool call, if provided.
    pub raw_input: Option<serde_json::Value>,
    /// Raw JSON output of the tool call, if provided.
    pub raw_output: Option<serde_json::Value>,
}
183
184impl ToolCall {
185 fn from_acp(
186 tool_call: acp::ToolCall,
187 status: ToolCallStatus,
188 language_registry: Arc<LanguageRegistry>,
189 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
190 cx: &mut App,
191 ) -> Result<Self> {
192 let title = if let Some((first_line, _)) = tool_call.title.split_once("\n") {
193 first_line.to_owned() + "…"
194 } else {
195 tool_call.title
196 };
197 let mut content = Vec::with_capacity(tool_call.content.len());
198 for item in tool_call.content {
199 content.push(ToolCallContent::from_acp(
200 item,
201 language_registry.clone(),
202 terminals,
203 cx,
204 )?);
205 }
206
207 let result = Self {
208 id: tool_call.id,
209 label: cx
210 .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
211 kind: tool_call.kind,
212 content,
213 locations: tool_call.locations,
214 resolved_locations: Vec::default(),
215 status,
216 raw_input: tool_call.raw_input,
217 raw_output: tool_call.raw_output,
218 };
219 Ok(result)
220 }
221
222 fn update_fields(
223 &mut self,
224 fields: acp::ToolCallUpdateFields,
225 language_registry: Arc<LanguageRegistry>,
226 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
227 cx: &mut App,
228 ) -> Result<()> {
229 let acp::ToolCallUpdateFields {
230 kind,
231 status,
232 title,
233 content,
234 locations,
235 raw_input,
236 raw_output,
237 } = fields;
238
239 if let Some(kind) = kind {
240 self.kind = kind;
241 }
242
243 if let Some(status) = status {
244 self.status = status.into();
245 }
246
247 if let Some(title) = title {
248 self.label.update(cx, |label, cx| {
249 if let Some((first_line, _)) = title.split_once("\n") {
250 label.replace(first_line.to_owned() + "…", cx)
251 } else {
252 label.replace(title, cx);
253 }
254 });
255 }
256
257 if let Some(content) = content {
258 let new_content_len = content.len();
259 let mut content = content.into_iter();
260
261 // Reuse existing content if we can
262 for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
263 old.update_from_acp(new, language_registry.clone(), terminals, cx)?;
264 }
265 for new in content {
266 self.content.push(ToolCallContent::from_acp(
267 new,
268 language_registry.clone(),
269 terminals,
270 cx,
271 )?)
272 }
273 self.content.truncate(new_content_len);
274 }
275
276 if let Some(locations) = locations {
277 self.locations = locations;
278 }
279
280 if let Some(raw_input) = raw_input {
281 self.raw_input = Some(raw_input);
282 }
283
284 if let Some(raw_output) = raw_output {
285 if self.content.is_empty()
286 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
287 {
288 self.content
289 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
290 markdown,
291 }));
292 }
293 self.raw_output = Some(raw_output);
294 }
295 Ok(())
296 }
297
298 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
299 self.content.iter().filter_map(|content| match content {
300 ToolCallContent::Diff(diff) => Some(diff),
301 ToolCallContent::ContentBlock(_) => None,
302 ToolCallContent::Terminal(_) => None,
303 })
304 }
305
306 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
307 self.content.iter().filter_map(|content| match content {
308 ToolCallContent::Terminal(terminal) => Some(terminal),
309 ToolCallContent::ContentBlock(_) => None,
310 ToolCallContent::Diff(_) => None,
311 })
312 }
313
314 fn to_markdown(&self, cx: &App) -> String {
315 let mut markdown = format!(
316 "**Tool Call: {}**\nStatus: {}\n\n",
317 self.label.read(cx).source(),
318 self.status
319 );
320 for content in &self.content {
321 markdown.push_str(content.to_markdown(cx).as_str());
322 markdown.push_str("\n\n");
323 }
324 markdown
325 }
326
327 async fn resolve_location(
328 location: acp::ToolCallLocation,
329 project: WeakEntity<Project>,
330 cx: &mut AsyncApp,
331 ) -> Option<AgentLocation> {
332 let buffer = project
333 .update(cx, |project, cx| {
334 project
335 .project_path_for_absolute_path(&location.path, cx)
336 .map(|path| project.open_buffer(path, cx))
337 })
338 .ok()??;
339 let buffer = buffer.await.log_err()?;
340 let position = buffer
341 .update(cx, |buffer, _| {
342 if let Some(row) = location.line {
343 let snapshot = buffer.snapshot();
344 let column = snapshot.indent_size_for_line(row).len;
345 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
346 snapshot.anchor_before(point)
347 } else {
348 Anchor::MIN
349 }
350 })
351 .ok()?;
352
353 Some(AgentLocation {
354 buffer: buffer.downgrade(),
355 position,
356 })
357 }
358
359 fn resolve_locations(
360 &self,
361 project: Entity<Project>,
362 cx: &mut App,
363 ) -> Task<Vec<Option<AgentLocation>>> {
364 let locations = self.locations.clone();
365 project.update(cx, |_, cx| {
366 cx.spawn(async move |project, cx| {
367 let mut new_locations = Vec::new();
368 for location in locations {
369 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
370 }
371 new_locations
372 })
373 })
374 }
375}
376
/// Lifecycle state of a [`ToolCall`] as tracked by the thread.
#[derive(Debug)]
pub enum ToolCallStatus {
    /// The tool call hasn't started running yet, but we start showing it to
    /// the user.
    Pending,
    /// The tool call is waiting for confirmation from the user.
    WaitingForConfirmation {
        /// The permission options that can be chosen from.
        options: Vec<acp::PermissionOption>,
        /// Channel used to send back the id of the chosen option.
        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
    },
    /// The tool call is currently running.
    InProgress,
    /// The tool call completed successfully.
    Completed,
    /// The tool call failed.
    Failed,
    /// The user rejected the tool call.
    Rejected,
    /// The user canceled generation so the tool call was canceled.
    Canceled,
}
398
399impl From<acp::ToolCallStatus> for ToolCallStatus {
400 fn from(status: acp::ToolCallStatus) -> Self {
401 match status {
402 acp::ToolCallStatus::Pending => Self::Pending,
403 acp::ToolCallStatus::InProgress => Self::InProgress,
404 acp::ToolCallStatus::Completed => Self::Completed,
405 acp::ToolCallStatus::Failed => Self::Failed,
406 }
407 }
408}
409
410impl Display for ToolCallStatus {
411 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
412 write!(
413 f,
414 "{}",
415 match self {
416 ToolCallStatus::Pending => "Pending",
417 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
418 ToolCallStatus::InProgress => "In Progress",
419 ToolCallStatus::Completed => "Completed",
420 ToolCallStatus::Failed => "Failed",
421 ToolCallStatus::Rejected => "Rejected",
422 ToolCallStatus::Canceled => "Canceled",
423 }
424 )
425 }
426}
427
/// Displayable content accumulated from one or more protocol content blocks.
#[derive(Debug, PartialEq, Clone)]
pub enum ContentBlock {
    /// Nothing has been appended yet.
    Empty,
    /// Markdown text accumulated from the appended blocks.
    Markdown { markdown: Entity<Markdown> },
    /// A single resource link; converted to markdown once more content is appended.
    ResourceLink { resource_link: acp::ResourceLink },
}
434
435impl ContentBlock {
436 pub fn new(
437 block: acp::ContentBlock,
438 language_registry: &Arc<LanguageRegistry>,
439 cx: &mut App,
440 ) -> Self {
441 let mut this = Self::Empty;
442 this.append(block, language_registry, cx);
443 this
444 }
445
446 pub fn new_combined(
447 blocks: impl IntoIterator<Item = acp::ContentBlock>,
448 language_registry: Arc<LanguageRegistry>,
449 cx: &mut App,
450 ) -> Self {
451 let mut this = Self::Empty;
452 for block in blocks {
453 this.append(block, &language_registry, cx);
454 }
455 this
456 }
457
458 pub fn append(
459 &mut self,
460 block: acp::ContentBlock,
461 language_registry: &Arc<LanguageRegistry>,
462 cx: &mut App,
463 ) {
464 if matches!(self, ContentBlock::Empty)
465 && let acp::ContentBlock::ResourceLink(resource_link) = block
466 {
467 *self = ContentBlock::ResourceLink { resource_link };
468 return;
469 }
470
471 let new_content = self.block_string_contents(block);
472
473 match self {
474 ContentBlock::Empty => {
475 *self = Self::create_markdown_block(new_content, language_registry, cx);
476 }
477 ContentBlock::Markdown { markdown } => {
478 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
479 }
480 ContentBlock::ResourceLink { resource_link } => {
481 let existing_content = Self::resource_link_md(&resource_link.uri);
482 let combined = format!("{}\n{}", existing_content, new_content);
483
484 *self = Self::create_markdown_block(combined, language_registry, cx);
485 }
486 }
487 }
488
489 fn create_markdown_block(
490 content: String,
491 language_registry: &Arc<LanguageRegistry>,
492 cx: &mut App,
493 ) -> ContentBlock {
494 ContentBlock::Markdown {
495 markdown: cx
496 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
497 }
498 }
499
500 fn block_string_contents(&self, block: acp::ContentBlock) -> String {
501 match block {
502 acp::ContentBlock::Text(text_content) => text_content.text,
503 acp::ContentBlock::ResourceLink(resource_link) => {
504 Self::resource_link_md(&resource_link.uri)
505 }
506 acp::ContentBlock::Resource(acp::EmbeddedResource {
507 resource:
508 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
509 uri,
510 ..
511 }),
512 ..
513 }) => Self::resource_link_md(&uri),
514 acp::ContentBlock::Image(image) => Self::image_md(&image),
515 acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
516 }
517 }
518
519 fn resource_link_md(uri: &str) -> String {
520 if let Some(uri) = MentionUri::parse(uri).log_err() {
521 uri.as_link().to_string()
522 } else {
523 uri.to_string()
524 }
525 }
526
527 fn image_md(_image: &acp::ImageContent) -> String {
528 "`Image`".into()
529 }
530
531 pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
532 match self {
533 ContentBlock::Empty => "",
534 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
535 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
536 }
537 }
538
539 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
540 match self {
541 ContentBlock::Empty => None,
542 ContentBlock::Markdown { markdown } => Some(markdown),
543 ContentBlock::ResourceLink { .. } => None,
544 }
545 }
546
547 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
548 match self {
549 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
550 _ => None,
551 }
552 }
553}
554
/// One piece of content produced by a tool call.
#[derive(Debug)]
pub enum ToolCallContent {
    /// Textual or linked content.
    ContentBlock(ContentBlock),
    /// A diff entity.
    Diff(Entity<Diff>),
    /// A terminal entity registered with the thread.
    Terminal(Entity<Terminal>),
}
561
562impl ToolCallContent {
563 pub fn from_acp(
564 content: acp::ToolCallContent,
565 language_registry: Arc<LanguageRegistry>,
566 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
567 cx: &mut App,
568 ) -> Result<Self> {
569 match content {
570 acp::ToolCallContent::Content { content } => Ok(Self::ContentBlock(ContentBlock::new(
571 content,
572 &language_registry,
573 cx,
574 ))),
575 acp::ToolCallContent::Diff { diff } => Ok(Self::Diff(cx.new(|cx| {
576 Diff::finalized(
577 diff.path.to_string_lossy().into_owned(),
578 diff.old_text,
579 diff.new_text,
580 language_registry,
581 cx,
582 )
583 }))),
584 acp::ToolCallContent::Terminal { terminal_id } => terminals
585 .get(&terminal_id)
586 .cloned()
587 .map(Self::Terminal)
588 .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
589 }
590 }
591
592 pub fn update_from_acp(
593 &mut self,
594 new: acp::ToolCallContent,
595 language_registry: Arc<LanguageRegistry>,
596 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
597 cx: &mut App,
598 ) -> Result<()> {
599 let needs_update = match (&self, &new) {
600 (Self::Diff(old_diff), acp::ToolCallContent::Diff { diff: new_diff }) => {
601 old_diff.read(cx).needs_update(
602 new_diff.old_text.as_deref().unwrap_or(""),
603 &new_diff.new_text,
604 cx,
605 )
606 }
607 _ => true,
608 };
609
610 if needs_update {
611 *self = Self::from_acp(new, language_registry, terminals, cx)?;
612 }
613 Ok(())
614 }
615
616 pub fn to_markdown(&self, cx: &App) -> String {
617 match self {
618 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
619 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
620 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
621 }
622 }
623}
624
/// An update to apply to an existing tool call.
#[derive(Debug, PartialEq)]
pub enum ToolCallUpdate {
    /// A protocol-level update of the tool call's fields.
    UpdateFields(acp::ToolCallUpdate),
    /// An update carrying a diff entity for the tool call.
    UpdateDiff(ToolCallUpdateDiff),
    /// An update carrying a terminal entity for the tool call.
    UpdateTerminal(ToolCallUpdateTerminal),
}
631
632impl ToolCallUpdate {
633 fn id(&self) -> &acp::ToolCallId {
634 match self {
635 Self::UpdateFields(update) => &update.id,
636 Self::UpdateDiff(diff) => &diff.id,
637 Self::UpdateTerminal(terminal) => &terminal.id,
638 }
639 }
640}
641
642impl From<acp::ToolCallUpdate> for ToolCallUpdate {
643 fn from(update: acp::ToolCallUpdate) -> Self {
644 Self::UpdateFields(update)
645 }
646}
647
648impl From<ToolCallUpdateDiff> for ToolCallUpdate {
649 fn from(diff: ToolCallUpdateDiff) -> Self {
650 Self::UpdateDiff(diff)
651 }
652}
653
/// A tool-call update that carries a diff entity.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateDiff {
    /// Id of the tool call being updated.
    pub id: acp::ToolCallId,
    /// The diff entity for that tool call.
    pub diff: Entity<Diff>,
}
659
660impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
661 fn from(terminal: ToolCallUpdateTerminal) -> Self {
662 Self::UpdateTerminal(terminal)
663 }
664}
665
/// A tool-call update that carries a terminal entity.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateTerminal {
    /// Id of the tool call being updated.
    pub id: acp::ToolCallId,
    /// The terminal entity for that tool call.
    pub terminal: Entity<Terminal>,
}
671
/// The agent's plan: an ordered list of entries. Defaults to an empty plan.
#[derive(Debug, Default)]
pub struct Plan {
    /// The plan's entries, in order.
    pub entries: Vec<PlanEntry>,
}
676
/// Summary of a [`Plan`], borrowed from the plan it was computed from.
#[derive(Debug)]
pub struct PlanStats<'a> {
    /// The first entry that is in progress, if any.
    pub in_progress_entry: Option<&'a PlanEntry>,
    /// Number of entries that are still pending.
    pub pending: u32,
    /// Number of entries that are completed.
    pub completed: u32,
}
683
684impl Plan {
685 pub fn is_empty(&self) -> bool {
686 self.entries.is_empty()
687 }
688
689 pub fn stats(&self) -> PlanStats<'_> {
690 let mut stats = PlanStats {
691 in_progress_entry: None,
692 pending: 0,
693 completed: 0,
694 };
695
696 for entry in &self.entries {
697 match &entry.status {
698 acp::PlanEntryStatus::Pending => {
699 stats.pending += 1;
700 }
701 acp::PlanEntryStatus::InProgress => {
702 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
703 }
704 acp::PlanEntryStatus::Completed => {
705 stats.completed += 1;
706 }
707 }
708 }
709
710 stats
711 }
712}
713
/// A single entry of a [`Plan`].
#[derive(Debug)]
pub struct PlanEntry {
    /// Markdown entity holding the entry's text.
    pub content: Entity<Markdown>,
    /// Priority of the entry, as reported by the protocol.
    pub priority: acp::PlanEntryPriority,
    /// Progress status of the entry, as reported by the protocol.
    pub status: acp::PlanEntryStatus,
}
720
721impl PlanEntry {
722 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
723 Self {
724 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
725 priority: entry.priority,
726 status: entry.status,
727 }
728 }
729}
730
/// Token consumption of a thread relative to its limit.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TokenUsage {
    /// Maximum number of tokens available; `0` is treated as "unknown" by
    /// [`TokenUsage::ratio`].
    pub max_tokens: u64,
    /// Number of tokens used so far.
    pub used_tokens: u64,
}
736
737impl TokenUsage {
738 pub fn ratio(&self) -> TokenUsageRatio {
739 #[cfg(debug_assertions)]
740 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
741 .unwrap_or("0.8".to_string())
742 .parse()
743 .unwrap();
744 #[cfg(not(debug_assertions))]
745 let warning_threshold: f32 = 0.8;
746
747 // When the maximum is unknown because there is no selected model,
748 // avoid showing the token limit warning.
749 if self.max_tokens == 0 {
750 TokenUsageRatio::Normal
751 } else if self.used_tokens >= self.max_tokens {
752 TokenUsageRatio::Exceeded
753 } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
754 TokenUsageRatio::Warning
755 } else {
756 TokenUsageRatio::Normal
757 }
758 }
759}
760
/// Classification of token usage relative to the limit, as produced by
/// [`TokenUsage::ratio`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TokenUsageRatio {
    /// Usage is below the warning threshold, or the limit is unknown.
    Normal,
    /// Usage has reached the warning threshold but not the limit.
    Warning,
    /// Usage has reached or passed the limit.
    Exceeded,
}
767
/// Details about a retry, carried by [`AcpThreadEvent::Retry`].
#[derive(Debug, Clone)]
pub struct RetryStatus {
    /// Message of the most recent error.
    pub last_error: SharedString,
    /// Number of the current attempt.
    pub attempt: usize,
    /// Maximum number of attempts.
    pub max_attempts: usize,
    /// Instant associated with the start of the retry.
    // NOTE(review): presumably when the wait before the retry began — confirm with the producer.
    pub started_at: Instant,
    /// Duration associated with the retry.
    // NOTE(review): presumably the delay before the next attempt — confirm with the producer.
    pub duration: Duration,
}
776
/// A conversation thread with an agent, driven through an [`AgentConnection`].
pub struct AcpThread {
    /// Title of the thread.
    title: SharedString,
    /// Transcript entries in order.
    entries: Vec<AgentThreadEntry>,
    /// The agent's current plan.
    plan: Plan,
    /// Project this thread operates on.
    project: Entity<Project>,
    /// Action log associated with this thread.
    action_log: Entity<ActionLog>,
    /// Buffers paired with a snapshot of their contents.
    // NOTE(review): presumably snapshots taken when a buffer was shared with the agent — confirm.
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    /// Task of the in-flight send, if any; `status()` reports `Generating` while it is set.
    send_task: Option<Task<()>>,
    /// Connection to the agent.
    connection: Rc<dyn AgentConnection>,
    /// Id of the session this thread belongs to.
    session_id: acp::SessionId,
    /// Most recently reported token usage, if any.
    token_usage: Option<TokenUsage>,
    /// Latest prompt capabilities received from the capabilities channel.
    prompt_capabilities: acp::PromptCapabilities,
    /// Background task that keeps `prompt_capabilities` up to date.
    _observe_prompt_capabilities: Task<anyhow::Result<()>>,
    /// Terminals registered with this thread, keyed by terminal id.
    terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
    /// Output chunks received for terminals that have not been registered yet.
    pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
    /// Exit statuses received for terminals that have not been registered yet.
    pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
}
794
/// Events emitted by an [`AcpThread`].
#[derive(Debug)]
pub enum AcpThreadEvent {
    /// A new entry was pushed onto the thread.
    NewEntry,
    /// The thread's title changed.
    TitleUpdated,
    /// The thread's token usage was updated.
    TokenUsageUpdated,
    /// The entry at the given index was updated.
    EntryUpdated(usize),
    /// The entries in the given index range were removed.
    EntriesRemoved(Range<usize>),
    /// A tool call requires authorization.
    ToolAuthorizationRequired,
    /// A retry is taking place; carries its status.
    Retry(RetryStatus),
    /// Generation stopped.
    Stopped,
    /// An error occurred.
    Error,
    /// Loading failed with the given error.
    LoadError(LoadError),
    /// The prompt capabilities changed.
    PromptCapabilitiesUpdated,
    /// The request was refused.
    Refusal,
    /// The list of available commands changed.
    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
    /// The current session mode changed to the given mode.
    ModeUpdated(acp::SessionModeId),
}
812
// Marker impl: lets `AcpThread` emit `AcpThreadEvent`s through its context.
impl EventEmitter<AcpThreadEvent> for AcpThread {}
814
/// Events reported by a terminal provider, consumed by
/// `AcpThread::on_terminal_provider_event`.
#[derive(Debug, Clone)]
pub enum TerminalProviderEvent {
    /// A terminal was created and should be registered with the thread.
    Created {
        /// Id of the new terminal.
        terminal_id: acp::TerminalId,
        /// Label for the terminal.
        label: String,
        /// Working directory of the terminal, if known.
        cwd: Option<PathBuf>,
        /// Optional limit on the number of output bytes.
        output_byte_limit: Option<u64>,
        /// The underlying terminal entity.
        terminal: Entity<::terminal::Terminal>,
    },
    /// The terminal produced output.
    Output {
        /// Id of the terminal that produced the output.
        terminal_id: acp::TerminalId,
        /// The raw output bytes.
        data: Vec<u8>,
    },
    /// The terminal's title changed.
    TitleChanged {
        /// Id of the terminal whose title changed.
        terminal_id: acp::TerminalId,
        /// The new title.
        title: String,
    },
    /// The terminal exited.
    Exit {
        /// Id of the terminal that exited.
        terminal_id: acp::TerminalId,
        /// Exit status of the terminal.
        status: acp::TerminalExitStatus,
    },
}
837
/// Commands addressed to a terminal provider.
#[derive(Debug, Clone)]
pub enum TerminalProviderCommand {
    /// Write input bytes to the terminal.
    WriteInput {
        /// Id of the target terminal.
        terminal_id: acp::TerminalId,
        /// The bytes to write.
        bytes: Vec<u8>,
    },
    /// Resize the terminal.
    Resize {
        /// Id of the target terminal.
        terminal_id: acp::TerminalId,
        /// New number of columns.
        cols: u16,
        /// New number of rows.
        rows: u16,
    },
    /// Close the terminal.
    Close {
        /// Id of the target terminal.
        terminal_id: acp::TerminalId,
    },
}
853
854impl AcpThread {
855 pub fn on_terminal_provider_event(
856 &mut self,
857 event: TerminalProviderEvent,
858 cx: &mut Context<Self>,
859 ) {
860 match event {
861 TerminalProviderEvent::Created {
862 terminal_id,
863 label,
864 cwd,
865 output_byte_limit,
866 terminal,
867 } => {
868 let entity = self.register_terminal_created(
869 terminal_id.clone(),
870 label,
871 cwd,
872 output_byte_limit,
873 terminal,
874 cx,
875 );
876
877 if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
878 for data in chunks.drain(..) {
879 entity.update(cx, |term, cx| {
880 term.inner().update(cx, |inner, cx| {
881 inner.write_output(&data, cx);
882 })
883 });
884 }
885 }
886
887 if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
888 entity.update(cx, |_term, cx| {
889 cx.notify();
890 });
891 }
892
893 cx.notify();
894 }
895 TerminalProviderEvent::Output { terminal_id, data } => {
896 if let Some(entity) = self.terminals.get(&terminal_id) {
897 entity.update(cx, |term, cx| {
898 term.inner().update(cx, |inner, cx| {
899 inner.write_output(&data, cx);
900 })
901 });
902 } else {
903 self.pending_terminal_output
904 .entry(terminal_id)
905 .or_default()
906 .push(data);
907 }
908 }
909 TerminalProviderEvent::TitleChanged { terminal_id, title } => {
910 if let Some(entity) = self.terminals.get(&terminal_id) {
911 entity.update(cx, |term, cx| {
912 term.inner().update(cx, |inner, cx| {
913 inner.breadcrumb_text = title;
914 cx.emit(::terminal::Event::BreadcrumbsChanged);
915 })
916 });
917 }
918 }
919 TerminalProviderEvent::Exit {
920 terminal_id,
921 status,
922 } => {
923 if let Some(entity) = self.terminals.get(&terminal_id) {
924 entity.update(cx, |_term, cx| {
925 cx.notify();
926 });
927 } else {
928 self.pending_terminal_exit.insert(terminal_id, status);
929 }
930 }
931 }
932 }
933}
934
/// Whether a thread is currently generating, as reported by `AcpThread::status`.
#[derive(PartialEq, Eq, Debug)]
pub enum ThreadStatus {
    /// No send task is in flight.
    Idle,
    /// A send task is in flight.
    Generating,
}
940
/// Errors carried by [`AcpThreadEvent::LoadError`].
#[derive(Debug, Clone)]
pub enum LoadError {
    /// The found version is older than the minimum supported version.
    Unsupported {
        /// The command whose version was checked.
        command: SharedString,
        /// The version that was found.
        current_version: SharedString,
        /// The minimum version that is supported.
        minimum_version: SharedString,
    },
    /// Installation failed; carries the failure message.
    FailedToInstall(SharedString),
    /// The server process exited.
    Exited {
        /// Exit status of the process.
        status: ExitStatus,
    },
    /// Any other failure; carries its message.
    Other(SharedString),
}
954
955impl Display for LoadError {
956 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
957 match self {
958 LoadError::Unsupported {
959 command: path,
960 current_version,
961 minimum_version,
962 } => {
963 write!(
964 f,
965 "version {current_version} from {path} is not supported (need at least {minimum_version})"
966 )
967 }
968 LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
969 LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
970 LoadError::Other(msg) => write!(f, "{msg}"),
971 }
972 }
973}
974
// Marker impl: `LoadError` relies on its `Debug` and `Display` impls and
// reports no underlying source error.
impl Error for LoadError {}
976
977impl AcpThread {
978 pub fn new(
979 title: impl Into<SharedString>,
980 connection: Rc<dyn AgentConnection>,
981 project: Entity<Project>,
982 action_log: Entity<ActionLog>,
983 session_id: acp::SessionId,
984 mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
985 cx: &mut Context<Self>,
986 ) -> Self {
987 let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
988 let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
989 loop {
990 let caps = prompt_capabilities_rx.recv().await?;
991 this.update(cx, |this, cx| {
992 this.prompt_capabilities = caps;
993 cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
994 })?;
995 }
996 });
997
998 Self {
999 action_log,
1000 shared_buffers: Default::default(),
1001 entries: Default::default(),
1002 plan: Default::default(),
1003 title: title.into(),
1004 project,
1005 send_task: None,
1006 connection,
1007 session_id,
1008 token_usage: None,
1009 prompt_capabilities,
1010 _observe_prompt_capabilities: task,
1011 terminals: HashMap::default(),
1012 pending_terminal_output: HashMap::default(),
1013 pending_terminal_exit: HashMap::default(),
1014 }
1015 }
1016
/// Returns a copy of the thread's current prompt capabilities.
pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
    self.prompt_capabilities.clone()
}
1020
/// Returns the agent connection this thread uses.
pub fn connection(&self) -> &Rc<dyn AgentConnection> {
    &self.connection
}
1024
/// Returns the action log associated with this thread.
pub fn action_log(&self) -> &Entity<ActionLog> {
    &self.action_log
}
1028
/// Returns the project this thread operates on.
pub fn project(&self) -> &Entity<Project> {
    &self.project
}
1032
/// Returns a clone of the thread's title.
pub fn title(&self) -> SharedString {
    self.title.clone()
}
1036
/// Returns the thread's transcript entries, in order.
pub fn entries(&self) -> &[AgentThreadEntry] {
    &self.entries
}
1040
/// Returns the id of the session this thread belongs to.
pub fn session_id(&self) -> &acp::SessionId {
    &self.session_id
}
1044
1045 pub fn status(&self) -> ThreadStatus {
1046 if self.send_task.is_some() {
1047 ThreadStatus::Generating
1048 } else {
1049 ThreadStatus::Idle
1050 }
1051 }
1052
/// Returns the most recently recorded token usage, if any.
pub fn token_usage(&self) -> Option<&TokenUsage> {
    self.token_usage.as_ref()
}
1056
1057 pub fn has_pending_edit_tool_calls(&self) -> bool {
1058 for entry in self.entries.iter().rev() {
1059 match entry {
1060 AgentThreadEntry::UserMessage(_) => return false,
1061 AgentThreadEntry::ToolCall(
1062 call @ ToolCall {
1063 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1064 ..
1065 },
1066 ) if call.diffs().next().is_some() => {
1067 return true;
1068 }
1069 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1070 }
1071 }
1072
1073 false
1074 }
1075
1076 pub fn used_tools_since_last_user_message(&self) -> bool {
1077 for entry in self.entries.iter().rev() {
1078 match entry {
1079 AgentThreadEntry::UserMessage(..) => return false,
1080 AgentThreadEntry::AssistantMessage(..) => continue,
1081 AgentThreadEntry::ToolCall(..) => return true,
1082 }
1083 }
1084
1085 false
1086 }
1087
1088 pub fn handle_session_update(
1089 &mut self,
1090 update: acp::SessionUpdate,
1091 cx: &mut Context<Self>,
1092 ) -> Result<(), acp::Error> {
1093 match update {
1094 acp::SessionUpdate::UserMessageChunk { content } => {
1095 self.push_user_content_block(None, content, cx);
1096 }
1097 acp::SessionUpdate::AgentMessageChunk { content } => {
1098 self.push_assistant_content_block(content, false, cx);
1099 }
1100 acp::SessionUpdate::AgentThoughtChunk { content } => {
1101 self.push_assistant_content_block(content, true, cx);
1102 }
1103 acp::SessionUpdate::ToolCall(tool_call) => {
1104 self.upsert_tool_call(tool_call, cx)?;
1105 }
1106 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1107 self.update_tool_call(tool_call_update, cx)?;
1108 }
1109 acp::SessionUpdate::Plan(plan) => {
1110 self.update_plan(plan, cx);
1111 }
1112 acp::SessionUpdate::AvailableCommandsUpdate { available_commands } => {
1113 cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands))
1114 }
1115 acp::SessionUpdate::CurrentModeUpdate { current_mode_id } => {
1116 cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id))
1117 }
1118 }
1119 Ok(())
1120 }
1121
1122 pub fn push_user_content_block(
1123 &mut self,
1124 message_id: Option<UserMessageId>,
1125 chunk: acp::ContentBlock,
1126 cx: &mut Context<Self>,
1127 ) {
1128 let language_registry = self.project.read(cx).languages().clone();
1129 let entries_len = self.entries.len();
1130
1131 if let Some(last_entry) = self.entries.last_mut()
1132 && let AgentThreadEntry::UserMessage(UserMessage {
1133 id,
1134 content,
1135 chunks,
1136 ..
1137 }) = last_entry
1138 {
1139 *id = message_id.or(id.take());
1140 content.append(chunk.clone(), &language_registry, cx);
1141 chunks.push(chunk);
1142 let idx = entries_len - 1;
1143 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1144 } else {
1145 let content = ContentBlock::new(chunk.clone(), &language_registry, cx);
1146 self.push_entry(
1147 AgentThreadEntry::UserMessage(UserMessage {
1148 id: message_id,
1149 content,
1150 chunks: vec![chunk],
1151 checkpoint: None,
1152 }),
1153 cx,
1154 );
1155 }
1156 }
1157
1158 pub fn push_assistant_content_block(
1159 &mut self,
1160 chunk: acp::ContentBlock,
1161 is_thought: bool,
1162 cx: &mut Context<Self>,
1163 ) {
1164 let language_registry = self.project.read(cx).languages().clone();
1165 let entries_len = self.entries.len();
1166 if let Some(last_entry) = self.entries.last_mut()
1167 && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
1168 {
1169 let idx = entries_len - 1;
1170 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1171 match (chunks.last_mut(), is_thought) {
1172 (Some(AssistantMessageChunk::Message { block }), false)
1173 | (Some(AssistantMessageChunk::Thought { block }), true) => {
1174 block.append(chunk, &language_registry, cx)
1175 }
1176 _ => {
1177 let block = ContentBlock::new(chunk, &language_registry, cx);
1178 if is_thought {
1179 chunks.push(AssistantMessageChunk::Thought { block })
1180 } else {
1181 chunks.push(AssistantMessageChunk::Message { block })
1182 }
1183 }
1184 }
1185 } else {
1186 let block = ContentBlock::new(chunk, &language_registry, cx);
1187 let chunk = if is_thought {
1188 AssistantMessageChunk::Thought { block }
1189 } else {
1190 AssistantMessageChunk::Message { block }
1191 };
1192
1193 self.push_entry(
1194 AgentThreadEntry::AssistantMessage(AssistantMessage {
1195 chunks: vec![chunk],
1196 }),
1197 cx,
1198 );
1199 }
1200 }
1201
/// Appends `entry` to the transcript and emits `AcpThreadEvent::NewEntry`.
fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
    self.entries.push(entry);
    cx.emit(AcpThreadEvent::NewEntry);
}
1206
/// Returns `true` when the connection offers a title setter for this session.
pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
    self.connection.set_title(&self.session_id, cx).is_some()
}
1210
1211 pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1212 if title != self.title {
1213 self.title = title.clone();
1214 cx.emit(AcpThreadEvent::TitleUpdated);
1215 if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1216 return set_title.run(title, cx);
1217 }
1218 }
1219 Task::ready(Ok(()))
1220 }
1221
    /// Replaces the stored token usage (which may be `None`) and emits
    /// [`AcpThreadEvent::TokenUsageUpdated`].
    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
        self.token_usage = usage;
        cx.emit(AcpThreadEvent::TokenUsageUpdated);
    }
1226
    /// Emits [`AcpThreadEvent::Retry`] carrying `status`.
    ///
    /// The status is only forwarded as an event; nothing is stored on the
    /// thread.
    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::Retry(status));
    }
1230
    /// Applies `update` to the existing tool call it refers to.
    ///
    /// If no tool call with the update's id exists, a placeholder tool call
    /// with status [`ToolCallStatus::Failed`] and the text "Tool call not
    /// found" is pushed instead and `Ok(())` is returned.
    ///
    /// Otherwise the matching entry is modified according to the update kind
    /// and [`AcpThreadEvent::EntryUpdated`] is emitted for it.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `ToolCall::update_fields`.
    pub fn update_tool_call(
        &mut self,
        update: impl Into<ToolCallUpdate>,
        cx: &mut Context<Self>,
    ) -> Result<()> {
        let update = update.into();
        let languages = self.project.read(cx).languages().clone();

        let ix = match self.index_for_tool_call(update.id()) {
            Some(ix) => ix,
            None => {
                // Tool call not found - create a failed tool call entry
                let failed_tool_call = ToolCall {
                    id: update.id().clone(),
                    label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
                    kind: acp::ToolKind::Fetch,
                    content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
                        acp::ContentBlock::Text(acp::TextContent {
                            text: "Tool call not found".to_string(),
                            annotations: None,
                            meta: None,
                        }),
                        &languages,
                        cx,
                    ))],
                    status: ToolCallStatus::Failed,
                    locations: Vec::new(),
                    resolved_locations: Vec::new(),
                    raw_input: None,
                    raw_output: None,
                };
                self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
                return Ok(());
            }
        };
        // `index_for_tool_call` only ever returns indices of `ToolCall` entries.
        let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
            unreachable!()
        };

        match update {
            ToolCallUpdate::UpdateFields(update) => {
                // Remember whether locations changed before `update.fields` is moved.
                let location_updated = update.fields.locations.is_some();
                call.update_fields(update.fields, languages, &self.terminals, cx)?;
                if location_updated {
                    self.resolve_locations(update.id, cx);
                }
            }
            ToolCallUpdate::UpdateDiff(update) => {
                // A diff update replaces all existing content of the tool call.
                call.content.clear();
                call.content.push(ToolCallContent::Diff(update.diff));
            }
            ToolCallUpdate::UpdateTerminal(update) => {
                // A terminal update replaces all existing content of the tool call.
                call.content.clear();
                call.content
                    .push(ToolCallContent::Terminal(update.terminal));
            }
        }

        cx.emit(AcpThreadEvent::EntryUpdated(ix));

        Ok(())
    }
1293
    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
    ///
    /// The tool call's own status is converted into a [`ToolCallStatus`] and
    /// the call is then handed to [`Self::upsert_tool_call_inner`] as an update.
    pub fn upsert_tool_call(
        &mut self,
        tool_call: acp::ToolCall,
        cx: &mut Context<Self>,
    ) -> Result<(), acp::Error> {
        // Read the status first; `tool_call` is consumed by the conversion below.
        let status = tool_call.status.into();
        self.upsert_tool_call_inner(tool_call.into(), status, cx)
    }
1303
    /// Applies `update` with the given `status`, updating the tool call whose
    /// id matches or inserting a new one when there is no match.
    ///
    /// * Existing entry: its fields are updated, its status is overwritten
    ///   with `status`, and [`AcpThreadEvent::EntryUpdated`] is emitted.
    /// * No existing entry: the update is converted into a full tool call and
    ///   pushed as a new entry.
    ///
    /// In both cases location resolution is then started for the tool call.
    ///
    /// # Errors
    ///
    /// Returns an error if updating the fields fails, or, when inserting, if
    /// the update cannot be converted into a tool call (`try_into`) or
    /// `ToolCall::from_acp` fails.
    pub fn upsert_tool_call_inner(
        &mut self,
        update: acp::ToolCallUpdate,
        status: ToolCallStatus,
        cx: &mut Context<Self>,
    ) -> Result<(), acp::Error> {
        let language_registry = self.project.read(cx).languages().clone();
        let id = update.id.clone();

        if let Some(ix) = self.index_for_tool_call(&id) {
            // `index_for_tool_call` only ever returns indices of `ToolCall` entries.
            let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
                unreachable!()
            };

            call.update_fields(update.fields, language_registry, &self.terminals, cx)?;
            call.status = status;

            cx.emit(AcpThreadEvent::EntryUpdated(ix));
        } else {
            let call = ToolCall::from_acp(
                update.try_into()?,
                status,
                language_registry,
                &self.terminals,
                cx,
            )?;
            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
        };

        self.resolve_locations(id, cx);
        Ok(())
    }
1337
1338 fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1339 self.entries
1340 .iter()
1341 .enumerate()
1342 .rev()
1343 .find_map(|(index, entry)| {
1344 if let AgentThreadEntry::ToolCall(tool_call) = entry
1345 && &tool_call.id == id
1346 {
1347 Some(index)
1348 } else {
1349 None
1350 }
1351 })
1352 }
1353
1354 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1355 // The tool call we are looking for is typically the last one, or very close to the end.
1356 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1357 self.entries
1358 .iter_mut()
1359 .enumerate()
1360 .rev()
1361 .find_map(|(index, tool_call)| {
1362 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1363 && &tool_call.id == id
1364 {
1365 Some((index, tool_call))
1366 } else {
1367 None
1368 }
1369 })
1370 }
1371
1372 pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1373 self.entries
1374 .iter()
1375 .enumerate()
1376 .rev()
1377 .find_map(|(index, tool_call)| {
1378 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1379 && &tool_call.id == id
1380 {
1381 Some((index, tool_call))
1382 } else {
1383 None
1384 }
1385 })
1386 }
1387
1388 pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1389 let project = self.project.clone();
1390 let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1391 return;
1392 };
1393 let task = tool_call.resolve_locations(project, cx);
1394 cx.spawn(async move |this, cx| {
1395 let resolved_locations = task.await;
1396 this.update(cx, |this, cx| {
1397 let project = this.project.clone();
1398 let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1399 return;
1400 };
1401 if let Some(Some(location)) = resolved_locations.last() {
1402 project.update(cx, |project, cx| {
1403 if let Some(agent_location) = project.agent_location() {
1404 let should_ignore = agent_location.buffer == location.buffer
1405 && location
1406 .buffer
1407 .update(cx, |buffer, _| {
1408 let snapshot = buffer.snapshot();
1409 let old_position =
1410 agent_location.position.to_point(&snapshot);
1411 let new_position = location.position.to_point(&snapshot);
1412 // ignore this so that when we get updates from the edit tool
1413 // the position doesn't reset to the startof line
1414 old_position.row == new_position.row
1415 && old_position.column > new_position.column
1416 })
1417 .ok()
1418 .unwrap_or_default();
1419 if !should_ignore {
1420 project.set_agent_location(Some(location.clone()), cx);
1421 }
1422 }
1423 });
1424 }
1425 if tool_call.resolved_locations != resolved_locations {
1426 tool_call.resolved_locations = resolved_locations;
1427 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1428 }
1429 })
1430 })
1431 .detach();
1432 }
1433
1434 pub fn request_tool_call_authorization(
1435 &mut self,
1436 tool_call: acp::ToolCallUpdate,
1437 options: Vec<acp::PermissionOption>,
1438 respect_always_allow_setting: bool,
1439 cx: &mut Context<Self>,
1440 ) -> Result<BoxFuture<'static, acp::RequestPermissionOutcome>> {
1441 let (tx, rx) = oneshot::channel();
1442
1443 if respect_always_allow_setting && AgentSettings::get_global(cx).always_allow_tool_actions {
1444 // Don't use AllowAlways, because then if you were to turn off always_allow_tool_actions,
1445 // some tools would (incorrectly) continue to auto-accept.
1446 if let Some(allow_once_option) = options.iter().find_map(|option| {
1447 if matches!(option.kind, acp::PermissionOptionKind::AllowOnce) {
1448 Some(option.id.clone())
1449 } else {
1450 None
1451 }
1452 }) {
1453 self.upsert_tool_call_inner(tool_call, ToolCallStatus::Pending, cx)?;
1454 return Ok(async {
1455 acp::RequestPermissionOutcome::Selected {
1456 option_id: allow_once_option,
1457 }
1458 }
1459 .boxed());
1460 }
1461 }
1462
1463 let status = ToolCallStatus::WaitingForConfirmation {
1464 options,
1465 respond_tx: tx,
1466 };
1467
1468 self.upsert_tool_call_inner(tool_call, status, cx)?;
1469 cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1470
1471 let fut = async {
1472 match rx.await {
1473 Ok(option) => acp::RequestPermissionOutcome::Selected { option_id: option },
1474 Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1475 }
1476 }
1477 .boxed();
1478
1479 Ok(fut)
1480 }
1481
1482 pub fn authorize_tool_call(
1483 &mut self,
1484 id: acp::ToolCallId,
1485 option_id: acp::PermissionOptionId,
1486 option_kind: acp::PermissionOptionKind,
1487 cx: &mut Context<Self>,
1488 ) {
1489 let Some((ix, call)) = self.tool_call_mut(&id) else {
1490 return;
1491 };
1492
1493 let new_status = match option_kind {
1494 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1495 ToolCallStatus::Rejected
1496 }
1497 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1498 ToolCallStatus::InProgress
1499 }
1500 };
1501
1502 let curr_status = mem::replace(&mut call.status, new_status);
1503
1504 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1505 respond_tx.send(option_id).log_err();
1506 } else if cfg!(debug_assertions) {
1507 panic!("tried to authorize an already authorized tool call");
1508 }
1509
1510 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1511 }
1512
1513 pub fn first_tool_awaiting_confirmation(&self) -> Option<&ToolCall> {
1514 let mut first_tool_call = None;
1515
1516 for entry in self.entries.iter().rev() {
1517 match &entry {
1518 AgentThreadEntry::ToolCall(call) => {
1519 if let ToolCallStatus::WaitingForConfirmation { .. } = call.status {
1520 first_tool_call = Some(call);
1521 } else {
1522 continue;
1523 }
1524 }
1525 AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1526 // Reached the beginning of the turn.
1527 // If we had pending permission requests in the previous turn, they have been cancelled.
1528 break;
1529 }
1530 }
1531 }
1532
1533 first_tool_call
1534 }
1535
    /// Returns the thread's current plan.
    pub fn plan(&self) -> &Plan {
        &self.plan
    }
1539
1540 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1541 let new_entries_len = request.entries.len();
1542 let mut new_entries = request.entries.into_iter();
1543
1544 // Reuse existing markdown to prevent flickering
1545 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1546 let PlanEntry {
1547 content,
1548 priority,
1549 status,
1550 } = old;
1551 content.update(cx, |old, cx| {
1552 old.replace(new.content, cx);
1553 });
1554 *priority = new.priority;
1555 *status = new.status;
1556 }
1557 for new in new_entries {
1558 self.plan.entries.push(PlanEntry::from_acp(new, cx))
1559 }
1560 self.plan.entries.truncate(new_entries_len);
1561
1562 cx.notify();
1563 }
1564
1565 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1566 self.plan
1567 .entries
1568 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1569 cx.notify();
1570 }
1571
1572 #[cfg(any(test, feature = "test-support"))]
1573 pub fn send_raw(
1574 &mut self,
1575 message: &str,
1576 cx: &mut Context<Self>,
1577 ) -> BoxFuture<'static, Result<()>> {
1578 self.send(
1579 vec![acp::ContentBlock::Text(acp::TextContent {
1580 text: message.to_string(),
1581 annotations: None,
1582 meta: None,
1583 })],
1584 cx,
1585 )
1586 }
1587
1588 pub fn send(
1589 &mut self,
1590 message: Vec<acp::ContentBlock>,
1591 cx: &mut Context<Self>,
1592 ) -> BoxFuture<'static, Result<()>> {
1593 let block = ContentBlock::new_combined(
1594 message.clone(),
1595 self.project.read(cx).languages().clone(),
1596 cx,
1597 );
1598 let request = acp::PromptRequest {
1599 prompt: message.clone(),
1600 session_id: self.session_id.clone(),
1601 meta: None,
1602 };
1603 let git_store = self.project.read(cx).git_store().clone();
1604
1605 let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
1606 Some(UserMessageId::new())
1607 } else {
1608 None
1609 };
1610
1611 self.run_turn(cx, async move |this, cx| {
1612 this.update(cx, |this, cx| {
1613 this.push_entry(
1614 AgentThreadEntry::UserMessage(UserMessage {
1615 id: message_id.clone(),
1616 content: block,
1617 chunks: message,
1618 checkpoint: None,
1619 }),
1620 cx,
1621 );
1622 })
1623 .ok();
1624
1625 let old_checkpoint = git_store
1626 .update(cx, |git, cx| git.checkpoint(cx))?
1627 .await
1628 .context("failed to get old checkpoint")
1629 .log_err();
1630 this.update(cx, |this, cx| {
1631 if let Some((_ix, message)) = this.last_user_message() {
1632 message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1633 git_checkpoint,
1634 show: false,
1635 });
1636 }
1637 this.connection.prompt(message_id, request, cx)
1638 })?
1639 .await
1640 })
1641 }
1642
    /// Returns `true` when the connection provides a resume handle for this
    /// thread's session, i.e. `connection.resume(..)` returns `Some`.
    pub fn can_resume(&self, cx: &App) -> bool {
        self.connection.resume(&self.session_id, cx).is_some()
    }
1646
    /// Resumes the session as a new turn.
    ///
    /// # Errors
    ///
    /// The returned future fails with "resuming a session is not supported"
    /// when the connection provides no resume handle, and otherwise with
    /// whatever error the turn produces.
    pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
        self.run_turn(cx, async move |this, cx| {
            this.update(cx, |this, cx| {
                this.connection
                    .resume(&this.session_id, cx)
                    .map(|resume| resume.run(cx))
            })?
            .context("resuming a session is not supported")?
            .await
        })
    }
1658
1659 fn run_turn(
1660 &mut self,
1661 cx: &mut Context<Self>,
1662 f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1663 ) -> BoxFuture<'static, Result<()>> {
1664 self.clear_completed_plan_entries(cx);
1665
1666 let (tx, rx) = oneshot::channel();
1667 let cancel_task = self.cancel(cx);
1668
1669 self.send_task = Some(cx.spawn(async move |this, cx| {
1670 cancel_task.await;
1671 tx.send(f(this, cx).await).ok();
1672 }));
1673
1674 cx.spawn(async move |this, cx| {
1675 let response = rx.await;
1676
1677 this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1678 .await?;
1679
1680 this.update(cx, |this, cx| {
1681 this.project
1682 .update(cx, |project, cx| project.set_agent_location(None, cx));
1683 match response {
1684 Ok(Err(e)) => {
1685 this.send_task.take();
1686 cx.emit(AcpThreadEvent::Error);
1687 Err(e)
1688 }
1689 result => {
1690 let canceled = matches!(
1691 result,
1692 Ok(Ok(acp::PromptResponse {
1693 stop_reason: acp::StopReason::Cancelled,
1694 meta: None,
1695 }))
1696 );
1697
1698 // We only take the task if the current prompt wasn't canceled.
1699 //
1700 // This prompt may have been canceled because another one was sent
1701 // while it was still generating. In these cases, dropping `send_task`
1702 // would cause the next generation to be canceled.
1703 if !canceled {
1704 this.send_task.take();
1705 }
1706
1707 // Handle refusal - distinguish between user prompt and tool call refusals
1708 if let Ok(Ok(acp::PromptResponse {
1709 stop_reason: acp::StopReason::Refusal,
1710 meta: _,
1711 })) = result
1712 {
1713 if let Some((user_msg_ix, _)) = this.last_user_message() {
1714 // Check if there's a completed tool call with results after the last user message
1715 // This indicates the refusal is in response to tool output, not the user's prompt
1716 let has_completed_tool_call_after_user_msg =
1717 this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
1718 if let AgentThreadEntry::ToolCall(tool_call) = entry {
1719 // Check if the tool call has completed and has output
1720 matches!(tool_call.status, ToolCallStatus::Completed)
1721 && tool_call.raw_output.is_some()
1722 } else {
1723 false
1724 }
1725 });
1726
1727 if has_completed_tool_call_after_user_msg {
1728 // Refusal is due to tool output - don't truncate, just notify
1729 // The model refused based on what the tool returned
1730 cx.emit(AcpThreadEvent::Refusal);
1731 } else {
1732 // User prompt was refused - truncate back to before the user message
1733 let range = user_msg_ix..this.entries.len();
1734 if range.start < range.end {
1735 this.entries.truncate(user_msg_ix);
1736 cx.emit(AcpThreadEvent::EntriesRemoved(range));
1737 }
1738 cx.emit(AcpThreadEvent::Refusal);
1739 }
1740 } else {
1741 // No user message found, treat as general refusal
1742 cx.emit(AcpThreadEvent::Refusal);
1743 }
1744 }
1745
1746 cx.emit(AcpThreadEvent::Stopped);
1747 Ok(())
1748 }
1749 }
1750 })?
1751 })
1752 .boxed()
1753 }
1754
1755 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1756 let Some(send_task) = self.send_task.take() else {
1757 return Task::ready(());
1758 };
1759
1760 for entry in self.entries.iter_mut() {
1761 if let AgentThreadEntry::ToolCall(call) = entry {
1762 let cancel = matches!(
1763 call.status,
1764 ToolCallStatus::Pending
1765 | ToolCallStatus::WaitingForConfirmation { .. }
1766 | ToolCallStatus::InProgress
1767 );
1768
1769 if cancel {
1770 call.status = ToolCallStatus::Canceled;
1771 }
1772 }
1773 }
1774
1775 self.connection.cancel(&self.session_id, cx);
1776
1777 // Wait for the send task to complete
1778 cx.foreground_executor().spawn(send_task)
1779 }
1780
1781 /// Restores the git working tree to the state at the given checkpoint (if one exists)
1782 pub fn restore_checkpoint(
1783 &mut self,
1784 id: UserMessageId,
1785 cx: &mut Context<Self>,
1786 ) -> Task<Result<()>> {
1787 let Some((_, message)) = self.user_message_mut(&id) else {
1788 return Task::ready(Err(anyhow!("message not found")));
1789 };
1790
1791 let checkpoint = message
1792 .checkpoint
1793 .as_ref()
1794 .map(|c| c.git_checkpoint.clone());
1795 let rewind = self.rewind(id.clone(), cx);
1796 let git_store = self.project.read(cx).git_store().clone();
1797
1798 cx.spawn(async move |_, cx| {
1799 rewind.await?;
1800 if let Some(checkpoint) = checkpoint {
1801 git_store
1802 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1803 .await?;
1804 }
1805
1806 Ok(())
1807 })
1808 }
1809
    /// Rewinds this thread to before the user message with the given `id`,
    /// removing it and all subsequent entries while rejecting all pending
    /// action_log edits.
    /// Unlike `restore_checkpoint`, this method does not restore from git.
    ///
    /// The connection is asked to truncate the session first; local entries
    /// are only removed once that succeeded. If no user message with `id` is
    /// found locally afterwards, no entries are removed but edits are still
    /// rejected.
    ///
    /// # Errors
    ///
    /// Fails with "not supported" when the connection does not support
    /// truncation, or with the error from the truncation itself.
    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
        let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
            return Task::ready(Err(anyhow!("not supported")));
        };

        cx.spawn(async move |this, cx| {
            cx.update(|cx| truncate.run(id.clone(), cx))?.await?;
            this.update(cx, |this, cx| {
                if let Some((ix, _)) = this.user_message_mut(&id) {
                    let range = ix..this.entries.len();
                    this.entries.truncate(ix);
                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
                }
                this.action_log()
                    .update(cx, |action_log, cx| action_log.reject_all_edits(cx))
            })?
            .await;
            Ok(())
        })
    }
1833
1834 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1835 let git_store = self.project.read(cx).git_store().clone();
1836
1837 let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1838 if let Some(checkpoint) = message.checkpoint.as_ref() {
1839 checkpoint.git_checkpoint.clone()
1840 } else {
1841 return Task::ready(Ok(()));
1842 }
1843 } else {
1844 return Task::ready(Ok(()));
1845 };
1846
1847 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1848 cx.spawn(async move |this, cx| {
1849 let new_checkpoint = new_checkpoint
1850 .await
1851 .context("failed to get new checkpoint")
1852 .log_err();
1853 if let Some(new_checkpoint) = new_checkpoint {
1854 let equal = git_store
1855 .update(cx, |git, cx| {
1856 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1857 })?
1858 .await
1859 .unwrap_or(true);
1860 this.update(cx, |this, cx| {
1861 let (ix, message) = this.last_user_message().context("no user message")?;
1862 let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1863 checkpoint.show = !equal;
1864 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1865 anyhow::Ok(())
1866 })??;
1867 }
1868
1869 Ok(())
1870 })
1871 }
1872
1873 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1874 self.entries
1875 .iter_mut()
1876 .enumerate()
1877 .rev()
1878 .find_map(|(ix, entry)| {
1879 if let AgentThreadEntry::UserMessage(message) = entry {
1880 Some((ix, message))
1881 } else {
1882 None
1883 }
1884 })
1885 }
1886
1887 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1888 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1889 if let AgentThreadEntry::UserMessage(message) = entry {
1890 if message.id.as_ref() == Some(id) {
1891 Some((ix, message))
1892 } else {
1893 None
1894 }
1895 } else {
1896 None
1897 }
1898 })
1899 }
1900
1901 pub fn read_text_file(
1902 &self,
1903 path: PathBuf,
1904 line: Option<u32>,
1905 limit: Option<u32>,
1906 reuse_shared_snapshot: bool,
1907 cx: &mut Context<Self>,
1908 ) -> Task<Result<String, acp::Error>> {
1909 // Args are 1-based, move to 0-based
1910 let line = line.unwrap_or_default().saturating_sub(1);
1911 let limit = limit.unwrap_or(u32::MAX);
1912 let project = self.project.clone();
1913 let action_log = self.action_log.clone();
1914 cx.spawn(async move |this, cx| {
1915 let load = project
1916 .update(cx, |project, cx| {
1917 let path = project
1918 .project_path_for_absolute_path(&path, cx)
1919 .ok_or_else(|| {
1920 acp::Error::resource_not_found(Some(path.display().to_string()))
1921 })?;
1922 Ok(project.open_buffer(path, cx))
1923 })
1924 .map_err(|e| acp::Error::internal_error().with_data(e.to_string()))
1925 .flatten()?;
1926
1927 let buffer = load.await?;
1928
1929 let snapshot = if reuse_shared_snapshot {
1930 this.read_with(cx, |this, _| {
1931 this.shared_buffers.get(&buffer.clone()).cloned()
1932 })
1933 .log_err()
1934 .flatten()
1935 } else {
1936 None
1937 };
1938
1939 let snapshot = if let Some(snapshot) = snapshot {
1940 snapshot
1941 } else {
1942 action_log.update(cx, |action_log, cx| {
1943 action_log.buffer_read(buffer.clone(), cx);
1944 })?;
1945
1946 let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot())?;
1947 this.update(cx, |this, _| {
1948 this.shared_buffers.insert(buffer.clone(), snapshot.clone());
1949 })?;
1950 snapshot
1951 };
1952
1953 let max_point = snapshot.max_point();
1954 let start_position = Point::new(line, 0);
1955
1956 if start_position > max_point {
1957 return Err(acp::Error::invalid_params().with_data(format!(
1958 "Attempting to read beyond the end of the file, line {}:{}",
1959 max_point.row + 1,
1960 max_point.column
1961 )));
1962 }
1963
1964 let start = snapshot.anchor_before(start_position);
1965 let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
1966
1967 project.update(cx, |project, cx| {
1968 project.set_agent_location(
1969 Some(AgentLocation {
1970 buffer: buffer.downgrade(),
1971 position: start,
1972 }),
1973 cx,
1974 );
1975 })?;
1976
1977 Ok(snapshot.text_for_range(start..end).collect::<String>())
1978 })
1979 }
1980
    /// Replaces the contents of the file at `path` with `content` on behalf of
    /// the agent, then saves the buffer.
    ///
    /// The edits are computed as a text diff between a base snapshot and
    /// `content`. The base is the snapshot stored in `shared_buffers` for this
    /// buffer if there is one, otherwise the buffer's current snapshot. The
    /// agent location is moved to the end of the last edit (or the start of
    /// the buffer when there are no edits). The read and the edit are reported
    /// to the action log. If the language settings have format-on-save enabled,
    /// the buffer is formatted before saving; a formatting failure is logged
    /// and does not fail the write.
    ///
    /// # Errors
    ///
    /// Fails with "invalid path" if `path` does not map to a project path, and
    /// propagates errors from opening or saving the buffer.
    pub fn write_text_file(
        &self,
        path: PathBuf,
        content: String,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let project = self.project.clone();
        let action_log = self.action_log.clone();
        cx.spawn(async move |this, cx| {
            let load = project.update(cx, |project, cx| {
                let path = project
                    .project_path_for_absolute_path(&path, cx)
                    .context("invalid path")?;
                anyhow::Ok(project.open_buffer(path, cx))
            });
            let buffer = load??.await?;
            // Prefer the snapshot recorded in `shared_buffers` as the diff base.
            let snapshot = this.update(cx, |this, cx| {
                this.shared_buffers
                    .get(&buffer)
                    .cloned()
                    .unwrap_or_else(|| buffer.read(cx).snapshot())
            })?;
            // Diffing runs on the background executor.
            let edits = cx
                .background_executor()
                .spawn(async move {
                    let old_text = snapshot.text();
                    text_diff(old_text.as_str(), &content)
                        .into_iter()
                        .map(|(range, replacement)| {
                            (
                                snapshot.anchor_after(range.start)
                                    ..snapshot.anchor_before(range.end),
                                replacement,
                            )
                        })
                        .collect::<Vec<_>>()
                })
                .await;

            project.update(cx, |project, cx| {
                project.set_agent_location(
                    Some(AgentLocation {
                        buffer: buffer.downgrade(),
                        position: edits
                            .last()
                            .map(|(range, _)| range.end)
                            .unwrap_or(Anchor::MIN),
                    }),
                    cx,
                );
            })?;

            let format_on_save = cx.update(|cx| {
                // Report the read before the edit is applied.
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_read(buffer.clone(), cx);
                });

                let format_on_save = buffer.update(cx, |buffer, cx| {
                    buffer.edit(edits, None, cx);

                    let settings = language::language_settings::language_settings(
                        buffer.language().map(|l| l.name()),
                        buffer.file(),
                        cx,
                    );

                    settings.format_on_save != FormatOnSave::Off
                });
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_edited(buffer.clone(), cx);
                });
                format_on_save
            })?;

            if format_on_save {
                let format_task = project.update(cx, |project, cx| {
                    project.format(
                        HashSet::from_iter([buffer.clone()]),
                        LspFormatTarget::Buffers,
                        false,
                        FormatTrigger::Save,
                        cx,
                    )
                })?;
                // Formatting is best-effort: a failure is logged, not propagated.
                format_task.await.log_err();

                // Report the buffer as edited again after the formatting pass.
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_edited(buffer.clone(), cx);
                })?;
            }

            project
                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
                .await
        })
    }
2077
    /// Spawns `command` with `args` in a new terminal and registers the
    /// resulting [`Terminal`] entity on this thread under a freshly generated
    /// terminal id.
    ///
    /// The environment starts from the project's directory environment for
    /// `cwd` (empty when `cwd` is `None` or no environment is available), sets
    /// `PAGER` to the empty string, and then applies `extra_env`, which is
    /// inserted last. The command is run through a shell: the remote client's
    /// default system shell if the project has one, otherwise the local
    /// default system shell, with stdin redirected to the null device.
    ///
    /// `output_byte_limit` is forwarded to [`Terminal::new`].
    ///
    /// # Errors
    ///
    /// Propagates errors from creating the terminal task and from updating
    /// entities.
    pub fn create_terminal(
        &self,
        command: String,
        args: Vec<String>,
        extra_env: Vec<acp::EnvVariable>,
        cwd: Option<PathBuf>,
        output_byte_limit: Option<u64>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Terminal>>> {
        let env = match &cwd {
            Some(dir) => self.project.update(cx, |project, cx| {
                let shell = TerminalSettings::get_global(cx).shell.clone();
                project.directory_environment(&shell, dir.as_path().into(), cx)
            }),
            None => Task::ready(None).shared(),
        };
        let env = cx.spawn(async move |_, _| {
            let mut env = env.await.unwrap_or_default();
            // Disables paging for `git` and hopefully other commands
            env.insert("PAGER".into(), "".into());
            for var in extra_env {
                env.insert(var.name, var.value);
            }
            env
        });

        let project = self.project.clone();
        let language_registry = project.read(cx).languages().clone();

        let terminal_id = acp::TerminalId(Uuid::new_v4().to_string().into());
        let terminal_task = cx.spawn({
            // The original id is still needed below to register the terminal.
            let terminal_id = terminal_id.clone();
            async move |_this, cx| {
                let env = env.await;
                let shell = project
                    .update(cx, |project, cx| {
                        project
                            .remote_client()
                            .and_then(|r| r.read(cx).default_system_shell())
                    })?
                    .unwrap_or_else(|| get_default_system_shell());
                let (task_command, task_args) = ShellBuilder::new(&Shell::Program(shell))
                    .redirect_stdin_to_dev_null()
                    .build(Some(command.clone()), &args);
                let terminal = project
                    .update(cx, |project, cx| {
                        project.create_terminal_task(
                            task::SpawnInTerminal {
                                command: Some(task_command),
                                args: task_args,
                                cwd: cwd.clone(),
                                env,
                                ..Default::default()
                            },
                            cx,
                        )
                    })?
                    .await?;

                cx.new(|cx| {
                    Terminal::new(
                        terminal_id,
                        // The label is the original command line, not the shell-wrapped one.
                        &format!("{} {}", command, args.join(" ")),
                        cwd,
                        output_byte_limit.map(|l| l as usize),
                        terminal,
                        language_registry,
                        cx,
                    )
                })
            }
        });

        cx.spawn(async move |this, cx| {
            let terminal = terminal_task.await?;
            this.update(cx, |this, _cx| {
                this.terminals.insert(terminal_id, terminal.clone());
                terminal
            })
        })
    }
2159
2160 pub fn kill_terminal(
2161 &mut self,
2162 terminal_id: acp::TerminalId,
2163 cx: &mut Context<Self>,
2164 ) -> Result<()> {
2165 self.terminals
2166 .get(&terminal_id)
2167 .context("Terminal not found")?
2168 .update(cx, |terminal, cx| {
2169 terminal.kill(cx);
2170 });
2171
2172 Ok(())
2173 }
2174
2175 pub fn release_terminal(
2176 &mut self,
2177 terminal_id: acp::TerminalId,
2178 cx: &mut Context<Self>,
2179 ) -> Result<()> {
2180 self.terminals
2181 .remove(&terminal_id)
2182 .context("Terminal not found")?
2183 .update(cx, |terminal, cx| {
2184 terminal.kill(cx);
2185 });
2186
2187 Ok(())
2188 }
2189
2190 pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2191 self.terminals
2192 .get(&terminal_id)
2193 .context("Terminal not found")
2194 .cloned()
2195 }
2196
2197 pub fn to_markdown(&self, cx: &App) -> String {
2198 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2199 }
2200
    /// Emits [`AcpThreadEvent::LoadError`] carrying `error`.
    pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::LoadError(error));
    }
2204
2205 pub fn register_terminal_created(
2206 &mut self,
2207 terminal_id: acp::TerminalId,
2208 command_label: String,
2209 working_dir: Option<PathBuf>,
2210 output_byte_limit: Option<u64>,
2211 terminal: Entity<::terminal::Terminal>,
2212 cx: &mut Context<Self>,
2213 ) -> Entity<Terminal> {
2214 let language_registry = self.project.read(cx).languages().clone();
2215
2216 let entity = cx.new(|cx| {
2217 Terminal::new(
2218 terminal_id.clone(),
2219 &command_label,
2220 working_dir.clone(),
2221 output_byte_limit.map(|l| l as usize),
2222 terminal,
2223 language_registry,
2224 cx,
2225 )
2226 });
2227 self.terminals.insert(terminal_id.clone(), entity.clone());
2228 entity
2229 }
2230}
2231
2232fn markdown_for_raw_output(
2233 raw_output: &serde_json::Value,
2234 language_registry: &Arc<LanguageRegistry>,
2235 cx: &mut App,
2236) -> Option<Entity<Markdown>> {
2237 match raw_output {
2238 serde_json::Value::Null => None,
2239 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2240 Markdown::new(
2241 value.to_string().into(),
2242 Some(language_registry.clone()),
2243 None,
2244 cx,
2245 )
2246 })),
2247 serde_json::Value::Number(value) => Some(cx.new(|cx| {
2248 Markdown::new(
2249 value.to_string().into(),
2250 Some(language_registry.clone()),
2251 None,
2252 cx,
2253 )
2254 })),
2255 serde_json::Value::String(value) => Some(cx.new(|cx| {
2256 Markdown::new(
2257 value.clone().into(),
2258 Some(language_registry.clone()),
2259 None,
2260 cx,
2261 )
2262 })),
2263 value => Some(cx.new(|cx| {
2264 Markdown::new(
2265 format!("```json\n{}\n```", value).into(),
2266 Some(language_registry.clone()),
2267 None,
2268 cx,
2269 )
2270 })),
2271 }
2272}
2273
2274#[cfg(test)]
2275mod tests {
2276 use super::*;
2277 use anyhow::anyhow;
2278 use futures::{channel::mpsc, future::LocalBoxFuture, select};
2279 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2280 use indoc::indoc;
2281 use project::{FakeFs, Fs};
2282 use rand::{distr, prelude::*};
2283 use serde_json::json;
2284 use settings::SettingsStore;
2285 use smol::stream::StreamExt as _;
2286 use std::{
2287 any::Any,
2288 cell::RefCell,
2289 path::Path,
2290 rc::Rc,
2291 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2292 time::Duration,
2293 };
2294 use util::path;
2295
    /// Shared test setup: initializes logging (ignoring the error if it was
    /// already initialized), installs a test `SettingsStore` as a global, and
    /// initializes project settings and the `language` crate.
    fn init_test(cx: &mut TestAppContext) {
        env_logger::try_init().ok();
        cx.update(|cx| {
            let settings_store = SettingsStore::test(cx);
            cx.set_global(settings_store);
            Project::init_settings(cx);
            language::init(cx);
        });
    }
2305
    /// Terminal output that arrives before the terminal's `Created` event must
    /// not be lost: it should show up in the terminal content once `Created`
    /// has been processed.
    #[gpui::test]
    async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let connection = Rc::new(FakeAgentConnection::new());
        let thread = cx
            .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
            .await
            .unwrap();

        let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());

        // Send Output BEFORE Created - should be buffered by acp_thread
        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Output {
                    terminal_id: terminal_id.clone(),
                    data: b"hello buffered".to_vec(),
                },
                cx,
            );
        });

        // Create a display-only terminal and then send Created
        let lower = cx.new(|cx| {
            let builder = ::terminal::TerminalBuilder::new_display_only(
                ::terminal::terminal_settings::CursorShape::default(),
                ::terminal::terminal_settings::AlternateScroll::On,
                None,
                0,
            )
            .unwrap();
            builder.subscribe(cx)
        });

        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Created {
                    terminal_id: terminal_id.clone(),
                    label: "Buffered Test".to_string(),
                    cwd: None,
                    output_byte_limit: None,
                    terminal: lower.clone(),
                },
                cx,
            );
        });

        // After Created, buffered Output should have been flushed into the renderer
        let content = thread.read_with(cx, |thread, cx| {
            let term = thread.terminal(terminal_id.clone()).unwrap();
            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
        });

        assert!(
            content.contains("hello buffered"),
            "expected buffered output to render, got: {content}"
        );
    }
2367
2368 #[gpui::test]
2369 async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2370 init_test(cx);
2371
2372 let fs = FakeFs::new(cx.executor());
2373 let project = Project::test(fs, [], cx).await;
2374 let connection = Rc::new(FakeAgentConnection::new());
2375 let thread = cx
2376 .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2377 .await
2378 .unwrap();
2379
2380 let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
2381
2382 // Send Output BEFORE Created
2383 thread.update(cx, |thread, cx| {
2384 thread.on_terminal_provider_event(
2385 TerminalProviderEvent::Output {
2386 terminal_id: terminal_id.clone(),
2387 data: b"pre-exit data".to_vec(),
2388 },
2389 cx,
2390 );
2391 });
2392
2393 // Send Exit BEFORE Created
2394 thread.update(cx, |thread, cx| {
2395 thread.on_terminal_provider_event(
2396 TerminalProviderEvent::Exit {
2397 terminal_id: terminal_id.clone(),
2398 status: acp::TerminalExitStatus {
2399 exit_code: Some(0),
2400 signal: None,
2401 meta: None,
2402 },
2403 },
2404 cx,
2405 );
2406 });
2407
2408 // Now create a display-only lower-level terminal and send Created
2409 let lower = cx.new(|cx| {
2410 let builder = ::terminal::TerminalBuilder::new_display_only(
2411 ::terminal::terminal_settings::CursorShape::default(),
2412 ::terminal::terminal_settings::AlternateScroll::On,
2413 None,
2414 0,
2415 )
2416 .unwrap();
2417 builder.subscribe(cx)
2418 });
2419
2420 thread.update(cx, |thread, cx| {
2421 thread.on_terminal_provider_event(
2422 TerminalProviderEvent::Created {
2423 terminal_id: terminal_id.clone(),
2424 label: "Buffered Exit Test".to_string(),
2425 cwd: None,
2426 output_byte_limit: None,
2427 terminal: lower.clone(),
2428 },
2429 cx,
2430 );
2431 });
2432
2433 // Output should be present after Created (flushed from buffer)
2434 let content = thread.read_with(cx, |thread, cx| {
2435 let term = thread.terminal(terminal_id.clone()).unwrap();
2436 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2437 });
2438
2439 assert!(
2440 content.contains("pre-exit data"),
2441 "expected pre-exit data to render, got: {content}"
2442 );
2443 }
2444
2445 #[gpui::test]
2446 async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
2447 init_test(cx);
2448
2449 let fs = FakeFs::new(cx.executor());
2450 let project = Project::test(fs, [], cx).await;
2451 let connection = Rc::new(FakeAgentConnection::new());
2452 let thread = cx
2453 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2454 .await
2455 .unwrap();
2456
2457 // Test creating a new user message
2458 thread.update(cx, |thread, cx| {
2459 thread.push_user_content_block(
2460 None,
2461 acp::ContentBlock::Text(acp::TextContent {
2462 annotations: None,
2463 text: "Hello, ".to_string(),
2464 meta: None,
2465 }),
2466 cx,
2467 );
2468 });
2469
2470 thread.update(cx, |thread, cx| {
2471 assert_eq!(thread.entries.len(), 1);
2472 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2473 assert_eq!(user_msg.id, None);
2474 assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
2475 } else {
2476 panic!("Expected UserMessage");
2477 }
2478 });
2479
2480 // Test appending to existing user message
2481 let message_1_id = UserMessageId::new();
2482 thread.update(cx, |thread, cx| {
2483 thread.push_user_content_block(
2484 Some(message_1_id.clone()),
2485 acp::ContentBlock::Text(acp::TextContent {
2486 annotations: None,
2487 text: "world!".to_string(),
2488 meta: None,
2489 }),
2490 cx,
2491 );
2492 });
2493
2494 thread.update(cx, |thread, cx| {
2495 assert_eq!(thread.entries.len(), 1);
2496 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2497 assert_eq!(user_msg.id, Some(message_1_id));
2498 assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
2499 } else {
2500 panic!("Expected UserMessage");
2501 }
2502 });
2503
2504 // Test creating new user message after assistant message
2505 thread.update(cx, |thread, cx| {
2506 thread.push_assistant_content_block(
2507 acp::ContentBlock::Text(acp::TextContent {
2508 annotations: None,
2509 text: "Assistant response".to_string(),
2510 meta: None,
2511 }),
2512 false,
2513 cx,
2514 );
2515 });
2516
2517 let message_2_id = UserMessageId::new();
2518 thread.update(cx, |thread, cx| {
2519 thread.push_user_content_block(
2520 Some(message_2_id.clone()),
2521 acp::ContentBlock::Text(acp::TextContent {
2522 annotations: None,
2523 text: "New user message".to_string(),
2524 meta: None,
2525 }),
2526 cx,
2527 );
2528 });
2529
2530 thread.update(cx, |thread, cx| {
2531 assert_eq!(thread.entries.len(), 3);
2532 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
2533 assert_eq!(user_msg.id, Some(message_2_id));
2534 assert_eq!(user_msg.content.to_markdown(cx), "New user message");
2535 } else {
2536 panic!("Expected UserMessage at index 2");
2537 }
2538 });
2539 }
2540
2541 #[gpui::test]
2542 async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
2543 init_test(cx);
2544
2545 let fs = FakeFs::new(cx.executor());
2546 let project = Project::test(fs, [], cx).await;
2547 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2548 |_, thread, mut cx| {
2549 async move {
2550 thread.update(&mut cx, |thread, cx| {
2551 thread
2552 .handle_session_update(
2553 acp::SessionUpdate::AgentThoughtChunk {
2554 content: "Thinking ".into(),
2555 },
2556 cx,
2557 )
2558 .unwrap();
2559 thread
2560 .handle_session_update(
2561 acp::SessionUpdate::AgentThoughtChunk {
2562 content: "hard!".into(),
2563 },
2564 cx,
2565 )
2566 .unwrap();
2567 })?;
2568 Ok(acp::PromptResponse {
2569 stop_reason: acp::StopReason::EndTurn,
2570 meta: None,
2571 })
2572 }
2573 .boxed_local()
2574 },
2575 ));
2576
2577 let thread = cx
2578 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2579 .await
2580 .unwrap();
2581
2582 thread
2583 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
2584 .await
2585 .unwrap();
2586
2587 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
2588 assert_eq!(
2589 output,
2590 indoc! {r#"
2591 ## User
2592
2593 Hello from Zed!
2594
2595 ## Assistant
2596
2597 <thinking>
2598 Thinking hard!
2599 </thinking>
2600
2601 "#}
2602 );
2603 }
2604
2605 #[gpui::test]
2606 async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
2607 init_test(cx);
2608
2609 let fs = FakeFs::new(cx.executor());
2610 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
2611 .await;
2612 let project = Project::test(fs.clone(), [], cx).await;
2613 let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
2614 let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
2615 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2616 move |_, thread, mut cx| {
2617 let read_file_tx = read_file_tx.clone();
2618 async move {
2619 let content = thread
2620 .update(&mut cx, |thread, cx| {
2621 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2622 })
2623 .unwrap()
2624 .await
2625 .unwrap();
2626 assert_eq!(content, "one\ntwo\nthree\n");
2627 read_file_tx.take().unwrap().send(()).unwrap();
2628 thread
2629 .update(&mut cx, |thread, cx| {
2630 thread.write_text_file(
2631 path!("/tmp/foo").into(),
2632 "one\ntwo\nthree\nfour\nfive\n".to_string(),
2633 cx,
2634 )
2635 })
2636 .unwrap()
2637 .await
2638 .unwrap();
2639 Ok(acp::PromptResponse {
2640 stop_reason: acp::StopReason::EndTurn,
2641 meta: None,
2642 })
2643 }
2644 .boxed_local()
2645 },
2646 ));
2647
2648 let (worktree, pathbuf) = project
2649 .update(cx, |project, cx| {
2650 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2651 })
2652 .await
2653 .unwrap();
2654 let buffer = project
2655 .update(cx, |project, cx| {
2656 project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
2657 })
2658 .await
2659 .unwrap();
2660
2661 let thread = cx
2662 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2663 .await
2664 .unwrap();
2665
2666 let request = thread.update(cx, |thread, cx| {
2667 thread.send_raw("Extend the count in /tmp/foo", cx)
2668 });
2669 read_file_rx.await.ok();
2670 buffer.update(cx, |buffer, cx| {
2671 buffer.edit([(0..0, "zero\n".to_string())], None, cx);
2672 });
2673 cx.run_until_parked();
2674 assert_eq!(
2675 buffer.read_with(cx, |buffer, _| buffer.text()),
2676 "zero\none\ntwo\nthree\nfour\nfive\n"
2677 );
2678 assert_eq!(
2679 String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
2680 "zero\none\ntwo\nthree\nfour\nfive\n"
2681 );
2682 request.await.unwrap();
2683 }
2684
2685 #[gpui::test]
2686 async fn test_reading_from_line(cx: &mut TestAppContext) {
2687 init_test(cx);
2688
2689 let fs = FakeFs::new(cx.executor());
2690 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
2691 .await;
2692 let project = Project::test(fs.clone(), [], cx).await;
2693 project
2694 .update(cx, |project, cx| {
2695 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2696 })
2697 .await
2698 .unwrap();
2699
2700 let connection = Rc::new(FakeAgentConnection::new());
2701
2702 let thread = cx
2703 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2704 .await
2705 .unwrap();
2706
2707 // Whole file
2708 let content = thread
2709 .update(cx, |thread, cx| {
2710 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2711 })
2712 .await
2713 .unwrap();
2714
2715 assert_eq!(content, "one\ntwo\nthree\nfour\n");
2716
2717 // Only start line
2718 let content = thread
2719 .update(cx, |thread, cx| {
2720 thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
2721 })
2722 .await
2723 .unwrap();
2724
2725 assert_eq!(content, "three\nfour\n");
2726
2727 // Only limit
2728 let content = thread
2729 .update(cx, |thread, cx| {
2730 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2731 })
2732 .await
2733 .unwrap();
2734
2735 assert_eq!(content, "one\ntwo\n");
2736
2737 // Range
2738 let content = thread
2739 .update(cx, |thread, cx| {
2740 thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
2741 })
2742 .await
2743 .unwrap();
2744
2745 assert_eq!(content, "two\nthree\n");
2746
2747 // Invalid
2748 let err = thread
2749 .update(cx, |thread, cx| {
2750 thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
2751 })
2752 .await
2753 .unwrap_err();
2754
2755 assert_eq!(
2756 err.to_string(),
2757 "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
2758 );
2759 }
2760
2761 #[gpui::test]
2762 async fn test_reading_empty_file(cx: &mut TestAppContext) {
2763 init_test(cx);
2764
2765 let fs = FakeFs::new(cx.executor());
2766 fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
2767 let project = Project::test(fs.clone(), [], cx).await;
2768 project
2769 .update(cx, |project, cx| {
2770 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2771 })
2772 .await
2773 .unwrap();
2774
2775 let connection = Rc::new(FakeAgentConnection::new());
2776
2777 let thread = cx
2778 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2779 .await
2780 .unwrap();
2781
2782 // Whole file
2783 let content = thread
2784 .update(cx, |thread, cx| {
2785 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2786 })
2787 .await
2788 .unwrap();
2789
2790 assert_eq!(content, "");
2791
2792 // Only start line
2793 let content = thread
2794 .update(cx, |thread, cx| {
2795 thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
2796 })
2797 .await
2798 .unwrap();
2799
2800 assert_eq!(content, "");
2801
2802 // Only limit
2803 let content = thread
2804 .update(cx, |thread, cx| {
2805 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2806 })
2807 .await
2808 .unwrap();
2809
2810 assert_eq!(content, "");
2811
2812 // Range
2813 let content = thread
2814 .update(cx, |thread, cx| {
2815 thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
2816 })
2817 .await
2818 .unwrap();
2819
2820 assert_eq!(content, "");
2821
2822 // Invalid
2823 let err = thread
2824 .update(cx, |thread, cx| {
2825 thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
2826 })
2827 .await
2828 .unwrap_err();
2829
2830 assert_eq!(
2831 err.to_string(),
2832 "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
2833 );
2834 }
2835 #[gpui::test]
2836 async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
2837 init_test(cx);
2838
2839 let fs = FakeFs::new(cx.executor());
2840 fs.insert_tree(path!("/tmp"), json!({})).await;
2841 let project = Project::test(fs.clone(), [], cx).await;
2842 project
2843 .update(cx, |project, cx| {
2844 project.find_or_create_worktree(path!("/tmp"), true, cx)
2845 })
2846 .await
2847 .unwrap();
2848
2849 let connection = Rc::new(FakeAgentConnection::new());
2850
2851 let thread = cx
2852 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2853 .await
2854 .unwrap();
2855
2856 // Out of project file
2857 let err = thread
2858 .update(cx, |thread, cx| {
2859 thread.read_text_file(path!("/foo").into(), None, None, false, cx)
2860 })
2861 .await
2862 .unwrap_err();
2863
2864 assert_eq!(err.code, acp::ErrorCode::RESOURCE_NOT_FOUND.code);
2865 }
2866
2867 #[gpui::test]
2868 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
2869 init_test(cx);
2870
2871 let fs = FakeFs::new(cx.executor());
2872 let project = Project::test(fs, [], cx).await;
2873 let id = acp::ToolCallId("test".into());
2874
2875 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2876 let id = id.clone();
2877 move |_, thread, mut cx| {
2878 let id = id.clone();
2879 async move {
2880 thread
2881 .update(&mut cx, |thread, cx| {
2882 thread.handle_session_update(
2883 acp::SessionUpdate::ToolCall(acp::ToolCall {
2884 id: id.clone(),
2885 title: "Label".into(),
2886 kind: acp::ToolKind::Fetch,
2887 status: acp::ToolCallStatus::InProgress,
2888 content: vec![],
2889 locations: vec![],
2890 raw_input: None,
2891 raw_output: None,
2892 meta: None,
2893 }),
2894 cx,
2895 )
2896 })
2897 .unwrap()
2898 .unwrap();
2899 Ok(acp::PromptResponse {
2900 stop_reason: acp::StopReason::EndTurn,
2901 meta: None,
2902 })
2903 }
2904 .boxed_local()
2905 }
2906 }));
2907
2908 let thread = cx
2909 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2910 .await
2911 .unwrap();
2912
2913 let request = thread.update(cx, |thread, cx| {
2914 thread.send_raw("Fetch https://example.com", cx)
2915 });
2916
2917 run_until_first_tool_call(&thread, cx).await;
2918
2919 thread.read_with(cx, |thread, _| {
2920 assert!(matches!(
2921 thread.entries[1],
2922 AgentThreadEntry::ToolCall(ToolCall {
2923 status: ToolCallStatus::InProgress,
2924 ..
2925 })
2926 ));
2927 });
2928
2929 thread.update(cx, |thread, cx| thread.cancel(cx)).await;
2930
2931 thread.read_with(cx, |thread, _| {
2932 assert!(matches!(
2933 &thread.entries[1],
2934 AgentThreadEntry::ToolCall(ToolCall {
2935 status: ToolCallStatus::Canceled,
2936 ..
2937 })
2938 ));
2939 });
2940
2941 thread
2942 .update(cx, |thread, cx| {
2943 thread.handle_session_update(
2944 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
2945 id,
2946 fields: acp::ToolCallUpdateFields {
2947 status: Some(acp::ToolCallStatus::Completed),
2948 ..Default::default()
2949 },
2950 meta: None,
2951 }),
2952 cx,
2953 )
2954 })
2955 .unwrap();
2956
2957 request.await.unwrap();
2958
2959 thread.read_with(cx, |thread, _| {
2960 assert!(matches!(
2961 thread.entries[1],
2962 AgentThreadEntry::ToolCall(ToolCall {
2963 status: ToolCallStatus::Completed,
2964 ..
2965 })
2966 ));
2967 });
2968 }
2969
2970 #[gpui::test]
2971 async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
2972 init_test(cx);
2973 let fs = FakeFs::new(cx.background_executor.clone());
2974 fs.insert_tree(path!("/test"), json!({})).await;
2975 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
2976
2977 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2978 move |_, thread, mut cx| {
2979 async move {
2980 thread
2981 .update(&mut cx, |thread, cx| {
2982 thread.handle_session_update(
2983 acp::SessionUpdate::ToolCall(acp::ToolCall {
2984 id: acp::ToolCallId("test".into()),
2985 title: "Label".into(),
2986 kind: acp::ToolKind::Edit,
2987 status: acp::ToolCallStatus::Completed,
2988 content: vec![acp::ToolCallContent::Diff {
2989 diff: acp::Diff {
2990 path: "/test/test.txt".into(),
2991 old_text: None,
2992 new_text: "foo".into(),
2993 meta: None,
2994 },
2995 }],
2996 locations: vec![],
2997 raw_input: None,
2998 raw_output: None,
2999 meta: None,
3000 }),
3001 cx,
3002 )
3003 })
3004 .unwrap()
3005 .unwrap();
3006 Ok(acp::PromptResponse {
3007 stop_reason: acp::StopReason::EndTurn,
3008 meta: None,
3009 })
3010 }
3011 .boxed_local()
3012 }
3013 }));
3014
3015 let thread = cx
3016 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3017 .await
3018 .unwrap();
3019
3020 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3021 .await
3022 .unwrap();
3023
3024 assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3025 }
3026
3027 #[gpui::test(iterations = 10)]
3028 async fn test_checkpoints(cx: &mut TestAppContext) {
3029 init_test(cx);
3030 let fs = FakeFs::new(cx.background_executor.clone());
3031 fs.insert_tree(
3032 path!("/test"),
3033 json!({
3034 ".git": {}
3035 }),
3036 )
3037 .await;
3038 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3039
3040 let simulate_changes = Arc::new(AtomicBool::new(true));
3041 let next_filename = Arc::new(AtomicUsize::new(0));
3042 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3043 let simulate_changes = simulate_changes.clone();
3044 let next_filename = next_filename.clone();
3045 let fs = fs.clone();
3046 move |request, thread, mut cx| {
3047 let fs = fs.clone();
3048 let simulate_changes = simulate_changes.clone();
3049 let next_filename = next_filename.clone();
3050 async move {
3051 if simulate_changes.load(SeqCst) {
3052 let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3053 fs.write(Path::new(&filename), b"").await?;
3054 }
3055
3056 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3057 panic!("expected text content block");
3058 };
3059 thread.update(&mut cx, |thread, cx| {
3060 thread
3061 .handle_session_update(
3062 acp::SessionUpdate::AgentMessageChunk {
3063 content: content.text.to_uppercase().into(),
3064 },
3065 cx,
3066 )
3067 .unwrap();
3068 })?;
3069 Ok(acp::PromptResponse {
3070 stop_reason: acp::StopReason::EndTurn,
3071 meta: None,
3072 })
3073 }
3074 .boxed_local()
3075 }
3076 }));
3077 let thread = cx
3078 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3079 .await
3080 .unwrap();
3081
3082 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3083 .await
3084 .unwrap();
3085 thread.read_with(cx, |thread, cx| {
3086 assert_eq!(
3087 thread.to_markdown(cx),
3088 indoc! {"
3089 ## User (checkpoint)
3090
3091 Lorem
3092
3093 ## Assistant
3094
3095 LOREM
3096
3097 "}
3098 );
3099 });
3100 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3101
3102 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3103 .await
3104 .unwrap();
3105 thread.read_with(cx, |thread, cx| {
3106 assert_eq!(
3107 thread.to_markdown(cx),
3108 indoc! {"
3109 ## User (checkpoint)
3110
3111 Lorem
3112
3113 ## Assistant
3114
3115 LOREM
3116
3117 ## User (checkpoint)
3118
3119 ipsum
3120
3121 ## Assistant
3122
3123 IPSUM
3124
3125 "}
3126 );
3127 });
3128 assert_eq!(
3129 fs.files(),
3130 vec![
3131 Path::new(path!("/test/file-0")),
3132 Path::new(path!("/test/file-1"))
3133 ]
3134 );
3135
3136 // Checkpoint isn't stored when there are no changes.
3137 simulate_changes.store(false, SeqCst);
3138 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3139 .await
3140 .unwrap();
3141 thread.read_with(cx, |thread, cx| {
3142 assert_eq!(
3143 thread.to_markdown(cx),
3144 indoc! {"
3145 ## User (checkpoint)
3146
3147 Lorem
3148
3149 ## Assistant
3150
3151 LOREM
3152
3153 ## User (checkpoint)
3154
3155 ipsum
3156
3157 ## Assistant
3158
3159 IPSUM
3160
3161 ## User
3162
3163 dolor
3164
3165 ## Assistant
3166
3167 DOLOR
3168
3169 "}
3170 );
3171 });
3172 assert_eq!(
3173 fs.files(),
3174 vec![
3175 Path::new(path!("/test/file-0")),
3176 Path::new(path!("/test/file-1"))
3177 ]
3178 );
3179
3180 // Rewinding the conversation truncates the history and restores the checkpoint.
3181 thread
3182 .update(cx, |thread, cx| {
3183 let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3184 panic!("unexpected entries {:?}", thread.entries)
3185 };
3186 thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3187 })
3188 .await
3189 .unwrap();
3190 thread.read_with(cx, |thread, cx| {
3191 assert_eq!(
3192 thread.to_markdown(cx),
3193 indoc! {"
3194 ## User (checkpoint)
3195
3196 Lorem
3197
3198 ## Assistant
3199
3200 LOREM
3201
3202 "}
3203 );
3204 });
3205 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3206 }
3207
3208 #[gpui::test]
3209 async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3210 use std::sync::atomic::AtomicUsize;
3211 init_test(cx);
3212
3213 let fs = FakeFs::new(cx.executor());
3214 let project = Project::test(fs, None, cx).await;
3215
3216 // Create a connection that simulates refusal after tool result
3217 let prompt_count = Arc::new(AtomicUsize::new(0));
3218 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3219 let prompt_count = prompt_count.clone();
3220 move |_request, thread, mut cx| {
3221 let count = prompt_count.fetch_add(1, SeqCst);
3222 async move {
3223 if count == 0 {
3224 // First prompt: Generate a tool call with result
3225 thread.update(&mut cx, |thread, cx| {
3226 thread
3227 .handle_session_update(
3228 acp::SessionUpdate::ToolCall(acp::ToolCall {
3229 id: acp::ToolCallId("tool1".into()),
3230 title: "Test Tool".into(),
3231 kind: acp::ToolKind::Fetch,
3232 status: acp::ToolCallStatus::Completed,
3233 content: vec![],
3234 locations: vec![],
3235 raw_input: Some(serde_json::json!({"query": "test"})),
3236 raw_output: Some(
3237 serde_json::json!({"result": "inappropriate content"}),
3238 ),
3239 meta: None,
3240 }),
3241 cx,
3242 )
3243 .unwrap();
3244 })?;
3245
3246 // Now return refusal because of the tool result
3247 Ok(acp::PromptResponse {
3248 stop_reason: acp::StopReason::Refusal,
3249 meta: None,
3250 })
3251 } else {
3252 Ok(acp::PromptResponse {
3253 stop_reason: acp::StopReason::EndTurn,
3254 meta: None,
3255 })
3256 }
3257 }
3258 .boxed_local()
3259 }
3260 }));
3261
3262 let thread = cx
3263 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3264 .await
3265 .unwrap();
3266
3267 // Track if we see a Refusal event
3268 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3269 let saw_refusal_event_captured = saw_refusal_event.clone();
3270 thread.update(cx, |_thread, cx| {
3271 cx.subscribe(
3272 &thread,
3273 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3274 if matches!(event, AcpThreadEvent::Refusal) {
3275 *saw_refusal_event_captured.lock().unwrap() = true;
3276 }
3277 },
3278 )
3279 .detach();
3280 });
3281
3282 // Send a user message - this will trigger tool call and then refusal
3283 let send_task = thread.update(cx, |thread, cx| {
3284 thread.send(
3285 vec![acp::ContentBlock::Text(acp::TextContent {
3286 text: "Hello".into(),
3287 annotations: None,
3288 meta: None,
3289 })],
3290 cx,
3291 )
3292 });
3293 cx.background_executor.spawn(send_task).detach();
3294 cx.run_until_parked();
3295
3296 // Verify that:
3297 // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3298 // 2. The user message was NOT truncated
3299 assert!(
3300 *saw_refusal_event.lock().unwrap(),
3301 "Refusal event should be emitted for tool result refusals"
3302 );
3303
3304 thread.read_with(cx, |thread, _| {
3305 let entries = thread.entries();
3306 assert!(entries.len() >= 2, "Should have user message and tool call");
3307
3308 // Verify user message is still there
3309 assert!(
3310 matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3311 "User message should not be truncated"
3312 );
3313
3314 // Verify tool call is there with result
3315 if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3316 assert!(
3317 tool_call.raw_output.is_some(),
3318 "Tool call should have output"
3319 );
3320 } else {
3321 panic!("Expected tool call at index 1");
3322 }
3323 });
3324 }
3325
3326 #[gpui::test]
3327 async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3328 init_test(cx);
3329
3330 let fs = FakeFs::new(cx.executor());
3331 let project = Project::test(fs, None, cx).await;
3332
3333 let refuse_next = Arc::new(AtomicBool::new(false));
3334 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3335 let refuse_next = refuse_next.clone();
3336 move |_request, _thread, _cx| {
3337 if refuse_next.load(SeqCst) {
3338 async move {
3339 Ok(acp::PromptResponse {
3340 stop_reason: acp::StopReason::Refusal,
3341 meta: None,
3342 })
3343 }
3344 .boxed_local()
3345 } else {
3346 async move {
3347 Ok(acp::PromptResponse {
3348 stop_reason: acp::StopReason::EndTurn,
3349 meta: None,
3350 })
3351 }
3352 .boxed_local()
3353 }
3354 }
3355 }));
3356
3357 let thread = cx
3358 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3359 .await
3360 .unwrap();
3361
3362 // Track if we see a Refusal event
3363 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3364 let saw_refusal_event_captured = saw_refusal_event.clone();
3365 thread.update(cx, |_thread, cx| {
3366 cx.subscribe(
3367 &thread,
3368 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3369 if matches!(event, AcpThreadEvent::Refusal) {
3370 *saw_refusal_event_captured.lock().unwrap() = true;
3371 }
3372 },
3373 )
3374 .detach();
3375 });
3376
3377 // Send a message that will be refused
3378 refuse_next.store(true, SeqCst);
3379 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3380 .await
3381 .unwrap();
3382
3383 // Verify that a Refusal event WAS emitted for user prompt refusal
3384 assert!(
3385 *saw_refusal_event.lock().unwrap(),
3386 "Refusal event should be emitted for user prompt refusals"
3387 );
3388
3389 // Verify the message was truncated (user prompt refusal)
3390 thread.read_with(cx, |thread, cx| {
3391 assert_eq!(thread.to_markdown(cx), "");
3392 });
3393 }
3394
3395 #[gpui::test]
3396 async fn test_refusal(cx: &mut TestAppContext) {
3397 init_test(cx);
3398 let fs = FakeFs::new(cx.background_executor.clone());
3399 fs.insert_tree(path!("/"), json!({})).await;
3400 let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
3401
3402 let refuse_next = Arc::new(AtomicBool::new(false));
3403 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3404 let refuse_next = refuse_next.clone();
3405 move |request, thread, mut cx| {
3406 let refuse_next = refuse_next.clone();
3407 async move {
3408 if refuse_next.load(SeqCst) {
3409 return Ok(acp::PromptResponse {
3410 stop_reason: acp::StopReason::Refusal,
3411 meta: None,
3412 });
3413 }
3414
3415 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3416 panic!("expected text content block");
3417 };
3418 thread.update(&mut cx, |thread, cx| {
3419 thread
3420 .handle_session_update(
3421 acp::SessionUpdate::AgentMessageChunk {
3422 content: content.text.to_uppercase().into(),
3423 },
3424 cx,
3425 )
3426 .unwrap();
3427 })?;
3428 Ok(acp::PromptResponse {
3429 stop_reason: acp::StopReason::EndTurn,
3430 meta: None,
3431 })
3432 }
3433 .boxed_local()
3434 }
3435 }));
3436 let thread = cx
3437 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3438 .await
3439 .unwrap();
3440
3441 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3442 .await
3443 .unwrap();
3444 thread.read_with(cx, |thread, cx| {
3445 assert_eq!(
3446 thread.to_markdown(cx),
3447 indoc! {"
3448 ## User
3449
3450 hello
3451
3452 ## Assistant
3453
3454 HELLO
3455
3456 "}
3457 );
3458 });
3459
3460 // Simulate refusing the second message. The message should be truncated
3461 // when a user prompt is refused.
3462 refuse_next.store(true, SeqCst);
3463 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
3464 .await
3465 .unwrap();
3466 thread.read_with(cx, |thread, cx| {
3467 assert_eq!(
3468 thread.to_markdown(cx),
3469 indoc! {"
3470 ## User
3471
3472 hello
3473
3474 ## Assistant
3475
3476 HELLO
3477
3478 "}
3479 );
3480 });
3481 }
3482
3483 async fn run_until_first_tool_call(
3484 thread: &Entity<AcpThread>,
3485 cx: &mut TestAppContext,
3486 ) -> usize {
3487 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
3488
3489 let subscription = cx.update(|cx| {
3490 cx.subscribe(thread, move |thread, _, cx| {
3491 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
3492 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
3493 return tx.try_send(ix).unwrap();
3494 }
3495 }
3496 })
3497 });
3498
3499 select! {
3500 _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
3501 panic!("Timeout waiting for tool call")
3502 }
3503 ix = rx.next().fuse() => {
3504 drop(subscription);
3505 ix.unwrap()
3506 }
3507 }
3508 }
3509
3510 #[derive(Clone, Default)]
3511 struct FakeAgentConnection {
3512 auth_methods: Vec<acp::AuthMethod>,
3513 sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
3514 on_user_message: Option<
3515 Rc<
3516 dyn Fn(
3517 acp::PromptRequest,
3518 WeakEntity<AcpThread>,
3519 AsyncApp,
3520 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3521 + 'static,
3522 >,
3523 >,
3524 }
3525
3526 impl FakeAgentConnection {
3527 fn new() -> Self {
3528 Self {
3529 auth_methods: Vec::new(),
3530 on_user_message: None,
3531 sessions: Arc::default(),
3532 }
3533 }
3534
3535 #[expect(unused)]
3536 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
3537 self.auth_methods = auth_methods;
3538 self
3539 }
3540
3541 fn on_user_message(
3542 mut self,
3543 handler: impl Fn(
3544 acp::PromptRequest,
3545 WeakEntity<AcpThread>,
3546 AsyncApp,
3547 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3548 + 'static,
3549 ) -> Self {
3550 self.on_user_message.replace(Rc::new(handler));
3551 self
3552 }
3553 }
3554
3555 impl AgentConnection for FakeAgentConnection {
3556 fn auth_methods(&self) -> &[acp::AuthMethod] {
3557 &self.auth_methods
3558 }
3559
3560 fn new_thread(
3561 self: Rc<Self>,
3562 project: Entity<Project>,
3563 _cwd: &Path,
3564 cx: &mut App,
3565 ) -> Task<gpui::Result<Entity<AcpThread>>> {
3566 let session_id = acp::SessionId(
3567 rand::rng()
3568 .sample_iter(&distr::Alphanumeric)
3569 .take(7)
3570 .map(char::from)
3571 .collect::<String>()
3572 .into(),
3573 );
3574 let action_log = cx.new(|_| ActionLog::new(project.clone()));
3575 let thread = cx.new(|cx| {
3576 AcpThread::new(
3577 "Test",
3578 self.clone(),
3579 project,
3580 action_log,
3581 session_id.clone(),
3582 watch::Receiver::constant(acp::PromptCapabilities {
3583 image: true,
3584 audio: true,
3585 embedded_context: true,
3586 meta: None,
3587 }),
3588 cx,
3589 )
3590 });
3591 self.sessions.lock().insert(session_id, thread.downgrade());
3592 Task::ready(Ok(thread))
3593 }
3594
3595 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
3596 if self.auth_methods().iter().any(|m| m.id == method) {
3597 Task::ready(Ok(()))
3598 } else {
3599 Task::ready(Err(anyhow!("Invalid Auth Method")))
3600 }
3601 }
3602
3603 fn prompt(
3604 &self,
3605 _id: Option<UserMessageId>,
3606 params: acp::PromptRequest,
3607 cx: &mut App,
3608 ) -> Task<gpui::Result<acp::PromptResponse>> {
3609 let sessions = self.sessions.lock();
3610 let thread = sessions.get(¶ms.session_id).unwrap();
3611 if let Some(handler) = &self.on_user_message {
3612 let handler = handler.clone();
3613 let thread = thread.clone();
3614 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
3615 } else {
3616 Task::ready(Ok(acp::PromptResponse {
3617 stop_reason: acp::StopReason::EndTurn,
3618 meta: None,
3619 }))
3620 }
3621 }
3622
3623 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
3624 let sessions = self.sessions.lock();
3625 let thread = sessions.get(session_id).unwrap().clone();
3626
3627 cx.spawn(async move |cx| {
3628 thread
3629 .update(cx, |thread, cx| thread.cancel(cx))
3630 .unwrap()
3631 .await
3632 })
3633 .detach();
3634 }
3635
3636 fn truncate(
3637 &self,
3638 session_id: &acp::SessionId,
3639 _cx: &App,
3640 ) -> Option<Rc<dyn AgentSessionTruncate>> {
3641 Some(Rc::new(FakeAgentSessionEditor {
3642 _session_id: session_id.clone(),
3643 }))
3644 }
3645
3646 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
3647 self
3648 }
3649 }
3650
3651 struct FakeAgentSessionEditor {
3652 _session_id: acp::SessionId,
3653 }
3654
3655 impl AgentSessionTruncate for FakeAgentSessionEditor {
3656 fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
3657 Task::ready(Ok(()))
3658 }
3659 }
3660
3661 #[gpui::test]
3662 async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
3663 init_test(cx);
3664
3665 let fs = FakeFs::new(cx.executor());
3666 let project = Project::test(fs, [], cx).await;
3667 let connection = Rc::new(FakeAgentConnection::new());
3668 let thread = cx
3669 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3670 .await
3671 .unwrap();
3672
3673 // Try to update a tool call that doesn't exist
3674 let nonexistent_id = acp::ToolCallId("nonexistent-tool-call".into());
3675 thread.update(cx, |thread, cx| {
3676 let result = thread.handle_session_update(
3677 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
3678 id: nonexistent_id.clone(),
3679 fields: acp::ToolCallUpdateFields {
3680 status: Some(acp::ToolCallStatus::Completed),
3681 ..Default::default()
3682 },
3683 meta: None,
3684 }),
3685 cx,
3686 );
3687
3688 // The update should succeed (not return an error)
3689 assert!(result.is_ok());
3690
3691 // There should now be exactly one entry in the thread
3692 assert_eq!(thread.entries.len(), 1);
3693
3694 // The entry should be a failed tool call
3695 if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
3696 assert_eq!(tool_call.id, nonexistent_id);
3697 assert!(matches!(tool_call.status, ToolCallStatus::Failed));
3698 assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
3699
3700 // Check that the content contains the error message
3701 assert_eq!(tool_call.content.len(), 1);
3702 if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
3703 match content_block {
3704 ContentBlock::Markdown { markdown } => {
3705 let markdown_text = markdown.read(cx).source();
3706 assert!(markdown_text.contains("Tool call not found"));
3707 }
3708 ContentBlock::Empty => panic!("Expected markdown content, got empty"),
3709 ContentBlock::ResourceLink { .. } => {
3710 panic!("Expected markdown content, got resource link")
3711 }
3712 }
3713 } else {
3714 panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
3715 }
3716 } else {
3717 panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
3718 }
3719 });
3720 }
3721}