1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5
6use agent_settings::AgentSettings;
7
/// Meta key under which a tool's programmatic name is stored on an ACP `ToolCall`.
///
/// ACP's `ToolCall` has no dedicated name field, so the name travels in the
/// call's meta as a workaround.
pub const TOOL_NAME_META_KEY: &str = "tool_name";
11
/// Programmatic tool name used for tool calls that spawn a subagent.
pub const SUBAGENT_TOOL_NAME: &str = "subagent";
14
15/// Helper to extract tool name from ACP meta
16pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
17 meta.as_ref()
18 .and_then(|m| m.get(TOOL_NAME_META_KEY))
19 .and_then(|v| v.as_str())
20 .map(|s| SharedString::from(s.to_owned()))
21}
22
23/// Helper to create meta with tool name
24pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
25 acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
26}
27use collections::HashSet;
28pub use connection::*;
29pub use diff::*;
30use language::language_settings::FormatOnSave;
31pub use mention::*;
32use project::lsp_store::{FormatTrigger, LspFormatTarget};
33use serde::{Deserialize, Serialize};
34use serde_json::to_string_pretty;
35use settings::Settings as _;
36use task::{Shell, ShellBuilder};
37pub use terminal::*;
38
39use action_log::{ActionLog, ActionLogTelemetry};
40use agent_client_protocol::{self as acp};
41use anyhow::{Context as _, Result, anyhow};
42use futures::{FutureExt, channel::oneshot, future::BoxFuture};
43use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
44use itertools::Itertools;
45use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
46use markdown::Markdown;
47use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
48use std::collections::HashMap;
49use std::error::Error;
50use std::fmt::{Formatter, Write};
51use std::ops::Range;
52use std::process::ExitStatus;
53use std::rc::Rc;
54use std::time::{Duration, Instant};
55use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
56use text::Bias;
57use ui::App;
58use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
59use uuid::Uuid;
60
61#[derive(Debug)]
62pub struct UserMessage {
63 pub id: Option<UserMessageId>,
64 pub content: ContentBlock,
65 pub chunks: Vec<acp::ContentBlock>,
66 pub checkpoint: Option<Checkpoint>,
67 pub indented: bool,
68}
69
70#[derive(Debug)]
71pub struct Checkpoint {
72 git_checkpoint: GitStoreCheckpoint,
73 pub show: bool,
74}
75
76impl UserMessage {
77 fn to_markdown(&self, cx: &App) -> String {
78 let mut markdown = String::new();
79 if self
80 .checkpoint
81 .as_ref()
82 .is_some_and(|checkpoint| checkpoint.show)
83 {
84 writeln!(markdown, "## User (checkpoint)").unwrap();
85 } else {
86 writeln!(markdown, "## User").unwrap();
87 }
88 writeln!(markdown).unwrap();
89 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
90 writeln!(markdown).unwrap();
91 markdown
92 }
93}
94
95#[derive(Debug, PartialEq)]
96pub struct AssistantMessage {
97 pub chunks: Vec<AssistantMessageChunk>,
98 pub indented: bool,
99}
100
101impl AssistantMessage {
102 pub fn to_markdown(&self, cx: &App) -> String {
103 format!(
104 "## Assistant\n\n{}\n\n",
105 self.chunks
106 .iter()
107 .map(|chunk| chunk.to_markdown(cx))
108 .join("\n\n")
109 )
110 }
111}
112
113#[derive(Debug, PartialEq)]
114pub enum AssistantMessageChunk {
115 Message { block: ContentBlock },
116 Thought { block: ContentBlock },
117}
118
119impl AssistantMessageChunk {
120 pub fn from_str(
121 chunk: &str,
122 language_registry: &Arc<LanguageRegistry>,
123 path_style: PathStyle,
124 cx: &mut App,
125 ) -> Self {
126 Self::Message {
127 block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
128 }
129 }
130
131 fn to_markdown(&self, cx: &App) -> String {
132 match self {
133 Self::Message { block } => block.to_markdown(cx).to_string(),
134 Self::Thought { block } => {
135 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
136 }
137 }
138 }
139}
140
141#[derive(Debug)]
142pub enum AgentThreadEntry {
143 UserMessage(UserMessage),
144 AssistantMessage(AssistantMessage),
145 ToolCall(ToolCall),
146}
147
148impl AgentThreadEntry {
149 pub fn is_indented(&self) -> bool {
150 match self {
151 Self::UserMessage(message) => message.indented,
152 Self::AssistantMessage(message) => message.indented,
153 Self::ToolCall(_) => false,
154 }
155 }
156
157 pub fn to_markdown(&self, cx: &App) -> String {
158 match self {
159 Self::UserMessage(message) => message.to_markdown(cx),
160 Self::AssistantMessage(message) => message.to_markdown(cx),
161 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
162 }
163 }
164
165 pub fn user_message(&self) -> Option<&UserMessage> {
166 if let AgentThreadEntry::UserMessage(message) = self {
167 Some(message)
168 } else {
169 None
170 }
171 }
172
173 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
174 if let AgentThreadEntry::ToolCall(call) = self {
175 itertools::Either::Left(call.diffs())
176 } else {
177 itertools::Either::Right(std::iter::empty())
178 }
179 }
180
181 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
182 if let AgentThreadEntry::ToolCall(call) = self {
183 itertools::Either::Left(call.terminals())
184 } else {
185 itertools::Either::Right(std::iter::empty())
186 }
187 }
188
189 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
190 if let AgentThreadEntry::ToolCall(ToolCall {
191 locations,
192 resolved_locations,
193 ..
194 }) = self
195 {
196 Some((
197 locations.get(ix)?.clone(),
198 resolved_locations.get(ix)?.clone()?,
199 ))
200 } else {
201 None
202 }
203 }
204}
205
206#[derive(Debug)]
207pub struct ToolCall {
208 pub id: acp::ToolCallId,
209 pub label: Entity<Markdown>,
210 pub kind: acp::ToolKind,
211 pub content: Vec<ToolCallContent>,
212 pub status: ToolCallStatus,
213 pub locations: Vec<acp::ToolCallLocation>,
214 pub resolved_locations: Vec<Option<AgentLocation>>,
215 pub raw_input: Option<serde_json::Value>,
216 pub raw_input_markdown: Option<Entity<Markdown>>,
217 pub raw_output: Option<serde_json::Value>,
218 pub tool_name: Option<SharedString>,
219}
220
221impl ToolCall {
222 fn from_acp(
223 tool_call: acp::ToolCall,
224 status: ToolCallStatus,
225 language_registry: Arc<LanguageRegistry>,
226 path_style: PathStyle,
227 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
228 cx: &mut App,
229 ) -> Result<Self> {
230 let title = if tool_call.kind == acp::ToolKind::Execute {
231 tool_call.title
232 } else if let Some((first_line, _)) = tool_call.title.split_once("\n") {
233 first_line.to_owned() + "…"
234 } else {
235 tool_call.title
236 };
237 let mut content = Vec::with_capacity(tool_call.content.len());
238 for item in tool_call.content {
239 if let Some(item) = ToolCallContent::from_acp(
240 item,
241 language_registry.clone(),
242 path_style,
243 terminals,
244 cx,
245 )? {
246 content.push(item);
247 }
248 }
249
250 let raw_input_markdown = tool_call
251 .raw_input
252 .as_ref()
253 .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
254
255 let tool_name = tool_name_from_meta(&tool_call.meta);
256
257 let result = Self {
258 id: tool_call.tool_call_id,
259 label: cx
260 .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
261 kind: tool_call.kind,
262 content,
263 locations: tool_call.locations,
264 resolved_locations: Vec::default(),
265 status,
266 raw_input: tool_call.raw_input,
267 raw_input_markdown,
268 raw_output: tool_call.raw_output,
269 tool_name,
270 };
271 Ok(result)
272 }
273
274 fn update_fields(
275 &mut self,
276 fields: acp::ToolCallUpdateFields,
277 language_registry: Arc<LanguageRegistry>,
278 path_style: PathStyle,
279 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
280 cx: &mut App,
281 ) -> Result<()> {
282 let acp::ToolCallUpdateFields {
283 kind,
284 status,
285 title,
286 content,
287 locations,
288 raw_input,
289 raw_output,
290 ..
291 } = fields;
292
293 if let Some(kind) = kind {
294 self.kind = kind;
295 }
296
297 if let Some(status) = status {
298 self.status = status.into();
299 }
300
301 if let Some(title) = title {
302 self.label.update(cx, |label, cx| {
303 if self.kind == acp::ToolKind::Execute {
304 label.replace(title, cx);
305 } else if let Some((first_line, _)) = title.split_once("\n") {
306 label.replace(first_line.to_owned() + "…", cx);
307 } else {
308 label.replace(title, cx);
309 }
310 });
311 }
312
313 if let Some(content) = content {
314 let mut new_content_len = content.len();
315 let mut content = content.into_iter();
316
317 // Reuse existing content if we can
318 for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
319 let valid_content =
320 old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
321 if !valid_content {
322 new_content_len -= 1;
323 }
324 }
325 for new in content {
326 if let Some(new) = ToolCallContent::from_acp(
327 new,
328 language_registry.clone(),
329 path_style,
330 terminals,
331 cx,
332 )? {
333 self.content.push(new);
334 } else {
335 new_content_len -= 1;
336 }
337 }
338 self.content.truncate(new_content_len);
339 }
340
341 if let Some(locations) = locations {
342 self.locations = locations;
343 }
344
345 if let Some(raw_input) = raw_input {
346 self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
347 self.raw_input = Some(raw_input);
348 }
349
350 if let Some(raw_output) = raw_output {
351 if self.content.is_empty()
352 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
353 {
354 self.content
355 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
356 markdown,
357 }));
358 }
359 self.raw_output = Some(raw_output);
360 }
361 Ok(())
362 }
363
364 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
365 self.content.iter().filter_map(|content| match content {
366 ToolCallContent::Diff(diff) => Some(diff),
367 ToolCallContent::ContentBlock(_) => None,
368 ToolCallContent::Terminal(_) => None,
369 ToolCallContent::SubagentThread(_) => None,
370 })
371 }
372
373 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
374 self.content.iter().filter_map(|content| match content {
375 ToolCallContent::Terminal(terminal) => Some(terminal),
376 ToolCallContent::ContentBlock(_) => None,
377 ToolCallContent::Diff(_) => None,
378 ToolCallContent::SubagentThread(_) => None,
379 })
380 }
381
382 pub fn subagent_thread(&self) -> Option<&Entity<AcpThread>> {
383 self.content.iter().find_map(|content| match content {
384 ToolCallContent::SubagentThread(thread) => Some(thread),
385 _ => None,
386 })
387 }
388
389 pub fn is_subagent(&self) -> bool {
390 matches!(self.kind, acp::ToolKind::Other)
391 && self
392 .tool_name
393 .as_ref()
394 .map(|n| n.as_ref() == SUBAGENT_TOOL_NAME)
395 .unwrap_or(false)
396 }
397
398 pub fn to_markdown(&self, cx: &App) -> String {
399 let mut markdown = format!(
400 "**Tool Call: {}**\nStatus: {}\n\n",
401 self.label.read(cx).source(),
402 self.status
403 );
404 for content in &self.content {
405 markdown.push_str(content.to_markdown(cx).as_str());
406 markdown.push_str("\n\n");
407 }
408 markdown
409 }
410
411 async fn resolve_location(
412 location: acp::ToolCallLocation,
413 project: WeakEntity<Project>,
414 cx: &mut AsyncApp,
415 ) -> Option<ResolvedLocation> {
416 let buffer = project
417 .update(cx, |project, cx| {
418 project
419 .project_path_for_absolute_path(&location.path, cx)
420 .map(|path| project.open_buffer(path, cx))
421 })
422 .ok()??;
423 let buffer = buffer.await.log_err()?;
424 let position = buffer.update(cx, |buffer, _| {
425 let snapshot = buffer.snapshot();
426 if let Some(row) = location.line {
427 let column = snapshot.indent_size_for_line(row).len;
428 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
429 snapshot.anchor_before(point)
430 } else {
431 Anchor::min_for_buffer(snapshot.remote_id())
432 }
433 });
434
435 Some(ResolvedLocation { buffer, position })
436 }
437
438 fn resolve_locations(
439 &self,
440 project: Entity<Project>,
441 cx: &mut App,
442 ) -> Task<Vec<Option<ResolvedLocation>>> {
443 let locations = self.locations.clone();
444 project.update(cx, |_, cx| {
445 cx.spawn(async move |project, cx| {
446 let mut new_locations = Vec::new();
447 for location in locations {
448 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
449 }
450 new_locations
451 })
452 })
453 }
454}
455
456// Separate so we can hold a strong reference to the buffer
457// for saving on the thread
458#[derive(Clone, Debug, PartialEq, Eq)]
459struct ResolvedLocation {
460 buffer: Entity<Buffer>,
461 position: Anchor,
462}
463
464impl From<&ResolvedLocation> for AgentLocation {
465 fn from(value: &ResolvedLocation) -> Self {
466 Self {
467 buffer: value.buffer.downgrade(),
468 position: value.position,
469 }
470 }
471}
472
473#[derive(Debug)]
474pub enum ToolCallStatus {
475 /// The tool call hasn't started running yet, but we start showing it to
476 /// the user.
477 Pending,
478 /// The tool call is waiting for confirmation from the user.
479 WaitingForConfirmation {
480 options: PermissionOptions,
481 respond_tx: oneshot::Sender<acp::PermissionOptionId>,
482 },
483 /// The tool call is currently running.
484 InProgress,
485 /// The tool call completed successfully.
486 Completed,
487 /// The tool call failed.
488 Failed,
489 /// The user rejected the tool call.
490 Rejected,
491 /// The user canceled generation so the tool call was canceled.
492 Canceled,
493}
494
495impl From<acp::ToolCallStatus> for ToolCallStatus {
496 fn from(status: acp::ToolCallStatus) -> Self {
497 match status {
498 acp::ToolCallStatus::Pending => Self::Pending,
499 acp::ToolCallStatus::InProgress => Self::InProgress,
500 acp::ToolCallStatus::Completed => Self::Completed,
501 acp::ToolCallStatus::Failed => Self::Failed,
502 _ => Self::Pending,
503 }
504 }
505}
506
507impl Display for ToolCallStatus {
508 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
509 write!(
510 f,
511 "{}",
512 match self {
513 ToolCallStatus::Pending => "Pending",
514 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
515 ToolCallStatus::InProgress => "In Progress",
516 ToolCallStatus::Completed => "Completed",
517 ToolCallStatus::Failed => "Failed",
518 ToolCallStatus::Rejected => "Rejected",
519 ToolCallStatus::Canceled => "Canceled",
520 }
521 )
522 }
523}
524
525#[derive(Debug, PartialEq, Clone)]
526pub enum ContentBlock {
527 Empty,
528 Markdown { markdown: Entity<Markdown> },
529 ResourceLink { resource_link: acp::ResourceLink },
530 Image { image: Arc<gpui::Image> },
531}
532
533impl ContentBlock {
534 pub fn new(
535 block: acp::ContentBlock,
536 language_registry: &Arc<LanguageRegistry>,
537 path_style: PathStyle,
538 cx: &mut App,
539 ) -> Self {
540 let mut this = Self::Empty;
541 this.append(block, language_registry, path_style, cx);
542 this
543 }
544
545 pub fn new_combined(
546 blocks: impl IntoIterator<Item = acp::ContentBlock>,
547 language_registry: Arc<LanguageRegistry>,
548 path_style: PathStyle,
549 cx: &mut App,
550 ) -> Self {
551 let mut this = Self::Empty;
552 for block in blocks {
553 this.append(block, &language_registry, path_style, cx);
554 }
555 this
556 }
557
558 pub fn append(
559 &mut self,
560 block: acp::ContentBlock,
561 language_registry: &Arc<LanguageRegistry>,
562 path_style: PathStyle,
563 cx: &mut App,
564 ) {
565 match (&mut *self, &block) {
566 (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
567 *self = ContentBlock::ResourceLink {
568 resource_link: resource_link.clone(),
569 };
570 }
571 (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
572 if let Some(image) = Self::decode_image(image_content) {
573 *self = ContentBlock::Image { image };
574 } else {
575 let new_content = Self::image_md(image_content);
576 *self = Self::create_markdown_block(new_content, language_registry, cx);
577 }
578 }
579 (ContentBlock::Empty, _) => {
580 let new_content = Self::block_string_contents(&block, path_style);
581 *self = Self::create_markdown_block(new_content, language_registry, cx);
582 }
583 (ContentBlock::Markdown { markdown }, _) => {
584 let new_content = Self::block_string_contents(&block, path_style);
585 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
586 }
587 (ContentBlock::ResourceLink { resource_link }, _) => {
588 let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
589 let new_content = Self::block_string_contents(&block, path_style);
590 let combined = format!("{}\n{}", existing_content, new_content);
591 *self = Self::create_markdown_block(combined, language_registry, cx);
592 }
593 (ContentBlock::Image { .. }, _) => {
594 let new_content = Self::block_string_contents(&block, path_style);
595 let combined = format!("`Image`\n{}", new_content);
596 *self = Self::create_markdown_block(combined, language_registry, cx);
597 }
598 }
599 }
600
601 fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
602 use base64::Engine as _;
603
604 let bytes = base64::engine::general_purpose::STANDARD
605 .decode(image_content.data.as_bytes())
606 .ok()?;
607 let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
608 Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
609 }
610
611 fn create_markdown_block(
612 content: String,
613 language_registry: &Arc<LanguageRegistry>,
614 cx: &mut App,
615 ) -> ContentBlock {
616 ContentBlock::Markdown {
617 markdown: cx
618 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
619 }
620 }
621
622 fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
623 match block {
624 acp::ContentBlock::Text(text_content) => text_content.text.clone(),
625 acp::ContentBlock::ResourceLink(resource_link) => {
626 Self::resource_link_md(&resource_link.uri, path_style)
627 }
628 acp::ContentBlock::Resource(acp::EmbeddedResource {
629 resource:
630 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
631 uri,
632 ..
633 }),
634 ..
635 }) => Self::resource_link_md(uri, path_style),
636 acp::ContentBlock::Image(image) => Self::image_md(image),
637 _ => String::new(),
638 }
639 }
640
641 fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
642 if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
643 uri.as_link().to_string()
644 } else {
645 uri.to_string()
646 }
647 }
648
649 fn image_md(_image: &acp::ImageContent) -> String {
650 "`Image`".into()
651 }
652
653 pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
654 match self {
655 ContentBlock::Empty => "",
656 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
657 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
658 ContentBlock::Image { .. } => "`Image`",
659 }
660 }
661
662 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
663 match self {
664 ContentBlock::Empty => None,
665 ContentBlock::Markdown { markdown } => Some(markdown),
666 ContentBlock::ResourceLink { .. } => None,
667 ContentBlock::Image { .. } => None,
668 }
669 }
670
671 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
672 match self {
673 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
674 _ => None,
675 }
676 }
677
678 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
679 match self {
680 ContentBlock::Image { image } => Some(image),
681 _ => None,
682 }
683 }
684}
685
686#[derive(Debug)]
687pub enum ToolCallContent {
688 ContentBlock(ContentBlock),
689 Diff(Entity<Diff>),
690 Terminal(Entity<Terminal>),
691 SubagentThread(Entity<AcpThread>),
692}
693
694impl ToolCallContent {
695 pub fn from_acp(
696 content: acp::ToolCallContent,
697 language_registry: Arc<LanguageRegistry>,
698 path_style: PathStyle,
699 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
700 cx: &mut App,
701 ) -> Result<Option<Self>> {
702 match content {
703 acp::ToolCallContent::Content(acp::Content { content, .. }) => {
704 Ok(Some(Self::ContentBlock(ContentBlock::new(
705 content,
706 &language_registry,
707 path_style,
708 cx,
709 ))))
710 }
711 acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
712 Diff::finalized(
713 diff.path.to_string_lossy().into_owned(),
714 diff.old_text,
715 diff.new_text,
716 language_registry,
717 cx,
718 )
719 })))),
720 acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
721 .get(&terminal_id)
722 .cloned()
723 .map(|terminal| Some(Self::Terminal(terminal)))
724 .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
725 _ => Ok(None),
726 }
727 }
728
729 pub fn update_from_acp(
730 &mut self,
731 new: acp::ToolCallContent,
732 language_registry: Arc<LanguageRegistry>,
733 path_style: PathStyle,
734 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
735 cx: &mut App,
736 ) -> Result<bool> {
737 let needs_update = match (&self, &new) {
738 (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
739 old_diff.read(cx).needs_update(
740 new_diff.old_text.as_deref().unwrap_or(""),
741 &new_diff.new_text,
742 cx,
743 )
744 }
745 _ => true,
746 };
747
748 if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
749 if needs_update {
750 *self = update;
751 }
752 Ok(true)
753 } else {
754 Ok(false)
755 }
756 }
757
758 pub fn to_markdown(&self, cx: &App) -> String {
759 match self {
760 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
761 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
762 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
763 Self::SubagentThread(thread) => thread.read(cx).to_markdown(cx),
764 }
765 }
766
767 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
768 match self {
769 Self::ContentBlock(content) => content.image(),
770 _ => None,
771 }
772 }
773
774 pub fn subagent_thread(&self) -> Option<&Entity<AcpThread>> {
775 match self {
776 Self::SubagentThread(thread) => Some(thread),
777 _ => None,
778 }
779 }
780}
781
782#[derive(Debug, PartialEq)]
783pub enum ToolCallUpdate {
784 UpdateFields(acp::ToolCallUpdate),
785 UpdateDiff(ToolCallUpdateDiff),
786 UpdateTerminal(ToolCallUpdateTerminal),
787 UpdateSubagentThread(ToolCallUpdateSubagentThread),
788}
789
790impl ToolCallUpdate {
791 fn id(&self) -> &acp::ToolCallId {
792 match self {
793 Self::UpdateFields(update) => &update.tool_call_id,
794 Self::UpdateDiff(diff) => &diff.id,
795 Self::UpdateTerminal(terminal) => &terminal.id,
796 Self::UpdateSubagentThread(subagent) => &subagent.id,
797 }
798 }
799}
800
801impl From<acp::ToolCallUpdate> for ToolCallUpdate {
802 fn from(update: acp::ToolCallUpdate) -> Self {
803 Self::UpdateFields(update)
804 }
805}
806
807impl From<ToolCallUpdateDiff> for ToolCallUpdate {
808 fn from(diff: ToolCallUpdateDiff) -> Self {
809 Self::UpdateDiff(diff)
810 }
811}
812
813#[derive(Debug, PartialEq)]
814pub struct ToolCallUpdateDiff {
815 pub id: acp::ToolCallId,
816 pub diff: Entity<Diff>,
817}
818
819impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
820 fn from(terminal: ToolCallUpdateTerminal) -> Self {
821 Self::UpdateTerminal(terminal)
822 }
823}
824
825#[derive(Debug, PartialEq)]
826pub struct ToolCallUpdateTerminal {
827 pub id: acp::ToolCallId,
828 pub terminal: Entity<Terminal>,
829}
830
831impl From<ToolCallUpdateSubagentThread> for ToolCallUpdate {
832 fn from(subagent: ToolCallUpdateSubagentThread) -> Self {
833 Self::UpdateSubagentThread(subagent)
834 }
835}
836
837#[derive(Debug, PartialEq)]
838pub struct ToolCallUpdateSubagentThread {
839 pub id: acp::ToolCallId,
840 pub thread: Entity<AcpThread>,
841}
842
843#[derive(Debug, Default)]
844pub struct Plan {
845 pub entries: Vec<PlanEntry>,
846}
847
848#[derive(Debug)]
849pub struct PlanStats<'a> {
850 pub in_progress_entry: Option<&'a PlanEntry>,
851 pub pending: u32,
852 pub completed: u32,
853}
854
855impl Plan {
856 pub fn is_empty(&self) -> bool {
857 self.entries.is_empty()
858 }
859
860 pub fn stats(&self) -> PlanStats<'_> {
861 let mut stats = PlanStats {
862 in_progress_entry: None,
863 pending: 0,
864 completed: 0,
865 };
866
867 for entry in &self.entries {
868 match &entry.status {
869 acp::PlanEntryStatus::Pending => {
870 stats.pending += 1;
871 }
872 acp::PlanEntryStatus::InProgress => {
873 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
874 }
875 acp::PlanEntryStatus::Completed => {
876 stats.completed += 1;
877 }
878 _ => {}
879 }
880 }
881
882 stats
883 }
884}
885
886#[derive(Debug)]
887pub struct PlanEntry {
888 pub content: Entity<Markdown>,
889 pub priority: acp::PlanEntryPriority,
890 pub status: acp::PlanEntryStatus,
891}
892
893impl PlanEntry {
894 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
895 Self {
896 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
897 priority: entry.priority,
898 status: entry.status,
899 }
900 }
901}
902
903#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
904pub struct TokenUsage {
905 pub max_tokens: u64,
906 pub used_tokens: u64,
907 pub input_tokens: u64,
908 pub output_tokens: u64,
909}
910
911impl TokenUsage {
912 pub fn ratio(&self) -> TokenUsageRatio {
913 #[cfg(debug_assertions)]
914 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
915 .unwrap_or("0.8".to_string())
916 .parse()
917 .unwrap();
918 #[cfg(not(debug_assertions))]
919 let warning_threshold: f32 = 0.8;
920
921 // When the maximum is unknown because there is no selected model,
922 // avoid showing the token limit warning.
923 if self.max_tokens == 0 {
924 TokenUsageRatio::Normal
925 } else if self.used_tokens >= self.max_tokens {
926 TokenUsageRatio::Exceeded
927 } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
928 TokenUsageRatio::Warning
929 } else {
930 TokenUsageRatio::Normal
931 }
932 }
933}
934
/// Classification of token usage relative to the token limit.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TokenUsageRatio {
    /// Usage is below the warning threshold, or the limit is unknown.
    Normal,
    /// Usage has reached the warning threshold but not the limit.
    Warning,
    /// Usage has reached or passed the limit.
    Exceeded,
}
941
942#[derive(Debug, Clone)]
943pub struct RetryStatus {
944 pub last_error: SharedString,
945 pub attempt: usize,
946 pub max_attempts: usize,
947 pub started_at: Instant,
948 pub duration: Duration,
949}
950
951pub struct AcpThread {
952 title: SharedString,
953 entries: Vec<AgentThreadEntry>,
954 plan: Plan,
955 project: Entity<Project>,
956 action_log: Entity<ActionLog>,
957 shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
958 send_task: Option<Task<()>>,
959 connection: Rc<dyn AgentConnection>,
960 session_id: acp::SessionId,
961 token_usage: Option<TokenUsage>,
962 prompt_capabilities: acp::PromptCapabilities,
963 _observe_prompt_capabilities: Task<anyhow::Result<()>>,
964 terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
965 pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
966 pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
967 // subagent cancellation fields
968 user_stopped: Arc<std::sync::atomic::AtomicBool>,
969 user_stop_tx: watch::Sender<bool>,
970}
971
972impl From<&AcpThread> for ActionLogTelemetry {
973 fn from(value: &AcpThread) -> Self {
974 Self {
975 agent_telemetry_id: value.connection().telemetry_id(),
976 session_id: value.session_id.0.clone(),
977 }
978 }
979}
980
981#[derive(Debug)]
982pub enum AcpThreadEvent {
983 NewEntry,
984 TitleUpdated,
985 TokenUsageUpdated,
986 EntryUpdated(usize),
987 EntriesRemoved(Range<usize>),
988 ToolAuthorizationRequired,
989 Retry(RetryStatus),
990 Stopped,
991 Error,
992 LoadError(LoadError),
993 PromptCapabilitiesUpdated,
994 Refusal,
995 AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
996 ModeUpdated(acp::SessionModeId),
997 ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
998}
999
1000impl EventEmitter<AcpThreadEvent> for AcpThread {}
1001
1002#[derive(Debug, Clone)]
1003pub enum TerminalProviderEvent {
1004 Created {
1005 terminal_id: acp::TerminalId,
1006 label: String,
1007 cwd: Option<PathBuf>,
1008 output_byte_limit: Option<u64>,
1009 terminal: Entity<::terminal::Terminal>,
1010 },
1011 Output {
1012 terminal_id: acp::TerminalId,
1013 data: Vec<u8>,
1014 },
1015 TitleChanged {
1016 terminal_id: acp::TerminalId,
1017 title: String,
1018 },
1019 Exit {
1020 terminal_id: acp::TerminalId,
1021 status: acp::TerminalExitStatus,
1022 },
1023}
1024
1025#[derive(Debug, Clone)]
1026pub enum TerminalProviderCommand {
1027 WriteInput {
1028 terminal_id: acp::TerminalId,
1029 bytes: Vec<u8>,
1030 },
1031 Resize {
1032 terminal_id: acp::TerminalId,
1033 cols: u16,
1034 rows: u16,
1035 },
1036 Close {
1037 terminal_id: acp::TerminalId,
1038 },
1039}
1040
1041impl AcpThread {
1042 pub fn on_terminal_provider_event(
1043 &mut self,
1044 event: TerminalProviderEvent,
1045 cx: &mut Context<Self>,
1046 ) {
1047 match event {
1048 TerminalProviderEvent::Created {
1049 terminal_id,
1050 label,
1051 cwd,
1052 output_byte_limit,
1053 terminal,
1054 } => {
1055 let entity = self.register_terminal_created(
1056 terminal_id.clone(),
1057 label,
1058 cwd,
1059 output_byte_limit,
1060 terminal,
1061 cx,
1062 );
1063
1064 if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
1065 for data in chunks.drain(..) {
1066 entity.update(cx, |term, cx| {
1067 term.inner().update(cx, |inner, cx| {
1068 inner.write_output(&data, cx);
1069 })
1070 });
1071 }
1072 }
1073
1074 if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
1075 entity.update(cx, |_term, cx| {
1076 cx.notify();
1077 });
1078 }
1079
1080 cx.notify();
1081 }
1082 TerminalProviderEvent::Output { terminal_id, data } => {
1083 if let Some(entity) = self.terminals.get(&terminal_id) {
1084 entity.update(cx, |term, cx| {
1085 term.inner().update(cx, |inner, cx| {
1086 inner.write_output(&data, cx);
1087 })
1088 });
1089 } else {
1090 self.pending_terminal_output
1091 .entry(terminal_id)
1092 .or_default()
1093 .push(data);
1094 }
1095 }
1096 TerminalProviderEvent::TitleChanged { terminal_id, title } => {
1097 if let Some(entity) = self.terminals.get(&terminal_id) {
1098 entity.update(cx, |term, cx| {
1099 term.inner().update(cx, |inner, cx| {
1100 inner.breadcrumb_text = title;
1101 cx.emit(::terminal::Event::BreadcrumbsChanged);
1102 })
1103 });
1104 }
1105 }
1106 TerminalProviderEvent::Exit {
1107 terminal_id,
1108 status,
1109 } => {
1110 if let Some(entity) = self.terminals.get(&terminal_id) {
1111 entity.update(cx, |_term, cx| {
1112 cx.notify();
1113 });
1114 } else {
1115 self.pending_terminal_exit.insert(terminal_id, status);
1116 }
1117 }
1118 }
1119 }
1120}
1121
/// Whether a thread is idle or generating.
#[derive(Debug, PartialEq, Eq)]
pub enum ThreadStatus {
    Idle,
    Generating,
}
1127
1128#[derive(Debug, Clone)]
1129pub enum LoadError {
1130 Unsupported {
1131 command: SharedString,
1132 current_version: SharedString,
1133 minimum_version: SharedString,
1134 },
1135 FailedToInstall(SharedString),
1136 Exited {
1137 status: ExitStatus,
1138 },
1139 Other(SharedString),
1140}
1141
1142impl Display for LoadError {
1143 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1144 match self {
1145 LoadError::Unsupported {
1146 command: path,
1147 current_version,
1148 minimum_version,
1149 } => {
1150 write!(
1151 f,
1152 "version {current_version} from {path} is not supported (need at least {minimum_version})"
1153 )
1154 }
1155 LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1156 LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1157 LoadError::Other(msg) => write!(f, "{msg}"),
1158 }
1159 }
1160}
1161
1162impl Error for LoadError {}
1163
1164impl AcpThread {
1165 pub fn new(
1166 title: impl Into<SharedString>,
1167 connection: Rc<dyn AgentConnection>,
1168 project: Entity<Project>,
1169 action_log: Entity<ActionLog>,
1170 session_id: acp::SessionId,
1171 mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1172 cx: &mut Context<Self>,
1173 ) -> Self {
1174 let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1175 let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1176 loop {
1177 let caps = prompt_capabilities_rx.recv().await?;
1178 this.update(cx, |this, cx| {
1179 this.prompt_capabilities = caps;
1180 cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1181 })?;
1182 }
1183 });
1184
1185 let (user_stop_tx, _user_stop_rx) = watch::channel(false);
1186
1187 Self {
1188 action_log,
1189 shared_buffers: Default::default(),
1190 entries: Default::default(),
1191 plan: Default::default(),
1192 title: title.into(),
1193 project,
1194 send_task: None,
1195 connection,
1196 session_id,
1197 token_usage: None,
1198 prompt_capabilities,
1199 _observe_prompt_capabilities: task,
1200 terminals: HashMap::default(),
1201 pending_terminal_output: HashMap::default(),
1202 pending_terminal_exit: HashMap::default(),
1203 user_stopped: Arc::new(std::sync::atomic::AtomicBool::new(false)),
1204 user_stop_tx,
1205 }
1206 }
1207
1208 pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
1209 self.prompt_capabilities.clone()
1210 }
1211
1212 /// Marks this thread as stopped by user action and signals any listeners.
1213 pub fn stop_by_user(&mut self) {
1214 self.user_stopped
1215 .store(true, std::sync::atomic::Ordering::SeqCst);
1216 self.user_stop_tx.send(true).ok();
1217 }
1218
1219 pub fn was_stopped_by_user(&self) -> bool {
1220 self.user_stopped.load(std::sync::atomic::Ordering::SeqCst)
1221 }
1222
1223 pub fn user_stop_receiver(&self) -> watch::Receiver<bool> {
1224 self.user_stop_tx.receiver()
1225 }
1226
1227 pub fn connection(&self) -> &Rc<dyn AgentConnection> {
1228 &self.connection
1229 }
1230
1231 pub fn action_log(&self) -> &Entity<ActionLog> {
1232 &self.action_log
1233 }
1234
1235 pub fn project(&self) -> &Entity<Project> {
1236 &self.project
1237 }
1238
1239 pub fn title(&self) -> SharedString {
1240 self.title.clone()
1241 }
1242
1243 pub fn entries(&self) -> &[AgentThreadEntry] {
1244 &self.entries
1245 }
1246
1247 pub fn session_id(&self) -> &acp::SessionId {
1248 &self.session_id
1249 }
1250
1251 pub fn status(&self) -> ThreadStatus {
1252 if self.send_task.is_some() {
1253 ThreadStatus::Generating
1254 } else {
1255 ThreadStatus::Idle
1256 }
1257 }
1258
1259 pub fn token_usage(&self) -> Option<&TokenUsage> {
1260 self.token_usage.as_ref()
1261 }
1262
1263 pub fn has_pending_edit_tool_calls(&self) -> bool {
1264 for entry in self.entries.iter().rev() {
1265 match entry {
1266 AgentThreadEntry::UserMessage(_) => return false,
1267 AgentThreadEntry::ToolCall(
1268 call @ ToolCall {
1269 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1270 ..
1271 },
1272 ) if call.diffs().next().is_some() => {
1273 return true;
1274 }
1275 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1276 }
1277 }
1278
1279 false
1280 }
1281
1282 pub fn has_in_progress_tool_calls(&self) -> bool {
1283 for entry in self.entries.iter().rev() {
1284 match entry {
1285 AgentThreadEntry::UserMessage(_) => return false,
1286 AgentThreadEntry::ToolCall(ToolCall {
1287 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1288 ..
1289 }) => {
1290 return true;
1291 }
1292 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1293 }
1294 }
1295
1296 false
1297 }
1298
1299 pub fn used_tools_since_last_user_message(&self) -> bool {
1300 for entry in self.entries.iter().rev() {
1301 match entry {
1302 AgentThreadEntry::UserMessage(..) => return false,
1303 AgentThreadEntry::AssistantMessage(..) => continue,
1304 AgentThreadEntry::ToolCall(..) => return true,
1305 }
1306 }
1307
1308 false
1309 }
1310
1311 pub fn handle_session_update(
1312 &mut self,
1313 update: acp::SessionUpdate,
1314 cx: &mut Context<Self>,
1315 ) -> Result<(), acp::Error> {
1316 match update {
1317 acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1318 self.push_user_content_block(None, content, cx);
1319 }
1320 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1321 self.push_assistant_content_block(content, false, cx);
1322 }
1323 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1324 self.push_assistant_content_block(content, true, cx);
1325 }
1326 acp::SessionUpdate::ToolCall(tool_call) => {
1327 self.upsert_tool_call(tool_call, cx)?;
1328 }
1329 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1330 self.update_tool_call(tool_call_update, cx)?;
1331 }
1332 acp::SessionUpdate::Plan(plan) => {
1333 self.update_plan(plan, cx);
1334 }
1335 acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1336 available_commands,
1337 ..
1338 }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1339 acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1340 current_mode_id,
1341 ..
1342 }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1343 acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1344 config_options,
1345 ..
1346 }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1347 _ => {}
1348 }
1349 Ok(())
1350 }
1351
1352 pub fn push_user_content_block(
1353 &mut self,
1354 message_id: Option<UserMessageId>,
1355 chunk: acp::ContentBlock,
1356 cx: &mut Context<Self>,
1357 ) {
1358 self.push_user_content_block_with_indent(message_id, chunk, false, cx)
1359 }
1360
1361 pub fn push_user_content_block_with_indent(
1362 &mut self,
1363 message_id: Option<UserMessageId>,
1364 chunk: acp::ContentBlock,
1365 indented: bool,
1366 cx: &mut Context<Self>,
1367 ) {
1368 let language_registry = self.project.read(cx).languages().clone();
1369 let path_style = self.project.read(cx).path_style(cx);
1370 let entries_len = self.entries.len();
1371
1372 if let Some(last_entry) = self.entries.last_mut()
1373 && let AgentThreadEntry::UserMessage(UserMessage {
1374 id,
1375 content,
1376 chunks,
1377 indented: existing_indented,
1378 ..
1379 }) = last_entry
1380 && *existing_indented == indented
1381 {
1382 *id = message_id.or(id.take());
1383 content.append(chunk.clone(), &language_registry, path_style, cx);
1384 chunks.push(chunk);
1385 let idx = entries_len - 1;
1386 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1387 } else {
1388 let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1389 self.push_entry(
1390 AgentThreadEntry::UserMessage(UserMessage {
1391 id: message_id,
1392 content,
1393 chunks: vec![chunk],
1394 checkpoint: None,
1395 indented,
1396 }),
1397 cx,
1398 );
1399 }
1400 }
1401
1402 pub fn push_assistant_content_block(
1403 &mut self,
1404 chunk: acp::ContentBlock,
1405 is_thought: bool,
1406 cx: &mut Context<Self>,
1407 ) {
1408 self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
1409 }
1410
1411 pub fn push_assistant_content_block_with_indent(
1412 &mut self,
1413 chunk: acp::ContentBlock,
1414 is_thought: bool,
1415 indented: bool,
1416 cx: &mut Context<Self>,
1417 ) {
1418 let language_registry = self.project.read(cx).languages().clone();
1419 let path_style = self.project.read(cx).path_style(cx);
1420 let entries_len = self.entries.len();
1421 if let Some(last_entry) = self.entries.last_mut()
1422 && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1423 chunks,
1424 indented: existing_indented,
1425 }) = last_entry
1426 && *existing_indented == indented
1427 {
1428 let idx = entries_len - 1;
1429 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1430 match (chunks.last_mut(), is_thought) {
1431 (Some(AssistantMessageChunk::Message { block }), false)
1432 | (Some(AssistantMessageChunk::Thought { block }), true) => {
1433 block.append(chunk, &language_registry, path_style, cx)
1434 }
1435 _ => {
1436 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1437 if is_thought {
1438 chunks.push(AssistantMessageChunk::Thought { block })
1439 } else {
1440 chunks.push(AssistantMessageChunk::Message { block })
1441 }
1442 }
1443 }
1444 } else {
1445 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1446 let chunk = if is_thought {
1447 AssistantMessageChunk::Thought { block }
1448 } else {
1449 AssistantMessageChunk::Message { block }
1450 };
1451
1452 self.push_entry(
1453 AgentThreadEntry::AssistantMessage(AssistantMessage {
1454 chunks: vec![chunk],
1455 indented,
1456 }),
1457 cx,
1458 );
1459 }
1460 }
1461
1462 fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
1463 self.entries.push(entry);
1464 cx.emit(AcpThreadEvent::NewEntry);
1465 }
1466
1467 pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1468 self.connection.set_title(&self.session_id, cx).is_some()
1469 }
1470
1471 pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1472 if title != self.title {
1473 self.title = title.clone();
1474 cx.emit(AcpThreadEvent::TitleUpdated);
1475 if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1476 return set_title.run(title, cx);
1477 }
1478 }
1479 Task::ready(Ok(()))
1480 }
1481
1482 pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1483 self.token_usage = usage;
1484 cx.emit(AcpThreadEvent::TokenUsageUpdated);
1485 }
1486
1487 pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1488 cx.emit(AcpThreadEvent::Retry(status));
1489 }
1490
1491 pub fn update_tool_call(
1492 &mut self,
1493 update: impl Into<ToolCallUpdate>,
1494 cx: &mut Context<Self>,
1495 ) -> Result<()> {
1496 let update = update.into();
1497 let languages = self.project.read(cx).languages().clone();
1498 let path_style = self.project.read(cx).path_style(cx);
1499
1500 let ix = match self.index_for_tool_call(update.id()) {
1501 Some(ix) => ix,
1502 None => {
1503 // Tool call not found - create a failed tool call entry
1504 let failed_tool_call = ToolCall {
1505 id: update.id().clone(),
1506 label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1507 kind: acp::ToolKind::Fetch,
1508 content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1509 "Tool call not found".into(),
1510 &languages,
1511 path_style,
1512 cx,
1513 ))],
1514 status: ToolCallStatus::Failed,
1515 locations: Vec::new(),
1516 resolved_locations: Vec::new(),
1517 raw_input: None,
1518 raw_input_markdown: None,
1519 raw_output: None,
1520 tool_name: None,
1521 };
1522 self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1523 return Ok(());
1524 }
1525 };
1526 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1527 unreachable!()
1528 };
1529
1530 match update {
1531 ToolCallUpdate::UpdateFields(update) => {
1532 let location_updated = update.fields.locations.is_some();
1533 call.update_fields(update.fields, languages, path_style, &self.terminals, cx)?;
1534 if location_updated {
1535 self.resolve_locations(update.tool_call_id, cx);
1536 }
1537 }
1538 ToolCallUpdate::UpdateDiff(update) => {
1539 call.content.clear();
1540 call.content.push(ToolCallContent::Diff(update.diff));
1541 }
1542 ToolCallUpdate::UpdateTerminal(update) => {
1543 call.content.clear();
1544 call.content
1545 .push(ToolCallContent::Terminal(update.terminal));
1546 }
1547 ToolCallUpdate::UpdateSubagentThread(update) => {
1548 debug_assert!(
1549 !call.content.iter().any(|c| {
1550 matches!(c, ToolCallContent::SubagentThread(existing) if existing == &update.thread)
1551 }),
1552 "Duplicate SubagentThread update for the same AcpThread entity"
1553 );
1554 call.content
1555 .push(ToolCallContent::SubagentThread(update.thread));
1556 }
1557 }
1558
1559 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1560
1561 Ok(())
1562 }
1563
1564 /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1565 pub fn upsert_tool_call(
1566 &mut self,
1567 tool_call: acp::ToolCall,
1568 cx: &mut Context<Self>,
1569 ) -> Result<(), acp::Error> {
1570 let status = tool_call.status.into();
1571 self.upsert_tool_call_inner(tool_call.into(), status, cx)
1572 }
1573
1574 /// Fails if id does not match an existing entry.
1575 pub fn upsert_tool_call_inner(
1576 &mut self,
1577 update: acp::ToolCallUpdate,
1578 status: ToolCallStatus,
1579 cx: &mut Context<Self>,
1580 ) -> Result<(), acp::Error> {
1581 let language_registry = self.project.read(cx).languages().clone();
1582 let path_style = self.project.read(cx).path_style(cx);
1583 let id = update.tool_call_id.clone();
1584
1585 let agent_telemetry_id = self.connection().telemetry_id();
1586 let session = self.session_id();
1587 if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1588 let status = if matches!(status, ToolCallStatus::Completed) {
1589 "completed"
1590 } else {
1591 "failed"
1592 };
1593 telemetry::event!(
1594 "Agent Tool Call Completed",
1595 agent_telemetry_id,
1596 session,
1597 status
1598 );
1599 }
1600
1601 if let Some(ix) = self.index_for_tool_call(&id) {
1602 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1603 unreachable!()
1604 };
1605
1606 call.update_fields(
1607 update.fields,
1608 language_registry,
1609 path_style,
1610 &self.terminals,
1611 cx,
1612 )?;
1613 call.status = status;
1614
1615 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1616 } else {
1617 let call = ToolCall::from_acp(
1618 update.try_into()?,
1619 status,
1620 language_registry,
1621 self.project.read(cx).path_style(cx),
1622 &self.terminals,
1623 cx,
1624 )?;
1625 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1626 };
1627
1628 self.resolve_locations(id, cx);
1629 Ok(())
1630 }
1631
1632 fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1633 self.entries
1634 .iter()
1635 .enumerate()
1636 .rev()
1637 .find_map(|(index, entry)| {
1638 if let AgentThreadEntry::ToolCall(tool_call) = entry
1639 && &tool_call.id == id
1640 {
1641 Some(index)
1642 } else {
1643 None
1644 }
1645 })
1646 }
1647
1648 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1649 // The tool call we are looking for is typically the last one, or very close to the end.
1650 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1651 self.entries
1652 .iter_mut()
1653 .enumerate()
1654 .rev()
1655 .find_map(|(index, tool_call)| {
1656 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1657 && &tool_call.id == id
1658 {
1659 Some((index, tool_call))
1660 } else {
1661 None
1662 }
1663 })
1664 }
1665
1666 pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1667 self.entries
1668 .iter()
1669 .enumerate()
1670 .rev()
1671 .find_map(|(index, tool_call)| {
1672 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1673 && &tool_call.id == id
1674 {
1675 Some((index, tool_call))
1676 } else {
1677 None
1678 }
1679 })
1680 }
1681
1682 pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1683 let project = self.project.clone();
1684 let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1685 return;
1686 };
1687 let task = tool_call.resolve_locations(project, cx);
1688 cx.spawn(async move |this, cx| {
1689 let resolved_locations = task.await;
1690
1691 this.update(cx, |this, cx| {
1692 let project = this.project.clone();
1693
1694 for location in resolved_locations.iter().flatten() {
1695 this.shared_buffers
1696 .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
1697 }
1698 let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1699 return;
1700 };
1701
1702 if let Some(Some(location)) = resolved_locations.last() {
1703 project.update(cx, |project, cx| {
1704 let should_ignore = if let Some(agent_location) = project
1705 .agent_location()
1706 .filter(|agent_location| agent_location.buffer == location.buffer)
1707 {
1708 let snapshot = location.buffer.read(cx).snapshot();
1709 let old_position = agent_location.position.to_point(&snapshot);
1710 let new_position = location.position.to_point(&snapshot);
1711
1712 // ignore this so that when we get updates from the edit tool
1713 // the position doesn't reset to the startof line
1714 old_position.row == new_position.row
1715 && old_position.column > new_position.column
1716 } else {
1717 false
1718 };
1719 if !should_ignore {
1720 project.set_agent_location(Some(location.into()), cx);
1721 }
1722 });
1723 }
1724
1725 let resolved_locations = resolved_locations
1726 .iter()
1727 .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
1728 .collect::<Vec<_>>();
1729
1730 if tool_call.resolved_locations != resolved_locations {
1731 tool_call.resolved_locations = resolved_locations;
1732 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1733 }
1734 })
1735 })
1736 .detach();
1737 }
1738
1739 pub fn request_tool_call_authorization(
1740 &mut self,
1741 tool_call: acp::ToolCallUpdate,
1742 options: PermissionOptions,
1743 respect_always_allow_setting: bool,
1744 cx: &mut Context<Self>,
1745 ) -> Result<BoxFuture<'static, acp::RequestPermissionOutcome>> {
1746 let (tx, rx) = oneshot::channel();
1747
1748 if respect_always_allow_setting && AgentSettings::get_global(cx).always_allow_tool_actions {
1749 // Don't use AllowAlways, because then if you were to turn off always_allow_tool_actions,
1750 // some tools would (incorrectly) continue to auto-accept.
1751 if let Some(allow_once_option) = options.allow_once_option_id() {
1752 self.upsert_tool_call_inner(tool_call, ToolCallStatus::Pending, cx)?;
1753 return Ok(async {
1754 acp::RequestPermissionOutcome::Selected(acp::SelectedPermissionOutcome::new(
1755 allow_once_option,
1756 ))
1757 }
1758 .boxed());
1759 }
1760 }
1761
1762 let status = ToolCallStatus::WaitingForConfirmation {
1763 options,
1764 respond_tx: tx,
1765 };
1766
1767 self.upsert_tool_call_inner(tool_call, status, cx)?;
1768 cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1769
1770 let fut = async {
1771 match rx.await {
1772 Ok(option) => acp::RequestPermissionOutcome::Selected(
1773 acp::SelectedPermissionOutcome::new(option),
1774 ),
1775 Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1776 }
1777 }
1778 .boxed();
1779
1780 Ok(fut)
1781 }
1782
1783 pub fn authorize_tool_call(
1784 &mut self,
1785 id: acp::ToolCallId,
1786 option_id: acp::PermissionOptionId,
1787 option_kind: acp::PermissionOptionKind,
1788 cx: &mut Context<Self>,
1789 ) {
1790 let Some((ix, call)) = self.tool_call_mut(&id) else {
1791 return;
1792 };
1793
1794 let new_status = match option_kind {
1795 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1796 ToolCallStatus::Rejected
1797 }
1798 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1799 ToolCallStatus::InProgress
1800 }
1801 _ => ToolCallStatus::InProgress,
1802 };
1803
1804 let curr_status = mem::replace(&mut call.status, new_status);
1805
1806 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1807 respond_tx.send(option_id).log_err();
1808 } else if cfg!(debug_assertions) {
1809 panic!("tried to authorize an already authorized tool call");
1810 }
1811
1812 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1813 }
1814
1815 pub fn first_tool_awaiting_confirmation(&self) -> Option<&ToolCall> {
1816 let mut first_tool_call = None;
1817
1818 for entry in self.entries.iter().rev() {
1819 match &entry {
1820 AgentThreadEntry::ToolCall(call) => {
1821 if let ToolCallStatus::WaitingForConfirmation { .. } = call.status {
1822 first_tool_call = Some(call);
1823 } else {
1824 continue;
1825 }
1826 }
1827 AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1828 // Reached the beginning of the turn.
1829 // If we had pending permission requests in the previous turn, they have been cancelled.
1830 break;
1831 }
1832 }
1833 }
1834
1835 first_tool_call
1836 }
1837
1838 pub fn plan(&self) -> &Plan {
1839 &self.plan
1840 }
1841
1842 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1843 let new_entries_len = request.entries.len();
1844 let mut new_entries = request.entries.into_iter();
1845
1846 // Reuse existing markdown to prevent flickering
1847 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1848 let PlanEntry {
1849 content,
1850 priority,
1851 status,
1852 } = old;
1853 content.update(cx, |old, cx| {
1854 old.replace(new.content, cx);
1855 });
1856 *priority = new.priority;
1857 *status = new.status;
1858 }
1859 for new in new_entries {
1860 self.plan.entries.push(PlanEntry::from_acp(new, cx))
1861 }
1862 self.plan.entries.truncate(new_entries_len);
1863
1864 cx.notify();
1865 }
1866
1867 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1868 self.plan
1869 .entries
1870 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1871 cx.notify();
1872 }
1873
/// Test helper: sends `message` as a single content block via `send`.
#[cfg(any(test, feature = "test-support"))]
pub fn send_raw(
    &mut self,
    message: &str,
    cx: &mut Context<Self>,
) -> BoxFuture<'static, Result<()>> {
    let blocks = vec![message.into()];
    self.send(blocks, cx)
}
1882
1883 pub fn send(
1884 &mut self,
1885 message: Vec<acp::ContentBlock>,
1886 cx: &mut Context<Self>,
1887 ) -> BoxFuture<'static, Result<()>> {
1888 let block = ContentBlock::new_combined(
1889 message.clone(),
1890 self.project.read(cx).languages().clone(),
1891 self.project.read(cx).path_style(cx),
1892 cx,
1893 );
1894 let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
1895 let git_store = self.project.read(cx).git_store().clone();
1896
1897 let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
1898 Some(UserMessageId::new())
1899 } else {
1900 None
1901 };
1902
1903 self.run_turn(cx, async move |this, cx| {
1904 this.update(cx, |this, cx| {
1905 this.push_entry(
1906 AgentThreadEntry::UserMessage(UserMessage {
1907 id: message_id.clone(),
1908 content: block,
1909 chunks: message,
1910 checkpoint: None,
1911 indented: false,
1912 }),
1913 cx,
1914 );
1915 })
1916 .ok();
1917
1918 let old_checkpoint = git_store
1919 .update(cx, |git, cx| git.checkpoint(cx))
1920 .await
1921 .context("failed to get old checkpoint")
1922 .log_err();
1923 this.update(cx, |this, cx| {
1924 if let Some((_ix, message)) = this.last_user_message() {
1925 message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1926 git_checkpoint,
1927 show: false,
1928 });
1929 }
1930 this.connection.prompt(message_id, request, cx)
1931 })?
1932 .await
1933 })
1934 }
1935
1936 pub fn can_retry(&self, cx: &App) -> bool {
1937 self.connection.retry(&self.session_id, cx).is_some()
1938 }
1939
1940 pub fn retry(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1941 self.run_turn(cx, async move |this, cx| {
1942 this.update(cx, |this, cx| {
1943 this.connection
1944 .retry(&this.session_id, cx)
1945 .map(|retry| retry.run(cx))
1946 })?
1947 .context("retrying a session is not supported")?
1948 .await
1949 })
1950 }
1951
1952 fn run_turn(
1953 &mut self,
1954 cx: &mut Context<Self>,
1955 f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1956 ) -> BoxFuture<'static, Result<()>> {
1957 self.clear_completed_plan_entries(cx);
1958
1959 let (tx, rx) = oneshot::channel();
1960 let cancel_task = self.cancel(cx);
1961
1962 self.send_task = Some(cx.spawn(async move |this, cx| {
1963 cancel_task.await;
1964 tx.send(f(this, cx).await).ok();
1965 }));
1966
1967 cx.spawn(async move |this, cx| {
1968 let response = rx.await;
1969
1970 this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1971 .await?;
1972
1973 this.update(cx, |this, cx| {
1974 this.project
1975 .update(cx, |project, cx| project.set_agent_location(None, cx));
1976 match response {
1977 Ok(Err(e)) => {
1978 this.send_task.take();
1979 cx.emit(AcpThreadEvent::Error);
1980 Err(e)
1981 }
1982 result => {
1983 let canceled = matches!(
1984 result,
1985 Ok(Ok(acp::PromptResponse {
1986 stop_reason: acp::StopReason::Cancelled,
1987 ..
1988 }))
1989 );
1990
1991 // We only take the task if the current prompt wasn't canceled.
1992 //
1993 // This prompt may have been canceled because another one was sent
1994 // while it was still generating. In these cases, dropping `send_task`
1995 // would cause the next generation to be canceled.
1996 if !canceled {
1997 this.send_task.take();
1998 }
1999
2000 // Handle refusal - distinguish between user prompt and tool call refusals
2001 if let Ok(Ok(acp::PromptResponse {
2002 stop_reason: acp::StopReason::Refusal,
2003 ..
2004 })) = result
2005 {
2006 if let Some((user_msg_ix, _)) = this.last_user_message() {
2007 // Check if there's a completed tool call with results after the last user message
2008 // This indicates the refusal is in response to tool output, not the user's prompt
2009 let has_completed_tool_call_after_user_msg =
2010 this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
2011 if let AgentThreadEntry::ToolCall(tool_call) = entry {
2012 // Check if the tool call has completed and has output
2013 matches!(tool_call.status, ToolCallStatus::Completed)
2014 && tool_call.raw_output.is_some()
2015 } else {
2016 false
2017 }
2018 });
2019
2020 if has_completed_tool_call_after_user_msg {
2021 // Refusal is due to tool output - don't truncate, just notify
2022 // The model refused based on what the tool returned
2023 cx.emit(AcpThreadEvent::Refusal);
2024 } else {
2025 // User prompt was refused - truncate back to before the user message
2026 let range = user_msg_ix..this.entries.len();
2027 if range.start < range.end {
2028 this.entries.truncate(user_msg_ix);
2029 cx.emit(AcpThreadEvent::EntriesRemoved(range));
2030 }
2031 cx.emit(AcpThreadEvent::Refusal);
2032 }
2033 } else {
2034 // No user message found, treat as general refusal
2035 cx.emit(AcpThreadEvent::Refusal);
2036 }
2037 }
2038
2039 cx.emit(AcpThreadEvent::Stopped);
2040 Ok(())
2041 }
2042 }
2043 })?
2044 })
2045 .boxed()
2046 }
2047
2048 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2049 let Some(send_task) = self.send_task.take() else {
2050 return Task::ready(());
2051 };
2052
2053 for entry in self.entries.iter_mut() {
2054 if let AgentThreadEntry::ToolCall(call) = entry {
2055 let cancel = matches!(
2056 call.status,
2057 ToolCallStatus::Pending
2058 | ToolCallStatus::WaitingForConfirmation { .. }
2059 | ToolCallStatus::InProgress
2060 );
2061
2062 if cancel {
2063 call.status = ToolCallStatus::Canceled;
2064 }
2065 }
2066 }
2067
2068 self.connection.cancel(&self.session_id, cx);
2069
2070 // Wait for the send task to complete
2071 cx.foreground_executor().spawn(send_task)
2072 }
2073
2074 /// Restores the git working tree to the state at the given checkpoint (if one exists)
2075 pub fn restore_checkpoint(
2076 &mut self,
2077 id: UserMessageId,
2078 cx: &mut Context<Self>,
2079 ) -> Task<Result<()>> {
2080 let Some((_, message)) = self.user_message_mut(&id) else {
2081 return Task::ready(Err(anyhow!("message not found")));
2082 };
2083
2084 let checkpoint = message
2085 .checkpoint
2086 .as_ref()
2087 .map(|c| c.git_checkpoint.clone());
2088
2089 // Cancel any in-progress generation before restoring
2090 let cancel_task = self.cancel(cx);
2091 let rewind = self.rewind(id.clone(), cx);
2092 let git_store = self.project.read(cx).git_store().clone();
2093
2094 cx.spawn(async move |_, cx| {
2095 cancel_task.await;
2096 rewind.await?;
2097 if let Some(checkpoint) = checkpoint {
2098 git_store
2099 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
2100 .await?;
2101 }
2102
2103 Ok(())
2104 })
2105 }
2106
2107 /// Rewinds this thread to before the entry at `index`, removing it and all
2108 /// subsequent entries while rejecting any action_log changes made from that point.
2109 /// Unlike `restore_checkpoint`, this method does not restore from git.
2110 pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
2111 let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
2112 return Task::ready(Err(anyhow!("not supported")));
2113 };
2114
2115 let telemetry = ActionLogTelemetry::from(&*self);
2116 cx.spawn(async move |this, cx| {
2117 cx.update(|cx| truncate.run(id.clone(), cx)).await?;
2118 this.update(cx, |this, cx| {
2119 if let Some((ix, _)) = this.user_message_mut(&id) {
2120 // Collect all terminals from entries that will be removed
2121 let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
2122 .iter()
2123 .flat_map(|entry| entry.terminals())
2124 .filter_map(|terminal| terminal.read(cx).id().clone().into())
2125 .collect();
2126
2127 let range = ix..this.entries.len();
2128 this.entries.truncate(ix);
2129 cx.emit(AcpThreadEvent::EntriesRemoved(range));
2130
2131 // Kill and remove the terminals
2132 for terminal_id in terminals_to_remove {
2133 if let Some(terminal) = this.terminals.remove(&terminal_id) {
2134 terminal.update(cx, |terminal, cx| {
2135 terminal.kill(cx);
2136 });
2137 }
2138 }
2139 }
2140 this.action_log().update(cx, |action_log, cx| {
2141 action_log.reject_all_edits(Some(telemetry), cx)
2142 })
2143 })?
2144 .await;
2145 Ok(())
2146 })
2147 }
2148
2149 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
2150 let git_store = self.project.read(cx).git_store().clone();
2151
2152 let Some((_, message)) = self.last_user_message() else {
2153 return Task::ready(Ok(()));
2154 };
2155 let Some(user_message_id) = message.id.clone() else {
2156 return Task::ready(Ok(()));
2157 };
2158 let Some(checkpoint) = message.checkpoint.as_ref() else {
2159 return Task::ready(Ok(()));
2160 };
2161 let old_checkpoint = checkpoint.git_checkpoint.clone();
2162
2163 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2164 cx.spawn(async move |this, cx| {
2165 let Some(new_checkpoint) = new_checkpoint
2166 .await
2167 .context("failed to get new checkpoint")
2168 .log_err()
2169 else {
2170 return Ok(());
2171 };
2172
2173 let equal = git_store
2174 .update(cx, |git, cx| {
2175 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2176 })
2177 .await
2178 .unwrap_or(true);
2179
2180 this.update(cx, |this, cx| {
2181 if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
2182 if let Some(checkpoint) = message.checkpoint.as_mut() {
2183 checkpoint.show = !equal;
2184 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2185 }
2186 }
2187 })?;
2188
2189 Ok(())
2190 })
2191 }
2192
2193 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2194 self.entries
2195 .iter_mut()
2196 .enumerate()
2197 .rev()
2198 .find_map(|(ix, entry)| {
2199 if let AgentThreadEntry::UserMessage(message) = entry {
2200 Some((ix, message))
2201 } else {
2202 None
2203 }
2204 })
2205 }
2206
2207 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2208 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2209 if let AgentThreadEntry::UserMessage(message) = entry {
2210 if message.id.as_ref() == Some(id) {
2211 Some((ix, message))
2212 } else {
2213 None
2214 }
2215 } else {
2216 None
2217 }
2218 })
2219 }
2220
    /// Reads text from the file at `path` through the project's buffer for it.
    ///
    /// `line` is the 1-based line to start at (`None` starts at the first line) and
    /// `limit` is the maximum number of lines to return (`None` means no limit).
    ///
    /// When `reuse_shared_snapshot` is `true` and `shared_buffers` already holds a
    /// snapshot for the buffer, the text is read from that snapshot. Otherwise the
    /// read is reported to the action log, a fresh snapshot of the buffer is taken,
    /// and that snapshot is stored in `shared_buffers`.
    ///
    /// The project's agent location is set to the start of the returned range.
    ///
    /// # Errors
    ///
    /// - `resource_not_found` when `path` cannot be mapped to a project path.
    /// - `invalid_params` when the requested start line lies beyond the end of the file.
    /// - Any error produced while opening the buffer or updating this thread.
    pub fn read_text_file(
        &self,
        path: PathBuf,
        line: Option<u32>,
        limit: Option<u32>,
        reuse_shared_snapshot: bool,
        cx: &mut Context<Self>,
    ) -> Task<Result<String, acp::Error>> {
        // Args are 1-based, move to 0-based
        // (`saturating_sub` maps both a missing line and an explicit 0 to row 0.)
        let line = line.unwrap_or_default().saturating_sub(1);
        let limit = limit.unwrap_or(u32::MAX);
        let project = self.project.clone();
        let action_log = self.action_log.clone();
        cx.spawn(async move |this, cx| {
            let load = project.update(cx, |project, cx| {
                let path = project
                    .project_path_for_absolute_path(&path, cx)
                    .ok_or_else(|| {
                        acp::Error::resource_not_found(Some(path.display().to_string()))
                    })?;
                Ok::<_, acp::Error>(project.open_buffer(path, cx))
            })?;

            let buffer = load.await?;

            // Look up a previously stored snapshot only when the caller asked for it.
            // A failure to read the thread entity is logged and treated like a miss.
            let snapshot = if reuse_shared_snapshot {
                this.read_with(cx, |this, _| {
                    this.shared_buffers.get(&buffer.clone()).cloned()
                })
                .log_err()
                .flatten()
            } else {
                None
            };

            let snapshot = if let Some(snapshot) = snapshot {
                snapshot
            } else {
                // No reusable snapshot: record the read, then snapshot the buffer and
                // remember that snapshot for this buffer.
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_read(buffer.clone(), cx);
                });

                let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
                this.update(cx, |this, _| {
                    this.shared_buffers.insert(buffer.clone(), snapshot.clone());
                })?;
                snapshot
            };

            let max_point = snapshot.max_point();
            let start_position = Point::new(line, 0);

            if start_position > max_point {
                // The message reports the last valid position using a 1-based row.
                return Err(acp::Error::invalid_params().data(format!(
                    "Attempting to read beyond the end of the file, line {}:{}",
                    max_point.row + 1,
                    max_point.column
                )));
            }

            let start = snapshot.anchor_before(start_position);
            // `saturating_add` keeps `line + limit` from overflowing when no limit was
            // given (`u32::MAX`).
            // NOTE(review): this relies on a row past the end of the buffer being
            // clamped when it is converted to an anchor — confirm.
            let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));

            project.update(cx, |project, cx| {
                project.set_agent_location(
                    Some(AgentLocation {
                        buffer: buffer.downgrade(),
                        position: start,
                    }),
                    cx,
                );
            });

            Ok(snapshot.text_for_range(start..end).collect::<String>())
        })
    }
2297
    /// Replaces the text of the file at `path` with `content` and saves the buffer.
    ///
    /// The new text is diffed against the snapshot stored in `shared_buffers` for the
    /// buffer, or against the buffer's current snapshot when none is stored. The
    /// resulting edits are expressed as anchor ranges in that snapshot and then applied
    /// to the live buffer.
    ///
    /// Side effects, in order: the project's agent location is moved to the end of the
    /// last edit, the action log is told the buffer was read and then edited, the buffer
    /// is formatted when its `format_on_save` setting is not `Off` (followed by a second
    /// `buffer_edited` notification), and finally the buffer is saved.
    ///
    /// # Errors
    ///
    /// Fails with "invalid path" when `path` cannot be mapped to a project path, and
    /// propagates errors from opening the buffer, updating this thread, or saving.
    /// Formatting errors are logged and do not fail the write.
    pub fn write_text_file(
        &self,
        path: PathBuf,
        content: String,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let project = self.project.clone();
        let action_log = self.action_log.clone();
        cx.spawn(async move |this, cx| {
            let load = project.update(cx, |project, cx| {
                let path = project
                    .project_path_for_absolute_path(&path, cx)
                    .context("invalid path")?;
                anyhow::Ok(project.open_buffer(path, cx))
            });
            let buffer = load?.await?;
            // Prefer the snapshot stored for this buffer as the diff base; fall back to
            // the buffer's current contents.
            let snapshot = this.update(cx, |this, cx| {
                this.shared_buffers
                    .get(&buffer)
                    .cloned()
                    .unwrap_or_else(|| buffer.read(cx).snapshot())
            })?;
            // The diff runs on the background executor.
            let edits = cx
                .background_executor()
                .spawn(async move {
                    let old_text = snapshot.text();
                    text_diff(old_text.as_str(), &content)
                        .into_iter()
                        .map(|(range, replacement)| {
                            (
                                snapshot.anchor_after(range.start)
                                    ..snapshot.anchor_before(range.end),
                                replacement,
                            )
                        })
                        .collect::<Vec<_>>()
                })
                .await;

            // Point the agent location at the end of the last edit, or at the buffer's
            // minimum anchor when the diff produced no edits.
            project.update(cx, |project, cx| {
                project.set_agent_location(
                    Some(AgentLocation {
                        buffer: buffer.downgrade(),
                        position: edits
                            .last()
                            .map(|(range, _)| range.end)
                            .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
                    }),
                    cx,
                );
            });

            let format_on_save = cx.update(|cx| {
                // The buffer is reported as read before the edit is applied and as
                // edited afterwards.
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_read(buffer.clone(), cx);
                });

                let format_on_save = buffer.update(cx, |buffer, cx| {
                    buffer.edit(edits, None, cx);

                    let settings = language::language_settings::language_settings(
                        buffer.language().map(|l| l.name()),
                        buffer.file(),
                        cx,
                    );

                    settings.format_on_save != FormatOnSave::Off
                });
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_edited(buffer.clone(), cx);
                });
                format_on_save
            });

            if format_on_save {
                let format_task = project.update(cx, |project, cx| {
                    project.format(
                        HashSet::from_iter([buffer.clone()]),
                        LspFormatTarget::Buffers,
                        false,
                        FormatTrigger::Save,
                        cx,
                    )
                });
                // A formatting failure is only logged; the write still proceeds.
                format_task.await.log_err();

                // Report the buffer as edited again after the formatting pass.
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_edited(buffer.clone(), cx);
                });
            }

            project
                .update(cx, |project, cx| project.save_buffer(buffer, cx))
                .await
        })
    }
2394
    /// Spawns `command` with `args` in a new project terminal and registers the resulting
    /// [`Terminal`] entity in `self.terminals` under a freshly generated id.
    ///
    /// The environment starts from the project's directory environment for `cwd` (or an
    /// empty default when `cwd` is `None` or no environment is available), then sets
    /// `PAGER` to an empty string, then applies `extra_env`, so `extra_env` entries
    /// override the earlier values.
    ///
    /// The command is wrapped by a `ShellBuilder` using the remote client's default
    /// system shell when there is one, otherwise the result of
    /// `get_default_system_shell_preferring_bash()`, with stdin redirected to the null
    /// device.
    ///
    /// `output_byte_limit` is forwarded to [`Terminal::new`].
    ///
    /// # Errors
    ///
    /// Propagates failures from creating the terminal task and from updating this thread.
    pub fn create_terminal(
        &self,
        command: String,
        args: Vec<String>,
        extra_env: Vec<acp::EnvVariable>,
        cwd: Option<PathBuf>,
        output_byte_limit: Option<u64>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Terminal>>> {
        let env = match &cwd {
            Some(dir) => self.project.update(cx, |project, cx| {
                project.environment().update(cx, |env, cx| {
                    env.directory_environment(dir.as_path().into(), cx)
                })
            }),
            None => Task::ready(None).shared(),
        };
        let env = cx.spawn(async move |_, _| {
            let mut env = env.await.unwrap_or_default();
            // Disables paging for `git` and hopefully other commands
            env.insert("PAGER".into(), "".into());
            // Caller-supplied variables are inserted last so they win over the defaults.
            for var in extra_env {
                env.insert(var.name, var.value);
            }
            env
        });

        let project = self.project.clone();
        let language_registry = project.read(cx).languages().clone();
        let is_windows = project.read(cx).path_style(cx).is_windows();

        let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
        let terminal_task = cx.spawn({
            // The spawned task takes its own copy of the id; the original is used below
            // as the key in `self.terminals`.
            let terminal_id = terminal_id.clone();
            async move |_this, cx| {
                let env = env.await;
                let shell = project
                    .update(cx, |project, cx| {
                        project
                            .remote_client()
                            .and_then(|r| r.read(cx).default_system_shell())
                    })
                    .unwrap_or_else(|| get_default_system_shell_preferring_bash());
                let (task_command, task_args) =
                    ShellBuilder::new(&Shell::Program(shell), is_windows)
                        .redirect_stdin_to_dev_null()
                        .build(Some(command.clone()), &args);
                let terminal = project
                    .update(cx, |project, cx| {
                        project.create_terminal_task(
                            task::SpawnInTerminal {
                                command: Some(task_command),
                                args: task_args,
                                cwd: cwd.clone(),
                                env,
                                ..Default::default()
                            },
                            cx,
                        )
                    })
                    .await?;

                anyhow::Ok(cx.new(|cx| {
                    Terminal::new(
                        terminal_id,
                        // The label is built from the original command and arguments,
                        // not from the shell-wrapped invocation.
                        &format!("{} {}", command, args.join(" ")),
                        cwd,
                        output_byte_limit.map(|l| l as usize),
                        terminal,
                        language_registry,
                        cx,
                    )
                }))
            }
        });

        // Register the terminal on the thread once it has been created.
        cx.spawn(async move |this, cx| {
            let terminal = terminal_task.await?;
            this.update(cx, |this, _cx| {
                this.terminals.insert(terminal_id, terminal.clone());
                terminal
            })
        })
    }
2479
2480 pub fn kill_terminal(
2481 &mut self,
2482 terminal_id: acp::TerminalId,
2483 cx: &mut Context<Self>,
2484 ) -> Result<()> {
2485 self.terminals
2486 .get(&terminal_id)
2487 .context("Terminal not found")?
2488 .update(cx, |terminal, cx| {
2489 terminal.kill(cx);
2490 });
2491
2492 Ok(())
2493 }
2494
2495 pub fn release_terminal(
2496 &mut self,
2497 terminal_id: acp::TerminalId,
2498 cx: &mut Context<Self>,
2499 ) -> Result<()> {
2500 self.terminals
2501 .remove(&terminal_id)
2502 .context("Terminal not found")?
2503 .update(cx, |terminal, cx| {
2504 terminal.kill(cx);
2505 });
2506
2507 Ok(())
2508 }
2509
2510 pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2511 self.terminals
2512 .get(&terminal_id)
2513 .context("Terminal not found")
2514 .cloned()
2515 }
2516
2517 pub fn to_markdown(&self, cx: &App) -> String {
2518 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2519 }
2520
    /// Emits an [`AcpThreadEvent::LoadError`] event carrying `error`.
    pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::LoadError(error));
    }
2524
2525 pub fn register_terminal_created(
2526 &mut self,
2527 terminal_id: acp::TerminalId,
2528 command_label: String,
2529 working_dir: Option<PathBuf>,
2530 output_byte_limit: Option<u64>,
2531 terminal: Entity<::terminal::Terminal>,
2532 cx: &mut Context<Self>,
2533 ) -> Entity<Terminal> {
2534 let language_registry = self.project.read(cx).languages().clone();
2535
2536 let entity = cx.new(|cx| {
2537 Terminal::new(
2538 terminal_id.clone(),
2539 &command_label,
2540 working_dir.clone(),
2541 output_byte_limit.map(|l| l as usize),
2542 terminal,
2543 language_registry,
2544 cx,
2545 )
2546 });
2547 self.terminals.insert(terminal_id.clone(), entity.clone());
2548 entity
2549 }
2550}
2551
2552fn markdown_for_raw_output(
2553 raw_output: &serde_json::Value,
2554 language_registry: &Arc<LanguageRegistry>,
2555 cx: &mut App,
2556) -> Option<Entity<Markdown>> {
2557 match raw_output {
2558 serde_json::Value::Null => None,
2559 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2560 Markdown::new(
2561 value.to_string().into(),
2562 Some(language_registry.clone()),
2563 None,
2564 cx,
2565 )
2566 })),
2567 serde_json::Value::Number(value) => Some(cx.new(|cx| {
2568 Markdown::new(
2569 value.to_string().into(),
2570 Some(language_registry.clone()),
2571 None,
2572 cx,
2573 )
2574 })),
2575 serde_json::Value::String(value) => Some(cx.new(|cx| {
2576 Markdown::new(
2577 value.clone().into(),
2578 Some(language_registry.clone()),
2579 None,
2580 cx,
2581 )
2582 })),
2583 value => Some(cx.new(|cx| {
2584 let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
2585
2586 Markdown::new(
2587 format!("```json\n{}\n```", pretty_json).into(),
2588 Some(language_registry.clone()),
2589 None,
2590 cx,
2591 )
2592 })),
2593 }
2594}
2595
2596#[cfg(test)]
2597mod tests {
2598 use super::*;
2599 use anyhow::anyhow;
2600 use futures::{channel::mpsc, future::LocalBoxFuture, select};
2601 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2602 use indoc::indoc;
2603 use project::{FakeFs, Fs};
2604 use rand::{distr, prelude::*};
2605 use serde_json::json;
2606 use settings::SettingsStore;
2607 use smol::stream::StreamExt as _;
2608 use std::{
2609 any::Any,
2610 cell::RefCell,
2611 path::Path,
2612 rc::Rc,
2613 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2614 time::Duration,
2615 };
2616 use util::path;
2617
    /// Shared test setup: initializes logging (ignoring the error if a logger is
    /// already installed) and installs a test `SettingsStore` as a global.
    fn init_test(cx: &mut TestAppContext) {
        env_logger::try_init().ok();
        cx.update(|cx| {
            let settings_store = SettingsStore::test(cx);
            cx.set_global(settings_store);
        });
    }
2625
    /// Verifies that terminal output delivered before the terminal's `Created` event is
    /// not lost: once `Created` arrives, the earlier output shows up in the terminal's
    /// content.
    #[gpui::test]
    async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let connection = Rc::new(FakeAgentConnection::new());
        let thread = cx
            .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
            .await
            .unwrap();

        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());

        // Send Output BEFORE Created - should be buffered by acp_thread
        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Output {
                    terminal_id: terminal_id.clone(),
                    data: b"hello buffered".to_vec(),
                },
                cx,
            );
        });

        // Create a display-only terminal and then send Created
        let lower = cx.new(|cx| {
            let builder = ::terminal::TerminalBuilder::new_display_only(
                ::terminal::terminal_settings::CursorShape::default(),
                ::terminal::terminal_settings::AlternateScroll::On,
                None,
                0,
                cx.background_executor(),
                PathStyle::local(),
            )
            .unwrap();
            builder.subscribe(cx)
        });

        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Created {
                    terminal_id: terminal_id.clone(),
                    label: "Buffered Test".to_string(),
                    cwd: None,
                    output_byte_limit: None,
                    terminal: lower.clone(),
                },
                cx,
            );
        });

        // After Created, buffered Output should have been flushed into the renderer
        let content = thread.read_with(cx, |thread, cx| {
            let term = thread.terminal(terminal_id.clone()).unwrap();
            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
        });

        assert!(
            content.contains("hello buffered"),
            "expected buffered output to render, got: {content}"
        );
    }
2689
    /// Verifies that when both `Output` and `Exit` events arrive before the terminal's
    /// `Created` event, the output is still visible in the terminal's content after
    /// `Created` is processed.
    ///
    /// Only the rendered output is asserted here; the exit status is not checked.
    #[gpui::test]
    async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let connection = Rc::new(FakeAgentConnection::new());
        let thread = cx
            .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
            .await
            .unwrap();

        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());

        // Send Output BEFORE Created
        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Output {
                    terminal_id: terminal_id.clone(),
                    data: b"pre-exit data".to_vec(),
                },
                cx,
            );
        });

        // Send Exit BEFORE Created
        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Exit {
                    terminal_id: terminal_id.clone(),
                    status: acp::TerminalExitStatus::new().exit_code(0),
                },
                cx,
            );
        });

        // Now create a display-only lower-level terminal and send Created
        let lower = cx.new(|cx| {
            let builder = ::terminal::TerminalBuilder::new_display_only(
                ::terminal::terminal_settings::CursorShape::default(),
                ::terminal::terminal_settings::AlternateScroll::On,
                None,
                0,
                cx.background_executor(),
                PathStyle::local(),
            )
            .unwrap();
            builder.subscribe(cx)
        });

        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Created {
                    terminal_id: terminal_id.clone(),
                    label: "Buffered Exit Test".to_string(),
                    cwd: None,
                    output_byte_limit: None,
                    terminal: lower.clone(),
                },
                cx,
            );
        });

        // Output should be present after Created (flushed from buffer)
        let content = thread.read_with(cx, |thread, cx| {
            let term = thread.terminal(terminal_id.clone()).unwrap();
            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
        });

        assert!(
            content.contains("pre-exit data"),
            "expected pre-exit data to render, got: {content}"
        );
    }
2764
    /// Test that killing a terminal via Terminal::kill properly:
    /// 1. Causes wait_for_exit to complete (doesn't hang forever)
    /// 2. The underlying terminal still has the output that was written before the kill
    ///
    /// This test verifies that the fix to kill_active_task (which now also kills
    /// the shell process in addition to the foreground process) properly allows
    /// wait_for_exit to complete instead of hanging indefinitely.
    ///
    /// The test spawns a real shell process, so it is restricted to Unix and lets the
    /// executor park while waiting on real time.
    #[cfg(unix)]
    #[gpui::test]
    async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
        use std::collections::HashMap;
        use task::Shell;
        use util::shell_builder::ShellBuilder;

        init_test(cx);
        cx.executor().allow_parking();

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let connection = Rc::new(FakeAgentConnection::new());
        let thread = cx
            .update(|cx| connection.new_thread(project.clone(), Path::new(path!("/test")), cx))
            .await
            .unwrap();

        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());

        // Create a real PTY terminal that runs a command which prints output then sleeps
        // We use printf instead of echo and chain with && sleep to ensure proper execution
        // The receiving half is bound (not dropped) so the channel stays open for the
        // duration of the test.
        let (completion_tx, _completion_rx) = smol::channel::unbounded();
        let (program, args) = ShellBuilder::new(&Shell::System, false).build(
            Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
            &[],
        );

        let builder = cx
            .update(|cx| {
                ::terminal::TerminalBuilder::new(
                    None,
                    None,
                    task::Shell::WithArguments {
                        program,
                        args,
                        title_override: None,
                    },
                    HashMap::default(),
                    ::terminal::terminal_settings::CursorShape::default(),
                    ::terminal::terminal_settings::AlternateScroll::On,
                    None,
                    vec![],
                    0,
                    false,
                    0,
                    Some(completion_tx),
                    cx,
                    vec![],
                    PathStyle::local(),
                )
            })
            .await
            .unwrap();

        let lower_terminal = cx.new(|cx| builder.subscribe(cx));

        // Create the acp_thread Terminal wrapper
        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Created {
                    terminal_id: terminal_id.clone(),
                    label: "printf output_before_kill && sleep 60".to_string(),
                    cwd: None,
                    output_byte_limit: None,
                    terminal: lower_terminal.clone(),
                },
                cx,
            );
        });

        // Wait for the printf command to execute and produce output
        // Use real time since parking is enabled
        cx.executor().timer(Duration::from_millis(500)).await;

        // Get the acp_thread Terminal and kill it
        // (the exit future is obtained before the kill is issued)
        let wait_for_exit = thread.update(cx, |thread, cx| {
            let term = thread.terminals.get(&terminal_id).unwrap();
            let wait_for_exit = term.read(cx).wait_for_exit();
            term.update(cx, |term, cx| {
                term.kill(cx);
            });
            wait_for_exit
        });

        // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
        // Before the fix to kill_active_task, this would hang forever because
        // only the foreground process was killed, not the shell, so the PTY
        // child never exited and wait_for_completed_task never completed.
        // The 5 second timer is the timeout branch: if it wins, `exit_result` is `None`.
        let exit_result = futures::select! {
            result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
            _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
        };

        assert!(
            exit_result.is_some(),
            "wait_for_exit should complete after kill, but it timed out. \
            This indicates kill_active_task is not properly killing the shell process."
        );

        // Give the system a chance to process any pending updates
        cx.run_until_parked();

        // Verify that the underlying terminal still has the output that was
        // written before the kill. This verifies that killing doesn't lose output.
        let inner_content = thread.read_with(cx, |thread, cx| {
            let term = thread.terminals.get(&terminal_id).unwrap();
            term.read(cx).inner().read(cx).get_content()
        });

        assert!(
            inner_content.contains("output_before_kill"),
            "Underlying terminal should contain output from before kill, got: {}",
            inner_content
        );
    }
2888
    /// Exercises `push_user_content_block` in three situations:
    /// 1. pushing into an empty thread creates a user message,
    /// 2. pushing again while the last entry is a user message appends to it and
    ///    adopts the provided message id,
    /// 3. pushing after an assistant message starts a new user message.
    #[gpui::test]
    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let connection = Rc::new(FakeAgentConnection::new());
        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
            .await
            .unwrap();

        // Test creating a new user message
        thread.update(cx, |thread, cx| {
            thread.push_user_content_block(None, "Hello, ".into(), cx);
        });

        thread.update(cx, |thread, cx| {
            assert_eq!(thread.entries.len(), 1);
            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
                assert_eq!(user_msg.id, None);
                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
            } else {
                panic!("Expected UserMessage");
            }
        });

        // Test appending to existing user message
        let message_1_id = UserMessageId::new();
        thread.update(cx, |thread, cx| {
            thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
        });

        // Still a single entry: the text was appended and the id was taken from the
        // second push.
        thread.update(cx, |thread, cx| {
            assert_eq!(thread.entries.len(), 1);
            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
                assert_eq!(user_msg.id, Some(message_1_id));
                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
            } else {
                panic!("Expected UserMessage");
            }
        });

        // Test creating new user message after assistant message
        thread.update(cx, |thread, cx| {
            thread.push_assistant_content_block("Assistant response".into(), false, cx);
        });

        let message_2_id = UserMessageId::new();
        thread.update(cx, |thread, cx| {
            thread.push_user_content_block(
                Some(message_2_id.clone()),
                "New user message".into(),
                cx,
            );
        });

        // Entries are now: user message, assistant message, new user message.
        thread.update(cx, |thread, cx| {
            assert_eq!(thread.entries.len(), 3);
            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
                assert_eq!(user_msg.id, Some(message_2_id));
                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
            } else {
                panic!("Expected UserMessage at index 2");
            }
        });
    }
2956
    /// Verifies that consecutive `AgentThoughtChunk` updates are concatenated into a
    /// single `<thinking>` section in the thread's markdown output.
    #[gpui::test]
    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        // The fake agent answers every user message with two thought chunks and then
        // ends the turn.
        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
            |_, thread, mut cx| {
                async move {
                    thread.update(&mut cx, |thread, cx| {
                        thread
                            .handle_session_update(
                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
                                    "Thinking ".into(),
                                )),
                                cx,
                            )
                            .unwrap();
                        thread
                            .handle_session_update(
                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
                                    "hard!".into(),
                                )),
                                cx,
                            )
                            .unwrap();
                    })?;
                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
                }
                .boxed_local()
            },
        ));

        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
            .await
            .unwrap();

        thread
            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
            .await
            .unwrap();

        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
        assert_eq!(
            output,
            indoc! {r#"
            ## User

            Hello from Zed!

            ## Assistant

            <thinking>
            Thinking hard!
            </thinking>

            "#}
        );
    }
3017
    /// Verifies that an agent write based on an earlier read is merged with an edit the
    /// user made to the buffer in between: the user's inserted line and the agent's
    /// appended lines both end up in the buffer and on disk.
    #[gpui::test]
    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
            .await;
        let project = Project::test(fs.clone(), [], cx).await;
        // Signals the test body that the agent has finished reading the file, so the
        // user edit can be made between the agent's read and its write.
        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
            move |_, thread, mut cx| {
                let read_file_tx = read_file_tx.clone();
                async move {
                    let content = thread
                        .update(&mut cx, |thread, cx| {
                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
                        })
                        .unwrap()
                        .await
                        .unwrap();
                    assert_eq!(content, "one\ntwo\nthree\n");
                    read_file_tx.take().unwrap().send(()).unwrap();
                    thread
                        .update(&mut cx, |thread, cx| {
                            thread.write_text_file(
                                path!("/tmp/foo").into(),
                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
                                cx,
                            )
                        })
                        .unwrap()
                        .await
                        .unwrap();
                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
                }
                .boxed_local()
            },
        ));

        let (worktree, pathbuf) = project
            .update(cx, |project, cx| {
                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
            })
            .await
            .unwrap();
        let buffer = project
            .update(cx, |project, cx| {
                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
            })
            .await
            .unwrap();

        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
            .await
            .unwrap();

        let request = thread.update(cx, |thread, cx| {
            thread.send_raw("Extend the count in /tmp/foo", cx)
        });
        read_file_rx.await.ok();
        // Simulated user edit, made after the agent's read and before its write.
        buffer.update(cx, |buffer, cx| {
            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
        });
        cx.run_until_parked();
        assert_eq!(
            buffer.read_with(cx, |buffer, _| buffer.text()),
            "zero\none\ntwo\nthree\nfour\nfive\n"
        );
        assert_eq!(
            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
            "zero\none\ntwo\nthree\nfour\nfive\n"
        );
        request.await.unwrap();
    }
3094
    /// Exercises `read_text_file` on a four-line file with every combination of the
    /// optional `line` (1-based start) and `limit` (line count) arguments, and checks
    /// the error returned when the start line is past the end of the file.
    #[gpui::test]
    async fn test_reading_from_line(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
            .await;
        let project = Project::test(fs.clone(), [], cx).await;
        project
            .update(cx, |project, cx| {
                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
            })
            .await
            .unwrap();

        let connection = Rc::new(FakeAgentConnection::new());

        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
            .await
            .unwrap();

        // Whole file
        let content = thread
            .update(cx, |thread, cx| {
                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
            })
            .await
            .unwrap();

        assert_eq!(content, "one\ntwo\nthree\nfour\n");

        // Only start line
        let content = thread
            .update(cx, |thread, cx| {
                thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
            })
            .await
            .unwrap();

        assert_eq!(content, "three\nfour\n");

        // Only limit
        let content = thread
            .update(cx, |thread, cx| {
                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
            })
            .await
            .unwrap();

        assert_eq!(content, "one\ntwo\n");

        // Range
        let content = thread
            .update(cx, |thread, cx| {
                thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
            })
            .await
            .unwrap();

        assert_eq!(content, "two\nthree\n");

        // Invalid
        // (line 6 is past the last position of the file, which the error reports as 5:0)
        let err = thread
            .update(cx, |thread, cx| {
                thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
            })
            .await
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
        );
    }
3170
    /// Exercises `read_text_file` on an empty file: every valid combination of `line`
    /// and `limit` yields an empty string, and a start line past the end of the file
    /// yields an `invalid_params` error that reports position 1:0.
    #[gpui::test]
    async fn test_reading_empty_file(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
        let project = Project::test(fs.clone(), [], cx).await;
        project
            .update(cx, |project, cx| {
                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
            })
            .await
            .unwrap();

        let connection = Rc::new(FakeAgentConnection::new());

        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
            .await
            .unwrap();

        // Whole file
        let content = thread
            .update(cx, |thread, cx| {
                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
            })
            .await
            .unwrap();

        assert_eq!(content, "");

        // Only start line
        let content = thread
            .update(cx, |thread, cx| {
                thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
            })
            .await
            .unwrap();

        assert_eq!(content, "");

        // Only limit
        let content = thread
            .update(cx, |thread, cx| {
                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
            })
            .await
            .unwrap();

        assert_eq!(content, "");

        // Range
        let content = thread
            .update(cx, |thread, cx| {
                thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
            })
            .await
            .unwrap();

        assert_eq!(content, "");

        // Invalid
        let err = thread
            .update(cx, |thread, cx| {
                thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
            })
            .await
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
        );
    }
    /// Verifies that `read_text_file` fails with `ResourceNotFound` for a path that lies
    /// outside the project's worktree (`/foo` while the worktree is `/tmp`).
    #[gpui::test]
    async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        fs.insert_tree(path!("/tmp"), json!({})).await;
        let project = Project::test(fs.clone(), [], cx).await;
        project
            .update(cx, |project, cx| {
                project.find_or_create_worktree(path!("/tmp"), true, cx)
            })
            .await
            .unwrap();

        let connection = Rc::new(FakeAgentConnection::new());

        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
            .await
            .unwrap();

        // Out of project file
        let err = thread
            .update(cx, |thread, cx| {
                thread.read_text_file(path!("/foo").into(), None, None, false, cx)
            })
            .await
            .unwrap_err();

        assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
    }
3276
    /// Verifies the status transitions of a tool call around cancellation:
    /// it starts `InProgress`, becomes `Canceled` when the thread is canceled, and a
    /// later `ToolCallUpdate` reporting `Completed` still moves it to `Completed`.
    #[gpui::test]
    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let id = acp::ToolCallId::new("test");

        // The fake agent reports a single in-progress tool call and then ends the turn.
        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
            let id = id.clone();
            move |_, thread, mut cx| {
                let id = id.clone();
                async move {
                    thread
                        .update(&mut cx, |thread, cx| {
                            thread.handle_session_update(
                                acp::SessionUpdate::ToolCall(
                                    acp::ToolCall::new(id.clone(), "Label")
                                        .kind(acp::ToolKind::Fetch)
                                        .status(acp::ToolCallStatus::InProgress),
                                ),
                                cx,
                            )
                        })
                        .unwrap()
                        .unwrap();
                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
                }
                .boxed_local()
            }
        }));

        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
            .await
            .unwrap();

        let request = thread.update(cx, |thread, cx| {
            thread.send_raw("Fetch https://example.com", cx)
        });

        run_until_first_tool_call(&thread, cx).await;

        // Entry 0 is the user message; entry 1 is the tool call.
        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::InProgress,
                    ..
                })
            ));
        });

        thread.update(cx, |thread, cx| thread.cancel(cx)).await;

        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                &thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::Canceled,
                    ..
                })
            ));
        });

        // The agent reports completion after the cancellation was applied.
        thread
            .update(cx, |thread, cx| {
                thread.handle_session_update(
                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
                        id,
                        acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
                    )),
                    cx,
                )
            })
            .unwrap();

        request.await.unwrap();

        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::Completed,
                    ..
                })
            ));
        });
    }
3366
    /// Verifies that an edit tool call which is reported as already `Completed` is not
    /// counted by `has_pending_edit_tool_calls`.
    #[gpui::test]
    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
        init_test(cx);
        let fs = FakeFs::new(cx.background_executor.clone());
        fs.insert_tree(path!("/test"), json!({})).await;
        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;

        // The fake agent reports one completed `Edit` tool call carrying a diff and
        // then ends the turn.
        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
            move |_, thread, mut cx| {
                async move {
                    thread
                        .update(&mut cx, |thread, cx| {
                            thread.handle_session_update(
                                acp::SessionUpdate::ToolCall(
                                    acp::ToolCall::new("test", "Label")
                                        .kind(acp::ToolKind::Edit)
                                        .status(acp::ToolCallStatus::Completed)
                                        .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
                                            "/test/test.txt",
                                            "foo",
                                        ))]),
                                ),
                                cx,
                            )
                        })
                        .unwrap()
                        .unwrap();
                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
                }
                .boxed_local()
            }
        }));

        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
            .await
            .unwrap();

        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
            .await
            .unwrap();

        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
    }
3411
/// Exercises checkpoint bookkeeping across several turns and a rewind.
///
/// The fake agent echoes each prompt back upper-cased and, while
/// `simulate_changes` is set, first writes a fresh empty file
/// `/test/file-N` into the fake file system. The test then checks, via the
/// markdown rendering of the thread, that:
/// - user messages whose turn changed the file system are rendered with the
///   `(checkpoint)` marker,
/// - a turn that changed nothing is rendered without it, and
/// - restoring the checkpoint of the second user message truncates the
///   thread back to the first exchange and leaves only `file-0` on disk.
#[gpui::test(iterations = 10)]
async fn test_checkpoints(cx: &mut TestAppContext) {
    init_test(cx);
    let fs = FakeFs::new(cx.background_executor.clone());
    // The worktree contains a `.git` directory and nothing else.
    fs.insert_tree(
        path!("/test"),
        json!({
            ".git": {}
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;

    // Shared switches the test flips from the outside to steer the fake agent.
    let simulate_changes = Arc::new(AtomicBool::new(true));
    let next_filename = Arc::new(AtomicUsize::new(0));
    let connection = Rc::new(FakeAgentConnection::new().on_user_message({
        let simulate_changes = simulate_changes.clone();
        let next_filename = next_filename.clone();
        let fs = fs.clone();
        move |request, thread, mut cx| {
            // Re-clone per invocation: the handler is `Fn` and the future
            // below takes ownership of what it uses.
            let fs = fs.clone();
            let simulate_changes = simulate_changes.clone();
            let next_filename = next_filename.clone();
            async move {
                // Optionally touch the file system so this turn has changes.
                if simulate_changes.load(SeqCst) {
                    let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
                    fs.write(Path::new(&filename), b"").await?;
                }

                // Reply with the first prompt block, upper-cased.
                let acp::ContentBlock::Text(content) = &request.prompt[0] else {
                    panic!("expected text content block");
                };
                thread.update(&mut cx, |thread, cx| {
                    thread
                        .handle_session_update(
                            acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
                                content.text.to_uppercase().into(),
                            )),
                            cx,
                        )
                        .unwrap();
                })?;
                Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
            }
            .boxed_local()
        }
    }));
    let thread = cx
        .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
        .await
        .unwrap();

    // First turn: creates `file-0`, so the user message shows a checkpoint.
    cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
        .await
        .unwrap();
    thread.read_with(cx, |thread, cx| {
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User (checkpoint)

                Lorem

                ## Assistant

                LOREM

            "}
        );
    });
    assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);

    // Second turn: creates `file-1`, so this message shows a checkpoint too.
    cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
        .await
        .unwrap();
    thread.read_with(cx, |thread, cx| {
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User (checkpoint)

                Lorem

                ## Assistant

                LOREM

                ## User (checkpoint)

                ipsum

                ## Assistant

                IPSUM

            "}
        );
    });
    assert_eq!(
        fs.files(),
        vec![
            Path::new(path!("/test/file-0")),
            Path::new(path!("/test/file-1"))
        ]
    );

    // Checkpoint isn't stored when there are no changes.
    simulate_changes.store(false, SeqCst);
    cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
        .await
        .unwrap();
    thread.read_with(cx, |thread, cx| {
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User (checkpoint)

                Lorem

                ## Assistant

                LOREM

                ## User (checkpoint)

                ipsum

                ## Assistant

                IPSUM

                ## User

                dolor

                ## Assistant

                DOLOR

            "}
        );
    });
    // The third turn wrote nothing, so the file list is unchanged.
    assert_eq!(
        fs.files(),
        vec![
            Path::new(path!("/test/file-0")),
            Path::new(path!("/test/file-1"))
        ]
    );

    // Rewinding the conversation truncates the history and restores the checkpoint.
    // `entries[2]` is the second user message ("ipsum").
    thread
        .update(cx, |thread, cx| {
            let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
                panic!("unexpected entries {:?}", thread.entries)
            };
            thread.restore_checkpoint(message.id.clone().unwrap(), cx)
        })
        .await
        .unwrap();
    thread.read_with(cx, |thread, cx| {
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User (checkpoint)

                Lorem

                ## Assistant

                LOREM

            "}
        );
    });
    // `file-1`, written during the rewound turn, is gone again.
    assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
}
3589
3590 #[gpui::test]
3591 async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3592 use std::sync::atomic::AtomicUsize;
3593 init_test(cx);
3594
3595 let fs = FakeFs::new(cx.executor());
3596 let project = Project::test(fs, None, cx).await;
3597
3598 // Create a connection that simulates refusal after tool result
3599 let prompt_count = Arc::new(AtomicUsize::new(0));
3600 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3601 let prompt_count = prompt_count.clone();
3602 move |_request, thread, mut cx| {
3603 let count = prompt_count.fetch_add(1, SeqCst);
3604 async move {
3605 if count == 0 {
3606 // First prompt: Generate a tool call with result
3607 thread.update(&mut cx, |thread, cx| {
3608 thread
3609 .handle_session_update(
3610 acp::SessionUpdate::ToolCall(
3611 acp::ToolCall::new("tool1", "Test Tool")
3612 .kind(acp::ToolKind::Fetch)
3613 .status(acp::ToolCallStatus::Completed)
3614 .raw_input(serde_json::json!({"query": "test"}))
3615 .raw_output(serde_json::json!({"result": "inappropriate content"})),
3616 ),
3617 cx,
3618 )
3619 .unwrap();
3620 })?;
3621
3622 // Now return refusal because of the tool result
3623 Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
3624 } else {
3625 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3626 }
3627 }
3628 .boxed_local()
3629 }
3630 }));
3631
3632 let thread = cx
3633 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3634 .await
3635 .unwrap();
3636
3637 // Track if we see a Refusal event
3638 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3639 let saw_refusal_event_captured = saw_refusal_event.clone();
3640 thread.update(cx, |_thread, cx| {
3641 cx.subscribe(
3642 &thread,
3643 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3644 if matches!(event, AcpThreadEvent::Refusal) {
3645 *saw_refusal_event_captured.lock().unwrap() = true;
3646 }
3647 },
3648 )
3649 .detach();
3650 });
3651
3652 // Send a user message - this will trigger tool call and then refusal
3653 let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
3654 cx.background_executor.spawn(send_task).detach();
3655 cx.run_until_parked();
3656
3657 // Verify that:
3658 // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3659 // 2. The user message was NOT truncated
3660 assert!(
3661 *saw_refusal_event.lock().unwrap(),
3662 "Refusal event should be emitted for tool result refusals"
3663 );
3664
3665 thread.read_with(cx, |thread, _| {
3666 let entries = thread.entries();
3667 assert!(entries.len() >= 2, "Should have user message and tool call");
3668
3669 // Verify user message is still there
3670 assert!(
3671 matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3672 "User message should not be truncated"
3673 );
3674
3675 // Verify tool call is there with result
3676 if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3677 assert!(
3678 tool_call.raw_output.is_some(),
3679 "Tool call should have output"
3680 );
3681 } else {
3682 panic!("Expected tool call at index 1");
3683 }
3684 });
3685 }
3686
3687 #[gpui::test]
3688 async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3689 init_test(cx);
3690
3691 let fs = FakeFs::new(cx.executor());
3692 let project = Project::test(fs, None, cx).await;
3693
3694 let refuse_next = Arc::new(AtomicBool::new(false));
3695 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3696 let refuse_next = refuse_next.clone();
3697 move |_request, _thread, _cx| {
3698 if refuse_next.load(SeqCst) {
3699 async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
3700 .boxed_local()
3701 } else {
3702 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
3703 .boxed_local()
3704 }
3705 }
3706 }));
3707
3708 let thread = cx
3709 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3710 .await
3711 .unwrap();
3712
3713 // Track if we see a Refusal event
3714 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3715 let saw_refusal_event_captured = saw_refusal_event.clone();
3716 thread.update(cx, |_thread, cx| {
3717 cx.subscribe(
3718 &thread,
3719 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3720 if matches!(event, AcpThreadEvent::Refusal) {
3721 *saw_refusal_event_captured.lock().unwrap() = true;
3722 }
3723 },
3724 )
3725 .detach();
3726 });
3727
3728 // Send a message that will be refused
3729 refuse_next.store(true, SeqCst);
3730 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3731 .await
3732 .unwrap();
3733
3734 // Verify that a Refusal event WAS emitted for user prompt refusal
3735 assert!(
3736 *saw_refusal_event.lock().unwrap(),
3737 "Refusal event should be emitted for user prompt refusals"
3738 );
3739
3740 // Verify the message was truncated (user prompt refusal)
3741 thread.read_with(cx, |thread, cx| {
3742 assert_eq!(thread.to_markdown(cx), "");
3743 });
3744 }
3745
3746 #[gpui::test]
3747 async fn test_refusal(cx: &mut TestAppContext) {
3748 init_test(cx);
3749 let fs = FakeFs::new(cx.background_executor.clone());
3750 fs.insert_tree(path!("/"), json!({})).await;
3751 let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
3752
3753 let refuse_next = Arc::new(AtomicBool::new(false));
3754 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3755 let refuse_next = refuse_next.clone();
3756 move |request, thread, mut cx| {
3757 let refuse_next = refuse_next.clone();
3758 async move {
3759 if refuse_next.load(SeqCst) {
3760 return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
3761 }
3762
3763 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3764 panic!("expected text content block");
3765 };
3766 thread.update(&mut cx, |thread, cx| {
3767 thread
3768 .handle_session_update(
3769 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3770 content.text.to_uppercase().into(),
3771 )),
3772 cx,
3773 )
3774 .unwrap();
3775 })?;
3776 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3777 }
3778 .boxed_local()
3779 }
3780 }));
3781 let thread = cx
3782 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3783 .await
3784 .unwrap();
3785
3786 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3787 .await
3788 .unwrap();
3789 thread.read_with(cx, |thread, cx| {
3790 assert_eq!(
3791 thread.to_markdown(cx),
3792 indoc! {"
3793 ## User
3794
3795 hello
3796
3797 ## Assistant
3798
3799 HELLO
3800
3801 "}
3802 );
3803 });
3804
3805 // Simulate refusing the second message. The message should be truncated
3806 // when a user prompt is refused.
3807 refuse_next.store(true, SeqCst);
3808 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
3809 .await
3810 .unwrap();
3811 thread.read_with(cx, |thread, cx| {
3812 assert_eq!(
3813 thread.to_markdown(cx),
3814 indoc! {"
3815 ## User
3816
3817 hello
3818
3819 ## Assistant
3820
3821 HELLO
3822
3823 "}
3824 );
3825 });
3826 }
3827
3828 async fn run_until_first_tool_call(
3829 thread: &Entity<AcpThread>,
3830 cx: &mut TestAppContext,
3831 ) -> usize {
3832 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
3833
3834 let subscription = cx.update(|cx| {
3835 cx.subscribe(thread, move |thread, _, cx| {
3836 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
3837 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
3838 return tx.try_send(ix).unwrap();
3839 }
3840 }
3841 })
3842 });
3843
3844 select! {
3845 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
3846 panic!("Timeout waiting for tool call")
3847 }
3848 ix = rx.next().fuse() => {
3849 drop(subscription);
3850 ix.unwrap()
3851 }
3852 }
3853 }
3854
3855 #[derive(Clone, Default)]
3856 struct FakeAgentConnection {
3857 auth_methods: Vec<acp::AuthMethod>,
3858 sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
3859 on_user_message: Option<
3860 Rc<
3861 dyn Fn(
3862 acp::PromptRequest,
3863 WeakEntity<AcpThread>,
3864 AsyncApp,
3865 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3866 + 'static,
3867 >,
3868 >,
3869 }
3870
3871 impl FakeAgentConnection {
3872 fn new() -> Self {
3873 Self {
3874 auth_methods: Vec::new(),
3875 on_user_message: None,
3876 sessions: Arc::default(),
3877 }
3878 }
3879
3880 #[expect(unused)]
3881 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
3882 self.auth_methods = auth_methods;
3883 self
3884 }
3885
3886 fn on_user_message(
3887 mut self,
3888 handler: impl Fn(
3889 acp::PromptRequest,
3890 WeakEntity<AcpThread>,
3891 AsyncApp,
3892 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3893 + 'static,
3894 ) -> Self {
3895 self.on_user_message.replace(Rc::new(handler));
3896 self
3897 }
3898 }
3899
3900 impl AgentConnection for FakeAgentConnection {
3901 fn telemetry_id(&self) -> SharedString {
3902 "fake".into()
3903 }
3904
3905 fn auth_methods(&self) -> &[acp::AuthMethod] {
3906 &self.auth_methods
3907 }
3908
3909 fn new_thread(
3910 self: Rc<Self>,
3911 project: Entity<Project>,
3912 _cwd: &Path,
3913 cx: &mut App,
3914 ) -> Task<gpui::Result<Entity<AcpThread>>> {
3915 let session_id = acp::SessionId::new(
3916 rand::rng()
3917 .sample_iter(&distr::Alphanumeric)
3918 .take(7)
3919 .map(char::from)
3920 .collect::<String>(),
3921 );
3922 let action_log = cx.new(|_| ActionLog::new(project.clone()));
3923 let thread = cx.new(|cx| {
3924 AcpThread::new(
3925 "Test",
3926 self.clone(),
3927 project,
3928 action_log,
3929 session_id.clone(),
3930 watch::Receiver::constant(
3931 acp::PromptCapabilities::new()
3932 .image(true)
3933 .audio(true)
3934 .embedded_context(true),
3935 ),
3936 cx,
3937 )
3938 });
3939 self.sessions.lock().insert(session_id, thread.downgrade());
3940 Task::ready(Ok(thread))
3941 }
3942
3943 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
3944 if self.auth_methods().iter().any(|m| m.id == method) {
3945 Task::ready(Ok(()))
3946 } else {
3947 Task::ready(Err(anyhow!("Invalid Auth Method")))
3948 }
3949 }
3950
3951 fn prompt(
3952 &self,
3953 _id: Option<UserMessageId>,
3954 params: acp::PromptRequest,
3955 cx: &mut App,
3956 ) -> Task<gpui::Result<acp::PromptResponse>> {
3957 let sessions = self.sessions.lock();
3958 let thread = sessions.get(¶ms.session_id).unwrap();
3959 if let Some(handler) = &self.on_user_message {
3960 let handler = handler.clone();
3961 let thread = thread.clone();
3962 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
3963 } else {
3964 Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
3965 }
3966 }
3967
3968 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
3969 let sessions = self.sessions.lock();
3970 let thread = sessions.get(session_id).unwrap().clone();
3971
3972 cx.spawn(async move |cx| {
3973 thread
3974 .update(cx, |thread, cx| thread.cancel(cx))
3975 .unwrap()
3976 .await
3977 })
3978 .detach();
3979 }
3980
3981 fn truncate(
3982 &self,
3983 session_id: &acp::SessionId,
3984 _cx: &App,
3985 ) -> Option<Rc<dyn AgentSessionTruncate>> {
3986 Some(Rc::new(FakeAgentSessionEditor {
3987 _session_id: session_id.clone(),
3988 }))
3989 }
3990
3991 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
3992 self
3993 }
3994 }
3995
/// Truncation handle handed out by `FakeAgentConnection::truncate`.
struct FakeAgentSessionEditor {
    /// Session this editor was created for; stored but never read.
    _session_id: acp::SessionId,
}

impl AgentSessionTruncate for FakeAgentSessionEditor {
    /// No-op truncation: ignores the message id and resolves immediately with `Ok(())`.
    fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
        Task::ready(Ok(()))
    }
}
4005
4006 #[gpui::test]
4007 async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
4008 init_test(cx);
4009
4010 let fs = FakeFs::new(cx.executor());
4011 let project = Project::test(fs, [], cx).await;
4012 let connection = Rc::new(FakeAgentConnection::new());
4013 let thread = cx
4014 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
4015 .await
4016 .unwrap();
4017
4018 // Try to update a tool call that doesn't exist
4019 let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
4020 thread.update(cx, |thread, cx| {
4021 let result = thread.handle_session_update(
4022 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4023 nonexistent_id.clone(),
4024 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4025 )),
4026 cx,
4027 );
4028
4029 // The update should succeed (not return an error)
4030 assert!(result.is_ok());
4031
4032 // There should now be exactly one entry in the thread
4033 assert_eq!(thread.entries.len(), 1);
4034
4035 // The entry should be a failed tool call
4036 if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
4037 assert_eq!(tool_call.id, nonexistent_id);
4038 assert!(matches!(tool_call.status, ToolCallStatus::Failed));
4039 assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
4040
4041 // Check that the content contains the error message
4042 assert_eq!(tool_call.content.len(), 1);
4043 if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
4044 match content_block {
4045 ContentBlock::Markdown { markdown } => {
4046 let markdown_text = markdown.read(cx).source();
4047 assert!(markdown_text.contains("Tool call not found"));
4048 }
4049 ContentBlock::Empty => panic!("Expected markdown content, got empty"),
4050 ContentBlock::ResourceLink { .. } => {
4051 panic!("Expected markdown content, got resource link")
4052 }
4053 ContentBlock::Image { .. } => {
4054 panic!("Expected markdown content, got image")
4055 }
4056 }
4057 } else {
4058 panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
4059 }
4060 } else {
4061 panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
4062 }
4063 });
4064 }
4065
/// Tests that restoring a checkpoint properly cleans up terminals that were
/// created after that checkpoint, and cancels any in-progress generation.
///
/// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
/// that were started after that checkpoint should be terminated, and any in-progress
/// AI generation should be canceled.
///
/// Layout of the scenario built below:
/// - two user messages are sent (thread entries 0 and 1),
/// - two terminals are registered and driven to exit; no thread entry
///   references them,
/// - a third terminal is registered, left running, and referenced by an
///   in-progress tool call entry that is appended after the second message,
/// - the thread is then restored to the second user message.
#[gpui::test]
async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
    init_test(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, [], cx).await;
    // No prompt handler is installed, so every prompt ends the turn immediately.
    let connection = Rc::new(FakeAgentConnection::new());
    let thread = cx
        .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
        .await
        .unwrap();

    // Send first user message to create a checkpoint
    cx.update(|cx| {
        thread.update(cx, |thread, cx| {
            thread.send(vec!["first message".into()], cx)
        })
    })
    .await
    .unwrap();

    // Send second message (creates another checkpoint) - we'll restore to this one
    cx.update(|cx| {
        thread.update(cx, |thread, cx| {
            thread.send(vec!["second message".into()], cx)
        })
    })
    .await
    .unwrap();

    // Create 2 terminals BEFORE the checkpoint that have completed running
    // NOTE(review): these terminals are registered after both messages were
    // sent; they count as "before the checkpoint" presumably because no entry
    // at or after the restored message references them - confirm against
    // `restore_checkpoint`.
    let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
    let mock_terminal_1 = cx.new(|cx| {
        let builder = ::terminal::TerminalBuilder::new_display_only(
            ::terminal::terminal_settings::CursorShape::default(),
            ::terminal::terminal_settings::AlternateScroll::On,
            None,
            0,
            cx.background_executor(),
            PathStyle::local(),
        )
        .unwrap();
        builder.subscribe(cx)
    });

    // Terminal 1 lifecycle: created -> produced output -> exited with code 0.
    thread.update(cx, |thread, cx| {
        thread.on_terminal_provider_event(
            TerminalProviderEvent::Created {
                terminal_id: terminal_id_1.clone(),
                label: "echo 'first'".to_string(),
                cwd: Some(PathBuf::from("/test")),
                output_byte_limit: None,
                terminal: mock_terminal_1.clone(),
            },
            cx,
        );
    });

    thread.update(cx, |thread, cx| {
        thread.on_terminal_provider_event(
            TerminalProviderEvent::Output {
                terminal_id: terminal_id_1.clone(),
                data: b"first\n".to_vec(),
            },
            cx,
        );
    });

    thread.update(cx, |thread, cx| {
        thread.on_terminal_provider_event(
            TerminalProviderEvent::Exit {
                terminal_id: terminal_id_1.clone(),
                status: acp::TerminalExitStatus::new().exit_code(0),
            },
            cx,
        );
    });

    let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
    let mock_terminal_2 = cx.new(|cx| {
        let builder = ::terminal::TerminalBuilder::new_display_only(
            ::terminal::terminal_settings::CursorShape::default(),
            ::terminal::terminal_settings::AlternateScroll::On,
            None,
            0,
            cx.background_executor(),
            PathStyle::local(),
        )
        .unwrap();
        builder.subscribe(cx)
    });

    // Terminal 2 goes through the same created -> output -> exit sequence.
    thread.update(cx, |thread, cx| {
        thread.on_terminal_provider_event(
            TerminalProviderEvent::Created {
                terminal_id: terminal_id_2.clone(),
                label: "echo 'second'".to_string(),
                cwd: Some(PathBuf::from("/test")),
                output_byte_limit: None,
                terminal: mock_terminal_2.clone(),
            },
            cx,
        );
    });

    thread.update(cx, |thread, cx| {
        thread.on_terminal_provider_event(
            TerminalProviderEvent::Output {
                terminal_id: terminal_id_2.clone(),
                data: b"second\n".to_vec(),
            },
            cx,
        );
    });

    thread.update(cx, |thread, cx| {
        thread.on_terminal_provider_event(
            TerminalProviderEvent::Exit {
                terminal_id: terminal_id_2.clone(),
                status: acp::TerminalExitStatus::new().exit_code(0),
            },
            cx,
        );
    });

    // Get the second message ID to restore to
    let second_message_id = thread.read_with(cx, |thread, _| {
        // At this point we have:
        // - Index 0: First user message (with checkpoint)
        // - Index 1: Second user message (with checkpoint)
        // No assistant responses because FakeAgentConnection just returns EndTurn
        let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
            panic!("expected user message at index 1");
        };
        message.id.clone().unwrap()
    });

    // Create a terminal AFTER the checkpoint we'll restore to.
    // This simulates the AI agent starting a long-running terminal command.
    let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
    let mock_terminal = cx.new(|cx| {
        let builder = ::terminal::TerminalBuilder::new_display_only(
            ::terminal::terminal_settings::CursorShape::default(),
            ::terminal::terminal_settings::AlternateScroll::On,
            None,
            0,
            cx.background_executor(),
            PathStyle::local(),
        )
        .unwrap();
        builder.subscribe(cx)
    });

    // Register the terminal as created
    thread.update(cx, |thread, cx| {
        thread.on_terminal_provider_event(
            TerminalProviderEvent::Created {
                terminal_id: terminal_id.clone(),
                label: "sleep 1000".to_string(),
                cwd: Some(PathBuf::from("/test")),
                output_byte_limit: None,
                terminal: mock_terminal.clone(),
            },
            cx,
        );
    });

    // Simulate the terminal producing output (still running)
    // No Exit event is ever sent for this terminal.
    thread.update(cx, |thread, cx| {
        thread.on_terminal_provider_event(
            TerminalProviderEvent::Output {
                terminal_id: terminal_id.clone(),
                data: b"terminal is running...\n".to_vec(),
            },
            cx,
        );
    });

    // Create a tool call entry that references this terminal
    // This represents the agent requesting a terminal command
    thread.update(cx, |thread, cx| {
        thread
            .handle_session_update(
                acp::SessionUpdate::ToolCall(
                    acp::ToolCall::new("terminal-tool-1", "Running command")
                        .kind(acp::ToolKind::Execute)
                        .status(acp::ToolCallStatus::InProgress)
                        .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
                            terminal_id.clone(),
                        ))])
                        .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
                ),
                cx,
            )
            .unwrap();
    });

    // Verify terminal exists and is in the thread
    let terminal_exists_before =
        thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
    assert!(
        terminal_exists_before,
        "Terminal should exist before checkpoint restore"
    );

    // Verify the terminal's underlying task is still running (not completed)
    let terminal_running_before = thread.read_with(cx, |thread, _cx| {
        let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
        terminal_entity.read_with(cx, |term, _cx| {
            term.output().is_none() // output is None means it's still running
        })
    });
    assert!(
        terminal_running_before,
        "Terminal should be running before checkpoint restore"
    );

    // Verify we have the expected entries before restore
    let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
    assert!(
        entry_count_before > 1,
        "Should have multiple entries before restore"
    );

    // Restore the checkpoint to the second message.
    // This should:
    // 1. Cancel any in-progress generation (via the cancel() call)
    // 2. Remove the terminal that was created after that point
    thread
        .update(cx, |thread, cx| {
            thread.restore_checkpoint(second_message_id, cx)
        })
        .await
        .unwrap();

    // Verify that no send_task is in progress after restore
    // (cancel() clears the send_task)
    let has_send_task_after = thread.read_with(cx, |thread, _| thread.send_task.is_some());
    assert!(
        !has_send_task_after,
        "Should not have a send_task after restore (cancel should have cleared it)"
    );

    // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
    let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
    assert_eq!(
        entry_count, 1,
        "Should have 1 entry after restore (only the first user message)"
    );

    // Verify the 2 completed terminals from before the checkpoint still exist
    let terminal_1_exists = thread.read_with(cx, |thread, _| {
        thread.terminals.contains_key(&terminal_id_1)
    });
    assert!(
        terminal_1_exists,
        "Terminal 1 (from before checkpoint) should still exist"
    );

    let terminal_2_exists = thread.read_with(cx, |thread, _| {
        thread.terminals.contains_key(&terminal_id_2)
    });
    assert!(
        terminal_2_exists,
        "Terminal 2 (from before checkpoint) should still exist"
    );

    // Verify they're still in completed state
    // (`output()` being `Some` is treated as "completed" here, mirroring the
    // "still running" check above.)
    let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
        let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
        terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
    });
    assert!(terminal_1_completed, "Terminal 1 should still be completed");

    let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
        let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
        terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
    });
    assert!(terminal_2_completed, "Terminal 2 should still be completed");

    // Verify the running terminal (created after checkpoint) was removed
    let terminal_3_exists =
        thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
    assert!(
        !terminal_3_exists,
        "Terminal 3 (created after checkpoint) should have been removed"
    );

    // Verify total count is 2 (the two from before the checkpoint)
    let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
    assert_eq!(
        terminal_count, 2,
        "Should have exactly 2 terminals (the completed ones from before checkpoint)"
    );
}
4367
/// Tests that update_last_checkpoint correctly updates the original message's checkpoint
/// even when a new user message is added while the async checkpoint comparison is in progress.
///
/// This is a regression test for a bug where update_last_checkpoint would fail with
/// "no checkpoint" if a new user message (without a checkpoint) was added between when
/// update_last_checkpoint started and when its async closure ran.
#[gpui::test]
async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
    init_test(cx);

    let fs = FakeFs::new(cx.executor());
    // The worktree has a `.git` directory and one file.
    fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
        .await;
    let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;

    // Set by the fake agent's handler as soon as it is invoked, so the test
    // can tell when the prompt has reached the agent.
    let handler_done = Arc::new(AtomicBool::new(false));
    let handler_done_clone = handler_done.clone();
    let connection = Rc::new(FakeAgentConnection::new().on_user_message(
        move |_, _thread, _cx| {
            handler_done_clone.store(true, SeqCst);
            async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
        },
    ));

    let thread = cx
        .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
        .await
        .unwrap();

    // Start the send on the background executor without awaiting it, so the
    // test can interleave its own work with the send's progress.
    let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
    let send_task = cx.background_executor.spawn(send_future);

    // Tick until handler completes, then a few more to let update_last_checkpoint start
    while !handler_done.load(SeqCst) {
        cx.executor().tick();
    }
    for _ in 0..5 {
        cx.executor().tick();
    }

    // Inject a second user message that has no checkpoint while the first
    // send is still in flight.
    thread.update(cx, |thread, cx| {
        thread.push_entry(
            AgentThreadEntry::UserMessage(UserMessage {
                id: Some(UserMessageId::new()),
                content: ContentBlock::Empty,
                chunks: vec!["Injected message (no checkpoint)".into()],
                checkpoint: None,
                indented: false,
            }),
            cx,
        );
    });

    // Let all remaining work finish, then collect the result of the send.
    cx.run_until_parked();
    let result = send_task.await;

    assert!(
        result.is_ok(),
        "send should succeed even when new message added during update_last_checkpoint: {:?}",
        result.err()
    );
}
4430}