1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5
6use ::terminal::terminal_settings::TerminalSettings;
7use agent_settings::AgentSettings;
8use collections::HashSet;
9pub use connection::*;
10pub use diff::*;
11use language::language_settings::FormatOnSave;
12pub use mention::*;
13use project::lsp_store::{FormatTrigger, LspFormatTarget};
14use serde::{Deserialize, Serialize};
15use settings::{Settings as _, SettingsLocation};
16use task::{Shell, ShellBuilder};
17pub use terminal::*;
18
19use action_log::ActionLog;
20use agent_client_protocol::{self as acp};
21use anyhow::{Context as _, Result, anyhow};
22use editor::Bias;
23use futures::{FutureExt, channel::oneshot, future::BoxFuture};
24use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
25use itertools::Itertools;
26use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
27use markdown::Markdown;
28use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
29use std::collections::HashMap;
30use std::error::Error;
31use std::fmt::{Formatter, Write};
32use std::ops::Range;
33use std::process::ExitStatus;
34use std::rc::Rc;
35use std::time::{Duration, Instant};
36use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
37use ui::App;
38use util::{ResultExt, get_default_system_shell_preferring_bash};
39use uuid::Uuid;
40
41#[derive(Debug)]
42pub struct UserMessage {
43 pub id: Option<UserMessageId>,
44 pub content: ContentBlock,
45 pub chunks: Vec<acp::ContentBlock>,
46 pub checkpoint: Option<Checkpoint>,
47}
48
49#[derive(Debug)]
50pub struct Checkpoint {
51 git_checkpoint: GitStoreCheckpoint,
52 pub show: bool,
53}
54
55impl UserMessage {
56 fn to_markdown(&self, cx: &App) -> String {
57 let mut markdown = String::new();
58 if self
59 .checkpoint
60 .as_ref()
61 .is_some_and(|checkpoint| checkpoint.show)
62 {
63 writeln!(markdown, "## User (checkpoint)").unwrap();
64 } else {
65 writeln!(markdown, "## User").unwrap();
66 }
67 writeln!(markdown).unwrap();
68 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
69 writeln!(markdown).unwrap();
70 markdown
71 }
72}
73
74#[derive(Debug, PartialEq)]
75pub struct AssistantMessage {
76 pub chunks: Vec<AssistantMessageChunk>,
77}
78
79impl AssistantMessage {
80 pub fn to_markdown(&self, cx: &App) -> String {
81 format!(
82 "## Assistant\n\n{}\n\n",
83 self.chunks
84 .iter()
85 .map(|chunk| chunk.to_markdown(cx))
86 .join("\n\n")
87 )
88 }
89}
90
91#[derive(Debug, PartialEq)]
92pub enum AssistantMessageChunk {
93 Message { block: ContentBlock },
94 Thought { block: ContentBlock },
95}
96
97impl AssistantMessageChunk {
98 pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
99 Self::Message {
100 block: ContentBlock::new(chunk.into(), language_registry, cx),
101 }
102 }
103
104 fn to_markdown(&self, cx: &App) -> String {
105 match self {
106 Self::Message { block } => block.to_markdown(cx).to_string(),
107 Self::Thought { block } => {
108 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
109 }
110 }
111 }
112}
113
114#[derive(Debug)]
115pub enum AgentThreadEntry {
116 UserMessage(UserMessage),
117 AssistantMessage(AssistantMessage),
118 ToolCall(ToolCall),
119}
120
121impl AgentThreadEntry {
122 pub fn to_markdown(&self, cx: &App) -> String {
123 match self {
124 Self::UserMessage(message) => message.to_markdown(cx),
125 Self::AssistantMessage(message) => message.to_markdown(cx),
126 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
127 }
128 }
129
130 pub fn user_message(&self) -> Option<&UserMessage> {
131 if let AgentThreadEntry::UserMessage(message) = self {
132 Some(message)
133 } else {
134 None
135 }
136 }
137
138 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
139 if let AgentThreadEntry::ToolCall(call) = self {
140 itertools::Either::Left(call.diffs())
141 } else {
142 itertools::Either::Right(std::iter::empty())
143 }
144 }
145
146 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
147 if let AgentThreadEntry::ToolCall(call) = self {
148 itertools::Either::Left(call.terminals())
149 } else {
150 itertools::Either::Right(std::iter::empty())
151 }
152 }
153
154 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
155 if let AgentThreadEntry::ToolCall(ToolCall {
156 locations,
157 resolved_locations,
158 ..
159 }) = self
160 {
161 Some((
162 locations.get(ix)?.clone(),
163 resolved_locations.get(ix)?.clone()?,
164 ))
165 } else {
166 None
167 }
168 }
169}
170
171#[derive(Debug)]
172pub struct ToolCall {
173 pub id: acp::ToolCallId,
174 pub label: Entity<Markdown>,
175 pub kind: acp::ToolKind,
176 pub content: Vec<ToolCallContent>,
177 pub status: ToolCallStatus,
178 pub locations: Vec<acp::ToolCallLocation>,
179 pub resolved_locations: Vec<Option<AgentLocation>>,
180 pub raw_input: Option<serde_json::Value>,
181 pub raw_output: Option<serde_json::Value>,
182}
183
184impl ToolCall {
185 fn from_acp(
186 tool_call: acp::ToolCall,
187 status: ToolCallStatus,
188 language_registry: Arc<LanguageRegistry>,
189 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
190 cx: &mut App,
191 ) -> Result<Self> {
192 let title = if let Some((first_line, _)) = tool_call.title.split_once("\n") {
193 first_line.to_owned() + "…"
194 } else {
195 tool_call.title
196 };
197 let mut content = Vec::with_capacity(tool_call.content.len());
198 for item in tool_call.content {
199 content.push(ToolCallContent::from_acp(
200 item,
201 language_registry.clone(),
202 terminals,
203 cx,
204 )?);
205 }
206
207 let result = Self {
208 id: tool_call.id,
209 label: cx
210 .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
211 kind: tool_call.kind,
212 content,
213 locations: tool_call.locations,
214 resolved_locations: Vec::default(),
215 status,
216 raw_input: tool_call.raw_input,
217 raw_output: tool_call.raw_output,
218 };
219 Ok(result)
220 }
221
222 fn update_fields(
223 &mut self,
224 fields: acp::ToolCallUpdateFields,
225 language_registry: Arc<LanguageRegistry>,
226 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
227 cx: &mut App,
228 ) -> Result<()> {
229 let acp::ToolCallUpdateFields {
230 kind,
231 status,
232 title,
233 content,
234 locations,
235 raw_input,
236 raw_output,
237 } = fields;
238
239 if let Some(kind) = kind {
240 self.kind = kind;
241 }
242
243 if let Some(status) = status {
244 self.status = status.into();
245 }
246
247 if let Some(title) = title {
248 self.label.update(cx, |label, cx| {
249 if let Some((first_line, _)) = title.split_once("\n") {
250 label.replace(first_line.to_owned() + "…", cx)
251 } else {
252 label.replace(title, cx);
253 }
254 });
255 }
256
257 if let Some(content) = content {
258 let new_content_len = content.len();
259 let mut content = content.into_iter();
260
261 // Reuse existing content if we can
262 for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
263 old.update_from_acp(new, language_registry.clone(), terminals, cx)?;
264 }
265 for new in content {
266 self.content.push(ToolCallContent::from_acp(
267 new,
268 language_registry.clone(),
269 terminals,
270 cx,
271 )?)
272 }
273 self.content.truncate(new_content_len);
274 }
275
276 if let Some(locations) = locations {
277 self.locations = locations;
278 }
279
280 if let Some(raw_input) = raw_input {
281 self.raw_input = Some(raw_input);
282 }
283
284 if let Some(raw_output) = raw_output {
285 if self.content.is_empty()
286 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
287 {
288 self.content
289 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
290 markdown,
291 }));
292 }
293 self.raw_output = Some(raw_output);
294 }
295 Ok(())
296 }
297
298 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
299 self.content.iter().filter_map(|content| match content {
300 ToolCallContent::Diff(diff) => Some(diff),
301 ToolCallContent::ContentBlock(_) => None,
302 ToolCallContent::Terminal(_) => None,
303 })
304 }
305
306 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
307 self.content.iter().filter_map(|content| match content {
308 ToolCallContent::Terminal(terminal) => Some(terminal),
309 ToolCallContent::ContentBlock(_) => None,
310 ToolCallContent::Diff(_) => None,
311 })
312 }
313
314 fn to_markdown(&self, cx: &App) -> String {
315 let mut markdown = format!(
316 "**Tool Call: {}**\nStatus: {}\n\n",
317 self.label.read(cx).source(),
318 self.status
319 );
320 for content in &self.content {
321 markdown.push_str(content.to_markdown(cx).as_str());
322 markdown.push_str("\n\n");
323 }
324 markdown
325 }
326
327 async fn resolve_location(
328 location: acp::ToolCallLocation,
329 project: WeakEntity<Project>,
330 cx: &mut AsyncApp,
331 ) -> Option<AgentLocation> {
332 let buffer = project
333 .update(cx, |project, cx| {
334 project
335 .project_path_for_absolute_path(&location.path, cx)
336 .map(|path| project.open_buffer(path, cx))
337 })
338 .ok()??;
339 let buffer = buffer.await.log_err()?;
340 let position = buffer
341 .update(cx, |buffer, _| {
342 if let Some(row) = location.line {
343 let snapshot = buffer.snapshot();
344 let column = snapshot.indent_size_for_line(row).len;
345 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
346 snapshot.anchor_before(point)
347 } else {
348 Anchor::MIN
349 }
350 })
351 .ok()?;
352
353 Some(AgentLocation {
354 buffer: buffer.downgrade(),
355 position,
356 })
357 }
358
359 fn resolve_locations(
360 &self,
361 project: Entity<Project>,
362 cx: &mut App,
363 ) -> Task<Vec<Option<AgentLocation>>> {
364 let locations = self.locations.clone();
365 project.update(cx, |_, cx| {
366 cx.spawn(async move |project, cx| {
367 let mut new_locations = Vec::new();
368 for location in locations {
369 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
370 }
371 new_locations
372 })
373 })
374 }
375}
376
377#[derive(Debug)]
378pub enum ToolCallStatus {
379 /// The tool call hasn't started running yet, but we start showing it to
380 /// the user.
381 Pending,
382 /// The tool call is waiting for confirmation from the user.
383 WaitingForConfirmation {
384 options: Vec<acp::PermissionOption>,
385 respond_tx: oneshot::Sender<acp::PermissionOptionId>,
386 },
387 /// The tool call is currently running.
388 InProgress,
389 /// The tool call completed successfully.
390 Completed,
391 /// The tool call failed.
392 Failed,
393 /// The user rejected the tool call.
394 Rejected,
395 /// The user canceled generation so the tool call was canceled.
396 Canceled,
397}
398
399impl From<acp::ToolCallStatus> for ToolCallStatus {
400 fn from(status: acp::ToolCallStatus) -> Self {
401 match status {
402 acp::ToolCallStatus::Pending => Self::Pending,
403 acp::ToolCallStatus::InProgress => Self::InProgress,
404 acp::ToolCallStatus::Completed => Self::Completed,
405 acp::ToolCallStatus::Failed => Self::Failed,
406 }
407 }
408}
409
410impl Display for ToolCallStatus {
411 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
412 write!(
413 f,
414 "{}",
415 match self {
416 ToolCallStatus::Pending => "Pending",
417 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
418 ToolCallStatus::InProgress => "In Progress",
419 ToolCallStatus::Completed => "Completed",
420 ToolCallStatus::Failed => "Failed",
421 ToolCallStatus::Rejected => "Rejected",
422 ToolCallStatus::Canceled => "Canceled",
423 }
424 )
425 }
426}
427
428#[derive(Debug, PartialEq, Clone)]
429pub enum ContentBlock {
430 Empty,
431 Markdown { markdown: Entity<Markdown> },
432 ResourceLink { resource_link: acp::ResourceLink },
433}
434
435impl ContentBlock {
436 pub fn new(
437 block: acp::ContentBlock,
438 language_registry: &Arc<LanguageRegistry>,
439 cx: &mut App,
440 ) -> Self {
441 let mut this = Self::Empty;
442 this.append(block, language_registry, cx);
443 this
444 }
445
446 pub fn new_combined(
447 blocks: impl IntoIterator<Item = acp::ContentBlock>,
448 language_registry: Arc<LanguageRegistry>,
449 cx: &mut App,
450 ) -> Self {
451 let mut this = Self::Empty;
452 for block in blocks {
453 this.append(block, &language_registry, cx);
454 }
455 this
456 }
457
458 pub fn append(
459 &mut self,
460 block: acp::ContentBlock,
461 language_registry: &Arc<LanguageRegistry>,
462 cx: &mut App,
463 ) {
464 if matches!(self, ContentBlock::Empty)
465 && let acp::ContentBlock::ResourceLink(resource_link) = block
466 {
467 *self = ContentBlock::ResourceLink { resource_link };
468 return;
469 }
470
471 let new_content = self.block_string_contents(block);
472
473 match self {
474 ContentBlock::Empty => {
475 *self = Self::create_markdown_block(new_content, language_registry, cx);
476 }
477 ContentBlock::Markdown { markdown } => {
478 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
479 }
480 ContentBlock::ResourceLink { resource_link } => {
481 let existing_content = Self::resource_link_md(&resource_link.uri);
482 let combined = format!("{}\n{}", existing_content, new_content);
483
484 *self = Self::create_markdown_block(combined, language_registry, cx);
485 }
486 }
487 }
488
489 fn create_markdown_block(
490 content: String,
491 language_registry: &Arc<LanguageRegistry>,
492 cx: &mut App,
493 ) -> ContentBlock {
494 ContentBlock::Markdown {
495 markdown: cx
496 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
497 }
498 }
499
500 fn block_string_contents(&self, block: acp::ContentBlock) -> String {
501 match block {
502 acp::ContentBlock::Text(text_content) => text_content.text,
503 acp::ContentBlock::ResourceLink(resource_link) => {
504 Self::resource_link_md(&resource_link.uri)
505 }
506 acp::ContentBlock::Resource(acp::EmbeddedResource {
507 resource:
508 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
509 uri,
510 ..
511 }),
512 ..
513 }) => Self::resource_link_md(&uri),
514 acp::ContentBlock::Image(image) => Self::image_md(&image),
515 acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
516 }
517 }
518
519 fn resource_link_md(uri: &str) -> String {
520 if let Some(uri) = MentionUri::parse(uri).log_err() {
521 uri.as_link().to_string()
522 } else {
523 uri.to_string()
524 }
525 }
526
527 fn image_md(_image: &acp::ImageContent) -> String {
528 "`Image`".into()
529 }
530
531 pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
532 match self {
533 ContentBlock::Empty => "",
534 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
535 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
536 }
537 }
538
539 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
540 match self {
541 ContentBlock::Empty => None,
542 ContentBlock::Markdown { markdown } => Some(markdown),
543 ContentBlock::ResourceLink { .. } => None,
544 }
545 }
546
547 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
548 match self {
549 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
550 _ => None,
551 }
552 }
553}
554
555#[derive(Debug)]
556pub enum ToolCallContent {
557 ContentBlock(ContentBlock),
558 Diff(Entity<Diff>),
559 Terminal(Entity<Terminal>),
560}
561
562impl ToolCallContent {
563 pub fn from_acp(
564 content: acp::ToolCallContent,
565 language_registry: Arc<LanguageRegistry>,
566 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
567 cx: &mut App,
568 ) -> Result<Self> {
569 match content {
570 acp::ToolCallContent::Content { content } => Ok(Self::ContentBlock(ContentBlock::new(
571 content,
572 &language_registry,
573 cx,
574 ))),
575 acp::ToolCallContent::Diff { diff } => Ok(Self::Diff(cx.new(|cx| {
576 Diff::finalized(
577 diff.path.to_string_lossy().into_owned(),
578 diff.old_text,
579 diff.new_text,
580 language_registry,
581 cx,
582 )
583 }))),
584 acp::ToolCallContent::Terminal { terminal_id } => terminals
585 .get(&terminal_id)
586 .cloned()
587 .map(Self::Terminal)
588 .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
589 }
590 }
591
592 pub fn update_from_acp(
593 &mut self,
594 new: acp::ToolCallContent,
595 language_registry: Arc<LanguageRegistry>,
596 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
597 cx: &mut App,
598 ) -> Result<()> {
599 let needs_update = match (&self, &new) {
600 (Self::Diff(old_diff), acp::ToolCallContent::Diff { diff: new_diff }) => {
601 old_diff.read(cx).needs_update(
602 new_diff.old_text.as_deref().unwrap_or(""),
603 &new_diff.new_text,
604 cx,
605 )
606 }
607 _ => true,
608 };
609
610 if needs_update {
611 *self = Self::from_acp(new, language_registry, terminals, cx)?;
612 }
613 Ok(())
614 }
615
616 pub fn to_markdown(&self, cx: &App) -> String {
617 match self {
618 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
619 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
620 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
621 }
622 }
623}
624
625#[derive(Debug, PartialEq)]
626pub enum ToolCallUpdate {
627 UpdateFields(acp::ToolCallUpdate),
628 UpdateDiff(ToolCallUpdateDiff),
629 UpdateTerminal(ToolCallUpdateTerminal),
630}
631
632impl ToolCallUpdate {
633 fn id(&self) -> &acp::ToolCallId {
634 match self {
635 Self::UpdateFields(update) => &update.id,
636 Self::UpdateDiff(diff) => &diff.id,
637 Self::UpdateTerminal(terminal) => &terminal.id,
638 }
639 }
640}
641
642impl From<acp::ToolCallUpdate> for ToolCallUpdate {
643 fn from(update: acp::ToolCallUpdate) -> Self {
644 Self::UpdateFields(update)
645 }
646}
647
648impl From<ToolCallUpdateDiff> for ToolCallUpdate {
649 fn from(diff: ToolCallUpdateDiff) -> Self {
650 Self::UpdateDiff(diff)
651 }
652}
653
654#[derive(Debug, PartialEq)]
655pub struct ToolCallUpdateDiff {
656 pub id: acp::ToolCallId,
657 pub diff: Entity<Diff>,
658}
659
660impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
661 fn from(terminal: ToolCallUpdateTerminal) -> Self {
662 Self::UpdateTerminal(terminal)
663 }
664}
665
666#[derive(Debug, PartialEq)]
667pub struct ToolCallUpdateTerminal {
668 pub id: acp::ToolCallId,
669 pub terminal: Entity<Terminal>,
670}
671
672#[derive(Debug, Default)]
673pub struct Plan {
674 pub entries: Vec<PlanEntry>,
675}
676
677#[derive(Debug)]
678pub struct PlanStats<'a> {
679 pub in_progress_entry: Option<&'a PlanEntry>,
680 pub pending: u32,
681 pub completed: u32,
682}
683
684impl Plan {
685 pub fn is_empty(&self) -> bool {
686 self.entries.is_empty()
687 }
688
689 pub fn stats(&self) -> PlanStats<'_> {
690 let mut stats = PlanStats {
691 in_progress_entry: None,
692 pending: 0,
693 completed: 0,
694 };
695
696 for entry in &self.entries {
697 match &entry.status {
698 acp::PlanEntryStatus::Pending => {
699 stats.pending += 1;
700 }
701 acp::PlanEntryStatus::InProgress => {
702 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
703 }
704 acp::PlanEntryStatus::Completed => {
705 stats.completed += 1;
706 }
707 }
708 }
709
710 stats
711 }
712}
713
714#[derive(Debug)]
715pub struct PlanEntry {
716 pub content: Entity<Markdown>,
717 pub priority: acp::PlanEntryPriority,
718 pub status: acp::PlanEntryStatus,
719}
720
721impl PlanEntry {
722 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
723 Self {
724 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
725 priority: entry.priority,
726 status: entry.status,
727 }
728 }
729}
730
731#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
732pub struct TokenUsage {
733 pub max_tokens: u64,
734 pub used_tokens: u64,
735}
736
737impl TokenUsage {
738 pub fn ratio(&self) -> TokenUsageRatio {
739 #[cfg(debug_assertions)]
740 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
741 .unwrap_or("0.8".to_string())
742 .parse()
743 .unwrap();
744 #[cfg(not(debug_assertions))]
745 let warning_threshold: f32 = 0.8;
746
747 // When the maximum is unknown because there is no selected model,
748 // avoid showing the token limit warning.
749 if self.max_tokens == 0 {
750 TokenUsageRatio::Normal
751 } else if self.used_tokens >= self.max_tokens {
752 TokenUsageRatio::Exceeded
753 } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
754 TokenUsageRatio::Warning
755 } else {
756 TokenUsageRatio::Normal
757 }
758 }
759}
760
761#[derive(Debug, Clone, PartialEq, Eq)]
762pub enum TokenUsageRatio {
763 Normal,
764 Warning,
765 Exceeded,
766}
767
768#[derive(Debug, Clone)]
769pub struct RetryStatus {
770 pub last_error: SharedString,
771 pub attempt: usize,
772 pub max_attempts: usize,
773 pub started_at: Instant,
774 pub duration: Duration,
775}
776
777pub struct AcpThread {
778 title: SharedString,
779 entries: Vec<AgentThreadEntry>,
780 plan: Plan,
781 project: Entity<Project>,
782 action_log: Entity<ActionLog>,
783 shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
784 send_task: Option<Task<()>>,
785 connection: Rc<dyn AgentConnection>,
786 session_id: acp::SessionId,
787 token_usage: Option<TokenUsage>,
788 prompt_capabilities: acp::PromptCapabilities,
789 _observe_prompt_capabilities: Task<anyhow::Result<()>>,
790 terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
791 pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
792 pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
793}
794
795#[derive(Debug)]
796pub enum AcpThreadEvent {
797 NewEntry,
798 TitleUpdated,
799 TokenUsageUpdated,
800 EntryUpdated(usize),
801 EntriesRemoved(Range<usize>),
802 ToolAuthorizationRequired,
803 Retry(RetryStatus),
804 Stopped,
805 Error,
806 LoadError(LoadError),
807 PromptCapabilitiesUpdated,
808 Refusal,
809 AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
810 ModeUpdated(acp::SessionModeId),
811}
812
813impl EventEmitter<AcpThreadEvent> for AcpThread {}
814
815#[derive(Debug, Clone)]
816pub enum TerminalProviderEvent {
817 Created {
818 terminal_id: acp::TerminalId,
819 label: String,
820 cwd: Option<PathBuf>,
821 output_byte_limit: Option<u64>,
822 terminal: Entity<::terminal::Terminal>,
823 },
824 Output {
825 terminal_id: acp::TerminalId,
826 data: Vec<u8>,
827 },
828 TitleChanged {
829 terminal_id: acp::TerminalId,
830 title: String,
831 },
832 Exit {
833 terminal_id: acp::TerminalId,
834 status: acp::TerminalExitStatus,
835 },
836}
837
838#[derive(Debug, Clone)]
839pub enum TerminalProviderCommand {
840 WriteInput {
841 terminal_id: acp::TerminalId,
842 bytes: Vec<u8>,
843 },
844 Resize {
845 terminal_id: acp::TerminalId,
846 cols: u16,
847 rows: u16,
848 },
849 Close {
850 terminal_id: acp::TerminalId,
851 },
852}
853
854impl AcpThread {
855 pub fn on_terminal_provider_event(
856 &mut self,
857 event: TerminalProviderEvent,
858 cx: &mut Context<Self>,
859 ) {
860 match event {
861 TerminalProviderEvent::Created {
862 terminal_id,
863 label,
864 cwd,
865 output_byte_limit,
866 terminal,
867 } => {
868 let entity = self.register_terminal_created(
869 terminal_id.clone(),
870 label,
871 cwd,
872 output_byte_limit,
873 terminal,
874 cx,
875 );
876
877 if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
878 for data in chunks.drain(..) {
879 entity.update(cx, |term, cx| {
880 term.inner().update(cx, |inner, cx| {
881 inner.write_output(&data, cx);
882 })
883 });
884 }
885 }
886
887 if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
888 entity.update(cx, |_term, cx| {
889 cx.notify();
890 });
891 }
892
893 cx.notify();
894 }
895 TerminalProviderEvent::Output { terminal_id, data } => {
896 if let Some(entity) = self.terminals.get(&terminal_id) {
897 entity.update(cx, |term, cx| {
898 term.inner().update(cx, |inner, cx| {
899 inner.write_output(&data, cx);
900 })
901 });
902 } else {
903 self.pending_terminal_output
904 .entry(terminal_id)
905 .or_default()
906 .push(data);
907 }
908 }
909 TerminalProviderEvent::TitleChanged { terminal_id, title } => {
910 if let Some(entity) = self.terminals.get(&terminal_id) {
911 entity.update(cx, |term, cx| {
912 term.inner().update(cx, |inner, cx| {
913 inner.breadcrumb_text = title;
914 cx.emit(::terminal::Event::BreadcrumbsChanged);
915 })
916 });
917 }
918 }
919 TerminalProviderEvent::Exit {
920 terminal_id,
921 status,
922 } => {
923 if let Some(entity) = self.terminals.get(&terminal_id) {
924 entity.update(cx, |_term, cx| {
925 cx.notify();
926 });
927 } else {
928 self.pending_terminal_exit.insert(terminal_id, status);
929 }
930 }
931 }
932 }
933}
934
935#[derive(PartialEq, Eq, Debug)]
936pub enum ThreadStatus {
937 Idle,
938 Generating,
939}
940
941#[derive(Debug, Clone)]
942pub enum LoadError {
943 Unsupported {
944 command: SharedString,
945 current_version: SharedString,
946 minimum_version: SharedString,
947 },
948 FailedToInstall(SharedString),
949 Exited {
950 status: ExitStatus,
951 },
952 Other(SharedString),
953}
954
955impl Display for LoadError {
956 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
957 match self {
958 LoadError::Unsupported {
959 command: path,
960 current_version,
961 minimum_version,
962 } => {
963 write!(
964 f,
965 "version {current_version} from {path} is not supported (need at least {minimum_version})"
966 )
967 }
968 LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
969 LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
970 LoadError::Other(msg) => write!(f, "{msg}"),
971 }
972 }
973}
974
975impl Error for LoadError {}
976
977impl AcpThread {
978 pub fn new(
979 title: impl Into<SharedString>,
980 connection: Rc<dyn AgentConnection>,
981 project: Entity<Project>,
982 action_log: Entity<ActionLog>,
983 session_id: acp::SessionId,
984 mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
985 cx: &mut Context<Self>,
986 ) -> Self {
987 let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
988 let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
989 loop {
990 let caps = prompt_capabilities_rx.recv().await?;
991 this.update(cx, |this, cx| {
992 this.prompt_capabilities = caps;
993 cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
994 })?;
995 }
996 });
997
998 Self {
999 action_log,
1000 shared_buffers: Default::default(),
1001 entries: Default::default(),
1002 plan: Default::default(),
1003 title: title.into(),
1004 project,
1005 send_task: None,
1006 connection,
1007 session_id,
1008 token_usage: None,
1009 prompt_capabilities,
1010 _observe_prompt_capabilities: task,
1011 terminals: HashMap::default(),
1012 pending_terminal_output: HashMap::default(),
1013 pending_terminal_exit: HashMap::default(),
1014 }
1015 }
1016
1017 pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
1018 self.prompt_capabilities.clone()
1019 }
1020
1021 pub fn connection(&self) -> &Rc<dyn AgentConnection> {
1022 &self.connection
1023 }
1024
1025 pub fn action_log(&self) -> &Entity<ActionLog> {
1026 &self.action_log
1027 }
1028
1029 pub fn project(&self) -> &Entity<Project> {
1030 &self.project
1031 }
1032
1033 pub fn title(&self) -> SharedString {
1034 self.title.clone()
1035 }
1036
1037 pub fn entries(&self) -> &[AgentThreadEntry] {
1038 &self.entries
1039 }
1040
1041 pub fn session_id(&self) -> &acp::SessionId {
1042 &self.session_id
1043 }
1044
1045 pub fn status(&self) -> ThreadStatus {
1046 if self.send_task.is_some() {
1047 ThreadStatus::Generating
1048 } else {
1049 ThreadStatus::Idle
1050 }
1051 }
1052
1053 pub fn token_usage(&self) -> Option<&TokenUsage> {
1054 self.token_usage.as_ref()
1055 }
1056
1057 pub fn has_pending_edit_tool_calls(&self) -> bool {
1058 for entry in self.entries.iter().rev() {
1059 match entry {
1060 AgentThreadEntry::UserMessage(_) => return false,
1061 AgentThreadEntry::ToolCall(
1062 call @ ToolCall {
1063 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1064 ..
1065 },
1066 ) if call.diffs().next().is_some() => {
1067 return true;
1068 }
1069 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1070 }
1071 }
1072
1073 false
1074 }
1075
1076 pub fn used_tools_since_last_user_message(&self) -> bool {
1077 for entry in self.entries.iter().rev() {
1078 match entry {
1079 AgentThreadEntry::UserMessage(..) => return false,
1080 AgentThreadEntry::AssistantMessage(..) => continue,
1081 AgentThreadEntry::ToolCall(..) => return true,
1082 }
1083 }
1084
1085 false
1086 }
1087
1088 pub fn handle_session_update(
1089 &mut self,
1090 update: acp::SessionUpdate,
1091 cx: &mut Context<Self>,
1092 ) -> Result<(), acp::Error> {
1093 match update {
1094 acp::SessionUpdate::UserMessageChunk { content } => {
1095 self.push_user_content_block(None, content, cx);
1096 }
1097 acp::SessionUpdate::AgentMessageChunk { content } => {
1098 self.push_assistant_content_block(content, false, cx);
1099 }
1100 acp::SessionUpdate::AgentThoughtChunk { content } => {
1101 self.push_assistant_content_block(content, true, cx);
1102 }
1103 acp::SessionUpdate::ToolCall(tool_call) => {
1104 self.upsert_tool_call(tool_call, cx)?;
1105 }
1106 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1107 self.update_tool_call(tool_call_update, cx)?;
1108 }
1109 acp::SessionUpdate::Plan(plan) => {
1110 self.update_plan(plan, cx);
1111 }
1112 acp::SessionUpdate::AvailableCommandsUpdate { available_commands } => {
1113 cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands))
1114 }
1115 acp::SessionUpdate::CurrentModeUpdate { current_mode_id } => {
1116 cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id))
1117 }
1118 }
1119 Ok(())
1120 }
1121
1122 pub fn push_user_content_block(
1123 &mut self,
1124 message_id: Option<UserMessageId>,
1125 chunk: acp::ContentBlock,
1126 cx: &mut Context<Self>,
1127 ) {
1128 let language_registry = self.project.read(cx).languages().clone();
1129 let entries_len = self.entries.len();
1130
1131 if let Some(last_entry) = self.entries.last_mut()
1132 && let AgentThreadEntry::UserMessage(UserMessage {
1133 id,
1134 content,
1135 chunks,
1136 ..
1137 }) = last_entry
1138 {
1139 *id = message_id.or(id.take());
1140 content.append(chunk.clone(), &language_registry, cx);
1141 chunks.push(chunk);
1142 let idx = entries_len - 1;
1143 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1144 } else {
1145 let content = ContentBlock::new(chunk.clone(), &language_registry, cx);
1146 self.push_entry(
1147 AgentThreadEntry::UserMessage(UserMessage {
1148 id: message_id,
1149 content,
1150 chunks: vec![chunk],
1151 checkpoint: None,
1152 }),
1153 cx,
1154 );
1155 }
1156 }
1157
1158 pub fn push_assistant_content_block(
1159 &mut self,
1160 chunk: acp::ContentBlock,
1161 is_thought: bool,
1162 cx: &mut Context<Self>,
1163 ) {
1164 let language_registry = self.project.read(cx).languages().clone();
1165 let entries_len = self.entries.len();
1166 if let Some(last_entry) = self.entries.last_mut()
1167 && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
1168 {
1169 let idx = entries_len - 1;
1170 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1171 match (chunks.last_mut(), is_thought) {
1172 (Some(AssistantMessageChunk::Message { block }), false)
1173 | (Some(AssistantMessageChunk::Thought { block }), true) => {
1174 block.append(chunk, &language_registry, cx)
1175 }
1176 _ => {
1177 let block = ContentBlock::new(chunk, &language_registry, cx);
1178 if is_thought {
1179 chunks.push(AssistantMessageChunk::Thought { block })
1180 } else {
1181 chunks.push(AssistantMessageChunk::Message { block })
1182 }
1183 }
1184 }
1185 } else {
1186 let block = ContentBlock::new(chunk, &language_registry, cx);
1187 let chunk = if is_thought {
1188 AssistantMessageChunk::Thought { block }
1189 } else {
1190 AssistantMessageChunk::Message { block }
1191 };
1192
1193 self.push_entry(
1194 AgentThreadEntry::AssistantMessage(AssistantMessage {
1195 chunks: vec![chunk],
1196 }),
1197 cx,
1198 );
1199 }
1200 }
1201
1202 fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
1203 self.entries.push(entry);
1204 cx.emit(AcpThreadEvent::NewEntry);
1205 }
1206
1207 pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1208 self.connection.set_title(&self.session_id, cx).is_some()
1209 }
1210
1211 pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1212 if title != self.title {
1213 self.title = title.clone();
1214 cx.emit(AcpThreadEvent::TitleUpdated);
1215 if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1216 return set_title.run(title, cx);
1217 }
1218 }
1219 Task::ready(Ok(()))
1220 }
1221
1222 pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1223 self.token_usage = usage;
1224 cx.emit(AcpThreadEvent::TokenUsageUpdated);
1225 }
1226
1227 pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1228 cx.emit(AcpThreadEvent::Retry(status));
1229 }
1230
1231 pub fn update_tool_call(
1232 &mut self,
1233 update: impl Into<ToolCallUpdate>,
1234 cx: &mut Context<Self>,
1235 ) -> Result<()> {
1236 let update = update.into();
1237 let languages = self.project.read(cx).languages().clone();
1238
1239 let ix = match self.index_for_tool_call(update.id()) {
1240 Some(ix) => ix,
1241 None => {
1242 // Tool call not found - create a failed tool call entry
1243 let failed_tool_call = ToolCall {
1244 id: update.id().clone(),
1245 label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1246 kind: acp::ToolKind::Fetch,
1247 content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1248 acp::ContentBlock::Text(acp::TextContent {
1249 text: "Tool call not found".to_string(),
1250 annotations: None,
1251 meta: None,
1252 }),
1253 &languages,
1254 cx,
1255 ))],
1256 status: ToolCallStatus::Failed,
1257 locations: Vec::new(),
1258 resolved_locations: Vec::new(),
1259 raw_input: None,
1260 raw_output: None,
1261 };
1262 self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1263 return Ok(());
1264 }
1265 };
1266 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1267 unreachable!()
1268 };
1269
1270 match update {
1271 ToolCallUpdate::UpdateFields(update) => {
1272 let location_updated = update.fields.locations.is_some();
1273 call.update_fields(update.fields, languages, &self.terminals, cx)?;
1274 if location_updated {
1275 self.resolve_locations(update.id, cx);
1276 }
1277 }
1278 ToolCallUpdate::UpdateDiff(update) => {
1279 call.content.clear();
1280 call.content.push(ToolCallContent::Diff(update.diff));
1281 }
1282 ToolCallUpdate::UpdateTerminal(update) => {
1283 call.content.clear();
1284 call.content
1285 .push(ToolCallContent::Terminal(update.terminal));
1286 }
1287 }
1288
1289 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1290
1291 Ok(())
1292 }
1293
1294 /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1295 pub fn upsert_tool_call(
1296 &mut self,
1297 tool_call: acp::ToolCall,
1298 cx: &mut Context<Self>,
1299 ) -> Result<(), acp::Error> {
1300 let status = tool_call.status.into();
1301 self.upsert_tool_call_inner(tool_call.into(), status, cx)
1302 }
1303
1304 /// Fails if id does not match an existing entry.
1305 pub fn upsert_tool_call_inner(
1306 &mut self,
1307 update: acp::ToolCallUpdate,
1308 status: ToolCallStatus,
1309 cx: &mut Context<Self>,
1310 ) -> Result<(), acp::Error> {
1311 let language_registry = self.project.read(cx).languages().clone();
1312 let id = update.id.clone();
1313
1314 if let Some(ix) = self.index_for_tool_call(&id) {
1315 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1316 unreachable!()
1317 };
1318
1319 call.update_fields(update.fields, language_registry, &self.terminals, cx)?;
1320 call.status = status;
1321
1322 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1323 } else {
1324 let call = ToolCall::from_acp(
1325 update.try_into()?,
1326 status,
1327 language_registry,
1328 &self.terminals,
1329 cx,
1330 )?;
1331 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1332 };
1333
1334 self.resolve_locations(id, cx);
1335 Ok(())
1336 }
1337
1338 fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1339 self.entries
1340 .iter()
1341 .enumerate()
1342 .rev()
1343 .find_map(|(index, entry)| {
1344 if let AgentThreadEntry::ToolCall(tool_call) = entry
1345 && &tool_call.id == id
1346 {
1347 Some(index)
1348 } else {
1349 None
1350 }
1351 })
1352 }
1353
1354 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1355 // The tool call we are looking for is typically the last one, or very close to the end.
1356 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1357 self.entries
1358 .iter_mut()
1359 .enumerate()
1360 .rev()
1361 .find_map(|(index, tool_call)| {
1362 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1363 && &tool_call.id == id
1364 {
1365 Some((index, tool_call))
1366 } else {
1367 None
1368 }
1369 })
1370 }
1371
1372 pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1373 self.entries
1374 .iter()
1375 .enumerate()
1376 .rev()
1377 .find_map(|(index, tool_call)| {
1378 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1379 && &tool_call.id == id
1380 {
1381 Some((index, tool_call))
1382 } else {
1383 None
1384 }
1385 })
1386 }
1387
1388 pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1389 let project = self.project.clone();
1390 let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1391 return;
1392 };
1393 let task = tool_call.resolve_locations(project, cx);
1394 cx.spawn(async move |this, cx| {
1395 let resolved_locations = task.await;
1396 this.update(cx, |this, cx| {
1397 let project = this.project.clone();
1398 let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1399 return;
1400 };
1401 if let Some(Some(location)) = resolved_locations.last() {
1402 project.update(cx, |project, cx| {
1403 if let Some(agent_location) = project.agent_location() {
1404 let should_ignore = agent_location.buffer == location.buffer
1405 && location
1406 .buffer
1407 .update(cx, |buffer, _| {
1408 let snapshot = buffer.snapshot();
1409 let old_position =
1410 agent_location.position.to_point(&snapshot);
1411 let new_position = location.position.to_point(&snapshot);
1412 // ignore this so that when we get updates from the edit tool
1413 // the position doesn't reset to the startof line
1414 old_position.row == new_position.row
1415 && old_position.column > new_position.column
1416 })
1417 .ok()
1418 .unwrap_or_default();
1419 if !should_ignore {
1420 project.set_agent_location(Some(location.clone()), cx);
1421 }
1422 }
1423 });
1424 }
1425 if tool_call.resolved_locations != resolved_locations {
1426 tool_call.resolved_locations = resolved_locations;
1427 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1428 }
1429 })
1430 })
1431 .detach();
1432 }
1433
1434 pub fn request_tool_call_authorization(
1435 &mut self,
1436 tool_call: acp::ToolCallUpdate,
1437 options: Vec<acp::PermissionOption>,
1438 respect_always_allow_setting: bool,
1439 cx: &mut Context<Self>,
1440 ) -> Result<BoxFuture<'static, acp::RequestPermissionOutcome>> {
1441 let (tx, rx) = oneshot::channel();
1442
1443 if respect_always_allow_setting && AgentSettings::get_global(cx).always_allow_tool_actions {
1444 // Don't use AllowAlways, because then if you were to turn off always_allow_tool_actions,
1445 // some tools would (incorrectly) continue to auto-accept.
1446 if let Some(allow_once_option) = options.iter().find_map(|option| {
1447 if matches!(option.kind, acp::PermissionOptionKind::AllowOnce) {
1448 Some(option.id.clone())
1449 } else {
1450 None
1451 }
1452 }) {
1453 self.upsert_tool_call_inner(tool_call, ToolCallStatus::Pending, cx)?;
1454 return Ok(async {
1455 acp::RequestPermissionOutcome::Selected {
1456 option_id: allow_once_option,
1457 }
1458 }
1459 .boxed());
1460 }
1461 }
1462
1463 let status = ToolCallStatus::WaitingForConfirmation {
1464 options,
1465 respond_tx: tx,
1466 };
1467
1468 self.upsert_tool_call_inner(tool_call, status, cx)?;
1469 cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1470
1471 let fut = async {
1472 match rx.await {
1473 Ok(option) => acp::RequestPermissionOutcome::Selected { option_id: option },
1474 Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1475 }
1476 }
1477 .boxed();
1478
1479 Ok(fut)
1480 }
1481
1482 pub fn authorize_tool_call(
1483 &mut self,
1484 id: acp::ToolCallId,
1485 option_id: acp::PermissionOptionId,
1486 option_kind: acp::PermissionOptionKind,
1487 cx: &mut Context<Self>,
1488 ) {
1489 let Some((ix, call)) = self.tool_call_mut(&id) else {
1490 return;
1491 };
1492
1493 let new_status = match option_kind {
1494 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1495 ToolCallStatus::Rejected
1496 }
1497 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1498 ToolCallStatus::InProgress
1499 }
1500 };
1501
1502 let curr_status = mem::replace(&mut call.status, new_status);
1503
1504 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1505 respond_tx.send(option_id).log_err();
1506 } else if cfg!(debug_assertions) {
1507 panic!("tried to authorize an already authorized tool call");
1508 }
1509
1510 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1511 }
1512
1513 pub fn first_tool_awaiting_confirmation(&self) -> Option<&ToolCall> {
1514 let mut first_tool_call = None;
1515
1516 for entry in self.entries.iter().rev() {
1517 match &entry {
1518 AgentThreadEntry::ToolCall(call) => {
1519 if let ToolCallStatus::WaitingForConfirmation { .. } = call.status {
1520 first_tool_call = Some(call);
1521 } else {
1522 continue;
1523 }
1524 }
1525 AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1526 // Reached the beginning of the turn.
1527 // If we had pending permission requests in the previous turn, they have been cancelled.
1528 break;
1529 }
1530 }
1531 }
1532
1533 first_tool_call
1534 }
1535
1536 pub fn plan(&self) -> &Plan {
1537 &self.plan
1538 }
1539
1540 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1541 let new_entries_len = request.entries.len();
1542 let mut new_entries = request.entries.into_iter();
1543
1544 // Reuse existing markdown to prevent flickering
1545 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1546 let PlanEntry {
1547 content,
1548 priority,
1549 status,
1550 } = old;
1551 content.update(cx, |old, cx| {
1552 old.replace(new.content, cx);
1553 });
1554 *priority = new.priority;
1555 *status = new.status;
1556 }
1557 for new in new_entries {
1558 self.plan.entries.push(PlanEntry::from_acp(new, cx))
1559 }
1560 self.plan.entries.truncate(new_entries_len);
1561
1562 cx.notify();
1563 }
1564
1565 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1566 self.plan
1567 .entries
1568 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1569 cx.notify();
1570 }
1571
1572 #[cfg(any(test, feature = "test-support"))]
1573 pub fn send_raw(
1574 &mut self,
1575 message: &str,
1576 cx: &mut Context<Self>,
1577 ) -> BoxFuture<'static, Result<()>> {
1578 self.send(
1579 vec![acp::ContentBlock::Text(acp::TextContent {
1580 text: message.to_string(),
1581 annotations: None,
1582 meta: None,
1583 })],
1584 cx,
1585 )
1586 }
1587
1588 pub fn send(
1589 &mut self,
1590 message: Vec<acp::ContentBlock>,
1591 cx: &mut Context<Self>,
1592 ) -> BoxFuture<'static, Result<()>> {
1593 let block = ContentBlock::new_combined(
1594 message.clone(),
1595 self.project.read(cx).languages().clone(),
1596 cx,
1597 );
1598 let request = acp::PromptRequest {
1599 prompt: message.clone(),
1600 session_id: self.session_id.clone(),
1601 meta: None,
1602 };
1603 let git_store = self.project.read(cx).git_store().clone();
1604
1605 let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
1606 Some(UserMessageId::new())
1607 } else {
1608 None
1609 };
1610
1611 self.run_turn(cx, async move |this, cx| {
1612 this.update(cx, |this, cx| {
1613 this.push_entry(
1614 AgentThreadEntry::UserMessage(UserMessage {
1615 id: message_id.clone(),
1616 content: block,
1617 chunks: message,
1618 checkpoint: None,
1619 }),
1620 cx,
1621 );
1622 })
1623 .ok();
1624
1625 let old_checkpoint = git_store
1626 .update(cx, |git, cx| git.checkpoint(cx))?
1627 .await
1628 .context("failed to get old checkpoint")
1629 .log_err();
1630 this.update(cx, |this, cx| {
1631 if let Some((_ix, message)) = this.last_user_message() {
1632 message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1633 git_checkpoint,
1634 show: false,
1635 });
1636 }
1637 this.connection.prompt(message_id, request, cx)
1638 })?
1639 .await
1640 })
1641 }
1642
1643 pub fn can_resume(&self, cx: &App) -> bool {
1644 self.connection.resume(&self.session_id, cx).is_some()
1645 }
1646
1647 pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1648 self.run_turn(cx, async move |this, cx| {
1649 this.update(cx, |this, cx| {
1650 this.connection
1651 .resume(&this.session_id, cx)
1652 .map(|resume| resume.run(cx))
1653 })?
1654 .context("resuming a session is not supported")?
1655 .await
1656 })
1657 }
1658
1659 fn run_turn(
1660 &mut self,
1661 cx: &mut Context<Self>,
1662 f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1663 ) -> BoxFuture<'static, Result<()>> {
1664 self.clear_completed_plan_entries(cx);
1665
1666 let (tx, rx) = oneshot::channel();
1667 let cancel_task = self.cancel(cx);
1668
1669 self.send_task = Some(cx.spawn(async move |this, cx| {
1670 cancel_task.await;
1671 tx.send(f(this, cx).await).ok();
1672 }));
1673
1674 cx.spawn(async move |this, cx| {
1675 let response = rx.await;
1676
1677 this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1678 .await?;
1679
1680 this.update(cx, |this, cx| {
1681 this.project
1682 .update(cx, |project, cx| project.set_agent_location(None, cx));
1683 match response {
1684 Ok(Err(e)) => {
1685 this.send_task.take();
1686 cx.emit(AcpThreadEvent::Error);
1687 Err(e)
1688 }
1689 result => {
1690 let canceled = matches!(
1691 result,
1692 Ok(Ok(acp::PromptResponse {
1693 stop_reason: acp::StopReason::Cancelled,
1694 meta: None,
1695 }))
1696 );
1697
1698 // We only take the task if the current prompt wasn't canceled.
1699 //
1700 // This prompt may have been canceled because another one was sent
1701 // while it was still generating. In these cases, dropping `send_task`
1702 // would cause the next generation to be canceled.
1703 if !canceled {
1704 this.send_task.take();
1705 }
1706
1707 // Handle refusal - distinguish between user prompt and tool call refusals
1708 if let Ok(Ok(acp::PromptResponse {
1709 stop_reason: acp::StopReason::Refusal,
1710 meta: _,
1711 })) = result
1712 {
1713 if let Some((user_msg_ix, _)) = this.last_user_message() {
1714 // Check if there's a completed tool call with results after the last user message
1715 // This indicates the refusal is in response to tool output, not the user's prompt
1716 let has_completed_tool_call_after_user_msg =
1717 this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
1718 if let AgentThreadEntry::ToolCall(tool_call) = entry {
1719 // Check if the tool call has completed and has output
1720 matches!(tool_call.status, ToolCallStatus::Completed)
1721 && tool_call.raw_output.is_some()
1722 } else {
1723 false
1724 }
1725 });
1726
1727 if has_completed_tool_call_after_user_msg {
1728 // Refusal is due to tool output - don't truncate, just notify
1729 // The model refused based on what the tool returned
1730 cx.emit(AcpThreadEvent::Refusal);
1731 } else {
1732 // User prompt was refused - truncate back to before the user message
1733 let range = user_msg_ix..this.entries.len();
1734 if range.start < range.end {
1735 this.entries.truncate(user_msg_ix);
1736 cx.emit(AcpThreadEvent::EntriesRemoved(range));
1737 }
1738 cx.emit(AcpThreadEvent::Refusal);
1739 }
1740 } else {
1741 // No user message found, treat as general refusal
1742 cx.emit(AcpThreadEvent::Refusal);
1743 }
1744 }
1745
1746 cx.emit(AcpThreadEvent::Stopped);
1747 Ok(())
1748 }
1749 }
1750 })?
1751 })
1752 .boxed()
1753 }
1754
1755 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1756 let Some(send_task) = self.send_task.take() else {
1757 return Task::ready(());
1758 };
1759
1760 for entry in self.entries.iter_mut() {
1761 if let AgentThreadEntry::ToolCall(call) = entry {
1762 let cancel = matches!(
1763 call.status,
1764 ToolCallStatus::Pending
1765 | ToolCallStatus::WaitingForConfirmation { .. }
1766 | ToolCallStatus::InProgress
1767 );
1768
1769 if cancel {
1770 call.status = ToolCallStatus::Canceled;
1771 }
1772 }
1773 }
1774
1775 self.connection.cancel(&self.session_id, cx);
1776
1777 // Wait for the send task to complete
1778 cx.foreground_executor().spawn(send_task)
1779 }
1780
1781 /// Restores the git working tree to the state at the given checkpoint (if one exists)
1782 pub fn restore_checkpoint(
1783 &mut self,
1784 id: UserMessageId,
1785 cx: &mut Context<Self>,
1786 ) -> Task<Result<()>> {
1787 let Some((_, message)) = self.user_message_mut(&id) else {
1788 return Task::ready(Err(anyhow!("message not found")));
1789 };
1790
1791 let checkpoint = message
1792 .checkpoint
1793 .as_ref()
1794 .map(|c| c.git_checkpoint.clone());
1795 let rewind = self.rewind(id.clone(), cx);
1796 let git_store = self.project.read(cx).git_store().clone();
1797
1798 cx.spawn(async move |_, cx| {
1799 rewind.await?;
1800 if let Some(checkpoint) = checkpoint {
1801 git_store
1802 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1803 .await?;
1804 }
1805
1806 Ok(())
1807 })
1808 }
1809
1810 /// Rewinds this thread to before the entry at `index`, removing it and all
1811 /// subsequent entries while rejecting any action_log changes made from that point.
1812 /// Unlike `restore_checkpoint`, this method does not restore from git.
1813 pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1814 let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
1815 return Task::ready(Err(anyhow!("not supported")));
1816 };
1817
1818 cx.spawn(async move |this, cx| {
1819 cx.update(|cx| truncate.run(id.clone(), cx))?.await?;
1820 this.update(cx, |this, cx| {
1821 if let Some((ix, _)) = this.user_message_mut(&id) {
1822 let range = ix..this.entries.len();
1823 this.entries.truncate(ix);
1824 cx.emit(AcpThreadEvent::EntriesRemoved(range));
1825 }
1826 this.action_log()
1827 .update(cx, |action_log, cx| action_log.reject_all_edits(cx))
1828 })?
1829 .await;
1830 Ok(())
1831 })
1832 }
1833
1834 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1835 let git_store = self.project.read(cx).git_store().clone();
1836
1837 let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1838 if let Some(checkpoint) = message.checkpoint.as_ref() {
1839 checkpoint.git_checkpoint.clone()
1840 } else {
1841 return Task::ready(Ok(()));
1842 }
1843 } else {
1844 return Task::ready(Ok(()));
1845 };
1846
1847 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1848 cx.spawn(async move |this, cx| {
1849 let new_checkpoint = new_checkpoint
1850 .await
1851 .context("failed to get new checkpoint")
1852 .log_err();
1853 if let Some(new_checkpoint) = new_checkpoint {
1854 let equal = git_store
1855 .update(cx, |git, cx| {
1856 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1857 })?
1858 .await
1859 .unwrap_or(true);
1860 this.update(cx, |this, cx| {
1861 let (ix, message) = this.last_user_message().context("no user message")?;
1862 let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1863 checkpoint.show = !equal;
1864 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1865 anyhow::Ok(())
1866 })??;
1867 }
1868
1869 Ok(())
1870 })
1871 }
1872
1873 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1874 self.entries
1875 .iter_mut()
1876 .enumerate()
1877 .rev()
1878 .find_map(|(ix, entry)| {
1879 if let AgentThreadEntry::UserMessage(message) = entry {
1880 Some((ix, message))
1881 } else {
1882 None
1883 }
1884 })
1885 }
1886
1887 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1888 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1889 if let AgentThreadEntry::UserMessage(message) = entry {
1890 if message.id.as_ref() == Some(id) {
1891 Some((ix, message))
1892 } else {
1893 None
1894 }
1895 } else {
1896 None
1897 }
1898 })
1899 }
1900
1901 pub fn read_text_file(
1902 &self,
1903 path: PathBuf,
1904 line: Option<u32>,
1905 limit: Option<u32>,
1906 reuse_shared_snapshot: bool,
1907 cx: &mut Context<Self>,
1908 ) -> Task<Result<String, acp::Error>> {
1909 // Args are 1-based, move to 0-based
1910 let line = line.unwrap_or_default().saturating_sub(1);
1911 let limit = limit.unwrap_or(u32::MAX);
1912 let project = self.project.clone();
1913 let action_log = self.action_log.clone();
1914 cx.spawn(async move |this, cx| {
1915 let load = project
1916 .update(cx, |project, cx| {
1917 let path = project
1918 .project_path_for_absolute_path(&path, cx)
1919 .ok_or_else(|| {
1920 acp::Error::resource_not_found(Some(path.display().to_string()))
1921 })?;
1922 Ok(project.open_buffer(path, cx))
1923 })
1924 .map_err(|e| acp::Error::internal_error().with_data(e.to_string()))
1925 .flatten()?;
1926
1927 let buffer = load.await?;
1928
1929 let snapshot = if reuse_shared_snapshot {
1930 this.read_with(cx, |this, _| {
1931 this.shared_buffers.get(&buffer.clone()).cloned()
1932 })
1933 .log_err()
1934 .flatten()
1935 } else {
1936 None
1937 };
1938
1939 let snapshot = if let Some(snapshot) = snapshot {
1940 snapshot
1941 } else {
1942 action_log.update(cx, |action_log, cx| {
1943 action_log.buffer_read(buffer.clone(), cx);
1944 })?;
1945
1946 let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot())?;
1947 this.update(cx, |this, _| {
1948 this.shared_buffers.insert(buffer.clone(), snapshot.clone());
1949 })?;
1950 snapshot
1951 };
1952
1953 let max_point = snapshot.max_point();
1954 let start_position = Point::new(line, 0);
1955
1956 if start_position > max_point {
1957 return Err(acp::Error::invalid_params().with_data(format!(
1958 "Attempting to read beyond the end of the file, line {}:{}",
1959 max_point.row + 1,
1960 max_point.column
1961 )));
1962 }
1963
1964 let start = snapshot.anchor_before(start_position);
1965 let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
1966
1967 project.update(cx, |project, cx| {
1968 project.set_agent_location(
1969 Some(AgentLocation {
1970 buffer: buffer.downgrade(),
1971 position: start,
1972 }),
1973 cx,
1974 );
1975 })?;
1976
1977 Ok(snapshot.text_for_range(start..end).collect::<String>())
1978 })
1979 }
1980
1981 pub fn write_text_file(
1982 &self,
1983 path: PathBuf,
1984 content: String,
1985 cx: &mut Context<Self>,
1986 ) -> Task<Result<()>> {
1987 let project = self.project.clone();
1988 let action_log = self.action_log.clone();
1989 cx.spawn(async move |this, cx| {
1990 let load = project.update(cx, |project, cx| {
1991 let path = project
1992 .project_path_for_absolute_path(&path, cx)
1993 .context("invalid path")?;
1994 anyhow::Ok(project.open_buffer(path, cx))
1995 });
1996 let buffer = load??.await?;
1997 let snapshot = this.update(cx, |this, cx| {
1998 this.shared_buffers
1999 .get(&buffer)
2000 .cloned()
2001 .unwrap_or_else(|| buffer.read(cx).snapshot())
2002 })?;
2003 let edits = cx
2004 .background_executor()
2005 .spawn(async move {
2006 let old_text = snapshot.text();
2007 text_diff(old_text.as_str(), &content)
2008 .into_iter()
2009 .map(|(range, replacement)| {
2010 (
2011 snapshot.anchor_after(range.start)
2012 ..snapshot.anchor_before(range.end),
2013 replacement,
2014 )
2015 })
2016 .collect::<Vec<_>>()
2017 })
2018 .await;
2019
2020 project.update(cx, |project, cx| {
2021 project.set_agent_location(
2022 Some(AgentLocation {
2023 buffer: buffer.downgrade(),
2024 position: edits
2025 .last()
2026 .map(|(range, _)| range.end)
2027 .unwrap_or(Anchor::MIN),
2028 }),
2029 cx,
2030 );
2031 })?;
2032
2033 let format_on_save = cx.update(|cx| {
2034 action_log.update(cx, |action_log, cx| {
2035 action_log.buffer_read(buffer.clone(), cx);
2036 });
2037
2038 let format_on_save = buffer.update(cx, |buffer, cx| {
2039 buffer.edit(edits, None, cx);
2040
2041 let settings = language::language_settings::language_settings(
2042 buffer.language().map(|l| l.name()),
2043 buffer.file(),
2044 cx,
2045 );
2046
2047 settings.format_on_save != FormatOnSave::Off
2048 });
2049 action_log.update(cx, |action_log, cx| {
2050 action_log.buffer_edited(buffer.clone(), cx);
2051 });
2052 format_on_save
2053 })?;
2054
2055 if format_on_save {
2056 let format_task = project.update(cx, |project, cx| {
2057 project.format(
2058 HashSet::from_iter([buffer.clone()]),
2059 LspFormatTarget::Buffers,
2060 false,
2061 FormatTrigger::Save,
2062 cx,
2063 )
2064 })?;
2065 format_task.await.log_err();
2066
2067 action_log.update(cx, |action_log, cx| {
2068 action_log.buffer_edited(buffer.clone(), cx);
2069 })?;
2070 }
2071
2072 project
2073 .update(cx, |project, cx| project.save_buffer(buffer, cx))?
2074 .await
2075 })
2076 }
2077
2078 pub fn create_terminal(
2079 &self,
2080 command: String,
2081 args: Vec<String>,
2082 extra_env: Vec<acp::EnvVariable>,
2083 cwd: Option<PathBuf>,
2084 output_byte_limit: Option<u64>,
2085 cx: &mut Context<Self>,
2086 ) -> Task<Result<Entity<Terminal>>> {
2087 let env = match &cwd {
2088 Some(dir) => self.project.update(cx, |project, cx| {
2089 let worktree = project.find_worktree(dir.as_path(), cx);
2090 let shell = TerminalSettings::get(
2091 worktree.as_ref().map(|(worktree, path)| SettingsLocation {
2092 worktree_id: worktree.read(cx).id(),
2093 path: &path,
2094 }),
2095 cx,
2096 )
2097 .shell
2098 .clone();
2099 project.directory_environment(&shell, dir.as_path().into(), cx)
2100 }),
2101 None => Task::ready(None).shared(),
2102 };
2103 let env = cx.spawn(async move |_, _| {
2104 let mut env = env.await.unwrap_or_default();
2105 // Disables paging for `git` and hopefully other commands
2106 env.insert("PAGER".into(), "".into());
2107 for var in extra_env {
2108 env.insert(var.name, var.value);
2109 }
2110 env
2111 });
2112
2113 let project = self.project.clone();
2114 let language_registry = project.read(cx).languages().clone();
2115 let is_windows = project.read(cx).path_style(cx).is_windows();
2116
2117 let terminal_id = acp::TerminalId(Uuid::new_v4().to_string().into());
2118 let terminal_task = cx.spawn({
2119 let terminal_id = terminal_id.clone();
2120 async move |_this, cx| {
2121 let env = env.await;
2122 let shell = project
2123 .update(cx, |project, cx| {
2124 project
2125 .remote_client()
2126 .and_then(|r| r.read(cx).default_system_shell())
2127 })?
2128 .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2129 let (task_command, task_args) =
2130 ShellBuilder::new(&Shell::Program(shell), is_windows)
2131 .redirect_stdin_to_dev_null()
2132 .build(Some(command.clone()), &args);
2133 let terminal = project
2134 .update(cx, |project, cx| {
2135 project.create_terminal_task(
2136 task::SpawnInTerminal {
2137 command: Some(task_command),
2138 args: task_args,
2139 cwd: cwd.clone(),
2140 env,
2141 ..Default::default()
2142 },
2143 cx,
2144 )
2145 })?
2146 .await?;
2147
2148 cx.new(|cx| {
2149 Terminal::new(
2150 terminal_id,
2151 &format!("{} {}", command, args.join(" ")),
2152 cwd,
2153 output_byte_limit.map(|l| l as usize),
2154 terminal,
2155 language_registry,
2156 cx,
2157 )
2158 })
2159 }
2160 });
2161
2162 cx.spawn(async move |this, cx| {
2163 let terminal = terminal_task.await?;
2164 this.update(cx, |this, _cx| {
2165 this.terminals.insert(terminal_id, terminal.clone());
2166 terminal
2167 })
2168 })
2169 }
2170
2171 pub fn kill_terminal(
2172 &mut self,
2173 terminal_id: acp::TerminalId,
2174 cx: &mut Context<Self>,
2175 ) -> Result<()> {
2176 self.terminals
2177 .get(&terminal_id)
2178 .context("Terminal not found")?
2179 .update(cx, |terminal, cx| {
2180 terminal.kill(cx);
2181 });
2182
2183 Ok(())
2184 }
2185
2186 pub fn release_terminal(
2187 &mut self,
2188 terminal_id: acp::TerminalId,
2189 cx: &mut Context<Self>,
2190 ) -> Result<()> {
2191 self.terminals
2192 .remove(&terminal_id)
2193 .context("Terminal not found")?
2194 .update(cx, |terminal, cx| {
2195 terminal.kill(cx);
2196 });
2197
2198 Ok(())
2199 }
2200
2201 pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2202 self.terminals
2203 .get(&terminal_id)
2204 .context("Terminal not found")
2205 .cloned()
2206 }
2207
2208 pub fn to_markdown(&self, cx: &App) -> String {
2209 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2210 }
2211
2212 pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2213 cx.emit(AcpThreadEvent::LoadError(error));
2214 }
2215
2216 pub fn register_terminal_created(
2217 &mut self,
2218 terminal_id: acp::TerminalId,
2219 command_label: String,
2220 working_dir: Option<PathBuf>,
2221 output_byte_limit: Option<u64>,
2222 terminal: Entity<::terminal::Terminal>,
2223 cx: &mut Context<Self>,
2224 ) -> Entity<Terminal> {
2225 let language_registry = self.project.read(cx).languages().clone();
2226
2227 let entity = cx.new(|cx| {
2228 Terminal::new(
2229 terminal_id.clone(),
2230 &command_label,
2231 working_dir.clone(),
2232 output_byte_limit.map(|l| l as usize),
2233 terminal,
2234 language_registry,
2235 cx,
2236 )
2237 });
2238 self.terminals.insert(terminal_id.clone(), entity.clone());
2239 entity
2240 }
2241}
2242
2243fn markdown_for_raw_output(
2244 raw_output: &serde_json::Value,
2245 language_registry: &Arc<LanguageRegistry>,
2246 cx: &mut App,
2247) -> Option<Entity<Markdown>> {
2248 match raw_output {
2249 serde_json::Value::Null => None,
2250 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2251 Markdown::new(
2252 value.to_string().into(),
2253 Some(language_registry.clone()),
2254 None,
2255 cx,
2256 )
2257 })),
2258 serde_json::Value::Number(value) => Some(cx.new(|cx| {
2259 Markdown::new(
2260 value.to_string().into(),
2261 Some(language_registry.clone()),
2262 None,
2263 cx,
2264 )
2265 })),
2266 serde_json::Value::String(value) => Some(cx.new(|cx| {
2267 Markdown::new(
2268 value.clone().into(),
2269 Some(language_registry.clone()),
2270 None,
2271 cx,
2272 )
2273 })),
2274 value => Some(cx.new(|cx| {
2275 Markdown::new(
2276 format!("```json\n{}\n```", value).into(),
2277 Some(language_registry.clone()),
2278 None,
2279 cx,
2280 )
2281 })),
2282 }
2283}
2284
2285#[cfg(test)]
2286mod tests {
2287 use super::*;
2288 use anyhow::anyhow;
2289 use futures::{channel::mpsc, future::LocalBoxFuture, select};
2290 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2291 use indoc::indoc;
2292 use project::{FakeFs, Fs};
2293 use rand::{distr, prelude::*};
2294 use serde_json::json;
2295 use settings::SettingsStore;
2296 use smol::stream::StreamExt as _;
2297 use std::{
2298 any::Any,
2299 cell::RefCell,
2300 path::Path,
2301 rc::Rc,
2302 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2303 time::Duration,
2304 };
2305 use util::path;
2306
2307 fn init_test(cx: &mut TestAppContext) {
2308 env_logger::try_init().ok();
2309 cx.update(|cx| {
2310 let settings_store = SettingsStore::test(cx);
2311 cx.set_global(settings_store);
2312 Project::init_settings(cx);
2313 language::init(cx);
2314 });
2315 }
2316
2317 #[gpui::test]
2318 async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2319 init_test(cx);
2320
2321 let fs = FakeFs::new(cx.executor());
2322 let project = Project::test(fs, [], cx).await;
2323 let connection = Rc::new(FakeAgentConnection::new());
2324 let thread = cx
2325 .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2326 .await
2327 .unwrap();
2328
2329 let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
2330
2331 // Send Output BEFORE Created - should be buffered by acp_thread
2332 thread.update(cx, |thread, cx| {
2333 thread.on_terminal_provider_event(
2334 TerminalProviderEvent::Output {
2335 terminal_id: terminal_id.clone(),
2336 data: b"hello buffered".to_vec(),
2337 },
2338 cx,
2339 );
2340 });
2341
2342 // Create a display-only terminal and then send Created
2343 let lower = cx.new(|cx| {
2344 let builder = ::terminal::TerminalBuilder::new_display_only(
2345 ::terminal::terminal_settings::CursorShape::default(),
2346 ::terminal::terminal_settings::AlternateScroll::On,
2347 None,
2348 0,
2349 )
2350 .unwrap();
2351 builder.subscribe(cx)
2352 });
2353
2354 thread.update(cx, |thread, cx| {
2355 thread.on_terminal_provider_event(
2356 TerminalProviderEvent::Created {
2357 terminal_id: terminal_id.clone(),
2358 label: "Buffered Test".to_string(),
2359 cwd: None,
2360 output_byte_limit: None,
2361 terminal: lower.clone(),
2362 },
2363 cx,
2364 );
2365 });
2366
2367 // After Created, buffered Output should have been flushed into the renderer
2368 let content = thread.read_with(cx, |thread, cx| {
2369 let term = thread.terminal(terminal_id.clone()).unwrap();
2370 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2371 });
2372
2373 assert!(
2374 content.contains("hello buffered"),
2375 "expected buffered output to render, got: {content}"
2376 );
2377 }
2378
2379 #[gpui::test]
2380 async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2381 init_test(cx);
2382
2383 let fs = FakeFs::new(cx.executor());
2384 let project = Project::test(fs, [], cx).await;
2385 let connection = Rc::new(FakeAgentConnection::new());
2386 let thread = cx
2387 .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2388 .await
2389 .unwrap();
2390
2391 let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
2392
2393 // Send Output BEFORE Created
2394 thread.update(cx, |thread, cx| {
2395 thread.on_terminal_provider_event(
2396 TerminalProviderEvent::Output {
2397 terminal_id: terminal_id.clone(),
2398 data: b"pre-exit data".to_vec(),
2399 },
2400 cx,
2401 );
2402 });
2403
2404 // Send Exit BEFORE Created
2405 thread.update(cx, |thread, cx| {
2406 thread.on_terminal_provider_event(
2407 TerminalProviderEvent::Exit {
2408 terminal_id: terminal_id.clone(),
2409 status: acp::TerminalExitStatus {
2410 exit_code: Some(0),
2411 signal: None,
2412 meta: None,
2413 },
2414 },
2415 cx,
2416 );
2417 });
2418
2419 // Now create a display-only lower-level terminal and send Created
2420 let lower = cx.new(|cx| {
2421 let builder = ::terminal::TerminalBuilder::new_display_only(
2422 ::terminal::terminal_settings::CursorShape::default(),
2423 ::terminal::terminal_settings::AlternateScroll::On,
2424 None,
2425 0,
2426 )
2427 .unwrap();
2428 builder.subscribe(cx)
2429 });
2430
2431 thread.update(cx, |thread, cx| {
2432 thread.on_terminal_provider_event(
2433 TerminalProviderEvent::Created {
2434 terminal_id: terminal_id.clone(),
2435 label: "Buffered Exit Test".to_string(),
2436 cwd: None,
2437 output_byte_limit: None,
2438 terminal: lower.clone(),
2439 },
2440 cx,
2441 );
2442 });
2443
2444 // Output should be present after Created (flushed from buffer)
2445 let content = thread.read_with(cx, |thread, cx| {
2446 let term = thread.terminal(terminal_id.clone()).unwrap();
2447 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2448 });
2449
2450 assert!(
2451 content.contains("pre-exit data"),
2452 "expected pre-exit data to render, got: {content}"
2453 );
2454 }
2455
2456 #[gpui::test]
2457 async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
2458 init_test(cx);
2459
2460 let fs = FakeFs::new(cx.executor());
2461 let project = Project::test(fs, [], cx).await;
2462 let connection = Rc::new(FakeAgentConnection::new());
2463 let thread = cx
2464 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2465 .await
2466 .unwrap();
2467
2468 // Test creating a new user message
2469 thread.update(cx, |thread, cx| {
2470 thread.push_user_content_block(
2471 None,
2472 acp::ContentBlock::Text(acp::TextContent {
2473 annotations: None,
2474 text: "Hello, ".to_string(),
2475 meta: None,
2476 }),
2477 cx,
2478 );
2479 });
2480
2481 thread.update(cx, |thread, cx| {
2482 assert_eq!(thread.entries.len(), 1);
2483 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2484 assert_eq!(user_msg.id, None);
2485 assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
2486 } else {
2487 panic!("Expected UserMessage");
2488 }
2489 });
2490
2491 // Test appending to existing user message
2492 let message_1_id = UserMessageId::new();
2493 thread.update(cx, |thread, cx| {
2494 thread.push_user_content_block(
2495 Some(message_1_id.clone()),
2496 acp::ContentBlock::Text(acp::TextContent {
2497 annotations: None,
2498 text: "world!".to_string(),
2499 meta: None,
2500 }),
2501 cx,
2502 );
2503 });
2504
2505 thread.update(cx, |thread, cx| {
2506 assert_eq!(thread.entries.len(), 1);
2507 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2508 assert_eq!(user_msg.id, Some(message_1_id));
2509 assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
2510 } else {
2511 panic!("Expected UserMessage");
2512 }
2513 });
2514
2515 // Test creating new user message after assistant message
2516 thread.update(cx, |thread, cx| {
2517 thread.push_assistant_content_block(
2518 acp::ContentBlock::Text(acp::TextContent {
2519 annotations: None,
2520 text: "Assistant response".to_string(),
2521 meta: None,
2522 }),
2523 false,
2524 cx,
2525 );
2526 });
2527
2528 let message_2_id = UserMessageId::new();
2529 thread.update(cx, |thread, cx| {
2530 thread.push_user_content_block(
2531 Some(message_2_id.clone()),
2532 acp::ContentBlock::Text(acp::TextContent {
2533 annotations: None,
2534 text: "New user message".to_string(),
2535 meta: None,
2536 }),
2537 cx,
2538 );
2539 });
2540
2541 thread.update(cx, |thread, cx| {
2542 assert_eq!(thread.entries.len(), 3);
2543 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
2544 assert_eq!(user_msg.id, Some(message_2_id));
2545 assert_eq!(user_msg.content.to_markdown(cx), "New user message");
2546 } else {
2547 panic!("Expected UserMessage at index 2");
2548 }
2549 });
2550 }
2551
2552 #[gpui::test]
2553 async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
2554 init_test(cx);
2555
2556 let fs = FakeFs::new(cx.executor());
2557 let project = Project::test(fs, [], cx).await;
2558 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2559 |_, thread, mut cx| {
2560 async move {
2561 thread.update(&mut cx, |thread, cx| {
2562 thread
2563 .handle_session_update(
2564 acp::SessionUpdate::AgentThoughtChunk {
2565 content: "Thinking ".into(),
2566 },
2567 cx,
2568 )
2569 .unwrap();
2570 thread
2571 .handle_session_update(
2572 acp::SessionUpdate::AgentThoughtChunk {
2573 content: "hard!".into(),
2574 },
2575 cx,
2576 )
2577 .unwrap();
2578 })?;
2579 Ok(acp::PromptResponse {
2580 stop_reason: acp::StopReason::EndTurn,
2581 meta: None,
2582 })
2583 }
2584 .boxed_local()
2585 },
2586 ));
2587
2588 let thread = cx
2589 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2590 .await
2591 .unwrap();
2592
2593 thread
2594 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
2595 .await
2596 .unwrap();
2597
2598 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
2599 assert_eq!(
2600 output,
2601 indoc! {r#"
2602 ## User
2603
2604 Hello from Zed!
2605
2606 ## Assistant
2607
2608 <thinking>
2609 Thinking hard!
2610 </thinking>
2611
2612 "#}
2613 );
2614 }
2615
2616 #[gpui::test]
2617 async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
2618 init_test(cx);
2619
2620 let fs = FakeFs::new(cx.executor());
2621 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
2622 .await;
2623 let project = Project::test(fs.clone(), [], cx).await;
2624 let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
2625 let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
2626 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2627 move |_, thread, mut cx| {
2628 let read_file_tx = read_file_tx.clone();
2629 async move {
2630 let content = thread
2631 .update(&mut cx, |thread, cx| {
2632 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2633 })
2634 .unwrap()
2635 .await
2636 .unwrap();
2637 assert_eq!(content, "one\ntwo\nthree\n");
2638 read_file_tx.take().unwrap().send(()).unwrap();
2639 thread
2640 .update(&mut cx, |thread, cx| {
2641 thread.write_text_file(
2642 path!("/tmp/foo").into(),
2643 "one\ntwo\nthree\nfour\nfive\n".to_string(),
2644 cx,
2645 )
2646 })
2647 .unwrap()
2648 .await
2649 .unwrap();
2650 Ok(acp::PromptResponse {
2651 stop_reason: acp::StopReason::EndTurn,
2652 meta: None,
2653 })
2654 }
2655 .boxed_local()
2656 },
2657 ));
2658
2659 let (worktree, pathbuf) = project
2660 .update(cx, |project, cx| {
2661 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2662 })
2663 .await
2664 .unwrap();
2665 let buffer = project
2666 .update(cx, |project, cx| {
2667 project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
2668 })
2669 .await
2670 .unwrap();
2671
2672 let thread = cx
2673 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2674 .await
2675 .unwrap();
2676
2677 let request = thread.update(cx, |thread, cx| {
2678 thread.send_raw("Extend the count in /tmp/foo", cx)
2679 });
2680 read_file_rx.await.ok();
2681 buffer.update(cx, |buffer, cx| {
2682 buffer.edit([(0..0, "zero\n".to_string())], None, cx);
2683 });
2684 cx.run_until_parked();
2685 assert_eq!(
2686 buffer.read_with(cx, |buffer, _| buffer.text()),
2687 "zero\none\ntwo\nthree\nfour\nfive\n"
2688 );
2689 assert_eq!(
2690 String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
2691 "zero\none\ntwo\nthree\nfour\nfive\n"
2692 );
2693 request.await.unwrap();
2694 }
2695
2696 #[gpui::test]
2697 async fn test_reading_from_line(cx: &mut TestAppContext) {
2698 init_test(cx);
2699
2700 let fs = FakeFs::new(cx.executor());
2701 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
2702 .await;
2703 let project = Project::test(fs.clone(), [], cx).await;
2704 project
2705 .update(cx, |project, cx| {
2706 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2707 })
2708 .await
2709 .unwrap();
2710
2711 let connection = Rc::new(FakeAgentConnection::new());
2712
2713 let thread = cx
2714 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2715 .await
2716 .unwrap();
2717
2718 // Whole file
2719 let content = thread
2720 .update(cx, |thread, cx| {
2721 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2722 })
2723 .await
2724 .unwrap();
2725
2726 assert_eq!(content, "one\ntwo\nthree\nfour\n");
2727
2728 // Only start line
2729 let content = thread
2730 .update(cx, |thread, cx| {
2731 thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
2732 })
2733 .await
2734 .unwrap();
2735
2736 assert_eq!(content, "three\nfour\n");
2737
2738 // Only limit
2739 let content = thread
2740 .update(cx, |thread, cx| {
2741 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2742 })
2743 .await
2744 .unwrap();
2745
2746 assert_eq!(content, "one\ntwo\n");
2747
2748 // Range
2749 let content = thread
2750 .update(cx, |thread, cx| {
2751 thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
2752 })
2753 .await
2754 .unwrap();
2755
2756 assert_eq!(content, "two\nthree\n");
2757
2758 // Invalid
2759 let err = thread
2760 .update(cx, |thread, cx| {
2761 thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
2762 })
2763 .await
2764 .unwrap_err();
2765
2766 assert_eq!(
2767 err.to_string(),
2768 "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
2769 );
2770 }
2771
2772 #[gpui::test]
2773 async fn test_reading_empty_file(cx: &mut TestAppContext) {
2774 init_test(cx);
2775
2776 let fs = FakeFs::new(cx.executor());
2777 fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
2778 let project = Project::test(fs.clone(), [], cx).await;
2779 project
2780 .update(cx, |project, cx| {
2781 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2782 })
2783 .await
2784 .unwrap();
2785
2786 let connection = Rc::new(FakeAgentConnection::new());
2787
2788 let thread = cx
2789 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2790 .await
2791 .unwrap();
2792
2793 // Whole file
2794 let content = thread
2795 .update(cx, |thread, cx| {
2796 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2797 })
2798 .await
2799 .unwrap();
2800
2801 assert_eq!(content, "");
2802
2803 // Only start line
2804 let content = thread
2805 .update(cx, |thread, cx| {
2806 thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
2807 })
2808 .await
2809 .unwrap();
2810
2811 assert_eq!(content, "");
2812
2813 // Only limit
2814 let content = thread
2815 .update(cx, |thread, cx| {
2816 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2817 })
2818 .await
2819 .unwrap();
2820
2821 assert_eq!(content, "");
2822
2823 // Range
2824 let content = thread
2825 .update(cx, |thread, cx| {
2826 thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
2827 })
2828 .await
2829 .unwrap();
2830
2831 assert_eq!(content, "");
2832
2833 // Invalid
2834 let err = thread
2835 .update(cx, |thread, cx| {
2836 thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
2837 })
2838 .await
2839 .unwrap_err();
2840
2841 assert_eq!(
2842 err.to_string(),
2843 "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
2844 );
2845 }
2846 #[gpui::test]
2847 async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
2848 init_test(cx);
2849
2850 let fs = FakeFs::new(cx.executor());
2851 fs.insert_tree(path!("/tmp"), json!({})).await;
2852 let project = Project::test(fs.clone(), [], cx).await;
2853 project
2854 .update(cx, |project, cx| {
2855 project.find_or_create_worktree(path!("/tmp"), true, cx)
2856 })
2857 .await
2858 .unwrap();
2859
2860 let connection = Rc::new(FakeAgentConnection::new());
2861
2862 let thread = cx
2863 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2864 .await
2865 .unwrap();
2866
2867 // Out of project file
2868 let err = thread
2869 .update(cx, |thread, cx| {
2870 thread.read_text_file(path!("/foo").into(), None, None, false, cx)
2871 })
2872 .await
2873 .unwrap_err();
2874
2875 assert_eq!(err.code, acp::ErrorCode::RESOURCE_NOT_FOUND.code);
2876 }
2877
2878 #[gpui::test]
2879 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
2880 init_test(cx);
2881
2882 let fs = FakeFs::new(cx.executor());
2883 let project = Project::test(fs, [], cx).await;
2884 let id = acp::ToolCallId("test".into());
2885
2886 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2887 let id = id.clone();
2888 move |_, thread, mut cx| {
2889 let id = id.clone();
2890 async move {
2891 thread
2892 .update(&mut cx, |thread, cx| {
2893 thread.handle_session_update(
2894 acp::SessionUpdate::ToolCall(acp::ToolCall {
2895 id: id.clone(),
2896 title: "Label".into(),
2897 kind: acp::ToolKind::Fetch,
2898 status: acp::ToolCallStatus::InProgress,
2899 content: vec![],
2900 locations: vec![],
2901 raw_input: None,
2902 raw_output: None,
2903 meta: None,
2904 }),
2905 cx,
2906 )
2907 })
2908 .unwrap()
2909 .unwrap();
2910 Ok(acp::PromptResponse {
2911 stop_reason: acp::StopReason::EndTurn,
2912 meta: None,
2913 })
2914 }
2915 .boxed_local()
2916 }
2917 }));
2918
2919 let thread = cx
2920 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2921 .await
2922 .unwrap();
2923
2924 let request = thread.update(cx, |thread, cx| {
2925 thread.send_raw("Fetch https://example.com", cx)
2926 });
2927
2928 run_until_first_tool_call(&thread, cx).await;
2929
2930 thread.read_with(cx, |thread, _| {
2931 assert!(matches!(
2932 thread.entries[1],
2933 AgentThreadEntry::ToolCall(ToolCall {
2934 status: ToolCallStatus::InProgress,
2935 ..
2936 })
2937 ));
2938 });
2939
2940 thread.update(cx, |thread, cx| thread.cancel(cx)).await;
2941
2942 thread.read_with(cx, |thread, _| {
2943 assert!(matches!(
2944 &thread.entries[1],
2945 AgentThreadEntry::ToolCall(ToolCall {
2946 status: ToolCallStatus::Canceled,
2947 ..
2948 })
2949 ));
2950 });
2951
2952 thread
2953 .update(cx, |thread, cx| {
2954 thread.handle_session_update(
2955 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
2956 id,
2957 fields: acp::ToolCallUpdateFields {
2958 status: Some(acp::ToolCallStatus::Completed),
2959 ..Default::default()
2960 },
2961 meta: None,
2962 }),
2963 cx,
2964 )
2965 })
2966 .unwrap();
2967
2968 request.await.unwrap();
2969
2970 thread.read_with(cx, |thread, _| {
2971 assert!(matches!(
2972 thread.entries[1],
2973 AgentThreadEntry::ToolCall(ToolCall {
2974 status: ToolCallStatus::Completed,
2975 ..
2976 })
2977 ));
2978 });
2979 }
2980
2981 #[gpui::test]
2982 async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
2983 init_test(cx);
2984 let fs = FakeFs::new(cx.background_executor.clone());
2985 fs.insert_tree(path!("/test"), json!({})).await;
2986 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
2987
2988 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2989 move |_, thread, mut cx| {
2990 async move {
2991 thread
2992 .update(&mut cx, |thread, cx| {
2993 thread.handle_session_update(
2994 acp::SessionUpdate::ToolCall(acp::ToolCall {
2995 id: acp::ToolCallId("test".into()),
2996 title: "Label".into(),
2997 kind: acp::ToolKind::Edit,
2998 status: acp::ToolCallStatus::Completed,
2999 content: vec![acp::ToolCallContent::Diff {
3000 diff: acp::Diff {
3001 path: "/test/test.txt".into(),
3002 old_text: None,
3003 new_text: "foo".into(),
3004 meta: None,
3005 },
3006 }],
3007 locations: vec![],
3008 raw_input: None,
3009 raw_output: None,
3010 meta: None,
3011 }),
3012 cx,
3013 )
3014 })
3015 .unwrap()
3016 .unwrap();
3017 Ok(acp::PromptResponse {
3018 stop_reason: acp::StopReason::EndTurn,
3019 meta: None,
3020 })
3021 }
3022 .boxed_local()
3023 }
3024 }));
3025
3026 let thread = cx
3027 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3028 .await
3029 .unwrap();
3030
3031 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3032 .await
3033 .unwrap();
3034
3035 assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3036 }
3037
3038 #[gpui::test(iterations = 10)]
3039 async fn test_checkpoints(cx: &mut TestAppContext) {
3040 init_test(cx);
3041 let fs = FakeFs::new(cx.background_executor.clone());
3042 fs.insert_tree(
3043 path!("/test"),
3044 json!({
3045 ".git": {}
3046 }),
3047 )
3048 .await;
3049 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3050
3051 let simulate_changes = Arc::new(AtomicBool::new(true));
3052 let next_filename = Arc::new(AtomicUsize::new(0));
3053 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3054 let simulate_changes = simulate_changes.clone();
3055 let next_filename = next_filename.clone();
3056 let fs = fs.clone();
3057 move |request, thread, mut cx| {
3058 let fs = fs.clone();
3059 let simulate_changes = simulate_changes.clone();
3060 let next_filename = next_filename.clone();
3061 async move {
3062 if simulate_changes.load(SeqCst) {
3063 let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3064 fs.write(Path::new(&filename), b"").await?;
3065 }
3066
3067 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3068 panic!("expected text content block");
3069 };
3070 thread.update(&mut cx, |thread, cx| {
3071 thread
3072 .handle_session_update(
3073 acp::SessionUpdate::AgentMessageChunk {
3074 content: content.text.to_uppercase().into(),
3075 },
3076 cx,
3077 )
3078 .unwrap();
3079 })?;
3080 Ok(acp::PromptResponse {
3081 stop_reason: acp::StopReason::EndTurn,
3082 meta: None,
3083 })
3084 }
3085 .boxed_local()
3086 }
3087 }));
3088 let thread = cx
3089 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3090 .await
3091 .unwrap();
3092
3093 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3094 .await
3095 .unwrap();
3096 thread.read_with(cx, |thread, cx| {
3097 assert_eq!(
3098 thread.to_markdown(cx),
3099 indoc! {"
3100 ## User (checkpoint)
3101
3102 Lorem
3103
3104 ## Assistant
3105
3106 LOREM
3107
3108 "}
3109 );
3110 });
3111 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3112
3113 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3114 .await
3115 .unwrap();
3116 thread.read_with(cx, |thread, cx| {
3117 assert_eq!(
3118 thread.to_markdown(cx),
3119 indoc! {"
3120 ## User (checkpoint)
3121
3122 Lorem
3123
3124 ## Assistant
3125
3126 LOREM
3127
3128 ## User (checkpoint)
3129
3130 ipsum
3131
3132 ## Assistant
3133
3134 IPSUM
3135
3136 "}
3137 );
3138 });
3139 assert_eq!(
3140 fs.files(),
3141 vec![
3142 Path::new(path!("/test/file-0")),
3143 Path::new(path!("/test/file-1"))
3144 ]
3145 );
3146
3147 // Checkpoint isn't stored when there are no changes.
3148 simulate_changes.store(false, SeqCst);
3149 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3150 .await
3151 .unwrap();
3152 thread.read_with(cx, |thread, cx| {
3153 assert_eq!(
3154 thread.to_markdown(cx),
3155 indoc! {"
3156 ## User (checkpoint)
3157
3158 Lorem
3159
3160 ## Assistant
3161
3162 LOREM
3163
3164 ## User (checkpoint)
3165
3166 ipsum
3167
3168 ## Assistant
3169
3170 IPSUM
3171
3172 ## User
3173
3174 dolor
3175
3176 ## Assistant
3177
3178 DOLOR
3179
3180 "}
3181 );
3182 });
3183 assert_eq!(
3184 fs.files(),
3185 vec![
3186 Path::new(path!("/test/file-0")),
3187 Path::new(path!("/test/file-1"))
3188 ]
3189 );
3190
3191 // Rewinding the conversation truncates the history and restores the checkpoint.
3192 thread
3193 .update(cx, |thread, cx| {
3194 let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3195 panic!("unexpected entries {:?}", thread.entries)
3196 };
3197 thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3198 })
3199 .await
3200 .unwrap();
3201 thread.read_with(cx, |thread, cx| {
3202 assert_eq!(
3203 thread.to_markdown(cx),
3204 indoc! {"
3205 ## User (checkpoint)
3206
3207 Lorem
3208
3209 ## Assistant
3210
3211 LOREM
3212
3213 "}
3214 );
3215 });
3216 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3217 }
3218
3219 #[gpui::test]
3220 async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3221 use std::sync::atomic::AtomicUsize;
3222 init_test(cx);
3223
3224 let fs = FakeFs::new(cx.executor());
3225 let project = Project::test(fs, None, cx).await;
3226
3227 // Create a connection that simulates refusal after tool result
3228 let prompt_count = Arc::new(AtomicUsize::new(0));
3229 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3230 let prompt_count = prompt_count.clone();
3231 move |_request, thread, mut cx| {
3232 let count = prompt_count.fetch_add(1, SeqCst);
3233 async move {
3234 if count == 0 {
3235 // First prompt: Generate a tool call with result
3236 thread.update(&mut cx, |thread, cx| {
3237 thread
3238 .handle_session_update(
3239 acp::SessionUpdate::ToolCall(acp::ToolCall {
3240 id: acp::ToolCallId("tool1".into()),
3241 title: "Test Tool".into(),
3242 kind: acp::ToolKind::Fetch,
3243 status: acp::ToolCallStatus::Completed,
3244 content: vec![],
3245 locations: vec![],
3246 raw_input: Some(serde_json::json!({"query": "test"})),
3247 raw_output: Some(
3248 serde_json::json!({"result": "inappropriate content"}),
3249 ),
3250 meta: None,
3251 }),
3252 cx,
3253 )
3254 .unwrap();
3255 })?;
3256
3257 // Now return refusal because of the tool result
3258 Ok(acp::PromptResponse {
3259 stop_reason: acp::StopReason::Refusal,
3260 meta: None,
3261 })
3262 } else {
3263 Ok(acp::PromptResponse {
3264 stop_reason: acp::StopReason::EndTurn,
3265 meta: None,
3266 })
3267 }
3268 }
3269 .boxed_local()
3270 }
3271 }));
3272
3273 let thread = cx
3274 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3275 .await
3276 .unwrap();
3277
3278 // Track if we see a Refusal event
3279 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3280 let saw_refusal_event_captured = saw_refusal_event.clone();
3281 thread.update(cx, |_thread, cx| {
3282 cx.subscribe(
3283 &thread,
3284 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3285 if matches!(event, AcpThreadEvent::Refusal) {
3286 *saw_refusal_event_captured.lock().unwrap() = true;
3287 }
3288 },
3289 )
3290 .detach();
3291 });
3292
3293 // Send a user message - this will trigger tool call and then refusal
3294 let send_task = thread.update(cx, |thread, cx| {
3295 thread.send(
3296 vec![acp::ContentBlock::Text(acp::TextContent {
3297 text: "Hello".into(),
3298 annotations: None,
3299 meta: None,
3300 })],
3301 cx,
3302 )
3303 });
3304 cx.background_executor.spawn(send_task).detach();
3305 cx.run_until_parked();
3306
3307 // Verify that:
3308 // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3309 // 2. The user message was NOT truncated
3310 assert!(
3311 *saw_refusal_event.lock().unwrap(),
3312 "Refusal event should be emitted for tool result refusals"
3313 );
3314
3315 thread.read_with(cx, |thread, _| {
3316 let entries = thread.entries();
3317 assert!(entries.len() >= 2, "Should have user message and tool call");
3318
3319 // Verify user message is still there
3320 assert!(
3321 matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3322 "User message should not be truncated"
3323 );
3324
3325 // Verify tool call is there with result
3326 if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3327 assert!(
3328 tool_call.raw_output.is_some(),
3329 "Tool call should have output"
3330 );
3331 } else {
3332 panic!("Expected tool call at index 1");
3333 }
3334 });
3335 }
3336
3337 #[gpui::test]
3338 async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3339 init_test(cx);
3340
3341 let fs = FakeFs::new(cx.executor());
3342 let project = Project::test(fs, None, cx).await;
3343
3344 let refuse_next = Arc::new(AtomicBool::new(false));
3345 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3346 let refuse_next = refuse_next.clone();
3347 move |_request, _thread, _cx| {
3348 if refuse_next.load(SeqCst) {
3349 async move {
3350 Ok(acp::PromptResponse {
3351 stop_reason: acp::StopReason::Refusal,
3352 meta: None,
3353 })
3354 }
3355 .boxed_local()
3356 } else {
3357 async move {
3358 Ok(acp::PromptResponse {
3359 stop_reason: acp::StopReason::EndTurn,
3360 meta: None,
3361 })
3362 }
3363 .boxed_local()
3364 }
3365 }
3366 }));
3367
3368 let thread = cx
3369 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3370 .await
3371 .unwrap();
3372
3373 // Track if we see a Refusal event
3374 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3375 let saw_refusal_event_captured = saw_refusal_event.clone();
3376 thread.update(cx, |_thread, cx| {
3377 cx.subscribe(
3378 &thread,
3379 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3380 if matches!(event, AcpThreadEvent::Refusal) {
3381 *saw_refusal_event_captured.lock().unwrap() = true;
3382 }
3383 },
3384 )
3385 .detach();
3386 });
3387
3388 // Send a message that will be refused
3389 refuse_next.store(true, SeqCst);
3390 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3391 .await
3392 .unwrap();
3393
3394 // Verify that a Refusal event WAS emitted for user prompt refusal
3395 assert!(
3396 *saw_refusal_event.lock().unwrap(),
3397 "Refusal event should be emitted for user prompt refusals"
3398 );
3399
3400 // Verify the message was truncated (user prompt refusal)
3401 thread.read_with(cx, |thread, cx| {
3402 assert_eq!(thread.to_markdown(cx), "");
3403 });
3404 }
3405
3406 #[gpui::test]
3407 async fn test_refusal(cx: &mut TestAppContext) {
3408 init_test(cx);
3409 let fs = FakeFs::new(cx.background_executor.clone());
3410 fs.insert_tree(path!("/"), json!({})).await;
3411 let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
3412
3413 let refuse_next = Arc::new(AtomicBool::new(false));
3414 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3415 let refuse_next = refuse_next.clone();
3416 move |request, thread, mut cx| {
3417 let refuse_next = refuse_next.clone();
3418 async move {
3419 if refuse_next.load(SeqCst) {
3420 return Ok(acp::PromptResponse {
3421 stop_reason: acp::StopReason::Refusal,
3422 meta: None,
3423 });
3424 }
3425
3426 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3427 panic!("expected text content block");
3428 };
3429 thread.update(&mut cx, |thread, cx| {
3430 thread
3431 .handle_session_update(
3432 acp::SessionUpdate::AgentMessageChunk {
3433 content: content.text.to_uppercase().into(),
3434 },
3435 cx,
3436 )
3437 .unwrap();
3438 })?;
3439 Ok(acp::PromptResponse {
3440 stop_reason: acp::StopReason::EndTurn,
3441 meta: None,
3442 })
3443 }
3444 .boxed_local()
3445 }
3446 }));
3447 let thread = cx
3448 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3449 .await
3450 .unwrap();
3451
3452 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3453 .await
3454 .unwrap();
3455 thread.read_with(cx, |thread, cx| {
3456 assert_eq!(
3457 thread.to_markdown(cx),
3458 indoc! {"
3459 ## User
3460
3461 hello
3462
3463 ## Assistant
3464
3465 HELLO
3466
3467 "}
3468 );
3469 });
3470
3471 // Simulate refusing the second message. The message should be truncated
3472 // when a user prompt is refused.
3473 refuse_next.store(true, SeqCst);
3474 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
3475 .await
3476 .unwrap();
3477 thread.read_with(cx, |thread, cx| {
3478 assert_eq!(
3479 thread.to_markdown(cx),
3480 indoc! {"
3481 ## User
3482
3483 hello
3484
3485 ## Assistant
3486
3487 HELLO
3488
3489 "}
3490 );
3491 });
3492 }
3493
3494 async fn run_until_first_tool_call(
3495 thread: &Entity<AcpThread>,
3496 cx: &mut TestAppContext,
3497 ) -> usize {
3498 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
3499
3500 let subscription = cx.update(|cx| {
3501 cx.subscribe(thread, move |thread, _, cx| {
3502 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
3503 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
3504 return tx.try_send(ix).unwrap();
3505 }
3506 }
3507 })
3508 });
3509
3510 select! {
3511 _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
3512 panic!("Timeout waiting for tool call")
3513 }
3514 ix = rx.next().fuse() => {
3515 drop(subscription);
3516 ix.unwrap()
3517 }
3518 }
3519 }
3520
3521 #[derive(Clone, Default)]
3522 struct FakeAgentConnection {
3523 auth_methods: Vec<acp::AuthMethod>,
3524 sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
3525 on_user_message: Option<
3526 Rc<
3527 dyn Fn(
3528 acp::PromptRequest,
3529 WeakEntity<AcpThread>,
3530 AsyncApp,
3531 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3532 + 'static,
3533 >,
3534 >,
3535 }
3536
3537 impl FakeAgentConnection {
3538 fn new() -> Self {
3539 Self {
3540 auth_methods: Vec::new(),
3541 on_user_message: None,
3542 sessions: Arc::default(),
3543 }
3544 }
3545
3546 #[expect(unused)]
3547 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
3548 self.auth_methods = auth_methods;
3549 self
3550 }
3551
3552 fn on_user_message(
3553 mut self,
3554 handler: impl Fn(
3555 acp::PromptRequest,
3556 WeakEntity<AcpThread>,
3557 AsyncApp,
3558 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3559 + 'static,
3560 ) -> Self {
3561 self.on_user_message.replace(Rc::new(handler));
3562 self
3563 }
3564 }
3565
3566 impl AgentConnection for FakeAgentConnection {
3567 fn auth_methods(&self) -> &[acp::AuthMethod] {
3568 &self.auth_methods
3569 }
3570
3571 fn new_thread(
3572 self: Rc<Self>,
3573 project: Entity<Project>,
3574 _cwd: &Path,
3575 cx: &mut App,
3576 ) -> Task<gpui::Result<Entity<AcpThread>>> {
3577 let session_id = acp::SessionId(
3578 rand::rng()
3579 .sample_iter(&distr::Alphanumeric)
3580 .take(7)
3581 .map(char::from)
3582 .collect::<String>()
3583 .into(),
3584 );
3585 let action_log = cx.new(|_| ActionLog::new(project.clone()));
3586 let thread = cx.new(|cx| {
3587 AcpThread::new(
3588 "Test",
3589 self.clone(),
3590 project,
3591 action_log,
3592 session_id.clone(),
3593 watch::Receiver::constant(acp::PromptCapabilities {
3594 image: true,
3595 audio: true,
3596 embedded_context: true,
3597 meta: None,
3598 }),
3599 cx,
3600 )
3601 });
3602 self.sessions.lock().insert(session_id, thread.downgrade());
3603 Task::ready(Ok(thread))
3604 }
3605
3606 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
3607 if self.auth_methods().iter().any(|m| m.id == method) {
3608 Task::ready(Ok(()))
3609 } else {
3610 Task::ready(Err(anyhow!("Invalid Auth Method")))
3611 }
3612 }
3613
3614 fn prompt(
3615 &self,
3616 _id: Option<UserMessageId>,
3617 params: acp::PromptRequest,
3618 cx: &mut App,
3619 ) -> Task<gpui::Result<acp::PromptResponse>> {
3620 let sessions = self.sessions.lock();
3621 let thread = sessions.get(¶ms.session_id).unwrap();
3622 if let Some(handler) = &self.on_user_message {
3623 let handler = handler.clone();
3624 let thread = thread.clone();
3625 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
3626 } else {
3627 Task::ready(Ok(acp::PromptResponse {
3628 stop_reason: acp::StopReason::EndTurn,
3629 meta: None,
3630 }))
3631 }
3632 }
3633
3634 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
3635 let sessions = self.sessions.lock();
3636 let thread = sessions.get(session_id).unwrap().clone();
3637
3638 cx.spawn(async move |cx| {
3639 thread
3640 .update(cx, |thread, cx| thread.cancel(cx))
3641 .unwrap()
3642 .await
3643 })
3644 .detach();
3645 }
3646
3647 fn truncate(
3648 &self,
3649 session_id: &acp::SessionId,
3650 _cx: &App,
3651 ) -> Option<Rc<dyn AgentSessionTruncate>> {
3652 Some(Rc::new(FakeAgentSessionEditor {
3653 _session_id: session_id.clone(),
3654 }))
3655 }
3656
3657 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
3658 self
3659 }
3660 }
3661
3662 struct FakeAgentSessionEditor {
3663 _session_id: acp::SessionId,
3664 }
3665
3666 impl AgentSessionTruncate for FakeAgentSessionEditor {
3667 fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
3668 Task::ready(Ok(()))
3669 }
3670 }
3671
3672 #[gpui::test]
3673 async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
3674 init_test(cx);
3675
3676 let fs = FakeFs::new(cx.executor());
3677 let project = Project::test(fs, [], cx).await;
3678 let connection = Rc::new(FakeAgentConnection::new());
3679 let thread = cx
3680 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3681 .await
3682 .unwrap();
3683
3684 // Try to update a tool call that doesn't exist
3685 let nonexistent_id = acp::ToolCallId("nonexistent-tool-call".into());
3686 thread.update(cx, |thread, cx| {
3687 let result = thread.handle_session_update(
3688 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
3689 id: nonexistent_id.clone(),
3690 fields: acp::ToolCallUpdateFields {
3691 status: Some(acp::ToolCallStatus::Completed),
3692 ..Default::default()
3693 },
3694 meta: None,
3695 }),
3696 cx,
3697 );
3698
3699 // The update should succeed (not return an error)
3700 assert!(result.is_ok());
3701
3702 // There should now be exactly one entry in the thread
3703 assert_eq!(thread.entries.len(), 1);
3704
3705 // The entry should be a failed tool call
3706 if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
3707 assert_eq!(tool_call.id, nonexistent_id);
3708 assert!(matches!(tool_call.status, ToolCallStatus::Failed));
3709 assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
3710
3711 // Check that the content contains the error message
3712 assert_eq!(tool_call.content.len(), 1);
3713 if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
3714 match content_block {
3715 ContentBlock::Markdown { markdown } => {
3716 let markdown_text = markdown.read(cx).source();
3717 assert!(markdown_text.contains("Tool call not found"));
3718 }
3719 ContentBlock::Empty => panic!("Expected markdown content, got empty"),
3720 ContentBlock::ResourceLink { .. } => {
3721 panic!("Expected markdown content, got resource link")
3722 }
3723 }
3724 } else {
3725 panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
3726 }
3727 } else {
3728 panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
3729 }
3730 });
3731 }
3732}