1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5use action_log::{ActionLog, ActionLogTelemetry};
6use agent_client_protocol::{self as acp};
7use anyhow::{Context as _, Result, anyhow};
8use collections::HashSet;
9pub use connection::*;
10pub use diff::*;
11use futures::{FutureExt, channel::oneshot, future::BoxFuture};
12use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
13use itertools::Itertools;
14use language::language_settings::FormatOnSave;
15use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
16use markdown::Markdown;
17pub use mention::*;
18use project::lsp_store::{FormatTrigger, LspFormatTarget};
19use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
20use serde::{Deserialize, Serialize};
21use serde_json::to_string_pretty;
22use std::collections::HashMap;
23use std::error::Error;
24use std::fmt::{Formatter, Write};
25use std::ops::Range;
26use std::process::ExitStatus;
27use std::rc::Rc;
28use std::time::{Duration, Instant};
29use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
30use task::{Shell, ShellBuilder};
31pub use terminal::*;
32use text::Bias;
33use ui::App;
34use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
35use uuid::Uuid;
36
37/// Key used in ACP ToolCall meta to store the tool's programmatic name.
38/// This is a workaround since ACP's ToolCall doesn't have a dedicated name field.
39pub const TOOL_NAME_META_KEY: &str = "tool_name";
40
41/// Helper to extract tool name from ACP meta
42pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
43 meta.as_ref()
44 .and_then(|m| m.get(TOOL_NAME_META_KEY))
45 .and_then(|v| v.as_str())
46 .map(|s| SharedString::from(s.to_owned()))
47}
48
49/// Helper to create meta with tool name
50pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
51 acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
52}
53
54/// Key used in ACP ToolCall meta to store the session id and message indexes
55pub const SUBAGENT_SESSION_INFO_META_KEY: &str = "subagent_session_info";
56
57#[derive(Clone, Debug, Deserialize, Serialize)]
58pub struct SubagentSessionInfo {
59 /// The session id of the subagent sessiont that was spawned
60 pub session_id: acp::SessionId,
61 /// The index of the message of the start of the "turn" run by this tool call
62 pub message_start_index: usize,
63 /// The index of the output of the message that the subagent has returned
64 #[serde(skip_serializing_if = "Option::is_none")]
65 pub message_end_index: Option<usize>,
66}
67
68/// Helper to extract subagent session id from ACP meta
69pub fn subagent_session_info_from_meta(meta: &Option<acp::Meta>) -> Option<SubagentSessionInfo> {
70 meta.as_ref()
71 .and_then(|m| m.get(SUBAGENT_SESSION_INFO_META_KEY))
72 .and_then(|v| serde_json::from_value(v.clone()).ok())
73}
74
75#[derive(Debug)]
76pub struct UserMessage {
77 pub id: Option<UserMessageId>,
78 pub content: ContentBlock,
79 pub chunks: Vec<acp::ContentBlock>,
80 pub checkpoint: Option<Checkpoint>,
81 pub indented: bool,
82}
83
84#[derive(Debug)]
85pub struct Checkpoint {
86 git_checkpoint: GitStoreCheckpoint,
87 pub show: bool,
88}
89
90impl UserMessage {
91 fn to_markdown(&self, cx: &App) -> String {
92 let mut markdown = String::new();
93 if self
94 .checkpoint
95 .as_ref()
96 .is_some_and(|checkpoint| checkpoint.show)
97 {
98 writeln!(markdown, "## User (checkpoint)").unwrap();
99 } else {
100 writeln!(markdown, "## User").unwrap();
101 }
102 writeln!(markdown).unwrap();
103 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
104 writeln!(markdown).unwrap();
105 markdown
106 }
107}
108
109#[derive(Debug, PartialEq)]
110pub struct AssistantMessage {
111 pub chunks: Vec<AssistantMessageChunk>,
112 pub indented: bool,
113 pub is_subagent_output: bool,
114}
115
116impl AssistantMessage {
117 pub fn to_markdown(&self, cx: &App) -> String {
118 format!(
119 "## Assistant\n\n{}\n\n",
120 self.chunks
121 .iter()
122 .map(|chunk| chunk.to_markdown(cx))
123 .join("\n\n")
124 )
125 }
126}
127
128#[derive(Debug, PartialEq)]
129pub enum AssistantMessageChunk {
130 Message { block: ContentBlock },
131 Thought { block: ContentBlock },
132}
133
134impl AssistantMessageChunk {
135 pub fn from_str(
136 chunk: &str,
137 language_registry: &Arc<LanguageRegistry>,
138 path_style: PathStyle,
139 cx: &mut App,
140 ) -> Self {
141 Self::Message {
142 block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
143 }
144 }
145
146 fn to_markdown(&self, cx: &App) -> String {
147 match self {
148 Self::Message { block } => block.to_markdown(cx).to_string(),
149 Self::Thought { block } => {
150 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
151 }
152 }
153 }
154}
155
156#[derive(Debug)]
157pub enum AgentThreadEntry {
158 UserMessage(UserMessage),
159 AssistantMessage(AssistantMessage),
160 ToolCall(ToolCall),
161}
162
163impl AgentThreadEntry {
164 pub fn is_indented(&self) -> bool {
165 match self {
166 Self::UserMessage(message) => message.indented,
167 Self::AssistantMessage(message) => message.indented,
168 Self::ToolCall(_) => false,
169 }
170 }
171
172 pub fn to_markdown(&self, cx: &App) -> String {
173 match self {
174 Self::UserMessage(message) => message.to_markdown(cx),
175 Self::AssistantMessage(message) => message.to_markdown(cx),
176 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
177 }
178 }
179
180 pub fn user_message(&self) -> Option<&UserMessage> {
181 if let AgentThreadEntry::UserMessage(message) = self {
182 Some(message)
183 } else {
184 None
185 }
186 }
187
188 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
189 if let AgentThreadEntry::ToolCall(call) = self {
190 itertools::Either::Left(call.diffs())
191 } else {
192 itertools::Either::Right(std::iter::empty())
193 }
194 }
195
196 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
197 if let AgentThreadEntry::ToolCall(call) = self {
198 itertools::Either::Left(call.terminals())
199 } else {
200 itertools::Either::Right(std::iter::empty())
201 }
202 }
203
204 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
205 if let AgentThreadEntry::ToolCall(ToolCall {
206 locations,
207 resolved_locations,
208 ..
209 }) = self
210 {
211 Some((
212 locations.get(ix)?.clone(),
213 resolved_locations.get(ix)?.clone()?,
214 ))
215 } else {
216 None
217 }
218 }
219}
220
221#[derive(Debug)]
222pub struct ToolCall {
223 pub id: acp::ToolCallId,
224 pub label: Entity<Markdown>,
225 pub kind: acp::ToolKind,
226 pub content: Vec<ToolCallContent>,
227 pub status: ToolCallStatus,
228 pub locations: Vec<acp::ToolCallLocation>,
229 pub resolved_locations: Vec<Option<AgentLocation>>,
230 pub raw_input: Option<serde_json::Value>,
231 pub raw_input_markdown: Option<Entity<Markdown>>,
232 pub raw_output: Option<serde_json::Value>,
233 pub tool_name: Option<SharedString>,
234 pub subagent_session_info: Option<SubagentSessionInfo>,
235}
236
237impl ToolCall {
238 fn from_acp(
239 tool_call: acp::ToolCall,
240 status: ToolCallStatus,
241 language_registry: Arc<LanguageRegistry>,
242 path_style: PathStyle,
243 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
244 cx: &mut App,
245 ) -> Result<Self> {
246 let title = if tool_call.kind == acp::ToolKind::Execute {
247 tool_call.title
248 } else if let Some((first_line, _)) = tool_call.title.split_once("\n") {
249 first_line.to_owned() + "…"
250 } else {
251 tool_call.title
252 };
253 let mut content = Vec::with_capacity(tool_call.content.len());
254 for item in tool_call.content {
255 if let Some(item) = ToolCallContent::from_acp(
256 item,
257 language_registry.clone(),
258 path_style,
259 terminals,
260 cx,
261 )? {
262 content.push(item);
263 }
264 }
265
266 let raw_input_markdown = tool_call
267 .raw_input
268 .as_ref()
269 .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
270
271 let tool_name = tool_name_from_meta(&tool_call.meta);
272
273 let subagent_session_info = subagent_session_info_from_meta(&tool_call.meta);
274
275 let result = Self {
276 id: tool_call.tool_call_id,
277 label: cx
278 .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
279 kind: tool_call.kind,
280 content,
281 locations: tool_call.locations,
282 resolved_locations: Vec::default(),
283 status,
284 raw_input: tool_call.raw_input,
285 raw_input_markdown,
286 raw_output: tool_call.raw_output,
287 tool_name,
288 subagent_session_info,
289 };
290 Ok(result)
291 }
292
293 fn update_fields(
294 &mut self,
295 fields: acp::ToolCallUpdateFields,
296 meta: Option<acp::Meta>,
297 language_registry: Arc<LanguageRegistry>,
298 path_style: PathStyle,
299 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
300 cx: &mut App,
301 ) -> Result<()> {
302 let acp::ToolCallUpdateFields {
303 kind,
304 status,
305 title,
306 content,
307 locations,
308 raw_input,
309 raw_output,
310 ..
311 } = fields;
312
313 if let Some(kind) = kind {
314 self.kind = kind;
315 }
316
317 if let Some(status) = status {
318 self.status = status.into();
319 }
320
321 if let Some(subagent_session_info) = subagent_session_info_from_meta(&meta) {
322 self.subagent_session_info = Some(subagent_session_info);
323 }
324
325 if let Some(title) = title {
326 if self.kind == acp::ToolKind::Execute {
327 for terminal in self.terminals() {
328 terminal.update(cx, |terminal, cx| {
329 terminal.update_command_label(&title, cx);
330 });
331 }
332 }
333 self.label.update(cx, |label, cx| {
334 if self.kind == acp::ToolKind::Execute {
335 label.replace(title, cx);
336 } else if let Some((first_line, _)) = title.split_once("\n") {
337 label.replace(first_line.to_owned() + "…", cx);
338 } else {
339 label.replace(title, cx);
340 }
341 });
342 }
343
344 if let Some(content) = content {
345 let mut new_content_len = content.len();
346 let mut content = content.into_iter();
347
348 // Reuse existing content if we can
349 for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
350 let valid_content =
351 old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
352 if !valid_content {
353 new_content_len -= 1;
354 }
355 }
356 for new in content {
357 if let Some(new) = ToolCallContent::from_acp(
358 new,
359 language_registry.clone(),
360 path_style,
361 terminals,
362 cx,
363 )? {
364 self.content.push(new);
365 } else {
366 new_content_len -= 1;
367 }
368 }
369 self.content.truncate(new_content_len);
370 }
371
372 if let Some(locations) = locations {
373 self.locations = locations;
374 }
375
376 if let Some(raw_input) = raw_input {
377 self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
378 self.raw_input = Some(raw_input);
379 }
380
381 if let Some(raw_output) = raw_output {
382 if self.content.is_empty()
383 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
384 {
385 self.content
386 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
387 markdown,
388 }));
389 }
390 self.raw_output = Some(raw_output);
391 }
392 Ok(())
393 }
394
395 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
396 self.content.iter().filter_map(|content| match content {
397 ToolCallContent::Diff(diff) => Some(diff),
398 ToolCallContent::ContentBlock(_) => None,
399 ToolCallContent::Terminal(_) => None,
400 })
401 }
402
403 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
404 self.content.iter().filter_map(|content| match content {
405 ToolCallContent::Terminal(terminal) => Some(terminal),
406 ToolCallContent::ContentBlock(_) => None,
407 ToolCallContent::Diff(_) => None,
408 })
409 }
410
411 pub fn is_subagent(&self) -> bool {
412 self.tool_name.as_ref().is_some_and(|s| s == "spawn_agent")
413 || self.subagent_session_info.is_some()
414 }
415
416 pub fn to_markdown(&self, cx: &App) -> String {
417 let mut markdown = format!(
418 "**Tool Call: {}**\nStatus: {}\n\n",
419 self.label.read(cx).source(),
420 self.status
421 );
422 for content in &self.content {
423 markdown.push_str(content.to_markdown(cx).as_str());
424 markdown.push_str("\n\n");
425 }
426 markdown
427 }
428
429 async fn resolve_location(
430 location: acp::ToolCallLocation,
431 project: WeakEntity<Project>,
432 cx: &mut AsyncApp,
433 ) -> Option<ResolvedLocation> {
434 let buffer = project
435 .update(cx, |project, cx| {
436 project
437 .project_path_for_absolute_path(&location.path, cx)
438 .map(|path| project.open_buffer(path, cx))
439 })
440 .ok()??;
441 let buffer = buffer.await.log_err()?;
442 let position = buffer.update(cx, |buffer, _| {
443 let snapshot = buffer.snapshot();
444 if let Some(row) = location.line {
445 let column = snapshot.indent_size_for_line(row).len;
446 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
447 snapshot.anchor_before(point)
448 } else {
449 Anchor::min_for_buffer(snapshot.remote_id())
450 }
451 });
452
453 Some(ResolvedLocation { buffer, position })
454 }
455
456 fn resolve_locations(
457 &self,
458 project: Entity<Project>,
459 cx: &mut App,
460 ) -> Task<Vec<Option<ResolvedLocation>>> {
461 let locations = self.locations.clone();
462 project.update(cx, |_, cx| {
463 cx.spawn(async move |project, cx| {
464 let mut new_locations = Vec::new();
465 for location in locations {
466 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
467 }
468 new_locations
469 })
470 })
471 }
472}
473
474// Separate so we can hold a strong reference to the buffer
475// for saving on the thread
476#[derive(Clone, Debug, PartialEq, Eq)]
477struct ResolvedLocation {
478 buffer: Entity<Buffer>,
479 position: Anchor,
480}
481
482impl From<&ResolvedLocation> for AgentLocation {
483 fn from(value: &ResolvedLocation) -> Self {
484 Self {
485 buffer: value.buffer.downgrade(),
486 position: value.position,
487 }
488 }
489}
490
491#[derive(Debug)]
492pub enum ToolCallStatus {
493 /// The tool call hasn't started running yet, but we start showing it to
494 /// the user.
495 Pending,
496 /// The tool call is waiting for confirmation from the user.
497 WaitingForConfirmation {
498 options: PermissionOptions,
499 respond_tx: oneshot::Sender<acp::PermissionOptionId>,
500 },
501 /// The tool call is currently running.
502 InProgress,
503 /// The tool call completed successfully.
504 Completed,
505 /// The tool call failed.
506 Failed,
507 /// The user rejected the tool call.
508 Rejected,
509 /// The user canceled generation so the tool call was canceled.
510 Canceled,
511}
512
513impl From<acp::ToolCallStatus> for ToolCallStatus {
514 fn from(status: acp::ToolCallStatus) -> Self {
515 match status {
516 acp::ToolCallStatus::Pending => Self::Pending,
517 acp::ToolCallStatus::InProgress => Self::InProgress,
518 acp::ToolCallStatus::Completed => Self::Completed,
519 acp::ToolCallStatus::Failed => Self::Failed,
520 _ => Self::Pending,
521 }
522 }
523}
524
525impl Display for ToolCallStatus {
526 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
527 write!(
528 f,
529 "{}",
530 match self {
531 ToolCallStatus::Pending => "Pending",
532 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
533 ToolCallStatus::InProgress => "In Progress",
534 ToolCallStatus::Completed => "Completed",
535 ToolCallStatus::Failed => "Failed",
536 ToolCallStatus::Rejected => "Rejected",
537 ToolCallStatus::Canceled => "Canceled",
538 }
539 )
540 }
541}
542
543#[derive(Debug, PartialEq, Clone)]
544pub enum ContentBlock {
545 Empty,
546 Markdown { markdown: Entity<Markdown> },
547 ResourceLink { resource_link: acp::ResourceLink },
548 Image { image: Arc<gpui::Image> },
549}
550
551impl ContentBlock {
552 pub fn new(
553 block: acp::ContentBlock,
554 language_registry: &Arc<LanguageRegistry>,
555 path_style: PathStyle,
556 cx: &mut App,
557 ) -> Self {
558 let mut this = Self::Empty;
559 this.append(block, language_registry, path_style, cx);
560 this
561 }
562
563 pub fn new_combined(
564 blocks: impl IntoIterator<Item = acp::ContentBlock>,
565 language_registry: Arc<LanguageRegistry>,
566 path_style: PathStyle,
567 cx: &mut App,
568 ) -> Self {
569 let mut this = Self::Empty;
570 for block in blocks {
571 this.append(block, &language_registry, path_style, cx);
572 }
573 this
574 }
575
576 pub fn append(
577 &mut self,
578 block: acp::ContentBlock,
579 language_registry: &Arc<LanguageRegistry>,
580 path_style: PathStyle,
581 cx: &mut App,
582 ) {
583 match (&mut *self, &block) {
584 (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
585 *self = ContentBlock::ResourceLink {
586 resource_link: resource_link.clone(),
587 };
588 }
589 (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
590 if let Some(image) = Self::decode_image(image_content) {
591 *self = ContentBlock::Image { image };
592 } else {
593 let new_content = Self::image_md(image_content);
594 *self = Self::create_markdown_block(new_content, language_registry, cx);
595 }
596 }
597 (ContentBlock::Empty, _) => {
598 let new_content = Self::block_string_contents(&block, path_style);
599 *self = Self::create_markdown_block(new_content, language_registry, cx);
600 }
601 (ContentBlock::Markdown { markdown }, _) => {
602 let new_content = Self::block_string_contents(&block, path_style);
603 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
604 }
605 (ContentBlock::ResourceLink { resource_link }, _) => {
606 let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
607 let new_content = Self::block_string_contents(&block, path_style);
608 let combined = format!("{}\n{}", existing_content, new_content);
609 *self = Self::create_markdown_block(combined, language_registry, cx);
610 }
611 (ContentBlock::Image { .. }, _) => {
612 let new_content = Self::block_string_contents(&block, path_style);
613 let combined = format!("`Image`\n{}", new_content);
614 *self = Self::create_markdown_block(combined, language_registry, cx);
615 }
616 }
617 }
618
619 fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
620 use base64::Engine as _;
621
622 let bytes = base64::engine::general_purpose::STANDARD
623 .decode(image_content.data.as_bytes())
624 .ok()?;
625 let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
626 Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
627 }
628
629 fn create_markdown_block(
630 content: String,
631 language_registry: &Arc<LanguageRegistry>,
632 cx: &mut App,
633 ) -> ContentBlock {
634 ContentBlock::Markdown {
635 markdown: cx
636 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
637 }
638 }
639
640 fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
641 match block {
642 acp::ContentBlock::Text(text_content) => text_content.text.clone(),
643 acp::ContentBlock::ResourceLink(resource_link) => {
644 Self::resource_link_md(&resource_link.uri, path_style)
645 }
646 acp::ContentBlock::Resource(acp::EmbeddedResource {
647 resource:
648 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
649 uri,
650 ..
651 }),
652 ..
653 }) => Self::resource_link_md(uri, path_style),
654 acp::ContentBlock::Image(image) => Self::image_md(image),
655 _ => String::new(),
656 }
657 }
658
659 fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
660 if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
661 uri.as_link().to_string()
662 } else {
663 uri.to_string()
664 }
665 }
666
667 fn image_md(_image: &acp::ImageContent) -> String {
668 "`Image`".into()
669 }
670
671 pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
672 match self {
673 ContentBlock::Empty => "",
674 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
675 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
676 ContentBlock::Image { .. } => "`Image`",
677 }
678 }
679
680 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
681 match self {
682 ContentBlock::Empty => None,
683 ContentBlock::Markdown { markdown } => Some(markdown),
684 ContentBlock::ResourceLink { .. } => None,
685 ContentBlock::Image { .. } => None,
686 }
687 }
688
689 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
690 match self {
691 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
692 _ => None,
693 }
694 }
695
696 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
697 match self {
698 ContentBlock::Image { image } => Some(image),
699 _ => None,
700 }
701 }
702}
703
704#[derive(Debug)]
705pub enum ToolCallContent {
706 ContentBlock(ContentBlock),
707 Diff(Entity<Diff>),
708 Terminal(Entity<Terminal>),
709}
710
711impl ToolCallContent {
712 pub fn from_acp(
713 content: acp::ToolCallContent,
714 language_registry: Arc<LanguageRegistry>,
715 path_style: PathStyle,
716 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
717 cx: &mut App,
718 ) -> Result<Option<Self>> {
719 match content {
720 acp::ToolCallContent::Content(acp::Content { content, .. }) => {
721 Ok(Some(Self::ContentBlock(ContentBlock::new(
722 content,
723 &language_registry,
724 path_style,
725 cx,
726 ))))
727 }
728 acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
729 Diff::finalized(
730 diff.path.to_string_lossy().into_owned(),
731 diff.old_text,
732 diff.new_text,
733 language_registry,
734 cx,
735 )
736 })))),
737 acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
738 .get(&terminal_id)
739 .cloned()
740 .map(|terminal| Some(Self::Terminal(terminal)))
741 .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
742 _ => Ok(None),
743 }
744 }
745
746 pub fn update_from_acp(
747 &mut self,
748 new: acp::ToolCallContent,
749 language_registry: Arc<LanguageRegistry>,
750 path_style: PathStyle,
751 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
752 cx: &mut App,
753 ) -> Result<bool> {
754 let needs_update = match (&self, &new) {
755 (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
756 old_diff.read(cx).needs_update(
757 new_diff.old_text.as_deref().unwrap_or(""),
758 &new_diff.new_text,
759 cx,
760 )
761 }
762 _ => true,
763 };
764
765 if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
766 if needs_update {
767 *self = update;
768 }
769 Ok(true)
770 } else {
771 Ok(false)
772 }
773 }
774
775 pub fn to_markdown(&self, cx: &App) -> String {
776 match self {
777 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
778 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
779 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
780 }
781 }
782
783 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
784 match self {
785 Self::ContentBlock(content) => content.image(),
786 _ => None,
787 }
788 }
789}
790
791#[derive(Debug, PartialEq)]
792pub enum ToolCallUpdate {
793 UpdateFields(acp::ToolCallUpdate),
794 UpdateDiff(ToolCallUpdateDiff),
795 UpdateTerminal(ToolCallUpdateTerminal),
796}
797
798impl ToolCallUpdate {
799 fn id(&self) -> &acp::ToolCallId {
800 match self {
801 Self::UpdateFields(update) => &update.tool_call_id,
802 Self::UpdateDiff(diff) => &diff.id,
803 Self::UpdateTerminal(terminal) => &terminal.id,
804 }
805 }
806}
807
808impl From<acp::ToolCallUpdate> for ToolCallUpdate {
809 fn from(update: acp::ToolCallUpdate) -> Self {
810 Self::UpdateFields(update)
811 }
812}
813
814impl From<ToolCallUpdateDiff> for ToolCallUpdate {
815 fn from(diff: ToolCallUpdateDiff) -> Self {
816 Self::UpdateDiff(diff)
817 }
818}
819
820#[derive(Debug, PartialEq)]
821pub struct ToolCallUpdateDiff {
822 pub id: acp::ToolCallId,
823 pub diff: Entity<Diff>,
824}
825
826impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
827 fn from(terminal: ToolCallUpdateTerminal) -> Self {
828 Self::UpdateTerminal(terminal)
829 }
830}
831
832#[derive(Debug, PartialEq)]
833pub struct ToolCallUpdateTerminal {
834 pub id: acp::ToolCallId,
835 pub terminal: Entity<Terminal>,
836}
837
838#[derive(Debug, Default)]
839pub struct Plan {
840 pub entries: Vec<PlanEntry>,
841}
842
843#[derive(Debug)]
844pub struct PlanStats<'a> {
845 pub in_progress_entry: Option<&'a PlanEntry>,
846 pub pending: u32,
847 pub completed: u32,
848}
849
850impl Plan {
851 pub fn is_empty(&self) -> bool {
852 self.entries.is_empty()
853 }
854
855 pub fn stats(&self) -> PlanStats<'_> {
856 let mut stats = PlanStats {
857 in_progress_entry: None,
858 pending: 0,
859 completed: 0,
860 };
861
862 for entry in &self.entries {
863 match &entry.status {
864 acp::PlanEntryStatus::Pending => {
865 stats.pending += 1;
866 }
867 acp::PlanEntryStatus::InProgress => {
868 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
869 }
870 acp::PlanEntryStatus::Completed => {
871 stats.completed += 1;
872 }
873 _ => {}
874 }
875 }
876
877 stats
878 }
879}
880
881#[derive(Debug)]
882pub struct PlanEntry {
883 pub content: Entity<Markdown>,
884 pub priority: acp::PlanEntryPriority,
885 pub status: acp::PlanEntryStatus,
886}
887
888impl PlanEntry {
889 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
890 Self {
891 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
892 priority: entry.priority,
893 status: entry.status,
894 }
895 }
896}
897
898#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
899pub struct TokenUsage {
900 pub max_tokens: u64,
901 pub used_tokens: u64,
902 pub input_tokens: u64,
903 pub output_tokens: u64,
904 pub max_output_tokens: Option<u64>,
905}
906
907pub const TOKEN_USAGE_WARNING_THRESHOLD: f32 = 0.8;
908
909impl TokenUsage {
910 pub fn ratio(&self) -> TokenUsageRatio {
911 #[cfg(debug_assertions)]
912 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
913 .unwrap_or(TOKEN_USAGE_WARNING_THRESHOLD.to_string())
914 .parse()
915 .unwrap();
916 #[cfg(not(debug_assertions))]
917 let warning_threshold: f32 = TOKEN_USAGE_WARNING_THRESHOLD;
918
919 // When the maximum is unknown because there is no selected model,
920 // avoid showing the token limit warning.
921 if self.max_tokens == 0 {
922 TokenUsageRatio::Normal
923 } else if self.used_tokens >= self.max_tokens {
924 TokenUsageRatio::Exceeded
925 } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
926 TokenUsageRatio::Warning
927 } else {
928 TokenUsageRatio::Normal
929 }
930 }
931}
932
933#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
934pub enum TokenUsageRatio {
935 Normal,
936 Warning,
937 Exceeded,
938}
939
940#[derive(Debug, Clone)]
941pub struct RetryStatus {
942 pub last_error: SharedString,
943 pub attempt: usize,
944 pub max_attempts: usize,
945 pub started_at: Instant,
946 pub duration: Duration,
947}
948
949struct RunningTurn {
950 id: u32,
951 send_task: Task<()>,
952}
953
954pub struct AcpThread {
955 parent_session_id: Option<acp::SessionId>,
956 title: SharedString,
957 entries: Vec<AgentThreadEntry>,
958 plan: Plan,
959 project: Entity<Project>,
960 action_log: Entity<ActionLog>,
961 shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
962 turn_id: u32,
963 running_turn: Option<RunningTurn>,
964 connection: Rc<dyn AgentConnection>,
965 session_id: acp::SessionId,
966 token_usage: Option<TokenUsage>,
967 prompt_capabilities: acp::PromptCapabilities,
968 _observe_prompt_capabilities: Task<anyhow::Result<()>>,
969 terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
970 pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
971 pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
972 had_error: bool,
973 /// The user's unsent prompt text, persisted so it can be restored when reloading the thread.
974 draft_prompt: Option<Vec<acp::ContentBlock>>,
975}
976
977impl From<&AcpThread> for ActionLogTelemetry {
978 fn from(value: &AcpThread) -> Self {
979 Self {
980 agent_telemetry_id: value.connection().telemetry_id(),
981 session_id: value.session_id.0.clone(),
982 }
983 }
984}
985
986#[derive(Debug)]
987pub enum AcpThreadEvent {
988 NewEntry,
989 TitleUpdated,
990 TokenUsageUpdated,
991 EntryUpdated(usize),
992 EntriesRemoved(Range<usize>),
993 ToolAuthorizationRequested(acp::ToolCallId),
994 ToolAuthorizationReceived(acp::ToolCallId),
995 Retry(RetryStatus),
996 SubagentSpawned(acp::SessionId),
997 Stopped(acp::StopReason),
998 Error,
999 LoadError(LoadError),
1000 PromptCapabilitiesUpdated,
1001 Refusal,
1002 AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
1003 ModeUpdated(acp::SessionModeId),
1004 ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
1005}
1006
1007impl EventEmitter<AcpThreadEvent> for AcpThread {}
1008
1009#[derive(Debug, Clone)]
1010pub enum TerminalProviderEvent {
1011 Created {
1012 terminal_id: acp::TerminalId,
1013 label: String,
1014 cwd: Option<PathBuf>,
1015 output_byte_limit: Option<u64>,
1016 terminal: Entity<::terminal::Terminal>,
1017 },
1018 Output {
1019 terminal_id: acp::TerminalId,
1020 data: Vec<u8>,
1021 },
1022 TitleChanged {
1023 terminal_id: acp::TerminalId,
1024 title: String,
1025 },
1026 Exit {
1027 terminal_id: acp::TerminalId,
1028 status: acp::TerminalExitStatus,
1029 },
1030}
1031
1032#[derive(Debug, Clone)]
1033pub enum TerminalProviderCommand {
1034 WriteInput {
1035 terminal_id: acp::TerminalId,
1036 bytes: Vec<u8>,
1037 },
1038 Resize {
1039 terminal_id: acp::TerminalId,
1040 cols: u16,
1041 rows: u16,
1042 },
1043 Close {
1044 terminal_id: acp::TerminalId,
1045 },
1046}
1047
1048impl AcpThread {
1049 pub fn on_terminal_provider_event(
1050 &mut self,
1051 event: TerminalProviderEvent,
1052 cx: &mut Context<Self>,
1053 ) {
1054 match event {
1055 TerminalProviderEvent::Created {
1056 terminal_id,
1057 label,
1058 cwd,
1059 output_byte_limit,
1060 terminal,
1061 } => {
1062 let entity = self.register_terminal_created(
1063 terminal_id.clone(),
1064 label,
1065 cwd,
1066 output_byte_limit,
1067 terminal,
1068 cx,
1069 );
1070
1071 if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
1072 for data in chunks.drain(..) {
1073 entity.update(cx, |term, cx| {
1074 term.inner().update(cx, |inner, cx| {
1075 inner.write_output(&data, cx);
1076 })
1077 });
1078 }
1079 }
1080
1081 if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
1082 entity.update(cx, |_term, cx| {
1083 cx.notify();
1084 });
1085 }
1086
1087 cx.notify();
1088 }
1089 TerminalProviderEvent::Output { terminal_id, data } => {
1090 if let Some(entity) = self.terminals.get(&terminal_id) {
1091 entity.update(cx, |term, cx| {
1092 term.inner().update(cx, |inner, cx| {
1093 inner.write_output(&data, cx);
1094 })
1095 });
1096 } else {
1097 self.pending_terminal_output
1098 .entry(terminal_id)
1099 .or_default()
1100 .push(data);
1101 }
1102 }
1103 TerminalProviderEvent::TitleChanged { terminal_id, title } => {
1104 if let Some(entity) = self.terminals.get(&terminal_id) {
1105 entity.update(cx, |term, cx| {
1106 term.inner().update(cx, |inner, cx| {
1107 inner.breadcrumb_text = title;
1108 cx.emit(::terminal::Event::BreadcrumbsChanged);
1109 })
1110 });
1111 }
1112 }
1113 TerminalProviderEvent::Exit {
1114 terminal_id,
1115 status,
1116 } => {
1117 if let Some(entity) = self.terminals.get(&terminal_id) {
1118 entity.update(cx, |_term, cx| {
1119 cx.notify();
1120 });
1121 } else {
1122 self.pending_terminal_exit.insert(terminal_id, status);
1123 }
1124 }
1125 }
1126 }
1127}
1128
1129#[derive(PartialEq, Eq, Debug)]
1130pub enum ThreadStatus {
1131 Idle,
1132 Generating,
1133}
1134
1135#[derive(Debug, Clone)]
1136pub enum LoadError {
1137 Unsupported {
1138 command: SharedString,
1139 current_version: SharedString,
1140 minimum_version: SharedString,
1141 },
1142 FailedToInstall(SharedString),
1143 Exited {
1144 status: ExitStatus,
1145 },
1146 Other(SharedString),
1147}
1148
1149impl Display for LoadError {
1150 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1151 match self {
1152 LoadError::Unsupported {
1153 command: path,
1154 current_version,
1155 minimum_version,
1156 } => {
1157 write!(
1158 f,
1159 "version {current_version} from {path} is not supported (need at least {minimum_version})"
1160 )
1161 }
1162 LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1163 LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1164 LoadError::Other(msg) => write!(f, "{msg}"),
1165 }
1166 }
1167}
1168
1169impl Error for LoadError {}
1170
1171impl AcpThread {
1172 pub fn new(
1173 parent_session_id: Option<acp::SessionId>,
1174 title: impl Into<SharedString>,
1175 connection: Rc<dyn AgentConnection>,
1176 project: Entity<Project>,
1177 action_log: Entity<ActionLog>,
1178 session_id: acp::SessionId,
1179 mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1180 cx: &mut Context<Self>,
1181 ) -> Self {
1182 let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1183 let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1184 loop {
1185 let caps = prompt_capabilities_rx.recv().await?;
1186 this.update(cx, |this, cx| {
1187 this.prompt_capabilities = caps;
1188 cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1189 })?;
1190 }
1191 });
1192
1193 Self {
1194 parent_session_id,
1195 action_log,
1196 shared_buffers: Default::default(),
1197 entries: Default::default(),
1198 plan: Default::default(),
1199 title: title.into(),
1200 project,
1201 running_turn: None,
1202 turn_id: 0,
1203 connection,
1204 session_id,
1205 token_usage: None,
1206 prompt_capabilities,
1207 _observe_prompt_capabilities: task,
1208 terminals: HashMap::default(),
1209 pending_terminal_output: HashMap::default(),
1210 pending_terminal_exit: HashMap::default(),
1211 had_error: false,
1212 draft_prompt: None,
1213 }
1214 }
1215
1216 pub fn parent_session_id(&self) -> Option<&acp::SessionId> {
1217 self.parent_session_id.as_ref()
1218 }
1219
1220 pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
1221 self.prompt_capabilities.clone()
1222 }
1223
1224 pub fn draft_prompt(&self) -> Option<&[acp::ContentBlock]> {
1225 self.draft_prompt.as_deref()
1226 }
1227
1228 pub fn set_draft_prompt(&mut self, prompt: Option<Vec<acp::ContentBlock>>) {
1229 self.draft_prompt = prompt;
1230 }
1231
1232 pub fn connection(&self) -> &Rc<dyn AgentConnection> {
1233 &self.connection
1234 }
1235
1236 pub fn action_log(&self) -> &Entity<ActionLog> {
1237 &self.action_log
1238 }
1239
1240 pub fn project(&self) -> &Entity<Project> {
1241 &self.project
1242 }
1243
1244 pub fn title(&self) -> SharedString {
1245 self.title.clone()
1246 }
1247
1248 pub fn entries(&self) -> &[AgentThreadEntry] {
1249 &self.entries
1250 }
1251
1252 pub fn session_id(&self) -> &acp::SessionId {
1253 &self.session_id
1254 }
1255
1256 pub fn status(&self) -> ThreadStatus {
1257 if self.running_turn.is_some() {
1258 ThreadStatus::Generating
1259 } else {
1260 ThreadStatus::Idle
1261 }
1262 }
1263
1264 pub fn had_error(&self) -> bool {
1265 self.had_error
1266 }
1267
1268 pub fn is_waiting_for_confirmation(&self) -> bool {
1269 for entry in self.entries.iter().rev() {
1270 match entry {
1271 AgentThreadEntry::UserMessage(_) => return false,
1272 AgentThreadEntry::ToolCall(ToolCall {
1273 status: ToolCallStatus::WaitingForConfirmation { .. },
1274 ..
1275 }) => return true,
1276 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1277 }
1278 }
1279 false
1280 }
1281
1282 pub fn token_usage(&self) -> Option<&TokenUsage> {
1283 self.token_usage.as_ref()
1284 }
1285
1286 pub fn has_pending_edit_tool_calls(&self) -> bool {
1287 for entry in self.entries.iter().rev() {
1288 match entry {
1289 AgentThreadEntry::UserMessage(_) => return false,
1290 AgentThreadEntry::ToolCall(
1291 call @ ToolCall {
1292 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1293 ..
1294 },
1295 ) if call.diffs().next().is_some() => {
1296 return true;
1297 }
1298 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1299 }
1300 }
1301
1302 false
1303 }
1304
1305 pub fn has_in_progress_tool_calls(&self) -> bool {
1306 for entry in self.entries.iter().rev() {
1307 match entry {
1308 AgentThreadEntry::UserMessage(_) => return false,
1309 AgentThreadEntry::ToolCall(ToolCall {
1310 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1311 ..
1312 }) => {
1313 return true;
1314 }
1315 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1316 }
1317 }
1318
1319 false
1320 }
1321
1322 pub fn used_tools_since_last_user_message(&self) -> bool {
1323 for entry in self.entries.iter().rev() {
1324 match entry {
1325 AgentThreadEntry::UserMessage(..) => return false,
1326 AgentThreadEntry::AssistantMessage(..) => continue,
1327 AgentThreadEntry::ToolCall(..) => return true,
1328 }
1329 }
1330
1331 false
1332 }
1333
1334 pub fn handle_session_update(
1335 &mut self,
1336 update: acp::SessionUpdate,
1337 cx: &mut Context<Self>,
1338 ) -> Result<(), acp::Error> {
1339 match update {
1340 acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1341 self.push_user_content_block(None, content, cx);
1342 }
1343 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1344 self.push_assistant_content_block(content, false, cx);
1345 }
1346 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1347 self.push_assistant_content_block(content, true, cx);
1348 }
1349 acp::SessionUpdate::ToolCall(tool_call) => {
1350 self.upsert_tool_call(tool_call, cx)?;
1351 }
1352 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1353 self.update_tool_call(tool_call_update, cx)?;
1354 }
1355 acp::SessionUpdate::Plan(plan) => {
1356 self.update_plan(plan, cx);
1357 }
1358 acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1359 available_commands,
1360 ..
1361 }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1362 acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1363 current_mode_id,
1364 ..
1365 }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1366 acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1367 config_options,
1368 ..
1369 }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1370 _ => {}
1371 }
1372 Ok(())
1373 }
1374
1375 pub fn push_user_content_block(
1376 &mut self,
1377 message_id: Option<UserMessageId>,
1378 chunk: acp::ContentBlock,
1379 cx: &mut Context<Self>,
1380 ) {
1381 self.push_user_content_block_with_indent(message_id, chunk, false, cx)
1382 }
1383
1384 pub fn push_user_content_block_with_indent(
1385 &mut self,
1386 message_id: Option<UserMessageId>,
1387 chunk: acp::ContentBlock,
1388 indented: bool,
1389 cx: &mut Context<Self>,
1390 ) {
1391 let language_registry = self.project.read(cx).languages().clone();
1392 let path_style = self.project.read(cx).path_style(cx);
1393 let entries_len = self.entries.len();
1394
1395 if let Some(last_entry) = self.entries.last_mut()
1396 && let AgentThreadEntry::UserMessage(UserMessage {
1397 id,
1398 content,
1399 chunks,
1400 indented: existing_indented,
1401 ..
1402 }) = last_entry
1403 && *existing_indented == indented
1404 {
1405 *id = message_id.or(id.take());
1406 content.append(chunk.clone(), &language_registry, path_style, cx);
1407 chunks.push(chunk);
1408 let idx = entries_len - 1;
1409 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1410 } else {
1411 let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1412 self.push_entry(
1413 AgentThreadEntry::UserMessage(UserMessage {
1414 id: message_id,
1415 content,
1416 chunks: vec![chunk],
1417 checkpoint: None,
1418 indented,
1419 }),
1420 cx,
1421 );
1422 }
1423 }
1424
1425 pub fn push_assistant_content_block(
1426 &mut self,
1427 chunk: acp::ContentBlock,
1428 is_thought: bool,
1429 cx: &mut Context<Self>,
1430 ) {
1431 self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
1432 }
1433
1434 pub fn push_assistant_content_block_with_indent(
1435 &mut self,
1436 chunk: acp::ContentBlock,
1437 is_thought: bool,
1438 indented: bool,
1439 cx: &mut Context<Self>,
1440 ) {
1441 let language_registry = self.project.read(cx).languages().clone();
1442 let path_style = self.project.read(cx).path_style(cx);
1443 let entries_len = self.entries.len();
1444 if let Some(last_entry) = self.entries.last_mut()
1445 && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1446 chunks,
1447 indented: existing_indented,
1448 is_subagent_output: _,
1449 }) = last_entry
1450 && *existing_indented == indented
1451 {
1452 let idx = entries_len - 1;
1453 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1454 match (chunks.last_mut(), is_thought) {
1455 (Some(AssistantMessageChunk::Message { block }), false)
1456 | (Some(AssistantMessageChunk::Thought { block }), true) => {
1457 block.append(chunk, &language_registry, path_style, cx)
1458 }
1459 _ => {
1460 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1461 if is_thought {
1462 chunks.push(AssistantMessageChunk::Thought { block })
1463 } else {
1464 chunks.push(AssistantMessageChunk::Message { block })
1465 }
1466 }
1467 }
1468 } else {
1469 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1470 let chunk = if is_thought {
1471 AssistantMessageChunk::Thought { block }
1472 } else {
1473 AssistantMessageChunk::Message { block }
1474 };
1475
1476 self.push_entry(
1477 AgentThreadEntry::AssistantMessage(AssistantMessage {
1478 chunks: vec![chunk],
1479 indented,
1480 is_subagent_output: false,
1481 }),
1482 cx,
1483 );
1484 }
1485 }
1486
1487 fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
1488 self.entries.push(entry);
1489 cx.emit(AcpThreadEvent::NewEntry);
1490 }
1491
1492 pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1493 self.connection.set_title(&self.session_id, cx).is_some()
1494 }
1495
1496 pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1497 if title != self.title {
1498 self.title = title.clone();
1499 cx.emit(AcpThreadEvent::TitleUpdated);
1500 if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1501 return set_title.run(title, cx);
1502 }
1503 }
1504 Task::ready(Ok(()))
1505 }
1506
1507 pub fn subagent_spawned(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
1508 cx.emit(AcpThreadEvent::SubagentSpawned(session_id));
1509 }
1510
1511 pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1512 self.token_usage = usage;
1513 cx.emit(AcpThreadEvent::TokenUsageUpdated);
1514 }
1515
1516 pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1517 cx.emit(AcpThreadEvent::Retry(status));
1518 }
1519
1520 pub fn update_tool_call(
1521 &mut self,
1522 update: impl Into<ToolCallUpdate>,
1523 cx: &mut Context<Self>,
1524 ) -> Result<()> {
1525 let update = update.into();
1526 let languages = self.project.read(cx).languages().clone();
1527 let path_style = self.project.read(cx).path_style(cx);
1528
1529 let ix = match self.index_for_tool_call(update.id()) {
1530 Some(ix) => ix,
1531 None => {
1532 // Tool call not found - create a failed tool call entry
1533 let failed_tool_call = ToolCall {
1534 id: update.id().clone(),
1535 label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1536 kind: acp::ToolKind::Fetch,
1537 content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1538 "Tool call not found".into(),
1539 &languages,
1540 path_style,
1541 cx,
1542 ))],
1543 status: ToolCallStatus::Failed,
1544 locations: Vec::new(),
1545 resolved_locations: Vec::new(),
1546 raw_input: None,
1547 raw_input_markdown: None,
1548 raw_output: None,
1549 tool_name: None,
1550 subagent_session_info: None,
1551 };
1552 self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1553 return Ok(());
1554 }
1555 };
1556 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1557 unreachable!()
1558 };
1559
1560 match update {
1561 ToolCallUpdate::UpdateFields(update) => {
1562 let location_updated = update.fields.locations.is_some();
1563 call.update_fields(
1564 update.fields,
1565 update.meta,
1566 languages,
1567 path_style,
1568 &self.terminals,
1569 cx,
1570 )?;
1571 if location_updated {
1572 self.resolve_locations(update.tool_call_id, cx);
1573 }
1574 }
1575 ToolCallUpdate::UpdateDiff(update) => {
1576 call.content.clear();
1577 call.content.push(ToolCallContent::Diff(update.diff));
1578 }
1579 ToolCallUpdate::UpdateTerminal(update) => {
1580 call.content.clear();
1581 call.content
1582 .push(ToolCallContent::Terminal(update.terminal));
1583 }
1584 }
1585
1586 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1587
1588 Ok(())
1589 }
1590
1591 /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1592 pub fn upsert_tool_call(
1593 &mut self,
1594 tool_call: acp::ToolCall,
1595 cx: &mut Context<Self>,
1596 ) -> Result<(), acp::Error> {
1597 let status = tool_call.status.into();
1598 self.upsert_tool_call_inner(tool_call.into(), status, cx)
1599 }
1600
1601 /// Fails if id does not match an existing entry.
1602 pub fn upsert_tool_call_inner(
1603 &mut self,
1604 update: acp::ToolCallUpdate,
1605 status: ToolCallStatus,
1606 cx: &mut Context<Self>,
1607 ) -> Result<(), acp::Error> {
1608 let language_registry = self.project.read(cx).languages().clone();
1609 let path_style = self.project.read(cx).path_style(cx);
1610 let id = update.tool_call_id.clone();
1611
1612 let agent_telemetry_id = self.connection().telemetry_id();
1613 let session = self.session_id();
1614 let parent_session_id = self.parent_session_id();
1615 if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1616 let status = if matches!(status, ToolCallStatus::Completed) {
1617 "completed"
1618 } else {
1619 "failed"
1620 };
1621 telemetry::event!(
1622 "Agent Tool Call Completed",
1623 agent_telemetry_id,
1624 session,
1625 parent_session_id,
1626 status
1627 );
1628 }
1629
1630 if let Some(ix) = self.index_for_tool_call(&id) {
1631 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1632 unreachable!()
1633 };
1634
1635 call.update_fields(
1636 update.fields,
1637 update.meta,
1638 language_registry,
1639 path_style,
1640 &self.terminals,
1641 cx,
1642 )?;
1643 call.status = status;
1644
1645 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1646 } else {
1647 let call = ToolCall::from_acp(
1648 update.try_into()?,
1649 status,
1650 language_registry,
1651 self.project.read(cx).path_style(cx),
1652 &self.terminals,
1653 cx,
1654 )?;
1655 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1656 };
1657
1658 self.resolve_locations(id, cx);
1659 Ok(())
1660 }
1661
1662 fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1663 self.entries
1664 .iter()
1665 .enumerate()
1666 .rev()
1667 .find_map(|(index, entry)| {
1668 if let AgentThreadEntry::ToolCall(tool_call) = entry
1669 && &tool_call.id == id
1670 {
1671 Some(index)
1672 } else {
1673 None
1674 }
1675 })
1676 }
1677
1678 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1679 // The tool call we are looking for is typically the last one, or very close to the end.
1680 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1681 self.entries
1682 .iter_mut()
1683 .enumerate()
1684 .rev()
1685 .find_map(|(index, tool_call)| {
1686 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1687 && &tool_call.id == id
1688 {
1689 Some((index, tool_call))
1690 } else {
1691 None
1692 }
1693 })
1694 }
1695
1696 pub fn tool_call(&self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1697 self.entries
1698 .iter()
1699 .enumerate()
1700 .rev()
1701 .find_map(|(index, tool_call)| {
1702 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1703 && &tool_call.id == id
1704 {
1705 Some((index, tool_call))
1706 } else {
1707 None
1708 }
1709 })
1710 }
1711
1712 pub fn tool_call_for_subagent(&self, session_id: &acp::SessionId) -> Option<&ToolCall> {
1713 self.entries.iter().find_map(|entry| match entry {
1714 AgentThreadEntry::ToolCall(tool_call) => {
1715 if let Some(subagent_session_info) = &tool_call.subagent_session_info
1716 && &subagent_session_info.session_id == session_id
1717 {
1718 Some(tool_call)
1719 } else {
1720 None
1721 }
1722 }
1723 _ => None,
1724 })
1725 }
1726
1727 pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1728 let project = self.project.clone();
1729 let should_update_agent_location = self.parent_session_id.is_none();
1730 let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1731 return;
1732 };
1733 let task = tool_call.resolve_locations(project, cx);
1734 cx.spawn(async move |this, cx| {
1735 let resolved_locations = task.await;
1736
1737 this.update(cx, |this, cx| {
1738 let project = this.project.clone();
1739
1740 for location in resolved_locations.iter().flatten() {
1741 this.shared_buffers
1742 .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
1743 }
1744 let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1745 return;
1746 };
1747
1748 if let Some(Some(location)) = resolved_locations.last() {
1749 project.update(cx, |project, cx| {
1750 let should_ignore = if let Some(agent_location) = project
1751 .agent_location()
1752 .filter(|agent_location| agent_location.buffer == location.buffer)
1753 {
1754 let snapshot = location.buffer.read(cx).snapshot();
1755 let old_position = agent_location.position.to_point(&snapshot);
1756 let new_position = location.position.to_point(&snapshot);
1757
1758 // ignore this so that when we get updates from the edit tool
1759 // the position doesn't reset to the startof line
1760 old_position.row == new_position.row
1761 && old_position.column > new_position.column
1762 } else {
1763 false
1764 };
1765 if !should_ignore && should_update_agent_location {
1766 project.set_agent_location(Some(location.into()), cx);
1767 }
1768 });
1769 }
1770
1771 let resolved_locations = resolved_locations
1772 .iter()
1773 .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
1774 .collect::<Vec<_>>();
1775
1776 if tool_call.resolved_locations != resolved_locations {
1777 tool_call.resolved_locations = resolved_locations;
1778 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1779 }
1780 })
1781 })
1782 .detach();
1783 }
1784
1785 pub fn request_tool_call_authorization(
1786 &mut self,
1787 tool_call: acp::ToolCallUpdate,
1788 options: PermissionOptions,
1789 cx: &mut Context<Self>,
1790 ) -> Result<Task<acp::RequestPermissionOutcome>> {
1791 let (tx, rx) = oneshot::channel();
1792
1793 let status = ToolCallStatus::WaitingForConfirmation {
1794 options,
1795 respond_tx: tx,
1796 };
1797
1798 let tool_call_id = tool_call.tool_call_id.clone();
1799 self.upsert_tool_call_inner(tool_call, status, cx)?;
1800 cx.emit(AcpThreadEvent::ToolAuthorizationRequested(
1801 tool_call_id.clone(),
1802 ));
1803
1804 Ok(cx.spawn(async move |this, cx| {
1805 let outcome = match rx.await {
1806 Ok(option) => acp::RequestPermissionOutcome::Selected(
1807 acp::SelectedPermissionOutcome::new(option),
1808 ),
1809 Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1810 };
1811 this.update(cx, |_this, cx| {
1812 cx.emit(AcpThreadEvent::ToolAuthorizationReceived(tool_call_id))
1813 })
1814 .ok();
1815 outcome
1816 }))
1817 }
1818
1819 pub fn authorize_tool_call(
1820 &mut self,
1821 id: acp::ToolCallId,
1822 option_id: acp::PermissionOptionId,
1823 option_kind: acp::PermissionOptionKind,
1824 cx: &mut Context<Self>,
1825 ) {
1826 let Some((ix, call)) = self.tool_call_mut(&id) else {
1827 return;
1828 };
1829
1830 let new_status = match option_kind {
1831 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1832 ToolCallStatus::Rejected
1833 }
1834 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1835 ToolCallStatus::InProgress
1836 }
1837 _ => ToolCallStatus::InProgress,
1838 };
1839
1840 let curr_status = mem::replace(&mut call.status, new_status);
1841
1842 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1843 respond_tx.send(option_id).log_err();
1844 } else if cfg!(debug_assertions) {
1845 panic!("tried to authorize an already authorized tool call");
1846 }
1847
1848 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1849 }
1850
1851 pub fn plan(&self) -> &Plan {
1852 &self.plan
1853 }
1854
1855 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1856 let new_entries_len = request.entries.len();
1857 let mut new_entries = request.entries.into_iter();
1858
1859 // Reuse existing markdown to prevent flickering
1860 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1861 let PlanEntry {
1862 content,
1863 priority,
1864 status,
1865 } = old;
1866 content.update(cx, |old, cx| {
1867 old.replace(new.content, cx);
1868 });
1869 *priority = new.priority;
1870 *status = new.status;
1871 }
1872 for new in new_entries {
1873 self.plan.entries.push(PlanEntry::from_acp(new, cx))
1874 }
1875 self.plan.entries.truncate(new_entries_len);
1876
1877 cx.notify();
1878 }
1879
1880 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1881 self.plan
1882 .entries
1883 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1884 cx.notify();
1885 }
1886
1887 #[cfg(any(test, feature = "test-support"))]
1888 pub fn send_raw(
1889 &mut self,
1890 message: &str,
1891 cx: &mut Context<Self>,
1892 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
1893 self.send(vec![message.into()], cx)
1894 }
1895
1896 pub fn send(
1897 &mut self,
1898 message: Vec<acp::ContentBlock>,
1899 cx: &mut Context<Self>,
1900 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
1901 let block = ContentBlock::new_combined(
1902 message.clone(),
1903 self.project.read(cx).languages().clone(),
1904 self.project.read(cx).path_style(cx),
1905 cx,
1906 );
1907 let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
1908 let git_store = self.project.read(cx).git_store().clone();
1909
1910 let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
1911 Some(UserMessageId::new())
1912 } else {
1913 None
1914 };
1915
1916 self.run_turn(cx, async move |this, cx| {
1917 this.update(cx, |this, cx| {
1918 this.push_entry(
1919 AgentThreadEntry::UserMessage(UserMessage {
1920 id: message_id.clone(),
1921 content: block,
1922 chunks: message,
1923 checkpoint: None,
1924 indented: false,
1925 }),
1926 cx,
1927 );
1928 })
1929 .ok();
1930
1931 let old_checkpoint = git_store
1932 .update(cx, |git, cx| git.checkpoint(cx))
1933 .await
1934 .context("failed to get old checkpoint")
1935 .log_err();
1936 this.update(cx, |this, cx| {
1937 if let Some((_ix, message)) = this.last_user_message() {
1938 message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1939 git_checkpoint,
1940 show: false,
1941 });
1942 }
1943 this.connection.prompt(message_id, request, cx)
1944 })?
1945 .await
1946 })
1947 }
1948
1949 pub fn can_retry(&self, cx: &App) -> bool {
1950 self.connection.retry(&self.session_id, cx).is_some()
1951 }
1952
1953 pub fn retry(
1954 &mut self,
1955 cx: &mut Context<Self>,
1956 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
1957 self.run_turn(cx, async move |this, cx| {
1958 this.update(cx, |this, cx| {
1959 this.connection
1960 .retry(&this.session_id, cx)
1961 .map(|retry| retry.run(cx))
1962 })?
1963 .context("retrying a session is not supported")?
1964 .await
1965 })
1966 }
1967
1968 fn run_turn(
1969 &mut self,
1970 cx: &mut Context<Self>,
1971 f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1972 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
1973 self.clear_completed_plan_entries(cx);
1974 self.had_error = false;
1975
1976 let (tx, rx) = oneshot::channel();
1977 let cancel_task = self.cancel(cx);
1978
1979 self.turn_id += 1;
1980 let turn_id = self.turn_id;
1981 self.running_turn = Some(RunningTurn {
1982 id: turn_id,
1983 send_task: cx.spawn(async move |this, cx| {
1984 cancel_task.await;
1985 tx.send(f(this, cx).await).ok();
1986 }),
1987 });
1988
1989 cx.spawn(async move |this, cx| {
1990 let response = rx.await;
1991
1992 this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1993 .await?;
1994
1995 this.update(cx, |this, cx| {
1996 if this.parent_session_id.is_none() {
1997 this.project
1998 .update(cx, |project, cx| project.set_agent_location(None, cx));
1999 }
2000 let Ok(response) = response else {
2001 // tx dropped, just return
2002 return Ok(None);
2003 };
2004
2005 let is_same_turn = this
2006 .running_turn
2007 .as_ref()
2008 .is_some_and(|turn| turn_id == turn.id);
2009
2010 // If the user submitted a follow up message, running_turn might
2011 // already point to a different turn. Therefore we only want to
2012 // take the task if it's the same turn.
2013 if is_same_turn {
2014 this.running_turn.take();
2015 }
2016
2017 match response {
2018 Ok(r) => {
2019 if r.stop_reason == acp::StopReason::MaxTokens {
2020 this.had_error = true;
2021 cx.emit(AcpThreadEvent::Error);
2022 log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2023 return Err(anyhow!("Max tokens reached"));
2024 }
2025
2026 let canceled = matches!(r.stop_reason, acp::StopReason::Cancelled);
2027 if canceled {
2028 this.mark_pending_tools_as_canceled();
2029 }
2030
2031 // Handle refusal - distinguish between user prompt and tool call refusals
2032 if let acp::StopReason::Refusal = r.stop_reason {
2033 this.had_error = true;
2034 if let Some((user_msg_ix, _)) = this.last_user_message() {
2035 // Check if there's a completed tool call with results after the last user message
2036 // This indicates the refusal is in response to tool output, not the user's prompt
2037 let has_completed_tool_call_after_user_msg =
2038 this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
2039 if let AgentThreadEntry::ToolCall(tool_call) = entry {
2040 // Check if the tool call has completed and has output
2041 matches!(tool_call.status, ToolCallStatus::Completed)
2042 && tool_call.raw_output.is_some()
2043 } else {
2044 false
2045 }
2046 });
2047
2048 if has_completed_tool_call_after_user_msg {
2049 // Refusal is due to tool output - don't truncate, just notify
2050 // The model refused based on what the tool returned
2051 cx.emit(AcpThreadEvent::Refusal);
2052 } else {
2053 // User prompt was refused - truncate back to before the user message
2054 let range = user_msg_ix..this.entries.len();
2055 if range.start < range.end {
2056 this.entries.truncate(user_msg_ix);
2057 cx.emit(AcpThreadEvent::EntriesRemoved(range));
2058 }
2059 cx.emit(AcpThreadEvent::Refusal);
2060 }
2061 } else {
2062 // No user message found, treat as general refusal
2063 cx.emit(AcpThreadEvent::Refusal);
2064 }
2065 }
2066
2067 cx.emit(AcpThreadEvent::Stopped(r.stop_reason));
2068 Ok(Some(r))
2069 }
2070 Err(e) => {
2071 this.had_error = true;
2072 cx.emit(AcpThreadEvent::Error);
2073 log::error!("Error in run turn: {:?}", e);
2074 Err(e)
2075 }
2076 }
2077 })?
2078 })
2079 .boxed()
2080 }
2081
2082 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2083 let Some(turn) = self.running_turn.take() else {
2084 return Task::ready(());
2085 };
2086 self.connection.cancel(&self.session_id, cx);
2087
2088 self.mark_pending_tools_as_canceled();
2089
2090 // Wait for the send task to complete
2091 cx.background_spawn(turn.send_task)
2092 }
2093
2094 fn mark_pending_tools_as_canceled(&mut self) {
2095 for entry in self.entries.iter_mut() {
2096 if let AgentThreadEntry::ToolCall(call) = entry {
2097 let cancel = matches!(
2098 call.status,
2099 ToolCallStatus::Pending
2100 | ToolCallStatus::WaitingForConfirmation { .. }
2101 | ToolCallStatus::InProgress
2102 );
2103
2104 if cancel {
2105 call.status = ToolCallStatus::Canceled;
2106 }
2107 }
2108 }
2109 }
2110
2111 /// Restores the git working tree to the state at the given checkpoint (if one exists)
2112 pub fn restore_checkpoint(
2113 &mut self,
2114 id: UserMessageId,
2115 cx: &mut Context<Self>,
2116 ) -> Task<Result<()>> {
2117 let Some((_, message)) = self.user_message_mut(&id) else {
2118 return Task::ready(Err(anyhow!("message not found")));
2119 };
2120
2121 let checkpoint = message
2122 .checkpoint
2123 .as_ref()
2124 .map(|c| c.git_checkpoint.clone());
2125
2126 // Cancel any in-progress generation before restoring
2127 let cancel_task = self.cancel(cx);
2128 let rewind = self.rewind(id.clone(), cx);
2129 let git_store = self.project.read(cx).git_store().clone();
2130
2131 cx.spawn(async move |_, cx| {
2132 cancel_task.await;
2133 rewind.await?;
2134 if let Some(checkpoint) = checkpoint {
2135 git_store
2136 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
2137 .await?;
2138 }
2139
2140 Ok(())
2141 })
2142 }
2143
2144 /// Rewinds this thread to before the entry at `index`, removing it and all
2145 /// subsequent entries while rejecting any action_log changes made from that point.
2146 /// Unlike `restore_checkpoint`, this method does not restore from git.
2147 pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
2148 let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
2149 return Task::ready(Err(anyhow!("not supported")));
2150 };
2151
2152 let telemetry = ActionLogTelemetry::from(&*self);
2153 cx.spawn(async move |this, cx| {
2154 cx.update(|cx| truncate.run(id.clone(), cx)).await?;
2155 this.update(cx, |this, cx| {
2156 if let Some((ix, _)) = this.user_message_mut(&id) {
2157 // Collect all terminals from entries that will be removed
2158 let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
2159 .iter()
2160 .flat_map(|entry| entry.terminals())
2161 .filter_map(|terminal| terminal.read(cx).id().clone().into())
2162 .collect();
2163
2164 let range = ix..this.entries.len();
2165 this.entries.truncate(ix);
2166 cx.emit(AcpThreadEvent::EntriesRemoved(range));
2167
2168 // Kill and remove the terminals
2169 for terminal_id in terminals_to_remove {
2170 if let Some(terminal) = this.terminals.remove(&terminal_id) {
2171 terminal.update(cx, |terminal, cx| {
2172 terminal.kill(cx);
2173 });
2174 }
2175 }
2176 }
2177 this.action_log().update(cx, |action_log, cx| {
2178 action_log.reject_all_edits(Some(telemetry), cx)
2179 })
2180 })?
2181 .await;
2182 Ok(())
2183 })
2184 }
2185
2186 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
2187 let git_store = self.project.read(cx).git_store().clone();
2188
2189 let Some((_, message)) = self.last_user_message() else {
2190 return Task::ready(Ok(()));
2191 };
2192 let Some(user_message_id) = message.id.clone() else {
2193 return Task::ready(Ok(()));
2194 };
2195 let Some(checkpoint) = message.checkpoint.as_ref() else {
2196 return Task::ready(Ok(()));
2197 };
2198 let old_checkpoint = checkpoint.git_checkpoint.clone();
2199
2200 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2201 cx.spawn(async move |this, cx| {
2202 let Some(new_checkpoint) = new_checkpoint
2203 .await
2204 .context("failed to get new checkpoint")
2205 .log_err()
2206 else {
2207 return Ok(());
2208 };
2209
2210 let equal = git_store
2211 .update(cx, |git, cx| {
2212 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2213 })
2214 .await
2215 .unwrap_or(true);
2216
2217 this.update(cx, |this, cx| {
2218 if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
2219 if let Some(checkpoint) = message.checkpoint.as_mut() {
2220 checkpoint.show = !equal;
2221 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2222 }
2223 }
2224 })?;
2225
2226 Ok(())
2227 })
2228 }
2229
2230 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2231 self.entries
2232 .iter_mut()
2233 .enumerate()
2234 .rev()
2235 .find_map(|(ix, entry)| {
2236 if let AgentThreadEntry::UserMessage(message) = entry {
2237 Some((ix, message))
2238 } else {
2239 None
2240 }
2241 })
2242 }
2243
2244 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2245 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2246 if let AgentThreadEntry::UserMessage(message) = entry {
2247 if message.id.as_ref() == Some(id) {
2248 Some((ix, message))
2249 } else {
2250 None
2251 }
2252 } else {
2253 None
2254 }
2255 })
2256 }
2257
2258 pub fn read_text_file(
2259 &self,
2260 path: PathBuf,
2261 line: Option<u32>,
2262 limit: Option<u32>,
2263 reuse_shared_snapshot: bool,
2264 cx: &mut Context<Self>,
2265 ) -> Task<Result<String, acp::Error>> {
2266 // Args are 1-based, move to 0-based
2267 let line = line.unwrap_or_default().saturating_sub(1);
2268 let limit = limit.unwrap_or(u32::MAX);
2269 let project = self.project.clone();
2270 let action_log = self.action_log.clone();
2271 let should_update_agent_location = self.parent_session_id.is_none();
2272 cx.spawn(async move |this, cx| {
2273 let load = project.update(cx, |project, cx| {
2274 let path = project
2275 .project_path_for_absolute_path(&path, cx)
2276 .ok_or_else(|| {
2277 acp::Error::resource_not_found(Some(path.display().to_string()))
2278 })?;
2279 Ok::<_, acp::Error>(project.open_buffer(path, cx))
2280 })?;
2281
2282 let buffer = load.await?;
2283
2284 let snapshot = if reuse_shared_snapshot {
2285 this.read_with(cx, |this, _| {
2286 this.shared_buffers.get(&buffer.clone()).cloned()
2287 })
2288 .log_err()
2289 .flatten()
2290 } else {
2291 None
2292 };
2293
2294 let snapshot = if let Some(snapshot) = snapshot {
2295 snapshot
2296 } else {
2297 action_log.update(cx, |action_log, cx| {
2298 action_log.buffer_read(buffer.clone(), cx);
2299 });
2300
2301 let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
2302 this.update(cx, |this, _| {
2303 this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2304 })?;
2305 snapshot
2306 };
2307
2308 let max_point = snapshot.max_point();
2309 let start_position = Point::new(line, 0);
2310
2311 if start_position > max_point {
2312 return Err(acp::Error::invalid_params().data(format!(
2313 "Attempting to read beyond the end of the file, line {}:{}",
2314 max_point.row + 1,
2315 max_point.column
2316 )));
2317 }
2318
2319 let start = snapshot.anchor_before(start_position);
2320 let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2321
2322 if should_update_agent_location {
2323 project.update(cx, |project, cx| {
2324 project.set_agent_location(
2325 Some(AgentLocation {
2326 buffer: buffer.downgrade(),
2327 position: start,
2328 }),
2329 cx,
2330 );
2331 });
2332 }
2333
2334 Ok(snapshot.text_for_range(start..end).collect::<String>())
2335 })
2336 }
2337
2338 pub fn write_text_file(
2339 &self,
2340 path: PathBuf,
2341 content: String,
2342 cx: &mut Context<Self>,
2343 ) -> Task<Result<()>> {
2344 let project = self.project.clone();
2345 let action_log = self.action_log.clone();
2346 let should_update_agent_location = self.parent_session_id.is_none();
2347 cx.spawn(async move |this, cx| {
2348 let load = project.update(cx, |project, cx| {
2349 let path = project
2350 .project_path_for_absolute_path(&path, cx)
2351 .context("invalid path")?;
2352 anyhow::Ok(project.open_buffer(path, cx))
2353 });
2354 let buffer = load?.await?;
2355 let snapshot = this.update(cx, |this, cx| {
2356 this.shared_buffers
2357 .get(&buffer)
2358 .cloned()
2359 .unwrap_or_else(|| buffer.read(cx).snapshot())
2360 })?;
2361 let edits = cx
2362 .background_executor()
2363 .spawn(async move {
2364 let old_text = snapshot.text();
2365 text_diff(old_text.as_str(), &content)
2366 .into_iter()
2367 .map(|(range, replacement)| {
2368 (snapshot.anchor_range_around(range), replacement)
2369 })
2370 .collect::<Vec<_>>()
2371 })
2372 .await;
2373
2374 if should_update_agent_location {
2375 project.update(cx, |project, cx| {
2376 project.set_agent_location(
2377 Some(AgentLocation {
2378 buffer: buffer.downgrade(),
2379 position: edits
2380 .last()
2381 .map(|(range, _)| range.end)
2382 .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
2383 }),
2384 cx,
2385 );
2386 });
2387 }
2388
2389 let format_on_save = cx.update(|cx| {
2390 action_log.update(cx, |action_log, cx| {
2391 action_log.buffer_read(buffer.clone(), cx);
2392 });
2393
2394 let format_on_save = buffer.update(cx, |buffer, cx| {
2395 buffer.edit(edits, None, cx);
2396
2397 let settings = language::language_settings::language_settings(
2398 buffer.language().map(|l| l.name()),
2399 buffer.file(),
2400 cx,
2401 );
2402
2403 settings.format_on_save != FormatOnSave::Off
2404 });
2405 action_log.update(cx, |action_log, cx| {
2406 action_log.buffer_edited(buffer.clone(), cx);
2407 });
2408 format_on_save
2409 });
2410
2411 if format_on_save {
2412 let format_task = project.update(cx, |project, cx| {
2413 project.format(
2414 HashSet::from_iter([buffer.clone()]),
2415 LspFormatTarget::Buffers,
2416 false,
2417 FormatTrigger::Save,
2418 cx,
2419 )
2420 });
2421 format_task.await.log_err();
2422
2423 action_log.update(cx, |action_log, cx| {
2424 action_log.buffer_edited(buffer.clone(), cx);
2425 });
2426 }
2427
2428 project
2429 .update(cx, |project, cx| project.save_buffer(buffer, cx))
2430 .await
2431 })
2432 }
2433
2434 pub fn create_terminal(
2435 &self,
2436 command: String,
2437 args: Vec<String>,
2438 extra_env: Vec<acp::EnvVariable>,
2439 cwd: Option<PathBuf>,
2440 output_byte_limit: Option<u64>,
2441 cx: &mut Context<Self>,
2442 ) -> Task<Result<Entity<Terminal>>> {
2443 let env = match &cwd {
2444 Some(dir) => self.project.update(cx, |project, cx| {
2445 project.environment().update(cx, |env, cx| {
2446 env.directory_environment(dir.as_path().into(), cx)
2447 })
2448 }),
2449 None => Task::ready(None).shared(),
2450 };
2451 let env = cx.spawn(async move |_, _| {
2452 let mut env = env.await.unwrap_or_default();
2453 // Disables paging for `git` and hopefully other commands
2454 env.insert("PAGER".into(), "".into());
2455 for var in extra_env {
2456 env.insert(var.name, var.value);
2457 }
2458 env
2459 });
2460
2461 let project = self.project.clone();
2462 let language_registry = project.read(cx).languages().clone();
2463 let is_windows = project.read(cx).path_style(cx).is_windows();
2464
2465 let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
2466 let terminal_task = cx.spawn({
2467 let terminal_id = terminal_id.clone();
2468 async move |_this, cx| {
2469 let env = env.await;
2470 let shell = project
2471 .update(cx, |project, cx| {
2472 project
2473 .remote_client()
2474 .and_then(|r| r.read(cx).default_system_shell())
2475 })
2476 .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2477 let (task_command, task_args) =
2478 ShellBuilder::new(&Shell::Program(shell), is_windows)
2479 .redirect_stdin_to_dev_null()
2480 .build(Some(command.clone()), &args);
2481 let terminal = project
2482 .update(cx, |project, cx| {
2483 project.create_terminal_task(
2484 task::SpawnInTerminal {
2485 command: Some(task_command),
2486 args: task_args,
2487 cwd: cwd.clone(),
2488 env,
2489 ..Default::default()
2490 },
2491 cx,
2492 )
2493 })
2494 .await?;
2495
2496 anyhow::Ok(cx.new(|cx| {
2497 Terminal::new(
2498 terminal_id,
2499 &format!("{} {}", command, args.join(" ")),
2500 cwd,
2501 output_byte_limit.map(|l| l as usize),
2502 terminal,
2503 language_registry,
2504 cx,
2505 )
2506 }))
2507 }
2508 });
2509
2510 cx.spawn(async move |this, cx| {
2511 let terminal = terminal_task.await?;
2512 this.update(cx, |this, _cx| {
2513 this.terminals.insert(terminal_id, terminal.clone());
2514 terminal
2515 })
2516 })
2517 }
2518
2519 pub fn kill_terminal(
2520 &mut self,
2521 terminal_id: acp::TerminalId,
2522 cx: &mut Context<Self>,
2523 ) -> Result<()> {
2524 self.terminals
2525 .get(&terminal_id)
2526 .context("Terminal not found")?
2527 .update(cx, |terminal, cx| {
2528 terminal.kill(cx);
2529 });
2530
2531 Ok(())
2532 }
2533
2534 pub fn release_terminal(
2535 &mut self,
2536 terminal_id: acp::TerminalId,
2537 cx: &mut Context<Self>,
2538 ) -> Result<()> {
2539 self.terminals
2540 .remove(&terminal_id)
2541 .context("Terminal not found")?
2542 .update(cx, |terminal, cx| {
2543 terminal.kill(cx);
2544 });
2545
2546 Ok(())
2547 }
2548
2549 pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2550 self.terminals
2551 .get(&terminal_id)
2552 .context("Terminal not found")
2553 .cloned()
2554 }
2555
2556 pub fn to_markdown(&self, cx: &App) -> String {
2557 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2558 }
2559
2560 pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2561 cx.emit(AcpThreadEvent::LoadError(error));
2562 }
2563
2564 pub fn register_terminal_created(
2565 &mut self,
2566 terminal_id: acp::TerminalId,
2567 command_label: String,
2568 working_dir: Option<PathBuf>,
2569 output_byte_limit: Option<u64>,
2570 terminal: Entity<::terminal::Terminal>,
2571 cx: &mut Context<Self>,
2572 ) -> Entity<Terminal> {
2573 let language_registry = self.project.read(cx).languages().clone();
2574
2575 let entity = cx.new(|cx| {
2576 Terminal::new(
2577 terminal_id.clone(),
2578 &command_label,
2579 working_dir.clone(),
2580 output_byte_limit.map(|l| l as usize),
2581 terminal,
2582 language_registry,
2583 cx,
2584 )
2585 });
2586 self.terminals.insert(terminal_id.clone(), entity.clone());
2587 entity
2588 }
2589
2590 pub fn mark_as_subagent_output(&mut self, cx: &mut Context<Self>) {
2591 for entry in self.entries.iter_mut().rev() {
2592 if let AgentThreadEntry::AssistantMessage(assistant_message) = entry {
2593 assistant_message.is_subagent_output = true;
2594 cx.notify();
2595 return;
2596 }
2597 }
2598 }
2599}
2600
2601fn markdown_for_raw_output(
2602 raw_output: &serde_json::Value,
2603 language_registry: &Arc<LanguageRegistry>,
2604 cx: &mut App,
2605) -> Option<Entity<Markdown>> {
2606 match raw_output {
2607 serde_json::Value::Null => None,
2608 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2609 Markdown::new(
2610 value.to_string().into(),
2611 Some(language_registry.clone()),
2612 None,
2613 cx,
2614 )
2615 })),
2616 serde_json::Value::Number(value) => Some(cx.new(|cx| {
2617 Markdown::new(
2618 value.to_string().into(),
2619 Some(language_registry.clone()),
2620 None,
2621 cx,
2622 )
2623 })),
2624 serde_json::Value::String(value) => Some(cx.new(|cx| {
2625 Markdown::new(
2626 value.clone().into(),
2627 Some(language_registry.clone()),
2628 None,
2629 cx,
2630 )
2631 })),
2632 value => Some(cx.new(|cx| {
2633 let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
2634
2635 Markdown::new(
2636 format!("```json\n{}\n```", pretty_json).into(),
2637 Some(language_registry.clone()),
2638 None,
2639 cx,
2640 )
2641 })),
2642 }
2643}
2644
2645#[cfg(test)]
2646mod tests {
2647 use super::*;
2648 use anyhow::anyhow;
2649 use futures::{channel::mpsc, future::LocalBoxFuture, select};
2650 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2651 use indoc::indoc;
2652 use project::{FakeFs, Fs};
2653 use rand::{distr, prelude::*};
2654 use serde_json::json;
2655 use settings::SettingsStore;
2656 use smol::stream::StreamExt as _;
2657 use std::{
2658 any::Any,
2659 cell::RefCell,
2660 path::Path,
2661 rc::Rc,
2662 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2663 time::Duration,
2664 };
2665 use util::path;
2666
2667 fn init_test(cx: &mut TestAppContext) {
2668 env_logger::try_init().ok();
2669 cx.update(|cx| {
2670 let settings_store = SettingsStore::test(cx);
2671 cx.set_global(settings_store);
2672 });
2673 }
2674
2675 #[gpui::test]
2676 async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2677 init_test(cx);
2678
2679 let fs = FakeFs::new(cx.executor());
2680 let project = Project::test(fs, [], cx).await;
2681 let connection = Rc::new(FakeAgentConnection::new());
2682 let thread = cx
2683 .update(|cx| connection.new_session(project, std::path::Path::new(path!("/test")), cx))
2684 .await
2685 .unwrap();
2686
2687 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2688
2689 // Send Output BEFORE Created - should be buffered by acp_thread
2690 thread.update(cx, |thread, cx| {
2691 thread.on_terminal_provider_event(
2692 TerminalProviderEvent::Output {
2693 terminal_id: terminal_id.clone(),
2694 data: b"hello buffered".to_vec(),
2695 },
2696 cx,
2697 );
2698 });
2699
2700 // Create a display-only terminal and then send Created
2701 let lower = cx.new(|cx| {
2702 let builder = ::terminal::TerminalBuilder::new_display_only(
2703 ::terminal::terminal_settings::CursorShape::default(),
2704 ::terminal::terminal_settings::AlternateScroll::On,
2705 None,
2706 0,
2707 cx.background_executor(),
2708 PathStyle::local(),
2709 )
2710 .unwrap();
2711 builder.subscribe(cx)
2712 });
2713
2714 thread.update(cx, |thread, cx| {
2715 thread.on_terminal_provider_event(
2716 TerminalProviderEvent::Created {
2717 terminal_id: terminal_id.clone(),
2718 label: "Buffered Test".to_string(),
2719 cwd: None,
2720 output_byte_limit: None,
2721 terminal: lower.clone(),
2722 },
2723 cx,
2724 );
2725 });
2726
2727 // After Created, buffered Output should have been flushed into the renderer
2728 let content = thread.read_with(cx, |thread, cx| {
2729 let term = thread.terminal(terminal_id.clone()).unwrap();
2730 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2731 });
2732
2733 assert!(
2734 content.contains("hello buffered"),
2735 "expected buffered output to render, got: {content}"
2736 );
2737 }
2738
2739 #[gpui::test]
2740 async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2741 init_test(cx);
2742
2743 let fs = FakeFs::new(cx.executor());
2744 let project = Project::test(fs, [], cx).await;
2745 let connection = Rc::new(FakeAgentConnection::new());
2746 let thread = cx
2747 .update(|cx| connection.new_session(project, std::path::Path::new(path!("/test")), cx))
2748 .await
2749 .unwrap();
2750
2751 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2752
2753 // Send Output BEFORE Created
2754 thread.update(cx, |thread, cx| {
2755 thread.on_terminal_provider_event(
2756 TerminalProviderEvent::Output {
2757 terminal_id: terminal_id.clone(),
2758 data: b"pre-exit data".to_vec(),
2759 },
2760 cx,
2761 );
2762 });
2763
2764 // Send Exit BEFORE Created
2765 thread.update(cx, |thread, cx| {
2766 thread.on_terminal_provider_event(
2767 TerminalProviderEvent::Exit {
2768 terminal_id: terminal_id.clone(),
2769 status: acp::TerminalExitStatus::new().exit_code(0),
2770 },
2771 cx,
2772 );
2773 });
2774
2775 // Now create a display-only lower-level terminal and send Created
2776 let lower = cx.new(|cx| {
2777 let builder = ::terminal::TerminalBuilder::new_display_only(
2778 ::terminal::terminal_settings::CursorShape::default(),
2779 ::terminal::terminal_settings::AlternateScroll::On,
2780 None,
2781 0,
2782 cx.background_executor(),
2783 PathStyle::local(),
2784 )
2785 .unwrap();
2786 builder.subscribe(cx)
2787 });
2788
2789 thread.update(cx, |thread, cx| {
2790 thread.on_terminal_provider_event(
2791 TerminalProviderEvent::Created {
2792 terminal_id: terminal_id.clone(),
2793 label: "Buffered Exit Test".to_string(),
2794 cwd: None,
2795 output_byte_limit: None,
2796 terminal: lower.clone(),
2797 },
2798 cx,
2799 );
2800 });
2801
2802 // Output should be present after Created (flushed from buffer)
2803 let content = thread.read_with(cx, |thread, cx| {
2804 let term = thread.terminal(terminal_id.clone()).unwrap();
2805 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2806 });
2807
2808 assert!(
2809 content.contains("pre-exit data"),
2810 "expected pre-exit data to render, got: {content}"
2811 );
2812 }
2813
2814 /// Test that killing a terminal via Terminal::kill properly:
2815 /// 1. Causes wait_for_exit to complete (doesn't hang forever)
2816 /// 2. The underlying terminal still has the output that was written before the kill
2817 ///
2818 /// This test verifies that the fix to kill_active_task (which now also kills
2819 /// the shell process in addition to the foreground process) properly allows
2820 /// wait_for_exit to complete instead of hanging indefinitely.
2821 #[cfg(unix)]
2822 #[gpui::test]
2823 async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
2824 use std::collections::HashMap;
2825 use task::Shell;
2826 use util::shell_builder::ShellBuilder;
2827
2828 init_test(cx);
2829 cx.executor().allow_parking();
2830
2831 let fs = FakeFs::new(cx.executor());
2832 let project = Project::test(fs, [], cx).await;
2833 let connection = Rc::new(FakeAgentConnection::new());
2834 let thread = cx
2835 .update(|cx| connection.new_session(project.clone(), Path::new(path!("/test")), cx))
2836 .await
2837 .unwrap();
2838
2839 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2840
2841 // Create a real PTY terminal that runs a command which prints output then sleeps
2842 // We use printf instead of echo and chain with && sleep to ensure proper execution
2843 let (completion_tx, _completion_rx) = smol::channel::unbounded();
2844 let (program, args) = ShellBuilder::new(&Shell::System, false).build(
2845 Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
2846 &[],
2847 );
2848
2849 let builder = cx
2850 .update(|cx| {
2851 ::terminal::TerminalBuilder::new(
2852 None,
2853 None,
2854 task::Shell::WithArguments {
2855 program,
2856 args,
2857 title_override: None,
2858 },
2859 HashMap::default(),
2860 ::terminal::terminal_settings::CursorShape::default(),
2861 ::terminal::terminal_settings::AlternateScroll::On,
2862 None,
2863 vec![],
2864 0,
2865 false,
2866 0,
2867 Some(completion_tx),
2868 cx,
2869 vec![],
2870 PathStyle::local(),
2871 )
2872 })
2873 .await
2874 .unwrap();
2875
2876 let lower_terminal = cx.new(|cx| builder.subscribe(cx));
2877
2878 // Create the acp_thread Terminal wrapper
2879 thread.update(cx, |thread, cx| {
2880 thread.on_terminal_provider_event(
2881 TerminalProviderEvent::Created {
2882 terminal_id: terminal_id.clone(),
2883 label: "printf output_before_kill && sleep 60".to_string(),
2884 cwd: None,
2885 output_byte_limit: None,
2886 terminal: lower_terminal.clone(),
2887 },
2888 cx,
2889 );
2890 });
2891
2892 // Wait for the printf command to execute and produce output
2893 // Use real time since parking is enabled
2894 cx.executor().timer(Duration::from_millis(500)).await;
2895
2896 // Get the acp_thread Terminal and kill it
2897 let wait_for_exit = thread.update(cx, |thread, cx| {
2898 let term = thread.terminals.get(&terminal_id).unwrap();
2899 let wait_for_exit = term.read(cx).wait_for_exit();
2900 term.update(cx, |term, cx| {
2901 term.kill(cx);
2902 });
2903 wait_for_exit
2904 });
2905
2906 // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
2907 // Before the fix to kill_active_task, this would hang forever because
2908 // only the foreground process was killed, not the shell, so the PTY
2909 // child never exited and wait_for_completed_task never completed.
2910 let exit_result = futures::select! {
2911 result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
2912 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
2913 };
2914
2915 assert!(
2916 exit_result.is_some(),
2917 "wait_for_exit should complete after kill, but it timed out. \
2918 This indicates kill_active_task is not properly killing the shell process."
2919 );
2920
2921 // Give the system a chance to process any pending updates
2922 cx.run_until_parked();
2923
2924 // Verify that the underlying terminal still has the output that was
2925 // written before the kill. This verifies that killing doesn't lose output.
2926 let inner_content = thread.read_with(cx, |thread, cx| {
2927 let term = thread.terminals.get(&terminal_id).unwrap();
2928 term.read(cx).inner().read(cx).get_content()
2929 });
2930
2931 assert!(
2932 inner_content.contains("output_before_kill"),
2933 "Underlying terminal should contain output from before kill, got: {}",
2934 inner_content
2935 );
2936 }
2937
2938 #[gpui::test]
2939 async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
2940 init_test(cx);
2941
2942 let fs = FakeFs::new(cx.executor());
2943 let project = Project::test(fs, [], cx).await;
2944 let connection = Rc::new(FakeAgentConnection::new());
2945 let thread = cx
2946 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
2947 .await
2948 .unwrap();
2949
2950 // Test creating a new user message
2951 thread.update(cx, |thread, cx| {
2952 thread.push_user_content_block(None, "Hello, ".into(), cx);
2953 });
2954
2955 thread.update(cx, |thread, cx| {
2956 assert_eq!(thread.entries.len(), 1);
2957 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2958 assert_eq!(user_msg.id, None);
2959 assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
2960 } else {
2961 panic!("Expected UserMessage");
2962 }
2963 });
2964
2965 // Test appending to existing user message
2966 let message_1_id = UserMessageId::new();
2967 thread.update(cx, |thread, cx| {
2968 thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
2969 });
2970
2971 thread.update(cx, |thread, cx| {
2972 assert_eq!(thread.entries.len(), 1);
2973 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2974 assert_eq!(user_msg.id, Some(message_1_id));
2975 assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
2976 } else {
2977 panic!("Expected UserMessage");
2978 }
2979 });
2980
2981 // Test creating new user message after assistant message
2982 thread.update(cx, |thread, cx| {
2983 thread.push_assistant_content_block("Assistant response".into(), false, cx);
2984 });
2985
2986 let message_2_id = UserMessageId::new();
2987 thread.update(cx, |thread, cx| {
2988 thread.push_user_content_block(
2989 Some(message_2_id.clone()),
2990 "New user message".into(),
2991 cx,
2992 );
2993 });
2994
2995 thread.update(cx, |thread, cx| {
2996 assert_eq!(thread.entries.len(), 3);
2997 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
2998 assert_eq!(user_msg.id, Some(message_2_id));
2999 assert_eq!(user_msg.content.to_markdown(cx), "New user message");
3000 } else {
3001 panic!("Expected UserMessage at index 2");
3002 }
3003 });
3004 }
3005
3006 #[gpui::test]
3007 async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
3008 init_test(cx);
3009
3010 let fs = FakeFs::new(cx.executor());
3011 let project = Project::test(fs, [], cx).await;
3012 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3013 |_, thread, mut cx| {
3014 async move {
3015 thread.update(&mut cx, |thread, cx| {
3016 thread
3017 .handle_session_update(
3018 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3019 "Thinking ".into(),
3020 )),
3021 cx,
3022 )
3023 .unwrap();
3024 thread
3025 .handle_session_update(
3026 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3027 "hard!".into(),
3028 )),
3029 cx,
3030 )
3031 .unwrap();
3032 })?;
3033 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3034 }
3035 .boxed_local()
3036 },
3037 ));
3038
3039 let thread = cx
3040 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3041 .await
3042 .unwrap();
3043
3044 thread
3045 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
3046 .await
3047 .unwrap();
3048
3049 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
3050 assert_eq!(
3051 output,
3052 indoc! {r#"
3053 ## User
3054
3055 Hello from Zed!
3056
3057 ## Assistant
3058
3059 <thinking>
3060 Thinking hard!
3061 </thinking>
3062
3063 "#}
3064 );
3065 }
3066
3067 #[gpui::test]
3068 async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
3069 init_test(cx);
3070
3071 let fs = FakeFs::new(cx.executor());
3072 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
3073 .await;
3074 let project = Project::test(fs.clone(), [], cx).await;
3075 let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
3076 let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
3077 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3078 move |_, thread, mut cx| {
3079 let read_file_tx = read_file_tx.clone();
3080 async move {
3081 let content = thread
3082 .update(&mut cx, |thread, cx| {
3083 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3084 })
3085 .unwrap()
3086 .await
3087 .unwrap();
3088 assert_eq!(content, "one\ntwo\nthree\n");
3089 read_file_tx.take().unwrap().send(()).unwrap();
3090 thread
3091 .update(&mut cx, |thread, cx| {
3092 thread.write_text_file(
3093 path!("/tmp/foo").into(),
3094 "one\ntwo\nthree\nfour\nfive\n".to_string(),
3095 cx,
3096 )
3097 })
3098 .unwrap()
3099 .await
3100 .unwrap();
3101 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3102 }
3103 .boxed_local()
3104 },
3105 ));
3106
3107 let (worktree, pathbuf) = project
3108 .update(cx, |project, cx| {
3109 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3110 })
3111 .await
3112 .unwrap();
3113 let buffer = project
3114 .update(cx, |project, cx| {
3115 project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
3116 })
3117 .await
3118 .unwrap();
3119
3120 let thread = cx
3121 .update(|cx| connection.new_session(project, Path::new(path!("/tmp")), cx))
3122 .await
3123 .unwrap();
3124
3125 let request = thread.update(cx, |thread, cx| {
3126 thread.send_raw("Extend the count in /tmp/foo", cx)
3127 });
3128 read_file_rx.await.ok();
3129 buffer.update(cx, |buffer, cx| {
3130 buffer.edit([(0..0, "zero\n".to_string())], None, cx);
3131 });
3132 cx.run_until_parked();
3133 assert_eq!(
3134 buffer.read_with(cx, |buffer, _| buffer.text()),
3135 "zero\none\ntwo\nthree\nfour\nfive\n"
3136 );
3137 assert_eq!(
3138 String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
3139 "zero\none\ntwo\nthree\nfour\nfive\n"
3140 );
3141 request.await.unwrap();
3142 }
3143
3144 #[gpui::test]
3145 async fn test_reading_from_line(cx: &mut TestAppContext) {
3146 init_test(cx);
3147
3148 let fs = FakeFs::new(cx.executor());
3149 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
3150 .await;
3151 let project = Project::test(fs.clone(), [], cx).await;
3152 project
3153 .update(cx, |project, cx| {
3154 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3155 })
3156 .await
3157 .unwrap();
3158
3159 let connection = Rc::new(FakeAgentConnection::new());
3160
3161 let thread = cx
3162 .update(|cx| connection.new_session(project, Path::new(path!("/tmp")), cx))
3163 .await
3164 .unwrap();
3165
3166 // Whole file
3167 let content = thread
3168 .update(cx, |thread, cx| {
3169 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3170 })
3171 .await
3172 .unwrap();
3173
3174 assert_eq!(content, "one\ntwo\nthree\nfour\n");
3175
3176 // Only start line
3177 let content = thread
3178 .update(cx, |thread, cx| {
3179 thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
3180 })
3181 .await
3182 .unwrap();
3183
3184 assert_eq!(content, "three\nfour\n");
3185
3186 // Only limit
3187 let content = thread
3188 .update(cx, |thread, cx| {
3189 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3190 })
3191 .await
3192 .unwrap();
3193
3194 assert_eq!(content, "one\ntwo\n");
3195
3196 // Range
3197 let content = thread
3198 .update(cx, |thread, cx| {
3199 thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
3200 })
3201 .await
3202 .unwrap();
3203
3204 assert_eq!(content, "two\nthree\n");
3205
3206 // Invalid
3207 let err = thread
3208 .update(cx, |thread, cx| {
3209 thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
3210 })
3211 .await
3212 .unwrap_err();
3213
3214 assert_eq!(
3215 err.to_string(),
3216 "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
3217 );
3218 }
3219
3220 #[gpui::test]
3221 async fn test_reading_empty_file(cx: &mut TestAppContext) {
3222 init_test(cx);
3223
3224 let fs = FakeFs::new(cx.executor());
3225 fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
3226 let project = Project::test(fs.clone(), [], cx).await;
3227 project
3228 .update(cx, |project, cx| {
3229 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3230 })
3231 .await
3232 .unwrap();
3233
3234 let connection = Rc::new(FakeAgentConnection::new());
3235
3236 let thread = cx
3237 .update(|cx| connection.new_session(project, Path::new(path!("/tmp")), cx))
3238 .await
3239 .unwrap();
3240
3241 // Whole file
3242 let content = thread
3243 .update(cx, |thread, cx| {
3244 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3245 })
3246 .await
3247 .unwrap();
3248
3249 assert_eq!(content, "");
3250
3251 // Only start line
3252 let content = thread
3253 .update(cx, |thread, cx| {
3254 thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
3255 })
3256 .await
3257 .unwrap();
3258
3259 assert_eq!(content, "");
3260
3261 // Only limit
3262 let content = thread
3263 .update(cx, |thread, cx| {
3264 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3265 })
3266 .await
3267 .unwrap();
3268
3269 assert_eq!(content, "");
3270
3271 // Range
3272 let content = thread
3273 .update(cx, |thread, cx| {
3274 thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
3275 })
3276 .await
3277 .unwrap();
3278
3279 assert_eq!(content, "");
3280
3281 // Invalid
3282 let err = thread
3283 .update(cx, |thread, cx| {
3284 thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
3285 })
3286 .await
3287 .unwrap_err();
3288
3289 assert_eq!(
3290 err.to_string(),
3291 "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
3292 );
3293 }
3294 #[gpui::test]
3295 async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
3296 init_test(cx);
3297
3298 let fs = FakeFs::new(cx.executor());
3299 fs.insert_tree(path!("/tmp"), json!({})).await;
3300 let project = Project::test(fs.clone(), [], cx).await;
3301 project
3302 .update(cx, |project, cx| {
3303 project.find_or_create_worktree(path!("/tmp"), true, cx)
3304 })
3305 .await
3306 .unwrap();
3307
3308 let connection = Rc::new(FakeAgentConnection::new());
3309
3310 let thread = cx
3311 .update(|cx| connection.new_session(project, Path::new(path!("/tmp")), cx))
3312 .await
3313 .unwrap();
3314
3315 // Out of project file
3316 let err = thread
3317 .update(cx, |thread, cx| {
3318 thread.read_text_file(path!("/foo").into(), None, None, false, cx)
3319 })
3320 .await
3321 .unwrap_err();
3322
3323 assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
3324 }
3325
3326 #[gpui::test]
3327 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
3328 init_test(cx);
3329
3330 let fs = FakeFs::new(cx.executor());
3331 let project = Project::test(fs, [], cx).await;
3332 let id = acp::ToolCallId::new("test");
3333
3334 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3335 let id = id.clone();
3336 move |_, thread, mut cx| {
3337 let id = id.clone();
3338 async move {
3339 thread
3340 .update(&mut cx, |thread, cx| {
3341 thread.handle_session_update(
3342 acp::SessionUpdate::ToolCall(
3343 acp::ToolCall::new(id.clone(), "Label")
3344 .kind(acp::ToolKind::Fetch)
3345 .status(acp::ToolCallStatus::InProgress),
3346 ),
3347 cx,
3348 )
3349 })
3350 .unwrap()
3351 .unwrap();
3352 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3353 }
3354 .boxed_local()
3355 }
3356 }));
3357
3358 let thread = cx
3359 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3360 .await
3361 .unwrap();
3362
3363 let request = thread.update(cx, |thread, cx| {
3364 thread.send_raw("Fetch https://example.com", cx)
3365 });
3366
3367 run_until_first_tool_call(&thread, cx).await;
3368
3369 thread.read_with(cx, |thread, _| {
3370 assert!(matches!(
3371 thread.entries[1],
3372 AgentThreadEntry::ToolCall(ToolCall {
3373 status: ToolCallStatus::InProgress,
3374 ..
3375 })
3376 ));
3377 });
3378
3379 thread.update(cx, |thread, cx| thread.cancel(cx)).await;
3380
3381 thread.read_with(cx, |thread, _| {
3382 assert!(matches!(
3383 &thread.entries[1],
3384 AgentThreadEntry::ToolCall(ToolCall {
3385 status: ToolCallStatus::Canceled,
3386 ..
3387 })
3388 ));
3389 });
3390
3391 thread
3392 .update(cx, |thread, cx| {
3393 thread.handle_session_update(
3394 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3395 id,
3396 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3397 )),
3398 cx,
3399 )
3400 })
3401 .unwrap();
3402
3403 request.await.unwrap();
3404
3405 thread.read_with(cx, |thread, _| {
3406 assert!(matches!(
3407 thread.entries[1],
3408 AgentThreadEntry::ToolCall(ToolCall {
3409 status: ToolCallStatus::Completed,
3410 ..
3411 })
3412 ));
3413 });
3414 }
3415
3416 #[gpui::test]
3417 async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3418 init_test(cx);
3419 let fs = FakeFs::new(cx.background_executor.clone());
3420 fs.insert_tree(path!("/test"), json!({})).await;
3421 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3422
3423 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3424 move |_, thread, mut cx| {
3425 async move {
3426 thread
3427 .update(&mut cx, |thread, cx| {
3428 thread.handle_session_update(
3429 acp::SessionUpdate::ToolCall(
3430 acp::ToolCall::new("test", "Label")
3431 .kind(acp::ToolKind::Edit)
3432 .status(acp::ToolCallStatus::Completed)
3433 .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
3434 "/test/test.txt",
3435 "foo",
3436 ))]),
3437 ),
3438 cx,
3439 )
3440 })
3441 .unwrap()
3442 .unwrap();
3443 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3444 }
3445 .boxed_local()
3446 }
3447 }));
3448
3449 let thread = cx
3450 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3451 .await
3452 .unwrap();
3453
3454 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3455 .await
3456 .unwrap();
3457
3458 assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3459 }
3460
3461 #[gpui::test(iterations = 10)]
3462 async fn test_checkpoints(cx: &mut TestAppContext) {
3463 init_test(cx);
3464 let fs = FakeFs::new(cx.background_executor.clone());
3465 fs.insert_tree(
3466 path!("/test"),
3467 json!({
3468 ".git": {}
3469 }),
3470 )
3471 .await;
3472 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3473
3474 let simulate_changes = Arc::new(AtomicBool::new(true));
3475 let next_filename = Arc::new(AtomicUsize::new(0));
3476 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3477 let simulate_changes = simulate_changes.clone();
3478 let next_filename = next_filename.clone();
3479 let fs = fs.clone();
3480 move |request, thread, mut cx| {
3481 let fs = fs.clone();
3482 let simulate_changes = simulate_changes.clone();
3483 let next_filename = next_filename.clone();
3484 async move {
3485 if simulate_changes.load(SeqCst) {
3486 let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3487 fs.write(Path::new(&filename), b"").await?;
3488 }
3489
3490 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3491 panic!("expected text content block");
3492 };
3493 thread.update(&mut cx, |thread, cx| {
3494 thread
3495 .handle_session_update(
3496 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3497 content.text.to_uppercase().into(),
3498 )),
3499 cx,
3500 )
3501 .unwrap();
3502 })?;
3503 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3504 }
3505 .boxed_local()
3506 }
3507 }));
3508 let thread = cx
3509 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3510 .await
3511 .unwrap();
3512
3513 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3514 .await
3515 .unwrap();
3516 thread.read_with(cx, |thread, cx| {
3517 assert_eq!(
3518 thread.to_markdown(cx),
3519 indoc! {"
3520 ## User (checkpoint)
3521
3522 Lorem
3523
3524 ## Assistant
3525
3526 LOREM
3527
3528 "}
3529 );
3530 });
3531 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3532
3533 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3534 .await
3535 .unwrap();
3536 thread.read_with(cx, |thread, cx| {
3537 assert_eq!(
3538 thread.to_markdown(cx),
3539 indoc! {"
3540 ## User (checkpoint)
3541
3542 Lorem
3543
3544 ## Assistant
3545
3546 LOREM
3547
3548 ## User (checkpoint)
3549
3550 ipsum
3551
3552 ## Assistant
3553
3554 IPSUM
3555
3556 "}
3557 );
3558 });
3559 assert_eq!(
3560 fs.files(),
3561 vec![
3562 Path::new(path!("/test/file-0")),
3563 Path::new(path!("/test/file-1"))
3564 ]
3565 );
3566
3567 // Checkpoint isn't stored when there are no changes.
3568 simulate_changes.store(false, SeqCst);
3569 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3570 .await
3571 .unwrap();
3572 thread.read_with(cx, |thread, cx| {
3573 assert_eq!(
3574 thread.to_markdown(cx),
3575 indoc! {"
3576 ## User (checkpoint)
3577
3578 Lorem
3579
3580 ## Assistant
3581
3582 LOREM
3583
3584 ## User (checkpoint)
3585
3586 ipsum
3587
3588 ## Assistant
3589
3590 IPSUM
3591
3592 ## User
3593
3594 dolor
3595
3596 ## Assistant
3597
3598 DOLOR
3599
3600 "}
3601 );
3602 });
3603 assert_eq!(
3604 fs.files(),
3605 vec![
3606 Path::new(path!("/test/file-0")),
3607 Path::new(path!("/test/file-1"))
3608 ]
3609 );
3610
3611 // Rewinding the conversation truncates the history and restores the checkpoint.
3612 thread
3613 .update(cx, |thread, cx| {
3614 let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3615 panic!("unexpected entries {:?}", thread.entries)
3616 };
3617 thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3618 })
3619 .await
3620 .unwrap();
3621 thread.read_with(cx, |thread, cx| {
3622 assert_eq!(
3623 thread.to_markdown(cx),
3624 indoc! {"
3625 ## User (checkpoint)
3626
3627 Lorem
3628
3629 ## Assistant
3630
3631 LOREM
3632
3633 "}
3634 );
3635 });
3636 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3637 }
3638
3639 #[gpui::test]
3640 async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3641 use std::sync::atomic::AtomicUsize;
3642 init_test(cx);
3643
3644 let fs = FakeFs::new(cx.executor());
3645 let project = Project::test(fs, None, cx).await;
3646
3647 // Create a connection that simulates refusal after tool result
3648 let prompt_count = Arc::new(AtomicUsize::new(0));
3649 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3650 let prompt_count = prompt_count.clone();
3651 move |_request, thread, mut cx| {
3652 let count = prompt_count.fetch_add(1, SeqCst);
3653 async move {
3654 if count == 0 {
3655 // First prompt: Generate a tool call with result
3656 thread.update(&mut cx, |thread, cx| {
3657 thread
3658 .handle_session_update(
3659 acp::SessionUpdate::ToolCall(
3660 acp::ToolCall::new("tool1", "Test Tool")
3661 .kind(acp::ToolKind::Fetch)
3662 .status(acp::ToolCallStatus::Completed)
3663 .raw_input(serde_json::json!({"query": "test"}))
3664 .raw_output(serde_json::json!({"result": "inappropriate content"})),
3665 ),
3666 cx,
3667 )
3668 .unwrap();
3669 })?;
3670
3671 // Now return refusal because of the tool result
3672 Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
3673 } else {
3674 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3675 }
3676 }
3677 .boxed_local()
3678 }
3679 }));
3680
3681 let thread = cx
3682 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3683 .await
3684 .unwrap();
3685
3686 // Track if we see a Refusal event
3687 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3688 let saw_refusal_event_captured = saw_refusal_event.clone();
3689 thread.update(cx, |_thread, cx| {
3690 cx.subscribe(
3691 &thread,
3692 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3693 if matches!(event, AcpThreadEvent::Refusal) {
3694 *saw_refusal_event_captured.lock().unwrap() = true;
3695 }
3696 },
3697 )
3698 .detach();
3699 });
3700
3701 // Send a user message - this will trigger tool call and then refusal
3702 let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
3703 cx.background_executor.spawn(send_task).detach();
3704 cx.run_until_parked();
3705
3706 // Verify that:
3707 // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3708 // 2. The user message was NOT truncated
3709 assert!(
3710 *saw_refusal_event.lock().unwrap(),
3711 "Refusal event should be emitted for tool result refusals"
3712 );
3713
3714 thread.read_with(cx, |thread, _| {
3715 let entries = thread.entries();
3716 assert!(entries.len() >= 2, "Should have user message and tool call");
3717
3718 // Verify user message is still there
3719 assert!(
3720 matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3721 "User message should not be truncated"
3722 );
3723
3724 // Verify tool call is there with result
3725 if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3726 assert!(
3727 tool_call.raw_output.is_some(),
3728 "Tool call should have output"
3729 );
3730 } else {
3731 panic!("Expected tool call at index 1");
3732 }
3733 });
3734 }
3735
3736 #[gpui::test]
3737 async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3738 init_test(cx);
3739
3740 let fs = FakeFs::new(cx.executor());
3741 let project = Project::test(fs, None, cx).await;
3742
3743 let refuse_next = Arc::new(AtomicBool::new(false));
3744 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3745 let refuse_next = refuse_next.clone();
3746 move |_request, _thread, _cx| {
3747 if refuse_next.load(SeqCst) {
3748 async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
3749 .boxed_local()
3750 } else {
3751 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
3752 .boxed_local()
3753 }
3754 }
3755 }));
3756
3757 let thread = cx
3758 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3759 .await
3760 .unwrap();
3761
3762 // Track if we see a Refusal event
3763 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3764 let saw_refusal_event_captured = saw_refusal_event.clone();
3765 thread.update(cx, |_thread, cx| {
3766 cx.subscribe(
3767 &thread,
3768 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3769 if matches!(event, AcpThreadEvent::Refusal) {
3770 *saw_refusal_event_captured.lock().unwrap() = true;
3771 }
3772 },
3773 )
3774 .detach();
3775 });
3776
3777 // Send a message that will be refused
3778 refuse_next.store(true, SeqCst);
3779 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3780 .await
3781 .unwrap();
3782
3783 // Verify that a Refusal event WAS emitted for user prompt refusal
3784 assert!(
3785 *saw_refusal_event.lock().unwrap(),
3786 "Refusal event should be emitted for user prompt refusals"
3787 );
3788
3789 // Verify the message was truncated (user prompt refusal)
3790 thread.read_with(cx, |thread, cx| {
3791 assert_eq!(thread.to_markdown(cx), "");
3792 });
3793 }
3794
3795 #[gpui::test]
3796 async fn test_refusal(cx: &mut TestAppContext) {
3797 init_test(cx);
3798 let fs = FakeFs::new(cx.background_executor.clone());
3799 fs.insert_tree(path!("/"), json!({})).await;
3800 let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
3801
3802 let refuse_next = Arc::new(AtomicBool::new(false));
3803 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3804 let refuse_next = refuse_next.clone();
3805 move |request, thread, mut cx| {
3806 let refuse_next = refuse_next.clone();
3807 async move {
3808 if refuse_next.load(SeqCst) {
3809 return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
3810 }
3811
3812 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3813 panic!("expected text content block");
3814 };
3815 thread.update(&mut cx, |thread, cx| {
3816 thread
3817 .handle_session_update(
3818 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3819 content.text.to_uppercase().into(),
3820 )),
3821 cx,
3822 )
3823 .unwrap();
3824 })?;
3825 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3826 }
3827 .boxed_local()
3828 }
3829 }));
3830 let thread = cx
3831 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3832 .await
3833 .unwrap();
3834
3835 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3836 .await
3837 .unwrap();
3838 thread.read_with(cx, |thread, cx| {
3839 assert_eq!(
3840 thread.to_markdown(cx),
3841 indoc! {"
3842 ## User
3843
3844 hello
3845
3846 ## Assistant
3847
3848 HELLO
3849
3850 "}
3851 );
3852 });
3853
3854 // Simulate refusing the second message. The message should be truncated
3855 // when a user prompt is refused.
3856 refuse_next.store(true, SeqCst);
3857 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
3858 .await
3859 .unwrap();
3860 thread.read_with(cx, |thread, cx| {
3861 assert_eq!(
3862 thread.to_markdown(cx),
3863 indoc! {"
3864 ## User
3865
3866 hello
3867
3868 ## Assistant
3869
3870 HELLO
3871
3872 "}
3873 );
3874 });
3875 }
3876
3877 async fn run_until_first_tool_call(
3878 thread: &Entity<AcpThread>,
3879 cx: &mut TestAppContext,
3880 ) -> usize {
3881 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
3882
3883 let subscription = cx.update(|cx| {
3884 cx.subscribe(thread, move |thread, _, cx| {
3885 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
3886 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
3887 return tx.try_send(ix).unwrap();
3888 }
3889 }
3890 })
3891 });
3892
3893 select! {
3894 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
3895 panic!("Timeout waiting for tool call")
3896 }
3897 ix = rx.next().fuse() => {
3898 drop(subscription);
3899 ix.unwrap()
3900 }
3901 }
3902 }
3903
3904 #[derive(Clone, Default)]
3905 struct FakeAgentConnection {
3906 auth_methods: Vec<acp::AuthMethod>,
3907 sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
3908 on_user_message: Option<
3909 Rc<
3910 dyn Fn(
3911 acp::PromptRequest,
3912 WeakEntity<AcpThread>,
3913 AsyncApp,
3914 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3915 + 'static,
3916 >,
3917 >,
3918 }
3919
3920 impl FakeAgentConnection {
3921 fn new() -> Self {
3922 Self {
3923 auth_methods: Vec::new(),
3924 on_user_message: None,
3925 sessions: Arc::default(),
3926 }
3927 }
3928
3929 #[expect(unused)]
3930 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
3931 self.auth_methods = auth_methods;
3932 self
3933 }
3934
3935 fn on_user_message(
3936 mut self,
3937 handler: impl Fn(
3938 acp::PromptRequest,
3939 WeakEntity<AcpThread>,
3940 AsyncApp,
3941 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3942 + 'static,
3943 ) -> Self {
3944 self.on_user_message.replace(Rc::new(handler));
3945 self
3946 }
3947 }
3948
3949 impl AgentConnection for FakeAgentConnection {
3950 fn telemetry_id(&self) -> SharedString {
3951 "fake".into()
3952 }
3953
3954 fn auth_methods(&self) -> &[acp::AuthMethod] {
3955 &self.auth_methods
3956 }
3957
3958 fn new_session(
3959 self: Rc<Self>,
3960 project: Entity<Project>,
3961 _cwd: &Path,
3962 cx: &mut App,
3963 ) -> Task<gpui::Result<Entity<AcpThread>>> {
3964 let session_id = acp::SessionId::new(
3965 rand::rng()
3966 .sample_iter(&distr::Alphanumeric)
3967 .take(7)
3968 .map(char::from)
3969 .collect::<String>(),
3970 );
3971 let action_log = cx.new(|_| ActionLog::new(project.clone()));
3972 let thread = cx.new(|cx| {
3973 AcpThread::new(
3974 None,
3975 "Test",
3976 self.clone(),
3977 project,
3978 action_log,
3979 session_id.clone(),
3980 watch::Receiver::constant(
3981 acp::PromptCapabilities::new()
3982 .image(true)
3983 .audio(true)
3984 .embedded_context(true),
3985 ),
3986 cx,
3987 )
3988 });
3989 self.sessions.lock().insert(session_id, thread.downgrade());
3990 Task::ready(Ok(thread))
3991 }
3992
3993 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
3994 if self.auth_methods().iter().any(|m| m.id == method) {
3995 Task::ready(Ok(()))
3996 } else {
3997 Task::ready(Err(anyhow!("Invalid Auth Method")))
3998 }
3999 }
4000
4001 fn prompt(
4002 &self,
4003 _id: Option<UserMessageId>,
4004 params: acp::PromptRequest,
4005 cx: &mut App,
4006 ) -> Task<gpui::Result<acp::PromptResponse>> {
4007 let sessions = self.sessions.lock();
4008 let thread = sessions.get(¶ms.session_id).unwrap();
4009 if let Some(handler) = &self.on_user_message {
4010 let handler = handler.clone();
4011 let thread = thread.clone();
4012 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
4013 } else {
4014 Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
4015 }
4016 }
4017
4018 fn cancel(&self, _session_id: &acp::SessionId, _cx: &mut App) {}
4019
4020 fn truncate(
4021 &self,
4022 session_id: &acp::SessionId,
4023 _cx: &App,
4024 ) -> Option<Rc<dyn AgentSessionTruncate>> {
4025 Some(Rc::new(FakeAgentSessionEditor {
4026 _session_id: session_id.clone(),
4027 }))
4028 }
4029
4030 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
4031 self
4032 }
4033 }
4034
4035 struct FakeAgentSessionEditor {
4036 _session_id: acp::SessionId,
4037 }
4038
4039 impl AgentSessionTruncate for FakeAgentSessionEditor {
4040 fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
4041 Task::ready(Ok(()))
4042 }
4043 }
4044
4045 #[gpui::test]
4046 async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
4047 init_test(cx);
4048
4049 let fs = FakeFs::new(cx.executor());
4050 let project = Project::test(fs, [], cx).await;
4051 let connection = Rc::new(FakeAgentConnection::new());
4052 let thread = cx
4053 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
4054 .await
4055 .unwrap();
4056
4057 // Try to update a tool call that doesn't exist
4058 let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
4059 thread.update(cx, |thread, cx| {
4060 let result = thread.handle_session_update(
4061 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4062 nonexistent_id.clone(),
4063 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4064 )),
4065 cx,
4066 );
4067
4068 // The update should succeed (not return an error)
4069 assert!(result.is_ok());
4070
4071 // There should now be exactly one entry in the thread
4072 assert_eq!(thread.entries.len(), 1);
4073
4074 // The entry should be a failed tool call
4075 if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
4076 assert_eq!(tool_call.id, nonexistent_id);
4077 assert!(matches!(tool_call.status, ToolCallStatus::Failed));
4078 assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
4079
4080 // Check that the content contains the error message
4081 assert_eq!(tool_call.content.len(), 1);
4082 if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
4083 match content_block {
4084 ContentBlock::Markdown { markdown } => {
4085 let markdown_text = markdown.read(cx).source();
4086 assert!(markdown_text.contains("Tool call not found"));
4087 }
4088 ContentBlock::Empty => panic!("Expected markdown content, got empty"),
4089 ContentBlock::ResourceLink { .. } => {
4090 panic!("Expected markdown content, got resource link")
4091 }
4092 ContentBlock::Image { .. } => {
4093 panic!("Expected markdown content, got image")
4094 }
4095 }
4096 } else {
4097 panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
4098 }
4099 } else {
4100 panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
4101 }
4102 });
4103 }
4104
4105 /// Tests that restoring a checkpoint properly cleans up terminals that were
4106 /// created after that checkpoint, and cancels any in-progress generation.
4107 ///
4108 /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
4109 /// that were started after that checkpoint should be terminated, and any in-progress
4110 /// AI generation should be canceled.
4111 #[gpui::test]
4112 async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
4113 init_test(cx);
4114
4115 let fs = FakeFs::new(cx.executor());
4116 let project = Project::test(fs, [], cx).await;
4117 let connection = Rc::new(FakeAgentConnection::new());
4118 let thread = cx
4119 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
4120 .await
4121 .unwrap();
4122
4123 // Send first user message to create a checkpoint
4124 cx.update(|cx| {
4125 thread.update(cx, |thread, cx| {
4126 thread.send(vec!["first message".into()], cx)
4127 })
4128 })
4129 .await
4130 .unwrap();
4131
4132 // Send second message (creates another checkpoint) - we'll restore to this one
4133 cx.update(|cx| {
4134 thread.update(cx, |thread, cx| {
4135 thread.send(vec!["second message".into()], cx)
4136 })
4137 })
4138 .await
4139 .unwrap();
4140
4141 // Create 2 terminals BEFORE the checkpoint that have completed running
4142 let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4143 let mock_terminal_1 = cx.new(|cx| {
4144 let builder = ::terminal::TerminalBuilder::new_display_only(
4145 ::terminal::terminal_settings::CursorShape::default(),
4146 ::terminal::terminal_settings::AlternateScroll::On,
4147 None,
4148 0,
4149 cx.background_executor(),
4150 PathStyle::local(),
4151 )
4152 .unwrap();
4153 builder.subscribe(cx)
4154 });
4155
4156 thread.update(cx, |thread, cx| {
4157 thread.on_terminal_provider_event(
4158 TerminalProviderEvent::Created {
4159 terminal_id: terminal_id_1.clone(),
4160 label: "echo 'first'".to_string(),
4161 cwd: Some(PathBuf::from("/test")),
4162 output_byte_limit: None,
4163 terminal: mock_terminal_1.clone(),
4164 },
4165 cx,
4166 );
4167 });
4168
4169 thread.update(cx, |thread, cx| {
4170 thread.on_terminal_provider_event(
4171 TerminalProviderEvent::Output {
4172 terminal_id: terminal_id_1.clone(),
4173 data: b"first\n".to_vec(),
4174 },
4175 cx,
4176 );
4177 });
4178
4179 thread.update(cx, |thread, cx| {
4180 thread.on_terminal_provider_event(
4181 TerminalProviderEvent::Exit {
4182 terminal_id: terminal_id_1.clone(),
4183 status: acp::TerminalExitStatus::new().exit_code(0),
4184 },
4185 cx,
4186 );
4187 });
4188
4189 let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4190 let mock_terminal_2 = cx.new(|cx| {
4191 let builder = ::terminal::TerminalBuilder::new_display_only(
4192 ::terminal::terminal_settings::CursorShape::default(),
4193 ::terminal::terminal_settings::AlternateScroll::On,
4194 None,
4195 0,
4196 cx.background_executor(),
4197 PathStyle::local(),
4198 )
4199 .unwrap();
4200 builder.subscribe(cx)
4201 });
4202
4203 thread.update(cx, |thread, cx| {
4204 thread.on_terminal_provider_event(
4205 TerminalProviderEvent::Created {
4206 terminal_id: terminal_id_2.clone(),
4207 label: "echo 'second'".to_string(),
4208 cwd: Some(PathBuf::from("/test")),
4209 output_byte_limit: None,
4210 terminal: mock_terminal_2.clone(),
4211 },
4212 cx,
4213 );
4214 });
4215
4216 thread.update(cx, |thread, cx| {
4217 thread.on_terminal_provider_event(
4218 TerminalProviderEvent::Output {
4219 terminal_id: terminal_id_2.clone(),
4220 data: b"second\n".to_vec(),
4221 },
4222 cx,
4223 );
4224 });
4225
4226 thread.update(cx, |thread, cx| {
4227 thread.on_terminal_provider_event(
4228 TerminalProviderEvent::Exit {
4229 terminal_id: terminal_id_2.clone(),
4230 status: acp::TerminalExitStatus::new().exit_code(0),
4231 },
4232 cx,
4233 );
4234 });
4235
4236 // Get the second message ID to restore to
4237 let second_message_id = thread.read_with(cx, |thread, _| {
4238 // At this point we have:
4239 // - Index 0: First user message (with checkpoint)
4240 // - Index 1: Second user message (with checkpoint)
4241 // No assistant responses because FakeAgentConnection just returns EndTurn
4242 let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
4243 panic!("expected user message at index 1");
4244 };
4245 message.id.clone().unwrap()
4246 });
4247
4248 // Create a terminal AFTER the checkpoint we'll restore to.
4249 // This simulates the AI agent starting a long-running terminal command.
4250 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4251 let mock_terminal = cx.new(|cx| {
4252 let builder = ::terminal::TerminalBuilder::new_display_only(
4253 ::terminal::terminal_settings::CursorShape::default(),
4254 ::terminal::terminal_settings::AlternateScroll::On,
4255 None,
4256 0,
4257 cx.background_executor(),
4258 PathStyle::local(),
4259 )
4260 .unwrap();
4261 builder.subscribe(cx)
4262 });
4263
4264 // Register the terminal as created
4265 thread.update(cx, |thread, cx| {
4266 thread.on_terminal_provider_event(
4267 TerminalProviderEvent::Created {
4268 terminal_id: terminal_id.clone(),
4269 label: "sleep 1000".to_string(),
4270 cwd: Some(PathBuf::from("/test")),
4271 output_byte_limit: None,
4272 terminal: mock_terminal.clone(),
4273 },
4274 cx,
4275 );
4276 });
4277
4278 // Simulate the terminal producing output (still running)
4279 thread.update(cx, |thread, cx| {
4280 thread.on_terminal_provider_event(
4281 TerminalProviderEvent::Output {
4282 terminal_id: terminal_id.clone(),
4283 data: b"terminal is running...\n".to_vec(),
4284 },
4285 cx,
4286 );
4287 });
4288
4289 // Create a tool call entry that references this terminal
4290 // This represents the agent requesting a terminal command
4291 thread.update(cx, |thread, cx| {
4292 thread
4293 .handle_session_update(
4294 acp::SessionUpdate::ToolCall(
4295 acp::ToolCall::new("terminal-tool-1", "Running command")
4296 .kind(acp::ToolKind::Execute)
4297 .status(acp::ToolCallStatus::InProgress)
4298 .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
4299 terminal_id.clone(),
4300 ))])
4301 .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
4302 ),
4303 cx,
4304 )
4305 .unwrap();
4306 });
4307
4308 // Verify terminal exists and is in the thread
4309 let terminal_exists_before =
4310 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4311 assert!(
4312 terminal_exists_before,
4313 "Terminal should exist before checkpoint restore"
4314 );
4315
4316 // Verify the terminal's underlying task is still running (not completed)
4317 let terminal_running_before = thread.read_with(cx, |thread, _cx| {
4318 let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
4319 terminal_entity.read_with(cx, |term, _cx| {
4320 term.output().is_none() // output is None means it's still running
4321 })
4322 });
4323 assert!(
4324 terminal_running_before,
4325 "Terminal should be running before checkpoint restore"
4326 );
4327
4328 // Verify we have the expected entries before restore
4329 let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
4330 assert!(
4331 entry_count_before > 1,
4332 "Should have multiple entries before restore"
4333 );
4334
4335 // Restore the checkpoint to the second message.
4336 // This should:
4337 // 1. Cancel any in-progress generation (via the cancel() call)
4338 // 2. Remove the terminal that was created after that point
4339 thread
4340 .update(cx, |thread, cx| {
4341 thread.restore_checkpoint(second_message_id, cx)
4342 })
4343 .await
4344 .unwrap();
4345
4346 // Verify that no send_task is in progress after restore
4347 // (cancel() clears the send_task)
4348 let has_send_task_after = thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4349 assert!(
4350 !has_send_task_after,
4351 "Should not have a send_task after restore (cancel should have cleared it)"
4352 );
4353
4354 // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
4355 let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
4356 assert_eq!(
4357 entry_count, 1,
4358 "Should have 1 entry after restore (only the first user message)"
4359 );
4360
4361 // Verify the 2 completed terminals from before the checkpoint still exist
4362 let terminal_1_exists = thread.read_with(cx, |thread, _| {
4363 thread.terminals.contains_key(&terminal_id_1)
4364 });
4365 assert!(
4366 terminal_1_exists,
4367 "Terminal 1 (from before checkpoint) should still exist"
4368 );
4369
4370 let terminal_2_exists = thread.read_with(cx, |thread, _| {
4371 thread.terminals.contains_key(&terminal_id_2)
4372 });
4373 assert!(
4374 terminal_2_exists,
4375 "Terminal 2 (from before checkpoint) should still exist"
4376 );
4377
4378 // Verify they're still in completed state
4379 let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
4380 let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
4381 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4382 });
4383 assert!(terminal_1_completed, "Terminal 1 should still be completed");
4384
4385 let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
4386 let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
4387 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4388 });
4389 assert!(terminal_2_completed, "Terminal 2 should still be completed");
4390
4391 // Verify the running terminal (created after checkpoint) was removed
4392 let terminal_3_exists =
4393 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4394 assert!(
4395 !terminal_3_exists,
4396 "Terminal 3 (created after checkpoint) should have been removed"
4397 );
4398
4399 // Verify total count is 2 (the two from before the checkpoint)
4400 let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
4401 assert_eq!(
4402 terminal_count, 2,
4403 "Should have exactly 2 terminals (the completed ones from before checkpoint)"
4404 );
4405 }
4406
4407 /// Tests that update_last_checkpoint correctly updates the original message's checkpoint
4408 /// even when a new user message is added while the async checkpoint comparison is in progress.
4409 ///
4410 /// This is a regression test for a bug where update_last_checkpoint would fail with
4411 /// "no checkpoint" if a new user message (without a checkpoint) was added between when
4412 /// update_last_checkpoint started and when its async closure ran.
4413 #[gpui::test]
4414 async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
4415 init_test(cx);
4416
4417 let fs = FakeFs::new(cx.executor());
4418 fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
4419 .await;
4420 let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;
4421
4422 let handler_done = Arc::new(AtomicBool::new(false));
4423 let handler_done_clone = handler_done.clone();
4424 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4425 move |_, _thread, _cx| {
4426 handler_done_clone.store(true, SeqCst);
4427 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
4428 },
4429 ));
4430
4431 let thread = cx
4432 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
4433 .await
4434 .unwrap();
4435
4436 let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
4437 let send_task = cx.background_executor.spawn(send_future);
4438
4439 // Tick until handler completes, then a few more to let update_last_checkpoint start
4440 while !handler_done.load(SeqCst) {
4441 cx.executor().tick();
4442 }
4443 for _ in 0..5 {
4444 cx.executor().tick();
4445 }
4446
4447 thread.update(cx, |thread, cx| {
4448 thread.push_entry(
4449 AgentThreadEntry::UserMessage(UserMessage {
4450 id: Some(UserMessageId::new()),
4451 content: ContentBlock::Empty,
4452 chunks: vec!["Injected message (no checkpoint)".into()],
4453 checkpoint: None,
4454 indented: false,
4455 }),
4456 cx,
4457 );
4458 });
4459
4460 cx.run_until_parked();
4461 let result = send_task.await;
4462
4463 assert!(
4464 result.is_ok(),
4465 "send should succeed even when new message added during update_last_checkpoint: {:?}",
4466 result.err()
4467 );
4468 }
4469
4470 /// Tests that when a follow-up message is sent during generation,
4471 /// the first turn completing does NOT clear `running_turn` because
4472 /// it now belongs to the second turn.
4473 #[gpui::test]
4474 async fn test_follow_up_message_during_generation_does_not_clear_turn(cx: &mut TestAppContext) {
4475 init_test(cx);
4476
4477 let fs = FakeFs::new(cx.executor());
4478 let project = Project::test(fs, [], cx).await;
4479
4480 // First handler waits for this signal before completing
4481 let (first_complete_tx, first_complete_rx) = futures::channel::oneshot::channel::<()>();
4482 let first_complete_rx = RefCell::new(Some(first_complete_rx));
4483
4484 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4485 move |params, _thread, _cx| {
4486 let first_complete_rx = first_complete_rx.borrow_mut().take();
4487 let is_first = params
4488 .prompt
4489 .iter()
4490 .any(|c| matches!(c, acp::ContentBlock::Text(t) if t.text.contains("first")));
4491
4492 async move {
4493 if is_first {
4494 // First handler waits until signaled
4495 if let Some(rx) = first_complete_rx {
4496 rx.await.ok();
4497 }
4498 }
4499 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4500 }
4501 .boxed_local()
4502 }
4503 }));
4504
4505 let thread = cx
4506 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
4507 .await
4508 .unwrap();
4509
4510 // Send first message (turn_id=1) - handler will block
4511 let first_request = thread.update(cx, |thread, cx| thread.send_raw("first", cx));
4512 assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 1);
4513
4514 // Send second message (turn_id=2) while first is still blocked
4515 // This calls cancel() which takes turn 1's running_turn and sets turn 2's
4516 let second_request = thread.update(cx, |thread, cx| thread.send_raw("second", cx));
4517 assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 2);
4518
4519 let running_turn_after_second_send =
4520 thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
4521 assert_eq!(
4522 running_turn_after_second_send,
4523 Some(2),
4524 "running_turn should be set to turn 2 after sending second message"
4525 );
4526
4527 // Now signal first handler to complete
4528 first_complete_tx.send(()).ok();
4529
4530 // First request completes - should NOT clear running_turn
4531 // because running_turn now belongs to turn 2
4532 first_request.await.unwrap();
4533
4534 let running_turn_after_first =
4535 thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
4536 assert_eq!(
4537 running_turn_after_first,
4538 Some(2),
4539 "first turn completing should not clear running_turn (belongs to turn 2)"
4540 );
4541
4542 // Second request completes - SHOULD clear running_turn
4543 second_request.await.unwrap();
4544
4545 let running_turn_after_second =
4546 thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4547 assert!(
4548 !running_turn_after_second,
4549 "second turn completing should clear running_turn"
4550 );
4551 }
4552
4553 #[gpui::test]
4554 async fn test_send_returns_cancelled_response_and_marks_tools_as_cancelled(
4555 cx: &mut TestAppContext,
4556 ) {
4557 init_test(cx);
4558
4559 let fs = FakeFs::new(cx.executor());
4560 let project = Project::test(fs, [], cx).await;
4561
4562 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4563 move |_params, thread, mut cx| {
4564 async move {
4565 thread
4566 .update(&mut cx, |thread, cx| {
4567 thread.handle_session_update(
4568 acp::SessionUpdate::ToolCall(
4569 acp::ToolCall::new(
4570 acp::ToolCallId::new("test-tool"),
4571 "Test Tool",
4572 )
4573 .kind(acp::ToolKind::Fetch)
4574 .status(acp::ToolCallStatus::InProgress),
4575 ),
4576 cx,
4577 )
4578 })
4579 .unwrap()
4580 .unwrap();
4581
4582 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
4583 }
4584 .boxed_local()
4585 },
4586 ));
4587
4588 let thread = cx
4589 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
4590 .await
4591 .unwrap();
4592
4593 let response = thread
4594 .update(cx, |thread, cx| thread.send_raw("test message", cx))
4595 .await;
4596
4597 let response = response
4598 .expect("send should succeed")
4599 .expect("should have response");
4600 assert_eq!(
4601 response.stop_reason,
4602 acp::StopReason::Cancelled,
4603 "response should have Cancelled stop_reason"
4604 );
4605
4606 thread.read_with(cx, |thread, _| {
4607 let tool_entry = thread
4608 .entries
4609 .iter()
4610 .find_map(|e| {
4611 if let AgentThreadEntry::ToolCall(call) = e {
4612 Some(call)
4613 } else {
4614 None
4615 }
4616 })
4617 .expect("should have tool call entry");
4618
4619 assert!(
4620 matches!(tool_entry.status, ToolCallStatus::Canceled),
4621 "tool should be marked as Canceled when response is Cancelled, got {:?}",
4622 tool_entry.status
4623 );
4624 });
4625 }
4626}