1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5use action_log::{ActionLog, ActionLogTelemetry};
6use agent_client_protocol::{self as acp};
7use anyhow::{Context as _, Result, anyhow};
8use collections::HashSet;
9pub use connection::*;
10pub use diff::*;
11use futures::{FutureExt, channel::oneshot, future::BoxFuture};
12use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
13use itertools::Itertools;
14use language::language_settings::FormatOnSave;
15use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
16use markdown::Markdown;
17pub use mention::*;
18use project::lsp_store::{FormatTrigger, LspFormatTarget};
19use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
20use serde::{Deserialize, Serialize};
21use serde_json::to_string_pretty;
22use std::collections::HashMap;
23use std::error::Error;
24use std::fmt::{Formatter, Write};
25use std::ops::Range;
26use std::process::ExitStatus;
27use std::rc::Rc;
28use std::time::{Duration, Instant};
29use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
30use task::{Shell, ShellBuilder};
31pub use terminal::*;
32use text::Bias;
33use ui::App;
34use util::markdown::MarkdownEscaped;
35use util::path_list::PathList;
36use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
37use uuid::Uuid;
38
/// Key used in ACP ToolCall meta to store the tool's programmatic name.
/// This is a workaround since ACP's ToolCall doesn't have a dedicated name field.
///
/// Read by [`tool_name_from_meta`] and written by [`meta_with_tool_name`].
pub const TOOL_NAME_META_KEY: &str = "tool_name";
42
43/// Helper to extract tool name from ACP meta
44pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
45 meta.as_ref()
46 .and_then(|m| m.get(TOOL_NAME_META_KEY))
47 .and_then(|v| v.as_str())
48 .map(|s| SharedString::from(s.to_owned()))
49}
50
51/// Helper to create meta with tool name
52pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
53 acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
54}
55
/// Key used in ACP ToolCall meta to store the session id and message indexes
/// of a subagent session (serialized as a [`SubagentSessionInfo`]).
pub const SUBAGENT_SESSION_INFO_META_KEY: &str = "subagent_session_info";
58
/// Information about a subagent session attached to a tool call, stored in the
/// tool call's ACP meta under [`SUBAGENT_SESSION_INFO_META_KEY`].
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SubagentSessionInfo {
    /// The session id of the subagent session that was spawned
    pub session_id: acp::SessionId,
    /// The index of the message of the start of the "turn" run by this tool call
    pub message_start_index: usize,
    /// The index of the output of the message that the subagent has returned.
    /// Omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message_end_index: Option<usize>,
}
69
70/// Helper to extract subagent session id from ACP meta
71pub fn subagent_session_info_from_meta(meta: &Option<acp::Meta>) -> Option<SubagentSessionInfo> {
72 meta.as_ref()
73 .and_then(|m| m.get(SUBAGENT_SESSION_INFO_META_KEY))
74 .and_then(|v| serde_json::from_value(v.clone()).ok())
75}
76
/// A user-authored entry in the thread.
#[derive(Debug)]
pub struct UserMessage {
    /// Optional identifier for this message.
    pub id: Option<UserMessageId>,
    /// Content of the message; this is what `to_markdown` renders.
    pub content: ContentBlock,
    /// ACP content blocks associated with this message.
    /// NOTE(review): presumably the raw blocks `content` was built from — confirm.
    pub chunks: Vec<acp::ContentBlock>,
    /// Optional checkpoint attached to this message. When present with `show`
    /// set, the Markdown heading becomes `## User (checkpoint)`.
    pub checkpoint: Option<Checkpoint>,
    /// Whether the entry is indented; reported by `AgentThreadEntry::is_indented`.
    pub indented: bool,
}
85
/// A git checkpoint attached to a [`UserMessage`].
#[derive(Debug)]
pub struct Checkpoint {
    /// The underlying git store checkpoint.
    git_checkpoint: GitStoreCheckpoint,
    /// Whether the checkpoint should be shown; also controls the
    /// `## User (checkpoint)` heading in the message's Markdown.
    pub show: bool,
}
91
92impl UserMessage {
93 fn to_markdown(&self, cx: &App) -> String {
94 let mut markdown = String::new();
95 if self
96 .checkpoint
97 .as_ref()
98 .is_some_and(|checkpoint| checkpoint.show)
99 {
100 writeln!(markdown, "## User (checkpoint)").unwrap();
101 } else {
102 writeln!(markdown, "## User").unwrap();
103 }
104 writeln!(markdown).unwrap();
105 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
106 writeln!(markdown).unwrap();
107 markdown
108 }
109}
110
/// An assistant-authored entry in the thread, made of message and thought chunks.
#[derive(Debug, PartialEq)]
pub struct AssistantMessage {
    /// The chunks of the message, rendered in order by `to_markdown`.
    pub chunks: Vec<AssistantMessageChunk>,
    /// Whether the entry is indented; reported by `AgentThreadEntry::is_indented`.
    pub indented: bool,
    /// Flags this message as subagent output.
    /// NOTE(review): exact semantics are set by code outside this view — confirm.
    pub is_subagent_output: bool,
}
117
118impl AssistantMessage {
119 pub fn to_markdown(&self, cx: &App) -> String {
120 format!(
121 "## Assistant\n\n{}\n\n",
122 self.chunks
123 .iter()
124 .map(|chunk| chunk.to_markdown(cx))
125 .join("\n\n")
126 )
127 }
128}
129
/// One piece of an [`AssistantMessage`].
#[derive(Debug, PartialEq)]
pub enum AssistantMessageChunk {
    /// Regular message content; rendered as-is in Markdown.
    Message { block: ContentBlock },
    /// Thought content; rendered wrapped in `<thinking>` tags in Markdown.
    Thought { block: ContentBlock },
}
135
136impl AssistantMessageChunk {
137 pub fn from_str(
138 chunk: &str,
139 language_registry: &Arc<LanguageRegistry>,
140 path_style: PathStyle,
141 cx: &mut App,
142 ) -> Self {
143 Self::Message {
144 block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
145 }
146 }
147
148 fn to_markdown(&self, cx: &App) -> String {
149 match self {
150 Self::Message { block } => block.to_markdown(cx).to_string(),
151 Self::Thought { block } => {
152 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
153 }
154 }
155 }
156}
157
/// A single entry in a thread: a user message, an assistant message, or a
/// tool call.
#[derive(Debug)]
pub enum AgentThreadEntry {
    /// A message written by the user.
    UserMessage(UserMessage),
    /// A message produced by the assistant.
    AssistantMessage(AssistantMessage),
    /// A tool call.
    ToolCall(ToolCall),
}
164
165impl AgentThreadEntry {
166 pub fn is_indented(&self) -> bool {
167 match self {
168 Self::UserMessage(message) => message.indented,
169 Self::AssistantMessage(message) => message.indented,
170 Self::ToolCall(_) => false,
171 }
172 }
173
174 pub fn to_markdown(&self, cx: &App) -> String {
175 match self {
176 Self::UserMessage(message) => message.to_markdown(cx),
177 Self::AssistantMessage(message) => message.to_markdown(cx),
178 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
179 }
180 }
181
182 pub fn user_message(&self) -> Option<&UserMessage> {
183 if let AgentThreadEntry::UserMessage(message) = self {
184 Some(message)
185 } else {
186 None
187 }
188 }
189
190 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
191 if let AgentThreadEntry::ToolCall(call) = self {
192 itertools::Either::Left(call.diffs())
193 } else {
194 itertools::Either::Right(std::iter::empty())
195 }
196 }
197
198 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
199 if let AgentThreadEntry::ToolCall(call) = self {
200 itertools::Either::Left(call.terminals())
201 } else {
202 itertools::Either::Right(std::iter::empty())
203 }
204 }
205
206 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
207 if let AgentThreadEntry::ToolCall(ToolCall {
208 locations,
209 resolved_locations,
210 ..
211 }) = self
212 {
213 Some((
214 locations.get(ix)?.clone(),
215 resolved_locations.get(ix)?.clone()?,
216 ))
217 } else {
218 None
219 }
220 }
221}
222
/// Thread-side representation of an ACP tool call.
#[derive(Debug)]
pub struct ToolCall {
    /// The ACP identifier of the tool call.
    pub id: acp::ToolCallId,
    /// Markdown label derived from the tool call's title.
    pub label: Entity<Markdown>,
    /// The ACP tool kind; influences how the title is turned into the label.
    pub kind: acp::ToolKind,
    /// Content produced by the tool call (content blocks, diffs, terminals).
    pub content: Vec<ToolCallContent>,
    /// Current status of the tool call.
    pub status: ToolCallStatus,
    /// Locations reported by the tool call.
    pub locations: Vec<acp::ToolCallLocation>,
    /// Resolved counterparts of `locations`, looked up by the same index in
    /// `AgentThreadEntry::location`; `None` for unresolved locations.
    pub resolved_locations: Vec<Option<AgentLocation>>,
    /// Raw JSON input of the tool call, if provided.
    pub raw_input: Option<serde_json::Value>,
    /// Markdown rendering of `raw_input`, if one could be produced.
    pub raw_input_markdown: Option<Entity<Markdown>>,
    /// Raw JSON output of the tool call, if provided.
    pub raw_output: Option<serde_json::Value>,
    /// The tool's programmatic name, extracted from ACP meta.
    pub tool_name: Option<SharedString>,
    /// Subagent session information, extracted from ACP meta.
    pub subagent_session_info: Option<SubagentSessionInfo>,
}
238
239impl ToolCall {
240 fn from_acp(
241 tool_call: acp::ToolCall,
242 status: ToolCallStatus,
243 language_registry: Arc<LanguageRegistry>,
244 path_style: PathStyle,
245 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
246 cx: &mut App,
247 ) -> Result<Self> {
248 let title = if tool_call.kind == acp::ToolKind::Execute {
249 tool_call.title
250 } else if tool_call.kind == acp::ToolKind::Edit {
251 MarkdownEscaped(tool_call.title.as_str()).to_string()
252 } else if let Some((first_line, _)) = tool_call.title.split_once("\n") {
253 first_line.to_owned() + "…"
254 } else {
255 tool_call.title
256 };
257 let mut content = Vec::with_capacity(tool_call.content.len());
258 for item in tool_call.content {
259 if let Some(item) = ToolCallContent::from_acp(
260 item,
261 language_registry.clone(),
262 path_style,
263 terminals,
264 cx,
265 )? {
266 content.push(item);
267 }
268 }
269
270 let raw_input_markdown = tool_call
271 .raw_input
272 .as_ref()
273 .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
274
275 let tool_name = tool_name_from_meta(&tool_call.meta);
276
277 let subagent_session_info = subagent_session_info_from_meta(&tool_call.meta);
278
279 let result = Self {
280 id: tool_call.tool_call_id,
281 label: cx
282 .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
283 kind: tool_call.kind,
284 content,
285 locations: tool_call.locations,
286 resolved_locations: Vec::default(),
287 status,
288 raw_input: tool_call.raw_input,
289 raw_input_markdown,
290 raw_output: tool_call.raw_output,
291 tool_name,
292 subagent_session_info,
293 };
294 Ok(result)
295 }
296
297 fn update_fields(
298 &mut self,
299 fields: acp::ToolCallUpdateFields,
300 meta: Option<acp::Meta>,
301 language_registry: Arc<LanguageRegistry>,
302 path_style: PathStyle,
303 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
304 cx: &mut App,
305 ) -> Result<()> {
306 let acp::ToolCallUpdateFields {
307 kind,
308 status,
309 title,
310 content,
311 locations,
312 raw_input,
313 raw_output,
314 ..
315 } = fields;
316
317 if let Some(kind) = kind {
318 self.kind = kind;
319 }
320
321 if let Some(status) = status {
322 self.status = status.into();
323 }
324
325 if let Some(tool_name) = tool_name_from_meta(&meta) {
326 self.tool_name = Some(tool_name);
327 }
328
329 if let Some(subagent_session_info) = subagent_session_info_from_meta(&meta) {
330 self.subagent_session_info = Some(subagent_session_info);
331 }
332
333 if let Some(title) = title {
334 if self.kind == acp::ToolKind::Execute {
335 for terminal in self.terminals() {
336 terminal.update(cx, |terminal, cx| {
337 terminal.update_command_label(&title, cx);
338 });
339 }
340 }
341 self.label.update(cx, |label, cx| {
342 if self.kind == acp::ToolKind::Execute {
343 label.replace(title, cx);
344 } else if self.kind == acp::ToolKind::Edit {
345 label.replace(MarkdownEscaped(&title).to_string(), cx)
346 } else if let Some((first_line, _)) = title.split_once("\n") {
347 label.replace(first_line.to_owned() + "…", cx);
348 } else {
349 label.replace(title, cx);
350 }
351 });
352 }
353
354 if let Some(content) = content {
355 let mut new_content_len = content.len();
356 let mut content = content.into_iter();
357
358 // Reuse existing content if we can
359 for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
360 let valid_content =
361 old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
362 if !valid_content {
363 new_content_len -= 1;
364 }
365 }
366 for new in content {
367 if let Some(new) = ToolCallContent::from_acp(
368 new,
369 language_registry.clone(),
370 path_style,
371 terminals,
372 cx,
373 )? {
374 self.content.push(new);
375 } else {
376 new_content_len -= 1;
377 }
378 }
379 self.content.truncate(new_content_len);
380 }
381
382 if let Some(locations) = locations {
383 self.locations = locations;
384 }
385
386 if let Some(raw_input) = raw_input {
387 self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
388 self.raw_input = Some(raw_input);
389 }
390
391 if let Some(raw_output) = raw_output {
392 if self.content.is_empty()
393 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
394 {
395 self.content
396 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
397 markdown,
398 }));
399 }
400 self.raw_output = Some(raw_output);
401 }
402 Ok(())
403 }
404
405 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
406 self.content.iter().filter_map(|content| match content {
407 ToolCallContent::Diff(diff) => Some(diff),
408 ToolCallContent::ContentBlock(_) => None,
409 ToolCallContent::Terminal(_) => None,
410 })
411 }
412
413 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
414 self.content.iter().filter_map(|content| match content {
415 ToolCallContent::Terminal(terminal) => Some(terminal),
416 ToolCallContent::ContentBlock(_) => None,
417 ToolCallContent::Diff(_) => None,
418 })
419 }
420
421 pub fn is_subagent(&self) -> bool {
422 self.tool_name.as_ref().is_some_and(|s| s == "spawn_agent")
423 || self.subagent_session_info.is_some()
424 }
425
426 pub fn to_markdown(&self, cx: &App) -> String {
427 let mut markdown = format!(
428 "**Tool Call: {}**\nStatus: {}\n\n",
429 self.label.read(cx).source(),
430 self.status
431 );
432 for content in &self.content {
433 markdown.push_str(content.to_markdown(cx).as_str());
434 markdown.push_str("\n\n");
435 }
436 markdown
437 }
438
439 async fn resolve_location(
440 location: acp::ToolCallLocation,
441 project: WeakEntity<Project>,
442 cx: &mut AsyncApp,
443 ) -> Option<ResolvedLocation> {
444 let buffer = project
445 .update(cx, |project, cx| {
446 project
447 .project_path_for_absolute_path(&location.path, cx)
448 .map(|path| project.open_buffer(path, cx))
449 })
450 .ok()??;
451 let buffer = buffer.await.log_err()?;
452 let position = buffer.update(cx, |buffer, _| {
453 let snapshot = buffer.snapshot();
454 if let Some(row) = location.line {
455 let column = snapshot.indent_size_for_line(row).len;
456 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
457 snapshot.anchor_before(point)
458 } else {
459 Anchor::min_for_buffer(snapshot.remote_id())
460 }
461 });
462
463 Some(ResolvedLocation { buffer, position })
464 }
465
466 fn resolve_locations(
467 &self,
468 project: Entity<Project>,
469 cx: &mut App,
470 ) -> Task<Vec<Option<ResolvedLocation>>> {
471 let locations = self.locations.clone();
472 project.update(cx, |_, cx| {
473 cx.spawn(async move |project, cx| {
474 let mut new_locations = Vec::new();
475 for location in locations {
476 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
477 }
478 new_locations
479 })
480 })
481 }
482}
483
// Separate so we can hold a strong reference to the buffer
// for saving on the thread
/// A tool call location resolved to a buffer and an anchor within it.
/// Converts into an `AgentLocation`, which holds only a weak buffer handle.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ResolvedLocation {
    /// Strong handle to the buffer the location points into.
    buffer: Entity<Buffer>,
    /// Anchor within `buffer`.
    position: Anchor,
}
491
492impl From<&ResolvedLocation> for AgentLocation {
493 fn from(value: &ResolvedLocation) -> Self {
494 Self {
495 buffer: value.buffer.downgrade(),
496 position: value.position,
497 }
498 }
499}
500
/// Extra parameters that can accompany a selected permission option.
#[derive(Debug, Clone)]
pub enum SelectedPermissionParams {
    /// Parameters for a terminal permission: a list of patterns.
    /// NOTE(review): pattern syntax and meaning are defined by the consumer — confirm.
    Terminal { patterns: Vec<String> },
}
505
/// The permission option a user selected, plus optional extra parameters.
///
/// Converting into `acp::SelectedPermissionOutcome` keeps only `option_id`.
#[derive(Debug)]
pub struct SelectedPermissionOutcome {
    /// Identifier of the selected permission option.
    pub option_id: acp::PermissionOptionId,
    /// Optional parameters attached to the selection.
    pub params: Option<SelectedPermissionParams>,
}
511
512impl SelectedPermissionOutcome {
513 pub fn new(option_id: acp::PermissionOptionId) -> Self {
514 Self {
515 option_id,
516 params: None,
517 }
518 }
519
520 pub fn params(mut self, params: Option<SelectedPermissionParams>) -> Self {
521 self.params = params;
522 self
523 }
524}
525
526impl From<acp::PermissionOptionId> for SelectedPermissionOutcome {
527 fn from(option_id: acp::PermissionOptionId) -> Self {
528 Self::new(option_id)
529 }
530}
531
532impl From<SelectedPermissionOutcome> for acp::SelectedPermissionOutcome {
533 fn from(value: SelectedPermissionOutcome) -> Self {
534 Self::new(value.option_id)
535 }
536}
537
/// Result of a permission request; converts into `acp::RequestPermissionOutcome`.
#[derive(Debug)]
pub enum RequestPermissionOutcome {
    /// The request was cancelled.
    Cancelled,
    /// An option was selected.
    Selected(SelectedPermissionOutcome),
}
543
544impl From<RequestPermissionOutcome> for acp::RequestPermissionOutcome {
545 fn from(value: RequestPermissionOutcome) -> Self {
546 match value {
547 RequestPermissionOutcome::Cancelled => Self::Cancelled,
548 RequestPermissionOutcome::Selected(outcome) => Self::Selected(outcome.into()),
549 }
550 }
551}
552
/// Status of a tool call as tracked by the thread.
///
/// Its `Display` implementation yields the human-readable text used in the
/// tool call's Markdown rendering.
#[derive(Debug)]
pub enum ToolCallStatus {
    /// The tool call hasn't started running yet, but we start showing it to
    /// the user.
    Pending,
    /// The tool call is waiting for confirmation from the user.
    WaitingForConfirmation {
        /// The options offered to the user.
        options: PermissionOptions,
        /// Channel on which the selected outcome is sent back.
        respond_tx: oneshot::Sender<SelectedPermissionOutcome>,
    },
    /// The tool call is currently running.
    InProgress,
    /// The tool call completed successfully.
    Completed,
    /// The tool call failed.
    Failed,
    /// The user rejected the tool call.
    Rejected,
    /// The user canceled generation so the tool call was canceled.
    Canceled,
}
574
575impl From<acp::ToolCallStatus> for ToolCallStatus {
576 fn from(status: acp::ToolCallStatus) -> Self {
577 match status {
578 acp::ToolCallStatus::Pending => Self::Pending,
579 acp::ToolCallStatus::InProgress => Self::InProgress,
580 acp::ToolCallStatus::Completed => Self::Completed,
581 acp::ToolCallStatus::Failed => Self::Failed,
582 _ => Self::Pending,
583 }
584 }
585}
586
587impl Display for ToolCallStatus {
588 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
589 write!(
590 f,
591 "{}",
592 match self {
593 ToolCallStatus::Pending => "Pending",
594 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
595 ToolCallStatus::InProgress => "In Progress",
596 ToolCallStatus::Completed => "Completed",
597 ToolCallStatus::Failed => "Failed",
598 ToolCallStatus::Rejected => "Rejected",
599 ToolCallStatus::Canceled => "Canceled",
600 }
601 )
602 }
603}
604
/// Content of a message or tool call, accumulated from ACP content blocks.
#[derive(Debug, PartialEq, Clone)]
pub enum ContentBlock {
    /// No content has been appended yet.
    Empty,
    /// Markdown text; appending further blocks extends this Markdown.
    Markdown { markdown: Entity<Markdown> },
    /// A single resource link; appending anything else turns it into Markdown.
    ResourceLink { resource_link: acp::ResourceLink },
    /// A single decoded image; appending anything else turns it into Markdown.
    Image { image: Arc<gpui::Image> },
}
612
613impl ContentBlock {
614 pub fn new(
615 block: acp::ContentBlock,
616 language_registry: &Arc<LanguageRegistry>,
617 path_style: PathStyle,
618 cx: &mut App,
619 ) -> Self {
620 let mut this = Self::Empty;
621 this.append(block, language_registry, path_style, cx);
622 this
623 }
624
625 pub fn new_combined(
626 blocks: impl IntoIterator<Item = acp::ContentBlock>,
627 language_registry: Arc<LanguageRegistry>,
628 path_style: PathStyle,
629 cx: &mut App,
630 ) -> Self {
631 let mut this = Self::Empty;
632 for block in blocks {
633 this.append(block, &language_registry, path_style, cx);
634 }
635 this
636 }
637
638 pub fn append(
639 &mut self,
640 block: acp::ContentBlock,
641 language_registry: &Arc<LanguageRegistry>,
642 path_style: PathStyle,
643 cx: &mut App,
644 ) {
645 match (&mut *self, &block) {
646 (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
647 *self = ContentBlock::ResourceLink {
648 resource_link: resource_link.clone(),
649 };
650 }
651 (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
652 if let Some(image) = Self::decode_image(image_content) {
653 *self = ContentBlock::Image { image };
654 } else {
655 let new_content = Self::image_md(image_content);
656 *self = Self::create_markdown_block(new_content, language_registry, cx);
657 }
658 }
659 (ContentBlock::Empty, _) => {
660 let new_content = Self::block_string_contents(&block, path_style);
661 *self = Self::create_markdown_block(new_content, language_registry, cx);
662 }
663 (ContentBlock::Markdown { markdown }, _) => {
664 let new_content = Self::block_string_contents(&block, path_style);
665 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
666 }
667 (ContentBlock::ResourceLink { resource_link }, _) => {
668 let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
669 let new_content = Self::block_string_contents(&block, path_style);
670 let combined = format!("{}\n{}", existing_content, new_content);
671 *self = Self::create_markdown_block(combined, language_registry, cx);
672 }
673 (ContentBlock::Image { .. }, _) => {
674 let new_content = Self::block_string_contents(&block, path_style);
675 let combined = format!("`Image`\n{}", new_content);
676 *self = Self::create_markdown_block(combined, language_registry, cx);
677 }
678 }
679 }
680
681 fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
682 use base64::Engine as _;
683
684 let bytes = base64::engine::general_purpose::STANDARD
685 .decode(image_content.data.as_bytes())
686 .ok()?;
687 let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
688 Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
689 }
690
691 fn create_markdown_block(
692 content: String,
693 language_registry: &Arc<LanguageRegistry>,
694 cx: &mut App,
695 ) -> ContentBlock {
696 ContentBlock::Markdown {
697 markdown: cx
698 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
699 }
700 }
701
702 fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
703 match block {
704 acp::ContentBlock::Text(text_content) => text_content.text.clone(),
705 acp::ContentBlock::ResourceLink(resource_link) => {
706 Self::resource_link_md(&resource_link.uri, path_style)
707 }
708 acp::ContentBlock::Resource(acp::EmbeddedResource {
709 resource:
710 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
711 uri,
712 ..
713 }),
714 ..
715 }) => Self::resource_link_md(uri, path_style),
716 acp::ContentBlock::Image(image) => Self::image_md(image),
717 _ => String::new(),
718 }
719 }
720
721 fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
722 if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
723 uri.as_link().to_string()
724 } else {
725 uri.to_string()
726 }
727 }
728
729 fn image_md(_image: &acp::ImageContent) -> String {
730 "`Image`".into()
731 }
732
733 pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
734 match self {
735 ContentBlock::Empty => "",
736 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
737 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
738 ContentBlock::Image { .. } => "`Image`",
739 }
740 }
741
742 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
743 match self {
744 ContentBlock::Empty => None,
745 ContentBlock::Markdown { markdown } => Some(markdown),
746 ContentBlock::ResourceLink { .. } => None,
747 ContentBlock::Image { .. } => None,
748 }
749 }
750
751 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
752 match self {
753 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
754 _ => None,
755 }
756 }
757
758 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
759 match self {
760 ContentBlock::Image { image } => Some(image),
761 _ => None,
762 }
763 }
764}
765
/// One piece of content attached to a tool call.
#[derive(Debug)]
pub enum ToolCallContent {
    /// Plain content (Markdown, resource link, or image).
    ContentBlock(ContentBlock),
    /// A diff.
    Diff(Entity<Diff>),
    /// A terminal, looked up by id when converting from ACP.
    Terminal(Entity<Terminal>),
}
772
773impl ToolCallContent {
774 pub fn from_acp(
775 content: acp::ToolCallContent,
776 language_registry: Arc<LanguageRegistry>,
777 path_style: PathStyle,
778 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
779 cx: &mut App,
780 ) -> Result<Option<Self>> {
781 match content {
782 acp::ToolCallContent::Content(acp::Content { content, .. }) => {
783 Ok(Some(Self::ContentBlock(ContentBlock::new(
784 content,
785 &language_registry,
786 path_style,
787 cx,
788 ))))
789 }
790 acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
791 Diff::finalized(
792 diff.path.to_string_lossy().into_owned(),
793 diff.old_text,
794 diff.new_text,
795 language_registry,
796 cx,
797 )
798 })))),
799 acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
800 .get(&terminal_id)
801 .cloned()
802 .map(|terminal| Some(Self::Terminal(terminal)))
803 .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
804 _ => Ok(None),
805 }
806 }
807
808 pub fn update_from_acp(
809 &mut self,
810 new: acp::ToolCallContent,
811 language_registry: Arc<LanguageRegistry>,
812 path_style: PathStyle,
813 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
814 cx: &mut App,
815 ) -> Result<bool> {
816 let needs_update = match (&self, &new) {
817 (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
818 old_diff.read(cx).needs_update(
819 new_diff.old_text.as_deref().unwrap_or(""),
820 &new_diff.new_text,
821 cx,
822 )
823 }
824 _ => true,
825 };
826
827 if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
828 if needs_update {
829 *self = update;
830 }
831 Ok(true)
832 } else {
833 Ok(false)
834 }
835 }
836
837 pub fn to_markdown(&self, cx: &App) -> String {
838 match self {
839 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
840 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
841 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
842 }
843 }
844
845 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
846 match self {
847 Self::ContentBlock(content) => content.image(),
848 _ => None,
849 }
850 }
851}
852
/// An update targeting an existing tool call.
#[derive(Debug, PartialEq)]
pub enum ToolCallUpdate {
    /// A field-level update in ACP form.
    UpdateFields(acp::ToolCallUpdate),
    /// An update carrying a diff entity.
    UpdateDiff(ToolCallUpdateDiff),
    /// An update carrying a terminal entity.
    UpdateTerminal(ToolCallUpdateTerminal),
}
859
860impl ToolCallUpdate {
861 fn id(&self) -> &acp::ToolCallId {
862 match self {
863 Self::UpdateFields(update) => &update.tool_call_id,
864 Self::UpdateDiff(diff) => &diff.id,
865 Self::UpdateTerminal(terminal) => &terminal.id,
866 }
867 }
868}
869
870impl From<acp::ToolCallUpdate> for ToolCallUpdate {
871 fn from(update: acp::ToolCallUpdate) -> Self {
872 Self::UpdateFields(update)
873 }
874}
875
876impl From<ToolCallUpdateDiff> for ToolCallUpdate {
877 fn from(diff: ToolCallUpdateDiff) -> Self {
878 Self::UpdateDiff(diff)
879 }
880}
881
/// A tool call update that carries a diff entity.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateDiff {
    /// Id of the tool call being updated.
    pub id: acp::ToolCallId,
    /// The diff entity carried by the update.
    pub diff: Entity<Diff>,
}
887
888impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
889 fn from(terminal: ToolCallUpdateTerminal) -> Self {
890 Self::UpdateTerminal(terminal)
891 }
892}
893
/// A tool call update that carries a terminal entity.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateTerminal {
    /// Id of the tool call being updated.
    pub id: acp::ToolCallId,
    /// The terminal entity carried by the update.
    pub terminal: Entity<Terminal>,
}
899
/// A plan made of a list of entries. The default plan has no entries.
#[derive(Debug, Default)]
pub struct Plan {
    /// The plan's entries, in order.
    pub entries: Vec<PlanEntry>,
}
904
/// Summary of a [`Plan`], as computed by `Plan::stats`.
#[derive(Debug)]
pub struct PlanStats<'a> {
    /// The first entry whose status is in progress, if any.
    pub in_progress_entry: Option<&'a PlanEntry>,
    /// Number of pending entries.
    pub pending: u32,
    /// Number of completed entries.
    pub completed: u32,
}
911
912impl Plan {
913 pub fn is_empty(&self) -> bool {
914 self.entries.is_empty()
915 }
916
917 pub fn stats(&self) -> PlanStats<'_> {
918 let mut stats = PlanStats {
919 in_progress_entry: None,
920 pending: 0,
921 completed: 0,
922 };
923
924 for entry in &self.entries {
925 match &entry.status {
926 acp::PlanEntryStatus::Pending => {
927 stats.pending += 1;
928 }
929 acp::PlanEntryStatus::InProgress => {
930 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
931 }
932 acp::PlanEntryStatus::Completed => {
933 stats.completed += 1;
934 }
935 _ => {}
936 }
937 }
938
939 stats
940 }
941}
942
/// A single entry of a [`Plan`].
#[derive(Debug)]
pub struct PlanEntry {
    /// Markdown entity holding the entry's content.
    pub content: Entity<Markdown>,
    /// The entry's ACP priority.
    pub priority: acp::PlanEntryPriority,
    /// The entry's ACP status.
    pub status: acp::PlanEntryStatus,
}
949
950impl PlanEntry {
951 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
952 Self {
953 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
954 priority: entry.priority,
955 status: entry.status,
956 }
957 }
958}
959
/// Token accounting for a thread.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TokenUsage {
    /// Maximum number of tokens; `0` is treated by `ratio` as "unknown".
    pub max_tokens: u64,
    /// Number of tokens used so far; compared against `max_tokens` by `ratio`.
    pub used_tokens: u64,
    /// Number of input tokens.
    pub input_tokens: u64,
    /// Number of output tokens.
    pub output_tokens: u64,
    /// Maximum number of output tokens, if known.
    pub max_output_tokens: Option<u64>,
}
968
/// Fraction of `max_tokens` at which `TokenUsage::ratio` starts reporting
/// `TokenUsageRatio::Warning`.
pub const TOKEN_USAGE_WARNING_THRESHOLD: f32 = 0.8;
970
971impl TokenUsage {
972 pub fn ratio(&self) -> TokenUsageRatio {
973 #[cfg(debug_assertions)]
974 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
975 .unwrap_or(TOKEN_USAGE_WARNING_THRESHOLD.to_string())
976 .parse()
977 .unwrap();
978 #[cfg(not(debug_assertions))]
979 let warning_threshold: f32 = TOKEN_USAGE_WARNING_THRESHOLD;
980
981 // When the maximum is unknown because there is no selected model,
982 // avoid showing the token limit warning.
983 if self.max_tokens == 0 {
984 TokenUsageRatio::Normal
985 } else if self.used_tokens >= self.max_tokens {
986 TokenUsageRatio::Exceeded
987 } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
988 TokenUsageRatio::Warning
989 } else {
990 TokenUsageRatio::Normal
991 }
992 }
993}
994
/// Classification of token usage returned by `TokenUsage::ratio`.
/// Variants are ordered from least to most severe.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum TokenUsageRatio {
    /// Usage is below the warning threshold, or the maximum is unknown.
    Normal,
    /// Usage is at or above the warning threshold but below the maximum.
    Warning,
    /// Usage has reached or exceeded the maximum.
    Exceeded,
}
1001
/// Describes a retry; carried by `AcpThreadEvent::Retry`.
#[derive(Debug, Clone)]
pub struct RetryStatus {
    /// The most recent error message.
    pub last_error: SharedString,
    /// The current attempt number.
    /// NOTE(review): whether this is 0- or 1-based is set by the producer — confirm.
    pub attempt: usize,
    /// The maximum number of attempts.
    pub max_attempts: usize,
    /// Instant recorded for the start of this retry.
    pub started_at: Instant,
    /// Duration associated with this retry.
    /// NOTE(review): presumably the delay before the next attempt — confirm.
    pub duration: Duration,
}
1010
/// Bookkeeping for the turn that is currently running on a thread.
struct RunningTurn {
    /// Identifier of the turn.
    id: u32,
    /// Task associated with the turn; held here so it stays alive.
    /// NOTE(review): presumably dropping it cancels the send — confirm.
    send_task: Task<()>,
}
1015
/// Payload of an inferred edit candidate once its buffer is available.
#[derive(Clone)]
struct InferredEditCandidateReady {
    /// Nonce identifying this candidate.
    nonce: u64,
    /// The buffer associated with the candidate.
    buffer: Entity<Buffer>,
}
1021
/// Lifecycle state of an inferred edit candidate.
enum InferredEditCandidateState {
    /// No buffer is available yet; only the nonce is known.
    Pending { nonce: u64 },
    /// The candidate has a buffer.
    Ready(InferredEditCandidateReady),
    /// The candidate has a buffer and is being finalized.
    Finalizing(InferredEditCandidateReady),
}
1027
1028impl InferredEditCandidateState {
1029 fn nonce(&self) -> u64 {
1030 match self {
1031 Self::Pending { nonce } => *nonce,
1032 Self::Ready(candidate) | Self::Finalizing(candidate) => candidate.nonce,
1033 }
1034 }
1035
1036 fn into_buffer_to_end(self) -> Option<Entity<Buffer>> {
1037 match self {
1038 Self::Ready(candidate) | Self::Finalizing(candidate) => Some(candidate.buffer),
1039 Self::Pending { .. } => None,
1040 }
1041 }
1042}
1043
/// State of a single ACP session thread: its entries, plan, terminals, token
/// usage, and bookkeeping for the running turn. Emits [`AcpThreadEvent`]s.
pub struct AcpThread {
    /// Id of the ACP session this thread belongs to.
    session_id: acp::SessionId,
    work_dirs: Option<PathList>,
    /// Id of the parent session, if there is one.
    parent_session_id: Option<acp::SessionId>,
    title: SharedString,
    provisional_title: Option<SharedString>,
    /// The thread's entries, in order.
    entries: Vec<AgentThreadEntry>,
    plan: Plan,
    project: Entity<Project>,
    action_log: Entity<ActionLog>,
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    turn_id: u32,
    /// The currently running turn, if any.
    running_turn: Option<RunningTurn>,
    connection: Rc<dyn AgentConnection>,
    token_usage: Option<TokenUsage>,
    prompt_capabilities: acp::PromptCapabilities,
    _observe_prompt_capabilities: Task<anyhow::Result<()>>,
    /// Terminals known to this thread, keyed by ACP terminal id.
    terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
    // NOTE(review): presumably output/exit events buffered for terminals that
    // have not been registered yet — confirm against the event handlers.
    pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
    pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
    /// Inferred edit candidates per tool call, keyed by path.
    inferred_edit_candidates:
        HashMap<acp::ToolCallId, HashMap<PathBuf, InferredEditCandidateState>>,
    inferred_edit_tool_call_turns: HashMap<acp::ToolCallId, u32>,
    finalizing_inferred_edit_tool_calls: HashSet<acp::ToolCallId>,
    next_inferred_edit_candidate_nonce: u64,

    had_error: bool,
    /// The user's unsent prompt text, persisted so it can be restored when reloading the thread.
    draft_prompt: Option<Vec<acp::ContentBlock>>,
    /// The initial scroll position for the thread view, set during session registration.
    ui_scroll_position: Option<gpui::ListOffset>,
    /// Buffer for smooth text streaming. Holds text that has been received from
    /// the model but not yet revealed in the UI. A timer task drains this buffer
    /// gradually to create a fluid typing effect instead of choppy chunk-at-a-time
    /// updates.
    streaming_text_buffer: Option<StreamingTextBuffer>,
}
1081
/// Holds streamed text that has not been revealed yet, together with the task
/// that reveals it.
struct StreamingTextBuffer {
    /// Text received from the model but not yet appended to the Markdown source.
    pending: String,
    /// The number of bytes to reveal per timer turn.
    bytes_to_reveal_per_tick: usize,
    /// The Markdown entity being streamed into.
    target: Entity<Markdown>,
    /// Timer task that periodically moves text from `pending` into the
    /// Markdown source of `target`.
    _reveal_task: Task<()>,
}
1092
/// Timing constants for revealing streamed text.
impl StreamingTextBuffer {
    /// The number of milliseconds between each timer tick, controlling how quickly
    /// text is revealed.
    const TASK_UPDATE_MS: u64 = 16;
    /// The time in milliseconds to reveal the entire pending text.
    const REVEAL_TARGET: f32 = 200.0;
}
1100
1101impl From<&AcpThread> for ActionLogTelemetry {
1102 fn from(value: &AcpThread) -> Self {
1103 Self {
1104 agent_telemetry_id: value.connection().telemetry_id(),
1105 session_id: value.session_id.0.clone(),
1106 }
1107 }
1108}
1109
/// Events emitted by [`AcpThread`] to its observers.
#[derive(Debug)]
pub enum AcpThreadEvent {
    /// A new entry was appended to the thread's entry list.
    NewEntry,
    /// The displayed title changed (either the stored title or the
    /// provisional title was set, replaced, or cleared).
    TitleUpdated,
    /// The thread's token usage was replaced.
    TokenUsageUpdated,
    /// The entry at the given index in the entry list changed.
    EntryUpdated(usize),
    /// NOTE(review): presumably the index range of entries that were removed —
    /// the emitting code is outside this section.
    EntriesRemoved(Range<usize>),
    ToolAuthorizationRequested(acp::ToolCallId),
    ToolAuthorizationReceived(acp::ToolCallId),
    /// A retry status was reported via `update_retry_status`.
    Retry(RetryStatus),
    /// A subagent session was reported via `subagent_spawned`.
    SubagentSpawned(acp::SessionId),
    Stopped(acp::StopReason),
    Error,
    LoadError(LoadError),
    /// The prompt capabilities stored on the thread were replaced by a newly
    /// received value.
    PromptCapabilitiesUpdated,
    Refusal,
    /// The agent sent a new list of available commands.
    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
    /// The agent reported a new current session mode.
    ModeUpdated(acp::SessionModeId),
    /// The agent sent a new set of session config options.
    ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
}

impl EventEmitter<AcpThreadEvent> for AcpThread {}
1132
/// Events describing a terminal identified by `terminal_id`.
///
/// NOTE(review): the producer and consumer of these events are outside this
/// section; the per-variant notes below restate only what the field names and
/// types show.
#[derive(Debug, Clone)]
pub enum TerminalProviderEvent {
    /// A terminal was created; carries its label, optional working directory,
    /// optional output byte limit, and the terminal entity itself.
    Created {
        terminal_id: acp::TerminalId,
        label: String,
        cwd: Option<PathBuf>,
        output_byte_limit: Option<u64>,
        terminal: Entity<::terminal::Terminal>,
    },
    /// Raw output bytes for the terminal.
    Output {
        terminal_id: acp::TerminalId,
        data: Vec<u8>,
    },
    /// The terminal's title changed.
    TitleChanged {
        terminal_id: acp::TerminalId,
        title: String,
    },
    /// The terminal exited with the given status.
    Exit {
        terminal_id: acp::TerminalId,
        status: acp::TerminalExitStatus,
    },
}
1155
/// Commands addressed to a terminal identified by `terminal_id`.
///
/// NOTE(review): where these commands are sent and handled is outside this
/// section; the per-variant notes restate only what the fields show.
#[derive(Debug, Clone)]
pub enum TerminalProviderCommand {
    /// Write the given bytes to the terminal's input.
    WriteInput {
        terminal_id: acp::TerminalId,
        bytes: Vec<u8>,
    },
    /// Resize the terminal to `cols` x `rows`.
    Resize {
        terminal_id: acp::TerminalId,
        cols: u16,
        rows: u16,
    },
    /// Close the terminal.
    Close {
        terminal_id: acp::TerminalId,
    },
}
1171
/// Coarse activity state of an [`AcpThread`], as reported by `AcpThread::status`.
#[derive(PartialEq, Eq, Debug)]
pub enum ThreadStatus {
    /// No turn is currently running.
    Idle,
    /// A turn is currently running.
    Generating,
}
1177
/// Errors reported when loading an agent fails.
///
/// The user-facing text for each variant is produced by the `Display` impl.
#[derive(Debug, Clone)]
pub enum LoadError {
    /// The version found for `command` is older than `minimum_version`.
    Unsupported {
        command: SharedString,
        current_version: SharedString,
        minimum_version: SharedString,
    },
    /// Installation failed; the payload is the failure message.
    FailedToInstall(SharedString),
    /// The server process exited with the given status.
    Exited {
        status: ExitStatus,
    },
    /// Any other failure, described by its message.
    Other(SharedString),
}
1191
1192impl Display for LoadError {
1193 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1194 match self {
1195 LoadError::Unsupported {
1196 command: path,
1197 current_version,
1198 minimum_version,
1199 } => {
1200 write!(
1201 f,
1202 "version {current_version} from {path} is not supported (need at least {minimum_version})"
1203 )
1204 }
1205 LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1206 LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1207 LoadError::Other(msg) => write!(f, "{msg}"),
1208 }
1209 }
1210}
1211
1212impl Error for LoadError {}
1213
1214impl AcpThread {
1215 pub fn new(
1216 parent_session_id: Option<acp::SessionId>,
1217 title: impl Into<SharedString>,
1218 work_dirs: Option<PathList>,
1219 connection: Rc<dyn AgentConnection>,
1220 project: Entity<Project>,
1221 action_log: Entity<ActionLog>,
1222 session_id: acp::SessionId,
1223 mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1224 cx: &mut Context<Self>,
1225 ) -> Self {
1226 let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1227 let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1228 loop {
1229 let caps = prompt_capabilities_rx.recv().await?;
1230 this.update(cx, |this, cx| {
1231 this.prompt_capabilities = caps;
1232 cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1233 })?;
1234 }
1235 });
1236
1237 Self {
1238 parent_session_id,
1239 work_dirs,
1240 action_log,
1241 shared_buffers: Default::default(),
1242 entries: Default::default(),
1243 plan: Default::default(),
1244 title: title.into(),
1245 provisional_title: None,
1246 project,
1247 running_turn: None,
1248 turn_id: 0,
1249 connection,
1250 session_id,
1251 token_usage: None,
1252 prompt_capabilities,
1253 _observe_prompt_capabilities: task,
1254 terminals: HashMap::default(),
1255 pending_terminal_output: HashMap::default(),
1256 pending_terminal_exit: HashMap::default(),
1257 inferred_edit_candidates: HashMap::default(),
1258 inferred_edit_tool_call_turns: HashMap::default(),
1259 finalizing_inferred_edit_tool_calls: HashSet::default(),
1260 next_inferred_edit_candidate_nonce: 0,
1261
1262 had_error: false,
1263 draft_prompt: None,
1264 ui_scroll_position: None,
1265 streaming_text_buffer: None,
1266 }
1267 }
1268
    /// Returns the parent session id this thread was created with, if any.
    pub fn parent_session_id(&self) -> Option<&acp::SessionId> {
        self.parent_session_id.as_ref()
    }
1272
    /// Returns a copy of the most recently observed prompt capabilities.
    pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
        self.prompt_capabilities.clone()
    }
1276
    /// Returns the stored draft (unsent) prompt, if one has been set.
    pub fn draft_prompt(&self) -> Option<&[acp::ContentBlock]> {
        self.draft_prompt.as_deref()
    }
1280
    /// Replaces the stored draft prompt; `None` clears it. No event is emitted.
    pub fn set_draft_prompt(&mut self, prompt: Option<Vec<acp::ContentBlock>>) {
        self.draft_prompt = prompt;
    }
1284
    /// Returns the stored scroll position for the thread view, if any.
    pub fn ui_scroll_position(&self) -> Option<gpui::ListOffset> {
        self.ui_scroll_position
    }
1288
    /// Replaces the stored scroll position; `None` clears it. No event is emitted.
    pub fn set_ui_scroll_position(&mut self, position: Option<gpui::ListOffset>) {
        self.ui_scroll_position = position;
    }
1292
    /// Returns the agent connection this thread talks to.
    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
        &self.connection
    }
1296
    /// Returns the action log entity associated with this thread.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
1300
    /// Returns the project entity this thread operates on.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }
1304
1305 pub fn title(&self) -> SharedString {
1306 self.provisional_title
1307 .clone()
1308 .unwrap_or_else(|| self.title.clone())
1309 }
1310
    /// Returns `true` while a provisional title is set (and therefore shadows
    /// the stored title in [`Self::title`]).
    pub fn has_provisional_title(&self) -> bool {
        self.provisional_title.is_some()
    }
1314
    /// Returns all entries of the thread in insertion order.
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
1318
    /// Returns the id of the session backing this thread.
    pub fn session_id(&self) -> &acp::SessionId {
        &self.session_id
    }
1322
    /// Returns the working directories this thread was created with, if any.
    pub fn work_dirs(&self) -> Option<&PathList> {
        self.work_dirs.as_ref()
    }
1326
1327 pub fn status(&self) -> ThreadStatus {
1328 if self.running_turn.is_some() {
1329 ThreadStatus::Generating
1330 } else {
1331 ThreadStatus::Idle
1332 }
1333 }
1334
    /// Returns the thread's `had_error` flag.
    pub fn had_error(&self) -> bool {
        self.had_error
    }
1338
1339 pub fn is_waiting_for_confirmation(&self) -> bool {
1340 for entry in self.entries.iter().rev() {
1341 match entry {
1342 AgentThreadEntry::UserMessage(_) => return false,
1343 AgentThreadEntry::ToolCall(ToolCall {
1344 status: ToolCallStatus::WaitingForConfirmation { .. },
1345 ..
1346 }) => return true,
1347 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1348 }
1349 }
1350 false
1351 }
1352
    /// Returns the most recently stored token usage, if any.
    pub fn token_usage(&self) -> Option<&TokenUsage> {
        self.token_usage.as_ref()
    }
1356
1357 pub fn has_pending_edit_tool_calls(&self) -> bool {
1358 for entry in self.entries.iter().rev() {
1359 match entry {
1360 AgentThreadEntry::UserMessage(_) => return false,
1361 AgentThreadEntry::ToolCall(
1362 call @ ToolCall {
1363 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1364 ..
1365 },
1366 ) if call.diffs().next().is_some() || Self::should_infer_external_edits(call) => {
1367 return true;
1368 }
1369 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1370 }
1371 }
1372
1373 false
1374 }
1375
1376 pub fn has_in_progress_tool_calls(&self) -> bool {
1377 for entry in self.entries.iter().rev() {
1378 match entry {
1379 AgentThreadEntry::UserMessage(_) => return false,
1380 AgentThreadEntry::ToolCall(ToolCall {
1381 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1382 ..
1383 }) => {
1384 return true;
1385 }
1386 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1387 }
1388 }
1389
1390 false
1391 }
1392
1393 pub fn used_tools_since_last_user_message(&self) -> bool {
1394 for entry in self.entries.iter().rev() {
1395 match entry {
1396 AgentThreadEntry::UserMessage(..) => return false,
1397 AgentThreadEntry::AssistantMessage(..) => continue,
1398 AgentThreadEntry::ToolCall(..) => return true,
1399 }
1400 }
1401
1402 false
1403 }
1404
1405 pub fn handle_session_update(
1406 &mut self,
1407 update: acp::SessionUpdate,
1408 cx: &mut Context<Self>,
1409 ) -> Result<(), acp::Error> {
1410 match update {
1411 acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1412 self.push_user_content_block(None, content, cx);
1413 }
1414 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1415 self.push_assistant_content_block(content, false, cx);
1416 }
1417 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1418 self.push_assistant_content_block(content, true, cx);
1419 }
1420 acp::SessionUpdate::ToolCall(tool_call) => {
1421 self.upsert_tool_call(tool_call, cx)?;
1422 }
1423 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1424 self.update_tool_call(tool_call_update, cx)?;
1425 }
1426 acp::SessionUpdate::Plan(plan) => {
1427 self.update_plan(plan, cx);
1428 }
1429 acp::SessionUpdate::SessionInfoUpdate(info_update) => {
1430 if let acp::MaybeUndefined::Value(title) = info_update.title {
1431 let had_provisional = self.provisional_title.take().is_some();
1432 let title: SharedString = title.into();
1433 if title != self.title {
1434 self.title = title;
1435 cx.emit(AcpThreadEvent::TitleUpdated);
1436 } else if had_provisional {
1437 cx.emit(AcpThreadEvent::TitleUpdated);
1438 }
1439 }
1440 }
1441 acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1442 available_commands,
1443 ..
1444 }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1445 acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1446 current_mode_id,
1447 ..
1448 }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1449 acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1450 config_options,
1451 ..
1452 }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1453 _ => {}
1454 }
1455 Ok(())
1456 }
1457
    /// Appends a user content block without indentation.
    ///
    /// Shorthand for [`Self::push_user_content_block_with_indent`] with
    /// `indented = false`.
    pub fn push_user_content_block(
        &mut self,
        message_id: Option<UserMessageId>,
        chunk: acp::ContentBlock,
        cx: &mut Context<Self>,
    ) {
        self.push_user_content_block_with_indent(message_id, chunk, false, cx)
    }
1466
1467 pub fn push_user_content_block_with_indent(
1468 &mut self,
1469 message_id: Option<UserMessageId>,
1470 chunk: acp::ContentBlock,
1471 indented: bool,
1472 cx: &mut Context<Self>,
1473 ) {
1474 let language_registry = self.project.read(cx).languages().clone();
1475 let path_style = self.project.read(cx).path_style(cx);
1476 let entries_len = self.entries.len();
1477
1478 if let Some(last_entry) = self.entries.last_mut()
1479 && let AgentThreadEntry::UserMessage(UserMessage {
1480 id,
1481 content,
1482 chunks,
1483 indented: existing_indented,
1484 ..
1485 }) = last_entry
1486 && *existing_indented == indented
1487 {
1488 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1489 *id = message_id.or(id.take());
1490 content.append(chunk.clone(), &language_registry, path_style, cx);
1491 chunks.push(chunk);
1492 let idx = entries_len - 1;
1493 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1494 } else {
1495 let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1496 self.push_entry(
1497 AgentThreadEntry::UserMessage(UserMessage {
1498 id: message_id,
1499 content,
1500 chunks: vec![chunk],
1501 checkpoint: None,
1502 indented,
1503 }),
1504 cx,
1505 );
1506 }
1507 }
1508
    /// Appends an assistant content block without indentation.
    ///
    /// Shorthand for [`Self::push_assistant_content_block_with_indent`] with
    /// `indented = false`.
    pub fn push_assistant_content_block(
        &mut self,
        chunk: acp::ContentBlock,
        is_thought: bool,
        cx: &mut Context<Self>,
    ) {
        self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
    }
1517
1518 pub fn push_assistant_content_block_with_indent(
1519 &mut self,
1520 chunk: acp::ContentBlock,
1521 is_thought: bool,
1522 indented: bool,
1523 cx: &mut Context<Self>,
1524 ) {
1525 let path_style = self.project.read(cx).path_style(cx);
1526
1527 // For text chunks going to an existing Markdown block, buffer for smooth
1528 // streaming instead of appending all at once which may feel more choppy.
1529 if let acp::ContentBlock::Text(text_content) = &chunk {
1530 if let Some(markdown) = self.streaming_markdown_target(is_thought, indented) {
1531 let entries_len = self.entries.len();
1532 cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
1533 self.buffer_streaming_text(&markdown, text_content.text.clone(), cx);
1534 return;
1535 }
1536 }
1537
1538 let language_registry = self.project.read(cx).languages().clone();
1539 let entries_len = self.entries.len();
1540 if let Some(last_entry) = self.entries.last_mut()
1541 && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1542 chunks,
1543 indented: existing_indented,
1544 is_subagent_output: _,
1545 }) = last_entry
1546 && *existing_indented == indented
1547 {
1548 let idx = entries_len - 1;
1549 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1550 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1551 match (chunks.last_mut(), is_thought) {
1552 (Some(AssistantMessageChunk::Message { block }), false)
1553 | (Some(AssistantMessageChunk::Thought { block }), true) => {
1554 block.append(chunk, &language_registry, path_style, cx)
1555 }
1556 _ => {
1557 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1558 if is_thought {
1559 chunks.push(AssistantMessageChunk::Thought { block })
1560 } else {
1561 chunks.push(AssistantMessageChunk::Message { block })
1562 }
1563 }
1564 }
1565 } else {
1566 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1567 let chunk = if is_thought {
1568 AssistantMessageChunk::Thought { block }
1569 } else {
1570 AssistantMessageChunk::Message { block }
1571 };
1572
1573 self.push_entry(
1574 AgentThreadEntry::AssistantMessage(AssistantMessage {
1575 chunks: vec![chunk],
1576 indented,
1577 is_subagent_output: false,
1578 }),
1579 cx,
1580 );
1581 }
1582 }
1583
1584 fn streaming_markdown_target(
1585 &self,
1586 is_thought: bool,
1587 indented: bool,
1588 ) -> Option<Entity<Markdown>> {
1589 let last_entry = self.entries.last()?;
1590 if let AgentThreadEntry::AssistantMessage(AssistantMessage {
1591 chunks,
1592 indented: existing_indented,
1593 ..
1594 }) = last_entry
1595 && *existing_indented == indented
1596 && let [.., chunk] = chunks.as_slice()
1597 {
1598 match (chunk, is_thought) {
1599 (
1600 AssistantMessageChunk::Message {
1601 block: ContentBlock::Markdown { markdown },
1602 },
1603 false,
1604 )
1605 | (
1606 AssistantMessageChunk::Thought {
1607 block: ContentBlock::Markdown { markdown },
1608 },
1609 true,
1610 ) => Some(markdown.clone()),
1611 _ => None,
1612 }
1613 } else {
1614 None
1615 }
1616 }
1617
1618 /// Add text to the streaming buffer. If the target changed (e.g. switching
1619 /// from thoughts to message text), flush the old buffer first.
1620 fn buffer_streaming_text(
1621 &mut self,
1622 markdown: &Entity<Markdown>,
1623 text: String,
1624 cx: &mut Context<Self>,
1625 ) {
1626 if let Some(buffer) = &mut self.streaming_text_buffer {
1627 if buffer.target.entity_id() == markdown.entity_id() {
1628 buffer.pending.push_str(&text);
1629
1630 buffer.bytes_to_reveal_per_tick = (buffer.pending.len() as f32
1631 / StreamingTextBuffer::REVEAL_TARGET
1632 * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1633 .ceil() as usize;
1634 return;
1635 }
1636 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1637 }
1638
1639 let target = markdown.clone();
1640 let _reveal_task = self.start_streaming_reveal(cx);
1641 let pending_len = text.len();
1642 let bytes_to_reveal = (pending_len as f32 / StreamingTextBuffer::REVEAL_TARGET
1643 * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1644 .ceil() as usize;
1645 self.streaming_text_buffer = Some(StreamingTextBuffer {
1646 pending: text,
1647 bytes_to_reveal_per_tick: bytes_to_reveal,
1648 target,
1649 _reveal_task,
1650 });
1651 }
1652
1653 /// Flush all buffered streaming text into the Markdown entity immediately.
1654 fn flush_streaming_text(
1655 streaming_text_buffer: &mut Option<StreamingTextBuffer>,
1656 cx: &mut Context<Self>,
1657 ) {
1658 if let Some(buffer) = streaming_text_buffer.take() {
1659 if !buffer.pending.is_empty() {
1660 buffer
1661 .target
1662 .update(cx, |markdown, cx| markdown.append(&buffer.pending, cx));
1663 }
1664 }
1665 }
1666
1667 /// Spawns a foreground task that periodically drains
1668 /// `streaming_text_buffer.pending` into the target `Markdown` entity,
1669 /// producing smooth, continuous text output.
1670 fn start_streaming_reveal(&self, cx: &mut Context<Self>) -> Task<()> {
1671 cx.spawn(async move |this, cx| {
1672 loop {
1673 cx.background_executor()
1674 .timer(Duration::from_millis(StreamingTextBuffer::TASK_UPDATE_MS))
1675 .await;
1676
1677 let should_continue = this
1678 .update(cx, |this, cx| {
1679 let Some(buffer) = &mut this.streaming_text_buffer else {
1680 return false;
1681 };
1682
1683 if buffer.pending.is_empty() {
1684 return true;
1685 }
1686
1687 let pending_len = buffer.pending.len();
1688
1689 let byte_boundary = buffer
1690 .pending
1691 .ceil_char_boundary(buffer.bytes_to_reveal_per_tick)
1692 .min(pending_len);
1693
1694 buffer.target.update(cx, |markdown: &mut Markdown, cx| {
1695 markdown.append(&buffer.pending[..byte_boundary], cx);
1696 buffer.pending.drain(..byte_boundary);
1697 });
1698
1699 true
1700 })
1701 .unwrap_or(false);
1702
1703 if !should_continue {
1704 break;
1705 }
1706 }
1707 })
1708 }
1709
    /// Appends `entry` to the thread and emits [`AcpThreadEvent::NewEntry`].
    ///
    /// Any buffered streaming text is flushed first so it is written out
    /// before the new entry is added.
    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
1715
    /// Returns `true` if the connection provides a title setter for this
    /// session.
    pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
        self.connection.set_title(&self.session_id, cx).is_some()
    }
1719
1720 pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1721 let had_provisional = self.provisional_title.take().is_some();
1722 if title != self.title {
1723 self.title = title.clone();
1724 cx.emit(AcpThreadEvent::TitleUpdated);
1725 if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1726 return set_title.run(title, cx);
1727 }
1728 } else if had_provisional {
1729 cx.emit(AcpThreadEvent::TitleUpdated);
1730 }
1731 Task::ready(Ok(()))
1732 }
1733
    /// Sets a provisional display title without propagating back to the
    /// underlying agent connection. This is used for quick preview titles
    /// (e.g. first 20 chars of the user message) that should be shown
    /// immediately but replaced once the LLM generates a proper title via
    /// `set_title`.
    ///
    /// The provisional title is also cleared when a session-info update
    /// carrying a title is handled. Always emits
    /// [`AcpThreadEvent::TitleUpdated`].
    pub fn set_provisional_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
        self.provisional_title = Some(title);
        cx.emit(AcpThreadEvent::TitleUpdated);
    }
1743
    /// Emits [`AcpThreadEvent::SubagentSpawned`] for `session_id`; no thread
    /// state is modified.
    pub fn subagent_spawned(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::SubagentSpawned(session_id));
    }
1747
    /// Replaces the stored token usage (`None` clears it) and emits
    /// [`AcpThreadEvent::TokenUsageUpdated`].
    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
        self.token_usage = usage;
        cx.emit(AcpThreadEvent::TokenUsageUpdated);
    }
1752
    /// Emits [`AcpThreadEvent::Retry`] with `status`; no thread state is
    /// modified.
    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::Retry(status));
    }
1756
1757 fn should_infer_external_edits(tool_call: &ToolCall) -> bool {
1758 tool_call.tool_name.is_none()
1759 && tool_call.kind == acp::ToolKind::Edit
1760 && tool_call.diffs().next().is_none()
1761 && !tool_call.locations.is_empty()
1762 }
1763
1764 fn should_track_inferred_external_edits(tool_call: &ToolCall) -> bool {
1765 Self::should_infer_external_edits(tool_call)
1766 && matches!(
1767 tool_call.status,
1768 ToolCallStatus::Pending
1769 | ToolCallStatus::InProgress
1770 | ToolCallStatus::Completed
1771 | ToolCallStatus::Failed
1772 )
1773 }
1774
1775 fn is_inferred_edit_terminal_status(status: &ToolCallStatus) -> bool {
1776 matches!(status, ToolCallStatus::Completed | ToolCallStatus::Failed)
1777 }
1778
1779 fn allocate_inferred_edit_candidate_nonce(&mut self) -> u64 {
1780 let nonce = self.next_inferred_edit_candidate_nonce;
1781 self.next_inferred_edit_candidate_nonce =
1782 self.next_inferred_edit_candidate_nonce.wrapping_add(1);
1783 nonce
1784 }
1785
1786 fn end_expected_external_edits(
1787 &mut self,
1788 buffers: impl IntoIterator<Item = Entity<Buffer>>,
1789 cx: &mut Context<Self>,
1790 ) {
1791 for buffer in buffers {
1792 self.action_log.update(cx, |action_log, cx| {
1793 action_log.end_expected_external_edit(buffer, cx);
1794 });
1795 }
1796 }
1797
1798 fn inferred_edit_tracking_turn_id(&self) -> u32 {
1799 self.running_turn
1800 .as_ref()
1801 .map_or(self.turn_id, |turn| turn.id)
1802 }
1803
1804 fn inferred_edit_tool_call_belongs_to_turn(
1805 &self,
1806 tool_call_id: &acp::ToolCallId,
1807 turn_id: u32,
1808 ) -> bool {
1809 self.inferred_edit_tool_call_turns.get(tool_call_id) == Some(&turn_id)
1810 }
1811
1812 fn record_inferred_edit_tool_call_turn_if_needed(
1813 &mut self,
1814 tool_call_id: &acp::ToolCallId,
1815 cx: &mut Context<Self>,
1816 ) {
1817 let turn_id = self.inferred_edit_tracking_turn_id();
1818 if self.inferred_edit_tool_call_belongs_to_turn(tool_call_id, turn_id) {
1819 return;
1820 }
1821
1822 let buffers_to_end = self.clear_inferred_edit_tool_call_tracking(tool_call_id);
1823 self.end_expected_external_edits(buffers_to_end, cx);
1824
1825 self.inferred_edit_tool_call_turns
1826 .insert(tool_call_id.clone(), turn_id);
1827 }
1828
1829 fn remove_inferred_edit_tool_call_if_empty(&mut self, tool_call_id: &acp::ToolCallId) {
1830 let remove_tool_call = self
1831 .inferred_edit_candidates
1832 .get(tool_call_id)
1833 .is_some_and(|candidates| candidates.is_empty());
1834
1835 if remove_tool_call {
1836 self.inferred_edit_candidates.remove(tool_call_id);
1837 self.inferred_edit_tool_call_turns.remove(tool_call_id);
1838 self.finalizing_inferred_edit_tool_calls
1839 .remove(tool_call_id);
1840 }
1841 }
1842
1843 fn clear_inferred_edit_tool_call_tracking(
1844 &mut self,
1845 tool_call_id: &acp::ToolCallId,
1846 ) -> Vec<Entity<Buffer>> {
1847 let buffers_to_end = self
1848 .inferred_edit_candidates
1849 .remove(tool_call_id)
1850 .into_iter()
1851 .flat_map(|candidates| candidates.into_values())
1852 .filter_map(|candidate_state| candidate_state.into_buffer_to_end())
1853 .collect::<Vec<_>>();
1854
1855 self.inferred_edit_tool_call_turns.remove(tool_call_id);
1856 self.finalizing_inferred_edit_tool_calls
1857 .remove(tool_call_id);
1858
1859 buffers_to_end
1860 }
1861
1862 fn remove_inferred_edit_candidate_if_matching(
1863 &mut self,
1864 tool_call_id: &acp::ToolCallId,
1865 abs_path: &PathBuf,
1866 nonce: u64,
1867 cx: &mut Context<Self>,
1868 ) {
1869 let buffer_to_end =
1870 if let Some(candidates) = self.inferred_edit_candidates.get_mut(tool_call_id) {
1871 let should_remove = candidates
1872 .get(abs_path)
1873 .is_some_and(|candidate_state| candidate_state.nonce() == nonce);
1874
1875 if should_remove {
1876 candidates
1877 .remove(abs_path)
1878 .and_then(|candidate_state| candidate_state.into_buffer_to_end())
1879 } else {
1880 None
1881 }
1882 } else {
1883 None
1884 };
1885
1886 self.end_expected_external_edits(buffer_to_end, cx);
1887 self.remove_inferred_edit_tool_call_if_empty(tool_call_id);
1888 }
1889
1890 fn clear_inferred_edit_candidates_for_tool_calls(
1891 &mut self,
1892 tool_call_ids: impl IntoIterator<Item = acp::ToolCallId>,
1893 cx: &mut Context<Self>,
1894 ) {
1895 let mut buffers_to_end = Vec::new();
1896
1897 for tool_call_id in tool_call_ids {
1898 buffers_to_end.extend(self.clear_inferred_edit_tool_call_tracking(&tool_call_id));
1899 }
1900
1901 self.end_expected_external_edits(buffers_to_end, cx);
1902 }
1903
1904 fn finalize_all_inferred_edit_tool_calls_for_turn(
1905 &mut self,
1906 turn_id: u32,
1907 cx: &mut Context<Self>,
1908 ) {
1909 let tool_call_ids = self
1910 .inferred_edit_candidates
1911 .keys()
1912 .filter(|tool_call_id| {
1913 self.inferred_edit_tool_call_belongs_to_turn(tool_call_id, turn_id)
1914 })
1915 .filter(|tool_call_id| {
1916 self.tool_call(tool_call_id).is_some_and(|(_, tool_call)| {
1917 Self::should_track_inferred_external_edits(tool_call)
1918 && Self::is_inferred_edit_terminal_status(&tool_call.status)
1919 })
1920 })
1921 .cloned()
1922 .collect::<Vec<_>>();
1923 for tool_call_id in tool_call_ids {
1924 self.finalize_inferred_edit_tool_call(tool_call_id, cx);
1925 }
1926 }
1927
1928 fn finish_inferred_edit_tracking_for_stopped_turn(
1929 &mut self,
1930 turn_id: u32,
1931 cx: &mut Context<Self>,
1932 ) {
1933 self.finalize_all_inferred_edit_tool_calls_for_turn(turn_id, cx);
1934
1935 let tool_call_ids_to_clear = self
1936 .inferred_edit_candidates
1937 .keys()
1938 .filter(|tool_call_id| {
1939 self.inferred_edit_tool_call_belongs_to_turn(tool_call_id, turn_id)
1940 })
1941 .filter(|tool_call_id| {
1942 !self.tool_call(tool_call_id).is_some_and(|(_, tool_call)| {
1943 Self::should_track_inferred_external_edits(tool_call)
1944 && Self::is_inferred_edit_terminal_status(&tool_call.status)
1945 })
1946 })
1947 .cloned()
1948 .collect::<Vec<_>>();
1949
1950 self.clear_inferred_edit_candidates_for_tool_calls(tool_call_ids_to_clear, cx);
1951 }
1952
1953 fn sync_inferred_edit_candidate_paths(
1954 &mut self,
1955 tool_call_id: &acp::ToolCallId,
1956 locations: &[acp::ToolCallLocation],
1957 cx: &mut Context<Self>,
1958 ) {
1959 let mut current_paths = HashSet::default();
1960 for location in locations {
1961 current_paths.insert(location.path.clone());
1962 }
1963
1964 let buffers_to_end =
1965 if let Some(candidates) = self.inferred_edit_candidates.get_mut(tool_call_id) {
1966 let removed_paths = candidates
1967 .keys()
1968 .filter(|path| !current_paths.contains(*path))
1969 .cloned()
1970 .collect::<Vec<_>>();
1971
1972 removed_paths
1973 .into_iter()
1974 .filter_map(|path| {
1975 candidates
1976 .remove(&path)
1977 .and_then(|candidate_state| candidate_state.into_buffer_to_end())
1978 })
1979 .collect::<Vec<_>>()
1980 } else {
1981 Vec::new()
1982 };
1983
1984 self.end_expected_external_edits(buffers_to_end, cx);
1985 self.remove_inferred_edit_tool_call_if_empty(tool_call_id);
1986 }
1987
    /// Starts tracking one inferred-edit candidate per distinct path in
    /// `locations` for `tool_call_id`.
    ///
    /// Candidates whose path is no longer listed are dropped first. Each new
    /// path is registered as `Pending` under a fresh nonce; the candidate is
    /// then marked ready with the project's buffer for that path, either
    /// immediately (buffer already open) or from a detached task once the
    /// buffer has been opened. Paths that cannot be resolved to a project path,
    /// or whose buffer fails to open, have their candidate removed again.
    /// Paths that already have a candidate are left untouched.
    fn register_inferred_edit_locations(
        &mut self,
        tool_call_id: acp::ToolCallId,
        locations: &[acp::ToolCallLocation],
        cx: &mut Context<Self>,
    ) {
        self.sync_inferred_edit_candidate_paths(&tool_call_id, locations, cx);

        // Guards against the same path appearing more than once in `locations`.
        let mut unique_paths = HashSet::default();
        for location in locations {
            let abs_path = location.path.clone();
            if !unique_paths.insert(abs_path.clone()) {
                continue;
            }

            // The nonce identifies this particular registration, so later
            // callbacks can tell whether the candidate is still the one they
            // were started for.
            let nonce = self.allocate_inferred_edit_candidate_nonce();
            let candidates = self
                .inferred_edit_candidates
                .entry(tool_call_id.clone())
                .or_default();
            // Already tracked from an earlier call: keep the existing state.
            if candidates.contains_key(&abs_path) {
                continue;
            }

            candidates.insert(
                abs_path.clone(),
                InferredEditCandidateState::Pending { nonce },
            );

            // Resolves to `None` when the path has no project path, otherwise
            // to either an already-open buffer or a task that opens it.
            let open_buffer = self.project.update(cx, |project, cx| {
                let project_path = project.project_path_for_absolute_path(&abs_path, cx)?;
                if let Some(buffer) = project.get_open_buffer(&project_path, cx) {
                    return Some((Some(buffer), None));
                }

                Some((None, Some(project.open_buffer(project_path, cx))))
            });

            let Some((ready_buffer, open_buffer)) = open_buffer else {
                self.remove_inferred_edit_candidate_if_matching(
                    &tool_call_id,
                    &abs_path,
                    nonce,
                    cx,
                );
                continue;
            };

            if let Some(buffer) = ready_buffer {
                self.set_inferred_edit_candidate_ready(
                    tool_call_id.clone(),
                    abs_path,
                    nonce,
                    buffer,
                    cx,
                );
                continue;
            }

            // Defensive: the closure above always supplies an open task when it
            // does not supply a ready buffer.
            let Some(open_buffer) = open_buffer else {
                self.remove_inferred_edit_candidate_if_matching(
                    &tool_call_id,
                    &abs_path,
                    nonce,
                    cx,
                );
                continue;
            };

            let tool_call_id = tool_call_id.clone();
            cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
                let buffer = match open_buffer.await {
                    Ok(buffer) => buffer,
                    Err(_) => {
                        // Opening failed: stop tracking this path (unless the
                        // candidate was replaced in the meantime).
                        this.update(cx, |this, cx| {
                            this.remove_inferred_edit_candidate_if_matching(
                                &tool_call_id,
                                &abs_path,
                                nonce,
                                cx,
                            );
                        })
                        .ok();
                        return Ok(());
                    }
                };

                this.update(cx, |this, cx| {
                    this.set_inferred_edit_candidate_ready(
                        tool_call_id.clone(),
                        abs_path.clone(),
                        nonce,
                        buffer,
                        cx,
                    );
                })
                .ok();

                Ok(())
            })
            .detach_and_log_err(cx);
        }
    }
2091
2092 fn set_inferred_edit_candidate_ready(
2093 &mut self,
2094 tool_call_id: acp::ToolCallId,
2095 abs_path: PathBuf,
2096 nonce: u64,
2097 buffer: Entity<Buffer>,
2098 cx: &mut Context<Self>,
2099 ) {
2100 let buffer_for_action_log = {
2101 let Some(candidates) = self.inferred_edit_candidates.get_mut(&tool_call_id) else {
2102 return;
2103 };
2104 let Some(candidate_state) = candidates.get_mut(&abs_path) else {
2105 return;
2106 };
2107 if candidate_state.nonce() != nonce {
2108 return;
2109 }
2110
2111 *candidate_state = InferredEditCandidateState::Ready(InferredEditCandidateReady {
2112 nonce,
2113 buffer: buffer.clone(),
2114 });
2115 buffer
2116 };
2117
2118 self.action_log.update(cx, |action_log, cx| {
2119 action_log.begin_expected_external_edit(buffer_for_action_log, cx);
2120 });
2121
2122 if self
2123 .finalizing_inferred_edit_tool_calls
2124 .contains(&tool_call_id)
2125 {
2126 self.start_finalizing_ready_inferred_edit_candidates(tool_call_id, cx);
2127 }
2128 }
2129
2130 fn start_finalizing_ready_inferred_edit_candidates(
2131 &mut self,
2132 tool_call_id: acp::ToolCallId,
2133 cx: &mut Context<Self>,
2134 ) {
2135 let ready_candidates =
2136 if let Some(candidates) = self.inferred_edit_candidates.get_mut(&tool_call_id) {
2137 let ready_candidates = candidates
2138 .iter()
2139 .filter_map(|(abs_path, candidate_state)| match candidate_state {
2140 InferredEditCandidateState::Ready(candidate) => {
2141 Some((abs_path.clone(), candidate.clone()))
2142 }
2143 InferredEditCandidateState::Pending { .. }
2144 | InferredEditCandidateState::Finalizing(_) => None,
2145 })
2146 .collect::<Vec<_>>();
2147
2148 for (abs_path, candidate) in &ready_candidates {
2149 let Some(candidate_state) = candidates.get_mut(abs_path) else {
2150 continue;
2151 };
2152 if candidate_state.nonce() != candidate.nonce {
2153 continue;
2154 }
2155
2156 *candidate_state = InferredEditCandidateState::Finalizing(candidate.clone());
2157 }
2158
2159 ready_candidates
2160 } else {
2161 Vec::new()
2162 };
2163
2164 for (abs_path, candidate) in ready_candidates {
2165 self.finalize_inferred_edit_candidate(tool_call_id.clone(), abs_path, candidate, cx);
2166 }
2167
2168 if !self.inferred_edit_candidates.contains_key(&tool_call_id) {
2169 self.finalizing_inferred_edit_tool_calls
2170 .remove(&tool_call_id);
2171 }
2172 }
2173
2174 fn finalize_inferred_edit_candidate(
2175 &mut self,
2176 tool_call_id: acp::ToolCallId,
2177 abs_path: PathBuf,
2178 candidate: InferredEditCandidateReady,
2179 cx: &mut Context<Self>,
2180 ) {
2181 let nonce = candidate.nonce;
2182 let buffer = candidate.buffer;
2183 let should_reload = buffer.read_with(cx, |buffer, _| !buffer.is_dirty());
2184 if !should_reload {
2185 self.remove_inferred_edit_candidate_if_matching(&tool_call_id, &abs_path, nonce, cx);
2186 return;
2187 }
2188
2189 self.action_log.update(cx, |action_log, cx| {
2190 action_log.arm_expected_external_reload(buffer.clone(), cx);
2191 });
2192
2193 let project = self.project.clone();
2194 cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
2195 let reload = project.update(cx, |project, cx| {
2196 let mut buffers = HashSet::default();
2197 buffers.insert(buffer.clone());
2198 project.reload_buffers(buffers, false, cx)
2199 });
2200 reload.await.log_err();
2201
2202 this.update(cx, |this, cx| {
2203 this.remove_inferred_edit_candidate_if_matching(
2204 &tool_call_id,
2205 &abs_path,
2206 nonce,
2207 cx,
2208 );
2209 })
2210 .ok();
2211
2212 Ok(())
2213 })
2214 .detach_and_log_err(cx);
2215 }
2216
2217 fn finalize_inferred_edit_tool_call(
2218 &mut self,
2219 tool_call_id: acp::ToolCallId,
2220 cx: &mut Context<Self>,
2221 ) {
2222 let should_finalize = self.tool_call(&tool_call_id).is_some_and(|(_, tool_call)| {
2223 Self::should_infer_external_edits(tool_call)
2224 && Self::is_inferred_edit_terminal_status(&tool_call.status)
2225 });
2226 if !should_finalize {
2227 self.clear_inferred_edit_candidates_for_tool_calls([tool_call_id], cx);
2228 return;
2229 }
2230
2231 self.finalizing_inferred_edit_tool_calls
2232 .insert(tool_call_id.clone());
2233 self.start_finalizing_ready_inferred_edit_candidates(tool_call_id, cx);
2234 }
2235
2236 fn refresh_inferred_edit_tool_call(
2237 &mut self,
2238 tool_call_id: acp::ToolCallId,
2239 cx: &mut Context<Self>,
2240 ) {
2241 let Some((_, tool_call)) = self.tool_call(&tool_call_id) else {
2242 return;
2243 };
2244
2245 let should_track = Self::should_track_inferred_external_edits(tool_call);
2246 let should_finalize = Self::is_inferred_edit_terminal_status(&tool_call.status);
2247 let locations = tool_call.locations.clone();
2248
2249 if !should_track {
2250 self.clear_inferred_edit_candidates_for_tool_calls([tool_call_id], cx);
2251 return;
2252 }
2253
2254 self.record_inferred_edit_tool_call_turn_if_needed(&tool_call_id, cx);
2255 self.register_inferred_edit_locations(tool_call_id.clone(), &locations, cx);
2256
2257 if should_finalize {
2258 self.finalize_inferred_edit_tool_call(tool_call_id, cx);
2259 }
2260 }
2261
2262 pub fn update_tool_call(
2263 &mut self,
2264 update: impl Into<ToolCallUpdate>,
2265 cx: &mut Context<Self>,
2266 ) -> Result<()> {
2267 let update = update.into();
2268 let tool_call_id = update.id().clone();
2269 let languages = self.project.read(cx).languages().clone();
2270 let path_style = self.project.read(cx).path_style(cx);
2271
2272 let ix = match self.index_for_tool_call(&tool_call_id) {
2273 Some(ix) => ix,
2274 None => {
2275 // Tool call not found - create a failed tool call entry
2276 let failed_tool_call = ToolCall {
2277 id: tool_call_id,
2278 label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
2279 kind: acp::ToolKind::Fetch,
2280 content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
2281 "Tool call not found".into(),
2282 &languages,
2283 path_style,
2284 cx,
2285 ))],
2286 status: ToolCallStatus::Failed,
2287 locations: Vec::new(),
2288 resolved_locations: Vec::new(),
2289 raw_input: None,
2290 raw_input_markdown: None,
2291 raw_output: None,
2292 tool_name: None,
2293 subagent_session_info: None,
2294 };
2295 self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
2296 return Ok(());
2297 }
2298 };
2299 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
2300 unreachable!()
2301 };
2302
2303 match update {
2304 ToolCallUpdate::UpdateFields(update) => {
2305 let location_updated = update.fields.locations.is_some();
2306 call.update_fields(
2307 update.fields,
2308 update.meta,
2309 languages,
2310 path_style,
2311 &self.terminals,
2312 cx,
2313 )?;
2314 if location_updated {
2315 self.resolve_locations(update.tool_call_id, cx);
2316 }
2317 }
2318 ToolCallUpdate::UpdateDiff(update) => {
2319 call.content.clear();
2320 call.content.push(ToolCallContent::Diff(update.diff));
2321 }
2322 ToolCallUpdate::UpdateTerminal(update) => {
2323 call.content.clear();
2324 call.content
2325 .push(ToolCallContent::Terminal(update.terminal));
2326 }
2327 }
2328
2329 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2330 self.refresh_inferred_edit_tool_call(tool_call_id, cx);
2331
2332 Ok(())
2333 }
2334
2335 /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
2336 pub fn upsert_tool_call(
2337 &mut self,
2338 tool_call: acp::ToolCall,
2339 cx: &mut Context<Self>,
2340 ) -> Result<(), acp::Error> {
2341 let status = tool_call.status.into();
2342 self.upsert_tool_call_inner(tool_call.into(), status, cx)
2343 }
2344
2345 /// Fails if id does not match an existing entry.
2346 pub fn upsert_tool_call_inner(
2347 &mut self,
2348 update: acp::ToolCallUpdate,
2349 status: ToolCallStatus,
2350 cx: &mut Context<Self>,
2351 ) -> Result<(), acp::Error> {
2352 let language_registry = self.project.read(cx).languages().clone();
2353 let path_style = self.project.read(cx).path_style(cx);
2354 let id = update.tool_call_id.clone();
2355
2356 let agent_telemetry_id = self.connection().telemetry_id();
2357 let session = self.session_id();
2358 let parent_session_id = self.parent_session_id();
2359 if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
2360 let status = if matches!(status, ToolCallStatus::Completed) {
2361 "completed"
2362 } else {
2363 "failed"
2364 };
2365 telemetry::event!(
2366 "Agent Tool Call Completed",
2367 agent_telemetry_id,
2368 session,
2369 parent_session_id,
2370 status
2371 );
2372 }
2373
2374 if let Some(ix) = self.index_for_tool_call(&id) {
2375 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
2376 unreachable!()
2377 };
2378
2379 call.update_fields(
2380 update.fields,
2381 update.meta,
2382 language_registry,
2383 path_style,
2384 &self.terminals,
2385 cx,
2386 )?;
2387 call.status = status;
2388
2389 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2390 } else {
2391 let call = ToolCall::from_acp(
2392 update.try_into()?,
2393 status,
2394 language_registry,
2395 self.project.read(cx).path_style(cx),
2396 &self.terminals,
2397 cx,
2398 )?;
2399 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
2400 };
2401
2402 self.resolve_locations(id.clone(), cx);
2403 self.refresh_inferred_edit_tool_call(id, cx);
2404 Ok(())
2405 }
2406
2407 fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
2408 self.entries
2409 .iter()
2410 .enumerate()
2411 .rev()
2412 .find_map(|(index, entry)| {
2413 if let AgentThreadEntry::ToolCall(tool_call) = entry
2414 && &tool_call.id == id
2415 {
2416 Some(index)
2417 } else {
2418 None
2419 }
2420 })
2421 }
2422
2423 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
2424 // The tool call we are looking for is typically the last one, or very close to the end.
2425 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
2426 self.entries
2427 .iter_mut()
2428 .enumerate()
2429 .rev()
2430 .find_map(|(index, tool_call)| {
2431 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
2432 && &tool_call.id == id
2433 {
2434 Some((index, tool_call))
2435 } else {
2436 None
2437 }
2438 })
2439 }
2440
2441 pub fn tool_call(&self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
2442 self.entries
2443 .iter()
2444 .enumerate()
2445 .rev()
2446 .find_map(|(index, tool_call)| {
2447 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
2448 && &tool_call.id == id
2449 {
2450 Some((index, tool_call))
2451 } else {
2452 None
2453 }
2454 })
2455 }
2456
2457 pub fn tool_call_for_subagent(&self, session_id: &acp::SessionId) -> Option<&ToolCall> {
2458 self.entries.iter().find_map(|entry| match entry {
2459 AgentThreadEntry::ToolCall(tool_call) => {
2460 if let Some(subagent_session_info) = &tool_call.subagent_session_info
2461 && &subagent_session_info.session_id == session_id
2462 {
2463 Some(tool_call)
2464 } else {
2465 None
2466 }
2467 }
2468 _ => None,
2469 })
2470 }
2471
2472 pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
2473 let project = self.project.clone();
2474 let should_update_agent_location = self.parent_session_id.is_none();
2475 let Some((_, tool_call)) = self.tool_call_mut(&id) else {
2476 return;
2477 };
2478 let task = tool_call.resolve_locations(project, cx);
2479 cx.spawn(async move |this, cx| {
2480 let resolved_locations = task.await;
2481
2482 this.update(cx, |this, cx| {
2483 let project = this.project.clone();
2484
2485 for location in resolved_locations.iter().flatten() {
2486 this.shared_buffers
2487 .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
2488 }
2489 let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
2490 return;
2491 };
2492
2493 if let Some(Some(location)) = resolved_locations.last() {
2494 project.update(cx, |project, cx| {
2495 let should_ignore = if let Some(agent_location) = project
2496 .agent_location()
2497 .filter(|agent_location| agent_location.buffer == location.buffer)
2498 {
2499 let snapshot = location.buffer.read(cx).snapshot();
2500 let old_position = agent_location.position.to_point(&snapshot);
2501 let new_position = location.position.to_point(&snapshot);
2502
2503 // ignore this so that when we get updates from the edit tool
2504 // the position doesn't reset to the startof line
2505 old_position.row == new_position.row
2506 && old_position.column > new_position.column
2507 } else {
2508 false
2509 };
2510 if !should_ignore && should_update_agent_location {
2511 project.set_agent_location(Some(location.into()), cx);
2512 }
2513 });
2514 }
2515
2516 let resolved_locations = resolved_locations
2517 .iter()
2518 .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
2519 .collect::<Vec<_>>();
2520
2521 if tool_call.resolved_locations != resolved_locations {
2522 tool_call.resolved_locations = resolved_locations;
2523 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2524 }
2525 })
2526 })
2527 .detach();
2528 }
2529
2530 pub fn request_tool_call_authorization(
2531 &mut self,
2532 tool_call: acp::ToolCallUpdate,
2533 options: PermissionOptions,
2534 cx: &mut Context<Self>,
2535 ) -> Result<Task<RequestPermissionOutcome>> {
2536 let (tx, rx) = oneshot::channel();
2537
2538 let status = ToolCallStatus::WaitingForConfirmation {
2539 options,
2540 respond_tx: tx,
2541 };
2542
2543 let tool_call_id = tool_call.tool_call_id.clone();
2544 self.upsert_tool_call_inner(tool_call, status, cx)?;
2545 cx.emit(AcpThreadEvent::ToolAuthorizationRequested(
2546 tool_call_id.clone(),
2547 ));
2548
2549 Ok(cx.spawn(async move |this, cx| {
2550 let outcome = match rx.await {
2551 Ok(outcome) => RequestPermissionOutcome::Selected(outcome),
2552 Err(oneshot::Canceled) => RequestPermissionOutcome::Cancelled,
2553 };
2554 this.update(cx, |_this, cx| {
2555 cx.emit(AcpThreadEvent::ToolAuthorizationReceived(tool_call_id))
2556 })
2557 .ok();
2558 outcome
2559 }))
2560 }
2561
2562 pub fn authorize_tool_call(
2563 &mut self,
2564 id: acp::ToolCallId,
2565 outcome: SelectedPermissionOutcome,
2566 option_kind: acp::PermissionOptionKind,
2567 cx: &mut Context<Self>,
2568 ) {
2569 let Some((ix, call)) = self.tool_call_mut(&id) else {
2570 return;
2571 };
2572
2573 let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = &call.status else {
2574 return;
2575 };
2576
2577 if respond_tx.is_canceled() {
2578 log::warn!(
2579 "dropping stale tool authorization for call `{}` because it is no longer waiting for confirmation",
2580 id
2581 );
2582 return;
2583 }
2584
2585 let new_status = match option_kind {
2586 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
2587 ToolCallStatus::Rejected
2588 }
2589 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
2590 ToolCallStatus::InProgress
2591 }
2592 _ => ToolCallStatus::InProgress,
2593 };
2594
2595 let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } =
2596 mem::replace(&mut call.status, ToolCallStatus::Canceled)
2597 else {
2598 return;
2599 };
2600
2601 if respond_tx.send(outcome).is_err() {
2602 log::warn!(
2603 "dropping stale tool authorization for call `{}` because it is no longer waiting for confirmation",
2604 id
2605 );
2606 return;
2607 }
2608
2609 call.status = new_status;
2610
2611 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2612 self.refresh_inferred_edit_tool_call(id, cx);
2613 }
2614
2615 pub fn plan(&self) -> &Plan {
2616 &self.plan
2617 }
2618
2619 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
2620 let new_entries_len = request.entries.len();
2621 let mut new_entries = request.entries.into_iter();
2622
2623 // Reuse existing markdown to prevent flickering
2624 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
2625 let PlanEntry {
2626 content,
2627 priority,
2628 status,
2629 } = old;
2630 content.update(cx, |old, cx| {
2631 old.replace(new.content, cx);
2632 });
2633 *priority = new.priority;
2634 *status = new.status;
2635 }
2636 for new in new_entries {
2637 self.plan.entries.push(PlanEntry::from_acp(new, cx))
2638 }
2639 self.plan.entries.truncate(new_entries_len);
2640
2641 cx.notify();
2642 }
2643
2644 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
2645 self.plan
2646 .entries
2647 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
2648 cx.notify();
2649 }
2650
/// Test helper that sends `message` as a single content block via [`Self::send`].
#[cfg(any(test, feature = "test-support"))]
pub fn send_raw(
    &mut self,
    message: &str,
    cx: &mut Context<Self>,
) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
    let blocks: Vec<acp::ContentBlock> = vec![message.into()];
    self.send(blocks, cx)
}
2659
2660 pub fn send(
2661 &mut self,
2662 message: Vec<acp::ContentBlock>,
2663 cx: &mut Context<Self>,
2664 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2665 let block = ContentBlock::new_combined(
2666 message.clone(),
2667 self.project.read(cx).languages().clone(),
2668 self.project.read(cx).path_style(cx),
2669 cx,
2670 );
2671 let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
2672 let git_store = self.project.read(cx).git_store().clone();
2673
2674 let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
2675 Some(UserMessageId::new())
2676 } else {
2677 None
2678 };
2679
2680 self.run_turn(cx, async move |this, cx| {
2681 this.update(cx, |this, cx| {
2682 this.push_entry(
2683 AgentThreadEntry::UserMessage(UserMessage {
2684 id: message_id.clone(),
2685 content: block,
2686 chunks: message,
2687 checkpoint: None,
2688 indented: false,
2689 }),
2690 cx,
2691 );
2692 })
2693 .ok();
2694
2695 let old_checkpoint = git_store
2696 .update(cx, |git, cx| git.checkpoint(cx))
2697 .await
2698 .context("failed to get old checkpoint")
2699 .log_err();
2700 this.update(cx, |this, cx| {
2701 if let Some((_ix, message)) = this.last_user_message() {
2702 message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
2703 git_checkpoint,
2704 show: false,
2705 });
2706 }
2707 this.connection.prompt(message_id, request, cx)
2708 })?
2709 .await
2710 })
2711 }
2712
2713 pub fn can_retry(&self, cx: &App) -> bool {
2714 self.connection.retry(&self.session_id, cx).is_some()
2715 }
2716
2717 pub fn retry(
2718 &mut self,
2719 cx: &mut Context<Self>,
2720 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2721 self.run_turn(cx, async move |this, cx| {
2722 this.update(cx, |this, cx| {
2723 this.connection
2724 .retry(&this.session_id, cx)
2725 .map(|retry| retry.run(cx))
2726 })?
2727 .context("retrying a session is not supported")?
2728 .await
2729 })
2730 }
2731
2732 fn run_turn(
2733 &mut self,
2734 cx: &mut Context<Self>,
2735 f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
2736 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2737 self.clear_completed_plan_entries(cx);
2738 self.had_error = false;
2739
2740 let (tx, rx) = oneshot::channel();
2741 let cancel_task = self.cancel(cx);
2742
2743 self.turn_id += 1;
2744 let turn_id = self.turn_id;
2745 self.running_turn = Some(RunningTurn {
2746 id: turn_id,
2747 send_task: cx.spawn(async move |this, cx| {
2748 cancel_task.await;
2749 tx.send(f(this, cx).await).ok();
2750 }),
2751 });
2752
2753 cx.spawn(async move |this, cx| {
2754 let response = rx.await;
2755
2756 this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
2757 .await?;
2758
2759 this.update(cx, |this, cx| {
2760 if this.parent_session_id.is_none() {
2761 this.project
2762 .update(cx, |project, cx| project.set_agent_location(None, cx));
2763 }
2764 let Ok(response) = response else {
2765 // tx dropped, just return
2766 return Ok(None);
2767 };
2768
2769 let is_same_turn = this
2770 .running_turn
2771 .as_ref()
2772 .is_some_and(|turn| turn_id == turn.id);
2773
2774 // If the user submitted a follow up message, running_turn might
2775 // already point to a different turn. Therefore we only want to
2776 // take the task if it's the same turn.
2777 if is_same_turn {
2778 this.running_turn.take();
2779 }
2780
2781 match response {
2782 Ok(r) => {
2783 Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2784
2785 if r.stop_reason == acp::StopReason::MaxTokens {
2786 this.finish_inferred_edit_tracking_for_stopped_turn(turn_id, cx);
2787 this.had_error = true;
2788 cx.emit(AcpThreadEvent::Error);
2789 log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2790 return Err(anyhow!("Max tokens reached"));
2791 }
2792
2793 let canceled = matches!(r.stop_reason, acp::StopReason::Cancelled);
2794 if canceled {
2795 this.mark_pending_tools_as_canceled();
2796 this.finish_inferred_edit_tracking_for_stopped_turn(turn_id, cx);
2797 }
2798
2799 // Handle refusal - distinguish between user prompt and tool call refusals
2800 if let acp::StopReason::Refusal = r.stop_reason {
2801 this.had_error = true;
2802 if let Some((user_msg_ix, _)) = this.last_user_message() {
2803 // Check if there's a completed tool call with results after the last user message
2804 // This indicates the refusal is in response to tool output, not the user's prompt
2805 let has_completed_tool_call_after_user_msg =
2806 this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
2807 if let AgentThreadEntry::ToolCall(tool_call) = entry {
2808 // Check if the tool call has completed and has output
2809 matches!(tool_call.status, ToolCallStatus::Completed)
2810 && tool_call.raw_output.is_some()
2811 } else {
2812 false
2813 }
2814 });
2815
2816 if has_completed_tool_call_after_user_msg {
2817 // Refusal is due to tool output - don't truncate, just notify
2818 // The model refused based on what the tool returned
2819 cx.emit(AcpThreadEvent::Refusal);
2820 } else {
2821 // User prompt was refused - truncate back to before the user message
2822 let range = user_msg_ix..this.entries.len();
2823 if range.start < range.end {
2824 let removed_tool_call_ids = this.entries[user_msg_ix..]
2825 .iter()
2826 .filter_map(|entry| {
2827 if let AgentThreadEntry::ToolCall(tool_call) = entry
2828 {
2829 Some(tool_call.id.clone())
2830 } else {
2831 None
2832 }
2833 })
2834 .collect::<Vec<_>>();
2835 this.clear_inferred_edit_candidates_for_tool_calls(
2836 removed_tool_call_ids,
2837 cx,
2838 );
2839 this.entries.truncate(user_msg_ix);
2840 cx.emit(AcpThreadEvent::EntriesRemoved(range));
2841 }
2842 cx.emit(AcpThreadEvent::Refusal);
2843 }
2844 } else {
2845 // No user message found, treat as general refusal
2846 cx.emit(AcpThreadEvent::Refusal);
2847 }
2848 }
2849
2850 this.finish_inferred_edit_tracking_for_stopped_turn(turn_id, cx);
2851 cx.emit(AcpThreadEvent::Stopped(r.stop_reason));
2852 Ok(Some(r))
2853 }
2854 Err(e) => {
2855 Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2856
2857 this.finish_inferred_edit_tracking_for_stopped_turn(turn_id, cx);
2858 this.had_error = true;
2859 cx.emit(AcpThreadEvent::Error);
2860 log::error!("Error in run turn: {:?}", e);
2861 Err(e)
2862 }
2863 }
2864 })?
2865 })
2866 .boxed()
2867 }
2868
2869 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2870 let Some(turn) = self.running_turn.take() else {
2871 return Task::ready(());
2872 };
2873 let turn_id = turn.id;
2874 self.connection.cancel(&self.session_id, cx);
2875
2876 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2877 self.mark_pending_tools_as_canceled();
2878 self.finish_inferred_edit_tracking_for_stopped_turn(turn_id, cx);
2879
2880 cx.background_spawn(turn.send_task)
2881 }
2882
2883 fn mark_pending_tools_as_canceled(&mut self) -> Vec<acp::ToolCallId> {
2884 let mut canceled_tool_call_ids = Vec::new();
2885
2886 for entry in self.entries.iter_mut() {
2887 if let AgentThreadEntry::ToolCall(call) = entry {
2888 let cancel = matches!(
2889 call.status,
2890 ToolCallStatus::Pending
2891 | ToolCallStatus::WaitingForConfirmation { .. }
2892 | ToolCallStatus::InProgress
2893 );
2894
2895 if cancel {
2896 call.status = ToolCallStatus::Canceled;
2897 canceled_tool_call_ids.push(call.id.clone());
2898 }
2899 }
2900 }
2901
2902 canceled_tool_call_ids
2903 }
2904
2905 /// Restores the git working tree to the state at the given checkpoint (if one exists)
2906 pub fn restore_checkpoint(
2907 &mut self,
2908 id: UserMessageId,
2909 cx: &mut Context<Self>,
2910 ) -> Task<Result<()>> {
2911 let Some((_, message)) = self.user_message_mut(&id) else {
2912 return Task::ready(Err(anyhow!("message not found")));
2913 };
2914
2915 let checkpoint = message
2916 .checkpoint
2917 .as_ref()
2918 .map(|c| c.git_checkpoint.clone());
2919
2920 // Cancel any in-progress generation before restoring
2921 let cancel_task = self.cancel(cx);
2922 let rewind = self.rewind(id.clone(), cx);
2923 let git_store = self.project.read(cx).git_store().clone();
2924
2925 cx.spawn(async move |_, cx| {
2926 cancel_task.await;
2927 rewind.await?;
2928 if let Some(checkpoint) = checkpoint {
2929 git_store
2930 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
2931 .await?;
2932 }
2933
2934 Ok(())
2935 })
2936 }
2937
2938 /// Rewinds this thread to before the entry at `index`, removing it and all
2939 /// subsequent entries while rejecting any action_log changes made from that point.
2940 /// Unlike `restore_checkpoint`, this method does not restore from git.
2941 pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
2942 let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
2943 return Task::ready(Err(anyhow!("not supported")));
2944 };
2945
2946 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2947 let telemetry = ActionLogTelemetry::from(&*self);
2948 cx.spawn(async move |this, cx| {
2949 cx.update(|cx| truncate.run(id.clone(), cx)).await?;
2950 this.update(cx, |this, cx| {
2951 if let Some((ix, _)) = this.user_message_mut(&id) {
2952 // Collect all terminals from entries that will be removed
2953 let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
2954 .iter()
2955 .flat_map(|entry| entry.terminals())
2956 .filter_map(|terminal| terminal.read(cx).id().clone().into())
2957 .collect();
2958 let removed_tool_call_ids = this.entries[ix..]
2959 .iter()
2960 .filter_map(|entry| {
2961 if let AgentThreadEntry::ToolCall(tool_call) = entry {
2962 Some(tool_call.id.clone())
2963 } else {
2964 None
2965 }
2966 })
2967 .collect::<Vec<_>>();
2968
2969 let range = ix..this.entries.len();
2970 this.clear_inferred_edit_candidates_for_tool_calls(removed_tool_call_ids, cx);
2971 this.entries.truncate(ix);
2972 cx.emit(AcpThreadEvent::EntriesRemoved(range));
2973
2974 // Kill and remove the terminals
2975 for terminal_id in terminals_to_remove {
2976 if let Some(terminal) = this.terminals.remove(&terminal_id) {
2977 terminal.update(cx, |terminal, cx| {
2978 terminal.kill(cx);
2979 });
2980 }
2981 }
2982 }
2983 this.action_log().update(cx, |action_log, cx| {
2984 action_log.reject_all_edits(Some(telemetry), cx)
2985 })
2986 })?
2987 .await;
2988 Ok(())
2989 })
2990 }
2991
2992 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
2993 let git_store = self.project.read(cx).git_store().clone();
2994
2995 let Some((_, message)) = self.last_user_message() else {
2996 return Task::ready(Ok(()));
2997 };
2998 let Some(user_message_id) = message.id.clone() else {
2999 return Task::ready(Ok(()));
3000 };
3001 let Some(checkpoint) = message.checkpoint.as_ref() else {
3002 return Task::ready(Ok(()));
3003 };
3004 let old_checkpoint = checkpoint.git_checkpoint.clone();
3005
3006 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
3007 cx.spawn(async move |this, cx| {
3008 let Some(new_checkpoint) = new_checkpoint
3009 .await
3010 .context("failed to get new checkpoint")
3011 .log_err()
3012 else {
3013 return Ok(());
3014 };
3015
3016 let equal = git_store
3017 .update(cx, |git, cx| {
3018 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
3019 })
3020 .await
3021 .unwrap_or(true);
3022
3023 this.update(cx, |this, cx| {
3024 if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
3025 if let Some(checkpoint) = message.checkpoint.as_mut() {
3026 checkpoint.show = !equal;
3027 cx.emit(AcpThreadEvent::EntryUpdated(ix));
3028 }
3029 }
3030 })?;
3031
3032 Ok(())
3033 })
3034 }
3035
3036 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
3037 self.entries
3038 .iter_mut()
3039 .enumerate()
3040 .rev()
3041 .find_map(|(ix, entry)| {
3042 if let AgentThreadEntry::UserMessage(message) = entry {
3043 Some((ix, message))
3044 } else {
3045 None
3046 }
3047 })
3048 }
3049
3050 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
3051 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
3052 if let AgentThreadEntry::UserMessage(message) = entry {
3053 if message.id.as_ref() == Some(id) {
3054 Some((ix, message))
3055 } else {
3056 None
3057 }
3058 } else {
3059 None
3060 }
3061 })
3062 }
3063
3064 pub fn read_text_file(
3065 &self,
3066 path: PathBuf,
3067 line: Option<u32>,
3068 limit: Option<u32>,
3069 reuse_shared_snapshot: bool,
3070 cx: &mut Context<Self>,
3071 ) -> Task<Result<String, acp::Error>> {
3072 // Args are 1-based, move to 0-based
3073 let line = line.unwrap_or_default().saturating_sub(1);
3074 let limit = limit.unwrap_or(u32::MAX);
3075 let project = self.project.clone();
3076 let action_log = self.action_log.clone();
3077 let should_update_agent_location = self.parent_session_id.is_none();
3078 cx.spawn(async move |this, cx| {
3079 let load = project.update(cx, |project, cx| {
3080 let path = project
3081 .project_path_for_absolute_path(&path, cx)
3082 .ok_or_else(|| {
3083 acp::Error::resource_not_found(Some(path.display().to_string()))
3084 })?;
3085 Ok::<_, acp::Error>(project.open_buffer(path, cx))
3086 })?;
3087
3088 let buffer = load.await?;
3089
3090 let snapshot = if reuse_shared_snapshot {
3091 this.read_with(cx, |this, _| {
3092 this.shared_buffers.get(&buffer.clone()).cloned()
3093 })
3094 .log_err()
3095 .flatten()
3096 } else {
3097 None
3098 };
3099
3100 let snapshot = if let Some(snapshot) = snapshot {
3101 snapshot
3102 } else {
3103 action_log.update(cx, |action_log, cx| {
3104 action_log.buffer_read(buffer.clone(), cx);
3105 });
3106
3107 let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
3108 this.update(cx, |this, _| {
3109 this.shared_buffers.insert(buffer.clone(), snapshot.clone());
3110 })?;
3111 snapshot
3112 };
3113
3114 let max_point = snapshot.max_point();
3115 let start_position = Point::new(line, 0);
3116
3117 if start_position > max_point {
3118 return Err(acp::Error::invalid_params().data(format!(
3119 "Attempting to read beyond the end of the file, line {}:{}",
3120 max_point.row + 1,
3121 max_point.column
3122 )));
3123 }
3124
3125 let start = snapshot.anchor_before(start_position);
3126 let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
3127
3128 if should_update_agent_location {
3129 project.update(cx, |project, cx| {
3130 project.set_agent_location(
3131 Some(AgentLocation {
3132 buffer: buffer.downgrade(),
3133 position: start,
3134 }),
3135 cx,
3136 );
3137 });
3138 }
3139
3140 Ok(snapshot.text_for_range(start..end).collect::<String>())
3141 })
3142 }
3143
3144 pub fn write_text_file(
3145 &self,
3146 path: PathBuf,
3147 content: String,
3148 cx: &mut Context<Self>,
3149 ) -> Task<Result<()>> {
3150 let project = self.project.clone();
3151 let action_log = self.action_log.clone();
3152 let should_update_agent_location = self.parent_session_id.is_none();
3153 cx.spawn(async move |this, cx| {
3154 let load = project.update(cx, |project, cx| {
3155 let path = project
3156 .project_path_for_absolute_path(&path, cx)
3157 .context("invalid path")?;
3158 anyhow::Ok(project.open_buffer(path, cx))
3159 });
3160 let buffer = load?.await?;
3161 let snapshot = this.update(cx, |this, cx| {
3162 this.shared_buffers
3163 .get(&buffer)
3164 .cloned()
3165 .unwrap_or_else(|| buffer.read(cx).snapshot())
3166 })?;
3167 let edits = cx
3168 .background_executor()
3169 .spawn(async move {
3170 let old_text = snapshot.text();
3171 text_diff(old_text.as_str(), &content)
3172 .into_iter()
3173 .map(|(range, replacement)| {
3174 (snapshot.anchor_range_around(range), replacement)
3175 })
3176 .collect::<Vec<_>>()
3177 })
3178 .await;
3179
3180 if should_update_agent_location {
3181 project.update(cx, |project, cx| {
3182 project.set_agent_location(
3183 Some(AgentLocation {
3184 buffer: buffer.downgrade(),
3185 position: edits
3186 .last()
3187 .map(|(range, _)| range.end)
3188 .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
3189 }),
3190 cx,
3191 );
3192 });
3193 }
3194
3195 let format_on_save = cx.update(|cx| {
3196 action_log.update(cx, |action_log, cx| {
3197 action_log.buffer_read(buffer.clone(), cx);
3198 });
3199
3200 let format_on_save = buffer.update(cx, |buffer, cx| {
3201 buffer.edit(edits, None, cx);
3202
3203 let settings = language::language_settings::language_settings(
3204 buffer.language().map(|l| l.name()),
3205 buffer.file(),
3206 cx,
3207 );
3208
3209 settings.format_on_save != FormatOnSave::Off
3210 });
3211 action_log.update(cx, |action_log, cx| {
3212 action_log.buffer_edited(buffer.clone(), cx);
3213 });
3214 format_on_save
3215 });
3216
3217 if format_on_save {
3218 let format_task = project.update(cx, |project, cx| {
3219 project.format(
3220 HashSet::from_iter([buffer.clone()]),
3221 LspFormatTarget::Buffers,
3222 false,
3223 FormatTrigger::Save,
3224 cx,
3225 )
3226 });
3227 format_task.await.log_err();
3228
3229 action_log.update(cx, |action_log, cx| {
3230 action_log.buffer_edited(buffer.clone(), cx);
3231 });
3232 }
3233
3234 project
3235 .update(cx, |project, cx| project.save_buffer(buffer, cx))
3236 .await
3237 })
3238 }
3239
3240 pub fn create_terminal(
3241 &self,
3242 command: String,
3243 args: Vec<String>,
3244 extra_env: Vec<acp::EnvVariable>,
3245 cwd: Option<PathBuf>,
3246 output_byte_limit: Option<u64>,
3247 cx: &mut Context<Self>,
3248 ) -> Task<Result<Entity<Terminal>>> {
3249 let env = match &cwd {
3250 Some(dir) => self.project.update(cx, |project, cx| {
3251 project.environment().update(cx, |env, cx| {
3252 env.directory_environment(dir.as_path().into(), cx)
3253 })
3254 }),
3255 None => Task::ready(None).shared(),
3256 };
3257 let env = cx.spawn(async move |_, _| {
3258 let mut env = env.await.unwrap_or_default();
3259 // Disables paging for `git` and hopefully other commands
3260 env.insert("PAGER".into(), "".into());
3261 for var in extra_env {
3262 env.insert(var.name, var.value);
3263 }
3264 env
3265 });
3266
3267 let project = self.project.clone();
3268 let language_registry = project.read(cx).languages().clone();
3269 let is_windows = project.read(cx).path_style(cx).is_windows();
3270
3271 let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
3272 let terminal_task = cx.spawn({
3273 let terminal_id = terminal_id.clone();
3274 async move |_this, cx| {
3275 let env = env.await;
3276 let shell = project
3277 .update(cx, |project, cx| {
3278 project
3279 .remote_client()
3280 .and_then(|r| r.read(cx).default_system_shell())
3281 })
3282 .unwrap_or_else(|| get_default_system_shell_preferring_bash());
3283 let (task_command, task_args) =
3284 ShellBuilder::new(&Shell::Program(shell), is_windows)
3285 .redirect_stdin_to_dev_null()
3286 .build(Some(command.clone()), &args);
3287 let terminal = project
3288 .update(cx, |project, cx| {
3289 project.create_terminal_task(
3290 task::SpawnInTerminal {
3291 command: Some(task_command),
3292 args: task_args,
3293 cwd: cwd.clone(),
3294 env,
3295 ..Default::default()
3296 },
3297 cx,
3298 )
3299 })
3300 .await?;
3301
3302 anyhow::Ok(cx.new(|cx| {
3303 Terminal::new(
3304 terminal_id,
3305 &format!("{} {}", command, args.join(" ")),
3306 cwd,
3307 output_byte_limit.map(|l| l as usize),
3308 terminal,
3309 language_registry,
3310 cx,
3311 )
3312 }))
3313 }
3314 });
3315
3316 cx.spawn(async move |this, cx| {
3317 let terminal = terminal_task.await?;
3318 this.update(cx, |this, _cx| {
3319 this.terminals.insert(terminal_id, terminal.clone());
3320 terminal
3321 })
3322 })
3323 }
3324
3325 pub fn kill_terminal(
3326 &mut self,
3327 terminal_id: acp::TerminalId,
3328 cx: &mut Context<Self>,
3329 ) -> Result<()> {
3330 self.terminals
3331 .get(&terminal_id)
3332 .context("Terminal not found")?
3333 .update(cx, |terminal, cx| {
3334 terminal.kill(cx);
3335 });
3336
3337 Ok(())
3338 }
3339
3340 pub fn release_terminal(
3341 &mut self,
3342 terminal_id: acp::TerminalId,
3343 cx: &mut Context<Self>,
3344 ) -> Result<()> {
3345 self.terminals
3346 .remove(&terminal_id)
3347 .context("Terminal not found")?
3348 .update(cx, |terminal, cx| {
3349 terminal.kill(cx);
3350 });
3351
3352 Ok(())
3353 }
3354
3355 pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
3356 self.terminals
3357 .get(&terminal_id)
3358 .context("Terminal not found")
3359 .cloned()
3360 }
3361
3362 pub fn to_markdown(&self, cx: &App) -> String {
3363 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
3364 }
3365
3366 pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
3367 cx.emit(AcpThreadEvent::LoadError(error));
3368 }
3369
3370 pub fn register_terminal_created(
3371 &mut self,
3372 terminal_id: acp::TerminalId,
3373 command_label: String,
3374 working_dir: Option<PathBuf>,
3375 output_byte_limit: Option<u64>,
3376 terminal: Entity<::terminal::Terminal>,
3377 cx: &mut Context<Self>,
3378 ) -> Entity<Terminal> {
3379 let language_registry = self.project.read(cx).languages().clone();
3380
3381 let entity = cx.new(|cx| {
3382 Terminal::new(
3383 terminal_id.clone(),
3384 &command_label,
3385 working_dir.clone(),
3386 output_byte_limit.map(|l| l as usize),
3387 terminal,
3388 language_registry,
3389 cx,
3390 )
3391 });
3392 self.terminals.insert(terminal_id.clone(), entity.clone());
3393 entity
3394 }
3395
3396 pub fn mark_as_subagent_output(&mut self, cx: &mut Context<Self>) {
3397 for entry in self.entries.iter_mut().rev() {
3398 if let AgentThreadEntry::AssistantMessage(assistant_message) = entry {
3399 assistant_message.is_subagent_output = true;
3400 cx.notify();
3401 return;
3402 }
3403 }
3404 }
3405
3406 pub fn on_terminal_provider_event(
3407 &mut self,
3408 event: TerminalProviderEvent,
3409 cx: &mut Context<Self>,
3410 ) {
3411 match event {
3412 TerminalProviderEvent::Created {
3413 terminal_id,
3414 label,
3415 cwd,
3416 output_byte_limit,
3417 terminal,
3418 } => {
3419 let entity = self.register_terminal_created(
3420 terminal_id.clone(),
3421 label,
3422 cwd,
3423 output_byte_limit,
3424 terminal,
3425 cx,
3426 );
3427
3428 if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
3429 for data in chunks.drain(..) {
3430 entity.update(cx, |term, cx| {
3431 term.inner().update(cx, |inner, cx| {
3432 inner.write_output(&data, cx);
3433 })
3434 });
3435 }
3436 }
3437
3438 if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
3439 entity.update(cx, |_term, cx| {
3440 cx.notify();
3441 });
3442 }
3443
3444 cx.notify();
3445 }
3446 TerminalProviderEvent::Output { terminal_id, data } => {
3447 if let Some(entity) = self.terminals.get(&terminal_id) {
3448 entity.update(cx, |term, cx| {
3449 term.inner().update(cx, |inner, cx| {
3450 inner.write_output(&data, cx);
3451 })
3452 });
3453 } else {
3454 self.pending_terminal_output
3455 .entry(terminal_id)
3456 .or_default()
3457 .push(data);
3458 }
3459 }
3460 TerminalProviderEvent::TitleChanged { terminal_id, title } => {
3461 if let Some(entity) = self.terminals.get(&terminal_id) {
3462 entity.update(cx, |term, cx| {
3463 term.inner().update(cx, |inner, cx| {
3464 inner.breadcrumb_text = title;
3465 cx.emit(::terminal::Event::BreadcrumbsChanged);
3466 })
3467 });
3468 }
3469 }
3470 TerminalProviderEvent::Exit {
3471 terminal_id,
3472 status,
3473 } => {
3474 if let Some(entity) = self.terminals.get(&terminal_id) {
3475 entity.update(cx, |_term, cx| {
3476 cx.notify();
3477 });
3478 } else {
3479 self.pending_terminal_exit.insert(terminal_id, status);
3480 }
3481 }
3482 }
3483 }
3484}
3485
3486fn markdown_for_raw_output(
3487 raw_output: &serde_json::Value,
3488 language_registry: &Arc<LanguageRegistry>,
3489 cx: &mut App,
3490) -> Option<Entity<Markdown>> {
3491 match raw_output {
3492 serde_json::Value::Null => None,
3493 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
3494 Markdown::new(
3495 value.to_string().into(),
3496 Some(language_registry.clone()),
3497 None,
3498 cx,
3499 )
3500 })),
3501 serde_json::Value::Number(value) => Some(cx.new(|cx| {
3502 Markdown::new(
3503 value.to_string().into(),
3504 Some(language_registry.clone()),
3505 None,
3506 cx,
3507 )
3508 })),
3509 serde_json::Value::String(value) => Some(cx.new(|cx| {
3510 Markdown::new(
3511 value.clone().into(),
3512 Some(language_registry.clone()),
3513 None,
3514 cx,
3515 )
3516 })),
3517 value => Some(cx.new(|cx| {
3518 let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
3519
3520 Markdown::new(
3521 format!("```json\n{}\n```", pretty_json).into(),
3522 Some(language_registry.clone()),
3523 None,
3524 cx,
3525 )
3526 })),
3527 }
3528}
3529
3530#[cfg(test)]
3531mod tests {
3532 use super::*;
3533 use anyhow::anyhow;
3534 use futures::{channel::mpsc, future::LocalBoxFuture, select};
3535 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
3536 use indoc::indoc;
3537 use project::{AgentId, FakeFs, Fs};
3538 use rand::{distr, prelude::*};
3539 use serde_json::json;
3540 use settings::SettingsStore;
3541 use smol::stream::StreamExt as _;
3542 use std::{
3543 any::Any,
3544 cell::RefCell,
3545 path::Path,
3546 rc::Rc,
3547 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
3548 time::Duration,
3549 };
3550 use util::{path, path_list::PathList};
3551
3552 fn init_test(cx: &mut TestAppContext) {
3553 env_logger::try_init().ok();
3554 cx.update(|cx| {
3555 let settings_store = SettingsStore::test(cx);
3556 cx.set_global(settings_store);
3557 });
3558 }
3559
3560 #[gpui::test]
3561 async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
3562 init_test(cx);
3563
3564 let fs = FakeFs::new(cx.executor());
3565 let project = Project::test(fs, [], cx).await;
3566 let connection = Rc::new(FakeAgentConnection::new());
3567 let thread = cx
3568 .update(|cx| {
3569 connection.new_session(
3570 project,
3571 PathList::new(&[std::path::Path::new(path!("/test"))]),
3572 cx,
3573 )
3574 })
3575 .await
3576 .unwrap();
3577
3578 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3579
3580 // Send Output BEFORE Created - should be buffered by acp_thread
3581 thread.update(cx, |thread, cx| {
3582 thread.on_terminal_provider_event(
3583 TerminalProviderEvent::Output {
3584 terminal_id: terminal_id.clone(),
3585 data: b"hello buffered".to_vec(),
3586 },
3587 cx,
3588 );
3589 });
3590
3591 // Create a display-only terminal and then send Created
3592 let lower = cx.new(|cx| {
3593 let builder = ::terminal::TerminalBuilder::new_display_only(
3594 ::terminal::terminal_settings::CursorShape::default(),
3595 ::terminal::terminal_settings::AlternateScroll::On,
3596 None,
3597 0,
3598 cx.background_executor(),
3599 PathStyle::local(),
3600 )
3601 .unwrap();
3602 builder.subscribe(cx)
3603 });
3604
3605 thread.update(cx, |thread, cx| {
3606 thread.on_terminal_provider_event(
3607 TerminalProviderEvent::Created {
3608 terminal_id: terminal_id.clone(),
3609 label: "Buffered Test".to_string(),
3610 cwd: None,
3611 output_byte_limit: None,
3612 terminal: lower.clone(),
3613 },
3614 cx,
3615 );
3616 });
3617
3618 // After Created, buffered Output should have been flushed into the renderer
3619 let content = thread.read_with(cx, |thread, cx| {
3620 let term = thread.terminal(terminal_id.clone()).unwrap();
3621 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3622 });
3623
3624 assert!(
3625 content.contains("hello buffered"),
3626 "expected buffered output to render, got: {content}"
3627 );
3628 }
3629
3630 #[gpui::test]
3631 async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
3632 init_test(cx);
3633
3634 let fs = FakeFs::new(cx.executor());
3635 let project = Project::test(fs, [], cx).await;
3636 let connection = Rc::new(FakeAgentConnection::new());
3637 let thread = cx
3638 .update(|cx| {
3639 connection.new_session(
3640 project,
3641 PathList::new(&[std::path::Path::new(path!("/test"))]),
3642 cx,
3643 )
3644 })
3645 .await
3646 .unwrap();
3647
3648 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3649
3650 // Send Output BEFORE Created
3651 thread.update(cx, |thread, cx| {
3652 thread.on_terminal_provider_event(
3653 TerminalProviderEvent::Output {
3654 terminal_id: terminal_id.clone(),
3655 data: b"pre-exit data".to_vec(),
3656 },
3657 cx,
3658 );
3659 });
3660
3661 // Send Exit BEFORE Created
3662 thread.update(cx, |thread, cx| {
3663 thread.on_terminal_provider_event(
3664 TerminalProviderEvent::Exit {
3665 terminal_id: terminal_id.clone(),
3666 status: acp::TerminalExitStatus::new().exit_code(0),
3667 },
3668 cx,
3669 );
3670 });
3671
3672 // Now create a display-only lower-level terminal and send Created
3673 let lower = cx.new(|cx| {
3674 let builder = ::terminal::TerminalBuilder::new_display_only(
3675 ::terminal::terminal_settings::CursorShape::default(),
3676 ::terminal::terminal_settings::AlternateScroll::On,
3677 None,
3678 0,
3679 cx.background_executor(),
3680 PathStyle::local(),
3681 )
3682 .unwrap();
3683 builder.subscribe(cx)
3684 });
3685
3686 thread.update(cx, |thread, cx| {
3687 thread.on_terminal_provider_event(
3688 TerminalProviderEvent::Created {
3689 terminal_id: terminal_id.clone(),
3690 label: "Buffered Exit Test".to_string(),
3691 cwd: None,
3692 output_byte_limit: None,
3693 terminal: lower.clone(),
3694 },
3695 cx,
3696 );
3697 });
3698
3699 // Output should be present after Created (flushed from buffer)
3700 let content = thread.read_with(cx, |thread, cx| {
3701 let term = thread.terminal(terminal_id.clone()).unwrap();
3702 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3703 });
3704
3705 assert!(
3706 content.contains("pre-exit data"),
3707 "expected pre-exit data to render, got: {content}"
3708 );
3709 }
3710
3711 /// Test that killing a terminal via Terminal::kill properly:
3712 /// 1. Causes wait_for_exit to complete (doesn't hang forever)
3713 /// 2. The underlying terminal still has the output that was written before the kill
3714 ///
3715 /// This test verifies that the fix to kill_active_task (which now also kills
3716 /// the shell process in addition to the foreground process) properly allows
3717 /// wait_for_exit to complete instead of hanging indefinitely.
3718 #[cfg(unix)]
3719 #[gpui::test]
3720 async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
3721 use std::collections::HashMap;
3722 use task::Shell;
3723 use util::shell_builder::ShellBuilder;
3724
3725 init_test(cx);
3726 cx.executor().allow_parking();
3727
3728 let fs = FakeFs::new(cx.executor());
3729 let project = Project::test(fs, [], cx).await;
3730 let connection = Rc::new(FakeAgentConnection::new());
3731 let thread = cx
3732 .update(|cx| {
3733 connection.new_session(
3734 project.clone(),
3735 PathList::new(&[Path::new(path!("/test"))]),
3736 cx,
3737 )
3738 })
3739 .await
3740 .unwrap();
3741
3742 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3743
3744 // Create a real PTY terminal that runs a command which prints output then sleeps
3745 // We use printf instead of echo and chain with && sleep to ensure proper execution
3746 let (completion_tx, _completion_rx) = smol::channel::unbounded();
3747 let (program, args) = ShellBuilder::new(&Shell::System, false).build(
3748 Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
3749 &[],
3750 );
3751
3752 let builder = cx
3753 .update(|cx| {
3754 ::terminal::TerminalBuilder::new(
3755 None,
3756 None,
3757 task::Shell::WithArguments {
3758 program,
3759 args,
3760 title_override: None,
3761 },
3762 HashMap::default(),
3763 ::terminal::terminal_settings::CursorShape::default(),
3764 ::terminal::terminal_settings::AlternateScroll::On,
3765 None,
3766 vec![],
3767 0,
3768 false,
3769 0,
3770 Some(completion_tx),
3771 cx,
3772 vec![],
3773 PathStyle::local(),
3774 )
3775 })
3776 .await
3777 .unwrap();
3778
3779 let lower_terminal = cx.new(|cx| builder.subscribe(cx));
3780
3781 // Create the acp_thread Terminal wrapper
3782 thread.update(cx, |thread, cx| {
3783 thread.on_terminal_provider_event(
3784 TerminalProviderEvent::Created {
3785 terminal_id: terminal_id.clone(),
3786 label: "printf output_before_kill && sleep 60".to_string(),
3787 cwd: None,
3788 output_byte_limit: None,
3789 terminal: lower_terminal.clone(),
3790 },
3791 cx,
3792 );
3793 });
3794
3795 // Wait for the printf command to execute and produce output
3796 // Use real time since parking is enabled
3797 cx.executor().timer(Duration::from_millis(500)).await;
3798
3799 // Get the acp_thread Terminal and kill it
3800 let wait_for_exit = thread.update(cx, |thread, cx| {
3801 let term = thread.terminals.get(&terminal_id).unwrap();
3802 let wait_for_exit = term.read(cx).wait_for_exit();
3803 term.update(cx, |term, cx| {
3804 term.kill(cx);
3805 });
3806 wait_for_exit
3807 });
3808
3809 // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
3810 // Before the fix to kill_active_task, this would hang forever because
3811 // only the foreground process was killed, not the shell, so the PTY
3812 // child never exited and wait_for_completed_task never completed.
3813 let exit_result = futures::select! {
3814 result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
3815 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
3816 };
3817
3818 assert!(
3819 exit_result.is_some(),
3820 "wait_for_exit should complete after kill, but it timed out. \
3821 This indicates kill_active_task is not properly killing the shell process."
3822 );
3823
3824 // Give the system a chance to process any pending updates
3825 cx.run_until_parked();
3826
3827 // Verify that the underlying terminal still has the output that was
3828 // written before the kill. This verifies that killing doesn't lose output.
3829 let inner_content = thread.read_with(cx, |thread, cx| {
3830 let term = thread.terminals.get(&terminal_id).unwrap();
3831 term.read(cx).inner().read(cx).get_content()
3832 });
3833
3834 assert!(
3835 inner_content.contains("output_before_kill"),
3836 "Underlying terminal should contain output from before kill, got: {}",
3837 inner_content
3838 );
3839 }
3840
3841 #[gpui::test]
3842 async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
3843 init_test(cx);
3844
3845 let fs = FakeFs::new(cx.executor());
3846 let project = Project::test(fs, [], cx).await;
3847 let connection = Rc::new(FakeAgentConnection::new());
3848 let thread = cx
3849 .update(|cx| {
3850 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3851 })
3852 .await
3853 .unwrap();
3854
3855 // Test creating a new user message
3856 thread.update(cx, |thread, cx| {
3857 thread.push_user_content_block(None, "Hello, ".into(), cx);
3858 });
3859
3860 thread.update(cx, |thread, cx| {
3861 assert_eq!(thread.entries.len(), 1);
3862 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3863 assert_eq!(user_msg.id, None);
3864 assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
3865 } else {
3866 panic!("Expected UserMessage");
3867 }
3868 });
3869
3870 // Test appending to existing user message
3871 let message_1_id = UserMessageId::new();
3872 thread.update(cx, |thread, cx| {
3873 thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
3874 });
3875
3876 thread.update(cx, |thread, cx| {
3877 assert_eq!(thread.entries.len(), 1);
3878 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3879 assert_eq!(user_msg.id, Some(message_1_id));
3880 assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
3881 } else {
3882 panic!("Expected UserMessage");
3883 }
3884 });
3885
3886 // Test creating new user message after assistant message
3887 thread.update(cx, |thread, cx| {
3888 thread.push_assistant_content_block("Assistant response".into(), false, cx);
3889 });
3890
3891 let message_2_id = UserMessageId::new();
3892 thread.update(cx, |thread, cx| {
3893 thread.push_user_content_block(
3894 Some(message_2_id.clone()),
3895 "New user message".into(),
3896 cx,
3897 );
3898 });
3899
3900 thread.update(cx, |thread, cx| {
3901 assert_eq!(thread.entries.len(), 3);
3902 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
3903 assert_eq!(user_msg.id, Some(message_2_id));
3904 assert_eq!(user_msg.content.to_markdown(cx), "New user message");
3905 } else {
3906 panic!("Expected UserMessage at index 2");
3907 }
3908 });
3909 }
3910
3911 #[gpui::test]
3912 async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
3913 init_test(cx);
3914
3915 let fs = FakeFs::new(cx.executor());
3916 let project = Project::test(fs, [], cx).await;
3917 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3918 |_, thread, mut cx| {
3919 async move {
3920 thread.update(&mut cx, |thread, cx| {
3921 thread
3922 .handle_session_update(
3923 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3924 "Thinking ".into(),
3925 )),
3926 cx,
3927 )
3928 .unwrap();
3929 thread
3930 .handle_session_update(
3931 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3932 "hard!".into(),
3933 )),
3934 cx,
3935 )
3936 .unwrap();
3937 })?;
3938 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3939 }
3940 .boxed_local()
3941 },
3942 ));
3943
3944 let thread = cx
3945 .update(|cx| {
3946 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3947 })
3948 .await
3949 .unwrap();
3950
3951 thread
3952 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
3953 .await
3954 .unwrap();
3955
3956 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
3957 assert_eq!(
3958 output,
3959 indoc! {r#"
3960 ## User
3961
3962 Hello from Zed!
3963
3964 ## Assistant
3965
3966 <thinking>
3967 Thinking hard!
3968 </thinking>
3969
3970 "#}
3971 );
3972 }
3973
3974 #[gpui::test]
3975 async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
3976 init_test(cx);
3977
3978 let fs = FakeFs::new(cx.executor());
3979 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
3980 .await;
3981 let project = Project::test(fs.clone(), [], cx).await;
3982 let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
3983 let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
3984 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3985 move |_, thread, mut cx| {
3986 let read_file_tx = read_file_tx.clone();
3987 async move {
3988 let content = thread
3989 .update(&mut cx, |thread, cx| {
3990 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3991 })
3992 .unwrap()
3993 .await
3994 .unwrap();
3995 assert_eq!(content, "one\ntwo\nthree\n");
3996 read_file_tx.take().unwrap().send(()).unwrap();
3997 thread
3998 .update(&mut cx, |thread, cx| {
3999 thread.write_text_file(
4000 path!("/tmp/foo").into(),
4001 "one\ntwo\nthree\nfour\nfive\n".to_string(),
4002 cx,
4003 )
4004 })
4005 .unwrap()
4006 .await
4007 .unwrap();
4008 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4009 }
4010 .boxed_local()
4011 },
4012 ));
4013
4014 let (worktree, pathbuf) = project
4015 .update(cx, |project, cx| {
4016 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
4017 })
4018 .await
4019 .unwrap();
4020 let buffer = project
4021 .update(cx, |project, cx| {
4022 project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
4023 })
4024 .await
4025 .unwrap();
4026
4027 let thread = cx
4028 .update(|cx| {
4029 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
4030 })
4031 .await
4032 .unwrap();
4033
4034 let request = thread.update(cx, |thread, cx| {
4035 thread.send_raw("Extend the count in /tmp/foo", cx)
4036 });
4037 read_file_rx.await.ok();
4038 buffer.update(cx, |buffer, cx| {
4039 buffer.edit([(0..0, "zero\n".to_string())], None, cx);
4040 });
4041 cx.run_until_parked();
4042 assert_eq!(
4043 buffer.read_with(cx, |buffer, _| buffer.text()),
4044 "zero\none\ntwo\nthree\nfour\nfive\n"
4045 );
4046 assert_eq!(
4047 String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
4048 "zero\none\ntwo\nthree\nfour\nfive\n"
4049 );
4050 request.await.unwrap();
4051 }
4052
4053 #[gpui::test]
4054 async fn test_reading_from_line(cx: &mut TestAppContext) {
4055 init_test(cx);
4056
4057 let fs = FakeFs::new(cx.executor());
4058 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
4059 .await;
4060 let project = Project::test(fs.clone(), [], cx).await;
4061 project
4062 .update(cx, |project, cx| {
4063 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
4064 })
4065 .await
4066 .unwrap();
4067
4068 let connection = Rc::new(FakeAgentConnection::new());
4069
4070 let thread = cx
4071 .update(|cx| {
4072 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
4073 })
4074 .await
4075 .unwrap();
4076
4077 // Whole file
4078 let content = thread
4079 .update(cx, |thread, cx| {
4080 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
4081 })
4082 .await
4083 .unwrap();
4084
4085 assert_eq!(content, "one\ntwo\nthree\nfour\n");
4086
4087 // Only start line
4088 let content = thread
4089 .update(cx, |thread, cx| {
4090 thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
4091 })
4092 .await
4093 .unwrap();
4094
4095 assert_eq!(content, "three\nfour\n");
4096
4097 // Only limit
4098 let content = thread
4099 .update(cx, |thread, cx| {
4100 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
4101 })
4102 .await
4103 .unwrap();
4104
4105 assert_eq!(content, "one\ntwo\n");
4106
4107 // Range
4108 let content = thread
4109 .update(cx, |thread, cx| {
4110 thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
4111 })
4112 .await
4113 .unwrap();
4114
4115 assert_eq!(content, "two\nthree\n");
4116
4117 // Invalid
4118 let err = thread
4119 .update(cx, |thread, cx| {
4120 thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
4121 })
4122 .await
4123 .unwrap_err();
4124
4125 assert_eq!(
4126 err.to_string(),
4127 "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
4128 );
4129 }
4130
4131 #[gpui::test]
4132 async fn test_reading_empty_file(cx: &mut TestAppContext) {
4133 init_test(cx);
4134
4135 let fs = FakeFs::new(cx.executor());
4136 fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
4137 let project = Project::test(fs.clone(), [], cx).await;
4138 project
4139 .update(cx, |project, cx| {
4140 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
4141 })
4142 .await
4143 .unwrap();
4144
4145 let connection = Rc::new(FakeAgentConnection::new());
4146
4147 let thread = cx
4148 .update(|cx| {
4149 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
4150 })
4151 .await
4152 .unwrap();
4153
4154 // Whole file
4155 let content = thread
4156 .update(cx, |thread, cx| {
4157 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
4158 })
4159 .await
4160 .unwrap();
4161
4162 assert_eq!(content, "");
4163
4164 // Only start line
4165 let content = thread
4166 .update(cx, |thread, cx| {
4167 thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
4168 })
4169 .await
4170 .unwrap();
4171
4172 assert_eq!(content, "");
4173
4174 // Only limit
4175 let content = thread
4176 .update(cx, |thread, cx| {
4177 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
4178 })
4179 .await
4180 .unwrap();
4181
4182 assert_eq!(content, "");
4183
4184 // Range
4185 let content = thread
4186 .update(cx, |thread, cx| {
4187 thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
4188 })
4189 .await
4190 .unwrap();
4191
4192 assert_eq!(content, "");
4193
4194 // Invalid
4195 let err = thread
4196 .update(cx, |thread, cx| {
4197 thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
4198 })
4199 .await
4200 .unwrap_err();
4201
4202 assert_eq!(
4203 err.to_string(),
4204 "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
4205 );
4206 }
4207 #[gpui::test]
4208 async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
4209 init_test(cx);
4210
4211 let fs = FakeFs::new(cx.executor());
4212 fs.insert_tree(path!("/tmp"), json!({})).await;
4213 let project = Project::test(fs.clone(), [], cx).await;
4214 project
4215 .update(cx, |project, cx| {
4216 project.find_or_create_worktree(path!("/tmp"), true, cx)
4217 })
4218 .await
4219 .unwrap();
4220
4221 let connection = Rc::new(FakeAgentConnection::new());
4222
4223 let thread = cx
4224 .update(|cx| {
4225 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
4226 })
4227 .await
4228 .unwrap();
4229
4230 // Out of project file
4231 let err = thread
4232 .update(cx, |thread, cx| {
4233 thread.read_text_file(path!("/foo").into(), None, None, false, cx)
4234 })
4235 .await
4236 .unwrap_err();
4237
4238 assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
4239 }
4240
4241 #[gpui::test]
4242 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
4243 init_test(cx);
4244
4245 let fs = FakeFs::new(cx.executor());
4246 let project = Project::test(fs, [], cx).await;
4247 let id = acp::ToolCallId::new("test");
4248
4249 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4250 let id = id.clone();
4251 move |_, thread, mut cx| {
4252 let id = id.clone();
4253 async move {
4254 thread
4255 .update(&mut cx, |thread, cx| {
4256 thread.handle_session_update(
4257 acp::SessionUpdate::ToolCall(
4258 acp::ToolCall::new(id.clone(), "Label")
4259 .kind(acp::ToolKind::Fetch)
4260 .status(acp::ToolCallStatus::InProgress),
4261 ),
4262 cx,
4263 )
4264 })
4265 .unwrap()
4266 .unwrap();
4267 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4268 }
4269 .boxed_local()
4270 }
4271 }));
4272
4273 let thread = cx
4274 .update(|cx| {
4275 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4276 })
4277 .await
4278 .unwrap();
4279
4280 let request = thread.update(cx, |thread, cx| {
4281 thread.send_raw("Fetch https://example.com", cx)
4282 });
4283
4284 run_until_first_tool_call(&thread, cx).await;
4285
4286 thread.read_with(cx, |thread, _| {
4287 assert!(matches!(
4288 thread.entries[1],
4289 AgentThreadEntry::ToolCall(ToolCall {
4290 status: ToolCallStatus::InProgress,
4291 ..
4292 })
4293 ));
4294 });
4295
4296 thread.update(cx, |thread, cx| thread.cancel(cx)).await;
4297
4298 thread.read_with(cx, |thread, _| {
4299 assert!(matches!(
4300 &thread.entries[1],
4301 AgentThreadEntry::ToolCall(ToolCall {
4302 status: ToolCallStatus::Canceled,
4303 ..
4304 })
4305 ));
4306 });
4307
4308 thread
4309 .update(cx, |thread, cx| {
4310 thread.handle_session_update(
4311 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4312 id,
4313 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4314 )),
4315 cx,
4316 )
4317 })
4318 .unwrap();
4319
4320 request.await.unwrap();
4321
4322 thread.read_with(cx, |thread, _| {
4323 assert!(matches!(
4324 thread.entries[1],
4325 AgentThreadEntry::ToolCall(ToolCall {
4326 status: ToolCallStatus::Completed,
4327 ..
4328 })
4329 ));
4330 });
4331 }
4332
4333 #[gpui::test]
4334 async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
4335 init_test(cx);
4336 let fs = FakeFs::new(cx.background_executor.clone());
4337 fs.insert_tree(path!("/test"), json!({})).await;
4338 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
4339
4340 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4341 move |_, thread, mut cx| {
4342 async move {
4343 thread
4344 .update(&mut cx, |thread, cx| {
4345 thread.handle_session_update(
4346 acp::SessionUpdate::ToolCall(
4347 acp::ToolCall::new("test", "Label")
4348 .kind(acp::ToolKind::Edit)
4349 .status(acp::ToolCallStatus::Completed)
4350 .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
4351 "/test/test.txt",
4352 "foo",
4353 ))]),
4354 ),
4355 cx,
4356 )
4357 })
4358 .unwrap()
4359 .unwrap();
4360 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4361 }
4362 .boxed_local()
4363 }
4364 }));
4365
4366 let thread = cx
4367 .update(|cx| {
4368 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4369 })
4370 .await
4371 .unwrap();
4372
4373 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
4374 .await
4375 .unwrap();
4376
4377 assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
4378 }
4379
4380 #[gpui::test]
4381 async fn test_pending_edits_for_edit_tool_calls_with_locations(cx: &mut TestAppContext) {
4382 init_test(cx);
4383 let fs = FakeFs::new(cx.background_executor.clone());
4384 fs.insert_tree(path!("/test"), json!({"file.txt": "one\ntwo\n"}))
4385 .await;
4386 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
4387 let connection = Rc::new(FakeAgentConnection::new());
4388
4389 let thread = cx
4390 .update(|cx| {
4391 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4392 })
4393 .await
4394 .unwrap();
4395
4396 thread
4397 .update(cx, |thread, cx| {
4398 thread.handle_session_update(
4399 acp::SessionUpdate::ToolCall(
4400 acp::ToolCall::new("test", "Label")
4401 .kind(acp::ToolKind::Edit)
4402 .status(acp::ToolCallStatus::InProgress),
4403 ),
4404 cx,
4405 )
4406 })
4407 .unwrap();
4408
4409 thread
4410 .update(cx, |thread, cx| {
4411 thread.handle_session_update(
4412 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4413 "test",
4414 acp::ToolCallUpdateFields::new()
4415 .locations(vec![acp::ToolCallLocation::new(path!("/test/file.txt"))]),
4416 )),
4417 cx,
4418 )
4419 })
4420 .unwrap();
4421
4422 cx.run_until_parked();
4423
4424 assert!(cx.read(|cx| thread.read(cx).has_pending_edit_tool_calls()));
4425 }
4426
4427 #[gpui::test]
4428 async fn test_waiting_for_confirmation_does_not_start_inferred_edit_tracking(
4429 cx: &mut TestAppContext,
4430 ) {
4431 init_test(cx);
4432 let fs = FakeFs::new(cx.background_executor.clone());
4433 fs.insert_tree(path!("/test"), json!({"file.txt": "one\ntwo\n"}))
4434 .await;
4435 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
4436 let connection = Rc::new(FakeAgentConnection::new());
4437
4438 let thread = cx
4439 .update(|cx| {
4440 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4441 })
4442 .await
4443 .unwrap();
4444
4445 let tool_call_id = acp::ToolCallId::new("test");
4446 let allow_option_id = acp::PermissionOptionId::new("allow");
4447 let deny_option_id = acp::PermissionOptionId::new("deny");
4448 let _authorization_task = thread
4449 .update(cx, |thread, cx| {
4450 thread.request_tool_call_authorization(
4451 acp::ToolCall::new(tool_call_id.clone(), "Label")
4452 .kind(acp::ToolKind::Edit)
4453 .locations(vec![acp::ToolCallLocation::new(path!("/test/file.txt"))])
4454 .into(),
4455 PermissionOptions::Flat(vec![
4456 acp::PermissionOption::new(
4457 allow_option_id.clone(),
4458 "Allow",
4459 acp::PermissionOptionKind::AllowOnce,
4460 ),
4461 acp::PermissionOption::new(
4462 deny_option_id,
4463 "Deny",
4464 acp::PermissionOptionKind::RejectOnce,
4465 ),
4466 ]),
4467 cx,
4468 )
4469 })
4470 .unwrap();
4471
4472 cx.run_until_parked();
4473
4474 assert_eq!(inferred_edit_candidate_count(&thread, cx), 0);
4475 assert!(!cx.read(|cx| thread.read(cx).has_pending_edit_tool_calls()));
4476 assert!(cx.read(|cx| thread.read(cx).is_waiting_for_confirmation()));
4477 }
4478
4479 #[gpui::test]
4480 async fn test_authorizing_waiting_tool_call_starts_inferred_edit_tracking(
4481 cx: &mut TestAppContext,
4482 ) {
4483 init_test(cx);
4484 let fs = FakeFs::new(cx.background_executor.clone());
4485 fs.insert_tree(path!("/test"), json!({"file.txt": "one\ntwo\n"}))
4486 .await;
4487 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
4488 let connection = Rc::new(FakeAgentConnection::new());
4489
4490 let thread = cx
4491 .update(|cx| {
4492 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4493 })
4494 .await
4495 .unwrap();
4496
4497 let tool_call_id = acp::ToolCallId::new("test");
4498 let allow_option_id = acp::PermissionOptionId::new("allow");
4499 let deny_option_id = acp::PermissionOptionId::new("deny");
4500 let _authorization_task = thread
4501 .update(cx, |thread, cx| {
4502 thread.request_tool_call_authorization(
4503 acp::ToolCall::new(tool_call_id.clone(), "Label")
4504 .kind(acp::ToolKind::Edit)
4505 .locations(vec![acp::ToolCallLocation::new(path!("/test/file.txt"))])
4506 .into(),
4507 PermissionOptions::Flat(vec![
4508 acp::PermissionOption::new(
4509 allow_option_id.clone(),
4510 "Allow",
4511 acp::PermissionOptionKind::AllowOnce,
4512 ),
4513 acp::PermissionOption::new(
4514 deny_option_id,
4515 "Deny",
4516 acp::PermissionOptionKind::RejectOnce,
4517 ),
4518 ]),
4519 cx,
4520 )
4521 })
4522 .unwrap();
4523
4524 thread.update(cx, |thread, cx| {
4525 thread.authorize_tool_call(
4526 tool_call_id.clone(),
4527 allow_option_id.into(),
4528 acp::PermissionOptionKind::AllowOnce,
4529 cx,
4530 );
4531 });
4532 cx.run_until_parked();
4533
4534 assert_eq!(inferred_edit_candidate_count(&thread, cx), 1);
4535 assert!(cx.read(|cx| thread.read(cx).has_pending_edit_tool_calls()));
4536 }
4537
4538 #[gpui::test]
4539 async fn test_stale_authorization_does_not_rewrite_status_or_start_inferred_edit_tracking(
4540 cx: &mut TestAppContext,
4541 ) {
4542 init_test(cx);
4543 let fs = FakeFs::new(cx.background_executor.clone());
4544 fs.insert_tree(path!("/test"), json!({"file.txt": "one\ntwo\n"}))
4545 .await;
4546 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
4547 let connection = Rc::new(FakeAgentConnection::new());
4548
4549 let thread = cx
4550 .update(|cx| {
4551 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4552 })
4553 .await
4554 .unwrap();
4555
4556 let tool_call_id = acp::ToolCallId::new("test");
4557 let allow_option_id = acp::PermissionOptionId::new("allow");
4558 let deny_option_id = acp::PermissionOptionId::new("deny");
4559 let _authorization_task = thread
4560 .update(cx, |thread, cx| {
4561 thread.request_tool_call_authorization(
4562 acp::ToolCall::new(tool_call_id.clone(), "Label")
4563 .kind(acp::ToolKind::Edit)
4564 .locations(vec![acp::ToolCallLocation::new(path!("/test/file.txt"))])
4565 .into(),
4566 PermissionOptions::Flat(vec![
4567 acp::PermissionOption::new(
4568 allow_option_id.clone(),
4569 "Allow",
4570 acp::PermissionOptionKind::AllowOnce,
4571 ),
4572 acp::PermissionOption::new(
4573 deny_option_id,
4574 "Deny",
4575 acp::PermissionOptionKind::RejectOnce,
4576 ),
4577 ]),
4578 cx,
4579 )
4580 })
4581 .unwrap();
4582
4583 thread.update(cx, |thread, _cx| {
4584 let (_, tool_call) = thread.tool_call_mut(&tool_call_id).unwrap();
4585 tool_call.status = ToolCallStatus::Rejected;
4586 });
4587
4588 thread.update(cx, |thread, cx| {
4589 thread.authorize_tool_call(
4590 tool_call_id.clone(),
4591 allow_option_id.into(),
4592 acp::PermissionOptionKind::AllowOnce,
4593 cx,
4594 );
4595 });
4596 cx.run_until_parked();
4597
4598 assert_eq!(inferred_edit_candidate_count(&thread, cx), 0);
4599 thread.read_with(cx, |thread, _| {
4600 let (_, tool_call) = thread.tool_call(&tool_call_id).unwrap();
4601 assert!(matches!(tool_call.status, ToolCallStatus::Rejected));
4602 });
4603 assert!(!cx.read(|cx| thread.read(cx).has_pending_edit_tool_calls()));
4604 }
4605
4606 #[gpui::test]
4607 async fn test_duplicate_authorization_does_not_rewrite_status_or_tracking(
4608 cx: &mut TestAppContext,
4609 ) {
4610 init_test(cx);
4611 let fs = FakeFs::new(cx.background_executor.clone());
4612 fs.insert_tree(path!("/test"), json!({"file.txt": "one\ntwo\n"}))
4613 .await;
4614 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
4615 let connection = Rc::new(FakeAgentConnection::new());
4616
4617 let thread = cx
4618 .update(|cx| {
4619 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4620 })
4621 .await
4622 .unwrap();
4623
4624 let tool_call_id = acp::ToolCallId::new("test");
4625 let allow_option_id = acp::PermissionOptionId::new("allow");
4626 let deny_option_id = acp::PermissionOptionId::new("deny");
4627 let _authorization_task = thread
4628 .update(cx, |thread, cx| {
4629 thread.request_tool_call_authorization(
4630 acp::ToolCall::new(tool_call_id.clone(), "Label")
4631 .kind(acp::ToolKind::Edit)
4632 .locations(vec![acp::ToolCallLocation::new(path!("/test/file.txt"))])
4633 .into(),
4634 PermissionOptions::Flat(vec![
4635 acp::PermissionOption::new(
4636 allow_option_id.clone(),
4637 "Allow",
4638 acp::PermissionOptionKind::AllowOnce,
4639 ),
4640 acp::PermissionOption::new(
4641 deny_option_id.clone(),
4642 "Deny",
4643 acp::PermissionOptionKind::RejectOnce,
4644 ),
4645 ]),
4646 cx,
4647 )
4648 })
4649 .unwrap();
4650
4651 thread.update(cx, |thread, cx| {
4652 thread.authorize_tool_call(
4653 tool_call_id.clone(),
4654 allow_option_id.into(),
4655 acp::PermissionOptionKind::AllowOnce,
4656 cx,
4657 );
4658 });
4659 cx.run_until_parked();
4660
4661 assert_eq!(inferred_edit_candidate_count(&thread, cx), 1);
4662 thread.read_with(cx, |thread, _| {
4663 let (_, tool_call) = thread.tool_call(&tool_call_id).unwrap();
4664 assert!(matches!(tool_call.status, ToolCallStatus::InProgress));
4665 });
4666
4667 thread.update(cx, |thread, cx| {
4668 thread.authorize_tool_call(
4669 tool_call_id.clone(),
4670 deny_option_id.into(),
4671 acp::PermissionOptionKind::RejectOnce,
4672 cx,
4673 );
4674 });
4675 cx.run_until_parked();
4676
4677 assert_eq!(inferred_edit_candidate_count(&thread, cx), 1);
4678 thread.read_with(cx, |thread, _| {
4679 let (_, tool_call) = thread.tool_call(&tool_call_id).unwrap();
4680 assert!(matches!(tool_call.status, ToolCallStatus::InProgress));
4681 });
4682 }
4683
4684 #[gpui::test]
4685 async fn test_authorization_send_failure_cancels_call_without_tracking(
4686 cx: &mut TestAppContext,
4687 ) {
4688 init_test(cx);
4689 let fs = FakeFs::new(cx.background_executor.clone());
4690 fs.insert_tree(path!("/test"), json!({"file.txt": "one\ntwo\n"}))
4691 .await;
4692 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
4693 let connection = Rc::new(FakeAgentConnection::new());
4694
4695 let thread = cx
4696 .update(|cx| {
4697 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4698 })
4699 .await
4700 .unwrap();
4701
4702 let tool_call_id = acp::ToolCallId::new("test");
4703 let allow_option_id = acp::PermissionOptionId::new("allow");
4704 let deny_option_id = acp::PermissionOptionId::new("deny");
4705 let authorization_task = thread
4706 .update(cx, |thread, cx| {
4707 thread.request_tool_call_authorization(
4708 acp::ToolCall::new(tool_call_id.clone(), "Label")
4709 .kind(acp::ToolKind::Edit)
4710 .locations(vec![acp::ToolCallLocation::new(path!("/test/file.txt"))])
4711 .into(),
4712 PermissionOptions::Flat(vec![
4713 acp::PermissionOption::new(
4714 allow_option_id.clone(),
4715 "Allow",
4716 acp::PermissionOptionKind::AllowOnce,
4717 ),
4718 acp::PermissionOption::new(
4719 deny_option_id,
4720 "Deny",
4721 acp::PermissionOptionKind::RejectOnce,
4722 ),
4723 ]),
4724 cx,
4725 )
4726 })
4727 .unwrap();
4728 drop(authorization_task);
4729 cx.run_until_parked();
4730
4731 thread.update(cx, |thread, cx| {
4732 thread.authorize_tool_call(
4733 tool_call_id.clone(),
4734 allow_option_id.into(),
4735 acp::PermissionOptionKind::AllowOnce,
4736 cx,
4737 );
4738 });
4739 cx.run_until_parked();
4740
4741 assert_eq!(inferred_edit_candidate_count(&thread, cx), 0);
4742 thread.read_with(cx, |thread, _| {
4743 let (_, tool_call) = thread.tool_call(&tool_call_id).unwrap();
4744 assert!(matches!(tool_call.status, ToolCallStatus::Canceled));
4745 });
4746 assert!(!cx.read(|cx| thread.read(cx).has_pending_edit_tool_calls()));
4747 }
4748
4749 #[gpui::test]
4750 async fn test_stopped_turn_clears_unfinished_inferred_edit_tracking(cx: &mut TestAppContext) {
4751 init_test(cx);
4752 let fs = FakeFs::new(cx.background_executor.clone());
4753 fs.insert_tree(path!("/test"), json!({"file.txt": "one\ntwo\n"}))
4754 .await;
4755 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
4756 let connection = Rc::new(FakeAgentConnection::new());
4757
4758 let thread = cx
4759 .update(|cx| {
4760 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4761 })
4762 .await
4763 .unwrap();
4764
4765 let tool_call_id = acp::ToolCallId::new("test");
4766 start_external_edit_tool_call(
4767 &thread,
4768 &tool_call_id,
4769 vec![acp::ToolCallLocation::new(path!("/test/file.txt"))],
4770 cx,
4771 );
4772 cx.run_until_parked();
4773
4774 assert_eq!(inferred_edit_candidate_count(&thread, cx), 1);
4775
4776 thread.update(cx, |thread, cx| {
4777 let turn_id = thread.inferred_edit_tracking_turn_id();
4778 thread.finish_inferred_edit_tracking_for_stopped_turn(turn_id, cx);
4779 });
4780
4781 cx.run_until_parked();
4782
4783 assert_eq!(inferred_edit_candidate_count(&thread, cx), 0);
4784 }
4785
4786 #[gpui::test]
4787 async fn test_stopped_turn_only_clears_inferred_edit_tracking_for_its_own_turn(
4788 cx: &mut TestAppContext,
4789 ) {
4790 init_test(cx);
4791 let fs = FakeFs::new(cx.background_executor.clone());
4792 fs.insert_tree(
4793 path!("/test"),
4794 json!({
4795 "old.txt": "one\n",
4796 "new.txt": "two\n",
4797 }),
4798 )
4799 .await;
4800 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
4801 let connection = Rc::new(FakeAgentConnection::new());
4802
4803 let thread = cx
4804 .update(|cx| {
4805 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4806 })
4807 .await
4808 .unwrap();
4809
4810 let old_tool_call_id = acp::ToolCallId::new("old");
4811 let new_tool_call_id = acp::ToolCallId::new("new");
4812
4813 set_running_turn_for_test(&thread, 1, cx);
4814 start_external_edit_tool_call(
4815 &thread,
4816 &old_tool_call_id,
4817 vec![acp::ToolCallLocation::new(path!("/test/old.txt"))],
4818 cx,
4819 );
4820
4821 set_running_turn_for_test(&thread, 2, cx);
4822 start_external_edit_tool_call(
4823 &thread,
4824 &new_tool_call_id,
4825 vec![acp::ToolCallLocation::new(path!("/test/new.txt"))],
4826 cx,
4827 );
4828 cx.run_until_parked();
4829
4830 assert_eq!(
4831 inferred_edit_candidate_count_for_tool_call(&thread, &old_tool_call_id, cx),
4832 1
4833 );
4834 assert_eq!(
4835 inferred_edit_candidate_count_for_tool_call(&thread, &new_tool_call_id, cx),
4836 1
4837 );
4838
4839 thread.update(cx, |thread, cx| {
4840 thread.finish_inferred_edit_tracking_for_stopped_turn(1, cx);
4841 });
4842 cx.run_until_parked();
4843
4844 assert_eq!(
4845 inferred_edit_candidate_count_for_tool_call(&thread, &old_tool_call_id, cx),
4846 0
4847 );
4848 assert_eq!(
4849 inferred_edit_candidate_count_for_tool_call(&thread, &new_tool_call_id, cx),
4850 1
4851 );
4852 assert!(cx.read(|cx| thread.read(cx).has_pending_edit_tool_calls()));
4853 }
4854
4855 #[gpui::test]
4856 async fn test_infer_external_modified_file_edits_from_tool_call_locations(
4857 cx: &mut TestAppContext,
4858 ) {
4859 init_test(cx);
4860 let fs = FakeFs::new(cx.background_executor.clone());
4861 fs.insert_tree(path!("/test"), json!({"file.txt": "one\ntwo\n"}))
4862 .await;
4863 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
4864 let connection = Rc::new(FakeAgentConnection::new());
4865
4866 let thread = cx
4867 .update(|cx| {
4868 connection.new_session(
4869 project.clone(),
4870 PathList::new(&[Path::new(path!("/test"))]),
4871 cx,
4872 )
4873 })
4874 .await
4875 .unwrap();
4876
4877 thread
4878 .update(cx, |thread, cx| {
4879 thread.handle_session_update(
4880 acp::SessionUpdate::ToolCall(
4881 acp::ToolCall::new("test", "Label")
4882 .kind(acp::ToolKind::Edit)
4883 .status(acp::ToolCallStatus::InProgress),
4884 ),
4885 cx,
4886 )
4887 })
4888 .unwrap();
4889
4890 thread
4891 .update(cx, |thread, cx| {
4892 thread.handle_session_update(
4893 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4894 "test",
4895 acp::ToolCallUpdateFields::new()
4896 .locations(vec![acp::ToolCallLocation::new(path!("/test/file.txt"))]),
4897 )),
4898 cx,
4899 )
4900 })
4901 .unwrap();
4902
4903 cx.run_until_parked();
4904
4905 fs.save(
4906 path!("/test/file.txt").as_ref(),
4907 &"one\ntwo\nthree\n".into(),
4908 Default::default(),
4909 )
4910 .await
4911 .unwrap();
4912 cx.run_until_parked();
4913
4914 thread
4915 .update(cx, |thread, cx| {
4916 thread.handle_session_update(
4917 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4918 "test",
4919 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4920 )),
4921 cx,
4922 )
4923 })
4924 .unwrap();
4925
4926 cx.executor().advance_clock(Duration::from_millis(200));
4927 cx.run_until_parked();
4928
4929 let action_log = thread.read_with(cx, |thread, _| thread.action_log().clone());
4930 assert_eq!(
4931 action_log.read_with(cx, |action_log, cx| action_log.changed_buffers(cx).len()),
4932 1
4933 );
4934
4935 action_log
4936 .update(cx, |action_log, cx| action_log.reject_all_edits(None, cx))
4937 .await;
4938 cx.run_until_parked();
4939
4940 assert_eq!(
4941 String::from_utf8(fs.read_file_sync(path!("/test/file.txt")).unwrap()).unwrap(),
4942 "one\ntwo\n"
4943 );
4944 }
4945
4946 #[gpui::test]
4947 async fn test_infer_external_created_file_edits_from_tool_call_locations(
4948 cx: &mut TestAppContext,
4949 ) {
4950 init_test(cx);
4951 let fs = FakeFs::new(cx.background_executor.clone());
4952 fs.insert_tree(path!("/test"), json!({})).await;
4953 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
4954 let connection = Rc::new(FakeAgentConnection::new());
4955
4956 let thread = cx
4957 .update(|cx| {
4958 connection.new_session(
4959 project.clone(),
4960 PathList::new(&[Path::new(path!("/test"))]),
4961 cx,
4962 )
4963 })
4964 .await
4965 .unwrap();
4966
4967 thread
4968 .update(cx, |thread, cx| {
4969 thread.handle_session_update(
4970 acp::SessionUpdate::ToolCall(
4971 acp::ToolCall::new("test", "Label")
4972 .kind(acp::ToolKind::Edit)
4973 .status(acp::ToolCallStatus::InProgress),
4974 ),
4975 cx,
4976 )
4977 })
4978 .unwrap();
4979
4980 thread
4981 .update(cx, |thread, cx| {
4982 thread.handle_session_update(
4983 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4984 "test",
4985 acp::ToolCallUpdateFields::new()
4986 .locations(vec![acp::ToolCallLocation::new(path!("/test/new.txt"))]),
4987 )),
4988 cx,
4989 )
4990 })
4991 .unwrap();
4992
4993 cx.run_until_parked();
4994
4995 fs.insert_file(path!("/test/new.txt"), "hello\n".as_bytes().to_vec())
4996 .await;
4997 cx.run_until_parked();
4998
4999 thread
5000 .update(cx, |thread, cx| {
5001 thread.handle_session_update(
5002 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
5003 "test",
5004 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
5005 )),
5006 cx,
5007 )
5008 })
5009 .unwrap();
5010
5011 cx.executor().advance_clock(Duration::from_millis(200));
5012 cx.run_until_parked();
5013
5014 let action_log = thread.read_with(cx, |thread, _| thread.action_log().clone());
5015 assert_eq!(
5016 action_log.read_with(cx, |action_log, cx| action_log.changed_buffers(cx).len()),
5017 1
5018 );
5019
5020 action_log
5021 .update(cx, |action_log, cx| action_log.reject_all_edits(None, cx))
5022 .await;
5023 cx.run_until_parked();
5024
5025 assert!(fs.read_file_sync(path!("/test/new.txt")).is_err());
5026 }
5027
5028 fn start_external_edit_tool_call(
5029 thread: &Entity<AcpThread>,
5030 tool_call_id: &acp::ToolCallId,
5031 locations: Vec<acp::ToolCallLocation>,
5032 cx: &mut TestAppContext,
5033 ) {
5034 start_external_edit_tool_call_with_meta(thread, tool_call_id, locations, None, cx);
5035 }
5036
5037 fn start_external_edit_tool_call_with_meta(
5038 thread: &Entity<AcpThread>,
5039 tool_call_id: &acp::ToolCallId,
5040 locations: Vec<acp::ToolCallLocation>,
5041 meta: Option<acp::Meta>,
5042 cx: &mut TestAppContext,
5043 ) {
5044 let tool_call_id = tool_call_id.clone();
5045 thread
5046 .update(cx, move |thread, cx| {
5047 let mut tool_call = acp::ToolCall::new(tool_call_id, "Label")
5048 .kind(acp::ToolKind::Edit)
5049 .status(acp::ToolCallStatus::InProgress)
5050 .locations(locations);
5051
5052 if let Some(meta) = meta {
5053 tool_call = tool_call.meta(meta);
5054 }
5055
5056 thread.handle_session_update(acp::SessionUpdate::ToolCall(tool_call), cx)
5057 })
5058 .unwrap();
5059 }
5060
5061 fn update_external_edit_tool_call_locations(
5062 thread: &Entity<AcpThread>,
5063 tool_call_id: &acp::ToolCallId,
5064 locations: Vec<acp::ToolCallLocation>,
5065 cx: &mut TestAppContext,
5066 ) {
5067 let tool_call_id = tool_call_id.clone();
5068 thread
5069 .update(cx, move |thread, cx| {
5070 thread.handle_session_update(
5071 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
5072 tool_call_id,
5073 acp::ToolCallUpdateFields::new().locations(locations),
5074 )),
5075 cx,
5076 )
5077 })
5078 .unwrap();
5079 }
5080
5081 fn complete_external_edit_tool_call(
5082 thread: &Entity<AcpThread>,
5083 tool_call_id: &acp::ToolCallId,
5084 cx: &mut TestAppContext,
5085 ) {
5086 let tool_call_id = tool_call_id.clone();
5087 thread
5088 .update(cx, move |thread, cx| {
5089 thread.handle_session_update(
5090 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
5091 tool_call_id,
5092 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
5093 )),
5094 cx,
5095 )
5096 })
5097 .unwrap();
5098 }
5099
5100 fn changed_buffer_count(thread: &Entity<AcpThread>, cx: &TestAppContext) -> usize {
5101 let action_log = thread.read_with(cx, |thread, _| thread.action_log().clone());
5102 action_log.read_with(cx, |action_log, cx| action_log.changed_buffers(cx).len())
5103 }
5104
5105 fn set_running_turn_for_test(
5106 thread: &Entity<AcpThread>,
5107 turn_id: u32,
5108 cx: &mut TestAppContext,
5109 ) {
5110 thread.update(cx, |thread, _cx| {
5111 thread.turn_id = turn_id;
5112 thread.running_turn = Some(RunningTurn {
5113 id: turn_id,
5114 send_task: Task::ready(()),
5115 });
5116 });
5117 }
5118
5119 fn inferred_edit_candidate_count(thread: &Entity<AcpThread>, cx: &TestAppContext) -> usize {
5120 thread.read_with(cx, |thread, _| {
5121 thread
5122 .inferred_edit_candidates
5123 .values()
5124 .map(|candidates| candidates.len())
5125 .sum()
5126 })
5127 }
5128
5129 fn inferred_edit_candidate_count_for_tool_call(
5130 thread: &Entity<AcpThread>,
5131 tool_call_id: &acp::ToolCallId,
5132 cx: &TestAppContext,
5133 ) -> usize {
5134 thread.read_with(cx, |thread, _| {
5135 thread
5136 .inferred_edit_candidates
5137 .get(tool_call_id)
5138 .map_or(0, HashMap::len)
5139 })
5140 }
5141
5142 fn inferred_edit_tool_call_is_finalizing(
5143 thread: &Entity<AcpThread>,
5144 tool_call_id: &acp::ToolCallId,
5145 cx: &TestAppContext,
5146 ) -> bool {
5147 thread.read_with(cx, |thread, _| {
5148 thread
5149 .finalizing_inferred_edit_tool_calls
5150 .contains(tool_call_id)
5151 })
5152 }
5153
5154 fn inferred_edit_candidate_is_ready(
5155 thread: &Entity<AcpThread>,
5156 tool_call_id: &acp::ToolCallId,
5157 abs_path: &PathBuf,
5158 cx: &TestAppContext,
5159 ) -> bool {
5160 thread.read_with(cx, |thread, _| {
5161 thread
5162 .inferred_edit_candidates
5163 .get(tool_call_id)
5164 .and_then(|candidates| candidates.get(abs_path))
5165 .is_some_and(|candidate_state| {
5166 matches!(candidate_state, InferredEditCandidateState::Ready(_))
5167 })
5168 })
5169 }
5170
5171 async fn open_test_buffer(
5172 project: &Entity<Project>,
5173 abs_path: &Path,
5174 cx: &mut TestAppContext,
5175 ) -> Entity<Buffer> {
5176 project
5177 .update(cx, |project, cx| {
5178 let project_path = project
5179 .project_path_for_absolute_path(abs_path, cx)
5180 .unwrap();
5181 project.open_buffer(project_path, cx)
5182 })
5183 .await
5184 .unwrap()
5185 }
5186
5187 #[gpui::test]
5188 async fn test_cancel_clears_inferred_candidate_state_for_external_edit_calls_with_locations(
5189 cx: &mut TestAppContext,
5190 ) {
5191 init_test(cx);
5192 let fs = FakeFs::new(cx.background_executor.clone());
5193 fs.insert_tree(path!("/test"), json!({"file.txt": "one\ntwo\n"}))
5194 .await;
5195 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
5196 let connection = Rc::new(FakeAgentConnection::new());
5197
5198 let thread = cx
5199 .update(|cx| {
5200 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5201 })
5202 .await
5203 .unwrap();
5204
5205 let tool_call_id = acp::ToolCallId::new("test");
5206 set_running_turn_for_test(&thread, 1, cx);
5207 start_external_edit_tool_call(
5208 &thread,
5209 &tool_call_id,
5210 vec![acp::ToolCallLocation::new(path!("/test/file.txt"))],
5211 cx,
5212 );
5213 cx.run_until_parked();
5214
5215 assert_eq!(inferred_edit_candidate_count(&thread, cx), 1);
5216
5217 let cancel = thread.update(cx, |thread, cx| thread.cancel(cx));
5218
5219 cancel.await;
5220 cx.run_until_parked();
5221
5222 assert_eq!(inferred_edit_candidate_count(&thread, cx), 0);
5223 thread.read_with(cx, |thread, _| {
5224 let (_, tool_call) = thread.tool_call(&tool_call_id).unwrap();
5225 assert!(matches!(tool_call.status, ToolCallStatus::Canceled));
5226 });
5227 }
5228
5229 #[gpui::test]
5230 async fn test_completed_update_after_cancel_does_not_reuse_stale_inferred_candidate_baseline(
5231 cx: &mut TestAppContext,
5232 ) {
5233 init_test(cx);
5234 let fs = FakeFs::new(cx.background_executor.clone());
5235 fs.insert_tree(path!("/test"), json!({"file.txt": "one\ntwo\n"}))
5236 .await;
5237 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
5238 let connection = Rc::new(FakeAgentConnection::new());
5239
5240 let thread = cx
5241 .update(|cx| {
5242 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5243 })
5244 .await
5245 .unwrap();
5246
5247 let tool_call_id = acp::ToolCallId::new("test");
5248 set_running_turn_for_test(&thread, 1, cx);
5249 start_external_edit_tool_call(
5250 &thread,
5251 &tool_call_id,
5252 vec![acp::ToolCallLocation::new(path!("/test/file.txt"))],
5253 cx,
5254 );
5255 cx.run_until_parked();
5256
5257 let cancel = thread.update(cx, |thread, cx| thread.cancel(cx));
5258
5259 cancel.await;
5260 cx.run_until_parked();
5261
5262 assert_eq!(inferred_edit_candidate_count(&thread, cx), 0);
5263
5264 fs.save(
5265 path!("/test/file.txt").as_ref(),
5266 &"one\ntwo\nthree\n".into(),
5267 Default::default(),
5268 )
5269 .await
5270 .unwrap();
5271 cx.run_until_parked();
5272
5273 complete_external_edit_tool_call(&thread, &tool_call_id, cx);
5274 cx.executor().advance_clock(Duration::from_millis(200));
5275 cx.run_until_parked();
5276
5277 assert_eq!(changed_buffer_count(&thread, cx), 0);
5278 assert_eq!(inferred_edit_candidate_count(&thread, cx), 0);
5279 }
5280
5281 #[gpui::test]
5282 async fn test_user_edit_and_save_during_external_tool_call_does_not_infer_edit(
5283 cx: &mut TestAppContext,
5284 ) {
5285 init_test(cx);
5286 let fs = FakeFs::new(cx.background_executor.clone());
5287 fs.insert_tree(path!("/test"), json!({"file.txt": "one\ntwo\n"}))
5288 .await;
5289 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
5290 let connection = Rc::new(FakeAgentConnection::new());
5291
5292 let thread = cx
5293 .update(|cx| {
5294 connection.new_session(
5295 project.clone(),
5296 PathList::new(&[Path::new(path!("/test"))]),
5297 cx,
5298 )
5299 })
5300 .await
5301 .unwrap();
5302
5303 let tool_call_id = acp::ToolCallId::new("test");
5304 start_external_edit_tool_call(
5305 &thread,
5306 &tool_call_id,
5307 vec![acp::ToolCallLocation::new(path!("/test/file.txt"))],
5308 cx,
5309 );
5310 cx.run_until_parked();
5311
5312 let buffer = open_test_buffer(&project, Path::new(path!("/test/file.txt")), cx).await;
5313 buffer.update(cx, |buffer, cx| {
5314 buffer.edit([(0..0, "zero\n")], None, cx);
5315 });
5316
5317 project
5318 .update(cx, |project, cx| project.save_buffer(buffer.clone(), cx))
5319 .await
5320 .unwrap();
5321 cx.run_until_parked();
5322
5323 complete_external_edit_tool_call(&thread, &tool_call_id, cx);
5324 cx.executor().advance_clock(Duration::from_millis(200));
5325 cx.run_until_parked();
5326
5327 assert_eq!(changed_buffer_count(&thread, cx), 0);
5328 assert_eq!(inferred_edit_candidate_count(&thread, cx), 0);
5329 }
5330
5331 #[gpui::test]
5332 async fn test_already_open_buffer_captures_inferred_edit_baseline_synchronously(
5333 cx: &mut TestAppContext,
5334 ) {
5335 init_test(cx);
5336 let fs = FakeFs::new(cx.background_executor.clone());
5337 fs.insert_tree(path!("/test"), json!({"file.txt": "one\ntwo\n"}))
5338 .await;
5339 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
5340 let connection = Rc::new(FakeAgentConnection::new());
5341
5342 let thread = cx
5343 .update(|cx| {
5344 connection.new_session(
5345 project.clone(),
5346 PathList::new(&[Path::new(path!("/test"))]),
5347 cx,
5348 )
5349 })
5350 .await
5351 .unwrap();
5352
5353 let abs_path = PathBuf::from(path!("/test/file.txt"));
5354 let _buffer = open_test_buffer(&project, Path::new(path!("/test/file.txt")), cx).await;
5355
5356 let tool_call_id = acp::ToolCallId::new("test");
5357 start_external_edit_tool_call(&thread, &tool_call_id, Vec::new(), cx);
5358 update_external_edit_tool_call_locations(
5359 &thread,
5360 &tool_call_id,
5361 vec![acp::ToolCallLocation::new(abs_path.clone())],
5362 cx,
5363 );
5364
5365 assert!(inferred_edit_candidate_is_ready(
5366 &thread,
5367 &tool_call_id,
5368 &abs_path,
5369 cx
5370 ));
5371
5372 fs.save(
5373 path!("/test/file.txt").as_ref(),
5374 &"one\ntwo\nthree\n".into(),
5375 Default::default(),
5376 )
5377 .await
5378 .unwrap();
5379 cx.run_until_parked();
5380
5381 complete_external_edit_tool_call(&thread, &tool_call_id, cx);
5382 cx.executor().advance_clock(Duration::from_millis(200));
5383 cx.run_until_parked();
5384
5385 assert_eq!(changed_buffer_count(&thread, cx), 1);
5386 }
5387
5388 #[gpui::test]
5389 async fn test_location_registration_after_external_write_does_not_infer_without_prior_baseline(
5390 cx: &mut TestAppContext,
5391 ) {
5392 init_test(cx);
5393 let fs = FakeFs::new(cx.background_executor.clone());
5394 fs.insert_tree(path!("/test"), json!({"file.txt": "one\ntwo\n"}))
5395 .await;
5396 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
5397 let connection = Rc::new(FakeAgentConnection::new());
5398
5399 let thread = cx
5400 .update(|cx| {
5401 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5402 })
5403 .await
5404 .unwrap();
5405
5406 let tool_call_id = acp::ToolCallId::new("test");
5407 start_external_edit_tool_call(&thread, &tool_call_id, Vec::new(), cx);
5408
5409 fs.save(
5410 path!("/test/file.txt").as_ref(),
5411 &"one\ntwo\nthree\n".into(),
5412 Default::default(),
5413 )
5414 .await
5415 .unwrap();
5416 cx.run_until_parked();
5417
5418 update_external_edit_tool_call_locations(
5419 &thread,
5420 &tool_call_id,
5421 vec![acp::ToolCallLocation::new(path!("/test/file.txt"))],
5422 cx,
5423 );
5424 cx.run_until_parked();
5425
5426 complete_external_edit_tool_call(&thread, &tool_call_id, cx);
5427 cx.executor().advance_clock(Duration::from_millis(200));
5428 cx.run_until_parked();
5429
5430 assert_eq!(changed_buffer_count(&thread, cx), 0);
5431 }
5432
    // A tool call reaches `Completed` while one of its inferred-edit candidates is still
    // `Pending`. The candidate must be kept (and the tool call listed as finalizing) until
    // `set_inferred_edit_candidate_ready` is called for it; after that, the candidate and the
    // finalizing marker are gone and, since the file never changed, no buffer is reported as
    // changed.
    #[gpui::test]
    async fn test_terminal_external_edit_candidates_remain_active_until_late_readiness(
        cx: &mut TestAppContext,
    ) {
        init_test(cx);
        let fs = FakeFs::new(cx.background_executor.clone());
        fs.insert_tree(path!("/test"), json!({"file.txt": "one\ntwo\n"}))
            .await;
        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
        let connection = Rc::new(FakeAgentConnection::new());

        let thread = cx
            .update(|cx| {
                connection.new_session(
                    project.clone(),
                    PathList::new(&[Path::new(path!("/test"))]),
                    cx,
                )
            })
            .await
            .unwrap();

        let abs_path = PathBuf::from(path!("/test/file.txt"));
        let buffer = open_test_buffer(&project, Path::new(path!("/test/file.txt")), cx).await;

        let tool_call_id = acp::ToolCallId::new("test");
        set_running_turn_for_test(&thread, 1, cx);
        // Started without locations, so the normal session-update path registers no candidate.
        start_external_edit_tool_call(&thread, &tool_call_id, Vec::new(), cx);

        // Seed the thread's private state by hand: give the tool call a location, associate it
        // with turn 1, and insert a `Pending` candidate for that location under a fresh nonce.
        let nonce = thread.update(cx, |thread, _cx| {
            let nonce = thread.allocate_inferred_edit_candidate_nonce();
            {
                // Scoped so the mutable borrow of the tool call ends before the maps are touched.
                let (_, tool_call) = thread.tool_call_mut(&tool_call_id).unwrap();
                tool_call.locations = vec![acp::ToolCallLocation::new(abs_path.clone())];
                tool_call.resolved_locations = vec![None];
            }
            thread
                .inferred_edit_tool_call_turns
                .insert(tool_call_id.clone(), 1);
            thread
                .inferred_edit_candidates
                .entry(tool_call_id.clone())
                .or_default()
                .insert(
                    abs_path.clone(),
                    InferredEditCandidateState::Pending { nonce },
                );
            nonce
        });

        complete_external_edit_tool_call(&thread, &tool_call_id, cx);
        cx.executor().advance_clock(Duration::from_millis(200));
        cx.run_until_parked();

        // Completion alone must not discard the still-pending candidate.
        assert_eq!(inferred_edit_candidate_count(&thread, cx), 1);
        assert!(inferred_edit_tool_call_is_finalizing(
            &thread,
            &tool_call_id,
            cx
        ));

        // Deliver the late readiness notification using the same nonce as the pending entry.
        thread.update(cx, |thread, cx| {
            thread.set_inferred_edit_candidate_ready(
                tool_call_id.clone(),
                abs_path.clone(),
                nonce,
                buffer.clone(),
                cx,
            );
        });
        cx.run_until_parked();

        assert_eq!(changed_buffer_count(&thread, cx), 0);
        assert_eq!(inferred_edit_candidate_count(&thread, cx), 0);
        assert!(!inferred_edit_tool_call_is_finalizing(
            &thread,
            &tool_call_id,
            cx
        ));
    }
5513
5514 #[gpui::test]
5515 async fn test_tool_name_disables_external_edit_inference_for_location_edit_calls(
5516 cx: &mut TestAppContext,
5517 ) {
5518 init_test(cx);
5519 let fs = FakeFs::new(cx.background_executor.clone());
5520 fs.insert_tree(path!("/test"), json!({"file.txt": "one\ntwo\n"}))
5521 .await;
5522 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
5523 let connection = Rc::new(FakeAgentConnection::new());
5524
5525 let thread = cx
5526 .update(|cx| {
5527 connection.new_session(
5528 project.clone(),
5529 PathList::new(&[Path::new(path!("/test"))]),
5530 cx,
5531 )
5532 })
5533 .await
5534 .unwrap();
5535
5536 let tool_call_id = acp::ToolCallId::new("test");
5537 start_external_edit_tool_call_with_meta(
5538 &thread,
5539 &tool_call_id,
5540 vec![acp::ToolCallLocation::new(PathBuf::from(path!(
5541 "/test/file.txt"
5542 )))],
5543 Some(meta_with_tool_name("edit_file")),
5544 cx,
5545 );
5546 cx.run_until_parked();
5547
5548 assert_eq!(inferred_edit_candidate_count(&thread, cx), 0);
5549 assert!(!cx.read(|cx| thread.read(cx).has_pending_edit_tool_calls()));
5550
5551 fs.save(
5552 path!("/test/file.txt").as_ref(),
5553 &"one\ntwo\nthree\n".into(),
5554 Default::default(),
5555 )
5556 .await
5557 .unwrap();
5558 cx.run_until_parked();
5559
5560 complete_external_edit_tool_call(&thread, &tool_call_id, cx);
5561 cx.executor().advance_clock(Duration::from_millis(200));
5562 cx.run_until_parked();
5563
5564 assert_eq!(changed_buffer_count(&thread, cx), 0);
5565 }
5566
5567 #[gpui::test]
5568 async fn test_explicit_diff_content_disables_external_edit_inference_for_location_edit_calls(
5569 cx: &mut TestAppContext,
5570 ) {
5571 init_test(cx);
5572 let fs = FakeFs::new(cx.background_executor.clone());
5573 fs.insert_tree(path!("/test"), json!({"file.txt": "one\ntwo\n"}))
5574 .await;
5575 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
5576 let connection = Rc::new(FakeAgentConnection::new());
5577
5578 let thread = cx
5579 .update(|cx| {
5580 connection.new_session(
5581 project.clone(),
5582 PathList::new(&[Path::new(path!("/test"))]),
5583 cx,
5584 )
5585 })
5586 .await
5587 .unwrap();
5588
5589 let abs_path = PathBuf::from(path!("/test/file.txt"));
5590 let tool_call_id = acp::ToolCallId::new("test");
5591 start_external_edit_tool_call(
5592 &thread,
5593 &tool_call_id,
5594 vec![acp::ToolCallLocation::new(abs_path.clone())],
5595 cx,
5596 );
5597 cx.run_until_parked();
5598
5599 assert_eq!(inferred_edit_candidate_count(&thread, cx), 1);
5600
5601 let languages = project.read_with(cx, |project, _| project.languages().clone());
5602 let diff = cx.new(|cx| {
5603 Diff::finalized(
5604 path!("/test/file.txt").to_string(),
5605 Some("one\ntwo\n".into()),
5606 "one\ntwo\nthree\n".into(),
5607 languages,
5608 cx,
5609 )
5610 });
5611
5612 thread
5613 .update(cx, |thread, cx| {
5614 thread.update_tool_call(
5615 ToolCallUpdateDiff {
5616 id: tool_call_id.clone(),
5617 diff,
5618 },
5619 cx,
5620 )
5621 })
5622 .unwrap();
5623 cx.run_until_parked();
5624
5625 assert_eq!(inferred_edit_candidate_count(&thread, cx), 0);
5626 assert!(cx.read(|cx| thread.read(cx).has_pending_edit_tool_calls()));
5627
5628 fs.save(
5629 path!("/test/file.txt").as_ref(),
5630 &"one\ntwo\nthree\n".into(),
5631 Default::default(),
5632 )
5633 .await
5634 .unwrap();
5635 cx.run_until_parked();
5636
5637 complete_external_edit_tool_call(&thread, &tool_call_id, cx);
5638 cx.executor().advance_clock(Duration::from_millis(200));
5639 cx.run_until_parked();
5640
5641 assert_eq!(changed_buffer_count(&thread, cx), 0);
5642 }
5643
/// Exercises checkpoint bookkeeping across several turns.
///
/// Turns during which the fake agent writes a new file render as
/// `## User (checkpoint)` in the markdown transcript, a turn without file
/// changes renders as a plain `## User`, and restoring the checkpoint of the
/// second user message truncates the transcript back to the first exchange
/// and leaves only `file-0` in the fake file system.
#[gpui::test(iterations = 10)]
async fn test_checkpoints(cx: &mut TestAppContext) {
    init_test(cx);
    let fs = FakeFs::new(cx.background_executor.clone());
    fs.insert_tree(
        path!("/test"),
        json!({
            ".git": {}
        }),
    )
    .await;
    let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;

    // `simulate_changes` toggles whether the handler touches the file system;
    // `next_filename` numbers the files it creates (file-0, file-1, ...).
    let simulate_changes = Arc::new(AtomicBool::new(true));
    let next_filename = Arc::new(AtomicUsize::new(0));
    let connection = Rc::new(FakeAgentConnection::new().on_user_message({
        let simulate_changes = simulate_changes.clone();
        let next_filename = next_filename.clone();
        let fs = fs.clone();
        move |request, thread, mut cx| {
            let fs = fs.clone();
            let simulate_changes = simulate_changes.clone();
            let next_filename = next_filename.clone();
            async move {
                // Optionally create a fresh empty file for this turn.
                if simulate_changes.load(SeqCst) {
                    let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
                    fs.write(Path::new(&filename), b"").await?;
                }

                // Reply with the upper-cased text of the first prompt block.
                let acp::ContentBlock::Text(content) = &request.prompt[0] else {
                    panic!("expected text content block");
                };
                thread.update(&mut cx, |thread, cx| {
                    thread
                        .handle_session_update(
                            acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
                                content.text.to_uppercase().into(),
                            )),
                            cx,
                        )
                        .unwrap();
                })?;
                Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
            }
            .boxed_local()
        }
    }));
    let thread = cx
        .update(|cx| {
            connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
        })
        .await
        .unwrap();

    // First turn: the handler creates file-0.
    cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
        .await
        .unwrap();
    thread.read_with(cx, |thread, cx| {
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User (checkpoint)

                Lorem

                ## Assistant

                LOREM

            "}
        );
    });
    assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);

    // Second turn: the handler creates file-1.
    cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
        .await
        .unwrap();
    thread.read_with(cx, |thread, cx| {
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User (checkpoint)

                Lorem

                ## Assistant

                LOREM

                ## User (checkpoint)

                ipsum

                ## Assistant

                IPSUM

            "}
        );
    });
    assert_eq!(
        fs.files(),
        vec![
            Path::new(path!("/test/file-0")),
            Path::new(path!("/test/file-1"))
        ]
    );

    // Checkpoint isn't stored when there are no changes.
    simulate_changes.store(false, SeqCst);
    cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
        .await
        .unwrap();
    thread.read_with(cx, |thread, cx| {
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User (checkpoint)

                Lorem

                ## Assistant

                LOREM

                ## User (checkpoint)

                ipsum

                ## Assistant

                IPSUM

                ## User

                dolor

                ## Assistant

                DOLOR

            "}
        );
    });
    assert_eq!(
        fs.files(),
        vec![
            Path::new(path!("/test/file-0")),
            Path::new(path!("/test/file-1"))
        ]
    );

    // Rewinding the conversation truncates the history and restores the checkpoint.
    // Entry 2 is the second user message ("ipsum"); entries 0 and 1 are the first exchange.
    thread
        .update(cx, |thread, cx| {
            let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
                panic!("unexpected entries {:?}", thread.entries)
            };
            thread.restore_checkpoint(message.id.clone().unwrap(), cx)
        })
        .await
        .unwrap();
    thread.read_with(cx, |thread, cx| {
        assert_eq!(
            thread.to_markdown(cx),
            indoc! {"
                ## User (checkpoint)

                Lorem

                ## Assistant

                LOREM

            "}
        );
    });
    assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
}
5823
5824 #[gpui::test]
5825 async fn test_tool_result_refusal(cx: &mut TestAppContext) {
5826 use std::sync::atomic::AtomicUsize;
5827 init_test(cx);
5828
5829 let fs = FakeFs::new(cx.executor());
5830 let project = Project::test(fs, None, cx).await;
5831
5832 // Create a connection that simulates refusal after tool result
5833 let prompt_count = Arc::new(AtomicUsize::new(0));
5834 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
5835 let prompt_count = prompt_count.clone();
5836 move |_request, thread, mut cx| {
5837 let count = prompt_count.fetch_add(1, SeqCst);
5838 async move {
5839 if count == 0 {
5840 // First prompt: Generate a tool call with result
5841 thread.update(&mut cx, |thread, cx| {
5842 thread
5843 .handle_session_update(
5844 acp::SessionUpdate::ToolCall(
5845 acp::ToolCall::new("tool1", "Test Tool")
5846 .kind(acp::ToolKind::Fetch)
5847 .status(acp::ToolCallStatus::Completed)
5848 .raw_input(serde_json::json!({"query": "test"}))
5849 .raw_output(serde_json::json!({"result": "inappropriate content"})),
5850 ),
5851 cx,
5852 )
5853 .unwrap();
5854 })?;
5855
5856 // Now return refusal because of the tool result
5857 Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
5858 } else {
5859 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
5860 }
5861 }
5862 .boxed_local()
5863 }
5864 }));
5865
5866 let thread = cx
5867 .update(|cx| {
5868 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5869 })
5870 .await
5871 .unwrap();
5872
5873 // Track if we see a Refusal event
5874 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
5875 let saw_refusal_event_captured = saw_refusal_event.clone();
5876 thread.update(cx, |_thread, cx| {
5877 cx.subscribe(
5878 &thread,
5879 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
5880 if matches!(event, AcpThreadEvent::Refusal) {
5881 *saw_refusal_event_captured.lock().unwrap() = true;
5882 }
5883 },
5884 )
5885 .detach();
5886 });
5887
5888 // Send a user message - this will trigger tool call and then refusal
5889 let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
5890 cx.background_executor.spawn(send_task).detach();
5891 cx.run_until_parked();
5892
5893 // Verify that:
5894 // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
5895 // 2. The user message was NOT truncated
5896 assert!(
5897 *saw_refusal_event.lock().unwrap(),
5898 "Refusal event should be emitted for tool result refusals"
5899 );
5900
5901 thread.read_with(cx, |thread, _| {
5902 let entries = thread.entries();
5903 assert!(entries.len() >= 2, "Should have user message and tool call");
5904
5905 // Verify user message is still there
5906 assert!(
5907 matches!(entries[0], AgentThreadEntry::UserMessage(_)),
5908 "User message should not be truncated"
5909 );
5910
5911 // Verify tool call is there with result
5912 if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
5913 assert!(
5914 tool_call.raw_output.is_some(),
5915 "Tool call should have output"
5916 );
5917 } else {
5918 panic!("Expected tool call at index 1");
5919 }
5920 });
5921 }
5922
5923 #[gpui::test]
5924 async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
5925 init_test(cx);
5926
5927 let fs = FakeFs::new(cx.executor());
5928 let project = Project::test(fs, None, cx).await;
5929
5930 let refuse_next = Arc::new(AtomicBool::new(false));
5931 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
5932 let refuse_next = refuse_next.clone();
5933 move |_request, _thread, _cx| {
5934 if refuse_next.load(SeqCst) {
5935 async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
5936 .boxed_local()
5937 } else {
5938 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
5939 .boxed_local()
5940 }
5941 }
5942 }));
5943
5944 let thread = cx
5945 .update(|cx| {
5946 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5947 })
5948 .await
5949 .unwrap();
5950
5951 // Track if we see a Refusal event
5952 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
5953 let saw_refusal_event_captured = saw_refusal_event.clone();
5954 thread.update(cx, |_thread, cx| {
5955 cx.subscribe(
5956 &thread,
5957 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
5958 if matches!(event, AcpThreadEvent::Refusal) {
5959 *saw_refusal_event_captured.lock().unwrap() = true;
5960 }
5961 },
5962 )
5963 .detach();
5964 });
5965
5966 // Send a message that will be refused
5967 refuse_next.store(true, SeqCst);
5968 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
5969 .await
5970 .unwrap();
5971
5972 // Verify that a Refusal event WAS emitted for user prompt refusal
5973 assert!(
5974 *saw_refusal_event.lock().unwrap(),
5975 "Refusal event should be emitted for user prompt refusals"
5976 );
5977
5978 // Verify the message was truncated (user prompt refusal)
5979 thread.read_with(cx, |thread, cx| {
5980 assert_eq!(thread.to_markdown(cx), "");
5981 });
5982 }
5983
5984 #[gpui::test]
5985 async fn test_refusal(cx: &mut TestAppContext) {
5986 init_test(cx);
5987 let fs = FakeFs::new(cx.background_executor.clone());
5988 fs.insert_tree(path!("/"), json!({})).await;
5989 let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
5990
5991 let refuse_next = Arc::new(AtomicBool::new(false));
5992 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
5993 let refuse_next = refuse_next.clone();
5994 move |request, thread, mut cx| {
5995 let refuse_next = refuse_next.clone();
5996 async move {
5997 if refuse_next.load(SeqCst) {
5998 return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
5999 }
6000
6001 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
6002 panic!("expected text content block");
6003 };
6004 thread.update(&mut cx, |thread, cx| {
6005 thread
6006 .handle_session_update(
6007 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
6008 content.text.to_uppercase().into(),
6009 )),
6010 cx,
6011 )
6012 .unwrap();
6013 })?;
6014 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
6015 }
6016 .boxed_local()
6017 }
6018 }));
6019 let thread = cx
6020 .update(|cx| {
6021 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
6022 })
6023 .await
6024 .unwrap();
6025
6026 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
6027 .await
6028 .unwrap();
6029 thread.read_with(cx, |thread, cx| {
6030 assert_eq!(
6031 thread.to_markdown(cx),
6032 indoc! {"
6033 ## User
6034
6035 hello
6036
6037 ## Assistant
6038
6039 HELLO
6040
6041 "}
6042 );
6043 });
6044
6045 // Simulate refusing the second message. The message should be truncated
6046 // when a user prompt is refused.
6047 refuse_next.store(true, SeqCst);
6048 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
6049 .await
6050 .unwrap();
6051 thread.read_with(cx, |thread, cx| {
6052 assert_eq!(
6053 thread.to_markdown(cx),
6054 indoc! {"
6055 ## User
6056
6057 hello
6058
6059 ## Assistant
6060
6061 HELLO
6062
6063 "}
6064 );
6065 });
6066 }
6067
6068 async fn run_until_first_tool_call(
6069 thread: &Entity<AcpThread>,
6070 cx: &mut TestAppContext,
6071 ) -> usize {
6072 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
6073
6074 let subscription = cx.update(|cx| {
6075 cx.subscribe(thread, move |thread, _, cx| {
6076 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
6077 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
6078 return tx.try_send(ix).unwrap();
6079 }
6080 }
6081 })
6082 });
6083
6084 select! {
6085 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
6086 panic!("Timeout waiting for tool call")
6087 }
6088 ix = rx.next().fuse() => {
6089 drop(subscription);
6090 ix.unwrap()
6091 }
6092 }
6093 }
6094
/// In-memory `AgentConnection` used by the tests in this module.
#[derive(Clone, Default)]
struct FakeAgentConnection {
    /// Methods returned by `auth_methods()` and accepted by `authenticate()`.
    auth_methods: Vec<acp::AuthMethod>,
    /// Weak handles to the threads created by `new_session`, keyed by session
    /// id; `prompt` looks the target thread up here.
    sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
    /// Shared with every `FakeAgentSessionSetTitle` handed out by `set_title`,
    /// which appends each title it is run with.
    set_title_calls: Rc<RefCell<Vec<SharedString>>>,
    /// Optional handler that `prompt` delegates to; when it is `None`,
    /// `prompt` resolves immediately with `StopReason::EndTurn`.
    on_user_message: Option<
        Rc<
            dyn Fn(
                    acp::PromptRequest,
                    WeakEntity<AcpThread>,
                    AsyncApp,
                ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
                + 'static,
        >,
    >,
}
6111
6112 impl FakeAgentConnection {
6113 fn new() -> Self {
6114 Self {
6115 auth_methods: Vec::new(),
6116 on_user_message: None,
6117 sessions: Arc::default(),
6118 set_title_calls: Default::default(),
6119 }
6120 }
6121
6122 #[expect(unused)]
6123 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
6124 self.auth_methods = auth_methods;
6125 self
6126 }
6127
6128 fn on_user_message(
6129 mut self,
6130 handler: impl Fn(
6131 acp::PromptRequest,
6132 WeakEntity<AcpThread>,
6133 AsyncApp,
6134 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
6135 + 'static,
6136 ) -> Self {
6137 self.on_user_message.replace(Rc::new(handler));
6138 self
6139 }
6140 }
6141
6142 impl AgentConnection for FakeAgentConnection {
6143 fn agent_id(&self) -> AgentId {
6144 AgentId::new("fake")
6145 }
6146
6147 fn telemetry_id(&self) -> SharedString {
6148 "fake".into()
6149 }
6150
6151 fn auth_methods(&self) -> &[acp::AuthMethod] {
6152 &self.auth_methods
6153 }
6154
6155 fn new_session(
6156 self: Rc<Self>,
6157 project: Entity<Project>,
6158 work_dirs: PathList,
6159 cx: &mut App,
6160 ) -> Task<gpui::Result<Entity<AcpThread>>> {
6161 let session_id = acp::SessionId::new(
6162 rand::rng()
6163 .sample_iter(&distr::Alphanumeric)
6164 .take(7)
6165 .map(char::from)
6166 .collect::<String>(),
6167 );
6168 let action_log = cx.new(|_| ActionLog::new(project.clone()));
6169 let thread = cx.new(|cx| {
6170 AcpThread::new(
6171 None,
6172 "Test",
6173 Some(work_dirs),
6174 self.clone(),
6175 project,
6176 action_log,
6177 session_id.clone(),
6178 watch::Receiver::constant(
6179 acp::PromptCapabilities::new()
6180 .image(true)
6181 .audio(true)
6182 .embedded_context(true),
6183 ),
6184 cx,
6185 )
6186 });
6187 self.sessions.lock().insert(session_id, thread.downgrade());
6188 Task::ready(Ok(thread))
6189 }
6190
6191 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
6192 if self.auth_methods().iter().any(|m| m.id() == &method) {
6193 Task::ready(Ok(()))
6194 } else {
6195 Task::ready(Err(anyhow!("Invalid Auth Method")))
6196 }
6197 }
6198
6199 fn prompt(
6200 &self,
6201 _id: Option<UserMessageId>,
6202 params: acp::PromptRequest,
6203 cx: &mut App,
6204 ) -> Task<gpui::Result<acp::PromptResponse>> {
6205 let sessions = self.sessions.lock();
6206 let thread = sessions.get(¶ms.session_id).unwrap();
6207 if let Some(handler) = &self.on_user_message {
6208 let handler = handler.clone();
6209 let thread = thread.clone();
6210 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
6211 } else {
6212 Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
6213 }
6214 }
6215
6216 fn cancel(&self, _session_id: &acp::SessionId, _cx: &mut App) {}
6217
6218 fn truncate(
6219 &self,
6220 session_id: &acp::SessionId,
6221 _cx: &App,
6222 ) -> Option<Rc<dyn AgentSessionTruncate>> {
6223 Some(Rc::new(FakeAgentSessionEditor {
6224 _session_id: session_id.clone(),
6225 }))
6226 }
6227
6228 fn set_title(
6229 &self,
6230 _session_id: &acp::SessionId,
6231 _cx: &App,
6232 ) -> Option<Rc<dyn AgentSessionSetTitle>> {
6233 Some(Rc::new(FakeAgentSessionSetTitle {
6234 calls: self.set_title_calls.clone(),
6235 }))
6236 }
6237
6238 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
6239 self
6240 }
6241 }
6242
6243 struct FakeAgentSessionSetTitle {
6244 calls: Rc<RefCell<Vec<SharedString>>>,
6245 }
6246
6247 impl AgentSessionSetTitle for FakeAgentSessionSetTitle {
6248 fn run(&self, title: SharedString, _cx: &mut App) -> Task<Result<()>> {
6249 self.calls.borrow_mut().push(title);
6250 Task::ready(Ok(()))
6251 }
6252 }
6253
/// Truncation handle returned by `FakeAgentConnection::truncate`.
struct FakeAgentSessionEditor {
    /// Session this handle was created for; stored but never read.
    _session_id: acp::SessionId,
}

impl AgentSessionTruncate for FakeAgentSessionEditor {
    /// Ignores the message id and resolves successfully right away.
    fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
        Task::ready(Ok(()))
    }
}
6263
6264 #[gpui::test]
6265 async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
6266 init_test(cx);
6267
6268 let fs = FakeFs::new(cx.executor());
6269 let project = Project::test(fs, [], cx).await;
6270 let connection = Rc::new(FakeAgentConnection::new());
6271 let thread = cx
6272 .update(|cx| {
6273 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
6274 })
6275 .await
6276 .unwrap();
6277
6278 // Try to update a tool call that doesn't exist
6279 let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
6280 thread.update(cx, |thread, cx| {
6281 let result = thread.handle_session_update(
6282 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
6283 nonexistent_id.clone(),
6284 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
6285 )),
6286 cx,
6287 );
6288
6289 // The update should succeed (not return an error)
6290 assert!(result.is_ok());
6291
6292 // There should now be exactly one entry in the thread
6293 assert_eq!(thread.entries.len(), 1);
6294
6295 // The entry should be a failed tool call
6296 if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
6297 assert_eq!(tool_call.id, nonexistent_id);
6298 assert!(matches!(tool_call.status, ToolCallStatus::Failed));
6299 assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
6300
6301 // Check that the content contains the error message
6302 assert_eq!(tool_call.content.len(), 1);
6303 if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
6304 match content_block {
6305 ContentBlock::Markdown { markdown } => {
6306 let markdown_text = markdown.read(cx).source();
6307 assert!(markdown_text.contains("Tool call not found"));
6308 }
6309 ContentBlock::Empty => panic!("Expected markdown content, got empty"),
6310 ContentBlock::ResourceLink { .. } => {
6311 panic!("Expected markdown content, got resource link")
6312 }
6313 ContentBlock::Image { .. } => {
6314 panic!("Expected markdown content, got image")
6315 }
6316 }
6317 } else {
6318 panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
6319 }
6320 } else {
6321 panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
6322 }
6323 });
6324 }
6325
/// Tests that restoring a checkpoint properly cleans up terminals that were
/// created after that checkpoint, and cancels any in-progress generation.
///
/// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
/// that were started after that checkpoint should be terminated, and any in-progress
/// AI generation should be canceled.
///
/// Setup: two user messages, two terminals that have already exited and are
/// not referenced by any thread entry, and a third terminal without an exit
/// event that is referenced by an in-progress tool call. After restoring to
/// the second message only the third terminal is expected to be gone.
#[gpui::test]
async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
    init_test(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, [], cx).await;
    let connection = Rc::new(FakeAgentConnection::new());
    let thread = cx
        .update(|cx| {
            connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
        })
        .await
        .unwrap();

    // Send first user message to create a checkpoint
    cx.update(|cx| {
        thread.update(cx, |thread, cx| {
            thread.send(vec!["first message".into()], cx)
        })
    })
    .await
    .unwrap();

    // Send second message (creates another checkpoint) - we'll restore to this one
    cx.update(|cx| {
        thread.update(cx, |thread, cx| {
            thread.send(vec!["second message".into()], cx)
        })
    })
    .await
    .unwrap();

    // Create 2 terminals that have completed running. No tool call references
    // them, so they are expected to survive the restore below.
    let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
    let mock_terminal_1 = cx.new(|cx| {
        let builder = ::terminal::TerminalBuilder::new_display_only(
            ::terminal::terminal_settings::CursorShape::default(),
            ::terminal::terminal_settings::AlternateScroll::On,
            None,
            0,
            cx.background_executor(),
            PathStyle::local(),
        )
        .unwrap();
        builder.subscribe(cx)
    });

    // Terminal 1 lifecycle: created -> output -> exit(0).
    thread.update(cx, |thread, cx| {
        thread.on_terminal_provider_event(
            TerminalProviderEvent::Created {
                terminal_id: terminal_id_1.clone(),
                label: "echo 'first'".to_string(),
                cwd: Some(PathBuf::from("/test")),
                output_byte_limit: None,
                terminal: mock_terminal_1.clone(),
            },
            cx,
        );
    });

    thread.update(cx, |thread, cx| {
        thread.on_terminal_provider_event(
            TerminalProviderEvent::Output {
                terminal_id: terminal_id_1.clone(),
                data: b"first\n".to_vec(),
            },
            cx,
        );
    });

    thread.update(cx, |thread, cx| {
        thread.on_terminal_provider_event(
            TerminalProviderEvent::Exit {
                terminal_id: terminal_id_1.clone(),
                status: acp::TerminalExitStatus::new().exit_code(0),
            },
            cx,
        );
    });

    let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
    let mock_terminal_2 = cx.new(|cx| {
        let builder = ::terminal::TerminalBuilder::new_display_only(
            ::terminal::terminal_settings::CursorShape::default(),
            ::terminal::terminal_settings::AlternateScroll::On,
            None,
            0,
            cx.background_executor(),
            PathStyle::local(),
        )
        .unwrap();
        builder.subscribe(cx)
    });

    // Terminal 2 lifecycle: created -> output -> exit(0).
    thread.update(cx, |thread, cx| {
        thread.on_terminal_provider_event(
            TerminalProviderEvent::Created {
                terminal_id: terminal_id_2.clone(),
                label: "echo 'second'".to_string(),
                cwd: Some(PathBuf::from("/test")),
                output_byte_limit: None,
                terminal: mock_terminal_2.clone(),
            },
            cx,
        );
    });

    thread.update(cx, |thread, cx| {
        thread.on_terminal_provider_event(
            TerminalProviderEvent::Output {
                terminal_id: terminal_id_2.clone(),
                data: b"second\n".to_vec(),
            },
            cx,
        );
    });

    thread.update(cx, |thread, cx| {
        thread.on_terminal_provider_event(
            TerminalProviderEvent::Exit {
                terminal_id: terminal_id_2.clone(),
                status: acp::TerminalExitStatus::new().exit_code(0),
            },
            cx,
        );
    });

    // Get the second message ID to restore to
    let second_message_id = thread.read_with(cx, |thread, _| {
        // At this point we have:
        // - Index 0: First user message (with checkpoint)
        // - Index 1: Second user message (with checkpoint)
        // No assistant responses because FakeAgentConnection just returns EndTurn
        let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
            panic!("expected user message at index 1");
        };
        message.id.clone().unwrap()
    });

    // Create a terminal AFTER the checkpoint we'll restore to.
    // This simulates the AI agent starting a long-running terminal command.
    let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
    let mock_terminal = cx.new(|cx| {
        let builder = ::terminal::TerminalBuilder::new_display_only(
            ::terminal::terminal_settings::CursorShape::default(),
            ::terminal::terminal_settings::AlternateScroll::On,
            None,
            0,
            cx.background_executor(),
            PathStyle::local(),
        )
        .unwrap();
        builder.subscribe(cx)
    });

    // Register the terminal as created
    thread.update(cx, |thread, cx| {
        thread.on_terminal_provider_event(
            TerminalProviderEvent::Created {
                terminal_id: terminal_id.clone(),
                label: "sleep 1000".to_string(),
                cwd: Some(PathBuf::from("/test")),
                output_byte_limit: None,
                terminal: mock_terminal.clone(),
            },
            cx,
        );
    });

    // Simulate the terminal producing output (still running: no Exit event is sent)
    thread.update(cx, |thread, cx| {
        thread.on_terminal_provider_event(
            TerminalProviderEvent::Output {
                terminal_id: terminal_id.clone(),
                data: b"terminal is running...\n".to_vec(),
            },
            cx,
        );
    });

    // Create a tool call entry that references this terminal
    // This represents the agent requesting a terminal command
    thread.update(cx, |thread, cx| {
        thread
            .handle_session_update(
                acp::SessionUpdate::ToolCall(
                    acp::ToolCall::new("terminal-tool-1", "Running command")
                        .kind(acp::ToolKind::Execute)
                        .status(acp::ToolCallStatus::InProgress)
                        .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
                            terminal_id.clone(),
                        ))])
                        .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
                ),
                cx,
            )
            .unwrap();
    });

    // Verify terminal exists and is in the thread
    let terminal_exists_before =
        thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
    assert!(
        terminal_exists_before,
        "Terminal should exist before checkpoint restore"
    );

    // Verify the terminal's underlying task is still running (not completed)
    let terminal_running_before = thread.read_with(cx, |thread, _cx| {
        let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
        terminal_entity.read_with(cx, |term, _cx| {
            term.output().is_none() // output is None means it's still running
        })
    });
    assert!(
        terminal_running_before,
        "Terminal should be running before checkpoint restore"
    );

    // Verify we have the expected entries before restore
    let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
    assert!(
        entry_count_before > 1,
        "Should have multiple entries before restore"
    );

    // Restore the checkpoint to the second message.
    // This should:
    // 1. Cancel any in-progress generation
    // 2. Remove the terminal that was created after that point
    thread
        .update(cx, |thread, cx| {
            thread.restore_checkpoint(second_message_id, cx)
        })
        .await
        .unwrap();

    // Verify that no turn is running after restore. The check reads
    // `running_turn`; the "send_task" wording in the names and message below
    // is presumably left over from an earlier field name.
    let has_send_task_after = thread.read_with(cx, |thread, _| thread.running_turn.is_some());
    assert!(
        !has_send_task_after,
        "Should not have a send_task after restore (cancel should have cleared it)"
    );

    // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
    let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
    assert_eq!(
        entry_count, 1,
        "Should have 1 entry after restore (only the first user message)"
    );

    // Verify the 2 completed terminals still exist
    let terminal_1_exists = thread.read_with(cx, |thread, _| {
        thread.terminals.contains_key(&terminal_id_1)
    });
    assert!(
        terminal_1_exists,
        "Terminal 1 (from before checkpoint) should still exist"
    );

    let terminal_2_exists = thread.read_with(cx, |thread, _| {
        thread.terminals.contains_key(&terminal_id_2)
    });
    assert!(
        terminal_2_exists,
        "Terminal 2 (from before checkpoint) should still exist"
    );

    // Verify they're still in completed state
    let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
        let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
        terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
    });
    assert!(terminal_1_completed, "Terminal 1 should still be completed");

    let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
        let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
        terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
    });
    assert!(terminal_2_completed, "Terminal 2 should still be completed");

    // Verify the running terminal (created after checkpoint) was removed
    let terminal_3_exists =
        thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
    assert!(
        !terminal_3_exists,
        "Terminal 3 (created after checkpoint) should have been removed"
    );

    // Verify total count is 2 (the two completed terminals)
    let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
    assert_eq!(
        terminal_count, 2,
        "Should have exactly 2 terminals (the completed ones from before checkpoint)"
    );
}
6629
6630 /// Tests that update_last_checkpoint correctly updates the original message's checkpoint
6631 /// even when a new user message is added while the async checkpoint comparison is in progress.
6632 ///
6633 /// This is a regression test for a bug where update_last_checkpoint would fail with
6634 /// "no checkpoint" if a new user message (without a checkpoint) was added between when
6635 /// update_last_checkpoint started and when its async closure ran.
6636 #[gpui::test]
6637 async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
6638 init_test(cx);
6639
6640 let fs = FakeFs::new(cx.executor());
6641 fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
6642 .await;
6643 let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;
6644
6645 let handler_done = Arc::new(AtomicBool::new(false));
6646 let handler_done_clone = handler_done.clone();
6647 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
6648 move |_, _thread, _cx| {
6649 handler_done_clone.store(true, SeqCst);
6650 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
6651 },
6652 ));
6653
6654 let thread = cx
6655 .update(|cx| {
6656 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
6657 })
6658 .await
6659 .unwrap();
6660
6661 let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
6662 let send_task = cx.background_executor.spawn(send_future);
6663
6664 // Tick until handler completes, then a few more to let update_last_checkpoint start
6665 while !handler_done.load(SeqCst) {
6666 cx.executor().tick();
6667 }
6668 for _ in 0..5 {
6669 cx.executor().tick();
6670 }
6671
6672 thread.update(cx, |thread, cx| {
6673 thread.push_entry(
6674 AgentThreadEntry::UserMessage(UserMessage {
6675 id: Some(UserMessageId::new()),
6676 content: ContentBlock::Empty,
6677 chunks: vec!["Injected message (no checkpoint)".into()],
6678 checkpoint: None,
6679 indented: false,
6680 }),
6681 cx,
6682 );
6683 });
6684
6685 cx.run_until_parked();
6686 let result = send_task.await;
6687
6688 assert!(
6689 result.is_ok(),
6690 "send should succeed even when new message added during update_last_checkpoint: {:?}",
6691 result.err()
6692 );
6693 }
6694
6695 /// Tests that when a follow-up message is sent during generation,
6696 /// the first turn completing does NOT clear `running_turn` because
6697 /// it now belongs to the second turn.
6698 #[gpui::test]
6699 async fn test_follow_up_message_during_generation_does_not_clear_turn(cx: &mut TestAppContext) {
6700 init_test(cx);
6701
6702 let fs = FakeFs::new(cx.executor());
6703 let project = Project::test(fs, [], cx).await;
6704
6705 // First handler waits for this signal before completing
6706 let (first_complete_tx, first_complete_rx) = futures::channel::oneshot::channel::<()>();
6707 let first_complete_rx = RefCell::new(Some(first_complete_rx));
6708
6709 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
6710 move |params, _thread, _cx| {
6711 let first_complete_rx = first_complete_rx.borrow_mut().take();
6712 let is_first = params
6713 .prompt
6714 .iter()
6715 .any(|c| matches!(c, acp::ContentBlock::Text(t) if t.text.contains("first")));
6716
6717 async move {
6718 if is_first {
6719 // First handler waits until signaled
6720 if let Some(rx) = first_complete_rx {
6721 rx.await.ok();
6722 }
6723 }
6724 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
6725 }
6726 .boxed_local()
6727 }
6728 }));
6729
6730 let thread = cx
6731 .update(|cx| {
6732 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
6733 })
6734 .await
6735 .unwrap();
6736
6737 // Send first message (turn_id=1) - handler will block
6738 let first_request = thread.update(cx, |thread, cx| thread.send_raw("first", cx));
6739 assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 1);
6740
6741 // Send second message (turn_id=2) while first is still blocked
6742 // This calls cancel() which takes turn 1's running_turn and sets turn 2's
6743 let second_request = thread.update(cx, |thread, cx| thread.send_raw("second", cx));
6744 assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 2);
6745
6746 let running_turn_after_second_send =
6747 thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
6748 assert_eq!(
6749 running_turn_after_second_send,
6750 Some(2),
6751 "running_turn should be set to turn 2 after sending second message"
6752 );
6753
6754 // Now signal first handler to complete
6755 first_complete_tx.send(()).ok();
6756
6757 // First request completes - should NOT clear running_turn
6758 // because running_turn now belongs to turn 2
6759 first_request.await.unwrap();
6760
6761 let running_turn_after_first =
6762 thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
6763 assert_eq!(
6764 running_turn_after_first,
6765 Some(2),
6766 "first turn completing should not clear running_turn (belongs to turn 2)"
6767 );
6768
6769 // Second request completes - SHOULD clear running_turn
6770 second_request.await.unwrap();
6771
6772 let running_turn_after_second =
6773 thread.read_with(cx, |thread, _| thread.running_turn.is_some());
6774 assert!(
6775 !running_turn_after_second,
6776 "second turn completing should clear running_turn"
6777 );
6778 }
6779
6780 #[gpui::test]
6781 async fn test_send_returns_cancelled_response_and_marks_tools_as_cancelled(
6782 cx: &mut TestAppContext,
6783 ) {
6784 init_test(cx);
6785
6786 let fs = FakeFs::new(cx.executor());
6787 let project = Project::test(fs, [], cx).await;
6788
6789 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
6790 move |_params, thread, mut cx| {
6791 async move {
6792 thread
6793 .update(&mut cx, |thread, cx| {
6794 thread.handle_session_update(
6795 acp::SessionUpdate::ToolCall(
6796 acp::ToolCall::new(
6797 acp::ToolCallId::new("test-tool"),
6798 "Test Tool",
6799 )
6800 .kind(acp::ToolKind::Fetch)
6801 .status(acp::ToolCallStatus::InProgress),
6802 ),
6803 cx,
6804 )
6805 })
6806 .unwrap()
6807 .unwrap();
6808
6809 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
6810 }
6811 .boxed_local()
6812 },
6813 ));
6814
6815 let thread = cx
6816 .update(|cx| {
6817 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
6818 })
6819 .await
6820 .unwrap();
6821
6822 let response = thread
6823 .update(cx, |thread, cx| thread.send_raw("test message", cx))
6824 .await;
6825
6826 let response = response
6827 .expect("send should succeed")
6828 .expect("should have response");
6829 assert_eq!(
6830 response.stop_reason,
6831 acp::StopReason::Cancelled,
6832 "response should have Cancelled stop_reason"
6833 );
6834
6835 thread.read_with(cx, |thread, _| {
6836 let tool_entry = thread
6837 .entries
6838 .iter()
6839 .find_map(|e| {
6840 if let AgentThreadEntry::ToolCall(call) = e {
6841 Some(call)
6842 } else {
6843 None
6844 }
6845 })
6846 .expect("should have tool call entry");
6847
6848 assert!(
6849 matches!(tool_entry.status, ToolCallStatus::Canceled),
6850 "tool should be marked as Canceled when response is Cancelled, got {:?}",
6851 tool_entry.status
6852 );
6853 });
6854 }
6855
6856 #[gpui::test]
6857 async fn test_provisional_title_replaced_by_real_title(cx: &mut TestAppContext) {
6858 init_test(cx);
6859
6860 let fs = FakeFs::new(cx.executor());
6861 let project = Project::test(fs, [], cx).await;
6862 let connection = Rc::new(FakeAgentConnection::new());
6863 let set_title_calls = connection.set_title_calls.clone();
6864
6865 let thread = cx
6866 .update(|cx| {
6867 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
6868 })
6869 .await
6870 .unwrap();
6871
6872 // Initial title is the default.
6873 thread.read_with(cx, |thread, _| {
6874 assert_eq!(thread.title().as_ref(), "Test");
6875 });
6876
6877 // Setting a provisional title updates the display title.
6878 thread.update(cx, |thread, cx| {
6879 thread.set_provisional_title("Hello, can you help…".into(), cx);
6880 });
6881 thread.read_with(cx, |thread, _| {
6882 assert_eq!(thread.title().as_ref(), "Hello, can you help…");
6883 });
6884
6885 // The provisional title should NOT have propagated to the connection.
6886 assert_eq!(
6887 set_title_calls.borrow().len(),
6888 0,
6889 "provisional title should not propagate to the connection"
6890 );
6891
6892 // When the real title arrives via set_title, it replaces the
6893 // provisional title and propagates to the connection.
6894 let task = thread.update(cx, |thread, cx| {
6895 thread.set_title("Helping with Rust question".into(), cx)
6896 });
6897 task.await.expect("set_title should succeed");
6898 thread.read_with(cx, |thread, _| {
6899 assert_eq!(thread.title().as_ref(), "Helping with Rust question");
6900 });
6901 assert_eq!(
6902 set_title_calls.borrow().as_slice(),
6903 &[SharedString::from("Helping with Rust question")],
6904 "real title should propagate to the connection"
6905 );
6906 }
6907
6908 #[gpui::test]
6909 async fn test_session_info_update_replaces_provisional_title_and_emits_event(
6910 cx: &mut TestAppContext,
6911 ) {
6912 init_test(cx);
6913
6914 let fs = FakeFs::new(cx.executor());
6915 let project = Project::test(fs, [], cx).await;
6916 let connection = Rc::new(FakeAgentConnection::new());
6917
6918 let thread = cx
6919 .update(|cx| {
6920 connection.clone().new_session(
6921 project,
6922 PathList::new(&[Path::new(path!("/test"))]),
6923 cx,
6924 )
6925 })
6926 .await
6927 .unwrap();
6928
6929 let title_updated_events = Rc::new(RefCell::new(0usize));
6930 let title_updated_events_for_subscription = title_updated_events.clone();
6931 thread.update(cx, |_thread, cx| {
6932 cx.subscribe(
6933 &thread,
6934 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
6935 if matches!(event, AcpThreadEvent::TitleUpdated) {
6936 *title_updated_events_for_subscription.borrow_mut() += 1;
6937 }
6938 },
6939 )
6940 .detach();
6941 });
6942
6943 thread.update(cx, |thread, cx| {
6944 thread.set_provisional_title("Hello, can you help…".into(), cx);
6945 });
6946 assert_eq!(
6947 *title_updated_events.borrow(),
6948 1,
6949 "setting a provisional title should emit TitleUpdated"
6950 );
6951
6952 let result = thread.update(cx, |thread, cx| {
6953 thread.handle_session_update(
6954 acp::SessionUpdate::SessionInfoUpdate(
6955 acp::SessionInfoUpdate::new().title("Helping with Rust question"),
6956 ),
6957 cx,
6958 )
6959 });
6960 result.expect("session info update should succeed");
6961
6962 thread.read_with(cx, |thread, _| {
6963 assert_eq!(thread.title().as_ref(), "Helping with Rust question");
6964 assert!(
6965 !thread.has_provisional_title(),
6966 "session info title update should clear provisional title"
6967 );
6968 });
6969
6970 assert_eq!(
6971 *title_updated_events.borrow(),
6972 2,
6973 "session info title update should emit TitleUpdated"
6974 );
6975 assert!(
6976 connection.set_title_calls.borrow().is_empty(),
6977 "session info title update should not propagate back to the connection"
6978 );
6979 }
6980}