1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5
6use agent_settings::AgentSettings;
7use collections::HashSet;
8pub use connection::*;
9pub use diff::*;
10use language::language_settings::FormatOnSave;
11pub use mention::*;
12use project::lsp_store::{FormatTrigger, LspFormatTarget};
13use serde::{Deserialize, Serialize};
14use settings::Settings as _;
15use task::{Shell, ShellBuilder};
16pub use terminal::*;
17
18use action_log::{ActionLog, ActionLogTelemetry};
19use agent_client_protocol::{self as acp};
20use anyhow::{Context as _, Result, anyhow};
21use editor::Bias;
22use futures::{FutureExt, channel::oneshot, future::BoxFuture};
23use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
24use itertools::Itertools;
25use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
26use markdown::Markdown;
27use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
28use std::collections::HashMap;
29use std::error::Error;
30use std::fmt::{Formatter, Write};
31use std::ops::Range;
32use std::process::ExitStatus;
33use std::rc::Rc;
34use std::time::{Duration, Instant};
35use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
36use ui::App;
37use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
38use uuid::Uuid;
39
40#[derive(Debug)]
41pub struct UserMessage {
42 pub id: Option<UserMessageId>,
43 pub content: ContentBlock,
44 pub chunks: Vec<acp::ContentBlock>,
45 pub checkpoint: Option<Checkpoint>,
46 pub indented: bool,
47}
48
49#[derive(Debug)]
50pub struct Checkpoint {
51 git_checkpoint: GitStoreCheckpoint,
52 pub show: bool,
53}
54
55impl UserMessage {
56 fn to_markdown(&self, cx: &App) -> String {
57 let mut markdown = String::new();
58 if self
59 .checkpoint
60 .as_ref()
61 .is_some_and(|checkpoint| checkpoint.show)
62 {
63 writeln!(markdown, "## User (checkpoint)").unwrap();
64 } else {
65 writeln!(markdown, "## User").unwrap();
66 }
67 writeln!(markdown).unwrap();
68 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
69 writeln!(markdown).unwrap();
70 markdown
71 }
72}
73
74#[derive(Debug, PartialEq)]
75pub struct AssistantMessage {
76 pub chunks: Vec<AssistantMessageChunk>,
77 pub indented: bool,
78}
79
80impl AssistantMessage {
81 pub fn to_markdown(&self, cx: &App) -> String {
82 format!(
83 "## Assistant\n\n{}\n\n",
84 self.chunks
85 .iter()
86 .map(|chunk| chunk.to_markdown(cx))
87 .join("\n\n")
88 )
89 }
90}
91
92#[derive(Debug, PartialEq)]
93pub enum AssistantMessageChunk {
94 Message { block: ContentBlock },
95 Thought { block: ContentBlock },
96}
97
98impl AssistantMessageChunk {
99 pub fn from_str(
100 chunk: &str,
101 language_registry: &Arc<LanguageRegistry>,
102 path_style: PathStyle,
103 cx: &mut App,
104 ) -> Self {
105 Self::Message {
106 block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
107 }
108 }
109
110 fn to_markdown(&self, cx: &App) -> String {
111 match self {
112 Self::Message { block } => block.to_markdown(cx).to_string(),
113 Self::Thought { block } => {
114 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
115 }
116 }
117 }
118}
119
120#[derive(Debug)]
121pub enum AgentThreadEntry {
122 UserMessage(UserMessage),
123 AssistantMessage(AssistantMessage),
124 ToolCall(ToolCall),
125}
126
127impl AgentThreadEntry {
128 pub fn is_indented(&self) -> bool {
129 match self {
130 Self::UserMessage(message) => message.indented,
131 Self::AssistantMessage(message) => message.indented,
132 Self::ToolCall(_) => false,
133 }
134 }
135
136 pub fn to_markdown(&self, cx: &App) -> String {
137 match self {
138 Self::UserMessage(message) => message.to_markdown(cx),
139 Self::AssistantMessage(message) => message.to_markdown(cx),
140 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
141 }
142 }
143
144 pub fn user_message(&self) -> Option<&UserMessage> {
145 if let AgentThreadEntry::UserMessage(message) = self {
146 Some(message)
147 } else {
148 None
149 }
150 }
151
152 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
153 if let AgentThreadEntry::ToolCall(call) = self {
154 itertools::Either::Left(call.diffs())
155 } else {
156 itertools::Either::Right(std::iter::empty())
157 }
158 }
159
160 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
161 if let AgentThreadEntry::ToolCall(call) = self {
162 itertools::Either::Left(call.terminals())
163 } else {
164 itertools::Either::Right(std::iter::empty())
165 }
166 }
167
168 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
169 if let AgentThreadEntry::ToolCall(ToolCall {
170 locations,
171 resolved_locations,
172 ..
173 }) = self
174 {
175 Some((
176 locations.get(ix)?.clone(),
177 resolved_locations.get(ix)?.clone()?,
178 ))
179 } else {
180 None
181 }
182 }
183}
184
185#[derive(Debug)]
186pub struct ToolCall {
187 pub id: acp::ToolCallId,
188 pub label: Entity<Markdown>,
189 pub kind: acp::ToolKind,
190 pub content: Vec<ToolCallContent>,
191 pub status: ToolCallStatus,
192 pub locations: Vec<acp::ToolCallLocation>,
193 pub resolved_locations: Vec<Option<AgentLocation>>,
194 pub raw_input: Option<serde_json::Value>,
195 pub raw_input_markdown: Option<Entity<Markdown>>,
196 pub raw_output: Option<serde_json::Value>,
197}
198
199impl ToolCall {
200 fn from_acp(
201 tool_call: acp::ToolCall,
202 status: ToolCallStatus,
203 language_registry: Arc<LanguageRegistry>,
204 path_style: PathStyle,
205 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
206 cx: &mut App,
207 ) -> Result<Self> {
208 let title = if let Some((first_line, _)) = tool_call.title.split_once("\n") {
209 first_line.to_owned() + "…"
210 } else {
211 tool_call.title
212 };
213 let mut content = Vec::with_capacity(tool_call.content.len());
214 for item in tool_call.content {
215 if let Some(item) = ToolCallContent::from_acp(
216 item,
217 language_registry.clone(),
218 path_style,
219 terminals,
220 cx,
221 )? {
222 content.push(item);
223 }
224 }
225
226 let raw_input_markdown = tool_call
227 .raw_input
228 .as_ref()
229 .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
230
231 let result = Self {
232 id: tool_call.tool_call_id,
233 label: cx
234 .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
235 kind: tool_call.kind,
236 content,
237 locations: tool_call.locations,
238 resolved_locations: Vec::default(),
239 status,
240 raw_input: tool_call.raw_input,
241 raw_input_markdown,
242 raw_output: tool_call.raw_output,
243 };
244 Ok(result)
245 }
246
247 fn update_fields(
248 &mut self,
249 fields: acp::ToolCallUpdateFields,
250 language_registry: Arc<LanguageRegistry>,
251 path_style: PathStyle,
252 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
253 cx: &mut App,
254 ) -> Result<()> {
255 let acp::ToolCallUpdateFields {
256 kind,
257 status,
258 title,
259 content,
260 locations,
261 raw_input,
262 raw_output,
263 ..
264 } = fields;
265
266 if let Some(kind) = kind {
267 self.kind = kind;
268 }
269
270 if let Some(status) = status {
271 self.status = status.into();
272 }
273
274 if let Some(title) = title {
275 self.label.update(cx, |label, cx| {
276 if let Some((first_line, _)) = title.split_once("\n") {
277 label.replace(first_line.to_owned() + "…", cx)
278 } else {
279 label.replace(title, cx);
280 }
281 });
282 }
283
284 if let Some(content) = content {
285 let mut new_content_len = content.len();
286 let mut content = content.into_iter();
287
288 // Reuse existing content if we can
289 for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
290 let valid_content =
291 old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
292 if !valid_content {
293 new_content_len -= 1;
294 }
295 }
296 for new in content {
297 if let Some(new) = ToolCallContent::from_acp(
298 new,
299 language_registry.clone(),
300 path_style,
301 terminals,
302 cx,
303 )? {
304 self.content.push(new);
305 } else {
306 new_content_len -= 1;
307 }
308 }
309 self.content.truncate(new_content_len);
310 }
311
312 if let Some(locations) = locations {
313 self.locations = locations;
314 }
315
316 if let Some(raw_input) = raw_input {
317 self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
318 self.raw_input = Some(raw_input);
319 }
320
321 if let Some(raw_output) = raw_output {
322 if self.content.is_empty()
323 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
324 {
325 self.content
326 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
327 markdown,
328 }));
329 }
330 self.raw_output = Some(raw_output);
331 }
332 Ok(())
333 }
334
335 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
336 self.content.iter().filter_map(|content| match content {
337 ToolCallContent::Diff(diff) => Some(diff),
338 ToolCallContent::ContentBlock(_) => None,
339 ToolCallContent::Terminal(_) => None,
340 })
341 }
342
343 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
344 self.content.iter().filter_map(|content| match content {
345 ToolCallContent::Terminal(terminal) => Some(terminal),
346 ToolCallContent::ContentBlock(_) => None,
347 ToolCallContent::Diff(_) => None,
348 })
349 }
350
351 fn to_markdown(&self, cx: &App) -> String {
352 let mut markdown = format!(
353 "**Tool Call: {}**\nStatus: {}\n\n",
354 self.label.read(cx).source(),
355 self.status
356 );
357 for content in &self.content {
358 markdown.push_str(content.to_markdown(cx).as_str());
359 markdown.push_str("\n\n");
360 }
361 markdown
362 }
363
364 async fn resolve_location(
365 location: acp::ToolCallLocation,
366 project: WeakEntity<Project>,
367 cx: &mut AsyncApp,
368 ) -> Option<ResolvedLocation> {
369 let buffer = project
370 .update(cx, |project, cx| {
371 project
372 .project_path_for_absolute_path(&location.path, cx)
373 .map(|path| project.open_buffer(path, cx))
374 })
375 .ok()??;
376 let buffer = buffer.await.log_err()?;
377 let position = buffer
378 .update(cx, |buffer, _| {
379 let snapshot = buffer.snapshot();
380 if let Some(row) = location.line {
381 let column = snapshot.indent_size_for_line(row).len;
382 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
383 snapshot.anchor_before(point)
384 } else {
385 Anchor::min_for_buffer(snapshot.remote_id())
386 }
387 })
388 .ok()?;
389
390 Some(ResolvedLocation { buffer, position })
391 }
392
393 fn resolve_locations(
394 &self,
395 project: Entity<Project>,
396 cx: &mut App,
397 ) -> Task<Vec<Option<ResolvedLocation>>> {
398 let locations = self.locations.clone();
399 project.update(cx, |_, cx| {
400 cx.spawn(async move |project, cx| {
401 let mut new_locations = Vec::new();
402 for location in locations {
403 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
404 }
405 new_locations
406 })
407 })
408 }
409}
410
411// Separate so we can hold a strong reference to the buffer
412// for saving on the thread
413#[derive(Clone, Debug, PartialEq, Eq)]
414struct ResolvedLocation {
415 buffer: Entity<Buffer>,
416 position: Anchor,
417}
418
419impl From<&ResolvedLocation> for AgentLocation {
420 fn from(value: &ResolvedLocation) -> Self {
421 Self {
422 buffer: value.buffer.downgrade(),
423 position: value.position,
424 }
425 }
426}
427
428#[derive(Debug)]
429pub enum ToolCallStatus {
430 /// The tool call hasn't started running yet, but we start showing it to
431 /// the user.
432 Pending,
433 /// The tool call is waiting for confirmation from the user.
434 WaitingForConfirmation {
435 options: Vec<acp::PermissionOption>,
436 respond_tx: oneshot::Sender<acp::PermissionOptionId>,
437 },
438 /// The tool call is currently running.
439 InProgress,
440 /// The tool call completed successfully.
441 Completed,
442 /// The tool call failed.
443 Failed,
444 /// The user rejected the tool call.
445 Rejected,
446 /// The user canceled generation so the tool call was canceled.
447 Canceled,
448}
449
450impl From<acp::ToolCallStatus> for ToolCallStatus {
451 fn from(status: acp::ToolCallStatus) -> Self {
452 match status {
453 acp::ToolCallStatus::Pending => Self::Pending,
454 acp::ToolCallStatus::InProgress => Self::InProgress,
455 acp::ToolCallStatus::Completed => Self::Completed,
456 acp::ToolCallStatus::Failed => Self::Failed,
457 _ => Self::Pending,
458 }
459 }
460}
461
462impl Display for ToolCallStatus {
463 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
464 write!(
465 f,
466 "{}",
467 match self {
468 ToolCallStatus::Pending => "Pending",
469 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
470 ToolCallStatus::InProgress => "In Progress",
471 ToolCallStatus::Completed => "Completed",
472 ToolCallStatus::Failed => "Failed",
473 ToolCallStatus::Rejected => "Rejected",
474 ToolCallStatus::Canceled => "Canceled",
475 }
476 )
477 }
478}
479
480#[derive(Debug, PartialEq, Clone)]
481pub enum ContentBlock {
482 Empty,
483 Markdown { markdown: Entity<Markdown> },
484 ResourceLink { resource_link: acp::ResourceLink },
485}
486
487impl ContentBlock {
488 pub fn new(
489 block: acp::ContentBlock,
490 language_registry: &Arc<LanguageRegistry>,
491 path_style: PathStyle,
492 cx: &mut App,
493 ) -> Self {
494 let mut this = Self::Empty;
495 this.append(block, language_registry, path_style, cx);
496 this
497 }
498
499 pub fn new_combined(
500 blocks: impl IntoIterator<Item = acp::ContentBlock>,
501 language_registry: Arc<LanguageRegistry>,
502 path_style: PathStyle,
503 cx: &mut App,
504 ) -> Self {
505 let mut this = Self::Empty;
506 for block in blocks {
507 this.append(block, &language_registry, path_style, cx);
508 }
509 this
510 }
511
512 pub fn append(
513 &mut self,
514 block: acp::ContentBlock,
515 language_registry: &Arc<LanguageRegistry>,
516 path_style: PathStyle,
517 cx: &mut App,
518 ) {
519 if matches!(self, ContentBlock::Empty)
520 && let acp::ContentBlock::ResourceLink(resource_link) = block
521 {
522 *self = ContentBlock::ResourceLink { resource_link };
523 return;
524 }
525
526 let new_content = self.block_string_contents(block, path_style);
527
528 match self {
529 ContentBlock::Empty => {
530 *self = Self::create_markdown_block(new_content, language_registry, cx);
531 }
532 ContentBlock::Markdown { markdown } => {
533 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
534 }
535 ContentBlock::ResourceLink { resource_link } => {
536 let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
537 let combined = format!("{}\n{}", existing_content, new_content);
538
539 *self = Self::create_markdown_block(combined, language_registry, cx);
540 }
541 }
542 }
543
544 fn create_markdown_block(
545 content: String,
546 language_registry: &Arc<LanguageRegistry>,
547 cx: &mut App,
548 ) -> ContentBlock {
549 ContentBlock::Markdown {
550 markdown: cx
551 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
552 }
553 }
554
555 fn block_string_contents(&self, block: acp::ContentBlock, path_style: PathStyle) -> String {
556 match block {
557 acp::ContentBlock::Text(text_content) => text_content.text,
558 acp::ContentBlock::ResourceLink(resource_link) => {
559 Self::resource_link_md(&resource_link.uri, path_style)
560 }
561 acp::ContentBlock::Resource(acp::EmbeddedResource {
562 resource:
563 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
564 uri,
565 ..
566 }),
567 ..
568 }) => Self::resource_link_md(&uri, path_style),
569 acp::ContentBlock::Image(image) => Self::image_md(&image),
570 _ => String::new(),
571 }
572 }
573
574 fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
575 if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
576 uri.as_link().to_string()
577 } else {
578 uri.to_string()
579 }
580 }
581
582 fn image_md(_image: &acp::ImageContent) -> String {
583 "`Image`".into()
584 }
585
586 pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
587 match self {
588 ContentBlock::Empty => "",
589 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
590 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
591 }
592 }
593
594 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
595 match self {
596 ContentBlock::Empty => None,
597 ContentBlock::Markdown { markdown } => Some(markdown),
598 ContentBlock::ResourceLink { .. } => None,
599 }
600 }
601
602 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
603 match self {
604 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
605 _ => None,
606 }
607 }
608}
609
610#[derive(Debug)]
611pub enum ToolCallContent {
612 ContentBlock(ContentBlock),
613 Diff(Entity<Diff>),
614 Terminal(Entity<Terminal>),
615}
616
617impl ToolCallContent {
618 pub fn from_acp(
619 content: acp::ToolCallContent,
620 language_registry: Arc<LanguageRegistry>,
621 path_style: PathStyle,
622 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
623 cx: &mut App,
624 ) -> Result<Option<Self>> {
625 match content {
626 acp::ToolCallContent::Content(acp::Content { content, .. }) => {
627 Ok(Some(Self::ContentBlock(ContentBlock::new(
628 content,
629 &language_registry,
630 path_style,
631 cx,
632 ))))
633 }
634 acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
635 Diff::finalized(
636 diff.path.to_string_lossy().into_owned(),
637 diff.old_text,
638 diff.new_text,
639 language_registry,
640 cx,
641 )
642 })))),
643 acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
644 .get(&terminal_id)
645 .cloned()
646 .map(|terminal| Some(Self::Terminal(terminal)))
647 .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
648 _ => Ok(None),
649 }
650 }
651
652 pub fn update_from_acp(
653 &mut self,
654 new: acp::ToolCallContent,
655 language_registry: Arc<LanguageRegistry>,
656 path_style: PathStyle,
657 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
658 cx: &mut App,
659 ) -> Result<bool> {
660 let needs_update = match (&self, &new) {
661 (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
662 old_diff.read(cx).needs_update(
663 new_diff.old_text.as_deref().unwrap_or(""),
664 &new_diff.new_text,
665 cx,
666 )
667 }
668 _ => true,
669 };
670
671 if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
672 if needs_update {
673 *self = update;
674 }
675 Ok(true)
676 } else {
677 Ok(false)
678 }
679 }
680
681 pub fn to_markdown(&self, cx: &App) -> String {
682 match self {
683 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
684 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
685 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
686 }
687 }
688}
689
690#[derive(Debug, PartialEq)]
691pub enum ToolCallUpdate {
692 UpdateFields(acp::ToolCallUpdate),
693 UpdateDiff(ToolCallUpdateDiff),
694 UpdateTerminal(ToolCallUpdateTerminal),
695}
696
697impl ToolCallUpdate {
698 fn id(&self) -> &acp::ToolCallId {
699 match self {
700 Self::UpdateFields(update) => &update.tool_call_id,
701 Self::UpdateDiff(diff) => &diff.id,
702 Self::UpdateTerminal(terminal) => &terminal.id,
703 }
704 }
705}
706
707impl From<acp::ToolCallUpdate> for ToolCallUpdate {
708 fn from(update: acp::ToolCallUpdate) -> Self {
709 Self::UpdateFields(update)
710 }
711}
712
713impl From<ToolCallUpdateDiff> for ToolCallUpdate {
714 fn from(diff: ToolCallUpdateDiff) -> Self {
715 Self::UpdateDiff(diff)
716 }
717}
718
719#[derive(Debug, PartialEq)]
720pub struct ToolCallUpdateDiff {
721 pub id: acp::ToolCallId,
722 pub diff: Entity<Diff>,
723}
724
725impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
726 fn from(terminal: ToolCallUpdateTerminal) -> Self {
727 Self::UpdateTerminal(terminal)
728 }
729}
730
731#[derive(Debug, PartialEq)]
732pub struct ToolCallUpdateTerminal {
733 pub id: acp::ToolCallId,
734 pub terminal: Entity<Terminal>,
735}
736
737#[derive(Debug, Default)]
738pub struct Plan {
739 pub entries: Vec<PlanEntry>,
740}
741
742#[derive(Debug)]
743pub struct PlanStats<'a> {
744 pub in_progress_entry: Option<&'a PlanEntry>,
745 pub pending: u32,
746 pub completed: u32,
747}
748
749impl Plan {
750 pub fn is_empty(&self) -> bool {
751 self.entries.is_empty()
752 }
753
754 pub fn stats(&self) -> PlanStats<'_> {
755 let mut stats = PlanStats {
756 in_progress_entry: None,
757 pending: 0,
758 completed: 0,
759 };
760
761 for entry in &self.entries {
762 match &entry.status {
763 acp::PlanEntryStatus::Pending => {
764 stats.pending += 1;
765 }
766 acp::PlanEntryStatus::InProgress => {
767 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
768 }
769 acp::PlanEntryStatus::Completed => {
770 stats.completed += 1;
771 }
772 _ => {}
773 }
774 }
775
776 stats
777 }
778}
779
780#[derive(Debug)]
781pub struct PlanEntry {
782 pub content: Entity<Markdown>,
783 pub priority: acp::PlanEntryPriority,
784 pub status: acp::PlanEntryStatus,
785}
786
787impl PlanEntry {
788 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
789 Self {
790 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
791 priority: entry.priority,
792 status: entry.status,
793 }
794 }
795}
796
/// Token accounting for a thread; see `TokenUsage::ratio` for how the two
/// numbers are classified.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TokenUsage {
    /// Maximum number of tokens. `0` is treated by `ratio` as "unknown".
    pub max_tokens: u64,
    /// Number of tokens used so far.
    pub used_tokens: u64,
}
802
803impl TokenUsage {
804 pub fn ratio(&self) -> TokenUsageRatio {
805 #[cfg(debug_assertions)]
806 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
807 .unwrap_or("0.8".to_string())
808 .parse()
809 .unwrap();
810 #[cfg(not(debug_assertions))]
811 let warning_threshold: f32 = 0.8;
812
813 // When the maximum is unknown because there is no selected model,
814 // avoid showing the token limit warning.
815 if self.max_tokens == 0 {
816 TokenUsageRatio::Normal
817 } else if self.used_tokens >= self.max_tokens {
818 TokenUsageRatio::Exceeded
819 } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
820 TokenUsageRatio::Warning
821 } else {
822 TokenUsageRatio::Normal
823 }
824 }
825}
826
827#[derive(Debug, Clone, PartialEq, Eq)]
828pub enum TokenUsageRatio {
829 Normal,
830 Warning,
831 Exceeded,
832}
833
/// Information about a retry, carried by `AcpThreadEvent::Retry`.
// NOTE(review): nothing in this part of the file constructs or reads this
// struct; the field notes below are inferred from names and types — confirm.
#[derive(Debug, Clone)]
pub struct RetryStatus {
    /// Presumably the message of the error that triggered the retry.
    pub last_error: SharedString,
    /// Presumably the number of the current attempt.
    pub attempt: usize,
    /// Presumably the maximum number of attempts.
    pub max_attempts: usize,
    /// Presumably the instant at which the retry wait started.
    pub started_at: Instant,
    /// Presumably how long to wait before retrying.
    pub duration: Duration,
}
842
843pub struct AcpThread {
844 title: SharedString,
845 entries: Vec<AgentThreadEntry>,
846 plan: Plan,
847 project: Entity<Project>,
848 action_log: Entity<ActionLog>,
849 shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
850 send_task: Option<Task<()>>,
851 connection: Rc<dyn AgentConnection>,
852 session_id: acp::SessionId,
853 token_usage: Option<TokenUsage>,
854 prompt_capabilities: acp::PromptCapabilities,
855 _observe_prompt_capabilities: Task<anyhow::Result<()>>,
856 terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
857 pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
858 pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
859}
860
861impl From<&AcpThread> for ActionLogTelemetry {
862 fn from(value: &AcpThread) -> Self {
863 Self {
864 agent_telemetry_id: value.connection().telemetry_id(),
865 session_id: value.session_id.0.clone(),
866 }
867 }
868}
869
/// Events emitted by [`AcpThread`].
#[derive(Debug)]
pub enum AcpThreadEvent {
    NewEntry,
    TitleUpdated,
    TokenUsageUpdated,
    /// The entry at the given index changed.
    EntryUpdated(usize),
    EntriesRemoved(Range<usize>),
    ToolAuthorizationRequired,
    Retry(RetryStatus),
    Stopped,
    Error,
    LoadError(LoadError),
    /// The thread's prompt capabilities were replaced with a new value.
    PromptCapabilitiesUpdated,
    Refusal,
    /// The agent sent an updated list of available commands.
    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
    /// The agent reported a new current session mode.
    ModeUpdated(acp::SessionModeId),
}

impl EventEmitter<AcpThreadEvent> for AcpThread {}
889
/// Events about terminals, consumed by `AcpThread::on_terminal_provider_event`.
#[derive(Debug, Clone)]
pub enum TerminalProviderEvent {
    /// A terminal was created and should be registered with the thread.
    Created {
        terminal_id: acp::TerminalId,
        label: String,
        cwd: Option<PathBuf>,
        output_byte_limit: Option<u64>,
        terminal: Entity<::terminal::Terminal>,
    },
    /// Output bytes for a terminal; buffered if the terminal is not registered yet.
    Output {
        terminal_id: acp::TerminalId,
        data: Vec<u8>,
    },
    /// New title for a terminal; ignored if the terminal is not registered.
    TitleChanged {
        terminal_id: acp::TerminalId,
        title: String,
    },
    /// A terminal exited; remembered if the terminal is not registered yet.
    Exit {
        terminal_id: acp::TerminalId,
        status: acp::TerminalExitStatus,
    },
}
912
/// Commands addressed to a terminal, identified by `terminal_id`.
// NOTE(review): no producer or consumer of these commands is visible in this
// part of the file; variant meanings are taken from their names — confirm.
#[derive(Debug, Clone)]
pub enum TerminalProviderCommand {
    WriteInput {
        terminal_id: acp::TerminalId,
        bytes: Vec<u8>,
    },
    Resize {
        terminal_id: acp::TerminalId,
        cols: u16,
        rows: u16,
    },
    Close {
        terminal_id: acp::TerminalId,
    },
}
928
929impl AcpThread {
930 pub fn on_terminal_provider_event(
931 &mut self,
932 event: TerminalProviderEvent,
933 cx: &mut Context<Self>,
934 ) {
935 match event {
936 TerminalProviderEvent::Created {
937 terminal_id,
938 label,
939 cwd,
940 output_byte_limit,
941 terminal,
942 } => {
943 let entity = self.register_terminal_created(
944 terminal_id.clone(),
945 label,
946 cwd,
947 output_byte_limit,
948 terminal,
949 cx,
950 );
951
952 if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
953 for data in chunks.drain(..) {
954 entity.update(cx, |term, cx| {
955 term.inner().update(cx, |inner, cx| {
956 inner.write_output(&data, cx);
957 })
958 });
959 }
960 }
961
962 if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
963 entity.update(cx, |_term, cx| {
964 cx.notify();
965 });
966 }
967
968 cx.notify();
969 }
970 TerminalProviderEvent::Output { terminal_id, data } => {
971 if let Some(entity) = self.terminals.get(&terminal_id) {
972 entity.update(cx, |term, cx| {
973 term.inner().update(cx, |inner, cx| {
974 inner.write_output(&data, cx);
975 })
976 });
977 } else {
978 self.pending_terminal_output
979 .entry(terminal_id)
980 .or_default()
981 .push(data);
982 }
983 }
984 TerminalProviderEvent::TitleChanged { terminal_id, title } => {
985 if let Some(entity) = self.terminals.get(&terminal_id) {
986 entity.update(cx, |term, cx| {
987 term.inner().update(cx, |inner, cx| {
988 inner.breadcrumb_text = title;
989 cx.emit(::terminal::Event::BreadcrumbsChanged);
990 })
991 });
992 }
993 }
994 TerminalProviderEvent::Exit {
995 terminal_id,
996 status,
997 } => {
998 if let Some(entity) = self.terminals.get(&terminal_id) {
999 entity.update(cx, |_term, cx| {
1000 cx.notify();
1001 });
1002 } else {
1003 self.pending_terminal_exit.insert(terminal_id, status);
1004 }
1005 }
1006 }
1007 }
1008}
1009
/// Whether a thread currently has a send in flight; see `AcpThread::status`.
#[derive(PartialEq, Eq, Debug)]
pub enum ThreadStatus {
    /// No send task is present.
    Idle,
    /// A send task is present.
    Generating,
}
1015
1016#[derive(Debug, Clone)]
1017pub enum LoadError {
1018 Unsupported {
1019 command: SharedString,
1020 current_version: SharedString,
1021 minimum_version: SharedString,
1022 },
1023 FailedToInstall(SharedString),
1024 Exited {
1025 status: ExitStatus,
1026 },
1027 Other(SharedString),
1028}
1029
1030impl Display for LoadError {
1031 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1032 match self {
1033 LoadError::Unsupported {
1034 command: path,
1035 current_version,
1036 minimum_version,
1037 } => {
1038 write!(
1039 f,
1040 "version {current_version} from {path} is not supported (need at least {minimum_version})"
1041 )
1042 }
1043 LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1044 LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1045 LoadError::Other(msg) => write!(f, "{msg}"),
1046 }
1047 }
1048}
1049
1050impl Error for LoadError {}
1051
1052impl AcpThread {
1053 pub fn new(
1054 title: impl Into<SharedString>,
1055 connection: Rc<dyn AgentConnection>,
1056 project: Entity<Project>,
1057 action_log: Entity<ActionLog>,
1058 session_id: acp::SessionId,
1059 mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1060 cx: &mut Context<Self>,
1061 ) -> Self {
1062 let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1063 let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1064 loop {
1065 let caps = prompt_capabilities_rx.recv().await?;
1066 this.update(cx, |this, cx| {
1067 this.prompt_capabilities = caps;
1068 cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1069 })?;
1070 }
1071 });
1072
1073 Self {
1074 action_log,
1075 shared_buffers: Default::default(),
1076 entries: Default::default(),
1077 plan: Default::default(),
1078 title: title.into(),
1079 project,
1080 send_task: None,
1081 connection,
1082 session_id,
1083 token_usage: None,
1084 prompt_capabilities,
1085 _observe_prompt_capabilities: task,
1086 terminals: HashMap::default(),
1087 pending_terminal_output: HashMap::default(),
1088 pending_terminal_exit: HashMap::default(),
1089 }
1090 }
1091
    /// Returns a copy of the thread's current prompt capabilities.
    pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
        self.prompt_capabilities.clone()
    }

    /// Returns the agent connection this thread uses.
    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
        &self.connection
    }

    /// Returns the thread's action log.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }

    /// Returns the project this thread belongs to.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }

    /// Returns a copy of the thread's title.
    pub fn title(&self) -> SharedString {
        self.title.clone()
    }

    /// Returns the thread's entries, in order.
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }

    /// Returns the id of the session backing this thread.
    pub fn session_id(&self) -> &acp::SessionId {
        &self.session_id
    }
1119
1120 pub fn status(&self) -> ThreadStatus {
1121 if self.send_task.is_some() {
1122 ThreadStatus::Generating
1123 } else {
1124 ThreadStatus::Idle
1125 }
1126 }
1127
1128 pub fn token_usage(&self) -> Option<&TokenUsage> {
1129 self.token_usage.as_ref()
1130 }
1131
1132 pub fn has_pending_edit_tool_calls(&self) -> bool {
1133 for entry in self.entries.iter().rev() {
1134 match entry {
1135 AgentThreadEntry::UserMessage(_) => return false,
1136 AgentThreadEntry::ToolCall(
1137 call @ ToolCall {
1138 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1139 ..
1140 },
1141 ) if call.diffs().next().is_some() => {
1142 return true;
1143 }
1144 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1145 }
1146 }
1147
1148 false
1149 }
1150
1151 pub fn used_tools_since_last_user_message(&self) -> bool {
1152 for entry in self.entries.iter().rev() {
1153 match entry {
1154 AgentThreadEntry::UserMessage(..) => return false,
1155 AgentThreadEntry::AssistantMessage(..) => continue,
1156 AgentThreadEntry::ToolCall(..) => return true,
1157 }
1158 }
1159
1160 false
1161 }
1162
1163 pub fn handle_session_update(
1164 &mut self,
1165 update: acp::SessionUpdate,
1166 cx: &mut Context<Self>,
1167 ) -> Result<(), acp::Error> {
1168 match update {
1169 acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1170 self.push_user_content_block(None, content, cx);
1171 }
1172 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1173 self.push_assistant_content_block(content, false, cx);
1174 }
1175 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1176 self.push_assistant_content_block(content, true, cx);
1177 }
1178 acp::SessionUpdate::ToolCall(tool_call) => {
1179 self.upsert_tool_call(tool_call, cx)?;
1180 }
1181 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1182 self.update_tool_call(tool_call_update, cx)?;
1183 }
1184 acp::SessionUpdate::Plan(plan) => {
1185 self.update_plan(plan, cx);
1186 }
1187 acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1188 available_commands,
1189 ..
1190 }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1191 acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1192 current_mode_id,
1193 ..
1194 }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1195 _ => {}
1196 }
1197 Ok(())
1198 }
1199
1200 pub fn push_user_content_block(
1201 &mut self,
1202 message_id: Option<UserMessageId>,
1203 chunk: acp::ContentBlock,
1204 cx: &mut Context<Self>,
1205 ) {
1206 self.push_user_content_block_with_indent(message_id, chunk, false, cx)
1207 }
1208
1209 pub fn push_user_content_block_with_indent(
1210 &mut self,
1211 message_id: Option<UserMessageId>,
1212 chunk: acp::ContentBlock,
1213 indented: bool,
1214 cx: &mut Context<Self>,
1215 ) {
1216 let language_registry = self.project.read(cx).languages().clone();
1217 let path_style = self.project.read(cx).path_style(cx);
1218 let entries_len = self.entries.len();
1219
1220 if let Some(last_entry) = self.entries.last_mut()
1221 && let AgentThreadEntry::UserMessage(UserMessage {
1222 id,
1223 content,
1224 chunks,
1225 indented: existing_indented,
1226 ..
1227 }) = last_entry
1228 && *existing_indented == indented
1229 {
1230 *id = message_id.or(id.take());
1231 content.append(chunk.clone(), &language_registry, path_style, cx);
1232 chunks.push(chunk);
1233 let idx = entries_len - 1;
1234 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1235 } else {
1236 let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1237 self.push_entry(
1238 AgentThreadEntry::UserMessage(UserMessage {
1239 id: message_id,
1240 content,
1241 chunks: vec![chunk],
1242 checkpoint: None,
1243 indented,
1244 }),
1245 cx,
1246 );
1247 }
1248 }
1249
1250 pub fn push_assistant_content_block(
1251 &mut self,
1252 chunk: acp::ContentBlock,
1253 is_thought: bool,
1254 cx: &mut Context<Self>,
1255 ) {
1256 self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
1257 }
1258
1259 pub fn push_assistant_content_block_with_indent(
1260 &mut self,
1261 chunk: acp::ContentBlock,
1262 is_thought: bool,
1263 indented: bool,
1264 cx: &mut Context<Self>,
1265 ) {
1266 let language_registry = self.project.read(cx).languages().clone();
1267 let path_style = self.project.read(cx).path_style(cx);
1268 let entries_len = self.entries.len();
1269 if let Some(last_entry) = self.entries.last_mut()
1270 && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1271 chunks,
1272 indented: existing_indented,
1273 }) = last_entry
1274 && *existing_indented == indented
1275 {
1276 let idx = entries_len - 1;
1277 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1278 match (chunks.last_mut(), is_thought) {
1279 (Some(AssistantMessageChunk::Message { block }), false)
1280 | (Some(AssistantMessageChunk::Thought { block }), true) => {
1281 block.append(chunk, &language_registry, path_style, cx)
1282 }
1283 _ => {
1284 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1285 if is_thought {
1286 chunks.push(AssistantMessageChunk::Thought { block })
1287 } else {
1288 chunks.push(AssistantMessageChunk::Message { block })
1289 }
1290 }
1291 }
1292 } else {
1293 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1294 let chunk = if is_thought {
1295 AssistantMessageChunk::Thought { block }
1296 } else {
1297 AssistantMessageChunk::Message { block }
1298 };
1299
1300 self.push_entry(
1301 AgentThreadEntry::AssistantMessage(AssistantMessage {
1302 chunks: vec![chunk],
1303 indented,
1304 }),
1305 cx,
1306 );
1307 }
1308 }
1309
1310 fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
1311 self.entries.push(entry);
1312 cx.emit(AcpThreadEvent::NewEntry);
1313 }
1314
1315 pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1316 self.connection.set_title(&self.session_id, cx).is_some()
1317 }
1318
1319 pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1320 if title != self.title {
1321 self.title = title.clone();
1322 cx.emit(AcpThreadEvent::TitleUpdated);
1323 if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1324 return set_title.run(title, cx);
1325 }
1326 }
1327 Task::ready(Ok(()))
1328 }
1329
1330 pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1331 self.token_usage = usage;
1332 cx.emit(AcpThreadEvent::TokenUsageUpdated);
1333 }
1334
1335 pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1336 cx.emit(AcpThreadEvent::Retry(status));
1337 }
1338
1339 pub fn update_tool_call(
1340 &mut self,
1341 update: impl Into<ToolCallUpdate>,
1342 cx: &mut Context<Self>,
1343 ) -> Result<()> {
1344 let update = update.into();
1345 let languages = self.project.read(cx).languages().clone();
1346 let path_style = self.project.read(cx).path_style(cx);
1347
1348 let ix = match self.index_for_tool_call(update.id()) {
1349 Some(ix) => ix,
1350 None => {
1351 // Tool call not found - create a failed tool call entry
1352 let failed_tool_call = ToolCall {
1353 id: update.id().clone(),
1354 label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1355 kind: acp::ToolKind::Fetch,
1356 content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1357 "Tool call not found".into(),
1358 &languages,
1359 path_style,
1360 cx,
1361 ))],
1362 status: ToolCallStatus::Failed,
1363 locations: Vec::new(),
1364 resolved_locations: Vec::new(),
1365 raw_input: None,
1366 raw_input_markdown: None,
1367 raw_output: None,
1368 };
1369 self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1370 return Ok(());
1371 }
1372 };
1373 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1374 unreachable!()
1375 };
1376
1377 match update {
1378 ToolCallUpdate::UpdateFields(update) => {
1379 let location_updated = update.fields.locations.is_some();
1380 call.update_fields(update.fields, languages, path_style, &self.terminals, cx)?;
1381 if location_updated {
1382 self.resolve_locations(update.tool_call_id, cx);
1383 }
1384 }
1385 ToolCallUpdate::UpdateDiff(update) => {
1386 call.content.clear();
1387 call.content.push(ToolCallContent::Diff(update.diff));
1388 }
1389 ToolCallUpdate::UpdateTerminal(update) => {
1390 call.content.clear();
1391 call.content
1392 .push(ToolCallContent::Terminal(update.terminal));
1393 }
1394 }
1395
1396 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1397
1398 Ok(())
1399 }
1400
1401 /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1402 pub fn upsert_tool_call(
1403 &mut self,
1404 tool_call: acp::ToolCall,
1405 cx: &mut Context<Self>,
1406 ) -> Result<(), acp::Error> {
1407 let status = tool_call.status.into();
1408 self.upsert_tool_call_inner(tool_call.into(), status, cx)
1409 }
1410
1411 /// Fails if id does not match an existing entry.
1412 pub fn upsert_tool_call_inner(
1413 &mut self,
1414 update: acp::ToolCallUpdate,
1415 status: ToolCallStatus,
1416 cx: &mut Context<Self>,
1417 ) -> Result<(), acp::Error> {
1418 let language_registry = self.project.read(cx).languages().clone();
1419 let path_style = self.project.read(cx).path_style(cx);
1420 let id = update.tool_call_id.clone();
1421
1422 let agent_telemetry_id = self.connection().telemetry_id();
1423 let session = self.session_id();
1424 if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1425 let status = if matches!(status, ToolCallStatus::Completed) {
1426 "completed"
1427 } else {
1428 "failed"
1429 };
1430 telemetry::event!(
1431 "Agent Tool Call Completed",
1432 agent_telemetry_id,
1433 session,
1434 status
1435 );
1436 }
1437
1438 if let Some(ix) = self.index_for_tool_call(&id) {
1439 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1440 unreachable!()
1441 };
1442
1443 call.update_fields(
1444 update.fields,
1445 language_registry,
1446 path_style,
1447 &self.terminals,
1448 cx,
1449 )?;
1450 call.status = status;
1451
1452 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1453 } else {
1454 let call = ToolCall::from_acp(
1455 update.try_into()?,
1456 status,
1457 language_registry,
1458 self.project.read(cx).path_style(cx),
1459 &self.terminals,
1460 cx,
1461 )?;
1462 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1463 };
1464
1465 self.resolve_locations(id, cx);
1466 Ok(())
1467 }
1468
1469 fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1470 self.entries
1471 .iter()
1472 .enumerate()
1473 .rev()
1474 .find_map(|(index, entry)| {
1475 if let AgentThreadEntry::ToolCall(tool_call) = entry
1476 && &tool_call.id == id
1477 {
1478 Some(index)
1479 } else {
1480 None
1481 }
1482 })
1483 }
1484
1485 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1486 // The tool call we are looking for is typically the last one, or very close to the end.
1487 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1488 self.entries
1489 .iter_mut()
1490 .enumerate()
1491 .rev()
1492 .find_map(|(index, tool_call)| {
1493 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1494 && &tool_call.id == id
1495 {
1496 Some((index, tool_call))
1497 } else {
1498 None
1499 }
1500 })
1501 }
1502
1503 pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1504 self.entries
1505 .iter()
1506 .enumerate()
1507 .rev()
1508 .find_map(|(index, tool_call)| {
1509 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1510 && &tool_call.id == id
1511 {
1512 Some((index, tool_call))
1513 } else {
1514 None
1515 }
1516 })
1517 }
1518
1519 pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1520 let project = self.project.clone();
1521 let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1522 return;
1523 };
1524 let task = tool_call.resolve_locations(project, cx);
1525 cx.spawn(async move |this, cx| {
1526 let resolved_locations = task.await;
1527
1528 this.update(cx, |this, cx| {
1529 let project = this.project.clone();
1530
1531 for location in resolved_locations.iter().flatten() {
1532 this.shared_buffers
1533 .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
1534 }
1535 let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1536 return;
1537 };
1538
1539 if let Some(Some(location)) = resolved_locations.last() {
1540 project.update(cx, |project, cx| {
1541 let should_ignore = if let Some(agent_location) = project
1542 .agent_location()
1543 .filter(|agent_location| agent_location.buffer == location.buffer)
1544 {
1545 let snapshot = location.buffer.read(cx).snapshot();
1546 let old_position = agent_location.position.to_point(&snapshot);
1547 let new_position = location.position.to_point(&snapshot);
1548
1549 // ignore this so that when we get updates from the edit tool
1550 // the position doesn't reset to the startof line
1551 old_position.row == new_position.row
1552 && old_position.column > new_position.column
1553 } else {
1554 false
1555 };
1556 if !should_ignore {
1557 project.set_agent_location(Some(location.into()), cx);
1558 }
1559 });
1560 }
1561
1562 let resolved_locations = resolved_locations
1563 .iter()
1564 .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
1565 .collect::<Vec<_>>();
1566
1567 if tool_call.resolved_locations != resolved_locations {
1568 tool_call.resolved_locations = resolved_locations;
1569 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1570 }
1571 })
1572 })
1573 .detach();
1574 }
1575
1576 pub fn request_tool_call_authorization(
1577 &mut self,
1578 tool_call: acp::ToolCallUpdate,
1579 options: Vec<acp::PermissionOption>,
1580 respect_always_allow_setting: bool,
1581 cx: &mut Context<Self>,
1582 ) -> Result<BoxFuture<'static, acp::RequestPermissionOutcome>> {
1583 let (tx, rx) = oneshot::channel();
1584
1585 if respect_always_allow_setting && AgentSettings::get_global(cx).always_allow_tool_actions {
1586 // Don't use AllowAlways, because then if you were to turn off always_allow_tool_actions,
1587 // some tools would (incorrectly) continue to auto-accept.
1588 if let Some(allow_once_option) = options.iter().find_map(|option| {
1589 if matches!(option.kind, acp::PermissionOptionKind::AllowOnce) {
1590 Some(option.option_id.clone())
1591 } else {
1592 None
1593 }
1594 }) {
1595 self.upsert_tool_call_inner(tool_call, ToolCallStatus::Pending, cx)?;
1596 return Ok(async {
1597 acp::RequestPermissionOutcome::Selected(acp::SelectedPermissionOutcome::new(
1598 allow_once_option,
1599 ))
1600 }
1601 .boxed());
1602 }
1603 }
1604
1605 let status = ToolCallStatus::WaitingForConfirmation {
1606 options,
1607 respond_tx: tx,
1608 };
1609
1610 self.upsert_tool_call_inner(tool_call, status, cx)?;
1611 cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1612
1613 let fut = async {
1614 match rx.await {
1615 Ok(option) => acp::RequestPermissionOutcome::Selected(
1616 acp::SelectedPermissionOutcome::new(option),
1617 ),
1618 Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1619 }
1620 }
1621 .boxed();
1622
1623 Ok(fut)
1624 }
1625
1626 pub fn authorize_tool_call(
1627 &mut self,
1628 id: acp::ToolCallId,
1629 option_id: acp::PermissionOptionId,
1630 option_kind: acp::PermissionOptionKind,
1631 cx: &mut Context<Self>,
1632 ) {
1633 let Some((ix, call)) = self.tool_call_mut(&id) else {
1634 return;
1635 };
1636
1637 let new_status = match option_kind {
1638 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1639 ToolCallStatus::Rejected
1640 }
1641 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1642 ToolCallStatus::InProgress
1643 }
1644 _ => ToolCallStatus::InProgress,
1645 };
1646
1647 let curr_status = mem::replace(&mut call.status, new_status);
1648
1649 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1650 respond_tx.send(option_id).log_err();
1651 } else if cfg!(debug_assertions) {
1652 panic!("tried to authorize an already authorized tool call");
1653 }
1654
1655 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1656 }
1657
1658 pub fn first_tool_awaiting_confirmation(&self) -> Option<&ToolCall> {
1659 let mut first_tool_call = None;
1660
1661 for entry in self.entries.iter().rev() {
1662 match &entry {
1663 AgentThreadEntry::ToolCall(call) => {
1664 if let ToolCallStatus::WaitingForConfirmation { .. } = call.status {
1665 first_tool_call = Some(call);
1666 } else {
1667 continue;
1668 }
1669 }
1670 AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1671 // Reached the beginning of the turn.
1672 // If we had pending permission requests in the previous turn, they have been cancelled.
1673 break;
1674 }
1675 }
1676 }
1677
1678 first_tool_call
1679 }
1680
1681 pub fn plan(&self) -> &Plan {
1682 &self.plan
1683 }
1684
1685 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1686 let new_entries_len = request.entries.len();
1687 let mut new_entries = request.entries.into_iter();
1688
1689 // Reuse existing markdown to prevent flickering
1690 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1691 let PlanEntry {
1692 content,
1693 priority,
1694 status,
1695 } = old;
1696 content.update(cx, |old, cx| {
1697 old.replace(new.content, cx);
1698 });
1699 *priority = new.priority;
1700 *status = new.status;
1701 }
1702 for new in new_entries {
1703 self.plan.entries.push(PlanEntry::from_acp(new, cx))
1704 }
1705 self.plan.entries.truncate(new_entries_len);
1706
1707 cx.notify();
1708 }
1709
1710 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1711 self.plan
1712 .entries
1713 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1714 cx.notify();
1715 }
1716
1717 #[cfg(any(test, feature = "test-support"))]
1718 pub fn send_raw(
1719 &mut self,
1720 message: &str,
1721 cx: &mut Context<Self>,
1722 ) -> BoxFuture<'static, Result<()>> {
1723 self.send(vec![message.into()], cx)
1724 }
1725
1726 pub fn send(
1727 &mut self,
1728 message: Vec<acp::ContentBlock>,
1729 cx: &mut Context<Self>,
1730 ) -> BoxFuture<'static, Result<()>> {
1731 let block = ContentBlock::new_combined(
1732 message.clone(),
1733 self.project.read(cx).languages().clone(),
1734 self.project.read(cx).path_style(cx),
1735 cx,
1736 );
1737 let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
1738 let git_store = self.project.read(cx).git_store().clone();
1739
1740 let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
1741 Some(UserMessageId::new())
1742 } else {
1743 None
1744 };
1745
1746 self.run_turn(cx, async move |this, cx| {
1747 this.update(cx, |this, cx| {
1748 this.push_entry(
1749 AgentThreadEntry::UserMessage(UserMessage {
1750 id: message_id.clone(),
1751 content: block,
1752 chunks: message,
1753 checkpoint: None,
1754 indented: false,
1755 }),
1756 cx,
1757 );
1758 })
1759 .ok();
1760
1761 let old_checkpoint = git_store
1762 .update(cx, |git, cx| git.checkpoint(cx))?
1763 .await
1764 .context("failed to get old checkpoint")
1765 .log_err();
1766 this.update(cx, |this, cx| {
1767 if let Some((_ix, message)) = this.last_user_message() {
1768 message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1769 git_checkpoint,
1770 show: false,
1771 });
1772 }
1773 this.connection.prompt(message_id, request, cx)
1774 })?
1775 .await
1776 })
1777 }
1778
1779 pub fn can_resume(&self, cx: &App) -> bool {
1780 self.connection.resume(&self.session_id, cx).is_some()
1781 }
1782
1783 pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1784 self.run_turn(cx, async move |this, cx| {
1785 this.update(cx, |this, cx| {
1786 this.connection
1787 .resume(&this.session_id, cx)
1788 .map(|resume| resume.run(cx))
1789 })?
1790 .context("resuming a session is not supported")?
1791 .await
1792 })
1793 }
1794
    /// Runs one agent turn driven by `f` and returns a future that completes
    /// once the turn's outcome has been processed.
    ///
    /// Before `f` runs, completed plan entries are cleared and any turn that is
    /// still in flight is cancelled and awaited. The task running `f` is stored
    /// in `send_task`.
    ///
    /// After `f` finishes (or its task is dropped), the returned future updates
    /// the last checkpoint, clears the project's agent location, and then:
    /// - if `f` returned an error, clears `send_task`, emits `Error`, and
    ///   resolves to that error;
    /// - otherwise clears `send_task` unless the stop reason is `Cancelled`,
    ///   handles a `Refusal` stop reason (see below), emits `Stopped`, and
    ///   resolves to `Ok(())`.
    ///
    /// On refusal, the thread is truncated back to before the last user message
    /// unless a completed tool call with output follows that message.
    fn run_turn(
        &mut self,
        cx: &mut Context<Self>,
        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
    ) -> BoxFuture<'static, Result<()>> {
        self.clear_completed_plan_entries(cx);

        // `tx` carries the result of `f` from the send task to the future
        // returned to the caller.
        let (tx, rx) = oneshot::channel();
        let cancel_task = self.cancel(cx);

        self.send_task = Some(cx.spawn(async move |this, cx| {
            // Let the previous turn finish cancelling before starting this one.
            cancel_task.await;
            tx.send(f(this, cx).await).ok();
        }));

        cx.spawn(async move |this, cx| {
            // Outer `Err` means the sender was dropped before a result was sent.
            let response = rx.await;

            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
                .await?;

            this.update(cx, |this, cx| {
                this.project
                    .update(cx, |project, cx| project.set_agent_location(None, cx));
                match response {
                    Ok(Err(e)) => {
                        this.send_task.take();
                        cx.emit(AcpThreadEvent::Error);
                        Err(e)
                    }
                    // Covers both a successful response and a dropped sender.
                    result => {
                        let canceled = matches!(
                            result,
                            Ok(Ok(acp::PromptResponse {
                                stop_reason: acp::StopReason::Cancelled,
                                ..
                            }))
                        );

                        // We only take the task if the current prompt wasn't canceled.
                        //
                        // This prompt may have been canceled because another one was sent
                        // while it was still generating. In these cases, dropping `send_task`
                        // would cause the next generation to be canceled.
                        if !canceled {
                            this.send_task.take();
                        }

                        // Handle refusal - distinguish between user prompt and tool call refusals
                        if let Ok(Ok(acp::PromptResponse {
                            stop_reason: acp::StopReason::Refusal,
                            ..
                        })) = result
                        {
                            if let Some((user_msg_ix, _)) = this.last_user_message() {
                                // Check if there's a completed tool call with results after the last user message
                                // This indicates the refusal is in response to tool output, not the user's prompt
                                let has_completed_tool_call_after_user_msg =
                                    this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
                                        if let AgentThreadEntry::ToolCall(tool_call) = entry {
                                            // Check if the tool call has completed and has output
                                            matches!(tool_call.status, ToolCallStatus::Completed)
                                                && tool_call.raw_output.is_some()
                                        } else {
                                            false
                                        }
                                    });

                                if has_completed_tool_call_after_user_msg {
                                    // Refusal is due to tool output - don't truncate, just notify
                                    // The model refused based on what the tool returned
                                    cx.emit(AcpThreadEvent::Refusal);
                                } else {
                                    // User prompt was refused - truncate back to before the user message
                                    let range = user_msg_ix..this.entries.len();
                                    if range.start < range.end {
                                        this.entries.truncate(user_msg_ix);
                                        cx.emit(AcpThreadEvent::EntriesRemoved(range));
                                    }
                                    cx.emit(AcpThreadEvent::Refusal);
                                }
                            } else {
                                // No user message found, treat as general refusal
                                cx.emit(AcpThreadEvent::Refusal);
                            }
                        }

                        cx.emit(AcpThreadEvent::Stopped);
                        Ok(())
                    }
                }
            })?
        })
        .boxed()
    }
1890
1891 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
1892 let Some(send_task) = self.send_task.take() else {
1893 return Task::ready(());
1894 };
1895
1896 for entry in self.entries.iter_mut() {
1897 if let AgentThreadEntry::ToolCall(call) = entry {
1898 let cancel = matches!(
1899 call.status,
1900 ToolCallStatus::Pending
1901 | ToolCallStatus::WaitingForConfirmation { .. }
1902 | ToolCallStatus::InProgress
1903 );
1904
1905 if cancel {
1906 call.status = ToolCallStatus::Canceled;
1907 }
1908 }
1909 }
1910
1911 self.connection.cancel(&self.session_id, cx);
1912
1913 // Wait for the send task to complete
1914 cx.foreground_executor().spawn(send_task)
1915 }
1916
1917 /// Restores the git working tree to the state at the given checkpoint (if one exists)
1918 pub fn restore_checkpoint(
1919 &mut self,
1920 id: UserMessageId,
1921 cx: &mut Context<Self>,
1922 ) -> Task<Result<()>> {
1923 let Some((_, message)) = self.user_message_mut(&id) else {
1924 return Task::ready(Err(anyhow!("message not found")));
1925 };
1926
1927 let checkpoint = message
1928 .checkpoint
1929 .as_ref()
1930 .map(|c| c.git_checkpoint.clone());
1931
1932 // Cancel any in-progress generation before restoring
1933 let cancel_task = self.cancel(cx);
1934 let rewind = self.rewind(id.clone(), cx);
1935 let git_store = self.project.read(cx).git_store().clone();
1936
1937 cx.spawn(async move |_, cx| {
1938 cancel_task.await;
1939 rewind.await?;
1940 if let Some(checkpoint) = checkpoint {
1941 git_store
1942 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))?
1943 .await?;
1944 }
1945
1946 Ok(())
1947 })
1948 }
1949
1950 /// Rewinds this thread to before the entry at `index`, removing it and all
1951 /// subsequent entries while rejecting any action_log changes made from that point.
1952 /// Unlike `restore_checkpoint`, this method does not restore from git.
1953 pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
1954 let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
1955 return Task::ready(Err(anyhow!("not supported")));
1956 };
1957
1958 let telemetry = ActionLogTelemetry::from(&*self);
1959 cx.spawn(async move |this, cx| {
1960 cx.update(|cx| truncate.run(id.clone(), cx))?.await?;
1961 this.update(cx, |this, cx| {
1962 if let Some((ix, _)) = this.user_message_mut(&id) {
1963 // Collect all terminals from entries that will be removed
1964 let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
1965 .iter()
1966 .flat_map(|entry| entry.terminals())
1967 .filter_map(|terminal| terminal.read(cx).id().clone().into())
1968 .collect();
1969
1970 let range = ix..this.entries.len();
1971 this.entries.truncate(ix);
1972 cx.emit(AcpThreadEvent::EntriesRemoved(range));
1973
1974 // Kill and remove the terminals
1975 for terminal_id in terminals_to_remove {
1976 if let Some(terminal) = this.terminals.remove(&terminal_id) {
1977 terminal.update(cx, |terminal, cx| {
1978 terminal.kill(cx);
1979 });
1980 }
1981 }
1982 }
1983 this.action_log().update(cx, |action_log, cx| {
1984 action_log.reject_all_edits(Some(telemetry), cx)
1985 })
1986 })?
1987 .await;
1988 Ok(())
1989 })
1990 }
1991
1992 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1993 let git_store = self.project.read(cx).git_store().clone();
1994
1995 let old_checkpoint = if let Some((_, message)) = self.last_user_message() {
1996 if let Some(checkpoint) = message.checkpoint.as_ref() {
1997 checkpoint.git_checkpoint.clone()
1998 } else {
1999 return Task::ready(Ok(()));
2000 }
2001 } else {
2002 return Task::ready(Ok(()));
2003 };
2004
2005 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2006 cx.spawn(async move |this, cx| {
2007 let new_checkpoint = new_checkpoint
2008 .await
2009 .context("failed to get new checkpoint")
2010 .log_err();
2011 if let Some(new_checkpoint) = new_checkpoint {
2012 let equal = git_store
2013 .update(cx, |git, cx| {
2014 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2015 })?
2016 .await
2017 .unwrap_or(true);
2018 this.update(cx, |this, cx| {
2019 let (ix, message) = this.last_user_message().context("no user message")?;
2020 let checkpoint = message.checkpoint.as_mut().context("no checkpoint")?;
2021 checkpoint.show = !equal;
2022 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2023 anyhow::Ok(())
2024 })??;
2025 }
2026
2027 Ok(())
2028 })
2029 }
2030
2031 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2032 self.entries
2033 .iter_mut()
2034 .enumerate()
2035 .rev()
2036 .find_map(|(ix, entry)| {
2037 if let AgentThreadEntry::UserMessage(message) = entry {
2038 Some((ix, message))
2039 } else {
2040 None
2041 }
2042 })
2043 }
2044
2045 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2046 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2047 if let AgentThreadEntry::UserMessage(message) = entry {
2048 if message.id.as_ref() == Some(id) {
2049 Some((ix, message))
2050 } else {
2051 None
2052 }
2053 } else {
2054 None
2055 }
2056 })
2057 }
2058
    /// Reads text from the project file at the absolute `path`.
    ///
    /// `line` is the 1-based first line to read (defaults to the start of the
    /// file) and `limit` is the number of lines to read (defaults to
    /// `u32::MAX`).
    ///
    /// When `reuse_shared_snapshot` is set and a snapshot of the buffer is
    /// already stored in `shared_buffers`, the text is read from that snapshot.
    /// Otherwise the read is recorded in the action log, a fresh snapshot is
    /// taken, and that snapshot is stored in `shared_buffers`.
    ///
    /// The project's agent location is moved to the start of the returned
    /// range.
    ///
    /// # Errors
    ///
    /// Returns a resource-not-found error when `path` does not map to a project
    /// path, and an invalid-params error when `line` lies beyond the end of the
    /// file. Failures to open the buffer or to reach the entities are
    /// propagated.
    pub fn read_text_file(
        &self,
        path: PathBuf,
        line: Option<u32>,
        limit: Option<u32>,
        reuse_shared_snapshot: bool,
        cx: &mut Context<Self>,
    ) -> Task<Result<String, acp::Error>> {
        // Args are 1-based, move to 0-based
        let line = line.unwrap_or_default().saturating_sub(1);
        let limit = limit.unwrap_or(u32::MAX);
        let project = self.project.clone();
        let action_log = self.action_log.clone();
        cx.spawn(async move |this, cx| {
            let load = project
                .update(cx, |project, cx| {
                    let path = project
                        .project_path_for_absolute_path(&path, cx)
                        .ok_or_else(|| {
                            acp::Error::resource_not_found(Some(path.display().to_string()))
                        })?;
                    Ok(project.open_buffer(path, cx))
                })
                .map_err(|e| acp::Error::internal_error().data(e.to_string()))
                .flatten()?;

            let buffer = load.await?;

            // Look for a previously shared snapshot only when the caller asked for it.
            let snapshot = if reuse_shared_snapshot {
                this.read_with(cx, |this, _| {
                    this.shared_buffers.get(&buffer.clone()).cloned()
                })
                .log_err()
                .flatten()
            } else {
                None
            };

            let snapshot = if let Some(snapshot) = snapshot {
                snapshot
            } else {
                // No reusable snapshot: record the read and share a fresh one.
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_read(buffer.clone(), cx);
                })?;

                let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot())?;
                this.update(cx, |this, _| {
                    this.shared_buffers.insert(buffer.clone(), snapshot.clone());
                })?;
                snapshot
            };

            let max_point = snapshot.max_point();
            let start_position = Point::new(line, 0);

            if start_position > max_point {
                return Err(acp::Error::invalid_params().data(format!(
                    "Attempting to read beyond the end of the file, line {}:{}",
                    max_point.row + 1,
                    max_point.column
                )));
            }

            let start = snapshot.anchor_before(start_position);
            // `saturating_add` keeps the default `u32::MAX` limit from overflowing.
            let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));

            project.update(cx, |project, cx| {
                project.set_agent_location(
                    Some(AgentLocation {
                        buffer: buffer.downgrade(),
                        position: start,
                    }),
                    cx,
                );
            })?;

            Ok(snapshot.text_for_range(start..end).collect::<String>())
        })
    }
2138
    /// Replaces the contents of the project file at the absolute `path` with
    /// `content` and saves it.
    ///
    /// The new content is diffed against the snapshot stored in
    /// `shared_buffers` for this buffer (or the buffer's current snapshot when
    /// none is stored), and the resulting edits are applied to the buffer. The
    /// action log is told about the read and the edit, and the project's agent
    /// location is moved to the end of the last edit. When the language
    /// settings for the buffer have format-on-save enabled, the buffer is
    /// formatted before saving; a formatting failure is logged and the save
    /// still happens.
    ///
    /// Fails with "invalid path" when `path` does not map to a project path,
    /// and propagates errors from opening or saving the buffer.
    pub fn write_text_file(
        &self,
        path: PathBuf,
        content: String,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let project = self.project.clone();
        let action_log = self.action_log.clone();
        cx.spawn(async move |this, cx| {
            let load = project.update(cx, |project, cx| {
                let path = project
                    .project_path_for_absolute_path(&path, cx)
                    .context("invalid path")?;
                anyhow::Ok(project.open_buffer(path, cx))
            });
            let buffer = load??.await?;
            // Base the diff on the shared snapshot when one exists.
            let snapshot = this.update(cx, |this, cx| {
                this.shared_buffers
                    .get(&buffer)
                    .cloned()
                    .unwrap_or_else(|| buffer.read(cx).snapshot())
            })?;
            // The diff is computed on the background executor.
            let edits = cx
                .background_executor()
                .spawn(async move {
                    let old_text = snapshot.text();
                    text_diff(old_text.as_str(), &content)
                        .into_iter()
                        .map(|(range, replacement)| {
                            (
                                snapshot.anchor_after(range.start)
                                    ..snapshot.anchor_before(range.end),
                                replacement,
                            )
                        })
                        .collect::<Vec<_>>()
                })
                .await;

            // Point the agent location at the end of the last edit, or at the
            // buffer's minimum anchor when the diff is empty.
            project.update(cx, |project, cx| {
                project.set_agent_location(
                    Some(AgentLocation {
                        buffer: buffer.downgrade(),
                        position: edits
                            .last()
                            .map(|(range, _)| range.end)
                            .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
                    }),
                    cx,
                );
            })?;

            let format_on_save = cx.update(|cx| {
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_read(buffer.clone(), cx);
                });

                let format_on_save = buffer.update(cx, |buffer, cx| {
                    buffer.edit(edits, None, cx);

                    let settings = language::language_settings::language_settings(
                        buffer.language().map(|l| l.name()),
                        buffer.file(),
                        cx,
                    );

                    settings.format_on_save != FormatOnSave::Off
                });
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_edited(buffer.clone(), cx);
                });
                format_on_save
            })?;

            if format_on_save {
                let format_task = project.update(cx, |project, cx| {
                    project.format(
                        HashSet::from_iter([buffer.clone()]),
                        LspFormatTarget::Buffers,
                        false,
                        FormatTrigger::Save,
                        cx,
                    )
                })?;
                format_task.await.log_err();

                // Report the buffer as edited again after the formatting pass.
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_edited(buffer.clone(), cx);
                })?;
            }

            project
                .update(cx, |project, cx| project.save_buffer(buffer, cx))?
                .await
        })
    }
2235
    /// Spawns `command` with `args` in a new terminal and registers it with
    /// this thread.
    ///
    /// The environment starts from the project's directory environment for
    /// `cwd` (or an empty one when `cwd` is `None` or no environment is
    /// available), sets `PAGER` to the empty string, and then applies
    /// `extra_env`, which overrides earlier values. The command is wrapped by
    /// `ShellBuilder` using the remote client's default system shell when there
    /// is one, otherwise the local default shell.
    ///
    /// The terminal gets a freshly generated UUID as its `acp::TerminalId` and
    /// is stored in `self.terminals` under that id before being returned.
    /// `output_byte_limit` is passed on to `Terminal::new`.
    pub fn create_terminal(
        &self,
        command: String,
        args: Vec<String>,
        extra_env: Vec<acp::EnvVariable>,
        cwd: Option<PathBuf>,
        output_byte_limit: Option<u64>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Terminal>>> {
        let env = match &cwd {
            Some(dir) => self.project.update(cx, |project, cx| {
                project.environment().update(cx, |env, cx| {
                    env.directory_environment(dir.as_path().into(), cx)
                })
            }),
            None => Task::ready(None).shared(),
        };
        let env = cx.spawn(async move |_, _| {
            let mut env = env.await.unwrap_or_default();
            // Disables paging for `git` and hopefully other commands
            env.insert("PAGER".into(), "".into());
            // Caller-provided variables are inserted last, so they take precedence.
            for var in extra_env {
                env.insert(var.name, var.value);
            }
            env
        });

        let project = self.project.clone();
        let language_registry = project.read(cx).languages().clone();
        let is_windows = project.read(cx).path_style(cx).is_windows();

        let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
        let terminal_task = cx.spawn({
            let terminal_id = terminal_id.clone();
            async move |_this, cx| {
                let env = env.await;
                let shell = project
                    .update(cx, |project, cx| {
                        project
                            .remote_client()
                            .and_then(|r| r.read(cx).default_system_shell())
                    })?
                    .unwrap_or_else(|| get_default_system_shell_preferring_bash());
                let (task_command, task_args) =
                    ShellBuilder::new(&Shell::Program(shell), is_windows)
                        .redirect_stdin_to_dev_null()
                        .build(Some(command.clone()), &args);
                let terminal = project
                    .update(cx, |project, cx| {
                        project.create_terminal_task(
                            task::SpawnInTerminal {
                                command: Some(task_command),
                                args: task_args,
                                cwd: cwd.clone(),
                                env,
                                ..Default::default()
                            },
                            cx,
                        )
                    })?
                    .await?;

                cx.new(|cx| {
                    Terminal::new(
                        terminal_id,
                        // The label is the original command line, not the shell-wrapped one.
                        &format!("{} {}", command, args.join(" ")),
                        cwd,
                        output_byte_limit.map(|l| l as usize),
                        terminal,
                        language_registry,
                        cx,
                    )
                })
            }
        });

        cx.spawn(async move |this, cx| {
            let terminal = terminal_task.await?;
            // Register the terminal under its id before returning it.
            this.update(cx, |this, _cx| {
                this.terminals.insert(terminal_id, terminal.clone());
                terminal
            })
        })
    }
2320
2321 pub fn kill_terminal(
2322 &mut self,
2323 terminal_id: acp::TerminalId,
2324 cx: &mut Context<Self>,
2325 ) -> Result<()> {
2326 self.terminals
2327 .get(&terminal_id)
2328 .context("Terminal not found")?
2329 .update(cx, |terminal, cx| {
2330 terminal.kill(cx);
2331 });
2332
2333 Ok(())
2334 }
2335
2336 pub fn release_terminal(
2337 &mut self,
2338 terminal_id: acp::TerminalId,
2339 cx: &mut Context<Self>,
2340 ) -> Result<()> {
2341 self.terminals
2342 .remove(&terminal_id)
2343 .context("Terminal not found")?
2344 .update(cx, |terminal, cx| {
2345 terminal.kill(cx);
2346 });
2347
2348 Ok(())
2349 }
2350
2351 pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2352 self.terminals
2353 .get(&terminal_id)
2354 .context("Terminal not found")
2355 .cloned()
2356 }
2357
2358 pub fn to_markdown(&self, cx: &App) -> String {
2359 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2360 }
2361
2362 pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2363 cx.emit(AcpThreadEvent::LoadError(error));
2364 }
2365
2366 pub fn register_terminal_created(
2367 &mut self,
2368 terminal_id: acp::TerminalId,
2369 command_label: String,
2370 working_dir: Option<PathBuf>,
2371 output_byte_limit: Option<u64>,
2372 terminal: Entity<::terminal::Terminal>,
2373 cx: &mut Context<Self>,
2374 ) -> Entity<Terminal> {
2375 let language_registry = self.project.read(cx).languages().clone();
2376
2377 let entity = cx.new(|cx| {
2378 Terminal::new(
2379 terminal_id.clone(),
2380 &command_label,
2381 working_dir.clone(),
2382 output_byte_limit.map(|l| l as usize),
2383 terminal,
2384 language_registry,
2385 cx,
2386 )
2387 });
2388 self.terminals.insert(terminal_id.clone(), entity.clone());
2389 entity
2390 }
2391}
2392
2393fn markdown_for_raw_output(
2394 raw_output: &serde_json::Value,
2395 language_registry: &Arc<LanguageRegistry>,
2396 cx: &mut App,
2397) -> Option<Entity<Markdown>> {
2398 match raw_output {
2399 serde_json::Value::Null => None,
2400 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2401 Markdown::new(
2402 value.to_string().into(),
2403 Some(language_registry.clone()),
2404 None,
2405 cx,
2406 )
2407 })),
2408 serde_json::Value::Number(value) => Some(cx.new(|cx| {
2409 Markdown::new(
2410 value.to_string().into(),
2411 Some(language_registry.clone()),
2412 None,
2413 cx,
2414 )
2415 })),
2416 serde_json::Value::String(value) => Some(cx.new(|cx| {
2417 Markdown::new(
2418 value.clone().into(),
2419 Some(language_registry.clone()),
2420 None,
2421 cx,
2422 )
2423 })),
2424 value => Some(cx.new(|cx| {
2425 Markdown::new(
2426 format!("```json\n{}\n```", value).into(),
2427 Some(language_registry.clone()),
2428 None,
2429 cx,
2430 )
2431 })),
2432 }
2433}
2434
2435#[cfg(test)]
2436mod tests {
2437 use super::*;
2438 use anyhow::anyhow;
2439 use futures::{channel::mpsc, future::LocalBoxFuture, select};
2440 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2441 use indoc::indoc;
2442 use project::{FakeFs, Fs};
2443 use rand::{distr, prelude::*};
2444 use serde_json::json;
2445 use settings::SettingsStore;
2446 use smol::stream::StreamExt as _;
2447 use std::{
2448 any::Any,
2449 cell::RefCell,
2450 path::Path,
2451 rc::Rc,
2452 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2453 time::Duration,
2454 };
2455 use util::path;
2456
2457 fn init_test(cx: &mut TestAppContext) {
2458 env_logger::try_init().ok();
2459 cx.update(|cx| {
2460 let settings_store = SettingsStore::test(cx);
2461 cx.set_global(settings_store);
2462 });
2463 }
2464
2465 #[gpui::test]
2466 async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2467 init_test(cx);
2468
2469 let fs = FakeFs::new(cx.executor());
2470 let project = Project::test(fs, [], cx).await;
2471 let connection = Rc::new(FakeAgentConnection::new());
2472 let thread = cx
2473 .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2474 .await
2475 .unwrap();
2476
2477 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2478
2479 // Send Output BEFORE Created - should be buffered by acp_thread
2480 thread.update(cx, |thread, cx| {
2481 thread.on_terminal_provider_event(
2482 TerminalProviderEvent::Output {
2483 terminal_id: terminal_id.clone(),
2484 data: b"hello buffered".to_vec(),
2485 },
2486 cx,
2487 );
2488 });
2489
2490 // Create a display-only terminal and then send Created
2491 let lower = cx.new(|cx| {
2492 let builder = ::terminal::TerminalBuilder::new_display_only(
2493 ::terminal::terminal_settings::CursorShape::default(),
2494 ::terminal::terminal_settings::AlternateScroll::On,
2495 None,
2496 0,
2497 )
2498 .unwrap();
2499 builder.subscribe(cx)
2500 });
2501
2502 thread.update(cx, |thread, cx| {
2503 thread.on_terminal_provider_event(
2504 TerminalProviderEvent::Created {
2505 terminal_id: terminal_id.clone(),
2506 label: "Buffered Test".to_string(),
2507 cwd: None,
2508 output_byte_limit: None,
2509 terminal: lower.clone(),
2510 },
2511 cx,
2512 );
2513 });
2514
2515 // After Created, buffered Output should have been flushed into the renderer
2516 let content = thread.read_with(cx, |thread, cx| {
2517 let term = thread.terminal(terminal_id.clone()).unwrap();
2518 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2519 });
2520
2521 assert!(
2522 content.contains("hello buffered"),
2523 "expected buffered output to render, got: {content}"
2524 );
2525 }
2526
2527 #[gpui::test]
2528 async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2529 init_test(cx);
2530
2531 let fs = FakeFs::new(cx.executor());
2532 let project = Project::test(fs, [], cx).await;
2533 let connection = Rc::new(FakeAgentConnection::new());
2534 let thread = cx
2535 .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2536 .await
2537 .unwrap();
2538
2539 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2540
2541 // Send Output BEFORE Created
2542 thread.update(cx, |thread, cx| {
2543 thread.on_terminal_provider_event(
2544 TerminalProviderEvent::Output {
2545 terminal_id: terminal_id.clone(),
2546 data: b"pre-exit data".to_vec(),
2547 },
2548 cx,
2549 );
2550 });
2551
2552 // Send Exit BEFORE Created
2553 thread.update(cx, |thread, cx| {
2554 thread.on_terminal_provider_event(
2555 TerminalProviderEvent::Exit {
2556 terminal_id: terminal_id.clone(),
2557 status: acp::TerminalExitStatus::new().exit_code(0),
2558 },
2559 cx,
2560 );
2561 });
2562
2563 // Now create a display-only lower-level terminal and send Created
2564 let lower = cx.new(|cx| {
2565 let builder = ::terminal::TerminalBuilder::new_display_only(
2566 ::terminal::terminal_settings::CursorShape::default(),
2567 ::terminal::terminal_settings::AlternateScroll::On,
2568 None,
2569 0,
2570 )
2571 .unwrap();
2572 builder.subscribe(cx)
2573 });
2574
2575 thread.update(cx, |thread, cx| {
2576 thread.on_terminal_provider_event(
2577 TerminalProviderEvent::Created {
2578 terminal_id: terminal_id.clone(),
2579 label: "Buffered Exit Test".to_string(),
2580 cwd: None,
2581 output_byte_limit: None,
2582 terminal: lower.clone(),
2583 },
2584 cx,
2585 );
2586 });
2587
2588 // Output should be present after Created (flushed from buffer)
2589 let content = thread.read_with(cx, |thread, cx| {
2590 let term = thread.terminal(terminal_id.clone()).unwrap();
2591 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2592 });
2593
2594 assert!(
2595 content.contains("pre-exit data"),
2596 "expected pre-exit data to render, got: {content}"
2597 );
2598 }
2599
2600 #[gpui::test]
2601 async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
2602 init_test(cx);
2603
2604 let fs = FakeFs::new(cx.executor());
2605 let project = Project::test(fs, [], cx).await;
2606 let connection = Rc::new(FakeAgentConnection::new());
2607 let thread = cx
2608 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2609 .await
2610 .unwrap();
2611
2612 // Test creating a new user message
2613 thread.update(cx, |thread, cx| {
2614 thread.push_user_content_block(None, "Hello, ".into(), cx);
2615 });
2616
2617 thread.update(cx, |thread, cx| {
2618 assert_eq!(thread.entries.len(), 1);
2619 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2620 assert_eq!(user_msg.id, None);
2621 assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
2622 } else {
2623 panic!("Expected UserMessage");
2624 }
2625 });
2626
2627 // Test appending to existing user message
2628 let message_1_id = UserMessageId::new();
2629 thread.update(cx, |thread, cx| {
2630 thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
2631 });
2632
2633 thread.update(cx, |thread, cx| {
2634 assert_eq!(thread.entries.len(), 1);
2635 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2636 assert_eq!(user_msg.id, Some(message_1_id));
2637 assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
2638 } else {
2639 panic!("Expected UserMessage");
2640 }
2641 });
2642
2643 // Test creating new user message after assistant message
2644 thread.update(cx, |thread, cx| {
2645 thread.push_assistant_content_block("Assistant response".into(), false, cx);
2646 });
2647
2648 let message_2_id = UserMessageId::new();
2649 thread.update(cx, |thread, cx| {
2650 thread.push_user_content_block(
2651 Some(message_2_id.clone()),
2652 "New user message".into(),
2653 cx,
2654 );
2655 });
2656
2657 thread.update(cx, |thread, cx| {
2658 assert_eq!(thread.entries.len(), 3);
2659 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
2660 assert_eq!(user_msg.id, Some(message_2_id));
2661 assert_eq!(user_msg.content.to_markdown(cx), "New user message");
2662 } else {
2663 panic!("Expected UserMessage at index 2");
2664 }
2665 });
2666 }
2667
2668 #[gpui::test]
2669 async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
2670 init_test(cx);
2671
2672 let fs = FakeFs::new(cx.executor());
2673 let project = Project::test(fs, [], cx).await;
2674 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2675 |_, thread, mut cx| {
2676 async move {
2677 thread.update(&mut cx, |thread, cx| {
2678 thread
2679 .handle_session_update(
2680 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
2681 "Thinking ".into(),
2682 )),
2683 cx,
2684 )
2685 .unwrap();
2686 thread
2687 .handle_session_update(
2688 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
2689 "hard!".into(),
2690 )),
2691 cx,
2692 )
2693 .unwrap();
2694 })?;
2695 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
2696 }
2697 .boxed_local()
2698 },
2699 ));
2700
2701 let thread = cx
2702 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2703 .await
2704 .unwrap();
2705
2706 thread
2707 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
2708 .await
2709 .unwrap();
2710
2711 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
2712 assert_eq!(
2713 output,
2714 indoc! {r#"
2715 ## User
2716
2717 Hello from Zed!
2718
2719 ## Assistant
2720
2721 <thinking>
2722 Thinking hard!
2723 </thinking>
2724
2725 "#}
2726 );
2727 }
2728
    /// Verifies that a user edit made while the agent's turn is in flight ends up combined
    /// with the agent's own write to the same file.
    ///
    /// The fake agent reads `/tmp/foo`, signals the test through a oneshot channel, and then
    /// writes a version of the file with two extra lines. Once signalled, the test inserts
    /// `zero\n` at the start of the open buffer. Both the buffer and the file on disk are
    /// expected to contain the user's line followed by the agent's additions.
    #[gpui::test]
    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
            .await;
        let project = Project::test(fs.clone(), [], cx).await;
        // The sender is stored in an `Rc<RefCell<Option<_>>>` so the handler closure can
        // take it out and consume it when it runs.
        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
            move |_, thread, mut cx| {
                let read_file_tx = read_file_tx.clone();
                async move {
                    // Agent side, step 1: read the file as it currently is.
                    let content = thread
                        .update(&mut cx, |thread, cx| {
                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
                        })
                        .unwrap()
                        .await
                        .unwrap();
                    assert_eq!(content, "one\ntwo\nthree\n");
                    // Step 2: let the test know the read has completed.
                    read_file_tx.take().unwrap().send(()).unwrap();
                    // Step 3: write back the content extended with two more lines.
                    thread
                        .update(&mut cx, |thread, cx| {
                            thread.write_text_file(
                                path!("/tmp/foo").into(),
                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
                                cx,
                            )
                        })
                        .unwrap()
                        .await
                        .unwrap();
                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
                }
                .boxed_local()
            },
        ));

        // Open the same file as a buffer so the test can play the role of the user.
        let (worktree, pathbuf) = project
            .update(cx, |project, cx| {
                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
            })
            .await
            .unwrap();
        let buffer = project
            .update(cx, |project, cx| {
                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
            })
            .await
            .unwrap();

        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
            .await
            .unwrap();

        // Start the turn but do not await it yet; the user edit happens while it runs.
        let request = thread.update(cx, |thread, cx| {
            thread.send_raw("Extend the count in /tmp/foo", cx)
        });
        // Wait until the agent reports that it has read the file, then edit as the user.
        read_file_rx.await.ok();
        buffer.update(cx, |buffer, cx| {
            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
        });
        cx.run_until_parked();
        // The buffer holds both the user's insertion and the agent's appended lines.
        assert_eq!(
            buffer.read_with(cx, |buffer, _| buffer.text()),
            "zero\none\ntwo\nthree\nfour\nfive\n"
        );
        // The same combined content is expected on disk.
        assert_eq!(
            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
            "zero\none\ntwo\nthree\nfour\nfive\n"
        );
        request.await.unwrap();
    }
2805
2806 #[gpui::test]
2807 async fn test_reading_from_line(cx: &mut TestAppContext) {
2808 init_test(cx);
2809
2810 let fs = FakeFs::new(cx.executor());
2811 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
2812 .await;
2813 let project = Project::test(fs.clone(), [], cx).await;
2814 project
2815 .update(cx, |project, cx| {
2816 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2817 })
2818 .await
2819 .unwrap();
2820
2821 let connection = Rc::new(FakeAgentConnection::new());
2822
2823 let thread = cx
2824 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2825 .await
2826 .unwrap();
2827
2828 // Whole file
2829 let content = thread
2830 .update(cx, |thread, cx| {
2831 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2832 })
2833 .await
2834 .unwrap();
2835
2836 assert_eq!(content, "one\ntwo\nthree\nfour\n");
2837
2838 // Only start line
2839 let content = thread
2840 .update(cx, |thread, cx| {
2841 thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
2842 })
2843 .await
2844 .unwrap();
2845
2846 assert_eq!(content, "three\nfour\n");
2847
2848 // Only limit
2849 let content = thread
2850 .update(cx, |thread, cx| {
2851 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2852 })
2853 .await
2854 .unwrap();
2855
2856 assert_eq!(content, "one\ntwo\n");
2857
2858 // Range
2859 let content = thread
2860 .update(cx, |thread, cx| {
2861 thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
2862 })
2863 .await
2864 .unwrap();
2865
2866 assert_eq!(content, "two\nthree\n");
2867
2868 // Invalid
2869 let err = thread
2870 .update(cx, |thread, cx| {
2871 thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
2872 })
2873 .await
2874 .unwrap_err();
2875
2876 assert_eq!(
2877 err.to_string(),
2878 "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
2879 );
2880 }
2881
2882 #[gpui::test]
2883 async fn test_reading_empty_file(cx: &mut TestAppContext) {
2884 init_test(cx);
2885
2886 let fs = FakeFs::new(cx.executor());
2887 fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
2888 let project = Project::test(fs.clone(), [], cx).await;
2889 project
2890 .update(cx, |project, cx| {
2891 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
2892 })
2893 .await
2894 .unwrap();
2895
2896 let connection = Rc::new(FakeAgentConnection::new());
2897
2898 let thread = cx
2899 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2900 .await
2901 .unwrap();
2902
2903 // Whole file
2904 let content = thread
2905 .update(cx, |thread, cx| {
2906 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
2907 })
2908 .await
2909 .unwrap();
2910
2911 assert_eq!(content, "");
2912
2913 // Only start line
2914 let content = thread
2915 .update(cx, |thread, cx| {
2916 thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
2917 })
2918 .await
2919 .unwrap();
2920
2921 assert_eq!(content, "");
2922
2923 // Only limit
2924 let content = thread
2925 .update(cx, |thread, cx| {
2926 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
2927 })
2928 .await
2929 .unwrap();
2930
2931 assert_eq!(content, "");
2932
2933 // Range
2934 let content = thread
2935 .update(cx, |thread, cx| {
2936 thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
2937 })
2938 .await
2939 .unwrap();
2940
2941 assert_eq!(content, "");
2942
2943 // Invalid
2944 let err = thread
2945 .update(cx, |thread, cx| {
2946 thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
2947 })
2948 .await
2949 .unwrap_err();
2950
2951 assert_eq!(
2952 err.to_string(),
2953 "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
2954 );
2955 }
2956 #[gpui::test]
2957 async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
2958 init_test(cx);
2959
2960 let fs = FakeFs::new(cx.executor());
2961 fs.insert_tree(path!("/tmp"), json!({})).await;
2962 let project = Project::test(fs.clone(), [], cx).await;
2963 project
2964 .update(cx, |project, cx| {
2965 project.find_or_create_worktree(path!("/tmp"), true, cx)
2966 })
2967 .await
2968 .unwrap();
2969
2970 let connection = Rc::new(FakeAgentConnection::new());
2971
2972 let thread = cx
2973 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
2974 .await
2975 .unwrap();
2976
2977 // Out of project file
2978 let err = thread
2979 .update(cx, |thread, cx| {
2980 thread.read_text_file(path!("/foo").into(), None, None, false, cx)
2981 })
2982 .await
2983 .unwrap_err();
2984
2985 assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
2986 }
2987
    /// Verifies that a tool call marked `Canceled` by `cancel` can still be moved to
    /// `Completed` by a `ToolCallUpdate` that arrives afterwards.
    ///
    /// Sequence: the fake agent reports an in-progress tool call, the test cancels the
    /// thread and checks the `Canceled` status, then delivers a `Completed` update for the
    /// same tool call id and checks that the status changes accordingly.
    #[gpui::test]
    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        // Shared between the fake agent (to create the call) and the test (to update it).
        let id = acp::ToolCallId::new("test");

        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
            let id = id.clone();
            move |_, thread, mut cx| {
                let id = id.clone();
                async move {
                    // Report a tool call that is still running when the turn ends.
                    thread
                        .update(&mut cx, |thread, cx| {
                            thread.handle_session_update(
                                acp::SessionUpdate::ToolCall(
                                    acp::ToolCall::new(id.clone(), "Label")
                                        .kind(acp::ToolKind::Fetch)
                                        .status(acp::ToolCallStatus::InProgress),
                                ),
                                cx,
                            )
                        })
                        .unwrap()
                        .unwrap();
                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
                }
                .boxed_local()
            }
        }));

        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
            .await
            .unwrap();

        // Start the turn without awaiting it; it is awaited near the end of the test.
        let request = thread.update(cx, |thread, cx| {
            thread.send_raw("Fetch https://example.com", cx)
        });

        run_until_first_tool_call(&thread, cx).await;

        // Entry 0 is the user message; entry 1 is the tool call reported by the agent.
        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::InProgress,
                    ..
                })
            ));
        });

        thread.update(cx, |thread, cx| thread.cancel(cx)).await;

        // Cancelling marks the in-progress tool call as canceled.
        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                &thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::Canceled,
                    ..
                })
            ));
        });

        // A late update from the agent reports that the tool call completed after all.
        thread
            .update(cx, |thread, cx| {
                thread.handle_session_update(
                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
                        id,
                        acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
                    )),
                    cx,
                )
            })
            .unwrap();

        request.await.unwrap();

        // The late `Completed` status replaces `Canceled`.
        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::Completed,
                    ..
                })
            ));
        });
    }
3077
3078 #[gpui::test]
3079 async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3080 init_test(cx);
3081 let fs = FakeFs::new(cx.background_executor.clone());
3082 fs.insert_tree(path!("/test"), json!({})).await;
3083 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3084
3085 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3086 move |_, thread, mut cx| {
3087 async move {
3088 thread
3089 .update(&mut cx, |thread, cx| {
3090 thread.handle_session_update(
3091 acp::SessionUpdate::ToolCall(
3092 acp::ToolCall::new("test", "Label")
3093 .kind(acp::ToolKind::Edit)
3094 .status(acp::ToolCallStatus::Completed)
3095 .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
3096 "/test/test.txt",
3097 "foo",
3098 ))]),
3099 ),
3100 cx,
3101 )
3102 })
3103 .unwrap()
3104 .unwrap();
3105 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3106 }
3107 .boxed_local()
3108 }
3109 }));
3110
3111 let thread = cx
3112 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3113 .await
3114 .unwrap();
3115
3116 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3117 .await
3118 .unwrap();
3119
3120 assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3121 }
3122
    /// Verifies checkpoint bookkeeping across several turns and restoring a checkpoint.
    ///
    /// The fake agent optionally creates a new file on each turn and then echoes the prompt
    /// in upper case. The test checks that:
    /// - turns that changed the file system render as `## User (checkpoint)`,
    /// - a turn without changes renders as a plain `## User`,
    /// - restoring the checkpoint of the second user message truncates the thread back to
    ///   the first exchange and removes the file created by the second turn.
    ///
    /// The test is declared with `iterations = 10`, so it runs ten times.
    #[gpui::test(iterations = 10)]
    async fn test_checkpoints(cx: &mut TestAppContext) {
        init_test(cx);
        let fs = FakeFs::new(cx.background_executor.clone());
        // The project root contains a `.git` directory entry.
        fs.insert_tree(
            path!("/test"),
            json!({
                ".git": {}
            }),
        )
        .await;
        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;

        // `simulate_changes` toggles whether a turn touches the file system;
        // `next_filename` numbers the files created by successive turns.
        let simulate_changes = Arc::new(AtomicBool::new(true));
        let next_filename = Arc::new(AtomicUsize::new(0));
        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
            let simulate_changes = simulate_changes.clone();
            let next_filename = next_filename.clone();
            let fs = fs.clone();
            move |request, thread, mut cx| {
                let fs = fs.clone();
                let simulate_changes = simulate_changes.clone();
                let next_filename = next_filename.clone();
                async move {
                    // Optionally create a new empty file so the turn changes the file system.
                    if simulate_changes.load(SeqCst) {
                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
                        fs.write(Path::new(&filename), b"").await?;
                    }

                    // Reply with the prompt text converted to upper case.
                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
                        panic!("expected text content block");
                    };
                    thread.update(&mut cx, |thread, cx| {
                        thread
                            .handle_session_update(
                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
                                    content.text.to_uppercase().into(),
                                )),
                                cx,
                            )
                            .unwrap();
                    })?;
                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
                }
                .boxed_local()
            }
        }));
        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
            .await
            .unwrap();

        // First turn: creates `file-0`, so the user message is shown with a checkpoint.
        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
            .await
            .unwrap();
        thread.read_with(cx, |thread, cx| {
            assert_eq!(
                thread.to_markdown(cx),
                indoc! {"
                    ## User (checkpoint)

                    Lorem

                    ## Assistant

                    LOREM

                "}
            );
        });
        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);

        // Second turn: creates `file-1`, again with a checkpoint.
        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
            .await
            .unwrap();
        thread.read_with(cx, |thread, cx| {
            assert_eq!(
                thread.to_markdown(cx),
                indoc! {"
                    ## User (checkpoint)

                    Lorem

                    ## Assistant

                    LOREM

                    ## User (checkpoint)

                    ipsum

                    ## Assistant

                    IPSUM

                "}
            );
        });
        assert_eq!(
            fs.files(),
            vec![
                Path::new(path!("/test/file-0")),
                Path::new(path!("/test/file-1"))
            ]
        );

        // Checkpoint isn't stored when there are no changes.
        simulate_changes.store(false, SeqCst);
        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
            .await
            .unwrap();
        thread.read_with(cx, |thread, cx| {
            assert_eq!(
                thread.to_markdown(cx),
                indoc! {"
                    ## User (checkpoint)

                    Lorem

                    ## Assistant

                    LOREM

                    ## User (checkpoint)

                    ipsum

                    ## Assistant

                    IPSUM

                    ## User

                    dolor

                    ## Assistant

                    DOLOR

                "}
            );
        });
        assert_eq!(
            fs.files(),
            vec![
                Path::new(path!("/test/file-0")),
                Path::new(path!("/test/file-1"))
            ]
        );

        // Rewinding the conversation truncates the history and restores the checkpoint.
        // Entry 2 is the second user message ("ipsum").
        thread
            .update(cx, |thread, cx| {
                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
                    panic!("unexpected entries {:?}", thread.entries)
                };
                thread.restore_checkpoint(message.id.clone().unwrap(), cx)
            })
            .await
            .unwrap();
        thread.read_with(cx, |thread, cx| {
            assert_eq!(
                thread.to_markdown(cx),
                indoc! {"
                    ## User (checkpoint)

                    Lorem

                    ## Assistant

                    LOREM

                "}
            );
        });
        // Only the file created before the restored checkpoint remains.
        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
    }
3300
3301 #[gpui::test]
3302 async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3303 use std::sync::atomic::AtomicUsize;
3304 init_test(cx);
3305
3306 let fs = FakeFs::new(cx.executor());
3307 let project = Project::test(fs, None, cx).await;
3308
3309 // Create a connection that simulates refusal after tool result
3310 let prompt_count = Arc::new(AtomicUsize::new(0));
3311 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3312 let prompt_count = prompt_count.clone();
3313 move |_request, thread, mut cx| {
3314 let count = prompt_count.fetch_add(1, SeqCst);
3315 async move {
3316 if count == 0 {
3317 // First prompt: Generate a tool call with result
3318 thread.update(&mut cx, |thread, cx| {
3319 thread
3320 .handle_session_update(
3321 acp::SessionUpdate::ToolCall(
3322 acp::ToolCall::new("tool1", "Test Tool")
3323 .kind(acp::ToolKind::Fetch)
3324 .status(acp::ToolCallStatus::Completed)
3325 .raw_input(serde_json::json!({"query": "test"}))
3326 .raw_output(serde_json::json!({"result": "inappropriate content"})),
3327 ),
3328 cx,
3329 )
3330 .unwrap();
3331 })?;
3332
3333 // Now return refusal because of the tool result
3334 Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
3335 } else {
3336 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3337 }
3338 }
3339 .boxed_local()
3340 }
3341 }));
3342
3343 let thread = cx
3344 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3345 .await
3346 .unwrap();
3347
3348 // Track if we see a Refusal event
3349 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3350 let saw_refusal_event_captured = saw_refusal_event.clone();
3351 thread.update(cx, |_thread, cx| {
3352 cx.subscribe(
3353 &thread,
3354 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3355 if matches!(event, AcpThreadEvent::Refusal) {
3356 *saw_refusal_event_captured.lock().unwrap() = true;
3357 }
3358 },
3359 )
3360 .detach();
3361 });
3362
3363 // Send a user message - this will trigger tool call and then refusal
3364 let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
3365 cx.background_executor.spawn(send_task).detach();
3366 cx.run_until_parked();
3367
3368 // Verify that:
3369 // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3370 // 2. The user message was NOT truncated
3371 assert!(
3372 *saw_refusal_event.lock().unwrap(),
3373 "Refusal event should be emitted for tool result refusals"
3374 );
3375
3376 thread.read_with(cx, |thread, _| {
3377 let entries = thread.entries();
3378 assert!(entries.len() >= 2, "Should have user message and tool call");
3379
3380 // Verify user message is still there
3381 assert!(
3382 matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3383 "User message should not be truncated"
3384 );
3385
3386 // Verify tool call is there with result
3387 if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3388 assert!(
3389 tool_call.raw_output.is_some(),
3390 "Tool call should have output"
3391 );
3392 } else {
3393 panic!("Expected tool call at index 1");
3394 }
3395 });
3396 }
3397
3398 #[gpui::test]
3399 async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3400 init_test(cx);
3401
3402 let fs = FakeFs::new(cx.executor());
3403 let project = Project::test(fs, None, cx).await;
3404
3405 let refuse_next = Arc::new(AtomicBool::new(false));
3406 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3407 let refuse_next = refuse_next.clone();
3408 move |_request, _thread, _cx| {
3409 if refuse_next.load(SeqCst) {
3410 async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
3411 .boxed_local()
3412 } else {
3413 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
3414 .boxed_local()
3415 }
3416 }
3417 }));
3418
3419 let thread = cx
3420 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3421 .await
3422 .unwrap();
3423
3424 // Track if we see a Refusal event
3425 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3426 let saw_refusal_event_captured = saw_refusal_event.clone();
3427 thread.update(cx, |_thread, cx| {
3428 cx.subscribe(
3429 &thread,
3430 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3431 if matches!(event, AcpThreadEvent::Refusal) {
3432 *saw_refusal_event_captured.lock().unwrap() = true;
3433 }
3434 },
3435 )
3436 .detach();
3437 });
3438
3439 // Send a message that will be refused
3440 refuse_next.store(true, SeqCst);
3441 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3442 .await
3443 .unwrap();
3444
3445 // Verify that a Refusal event WAS emitted for user prompt refusal
3446 assert!(
3447 *saw_refusal_event.lock().unwrap(),
3448 "Refusal event should be emitted for user prompt refusals"
3449 );
3450
3451 // Verify the message was truncated (user prompt refusal)
3452 thread.read_with(cx, |thread, cx| {
3453 assert_eq!(thread.to_markdown(cx), "");
3454 });
3455 }
3456
3457 #[gpui::test]
3458 async fn test_refusal(cx: &mut TestAppContext) {
3459 init_test(cx);
3460 let fs = FakeFs::new(cx.background_executor.clone());
3461 fs.insert_tree(path!("/"), json!({})).await;
3462 let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
3463
3464 let refuse_next = Arc::new(AtomicBool::new(false));
3465 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3466 let refuse_next = refuse_next.clone();
3467 move |request, thread, mut cx| {
3468 let refuse_next = refuse_next.clone();
3469 async move {
3470 if refuse_next.load(SeqCst) {
3471 return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
3472 }
3473
3474 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3475 panic!("expected text content block");
3476 };
3477 thread.update(&mut cx, |thread, cx| {
3478 thread
3479 .handle_session_update(
3480 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3481 content.text.to_uppercase().into(),
3482 )),
3483 cx,
3484 )
3485 .unwrap();
3486 })?;
3487 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3488 }
3489 .boxed_local()
3490 }
3491 }));
3492 let thread = cx
3493 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3494 .await
3495 .unwrap();
3496
3497 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3498 .await
3499 .unwrap();
3500 thread.read_with(cx, |thread, cx| {
3501 assert_eq!(
3502 thread.to_markdown(cx),
3503 indoc! {"
3504 ## User
3505
3506 hello
3507
3508 ## Assistant
3509
3510 HELLO
3511
3512 "}
3513 );
3514 });
3515
3516 // Simulate refusing the second message. The message should be truncated
3517 // when a user prompt is refused.
3518 refuse_next.store(true, SeqCst);
3519 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
3520 .await
3521 .unwrap();
3522 thread.read_with(cx, |thread, cx| {
3523 assert_eq!(
3524 thread.to_markdown(cx),
3525 indoc! {"
3526 ## User
3527
3528 hello
3529
3530 ## Assistant
3531
3532 HELLO
3533
3534 "}
3535 );
3536 });
3537 }
3538
3539 async fn run_until_first_tool_call(
3540 thread: &Entity<AcpThread>,
3541 cx: &mut TestAppContext,
3542 ) -> usize {
3543 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
3544
3545 let subscription = cx.update(|cx| {
3546 cx.subscribe(thread, move |thread, _, cx| {
3547 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
3548 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
3549 return tx.try_send(ix).unwrap();
3550 }
3551 }
3552 })
3553 });
3554
3555 select! {
3556 _ = futures::FutureExt::fuse(smol::Timer::after(Duration::from_secs(10))) => {
3557 panic!("Timeout waiting for tool call")
3558 }
3559 ix = rx.next().fuse() => {
3560 drop(subscription);
3561 ix.unwrap()
3562 }
3563 }
3564 }
3565
3566 #[derive(Clone, Default)]
3567 struct FakeAgentConnection {
3568 auth_methods: Vec<acp::AuthMethod>,
3569 sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
3570 on_user_message: Option<
3571 Rc<
3572 dyn Fn(
3573 acp::PromptRequest,
3574 WeakEntity<AcpThread>,
3575 AsyncApp,
3576 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3577 + 'static,
3578 >,
3579 >,
3580 }
3581
3582 impl FakeAgentConnection {
3583 fn new() -> Self {
3584 Self {
3585 auth_methods: Vec::new(),
3586 on_user_message: None,
3587 sessions: Arc::default(),
3588 }
3589 }
3590
3591 #[expect(unused)]
3592 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
3593 self.auth_methods = auth_methods;
3594 self
3595 }
3596
3597 fn on_user_message(
3598 mut self,
3599 handler: impl Fn(
3600 acp::PromptRequest,
3601 WeakEntity<AcpThread>,
3602 AsyncApp,
3603 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3604 + 'static,
3605 ) -> Self {
3606 self.on_user_message.replace(Rc::new(handler));
3607 self
3608 }
3609 }
3610
3611 impl AgentConnection for FakeAgentConnection {
3612 fn telemetry_id(&self) -> SharedString {
3613 "fake".into()
3614 }
3615
3616 fn auth_methods(&self) -> &[acp::AuthMethod] {
3617 &self.auth_methods
3618 }
3619
3620 fn new_thread(
3621 self: Rc<Self>,
3622 project: Entity<Project>,
3623 _cwd: &Path,
3624 cx: &mut App,
3625 ) -> Task<gpui::Result<Entity<AcpThread>>> {
3626 let session_id = acp::SessionId::new(
3627 rand::rng()
3628 .sample_iter(&distr::Alphanumeric)
3629 .take(7)
3630 .map(char::from)
3631 .collect::<String>(),
3632 );
3633 let action_log = cx.new(|_| ActionLog::new(project.clone()));
3634 let thread = cx.new(|cx| {
3635 AcpThread::new(
3636 "Test",
3637 self.clone(),
3638 project,
3639 action_log,
3640 session_id.clone(),
3641 watch::Receiver::constant(
3642 acp::PromptCapabilities::new()
3643 .image(true)
3644 .audio(true)
3645 .embedded_context(true),
3646 ),
3647 cx,
3648 )
3649 });
3650 self.sessions.lock().insert(session_id, thread.downgrade());
3651 Task::ready(Ok(thread))
3652 }
3653
3654 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
3655 if self.auth_methods().iter().any(|m| m.id == method) {
3656 Task::ready(Ok(()))
3657 } else {
3658 Task::ready(Err(anyhow!("Invalid Auth Method")))
3659 }
3660 }
3661
3662 fn prompt(
3663 &self,
3664 _id: Option<UserMessageId>,
3665 params: acp::PromptRequest,
3666 cx: &mut App,
3667 ) -> Task<gpui::Result<acp::PromptResponse>> {
3668 let sessions = self.sessions.lock();
3669 let thread = sessions.get(¶ms.session_id).unwrap();
3670 if let Some(handler) = &self.on_user_message {
3671 let handler = handler.clone();
3672 let thread = thread.clone();
3673 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
3674 } else {
3675 Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
3676 }
3677 }
3678
3679 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
3680 let sessions = self.sessions.lock();
3681 let thread = sessions.get(session_id).unwrap().clone();
3682
3683 cx.spawn(async move |cx| {
3684 thread
3685 .update(cx, |thread, cx| thread.cancel(cx))
3686 .unwrap()
3687 .await
3688 })
3689 .detach();
3690 }
3691
3692 fn truncate(
3693 &self,
3694 session_id: &acp::SessionId,
3695 _cx: &App,
3696 ) -> Option<Rc<dyn AgentSessionTruncate>> {
3697 Some(Rc::new(FakeAgentSessionEditor {
3698 _session_id: session_id.clone(),
3699 }))
3700 }
3701
3702 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
3703 self
3704 }
3705 }
3706
    /// Truncation handle handed out by `FakeAgentConnection::truncate`.
    struct FakeAgentSessionEditor {
        /// Session this editor was created for; stored but not read in the
        /// code visible here (hence the leading underscore).
        _session_id: acp::SessionId,
    }
3710
    impl AgentSessionTruncate for FakeAgentSessionEditor {
        /// No-op truncation: ignores the message id and immediately reports
        /// success.
        fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
            Task::ready(Ok(()))
        }
    }
3716
3717 #[gpui::test]
3718 async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
3719 init_test(cx);
3720
3721 let fs = FakeFs::new(cx.executor());
3722 let project = Project::test(fs, [], cx).await;
3723 let connection = Rc::new(FakeAgentConnection::new());
3724 let thread = cx
3725 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3726 .await
3727 .unwrap();
3728
3729 // Try to update a tool call that doesn't exist
3730 let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
3731 thread.update(cx, |thread, cx| {
3732 let result = thread.handle_session_update(
3733 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3734 nonexistent_id.clone(),
3735 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3736 )),
3737 cx,
3738 );
3739
3740 // The update should succeed (not return an error)
3741 assert!(result.is_ok());
3742
3743 // There should now be exactly one entry in the thread
3744 assert_eq!(thread.entries.len(), 1);
3745
3746 // The entry should be a failed tool call
3747 if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
3748 assert_eq!(tool_call.id, nonexistent_id);
3749 assert!(matches!(tool_call.status, ToolCallStatus::Failed));
3750 assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
3751
3752 // Check that the content contains the error message
3753 assert_eq!(tool_call.content.len(), 1);
3754 if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
3755 match content_block {
3756 ContentBlock::Markdown { markdown } => {
3757 let markdown_text = markdown.read(cx).source();
3758 assert!(markdown_text.contains("Tool call not found"));
3759 }
3760 ContentBlock::Empty => panic!("Expected markdown content, got empty"),
3761 ContentBlock::ResourceLink { .. } => {
3762 panic!("Expected markdown content, got resource link")
3763 }
3764 }
3765 } else {
3766 panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
3767 }
3768 } else {
3769 panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
3770 }
3771 });
3772 }
3773
    /// Tests that restoring a checkpoint properly cleans up terminals that were
    /// created after that checkpoint, and cancels any in-progress generation.
    ///
    /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
    /// that were started after that checkpoint should be terminated, and any in-progress
    /// AI generation should be canceled.
    ///
    /// Scenario: two user messages are sent, two terminals are registered and
    /// exited, a third terminal is registered (no exit event) and attached to an
    /// in-progress tool call, and then the thread is restored to the second
    /// message. Afterwards only the first message and the two exited terminals
    /// may remain.
    #[gpui::test]
    async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let connection = Rc::new(FakeAgentConnection::new());
        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
            .await
            .unwrap();

        // Send first user message to create a checkpoint
        cx.update(|cx| {
            thread.update(cx, |thread, cx| {
                thread.send(vec!["first message".into()], cx)
            })
        })
        .await
        .unwrap();

        // Send second message (creates another checkpoint) - we'll restore to this one
        cx.update(|cx| {
            thread.update(cx, |thread, cx| {
                thread.send(vec!["second message".into()], cx)
            })
        })
        .await
        .unwrap();

        // Create 2 terminals BEFORE the checkpoint that have completed running
        // NOTE(review): these terminals are registered after both messages were
        // sent; they are never referenced by a tool call entry, which is
        // presumably why the restore is expected to keep them — confirm against
        // `restore_checkpoint`.
        let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
        let mock_terminal_1 = cx.new(|cx| {
            let builder = ::terminal::TerminalBuilder::new_display_only(
                ::terminal::terminal_settings::CursorShape::default(),
                ::terminal::terminal_settings::AlternateScroll::On,
                None,
                0,
            )
            .unwrap();
            builder.subscribe(cx)
        });

        // Terminal 1 lifecycle: created -> output -> exit with code 0.
        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Created {
                    terminal_id: terminal_id_1.clone(),
                    label: "echo 'first'".to_string(),
                    cwd: Some(PathBuf::from("/test")),
                    output_byte_limit: None,
                    terminal: mock_terminal_1.clone(),
                },
                cx,
            );
        });

        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Output {
                    terminal_id: terminal_id_1.clone(),
                    data: b"first\n".to_vec(),
                },
                cx,
            );
        });

        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Exit {
                    terminal_id: terminal_id_1.clone(),
                    status: acp::TerminalExitStatus::new().exit_code(0),
                },
                cx,
            );
        });

        let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
        let mock_terminal_2 = cx.new(|cx| {
            let builder = ::terminal::TerminalBuilder::new_display_only(
                ::terminal::terminal_settings::CursorShape::default(),
                ::terminal::terminal_settings::AlternateScroll::On,
                None,
                0,
            )
            .unwrap();
            builder.subscribe(cx)
        });

        // Terminal 2 lifecycle: created -> output -> exit with code 0.
        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Created {
                    terminal_id: terminal_id_2.clone(),
                    label: "echo 'second'".to_string(),
                    cwd: Some(PathBuf::from("/test")),
                    output_byte_limit: None,
                    terminal: mock_terminal_2.clone(),
                },
                cx,
            );
        });

        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Output {
                    terminal_id: terminal_id_2.clone(),
                    data: b"second\n".to_vec(),
                },
                cx,
            );
        });

        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Exit {
                    terminal_id: terminal_id_2.clone(),
                    status: acp::TerminalExitStatus::new().exit_code(0),
                },
                cx,
            );
        });

        // Get the second message ID to restore to
        let second_message_id = thread.read_with(cx, |thread, _| {
            // At this point we have:
            // - Index 0: First user message (with checkpoint)
            // - Index 1: Second user message (with checkpoint)
            // No assistant responses because FakeAgentConnection just returns EndTurn
            let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
                panic!("expected user message at index 1");
            };
            message.id.clone().unwrap()
        });

        // Create a terminal AFTER the checkpoint we'll restore to.
        // This simulates the AI agent starting a long-running terminal command.
        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
        let mock_terminal = cx.new(|cx| {
            let builder = ::terminal::TerminalBuilder::new_display_only(
                ::terminal::terminal_settings::CursorShape::default(),
                ::terminal::terminal_settings::AlternateScroll::On,
                None,
                0,
            )
            .unwrap();
            builder.subscribe(cx)
        });

        // Register the terminal as created
        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Created {
                    terminal_id: terminal_id.clone(),
                    label: "sleep 1000".to_string(),
                    cwd: Some(PathBuf::from("/test")),
                    output_byte_limit: None,
                    terminal: mock_terminal.clone(),
                },
                cx,
            );
        });

        // Simulate the terminal producing output (still running)
        // No `Exit` event is sent for this terminal, unlike terminals 1 and 2.
        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Output {
                    terminal_id: terminal_id.clone(),
                    data: b"terminal is running...\n".to_vec(),
                },
                cx,
            );
        });

        // Create a tool call entry that references this terminal
        // This represents the agent requesting a terminal command
        thread.update(cx, |thread, cx| {
            thread
                .handle_session_update(
                    acp::SessionUpdate::ToolCall(
                        acp::ToolCall::new("terminal-tool-1", "Running command")
                            .kind(acp::ToolKind::Execute)
                            .status(acp::ToolCallStatus::InProgress)
                            .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
                                terminal_id.clone(),
                            ))])
                            .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
                    ),
                    cx,
                )
                .unwrap();
        });

        // Verify terminal exists and is in the thread
        let terminal_exists_before =
            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
        assert!(
            terminal_exists_before,
            "Terminal should exist before checkpoint restore"
        );

        // Verify the terminal's underlying task is still running (not completed)
        let terminal_running_before = thread.read_with(cx, |thread, _cx| {
            let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
            terminal_entity.read_with(cx, |term, _cx| {
                term.output().is_none() // output is None means it's still running
            })
        });
        assert!(
            terminal_running_before,
            "Terminal should be running before checkpoint restore"
        );

        // Verify we have the expected entries before restore
        let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
        assert!(
            entry_count_before > 1,
            "Should have multiple entries before restore"
        );

        // Restore the checkpoint to the second message.
        // This should:
        // 1. Cancel any in-progress generation (via the cancel() call)
        // 2. Remove the terminal that was created after that point
        thread
            .update(cx, |thread, cx| {
                thread.restore_checkpoint(second_message_id, cx)
            })
            .await
            .unwrap();

        // Verify that no send_task is in progress after restore
        // (cancel() clears the send_task)
        let has_send_task_after = thread.read_with(cx, |thread, _| thread.send_task.is_some());
        assert!(
            !has_send_task_after,
            "Should not have a send_task after restore (cancel should have cleared it)"
        );

        // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
        let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
        assert_eq!(
            entry_count, 1,
            "Should have 1 entry after restore (only the first user message)"
        );

        // Verify the 2 completed terminals from before the checkpoint still exist
        let terminal_1_exists = thread.read_with(cx, |thread, _| {
            thread.terminals.contains_key(&terminal_id_1)
        });
        assert!(
            terminal_1_exists,
            "Terminal 1 (from before checkpoint) should still exist"
        );

        let terminal_2_exists = thread.read_with(cx, |thread, _| {
            thread.terminals.contains_key(&terminal_id_2)
        });
        assert!(
            terminal_2_exists,
            "Terminal 2 (from before checkpoint) should still exist"
        );

        // Verify they're still in completed state
        // (`output()` being `Some` is treated as "completed" here, mirroring
        // the `is_none()` check used for the running terminal above.)
        let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
            let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
        });
        assert!(terminal_1_completed, "Terminal 1 should still be completed");

        let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
            let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
        });
        assert!(terminal_2_completed, "Terminal 2 should still be completed");

        // Verify the running terminal (created after checkpoint) was removed
        let terminal_3_exists =
            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
        assert!(
            !terminal_3_exists,
            "Terminal 3 (created after checkpoint) should have been removed"
        );

        // Verify total count is 2 (the two from before the checkpoint)
        let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
        assert_eq!(
            terminal_count, 2,
            "Should have exactly 2 terminals (the completed ones from before checkpoint)"
        );
    }
4069}