1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5use action_log::{ActionLog, ActionLogTelemetry};
6use agent_client_protocol::{self as acp};
7use anyhow::{Context as _, Result, anyhow};
8use collections::HashSet;
9pub use connection::*;
10pub use diff::*;
11use futures::{FutureExt, channel::oneshot, future::BoxFuture};
12use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
13use itertools::Itertools;
14use language::language_settings::FormatOnSave;
15use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
16use markdown::Markdown;
17pub use mention::*;
18use project::lsp_store::{FormatTrigger, LspFormatTarget};
19use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
20use serde::{Deserialize, Serialize};
21use serde_json::to_string_pretty;
22use std::collections::HashMap;
23use std::error::Error;
24use std::fmt::{Formatter, Write};
25use std::ops::Range;
26use std::process::ExitStatus;
27use std::rc::Rc;
28use std::time::{Duration, Instant};
29use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
30use task::{Shell, ShellBuilder};
31pub use terminal::*;
32use text::Bias;
33use ui::App;
34use util::markdown::MarkdownEscaped;
35use util::path_list::PathList;
36use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
37use uuid::Uuid;
38
39/// Key used in ACP ToolCall meta to store the tool's programmatic name.
40/// This is a workaround since ACP's ToolCall doesn't have a dedicated name field.
41pub const TOOL_NAME_META_KEY: &str = "tool_name";
42
43/// Helper to extract tool name from ACP meta
44pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
45 meta.as_ref()
46 .and_then(|m| m.get(TOOL_NAME_META_KEY))
47 .and_then(|v| v.as_str())
48 .map(|s| SharedString::from(s.to_owned()))
49}
50
51/// Helper to create meta with tool name
52pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
53 acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
54}
55
56/// Key used in ACP ToolCall meta to store the session id and message indexes
57pub const SUBAGENT_SESSION_INFO_META_KEY: &str = "subagent_session_info";
58
59#[derive(Clone, Debug, Deserialize, Serialize)]
60pub struct SubagentSessionInfo {
61 /// The session id of the subagent sessiont that was spawned
62 pub session_id: acp::SessionId,
63 /// The index of the message of the start of the "turn" run by this tool call
64 pub message_start_index: usize,
65 /// The index of the output of the message that the subagent has returned
66 #[serde(skip_serializing_if = "Option::is_none")]
67 pub message_end_index: Option<usize>,
68}
69
70/// Helper to extract subagent session id from ACP meta
71pub fn subagent_session_info_from_meta(meta: &Option<acp::Meta>) -> Option<SubagentSessionInfo> {
72 meta.as_ref()
73 .and_then(|m| m.get(SUBAGENT_SESSION_INFO_META_KEY))
74 .and_then(|v| serde_json::from_value(v.clone()).ok())
75}
76
77#[derive(Debug)]
78pub struct UserMessage {
79 pub id: Option<UserMessageId>,
80 pub content: ContentBlock,
81 pub chunks: Vec<acp::ContentBlock>,
82 pub checkpoint: Option<Checkpoint>,
83 pub indented: bool,
84}
85
86#[derive(Debug)]
87pub struct Checkpoint {
88 git_checkpoint: GitStoreCheckpoint,
89 pub show: bool,
90}
91
92impl UserMessage {
93 fn to_markdown(&self, cx: &App) -> String {
94 let mut markdown = String::new();
95 if self
96 .checkpoint
97 .as_ref()
98 .is_some_and(|checkpoint| checkpoint.show)
99 {
100 writeln!(markdown, "## User (checkpoint)").unwrap();
101 } else {
102 writeln!(markdown, "## User").unwrap();
103 }
104 writeln!(markdown).unwrap();
105 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
106 writeln!(markdown).unwrap();
107 markdown
108 }
109}
110
111#[derive(Debug, PartialEq)]
112pub struct AssistantMessage {
113 pub chunks: Vec<AssistantMessageChunk>,
114 pub indented: bool,
115 pub is_subagent_output: bool,
116}
117
118impl AssistantMessage {
119 pub fn to_markdown(&self, cx: &App) -> String {
120 format!(
121 "## Assistant\n\n{}\n\n",
122 self.chunks
123 .iter()
124 .map(|chunk| chunk.to_markdown(cx))
125 .join("\n\n")
126 )
127 }
128}
129
130#[derive(Debug, PartialEq)]
131pub enum AssistantMessageChunk {
132 Message { block: ContentBlock },
133 Thought { block: ContentBlock },
134}
135
136impl AssistantMessageChunk {
137 pub fn from_str(
138 chunk: &str,
139 language_registry: &Arc<LanguageRegistry>,
140 path_style: PathStyle,
141 cx: &mut App,
142 ) -> Self {
143 Self::Message {
144 block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
145 }
146 }
147
148 fn to_markdown(&self, cx: &App) -> String {
149 match self {
150 Self::Message { block } => block.to_markdown(cx).to_string(),
151 Self::Thought { block } => {
152 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
153 }
154 }
155 }
156}
157
158#[derive(Debug)]
159pub enum AgentThreadEntry {
160 UserMessage(UserMessage),
161 AssistantMessage(AssistantMessage),
162 ToolCall(ToolCall),
163}
164
165impl AgentThreadEntry {
166 pub fn is_indented(&self) -> bool {
167 match self {
168 Self::UserMessage(message) => message.indented,
169 Self::AssistantMessage(message) => message.indented,
170 Self::ToolCall(_) => false,
171 }
172 }
173
174 pub fn to_markdown(&self, cx: &App) -> String {
175 match self {
176 Self::UserMessage(message) => message.to_markdown(cx),
177 Self::AssistantMessage(message) => message.to_markdown(cx),
178 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
179 }
180 }
181
182 pub fn user_message(&self) -> Option<&UserMessage> {
183 if let AgentThreadEntry::UserMessage(message) = self {
184 Some(message)
185 } else {
186 None
187 }
188 }
189
190 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
191 if let AgentThreadEntry::ToolCall(call) = self {
192 itertools::Either::Left(call.diffs())
193 } else {
194 itertools::Either::Right(std::iter::empty())
195 }
196 }
197
198 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
199 if let AgentThreadEntry::ToolCall(call) = self {
200 itertools::Either::Left(call.terminals())
201 } else {
202 itertools::Either::Right(std::iter::empty())
203 }
204 }
205
206 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
207 if let AgentThreadEntry::ToolCall(ToolCall {
208 locations,
209 resolved_locations,
210 ..
211 }) = self
212 {
213 Some((
214 locations.get(ix)?.clone(),
215 resolved_locations.get(ix)?.clone()?,
216 ))
217 } else {
218 None
219 }
220 }
221}
222
223#[derive(Debug)]
224pub struct ToolCall {
225 pub id: acp::ToolCallId,
226 pub label: Entity<Markdown>,
227 pub kind: acp::ToolKind,
228 pub content: Vec<ToolCallContent>,
229 pub status: ToolCallStatus,
230 pub locations: Vec<acp::ToolCallLocation>,
231 pub resolved_locations: Vec<Option<AgentLocation>>,
232 pub raw_input: Option<serde_json::Value>,
233 pub raw_input_markdown: Option<Entity<Markdown>>,
234 pub raw_output: Option<serde_json::Value>,
235 pub tool_name: Option<SharedString>,
236 pub subagent_session_info: Option<SubagentSessionInfo>,
237}
238
239impl ToolCall {
240 fn from_acp(
241 tool_call: acp::ToolCall,
242 status: ToolCallStatus,
243 language_registry: Arc<LanguageRegistry>,
244 path_style: PathStyle,
245 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
246 cx: &mut App,
247 ) -> Result<Self> {
248 let title = if tool_call.kind == acp::ToolKind::Execute {
249 tool_call.title
250 } else if tool_call.kind == acp::ToolKind::Edit {
251 MarkdownEscaped(tool_call.title.as_str()).to_string()
252 } else if let Some((first_line, _)) = tool_call.title.split_once("\n") {
253 first_line.to_owned() + "…"
254 } else {
255 tool_call.title
256 };
257 let mut content = Vec::with_capacity(tool_call.content.len());
258 for item in tool_call.content {
259 if let Some(item) = ToolCallContent::from_acp(
260 item,
261 language_registry.clone(),
262 path_style,
263 terminals,
264 cx,
265 )? {
266 content.push(item);
267 }
268 }
269
270 let raw_input_markdown = tool_call
271 .raw_input
272 .as_ref()
273 .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
274
275 let tool_name = tool_name_from_meta(&tool_call.meta);
276
277 let subagent_session_info = subagent_session_info_from_meta(&tool_call.meta);
278
279 let result = Self {
280 id: tool_call.tool_call_id,
281 label: cx
282 .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
283 kind: tool_call.kind,
284 content,
285 locations: tool_call.locations,
286 resolved_locations: Vec::default(),
287 status,
288 raw_input: tool_call.raw_input,
289 raw_input_markdown,
290 raw_output: tool_call.raw_output,
291 tool_name,
292 subagent_session_info,
293 };
294 Ok(result)
295 }
296
297 fn update_fields(
298 &mut self,
299 fields: acp::ToolCallUpdateFields,
300 meta: Option<acp::Meta>,
301 language_registry: Arc<LanguageRegistry>,
302 path_style: PathStyle,
303 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
304 cx: &mut App,
305 ) -> Result<()> {
306 let acp::ToolCallUpdateFields {
307 kind,
308 status,
309 title,
310 content,
311 locations,
312 raw_input,
313 raw_output,
314 ..
315 } = fields;
316
317 if let Some(kind) = kind {
318 self.kind = kind;
319 }
320
321 if let Some(status) = status {
322 self.status = status.into();
323 }
324
325 if let Some(subagent_session_info) = subagent_session_info_from_meta(&meta) {
326 self.subagent_session_info = Some(subagent_session_info);
327 }
328
329 if let Some(title) = title {
330 if self.kind == acp::ToolKind::Execute {
331 for terminal in self.terminals() {
332 terminal.update(cx, |terminal, cx| {
333 terminal.update_command_label(&title, cx);
334 });
335 }
336 }
337 self.label.update(cx, |label, cx| {
338 if self.kind == acp::ToolKind::Execute {
339 label.replace(title, cx);
340 } else if self.kind == acp::ToolKind::Edit {
341 label.replace(MarkdownEscaped(&title).to_string(), cx)
342 } else if let Some((first_line, _)) = title.split_once("\n") {
343 label.replace(first_line.to_owned() + "…", cx);
344 } else {
345 label.replace(title, cx);
346 }
347 });
348 }
349
350 if let Some(content) = content {
351 let mut new_content_len = content.len();
352 let mut content = content.into_iter();
353
354 // Reuse existing content if we can
355 for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
356 let valid_content =
357 old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
358 if !valid_content {
359 new_content_len -= 1;
360 }
361 }
362 for new in content {
363 if let Some(new) = ToolCallContent::from_acp(
364 new,
365 language_registry.clone(),
366 path_style,
367 terminals,
368 cx,
369 )? {
370 self.content.push(new);
371 } else {
372 new_content_len -= 1;
373 }
374 }
375 self.content.truncate(new_content_len);
376 }
377
378 if let Some(locations) = locations {
379 self.locations = locations;
380 }
381
382 if let Some(raw_input) = raw_input {
383 self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
384 self.raw_input = Some(raw_input);
385 }
386
387 if let Some(raw_output) = raw_output {
388 if self.content.is_empty()
389 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
390 {
391 self.content
392 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
393 markdown,
394 }));
395 }
396 self.raw_output = Some(raw_output);
397 }
398 Ok(())
399 }
400
401 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
402 self.content.iter().filter_map(|content| match content {
403 ToolCallContent::Diff(diff) => Some(diff),
404 ToolCallContent::ContentBlock(_) => None,
405 ToolCallContent::Terminal(_) => None,
406 })
407 }
408
409 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
410 self.content.iter().filter_map(|content| match content {
411 ToolCallContent::Terminal(terminal) => Some(terminal),
412 ToolCallContent::ContentBlock(_) => None,
413 ToolCallContent::Diff(_) => None,
414 })
415 }
416
417 pub fn is_subagent(&self) -> bool {
418 self.tool_name.as_ref().is_some_and(|s| s == "spawn_agent")
419 || self.subagent_session_info.is_some()
420 }
421
422 pub fn to_markdown(&self, cx: &App) -> String {
423 let mut markdown = format!(
424 "**Tool Call: {}**\nStatus: {}\n\n",
425 self.label.read(cx).source(),
426 self.status
427 );
428 for content in &self.content {
429 markdown.push_str(content.to_markdown(cx).as_str());
430 markdown.push_str("\n\n");
431 }
432 markdown
433 }
434
435 async fn resolve_location(
436 location: acp::ToolCallLocation,
437 project: WeakEntity<Project>,
438 cx: &mut AsyncApp,
439 ) -> Option<ResolvedLocation> {
440 let buffer = project
441 .update(cx, |project, cx| {
442 project
443 .project_path_for_absolute_path(&location.path, cx)
444 .map(|path| project.open_buffer(path, cx))
445 })
446 .ok()??;
447 let buffer = buffer.await.log_err()?;
448 let position = buffer.update(cx, |buffer, _| {
449 let snapshot = buffer.snapshot();
450 if let Some(row) = location.line {
451 let column = snapshot.indent_size_for_line(row).len;
452 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
453 snapshot.anchor_before(point)
454 } else {
455 Anchor::min_for_buffer(snapshot.remote_id())
456 }
457 });
458
459 Some(ResolvedLocation { buffer, position })
460 }
461
462 fn resolve_locations(
463 &self,
464 project: Entity<Project>,
465 cx: &mut App,
466 ) -> Task<Vec<Option<ResolvedLocation>>> {
467 let locations = self.locations.clone();
468 project.update(cx, |_, cx| {
469 cx.spawn(async move |project, cx| {
470 let mut new_locations = Vec::new();
471 for location in locations {
472 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
473 }
474 new_locations
475 })
476 })
477 }
478}
479
480// Separate so we can hold a strong reference to the buffer
481// for saving on the thread
482#[derive(Clone, Debug, PartialEq, Eq)]
483struct ResolvedLocation {
484 buffer: Entity<Buffer>,
485 position: Anchor,
486}
487
488impl From<&ResolvedLocation> for AgentLocation {
489 fn from(value: &ResolvedLocation) -> Self {
490 Self {
491 buffer: value.buffer.downgrade(),
492 position: value.position,
493 }
494 }
495}
496
497#[derive(Debug)]
498pub enum ToolCallStatus {
499 /// The tool call hasn't started running yet, but we start showing it to
500 /// the user.
501 Pending,
502 /// The tool call is waiting for confirmation from the user.
503 WaitingForConfirmation {
504 options: PermissionOptions,
505 respond_tx: oneshot::Sender<acp::PermissionOptionId>,
506 },
507 /// The tool call is currently running.
508 InProgress,
509 /// The tool call completed successfully.
510 Completed,
511 /// The tool call failed.
512 Failed,
513 /// The user rejected the tool call.
514 Rejected,
515 /// The user canceled generation so the tool call was canceled.
516 Canceled,
517}
518
519impl From<acp::ToolCallStatus> for ToolCallStatus {
520 fn from(status: acp::ToolCallStatus) -> Self {
521 match status {
522 acp::ToolCallStatus::Pending => Self::Pending,
523 acp::ToolCallStatus::InProgress => Self::InProgress,
524 acp::ToolCallStatus::Completed => Self::Completed,
525 acp::ToolCallStatus::Failed => Self::Failed,
526 _ => Self::Pending,
527 }
528 }
529}
530
531impl Display for ToolCallStatus {
532 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
533 write!(
534 f,
535 "{}",
536 match self {
537 ToolCallStatus::Pending => "Pending",
538 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
539 ToolCallStatus::InProgress => "In Progress",
540 ToolCallStatus::Completed => "Completed",
541 ToolCallStatus::Failed => "Failed",
542 ToolCallStatus::Rejected => "Rejected",
543 ToolCallStatus::Canceled => "Canceled",
544 }
545 )
546 }
547}
548
549#[derive(Debug, PartialEq, Clone)]
550pub enum ContentBlock {
551 Empty,
552 Markdown { markdown: Entity<Markdown> },
553 ResourceLink { resource_link: acp::ResourceLink },
554 Image { image: Arc<gpui::Image> },
555}
556
557impl ContentBlock {
558 pub fn new(
559 block: acp::ContentBlock,
560 language_registry: &Arc<LanguageRegistry>,
561 path_style: PathStyle,
562 cx: &mut App,
563 ) -> Self {
564 let mut this = Self::Empty;
565 this.append(block, language_registry, path_style, cx);
566 this
567 }
568
569 pub fn new_combined(
570 blocks: impl IntoIterator<Item = acp::ContentBlock>,
571 language_registry: Arc<LanguageRegistry>,
572 path_style: PathStyle,
573 cx: &mut App,
574 ) -> Self {
575 let mut this = Self::Empty;
576 for block in blocks {
577 this.append(block, &language_registry, path_style, cx);
578 }
579 this
580 }
581
582 pub fn append(
583 &mut self,
584 block: acp::ContentBlock,
585 language_registry: &Arc<LanguageRegistry>,
586 path_style: PathStyle,
587 cx: &mut App,
588 ) {
589 match (&mut *self, &block) {
590 (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
591 *self = ContentBlock::ResourceLink {
592 resource_link: resource_link.clone(),
593 };
594 }
595 (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
596 if let Some(image) = Self::decode_image(image_content) {
597 *self = ContentBlock::Image { image };
598 } else {
599 let new_content = Self::image_md(image_content);
600 *self = Self::create_markdown_block(new_content, language_registry, cx);
601 }
602 }
603 (ContentBlock::Empty, _) => {
604 let new_content = Self::block_string_contents(&block, path_style);
605 *self = Self::create_markdown_block(new_content, language_registry, cx);
606 }
607 (ContentBlock::Markdown { markdown }, _) => {
608 let new_content = Self::block_string_contents(&block, path_style);
609 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
610 }
611 (ContentBlock::ResourceLink { resource_link }, _) => {
612 let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
613 let new_content = Self::block_string_contents(&block, path_style);
614 let combined = format!("{}\n{}", existing_content, new_content);
615 *self = Self::create_markdown_block(combined, language_registry, cx);
616 }
617 (ContentBlock::Image { .. }, _) => {
618 let new_content = Self::block_string_contents(&block, path_style);
619 let combined = format!("`Image`\n{}", new_content);
620 *self = Self::create_markdown_block(combined, language_registry, cx);
621 }
622 }
623 }
624
625 fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
626 use base64::Engine as _;
627
628 let bytes = base64::engine::general_purpose::STANDARD
629 .decode(image_content.data.as_bytes())
630 .ok()?;
631 let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
632 Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
633 }
634
635 fn create_markdown_block(
636 content: String,
637 language_registry: &Arc<LanguageRegistry>,
638 cx: &mut App,
639 ) -> ContentBlock {
640 ContentBlock::Markdown {
641 markdown: cx
642 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
643 }
644 }
645
646 fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
647 match block {
648 acp::ContentBlock::Text(text_content) => text_content.text.clone(),
649 acp::ContentBlock::ResourceLink(resource_link) => {
650 Self::resource_link_md(&resource_link.uri, path_style)
651 }
652 acp::ContentBlock::Resource(acp::EmbeddedResource {
653 resource:
654 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
655 uri,
656 ..
657 }),
658 ..
659 }) => Self::resource_link_md(uri, path_style),
660 acp::ContentBlock::Image(image) => Self::image_md(image),
661 _ => String::new(),
662 }
663 }
664
665 fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
666 if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
667 uri.as_link().to_string()
668 } else {
669 uri.to_string()
670 }
671 }
672
673 fn image_md(_image: &acp::ImageContent) -> String {
674 "`Image`".into()
675 }
676
677 pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
678 match self {
679 ContentBlock::Empty => "",
680 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
681 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
682 ContentBlock::Image { .. } => "`Image`",
683 }
684 }
685
686 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
687 match self {
688 ContentBlock::Empty => None,
689 ContentBlock::Markdown { markdown } => Some(markdown),
690 ContentBlock::ResourceLink { .. } => None,
691 ContentBlock::Image { .. } => None,
692 }
693 }
694
695 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
696 match self {
697 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
698 _ => None,
699 }
700 }
701
702 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
703 match self {
704 ContentBlock::Image { image } => Some(image),
705 _ => None,
706 }
707 }
708}
709
710#[derive(Debug)]
711pub enum ToolCallContent {
712 ContentBlock(ContentBlock),
713 Diff(Entity<Diff>),
714 Terminal(Entity<Terminal>),
715}
716
717impl ToolCallContent {
718 pub fn from_acp(
719 content: acp::ToolCallContent,
720 language_registry: Arc<LanguageRegistry>,
721 path_style: PathStyle,
722 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
723 cx: &mut App,
724 ) -> Result<Option<Self>> {
725 match content {
726 acp::ToolCallContent::Content(acp::Content { content, .. }) => {
727 Ok(Some(Self::ContentBlock(ContentBlock::new(
728 content,
729 &language_registry,
730 path_style,
731 cx,
732 ))))
733 }
734 acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
735 Diff::finalized(
736 diff.path.to_string_lossy().into_owned(),
737 diff.old_text,
738 diff.new_text,
739 language_registry,
740 cx,
741 )
742 })))),
743 acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
744 .get(&terminal_id)
745 .cloned()
746 .map(|terminal| Some(Self::Terminal(terminal)))
747 .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
748 _ => Ok(None),
749 }
750 }
751
752 pub fn update_from_acp(
753 &mut self,
754 new: acp::ToolCallContent,
755 language_registry: Arc<LanguageRegistry>,
756 path_style: PathStyle,
757 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
758 cx: &mut App,
759 ) -> Result<bool> {
760 let needs_update = match (&self, &new) {
761 (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
762 old_diff.read(cx).needs_update(
763 new_diff.old_text.as_deref().unwrap_or(""),
764 &new_diff.new_text,
765 cx,
766 )
767 }
768 _ => true,
769 };
770
771 if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
772 if needs_update {
773 *self = update;
774 }
775 Ok(true)
776 } else {
777 Ok(false)
778 }
779 }
780
781 pub fn to_markdown(&self, cx: &App) -> String {
782 match self {
783 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
784 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
785 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
786 }
787 }
788
789 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
790 match self {
791 Self::ContentBlock(content) => content.image(),
792 _ => None,
793 }
794 }
795}
796
797#[derive(Debug, PartialEq)]
798pub enum ToolCallUpdate {
799 UpdateFields(acp::ToolCallUpdate),
800 UpdateDiff(ToolCallUpdateDiff),
801 UpdateTerminal(ToolCallUpdateTerminal),
802}
803
804impl ToolCallUpdate {
805 fn id(&self) -> &acp::ToolCallId {
806 match self {
807 Self::UpdateFields(update) => &update.tool_call_id,
808 Self::UpdateDiff(diff) => &diff.id,
809 Self::UpdateTerminal(terminal) => &terminal.id,
810 }
811 }
812}
813
814impl From<acp::ToolCallUpdate> for ToolCallUpdate {
815 fn from(update: acp::ToolCallUpdate) -> Self {
816 Self::UpdateFields(update)
817 }
818}
819
820impl From<ToolCallUpdateDiff> for ToolCallUpdate {
821 fn from(diff: ToolCallUpdateDiff) -> Self {
822 Self::UpdateDiff(diff)
823 }
824}
825
826#[derive(Debug, PartialEq)]
827pub struct ToolCallUpdateDiff {
828 pub id: acp::ToolCallId,
829 pub diff: Entity<Diff>,
830}
831
832impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
833 fn from(terminal: ToolCallUpdateTerminal) -> Self {
834 Self::UpdateTerminal(terminal)
835 }
836}
837
838#[derive(Debug, PartialEq)]
839pub struct ToolCallUpdateTerminal {
840 pub id: acp::ToolCallId,
841 pub terminal: Entity<Terminal>,
842}
843
844#[derive(Debug, Default)]
845pub struct Plan {
846 pub entries: Vec<PlanEntry>,
847}
848
849#[derive(Debug)]
850pub struct PlanStats<'a> {
851 pub in_progress_entry: Option<&'a PlanEntry>,
852 pub pending: u32,
853 pub completed: u32,
854}
855
856impl Plan {
857 pub fn is_empty(&self) -> bool {
858 self.entries.is_empty()
859 }
860
861 pub fn stats(&self) -> PlanStats<'_> {
862 let mut stats = PlanStats {
863 in_progress_entry: None,
864 pending: 0,
865 completed: 0,
866 };
867
868 for entry in &self.entries {
869 match &entry.status {
870 acp::PlanEntryStatus::Pending => {
871 stats.pending += 1;
872 }
873 acp::PlanEntryStatus::InProgress => {
874 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
875 }
876 acp::PlanEntryStatus::Completed => {
877 stats.completed += 1;
878 }
879 _ => {}
880 }
881 }
882
883 stats
884 }
885}
886
887#[derive(Debug)]
888pub struct PlanEntry {
889 pub content: Entity<Markdown>,
890 pub priority: acp::PlanEntryPriority,
891 pub status: acp::PlanEntryStatus,
892}
893
894impl PlanEntry {
895 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
896 Self {
897 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
898 priority: entry.priority,
899 status: entry.status,
900 }
901 }
902}
903
904#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
905pub struct TokenUsage {
906 pub max_tokens: u64,
907 pub used_tokens: u64,
908 pub input_tokens: u64,
909 pub output_tokens: u64,
910 pub max_output_tokens: Option<u64>,
911}
912
913pub const TOKEN_USAGE_WARNING_THRESHOLD: f32 = 0.8;
914
915impl TokenUsage {
916 pub fn ratio(&self) -> TokenUsageRatio {
917 #[cfg(debug_assertions)]
918 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
919 .unwrap_or(TOKEN_USAGE_WARNING_THRESHOLD.to_string())
920 .parse()
921 .unwrap();
922 #[cfg(not(debug_assertions))]
923 let warning_threshold: f32 = TOKEN_USAGE_WARNING_THRESHOLD;
924
925 // When the maximum is unknown because there is no selected model,
926 // avoid showing the token limit warning.
927 if self.max_tokens == 0 {
928 TokenUsageRatio::Normal
929 } else if self.used_tokens >= self.max_tokens {
930 TokenUsageRatio::Exceeded
931 } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
932 TokenUsageRatio::Warning
933 } else {
934 TokenUsageRatio::Normal
935 }
936 }
937}
938
939#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
940pub enum TokenUsageRatio {
941 Normal,
942 Warning,
943 Exceeded,
944}
945
946#[derive(Debug, Clone)]
947pub struct RetryStatus {
948 pub last_error: SharedString,
949 pub attempt: usize,
950 pub max_attempts: usize,
951 pub started_at: Instant,
952 pub duration: Duration,
953}
954
955struct RunningTurn {
956 id: u32,
957 send_task: Task<()>,
958}
959
960pub struct AcpThread {
961 session_id: acp::SessionId,
962 work_dirs: Option<PathList>,
963 parent_session_id: Option<acp::SessionId>,
964 title: SharedString,
965 provisional_title: Option<SharedString>,
966 entries: Vec<AgentThreadEntry>,
967 plan: Plan,
968 project: Entity<Project>,
969 action_log: Entity<ActionLog>,
970 shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
971 turn_id: u32,
972 running_turn: Option<RunningTurn>,
973 connection: Rc<dyn AgentConnection>,
974 token_usage: Option<TokenUsage>,
975 prompt_capabilities: acp::PromptCapabilities,
976 _observe_prompt_capabilities: Task<anyhow::Result<()>>,
977 terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
978 pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
979 pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
980 had_error: bool,
981 /// The user's unsent prompt text, persisted so it can be restored when reloading the thread.
982 draft_prompt: Option<Vec<acp::ContentBlock>>,
983 /// The initial scroll position for the thread view, set during session registration.
984 ui_scroll_position: Option<gpui::ListOffset>,
985 /// Buffer for smooth text streaming. Holds text that has been received from
986 /// the model but not yet revealed in the UI. A timer task drains this buffer
987 /// gradually to create a fluid typing effect instead of choppy chunk-at-a-time
988 /// updates.
989 streaming_text_buffer: Option<StreamingTextBuffer>,
990}
991
992struct StreamingTextBuffer {
993 /// Text received from the model but not yet appended to the Markdown source.
994 pending: String,
995 /// The number of bytes to reveal per timer turn.
996 bytes_to_reveal_per_tick: usize,
997 /// The Markdown entity being streamed into.
998 target: Entity<Markdown>,
999 /// Timer task that periodically moves text from `pending` into `source`.
1000 _reveal_task: Task<()>,
1001}
1002
1003impl StreamingTextBuffer {
1004 /// The number of milliseconds between each timer tick, controlling how quickly
1005 /// text is revealed.
1006 const TASK_UPDATE_MS: u64 = 16;
1007 /// The time in milliseconds to reveal the entire pending text.
1008 const REVEAL_TARGET: f32 = 200.0;
1009}
1010
1011impl From<&AcpThread> for ActionLogTelemetry {
1012 fn from(value: &AcpThread) -> Self {
1013 Self {
1014 agent_telemetry_id: value.connection().telemetry_id(),
1015 session_id: value.session_id.0.clone(),
1016 }
1017 }
1018}
1019
1020#[derive(Debug)]
1021pub enum AcpThreadEvent {
1022 NewEntry,
1023 TitleUpdated,
1024 TokenUsageUpdated,
1025 EntryUpdated(usize),
1026 EntriesRemoved(Range<usize>),
1027 ToolAuthorizationRequested(acp::ToolCallId),
1028 ToolAuthorizationReceived(acp::ToolCallId),
1029 Retry(RetryStatus),
1030 SubagentSpawned(acp::SessionId),
1031 Stopped(acp::StopReason),
1032 Error,
1033 LoadError(LoadError),
1034 PromptCapabilitiesUpdated,
1035 Refusal,
1036 AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
1037 ModeUpdated(acp::SessionModeId),
1038 ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
1039}
1040
1041impl EventEmitter<AcpThreadEvent> for AcpThread {}
1042
1043#[derive(Debug, Clone)]
1044pub enum TerminalProviderEvent {
1045 Created {
1046 terminal_id: acp::TerminalId,
1047 label: String,
1048 cwd: Option<PathBuf>,
1049 output_byte_limit: Option<u64>,
1050 terminal: Entity<::terminal::Terminal>,
1051 },
1052 Output {
1053 terminal_id: acp::TerminalId,
1054 data: Vec<u8>,
1055 },
1056 TitleChanged {
1057 terminal_id: acp::TerminalId,
1058 title: String,
1059 },
1060 Exit {
1061 terminal_id: acp::TerminalId,
1062 status: acp::TerminalExitStatus,
1063 },
1064}
1065
1066#[derive(Debug, Clone)]
1067pub enum TerminalProviderCommand {
1068 WriteInput {
1069 terminal_id: acp::TerminalId,
1070 bytes: Vec<u8>,
1071 },
1072 Resize {
1073 terminal_id: acp::TerminalId,
1074 cols: u16,
1075 rows: u16,
1076 },
1077 Close {
1078 terminal_id: acp::TerminalId,
1079 },
1080}
1081
1082#[derive(PartialEq, Eq, Debug)]
1083pub enum ThreadStatus {
1084 Idle,
1085 Generating,
1086}
1087
1088#[derive(Debug, Clone)]
1089pub enum LoadError {
1090 Unsupported {
1091 command: SharedString,
1092 current_version: SharedString,
1093 minimum_version: SharedString,
1094 },
1095 FailedToInstall(SharedString),
1096 Exited {
1097 status: ExitStatus,
1098 },
1099 Other(SharedString),
1100}
1101
1102impl Display for LoadError {
1103 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1104 match self {
1105 LoadError::Unsupported {
1106 command: path,
1107 current_version,
1108 minimum_version,
1109 } => {
1110 write!(
1111 f,
1112 "version {current_version} from {path} is not supported (need at least {minimum_version})"
1113 )
1114 }
1115 LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1116 LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1117 LoadError::Other(msg) => write!(f, "{msg}"),
1118 }
1119 }
1120}
1121
1122impl Error for LoadError {}
1123
1124impl AcpThread {
1125 pub fn new(
1126 parent_session_id: Option<acp::SessionId>,
1127 title: impl Into<SharedString>,
1128 work_dirs: Option<PathList>,
1129 connection: Rc<dyn AgentConnection>,
1130 project: Entity<Project>,
1131 action_log: Entity<ActionLog>,
1132 session_id: acp::SessionId,
1133 mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1134 cx: &mut Context<Self>,
1135 ) -> Self {
1136 let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1137 let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1138 loop {
1139 let caps = prompt_capabilities_rx.recv().await?;
1140 this.update(cx, |this, cx| {
1141 this.prompt_capabilities = caps;
1142 cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1143 })?;
1144 }
1145 });
1146
1147 Self {
1148 parent_session_id,
1149 work_dirs,
1150 action_log,
1151 shared_buffers: Default::default(),
1152 entries: Default::default(),
1153 plan: Default::default(),
1154 title: title.into(),
1155 provisional_title: None,
1156 project,
1157 running_turn: None,
1158 turn_id: 0,
1159 connection,
1160 session_id,
1161 token_usage: None,
1162 prompt_capabilities,
1163 _observe_prompt_capabilities: task,
1164 terminals: HashMap::default(),
1165 pending_terminal_output: HashMap::default(),
1166 pending_terminal_exit: HashMap::default(),
1167 had_error: false,
1168 draft_prompt: None,
1169 ui_scroll_position: None,
1170 streaming_text_buffer: None,
1171 }
1172 }
1173
1174 pub fn parent_session_id(&self) -> Option<&acp::SessionId> {
1175 self.parent_session_id.as_ref()
1176 }
1177
1178 pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
1179 self.prompt_capabilities.clone()
1180 }
1181
1182 pub fn draft_prompt(&self) -> Option<&[acp::ContentBlock]> {
1183 self.draft_prompt.as_deref()
1184 }
1185
1186 pub fn set_draft_prompt(&mut self, prompt: Option<Vec<acp::ContentBlock>>) {
1187 self.draft_prompt = prompt;
1188 }
1189
1190 pub fn ui_scroll_position(&self) -> Option<gpui::ListOffset> {
1191 self.ui_scroll_position
1192 }
1193
1194 pub fn set_ui_scroll_position(&mut self, position: Option<gpui::ListOffset>) {
1195 self.ui_scroll_position = position;
1196 }
1197
1198 pub fn connection(&self) -> &Rc<dyn AgentConnection> {
1199 &self.connection
1200 }
1201
1202 pub fn action_log(&self) -> &Entity<ActionLog> {
1203 &self.action_log
1204 }
1205
1206 pub fn project(&self) -> &Entity<Project> {
1207 &self.project
1208 }
1209
1210 pub fn title(&self) -> SharedString {
1211 self.provisional_title
1212 .clone()
1213 .unwrap_or_else(|| self.title.clone())
1214 }
1215
1216 pub fn has_provisional_title(&self) -> bool {
1217 self.provisional_title.is_some()
1218 }
1219
1220 pub fn entries(&self) -> &[AgentThreadEntry] {
1221 &self.entries
1222 }
1223
1224 pub fn session_id(&self) -> &acp::SessionId {
1225 &self.session_id
1226 }
1227
1228 pub fn work_dirs(&self) -> Option<&PathList> {
1229 self.work_dirs.as_ref()
1230 }
1231
1232 pub fn status(&self) -> ThreadStatus {
1233 if self.running_turn.is_some() {
1234 ThreadStatus::Generating
1235 } else {
1236 ThreadStatus::Idle
1237 }
1238 }
1239
1240 pub fn had_error(&self) -> bool {
1241 self.had_error
1242 }
1243
1244 pub fn is_waiting_for_confirmation(&self) -> bool {
1245 for entry in self.entries.iter().rev() {
1246 match entry {
1247 AgentThreadEntry::UserMessage(_) => return false,
1248 AgentThreadEntry::ToolCall(ToolCall {
1249 status: ToolCallStatus::WaitingForConfirmation { .. },
1250 ..
1251 }) => return true,
1252 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1253 }
1254 }
1255 false
1256 }
1257
1258 pub fn token_usage(&self) -> Option<&TokenUsage> {
1259 self.token_usage.as_ref()
1260 }
1261
1262 pub fn has_pending_edit_tool_calls(&self) -> bool {
1263 for entry in self.entries.iter().rev() {
1264 match entry {
1265 AgentThreadEntry::UserMessage(_) => return false,
1266 AgentThreadEntry::ToolCall(
1267 call @ ToolCall {
1268 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1269 ..
1270 },
1271 ) if call.diffs().next().is_some() => {
1272 return true;
1273 }
1274 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1275 }
1276 }
1277
1278 false
1279 }
1280
1281 pub fn has_in_progress_tool_calls(&self) -> bool {
1282 for entry in self.entries.iter().rev() {
1283 match entry {
1284 AgentThreadEntry::UserMessage(_) => return false,
1285 AgentThreadEntry::ToolCall(ToolCall {
1286 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1287 ..
1288 }) => {
1289 return true;
1290 }
1291 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1292 }
1293 }
1294
1295 false
1296 }
1297
1298 pub fn used_tools_since_last_user_message(&self) -> bool {
1299 for entry in self.entries.iter().rev() {
1300 match entry {
1301 AgentThreadEntry::UserMessage(..) => return false,
1302 AgentThreadEntry::AssistantMessage(..) => continue,
1303 AgentThreadEntry::ToolCall(..) => return true,
1304 }
1305 }
1306
1307 false
1308 }
1309
1310 pub fn handle_session_update(
1311 &mut self,
1312 update: acp::SessionUpdate,
1313 cx: &mut Context<Self>,
1314 ) -> Result<(), acp::Error> {
1315 match update {
1316 acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1317 self.push_user_content_block(None, content, cx);
1318 }
1319 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1320 self.push_assistant_content_block(content, false, cx);
1321 }
1322 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1323 self.push_assistant_content_block(content, true, cx);
1324 }
1325 acp::SessionUpdate::ToolCall(tool_call) => {
1326 self.upsert_tool_call(tool_call, cx)?;
1327 }
1328 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1329 self.update_tool_call(tool_call_update, cx)?;
1330 }
1331 acp::SessionUpdate::Plan(plan) => {
1332 self.update_plan(plan, cx);
1333 }
1334 acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1335 available_commands,
1336 ..
1337 }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1338 acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1339 current_mode_id,
1340 ..
1341 }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1342 acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1343 config_options,
1344 ..
1345 }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1346 _ => {}
1347 }
1348 Ok(())
1349 }
1350
1351 pub fn push_user_content_block(
1352 &mut self,
1353 message_id: Option<UserMessageId>,
1354 chunk: acp::ContentBlock,
1355 cx: &mut Context<Self>,
1356 ) {
1357 self.push_user_content_block_with_indent(message_id, chunk, false, cx)
1358 }
1359
1360 pub fn push_user_content_block_with_indent(
1361 &mut self,
1362 message_id: Option<UserMessageId>,
1363 chunk: acp::ContentBlock,
1364 indented: bool,
1365 cx: &mut Context<Self>,
1366 ) {
1367 let language_registry = self.project.read(cx).languages().clone();
1368 let path_style = self.project.read(cx).path_style(cx);
1369 let entries_len = self.entries.len();
1370
1371 if let Some(last_entry) = self.entries.last_mut()
1372 && let AgentThreadEntry::UserMessage(UserMessage {
1373 id,
1374 content,
1375 chunks,
1376 indented: existing_indented,
1377 ..
1378 }) = last_entry
1379 && *existing_indented == indented
1380 {
1381 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1382 *id = message_id.or(id.take());
1383 content.append(chunk.clone(), &language_registry, path_style, cx);
1384 chunks.push(chunk);
1385 let idx = entries_len - 1;
1386 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1387 } else {
1388 let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1389 self.push_entry(
1390 AgentThreadEntry::UserMessage(UserMessage {
1391 id: message_id,
1392 content,
1393 chunks: vec![chunk],
1394 checkpoint: None,
1395 indented,
1396 }),
1397 cx,
1398 );
1399 }
1400 }
1401
1402 pub fn push_assistant_content_block(
1403 &mut self,
1404 chunk: acp::ContentBlock,
1405 is_thought: bool,
1406 cx: &mut Context<Self>,
1407 ) {
1408 self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
1409 }
1410
1411 pub fn push_assistant_content_block_with_indent(
1412 &mut self,
1413 chunk: acp::ContentBlock,
1414 is_thought: bool,
1415 indented: bool,
1416 cx: &mut Context<Self>,
1417 ) {
1418 let path_style = self.project.read(cx).path_style(cx);
1419
1420 // For text chunks going to an existing Markdown block, buffer for smooth
1421 // streaming instead of appending all at once which may feel more choppy.
1422 if let acp::ContentBlock::Text(text_content) = &chunk {
1423 if let Some(markdown) = self.streaming_markdown_target(is_thought, indented) {
1424 let entries_len = self.entries.len();
1425 cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
1426 self.buffer_streaming_text(&markdown, text_content.text.clone(), cx);
1427 return;
1428 }
1429 }
1430
1431 let language_registry = self.project.read(cx).languages().clone();
1432 let entries_len = self.entries.len();
1433 if let Some(last_entry) = self.entries.last_mut()
1434 && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1435 chunks,
1436 indented: existing_indented,
1437 is_subagent_output: _,
1438 }) = last_entry
1439 && *existing_indented == indented
1440 {
1441 let idx = entries_len - 1;
1442 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1443 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1444 match (chunks.last_mut(), is_thought) {
1445 (Some(AssistantMessageChunk::Message { block }), false)
1446 | (Some(AssistantMessageChunk::Thought { block }), true) => {
1447 block.append(chunk, &language_registry, path_style, cx)
1448 }
1449 _ => {
1450 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1451 if is_thought {
1452 chunks.push(AssistantMessageChunk::Thought { block })
1453 } else {
1454 chunks.push(AssistantMessageChunk::Message { block })
1455 }
1456 }
1457 }
1458 } else {
1459 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1460 let chunk = if is_thought {
1461 AssistantMessageChunk::Thought { block }
1462 } else {
1463 AssistantMessageChunk::Message { block }
1464 };
1465
1466 self.push_entry(
1467 AgentThreadEntry::AssistantMessage(AssistantMessage {
1468 chunks: vec![chunk],
1469 indented,
1470 is_subagent_output: false,
1471 }),
1472 cx,
1473 );
1474 }
1475 }
1476
1477 fn streaming_markdown_target(
1478 &self,
1479 is_thought: bool,
1480 indented: bool,
1481 ) -> Option<Entity<Markdown>> {
1482 let last_entry = self.entries.last()?;
1483 if let AgentThreadEntry::AssistantMessage(AssistantMessage {
1484 chunks,
1485 indented: existing_indented,
1486 ..
1487 }) = last_entry
1488 && *existing_indented == indented
1489 && let [.., chunk] = chunks.as_slice()
1490 {
1491 match (chunk, is_thought) {
1492 (
1493 AssistantMessageChunk::Message {
1494 block: ContentBlock::Markdown { markdown },
1495 },
1496 false,
1497 )
1498 | (
1499 AssistantMessageChunk::Thought {
1500 block: ContentBlock::Markdown { markdown },
1501 },
1502 true,
1503 ) => Some(markdown.clone()),
1504 _ => None,
1505 }
1506 } else {
1507 None
1508 }
1509 }
1510
1511 /// Add text to the streaming buffer. If the target changed (e.g. switching
1512 /// from thoughts to message text), flush the old buffer first.
1513 fn buffer_streaming_text(
1514 &mut self,
1515 markdown: &Entity<Markdown>,
1516 text: String,
1517 cx: &mut Context<Self>,
1518 ) {
1519 if let Some(buffer) = &mut self.streaming_text_buffer {
1520 if buffer.target.entity_id() == markdown.entity_id() {
1521 buffer.pending.push_str(&text);
1522
1523 buffer.bytes_to_reveal_per_tick = (buffer.pending.len() as f32
1524 / StreamingTextBuffer::REVEAL_TARGET
1525 * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1526 .ceil() as usize;
1527 return;
1528 }
1529 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1530 }
1531
1532 let target = markdown.clone();
1533 let _reveal_task = self.start_streaming_reveal(cx);
1534 let pending_len = text.len();
1535 let bytes_to_reveal = (pending_len as f32 / StreamingTextBuffer::REVEAL_TARGET
1536 * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1537 .ceil() as usize;
1538 self.streaming_text_buffer = Some(StreamingTextBuffer {
1539 pending: text,
1540 bytes_to_reveal_per_tick: bytes_to_reveal,
1541 target,
1542 _reveal_task,
1543 });
1544 }
1545
1546 /// Flush all buffered streaming text into the Markdown entity immediately.
1547 fn flush_streaming_text(
1548 streaming_text_buffer: &mut Option<StreamingTextBuffer>,
1549 cx: &mut Context<Self>,
1550 ) {
1551 if let Some(buffer) = streaming_text_buffer.take() {
1552 if !buffer.pending.is_empty() {
1553 buffer
1554 .target
1555 .update(cx, |markdown, cx| markdown.append(&buffer.pending, cx));
1556 }
1557 }
1558 }
1559
1560 /// Spawns a foreground task that periodically drains
1561 /// `streaming_text_buffer.pending` into the target `Markdown` entity,
1562 /// producing smooth, continuous text output.
1563 fn start_streaming_reveal(&self, cx: &mut Context<Self>) -> Task<()> {
1564 cx.spawn(async move |this, cx| {
1565 loop {
1566 cx.background_executor()
1567 .timer(Duration::from_millis(StreamingTextBuffer::TASK_UPDATE_MS))
1568 .await;
1569
1570 let should_continue = this
1571 .update(cx, |this, cx| {
1572 let Some(buffer) = &mut this.streaming_text_buffer else {
1573 return false;
1574 };
1575
1576 if buffer.pending.is_empty() {
1577 return true;
1578 }
1579
1580 let pending_len = buffer.pending.len();
1581
1582 let byte_boundary = buffer
1583 .pending
1584 .ceil_char_boundary(buffer.bytes_to_reveal_per_tick)
1585 .min(pending_len);
1586
1587 buffer.target.update(cx, |markdown: &mut Markdown, cx| {
1588 markdown.append(&buffer.pending[..byte_boundary], cx);
1589 buffer.pending.drain(..byte_boundary);
1590 });
1591
1592 true
1593 })
1594 .unwrap_or(false);
1595
1596 if !should_continue {
1597 break;
1598 }
1599 }
1600 })
1601 }
1602
1603 fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
1604 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1605 self.entries.push(entry);
1606 cx.emit(AcpThreadEvent::NewEntry);
1607 }
1608
1609 pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1610 self.connection.set_title(&self.session_id, cx).is_some()
1611 }
1612
1613 pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1614 let had_provisional = self.provisional_title.take().is_some();
1615 if title != self.title {
1616 self.title = title.clone();
1617 cx.emit(AcpThreadEvent::TitleUpdated);
1618 if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1619 return set_title.run(title, cx);
1620 }
1621 } else if had_provisional {
1622 cx.emit(AcpThreadEvent::TitleUpdated);
1623 }
1624 Task::ready(Ok(()))
1625 }
1626
1627 /// Sets a provisional display title without propagating back to the
1628 /// underlying agent connection. This is used for quick preview titles
1629 /// (e.g. first 20 chars of the user message) that should be shown
1630 /// immediately but replaced once the LLM generates a proper title via
1631 /// `set_title`.
1632 pub fn set_provisional_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
1633 self.provisional_title = Some(title);
1634 cx.emit(AcpThreadEvent::TitleUpdated);
1635 }
1636
1637 pub fn subagent_spawned(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
1638 cx.emit(AcpThreadEvent::SubagentSpawned(session_id));
1639 }
1640
1641 pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1642 self.token_usage = usage;
1643 cx.emit(AcpThreadEvent::TokenUsageUpdated);
1644 }
1645
1646 pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1647 cx.emit(AcpThreadEvent::Retry(status));
1648 }
1649
1650 pub fn update_tool_call(
1651 &mut self,
1652 update: impl Into<ToolCallUpdate>,
1653 cx: &mut Context<Self>,
1654 ) -> Result<()> {
1655 let update = update.into();
1656 let languages = self.project.read(cx).languages().clone();
1657 let path_style = self.project.read(cx).path_style(cx);
1658
1659 let ix = match self.index_for_tool_call(update.id()) {
1660 Some(ix) => ix,
1661 None => {
1662 // Tool call not found - create a failed tool call entry
1663 let failed_tool_call = ToolCall {
1664 id: update.id().clone(),
1665 label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1666 kind: acp::ToolKind::Fetch,
1667 content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1668 "Tool call not found".into(),
1669 &languages,
1670 path_style,
1671 cx,
1672 ))],
1673 status: ToolCallStatus::Failed,
1674 locations: Vec::new(),
1675 resolved_locations: Vec::new(),
1676 raw_input: None,
1677 raw_input_markdown: None,
1678 raw_output: None,
1679 tool_name: None,
1680 subagent_session_info: None,
1681 };
1682 self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1683 return Ok(());
1684 }
1685 };
1686 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1687 unreachable!()
1688 };
1689
1690 match update {
1691 ToolCallUpdate::UpdateFields(update) => {
1692 let location_updated = update.fields.locations.is_some();
1693 call.update_fields(
1694 update.fields,
1695 update.meta,
1696 languages,
1697 path_style,
1698 &self.terminals,
1699 cx,
1700 )?;
1701 if location_updated {
1702 self.resolve_locations(update.tool_call_id, cx);
1703 }
1704 }
1705 ToolCallUpdate::UpdateDiff(update) => {
1706 call.content.clear();
1707 call.content.push(ToolCallContent::Diff(update.diff));
1708 }
1709 ToolCallUpdate::UpdateTerminal(update) => {
1710 call.content.clear();
1711 call.content
1712 .push(ToolCallContent::Terminal(update.terminal));
1713 }
1714 }
1715
1716 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1717
1718 Ok(())
1719 }
1720
1721 /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1722 pub fn upsert_tool_call(
1723 &mut self,
1724 tool_call: acp::ToolCall,
1725 cx: &mut Context<Self>,
1726 ) -> Result<(), acp::Error> {
1727 let status = tool_call.status.into();
1728 self.upsert_tool_call_inner(tool_call.into(), status, cx)
1729 }
1730
1731 /// Fails if id does not match an existing entry.
1732 pub fn upsert_tool_call_inner(
1733 &mut self,
1734 update: acp::ToolCallUpdate,
1735 status: ToolCallStatus,
1736 cx: &mut Context<Self>,
1737 ) -> Result<(), acp::Error> {
1738 let language_registry = self.project.read(cx).languages().clone();
1739 let path_style = self.project.read(cx).path_style(cx);
1740 let id = update.tool_call_id.clone();
1741
1742 let agent_telemetry_id = self.connection().telemetry_id();
1743 let session = self.session_id();
1744 let parent_session_id = self.parent_session_id();
1745 if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1746 let status = if matches!(status, ToolCallStatus::Completed) {
1747 "completed"
1748 } else {
1749 "failed"
1750 };
1751 telemetry::event!(
1752 "Agent Tool Call Completed",
1753 agent_telemetry_id,
1754 session,
1755 parent_session_id,
1756 status
1757 );
1758 }
1759
1760 if let Some(ix) = self.index_for_tool_call(&id) {
1761 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1762 unreachable!()
1763 };
1764
1765 call.update_fields(
1766 update.fields,
1767 update.meta,
1768 language_registry,
1769 path_style,
1770 &self.terminals,
1771 cx,
1772 )?;
1773 call.status = status;
1774
1775 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1776 } else {
1777 let call = ToolCall::from_acp(
1778 update.try_into()?,
1779 status,
1780 language_registry,
1781 self.project.read(cx).path_style(cx),
1782 &self.terminals,
1783 cx,
1784 )?;
1785 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1786 };
1787
1788 self.resolve_locations(id, cx);
1789 Ok(())
1790 }
1791
1792 fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1793 self.entries
1794 .iter()
1795 .enumerate()
1796 .rev()
1797 .find_map(|(index, entry)| {
1798 if let AgentThreadEntry::ToolCall(tool_call) = entry
1799 && &tool_call.id == id
1800 {
1801 Some(index)
1802 } else {
1803 None
1804 }
1805 })
1806 }
1807
1808 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1809 // The tool call we are looking for is typically the last one, or very close to the end.
1810 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1811 self.entries
1812 .iter_mut()
1813 .enumerate()
1814 .rev()
1815 .find_map(|(index, tool_call)| {
1816 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1817 && &tool_call.id == id
1818 {
1819 Some((index, tool_call))
1820 } else {
1821 None
1822 }
1823 })
1824 }
1825
1826 pub fn tool_call(&self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1827 self.entries
1828 .iter()
1829 .enumerate()
1830 .rev()
1831 .find_map(|(index, tool_call)| {
1832 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1833 && &tool_call.id == id
1834 {
1835 Some((index, tool_call))
1836 } else {
1837 None
1838 }
1839 })
1840 }
1841
1842 pub fn tool_call_for_subagent(&self, session_id: &acp::SessionId) -> Option<&ToolCall> {
1843 self.entries.iter().find_map(|entry| match entry {
1844 AgentThreadEntry::ToolCall(tool_call) => {
1845 if let Some(subagent_session_info) = &tool_call.subagent_session_info
1846 && &subagent_session_info.session_id == session_id
1847 {
1848 Some(tool_call)
1849 } else {
1850 None
1851 }
1852 }
1853 _ => None,
1854 })
1855 }
1856
1857 pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1858 let project = self.project.clone();
1859 let should_update_agent_location = self.parent_session_id.is_none();
1860 let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1861 return;
1862 };
1863 let task = tool_call.resolve_locations(project, cx);
1864 cx.spawn(async move |this, cx| {
1865 let resolved_locations = task.await;
1866
1867 this.update(cx, |this, cx| {
1868 let project = this.project.clone();
1869
1870 for location in resolved_locations.iter().flatten() {
1871 this.shared_buffers
1872 .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
1873 }
1874 let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1875 return;
1876 };
1877
1878 if let Some(Some(location)) = resolved_locations.last() {
1879 project.update(cx, |project, cx| {
1880 let should_ignore = if let Some(agent_location) = project
1881 .agent_location()
1882 .filter(|agent_location| agent_location.buffer == location.buffer)
1883 {
1884 let snapshot = location.buffer.read(cx).snapshot();
1885 let old_position = agent_location.position.to_point(&snapshot);
1886 let new_position = location.position.to_point(&snapshot);
1887
1888 // ignore this so that when we get updates from the edit tool
1889 // the position doesn't reset to the startof line
1890 old_position.row == new_position.row
1891 && old_position.column > new_position.column
1892 } else {
1893 false
1894 };
1895 if !should_ignore && should_update_agent_location {
1896 project.set_agent_location(Some(location.into()), cx);
1897 }
1898 });
1899 }
1900
1901 let resolved_locations = resolved_locations
1902 .iter()
1903 .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
1904 .collect::<Vec<_>>();
1905
1906 if tool_call.resolved_locations != resolved_locations {
1907 tool_call.resolved_locations = resolved_locations;
1908 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1909 }
1910 })
1911 })
1912 .detach();
1913 }
1914
1915 pub fn request_tool_call_authorization(
1916 &mut self,
1917 tool_call: acp::ToolCallUpdate,
1918 options: PermissionOptions,
1919 cx: &mut Context<Self>,
1920 ) -> Result<Task<acp::RequestPermissionOutcome>> {
1921 let (tx, rx) = oneshot::channel();
1922
1923 let status = ToolCallStatus::WaitingForConfirmation {
1924 options,
1925 respond_tx: tx,
1926 };
1927
1928 let tool_call_id = tool_call.tool_call_id.clone();
1929 self.upsert_tool_call_inner(tool_call, status, cx)?;
1930 cx.emit(AcpThreadEvent::ToolAuthorizationRequested(
1931 tool_call_id.clone(),
1932 ));
1933
1934 Ok(cx.spawn(async move |this, cx| {
1935 let outcome = match rx.await {
1936 Ok(option) => acp::RequestPermissionOutcome::Selected(
1937 acp::SelectedPermissionOutcome::new(option),
1938 ),
1939 Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1940 };
1941 this.update(cx, |_this, cx| {
1942 cx.emit(AcpThreadEvent::ToolAuthorizationReceived(tool_call_id))
1943 })
1944 .ok();
1945 outcome
1946 }))
1947 }
1948
1949 pub fn authorize_tool_call(
1950 &mut self,
1951 id: acp::ToolCallId,
1952 option_id: acp::PermissionOptionId,
1953 option_kind: acp::PermissionOptionKind,
1954 cx: &mut Context<Self>,
1955 ) {
1956 let Some((ix, call)) = self.tool_call_mut(&id) else {
1957 return;
1958 };
1959
1960 let new_status = match option_kind {
1961 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1962 ToolCallStatus::Rejected
1963 }
1964 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1965 ToolCallStatus::InProgress
1966 }
1967 _ => ToolCallStatus::InProgress,
1968 };
1969
1970 let curr_status = mem::replace(&mut call.status, new_status);
1971
1972 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1973 respond_tx.send(option_id).log_err();
1974 } else if cfg!(debug_assertions) {
1975 panic!("tried to authorize an already authorized tool call");
1976 }
1977
1978 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1979 }
1980
1981 pub fn plan(&self) -> &Plan {
1982 &self.plan
1983 }
1984
1985 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1986 let new_entries_len = request.entries.len();
1987 let mut new_entries = request.entries.into_iter();
1988
1989 // Reuse existing markdown to prevent flickering
1990 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1991 let PlanEntry {
1992 content,
1993 priority,
1994 status,
1995 } = old;
1996 content.update(cx, |old, cx| {
1997 old.replace(new.content, cx);
1998 });
1999 *priority = new.priority;
2000 *status = new.status;
2001 }
2002 for new in new_entries {
2003 self.plan.entries.push(PlanEntry::from_acp(new, cx))
2004 }
2005 self.plan.entries.truncate(new_entries_len);
2006
2007 cx.notify();
2008 }
2009
2010 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
2011 self.plan
2012 .entries
2013 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
2014 cx.notify();
2015 }
2016
2017 #[cfg(any(test, feature = "test-support"))]
2018 pub fn send_raw(
2019 &mut self,
2020 message: &str,
2021 cx: &mut Context<Self>,
2022 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2023 self.send(vec![message.into()], cx)
2024 }
2025
2026 pub fn send(
2027 &mut self,
2028 message: Vec<acp::ContentBlock>,
2029 cx: &mut Context<Self>,
2030 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2031 let block = ContentBlock::new_combined(
2032 message.clone(),
2033 self.project.read(cx).languages().clone(),
2034 self.project.read(cx).path_style(cx),
2035 cx,
2036 );
2037 let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
2038 let git_store = self.project.read(cx).git_store().clone();
2039
2040 let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
2041 Some(UserMessageId::new())
2042 } else {
2043 None
2044 };
2045
2046 self.run_turn(cx, async move |this, cx| {
2047 this.update(cx, |this, cx| {
2048 this.push_entry(
2049 AgentThreadEntry::UserMessage(UserMessage {
2050 id: message_id.clone(),
2051 content: block,
2052 chunks: message,
2053 checkpoint: None,
2054 indented: false,
2055 }),
2056 cx,
2057 );
2058 })
2059 .ok();
2060
2061 let old_checkpoint = git_store
2062 .update(cx, |git, cx| git.checkpoint(cx))
2063 .await
2064 .context("failed to get old checkpoint")
2065 .log_err();
2066 this.update(cx, |this, cx| {
2067 if let Some((_ix, message)) = this.last_user_message() {
2068 message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
2069 git_checkpoint,
2070 show: false,
2071 });
2072 }
2073 this.connection.prompt(message_id, request, cx)
2074 })?
2075 .await
2076 })
2077 }
2078
2079 pub fn can_retry(&self, cx: &App) -> bool {
2080 self.connection.retry(&self.session_id, cx).is_some()
2081 }
2082
2083 pub fn retry(
2084 &mut self,
2085 cx: &mut Context<Self>,
2086 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2087 self.run_turn(cx, async move |this, cx| {
2088 this.update(cx, |this, cx| {
2089 this.connection
2090 .retry(&this.session_id, cx)
2091 .map(|retry| retry.run(cx))
2092 })?
2093 .context("retrying a session is not supported")?
2094 .await
2095 })
2096 }
2097
2098 fn run_turn(
2099 &mut self,
2100 cx: &mut Context<Self>,
2101 f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
2102 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2103 self.clear_completed_plan_entries(cx);
2104 self.had_error = false;
2105
2106 let (tx, rx) = oneshot::channel();
2107 let cancel_task = self.cancel(cx);
2108
2109 self.turn_id += 1;
2110 let turn_id = self.turn_id;
2111 self.running_turn = Some(RunningTurn {
2112 id: turn_id,
2113 send_task: cx.spawn(async move |this, cx| {
2114 cancel_task.await;
2115 tx.send(f(this, cx).await).ok();
2116 }),
2117 });
2118
2119 cx.spawn(async move |this, cx| {
2120 let response = rx.await;
2121
2122 this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
2123 .await?;
2124
2125 this.update(cx, |this, cx| {
2126 if this.parent_session_id.is_none() {
2127 this.project
2128 .update(cx, |project, cx| project.set_agent_location(None, cx));
2129 }
2130 let Ok(response) = response else {
2131 // tx dropped, just return
2132 return Ok(None);
2133 };
2134
2135 let is_same_turn = this
2136 .running_turn
2137 .as_ref()
2138 .is_some_and(|turn| turn_id == turn.id);
2139
2140 // If the user submitted a follow up message, running_turn might
2141 // already point to a different turn. Therefore we only want to
2142 // take the task if it's the same turn.
2143 if is_same_turn {
2144 this.running_turn.take();
2145 }
2146
2147 match response {
2148 Ok(r) => {
2149 Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2150
2151 if r.stop_reason == acp::StopReason::MaxTokens {
2152 this.had_error = true;
2153 cx.emit(AcpThreadEvent::Error);
2154 log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2155 return Err(anyhow!("Max tokens reached"));
2156 }
2157
2158 let canceled = matches!(r.stop_reason, acp::StopReason::Cancelled);
2159 if canceled {
2160 this.mark_pending_tools_as_canceled();
2161 }
2162
2163 // Handle refusal - distinguish between user prompt and tool call refusals
2164 if let acp::StopReason::Refusal = r.stop_reason {
2165 this.had_error = true;
2166 if let Some((user_msg_ix, _)) = this.last_user_message() {
2167 // Check if there's a completed tool call with results after the last user message
2168 // This indicates the refusal is in response to tool output, not the user's prompt
2169 let has_completed_tool_call_after_user_msg =
2170 this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
2171 if let AgentThreadEntry::ToolCall(tool_call) = entry {
2172 // Check if the tool call has completed and has output
2173 matches!(tool_call.status, ToolCallStatus::Completed)
2174 && tool_call.raw_output.is_some()
2175 } else {
2176 false
2177 }
2178 });
2179
2180 if has_completed_tool_call_after_user_msg {
2181 // Refusal is due to tool output - don't truncate, just notify
2182 // The model refused based on what the tool returned
2183 cx.emit(AcpThreadEvent::Refusal);
2184 } else {
2185 // User prompt was refused - truncate back to before the user message
2186 let range = user_msg_ix..this.entries.len();
2187 if range.start < range.end {
2188 this.entries.truncate(user_msg_ix);
2189 cx.emit(AcpThreadEvent::EntriesRemoved(range));
2190 }
2191 cx.emit(AcpThreadEvent::Refusal);
2192 }
2193 } else {
2194 // No user message found, treat as general refusal
2195 cx.emit(AcpThreadEvent::Refusal);
2196 }
2197 }
2198
2199 cx.emit(AcpThreadEvent::Stopped(r.stop_reason));
2200 Ok(Some(r))
2201 }
2202 Err(e) => {
2203 Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2204
2205 this.had_error = true;
2206 cx.emit(AcpThreadEvent::Error);
2207 log::error!("Error in run turn: {:?}", e);
2208 Err(e)
2209 }
2210 }
2211 })?
2212 })
2213 .boxed()
2214 }
2215
2216 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2217 let Some(turn) = self.running_turn.take() else {
2218 return Task::ready(());
2219 };
2220 self.connection.cancel(&self.session_id, cx);
2221
2222 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2223 self.mark_pending_tools_as_canceled();
2224
2225 // Wait for the send task to complete
2226 cx.background_spawn(turn.send_task)
2227 }
2228
2229 fn mark_pending_tools_as_canceled(&mut self) {
2230 for entry in self.entries.iter_mut() {
2231 if let AgentThreadEntry::ToolCall(call) = entry {
2232 let cancel = matches!(
2233 call.status,
2234 ToolCallStatus::Pending
2235 | ToolCallStatus::WaitingForConfirmation { .. }
2236 | ToolCallStatus::InProgress
2237 );
2238
2239 if cancel {
2240 call.status = ToolCallStatus::Canceled;
2241 }
2242 }
2243 }
2244 }
2245
2246 /// Restores the git working tree to the state at the given checkpoint (if one exists)
2247 pub fn restore_checkpoint(
2248 &mut self,
2249 id: UserMessageId,
2250 cx: &mut Context<Self>,
2251 ) -> Task<Result<()>> {
2252 let Some((_, message)) = self.user_message_mut(&id) else {
2253 return Task::ready(Err(anyhow!("message not found")));
2254 };
2255
2256 let checkpoint = message
2257 .checkpoint
2258 .as_ref()
2259 .map(|c| c.git_checkpoint.clone());
2260
2261 // Cancel any in-progress generation before restoring
2262 let cancel_task = self.cancel(cx);
2263 let rewind = self.rewind(id.clone(), cx);
2264 let git_store = self.project.read(cx).git_store().clone();
2265
2266 cx.spawn(async move |_, cx| {
2267 cancel_task.await;
2268 rewind.await?;
2269 if let Some(checkpoint) = checkpoint {
2270 git_store
2271 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
2272 .await?;
2273 }
2274
2275 Ok(())
2276 })
2277 }
2278
2279 /// Rewinds this thread to before the entry at `index`, removing it and all
2280 /// subsequent entries while rejecting any action_log changes made from that point.
2281 /// Unlike `restore_checkpoint`, this method does not restore from git.
2282 pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
2283 let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
2284 return Task::ready(Err(anyhow!("not supported")));
2285 };
2286
2287 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2288 let telemetry = ActionLogTelemetry::from(&*self);
2289 cx.spawn(async move |this, cx| {
2290 cx.update(|cx| truncate.run(id.clone(), cx)).await?;
2291 this.update(cx, |this, cx| {
2292 if let Some((ix, _)) = this.user_message_mut(&id) {
2293 // Collect all terminals from entries that will be removed
2294 let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
2295 .iter()
2296 .flat_map(|entry| entry.terminals())
2297 .filter_map(|terminal| terminal.read(cx).id().clone().into())
2298 .collect();
2299
2300 let range = ix..this.entries.len();
2301 this.entries.truncate(ix);
2302 cx.emit(AcpThreadEvent::EntriesRemoved(range));
2303
2304 // Kill and remove the terminals
2305 for terminal_id in terminals_to_remove {
2306 if let Some(terminal) = this.terminals.remove(&terminal_id) {
2307 terminal.update(cx, |terminal, cx| {
2308 terminal.kill(cx);
2309 });
2310 }
2311 }
2312 }
2313 this.action_log().update(cx, |action_log, cx| {
2314 action_log.reject_all_edits(Some(telemetry), cx)
2315 })
2316 })?
2317 .await;
2318 Ok(())
2319 })
2320 }
2321
2322 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
2323 let git_store = self.project.read(cx).git_store().clone();
2324
2325 let Some((_, message)) = self.last_user_message() else {
2326 return Task::ready(Ok(()));
2327 };
2328 let Some(user_message_id) = message.id.clone() else {
2329 return Task::ready(Ok(()));
2330 };
2331 let Some(checkpoint) = message.checkpoint.as_ref() else {
2332 return Task::ready(Ok(()));
2333 };
2334 let old_checkpoint = checkpoint.git_checkpoint.clone();
2335
2336 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2337 cx.spawn(async move |this, cx| {
2338 let Some(new_checkpoint) = new_checkpoint
2339 .await
2340 .context("failed to get new checkpoint")
2341 .log_err()
2342 else {
2343 return Ok(());
2344 };
2345
2346 let equal = git_store
2347 .update(cx, |git, cx| {
2348 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2349 })
2350 .await
2351 .unwrap_or(true);
2352
2353 this.update(cx, |this, cx| {
2354 if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
2355 if let Some(checkpoint) = message.checkpoint.as_mut() {
2356 checkpoint.show = !equal;
2357 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2358 }
2359 }
2360 })?;
2361
2362 Ok(())
2363 })
2364 }
2365
2366 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2367 self.entries
2368 .iter_mut()
2369 .enumerate()
2370 .rev()
2371 .find_map(|(ix, entry)| {
2372 if let AgentThreadEntry::UserMessage(message) = entry {
2373 Some((ix, message))
2374 } else {
2375 None
2376 }
2377 })
2378 }
2379
2380 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2381 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2382 if let AgentThreadEntry::UserMessage(message) = entry {
2383 if message.id.as_ref() == Some(id) {
2384 Some((ix, message))
2385 } else {
2386 None
2387 }
2388 } else {
2389 None
2390 }
2391 })
2392 }
2393
2394 pub fn read_text_file(
2395 &self,
2396 path: PathBuf,
2397 line: Option<u32>,
2398 limit: Option<u32>,
2399 reuse_shared_snapshot: bool,
2400 cx: &mut Context<Self>,
2401 ) -> Task<Result<String, acp::Error>> {
2402 // Args are 1-based, move to 0-based
2403 let line = line.unwrap_or_default().saturating_sub(1);
2404 let limit = limit.unwrap_or(u32::MAX);
2405 let project = self.project.clone();
2406 let action_log = self.action_log.clone();
2407 let should_update_agent_location = self.parent_session_id.is_none();
2408 cx.spawn(async move |this, cx| {
2409 let load = project.update(cx, |project, cx| {
2410 let path = project
2411 .project_path_for_absolute_path(&path, cx)
2412 .ok_or_else(|| {
2413 acp::Error::resource_not_found(Some(path.display().to_string()))
2414 })?;
2415 Ok::<_, acp::Error>(project.open_buffer(path, cx))
2416 })?;
2417
2418 let buffer = load.await?;
2419
2420 let snapshot = if reuse_shared_snapshot {
2421 this.read_with(cx, |this, _| {
2422 this.shared_buffers.get(&buffer.clone()).cloned()
2423 })
2424 .log_err()
2425 .flatten()
2426 } else {
2427 None
2428 };
2429
2430 let snapshot = if let Some(snapshot) = snapshot {
2431 snapshot
2432 } else {
2433 action_log.update(cx, |action_log, cx| {
2434 action_log.buffer_read(buffer.clone(), cx);
2435 });
2436
2437 let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
2438 this.update(cx, |this, _| {
2439 this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2440 })?;
2441 snapshot
2442 };
2443
2444 let max_point = snapshot.max_point();
2445 let start_position = Point::new(line, 0);
2446
2447 if start_position > max_point {
2448 return Err(acp::Error::invalid_params().data(format!(
2449 "Attempting to read beyond the end of the file, line {}:{}",
2450 max_point.row + 1,
2451 max_point.column
2452 )));
2453 }
2454
2455 let start = snapshot.anchor_before(start_position);
2456 let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2457
2458 if should_update_agent_location {
2459 project.update(cx, |project, cx| {
2460 project.set_agent_location(
2461 Some(AgentLocation {
2462 buffer: buffer.downgrade(),
2463 position: start,
2464 }),
2465 cx,
2466 );
2467 });
2468 }
2469
2470 Ok(snapshot.text_for_range(start..end).collect::<String>())
2471 })
2472 }
2473
2474 pub fn write_text_file(
2475 &self,
2476 path: PathBuf,
2477 content: String,
2478 cx: &mut Context<Self>,
2479 ) -> Task<Result<()>> {
2480 let project = self.project.clone();
2481 let action_log = self.action_log.clone();
2482 let should_update_agent_location = self.parent_session_id.is_none();
2483 cx.spawn(async move |this, cx| {
2484 let load = project.update(cx, |project, cx| {
2485 let path = project
2486 .project_path_for_absolute_path(&path, cx)
2487 .context("invalid path")?;
2488 anyhow::Ok(project.open_buffer(path, cx))
2489 });
2490 let buffer = load?.await?;
2491 let snapshot = this.update(cx, |this, cx| {
2492 this.shared_buffers
2493 .get(&buffer)
2494 .cloned()
2495 .unwrap_or_else(|| buffer.read(cx).snapshot())
2496 })?;
2497 let edits = cx
2498 .background_executor()
2499 .spawn(async move {
2500 let old_text = snapshot.text();
2501 text_diff(old_text.as_str(), &content)
2502 .into_iter()
2503 .map(|(range, replacement)| {
2504 (snapshot.anchor_range_around(range), replacement)
2505 })
2506 .collect::<Vec<_>>()
2507 })
2508 .await;
2509
2510 if should_update_agent_location {
2511 project.update(cx, |project, cx| {
2512 project.set_agent_location(
2513 Some(AgentLocation {
2514 buffer: buffer.downgrade(),
2515 position: edits
2516 .last()
2517 .map(|(range, _)| range.end)
2518 .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
2519 }),
2520 cx,
2521 );
2522 });
2523 }
2524
2525 let format_on_save = cx.update(|cx| {
2526 action_log.update(cx, |action_log, cx| {
2527 action_log.buffer_read(buffer.clone(), cx);
2528 });
2529
2530 let format_on_save = buffer.update(cx, |buffer, cx| {
2531 buffer.edit(edits, None, cx);
2532
2533 let settings = language::language_settings::language_settings(
2534 buffer.language().map(|l| l.name()),
2535 buffer.file(),
2536 cx,
2537 );
2538
2539 settings.format_on_save != FormatOnSave::Off
2540 });
2541 action_log.update(cx, |action_log, cx| {
2542 action_log.buffer_edited(buffer.clone(), cx);
2543 });
2544 format_on_save
2545 });
2546
2547 if format_on_save {
2548 let format_task = project.update(cx, |project, cx| {
2549 project.format(
2550 HashSet::from_iter([buffer.clone()]),
2551 LspFormatTarget::Buffers,
2552 false,
2553 FormatTrigger::Save,
2554 cx,
2555 )
2556 });
2557 format_task.await.log_err();
2558
2559 action_log.update(cx, |action_log, cx| {
2560 action_log.buffer_edited(buffer.clone(), cx);
2561 });
2562 }
2563
2564 project
2565 .update(cx, |project, cx| project.save_buffer(buffer, cx))
2566 .await
2567 })
2568 }
2569
2570 pub fn create_terminal(
2571 &self,
2572 command: String,
2573 args: Vec<String>,
2574 extra_env: Vec<acp::EnvVariable>,
2575 cwd: Option<PathBuf>,
2576 output_byte_limit: Option<u64>,
2577 cx: &mut Context<Self>,
2578 ) -> Task<Result<Entity<Terminal>>> {
2579 let env = match &cwd {
2580 Some(dir) => self.project.update(cx, |project, cx| {
2581 project.environment().update(cx, |env, cx| {
2582 env.directory_environment(dir.as_path().into(), cx)
2583 })
2584 }),
2585 None => Task::ready(None).shared(),
2586 };
2587 let env = cx.spawn(async move |_, _| {
2588 let mut env = env.await.unwrap_or_default();
2589 // Disables paging for `git` and hopefully other commands
2590 env.insert("PAGER".into(), "".into());
2591 for var in extra_env {
2592 env.insert(var.name, var.value);
2593 }
2594 env
2595 });
2596
2597 let project = self.project.clone();
2598 let language_registry = project.read(cx).languages().clone();
2599 let is_windows = project.read(cx).path_style(cx).is_windows();
2600
2601 let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
2602 let terminal_task = cx.spawn({
2603 let terminal_id = terminal_id.clone();
2604 async move |_this, cx| {
2605 let env = env.await;
2606 let shell = project
2607 .update(cx, |project, cx| {
2608 project
2609 .remote_client()
2610 .and_then(|r| r.read(cx).default_system_shell())
2611 })
2612 .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2613 let (task_command, task_args) =
2614 ShellBuilder::new(&Shell::Program(shell), is_windows)
2615 .redirect_stdin_to_dev_null()
2616 .build(Some(command.clone()), &args);
2617 let terminal = project
2618 .update(cx, |project, cx| {
2619 project.create_terminal_task(
2620 task::SpawnInTerminal {
2621 command: Some(task_command),
2622 args: task_args,
2623 cwd: cwd.clone(),
2624 env,
2625 ..Default::default()
2626 },
2627 cx,
2628 )
2629 })
2630 .await?;
2631
2632 anyhow::Ok(cx.new(|cx| {
2633 Terminal::new(
2634 terminal_id,
2635 &format!("{} {}", command, args.join(" ")),
2636 cwd,
2637 output_byte_limit.map(|l| l as usize),
2638 terminal,
2639 language_registry,
2640 cx,
2641 )
2642 }))
2643 }
2644 });
2645
2646 cx.spawn(async move |this, cx| {
2647 let terminal = terminal_task.await?;
2648 this.update(cx, |this, _cx| {
2649 this.terminals.insert(terminal_id, terminal.clone());
2650 terminal
2651 })
2652 })
2653 }
2654
2655 pub fn kill_terminal(
2656 &mut self,
2657 terminal_id: acp::TerminalId,
2658 cx: &mut Context<Self>,
2659 ) -> Result<()> {
2660 self.terminals
2661 .get(&terminal_id)
2662 .context("Terminal not found")?
2663 .update(cx, |terminal, cx| {
2664 terminal.kill(cx);
2665 });
2666
2667 Ok(())
2668 }
2669
2670 pub fn release_terminal(
2671 &mut self,
2672 terminal_id: acp::TerminalId,
2673 cx: &mut Context<Self>,
2674 ) -> Result<()> {
2675 self.terminals
2676 .remove(&terminal_id)
2677 .context("Terminal not found")?
2678 .update(cx, |terminal, cx| {
2679 terminal.kill(cx);
2680 });
2681
2682 Ok(())
2683 }
2684
2685 pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2686 self.terminals
2687 .get(&terminal_id)
2688 .context("Terminal not found")
2689 .cloned()
2690 }
2691
2692 pub fn to_markdown(&self, cx: &App) -> String {
2693 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2694 }
2695
2696 pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2697 cx.emit(AcpThreadEvent::LoadError(error));
2698 }
2699
2700 pub fn register_terminal_created(
2701 &mut self,
2702 terminal_id: acp::TerminalId,
2703 command_label: String,
2704 working_dir: Option<PathBuf>,
2705 output_byte_limit: Option<u64>,
2706 terminal: Entity<::terminal::Terminal>,
2707 cx: &mut Context<Self>,
2708 ) -> Entity<Terminal> {
2709 let language_registry = self.project.read(cx).languages().clone();
2710
2711 let entity = cx.new(|cx| {
2712 Terminal::new(
2713 terminal_id.clone(),
2714 &command_label,
2715 working_dir.clone(),
2716 output_byte_limit.map(|l| l as usize),
2717 terminal,
2718 language_registry,
2719 cx,
2720 )
2721 });
2722 self.terminals.insert(terminal_id.clone(), entity.clone());
2723 entity
2724 }
2725
2726 pub fn mark_as_subagent_output(&mut self, cx: &mut Context<Self>) {
2727 for entry in self.entries.iter_mut().rev() {
2728 if let AgentThreadEntry::AssistantMessage(assistant_message) = entry {
2729 assistant_message.is_subagent_output = true;
2730 cx.notify();
2731 return;
2732 }
2733 }
2734 }
2735
2736 pub fn on_terminal_provider_event(
2737 &mut self,
2738 event: TerminalProviderEvent,
2739 cx: &mut Context<Self>,
2740 ) {
2741 match event {
2742 TerminalProviderEvent::Created {
2743 terminal_id,
2744 label,
2745 cwd,
2746 output_byte_limit,
2747 terminal,
2748 } => {
2749 let entity = self.register_terminal_created(
2750 terminal_id.clone(),
2751 label,
2752 cwd,
2753 output_byte_limit,
2754 terminal,
2755 cx,
2756 );
2757
2758 if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
2759 for data in chunks.drain(..) {
2760 entity.update(cx, |term, cx| {
2761 term.inner().update(cx, |inner, cx| {
2762 inner.write_output(&data, cx);
2763 })
2764 });
2765 }
2766 }
2767
2768 if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
2769 entity.update(cx, |_term, cx| {
2770 cx.notify();
2771 });
2772 }
2773
2774 cx.notify();
2775 }
2776 TerminalProviderEvent::Output { terminal_id, data } => {
2777 if let Some(entity) = self.terminals.get(&terminal_id) {
2778 entity.update(cx, |term, cx| {
2779 term.inner().update(cx, |inner, cx| {
2780 inner.write_output(&data, cx);
2781 })
2782 });
2783 } else {
2784 self.pending_terminal_output
2785 .entry(terminal_id)
2786 .or_default()
2787 .push(data);
2788 }
2789 }
2790 TerminalProviderEvent::TitleChanged { terminal_id, title } => {
2791 if let Some(entity) = self.terminals.get(&terminal_id) {
2792 entity.update(cx, |term, cx| {
2793 term.inner().update(cx, |inner, cx| {
2794 inner.breadcrumb_text = title;
2795 cx.emit(::terminal::Event::BreadcrumbsChanged);
2796 })
2797 });
2798 }
2799 }
2800 TerminalProviderEvent::Exit {
2801 terminal_id,
2802 status,
2803 } => {
2804 if let Some(entity) = self.terminals.get(&terminal_id) {
2805 entity.update(cx, |_term, cx| {
2806 cx.notify();
2807 });
2808 } else {
2809 self.pending_terminal_exit.insert(terminal_id, status);
2810 }
2811 }
2812 }
2813 }
2814}
2815
2816fn markdown_for_raw_output(
2817 raw_output: &serde_json::Value,
2818 language_registry: &Arc<LanguageRegistry>,
2819 cx: &mut App,
2820) -> Option<Entity<Markdown>> {
2821 match raw_output {
2822 serde_json::Value::Null => None,
2823 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2824 Markdown::new(
2825 value.to_string().into(),
2826 Some(language_registry.clone()),
2827 None,
2828 cx,
2829 )
2830 })),
2831 serde_json::Value::Number(value) => Some(cx.new(|cx| {
2832 Markdown::new(
2833 value.to_string().into(),
2834 Some(language_registry.clone()),
2835 None,
2836 cx,
2837 )
2838 })),
2839 serde_json::Value::String(value) => Some(cx.new(|cx| {
2840 Markdown::new(
2841 value.clone().into(),
2842 Some(language_registry.clone()),
2843 None,
2844 cx,
2845 )
2846 })),
2847 value => Some(cx.new(|cx| {
2848 let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
2849
2850 Markdown::new(
2851 format!("```json\n{}\n```", pretty_json).into(),
2852 Some(language_registry.clone()),
2853 None,
2854 cx,
2855 )
2856 })),
2857 }
2858}
2859
2860#[cfg(test)]
2861mod tests {
2862 use super::*;
2863 use anyhow::anyhow;
2864 use futures::{channel::mpsc, future::LocalBoxFuture, select};
2865 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2866 use indoc::indoc;
2867 use project::{AgentId, FakeFs, Fs};
2868 use rand::{distr, prelude::*};
2869 use serde_json::json;
2870 use settings::SettingsStore;
2871 use smol::stream::StreamExt as _;
2872 use std::{
2873 any::Any,
2874 cell::RefCell,
2875 path::Path,
2876 rc::Rc,
2877 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2878 time::Duration,
2879 };
2880 use util::{path, path_list::PathList};
2881
2882 fn init_test(cx: &mut TestAppContext) {
2883 env_logger::try_init().ok();
2884 cx.update(|cx| {
2885 let settings_store = SettingsStore::test(cx);
2886 cx.set_global(settings_store);
2887 });
2888 }
2889
2890 #[gpui::test]
2891 async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2892 init_test(cx);
2893
2894 let fs = FakeFs::new(cx.executor());
2895 let project = Project::test(fs, [], cx).await;
2896 let connection = Rc::new(FakeAgentConnection::new());
2897 let thread = cx
2898 .update(|cx| {
2899 connection.new_session(
2900 project,
2901 PathList::new(&[std::path::Path::new(path!("/test"))]),
2902 cx,
2903 )
2904 })
2905 .await
2906 .unwrap();
2907
2908 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2909
2910 // Send Output BEFORE Created - should be buffered by acp_thread
2911 thread.update(cx, |thread, cx| {
2912 thread.on_terminal_provider_event(
2913 TerminalProviderEvent::Output {
2914 terminal_id: terminal_id.clone(),
2915 data: b"hello buffered".to_vec(),
2916 },
2917 cx,
2918 );
2919 });
2920
2921 // Create a display-only terminal and then send Created
2922 let lower = cx.new(|cx| {
2923 let builder = ::terminal::TerminalBuilder::new_display_only(
2924 ::terminal::terminal_settings::CursorShape::default(),
2925 ::terminal::terminal_settings::AlternateScroll::On,
2926 None,
2927 0,
2928 cx.background_executor(),
2929 PathStyle::local(),
2930 )
2931 .unwrap();
2932 builder.subscribe(cx)
2933 });
2934
2935 thread.update(cx, |thread, cx| {
2936 thread.on_terminal_provider_event(
2937 TerminalProviderEvent::Created {
2938 terminal_id: terminal_id.clone(),
2939 label: "Buffered Test".to_string(),
2940 cwd: None,
2941 output_byte_limit: None,
2942 terminal: lower.clone(),
2943 },
2944 cx,
2945 );
2946 });
2947
2948 // After Created, buffered Output should have been flushed into the renderer
2949 let content = thread.read_with(cx, |thread, cx| {
2950 let term = thread.terminal(terminal_id.clone()).unwrap();
2951 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2952 });
2953
2954 assert!(
2955 content.contains("hello buffered"),
2956 "expected buffered output to render, got: {content}"
2957 );
2958 }
2959
2960 #[gpui::test]
2961 async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2962 init_test(cx);
2963
2964 let fs = FakeFs::new(cx.executor());
2965 let project = Project::test(fs, [], cx).await;
2966 let connection = Rc::new(FakeAgentConnection::new());
2967 let thread = cx
2968 .update(|cx| {
2969 connection.new_session(
2970 project,
2971 PathList::new(&[std::path::Path::new(path!("/test"))]),
2972 cx,
2973 )
2974 })
2975 .await
2976 .unwrap();
2977
2978 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2979
2980 // Send Output BEFORE Created
2981 thread.update(cx, |thread, cx| {
2982 thread.on_terminal_provider_event(
2983 TerminalProviderEvent::Output {
2984 terminal_id: terminal_id.clone(),
2985 data: b"pre-exit data".to_vec(),
2986 },
2987 cx,
2988 );
2989 });
2990
2991 // Send Exit BEFORE Created
2992 thread.update(cx, |thread, cx| {
2993 thread.on_terminal_provider_event(
2994 TerminalProviderEvent::Exit {
2995 terminal_id: terminal_id.clone(),
2996 status: acp::TerminalExitStatus::new().exit_code(0),
2997 },
2998 cx,
2999 );
3000 });
3001
3002 // Now create a display-only lower-level terminal and send Created
3003 let lower = cx.new(|cx| {
3004 let builder = ::terminal::TerminalBuilder::new_display_only(
3005 ::terminal::terminal_settings::CursorShape::default(),
3006 ::terminal::terminal_settings::AlternateScroll::On,
3007 None,
3008 0,
3009 cx.background_executor(),
3010 PathStyle::local(),
3011 )
3012 .unwrap();
3013 builder.subscribe(cx)
3014 });
3015
3016 thread.update(cx, |thread, cx| {
3017 thread.on_terminal_provider_event(
3018 TerminalProviderEvent::Created {
3019 terminal_id: terminal_id.clone(),
3020 label: "Buffered Exit Test".to_string(),
3021 cwd: None,
3022 output_byte_limit: None,
3023 terminal: lower.clone(),
3024 },
3025 cx,
3026 );
3027 });
3028
3029 // Output should be present after Created (flushed from buffer)
3030 let content = thread.read_with(cx, |thread, cx| {
3031 let term = thread.terminal(terminal_id.clone()).unwrap();
3032 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3033 });
3034
3035 assert!(
3036 content.contains("pre-exit data"),
3037 "expected pre-exit data to render, got: {content}"
3038 );
3039 }
3040
3041 /// Test that killing a terminal via Terminal::kill properly:
3042 /// 1. Causes wait_for_exit to complete (doesn't hang forever)
3043 /// 2. The underlying terminal still has the output that was written before the kill
3044 ///
3045 /// This test verifies that the fix to kill_active_task (which now also kills
3046 /// the shell process in addition to the foreground process) properly allows
3047 /// wait_for_exit to complete instead of hanging indefinitely.
3048 #[cfg(unix)]
3049 #[gpui::test]
3050 async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
3051 use std::collections::HashMap;
3052 use task::Shell;
3053 use util::shell_builder::ShellBuilder;
3054
3055 init_test(cx);
3056 cx.executor().allow_parking();
3057
3058 let fs = FakeFs::new(cx.executor());
3059 let project = Project::test(fs, [], cx).await;
3060 let connection = Rc::new(FakeAgentConnection::new());
3061 let thread = cx
3062 .update(|cx| {
3063 connection.new_session(
3064 project.clone(),
3065 PathList::new(&[Path::new(path!("/test"))]),
3066 cx,
3067 )
3068 })
3069 .await
3070 .unwrap();
3071
3072 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3073
3074 // Create a real PTY terminal that runs a command which prints output then sleeps
3075 // We use printf instead of echo and chain with && sleep to ensure proper execution
3076 let (completion_tx, _completion_rx) = smol::channel::unbounded();
3077 let (program, args) = ShellBuilder::new(&Shell::System, false).build(
3078 Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
3079 &[],
3080 );
3081
3082 let builder = cx
3083 .update(|cx| {
3084 ::terminal::TerminalBuilder::new(
3085 None,
3086 None,
3087 task::Shell::WithArguments {
3088 program,
3089 args,
3090 title_override: None,
3091 },
3092 HashMap::default(),
3093 ::terminal::terminal_settings::CursorShape::default(),
3094 ::terminal::terminal_settings::AlternateScroll::On,
3095 None,
3096 vec![],
3097 0,
3098 false,
3099 0,
3100 Some(completion_tx),
3101 cx,
3102 vec![],
3103 PathStyle::local(),
3104 )
3105 })
3106 .await
3107 .unwrap();
3108
3109 let lower_terminal = cx.new(|cx| builder.subscribe(cx));
3110
3111 // Create the acp_thread Terminal wrapper
3112 thread.update(cx, |thread, cx| {
3113 thread.on_terminal_provider_event(
3114 TerminalProviderEvent::Created {
3115 terminal_id: terminal_id.clone(),
3116 label: "printf output_before_kill && sleep 60".to_string(),
3117 cwd: None,
3118 output_byte_limit: None,
3119 terminal: lower_terminal.clone(),
3120 },
3121 cx,
3122 );
3123 });
3124
3125 // Wait for the printf command to execute and produce output
3126 // Use real time since parking is enabled
3127 cx.executor().timer(Duration::from_millis(500)).await;
3128
3129 // Get the acp_thread Terminal and kill it
3130 let wait_for_exit = thread.update(cx, |thread, cx| {
3131 let term = thread.terminals.get(&terminal_id).unwrap();
3132 let wait_for_exit = term.read(cx).wait_for_exit();
3133 term.update(cx, |term, cx| {
3134 term.kill(cx);
3135 });
3136 wait_for_exit
3137 });
3138
3139 // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
3140 // Before the fix to kill_active_task, this would hang forever because
3141 // only the foreground process was killed, not the shell, so the PTY
3142 // child never exited and wait_for_completed_task never completed.
3143 let exit_result = futures::select! {
3144 result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
3145 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
3146 };
3147
3148 assert!(
3149 exit_result.is_some(),
3150 "wait_for_exit should complete after kill, but it timed out. \
3151 This indicates kill_active_task is not properly killing the shell process."
3152 );
3153
3154 // Give the system a chance to process any pending updates
3155 cx.run_until_parked();
3156
3157 // Verify that the underlying terminal still has the output that was
3158 // written before the kill. This verifies that killing doesn't lose output.
3159 let inner_content = thread.read_with(cx, |thread, cx| {
3160 let term = thread.terminals.get(&terminal_id).unwrap();
3161 term.read(cx).inner().read(cx).get_content()
3162 });
3163
3164 assert!(
3165 inner_content.contains("output_before_kill"),
3166 "Underlying terminal should contain output from before kill, got: {}",
3167 inner_content
3168 );
3169 }
3170
3171 #[gpui::test]
3172 async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
3173 init_test(cx);
3174
3175 let fs = FakeFs::new(cx.executor());
3176 let project = Project::test(fs, [], cx).await;
3177 let connection = Rc::new(FakeAgentConnection::new());
3178 let thread = cx
3179 .update(|cx| {
3180 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3181 })
3182 .await
3183 .unwrap();
3184
3185 // Test creating a new user message
3186 thread.update(cx, |thread, cx| {
3187 thread.push_user_content_block(None, "Hello, ".into(), cx);
3188 });
3189
3190 thread.update(cx, |thread, cx| {
3191 assert_eq!(thread.entries.len(), 1);
3192 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3193 assert_eq!(user_msg.id, None);
3194 assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
3195 } else {
3196 panic!("Expected UserMessage");
3197 }
3198 });
3199
3200 // Test appending to existing user message
3201 let message_1_id = UserMessageId::new();
3202 thread.update(cx, |thread, cx| {
3203 thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
3204 });
3205
3206 thread.update(cx, |thread, cx| {
3207 assert_eq!(thread.entries.len(), 1);
3208 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3209 assert_eq!(user_msg.id, Some(message_1_id));
3210 assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
3211 } else {
3212 panic!("Expected UserMessage");
3213 }
3214 });
3215
3216 // Test creating new user message after assistant message
3217 thread.update(cx, |thread, cx| {
3218 thread.push_assistant_content_block("Assistant response".into(), false, cx);
3219 });
3220
3221 let message_2_id = UserMessageId::new();
3222 thread.update(cx, |thread, cx| {
3223 thread.push_user_content_block(
3224 Some(message_2_id.clone()),
3225 "New user message".into(),
3226 cx,
3227 );
3228 });
3229
3230 thread.update(cx, |thread, cx| {
3231 assert_eq!(thread.entries.len(), 3);
3232 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
3233 assert_eq!(user_msg.id, Some(message_2_id));
3234 assert_eq!(user_msg.content.to_markdown(cx), "New user message");
3235 } else {
3236 panic!("Expected UserMessage at index 2");
3237 }
3238 });
3239 }
3240
3241 #[gpui::test]
3242 async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
3243 init_test(cx);
3244
3245 let fs = FakeFs::new(cx.executor());
3246 let project = Project::test(fs, [], cx).await;
3247 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3248 |_, thread, mut cx| {
3249 async move {
3250 thread.update(&mut cx, |thread, cx| {
3251 thread
3252 .handle_session_update(
3253 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3254 "Thinking ".into(),
3255 )),
3256 cx,
3257 )
3258 .unwrap();
3259 thread
3260 .handle_session_update(
3261 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3262 "hard!".into(),
3263 )),
3264 cx,
3265 )
3266 .unwrap();
3267 })?;
3268 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3269 }
3270 .boxed_local()
3271 },
3272 ));
3273
3274 let thread = cx
3275 .update(|cx| {
3276 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3277 })
3278 .await
3279 .unwrap();
3280
3281 thread
3282 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
3283 .await
3284 .unwrap();
3285
3286 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
3287 assert_eq!(
3288 output,
3289 indoc! {r#"
3290 ## User
3291
3292 Hello from Zed!
3293
3294 ## Assistant
3295
3296 <thinking>
3297 Thinking hard!
3298 </thinking>
3299
3300 "#}
3301 );
3302 }
3303
3304 #[gpui::test]
3305 async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
3306 init_test(cx);
3307
3308 let fs = FakeFs::new(cx.executor());
3309 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
3310 .await;
3311 let project = Project::test(fs.clone(), [], cx).await;
3312 let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
3313 let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
3314 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3315 move |_, thread, mut cx| {
3316 let read_file_tx = read_file_tx.clone();
3317 async move {
3318 let content = thread
3319 .update(&mut cx, |thread, cx| {
3320 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3321 })
3322 .unwrap()
3323 .await
3324 .unwrap();
3325 assert_eq!(content, "one\ntwo\nthree\n");
3326 read_file_tx.take().unwrap().send(()).unwrap();
3327 thread
3328 .update(&mut cx, |thread, cx| {
3329 thread.write_text_file(
3330 path!("/tmp/foo").into(),
3331 "one\ntwo\nthree\nfour\nfive\n".to_string(),
3332 cx,
3333 )
3334 })
3335 .unwrap()
3336 .await
3337 .unwrap();
3338 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3339 }
3340 .boxed_local()
3341 },
3342 ));
3343
3344 let (worktree, pathbuf) = project
3345 .update(cx, |project, cx| {
3346 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3347 })
3348 .await
3349 .unwrap();
3350 let buffer = project
3351 .update(cx, |project, cx| {
3352 project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
3353 })
3354 .await
3355 .unwrap();
3356
3357 let thread = cx
3358 .update(|cx| {
3359 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3360 })
3361 .await
3362 .unwrap();
3363
3364 let request = thread.update(cx, |thread, cx| {
3365 thread.send_raw("Extend the count in /tmp/foo", cx)
3366 });
3367 read_file_rx.await.ok();
3368 buffer.update(cx, |buffer, cx| {
3369 buffer.edit([(0..0, "zero\n".to_string())], None, cx);
3370 });
3371 cx.run_until_parked();
3372 assert_eq!(
3373 buffer.read_with(cx, |buffer, _| buffer.text()),
3374 "zero\none\ntwo\nthree\nfour\nfive\n"
3375 );
3376 assert_eq!(
3377 String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
3378 "zero\none\ntwo\nthree\nfour\nfive\n"
3379 );
3380 request.await.unwrap();
3381 }
3382
3383 #[gpui::test]
3384 async fn test_reading_from_line(cx: &mut TestAppContext) {
3385 init_test(cx);
3386
3387 let fs = FakeFs::new(cx.executor());
3388 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
3389 .await;
3390 let project = Project::test(fs.clone(), [], cx).await;
3391 project
3392 .update(cx, |project, cx| {
3393 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3394 })
3395 .await
3396 .unwrap();
3397
3398 let connection = Rc::new(FakeAgentConnection::new());
3399
3400 let thread = cx
3401 .update(|cx| {
3402 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3403 })
3404 .await
3405 .unwrap();
3406
3407 // Whole file
3408 let content = thread
3409 .update(cx, |thread, cx| {
3410 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3411 })
3412 .await
3413 .unwrap();
3414
3415 assert_eq!(content, "one\ntwo\nthree\nfour\n");
3416
3417 // Only start line
3418 let content = thread
3419 .update(cx, |thread, cx| {
3420 thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
3421 })
3422 .await
3423 .unwrap();
3424
3425 assert_eq!(content, "three\nfour\n");
3426
3427 // Only limit
3428 let content = thread
3429 .update(cx, |thread, cx| {
3430 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3431 })
3432 .await
3433 .unwrap();
3434
3435 assert_eq!(content, "one\ntwo\n");
3436
3437 // Range
3438 let content = thread
3439 .update(cx, |thread, cx| {
3440 thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
3441 })
3442 .await
3443 .unwrap();
3444
3445 assert_eq!(content, "two\nthree\n");
3446
3447 // Invalid
3448 let err = thread
3449 .update(cx, |thread, cx| {
3450 thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
3451 })
3452 .await
3453 .unwrap_err();
3454
3455 assert_eq!(
3456 err.to_string(),
3457 "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
3458 );
3459 }
3460
3461 #[gpui::test]
3462 async fn test_reading_empty_file(cx: &mut TestAppContext) {
3463 init_test(cx);
3464
3465 let fs = FakeFs::new(cx.executor());
3466 fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
3467 let project = Project::test(fs.clone(), [], cx).await;
3468 project
3469 .update(cx, |project, cx| {
3470 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3471 })
3472 .await
3473 .unwrap();
3474
3475 let connection = Rc::new(FakeAgentConnection::new());
3476
3477 let thread = cx
3478 .update(|cx| {
3479 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3480 })
3481 .await
3482 .unwrap();
3483
3484 // Whole file
3485 let content = thread
3486 .update(cx, |thread, cx| {
3487 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3488 })
3489 .await
3490 .unwrap();
3491
3492 assert_eq!(content, "");
3493
3494 // Only start line
3495 let content = thread
3496 .update(cx, |thread, cx| {
3497 thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
3498 })
3499 .await
3500 .unwrap();
3501
3502 assert_eq!(content, "");
3503
3504 // Only limit
3505 let content = thread
3506 .update(cx, |thread, cx| {
3507 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3508 })
3509 .await
3510 .unwrap();
3511
3512 assert_eq!(content, "");
3513
3514 // Range
3515 let content = thread
3516 .update(cx, |thread, cx| {
3517 thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
3518 })
3519 .await
3520 .unwrap();
3521
3522 assert_eq!(content, "");
3523
3524 // Invalid
3525 let err = thread
3526 .update(cx, |thread, cx| {
3527 thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
3528 })
3529 .await
3530 .unwrap_err();
3531
3532 assert_eq!(
3533 err.to_string(),
3534 "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
3535 );
3536 }
3537 #[gpui::test]
3538 async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
3539 init_test(cx);
3540
3541 let fs = FakeFs::new(cx.executor());
3542 fs.insert_tree(path!("/tmp"), json!({})).await;
3543 let project = Project::test(fs.clone(), [], cx).await;
3544 project
3545 .update(cx, |project, cx| {
3546 project.find_or_create_worktree(path!("/tmp"), true, cx)
3547 })
3548 .await
3549 .unwrap();
3550
3551 let connection = Rc::new(FakeAgentConnection::new());
3552
3553 let thread = cx
3554 .update(|cx| {
3555 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3556 })
3557 .await
3558 .unwrap();
3559
3560 // Out of project file
3561 let err = thread
3562 .update(cx, |thread, cx| {
3563 thread.read_text_file(path!("/foo").into(), None, None, false, cx)
3564 })
3565 .await
3566 .unwrap_err();
3567
3568 assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
3569 }
3570
3571 #[gpui::test]
3572 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
3573 init_test(cx);
3574
3575 let fs = FakeFs::new(cx.executor());
3576 let project = Project::test(fs, [], cx).await;
3577 let id = acp::ToolCallId::new("test");
3578
3579 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3580 let id = id.clone();
3581 move |_, thread, mut cx| {
3582 let id = id.clone();
3583 async move {
3584 thread
3585 .update(&mut cx, |thread, cx| {
3586 thread.handle_session_update(
3587 acp::SessionUpdate::ToolCall(
3588 acp::ToolCall::new(id.clone(), "Label")
3589 .kind(acp::ToolKind::Fetch)
3590 .status(acp::ToolCallStatus::InProgress),
3591 ),
3592 cx,
3593 )
3594 })
3595 .unwrap()
3596 .unwrap();
3597 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3598 }
3599 .boxed_local()
3600 }
3601 }));
3602
3603 let thread = cx
3604 .update(|cx| {
3605 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3606 })
3607 .await
3608 .unwrap();
3609
3610 let request = thread.update(cx, |thread, cx| {
3611 thread.send_raw("Fetch https://example.com", cx)
3612 });
3613
3614 run_until_first_tool_call(&thread, cx).await;
3615
3616 thread.read_with(cx, |thread, _| {
3617 assert!(matches!(
3618 thread.entries[1],
3619 AgentThreadEntry::ToolCall(ToolCall {
3620 status: ToolCallStatus::InProgress,
3621 ..
3622 })
3623 ));
3624 });
3625
3626 thread.update(cx, |thread, cx| thread.cancel(cx)).await;
3627
3628 thread.read_with(cx, |thread, _| {
3629 assert!(matches!(
3630 &thread.entries[1],
3631 AgentThreadEntry::ToolCall(ToolCall {
3632 status: ToolCallStatus::Canceled,
3633 ..
3634 })
3635 ));
3636 });
3637
3638 thread
3639 .update(cx, |thread, cx| {
3640 thread.handle_session_update(
3641 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3642 id,
3643 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3644 )),
3645 cx,
3646 )
3647 })
3648 .unwrap();
3649
3650 request.await.unwrap();
3651
3652 thread.read_with(cx, |thread, _| {
3653 assert!(matches!(
3654 thread.entries[1],
3655 AgentThreadEntry::ToolCall(ToolCall {
3656 status: ToolCallStatus::Completed,
3657 ..
3658 })
3659 ));
3660 });
3661 }
3662
3663 #[gpui::test]
3664 async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3665 init_test(cx);
3666 let fs = FakeFs::new(cx.background_executor.clone());
3667 fs.insert_tree(path!("/test"), json!({})).await;
3668 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3669
3670 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3671 move |_, thread, mut cx| {
3672 async move {
3673 thread
3674 .update(&mut cx, |thread, cx| {
3675 thread.handle_session_update(
3676 acp::SessionUpdate::ToolCall(
3677 acp::ToolCall::new("test", "Label")
3678 .kind(acp::ToolKind::Edit)
3679 .status(acp::ToolCallStatus::Completed)
3680 .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
3681 "/test/test.txt",
3682 "foo",
3683 ))]),
3684 ),
3685 cx,
3686 )
3687 })
3688 .unwrap()
3689 .unwrap();
3690 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3691 }
3692 .boxed_local()
3693 }
3694 }));
3695
3696 let thread = cx
3697 .update(|cx| {
3698 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3699 })
3700 .await
3701 .unwrap();
3702
3703 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3704 .await
3705 .unwrap();
3706
3707 assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3708 }
3709
3710 #[gpui::test(iterations = 10)]
3711 async fn test_checkpoints(cx: &mut TestAppContext) {
3712 init_test(cx);
3713 let fs = FakeFs::new(cx.background_executor.clone());
3714 fs.insert_tree(
3715 path!("/test"),
3716 json!({
3717 ".git": {}
3718 }),
3719 )
3720 .await;
3721 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3722
3723 let simulate_changes = Arc::new(AtomicBool::new(true));
3724 let next_filename = Arc::new(AtomicUsize::new(0));
3725 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3726 let simulate_changes = simulate_changes.clone();
3727 let next_filename = next_filename.clone();
3728 let fs = fs.clone();
3729 move |request, thread, mut cx| {
3730 let fs = fs.clone();
3731 let simulate_changes = simulate_changes.clone();
3732 let next_filename = next_filename.clone();
3733 async move {
3734 if simulate_changes.load(SeqCst) {
3735 let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3736 fs.write(Path::new(&filename), b"").await?;
3737 }
3738
3739 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3740 panic!("expected text content block");
3741 };
3742 thread.update(&mut cx, |thread, cx| {
3743 thread
3744 .handle_session_update(
3745 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3746 content.text.to_uppercase().into(),
3747 )),
3748 cx,
3749 )
3750 .unwrap();
3751 })?;
3752 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3753 }
3754 .boxed_local()
3755 }
3756 }));
3757 let thread = cx
3758 .update(|cx| {
3759 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3760 })
3761 .await
3762 .unwrap();
3763
3764 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3765 .await
3766 .unwrap();
3767 thread.read_with(cx, |thread, cx| {
3768 assert_eq!(
3769 thread.to_markdown(cx),
3770 indoc! {"
3771 ## User (checkpoint)
3772
3773 Lorem
3774
3775 ## Assistant
3776
3777 LOREM
3778
3779 "}
3780 );
3781 });
3782 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3783
3784 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3785 .await
3786 .unwrap();
3787 thread.read_with(cx, |thread, cx| {
3788 assert_eq!(
3789 thread.to_markdown(cx),
3790 indoc! {"
3791 ## User (checkpoint)
3792
3793 Lorem
3794
3795 ## Assistant
3796
3797 LOREM
3798
3799 ## User (checkpoint)
3800
3801 ipsum
3802
3803 ## Assistant
3804
3805 IPSUM
3806
3807 "}
3808 );
3809 });
3810 assert_eq!(
3811 fs.files(),
3812 vec![
3813 Path::new(path!("/test/file-0")),
3814 Path::new(path!("/test/file-1"))
3815 ]
3816 );
3817
3818 // Checkpoint isn't stored when there are no changes.
3819 simulate_changes.store(false, SeqCst);
3820 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3821 .await
3822 .unwrap();
3823 thread.read_with(cx, |thread, cx| {
3824 assert_eq!(
3825 thread.to_markdown(cx),
3826 indoc! {"
3827 ## User (checkpoint)
3828
3829 Lorem
3830
3831 ## Assistant
3832
3833 LOREM
3834
3835 ## User (checkpoint)
3836
3837 ipsum
3838
3839 ## Assistant
3840
3841 IPSUM
3842
3843 ## User
3844
3845 dolor
3846
3847 ## Assistant
3848
3849 DOLOR
3850
3851 "}
3852 );
3853 });
3854 assert_eq!(
3855 fs.files(),
3856 vec![
3857 Path::new(path!("/test/file-0")),
3858 Path::new(path!("/test/file-1"))
3859 ]
3860 );
3861
3862 // Rewinding the conversation truncates the history and restores the checkpoint.
3863 thread
3864 .update(cx, |thread, cx| {
3865 let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3866 panic!("unexpected entries {:?}", thread.entries)
3867 };
3868 thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3869 })
3870 .await
3871 .unwrap();
3872 thread.read_with(cx, |thread, cx| {
3873 assert_eq!(
3874 thread.to_markdown(cx),
3875 indoc! {"
3876 ## User (checkpoint)
3877
3878 Lorem
3879
3880 ## Assistant
3881
3882 LOREM
3883
3884 "}
3885 );
3886 });
3887 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3888 }
3889
3890 #[gpui::test]
3891 async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3892 use std::sync::atomic::AtomicUsize;
3893 init_test(cx);
3894
3895 let fs = FakeFs::new(cx.executor());
3896 let project = Project::test(fs, None, cx).await;
3897
3898 // Create a connection that simulates refusal after tool result
3899 let prompt_count = Arc::new(AtomicUsize::new(0));
3900 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3901 let prompt_count = prompt_count.clone();
3902 move |_request, thread, mut cx| {
3903 let count = prompt_count.fetch_add(1, SeqCst);
3904 async move {
3905 if count == 0 {
3906 // First prompt: Generate a tool call with result
3907 thread.update(&mut cx, |thread, cx| {
3908 thread
3909 .handle_session_update(
3910 acp::SessionUpdate::ToolCall(
3911 acp::ToolCall::new("tool1", "Test Tool")
3912 .kind(acp::ToolKind::Fetch)
3913 .status(acp::ToolCallStatus::Completed)
3914 .raw_input(serde_json::json!({"query": "test"}))
3915 .raw_output(serde_json::json!({"result": "inappropriate content"})),
3916 ),
3917 cx,
3918 )
3919 .unwrap();
3920 })?;
3921
3922 // Now return refusal because of the tool result
3923 Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
3924 } else {
3925 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3926 }
3927 }
3928 .boxed_local()
3929 }
3930 }));
3931
3932 let thread = cx
3933 .update(|cx| {
3934 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3935 })
3936 .await
3937 .unwrap();
3938
3939 // Track if we see a Refusal event
3940 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3941 let saw_refusal_event_captured = saw_refusal_event.clone();
3942 thread.update(cx, |_thread, cx| {
3943 cx.subscribe(
3944 &thread,
3945 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3946 if matches!(event, AcpThreadEvent::Refusal) {
3947 *saw_refusal_event_captured.lock().unwrap() = true;
3948 }
3949 },
3950 )
3951 .detach();
3952 });
3953
3954 // Send a user message - this will trigger tool call and then refusal
3955 let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
3956 cx.background_executor.spawn(send_task).detach();
3957 cx.run_until_parked();
3958
3959 // Verify that:
3960 // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3961 // 2. The user message was NOT truncated
3962 assert!(
3963 *saw_refusal_event.lock().unwrap(),
3964 "Refusal event should be emitted for tool result refusals"
3965 );
3966
3967 thread.read_with(cx, |thread, _| {
3968 let entries = thread.entries();
3969 assert!(entries.len() >= 2, "Should have user message and tool call");
3970
3971 // Verify user message is still there
3972 assert!(
3973 matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3974 "User message should not be truncated"
3975 );
3976
3977 // Verify tool call is there with result
3978 if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3979 assert!(
3980 tool_call.raw_output.is_some(),
3981 "Tool call should have output"
3982 );
3983 } else {
3984 panic!("Expected tool call at index 1");
3985 }
3986 });
3987 }
3988
3989 #[gpui::test]
3990 async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3991 init_test(cx);
3992
3993 let fs = FakeFs::new(cx.executor());
3994 let project = Project::test(fs, None, cx).await;
3995
3996 let refuse_next = Arc::new(AtomicBool::new(false));
3997 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3998 let refuse_next = refuse_next.clone();
3999 move |_request, _thread, _cx| {
4000 if refuse_next.load(SeqCst) {
4001 async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
4002 .boxed_local()
4003 } else {
4004 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
4005 .boxed_local()
4006 }
4007 }
4008 }));
4009
4010 let thread = cx
4011 .update(|cx| {
4012 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4013 })
4014 .await
4015 .unwrap();
4016
4017 // Track if we see a Refusal event
4018 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4019 let saw_refusal_event_captured = saw_refusal_event.clone();
4020 thread.update(cx, |_thread, cx| {
4021 cx.subscribe(
4022 &thread,
4023 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4024 if matches!(event, AcpThreadEvent::Refusal) {
4025 *saw_refusal_event_captured.lock().unwrap() = true;
4026 }
4027 },
4028 )
4029 .detach();
4030 });
4031
4032 // Send a message that will be refused
4033 refuse_next.store(true, SeqCst);
4034 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4035 .await
4036 .unwrap();
4037
4038 // Verify that a Refusal event WAS emitted for user prompt refusal
4039 assert!(
4040 *saw_refusal_event.lock().unwrap(),
4041 "Refusal event should be emitted for user prompt refusals"
4042 );
4043
4044 // Verify the message was truncated (user prompt refusal)
4045 thread.read_with(cx, |thread, cx| {
4046 assert_eq!(thread.to_markdown(cx), "");
4047 });
4048 }
4049
4050 #[gpui::test]
4051 async fn test_refusal(cx: &mut TestAppContext) {
4052 init_test(cx);
4053 let fs = FakeFs::new(cx.background_executor.clone());
4054 fs.insert_tree(path!("/"), json!({})).await;
4055 let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
4056
4057 let refuse_next = Arc::new(AtomicBool::new(false));
4058 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4059 let refuse_next = refuse_next.clone();
4060 move |request, thread, mut cx| {
4061 let refuse_next = refuse_next.clone();
4062 async move {
4063 if refuse_next.load(SeqCst) {
4064 return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
4065 }
4066
4067 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
4068 panic!("expected text content block");
4069 };
4070 thread.update(&mut cx, |thread, cx| {
4071 thread
4072 .handle_session_update(
4073 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
4074 content.text.to_uppercase().into(),
4075 )),
4076 cx,
4077 )
4078 .unwrap();
4079 })?;
4080 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4081 }
4082 .boxed_local()
4083 }
4084 }));
4085 let thread = cx
4086 .update(|cx| {
4087 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4088 })
4089 .await
4090 .unwrap();
4091
4092 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4093 .await
4094 .unwrap();
4095 thread.read_with(cx, |thread, cx| {
4096 assert_eq!(
4097 thread.to_markdown(cx),
4098 indoc! {"
4099 ## User
4100
4101 hello
4102
4103 ## Assistant
4104
4105 HELLO
4106
4107 "}
4108 );
4109 });
4110
4111 // Simulate refusing the second message. The message should be truncated
4112 // when a user prompt is refused.
4113 refuse_next.store(true, SeqCst);
4114 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
4115 .await
4116 .unwrap();
4117 thread.read_with(cx, |thread, cx| {
4118 assert_eq!(
4119 thread.to_markdown(cx),
4120 indoc! {"
4121 ## User
4122
4123 hello
4124
4125 ## Assistant
4126
4127 HELLO
4128
4129 "}
4130 );
4131 });
4132 }
4133
4134 async fn run_until_first_tool_call(
4135 thread: &Entity<AcpThread>,
4136 cx: &mut TestAppContext,
4137 ) -> usize {
4138 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
4139
4140 let subscription = cx.update(|cx| {
4141 cx.subscribe(thread, move |thread, _, cx| {
4142 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
4143 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
4144 return tx.try_send(ix).unwrap();
4145 }
4146 }
4147 })
4148 });
4149
4150 select! {
4151 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
4152 panic!("Timeout waiting for tool call")
4153 }
4154 ix = rx.next().fuse() => {
4155 drop(subscription);
4156 ix.unwrap()
4157 }
4158 }
4159 }
4160
4161 #[derive(Clone, Default)]
4162 struct FakeAgentConnection {
4163 auth_methods: Vec<acp::AuthMethod>,
4164 sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
4165 set_title_calls: Rc<RefCell<Vec<SharedString>>>,
4166 on_user_message: Option<
4167 Rc<
4168 dyn Fn(
4169 acp::PromptRequest,
4170 WeakEntity<AcpThread>,
4171 AsyncApp,
4172 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4173 + 'static,
4174 >,
4175 >,
4176 }
4177
4178 impl FakeAgentConnection {
4179 fn new() -> Self {
4180 Self {
4181 auth_methods: Vec::new(),
4182 on_user_message: None,
4183 sessions: Arc::default(),
4184 set_title_calls: Default::default(),
4185 }
4186 }
4187
4188 #[expect(unused)]
4189 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
4190 self.auth_methods = auth_methods;
4191 self
4192 }
4193
4194 fn on_user_message(
4195 mut self,
4196 handler: impl Fn(
4197 acp::PromptRequest,
4198 WeakEntity<AcpThread>,
4199 AsyncApp,
4200 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4201 + 'static,
4202 ) -> Self {
4203 self.on_user_message.replace(Rc::new(handler));
4204 self
4205 }
4206 }
4207
4208 impl AgentConnection for FakeAgentConnection {
4209 fn agent_id(&self) -> AgentId {
4210 AgentId::new("fake")
4211 }
4212
4213 fn telemetry_id(&self) -> SharedString {
4214 "fake".into()
4215 }
4216
4217 fn auth_methods(&self) -> &[acp::AuthMethod] {
4218 &self.auth_methods
4219 }
4220
4221 fn new_session(
4222 self: Rc<Self>,
4223 project: Entity<Project>,
4224 work_dirs: PathList,
4225 cx: &mut App,
4226 ) -> Task<gpui::Result<Entity<AcpThread>>> {
4227 let session_id = acp::SessionId::new(
4228 rand::rng()
4229 .sample_iter(&distr::Alphanumeric)
4230 .take(7)
4231 .map(char::from)
4232 .collect::<String>(),
4233 );
4234 let action_log = cx.new(|_| ActionLog::new(project.clone()));
4235 let thread = cx.new(|cx| {
4236 AcpThread::new(
4237 None,
4238 "Test",
4239 Some(work_dirs),
4240 self.clone(),
4241 project,
4242 action_log,
4243 session_id.clone(),
4244 watch::Receiver::constant(
4245 acp::PromptCapabilities::new()
4246 .image(true)
4247 .audio(true)
4248 .embedded_context(true),
4249 ),
4250 cx,
4251 )
4252 });
4253 self.sessions.lock().insert(session_id, thread.downgrade());
4254 Task::ready(Ok(thread))
4255 }
4256
4257 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
4258 if self.auth_methods().iter().any(|m| m.id() == &method) {
4259 Task::ready(Ok(()))
4260 } else {
4261 Task::ready(Err(anyhow!("Invalid Auth Method")))
4262 }
4263 }
4264
4265 fn prompt(
4266 &self,
4267 _id: Option<UserMessageId>,
4268 params: acp::PromptRequest,
4269 cx: &mut App,
4270 ) -> Task<gpui::Result<acp::PromptResponse>> {
4271 let sessions = self.sessions.lock();
4272 let thread = sessions.get(¶ms.session_id).unwrap();
4273 if let Some(handler) = &self.on_user_message {
4274 let handler = handler.clone();
4275 let thread = thread.clone();
4276 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
4277 } else {
4278 Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
4279 }
4280 }
4281
4282 fn cancel(&self, _session_id: &acp::SessionId, _cx: &mut App) {}
4283
4284 fn truncate(
4285 &self,
4286 session_id: &acp::SessionId,
4287 _cx: &App,
4288 ) -> Option<Rc<dyn AgentSessionTruncate>> {
4289 Some(Rc::new(FakeAgentSessionEditor {
4290 _session_id: session_id.clone(),
4291 }))
4292 }
4293
4294 fn set_title(
4295 &self,
4296 _session_id: &acp::SessionId,
4297 _cx: &App,
4298 ) -> Option<Rc<dyn AgentSessionSetTitle>> {
4299 Some(Rc::new(FakeAgentSessionSetTitle {
4300 calls: self.set_title_calls.clone(),
4301 }))
4302 }
4303
4304 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
4305 self
4306 }
4307 }
4308
4309 struct FakeAgentSessionSetTitle {
4310 calls: Rc<RefCell<Vec<SharedString>>>,
4311 }
4312
4313 impl AgentSessionSetTitle for FakeAgentSessionSetTitle {
4314 fn run(&self, title: SharedString, _cx: &mut App) -> Task<Result<()>> {
4315 self.calls.borrow_mut().push(title);
4316 Task::ready(Ok(()))
4317 }
4318 }
4319
4320 struct FakeAgentSessionEditor {
4321 _session_id: acp::SessionId,
4322 }
4323
4324 impl AgentSessionTruncate for FakeAgentSessionEditor {
4325 fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
4326 Task::ready(Ok(()))
4327 }
4328 }
4329
4330 #[gpui::test]
4331 async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
4332 init_test(cx);
4333
4334 let fs = FakeFs::new(cx.executor());
4335 let project = Project::test(fs, [], cx).await;
4336 let connection = Rc::new(FakeAgentConnection::new());
4337 let thread = cx
4338 .update(|cx| {
4339 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4340 })
4341 .await
4342 .unwrap();
4343
4344 // Try to update a tool call that doesn't exist
4345 let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
4346 thread.update(cx, |thread, cx| {
4347 let result = thread.handle_session_update(
4348 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4349 nonexistent_id.clone(),
4350 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4351 )),
4352 cx,
4353 );
4354
4355 // The update should succeed (not return an error)
4356 assert!(result.is_ok());
4357
4358 // There should now be exactly one entry in the thread
4359 assert_eq!(thread.entries.len(), 1);
4360
4361 // The entry should be a failed tool call
4362 if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
4363 assert_eq!(tool_call.id, nonexistent_id);
4364 assert!(matches!(tool_call.status, ToolCallStatus::Failed));
4365 assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
4366
4367 // Check that the content contains the error message
4368 assert_eq!(tool_call.content.len(), 1);
4369 if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
4370 match content_block {
4371 ContentBlock::Markdown { markdown } => {
4372 let markdown_text = markdown.read(cx).source();
4373 assert!(markdown_text.contains("Tool call not found"));
4374 }
4375 ContentBlock::Empty => panic!("Expected markdown content, got empty"),
4376 ContentBlock::ResourceLink { .. } => {
4377 panic!("Expected markdown content, got resource link")
4378 }
4379 ContentBlock::Image { .. } => {
4380 panic!("Expected markdown content, got image")
4381 }
4382 }
4383 } else {
4384 panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
4385 }
4386 } else {
4387 panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
4388 }
4389 });
4390 }
4391
4392 /// Tests that restoring a checkpoint properly cleans up terminals that were
4393 /// created after that checkpoint, and cancels any in-progress generation.
4394 ///
4395 /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
4396 /// that were started after that checkpoint should be terminated, and any in-progress
4397 /// AI generation should be canceled.
4398 #[gpui::test]
4399 async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
4400 init_test(cx);
4401
4402 let fs = FakeFs::new(cx.executor());
4403 let project = Project::test(fs, [], cx).await;
4404 let connection = Rc::new(FakeAgentConnection::new());
4405 let thread = cx
4406 .update(|cx| {
4407 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4408 })
4409 .await
4410 .unwrap();
4411
4412 // Send first user message to create a checkpoint
4413 cx.update(|cx| {
4414 thread.update(cx, |thread, cx| {
4415 thread.send(vec!["first message".into()], cx)
4416 })
4417 })
4418 .await
4419 .unwrap();
4420
4421 // Send second message (creates another checkpoint) - we'll restore to this one
4422 cx.update(|cx| {
4423 thread.update(cx, |thread, cx| {
4424 thread.send(vec!["second message".into()], cx)
4425 })
4426 })
4427 .await
4428 .unwrap();
4429
4430 // Create 2 terminals BEFORE the checkpoint that have completed running
4431 let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4432 let mock_terminal_1 = cx.new(|cx| {
4433 let builder = ::terminal::TerminalBuilder::new_display_only(
4434 ::terminal::terminal_settings::CursorShape::default(),
4435 ::terminal::terminal_settings::AlternateScroll::On,
4436 None,
4437 0,
4438 cx.background_executor(),
4439 PathStyle::local(),
4440 )
4441 .unwrap();
4442 builder.subscribe(cx)
4443 });
4444
4445 thread.update(cx, |thread, cx| {
4446 thread.on_terminal_provider_event(
4447 TerminalProviderEvent::Created {
4448 terminal_id: terminal_id_1.clone(),
4449 label: "echo 'first'".to_string(),
4450 cwd: Some(PathBuf::from("/test")),
4451 output_byte_limit: None,
4452 terminal: mock_terminal_1.clone(),
4453 },
4454 cx,
4455 );
4456 });
4457
4458 thread.update(cx, |thread, cx| {
4459 thread.on_terminal_provider_event(
4460 TerminalProviderEvent::Output {
4461 terminal_id: terminal_id_1.clone(),
4462 data: b"first\n".to_vec(),
4463 },
4464 cx,
4465 );
4466 });
4467
4468 thread.update(cx, |thread, cx| {
4469 thread.on_terminal_provider_event(
4470 TerminalProviderEvent::Exit {
4471 terminal_id: terminal_id_1.clone(),
4472 status: acp::TerminalExitStatus::new().exit_code(0),
4473 },
4474 cx,
4475 );
4476 });
4477
4478 let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4479 let mock_terminal_2 = cx.new(|cx| {
4480 let builder = ::terminal::TerminalBuilder::new_display_only(
4481 ::terminal::terminal_settings::CursorShape::default(),
4482 ::terminal::terminal_settings::AlternateScroll::On,
4483 None,
4484 0,
4485 cx.background_executor(),
4486 PathStyle::local(),
4487 )
4488 .unwrap();
4489 builder.subscribe(cx)
4490 });
4491
4492 thread.update(cx, |thread, cx| {
4493 thread.on_terminal_provider_event(
4494 TerminalProviderEvent::Created {
4495 terminal_id: terminal_id_2.clone(),
4496 label: "echo 'second'".to_string(),
4497 cwd: Some(PathBuf::from("/test")),
4498 output_byte_limit: None,
4499 terminal: mock_terminal_2.clone(),
4500 },
4501 cx,
4502 );
4503 });
4504
4505 thread.update(cx, |thread, cx| {
4506 thread.on_terminal_provider_event(
4507 TerminalProviderEvent::Output {
4508 terminal_id: terminal_id_2.clone(),
4509 data: b"second\n".to_vec(),
4510 },
4511 cx,
4512 );
4513 });
4514
4515 thread.update(cx, |thread, cx| {
4516 thread.on_terminal_provider_event(
4517 TerminalProviderEvent::Exit {
4518 terminal_id: terminal_id_2.clone(),
4519 status: acp::TerminalExitStatus::new().exit_code(0),
4520 },
4521 cx,
4522 );
4523 });
4524
4525 // Get the second message ID to restore to
4526 let second_message_id = thread.read_with(cx, |thread, _| {
4527 // At this point we have:
4528 // - Index 0: First user message (with checkpoint)
4529 // - Index 1: Second user message (with checkpoint)
4530 // No assistant responses because FakeAgentConnection just returns EndTurn
4531 let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
4532 panic!("expected user message at index 1");
4533 };
4534 message.id.clone().unwrap()
4535 });
4536
4537 // Create a terminal AFTER the checkpoint we'll restore to.
4538 // This simulates the AI agent starting a long-running terminal command.
4539 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4540 let mock_terminal = cx.new(|cx| {
4541 let builder = ::terminal::TerminalBuilder::new_display_only(
4542 ::terminal::terminal_settings::CursorShape::default(),
4543 ::terminal::terminal_settings::AlternateScroll::On,
4544 None,
4545 0,
4546 cx.background_executor(),
4547 PathStyle::local(),
4548 )
4549 .unwrap();
4550 builder.subscribe(cx)
4551 });
4552
4553 // Register the terminal as created
4554 thread.update(cx, |thread, cx| {
4555 thread.on_terminal_provider_event(
4556 TerminalProviderEvent::Created {
4557 terminal_id: terminal_id.clone(),
4558 label: "sleep 1000".to_string(),
4559 cwd: Some(PathBuf::from("/test")),
4560 output_byte_limit: None,
4561 terminal: mock_terminal.clone(),
4562 },
4563 cx,
4564 );
4565 });
4566
4567 // Simulate the terminal producing output (still running)
4568 thread.update(cx, |thread, cx| {
4569 thread.on_terminal_provider_event(
4570 TerminalProviderEvent::Output {
4571 terminal_id: terminal_id.clone(),
4572 data: b"terminal is running...\n".to_vec(),
4573 },
4574 cx,
4575 );
4576 });
4577
4578 // Create a tool call entry that references this terminal
4579 // This represents the agent requesting a terminal command
4580 thread.update(cx, |thread, cx| {
4581 thread
4582 .handle_session_update(
4583 acp::SessionUpdate::ToolCall(
4584 acp::ToolCall::new("terminal-tool-1", "Running command")
4585 .kind(acp::ToolKind::Execute)
4586 .status(acp::ToolCallStatus::InProgress)
4587 .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
4588 terminal_id.clone(),
4589 ))])
4590 .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
4591 ),
4592 cx,
4593 )
4594 .unwrap();
4595 });
4596
4597 // Verify terminal exists and is in the thread
4598 let terminal_exists_before =
4599 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4600 assert!(
4601 terminal_exists_before,
4602 "Terminal should exist before checkpoint restore"
4603 );
4604
4605 // Verify the terminal's underlying task is still running (not completed)
4606 let terminal_running_before = thread.read_with(cx, |thread, _cx| {
4607 let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
4608 terminal_entity.read_with(cx, |term, _cx| {
4609 term.output().is_none() // output is None means it's still running
4610 })
4611 });
4612 assert!(
4613 terminal_running_before,
4614 "Terminal should be running before checkpoint restore"
4615 );
4616
4617 // Verify we have the expected entries before restore
4618 let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
4619 assert!(
4620 entry_count_before > 1,
4621 "Should have multiple entries before restore"
4622 );
4623
4624 // Restore the checkpoint to the second message.
4625 // This should:
4626 // 1. Cancel any in-progress generation (via the cancel() call)
4627 // 2. Remove the terminal that was created after that point
4628 thread
4629 .update(cx, |thread, cx| {
4630 thread.restore_checkpoint(second_message_id, cx)
4631 })
4632 .await
4633 .unwrap();
4634
4635 // Verify that no send_task is in progress after restore
4636 // (cancel() clears the send_task)
4637 let has_send_task_after = thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4638 assert!(
4639 !has_send_task_after,
4640 "Should not have a send_task after restore (cancel should have cleared it)"
4641 );
4642
4643 // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
4644 let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
4645 assert_eq!(
4646 entry_count, 1,
4647 "Should have 1 entry after restore (only the first user message)"
4648 );
4649
4650 // Verify the 2 completed terminals from before the checkpoint still exist
4651 let terminal_1_exists = thread.read_with(cx, |thread, _| {
4652 thread.terminals.contains_key(&terminal_id_1)
4653 });
4654 assert!(
4655 terminal_1_exists,
4656 "Terminal 1 (from before checkpoint) should still exist"
4657 );
4658
4659 let terminal_2_exists = thread.read_with(cx, |thread, _| {
4660 thread.terminals.contains_key(&terminal_id_2)
4661 });
4662 assert!(
4663 terminal_2_exists,
4664 "Terminal 2 (from before checkpoint) should still exist"
4665 );
4666
4667 // Verify they're still in completed state
4668 let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
4669 let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
4670 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4671 });
4672 assert!(terminal_1_completed, "Terminal 1 should still be completed");
4673
4674 let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
4675 let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
4676 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4677 });
4678 assert!(terminal_2_completed, "Terminal 2 should still be completed");
4679
4680 // Verify the running terminal (created after checkpoint) was removed
4681 let terminal_3_exists =
4682 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4683 assert!(
4684 !terminal_3_exists,
4685 "Terminal 3 (created after checkpoint) should have been removed"
4686 );
4687
4688 // Verify total count is 2 (the two from before the checkpoint)
4689 let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
4690 assert_eq!(
4691 terminal_count, 2,
4692 "Should have exactly 2 terminals (the completed ones from before checkpoint)"
4693 );
4694 }
4695
4696 /// Tests that update_last_checkpoint correctly updates the original message's checkpoint
4697 /// even when a new user message is added while the async checkpoint comparison is in progress.
4698 ///
4699 /// This is a regression test for a bug where update_last_checkpoint would fail with
4700 /// "no checkpoint" if a new user message (without a checkpoint) was added between when
4701 /// update_last_checkpoint started and when its async closure ran.
4702 #[gpui::test]
4703 async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
4704 init_test(cx);
4705
4706 let fs = FakeFs::new(cx.executor());
4707 fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
4708 .await;
4709 let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;
4710
4711 let handler_done = Arc::new(AtomicBool::new(false));
4712 let handler_done_clone = handler_done.clone();
4713 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4714 move |_, _thread, _cx| {
4715 handler_done_clone.store(true, SeqCst);
4716 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
4717 },
4718 ));
4719
4720 let thread = cx
4721 .update(|cx| {
4722 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4723 })
4724 .await
4725 .unwrap();
4726
4727 let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
4728 let send_task = cx.background_executor.spawn(send_future);
4729
4730 // Tick until handler completes, then a few more to let update_last_checkpoint start
4731 while !handler_done.load(SeqCst) {
4732 cx.executor().tick();
4733 }
4734 for _ in 0..5 {
4735 cx.executor().tick();
4736 }
4737
4738 thread.update(cx, |thread, cx| {
4739 thread.push_entry(
4740 AgentThreadEntry::UserMessage(UserMessage {
4741 id: Some(UserMessageId::new()),
4742 content: ContentBlock::Empty,
4743 chunks: vec!["Injected message (no checkpoint)".into()],
4744 checkpoint: None,
4745 indented: false,
4746 }),
4747 cx,
4748 );
4749 });
4750
4751 cx.run_until_parked();
4752 let result = send_task.await;
4753
4754 assert!(
4755 result.is_ok(),
4756 "send should succeed even when new message added during update_last_checkpoint: {:?}",
4757 result.err()
4758 );
4759 }
4760
4761 /// Tests that when a follow-up message is sent during generation,
4762 /// the first turn completing does NOT clear `running_turn` because
4763 /// it now belongs to the second turn.
4764 #[gpui::test]
4765 async fn test_follow_up_message_during_generation_does_not_clear_turn(cx: &mut TestAppContext) {
4766 init_test(cx);
4767
4768 let fs = FakeFs::new(cx.executor());
4769 let project = Project::test(fs, [], cx).await;
4770
4771 // First handler waits for this signal before completing
4772 let (first_complete_tx, first_complete_rx) = futures::channel::oneshot::channel::<()>();
4773 let first_complete_rx = RefCell::new(Some(first_complete_rx));
4774
4775 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4776 move |params, _thread, _cx| {
4777 let first_complete_rx = first_complete_rx.borrow_mut().take();
4778 let is_first = params
4779 .prompt
4780 .iter()
4781 .any(|c| matches!(c, acp::ContentBlock::Text(t) if t.text.contains("first")));
4782
4783 async move {
4784 if is_first {
4785 // First handler waits until signaled
4786 if let Some(rx) = first_complete_rx {
4787 rx.await.ok();
4788 }
4789 }
4790 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4791 }
4792 .boxed_local()
4793 }
4794 }));
4795
4796 let thread = cx
4797 .update(|cx| {
4798 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4799 })
4800 .await
4801 .unwrap();
4802
4803 // Send first message (turn_id=1) - handler will block
4804 let first_request = thread.update(cx, |thread, cx| thread.send_raw("first", cx));
4805 assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 1);
4806
4807 // Send second message (turn_id=2) while first is still blocked
4808 // This calls cancel() which takes turn 1's running_turn and sets turn 2's
4809 let second_request = thread.update(cx, |thread, cx| thread.send_raw("second", cx));
4810 assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 2);
4811
4812 let running_turn_after_second_send =
4813 thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
4814 assert_eq!(
4815 running_turn_after_second_send,
4816 Some(2),
4817 "running_turn should be set to turn 2 after sending second message"
4818 );
4819
4820 // Now signal first handler to complete
4821 first_complete_tx.send(()).ok();
4822
4823 // First request completes - should NOT clear running_turn
4824 // because running_turn now belongs to turn 2
4825 first_request.await.unwrap();
4826
4827 let running_turn_after_first =
4828 thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
4829 assert_eq!(
4830 running_turn_after_first,
4831 Some(2),
4832 "first turn completing should not clear running_turn (belongs to turn 2)"
4833 );
4834
4835 // Second request completes - SHOULD clear running_turn
4836 second_request.await.unwrap();
4837
4838 let running_turn_after_second =
4839 thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4840 assert!(
4841 !running_turn_after_second,
4842 "second turn completing should clear running_turn"
4843 );
4844 }
4845
4846 #[gpui::test]
4847 async fn test_send_returns_cancelled_response_and_marks_tools_as_cancelled(
4848 cx: &mut TestAppContext,
4849 ) {
4850 init_test(cx);
4851
4852 let fs = FakeFs::new(cx.executor());
4853 let project = Project::test(fs, [], cx).await;
4854
4855 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4856 move |_params, thread, mut cx| {
4857 async move {
4858 thread
4859 .update(&mut cx, |thread, cx| {
4860 thread.handle_session_update(
4861 acp::SessionUpdate::ToolCall(
4862 acp::ToolCall::new(
4863 acp::ToolCallId::new("test-tool"),
4864 "Test Tool",
4865 )
4866 .kind(acp::ToolKind::Fetch)
4867 .status(acp::ToolCallStatus::InProgress),
4868 ),
4869 cx,
4870 )
4871 })
4872 .unwrap()
4873 .unwrap();
4874
4875 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
4876 }
4877 .boxed_local()
4878 },
4879 ));
4880
4881 let thread = cx
4882 .update(|cx| {
4883 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4884 })
4885 .await
4886 .unwrap();
4887
4888 let response = thread
4889 .update(cx, |thread, cx| thread.send_raw("test message", cx))
4890 .await;
4891
4892 let response = response
4893 .expect("send should succeed")
4894 .expect("should have response");
4895 assert_eq!(
4896 response.stop_reason,
4897 acp::StopReason::Cancelled,
4898 "response should have Cancelled stop_reason"
4899 );
4900
4901 thread.read_with(cx, |thread, _| {
4902 let tool_entry = thread
4903 .entries
4904 .iter()
4905 .find_map(|e| {
4906 if let AgentThreadEntry::ToolCall(call) = e {
4907 Some(call)
4908 } else {
4909 None
4910 }
4911 })
4912 .expect("should have tool call entry");
4913
4914 assert!(
4915 matches!(tool_entry.status, ToolCallStatus::Canceled),
4916 "tool should be marked as Canceled when response is Cancelled, got {:?}",
4917 tool_entry.status
4918 );
4919 });
4920 }
4921
4922 #[gpui::test]
4923 async fn test_provisional_title_replaced_by_real_title(cx: &mut TestAppContext) {
4924 init_test(cx);
4925
4926 let fs = FakeFs::new(cx.executor());
4927 let project = Project::test(fs, [], cx).await;
4928 let connection = Rc::new(FakeAgentConnection::new());
4929 let set_title_calls = connection.set_title_calls.clone();
4930
4931 let thread = cx
4932 .update(|cx| {
4933 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4934 })
4935 .await
4936 .unwrap();
4937
4938 // Initial title is the default.
4939 thread.read_with(cx, |thread, _| {
4940 assert_eq!(thread.title().as_ref(), "Test");
4941 });
4942
4943 // Setting a provisional title updates the display title.
4944 thread.update(cx, |thread, cx| {
4945 thread.set_provisional_title("Hello, can you help…".into(), cx);
4946 });
4947 thread.read_with(cx, |thread, _| {
4948 assert_eq!(thread.title().as_ref(), "Hello, can you help…");
4949 });
4950
4951 // The provisional title should NOT have propagated to the connection.
4952 assert_eq!(
4953 set_title_calls.borrow().len(),
4954 0,
4955 "provisional title should not propagate to the connection"
4956 );
4957
4958 // When the real title arrives via set_title, it replaces the
4959 // provisional title and propagates to the connection.
4960 let task = thread.update(cx, |thread, cx| {
4961 thread.set_title("Helping with Rust question".into(), cx)
4962 });
4963 task.await.expect("set_title should succeed");
4964 thread.read_with(cx, |thread, _| {
4965 assert_eq!(thread.title().as_ref(), "Helping with Rust question");
4966 });
4967 assert_eq!(
4968 set_title_calls.borrow().as_slice(),
4969 &[SharedString::from("Helping with Rust question")],
4970 "real title should propagate to the connection"
4971 );
4972 }
4973}