1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5use action_log::{ActionLog, ActionLogTelemetry};
6use agent_client_protocol::{self as acp};
7use anyhow::{Context as _, Result, anyhow};
8use collections::HashSet;
9pub use connection::*;
10pub use diff::*;
11use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
12use futures::{FutureExt, channel::oneshot, future::BoxFuture};
13use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
14use itertools::Itertools;
15use language::language_settings::FormatOnSave;
16use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
17use markdown::Markdown;
18pub use mention::*;
19use project::lsp_store::{FormatTrigger, LspFormatTarget};
20use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
21use serde::{Deserialize, Serialize};
22use serde_json::to_string_pretty;
23use std::collections::HashMap;
24use std::error::Error;
25use std::fmt::{Formatter, Write};
26use std::ops::Range;
27use std::process::ExitStatus;
28use std::rc::Rc;
29use std::time::{Duration, Instant};
30use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
31use task::{Shell, ShellBuilder};
32pub use terminal::*;
33use text::Bias;
34use ui::App;
35use util::markdown::MarkdownEscaped;
36use util::path_list::PathList;
37use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
38use uuid::Uuid;
39
40/// Returned when the model stops because it exhausted its output token budget.
41#[derive(Debug)]
42pub struct MaxOutputTokensError;
43
44impl std::fmt::Display for MaxOutputTokensError {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 write!(f, "output token limit reached")
47 }
48}
49
50impl std::error::Error for MaxOutputTokensError {}
51
52/// Key used in ACP ToolCall meta to store the tool's programmatic name.
53/// This is a workaround since ACP's ToolCall doesn't have a dedicated name field.
54pub const TOOL_NAME_META_KEY: &str = "tool_name";
55
56/// Helper to extract tool name from ACP meta
57pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
58 meta.as_ref()
59 .and_then(|m| m.get(TOOL_NAME_META_KEY))
60 .and_then(|v| v.as_str())
61 .map(|s| SharedString::from(s.to_owned()))
62}
63
64/// Helper to create meta with tool name
65pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
66 acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
67}
68
69/// Key used in ACP ToolCall meta to store the session id and message indexes
70pub const SUBAGENT_SESSION_INFO_META_KEY: &str = "subagent_session_info";
71
72#[derive(Clone, Debug, Deserialize, Serialize)]
73pub struct SubagentSessionInfo {
74 /// The session id of the subagent sessiont that was spawned
75 pub session_id: acp::SessionId,
76 /// The index of the message of the start of the "turn" run by this tool call
77 pub message_start_index: usize,
78 /// The index of the output of the message that the subagent has returned
79 #[serde(skip_serializing_if = "Option::is_none")]
80 pub message_end_index: Option<usize>,
81}
82
83/// Helper to extract subagent session id from ACP meta
84pub fn subagent_session_info_from_meta(meta: &Option<acp::Meta>) -> Option<SubagentSessionInfo> {
85 meta.as_ref()
86 .and_then(|m| m.get(SUBAGENT_SESSION_INFO_META_KEY))
87 .and_then(|v| serde_json::from_value(v.clone()).ok())
88}
89
90#[derive(Debug)]
91pub struct UserMessage {
92 pub id: Option<UserMessageId>,
93 pub content: ContentBlock,
94 pub chunks: Vec<acp::ContentBlock>,
95 pub checkpoint: Option<Checkpoint>,
96 pub indented: bool,
97}
98
99#[derive(Debug)]
100pub struct Checkpoint {
101 git_checkpoint: GitStoreCheckpoint,
102 pub show: bool,
103}
104
105impl UserMessage {
106 fn to_markdown(&self, cx: &App) -> String {
107 let mut markdown = String::new();
108 if self
109 .checkpoint
110 .as_ref()
111 .is_some_and(|checkpoint| checkpoint.show)
112 {
113 writeln!(markdown, "## User (checkpoint)").unwrap();
114 } else {
115 writeln!(markdown, "## User").unwrap();
116 }
117 writeln!(markdown).unwrap();
118 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
119 writeln!(markdown).unwrap();
120 markdown
121 }
122}
123
124#[derive(Debug, PartialEq)]
125pub struct AssistantMessage {
126 pub chunks: Vec<AssistantMessageChunk>,
127 pub indented: bool,
128 pub is_subagent_output: bool,
129}
130
131impl AssistantMessage {
132 pub fn to_markdown(&self, cx: &App) -> String {
133 format!(
134 "## Assistant\n\n{}\n\n",
135 self.chunks
136 .iter()
137 .map(|chunk| chunk.to_markdown(cx))
138 .join("\n\n")
139 )
140 }
141}
142
143#[derive(Debug, PartialEq)]
144pub enum AssistantMessageChunk {
145 Message { block: ContentBlock },
146 Thought { block: ContentBlock },
147}
148
149impl AssistantMessageChunk {
150 pub fn from_str(
151 chunk: &str,
152 language_registry: &Arc<LanguageRegistry>,
153 path_style: PathStyle,
154 cx: &mut App,
155 ) -> Self {
156 Self::Message {
157 block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
158 }
159 }
160
161 fn to_markdown(&self, cx: &App) -> String {
162 match self {
163 Self::Message { block } => block.to_markdown(cx).to_string(),
164 Self::Thought { block } => {
165 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
166 }
167 }
168 }
169}
170
171#[derive(Debug)]
172pub enum AgentThreadEntry {
173 UserMessage(UserMessage),
174 AssistantMessage(AssistantMessage),
175 ToolCall(ToolCall),
176 CompletedPlan(Vec<PlanEntry>),
177}
178
179impl AgentThreadEntry {
180 pub fn is_indented(&self) -> bool {
181 match self {
182 Self::UserMessage(message) => message.indented,
183 Self::AssistantMessage(message) => message.indented,
184 Self::ToolCall(_) => false,
185 Self::CompletedPlan(_) => false,
186 }
187 }
188
189 pub fn to_markdown(&self, cx: &App) -> String {
190 match self {
191 Self::UserMessage(message) => message.to_markdown(cx),
192 Self::AssistantMessage(message) => message.to_markdown(cx),
193 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
194 Self::CompletedPlan(entries) => {
195 let mut md = String::from("## Plan\n\n");
196 for entry in entries {
197 let source = entry.content.read(cx).source().to_string();
198 md.push_str(&format!("- [x] {}\n", source));
199 }
200 md
201 }
202 }
203 }
204
205 pub fn user_message(&self) -> Option<&UserMessage> {
206 if let AgentThreadEntry::UserMessage(message) = self {
207 Some(message)
208 } else {
209 None
210 }
211 }
212
213 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
214 if let AgentThreadEntry::ToolCall(call) = self {
215 itertools::Either::Left(call.diffs())
216 } else {
217 itertools::Either::Right(std::iter::empty())
218 }
219 }
220
221 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
222 if let AgentThreadEntry::ToolCall(call) = self {
223 itertools::Either::Left(call.terminals())
224 } else {
225 itertools::Either::Right(std::iter::empty())
226 }
227 }
228
229 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
230 if let AgentThreadEntry::ToolCall(ToolCall {
231 locations,
232 resolved_locations,
233 ..
234 }) = self
235 {
236 Some((
237 locations.get(ix)?.clone(),
238 resolved_locations.get(ix)?.clone()?,
239 ))
240 } else {
241 None
242 }
243 }
244}
245
246#[derive(Debug)]
247pub struct ToolCall {
248 pub id: acp::ToolCallId,
249 pub label: Entity<Markdown>,
250 pub kind: acp::ToolKind,
251 pub content: Vec<ToolCallContent>,
252 pub status: ToolCallStatus,
253 pub locations: Vec<acp::ToolCallLocation>,
254 pub resolved_locations: Vec<Option<AgentLocation>>,
255 pub raw_input: Option<serde_json::Value>,
256 pub raw_input_markdown: Option<Entity<Markdown>>,
257 pub raw_output: Option<serde_json::Value>,
258 pub tool_name: Option<SharedString>,
259 pub subagent_session_info: Option<SubagentSessionInfo>,
260}
261
262impl ToolCall {
263 fn from_acp(
264 tool_call: acp::ToolCall,
265 status: ToolCallStatus,
266 language_registry: Arc<LanguageRegistry>,
267 path_style: PathStyle,
268 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
269 cx: &mut App,
270 ) -> Result<Self> {
271 let title = if tool_call.kind == acp::ToolKind::Execute {
272 tool_call.title
273 } else if tool_call.kind == acp::ToolKind::Edit {
274 MarkdownEscaped(tool_call.title.as_str()).to_string()
275 } else if let Some((first_line, _)) = tool_call.title.split_once("\n") {
276 first_line.to_owned() + "…"
277 } else {
278 tool_call.title
279 };
280 let mut content = Vec::with_capacity(tool_call.content.len());
281 for item in tool_call.content {
282 if let Some(item) = ToolCallContent::from_acp(
283 item,
284 language_registry.clone(),
285 path_style,
286 terminals,
287 cx,
288 )? {
289 content.push(item);
290 }
291 }
292
293 let raw_input_markdown = tool_call
294 .raw_input
295 .as_ref()
296 .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
297
298 let tool_name = tool_name_from_meta(&tool_call.meta);
299
300 let subagent_session_info = subagent_session_info_from_meta(&tool_call.meta);
301
302 let result = Self {
303 id: tool_call.tool_call_id,
304 label: cx
305 .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
306 kind: tool_call.kind,
307 content,
308 locations: tool_call.locations,
309 resolved_locations: Vec::default(),
310 status,
311 raw_input: tool_call.raw_input,
312 raw_input_markdown,
313 raw_output: tool_call.raw_output,
314 tool_name,
315 subagent_session_info,
316 };
317 Ok(result)
318 }
319
320 fn update_fields(
321 &mut self,
322 fields: acp::ToolCallUpdateFields,
323 meta: Option<acp::Meta>,
324 language_registry: Arc<LanguageRegistry>,
325 path_style: PathStyle,
326 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
327 cx: &mut App,
328 ) -> Result<()> {
329 let acp::ToolCallUpdateFields {
330 kind,
331 status,
332 title,
333 content,
334 locations,
335 raw_input,
336 raw_output,
337 ..
338 } = fields;
339
340 if let Some(kind) = kind {
341 self.kind = kind;
342 }
343
344 if let Some(status) = status {
345 self.status = status.into();
346 }
347
348 if let Some(subagent_session_info) = subagent_session_info_from_meta(&meta) {
349 self.subagent_session_info = Some(subagent_session_info);
350 }
351
352 if let Some(title) = title {
353 if self.kind == acp::ToolKind::Execute {
354 for terminal in self.terminals() {
355 terminal.update(cx, |terminal, cx| {
356 terminal.update_command_label(&title, cx);
357 });
358 }
359 }
360 self.label.update(cx, |label, cx| {
361 if self.kind == acp::ToolKind::Execute {
362 label.replace(title, cx);
363 } else if self.kind == acp::ToolKind::Edit {
364 label.replace(MarkdownEscaped(&title).to_string(), cx)
365 } else if let Some((first_line, _)) = title.split_once("\n") {
366 label.replace(first_line.to_owned() + "…", cx);
367 } else {
368 label.replace(title, cx);
369 }
370 });
371 }
372
373 if let Some(content) = content {
374 let mut new_content_len = content.len();
375 let mut content = content.into_iter();
376
377 // Reuse existing content if we can
378 for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
379 let valid_content =
380 old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
381 if !valid_content {
382 new_content_len -= 1;
383 }
384 }
385 for new in content {
386 if let Some(new) = ToolCallContent::from_acp(
387 new,
388 language_registry.clone(),
389 path_style,
390 terminals,
391 cx,
392 )? {
393 self.content.push(new);
394 } else {
395 new_content_len -= 1;
396 }
397 }
398 self.content.truncate(new_content_len);
399 }
400
401 if let Some(locations) = locations {
402 self.locations = locations;
403 }
404
405 if let Some(raw_input) = raw_input {
406 self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
407 self.raw_input = Some(raw_input);
408 }
409
410 if let Some(raw_output) = raw_output {
411 if self.content.is_empty()
412 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
413 {
414 self.content
415 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
416 markdown,
417 }));
418 }
419 self.raw_output = Some(raw_output);
420 }
421 Ok(())
422 }
423
424 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
425 self.content.iter().filter_map(|content| match content {
426 ToolCallContent::Diff(diff) => Some(diff),
427 ToolCallContent::ContentBlock(_) => None,
428 ToolCallContent::Terminal(_) => None,
429 })
430 }
431
432 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
433 self.content.iter().filter_map(|content| match content {
434 ToolCallContent::Terminal(terminal) => Some(terminal),
435 ToolCallContent::ContentBlock(_) => None,
436 ToolCallContent::Diff(_) => None,
437 })
438 }
439
440 pub fn is_subagent(&self) -> bool {
441 self.tool_name.as_ref().is_some_and(|s| s == "spawn_agent")
442 || self.subagent_session_info.is_some()
443 }
444
445 pub fn to_markdown(&self, cx: &App) -> String {
446 let mut markdown = format!(
447 "**Tool Call: {}**\nStatus: {}\n\n",
448 self.label.read(cx).source(),
449 self.status
450 );
451 for content in &self.content {
452 markdown.push_str(content.to_markdown(cx).as_str());
453 markdown.push_str("\n\n");
454 }
455 markdown
456 }
457
458 async fn resolve_location(
459 location: acp::ToolCallLocation,
460 project: WeakEntity<Project>,
461 cx: &mut AsyncApp,
462 ) -> Option<ResolvedLocation> {
463 let buffer = project
464 .update(cx, |project, cx| {
465 project
466 .project_path_for_absolute_path(&location.path, cx)
467 .map(|path| project.open_buffer(path, cx))
468 })
469 .ok()??;
470 let buffer = buffer.await.log_err()?;
471 let position = buffer.update(cx, |buffer, _| {
472 let snapshot = buffer.snapshot();
473 if let Some(row) = location.line {
474 let column = snapshot.indent_size_for_line(row).len;
475 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
476 snapshot.anchor_before(point)
477 } else {
478 Anchor::min_for_buffer(snapshot.remote_id())
479 }
480 });
481
482 Some(ResolvedLocation { buffer, position })
483 }
484
485 fn resolve_locations(
486 &self,
487 project: Entity<Project>,
488 cx: &mut App,
489 ) -> Task<Vec<Option<ResolvedLocation>>> {
490 let locations = self.locations.clone();
491 project.update(cx, |_, cx| {
492 cx.spawn(async move |project, cx| {
493 let mut new_locations = Vec::new();
494 for location in locations {
495 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
496 }
497 new_locations
498 })
499 })
500 }
501}
502
503// Separate so we can hold a strong reference to the buffer
504// for saving on the thread
505#[derive(Clone, Debug, PartialEq, Eq)]
506struct ResolvedLocation {
507 buffer: Entity<Buffer>,
508 position: Anchor,
509}
510
511impl From<&ResolvedLocation> for AgentLocation {
512 fn from(value: &ResolvedLocation) -> Self {
513 Self {
514 buffer: value.buffer.downgrade(),
515 position: value.position,
516 }
517 }
518}
519
520#[derive(Debug, Clone)]
521pub enum SelectedPermissionParams {
522 Terminal { patterns: Vec<String> },
523}
524
525#[derive(Debug)]
526pub struct SelectedPermissionOutcome {
527 pub option_id: acp::PermissionOptionId,
528 pub option_kind: acp::PermissionOptionKind,
529 pub params: Option<SelectedPermissionParams>,
530}
531
532impl SelectedPermissionOutcome {
533 pub fn new(option_id: acp::PermissionOptionId, option_kind: acp::PermissionOptionKind) -> Self {
534 Self {
535 option_id,
536 option_kind,
537 params: None,
538 }
539 }
540
541 pub fn params(mut self, params: Option<SelectedPermissionParams>) -> Self {
542 self.params = params;
543 self
544 }
545}
546
547impl From<SelectedPermissionOutcome> for acp::SelectedPermissionOutcome {
548 fn from(value: SelectedPermissionOutcome) -> Self {
549 Self::new(value.option_id)
550 }
551}
552
553#[derive(Debug)]
554pub enum RequestPermissionOutcome {
555 Cancelled,
556 Selected(SelectedPermissionOutcome),
557}
558
559impl From<RequestPermissionOutcome> for acp::RequestPermissionOutcome {
560 fn from(value: RequestPermissionOutcome) -> Self {
561 match value {
562 RequestPermissionOutcome::Cancelled => Self::Cancelled,
563 RequestPermissionOutcome::Selected(outcome) => Self::Selected(outcome.into()),
564 }
565 }
566}
567
568#[derive(Debug)]
569pub enum ToolCallStatus {
570 /// The tool call hasn't started running yet, but we start showing it to
571 /// the user.
572 Pending,
573 /// The tool call is waiting for confirmation from the user.
574 WaitingForConfirmation {
575 options: PermissionOptions,
576 respond_tx: oneshot::Sender<SelectedPermissionOutcome>,
577 },
578 /// The tool call is currently running.
579 InProgress,
580 /// The tool call completed successfully.
581 Completed,
582 /// The tool call failed.
583 Failed,
584 /// The user rejected the tool call.
585 Rejected,
586 /// The user canceled generation so the tool call was canceled.
587 Canceled,
588}
589
590impl From<acp::ToolCallStatus> for ToolCallStatus {
591 fn from(status: acp::ToolCallStatus) -> Self {
592 match status {
593 acp::ToolCallStatus::Pending => Self::Pending,
594 acp::ToolCallStatus::InProgress => Self::InProgress,
595 acp::ToolCallStatus::Completed => Self::Completed,
596 acp::ToolCallStatus::Failed => Self::Failed,
597 _ => Self::Pending,
598 }
599 }
600}
601
602impl Display for ToolCallStatus {
603 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
604 write!(
605 f,
606 "{}",
607 match self {
608 ToolCallStatus::Pending => "Pending",
609 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
610 ToolCallStatus::InProgress => "In Progress",
611 ToolCallStatus::Completed => "Completed",
612 ToolCallStatus::Failed => "Failed",
613 ToolCallStatus::Rejected => "Rejected",
614 ToolCallStatus::Canceled => "Canceled",
615 }
616 )
617 }
618}
619
620#[derive(Debug, PartialEq, Clone)]
621pub enum ContentBlock {
622 Empty,
623 Markdown { markdown: Entity<Markdown> },
624 ResourceLink { resource_link: acp::ResourceLink },
625 Image { image: Arc<gpui::Image> },
626}
627
628impl ContentBlock {
629 pub fn new(
630 block: acp::ContentBlock,
631 language_registry: &Arc<LanguageRegistry>,
632 path_style: PathStyle,
633 cx: &mut App,
634 ) -> Self {
635 let mut this = Self::Empty;
636 this.append(block, language_registry, path_style, cx);
637 this
638 }
639
640 pub fn new_combined(
641 blocks: impl IntoIterator<Item = acp::ContentBlock>,
642 language_registry: Arc<LanguageRegistry>,
643 path_style: PathStyle,
644 cx: &mut App,
645 ) -> Self {
646 let mut this = Self::Empty;
647 for block in blocks {
648 this.append(block, &language_registry, path_style, cx);
649 }
650 this
651 }
652
653 pub fn append(
654 &mut self,
655 block: acp::ContentBlock,
656 language_registry: &Arc<LanguageRegistry>,
657 path_style: PathStyle,
658 cx: &mut App,
659 ) {
660 match (&mut *self, &block) {
661 (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
662 *self = ContentBlock::ResourceLink {
663 resource_link: resource_link.clone(),
664 };
665 }
666 (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
667 if let Some(image) = Self::decode_image(image_content) {
668 *self = ContentBlock::Image { image };
669 } else {
670 let new_content = Self::image_md(image_content);
671 *self = Self::create_markdown_block(new_content, language_registry, cx);
672 }
673 }
674 (ContentBlock::Empty, _) => {
675 let new_content = Self::block_string_contents(&block, path_style);
676 *self = Self::create_markdown_block(new_content, language_registry, cx);
677 }
678 (ContentBlock::Markdown { markdown }, _) => {
679 let new_content = Self::block_string_contents(&block, path_style);
680 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
681 }
682 (ContentBlock::ResourceLink { resource_link }, _) => {
683 let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
684 let new_content = Self::block_string_contents(&block, path_style);
685 let combined = format!("{}\n{}", existing_content, new_content);
686 *self = Self::create_markdown_block(combined, language_registry, cx);
687 }
688 (ContentBlock::Image { .. }, _) => {
689 let new_content = Self::block_string_contents(&block, path_style);
690 let combined = format!("`Image`\n{}", new_content);
691 *self = Self::create_markdown_block(combined, language_registry, cx);
692 }
693 }
694 }
695
696 fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
697 use base64::Engine as _;
698
699 let bytes = base64::engine::general_purpose::STANDARD
700 .decode(image_content.data.as_bytes())
701 .ok()?;
702 let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
703 Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
704 }
705
706 fn create_markdown_block(
707 content: String,
708 language_registry: &Arc<LanguageRegistry>,
709 cx: &mut App,
710 ) -> ContentBlock {
711 ContentBlock::Markdown {
712 markdown: cx
713 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
714 }
715 }
716
717 fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
718 match block {
719 acp::ContentBlock::Text(text_content) => text_content.text.clone(),
720 acp::ContentBlock::ResourceLink(resource_link) => {
721 Self::resource_link_md(&resource_link.uri, path_style)
722 }
723 acp::ContentBlock::Resource(acp::EmbeddedResource {
724 resource:
725 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
726 uri,
727 ..
728 }),
729 ..
730 }) => Self::resource_link_md(uri, path_style),
731 acp::ContentBlock::Image(image) => Self::image_md(image),
732 _ => String::new(),
733 }
734 }
735
736 fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
737 if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
738 uri.as_link().to_string()
739 } else {
740 uri.to_string()
741 }
742 }
743
744 fn image_md(_image: &acp::ImageContent) -> String {
745 "`Image`".into()
746 }
747
748 pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
749 match self {
750 ContentBlock::Empty => "",
751 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
752 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
753 ContentBlock::Image { .. } => "`Image`",
754 }
755 }
756
757 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
758 match self {
759 ContentBlock::Empty => None,
760 ContentBlock::Markdown { markdown } => Some(markdown),
761 ContentBlock::ResourceLink { .. } => None,
762 ContentBlock::Image { .. } => None,
763 }
764 }
765
766 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
767 match self {
768 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
769 _ => None,
770 }
771 }
772
773 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
774 match self {
775 ContentBlock::Image { image } => Some(image),
776 _ => None,
777 }
778 }
779}
780
781#[derive(Debug)]
782pub enum ToolCallContent {
783 ContentBlock(ContentBlock),
784 Diff(Entity<Diff>),
785 Terminal(Entity<Terminal>),
786}
787
788impl ToolCallContent {
789 pub fn from_acp(
790 content: acp::ToolCallContent,
791 language_registry: Arc<LanguageRegistry>,
792 path_style: PathStyle,
793 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
794 cx: &mut App,
795 ) -> Result<Option<Self>> {
796 match content {
797 acp::ToolCallContent::Content(acp::Content { content, .. }) => {
798 Ok(Some(Self::ContentBlock(ContentBlock::new(
799 content,
800 &language_registry,
801 path_style,
802 cx,
803 ))))
804 }
805 acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
806 Diff::finalized(
807 diff.path.to_string_lossy().into_owned(),
808 diff.old_text,
809 diff.new_text,
810 language_registry,
811 cx,
812 )
813 })))),
814 acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
815 .get(&terminal_id)
816 .cloned()
817 .map(|terminal| Some(Self::Terminal(terminal)))
818 .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
819 _ => Ok(None),
820 }
821 }
822
823 pub fn update_from_acp(
824 &mut self,
825 new: acp::ToolCallContent,
826 language_registry: Arc<LanguageRegistry>,
827 path_style: PathStyle,
828 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
829 cx: &mut App,
830 ) -> Result<bool> {
831 let needs_update = match (&self, &new) {
832 (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
833 old_diff.read(cx).needs_update(
834 new_diff.old_text.as_deref().unwrap_or(""),
835 &new_diff.new_text,
836 cx,
837 )
838 }
839 _ => true,
840 };
841
842 if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
843 if needs_update {
844 *self = update;
845 }
846 Ok(true)
847 } else {
848 Ok(false)
849 }
850 }
851
852 pub fn to_markdown(&self, cx: &App) -> String {
853 match self {
854 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
855 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
856 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
857 }
858 }
859
860 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
861 match self {
862 Self::ContentBlock(content) => content.image(),
863 _ => None,
864 }
865 }
866}
867
868#[derive(Debug, PartialEq)]
869pub enum ToolCallUpdate {
870 UpdateFields(acp::ToolCallUpdate),
871 UpdateDiff(ToolCallUpdateDiff),
872 UpdateTerminal(ToolCallUpdateTerminal),
873}
874
875impl ToolCallUpdate {
876 fn id(&self) -> &acp::ToolCallId {
877 match self {
878 Self::UpdateFields(update) => &update.tool_call_id,
879 Self::UpdateDiff(diff) => &diff.id,
880 Self::UpdateTerminal(terminal) => &terminal.id,
881 }
882 }
883}
884
885impl From<acp::ToolCallUpdate> for ToolCallUpdate {
886 fn from(update: acp::ToolCallUpdate) -> Self {
887 Self::UpdateFields(update)
888 }
889}
890
891impl From<ToolCallUpdateDiff> for ToolCallUpdate {
892 fn from(diff: ToolCallUpdateDiff) -> Self {
893 Self::UpdateDiff(diff)
894 }
895}
896
897#[derive(Debug, PartialEq)]
898pub struct ToolCallUpdateDiff {
899 pub id: acp::ToolCallId,
900 pub diff: Entity<Diff>,
901}
902
903impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
904 fn from(terminal: ToolCallUpdateTerminal) -> Self {
905 Self::UpdateTerminal(terminal)
906 }
907}
908
909#[derive(Debug, PartialEq)]
910pub struct ToolCallUpdateTerminal {
911 pub id: acp::ToolCallId,
912 pub terminal: Entity<Terminal>,
913}
914
915#[derive(Debug, Default)]
916pub struct Plan {
917 pub entries: Vec<PlanEntry>,
918}
919
920#[derive(Debug)]
921pub struct PlanStats<'a> {
922 pub in_progress_entry: Option<&'a PlanEntry>,
923 pub pending: u32,
924 pub completed: u32,
925}
926
927impl Plan {
928 pub fn is_empty(&self) -> bool {
929 self.entries.is_empty()
930 }
931
932 pub fn stats(&self) -> PlanStats<'_> {
933 let mut stats = PlanStats {
934 in_progress_entry: None,
935 pending: 0,
936 completed: 0,
937 };
938
939 for entry in &self.entries {
940 match &entry.status {
941 acp::PlanEntryStatus::Pending => {
942 stats.pending += 1;
943 }
944 acp::PlanEntryStatus::InProgress => {
945 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
946 stats.pending += 1;
947 }
948 acp::PlanEntryStatus::Completed => {
949 stats.completed += 1;
950 }
951 _ => {}
952 }
953 }
954
955 stats
956 }
957}
958
959#[derive(Debug)]
960pub struct PlanEntry {
961 pub content: Entity<Markdown>,
962 pub priority: acp::PlanEntryPriority,
963 pub status: acp::PlanEntryStatus,
964}
965
966impl PlanEntry {
967 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
968 Self {
969 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
970 priority: entry.priority,
971 status: entry.status,
972 }
973 }
974}
975
976#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
977pub struct TokenUsage {
978 pub max_tokens: u64,
979 pub used_tokens: u64,
980 pub input_tokens: u64,
981 pub output_tokens: u64,
982 pub max_output_tokens: Option<u64>,
983}
984
985#[derive(Debug, Clone)]
986pub struct SessionCost {
987 pub amount: f64,
988 pub currency: SharedString,
989}
990
991pub const TOKEN_USAGE_WARNING_THRESHOLD: f32 = 0.8;
992
993impl TokenUsage {
994 pub fn ratio(&self) -> TokenUsageRatio {
995 #[cfg(debug_assertions)]
996 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
997 .unwrap_or(TOKEN_USAGE_WARNING_THRESHOLD.to_string())
998 .parse()
999 .unwrap();
1000 #[cfg(not(debug_assertions))]
1001 let warning_threshold: f32 = TOKEN_USAGE_WARNING_THRESHOLD;
1002
1003 // When the maximum is unknown because there is no selected model,
1004 // avoid showing the token limit warning.
1005 if self.max_tokens == 0 {
1006 TokenUsageRatio::Normal
1007 } else if self.used_tokens >= self.max_tokens {
1008 TokenUsageRatio::Exceeded
1009 } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
1010 TokenUsageRatio::Warning
1011 } else {
1012 TokenUsageRatio::Normal
1013 }
1014 }
1015}
1016
1017#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
1018pub enum TokenUsageRatio {
1019 Normal,
1020 Warning,
1021 Exceeded,
1022}
1023
1024#[derive(Debug, Clone)]
1025pub struct RetryStatus {
1026 pub last_error: SharedString,
1027 pub attempt: usize,
1028 pub max_attempts: usize,
1029 pub started_at: Instant,
1030 pub duration: Duration,
1031}
1032
1033struct RunningTurn {
1034 id: u32,
1035 send_task: Task<()>,
1036}
1037
1038pub struct AcpThread {
1039 session_id: acp::SessionId,
1040 work_dirs: Option<PathList>,
1041 parent_session_id: Option<acp::SessionId>,
1042 title: Option<SharedString>,
1043 provisional_title: Option<SharedString>,
1044 entries: Vec<AgentThreadEntry>,
1045 plan: Plan,
1046 project: Entity<Project>,
1047 action_log: Entity<ActionLog>,
1048 shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
1049 turn_id: u32,
1050 running_turn: Option<RunningTurn>,
1051 connection: Rc<dyn AgentConnection>,
1052 token_usage: Option<TokenUsage>,
1053 cost: Option<SessionCost>,
1054 prompt_capabilities: acp::PromptCapabilities,
1055 available_commands: Vec<acp::AvailableCommand>,
1056 _observe_prompt_capabilities: Task<anyhow::Result<()>>,
1057 terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
1058 pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
1059 pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
1060 had_error: bool,
1061 /// The user's unsent prompt text, persisted so it can be restored when reloading the thread.
1062 draft_prompt: Option<Vec<acp::ContentBlock>>,
1063 /// The initial scroll position for the thread view, set during session registration.
1064 ui_scroll_position: Option<gpui::ListOffset>,
1065 /// Buffer for smooth text streaming. Holds text that has been received from
1066 /// the model but not yet revealed in the UI. A timer task drains this buffer
1067 /// gradually to create a fluid typing effect instead of choppy chunk-at-a-time
1068 /// updates.
1069 streaming_text_buffer: Option<StreamingTextBuffer>,
1070}
1071
1072struct StreamingTextBuffer {
1073 /// Text received from the model but not yet appended to the Markdown source.
1074 pending: String,
1075 /// The number of bytes to reveal per timer turn.
1076 bytes_to_reveal_per_tick: usize,
1077 /// The Markdown entity being streamed into.
1078 target: Entity<Markdown>,
1079 /// Timer task that periodically moves text from `pending` into `source`.
1080 _reveal_task: Task<()>,
1081}
1082
1083impl StreamingTextBuffer {
1084 /// The number of milliseconds between each timer tick, controlling how quickly
1085 /// text is revealed.
1086 const TASK_UPDATE_MS: u64 = 16;
1087 /// The time in milliseconds to reveal the entire pending text.
1088 const REVEAL_TARGET: f32 = 200.0;
1089}
1090
1091impl From<&AcpThread> for ActionLogTelemetry {
1092 fn from(value: &AcpThread) -> Self {
1093 Self {
1094 agent_telemetry_id: value.connection().telemetry_id(),
1095 session_id: value.session_id.0.clone(),
1096 }
1097 }
1098}
1099
1100#[derive(Debug)]
1101pub enum AcpThreadEvent {
1102 PromptUpdated,
1103 NewEntry,
1104 TitleUpdated,
1105 TokenUsageUpdated,
1106 EntryUpdated(usize),
1107 EntriesRemoved(Range<usize>),
1108 ToolAuthorizationRequested(acp::ToolCallId),
1109 ToolAuthorizationReceived(acp::ToolCallId),
1110 Retry(RetryStatus),
1111 SubagentSpawned(acp::SessionId),
1112 Stopped(acp::StopReason),
1113 Error,
1114 LoadError(LoadError),
1115 PromptCapabilitiesUpdated,
1116 Refusal,
1117 AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
1118 ModeUpdated(acp::SessionModeId),
1119 ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
1120 WorkingDirectoriesUpdated,
1121}
1122
1123impl EventEmitter<AcpThreadEvent> for AcpThread {}
1124
1125#[derive(Debug, Clone)]
1126pub enum TerminalProviderEvent {
1127 Created {
1128 terminal_id: acp::TerminalId,
1129 label: String,
1130 cwd: Option<PathBuf>,
1131 output_byte_limit: Option<u64>,
1132 terminal: Entity<::terminal::Terminal>,
1133 },
1134 Output {
1135 terminal_id: acp::TerminalId,
1136 data: Vec<u8>,
1137 },
1138 TitleChanged {
1139 terminal_id: acp::TerminalId,
1140 title: String,
1141 },
1142 Exit {
1143 terminal_id: acp::TerminalId,
1144 status: acp::TerminalExitStatus,
1145 },
1146}
1147
1148#[derive(Debug, Clone)]
1149pub enum TerminalProviderCommand {
1150 WriteInput {
1151 terminal_id: acp::TerminalId,
1152 bytes: Vec<u8>,
1153 },
1154 Resize {
1155 terminal_id: acp::TerminalId,
1156 cols: u16,
1157 rows: u16,
1158 },
1159 Close {
1160 terminal_id: acp::TerminalId,
1161 },
1162}
1163
1164#[derive(PartialEq, Eq, Debug)]
1165pub enum ThreadStatus {
1166 Idle,
1167 Generating,
1168}
1169
1170#[derive(Debug, Clone)]
1171pub enum LoadError {
1172 Unsupported {
1173 command: SharedString,
1174 current_version: SharedString,
1175 minimum_version: SharedString,
1176 },
1177 FailedToInstall(SharedString),
1178 Exited {
1179 status: ExitStatus,
1180 },
1181 Other(SharedString),
1182}
1183
1184impl Display for LoadError {
1185 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1186 match self {
1187 LoadError::Unsupported {
1188 command: path,
1189 current_version,
1190 minimum_version,
1191 } => {
1192 write!(
1193 f,
1194 "version {current_version} from {path} is not supported (need at least {minimum_version})"
1195 )
1196 }
1197 LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1198 LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1199 LoadError::Other(msg) => write!(f, "{msg}"),
1200 }
1201 }
1202}
1203
1204impl Error for LoadError {}
1205
1206impl AcpThread {
1207 pub fn new(
1208 parent_session_id: Option<acp::SessionId>,
1209 title: Option<SharedString>,
1210 work_dirs: Option<PathList>,
1211 connection: Rc<dyn AgentConnection>,
1212 project: Entity<Project>,
1213 action_log: Entity<ActionLog>,
1214 session_id: acp::SessionId,
1215 mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1216 cx: &mut Context<Self>,
1217 ) -> Self {
1218 let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1219 let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1220 loop {
1221 let caps = prompt_capabilities_rx.recv().await?;
1222 this.update(cx, |this, cx| {
1223 this.prompt_capabilities = caps;
1224 cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1225 })?;
1226 }
1227 });
1228
1229 Self {
1230 parent_session_id,
1231 work_dirs,
1232 action_log,
1233 shared_buffers: Default::default(),
1234 entries: Default::default(),
1235 plan: Default::default(),
1236 title,
1237 provisional_title: None,
1238 project,
1239 running_turn: None,
1240 turn_id: 0,
1241 connection,
1242 session_id,
1243 token_usage: None,
1244 cost: None,
1245 prompt_capabilities,
1246 available_commands: Vec::new(),
1247 _observe_prompt_capabilities: task,
1248 terminals: HashMap::default(),
1249 pending_terminal_output: HashMap::default(),
1250 pending_terminal_exit: HashMap::default(),
1251 had_error: false,
1252 draft_prompt: None,
1253 ui_scroll_position: None,
1254 streaming_text_buffer: None,
1255 }
1256 }
1257
1258 pub fn parent_session_id(&self) -> Option<&acp::SessionId> {
1259 self.parent_session_id.as_ref()
1260 }
1261
1262 pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
1263 self.prompt_capabilities.clone()
1264 }
1265
1266 pub fn available_commands(&self) -> &[acp::AvailableCommand] {
1267 &self.available_commands
1268 }
1269
1270 pub fn is_draft_thread(&self) -> bool {
1271 self.entries().is_empty()
1272 }
1273
1274 pub fn draft_prompt(&self) -> Option<&[acp::ContentBlock]> {
1275 self.draft_prompt.as_deref()
1276 }
1277
1278 pub fn set_draft_prompt(
1279 &mut self,
1280 prompt: Option<Vec<acp::ContentBlock>>,
1281 cx: &mut Context<Self>,
1282 ) {
1283 cx.emit(AcpThreadEvent::PromptUpdated);
1284 self.draft_prompt = prompt;
1285 }
1286
1287 pub fn ui_scroll_position(&self) -> Option<gpui::ListOffset> {
1288 self.ui_scroll_position
1289 }
1290
1291 pub fn set_ui_scroll_position(&mut self, position: Option<gpui::ListOffset>) {
1292 self.ui_scroll_position = position;
1293 }
1294
1295 pub fn connection(&self) -> &Rc<dyn AgentConnection> {
1296 &self.connection
1297 }
1298
1299 pub fn action_log(&self) -> &Entity<ActionLog> {
1300 &self.action_log
1301 }
1302
1303 pub fn project(&self) -> &Entity<Project> {
1304 &self.project
1305 }
1306
1307 pub fn title(&self) -> Option<SharedString> {
1308 self.title
1309 .clone()
1310 .or_else(|| self.provisional_title.clone())
1311 }
1312
1313 pub fn has_provisional_title(&self) -> bool {
1314 self.provisional_title.is_some()
1315 }
1316
1317 pub fn entries(&self) -> &[AgentThreadEntry] {
1318 &self.entries
1319 }
1320
1321 pub fn session_id(&self) -> &acp::SessionId {
1322 &self.session_id
1323 }
1324
1325 pub fn supports_truncate(&self, cx: &App) -> bool {
1326 self.connection.truncate(&self.session_id, cx).is_some()
1327 }
1328
1329 pub fn work_dirs(&self) -> Option<&PathList> {
1330 self.work_dirs.as_ref()
1331 }
1332
1333 pub fn set_work_dirs(&mut self, work_dirs: PathList, cx: &mut Context<Self>) {
1334 self.work_dirs = Some(work_dirs);
1335 cx.emit(AcpThreadEvent::WorkingDirectoriesUpdated)
1336 }
1337
1338 pub fn status(&self) -> ThreadStatus {
1339 if self.running_turn.is_some() {
1340 ThreadStatus::Generating
1341 } else {
1342 ThreadStatus::Idle
1343 }
1344 }
1345
1346 pub fn had_error(&self) -> bool {
1347 self.had_error
1348 }
1349
1350 pub fn is_waiting_for_confirmation(&self) -> bool {
1351 for entry in self.entries.iter().rev() {
1352 match entry {
1353 AgentThreadEntry::UserMessage(_) => return false,
1354 AgentThreadEntry::ToolCall(ToolCall {
1355 status: ToolCallStatus::WaitingForConfirmation { .. },
1356 ..
1357 }) => return true,
1358 AgentThreadEntry::ToolCall(_)
1359 | AgentThreadEntry::AssistantMessage(_)
1360 | AgentThreadEntry::CompletedPlan(_) => {}
1361 }
1362 }
1363 false
1364 }
1365
1366 pub fn token_usage(&self) -> Option<&TokenUsage> {
1367 self.token_usage.as_ref()
1368 }
1369
1370 pub fn cost(&self) -> Option<&SessionCost> {
1371 self.cost.as_ref()
1372 }
1373
1374 pub fn has_pending_edit_tool_calls(&self) -> bool {
1375 for entry in self.entries.iter().rev() {
1376 match entry {
1377 AgentThreadEntry::UserMessage(_) => return false,
1378 AgentThreadEntry::ToolCall(
1379 call @ ToolCall {
1380 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1381 ..
1382 },
1383 ) if call.diffs().next().is_some() => {
1384 return true;
1385 }
1386 AgentThreadEntry::ToolCall(_)
1387 | AgentThreadEntry::AssistantMessage(_)
1388 | AgentThreadEntry::CompletedPlan(_) => {}
1389 }
1390 }
1391
1392 false
1393 }
1394
1395 pub fn has_in_progress_tool_calls(&self) -> bool {
1396 for entry in self.entries.iter().rev() {
1397 match entry {
1398 AgentThreadEntry::UserMessage(_) => return false,
1399 AgentThreadEntry::ToolCall(ToolCall {
1400 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1401 ..
1402 }) => {
1403 return true;
1404 }
1405 AgentThreadEntry::ToolCall(_)
1406 | AgentThreadEntry::AssistantMessage(_)
1407 | AgentThreadEntry::CompletedPlan(_) => {}
1408 }
1409 }
1410
1411 false
1412 }
1413
1414 pub fn used_tools_since_last_user_message(&self) -> bool {
1415 for entry in self.entries.iter().rev() {
1416 match entry {
1417 AgentThreadEntry::UserMessage(..) => return false,
1418 AgentThreadEntry::AssistantMessage(..) | AgentThreadEntry::CompletedPlan(..) => {
1419 continue;
1420 }
1421 AgentThreadEntry::ToolCall(..) => return true,
1422 }
1423 }
1424
1425 false
1426 }
1427
1428 pub fn handle_session_update(
1429 &mut self,
1430 update: acp::SessionUpdate,
1431 cx: &mut Context<Self>,
1432 ) -> Result<(), acp::Error> {
1433 match update {
1434 acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1435 // We optimistically add the full user prompt before calling `prompt`.
1436 // Some ACP servers echo user chunks back over updates. Skip the chunk if
1437 // it's already present in the current user message to avoid duplicating content.
1438 let already_in_user_message = self
1439 .entries
1440 .last()
1441 .and_then(|entry| entry.user_message())
1442 .is_some_and(|message| message.chunks.contains(&content));
1443 if !already_in_user_message {
1444 self.push_user_content_block(None, content, cx);
1445 }
1446 }
1447 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1448 self.push_assistant_content_block(content, false, cx);
1449 }
1450 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1451 self.push_assistant_content_block(content, true, cx);
1452 }
1453 acp::SessionUpdate::ToolCall(tool_call) => {
1454 self.upsert_tool_call(tool_call, cx)?;
1455 }
1456 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1457 self.update_tool_call(tool_call_update, cx)?;
1458 }
1459 acp::SessionUpdate::Plan(plan) => {
1460 self.update_plan(plan, cx);
1461 }
1462 acp::SessionUpdate::SessionInfoUpdate(info_update) => {
1463 if let acp::MaybeUndefined::Value(title) = info_update.title {
1464 let had_provisional = self.provisional_title.take().is_some();
1465 let title: SharedString = title.into();
1466 if self.title.as_ref() != Some(&title) {
1467 self.title = Some(title);
1468 cx.emit(AcpThreadEvent::TitleUpdated);
1469 } else if had_provisional {
1470 cx.emit(AcpThreadEvent::TitleUpdated);
1471 }
1472 }
1473 }
1474 acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1475 available_commands,
1476 ..
1477 }) => {
1478 self.available_commands = available_commands.clone();
1479 cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands));
1480 }
1481 acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1482 current_mode_id,
1483 ..
1484 }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1485 acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1486 config_options,
1487 ..
1488 }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1489 acp::SessionUpdate::UsageUpdate(update) if cx.has_flag::<AcpBetaFeatureFlag>() => {
1490 let usage = self.token_usage.get_or_insert_with(Default::default);
1491 usage.max_tokens = update.size;
1492 usage.used_tokens = update.used;
1493 if let Some(cost) = update.cost {
1494 self.cost = Some(SessionCost {
1495 amount: cost.amount,
1496 currency: cost.currency.into(),
1497 });
1498 }
1499 cx.emit(AcpThreadEvent::TokenUsageUpdated);
1500 }
1501 _ => {}
1502 }
1503 Ok(())
1504 }
1505
1506 pub fn push_user_content_block(
1507 &mut self,
1508 message_id: Option<UserMessageId>,
1509 chunk: acp::ContentBlock,
1510 cx: &mut Context<Self>,
1511 ) {
1512 self.push_user_content_block_with_indent(message_id, chunk, false, cx)
1513 }
1514
1515 pub fn push_user_content_block_with_indent(
1516 &mut self,
1517 message_id: Option<UserMessageId>,
1518 chunk: acp::ContentBlock,
1519 indented: bool,
1520 cx: &mut Context<Self>,
1521 ) {
1522 let language_registry = self.project.read(cx).languages().clone();
1523 let path_style = self.project.read(cx).path_style(cx);
1524 let entries_len = self.entries.len();
1525
1526 if let Some(last_entry) = self.entries.last_mut()
1527 && let AgentThreadEntry::UserMessage(UserMessage {
1528 id,
1529 content,
1530 chunks,
1531 indented: existing_indented,
1532 ..
1533 }) = last_entry
1534 && *existing_indented == indented
1535 {
1536 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1537 *id = message_id.or(id.take());
1538 content.append(chunk.clone(), &language_registry, path_style, cx);
1539 chunks.push(chunk);
1540 let idx = entries_len - 1;
1541 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1542 } else {
1543 let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1544 self.push_entry(
1545 AgentThreadEntry::UserMessage(UserMessage {
1546 id: message_id,
1547 content,
1548 chunks: vec![chunk],
1549 checkpoint: None,
1550 indented,
1551 }),
1552 cx,
1553 );
1554 }
1555 }
1556
1557 pub fn push_assistant_content_block(
1558 &mut self,
1559 chunk: acp::ContentBlock,
1560 is_thought: bool,
1561 cx: &mut Context<Self>,
1562 ) {
1563 self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
1564 }
1565
1566 pub fn push_assistant_content_block_with_indent(
1567 &mut self,
1568 chunk: acp::ContentBlock,
1569 is_thought: bool,
1570 indented: bool,
1571 cx: &mut Context<Self>,
1572 ) {
1573 let path_style = self.project.read(cx).path_style(cx);
1574
1575 // For text chunks going to an existing Markdown block, buffer for smooth
1576 // streaming instead of appending all at once which may feel more choppy.
1577 if let acp::ContentBlock::Text(text_content) = &chunk {
1578 if let Some(markdown) = self.streaming_markdown_target(is_thought, indented) {
1579 let entries_len = self.entries.len();
1580 cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
1581 self.buffer_streaming_text(&markdown, text_content.text.clone(), cx);
1582 return;
1583 }
1584 }
1585
1586 let language_registry = self.project.read(cx).languages().clone();
1587 let entries_len = self.entries.len();
1588 if let Some(last_entry) = self.entries.last_mut()
1589 && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1590 chunks,
1591 indented: existing_indented,
1592 is_subagent_output: _,
1593 }) = last_entry
1594 && *existing_indented == indented
1595 {
1596 let idx = entries_len - 1;
1597 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1598 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1599 match (chunks.last_mut(), is_thought) {
1600 (Some(AssistantMessageChunk::Message { block }), false)
1601 | (Some(AssistantMessageChunk::Thought { block }), true) => {
1602 block.append(chunk, &language_registry, path_style, cx)
1603 }
1604 _ => {
1605 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1606 if is_thought {
1607 chunks.push(AssistantMessageChunk::Thought { block })
1608 } else {
1609 chunks.push(AssistantMessageChunk::Message { block })
1610 }
1611 }
1612 }
1613 } else {
1614 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1615 let chunk = if is_thought {
1616 AssistantMessageChunk::Thought { block }
1617 } else {
1618 AssistantMessageChunk::Message { block }
1619 };
1620
1621 self.push_entry(
1622 AgentThreadEntry::AssistantMessage(AssistantMessage {
1623 chunks: vec![chunk],
1624 indented,
1625 is_subagent_output: false,
1626 }),
1627 cx,
1628 );
1629 }
1630 }
1631
1632 fn streaming_markdown_target(
1633 &self,
1634 is_thought: bool,
1635 indented: bool,
1636 ) -> Option<Entity<Markdown>> {
1637 let last_entry = self.entries.last()?;
1638 if let AgentThreadEntry::AssistantMessage(AssistantMessage {
1639 chunks,
1640 indented: existing_indented,
1641 ..
1642 }) = last_entry
1643 && *existing_indented == indented
1644 && let [.., chunk] = chunks.as_slice()
1645 {
1646 match (chunk, is_thought) {
1647 (
1648 AssistantMessageChunk::Message {
1649 block: ContentBlock::Markdown { markdown },
1650 },
1651 false,
1652 )
1653 | (
1654 AssistantMessageChunk::Thought {
1655 block: ContentBlock::Markdown { markdown },
1656 },
1657 true,
1658 ) => Some(markdown.clone()),
1659 _ => None,
1660 }
1661 } else {
1662 None
1663 }
1664 }
1665
1666 /// Add text to the streaming buffer. If the target changed (e.g. switching
1667 /// from thoughts to message text), flush the old buffer first.
1668 fn buffer_streaming_text(
1669 &mut self,
1670 markdown: &Entity<Markdown>,
1671 text: String,
1672 cx: &mut Context<Self>,
1673 ) {
1674 if let Some(buffer) = &mut self.streaming_text_buffer {
1675 if buffer.target.entity_id() == markdown.entity_id() {
1676 buffer.pending.push_str(&text);
1677
1678 buffer.bytes_to_reveal_per_tick = (buffer.pending.len() as f32
1679 / StreamingTextBuffer::REVEAL_TARGET
1680 * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1681 .ceil() as usize;
1682 return;
1683 }
1684 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1685 }
1686
1687 let target = markdown.clone();
1688 let _reveal_task = self.start_streaming_reveal(cx);
1689 let pending_len = text.len();
1690 let bytes_to_reveal = (pending_len as f32 / StreamingTextBuffer::REVEAL_TARGET
1691 * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1692 .ceil() as usize;
1693 self.streaming_text_buffer = Some(StreamingTextBuffer {
1694 pending: text,
1695 bytes_to_reveal_per_tick: bytes_to_reveal,
1696 target,
1697 _reveal_task,
1698 });
1699 }
1700
1701 /// Flush all buffered streaming text into the Markdown entity immediately.
1702 fn flush_streaming_text(
1703 streaming_text_buffer: &mut Option<StreamingTextBuffer>,
1704 cx: &mut Context<Self>,
1705 ) {
1706 if let Some(buffer) = streaming_text_buffer.take() {
1707 if !buffer.pending.is_empty() {
1708 buffer
1709 .target
1710 .update(cx, |markdown, cx| markdown.append(&buffer.pending, cx));
1711 }
1712 }
1713 }
1714
1715 /// Spawns a foreground task that periodically drains
1716 /// `streaming_text_buffer.pending` into the target `Markdown` entity,
1717 /// producing smooth, continuous text output.
1718 fn start_streaming_reveal(&self, cx: &mut Context<Self>) -> Task<()> {
1719 cx.spawn(async move |this, cx| {
1720 loop {
1721 cx.background_executor()
1722 .timer(Duration::from_millis(StreamingTextBuffer::TASK_UPDATE_MS))
1723 .await;
1724
1725 let should_continue = this
1726 .update(cx, |this, cx| {
1727 let Some(buffer) = &mut this.streaming_text_buffer else {
1728 return false;
1729 };
1730
1731 if buffer.pending.is_empty() {
1732 return true;
1733 }
1734
1735 let pending_len = buffer.pending.len();
1736
1737 let byte_boundary = buffer
1738 .pending
1739 .ceil_char_boundary(buffer.bytes_to_reveal_per_tick)
1740 .min(pending_len);
1741
1742 buffer.target.update(cx, |markdown: &mut Markdown, cx| {
1743 markdown.append(&buffer.pending[..byte_boundary], cx);
1744 buffer.pending.drain(..byte_boundary);
1745 });
1746
1747 true
1748 })
1749 .unwrap_or(false);
1750
1751 if !should_continue {
1752 break;
1753 }
1754 }
1755 })
1756 }
1757
1758 fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
1759 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1760 self.entries.push(entry);
1761 cx.emit(AcpThreadEvent::NewEntry);
1762 }
1763
1764 pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1765 self.connection.set_title(&self.session_id, cx).is_some()
1766 }
1767
1768 pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1769 let had_provisional = self.provisional_title.take().is_some();
1770 if self.title.as_ref() != Some(&title) {
1771 self.title = Some(title.clone());
1772 cx.emit(AcpThreadEvent::TitleUpdated);
1773 if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1774 return set_title.run(title, cx);
1775 }
1776 } else if had_provisional {
1777 cx.emit(AcpThreadEvent::TitleUpdated);
1778 }
1779 Task::ready(Ok(()))
1780 }
1781
1782 /// Sets a provisional display title without propagating back to the
1783 /// underlying agent connection. This is used for quick preview titles
1784 /// (e.g. first 20 chars of the user message) that should be shown
1785 /// immediately but replaced once the LLM generates a proper title via
1786 /// `set_title`.
1787 pub fn set_provisional_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
1788 self.provisional_title = Some(title);
1789 cx.emit(AcpThreadEvent::TitleUpdated);
1790 }
1791
1792 pub fn subagent_spawned(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
1793 cx.emit(AcpThreadEvent::SubagentSpawned(session_id));
1794 }
1795
1796 pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1797 if usage.is_none() {
1798 self.cost = None;
1799 }
1800 self.token_usage = usage;
1801 cx.emit(AcpThreadEvent::TokenUsageUpdated);
1802 }
1803
1804 pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1805 cx.emit(AcpThreadEvent::Retry(status));
1806 }
1807
1808 pub fn update_tool_call(
1809 &mut self,
1810 update: impl Into<ToolCallUpdate>,
1811 cx: &mut Context<Self>,
1812 ) -> Result<()> {
1813 let update = update.into();
1814 let languages = self.project.read(cx).languages().clone();
1815 let path_style = self.project.read(cx).path_style(cx);
1816
1817 let ix = match self.index_for_tool_call(update.id()) {
1818 Some(ix) => ix,
1819 None => {
1820 // Tool call not found - create a failed tool call entry
1821 let failed_tool_call = ToolCall {
1822 id: update.id().clone(),
1823 label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1824 kind: acp::ToolKind::Fetch,
1825 content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1826 "Tool call not found".into(),
1827 &languages,
1828 path_style,
1829 cx,
1830 ))],
1831 status: ToolCallStatus::Failed,
1832 locations: Vec::new(),
1833 resolved_locations: Vec::new(),
1834 raw_input: None,
1835 raw_input_markdown: None,
1836 raw_output: None,
1837 tool_name: None,
1838 subagent_session_info: None,
1839 };
1840 self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1841 return Ok(());
1842 }
1843 };
1844 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1845 unreachable!()
1846 };
1847
1848 match update {
1849 ToolCallUpdate::UpdateFields(update) => {
1850 let location_updated = update.fields.locations.is_some();
1851 call.update_fields(
1852 update.fields,
1853 update.meta,
1854 languages,
1855 path_style,
1856 &self.terminals,
1857 cx,
1858 )?;
1859 if location_updated {
1860 self.resolve_locations(update.tool_call_id, cx);
1861 }
1862 }
1863 ToolCallUpdate::UpdateDiff(update) => {
1864 call.content.clear();
1865 call.content.push(ToolCallContent::Diff(update.diff));
1866 }
1867 ToolCallUpdate::UpdateTerminal(update) => {
1868 call.content.clear();
1869 call.content
1870 .push(ToolCallContent::Terminal(update.terminal));
1871 }
1872 }
1873
1874 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1875
1876 Ok(())
1877 }
1878
1879 /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1880 pub fn upsert_tool_call(
1881 &mut self,
1882 tool_call: acp::ToolCall,
1883 cx: &mut Context<Self>,
1884 ) -> Result<(), acp::Error> {
1885 let status = tool_call.status.into();
1886 self.upsert_tool_call_inner(tool_call.into(), status, cx)
1887 }
1888
1889 /// Fails if id does not match an existing entry.
1890 pub fn upsert_tool_call_inner(
1891 &mut self,
1892 update: acp::ToolCallUpdate,
1893 status: ToolCallStatus,
1894 cx: &mut Context<Self>,
1895 ) -> Result<(), acp::Error> {
1896 let language_registry = self.project.read(cx).languages().clone();
1897 let path_style = self.project.read(cx).path_style(cx);
1898 let id = update.tool_call_id.clone();
1899
1900 let agent_telemetry_id = self.connection().telemetry_id();
1901 let session = self.session_id();
1902 let parent_session_id = self.parent_session_id();
1903 if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1904 let status = if matches!(status, ToolCallStatus::Completed) {
1905 "completed"
1906 } else {
1907 "failed"
1908 };
1909 telemetry::event!(
1910 "Agent Tool Call Completed",
1911 agent_telemetry_id,
1912 session,
1913 parent_session_id,
1914 status
1915 );
1916 }
1917
1918 if let Some(ix) = self.index_for_tool_call(&id) {
1919 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1920 unreachable!()
1921 };
1922
1923 call.update_fields(
1924 update.fields,
1925 update.meta,
1926 language_registry,
1927 path_style,
1928 &self.terminals,
1929 cx,
1930 )?;
1931 call.status = status;
1932
1933 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1934 } else {
1935 let call = ToolCall::from_acp(
1936 update.try_into()?,
1937 status,
1938 language_registry,
1939 self.project.read(cx).path_style(cx),
1940 &self.terminals,
1941 cx,
1942 )?;
1943 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1944 };
1945
1946 self.resolve_locations(id, cx);
1947 Ok(())
1948 }
1949
1950 fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1951 self.entries
1952 .iter()
1953 .enumerate()
1954 .rev()
1955 .find_map(|(index, entry)| {
1956 if let AgentThreadEntry::ToolCall(tool_call) = entry
1957 && &tool_call.id == id
1958 {
1959 Some(index)
1960 } else {
1961 None
1962 }
1963 })
1964 }
1965
1966 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1967 // The tool call we are looking for is typically the last one, or very close to the end.
1968 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1969 self.entries
1970 .iter_mut()
1971 .enumerate()
1972 .rev()
1973 .find_map(|(index, tool_call)| {
1974 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1975 && &tool_call.id == id
1976 {
1977 Some((index, tool_call))
1978 } else {
1979 None
1980 }
1981 })
1982 }
1983
1984 pub fn tool_call(&self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1985 self.entries
1986 .iter()
1987 .enumerate()
1988 .rev()
1989 .find_map(|(index, tool_call)| {
1990 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1991 && &tool_call.id == id
1992 {
1993 Some((index, tool_call))
1994 } else {
1995 None
1996 }
1997 })
1998 }
1999
2000 pub fn tool_call_for_subagent(&self, session_id: &acp::SessionId) -> Option<&ToolCall> {
2001 self.entries.iter().find_map(|entry| match entry {
2002 AgentThreadEntry::ToolCall(tool_call) => {
2003 if let Some(subagent_session_info) = &tool_call.subagent_session_info
2004 && &subagent_session_info.session_id == session_id
2005 {
2006 Some(tool_call)
2007 } else {
2008 None
2009 }
2010 }
2011 _ => None,
2012 })
2013 }
2014
2015 pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
2016 let project = self.project.clone();
2017 let should_update_agent_location = self.parent_session_id.is_none();
2018 let Some((_, tool_call)) = self.tool_call_mut(&id) else {
2019 return;
2020 };
2021 let task = tool_call.resolve_locations(project, cx);
2022 cx.spawn(async move |this, cx| {
2023 let resolved_locations = task.await;
2024
2025 this.update(cx, |this, cx| {
2026 let project = this.project.clone();
2027
2028 for location in resolved_locations.iter().flatten() {
2029 this.shared_buffers
2030 .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
2031 }
2032 let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
2033 return;
2034 };
2035
2036 if let Some(Some(location)) = resolved_locations.last() {
2037 project.update(cx, |project, cx| {
2038 let should_ignore = if let Some(agent_location) = project
2039 .agent_location()
2040 .filter(|agent_location| agent_location.buffer == location.buffer)
2041 {
2042 let snapshot = location.buffer.read(cx).snapshot();
2043 let old_position = agent_location.position.to_point(&snapshot);
2044 let new_position = location.position.to_point(&snapshot);
2045
2046 // ignore this so that when we get updates from the edit tool
2047 // the position doesn't reset to the startof line
2048 old_position.row == new_position.row
2049 && old_position.column > new_position.column
2050 } else {
2051 false
2052 };
2053 if !should_ignore && should_update_agent_location {
2054 project.set_agent_location(Some(location.into()), cx);
2055 }
2056 });
2057 }
2058
2059 let resolved_locations = resolved_locations
2060 .iter()
2061 .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
2062 .collect::<Vec<_>>();
2063
2064 if tool_call.resolved_locations != resolved_locations {
2065 tool_call.resolved_locations = resolved_locations;
2066 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2067 }
2068 })
2069 })
2070 .detach();
2071 }
2072
2073 pub fn request_tool_call_authorization(
2074 &mut self,
2075 tool_call: acp::ToolCallUpdate,
2076 options: PermissionOptions,
2077 cx: &mut Context<Self>,
2078 ) -> Result<Task<RequestPermissionOutcome>> {
2079 let (tx, rx) = oneshot::channel();
2080
2081 let status = ToolCallStatus::WaitingForConfirmation {
2082 options,
2083 respond_tx: tx,
2084 };
2085
2086 let tool_call_id = tool_call.tool_call_id.clone();
2087 self.upsert_tool_call_inner(tool_call, status, cx)?;
2088 cx.emit(AcpThreadEvent::ToolAuthorizationRequested(
2089 tool_call_id.clone(),
2090 ));
2091
2092 Ok(cx.spawn(async move |this, cx| {
2093 let outcome = match rx.await {
2094 Ok(outcome) => RequestPermissionOutcome::Selected(outcome),
2095 Err(oneshot::Canceled) => RequestPermissionOutcome::Cancelled,
2096 };
2097 this.update(cx, |_this, cx| {
2098 cx.emit(AcpThreadEvent::ToolAuthorizationReceived(tool_call_id))
2099 })
2100 .ok();
2101 outcome
2102 }))
2103 }
2104
2105 pub fn authorize_tool_call(
2106 &mut self,
2107 id: acp::ToolCallId,
2108 outcome: SelectedPermissionOutcome,
2109 cx: &mut Context<Self>,
2110 ) {
2111 let Some((ix, call)) = self.tool_call_mut(&id) else {
2112 return;
2113 };
2114
2115 let new_status = match outcome.option_kind {
2116 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
2117 ToolCallStatus::Rejected
2118 }
2119 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
2120 ToolCallStatus::InProgress
2121 }
2122 _ => ToolCallStatus::InProgress,
2123 };
2124
2125 let curr_status = mem::replace(&mut call.status, new_status);
2126
2127 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
2128 respond_tx.send(outcome).log_err();
2129 } else if cfg!(debug_assertions) {
2130 panic!("tried to authorize an already authorized tool call");
2131 }
2132
2133 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2134 }
2135
2136 pub fn plan(&self) -> &Plan {
2137 &self.plan
2138 }
2139
2140 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
2141 let new_entries_len = request.entries.len();
2142 let mut new_entries = request.entries.into_iter();
2143
2144 // Reuse existing markdown to prevent flickering
2145 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
2146 let PlanEntry {
2147 content,
2148 priority,
2149 status,
2150 } = old;
2151 content.update(cx, |old, cx| {
2152 old.replace(new.content, cx);
2153 });
2154 *priority = new.priority;
2155 *status = new.status;
2156 }
2157 for new in new_entries {
2158 self.plan.entries.push(PlanEntry::from_acp(new, cx))
2159 }
2160 self.plan.entries.truncate(new_entries_len);
2161
2162 cx.notify();
2163 }
2164
2165 pub fn snapshot_completed_plan(&mut self, cx: &mut Context<Self>) {
2166 if !self.plan.is_empty() && self.plan.stats().pending == 0 {
2167 let completed_entries = std::mem::take(&mut self.plan.entries);
2168 self.push_entry(AgentThreadEntry::CompletedPlan(completed_entries), cx);
2169 }
2170 }
2171
2172 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
2173 self.plan
2174 .entries
2175 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
2176 cx.notify();
2177 }
2178
2179 pub fn clear_plan(&mut self, cx: &mut Context<Self>) {
2180 self.plan.entries.clear();
2181 cx.notify();
2182 }
2183
2184 #[cfg(any(test, feature = "test-support"))]
2185 pub fn send_raw(
2186 &mut self,
2187 message: &str,
2188 cx: &mut Context<Self>,
2189 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2190 self.send(vec![message.into()], cx)
2191 }
2192
2193 pub fn send(
2194 &mut self,
2195 message: Vec<acp::ContentBlock>,
2196 cx: &mut Context<Self>,
2197 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2198 let block = ContentBlock::new_combined(
2199 message.clone(),
2200 self.project.read(cx).languages().clone(),
2201 self.project.read(cx).path_style(cx),
2202 cx,
2203 );
2204 let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
2205 let git_store = self.project.read(cx).git_store().clone();
2206
2207 let message_id = UserMessageId::new();
2208
2209 self.run_turn(cx, async move |this, cx| {
2210 this.update(cx, |this, cx| {
2211 this.push_entry(
2212 AgentThreadEntry::UserMessage(UserMessage {
2213 id: Some(message_id.clone()),
2214 content: block,
2215 chunks: message,
2216 checkpoint: None,
2217 indented: false,
2218 }),
2219 cx,
2220 );
2221 })
2222 .ok();
2223
2224 let old_checkpoint = git_store
2225 .update(cx, |git, cx| git.checkpoint(cx))
2226 .await
2227 .context("failed to get old checkpoint")
2228 .log_err();
2229 this.update(cx, |this, cx| {
2230 if let Some((_ix, message)) = this.last_user_message() {
2231 message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
2232 git_checkpoint,
2233 show: false,
2234 });
2235 }
2236 this.connection.prompt(message_id, request, cx)
2237 })?
2238 .await
2239 })
2240 }
2241
2242 pub fn can_retry(&self, cx: &App) -> bool {
2243 self.connection.retry(&self.session_id, cx).is_some()
2244 }
2245
2246 pub fn retry(
2247 &mut self,
2248 cx: &mut Context<Self>,
2249 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2250 self.run_turn(cx, async move |this, cx| {
2251 this.update(cx, |this, cx| {
2252 this.connection
2253 .retry(&this.session_id, cx)
2254 .map(|retry| retry.run(cx))
2255 })?
2256 .context("retrying a session is not supported")?
2257 .await
2258 })
2259 }
2260
2261 fn run_turn(
2262 &mut self,
2263 cx: &mut Context<Self>,
2264 f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
2265 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2266 self.clear_completed_plan_entries(cx);
2267 self.had_error = false;
2268
2269 let (tx, rx) = oneshot::channel();
2270 let cancel_task = self.cancel(cx);
2271
2272 self.turn_id += 1;
2273 let turn_id = self.turn_id;
2274 self.running_turn = Some(RunningTurn {
2275 id: turn_id,
2276 send_task: cx.spawn(async move |this, cx| {
2277 cancel_task.await;
2278 tx.send(f(this, cx).await).ok();
2279 }),
2280 });
2281
2282 cx.spawn(async move |this, cx| {
2283 let response = rx.await;
2284
2285 this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
2286 .await?;
2287
2288 this.update(cx, |this, cx| {
2289 if this.parent_session_id.is_none() {
2290 this.project
2291 .update(cx, |project, cx| project.set_agent_location(None, cx));
2292 }
2293 let Ok(response) = response else {
2294 // tx dropped, just return
2295 return Ok(None);
2296 };
2297
2298 let is_same_turn = this
2299 .running_turn
2300 .as_ref()
2301 .is_some_and(|turn| turn_id == turn.id);
2302
2303 // If the user submitted a follow up message, running_turn might
2304 // already point to a different turn. Therefore we only want to
2305 // take the task if it's the same turn.
2306 if is_same_turn {
2307 this.running_turn.take();
2308 }
2309
2310 match response {
2311 Ok(r) => {
2312 Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2313
2314 if r.stop_reason == acp::StopReason::MaxTokens {
2315 this.had_error = true;
2316 cx.emit(AcpThreadEvent::Error);
2317 log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2318
2319 let exceeded_max_output_tokens =
2320 this.token_usage.as_ref().is_some_and(|u| {
2321 u.max_output_tokens
2322 .is_some_and(|max| u.output_tokens >= max)
2323 });
2324
2325 if exceeded_max_output_tokens {
2326 log::error!(
2327 "Max output tokens reached. Usage: {:?}",
2328 this.token_usage
2329 );
2330 } else {
2331 log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2332 }
2333 return Err(anyhow!(MaxOutputTokensError));
2334 }
2335
2336 let canceled = matches!(r.stop_reason, acp::StopReason::Cancelled);
2337 if canceled {
2338 this.mark_pending_tools_as_canceled();
2339 }
2340
2341 if !canceled {
2342 this.snapshot_completed_plan(cx);
2343 }
2344
2345 // Handle refusal - distinguish between user prompt and tool call refusals
2346 if let acp::StopReason::Refusal = r.stop_reason {
2347 this.had_error = true;
2348 if let Some((user_msg_ix, _)) = this.last_user_message() {
2349 // Check if there's a completed tool call with results after the last user message
2350 // This indicates the refusal is in response to tool output, not the user's prompt
2351 let has_completed_tool_call_after_user_msg =
2352 this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
2353 if let AgentThreadEntry::ToolCall(tool_call) = entry {
2354 // Check if the tool call has completed and has output
2355 matches!(tool_call.status, ToolCallStatus::Completed)
2356 && tool_call.raw_output.is_some()
2357 } else {
2358 false
2359 }
2360 });
2361
2362 if has_completed_tool_call_after_user_msg {
2363 // Refusal is due to tool output - don't truncate, just notify
2364 // The model refused based on what the tool returned
2365 cx.emit(AcpThreadEvent::Refusal);
2366 } else {
2367 // User prompt was refused - truncate back to before the user message
2368 let range = user_msg_ix..this.entries.len();
2369 if range.start < range.end {
2370 this.entries.truncate(user_msg_ix);
2371 cx.emit(AcpThreadEvent::EntriesRemoved(range));
2372 }
2373 cx.emit(AcpThreadEvent::Refusal);
2374 }
2375 } else {
2376 // No user message found, treat as general refusal
2377 cx.emit(AcpThreadEvent::Refusal);
2378 }
2379 }
2380
2381 if cx.has_flag::<AcpBetaFeatureFlag>()
2382 && let Some(response_usage) = &r.usage
2383 {
2384 let usage = this.token_usage.get_or_insert_with(Default::default);
2385 usage.input_tokens = response_usage.input_tokens;
2386 usage.output_tokens = response_usage.output_tokens;
2387 cx.emit(AcpThreadEvent::TokenUsageUpdated);
2388 }
2389
2390 cx.emit(AcpThreadEvent::Stopped(r.stop_reason));
2391 Ok(Some(r))
2392 }
2393 Err(e) => {
2394 Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2395
2396 this.had_error = true;
2397 cx.emit(AcpThreadEvent::Error);
2398 log::error!("Error in run turn: {:?}", e);
2399 Err(e)
2400 }
2401 }
2402 })?
2403 })
2404 .boxed()
2405 }
2406
2407 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2408 let Some(turn) = self.running_turn.take() else {
2409 return Task::ready(());
2410 };
2411 self.connection.cancel(&self.session_id, cx);
2412
2413 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2414 self.mark_pending_tools_as_canceled();
2415
2416 // Wait for the send task to complete
2417 cx.background_spawn(turn.send_task)
2418 }
2419
2420 fn mark_pending_tools_as_canceled(&mut self) {
2421 for entry in self.entries.iter_mut() {
2422 if let AgentThreadEntry::ToolCall(call) = entry {
2423 let cancel = matches!(
2424 call.status,
2425 ToolCallStatus::Pending
2426 | ToolCallStatus::WaitingForConfirmation { .. }
2427 | ToolCallStatus::InProgress
2428 );
2429
2430 if cancel {
2431 call.status = ToolCallStatus::Canceled;
2432 }
2433 }
2434 }
2435 }
2436
2437 /// Restores the git working tree to the state at the given checkpoint (if one exists)
2438 pub fn restore_checkpoint(
2439 &mut self,
2440 id: UserMessageId,
2441 cx: &mut Context<Self>,
2442 ) -> Task<Result<()>> {
2443 let Some((_, message)) = self.user_message_mut(&id) else {
2444 return Task::ready(Err(anyhow!("message not found")));
2445 };
2446
2447 let checkpoint = message
2448 .checkpoint
2449 .as_ref()
2450 .map(|c| c.git_checkpoint.clone());
2451
2452 // Cancel any in-progress generation before restoring
2453 let cancel_task = self.cancel(cx);
2454 let rewind = self.rewind(id.clone(), cx);
2455 let git_store = self.project.read(cx).git_store().clone();
2456
2457 cx.spawn(async move |_, cx| {
2458 cancel_task.await;
2459 rewind.await?;
2460 if let Some(checkpoint) = checkpoint {
2461 git_store
2462 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
2463 .await?;
2464 }
2465
2466 Ok(())
2467 })
2468 }
2469
2470 /// Rewinds this thread to before the entry at `index`, removing it and all
2471 /// subsequent entries while rejecting any action_log changes made from that point.
2472 /// Unlike `restore_checkpoint`, this method does not restore from git.
2473 pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
2474 let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
2475 return Task::ready(Err(anyhow!("not supported")));
2476 };
2477
2478 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2479 let telemetry = ActionLogTelemetry::from(&*self);
2480 cx.spawn(async move |this, cx| {
2481 cx.update(|cx| truncate.run(id.clone(), cx)).await?;
2482 this.update(cx, |this, cx| {
2483 if let Some((ix, _)) = this.user_message_mut(&id) {
2484 // Collect all terminals from entries that will be removed
2485 let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
2486 .iter()
2487 .flat_map(|entry| entry.terminals())
2488 .filter_map(|terminal| terminal.read(cx).id().clone().into())
2489 .collect();
2490
2491 let range = ix..this.entries.len();
2492 this.entries.truncate(ix);
2493 cx.emit(AcpThreadEvent::EntriesRemoved(range));
2494
2495 // Kill and remove the terminals
2496 for terminal_id in terminals_to_remove {
2497 if let Some(terminal) = this.terminals.remove(&terminal_id) {
2498 terminal.update(cx, |terminal, cx| {
2499 terminal.kill(cx);
2500 });
2501 }
2502 }
2503 }
2504 this.action_log().update(cx, |action_log, cx| {
2505 action_log.reject_all_edits(Some(telemetry), cx)
2506 })
2507 })?
2508 .await;
2509 Ok(())
2510 })
2511 }
2512
2513 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
2514 let git_store = self.project.read(cx).git_store().clone();
2515
2516 let Some((_, message)) = self.last_user_message() else {
2517 return Task::ready(Ok(()));
2518 };
2519 let Some(user_message_id) = message.id.clone() else {
2520 return Task::ready(Ok(()));
2521 };
2522 let Some(checkpoint) = message.checkpoint.as_ref() else {
2523 return Task::ready(Ok(()));
2524 };
2525 let old_checkpoint = checkpoint.git_checkpoint.clone();
2526
2527 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2528 cx.spawn(async move |this, cx| {
2529 let Some(new_checkpoint) = new_checkpoint
2530 .await
2531 .context("failed to get new checkpoint")
2532 .log_err()
2533 else {
2534 return Ok(());
2535 };
2536
2537 let equal = git_store
2538 .update(cx, |git, cx| {
2539 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2540 })
2541 .await
2542 .unwrap_or(true);
2543
2544 this.update(cx, |this, cx| {
2545 if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
2546 if let Some(checkpoint) = message.checkpoint.as_mut() {
2547 checkpoint.show = !equal;
2548 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2549 }
2550 }
2551 })?;
2552
2553 Ok(())
2554 })
2555 }
2556
2557 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2558 self.entries
2559 .iter_mut()
2560 .enumerate()
2561 .rev()
2562 .find_map(|(ix, entry)| {
2563 if let AgentThreadEntry::UserMessage(message) = entry {
2564 Some((ix, message))
2565 } else {
2566 None
2567 }
2568 })
2569 }
2570
2571 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2572 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2573 if let AgentThreadEntry::UserMessage(message) = entry {
2574 if message.id.as_ref() == Some(id) {
2575 Some((ix, message))
2576 } else {
2577 None
2578 }
2579 } else {
2580 None
2581 }
2582 })
2583 }
2584
2585 pub fn read_text_file(
2586 &self,
2587 path: PathBuf,
2588 line: Option<u32>,
2589 limit: Option<u32>,
2590 reuse_shared_snapshot: bool,
2591 cx: &mut Context<Self>,
2592 ) -> Task<Result<String, acp::Error>> {
2593 // Args are 1-based, move to 0-based
2594 let line = line.unwrap_or_default().saturating_sub(1);
2595 let limit = limit.unwrap_or(u32::MAX);
2596 let project = self.project.clone();
2597 let action_log = self.action_log.clone();
2598 let should_update_agent_location = self.parent_session_id.is_none();
2599 cx.spawn(async move |this, cx| {
2600 let load = project.update(cx, |project, cx| {
2601 let path = project
2602 .project_path_for_absolute_path(&path, cx)
2603 .ok_or_else(|| {
2604 acp::Error::resource_not_found(Some(path.display().to_string()))
2605 })?;
2606 Ok::<_, acp::Error>(project.open_buffer(path, cx))
2607 })?;
2608
2609 let buffer = load.await?;
2610
2611 let snapshot = if reuse_shared_snapshot {
2612 this.read_with(cx, |this, _| {
2613 this.shared_buffers.get(&buffer.clone()).cloned()
2614 })
2615 .log_err()
2616 .flatten()
2617 } else {
2618 None
2619 };
2620
2621 let snapshot = if let Some(snapshot) = snapshot {
2622 snapshot
2623 } else {
2624 action_log.update(cx, |action_log, cx| {
2625 action_log.buffer_read(buffer.clone(), cx);
2626 });
2627
2628 let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
2629 this.update(cx, |this, _| {
2630 this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2631 })?;
2632 snapshot
2633 };
2634
2635 let max_point = snapshot.max_point();
2636 let start_position = Point::new(line, 0);
2637
2638 if start_position > max_point {
2639 return Err(acp::Error::invalid_params().data(format!(
2640 "Attempting to read beyond the end of the file, line {}:{}",
2641 max_point.row + 1,
2642 max_point.column
2643 )));
2644 }
2645
2646 let start = snapshot.anchor_before(start_position);
2647 let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2648
2649 if should_update_agent_location {
2650 project.update(cx, |project, cx| {
2651 project.set_agent_location(
2652 Some(AgentLocation {
2653 buffer: buffer.downgrade(),
2654 position: start,
2655 }),
2656 cx,
2657 );
2658 });
2659 }
2660
2661 Ok(snapshot.text_for_range(start..end).collect::<String>())
2662 })
2663 }
2664
2665 pub fn write_text_file(
2666 &self,
2667 path: PathBuf,
2668 content: String,
2669 cx: &mut Context<Self>,
2670 ) -> Task<Result<()>> {
2671 let project = self.project.clone();
2672 let action_log = self.action_log.clone();
2673 let should_update_agent_location = self.parent_session_id.is_none();
2674 cx.spawn(async move |this, cx| {
2675 let load = project.update(cx, |project, cx| {
2676 let path = project
2677 .project_path_for_absolute_path(&path, cx)
2678 .context("invalid path")?;
2679 anyhow::Ok(project.open_buffer(path, cx))
2680 });
2681 let buffer = load?.await?;
2682 let snapshot = this.update(cx, |this, cx| {
2683 this.shared_buffers
2684 .get(&buffer)
2685 .cloned()
2686 .unwrap_or_else(|| buffer.read(cx).snapshot())
2687 })?;
2688 let edits = cx
2689 .background_executor()
2690 .spawn(async move {
2691 let old_text = snapshot.text();
2692 text_diff(old_text.as_str(), &content)
2693 .into_iter()
2694 .map(|(range, replacement)| {
2695 (snapshot.anchor_range_inside(range), replacement)
2696 })
2697 .collect::<Vec<_>>()
2698 })
2699 .await;
2700
2701 if should_update_agent_location {
2702 project.update(cx, |project, cx| {
2703 project.set_agent_location(
2704 Some(AgentLocation {
2705 buffer: buffer.downgrade(),
2706 position: edits
2707 .last()
2708 .map(|(range, _)| range.end)
2709 .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
2710 }),
2711 cx,
2712 );
2713 });
2714 }
2715
2716 let format_on_save = cx.update(|cx| {
2717 action_log.update(cx, |action_log, cx| {
2718 action_log.buffer_read(buffer.clone(), cx);
2719 });
2720
2721 let format_on_save = buffer.update(cx, |buffer, cx| {
2722 buffer.edit(edits, None, cx);
2723
2724 let settings =
2725 language::language_settings::LanguageSettings::for_buffer(buffer, cx);
2726
2727 settings.format_on_save != FormatOnSave::Off
2728 });
2729 action_log.update(cx, |action_log, cx| {
2730 action_log.buffer_edited(buffer.clone(), cx);
2731 });
2732 format_on_save
2733 });
2734
2735 if format_on_save {
2736 let format_task = project.update(cx, |project, cx| {
2737 project.format(
2738 HashSet::from_iter([buffer.clone()]),
2739 LspFormatTarget::Buffers,
2740 false,
2741 FormatTrigger::Save,
2742 cx,
2743 )
2744 });
2745 format_task.await.log_err();
2746
2747 action_log.update(cx, |action_log, cx| {
2748 action_log.buffer_edited(buffer.clone(), cx);
2749 });
2750 }
2751
2752 project
2753 .update(cx, |project, cx| project.save_buffer(buffer, cx))
2754 .await
2755 })
2756 }
2757
2758 pub fn create_terminal(
2759 &self,
2760 command: String,
2761 args: Vec<String>,
2762 extra_env: Vec<acp::EnvVariable>,
2763 cwd: Option<PathBuf>,
2764 output_byte_limit: Option<u64>,
2765 cx: &mut Context<Self>,
2766 ) -> Task<Result<Entity<Terminal>>> {
2767 let env = match &cwd {
2768 Some(dir) => self.project.update(cx, |project, cx| {
2769 project.environment().update(cx, |env, cx| {
2770 env.directory_environment(dir.as_path().into(), cx)
2771 })
2772 }),
2773 None => Task::ready(None).shared(),
2774 };
2775 let env = cx.spawn(async move |_, _| {
2776 let mut env = env.await.unwrap_or_default();
2777 // Disables paging for `git` and hopefully other commands
2778 env.insert("PAGER".into(), "".into());
2779 for var in extra_env {
2780 env.insert(var.name, var.value);
2781 }
2782 env
2783 });
2784
2785 let project = self.project.clone();
2786 let language_registry = project.read(cx).languages().clone();
2787 let is_windows = project.read(cx).path_style(cx).is_windows();
2788
2789 let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
2790 let terminal_task = cx.spawn({
2791 let terminal_id = terminal_id.clone();
2792 async move |_this, cx| {
2793 let env = env.await;
2794 let shell = project
2795 .update(cx, |project, cx| {
2796 project
2797 .remote_client()
2798 .and_then(|r| r.read(cx).default_system_shell())
2799 })
2800 .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2801 let (task_command, task_args) =
2802 ShellBuilder::new(&Shell::Program(shell), is_windows)
2803 .redirect_stdin_to_dev_null()
2804 .build(Some(command.clone()), &args);
2805 let terminal = project
2806 .update(cx, |project, cx| {
2807 project.create_terminal_task(
2808 task::SpawnInTerminal {
2809 command: Some(task_command),
2810 args: task_args,
2811 cwd: cwd.clone(),
2812 env,
2813 ..Default::default()
2814 },
2815 cx,
2816 )
2817 })
2818 .await?;
2819
2820 anyhow::Ok(cx.new(|cx| {
2821 Terminal::new(
2822 terminal_id,
2823 &format!("{} {}", command, args.join(" ")),
2824 cwd,
2825 output_byte_limit.map(|l| l as usize),
2826 terminal,
2827 language_registry,
2828 cx,
2829 )
2830 }))
2831 }
2832 });
2833
2834 cx.spawn(async move |this, cx| {
2835 let terminal = terminal_task.await?;
2836 this.update(cx, |this, _cx| {
2837 this.terminals.insert(terminal_id, terminal.clone());
2838 terminal
2839 })
2840 })
2841 }
2842
2843 pub fn kill_terminal(
2844 &mut self,
2845 terminal_id: acp::TerminalId,
2846 cx: &mut Context<Self>,
2847 ) -> Result<()> {
2848 self.terminals
2849 .get(&terminal_id)
2850 .context("Terminal not found")?
2851 .update(cx, |terminal, cx| {
2852 terminal.kill(cx);
2853 });
2854
2855 Ok(())
2856 }
2857
2858 pub fn release_terminal(
2859 &mut self,
2860 terminal_id: acp::TerminalId,
2861 cx: &mut Context<Self>,
2862 ) -> Result<()> {
2863 self.terminals
2864 .remove(&terminal_id)
2865 .context("Terminal not found")?
2866 .update(cx, |terminal, cx| {
2867 terminal.kill(cx);
2868 });
2869
2870 Ok(())
2871 }
2872
2873 pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2874 self.terminals
2875 .get(&terminal_id)
2876 .context("Terminal not found")
2877 .cloned()
2878 }
2879
2880 pub fn to_markdown(&self, cx: &App) -> String {
2881 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2882 }
2883
2884 pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2885 cx.emit(AcpThreadEvent::LoadError(error));
2886 }
2887
2888 pub fn register_terminal_created(
2889 &mut self,
2890 terminal_id: acp::TerminalId,
2891 command_label: String,
2892 working_dir: Option<PathBuf>,
2893 output_byte_limit: Option<u64>,
2894 terminal: Entity<::terminal::Terminal>,
2895 cx: &mut Context<Self>,
2896 ) -> Entity<Terminal> {
2897 let language_registry = self.project.read(cx).languages().clone();
2898
2899 let entity = cx.new(|cx| {
2900 Terminal::new(
2901 terminal_id.clone(),
2902 &command_label,
2903 working_dir.clone(),
2904 output_byte_limit.map(|l| l as usize),
2905 terminal,
2906 language_registry,
2907 cx,
2908 )
2909 });
2910 self.terminals.insert(terminal_id.clone(), entity.clone());
2911 entity
2912 }
2913
2914 pub fn mark_as_subagent_output(&mut self, cx: &mut Context<Self>) {
2915 for entry in self.entries.iter_mut().rev() {
2916 if let AgentThreadEntry::AssistantMessage(assistant_message) = entry {
2917 assistant_message.is_subagent_output = true;
2918 cx.notify();
2919 return;
2920 }
2921 }
2922 }
2923
2924 pub fn on_terminal_provider_event(
2925 &mut self,
2926 event: TerminalProviderEvent,
2927 cx: &mut Context<Self>,
2928 ) {
2929 match event {
2930 TerminalProviderEvent::Created {
2931 terminal_id,
2932 label,
2933 cwd,
2934 output_byte_limit,
2935 terminal,
2936 } => {
2937 let entity = self.register_terminal_created(
2938 terminal_id.clone(),
2939 label,
2940 cwd,
2941 output_byte_limit,
2942 terminal,
2943 cx,
2944 );
2945
2946 if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
2947 for data in chunks.drain(..) {
2948 entity.update(cx, |term, cx| {
2949 term.inner().update(cx, |inner, cx| {
2950 inner.write_output(&data, cx);
2951 })
2952 });
2953 }
2954 }
2955
2956 if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
2957 entity.update(cx, |_term, cx| {
2958 cx.notify();
2959 });
2960 }
2961
2962 cx.notify();
2963 }
2964 TerminalProviderEvent::Output { terminal_id, data } => {
2965 if let Some(entity) = self.terminals.get(&terminal_id) {
2966 entity.update(cx, |term, cx| {
2967 term.inner().update(cx, |inner, cx| {
2968 inner.write_output(&data, cx);
2969 })
2970 });
2971 } else {
2972 self.pending_terminal_output
2973 .entry(terminal_id)
2974 .or_default()
2975 .push(data);
2976 }
2977 }
2978 TerminalProviderEvent::TitleChanged { terminal_id, title } => {
2979 if let Some(entity) = self.terminals.get(&terminal_id) {
2980 entity.update(cx, |term, cx| {
2981 term.inner().update(cx, |inner, cx| {
2982 inner.breadcrumb_text = title;
2983 cx.emit(::terminal::Event::BreadcrumbsChanged);
2984 })
2985 });
2986 }
2987 }
2988 TerminalProviderEvent::Exit {
2989 terminal_id,
2990 status,
2991 } => {
2992 if let Some(entity) = self.terminals.get(&terminal_id) {
2993 entity.update(cx, |_term, cx| {
2994 cx.notify();
2995 });
2996 } else {
2997 self.pending_terminal_exit.insert(terminal_id, status);
2998 }
2999 }
3000 }
3001 }
3002}
3003
3004fn markdown_for_raw_output(
3005 raw_output: &serde_json::Value,
3006 language_registry: &Arc<LanguageRegistry>,
3007 cx: &mut App,
3008) -> Option<Entity<Markdown>> {
3009 match raw_output {
3010 serde_json::Value::Null => None,
3011 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
3012 Markdown::new(
3013 value.to_string().into(),
3014 Some(language_registry.clone()),
3015 None,
3016 cx,
3017 )
3018 })),
3019 serde_json::Value::Number(value) => Some(cx.new(|cx| {
3020 Markdown::new(
3021 value.to_string().into(),
3022 Some(language_registry.clone()),
3023 None,
3024 cx,
3025 )
3026 })),
3027 serde_json::Value::String(value) => Some(cx.new(|cx| {
3028 Markdown::new(
3029 value.clone().into(),
3030 Some(language_registry.clone()),
3031 None,
3032 cx,
3033 )
3034 })),
3035 value => Some(cx.new(|cx| {
3036 let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
3037
3038 Markdown::new(
3039 format!("```json\n{}\n```", pretty_json).into(),
3040 Some(language_registry.clone()),
3041 None,
3042 cx,
3043 )
3044 })),
3045 }
3046}
3047
3048#[cfg(test)]
3049mod tests {
3050 use super::*;
3051 use anyhow::anyhow;
3052 use futures::{channel::mpsc, future::LocalBoxFuture, select};
3053 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
3054 use indoc::indoc;
3055 use project::{AgentId, FakeFs, Fs};
3056 use rand::{distr, prelude::*};
3057 use serde_json::json;
3058 use settings::SettingsStore;
3059 use smol::stream::StreamExt as _;
3060 use std::{
3061 any::Any,
3062 cell::RefCell,
3063 path::Path,
3064 rc::Rc,
3065 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
3066 time::Duration,
3067 };
3068 use util::{path, path_list::PathList};
3069
3070 fn init_test(cx: &mut TestAppContext) {
3071 env_logger::try_init().ok();
3072 cx.update(|cx| {
3073 let settings_store = SettingsStore::test(cx);
3074 cx.set_global(settings_store);
3075 });
3076 }
3077
3078 #[gpui::test]
3079 async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
3080 init_test(cx);
3081
3082 let fs = FakeFs::new(cx.executor());
3083 let project = Project::test(fs, [], cx).await;
3084 let connection = Rc::new(FakeAgentConnection::new());
3085 let thread = cx
3086 .update(|cx| {
3087 connection.new_session(
3088 project,
3089 PathList::new(&[std::path::Path::new(path!("/test"))]),
3090 cx,
3091 )
3092 })
3093 .await
3094 .unwrap();
3095
3096 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3097
3098 // Send Output BEFORE Created - should be buffered by acp_thread
3099 thread.update(cx, |thread, cx| {
3100 thread.on_terminal_provider_event(
3101 TerminalProviderEvent::Output {
3102 terminal_id: terminal_id.clone(),
3103 data: b"hello buffered".to_vec(),
3104 },
3105 cx,
3106 );
3107 });
3108
3109 // Create a display-only terminal and then send Created
3110 let lower = cx.new(|cx| {
3111 let builder = ::terminal::TerminalBuilder::new_display_only(
3112 ::terminal::terminal_settings::CursorShape::default(),
3113 ::terminal::terminal_settings::AlternateScroll::On,
3114 None,
3115 0,
3116 cx.background_executor(),
3117 PathStyle::local(),
3118 )
3119 .unwrap();
3120 builder.subscribe(cx)
3121 });
3122
3123 thread.update(cx, |thread, cx| {
3124 thread.on_terminal_provider_event(
3125 TerminalProviderEvent::Created {
3126 terminal_id: terminal_id.clone(),
3127 label: "Buffered Test".to_string(),
3128 cwd: None,
3129 output_byte_limit: None,
3130 terminal: lower.clone(),
3131 },
3132 cx,
3133 );
3134 });
3135
3136 // After Created, buffered Output should have been flushed into the renderer
3137 let content = thread.read_with(cx, |thread, cx| {
3138 let term = thread.terminal(terminal_id.clone()).unwrap();
3139 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3140 });
3141
3142 assert!(
3143 content.contains("hello buffered"),
3144 "expected buffered output to render, got: {content}"
3145 );
3146 }
3147
3148 #[gpui::test]
3149 async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
3150 init_test(cx);
3151
3152 let fs = FakeFs::new(cx.executor());
3153 let project = Project::test(fs, [], cx).await;
3154 let connection = Rc::new(FakeAgentConnection::new());
3155 let thread = cx
3156 .update(|cx| {
3157 connection.new_session(
3158 project,
3159 PathList::new(&[std::path::Path::new(path!("/test"))]),
3160 cx,
3161 )
3162 })
3163 .await
3164 .unwrap();
3165
3166 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3167
3168 // Send Output BEFORE Created
3169 thread.update(cx, |thread, cx| {
3170 thread.on_terminal_provider_event(
3171 TerminalProviderEvent::Output {
3172 terminal_id: terminal_id.clone(),
3173 data: b"pre-exit data".to_vec(),
3174 },
3175 cx,
3176 );
3177 });
3178
3179 // Send Exit BEFORE Created
3180 thread.update(cx, |thread, cx| {
3181 thread.on_terminal_provider_event(
3182 TerminalProviderEvent::Exit {
3183 terminal_id: terminal_id.clone(),
3184 status: acp::TerminalExitStatus::new().exit_code(0),
3185 },
3186 cx,
3187 );
3188 });
3189
3190 // Now create a display-only lower-level terminal and send Created
3191 let lower = cx.new(|cx| {
3192 let builder = ::terminal::TerminalBuilder::new_display_only(
3193 ::terminal::terminal_settings::CursorShape::default(),
3194 ::terminal::terminal_settings::AlternateScroll::On,
3195 None,
3196 0,
3197 cx.background_executor(),
3198 PathStyle::local(),
3199 )
3200 .unwrap();
3201 builder.subscribe(cx)
3202 });
3203
3204 thread.update(cx, |thread, cx| {
3205 thread.on_terminal_provider_event(
3206 TerminalProviderEvent::Created {
3207 terminal_id: terminal_id.clone(),
3208 label: "Buffered Exit Test".to_string(),
3209 cwd: None,
3210 output_byte_limit: None,
3211 terminal: lower.clone(),
3212 },
3213 cx,
3214 );
3215 });
3216
3217 // Output should be present after Created (flushed from buffer)
3218 let content = thread.read_with(cx, |thread, cx| {
3219 let term = thread.terminal(terminal_id.clone()).unwrap();
3220 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3221 });
3222
3223 assert!(
3224 content.contains("pre-exit data"),
3225 "expected pre-exit data to render, got: {content}"
3226 );
3227 }
3228
3229 /// Test that killing a terminal via Terminal::kill properly:
3230 /// 1. Causes wait_for_exit to complete (doesn't hang forever)
3231 /// 2. The underlying terminal still has the output that was written before the kill
3232 ///
3233 /// This test verifies that the fix to kill_active_task (which now also kills
3234 /// the shell process in addition to the foreground process) properly allows
3235 /// wait_for_exit to complete instead of hanging indefinitely.
3236 #[cfg(unix)]
3237 #[gpui::test]
3238 async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
3239 use std::collections::HashMap;
3240 use task::Shell;
3241 use util::shell_builder::ShellBuilder;
3242
3243 init_test(cx);
3244 cx.executor().allow_parking();
3245
3246 let fs = FakeFs::new(cx.executor());
3247 let project = Project::test(fs, [], cx).await;
3248 let connection = Rc::new(FakeAgentConnection::new());
3249 let thread = cx
3250 .update(|cx| {
3251 connection.new_session(
3252 project.clone(),
3253 PathList::new(&[Path::new(path!("/test"))]),
3254 cx,
3255 )
3256 })
3257 .await
3258 .unwrap();
3259
3260 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3261
3262 // Create a real PTY terminal that runs a command which prints output then sleeps
3263 // We use printf instead of echo and chain with && sleep to ensure proper execution
3264 let (completion_tx, _completion_rx) = smol::channel::unbounded();
3265 let (program, args) = ShellBuilder::new(&Shell::System, false).build(
3266 Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
3267 &[],
3268 );
3269
3270 let builder = cx
3271 .update(|cx| {
3272 ::terminal::TerminalBuilder::new(
3273 None,
3274 None,
3275 task::Shell::WithArguments {
3276 program,
3277 args,
3278 title_override: None,
3279 },
3280 HashMap::default(),
3281 ::terminal::terminal_settings::CursorShape::default(),
3282 ::terminal::terminal_settings::AlternateScroll::On,
3283 None,
3284 vec![],
3285 0,
3286 false,
3287 0,
3288 Some(completion_tx),
3289 cx,
3290 vec![],
3291 PathStyle::local(),
3292 )
3293 })
3294 .await
3295 .unwrap();
3296
3297 let lower_terminal = cx.new(|cx| builder.subscribe(cx));
3298
3299 // Create the acp_thread Terminal wrapper
3300 thread.update(cx, |thread, cx| {
3301 thread.on_terminal_provider_event(
3302 TerminalProviderEvent::Created {
3303 terminal_id: terminal_id.clone(),
3304 label: "printf output_before_kill && sleep 60".to_string(),
3305 cwd: None,
3306 output_byte_limit: None,
3307 terminal: lower_terminal.clone(),
3308 },
3309 cx,
3310 );
3311 });
3312
3313 // Poll until the printf command produces output, rather than using a
3314 // fixed sleep which is flaky on loaded machines.
3315 let deadline = std::time::Instant::now() + Duration::from_secs(10);
3316 loop {
3317 let has_output = thread.read_with(cx, |thread, cx| {
3318 let term = thread
3319 .terminals
3320 .get(&terminal_id)
3321 .expect("terminal not found");
3322 let content = term.read(cx).inner().read(cx).get_content();
3323 content.contains("output_before_kill")
3324 });
3325 if has_output {
3326 break;
3327 }
3328 assert!(
3329 std::time::Instant::now() < deadline,
3330 "Timed out waiting for printf output to appear in terminal",
3331 );
3332 cx.executor().timer(Duration::from_millis(50)).await;
3333 }
3334
3335 // Get the acp_thread Terminal and kill it
3336 let wait_for_exit = thread.update(cx, |thread, cx| {
3337 let term = thread.terminals.get(&terminal_id).unwrap();
3338 let wait_for_exit = term.read(cx).wait_for_exit();
3339 term.update(cx, |term, cx| {
3340 term.kill(cx);
3341 });
3342 wait_for_exit
3343 });
3344
3345 // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
3346 // Before the fix to kill_active_task, this would hang forever because
3347 // only the foreground process was killed, not the shell, so the PTY
3348 // child never exited and wait_for_completed_task never completed.
3349 let exit_result = futures::select! {
3350 result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
3351 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
3352 };
3353
3354 assert!(
3355 exit_result.is_some(),
3356 "wait_for_exit should complete after kill, but it timed out. \
3357 This indicates kill_active_task is not properly killing the shell process."
3358 );
3359
3360 // Give the system a chance to process any pending updates
3361 cx.run_until_parked();
3362
3363 // Verify that the underlying terminal still has the output that was
3364 // written before the kill. This verifies that killing doesn't lose output.
3365 let inner_content = thread.read_with(cx, |thread, cx| {
3366 let term = thread.terminals.get(&terminal_id).unwrap();
3367 term.read(cx).inner().read(cx).get_content()
3368 });
3369
3370 assert!(
3371 inner_content.contains("output_before_kill"),
3372 "Underlying terminal should contain output from before kill, got: {}",
3373 inner_content
3374 );
3375 }
3376
3377 #[gpui::test]
3378 async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
3379 init_test(cx);
3380
3381 let fs = FakeFs::new(cx.executor());
3382 let project = Project::test(fs, [], cx).await;
3383 let connection = Rc::new(FakeAgentConnection::new());
3384 let thread = cx
3385 .update(|cx| {
3386 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3387 })
3388 .await
3389 .unwrap();
3390
3391 // Test creating a new user message
3392 thread.update(cx, |thread, cx| {
3393 thread.push_user_content_block(None, "Hello, ".into(), cx);
3394 });
3395
3396 thread.update(cx, |thread, cx| {
3397 assert_eq!(thread.entries.len(), 1);
3398 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3399 assert_eq!(user_msg.id, None);
3400 assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
3401 } else {
3402 panic!("Expected UserMessage");
3403 }
3404 });
3405
3406 // Test appending to existing user message
3407 let message_1_id = UserMessageId::new();
3408 thread.update(cx, |thread, cx| {
3409 thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
3410 });
3411
3412 thread.update(cx, |thread, cx| {
3413 assert_eq!(thread.entries.len(), 1);
3414 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3415 assert_eq!(user_msg.id, Some(message_1_id));
3416 assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
3417 } else {
3418 panic!("Expected UserMessage");
3419 }
3420 });
3421
3422 // Test creating new user message after assistant message
3423 thread.update(cx, |thread, cx| {
3424 thread.push_assistant_content_block("Assistant response".into(), false, cx);
3425 });
3426
3427 let message_2_id = UserMessageId::new();
3428 thread.update(cx, |thread, cx| {
3429 thread.push_user_content_block(
3430 Some(message_2_id.clone()),
3431 "New user message".into(),
3432 cx,
3433 );
3434 });
3435
3436 thread.update(cx, |thread, cx| {
3437 assert_eq!(thread.entries.len(), 3);
3438 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
3439 assert_eq!(user_msg.id, Some(message_2_id));
3440 assert_eq!(user_msg.content.to_markdown(cx), "New user message");
3441 } else {
3442 panic!("Expected UserMessage at index 2");
3443 }
3444 });
3445 }
3446
3447 #[gpui::test]
3448 async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
3449 init_test(cx);
3450
3451 let fs = FakeFs::new(cx.executor());
3452 let project = Project::test(fs, [], cx).await;
3453 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3454 |_, thread, mut cx| {
3455 async move {
3456 thread.update(&mut cx, |thread, cx| {
3457 thread
3458 .handle_session_update(
3459 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3460 "Thinking ".into(),
3461 )),
3462 cx,
3463 )
3464 .unwrap();
3465 thread
3466 .handle_session_update(
3467 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3468 "hard!".into(),
3469 )),
3470 cx,
3471 )
3472 .unwrap();
3473 })?;
3474 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3475 }
3476 .boxed_local()
3477 },
3478 ));
3479
3480 let thread = cx
3481 .update(|cx| {
3482 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3483 })
3484 .await
3485 .unwrap();
3486
3487 thread
3488 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
3489 .await
3490 .unwrap();
3491
3492 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
3493 assert_eq!(
3494 output,
3495 indoc! {r#"
3496 ## User
3497
3498 Hello from Zed!
3499
3500 ## Assistant
3501
3502 <thinking>
3503 Thinking hard!
3504 </thinking>
3505
3506 "#}
3507 );
3508 }
3509
3510 #[gpui::test]
3511 async fn test_ignore_echoed_user_message_chunks_during_active_turn(
3512 cx: &mut gpui::TestAppContext,
3513 ) {
3514 init_test(cx);
3515
3516 let fs = FakeFs::new(cx.executor());
3517 let project = Project::test(fs, [], cx).await;
3518 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3519 |request, thread, mut cx| {
3520 async move {
3521 let prompt = request.prompt.first().cloned().unwrap_or_else(|| "".into());
3522
3523 thread.update(&mut cx, |thread, cx| {
3524 thread
3525 .handle_session_update(
3526 acp::SessionUpdate::UserMessageChunk(acp::ContentChunk::new(
3527 prompt,
3528 )),
3529 cx,
3530 )
3531 .unwrap();
3532 })?;
3533
3534 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3535 }
3536 .boxed_local()
3537 },
3538 ));
3539
3540 let thread = cx
3541 .update(|cx| {
3542 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3543 })
3544 .await
3545 .unwrap();
3546
3547 thread
3548 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
3549 .await
3550 .unwrap();
3551
3552 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
3553 assert_eq!(output.matches("Hello from Zed!").count(), 1);
3554 }
3555
3556 #[gpui::test]
3557 async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
3558 init_test(cx);
3559
3560 let fs = FakeFs::new(cx.executor());
3561 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
3562 .await;
3563 let project = Project::test(fs.clone(), [], cx).await;
3564 let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
3565 let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
3566 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3567 move |_, thread, mut cx| {
3568 let read_file_tx = read_file_tx.clone();
3569 async move {
3570 let content = thread
3571 .update(&mut cx, |thread, cx| {
3572 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3573 })
3574 .unwrap()
3575 .await
3576 .unwrap();
3577 assert_eq!(content, "one\ntwo\nthree\n");
3578 read_file_tx.take().unwrap().send(()).unwrap();
3579 thread
3580 .update(&mut cx, |thread, cx| {
3581 thread.write_text_file(
3582 path!("/tmp/foo").into(),
3583 "one\ntwo\nthree\nfour\nfive\n".to_string(),
3584 cx,
3585 )
3586 })
3587 .unwrap()
3588 .await
3589 .unwrap();
3590 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3591 }
3592 .boxed_local()
3593 },
3594 ));
3595
3596 let (worktree, pathbuf) = project
3597 .update(cx, |project, cx| {
3598 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3599 })
3600 .await
3601 .unwrap();
3602 let buffer = project
3603 .update(cx, |project, cx| {
3604 project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
3605 })
3606 .await
3607 .unwrap();
3608
3609 let thread = cx
3610 .update(|cx| {
3611 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3612 })
3613 .await
3614 .unwrap();
3615
3616 let request = thread.update(cx, |thread, cx| {
3617 thread.send_raw("Extend the count in /tmp/foo", cx)
3618 });
3619 read_file_rx.await.ok();
3620 buffer.update(cx, |buffer, cx| {
3621 buffer.edit([(0..0, "zero\n".to_string())], None, cx);
3622 });
3623 cx.run_until_parked();
3624 assert_eq!(
3625 buffer.read_with(cx, |buffer, _| buffer.text()),
3626 "zero\none\ntwo\nthree\nfour\nfive\n"
3627 );
3628 assert_eq!(
3629 String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
3630 "zero\none\ntwo\nthree\nfour\nfive\n"
3631 );
3632 request.await.unwrap();
3633 }
3634
3635 #[gpui::test]
3636 async fn test_reading_from_line(cx: &mut TestAppContext) {
3637 init_test(cx);
3638
3639 let fs = FakeFs::new(cx.executor());
3640 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
3641 .await;
3642 let project = Project::test(fs.clone(), [], cx).await;
3643 project
3644 .update(cx, |project, cx| {
3645 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3646 })
3647 .await
3648 .unwrap();
3649
3650 let connection = Rc::new(FakeAgentConnection::new());
3651
3652 let thread = cx
3653 .update(|cx| {
3654 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3655 })
3656 .await
3657 .unwrap();
3658
3659 // Whole file
3660 let content = thread
3661 .update(cx, |thread, cx| {
3662 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3663 })
3664 .await
3665 .unwrap();
3666
3667 assert_eq!(content, "one\ntwo\nthree\nfour\n");
3668
3669 // Only start line
3670 let content = thread
3671 .update(cx, |thread, cx| {
3672 thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
3673 })
3674 .await
3675 .unwrap();
3676
3677 assert_eq!(content, "three\nfour\n");
3678
3679 // Only limit
3680 let content = thread
3681 .update(cx, |thread, cx| {
3682 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3683 })
3684 .await
3685 .unwrap();
3686
3687 assert_eq!(content, "one\ntwo\n");
3688
3689 // Range
3690 let content = thread
3691 .update(cx, |thread, cx| {
3692 thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
3693 })
3694 .await
3695 .unwrap();
3696
3697 assert_eq!(content, "two\nthree\n");
3698
3699 // Invalid
3700 let err = thread
3701 .update(cx, |thread, cx| {
3702 thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
3703 })
3704 .await
3705 .unwrap_err();
3706
3707 assert_eq!(
3708 err.to_string(),
3709 "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
3710 );
3711 }
3712
3713 #[gpui::test]
3714 async fn test_reading_empty_file(cx: &mut TestAppContext) {
3715 init_test(cx);
3716
3717 let fs = FakeFs::new(cx.executor());
3718 fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
3719 let project = Project::test(fs.clone(), [], cx).await;
3720 project
3721 .update(cx, |project, cx| {
3722 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3723 })
3724 .await
3725 .unwrap();
3726
3727 let connection = Rc::new(FakeAgentConnection::new());
3728
3729 let thread = cx
3730 .update(|cx| {
3731 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3732 })
3733 .await
3734 .unwrap();
3735
3736 // Whole file
3737 let content = thread
3738 .update(cx, |thread, cx| {
3739 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3740 })
3741 .await
3742 .unwrap();
3743
3744 assert_eq!(content, "");
3745
3746 // Only start line
3747 let content = thread
3748 .update(cx, |thread, cx| {
3749 thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
3750 })
3751 .await
3752 .unwrap();
3753
3754 assert_eq!(content, "");
3755
3756 // Only limit
3757 let content = thread
3758 .update(cx, |thread, cx| {
3759 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3760 })
3761 .await
3762 .unwrap();
3763
3764 assert_eq!(content, "");
3765
3766 // Range
3767 let content = thread
3768 .update(cx, |thread, cx| {
3769 thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
3770 })
3771 .await
3772 .unwrap();
3773
3774 assert_eq!(content, "");
3775
3776 // Invalid
3777 let err = thread
3778 .update(cx, |thread, cx| {
3779 thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
3780 })
3781 .await
3782 .unwrap_err();
3783
3784 assert_eq!(
3785 err.to_string(),
3786 "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
3787 );
3788 }
3789 #[gpui::test]
3790 async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
3791 init_test(cx);
3792
3793 let fs = FakeFs::new(cx.executor());
3794 fs.insert_tree(path!("/tmp"), json!({})).await;
3795 let project = Project::test(fs.clone(), [], cx).await;
3796 project
3797 .update(cx, |project, cx| {
3798 project.find_or_create_worktree(path!("/tmp"), true, cx)
3799 })
3800 .await
3801 .unwrap();
3802
3803 let connection = Rc::new(FakeAgentConnection::new());
3804
3805 let thread = cx
3806 .update(|cx| {
3807 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3808 })
3809 .await
3810 .unwrap();
3811
3812 // Out of project file
3813 let err = thread
3814 .update(cx, |thread, cx| {
3815 thread.read_text_file(path!("/foo").into(), None, None, false, cx)
3816 })
3817 .await
3818 .unwrap_err();
3819
3820 assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
3821 }
3822
3823 #[gpui::test]
3824 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
3825 init_test(cx);
3826
3827 let fs = FakeFs::new(cx.executor());
3828 let project = Project::test(fs, [], cx).await;
3829 let id = acp::ToolCallId::new("test");
3830
3831 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3832 let id = id.clone();
3833 move |_, thread, mut cx| {
3834 let id = id.clone();
3835 async move {
3836 thread
3837 .update(&mut cx, |thread, cx| {
3838 thread.handle_session_update(
3839 acp::SessionUpdate::ToolCall(
3840 acp::ToolCall::new(id.clone(), "Label")
3841 .kind(acp::ToolKind::Fetch)
3842 .status(acp::ToolCallStatus::InProgress),
3843 ),
3844 cx,
3845 )
3846 })
3847 .unwrap()
3848 .unwrap();
3849 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3850 }
3851 .boxed_local()
3852 }
3853 }));
3854
3855 let thread = cx
3856 .update(|cx| {
3857 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3858 })
3859 .await
3860 .unwrap();
3861
3862 let request = thread.update(cx, |thread, cx| {
3863 thread.send_raw("Fetch https://example.com", cx)
3864 });
3865
3866 run_until_first_tool_call(&thread, cx).await;
3867
3868 thread.read_with(cx, |thread, _| {
3869 assert!(matches!(
3870 thread.entries[1],
3871 AgentThreadEntry::ToolCall(ToolCall {
3872 status: ToolCallStatus::InProgress,
3873 ..
3874 })
3875 ));
3876 });
3877
3878 thread.update(cx, |thread, cx| thread.cancel(cx)).await;
3879
3880 thread.read_with(cx, |thread, _| {
3881 assert!(matches!(
3882 &thread.entries[1],
3883 AgentThreadEntry::ToolCall(ToolCall {
3884 status: ToolCallStatus::Canceled,
3885 ..
3886 })
3887 ));
3888 });
3889
3890 thread
3891 .update(cx, |thread, cx| {
3892 thread.handle_session_update(
3893 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3894 id,
3895 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3896 )),
3897 cx,
3898 )
3899 })
3900 .unwrap();
3901
3902 request.await.unwrap();
3903
3904 thread.read_with(cx, |thread, _| {
3905 assert!(matches!(
3906 thread.entries[1],
3907 AgentThreadEntry::ToolCall(ToolCall {
3908 status: ToolCallStatus::Completed,
3909 ..
3910 })
3911 ));
3912 });
3913 }
3914
3915 #[gpui::test]
3916 async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3917 init_test(cx);
3918 let fs = FakeFs::new(cx.background_executor.clone());
3919 fs.insert_tree(path!("/test"), json!({})).await;
3920 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3921
3922 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3923 move |_, thread, mut cx| {
3924 async move {
3925 thread
3926 .update(&mut cx, |thread, cx| {
3927 thread.handle_session_update(
3928 acp::SessionUpdate::ToolCall(
3929 acp::ToolCall::new("test", "Label")
3930 .kind(acp::ToolKind::Edit)
3931 .status(acp::ToolCallStatus::Completed)
3932 .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
3933 "/test/test.txt",
3934 "foo",
3935 ))]),
3936 ),
3937 cx,
3938 )
3939 })
3940 .unwrap()
3941 .unwrap();
3942 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3943 }
3944 .boxed_local()
3945 }
3946 }));
3947
3948 let thread = cx
3949 .update(|cx| {
3950 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3951 })
3952 .await
3953 .unwrap();
3954
3955 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3956 .await
3957 .unwrap();
3958
3959 assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3960 }
3961
3962 #[gpui::test(iterations = 10)]
3963 async fn test_checkpoints(cx: &mut TestAppContext) {
3964 init_test(cx);
3965 let fs = FakeFs::new(cx.background_executor.clone());
3966 fs.insert_tree(
3967 path!("/test"),
3968 json!({
3969 ".git": {}
3970 }),
3971 )
3972 .await;
3973 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3974
3975 let simulate_changes = Arc::new(AtomicBool::new(true));
3976 let next_filename = Arc::new(AtomicUsize::new(0));
3977 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3978 let simulate_changes = simulate_changes.clone();
3979 let next_filename = next_filename.clone();
3980 let fs = fs.clone();
3981 move |request, thread, mut cx| {
3982 let fs = fs.clone();
3983 let simulate_changes = simulate_changes.clone();
3984 let next_filename = next_filename.clone();
3985 async move {
3986 if simulate_changes.load(SeqCst) {
3987 let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3988 fs.write(Path::new(&filename), b"").await?;
3989 }
3990
3991 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3992 panic!("expected text content block");
3993 };
3994 thread.update(&mut cx, |thread, cx| {
3995 thread
3996 .handle_session_update(
3997 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3998 content.text.to_uppercase().into(),
3999 )),
4000 cx,
4001 )
4002 .unwrap();
4003 })?;
4004 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4005 }
4006 .boxed_local()
4007 }
4008 }));
4009 let thread = cx
4010 .update(|cx| {
4011 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4012 })
4013 .await
4014 .unwrap();
4015
4016 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
4017 .await
4018 .unwrap();
4019 thread.read_with(cx, |thread, cx| {
4020 assert_eq!(
4021 thread.to_markdown(cx),
4022 indoc! {"
4023 ## User (checkpoint)
4024
4025 Lorem
4026
4027 ## Assistant
4028
4029 LOREM
4030
4031 "}
4032 );
4033 });
4034 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
4035
4036 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
4037 .await
4038 .unwrap();
4039 thread.read_with(cx, |thread, cx| {
4040 assert_eq!(
4041 thread.to_markdown(cx),
4042 indoc! {"
4043 ## User (checkpoint)
4044
4045 Lorem
4046
4047 ## Assistant
4048
4049 LOREM
4050
4051 ## User (checkpoint)
4052
4053 ipsum
4054
4055 ## Assistant
4056
4057 IPSUM
4058
4059 "}
4060 );
4061 });
4062 assert_eq!(
4063 fs.files(),
4064 vec![
4065 Path::new(path!("/test/file-0")),
4066 Path::new(path!("/test/file-1"))
4067 ]
4068 );
4069
4070 // Checkpoint isn't stored when there are no changes.
4071 simulate_changes.store(false, SeqCst);
4072 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
4073 .await
4074 .unwrap();
4075 thread.read_with(cx, |thread, cx| {
4076 assert_eq!(
4077 thread.to_markdown(cx),
4078 indoc! {"
4079 ## User (checkpoint)
4080
4081 Lorem
4082
4083 ## Assistant
4084
4085 LOREM
4086
4087 ## User (checkpoint)
4088
4089 ipsum
4090
4091 ## Assistant
4092
4093 IPSUM
4094
4095 ## User
4096
4097 dolor
4098
4099 ## Assistant
4100
4101 DOLOR
4102
4103 "}
4104 );
4105 });
4106 assert_eq!(
4107 fs.files(),
4108 vec![
4109 Path::new(path!("/test/file-0")),
4110 Path::new(path!("/test/file-1"))
4111 ]
4112 );
4113
4114 // Rewinding the conversation truncates the history and restores the checkpoint.
4115 thread
4116 .update(cx, |thread, cx| {
4117 let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
4118 panic!("unexpected entries {:?}", thread.entries)
4119 };
4120 thread.restore_checkpoint(message.id.clone().unwrap(), cx)
4121 })
4122 .await
4123 .unwrap();
4124 thread.read_with(cx, |thread, cx| {
4125 assert_eq!(
4126 thread.to_markdown(cx),
4127 indoc! {"
4128 ## User (checkpoint)
4129
4130 Lorem
4131
4132 ## Assistant
4133
4134 LOREM
4135
4136 "}
4137 );
4138 });
4139 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
4140 }
4141
4142 #[gpui::test]
4143 async fn test_tool_result_refusal(cx: &mut TestAppContext) {
4144 use std::sync::atomic::AtomicUsize;
4145 init_test(cx);
4146
4147 let fs = FakeFs::new(cx.executor());
4148 let project = Project::test(fs, None, cx).await;
4149
4150 // Create a connection that simulates refusal after tool result
4151 let prompt_count = Arc::new(AtomicUsize::new(0));
4152 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4153 let prompt_count = prompt_count.clone();
4154 move |_request, thread, mut cx| {
4155 let count = prompt_count.fetch_add(1, SeqCst);
4156 async move {
4157 if count == 0 {
4158 // First prompt: Generate a tool call with result
4159 thread.update(&mut cx, |thread, cx| {
4160 thread
4161 .handle_session_update(
4162 acp::SessionUpdate::ToolCall(
4163 acp::ToolCall::new("tool1", "Test Tool")
4164 .kind(acp::ToolKind::Fetch)
4165 .status(acp::ToolCallStatus::Completed)
4166 .raw_input(serde_json::json!({"query": "test"}))
4167 .raw_output(serde_json::json!({"result": "inappropriate content"})),
4168 ),
4169 cx,
4170 )
4171 .unwrap();
4172 })?;
4173
4174 // Now return refusal because of the tool result
4175 Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
4176 } else {
4177 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4178 }
4179 }
4180 .boxed_local()
4181 }
4182 }));
4183
4184 let thread = cx
4185 .update(|cx| {
4186 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4187 })
4188 .await
4189 .unwrap();
4190
4191 // Track if we see a Refusal event
4192 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4193 let saw_refusal_event_captured = saw_refusal_event.clone();
4194 thread.update(cx, |_thread, cx| {
4195 cx.subscribe(
4196 &thread,
4197 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4198 if matches!(event, AcpThreadEvent::Refusal) {
4199 *saw_refusal_event_captured.lock().unwrap() = true;
4200 }
4201 },
4202 )
4203 .detach();
4204 });
4205
4206 // Send a user message - this will trigger tool call and then refusal
4207 let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
4208 cx.background_executor.spawn(send_task).detach();
4209 cx.run_until_parked();
4210
4211 // Verify that:
4212 // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
4213 // 2. The user message was NOT truncated
4214 assert!(
4215 *saw_refusal_event.lock().unwrap(),
4216 "Refusal event should be emitted for tool result refusals"
4217 );
4218
4219 thread.read_with(cx, |thread, _| {
4220 let entries = thread.entries();
4221 assert!(entries.len() >= 2, "Should have user message and tool call");
4222
4223 // Verify user message is still there
4224 assert!(
4225 matches!(entries[0], AgentThreadEntry::UserMessage(_)),
4226 "User message should not be truncated"
4227 );
4228
4229 // Verify tool call is there with result
4230 if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
4231 assert!(
4232 tool_call.raw_output.is_some(),
4233 "Tool call should have output"
4234 );
4235 } else {
4236 panic!("Expected tool call at index 1");
4237 }
4238 });
4239 }
4240
4241 #[gpui::test]
4242 async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
4243 init_test(cx);
4244
4245 let fs = FakeFs::new(cx.executor());
4246 let project = Project::test(fs, None, cx).await;
4247
4248 let refuse_next = Arc::new(AtomicBool::new(false));
4249 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4250 let refuse_next = refuse_next.clone();
4251 move |_request, _thread, _cx| {
4252 if refuse_next.load(SeqCst) {
4253 async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
4254 .boxed_local()
4255 } else {
4256 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
4257 .boxed_local()
4258 }
4259 }
4260 }));
4261
4262 let thread = cx
4263 .update(|cx| {
4264 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4265 })
4266 .await
4267 .unwrap();
4268
4269 // Track if we see a Refusal event
4270 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4271 let saw_refusal_event_captured = saw_refusal_event.clone();
4272 thread.update(cx, |_thread, cx| {
4273 cx.subscribe(
4274 &thread,
4275 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4276 if matches!(event, AcpThreadEvent::Refusal) {
4277 *saw_refusal_event_captured.lock().unwrap() = true;
4278 }
4279 },
4280 )
4281 .detach();
4282 });
4283
4284 // Send a message that will be refused
4285 refuse_next.store(true, SeqCst);
4286 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4287 .await
4288 .unwrap();
4289
4290 // Verify that a Refusal event WAS emitted for user prompt refusal
4291 assert!(
4292 *saw_refusal_event.lock().unwrap(),
4293 "Refusal event should be emitted for user prompt refusals"
4294 );
4295
4296 // Verify the message was truncated (user prompt refusal)
4297 thread.read_with(cx, |thread, cx| {
4298 assert_eq!(thread.to_markdown(cx), "");
4299 });
4300 }
4301
4302 #[gpui::test]
4303 async fn test_refusal(cx: &mut TestAppContext) {
4304 init_test(cx);
4305 let fs = FakeFs::new(cx.background_executor.clone());
4306 fs.insert_tree(path!("/"), json!({})).await;
4307 let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
4308
4309 let refuse_next = Arc::new(AtomicBool::new(false));
4310 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4311 let refuse_next = refuse_next.clone();
4312 move |request, thread, mut cx| {
4313 let refuse_next = refuse_next.clone();
4314 async move {
4315 if refuse_next.load(SeqCst) {
4316 return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
4317 }
4318
4319 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
4320 panic!("expected text content block");
4321 };
4322 thread.update(&mut cx, |thread, cx| {
4323 thread
4324 .handle_session_update(
4325 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
4326 content.text.to_uppercase().into(),
4327 )),
4328 cx,
4329 )
4330 .unwrap();
4331 })?;
4332 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4333 }
4334 .boxed_local()
4335 }
4336 }));
4337 let thread = cx
4338 .update(|cx| {
4339 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4340 })
4341 .await
4342 .unwrap();
4343
4344 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4345 .await
4346 .unwrap();
4347 thread.read_with(cx, |thread, cx| {
4348 assert_eq!(
4349 thread.to_markdown(cx),
4350 indoc! {"
4351 ## User
4352
4353 hello
4354
4355 ## Assistant
4356
4357 HELLO
4358
4359 "}
4360 );
4361 });
4362
4363 // Simulate refusing the second message. The message should be truncated
4364 // when a user prompt is refused.
4365 refuse_next.store(true, SeqCst);
4366 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
4367 .await
4368 .unwrap();
4369 thread.read_with(cx, |thread, cx| {
4370 assert_eq!(
4371 thread.to_markdown(cx),
4372 indoc! {"
4373 ## User
4374
4375 hello
4376
4377 ## Assistant
4378
4379 HELLO
4380
4381 "}
4382 );
4383 });
4384 }
4385
4386 async fn run_until_first_tool_call(
4387 thread: &Entity<AcpThread>,
4388 cx: &mut TestAppContext,
4389 ) -> usize {
4390 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
4391
4392 let subscription = cx.update(|cx| {
4393 cx.subscribe(thread, move |thread, _, cx| {
4394 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
4395 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
4396 return tx.try_send(ix).unwrap();
4397 }
4398 }
4399 })
4400 });
4401
4402 select! {
4403 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
4404 panic!("Timeout waiting for tool call")
4405 }
4406 ix = rx.next().fuse() => {
4407 drop(subscription);
4408 ix.unwrap()
4409 }
4410 }
4411 }
4412
4413 #[derive(Clone, Default)]
4414 struct FakeAgentConnection {
4415 auth_methods: Vec<acp::AuthMethod>,
4416 supports_truncate: bool,
4417 sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
4418 set_title_calls: Rc<RefCell<Vec<SharedString>>>,
4419 on_user_message: Option<
4420 Rc<
4421 dyn Fn(
4422 acp::PromptRequest,
4423 WeakEntity<AcpThread>,
4424 AsyncApp,
4425 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4426 + 'static,
4427 >,
4428 >,
4429 }
4430
4431 impl FakeAgentConnection {
4432 fn new() -> Self {
4433 Self {
4434 auth_methods: Vec::new(),
4435 supports_truncate: true,
4436 on_user_message: None,
4437 sessions: Arc::default(),
4438 set_title_calls: Default::default(),
4439 }
4440 }
4441
4442 fn without_truncate_support(mut self) -> Self {
4443 self.supports_truncate = false;
4444 self
4445 }
4446
4447 #[expect(unused)]
4448 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
4449 self.auth_methods = auth_methods;
4450 self
4451 }
4452
4453 fn on_user_message(
4454 mut self,
4455 handler: impl Fn(
4456 acp::PromptRequest,
4457 WeakEntity<AcpThread>,
4458 AsyncApp,
4459 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4460 + 'static,
4461 ) -> Self {
4462 self.on_user_message.replace(Rc::new(handler));
4463 self
4464 }
4465 }
4466
4467 impl AgentConnection for FakeAgentConnection {
4468 fn agent_id(&self) -> AgentId {
4469 AgentId::new("fake")
4470 }
4471
4472 fn telemetry_id(&self) -> SharedString {
4473 "fake".into()
4474 }
4475
4476 fn auth_methods(&self) -> &[acp::AuthMethod] {
4477 &self.auth_methods
4478 }
4479
4480 fn new_session(
4481 self: Rc<Self>,
4482 project: Entity<Project>,
4483 work_dirs: PathList,
4484 cx: &mut App,
4485 ) -> Task<gpui::Result<Entity<AcpThread>>> {
4486 let session_id = acp::SessionId::new(
4487 rand::rng()
4488 .sample_iter(&distr::Alphanumeric)
4489 .take(7)
4490 .map(char::from)
4491 .collect::<String>(),
4492 );
4493 let action_log = cx.new(|_| ActionLog::new(project.clone()));
4494 let thread = cx.new(|cx| {
4495 AcpThread::new(
4496 None,
4497 None,
4498 Some(work_dirs),
4499 self.clone(),
4500 project,
4501 action_log,
4502 session_id.clone(),
4503 watch::Receiver::constant(
4504 acp::PromptCapabilities::new()
4505 .image(true)
4506 .audio(true)
4507 .embedded_context(true),
4508 ),
4509 cx,
4510 )
4511 });
4512 self.sessions.lock().insert(session_id, thread.downgrade());
4513 Task::ready(Ok(thread))
4514 }
4515
4516 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
4517 if self.auth_methods().iter().any(|m| m.id() == &method) {
4518 Task::ready(Ok(()))
4519 } else {
4520 Task::ready(Err(anyhow!("Invalid Auth Method")))
4521 }
4522 }
4523
4524 fn prompt(
4525 &self,
4526 _id: UserMessageId,
4527 params: acp::PromptRequest,
4528 cx: &mut App,
4529 ) -> Task<gpui::Result<acp::PromptResponse>> {
4530 let sessions = self.sessions.lock();
4531 let thread = sessions.get(¶ms.session_id).unwrap();
4532 if let Some(handler) = &self.on_user_message {
4533 let handler = handler.clone();
4534 let thread = thread.clone();
4535 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
4536 } else {
4537 Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
4538 }
4539 }
4540
4541 fn cancel(&self, _session_id: &acp::SessionId, _cx: &mut App) {}
4542
4543 fn truncate(
4544 &self,
4545 session_id: &acp::SessionId,
4546 _cx: &App,
4547 ) -> Option<Rc<dyn AgentSessionTruncate>> {
4548 self.supports_truncate.then(|| {
4549 Rc::new(FakeAgentSessionEditor {
4550 _session_id: session_id.clone(),
4551 }) as Rc<dyn AgentSessionTruncate>
4552 })
4553 }
4554
4555 fn set_title(
4556 &self,
4557 _session_id: &acp::SessionId,
4558 _cx: &App,
4559 ) -> Option<Rc<dyn AgentSessionSetTitle>> {
4560 Some(Rc::new(FakeAgentSessionSetTitle {
4561 calls: self.set_title_calls.clone(),
4562 }))
4563 }
4564
4565 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
4566 self
4567 }
4568 }
4569
4570 struct FakeAgentSessionSetTitle {
4571 calls: Rc<RefCell<Vec<SharedString>>>,
4572 }
4573
4574 impl AgentSessionSetTitle for FakeAgentSessionSetTitle {
4575 fn run(&self, title: SharedString, _cx: &mut App) -> Task<Result<()>> {
4576 self.calls.borrow_mut().push(title);
4577 Task::ready(Ok(()))
4578 }
4579 }
4580
4581 struct FakeAgentSessionEditor {
4582 _session_id: acp::SessionId,
4583 }
4584
4585 impl AgentSessionTruncate for FakeAgentSessionEditor {
4586 fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
4587 Task::ready(Ok(()))
4588 }
4589 }
4590
4591 #[gpui::test]
4592 async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
4593 init_test(cx);
4594
4595 let fs = FakeFs::new(cx.executor());
4596 let project = Project::test(fs, [], cx).await;
4597 let connection = Rc::new(FakeAgentConnection::new());
4598 let thread = cx
4599 .update(|cx| {
4600 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4601 })
4602 .await
4603 .unwrap();
4604
4605 // Try to update a tool call that doesn't exist
4606 let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
4607 thread.update(cx, |thread, cx| {
4608 let result = thread.handle_session_update(
4609 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4610 nonexistent_id.clone(),
4611 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4612 )),
4613 cx,
4614 );
4615
4616 // The update should succeed (not return an error)
4617 assert!(result.is_ok());
4618
4619 // There should now be exactly one entry in the thread
4620 assert_eq!(thread.entries.len(), 1);
4621
4622 // The entry should be a failed tool call
4623 if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
4624 assert_eq!(tool_call.id, nonexistent_id);
4625 assert!(matches!(tool_call.status, ToolCallStatus::Failed));
4626 assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
4627
4628 // Check that the content contains the error message
4629 assert_eq!(tool_call.content.len(), 1);
4630 if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
4631 match content_block {
4632 ContentBlock::Markdown { markdown } => {
4633 let markdown_text = markdown.read(cx).source();
4634 assert!(markdown_text.contains("Tool call not found"));
4635 }
4636 ContentBlock::Empty => panic!("Expected markdown content, got empty"),
4637 ContentBlock::ResourceLink { .. } => {
4638 panic!("Expected markdown content, got resource link")
4639 }
4640 ContentBlock::Image { .. } => {
4641 panic!("Expected markdown content, got image")
4642 }
4643 }
4644 } else {
4645 panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
4646 }
4647 } else {
4648 panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
4649 }
4650 });
4651 }
4652
4653 /// Tests that restoring a checkpoint properly cleans up terminals that were
4654 /// created after that checkpoint, and cancels any in-progress generation.
4655 ///
4656 /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
4657 /// that were started after that checkpoint should be terminated, and any in-progress
4658 /// AI generation should be canceled.
4659 #[gpui::test]
4660 async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
4661 init_test(cx);
4662
4663 let fs = FakeFs::new(cx.executor());
4664 let project = Project::test(fs, [], cx).await;
4665 let connection = Rc::new(FakeAgentConnection::new());
4666 let thread = cx
4667 .update(|cx| {
4668 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4669 })
4670 .await
4671 .unwrap();
4672
4673 // Send first user message to create a checkpoint
4674 cx.update(|cx| {
4675 thread.update(cx, |thread, cx| {
4676 thread.send(vec!["first message".into()], cx)
4677 })
4678 })
4679 .await
4680 .unwrap();
4681
4682 // Send second message (creates another checkpoint) - we'll restore to this one
4683 cx.update(|cx| {
4684 thread.update(cx, |thread, cx| {
4685 thread.send(vec!["second message".into()], cx)
4686 })
4687 })
4688 .await
4689 .unwrap();
4690
4691 // Create 2 terminals BEFORE the checkpoint that have completed running
4692 let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4693 let mock_terminal_1 = cx.new(|cx| {
4694 let builder = ::terminal::TerminalBuilder::new_display_only(
4695 ::terminal::terminal_settings::CursorShape::default(),
4696 ::terminal::terminal_settings::AlternateScroll::On,
4697 None,
4698 0,
4699 cx.background_executor(),
4700 PathStyle::local(),
4701 )
4702 .unwrap();
4703 builder.subscribe(cx)
4704 });
4705
4706 thread.update(cx, |thread, cx| {
4707 thread.on_terminal_provider_event(
4708 TerminalProviderEvent::Created {
4709 terminal_id: terminal_id_1.clone(),
4710 label: "echo 'first'".to_string(),
4711 cwd: Some(PathBuf::from("/test")),
4712 output_byte_limit: None,
4713 terminal: mock_terminal_1.clone(),
4714 },
4715 cx,
4716 );
4717 });
4718
4719 thread.update(cx, |thread, cx| {
4720 thread.on_terminal_provider_event(
4721 TerminalProviderEvent::Output {
4722 terminal_id: terminal_id_1.clone(),
4723 data: b"first\n".to_vec(),
4724 },
4725 cx,
4726 );
4727 });
4728
4729 thread.update(cx, |thread, cx| {
4730 thread.on_terminal_provider_event(
4731 TerminalProviderEvent::Exit {
4732 terminal_id: terminal_id_1.clone(),
4733 status: acp::TerminalExitStatus::new().exit_code(0),
4734 },
4735 cx,
4736 );
4737 });
4738
4739 let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4740 let mock_terminal_2 = cx.new(|cx| {
4741 let builder = ::terminal::TerminalBuilder::new_display_only(
4742 ::terminal::terminal_settings::CursorShape::default(),
4743 ::terminal::terminal_settings::AlternateScroll::On,
4744 None,
4745 0,
4746 cx.background_executor(),
4747 PathStyle::local(),
4748 )
4749 .unwrap();
4750 builder.subscribe(cx)
4751 });
4752
4753 thread.update(cx, |thread, cx| {
4754 thread.on_terminal_provider_event(
4755 TerminalProviderEvent::Created {
4756 terminal_id: terminal_id_2.clone(),
4757 label: "echo 'second'".to_string(),
4758 cwd: Some(PathBuf::from("/test")),
4759 output_byte_limit: None,
4760 terminal: mock_terminal_2.clone(),
4761 },
4762 cx,
4763 );
4764 });
4765
4766 thread.update(cx, |thread, cx| {
4767 thread.on_terminal_provider_event(
4768 TerminalProviderEvent::Output {
4769 terminal_id: terminal_id_2.clone(),
4770 data: b"second\n".to_vec(),
4771 },
4772 cx,
4773 );
4774 });
4775
4776 thread.update(cx, |thread, cx| {
4777 thread.on_terminal_provider_event(
4778 TerminalProviderEvent::Exit {
4779 terminal_id: terminal_id_2.clone(),
4780 status: acp::TerminalExitStatus::new().exit_code(0),
4781 },
4782 cx,
4783 );
4784 });
4785
4786 // Get the second message ID to restore to
4787 let second_message_id = thread.read_with(cx, |thread, _| {
4788 // At this point we have:
4789 // - Index 0: First user message (with checkpoint)
4790 // - Index 1: Second user message (with checkpoint)
4791 // No assistant responses because FakeAgentConnection just returns EndTurn
4792 let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
4793 panic!("expected user message at index 1");
4794 };
4795 message.id.clone().unwrap()
4796 });
4797
4798 // Create a terminal AFTER the checkpoint we'll restore to.
4799 // This simulates the AI agent starting a long-running terminal command.
4800 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4801 let mock_terminal = cx.new(|cx| {
4802 let builder = ::terminal::TerminalBuilder::new_display_only(
4803 ::terminal::terminal_settings::CursorShape::default(),
4804 ::terminal::terminal_settings::AlternateScroll::On,
4805 None,
4806 0,
4807 cx.background_executor(),
4808 PathStyle::local(),
4809 )
4810 .unwrap();
4811 builder.subscribe(cx)
4812 });
4813
4814 // Register the terminal as created
4815 thread.update(cx, |thread, cx| {
4816 thread.on_terminal_provider_event(
4817 TerminalProviderEvent::Created {
4818 terminal_id: terminal_id.clone(),
4819 label: "sleep 1000".to_string(),
4820 cwd: Some(PathBuf::from("/test")),
4821 output_byte_limit: None,
4822 terminal: mock_terminal.clone(),
4823 },
4824 cx,
4825 );
4826 });
4827
4828 // Simulate the terminal producing output (still running)
4829 thread.update(cx, |thread, cx| {
4830 thread.on_terminal_provider_event(
4831 TerminalProviderEvent::Output {
4832 terminal_id: terminal_id.clone(),
4833 data: b"terminal is running...\n".to_vec(),
4834 },
4835 cx,
4836 );
4837 });
4838
4839 // Create a tool call entry that references this terminal
4840 // This represents the agent requesting a terminal command
4841 thread.update(cx, |thread, cx| {
4842 thread
4843 .handle_session_update(
4844 acp::SessionUpdate::ToolCall(
4845 acp::ToolCall::new("terminal-tool-1", "Running command")
4846 .kind(acp::ToolKind::Execute)
4847 .status(acp::ToolCallStatus::InProgress)
4848 .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
4849 terminal_id.clone(),
4850 ))])
4851 .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
4852 ),
4853 cx,
4854 )
4855 .unwrap();
4856 });
4857
4858 // Verify terminal exists and is in the thread
4859 let terminal_exists_before =
4860 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4861 assert!(
4862 terminal_exists_before,
4863 "Terminal should exist before checkpoint restore"
4864 );
4865
4866 // Verify the terminal's underlying task is still running (not completed)
4867 let terminal_running_before = thread.read_with(cx, |thread, _cx| {
4868 let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
4869 terminal_entity.read_with(cx, |term, _cx| {
4870 term.output().is_none() // output is None means it's still running
4871 })
4872 });
4873 assert!(
4874 terminal_running_before,
4875 "Terminal should be running before checkpoint restore"
4876 );
4877
4878 // Verify we have the expected entries before restore
4879 let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
4880 assert!(
4881 entry_count_before > 1,
4882 "Should have multiple entries before restore"
4883 );
4884
4885 // Restore the checkpoint to the second message.
4886 // This should:
4887 // 1. Cancel any in-progress generation (via the cancel() call)
4888 // 2. Remove the terminal that was created after that point
4889 thread
4890 .update(cx, |thread, cx| {
4891 thread.restore_checkpoint(second_message_id, cx)
4892 })
4893 .await
4894 .unwrap();
4895
4896 // Verify that no send_task is in progress after restore
4897 // (cancel() clears the send_task)
4898 let has_send_task_after = thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4899 assert!(
4900 !has_send_task_after,
4901 "Should not have a send_task after restore (cancel should have cleared it)"
4902 );
4903
4904 // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
4905 let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
4906 assert_eq!(
4907 entry_count, 1,
4908 "Should have 1 entry after restore (only the first user message)"
4909 );
4910
4911 // Verify the 2 completed terminals from before the checkpoint still exist
4912 let terminal_1_exists = thread.read_with(cx, |thread, _| {
4913 thread.terminals.contains_key(&terminal_id_1)
4914 });
4915 assert!(
4916 terminal_1_exists,
4917 "Terminal 1 (from before checkpoint) should still exist"
4918 );
4919
4920 let terminal_2_exists = thread.read_with(cx, |thread, _| {
4921 thread.terminals.contains_key(&terminal_id_2)
4922 });
4923 assert!(
4924 terminal_2_exists,
4925 "Terminal 2 (from before checkpoint) should still exist"
4926 );
4927
4928 // Verify they're still in completed state
4929 let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
4930 let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
4931 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4932 });
4933 assert!(terminal_1_completed, "Terminal 1 should still be completed");
4934
4935 let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
4936 let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
4937 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4938 });
4939 assert!(terminal_2_completed, "Terminal 2 should still be completed");
4940
4941 // Verify the running terminal (created after checkpoint) was removed
4942 let terminal_3_exists =
4943 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4944 assert!(
4945 !terminal_3_exists,
4946 "Terminal 3 (created after checkpoint) should have been removed"
4947 );
4948
4949 // Verify total count is 2 (the two from before the checkpoint)
4950 let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
4951 assert_eq!(
4952 terminal_count, 2,
4953 "Should have exactly 2 terminals (the completed ones from before checkpoint)"
4954 );
4955 }
4956
4957 /// Tests that update_last_checkpoint correctly updates the original message's checkpoint
4958 /// even when a new user message is added while the async checkpoint comparison is in progress.
4959 ///
4960 /// This is a regression test for a bug where update_last_checkpoint would fail with
4961 /// "no checkpoint" if a new user message (without a checkpoint) was added between when
4962 /// update_last_checkpoint started and when its async closure ran.
4963 #[gpui::test]
4964 async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
4965 init_test(cx);
4966
4967 let fs = FakeFs::new(cx.executor());
4968 fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
4969 .await;
4970 let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;
4971
4972 let handler_done = Arc::new(AtomicBool::new(false));
4973 let handler_done_clone = handler_done.clone();
4974 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4975 move |_, _thread, _cx| {
4976 handler_done_clone.store(true, SeqCst);
4977 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
4978 },
4979 ));
4980
4981 let thread = cx
4982 .update(|cx| {
4983 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4984 })
4985 .await
4986 .unwrap();
4987
4988 let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
4989 let send_task = cx.background_executor.spawn(send_future);
4990
4991 // Tick until handler completes, then a few more to let update_last_checkpoint start
4992 while !handler_done.load(SeqCst) {
4993 cx.executor().tick();
4994 }
4995 for _ in 0..5 {
4996 cx.executor().tick();
4997 }
4998
4999 thread.update(cx, |thread, cx| {
5000 thread.push_entry(
5001 AgentThreadEntry::UserMessage(UserMessage {
5002 id: Some(UserMessageId::new()),
5003 content: ContentBlock::Empty,
5004 chunks: vec!["Injected message (no checkpoint)".into()],
5005 checkpoint: None,
5006 indented: false,
5007 }),
5008 cx,
5009 );
5010 });
5011
5012 cx.run_until_parked();
5013 let result = send_task.await;
5014
5015 assert!(
5016 result.is_ok(),
5017 "send should succeed even when new message added during update_last_checkpoint: {:?}",
5018 result.err()
5019 );
5020 }
5021
5022 /// Tests that when a follow-up message is sent during generation,
5023 /// the first turn completing does NOT clear `running_turn` because
5024 /// it now belongs to the second turn.
5025 #[gpui::test]
5026 async fn test_follow_up_message_during_generation_does_not_clear_turn(cx: &mut TestAppContext) {
5027 init_test(cx);
5028
5029 let fs = FakeFs::new(cx.executor());
5030 let project = Project::test(fs, [], cx).await;
5031
5032 // First handler waits for this signal before completing
5033 let (first_complete_tx, first_complete_rx) = futures::channel::oneshot::channel::<()>();
5034 let first_complete_rx = RefCell::new(Some(first_complete_rx));
5035
5036 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
5037 move |params, _thread, _cx| {
5038 let first_complete_rx = first_complete_rx.borrow_mut().take();
5039 let is_first = params
5040 .prompt
5041 .iter()
5042 .any(|c| matches!(c, acp::ContentBlock::Text(t) if t.text.contains("first")));
5043
5044 async move {
5045 if is_first {
5046 // First handler waits until signaled
5047 if let Some(rx) = first_complete_rx {
5048 rx.await.ok();
5049 }
5050 }
5051 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
5052 }
5053 .boxed_local()
5054 }
5055 }));
5056
5057 let thread = cx
5058 .update(|cx| {
5059 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5060 })
5061 .await
5062 .unwrap();
5063
5064 // Send first message (turn_id=1) - handler will block
5065 let first_request = thread.update(cx, |thread, cx| thread.send_raw("first", cx));
5066 assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 1);
5067
5068 // Send second message (turn_id=2) while first is still blocked
5069 // This calls cancel() which takes turn 1's running_turn and sets turn 2's
5070 let second_request = thread.update(cx, |thread, cx| thread.send_raw("second", cx));
5071 assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 2);
5072
5073 let running_turn_after_second_send =
5074 thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
5075 assert_eq!(
5076 running_turn_after_second_send,
5077 Some(2),
5078 "running_turn should be set to turn 2 after sending second message"
5079 );
5080
5081 // Now signal first handler to complete
5082 first_complete_tx.send(()).ok();
5083
5084 // First request completes - should NOT clear running_turn
5085 // because running_turn now belongs to turn 2
5086 first_request.await.unwrap();
5087
5088 let running_turn_after_first =
5089 thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
5090 assert_eq!(
5091 running_turn_after_first,
5092 Some(2),
5093 "first turn completing should not clear running_turn (belongs to turn 2)"
5094 );
5095
5096 // Second request completes - SHOULD clear running_turn
5097 second_request.await.unwrap();
5098
5099 let running_turn_after_second =
5100 thread.read_with(cx, |thread, _| thread.running_turn.is_some());
5101 assert!(
5102 !running_turn_after_second,
5103 "second turn completing should clear running_turn"
5104 );
5105 }
5106
5107 #[gpui::test]
5108 async fn test_send_assigns_message_id_without_truncate_support(cx: &mut TestAppContext) {
5109 init_test(cx);
5110
5111 let fs = FakeFs::new(cx.executor());
5112 let project = Project::test(fs, [], cx).await;
5113
5114 let connection = Rc::new(FakeAgentConnection::new().without_truncate_support());
5115 let thread = cx
5116 .update(|cx| {
5117 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5118 })
5119 .await
5120 .unwrap();
5121
5122 let response = thread
5123 .update(cx, |thread, cx| thread.send_raw("test message", cx))
5124 .await;
5125
5126 assert!(response.is_ok(), "send should not fail: {response:?}");
5127 thread.read_with(cx, |thread, _| {
5128 let AgentThreadEntry::UserMessage(message) = &thread.entries[0] else {
5129 panic!("expected first entry to be a user message")
5130 };
5131 assert!(
5132 message.id.is_some(),
5133 "user message should always have an id"
5134 );
5135 });
5136 }
5137
5138 #[gpui::test]
5139 async fn test_send_returns_cancelled_response_and_marks_tools_as_cancelled(
5140 cx: &mut TestAppContext,
5141 ) {
5142 init_test(cx);
5143
5144 let fs = FakeFs::new(cx.executor());
5145 let project = Project::test(fs, [], cx).await;
5146
5147 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
5148 move |_params, thread, mut cx| {
5149 async move {
5150 thread
5151 .update(&mut cx, |thread, cx| {
5152 thread.handle_session_update(
5153 acp::SessionUpdate::ToolCall(
5154 acp::ToolCall::new(
5155 acp::ToolCallId::new("test-tool"),
5156 "Test Tool",
5157 )
5158 .kind(acp::ToolKind::Fetch)
5159 .status(acp::ToolCallStatus::InProgress),
5160 ),
5161 cx,
5162 )
5163 })
5164 .unwrap()
5165 .unwrap();
5166
5167 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
5168 }
5169 .boxed_local()
5170 },
5171 ));
5172
5173 let thread = cx
5174 .update(|cx| {
5175 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5176 })
5177 .await
5178 .unwrap();
5179
5180 let response = thread
5181 .update(cx, |thread, cx| thread.send_raw("test message", cx))
5182 .await;
5183
5184 let response = response
5185 .expect("send should succeed")
5186 .expect("should have response");
5187 assert_eq!(
5188 response.stop_reason,
5189 acp::StopReason::Cancelled,
5190 "response should have Cancelled stop_reason"
5191 );
5192
5193 thread.read_with(cx, |thread, _| {
5194 let tool_entry = thread
5195 .entries
5196 .iter()
5197 .find_map(|e| {
5198 if let AgentThreadEntry::ToolCall(call) = e {
5199 Some(call)
5200 } else {
5201 None
5202 }
5203 })
5204 .expect("should have tool call entry");
5205
5206 assert!(
5207 matches!(tool_entry.status, ToolCallStatus::Canceled),
5208 "tool should be marked as Canceled when response is Cancelled, got {:?}",
5209 tool_entry.status
5210 );
5211 });
5212 }
5213
5214 #[gpui::test]
5215 async fn test_provisional_title_replaced_by_real_title(cx: &mut TestAppContext) {
5216 init_test(cx);
5217
5218 let fs = FakeFs::new(cx.executor());
5219 let project = Project::test(fs, [], cx).await;
5220 let connection = Rc::new(FakeAgentConnection::new());
5221 let set_title_calls = connection.set_title_calls.clone();
5222
5223 let thread = cx
5224 .update(|cx| {
5225 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5226 })
5227 .await
5228 .unwrap();
5229
5230 // Initial title is the default.
5231 thread.read_with(cx, |thread, _| {
5232 assert_eq!(thread.title(), None);
5233 });
5234
5235 // Setting a provisional title updates the display title.
5236 thread.update(cx, |thread, cx| {
5237 thread.set_provisional_title("Hello, can you help…".into(), cx);
5238 });
5239 thread.read_with(cx, |thread, _| {
5240 assert_eq!(
5241 thread.title().as_ref().map(|s| s.as_str()),
5242 Some("Hello, can you help…")
5243 );
5244 });
5245
5246 // The provisional title should NOT have propagated to the connection.
5247 assert_eq!(
5248 set_title_calls.borrow().len(),
5249 0,
5250 "provisional title should not propagate to the connection"
5251 );
5252
5253 // When the real title arrives via set_title, it replaces the
5254 // provisional title and propagates to the connection.
5255 let task = thread.update(cx, |thread, cx| {
5256 thread.set_title("Helping with Rust question".into(), cx)
5257 });
5258 task.await.expect("set_title should succeed");
5259 thread.read_with(cx, |thread, _| {
5260 assert_eq!(
5261 thread.title().as_ref().map(|s| s.as_str()),
5262 Some("Helping with Rust question")
5263 );
5264 });
5265 assert_eq!(
5266 set_title_calls.borrow().as_slice(),
5267 &[SharedString::from("Helping with Rust question")],
5268 "real title should propagate to the connection"
5269 );
5270 }
5271
5272 #[gpui::test]
5273 async fn test_session_info_update_replaces_provisional_title_and_emits_event(
5274 cx: &mut TestAppContext,
5275 ) {
5276 init_test(cx);
5277
5278 let fs = FakeFs::new(cx.executor());
5279 let project = Project::test(fs, [], cx).await;
5280 let connection = Rc::new(FakeAgentConnection::new());
5281
5282 let thread = cx
5283 .update(|cx| {
5284 connection.clone().new_session(
5285 project,
5286 PathList::new(&[Path::new(path!("/test"))]),
5287 cx,
5288 )
5289 })
5290 .await
5291 .unwrap();
5292
5293 let title_updated_events = Rc::new(RefCell::new(0usize));
5294 let title_updated_events_for_subscription = title_updated_events.clone();
5295 thread.update(cx, |_thread, cx| {
5296 cx.subscribe(
5297 &thread,
5298 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
5299 if matches!(event, AcpThreadEvent::TitleUpdated) {
5300 *title_updated_events_for_subscription.borrow_mut() += 1;
5301 }
5302 },
5303 )
5304 .detach();
5305 });
5306
5307 thread.update(cx, |thread, cx| {
5308 thread.set_provisional_title("Hello, can you help…".into(), cx);
5309 });
5310 assert_eq!(
5311 *title_updated_events.borrow(),
5312 1,
5313 "setting a provisional title should emit TitleUpdated"
5314 );
5315
5316 let result = thread.update(cx, |thread, cx| {
5317 thread.handle_session_update(
5318 acp::SessionUpdate::SessionInfoUpdate(
5319 acp::SessionInfoUpdate::new().title("Helping with Rust question"),
5320 ),
5321 cx,
5322 )
5323 });
5324 result.expect("session info update should succeed");
5325
5326 thread.read_with(cx, |thread, _| {
5327 assert_eq!(
5328 thread.title().as_ref().map(|s| s.as_str()),
5329 Some("Helping with Rust question")
5330 );
5331 assert!(
5332 !thread.has_provisional_title(),
5333 "session info title update should clear provisional title"
5334 );
5335 });
5336
5337 assert_eq!(
5338 *title_updated_events.borrow(),
5339 2,
5340 "session info title update should emit TitleUpdated"
5341 );
5342 assert!(
5343 connection.set_title_calls.borrow().is_empty(),
5344 "session info title update should not propagate back to the connection"
5345 );
5346 }
5347
5348 #[gpui::test]
5349 async fn test_usage_update_populates_token_usage_and_cost(cx: &mut TestAppContext) {
5350 init_test(cx);
5351
5352 let fs = FakeFs::new(cx.executor());
5353 let project = Project::test(fs, [], cx).await;
5354 let connection = Rc::new(FakeAgentConnection::new());
5355 let thread = cx
5356 .update(|cx| {
5357 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5358 })
5359 .await
5360 .unwrap();
5361
5362 thread.update(cx, |thread, cx| {
5363 thread
5364 .handle_session_update(
5365 acp::SessionUpdate::UsageUpdate(
5366 acp::UsageUpdate::new(5000, 10000).cost(acp::Cost::new(0.42, "USD")),
5367 ),
5368 cx,
5369 )
5370 .unwrap();
5371 });
5372
5373 thread.read_with(cx, |thread, _| {
5374 let usage = thread.token_usage().expect("token_usage should be set");
5375 assert_eq!(usage.max_tokens, 10000);
5376 assert_eq!(usage.used_tokens, 5000);
5377
5378 let cost = thread.cost().expect("cost should be set");
5379 assert!((cost.amount - 0.42).abs() < f64::EPSILON);
5380 assert_eq!(cost.currency.as_ref(), "USD");
5381 });
5382 }
5383
5384 #[gpui::test]
5385 async fn test_usage_update_without_cost_preserves_existing_cost(cx: &mut TestAppContext) {
5386 init_test(cx);
5387
5388 let fs = FakeFs::new(cx.executor());
5389 let project = Project::test(fs, [], cx).await;
5390 let connection = Rc::new(FakeAgentConnection::new());
5391 let thread = cx
5392 .update(|cx| {
5393 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5394 })
5395 .await
5396 .unwrap();
5397
5398 thread.update(cx, |thread, cx| {
5399 thread
5400 .handle_session_update(
5401 acp::SessionUpdate::UsageUpdate(
5402 acp::UsageUpdate::new(1000, 10000).cost(acp::Cost::new(0.10, "USD")),
5403 ),
5404 cx,
5405 )
5406 .unwrap();
5407
5408 thread
5409 .handle_session_update(
5410 acp::SessionUpdate::UsageUpdate(acp::UsageUpdate::new(2000, 10000)),
5411 cx,
5412 )
5413 .unwrap();
5414 });
5415
5416 thread.read_with(cx, |thread, _| {
5417 let usage = thread.token_usage().expect("token_usage should be set");
5418 assert_eq!(usage.used_tokens, 2000);
5419
5420 let cost = thread.cost().expect("cost should be preserved");
5421 assert!((cost.amount - 0.10).abs() < f64::EPSILON);
5422 });
5423 }
5424
5425 #[gpui::test]
5426 async fn test_response_usage_does_not_clobber_session_usage(cx: &mut TestAppContext) {
5427 init_test(cx);
5428
5429 let fs = FakeFs::new(cx.executor());
5430 let project = Project::test(fs, [], cx).await;
5431 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
5432 move |_, thread, mut cx| {
5433 async move {
5434 thread.update(&mut cx, |thread, cx| {
5435 thread
5436 .handle_session_update(
5437 acp::SessionUpdate::UsageUpdate(
5438 acp::UsageUpdate::new(3000, 10000)
5439 .cost(acp::Cost::new(0.05, "EUR")),
5440 ),
5441 cx,
5442 )
5443 .unwrap();
5444 })?;
5445 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)
5446 .usage(acp::Usage::new(500, 200, 300)))
5447 }
5448 .boxed_local()
5449 },
5450 ));
5451
5452 let thread = cx
5453 .update(|cx| {
5454 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5455 })
5456 .await
5457 .unwrap();
5458
5459 thread
5460 .update(cx, |thread, cx| thread.send_raw("hello", cx))
5461 .await
5462 .unwrap();
5463
5464 thread.read_with(cx, |thread, _| {
5465 let usage = thread.token_usage().expect("token_usage should be set");
5466 assert_eq!(usage.max_tokens, 10000, "max_tokens from UsageUpdate");
5467 assert_eq!(usage.used_tokens, 3000, "used_tokens from UsageUpdate");
5468 assert_eq!(usage.input_tokens, 200, "input_tokens from response usage");
5469 assert_eq!(
5470 usage.output_tokens, 300,
5471 "output_tokens from response usage"
5472 );
5473
5474 let cost = thread.cost().expect("cost should be set");
5475 assert!((cost.amount - 0.05).abs() < f64::EPSILON);
5476 assert_eq!(cost.currency.as_ref(), "EUR");
5477 });
5478 }
5479
5480 #[gpui::test]
5481 async fn test_clearing_token_usage_also_clears_cost(cx: &mut TestAppContext) {
5482 init_test(cx);
5483
5484 let fs = FakeFs::new(cx.executor());
5485 let project = Project::test(fs, [], cx).await;
5486 let connection = Rc::new(FakeAgentConnection::new());
5487 let thread = cx
5488 .update(|cx| {
5489 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5490 })
5491 .await
5492 .unwrap();
5493
5494 thread.update(cx, |thread, cx| {
5495 thread
5496 .handle_session_update(
5497 acp::SessionUpdate::UsageUpdate(
5498 acp::UsageUpdate::new(1000, 10000).cost(acp::Cost::new(0.25, "USD")),
5499 ),
5500 cx,
5501 )
5502 .unwrap();
5503
5504 assert!(thread.token_usage().is_some());
5505 assert!(thread.cost().is_some());
5506
5507 thread.update_token_usage(None, cx);
5508
5509 assert!(thread.token_usage().is_none());
5510 assert!(
5511 thread.cost().is_none(),
5512 "cost should be cleared when token usage is cleared"
5513 );
5514 });
5515 }
5516}