1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5
6use ::terminal::terminal_settings::TerminalSettings;
7use agent_settings::AgentSettings;
8use collections::HashSet;
9pub use connection::*;
10pub use diff::*;
11use language::language_settings::FormatOnSave;
12pub use mention::*;
13use project::lsp_store::{FormatTrigger, LspFormatTarget};
14use serde::{Deserialize, Serialize};
15use settings::{Settings as _, SettingsLocation};
16use task::{Shell, ShellBuilder};
17pub use terminal::*;
18
19use action_log::ActionLog;
20use agent_client_protocol::{self as acp};
21use anyhow::{Context as _, Result, anyhow};
22use editor::Bias;
23use futures::{FutureExt, channel::oneshot, future::BoxFuture};
24use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
25use itertools::Itertools;
26use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
27use markdown::Markdown;
28use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
29use std::collections::HashMap;
30use std::error::Error;
31use std::fmt::{Formatter, Write};
32use std::ops::Range;
33use std::process::ExitStatus;
34use std::rc::Rc;
35use std::time::{Duration, Instant};
36use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
37use ui::App;
38use util::{ResultExt, get_default_system_shell_preferring_bash};
39use uuid::Uuid;
40
41#[derive(Debug)]
42pub struct UserMessage {
43 pub id: Option<UserMessageId>,
44 pub content: ContentBlock,
45 pub chunks: Vec<acp::ContentBlock>,
46 pub checkpoint: Option<Checkpoint>,
47}
48
49#[derive(Debug)]
50pub struct Checkpoint {
51 git_checkpoint: GitStoreCheckpoint,
52 pub show: bool,
53}
54
55impl UserMessage {
56 fn to_markdown(&self, cx: &App) -> String {
57 let mut markdown = String::new();
58 if self
59 .checkpoint
60 .as_ref()
61 .is_some_and(|checkpoint| checkpoint.show)
62 {
63 writeln!(markdown, "## User (checkpoint)").unwrap();
64 } else {
65 writeln!(markdown, "## User").unwrap();
66 }
67 writeln!(markdown).unwrap();
68 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
69 writeln!(markdown).unwrap();
70 markdown
71 }
72}
73
74#[derive(Debug, PartialEq)]
75pub struct AssistantMessage {
76 pub chunks: Vec<AssistantMessageChunk>,
77}
78
79impl AssistantMessage {
80 pub fn to_markdown(&self, cx: &App) -> String {
81 format!(
82 "## Assistant\n\n{}\n\n",
83 self.chunks
84 .iter()
85 .map(|chunk| chunk.to_markdown(cx))
86 .join("\n\n")
87 )
88 }
89}
90
91#[derive(Debug, PartialEq)]
92pub enum AssistantMessageChunk {
93 Message { block: ContentBlock },
94 Thought { block: ContentBlock },
95}
96
97impl AssistantMessageChunk {
98 pub fn from_str(chunk: &str, language_registry: &Arc<LanguageRegistry>, cx: &mut App) -> Self {
99 Self::Message {
100 block: ContentBlock::new(chunk.into(), language_registry, cx),
101 }
102 }
103
104 fn to_markdown(&self, cx: &App) -> String {
105 match self {
106 Self::Message { block } => block.to_markdown(cx).to_string(),
107 Self::Thought { block } => {
108 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
109 }
110 }
111 }
112}
113
114#[derive(Debug)]
115pub enum AgentThreadEntry {
116 UserMessage(UserMessage),
117 AssistantMessage(AssistantMessage),
118 ToolCall(ToolCall),
119}
120
121impl AgentThreadEntry {
122 pub fn to_markdown(&self, cx: &App) -> String {
123 match self {
124 Self::UserMessage(message) => message.to_markdown(cx),
125 Self::AssistantMessage(message) => message.to_markdown(cx),
126 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
127 }
128 }
129
130 pub fn user_message(&self) -> Option<&UserMessage> {
131 if let AgentThreadEntry::UserMessage(message) = self {
132 Some(message)
133 } else {
134 None
135 }
136 }
137
138 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
139 if let AgentThreadEntry::ToolCall(call) = self {
140 itertools::Either::Left(call.diffs())
141 } else {
142 itertools::Either::Right(std::iter::empty())
143 }
144 }
145
146 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
147 if let AgentThreadEntry::ToolCall(call) = self {
148 itertools::Either::Left(call.terminals())
149 } else {
150 itertools::Either::Right(std::iter::empty())
151 }
152 }
153
154 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
155 if let AgentThreadEntry::ToolCall(ToolCall {
156 locations,
157 resolved_locations,
158 ..
159 }) = self
160 {
161 Some((
162 locations.get(ix)?.clone(),
163 resolved_locations.get(ix)?.clone()?,
164 ))
165 } else {
166 None
167 }
168 }
169}
170
171#[derive(Debug)]
172pub struct ToolCall {
173 pub id: acp::ToolCallId,
174 pub label: Entity<Markdown>,
175 pub kind: acp::ToolKind,
176 pub content: Vec<ToolCallContent>,
177 pub status: ToolCallStatus,
178 pub locations: Vec<acp::ToolCallLocation>,
179 pub resolved_locations: Vec<Option<AgentLocation>>,
180 pub raw_input: Option<serde_json::Value>,
181 pub raw_output: Option<serde_json::Value>,
182}
183
184impl ToolCall {
185 fn from_acp(
186 tool_call: acp::ToolCall,
187 status: ToolCallStatus,
188 language_registry: Arc<LanguageRegistry>,
189 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
190 cx: &mut App,
191 ) -> Result<Self> {
192 let title = if let Some((first_line, _)) = tool_call.title.split_once("\n") {
193 first_line.to_owned() + "…"
194 } else {
195 tool_call.title
196 };
197 let mut content = Vec::with_capacity(tool_call.content.len());
198 for item in tool_call.content {
199 content.push(ToolCallContent::from_acp(
200 item,
201 language_registry.clone(),
202 terminals,
203 cx,
204 )?);
205 }
206
207 let result = Self {
208 id: tool_call.id,
209 label: cx
210 .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
211 kind: tool_call.kind,
212 content,
213 locations: tool_call.locations,
214 resolved_locations: Vec::default(),
215 status,
216 raw_input: tool_call.raw_input,
217 raw_output: tool_call.raw_output,
218 };
219 Ok(result)
220 }
221
222 fn update_fields(
223 &mut self,
224 fields: acp::ToolCallUpdateFields,
225 language_registry: Arc<LanguageRegistry>,
226 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
227 cx: &mut App,
228 ) -> Result<()> {
229 let acp::ToolCallUpdateFields {
230 kind,
231 status,
232 title,
233 content,
234 locations,
235 raw_input,
236 raw_output,
237 } = fields;
238
239 if let Some(kind) = kind {
240 self.kind = kind;
241 }
242
243 if let Some(status) = status {
244 self.status = status.into();
245 }
246
247 if let Some(title) = title {
248 self.label.update(cx, |label, cx| {
249 if let Some((first_line, _)) = title.split_once("\n") {
250 label.replace(first_line.to_owned() + "…", cx)
251 } else {
252 label.replace(title, cx);
253 }
254 });
255 }
256
257 if let Some(content) = content {
258 let new_content_len = content.len();
259 let mut content = content.into_iter();
260
261 // Reuse existing content if we can
262 for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
263 old.update_from_acp(new, language_registry.clone(), terminals, cx)?;
264 }
265 for new in content {
266 self.content.push(ToolCallContent::from_acp(
267 new,
268 language_registry.clone(),
269 terminals,
270 cx,
271 )?)
272 }
273 self.content.truncate(new_content_len);
274 }
275
276 if let Some(locations) = locations {
277 self.locations = locations;
278 }
279
280 if let Some(raw_input) = raw_input {
281 self.raw_input = Some(raw_input);
282 }
283
284 if let Some(raw_output) = raw_output {
285 if self.content.is_empty()
286 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
287 {
288 self.content
289 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
290 markdown,
291 }));
292 }
293 self.raw_output = Some(raw_output);
294 }
295 Ok(())
296 }
297
298 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
299 self.content.iter().filter_map(|content| match content {
300 ToolCallContent::Diff(diff) => Some(diff),
301 ToolCallContent::ContentBlock(_) => None,
302 ToolCallContent::Terminal(_) => None,
303 })
304 }
305
306 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
307 self.content.iter().filter_map(|content| match content {
308 ToolCallContent::Terminal(terminal) => Some(terminal),
309 ToolCallContent::ContentBlock(_) => None,
310 ToolCallContent::Diff(_) => None,
311 })
312 }
313
314 fn to_markdown(&self, cx: &App) -> String {
315 let mut markdown = format!(
316 "**Tool Call: {}**\nStatus: {}\n\n",
317 self.label.read(cx).source(),
318 self.status
319 );
320 for content in &self.content {
321 markdown.push_str(content.to_markdown(cx).as_str());
322 markdown.push_str("\n\n");
323 }
324 markdown
325 }
326
327 async fn resolve_location(
328 location: acp::ToolCallLocation,
329 project: WeakEntity<Project>,
330 cx: &mut AsyncApp,
331 ) -> Option<AgentLocation> {
332 let buffer = project
333 .update(cx, |project, cx| {
334 project
335 .project_path_for_absolute_path(&location.path, cx)
336 .map(|path| project.open_buffer(path, cx))
337 })
338 .ok()??;
339 let buffer = buffer.await.log_err()?;
340 let position = buffer
341 .update(cx, |buffer, _| {
342 if let Some(row) = location.line {
343 let snapshot = buffer.snapshot();
344 let column = snapshot.indent_size_for_line(row).len;
345 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
346 snapshot.anchor_before(point)
347 } else {
348 Anchor::MIN
349 }
350 })
351 .ok()?;
352
353 Some(AgentLocation {
354 buffer: buffer.downgrade(),
355 position,
356 })
357 }
358
359 fn resolve_locations(
360 &self,
361 project: Entity<Project>,
362 cx: &mut App,
363 ) -> Task<Vec<Option<AgentLocation>>> {
364 let locations = self.locations.clone();
365 project.update(cx, |_, cx| {
366 cx.spawn(async move |project, cx| {
367 let mut new_locations = Vec::new();
368 for location in locations {
369 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
370 }
371 new_locations
372 })
373 })
374 }
375}
376
377#[derive(Debug)]
378pub enum ToolCallStatus {
379 /// The tool call hasn't started running yet, but we start showing it to
380 /// the user.
381 Pending,
382 /// The tool call is waiting for confirmation from the user.
383 WaitingForConfirmation {
384 options: Vec<acp::PermissionOption>,
385 respond_tx: oneshot::Sender<acp::PermissionOptionId>,
386 },
387 /// The tool call is currently running.
388 InProgress,
389 /// The tool call completed successfully.
390 Completed,
391 /// The tool call failed.
392 Failed,
393 /// The user rejected the tool call.
394 Rejected,
395 /// The user canceled generation so the tool call was canceled.
396 Canceled,
397}
398
399impl From<acp::ToolCallStatus> for ToolCallStatus {
400 fn from(status: acp::ToolCallStatus) -> Self {
401 match status {
402 acp::ToolCallStatus::Pending => Self::Pending,
403 acp::ToolCallStatus::InProgress => Self::InProgress,
404 acp::ToolCallStatus::Completed => Self::Completed,
405 acp::ToolCallStatus::Failed => Self::Failed,
406 }
407 }
408}
409
410impl Display for ToolCallStatus {
411 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
412 write!(
413 f,
414 "{}",
415 match self {
416 ToolCallStatus::Pending => "Pending",
417 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
418 ToolCallStatus::InProgress => "In Progress",
419 ToolCallStatus::Completed => "Completed",
420 ToolCallStatus::Failed => "Failed",
421 ToolCallStatus::Rejected => "Rejected",
422 ToolCallStatus::Canceled => "Canceled",
423 }
424 )
425 }
426}
427
428#[derive(Debug, PartialEq, Clone)]
429pub enum ContentBlock {
430 Empty,
431 Markdown { markdown: Entity<Markdown> },
432 ResourceLink { resource_link: acp::ResourceLink },
433}
434
435impl ContentBlock {
436 pub fn new(
437 block: acp::ContentBlock,
438 language_registry: &Arc<LanguageRegistry>,
439 cx: &mut App,
440 ) -> Self {
441 let mut this = Self::Empty;
442 this.append(block, language_registry, cx);
443 this
444 }
445
446 pub fn new_combined(
447 blocks: impl IntoIterator<Item = acp::ContentBlock>,
448 language_registry: Arc<LanguageRegistry>,
449 cx: &mut App,
450 ) -> Self {
451 let mut this = Self::Empty;
452 for block in blocks {
453 this.append(block, &language_registry, cx);
454 }
455 this
456 }
457
458 pub fn append(
459 &mut self,
460 block: acp::ContentBlock,
461 language_registry: &Arc<LanguageRegistry>,
462 cx: &mut App,
463 ) {
464 if matches!(self, ContentBlock::Empty)
465 && let acp::ContentBlock::ResourceLink(resource_link) = block
466 {
467 *self = ContentBlock::ResourceLink { resource_link };
468 return;
469 }
470
471 let new_content = self.block_string_contents(block);
472
473 match self {
474 ContentBlock::Empty => {
475 *self = Self::create_markdown_block(new_content, language_registry, cx);
476 }
477 ContentBlock::Markdown { markdown } => {
478 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
479 }
480 ContentBlock::ResourceLink { resource_link } => {
481 let existing_content = Self::resource_link_md(&resource_link.uri);
482 let combined = format!("{}\n{}", existing_content, new_content);
483
484 *self = Self::create_markdown_block(combined, language_registry, cx);
485 }
486 }
487 }
488
489 fn create_markdown_block(
490 content: String,
491 language_registry: &Arc<LanguageRegistry>,
492 cx: &mut App,
493 ) -> ContentBlock {
494 ContentBlock::Markdown {
495 markdown: cx
496 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
497 }
498 }
499
500 fn block_string_contents(&self, block: acp::ContentBlock) -> String {
501 match block {
502 acp::ContentBlock::Text(text_content) => text_content.text,
503 acp::ContentBlock::ResourceLink(resource_link) => {
504 Self::resource_link_md(&resource_link.uri)
505 }
506 acp::ContentBlock::Resource(acp::EmbeddedResource {
507 resource:
508 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
509 uri,
510 ..
511 }),
512 ..
513 }) => Self::resource_link_md(&uri),
514 acp::ContentBlock::Image(image) => Self::image_md(&image),
515 acp::ContentBlock::Audio(_) | acp::ContentBlock::Resource(_) => String::new(),
516 }
517 }
518
519 fn resource_link_md(uri: &str) -> String {
520 if let Some(uri) = MentionUri::parse(uri).log_err() {
521 uri.as_link().to_string()
522 } else {
523 uri.to_string()
524 }
525 }
526
527 fn image_md(_image: &acp::ImageContent) -> String {
528 "`Image`".into()
529 }
530
531 pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
532 match self {
533 ContentBlock::Empty => "",
534 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
535 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
536 }
537 }
538
539 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
540 match self {
541 ContentBlock::Empty => None,
542 ContentBlock::Markdown { markdown } => Some(markdown),
543 ContentBlock::ResourceLink { .. } => None,
544 }
545 }
546
547 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
548 match self {
549 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
550 _ => None,
551 }
552 }
553}
554
555#[derive(Debug)]
556pub enum ToolCallContent {
557 ContentBlock(ContentBlock),
558 Diff(Entity<Diff>),
559 Terminal(Entity<Terminal>),
560}
561
562impl ToolCallContent {
563 pub fn from_acp(
564 content: acp::ToolCallContent,
565 language_registry: Arc<LanguageRegistry>,
566 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
567 cx: &mut App,
568 ) -> Result<Self> {
569 match content {
570 acp::ToolCallContent::Content { content } => Ok(Self::ContentBlock(ContentBlock::new(
571 content,
572 &language_registry,
573 cx,
574 ))),
575 acp::ToolCallContent::Diff { diff } => Ok(Self::Diff(cx.new(|cx| {
576 Diff::finalized(
577 diff.path.to_string_lossy().into_owned(),
578 diff.old_text,
579 diff.new_text,
580 language_registry,
581 cx,
582 )
583 }))),
584 acp::ToolCallContent::Terminal { terminal_id } => terminals
585 .get(&terminal_id)
586 .cloned()
587 .map(Self::Terminal)
588 .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
589 }
590 }
591
592 pub fn update_from_acp(
593 &mut self,
594 new: acp::ToolCallContent,
595 language_registry: Arc<LanguageRegistry>,
596 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
597 cx: &mut App,
598 ) -> Result<()> {
599 let needs_update = match (&self, &new) {
600 (Self::Diff(old_diff), acp::ToolCallContent::Diff { diff: new_diff }) => {
601 old_diff.read(cx).needs_update(
602 new_diff.old_text.as_deref().unwrap_or(""),
603 &new_diff.new_text,
604 cx,
605 )
606 }
607 _ => true,
608 };
609
610 if needs_update {
611 *self = Self::from_acp(new, language_registry, terminals, cx)?;
612 }
613 Ok(())
614 }
615
616 pub fn to_markdown(&self, cx: &App) -> String {
617 match self {
618 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
619 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
620 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
621 }
622 }
623}
624
625#[derive(Debug, PartialEq)]
626pub enum ToolCallUpdate {
627 UpdateFields(acp::ToolCallUpdate),
628 UpdateDiff(ToolCallUpdateDiff),
629 UpdateTerminal(ToolCallUpdateTerminal),
630}
631
632impl ToolCallUpdate {
633 fn id(&self) -> &acp::ToolCallId {
634 match self {
635 Self::UpdateFields(update) => &update.id,
636 Self::UpdateDiff(diff) => &diff.id,
637 Self::UpdateTerminal(terminal) => &terminal.id,
638 }
639 }
640}
641
642impl From<acp::ToolCallUpdate> for ToolCallUpdate {
643 fn from(update: acp::ToolCallUpdate) -> Self {
644 Self::UpdateFields(update)
645 }
646}
647
648impl From<ToolCallUpdateDiff> for ToolCallUpdate {
649 fn from(diff: ToolCallUpdateDiff) -> Self {
650 Self::UpdateDiff(diff)
651 }
652}
653
654#[derive(Debug, PartialEq)]
655pub struct ToolCallUpdateDiff {
656 pub id: acp::ToolCallId,
657 pub diff: Entity<Diff>,
658}
659
660impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
661 fn from(terminal: ToolCallUpdateTerminal) -> Self {
662 Self::UpdateTerminal(terminal)
663 }
664}
665
666#[derive(Debug, PartialEq)]
667pub struct ToolCallUpdateTerminal {
668 pub id: acp::ToolCallId,
669 pub terminal: Entity<Terminal>,
670}
671
672#[derive(Debug, Default)]
673pub struct Plan {
674 pub entries: Vec<PlanEntry>,
675}
676
677#[derive(Debug)]
678pub struct PlanStats<'a> {
679 pub in_progress_entry: Option<&'a PlanEntry>,
680 pub pending: u32,
681 pub completed: u32,
682}
683
684impl Plan {
685 pub fn is_empty(&self) -> bool {
686 self.entries.is_empty()
687 }
688
689 pub fn stats(&self) -> PlanStats<'_> {
690 let mut stats = PlanStats {
691 in_progress_entry: None,
692 pending: 0,
693 completed: 0,
694 };
695
696 for entry in &self.entries {
697 match &entry.status {
698 acp::PlanEntryStatus::Pending => {
699 stats.pending += 1;
700 }
701 acp::PlanEntryStatus::InProgress => {
702 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
703 }
704 acp::PlanEntryStatus::Completed => {
705 stats.completed += 1;
706 }
707 }
708 }
709
710 stats
711 }
712}
713
714#[derive(Debug)]
715pub struct PlanEntry {
716 pub content: Entity<Markdown>,
717 pub priority: acp::PlanEntryPriority,
718 pub status: acp::PlanEntryStatus,
719}
720
721impl PlanEntry {
722 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
723 Self {
724 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
725 priority: entry.priority,
726 status: entry.status,
727 }
728 }
729}
730
731#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
732pub struct TokenUsage {
733 pub max_tokens: u64,
734 pub used_tokens: u64,
735}
736
737impl TokenUsage {
738 pub fn ratio(&self) -> TokenUsageRatio {
739 #[cfg(debug_assertions)]
740 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
741 .unwrap_or("0.8".to_string())
742 .parse()
743 .unwrap();
744 #[cfg(not(debug_assertions))]
745 let warning_threshold: f32 = 0.8;
746
747 // When the maximum is unknown because there is no selected model,
748 // avoid showing the token limit warning.
749 if self.max_tokens == 0 {
750 TokenUsageRatio::Normal
751 } else if self.used_tokens >= self.max_tokens {
752 TokenUsageRatio::Exceeded
753 } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
754 TokenUsageRatio::Warning
755 } else {
756 TokenUsageRatio::Normal
757 }
758 }
759}
760
761#[derive(Debug, Clone, PartialEq, Eq)]
762pub enum TokenUsageRatio {
763 Normal,
764 Warning,
765 Exceeded,
766}
767
768#[derive(Debug, Clone)]
769pub struct RetryStatus {
770 pub last_error: SharedString,
771 pub attempt: usize,
772 pub max_attempts: usize,
773 pub started_at: Instant,
774 pub duration: Duration,
775}
776
777pub struct AcpThread {
778 title: SharedString,
779 entries: Vec<AgentThreadEntry>,
780 plan: Plan,
781 project: Entity<Project>,
782 action_log: Entity<ActionLog>,
783 shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
784 send_task: Option<Task<()>>,
785 connection: Rc<dyn AgentConnection>,
786 session_id: acp::SessionId,
787 token_usage: Option<TokenUsage>,
788 prompt_capabilities: acp::PromptCapabilities,
789 _observe_prompt_capabilities: Task<anyhow::Result<()>>,
790 terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
791 pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
792 pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
793}
794
795#[derive(Debug)]
796pub enum AcpThreadEvent {
797 NewEntry,
798 TitleUpdated,
799 TokenUsageUpdated,
800 EntryUpdated(usize),
801 EntriesRemoved(Range<usize>),
802 ToolAuthorizationRequired,
803 Retry(RetryStatus),
804 Stopped,
805 Error,
806 LoadError(LoadError),
807 PromptCapabilitiesUpdated,
808 Refusal,
809 AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
810 ModeUpdated(acp::SessionModeId),
811}
812
813impl EventEmitter<AcpThreadEvent> for AcpThread {}
814
815#[derive(Debug, Clone)]
816pub enum TerminalProviderEvent {
817 Created {
818 terminal_id: acp::TerminalId,
819 label: String,
820 cwd: Option<PathBuf>,
821 output_byte_limit: Option<u64>,
822 terminal: Entity<::terminal::Terminal>,
823 },
824 Output {
825 terminal_id: acp::TerminalId,
826 data: Vec<u8>,
827 },
828 TitleChanged {
829 terminal_id: acp::TerminalId,
830 title: String,
831 },
832 Exit {
833 terminal_id: acp::TerminalId,
834 status: acp::TerminalExitStatus,
835 },
836}
837
838#[derive(Debug, Clone)]
839pub enum TerminalProviderCommand {
840 WriteInput {
841 terminal_id: acp::TerminalId,
842 bytes: Vec<u8>,
843 },
844 Resize {
845 terminal_id: acp::TerminalId,
846 cols: u16,
847 rows: u16,
848 },
849 Close {
850 terminal_id: acp::TerminalId,
851 },
852}
853
854impl AcpThread {
855 pub fn on_terminal_provider_event(
856 &mut self,
857 event: TerminalProviderEvent,
858 cx: &mut Context<Self>,
859 ) {
860 match event {
861 TerminalProviderEvent::Created {
862 terminal_id,
863 label,
864 cwd,
865 output_byte_limit,
866 terminal,
867 } => {
868 let entity = self.register_terminal_created(
869 terminal_id.clone(),
870 label,
871 cwd,
872 output_byte_limit,
873 terminal,
874 cx,
875 );
876
877 if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
878 for data in chunks.drain(..) {
879 entity.update(cx, |term, cx| {
880 term.inner().update(cx, |inner, cx| {
881 inner.write_output(&data, cx);
882 })
883 });
884 }
885 }
886
887 if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
888 entity.update(cx, |_term, cx| {
889 cx.notify();
890 });
891 }
892
893 cx.notify();
894 }
895 TerminalProviderEvent::Output { terminal_id, data } => {
896 if let Some(entity) = self.terminals.get(&terminal_id) {
897 entity.update(cx, |term, cx| {
898 term.inner().update(cx, |inner, cx| {
899 inner.write_output(&data, cx);
900 })
901 });
902 } else {
903 self.pending_terminal_output
904 .entry(terminal_id)
905 .or_default()
906 .push(data);
907 }
908 }
909 TerminalProviderEvent::TitleChanged { terminal_id, title } => {
910 if let Some(entity) = self.terminals.get(&terminal_id) {
911 entity.update(cx, |term, cx| {
912 term.inner().update(cx, |inner, cx| {
913 inner.breadcrumb_text = title;
914 cx.emit(::terminal::Event::BreadcrumbsChanged);
915 })
916 });
917 }
918 }
919 TerminalProviderEvent::Exit {
920 terminal_id,
921 status,
922 } => {
923 if let Some(entity) = self.terminals.get(&terminal_id) {
924 entity.update(cx, |_term, cx| {
925 cx.notify();
926 });
927 } else {
928 self.pending_terminal_exit.insert(terminal_id, status);
929 }
930 }
931 }
932 }
933}
934
935#[derive(PartialEq, Eq, Debug)]
936pub enum ThreadStatus {
937 Idle,
938 Generating,
939}
940
941#[derive(Debug, Clone)]
942pub enum LoadError {
943 Unsupported {
944 command: SharedString,
945 current_version: SharedString,
946 minimum_version: SharedString,
947 },
948 FailedToInstall(SharedString),
949 Exited {
950 status: ExitStatus,
951 },
952 Other(SharedString),
953}
954
955impl Display for LoadError {
956 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
957 match self {
958 LoadError::Unsupported {
959 command: path,
960 current_version,
961 minimum_version,
962 } => {
963 write!(
964 f,
965 "version {current_version} from {path} is not supported (need at least {minimum_version})"
966 )
967 }
968 LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
969 LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
970 LoadError::Other(msg) => write!(f, "{msg}"),
971 }
972 }
973}
974
975impl Error for LoadError {}
976
977impl AcpThread {
978 pub fn new(
979 title: impl Into<SharedString>,
980 connection: Rc<dyn AgentConnection>,
981 project: Entity<Project>,
982 action_log: Entity<ActionLog>,
983 session_id: acp::SessionId,
984 mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
985 cx: &mut Context<Self>,
986 ) -> Self {
987 let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
988 let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
989 loop {
990 let caps = prompt_capabilities_rx.recv().await?;
991 this.update(cx, |this, cx| {
992 this.prompt_capabilities = caps;
993 cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
994 })?;
995 }
996 });
997
998 Self {
999 action_log,
1000 shared_buffers: Default::default(),
1001 entries: Default::default(),
1002 plan: Default::default(),
1003 title: title.into(),
1004 project,
1005 send_task: None,
1006 connection,
1007 session_id,
1008 token_usage: None,
1009 prompt_capabilities,
1010 _observe_prompt_capabilities: task,
1011 terminals: HashMap::default(),
1012 pending_terminal_output: HashMap::default(),
1013 pending_terminal_exit: HashMap::default(),
1014 }
1015 }
1016
1017 pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
1018 self.prompt_capabilities.clone()
1019 }
1020
1021 pub fn connection(&self) -> &Rc<dyn AgentConnection> {
1022 &self.connection
1023 }
1024
1025 pub fn action_log(&self) -> &Entity<ActionLog> {
1026 &self.action_log
1027 }
1028
1029 pub fn project(&self) -> &Entity<Project> {
1030 &self.project
1031 }
1032
1033 pub fn title(&self) -> SharedString {
1034 self.title.clone()
1035 }
1036
1037 pub fn entries(&self) -> &[AgentThreadEntry] {
1038 &self.entries
1039 }
1040
1041 pub fn session_id(&self) -> &acp::SessionId {
1042 &self.session_id
1043 }
1044
1045 pub fn status(&self) -> ThreadStatus {
1046 if self.send_task.is_some() {
1047 ThreadStatus::Generating
1048 } else {
1049 ThreadStatus::Idle
1050 }
1051 }
1052
1053 pub fn token_usage(&self) -> Option<&TokenUsage> {
1054 self.token_usage.as_ref()
1055 }
1056
1057 pub fn has_pending_edit_tool_calls(&self) -> bool {
1058 for entry in self.entries.iter().rev() {
1059 match entry {
1060 AgentThreadEntry::UserMessage(_) => return false,
1061 AgentThreadEntry::ToolCall(
1062 call @ ToolCall {
1063 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1064 ..
1065 },
1066 ) if call.diffs().next().is_some() => {
1067 return true;
1068 }
1069 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1070 }
1071 }
1072
1073 false
1074 }
1075
1076 pub fn used_tools_since_last_user_message(&self) -> bool {
1077 for entry in self.entries.iter().rev() {
1078 match entry {
1079 AgentThreadEntry::UserMessage(..) => return false,
1080 AgentThreadEntry::AssistantMessage(..) => continue,
1081 AgentThreadEntry::ToolCall(..) => return true,
1082 }
1083 }
1084
1085 false
1086 }
1087
1088 pub fn handle_session_update(
1089 &mut self,
1090 update: acp::SessionUpdate,
1091 cx: &mut Context<Self>,
1092 ) -> Result<(), acp::Error> {
1093 match update {
1094 acp::SessionUpdate::UserMessageChunk { content } => {
1095 self.push_user_content_block(None, content, cx);
1096 }
1097 acp::SessionUpdate::AgentMessageChunk { content } => {
1098 self.push_assistant_content_block(content, false, cx);
1099 }
1100 acp::SessionUpdate::AgentThoughtChunk { content } => {
1101 self.push_assistant_content_block(content, true, cx);
1102 }
1103 acp::SessionUpdate::ToolCall(tool_call) => {
1104 self.upsert_tool_call(tool_call, cx)?;
1105 }
1106 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1107 self.update_tool_call(tool_call_update, cx)?;
1108 }
1109 acp::SessionUpdate::Plan(plan) => {
1110 self.update_plan(plan, cx);
1111 }
1112 acp::SessionUpdate::AvailableCommandsUpdate { available_commands } => {
1113 cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands))
1114 }
1115 acp::SessionUpdate::CurrentModeUpdate { current_mode_id } => {
1116 cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id))
1117 }
1118 }
1119 Ok(())
1120 }
1121
1122 pub fn push_user_content_block(
1123 &mut self,
1124 message_id: Option<UserMessageId>,
1125 chunk: acp::ContentBlock,
1126 cx: &mut Context<Self>,
1127 ) {
1128 let language_registry = self.project.read(cx).languages().clone();
1129 let entries_len = self.entries.len();
1130
1131 if let Some(last_entry) = self.entries.last_mut()
1132 && let AgentThreadEntry::UserMessage(UserMessage {
1133 id,
1134 content,
1135 chunks,
1136 ..
1137 }) = last_entry
1138 {
1139 *id = message_id.or(id.take());
1140 content.append(chunk.clone(), &language_registry, cx);
1141 chunks.push(chunk);
1142 let idx = entries_len - 1;
1143 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1144 } else {
1145 let content = ContentBlock::new(chunk.clone(), &language_registry, cx);
1146 self.push_entry(
1147 AgentThreadEntry::UserMessage(UserMessage {
1148 id: message_id,
1149 content,
1150 chunks: vec![chunk],
1151 checkpoint: None,
1152 }),
1153 cx,
1154 );
1155 }
1156 }
1157
1158 pub fn push_assistant_content_block(
1159 &mut self,
1160 chunk: acp::ContentBlock,
1161 is_thought: bool,
1162 cx: &mut Context<Self>,
1163 ) {
1164 let language_registry = self.project.read(cx).languages().clone();
1165 let entries_len = self.entries.len();
1166 if let Some(last_entry) = self.entries.last_mut()
1167 && let AgentThreadEntry::AssistantMessage(AssistantMessage { chunks }) = last_entry
1168 {
1169 let idx = entries_len - 1;
1170 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1171 match (chunks.last_mut(), is_thought) {
1172 (Some(AssistantMessageChunk::Message { block }), false)
1173 | (Some(AssistantMessageChunk::Thought { block }), true) => {
1174 block.append(chunk, &language_registry, cx)
1175 }
1176 _ => {
1177 let block = ContentBlock::new(chunk, &language_registry, cx);
1178 if is_thought {
1179 chunks.push(AssistantMessageChunk::Thought { block })
1180 } else {
1181 chunks.push(AssistantMessageChunk::Message { block })
1182 }
1183 }
1184 }
1185 } else {
1186 let block = ContentBlock::new(chunk, &language_registry, cx);
1187 let chunk = if is_thought {
1188 AssistantMessageChunk::Thought { block }
1189 } else {
1190 AssistantMessageChunk::Message { block }
1191 };
1192
1193 self.push_entry(
1194 AgentThreadEntry::AssistantMessage(AssistantMessage {
1195 chunks: vec![chunk],
1196 }),
1197 cx,
1198 );
1199 }
1200 }
1201
1202 fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
1203 self.entries.push(entry);
1204 cx.emit(AcpThreadEvent::NewEntry);
1205 }
1206
1207 pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1208 self.connection.set_title(&self.session_id, cx).is_some()
1209 }
1210
1211 pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1212 if title != self.title {
1213 self.title = title.clone();
1214 cx.emit(AcpThreadEvent::TitleUpdated);
1215 if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1216 return set_title.run(title, cx);
1217 }
1218 }
1219 Task::ready(Ok(()))
1220 }
1221
1222 pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1223 self.token_usage = usage;
1224 cx.emit(AcpThreadEvent::TokenUsageUpdated);
1225 }
1226
1227 pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1228 cx.emit(AcpThreadEvent::Retry(status));
1229 }
1230
1231 pub fn update_tool_call(
1232 &mut self,
1233 update: impl Into<ToolCallUpdate>,
1234 cx: &mut Context<Self>,
1235 ) -> Result<()> {
1236 let update = update.into();
1237 let languages = self.project.read(cx).languages().clone();
1238
1239 let ix = match self.index_for_tool_call(update.id()) {
1240 Some(ix) => ix,
1241 None => {
1242 // Tool call not found - create a failed tool call entry
1243 let failed_tool_call = ToolCall {
1244 id: update.id().clone(),
1245 label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1246 kind: acp::ToolKind::Fetch,
1247 content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1248 acp::ContentBlock::Text(acp::TextContent {
1249 text: "Tool call not found".to_string(),
1250 annotations: None,
1251 meta: None,
1252 }),
1253 &languages,
1254 cx,
1255 ))],
1256 status: ToolCallStatus::Failed,
1257 locations: Vec::new(),
1258 resolved_locations: Vec::new(),
1259 raw_input: None,
1260 raw_output: None,
1261 };
1262 self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1263 return Ok(());
1264 }
1265 };
1266 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1267 unreachable!()
1268 };
1269
1270 match update {
1271 ToolCallUpdate::UpdateFields(update) => {
1272 let location_updated = update.fields.locations.is_some();
1273 call.update_fields(update.fields, languages, &self.terminals, cx)?;
1274 if location_updated {
1275 self.resolve_locations(update.id, cx);
1276 }
1277 }
1278 ToolCallUpdate::UpdateDiff(update) => {
1279 call.content.clear();
1280 call.content.push(ToolCallContent::Diff(update.diff));
1281 }
1282 ToolCallUpdate::UpdateTerminal(update) => {
1283 call.content.clear();
1284 call.content
1285 .push(ToolCallContent::Terminal(update.terminal));
1286 }
1287 }
1288
1289 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1290
1291 Ok(())
1292 }
1293
1294 /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1295 pub fn upsert_tool_call(
1296 &mut self,
1297 tool_call: acp::ToolCall,
1298 cx: &mut Context<Self>,
1299 ) -> Result<(), acp::Error> {
1300 let status = tool_call.status.into();
1301 self.upsert_tool_call_inner(tool_call.into(), status, cx)
1302 }
1303
1304 /// Fails if id does not match an existing entry.
1305 pub fn upsert_tool_call_inner(
1306 &mut self,
1307 update: acp::ToolCallUpdate,
1308 status: ToolCallStatus,
1309 cx: &mut Context<Self>,
1310 ) -> Result<(), acp::Error> {
1311 let language_registry = self.project.read(cx).languages().clone();
1312 let id = update.id.clone();
1313
1314 if let Some(ix) = self.index_for_tool_call(&id) {
1315 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1316 unreachable!()
1317 };
1318
1319 call.update_fields(update.fields, language_registry, &self.terminals, cx)?;
1320 call.status = status;
1321
1322 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1323 } else {
1324 let call = ToolCall::from_acp(
1325 update.try_into()?,
1326 status,
1327 language_registry,
1328 &self.terminals,
1329 cx,
1330 )?;
1331 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1332 };
1333
1334 self.resolve_locations(id, cx);
1335 Ok(())
1336 }
1337
1338 fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1339 self.entries
1340 .iter()
1341 .enumerate()
1342 .rev()
1343 .find_map(|(index, entry)| {
1344 if let AgentThreadEntry::ToolCall(tool_call) = entry
1345 && &tool_call.id == id
1346 {
1347 Some(index)
1348 } else {
1349 None
1350 }
1351 })
1352 }
1353
1354 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1355 // The tool call we are looking for is typically the last one, or very close to the end.
1356 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1357 self.entries
1358 .iter_mut()
1359 .enumerate()
1360 .rev()
1361 .find_map(|(index, tool_call)| {
1362 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1363 && &tool_call.id == id
1364 {
1365 Some((index, tool_call))
1366 } else {
1367 None
1368 }
1369 })
1370 }
1371
1372 pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1373 self.entries
1374 .iter()
1375 .enumerate()
1376 .rev()
1377 .find_map(|(index, tool_call)| {
1378 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1379 && &tool_call.id == id
1380 {
1381 Some((index, tool_call))
1382 } else {
1383 None
1384 }
1385 })
1386 }
1387
1388 pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1389 let project = self.project.clone();
1390 let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1391 return;
1392 };
1393 let task = tool_call.resolve_locations(project, cx);
1394 cx.spawn(async move |this, cx| {
1395 let resolved_locations = task.await;
1396 this.update(cx, |this, cx| {
1397 let project = this.project.clone();
1398 let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1399 return;
1400 };
1401 if let Some(Some(location)) = resolved_locations.last() {
1402 project.update(cx, |project, cx| {
1403 if let Some(agent_location) = project.agent_location() {
1404 let should_ignore = agent_location.buffer == location.buffer
1405 && location
1406 .buffer
1407 .update(cx, |buffer, _| {
1408 let snapshot = buffer.snapshot();
1409 let old_position =
1410 agent_location.position.to_point(&snapshot);
1411 let new_position = location.position.to_point(&snapshot);
1412 // ignore this so that when we get updates from the edit tool
1413 // the position doesn't reset to the startof line
1414 old_position.row == new_position.row
1415 && old_position.column > new_position.column
1416 })
1417 .ok()
1418 .unwrap_or_default();
1419 if !should_ignore {
1420 project.set_agent_location(Some(location.clone()), cx);
1421 }
1422 }
1423 });
1424 }
1425 if tool_call.resolved_locations != resolved_locations {
1426 tool_call.resolved_locations = resolved_locations;
1427 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1428 }
1429 })
1430 })
1431 .detach();
1432 }
1433
1434 pub fn request_tool_call_authorization(
1435 &mut self,
1436 tool_call: acp::ToolCallUpdate,
1437 options: Vec<acp::PermissionOption>,
1438 respect_always_allow_setting: bool,
1439 cx: &mut Context<Self>,
1440 ) -> Result<BoxFuture<'static, acp::RequestPermissionOutcome>> {
1441 let (tx, rx) = oneshot::channel();
1442
1443 if respect_always_allow_setting && AgentSettings::get_global(cx).always_allow_tool_actions {
1444 // Don't use AllowAlways, because then if you were to turn off always_allow_tool_actions,
1445 // some tools would (incorrectly) continue to auto-accept.
1446 if let Some(allow_once_option) = options.iter().find_map(|option| {
1447 if matches!(option.kind, acp::PermissionOptionKind::AllowOnce) {
1448 Some(option.id.clone())
1449 } else {
1450 None
1451 }
1452 }) {
1453 self.upsert_tool_call_inner(tool_call, ToolCallStatus::Pending, cx)?;
1454 return Ok(async {
1455 acp::RequestPermissionOutcome::Selected {
1456 option_id: allow_once_option,
1457 }
1458 }
1459 .boxed());
1460 }
1461 }
1462
1463 let status = ToolCallStatus::WaitingForConfirmation {
1464 options,
1465 respond_tx: tx,
1466 };
1467
1468 self.upsert_tool_call_inner(tool_call, status, cx)?;
1469 cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1470
1471 let fut = async {
1472 match rx.await {
1473 Ok(option) => acp::RequestPermissionOutcome::Selected { option_id: option },
1474 Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1475 }
1476 }
1477 .boxed();
1478
1479 Ok(fut)
1480 }
1481
1482 pub fn authorize_tool_call(
1483 &mut self,
1484 id: acp::ToolCallId,
1485 option_id: acp::PermissionOptionId,
1486 option_kind: acp::PermissionOptionKind,
1487 cx: &mut Context<Self>,
1488 ) {
1489 let Some((ix, call)) = self.tool_call_mut(&id) else {
1490 return;
1491 };
1492
1493 let new_status = match option_kind {
1494 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1495 ToolCallStatus::Rejected
1496 }
1497 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1498 ToolCallStatus::InProgress
1499 }
1500 };
1501
1502 let curr_status = mem::replace(&mut call.status, new_status);
1503
1504 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1505 respond_tx.send(option_id).log_err();
1506 } else if cfg!(debug_assertions) {
1507 panic!("tried to authorize an already authorized tool call");
1508 }
1509
1510 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1511 }
1512
1513 pub fn first_tool_awaiting_confirmation(&self) -> Option<&ToolCall> {
1514 let mut first_tool_call = None;
1515
1516 for entry in self.entries.iter().rev() {
1517 match &entry {
1518 AgentThreadEntry::ToolCall(call) => {
1519 if let ToolCallStatus::WaitingForConfirmation { .. } = call.status {
1520 first_tool_call = Some(call);
1521 } else {
1522 continue;
1523 }
1524 }
1525 AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1526 // Reached the beginning of the turn.
1527 // If we had pending permission requests in the previous turn, they have been cancelled.
1528 break;
1529 }
1530 }
1531 }
1532
1533 first_tool_call
1534 }
1535
1536 pub fn plan(&self) -> &Plan {
1537 &self.plan
1538 }
1539
1540 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1541 let new_entries_len = request.entries.len();
1542 let mut new_entries = request.entries.into_iter();
1543
1544 // Reuse existing markdown to prevent flickering
1545 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1546 let PlanEntry {
1547 content,
1548 priority,
1549 status,
1550 } = old;
1551 content.update(cx, |old, cx| {
1552 old.replace(new.content, cx);
1553 });
1554 *priority = new.priority;
1555 *status = new.status;
1556 }
1557 for new in new_entries {
1558 self.plan.entries.push(PlanEntry::from_acp(new, cx))
1559 }
1560 self.plan.entries.truncate(new_entries_len);
1561
1562 cx.notify();
1563 }
1564
1565 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1566 self.plan
1567 .entries
1568 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1569 cx.notify();
1570 }
1571
1572 #[cfg(any(test, feature = "test-support"))]
1573 pub fn send_raw(
1574 &mut self,
1575 message: &str,
1576 cx: &mut Context<Self>,
1577 ) -> BoxFuture<'static, Result<()>> {
1578 self.send(
1579 vec![acp::ContentBlock::Text(acp::TextContent {
1580 text: message.to_string(),
1581 annotations: None,
1582 meta: None,
1583 })],
1584 cx,
1585 )
1586 }
1587
1588 pub fn send(
1589 &mut self,
1590 message: Vec<acp::ContentBlock>,
1591 cx: &mut Context<Self>,
1592 ) -> BoxFuture<'static, Result<()>> {
1593 let block = ContentBlock::new_combined(
1594 message.clone(),
1595 self.project.read(cx).languages().clone(),
1596 cx,
1597 );
1598 let request = acp::PromptRequest {
1599 prompt: message.clone(),
1600 session_id: self.session_id.clone(),
1601 meta: None,
1602 };
1603 let git_store = self.project.read(cx).git_store().clone();
1604
1605 let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
1606 Some(UserMessageId::new())
1607 } else {
1608 None
1609 };
1610
1611 self.run_turn(cx, async move |this, cx| {
1612 this.update(cx, |this, cx| {
1613 this.push_entry(
1614 AgentThreadEntry::UserMessage(UserMessage {
1615 id: message_id.clone(),
1616 content: block,
1617 chunks: message,
1618 checkpoint: None,
1619 }),
1620 cx,
1621 );
1622 })
1623 .ok();
1624
1625 let old_checkpoint = git_store
1626 .update(cx, |git, cx| git.checkpoint(cx))?
1627 .await
1628 .context("failed to get old checkpoint")
1629 .log_err();
1630 this.update(cx, |this, cx| {
1631 if let Some((_ix, message)) = this.last_user_message() {
1632 message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1633 git_checkpoint,
1634 show: false,
1635 });
1636 }
1637 this.connection.prompt(message_id, request, cx)
1638 })?
1639 .await
1640 })
1641 }
1642
1643 pub fn can_resume(&self, cx: &App) -> bool {
1644 self.connection.resume(&self.session_id, cx).is_some()
1645 }
1646
1647 pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1648 self.run_turn(cx, async move |this, cx| {
1649 this.update(cx, |this, cx| {
1650 this.connection
1651 .resume(&this.session_id, cx)
1652 .map(|resume| resume.run(cx))
1653 })?
1654 .context("resuming a session is not supported")?
1655 .await
1656 })
1657 }
1658
1659 fn run_turn(
1660 &mut self,
1661 cx: &mut Context<Self>,
1662 f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1663 ) -> BoxFuture<'static, Result<()>> {
1664 self.clear_completed_plan_entries(cx);
1665
1666 let (tx, rx) = oneshot::channel();
1667 let cancel_task = self.cancel(cx);
1668
1669 self.send_task = Some(cx.spawn(async move |this, cx| {
1670 cancel_task.await;
1671 tx.send(f(this, cx).await).ok();
1672 }));
1673
1674 cx.spawn(async move |this, cx| {
1675 let response = rx.await;
1676
1677 this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1678 .await?;
1679
1680 this.update(cx, |this, cx| {
1681 this.project
1682 .update(cx, |project, cx| project.set_agent_location(None, cx));
1683 match response {
1684 Ok(Err(e)) => {
1685 this.send_task.take();
1686 cx.emit(AcpThreadEvent::Error);
1687 Err(e)
1688 }
1689 result => {
1690 let canceled = matches!(
1691 result,
1692 Ok(Ok(acp::PromptResponse {
1693 stop_reason: acp::StopReason::Cancelled,
1694 meta: None,
1695 }))
1696 );
1697
1698 // We only take the task if the current prompt wasn't canceled.
1699 //
1700 // This prompt may have been canceled because another one was sent
1701 // while it was still generating. In these cases, dropping `send_task`
1702 // would cause the next generation to be canceled.
1703 if !canceled {
1704 this.send_task.take();
1705 }
1706
1707 // Handle refusal - distinguish between user prompt and tool call refusals
1708 if let Ok(Ok(acp::PromptResponse {
1709 stop_reason: acp::StopReason::Refusal,
1710 meta: _,
1711 })) = result
1712 {
1713 if let Some((user_msg_ix, _)) = this.last_user_message() {
1714 // Check if there's a completed tool call with results after the last user message
1715 // This indicates the refusal is in response to tool output, not the user's prompt
1716 let has_completed_tool_call_after_user_msg =
1717 this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
1718 if let AgentThreadEntry::ToolCall(tool_call) = entry {
1719 // Check if the tool call has completed and has output
1720 matches!(tool_call.status, ToolCallStatus::Completed)
1721 && tool_call.raw_output.is_some()
1722 } else {
1723 false
1724 }
1725 });
1726
1727 if has_completed_tool_call_after_user_msg {
1728 // Refusal is due to tool output - don't truncate, just notify
1729 // The model refused based on what the tool returned
1730 cx.emit(AcpThreadEvent::Refusal);
1731 } else {
1732 // User prompt was refused - truncate back to before the user message
1733 let range = user_msg_ix..this.entries.len();
1734 if range.start < range.end {
1735 this.entries.truncate(user_msg_ix);
1736 cx.emit(AcpThreadEvent::EntriesRemoved(range));
1737 }
1738 cx.emit(AcpThreadEvent::Refusal);
1739 }
1740 } else {
1741 // No user message found, treat as general refusal
1742 cx.emit(AcpThreadEvent::Refusal);
1743 }
1744 }
1745
1746 cx.emit(AcpThreadEvent::Stopped);
1747 Ok(())
1748 }
1749 }
1750 })?
1751 })
1752 .boxed()
1753 }
1754
1755 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1756 let Some(send_task) = self.send_task.take() else {
1757 return Task::ready(());
1758 };
1759
1760 for entry in self.entries.iter_mut() {
1761 if let AgentThreadEntry::ToolCall(call) = entry {
1762 let cancel = matches!(
1763 call.status,
1764 ToolCallStatus::Pending
1765 | ToolCallStatus::WaitingForConfirmation { .. }
1766 | ToolCallStatus::InProgress
1767 );
1768
1769 if cancel {
1770 call.status = ToolCallStatus::Canceled;
1771 }
1772 }
1773 }
1774
1775 self.connection.cancel(&self.session_id, cx);
1776
1777 // Wait for the send task to complete
1778 cx.foreground_executor().spawn(send_task)
1779 }
1780
1781 /// Restores the git working tree to the state at the given checkpoint (if one exists)
1782 pub fn restore_checkpoint(
1783 &mut self,
1784 id: UserMessageId,
1785 cx: &mut Context<Self>,
1786 ) -> Task<Result<()>> {
1787 let Some((_, message)) = self.user_message_mut(&id) else {
1788 return Task::ready(Err(anyhow!("message not found")));
1789 };
1790
1791 let checkpoint = message
1792 .checkpoint
1793 .as_ref()
1794 .map(|c| c.git_checkpoint.clone());
1795 let rewind = self.rewind(id.clone(), cx);
1796 let git_store = self.project.read(cx).git_store().clone();
1797
1798 cx.spawn(async move |_, cx| {
1799 rewind.await?;
1800 if let Some(checkpoint) = checkpoint {
1801 git_store
1802 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1803 .await?;
1804 }
1805
1806 Ok(())
1807 })
1808 }
1809
1810 /// Rewinds this thread to before the entry at `index`, removing it and all
1811 /// subsequent entries while rejecting any action_log changes made from that point.
1812 /// Unlike `restore_checkpoint`, this method does not restore from git.
1813 pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1814 let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
1815 return Task::ready(Err(anyhow!("not supported")));
1816 };
1817
1818 cx.spawn(async move |this, cx| {
1819 cx.update(|cx| truncate.run(id.clone(), cx))?.await?;
1820 this.update(cx, |this, cx| {
1821 if let Some((ix, _)) = this.user_message_mut(&id) {
1822 let range = ix..this.entries.len();
1823 this.entries.truncate(ix);
1824 cx.emit(AcpThreadEvent::EntriesRemoved(range));
1825 }
1826 this.action_log()
1827 .update(cx, |action_log, cx| action_log.reject_all_edits(cx))
1828 })?
1829 .await;
1830 Ok(())
1831 })
1832 }
1833
1834 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1835 let git_store = self.project.read(cx).git_store().clone();
1836
1837 let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1838 if let Some(checkpoint) = message.checkpoint.as_ref() {
1839 checkpoint.git_checkpoint.clone()
1840 } else {
1841 return Task::ready(Ok(()));
1842 }
1843 } else {
1844 return Task::ready(Ok(()));
1845 };
1846
1847 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
1848 cx.spawn(async move |this, cx| {
1849 let new_checkpoint = new_checkpoint
1850 .await
1851 .context("failed to get new checkpoint")
1852 .log_err();
1853 if let Some(new_checkpoint) = new_checkpoint {
1854 let equal = git_store
1855 .update(cx, |git, cx| {
1856 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
1857 })?
1858 .await
1859 .unwrap_or(true);
1860 this.update(cx, |this, cx| {
1861 let (ix, message) = this.last_user_message().context("no user message")?;
1862 let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
1863 checkpoint.show = !equal;
1864 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1865 anyhow::Ok(())
1866 })??;
1867 }
1868
1869 Ok(())
1870 })
1871 }
1872
1873 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
1874 self.entries
1875 .iter_mut()
1876 .enumerate()
1877 .rev()
1878 .find_map(|(ix, entry)| {
1879 if let AgentThreadEntry::UserMessage(message) = entry {
1880 Some((ix, message))
1881 } else {
1882 None
1883 }
1884 })
1885 }
1886
1887 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
1888 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
1889 if let AgentThreadEntry::UserMessage(message) = entry {
1890 if message.id.as_ref() == Some(id) {
1891 Some((ix, message))
1892 } else {
1893 None
1894 }
1895 } else {
1896 None
1897 }
1898 })
1899 }
1900
1901 pub fn read_text_file(
1902 &self,
1903 path: PathBuf,
1904 line: Option<u32>,
1905 limit: Option<u32>,
1906 reuse_shared_snapshot: bool,
1907 cx: &mut Context<Self>,
1908 ) -> Task<Result<String, acp::Error>> {
1909 // Args are 1-based, move to 0-based
1910 let line = line.unwrap_or_default().saturating_sub(1);
1911 let limit = limit.unwrap_or(u32::MAX);
1912 let project = self.project.clone();
1913 let action_log = self.action_log.clone();
1914 cx.spawn(async move |this, cx| {
1915 let load = project
1916 .update(cx, |project, cx| {
1917 let path = project
1918 .project_path_for_absolute_path(&path, cx)
1919 .ok_or_else(|| {
1920 acp::Error::resource_not_found(Some(path.display().to_string()))
1921 })?;
1922 Ok(project.open_buffer(path, cx))
1923 })
1924 .map_err(|e| acp::Error::internal_error().with_data(e.to_string()))
1925 .flatten()?;
1926
1927 let buffer = load.await?;
1928
1929 let snapshot = if reuse_shared_snapshot {
1930 this.read_with(cx, |this, _| {
1931 this.shared_buffers.get(&buffer.clone()).cloned()
1932 })
1933 .log_err()
1934 .flatten()
1935 } else {
1936 None
1937 };
1938
1939 let snapshot = if let Some(snapshot) = snapshot {
1940 snapshot
1941 } else {
1942 action_log.update(cx, |action_log, cx| {
1943 action_log.buffer_read(buffer.clone(), cx);
1944 })?;
1945
1946 let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot())?;
1947 this.update(cx, |this, _| {
1948 this.shared_buffers.insert(buffer.clone(), snapshot.clone());
1949 })?;
1950 snapshot
1951 };
1952
1953 let max_point = snapshot.max_point();
1954 let start_position = Point::new(line, 0);
1955
1956 if start_position > max_point {
1957 return Err(acp::Error::invalid_params().with_data(format!(
1958 "Attempting to read beyond the end of the file, line {}:{}",
1959 max_point.row + 1,
1960 max_point.column
1961 )));
1962 }
1963
1964 let start = snapshot.anchor_before(start_position);
1965 let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
1966
1967 project.update(cx, |project, cx| {
1968 project.set_agent_location(
1969 Some(AgentLocation {
1970 buffer: buffer.downgrade(),
1971 position: start,
1972 }),
1973 cx,
1974 );
1975 })?;
1976
1977 Ok(snapshot.text_for_range(start..end).collect::<String>())
1978 })
1979 }
1980
1981 pub fn write_text_file(
1982 &self,
1983 path: PathBuf,
1984 content: String,
1985 cx: &mut Context<Self>,
1986 ) -> Task<Result<()>> {
1987 let project = self.project.clone();
1988 let action_log = self.action_log.clone();
1989 cx.spawn(async move |this, cx| {
1990 let load = project.update(cx, |project, cx| {
1991 let path = project
1992 .project_path_for_absolute_path(&path, cx)
1993 .context("invalid path")?;
1994 anyhow::Ok(project.open_buffer(path, cx))
1995 });
1996 let buffer = load??.await?;
1997 let snapshot = this.update(cx, |this, cx| {
1998 this.shared_buffers
1999 .get(&buffer)
2000 .cloned()
2001 .unwrap_or_else(|| buffer.read(cx).snapshot())
2002 })?;
2003 let edits = cx
2004 .background_executor()
2005 .spawn(async move {
2006 let old_text = snapshot.text();
2007 text_diff(old_text.as_str(), &content)
2008 .into_iter()
2009 .map(|(range, replacement)| {
2010 (
2011 snapshot.anchor_after(range.start)
2012 ..snapshot.anchor_before(range.end),
2013 replacement,
2014 )
2015 })
2016 .collect::<Vec<_>>()
2017 })
2018 .await;
2019
2020 project.update(cx, |project, cx| {
2021 project.set_agent_location(
2022 Some(AgentLocation {
2023 buffer: buffer.downgrade(),
2024 position: edits
2025 .last()
2026 .map(|(range, _)| range.end)
2027 .unwrap_or(Anchor::MIN),
2028 }),
2029 cx,
2030 );
2031 })?;
2032
2033 let format_on_save = cx.update(|cx| {
2034 action_log.update(cx, |action_log, cx| {
2035 action_log.buffer_read(buffer.clone(), cx);
2036 });
2037
2038 let format_on_save = buffer.update(cx, |buffer, cx| {
2039 buffer.edit(edits, None, cx);
2040
2041 let settings = language::language_settings::language_settings(
2042 buffer.language().map(|l| l.name()),
2043 buffer.file(),
2044 cx,
2045 );
2046
2047 settings.format_on_save != FormatOnSave::Off
2048 });
2049 action_log.update(cx, |action_log, cx| {
2050 action_log.buffer_edited(buffer.clone(), cx);
2051 });
2052 format_on_save
2053 })?;
2054
2055 if format_on_save {
2056 let format_task = project.update(cx, |project, cx| {
2057 project.format(
2058 HashSet::from_iter([buffer.clone()]),
2059 LspFormatTarget::Buffers,
2060 false,
2061 FormatTrigger::Save,
2062 cx,
2063 )
2064 })?;
2065 format_task.await.log_err();
2066
2067 action_log.update(cx, |action_log, cx| {
2068 action_log.buffer_edited(buffer.clone(), cx);
2069 })?;
2070 }
2071
2072 project
2073 .update(cx, |project, cx| project.save_buffer(buffer, cx))?
2074 .await
2075 })
2076 }
2077
2078 pub fn create_terminal(
2079 &self,
2080 command: String,
2081 args: Vec<String>,
2082 extra_env: Vec<acp::EnvVariable>,
2083 cwd: Option<PathBuf>,
2084 output_byte_limit: Option<u64>,
2085 cx: &mut Context<Self>,
2086 ) -> Task<Result<Entity<Terminal>>> {
2087 let env = match &cwd {
2088 Some(dir) => self.project.update(cx, |project, cx| {
2089 let worktree = project.find_worktree(dir.as_path(), cx);
2090 let shell = TerminalSettings::get(
2091 worktree.as_ref().map(|(worktree, path)| SettingsLocation {
2092 worktree_id: worktree.read(cx).id(),
2093 path: &path,
2094 }),
2095 cx,
2096 )
2097 .shell
2098 .clone();
2099 project.directory_environment(&shell, dir.as_path().into(), cx)
2100 }),
2101 None => Task::ready(None).shared(),
2102 };
2103 let env = cx.spawn(async move |_, _| {
2104 let mut env = env.await.unwrap_or_default();
2105 // Disables paging for `git` and hopefully other commands
2106 env.insert("PAGER".into(), "".into());
2107 for var in extra_env {
2108 env.insert(var.name, var.value);
2109 }
2110 env
2111 });
2112
2113 let project = self.project.clone();
2114 let language_registry = project.read(cx).languages().clone();
2115
2116 let terminal_id = acp::TerminalId(Uuid::new_v4().to_string().into());
2117 let terminal_task = cx.spawn({
2118 let terminal_id = terminal_id.clone();
2119 async move |_this, cx| {
2120 let env = env.await;
2121 let shell = project
2122 .update(cx, |project, cx| {
2123 project
2124 .remote_client()
2125 .and_then(|r| r.read(cx).default_system_shell())
2126 })?
2127 .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2128 let (task_command, task_args) = ShellBuilder::new(&Shell::Program(shell))
2129 .redirect_stdin_to_dev_null()
2130 .build(Some(command.clone()), &args);
2131 let terminal = project
2132 .update(cx, |project, cx| {
2133 project.create_terminal_task(
2134 task::SpawnInTerminal {
2135 command: Some(task_command),
2136 args: task_args,
2137 cwd: cwd.clone(),
2138 env,
2139 ..Default::default()
2140 },
2141 cx,
2142 )
2143 })?
2144 .await?;
2145
2146 cx.new(|cx| {
2147 Terminal::new(
2148 terminal_id,
2149 &format!("{} {}", command, args.join(" ")),
2150 cwd,
2151 output_byte_limit.map(|l| l as usize),
2152 terminal,
2153 language_registry,
2154 cx,
2155 )
2156 })
2157 }
2158 });
2159
2160 cx.spawn(async move |this, cx| {
2161 let terminal = terminal_task.await?;
2162 this.update(cx, |this, _cx| {
2163 this.terminals.insert(terminal_id, terminal.clone());
2164 terminal
2165 })
2166 })
2167 }
2168
2169 pub fn kill_terminal(
2170 &mut self,
2171 terminal_id: acp::TerminalId,
2172 cx: &mut Context<Self>,
2173 ) -> Result<()> {
2174 self.terminals
2175 .get(&terminal_id)
2176 .context("Terminal not found")?
2177 .update(cx, |terminal, cx| {
2178 terminal.kill(cx);
2179 });
2180
2181 Ok(())
2182 }
2183
2184 pub fn release_terminal(
2185 &mut self,
2186 terminal_id: acp::TerminalId,
2187 cx: &mut Context<Self>,
2188 ) -> Result<()> {
2189 self.terminals
2190 .remove(&terminal_id)
2191 .context("Terminal not found")?
2192 .update(cx, |terminal, cx| {
2193 terminal.kill(cx);
2194 });
2195
2196 Ok(())
2197 }
2198
2199 pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2200 self.terminals
2201 .get(&terminal_id)
2202 .context("Terminal not found")
2203 .cloned()
2204 }
2205
2206 pub fn to_markdown(&self, cx: &App) -> String {
2207 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2208 }
2209
2210 pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2211 cx.emit(AcpThreadEvent::LoadError(error));
2212 }
2213
2214 pub fn register_terminal_created(
2215 &mut self,
2216 terminal_id: acp::TerminalId,
2217 command_label: String,
2218 working_dir: Option<PathBuf>,
2219 output_byte_limit: Option<u64>,
2220 terminal: Entity<::terminal::Terminal>,
2221 cx: &mut Context<Self>,
2222 ) -> Entity<Terminal> {
2223 let language_registry = self.project.read(cx).languages().clone();
2224
2225 let entity = cx.new(|cx| {
2226 Terminal::new(
2227 terminal_id.clone(),
2228 &command_label,
2229 working_dir.clone(),
2230 output_byte_limit.map(|l| l as usize),
2231 terminal,
2232 language_registry,
2233 cx,
2234 )
2235 });
2236 self.terminals.insert(terminal_id.clone(), entity.clone());
2237 entity
2238 }
2239}
2240
2241fn markdown_for_raw_output(
2242 raw_output: &serde_json::Value,
2243 language_registry: &Arc<LanguageRegistry>,
2244 cx: &mut App,
2245) -> Option<Entity<Markdown>> {
2246 match raw_output {
2247 serde_json::Value::Null => None,
2248 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2249 Markdown::new(
2250 value.to_string().into(),
2251 Some(language_registry.clone()),
2252 None,
2253 cx,
2254 )
2255 })),
2256 serde_json::Value::Number(value) => Some(cx.new(|cx| {
2257 Markdown::new(
2258 value.to_string().into(),
2259 Some(language_registry.clone()),
2260 None,
2261 cx,
2262 )
2263 })),
2264 serde_json::Value::String(value) => Some(cx.new(|cx| {
2265 Markdown::new(
2266 value.clone().into(),
2267 Some(language_registry.clone()),
2268 None,
2269 cx,
2270 )
2271 })),
2272 value => Some(cx.new(|cx| {
2273 Markdown::new(
2274 format!("```json\n{}\n```", value).into(),
2275 Some(language_registry.clone()),
2276 None,
2277 cx,
2278 )
2279 })),
2280 }
2281}
2282
2283#[cfg(test)]
2284mod tests {
2285 use super::*;
2286 use anyhow::anyhow;
2287 use futures::{channel::mpsc, future::LocalBoxFuture, select};
2288 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2289 use indoc::indoc;
2290 use project::{FakeFs, Fs};
2291 use rand::{distr, prelude::*};
2292 use serde_json::json;
2293 use settings::SettingsStore;
2294 use smol::stream::StreamExt as _;
2295 use std::{
2296 any::Any,
2297 cell::RefCell,
2298 path::Path,
2299 rc::Rc,
2300 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2301 time::Duration,
2302 };
2303 use util::path;
2304
2305 fn init_test(cx: &mut TestAppContext) {
2306 env_logger::try_init().ok();
2307 cx.update(|cx| {
2308 let settings_store = SettingsStore::test(cx);
2309 cx.set_global(settings_store);
2310 Project::init_settings(cx);
2311 language::init(cx);
2312 });
2313 }
2314
2315 #[gpui::test]
2316 async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2317 init_test(cx);
2318
2319 let fs = FakeFs::new(cx.executor());
2320 let project = Project::test(fs, [], cx).await;
2321 let connection = Rc::new(FakeAgentConnection::new());
2322 let thread = cx
2323 .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2324 .await
2325 .unwrap();
2326
2327 let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
2328
2329 // Send Output BEFORE Created - should be buffered by acp_thread
2330 thread.update(cx, |thread, cx| {
2331 thread.on_terminal_provider_event(
2332 TerminalProviderEvent::Output {
2333 terminal_id: terminal_id.clone(),
2334 data: b"hello buffered".to_vec(),
2335 },
2336 cx,
2337 );
2338 });
2339
2340 // Create a display-only terminal and then send Created
2341 let lower = cx.new(|cx| {
2342 let builder = ::terminal::TerminalBuilder::new_display_only(
2343 ::terminal::terminal_settings::CursorShape::default(),
2344 ::terminal::terminal_settings::AlternateScroll::On,
2345 None,
2346 0,
2347 )
2348 .unwrap();
2349 builder.subscribe(cx)
2350 });
2351
2352 thread.update(cx, |thread, cx| {
2353 thread.on_terminal_provider_event(
2354 TerminalProviderEvent::Created {
2355 terminal_id: terminal_id.clone(),
2356 label: "Buffered Test".to_string(),
2357 cwd: None,
2358 output_byte_limit: None,
2359 terminal: lower.clone(),
2360 },
2361 cx,
2362 );
2363 });
2364
2365 // After Created, buffered Output should have been flushed into the renderer
2366 let content = thread.read_with(cx, |thread, cx| {
2367 let term = thread.terminal(terminal_id.clone()).unwrap();
2368 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2369 });
2370
2371 assert!(
2372 content.contains("hello buffered"),
2373 "expected buffered output to render, got: {content}"
2374 );
2375 }
2376
2377 #[gpui::test]
2378 async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2379 init_test(cx);
2380
2381 let fs = FakeFs::new(cx.executor());
2382 let project = Project::test(fs, [], cx).await;
2383 let connection = Rc::new(FakeAgentConnection::new());
2384 let thread = cx
2385 .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2386 .await
2387 .unwrap();
2388
2389 let terminal_id = acp::TerminalId(uuid::Uuid::new_v4().to_string().into());
2390
2391 // Send Output BEFORE Created
2392 thread.update(cx, |thread, cx| {
2393 thread.on_terminal_provider_event(
2394 TerminalProviderEvent::Output {
2395 terminal_id: terminal_id.clone(),
2396 data: b"pre-exit data".to_vec(),
2397 },
2398 cx,
2399 );
2400 });
2401
2402 // Send Exit BEFORE Created
2403 thread.update(cx, |thread, cx| {
2404 thread.on_terminal_provider_event(
2405 TerminalProviderEvent::Exit {
2406 terminal_id: terminal_id.clone(),
2407 status: acp::TerminalExitStatus {
2408 exit_code: Some(0),
2409 signal: None,
2410 meta: None,
2411 },
2412 },
2413 cx,
2414 );
2415 });
2416
2417 // Now create a display-only lower-level terminal and send Created
2418 let lower = cx.new(|cx| {
2419 let builder = ::terminal::TerminalBuilder::new_display_only(
2420 ::terminal::terminal_settings::CursorShape::default(),
2421 ::terminal::terminal_settings::AlternateScroll::On,
2422 None,
2423 0,
2424 )
2425 .unwrap();
2426 builder.subscribe(cx)
2427 });
2428
2429 thread.update(cx, |thread, cx| {
2430 thread.on_terminal_provider_event(
2431 TerminalProviderEvent::Created {
2432 terminal_id: terminal_id.clone(),
2433 label: "Buffered Exit Test".to_string(),
2434 cwd: None,
2435 output_byte_limit: None,
2436 terminal: lower.clone(),
2437 },
2438 cx,
2439 );
2440 });
2441
2442 // Output should be present after Created (flushed from buffer)
2443 let content = thread.read_with(cx, |thread, cx| {
2444 let term = thread.terminal(terminal_id.clone()).unwrap();
2445 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2446 });
2447
2448 assert!(
2449 content.contains("pre-exit data"),
2450 "expected pre-exit data to render, got: {content}"
2451 );
2452 }
2453
2454 #[gpui::test]
2455 async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
2456 init_test(cx);
2457
2458 let fs = FakeFs::new(cx.executor());
2459 let project = Project::test(fs, [], cx).await;
2460 let connection = Rc::new(FakeAgentConnection::new());
2461 let thread = cx
2462 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2463 .await
2464 .unwrap();
2465
2466 // Test creating a new user message
2467 thread.update(cx, |thread, cx| {
2468 thread.push_user_content_block(
2469 None,
2470 acp::ContentBlock::Text(acp::TextContent {
2471 annotations: None,
2472 text: "Hello, ".to_string(),
2473 meta: None,
2474 }),
2475 cx,
2476 );
2477 });
2478
2479 thread.update(cx, |thread, cx| {
2480 assert_eq!(thread.entries.len(), 1);
2481 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2482 assert_eq!(user_msg.id, None);
2483 assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
2484 } else {
2485 panic!("Expected UserMessage");
2486 }
2487 });
2488
2489 // Test appending to existing user message
2490 let message_1_id = UserMessageId::new();
2491 thread.update(cx, |thread, cx| {
2492 thread.push_user_content_block(
2493 Some(message_1_id.clone()),
2494 acp::ContentBlock::Text(acp::TextContent {
2495 annotations: None,
2496 text: "world!".to_string(),
2497 meta: None,
2498 }),
2499 cx,
2500 );
2501 });
2502
2503 thread.update(cx, |thread, cx| {
2504 assert_eq!(thread.entries.len(), 1);
2505 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2506 assert_eq!(user_msg.id, Some(message_1_id));
2507 assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
2508 } else {
2509 panic!("Expected UserMessage");
2510 }
2511 });
2512
2513 // Test creating new user message after assistant message
2514 thread.update(cx, |thread, cx| {
2515 thread.push_assistant_content_block(
2516 acp::ContentBlock::Text(acp::TextContent {
2517 annotations: None,
2518 text: "Assistant response".to_string(),
2519 meta: None,
2520 }),
2521 false,
2522 cx,
2523 );
2524 });
2525
2526 let message_2_id = UserMessageId::new();
2527 thread.update(cx, |thread, cx| {
2528 thread.push_user_content_block(
2529 Some(message_2_id.clone()),
2530 acp::ContentBlock::Text(acp::TextContent {
2531 annotations: None,
2532 text: "New user message".to_string(),
2533 meta: None,
2534 }),
2535 cx,
2536 );
2537 });
2538
2539 thread.update(cx, |thread, cx| {
2540 assert_eq!(thread.entries.len(), 3);
2541 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
2542 assert_eq!(user_msg.id, Some(message_2_id));
2543 assert_eq!(user_msg.content.to_markdown(cx), "New user message");
2544 } else {
2545 panic!("Expected UserMessage at index 2");
2546 }
2547 });
2548 }
2549
2550 #[gpui::test]
2551 async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
2552 init_test(cx);
2553
2554 let fs = FakeFs::new(cx.executor());
2555 let project = Project::test(fs, [], cx).await;
2556 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2557 |_, thread, mut cx| {
2558 async move {
2559 thread.update(&mut cx, |thread, cx| {
2560 thread
2561 .handle_session_update(
2562 acp::SessionUpdate::AgentThoughtChunk {
2563 content: "Thinking ".into(),
2564 },
2565 cx,
2566 )
2567 .unwrap();
2568 thread
2569 .handle_session_update(
2570 acp::SessionUpdate::AgentThoughtChunk {
2571 content: "hard!".into(),
2572 },
2573 cx,
2574 )
2575 .unwrap();
2576 })?;
2577 Ok(acp::PromptResponse {
2578 stop_reason: acp::StopReason::EndTurn,
2579 meta: None,
2580 })
2581 }
2582 .boxed_local()
2583 },
2584 ));
2585
2586 let thread = cx
2587 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2588 .await
2589 .unwrap();
2590
2591 thread
2592 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
2593 .await
2594 .unwrap();
2595
2596 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
2597 assert_eq!(
2598 output,
2599 indoc! {r#"
2600 ## User
2601
2602 Hello from Zed!
2603
2604 ## Assistant
2605
2606 <thinking>
2607 Thinking hard!
2608 </thinking>
2609
2610 "#}
2611 );
2612 }
2613
2614 #[gpui::test]
2615 async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
2616 init_test(cx);
2617
2618 let fs = FakeFs::new(cx.executor());
2619 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
2620 .await;
2621 let project = Project::test(fs.clone(), [], cx).await;
2622 let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
2623 let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
2624 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2625 move |_, thread, mut cx| {
2626 let read_file_tx = read_file_tx.clone();
2627 async move {
2628 let content = thread
2629 .update(&mut cx, |thread, cx| {
2630 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2631 })
2632 .unwrap()
2633 .await
2634 .unwrap();
2635 assert_eq!(content, "one\ntwo\nthree\n");
2636 read_file_tx.take().unwrap().send(()).unwrap();
2637 thread
2638 .update(&mut cx, |thread, cx| {
2639 thread.write_text_file(
2640 path!("/tmp/foo").into(),
2641 "one\ntwo\nthree\nfour\nfive\n".to_string(),
2642 cx,
2643 )
2644 })
2645 .unwrap()
2646 .await
2647 .unwrap();
2648 Ok(acp::PromptResponse {
2649 stop_reason: acp::StopReason::EndTurn,
2650 meta: None,
2651 })
2652 }
2653 .boxed_local()
2654 },
2655 ));
2656
2657 let (worktree, pathbuf) = project
2658 .update(cx, |project, cx| {
2659 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2660 })
2661 .await
2662 .unwrap();
2663 let buffer = project
2664 .update(cx, |project, cx| {
2665 project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
2666 })
2667 .await
2668 .unwrap();
2669
2670 let thread = cx
2671 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2672 .await
2673 .unwrap();
2674
2675 let request = thread.update(cx, |thread, cx| {
2676 thread.send_raw("Extend the count in /tmp/foo", cx)
2677 });
2678 read_file_rx.await.ok();
2679 buffer.update(cx, |buffer, cx| {
2680 buffer.edit([(0..0, "zero\n".to_string())], None, cx);
2681 });
2682 cx.run_until_parked();
2683 assert_eq!(
2684 buffer.read_with(cx, |buffer, _| buffer.text()),
2685 "zero\none\ntwo\nthree\nfour\nfive\n"
2686 );
2687 assert_eq!(
2688 String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
2689 "zero\none\ntwo\nthree\nfour\nfive\n"
2690 );
2691 request.await.unwrap();
2692 }
2693
2694 #[gpui::test]
2695 async fn test_reading_from_line(cx: &mut TestAppContext) {
2696 init_test(cx);
2697
2698 let fs = FakeFs::new(cx.executor());
2699 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
2700 .await;
2701 let project = Project::test(fs.clone(), [], cx).await;
2702 project
2703 .update(cx, |project, cx| {
2704 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2705 })
2706 .await
2707 .unwrap();
2708
2709 let connection = Rc::new(FakeAgentConnection::new());
2710
2711 let thread = cx
2712 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2713 .await
2714 .unwrap();
2715
2716 // Whole file
2717 let content = thread
2718 .update(cx, |thread, cx| {
2719 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2720 })
2721 .await
2722 .unwrap();
2723
2724 assert_eq!(content, "one\ntwo\nthree\nfour\n");
2725
2726 // Only start line
2727 let content = thread
2728 .update(cx, |thread, cx| {
2729 thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
2730 })
2731 .await
2732 .unwrap();
2733
2734 assert_eq!(content, "three\nfour\n");
2735
2736 // Only limit
2737 let content = thread
2738 .update(cx, |thread, cx| {
2739 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2740 })
2741 .await
2742 .unwrap();
2743
2744 assert_eq!(content, "one\ntwo\n");
2745
2746 // Range
2747 let content = thread
2748 .update(cx, |thread, cx| {
2749 thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
2750 })
2751 .await
2752 .unwrap();
2753
2754 assert_eq!(content, "two\nthree\n");
2755
2756 // Invalid
2757 let err = thread
2758 .update(cx, |thread, cx| {
2759 thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
2760 })
2761 .await
2762 .unwrap_err();
2763
2764 assert_eq!(
2765 err.to_string(),
2766 "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
2767 );
2768 }
2769
2770 #[gpui::test]
2771 async fn test_reading_empty_file(cx: &mut TestAppContext) {
2772 init_test(cx);
2773
2774 let fs = FakeFs::new(cx.executor());
2775 fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
2776 let project = Project::test(fs.clone(), [], cx).await;
2777 project
2778 .update(cx, |project, cx| {
2779 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2780 })
2781 .await
2782 .unwrap();
2783
2784 let connection = Rc::new(FakeAgentConnection::new());
2785
2786 let thread = cx
2787 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2788 .await
2789 .unwrap();
2790
2791 // Whole file
2792 let content = thread
2793 .update(cx, |thread, cx| {
2794 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2795 })
2796 .await
2797 .unwrap();
2798
2799 assert_eq!(content, "");
2800
2801 // Only start line
2802 let content = thread
2803 .update(cx, |thread, cx| {
2804 thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
2805 })
2806 .await
2807 .unwrap();
2808
2809 assert_eq!(content, "");
2810
2811 // Only limit
2812 let content = thread
2813 .update(cx, |thread, cx| {
2814 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2815 })
2816 .await
2817 .unwrap();
2818
2819 assert_eq!(content, "");
2820
2821 // Range
2822 let content = thread
2823 .update(cx, |thread, cx| {
2824 thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
2825 })
2826 .await
2827 .unwrap();
2828
2829 assert_eq!(content, "");
2830
2831 // Invalid
2832 let err = thread
2833 .update(cx, |thread, cx| {
2834 thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
2835 })
2836 .await
2837 .unwrap_err();
2838
2839 assert_eq!(
2840 err.to_string(),
2841 "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
2842 );
2843 }
2844 #[gpui::test]
2845 async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
2846 init_test(cx);
2847
2848 let fs = FakeFs::new(cx.executor());
2849 fs.insert_tree(path!("/tmp"), json!({})).await;
2850 let project = Project::test(fs.clone(), [], cx).await;
2851 project
2852 .update(cx, |project, cx| {
2853 project.find_or_create_worktree(path!("/tmp"), true, cx)
2854 })
2855 .await
2856 .unwrap();
2857
2858 let connection = Rc::new(FakeAgentConnection::new());
2859
2860 let thread = cx
2861 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2862 .await
2863 .unwrap();
2864
2865 // Out of project file
2866 let err = thread
2867 .update(cx, |thread, cx| {
2868 thread.read_text_file(path!("/foo").into(), None, None, false, cx)
2869 })
2870 .await
2871 .unwrap_err();
2872
2873 assert_eq!(err.code, acp::ErrorCode::RESOURCE_NOT_FOUND.code);
2874 }
2875
2876 #[gpui::test]
2877 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
2878 init_test(cx);
2879
2880 let fs = FakeFs::new(cx.executor());
2881 let project = Project::test(fs, [], cx).await;
2882 let id = acp::ToolCallId("test".into());
2883
2884 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2885 let id = id.clone();
2886 move |_, thread, mut cx| {
2887 let id = id.clone();
2888 async move {
2889 thread
2890 .update(&mut cx, |thread, cx| {
2891 thread.handle_session_update(
2892 acp::SessionUpdate::ToolCall(acp::ToolCall {
2893 id: id.clone(),
2894 title: "Label".into(),
2895 kind: acp::ToolKind::Fetch,
2896 status: acp::ToolCallStatus::InProgress,
2897 content: vec![],
2898 locations: vec![],
2899 raw_input: None,
2900 raw_output: None,
2901 meta: None,
2902 }),
2903 cx,
2904 )
2905 })
2906 .unwrap()
2907 .unwrap();
2908 Ok(acp::PromptResponse {
2909 stop_reason: acp::StopReason::EndTurn,
2910 meta: None,
2911 })
2912 }
2913 .boxed_local()
2914 }
2915 }));
2916
2917 let thread = cx
2918 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2919 .await
2920 .unwrap();
2921
2922 let request = thread.update(cx, |thread, cx| {
2923 thread.send_raw("Fetch https://example.com", cx)
2924 });
2925
2926 run_until_first_tool_call(&thread, cx).await;
2927
2928 thread.read_with(cx, |thread, _| {
2929 assert!(matches!(
2930 thread.entries[1],
2931 AgentThreadEntry::ToolCall(ToolCall {
2932 status: ToolCallStatus::InProgress,
2933 ..
2934 })
2935 ));
2936 });
2937
2938 thread.update(cx, |thread, cx| thread.cancel(cx)).await;
2939
2940 thread.read_with(cx, |thread, _| {
2941 assert!(matches!(
2942 &thread.entries[1],
2943 AgentThreadEntry::ToolCall(ToolCall {
2944 status: ToolCallStatus::Canceled,
2945 ..
2946 })
2947 ));
2948 });
2949
2950 thread
2951 .update(cx, |thread, cx| {
2952 thread.handle_session_update(
2953 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
2954 id,
2955 fields: acp::ToolCallUpdateFields {
2956 status: Some(acp::ToolCallStatus::Completed),
2957 ..Default::default()
2958 },
2959 meta: None,
2960 }),
2961 cx,
2962 )
2963 })
2964 .unwrap();
2965
2966 request.await.unwrap();
2967
2968 thread.read_with(cx, |thread, _| {
2969 assert!(matches!(
2970 thread.entries[1],
2971 AgentThreadEntry::ToolCall(ToolCall {
2972 status: ToolCallStatus::Completed,
2973 ..
2974 })
2975 ));
2976 });
2977 }
2978
2979 #[gpui::test]
2980 async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
2981 init_test(cx);
2982 let fs = FakeFs::new(cx.background_executor.clone());
2983 fs.insert_tree(path!("/test"), json!({})).await;
2984 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
2985
2986 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
2987 move |_, thread, mut cx| {
2988 async move {
2989 thread
2990 .update(&mut cx, |thread, cx| {
2991 thread.handle_session_update(
2992 acp::SessionUpdate::ToolCall(acp::ToolCall {
2993 id: acp::ToolCallId("test".into()),
2994 title: "Label".into(),
2995 kind: acp::ToolKind::Edit,
2996 status: acp::ToolCallStatus::Completed,
2997 content: vec![acp::ToolCallContent::Diff {
2998 diff: acp::Diff {
2999 path: "/test/test.txt".into(),
3000 old_text: None,
3001 new_text: "foo".into(),
3002 meta: None,
3003 },
3004 }],
3005 locations: vec![],
3006 raw_input: None,
3007 raw_output: None,
3008 meta: None,
3009 }),
3010 cx,
3011 )
3012 })
3013 .unwrap()
3014 .unwrap();
3015 Ok(acp::PromptResponse {
3016 stop_reason: acp::StopReason::EndTurn,
3017 meta: None,
3018 })
3019 }
3020 .boxed_local()
3021 }
3022 }));
3023
3024 let thread = cx
3025 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3026 .await
3027 .unwrap();
3028
3029 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3030 .await
3031 .unwrap();
3032
3033 assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3034 }
3035
3036 #[gpui::test(iterations = 10)]
3037 async fn test_checkpoints(cx: &mut TestAppContext) {
3038 init_test(cx);
3039 let fs = FakeFs::new(cx.background_executor.clone());
3040 fs.insert_tree(
3041 path!("/test"),
3042 json!({
3043 ".git": {}
3044 }),
3045 )
3046 .await;
3047 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3048
3049 let simulate_changes = Arc::new(AtomicBool::new(true));
3050 let next_filename = Arc::new(AtomicUsize::new(0));
3051 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3052 let simulate_changes = simulate_changes.clone();
3053 let next_filename = next_filename.clone();
3054 let fs = fs.clone();
3055 move |request, thread, mut cx| {
3056 let fs = fs.clone();
3057 let simulate_changes = simulate_changes.clone();
3058 let next_filename = next_filename.clone();
3059 async move {
3060 if simulate_changes.load(SeqCst) {
3061 let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3062 fs.write(Path::new(&filename), b"").await?;
3063 }
3064
3065 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3066 panic!("expected text content block");
3067 };
3068 thread.update(&mut cx, |thread, cx| {
3069 thread
3070 .handle_session_update(
3071 acp::SessionUpdate::AgentMessageChunk {
3072 content: content.text.to_uppercase().into(),
3073 },
3074 cx,
3075 )
3076 .unwrap();
3077 })?;
3078 Ok(acp::PromptResponse {
3079 stop_reason: acp::StopReason::EndTurn,
3080 meta: None,
3081 })
3082 }
3083 .boxed_local()
3084 }
3085 }));
3086 let thread = cx
3087 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3088 .await
3089 .unwrap();
3090
3091 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3092 .await
3093 .unwrap();
3094 thread.read_with(cx, |thread, cx| {
3095 assert_eq!(
3096 thread.to_markdown(cx),
3097 indoc! {"
3098 ## User (checkpoint)
3099
3100 Lorem
3101
3102 ## Assistant
3103
3104 LOREM
3105
3106 "}
3107 );
3108 });
3109 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3110
3111 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3112 .await
3113 .unwrap();
3114 thread.read_with(cx, |thread, cx| {
3115 assert_eq!(
3116 thread.to_markdown(cx),
3117 indoc! {"
3118 ## User (checkpoint)
3119
3120 Lorem
3121
3122 ## Assistant
3123
3124 LOREM
3125
3126 ## User (checkpoint)
3127
3128 ipsum
3129
3130 ## Assistant
3131
3132 IPSUM
3133
3134 "}
3135 );
3136 });
3137 assert_eq!(
3138 fs.files(),
3139 vec![
3140 Path::new(path!("/test/file-0")),
3141 Path::new(path!("/test/file-1"))
3142 ]
3143 );
3144
3145 // Checkpoint isn't stored when there are no changes.
3146 simulate_changes.store(false, SeqCst);
3147 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3148 .await
3149 .unwrap();
3150 thread.read_with(cx, |thread, cx| {
3151 assert_eq!(
3152 thread.to_markdown(cx),
3153 indoc! {"
3154 ## User (checkpoint)
3155
3156 Lorem
3157
3158 ## Assistant
3159
3160 LOREM
3161
3162 ## User (checkpoint)
3163
3164 ipsum
3165
3166 ## Assistant
3167
3168 IPSUM
3169
3170 ## User
3171
3172 dolor
3173
3174 ## Assistant
3175
3176 DOLOR
3177
3178 "}
3179 );
3180 });
3181 assert_eq!(
3182 fs.files(),
3183 vec![
3184 Path::new(path!("/test/file-0")),
3185 Path::new(path!("/test/file-1"))
3186 ]
3187 );
3188
3189 // Rewinding the conversation truncates the history and restores the checkpoint.
3190 thread
3191 .update(cx, |thread, cx| {
3192 let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3193 panic!("unexpected entries {:?}", thread.entries)
3194 };
3195 thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3196 })
3197 .await
3198 .unwrap();
3199 thread.read_with(cx, |thread, cx| {
3200 assert_eq!(
3201 thread.to_markdown(cx),
3202 indoc! {"
3203 ## User (checkpoint)
3204
3205 Lorem
3206
3207 ## Assistant
3208
3209 LOREM
3210
3211 "}
3212 );
3213 });
3214 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3215 }
3216
3217 #[gpui::test]
3218 async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3219 use std::sync::atomic::AtomicUsize;
3220 init_test(cx);
3221
3222 let fs = FakeFs::new(cx.executor());
3223 let project = Project::test(fs, None, cx).await;
3224
3225 // Create a connection that simulates refusal after tool result
3226 let prompt_count = Arc::new(AtomicUsize::new(0));
3227 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3228 let prompt_count = prompt_count.clone();
3229 move |_request, thread, mut cx| {
3230 let count = prompt_count.fetch_add(1, SeqCst);
3231 async move {
3232 if count == 0 {
3233 // First prompt: Generate a tool call with result
3234 thread.update(&mut cx, |thread, cx| {
3235 thread
3236 .handle_session_update(
3237 acp::SessionUpdate::ToolCall(acp::ToolCall {
3238 id: acp::ToolCallId("tool1".into()),
3239 title: "Test Tool".into(),
3240 kind: acp::ToolKind::Fetch,
3241 status: acp::ToolCallStatus::Completed,
3242 content: vec![],
3243 locations: vec![],
3244 raw_input: Some(serde_json::json!({"query": "test"})),
3245 raw_output: Some(
3246 serde_json::json!({"result": "inappropriate content"}),
3247 ),
3248 meta: None,
3249 }),
3250 cx,
3251 )
3252 .unwrap();
3253 })?;
3254
3255 // Now return refusal because of the tool result
3256 Ok(acp::PromptResponse {
3257 stop_reason: acp::StopReason::Refusal,
3258 meta: None,
3259 })
3260 } else {
3261 Ok(acp::PromptResponse {
3262 stop_reason: acp::StopReason::EndTurn,
3263 meta: None,
3264 })
3265 }
3266 }
3267 .boxed_local()
3268 }
3269 }));
3270
3271 let thread = cx
3272 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3273 .await
3274 .unwrap();
3275
3276 // Track if we see a Refusal event
3277 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3278 let saw_refusal_event_captured = saw_refusal_event.clone();
3279 thread.update(cx, |_thread, cx| {
3280 cx.subscribe(
3281 &thread,
3282 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3283 if matches!(event, AcpThreadEvent::Refusal) {
3284 *saw_refusal_event_captured.lock().unwrap() = true;
3285 }
3286 },
3287 )
3288 .detach();
3289 });
3290
3291 // Send a user message - this will trigger tool call and then refusal
3292 let send_task = thread.update(cx, |thread, cx| {
3293 thread.send(
3294 vec![acp::ContentBlock::Text(acp::TextContent {
3295 text: "Hello".into(),
3296 annotations: None,
3297 meta: None,
3298 })],
3299 cx,
3300 )
3301 });
3302 cx.background_executor.spawn(send_task).detach();
3303 cx.run_until_parked();
3304
3305 // Verify that:
3306 // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3307 // 2. The user message was NOT truncated
3308 assert!(
3309 *saw_refusal_event.lock().unwrap(),
3310 "Refusal event should be emitted for tool result refusals"
3311 );
3312
3313 thread.read_with(cx, |thread, _| {
3314 let entries = thread.entries();
3315 assert!(entries.len() >= 2, "Should have user message and tool call");
3316
3317 // Verify user message is still there
3318 assert!(
3319 matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3320 "User message should not be truncated"
3321 );
3322
3323 // Verify tool call is there with result
3324 if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3325 assert!(
3326 tool_call.raw_output.is_some(),
3327 "Tool call should have output"
3328 );
3329 } else {
3330 panic!("Expected tool call at index 1");
3331 }
3332 });
3333 }
3334
3335 #[gpui::test]
3336 async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3337 init_test(cx);
3338
3339 let fs = FakeFs::new(cx.executor());
3340 let project = Project::test(fs, None, cx).await;
3341
3342 let refuse_next = Arc::new(AtomicBool::new(false));
3343 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3344 let refuse_next = refuse_next.clone();
3345 move |_request, _thread, _cx| {
3346 if refuse_next.load(SeqCst) {
3347 async move {
3348 Ok(acp::PromptResponse {
3349 stop_reason: acp::StopReason::Refusal,
3350 meta: None,
3351 })
3352 }
3353 .boxed_local()
3354 } else {
3355 async move {
3356 Ok(acp::PromptResponse {
3357 stop_reason: acp::StopReason::EndTurn,
3358 meta: None,
3359 })
3360 }
3361 .boxed_local()
3362 }
3363 }
3364 }));
3365
3366 let thread = cx
3367 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3368 .await
3369 .unwrap();
3370
3371 // Track if we see a Refusal event
3372 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3373 let saw_refusal_event_captured = saw_refusal_event.clone();
3374 thread.update(cx, |_thread, cx| {
3375 cx.subscribe(
3376 &thread,
3377 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3378 if matches!(event, AcpThreadEvent::Refusal) {
3379 *saw_refusal_event_captured.lock().unwrap() = true;
3380 }
3381 },
3382 )
3383 .detach();
3384 });
3385
3386 // Send a message that will be refused
3387 refuse_next.store(true, SeqCst);
3388 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3389 .await
3390 .unwrap();
3391
3392 // Verify that a Refusal event WAS emitted for user prompt refusal
3393 assert!(
3394 *saw_refusal_event.lock().unwrap(),
3395 "Refusal event should be emitted for user prompt refusals"
3396 );
3397
3398 // Verify the message was truncated (user prompt refusal)
3399 thread.read_with(cx, |thread, cx| {
3400 assert_eq!(thread.to_markdown(cx), "");
3401 });
3402 }
3403
3404 #[gpui::test]
3405 async fn test_refusal(cx: &mut TestAppContext) {
3406 init_test(cx);
3407 let fs = FakeFs::new(cx.background_executor.clone());
3408 fs.insert_tree(path!("/"), json!({})).await;
3409 let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
3410
3411 let refuse_next = Arc::new(AtomicBool::new(false));
3412 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3413 let refuse_next = refuse_next.clone();
3414 move |request, thread, mut cx| {
3415 let refuse_next = refuse_next.clone();
3416 async move {
3417 if refuse_next.load(SeqCst) {
3418 return Ok(acp::PromptResponse {
3419 stop_reason: acp::StopReason::Refusal,
3420 meta: None,
3421 });
3422 }
3423
3424 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3425 panic!("expected text content block");
3426 };
3427 thread.update(&mut cx, |thread, cx| {
3428 thread
3429 .handle_session_update(
3430 acp::SessionUpdate::AgentMessageChunk {
3431 content: content.text.to_uppercase().into(),
3432 },
3433 cx,
3434 )
3435 .unwrap();
3436 })?;
3437 Ok(acp::PromptResponse {
3438 stop_reason: acp::StopReason::EndTurn,
3439 meta: None,
3440 })
3441 }
3442 .boxed_local()
3443 }
3444 }));
3445 let thread = cx
3446 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3447 .await
3448 .unwrap();
3449
3450 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3451 .await
3452 .unwrap();
3453 thread.read_with(cx, |thread, cx| {
3454 assert_eq!(
3455 thread.to_markdown(cx),
3456 indoc! {"
3457 ## User
3458
3459 hello
3460
3461 ## Assistant
3462
3463 HELLO
3464
3465 "}
3466 );
3467 });
3468
3469 // Simulate refusing the second message. The message should be truncated
3470 // when a user prompt is refused.
3471 refuse_next.store(true, SeqCst);
3472 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
3473 .await
3474 .unwrap();
3475 thread.read_with(cx, |thread, cx| {
3476 assert_eq!(
3477 thread.to_markdown(cx),
3478 indoc! {"
3479 ## User
3480
3481 hello
3482
3483 ## Assistant
3484
3485 HELLO
3486
3487 "}
3488 );
3489 });
3490 }
3491
3492 async fn run_until_first_tool_call(
3493 thread: &Entity<AcpThread>,
3494 cx: &mut TestAppContext,
3495 ) -> usize {
3496 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
3497
3498 let subscription = cx.update(|cx| {
3499 cx.subscribe(thread, move |thread, _, cx| {
3500 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
3501 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
3502 return tx.try_send(ix).unwrap();
3503 }
3504 }
3505 })
3506 });
3507
3508 select! {
3509 _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
3510 panic!("Timeout waiting for tool call")
3511 }
3512 ix = rx.next().fuse() => {
3513 drop(subscription);
3514 ix.unwrap()
3515 }
3516 }
3517 }
3518
3519 #[derive(Clone, Default)]
3520 struct FakeAgentConnection {
3521 auth_methods: Vec<acp::AuthMethod>,
3522 sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
3523 on_user_message: Option<
3524 Rc<
3525 dyn Fn(
3526 acp::PromptRequest,
3527 WeakEntity<AcpThread>,
3528 AsyncApp,
3529 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3530 + 'static,
3531 >,
3532 >,
3533 }
3534
3535 impl FakeAgentConnection {
3536 fn new() -> Self {
3537 Self {
3538 auth_methods: Vec::new(),
3539 on_user_message: None,
3540 sessions: Arc::default(),
3541 }
3542 }
3543
3544 #[expect(unused)]
3545 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
3546 self.auth_methods = auth_methods;
3547 self
3548 }
3549
3550 fn on_user_message(
3551 mut self,
3552 handler: impl Fn(
3553 acp::PromptRequest,
3554 WeakEntity<AcpThread>,
3555 AsyncApp,
3556 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3557 + 'static,
3558 ) -> Self {
3559 self.on_user_message.replace(Rc::new(handler));
3560 self
3561 }
3562 }
3563
3564 impl AgentConnection for FakeAgentConnection {
3565 fn auth_methods(&self) -> &[acp::AuthMethod] {
3566 &self.auth_methods
3567 }
3568
3569 fn new_thread(
3570 self: Rc<Self>,
3571 project: Entity<Project>,
3572 _cwd: &Path,
3573 cx: &mut App,
3574 ) -> Task<gpui::Result<Entity<AcpThread>>> {
3575 let session_id = acp::SessionId(
3576 rand::rng()
3577 .sample_iter(&distr::Alphanumeric)
3578 .take(7)
3579 .map(char::from)
3580 .collect::<String>()
3581 .into(),
3582 );
3583 let action_log = cx.new(|_| ActionLog::new(project.clone()));
3584 let thread = cx.new(|cx| {
3585 AcpThread::new(
3586 "Test",
3587 self.clone(),
3588 project,
3589 action_log,
3590 session_id.clone(),
3591 watch::Receiver::constant(acp::PromptCapabilities {
3592 image: true,
3593 audio: true,
3594 embedded_context: true,
3595 meta: None,
3596 }),
3597 cx,
3598 )
3599 });
3600 self.sessions.lock().insert(session_id, thread.downgrade());
3601 Task::ready(Ok(thread))
3602 }
3603
3604 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
3605 if self.auth_methods().iter().any(|m| m.id == method) {
3606 Task::ready(Ok(()))
3607 } else {
3608 Task::ready(Err(anyhow!("Invalid Auth Method")))
3609 }
3610 }
3611
3612 fn prompt(
3613 &self,
3614 _id: Option<UserMessageId>,
3615 params: acp::PromptRequest,
3616 cx: &mut App,
3617 ) -> Task<gpui::Result<acp::PromptResponse>> {
3618 let sessions = self.sessions.lock();
3619 let thread = sessions.get(¶ms.session_id).unwrap();
3620 if let Some(handler) = &self.on_user_message {
3621 let handler = handler.clone();
3622 let thread = thread.clone();
3623 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
3624 } else {
3625 Task::ready(Ok(acp::PromptResponse {
3626 stop_reason: acp::StopReason::EndTurn,
3627 meta: None,
3628 }))
3629 }
3630 }
3631
3632 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
3633 let sessions = self.sessions.lock();
3634 let thread = sessions.get(session_id).unwrap().clone();
3635
3636 cx.spawn(async move |cx| {
3637 thread
3638 .update(cx, |thread, cx| thread.cancel(cx))
3639 .unwrap()
3640 .await
3641 })
3642 .detach();
3643 }
3644
3645 fn truncate(
3646 &self,
3647 session_id: &acp::SessionId,
3648 _cx: &App,
3649 ) -> Option<Rc<dyn AgentSessionTruncate>> {
3650 Some(Rc::new(FakeAgentSessionEditor {
3651 _session_id: session_id.clone(),
3652 }))
3653 }
3654
3655 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
3656 self
3657 }
3658 }
3659
3660 struct FakeAgentSessionEditor {
3661 _session_id: acp::SessionId,
3662 }
3663
3664 impl AgentSessionTruncate for FakeAgentSessionEditor {
3665 fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
3666 Task::ready(Ok(()))
3667 }
3668 }
3669
3670 #[gpui::test]
3671 async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
3672 init_test(cx);
3673
3674 let fs = FakeFs::new(cx.executor());
3675 let project = Project::test(fs, [], cx).await;
3676 let connection = Rc::new(FakeAgentConnection::new());
3677 let thread = cx
3678 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3679 .await
3680 .unwrap();
3681
3682 // Try to update a tool call that doesn't exist
3683 let nonexistent_id = acp::ToolCallId("nonexistent-tool-call".into());
3684 thread.update(cx, |thread, cx| {
3685 let result = thread.handle_session_update(
3686 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate {
3687 id: nonexistent_id.clone(),
3688 fields: acp::ToolCallUpdateFields {
3689 status: Some(acp::ToolCallStatus::Completed),
3690 ..Default::default()
3691 },
3692 meta: None,
3693 }),
3694 cx,
3695 );
3696
3697 // The update should succeed (not return an error)
3698 assert!(result.is_ok());
3699
3700 // There should now be exactly one entry in the thread
3701 assert_eq!(thread.entries.len(), 1);
3702
3703 // The entry should be a failed tool call
3704 if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
3705 assert_eq!(tool_call.id, nonexistent_id);
3706 assert!(matches!(tool_call.status, ToolCallStatus::Failed));
3707 assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
3708
3709 // Check that the content contains the error message
3710 assert_eq!(tool_call.content.len(), 1);
3711 if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
3712 match content_block {
3713 ContentBlock::Markdown { markdown } => {
3714 let markdown_text = markdown.read(cx).source();
3715 assert!(markdown_text.contains("Tool call not found"));
3716 }
3717 ContentBlock::Empty => panic!("Expected markdown content, got empty"),
3718 ContentBlock::ResourceLink { .. } => {
3719 panic!("Expected markdown content, got resource link")
3720 }
3721 }
3722 } else {
3723 panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
3724 }
3725 } else {
3726 panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
3727 }
3728 });
3729 }
3730}