1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5use action_log::{ActionLog, ActionLogTelemetry};
6use agent_client_protocol::{self as acp};
7use anyhow::{Context as _, Result, anyhow};
8use collections::HashSet;
9pub use connection::*;
10pub use diff::*;
11use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
12use futures::{FutureExt, channel::oneshot, future::BoxFuture};
13use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
14use itertools::Itertools;
15use language::language_settings::FormatOnSave;
16use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
17use markdown::Markdown;
18pub use mention::*;
19use project::lsp_store::{FormatTrigger, LspFormatTarget};
20use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
21use serde::{Deserialize, Serialize};
22use serde_json::to_string_pretty;
23use std::collections::HashMap;
24use std::error::Error;
25use std::fmt::{Formatter, Write};
26use std::ops::Range;
27use std::process::ExitStatus;
28use std::rc::Rc;
29use std::time::{Duration, Instant};
30use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
31use task::{Shell, ShellBuilder};
32pub use terminal::*;
33use text::Bias;
34use ui::App;
35use util::markdown::MarkdownEscaped;
36use util::path_list::PathList;
37use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
38use uuid::Uuid;
39
40/// Returned when the model stops because it exhausted its output token budget.
41#[derive(Debug)]
42pub struct MaxOutputTokensError;
43
44impl std::fmt::Display for MaxOutputTokensError {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 write!(f, "output token limit reached")
47 }
48}
49
50impl std::error::Error for MaxOutputTokensError {}
51
52/// Key used in ACP ToolCall meta to store the tool's programmatic name.
53/// This is a workaround since ACP's ToolCall doesn't have a dedicated name field.
54pub const TOOL_NAME_META_KEY: &str = "tool_name";
55
56/// Helper to extract tool name from ACP meta
57pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
58 meta.as_ref()
59 .and_then(|m| m.get(TOOL_NAME_META_KEY))
60 .and_then(|v| v.as_str())
61 .map(|s| SharedString::from(s.to_owned()))
62}
63
64/// Helper to create meta with tool name
65pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
66 acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
67}
68
69/// Key used in ACP ToolCall meta to store the session id and message indexes
70pub const SUBAGENT_SESSION_INFO_META_KEY: &str = "subagent_session_info";
71
72#[derive(Clone, Debug, Deserialize, Serialize)]
73pub struct SubagentSessionInfo {
74 /// The session id of the subagent sessiont that was spawned
75 pub session_id: acp::SessionId,
76 /// The index of the message of the start of the "turn" run by this tool call
77 pub message_start_index: usize,
78 /// The index of the output of the message that the subagent has returned
79 #[serde(skip_serializing_if = "Option::is_none")]
80 pub message_end_index: Option<usize>,
81}
82
83/// Helper to extract subagent session id from ACP meta
84pub fn subagent_session_info_from_meta(meta: &Option<acp::Meta>) -> Option<SubagentSessionInfo> {
85 meta.as_ref()
86 .and_then(|m| m.get(SUBAGENT_SESSION_INFO_META_KEY))
87 .and_then(|v| serde_json::from_value(v.clone()).ok())
88}
89
90#[derive(Debug)]
91pub struct UserMessage {
92 pub id: Option<UserMessageId>,
93 pub content: ContentBlock,
94 pub chunks: Vec<acp::ContentBlock>,
95 pub checkpoint: Option<Checkpoint>,
96 pub indented: bool,
97}
98
99#[derive(Debug)]
100pub struct Checkpoint {
101 git_checkpoint: GitStoreCheckpoint,
102 pub show: bool,
103}
104
105impl UserMessage {
106 fn to_markdown(&self, cx: &App) -> String {
107 let mut markdown = String::new();
108 if self
109 .checkpoint
110 .as_ref()
111 .is_some_and(|checkpoint| checkpoint.show)
112 {
113 writeln!(markdown, "## User (checkpoint)").unwrap();
114 } else {
115 writeln!(markdown, "## User").unwrap();
116 }
117 writeln!(markdown).unwrap();
118 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
119 writeln!(markdown).unwrap();
120 markdown
121 }
122}
123
124#[derive(Debug, PartialEq)]
125pub struct AssistantMessage {
126 pub chunks: Vec<AssistantMessageChunk>,
127 pub indented: bool,
128 pub is_subagent_output: bool,
129}
130
131impl AssistantMessage {
132 pub fn to_markdown(&self, cx: &App) -> String {
133 format!(
134 "## Assistant\n\n{}\n\n",
135 self.chunks
136 .iter()
137 .map(|chunk| chunk.to_markdown(cx))
138 .join("\n\n")
139 )
140 }
141}
142
143#[derive(Debug, PartialEq)]
144pub enum AssistantMessageChunk {
145 Message { block: ContentBlock },
146 Thought { block: ContentBlock },
147}
148
149impl AssistantMessageChunk {
150 pub fn from_str(
151 chunk: &str,
152 language_registry: &Arc<LanguageRegistry>,
153 path_style: PathStyle,
154 cx: &mut App,
155 ) -> Self {
156 Self::Message {
157 block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
158 }
159 }
160
161 fn to_markdown(&self, cx: &App) -> String {
162 match self {
163 Self::Message { block } => block.to_markdown(cx).to_string(),
164 Self::Thought { block } => {
165 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
166 }
167 }
168 }
169}
170
171#[derive(Debug)]
172pub enum AgentThreadEntry {
173 UserMessage(UserMessage),
174 AssistantMessage(AssistantMessage),
175 ToolCall(ToolCall),
176 CompletedPlan(Vec<PlanEntry>),
177}
178
179impl AgentThreadEntry {
180 pub fn is_indented(&self) -> bool {
181 match self {
182 Self::UserMessage(message) => message.indented,
183 Self::AssistantMessage(message) => message.indented,
184 Self::ToolCall(_) => false,
185 Self::CompletedPlan(_) => false,
186 }
187 }
188
189 pub fn to_markdown(&self, cx: &App) -> String {
190 match self {
191 Self::UserMessage(message) => message.to_markdown(cx),
192 Self::AssistantMessage(message) => message.to_markdown(cx),
193 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
194 Self::CompletedPlan(entries) => {
195 let mut md = String::from("## Plan\n\n");
196 for entry in entries {
197 let source = entry.content.read(cx).source().to_string();
198 md.push_str(&format!("- [x] {}\n", source));
199 }
200 md
201 }
202 }
203 }
204
205 pub fn user_message(&self) -> Option<&UserMessage> {
206 if let AgentThreadEntry::UserMessage(message) = self {
207 Some(message)
208 } else {
209 None
210 }
211 }
212
213 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
214 if let AgentThreadEntry::ToolCall(call) = self {
215 itertools::Either::Left(call.diffs())
216 } else {
217 itertools::Either::Right(std::iter::empty())
218 }
219 }
220
221 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
222 if let AgentThreadEntry::ToolCall(call) = self {
223 itertools::Either::Left(call.terminals())
224 } else {
225 itertools::Either::Right(std::iter::empty())
226 }
227 }
228
229 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
230 if let AgentThreadEntry::ToolCall(ToolCall {
231 locations,
232 resolved_locations,
233 ..
234 }) = self
235 {
236 Some((
237 locations.get(ix)?.clone(),
238 resolved_locations.get(ix)?.clone()?,
239 ))
240 } else {
241 None
242 }
243 }
244}
245
246#[derive(Debug)]
247pub struct ToolCall {
248 pub id: acp::ToolCallId,
249 pub label: Entity<Markdown>,
250 pub kind: acp::ToolKind,
251 pub content: Vec<ToolCallContent>,
252 pub status: ToolCallStatus,
253 pub locations: Vec<acp::ToolCallLocation>,
254 pub resolved_locations: Vec<Option<AgentLocation>>,
255 pub raw_input: Option<serde_json::Value>,
256 pub raw_input_markdown: Option<Entity<Markdown>>,
257 pub raw_output: Option<serde_json::Value>,
258 pub tool_name: Option<SharedString>,
259 pub subagent_session_info: Option<SubagentSessionInfo>,
260}
261
262impl ToolCall {
263 fn from_acp(
264 tool_call: acp::ToolCall,
265 status: ToolCallStatus,
266 language_registry: Arc<LanguageRegistry>,
267 path_style: PathStyle,
268 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
269 cx: &mut App,
270 ) -> Result<Self> {
271 let title = if tool_call.kind == acp::ToolKind::Execute {
272 tool_call.title
273 } else if tool_call.kind == acp::ToolKind::Edit {
274 MarkdownEscaped(tool_call.title.as_str()).to_string()
275 } else if let Some((first_line, _)) = tool_call.title.split_once("\n") {
276 first_line.to_owned() + "…"
277 } else {
278 tool_call.title
279 };
280 let mut content = Vec::with_capacity(tool_call.content.len());
281 for item in tool_call.content {
282 if let Some(item) = ToolCallContent::from_acp(
283 item,
284 language_registry.clone(),
285 path_style,
286 terminals,
287 cx,
288 )? {
289 content.push(item);
290 }
291 }
292
293 let raw_input_markdown = tool_call
294 .raw_input
295 .as_ref()
296 .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
297
298 let tool_name = tool_name_from_meta(&tool_call.meta);
299
300 let subagent_session_info = subagent_session_info_from_meta(&tool_call.meta);
301
302 let result = Self {
303 id: tool_call.tool_call_id,
304 label: cx
305 .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
306 kind: tool_call.kind,
307 content,
308 locations: tool_call.locations,
309 resolved_locations: Vec::default(),
310 status,
311 raw_input: tool_call.raw_input,
312 raw_input_markdown,
313 raw_output: tool_call.raw_output,
314 tool_name,
315 subagent_session_info,
316 };
317 Ok(result)
318 }
319
320 fn update_fields(
321 &mut self,
322 fields: acp::ToolCallUpdateFields,
323 meta: Option<acp::Meta>,
324 language_registry: Arc<LanguageRegistry>,
325 path_style: PathStyle,
326 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
327 cx: &mut App,
328 ) -> Result<()> {
329 let acp::ToolCallUpdateFields {
330 kind,
331 status,
332 title,
333 content,
334 locations,
335 raw_input,
336 raw_output,
337 ..
338 } = fields;
339
340 if let Some(kind) = kind {
341 self.kind = kind;
342 }
343
344 if let Some(status) = status {
345 self.status = status.into();
346 }
347
348 if let Some(subagent_session_info) = subagent_session_info_from_meta(&meta) {
349 self.subagent_session_info = Some(subagent_session_info);
350 }
351
352 if let Some(title) = title {
353 if self.kind == acp::ToolKind::Execute {
354 for terminal in self.terminals() {
355 terminal.update(cx, |terminal, cx| {
356 terminal.update_command_label(&title, cx);
357 });
358 }
359 }
360 self.label.update(cx, |label, cx| {
361 if self.kind == acp::ToolKind::Execute {
362 label.replace(title, cx);
363 } else if self.kind == acp::ToolKind::Edit {
364 label.replace(MarkdownEscaped(&title).to_string(), cx)
365 } else if let Some((first_line, _)) = title.split_once("\n") {
366 label.replace(first_line.to_owned() + "…", cx);
367 } else {
368 label.replace(title, cx);
369 }
370 });
371 }
372
373 if let Some(content) = content {
374 let mut new_content_len = content.len();
375 let mut content = content.into_iter();
376
377 // Reuse existing content if we can
378 for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
379 let valid_content =
380 old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
381 if !valid_content {
382 new_content_len -= 1;
383 }
384 }
385 for new in content {
386 if let Some(new) = ToolCallContent::from_acp(
387 new,
388 language_registry.clone(),
389 path_style,
390 terminals,
391 cx,
392 )? {
393 self.content.push(new);
394 } else {
395 new_content_len -= 1;
396 }
397 }
398 self.content.truncate(new_content_len);
399 }
400
401 if let Some(locations) = locations {
402 self.locations = locations;
403 }
404
405 if let Some(raw_input) = raw_input {
406 self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
407 self.raw_input = Some(raw_input);
408 }
409
410 if let Some(raw_output) = raw_output {
411 if self.content.is_empty()
412 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
413 {
414 self.content
415 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
416 markdown,
417 }));
418 }
419 self.raw_output = Some(raw_output);
420 }
421 Ok(())
422 }
423
424 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
425 self.content.iter().filter_map(|content| match content {
426 ToolCallContent::Diff(diff) => Some(diff),
427 ToolCallContent::ContentBlock(_) => None,
428 ToolCallContent::Terminal(_) => None,
429 })
430 }
431
432 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
433 self.content.iter().filter_map(|content| match content {
434 ToolCallContent::Terminal(terminal) => Some(terminal),
435 ToolCallContent::ContentBlock(_) => None,
436 ToolCallContent::Diff(_) => None,
437 })
438 }
439
440 pub fn is_subagent(&self) -> bool {
441 self.tool_name.as_ref().is_some_and(|s| s == "spawn_agent")
442 || self.subagent_session_info.is_some()
443 }
444
445 pub fn to_markdown(&self, cx: &App) -> String {
446 let mut markdown = format!(
447 "**Tool Call: {}**\nStatus: {}\n\n",
448 self.label.read(cx).source(),
449 self.status
450 );
451 for content in &self.content {
452 markdown.push_str(content.to_markdown(cx).as_str());
453 markdown.push_str("\n\n");
454 }
455 markdown
456 }
457
458 async fn resolve_location(
459 location: acp::ToolCallLocation,
460 project: WeakEntity<Project>,
461 cx: &mut AsyncApp,
462 ) -> Option<ResolvedLocation> {
463 let buffer = project
464 .update(cx, |project, cx| {
465 project
466 .project_path_for_absolute_path(&location.path, cx)
467 .map(|path| project.open_buffer(path, cx))
468 })
469 .ok()??;
470 let buffer = buffer.await.log_err()?;
471 let position = buffer.update(cx, |buffer, _| {
472 let snapshot = buffer.snapshot();
473 if let Some(row) = location.line {
474 let column = snapshot.indent_size_for_line(row).len;
475 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
476 snapshot.anchor_before(point)
477 } else {
478 Anchor::min_for_buffer(snapshot.remote_id())
479 }
480 });
481
482 Some(ResolvedLocation { buffer, position })
483 }
484
485 fn resolve_locations(
486 &self,
487 project: Entity<Project>,
488 cx: &mut App,
489 ) -> Task<Vec<Option<ResolvedLocation>>> {
490 let locations = self.locations.clone();
491 project.update(cx, |_, cx| {
492 cx.spawn(async move |project, cx| {
493 let mut new_locations = Vec::new();
494 for location in locations {
495 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
496 }
497 new_locations
498 })
499 })
500 }
501}
502
503// Separate so we can hold a strong reference to the buffer
504// for saving on the thread
505#[derive(Clone, Debug, PartialEq, Eq)]
506struct ResolvedLocation {
507 buffer: Entity<Buffer>,
508 position: Anchor,
509}
510
511impl From<&ResolvedLocation> for AgentLocation {
512 fn from(value: &ResolvedLocation) -> Self {
513 Self {
514 buffer: value.buffer.downgrade(),
515 position: value.position,
516 }
517 }
518}
519
520#[derive(Debug, Clone)]
521pub enum SelectedPermissionParams {
522 Terminal { patterns: Vec<String> },
523}
524
525#[derive(Debug)]
526pub struct SelectedPermissionOutcome {
527 pub option_id: acp::PermissionOptionId,
528 pub option_kind: acp::PermissionOptionKind,
529 pub params: Option<SelectedPermissionParams>,
530}
531
532impl SelectedPermissionOutcome {
533 pub fn new(option_id: acp::PermissionOptionId, option_kind: acp::PermissionOptionKind) -> Self {
534 Self {
535 option_id,
536 option_kind,
537 params: None,
538 }
539 }
540
541 pub fn params(mut self, params: Option<SelectedPermissionParams>) -> Self {
542 self.params = params;
543 self
544 }
545}
546
547impl From<SelectedPermissionOutcome> for acp::SelectedPermissionOutcome {
548 fn from(value: SelectedPermissionOutcome) -> Self {
549 Self::new(value.option_id)
550 }
551}
552
553#[derive(Debug)]
554pub enum RequestPermissionOutcome {
555 Cancelled,
556 Selected(SelectedPermissionOutcome),
557}
558
559impl From<RequestPermissionOutcome> for acp::RequestPermissionOutcome {
560 fn from(value: RequestPermissionOutcome) -> Self {
561 match value {
562 RequestPermissionOutcome::Cancelled => Self::Cancelled,
563 RequestPermissionOutcome::Selected(outcome) => Self::Selected(outcome.into()),
564 }
565 }
566}
567
568#[derive(Debug)]
569pub enum ToolCallStatus {
570 /// The tool call hasn't started running yet, but we start showing it to
571 /// the user.
572 Pending,
573 /// The tool call is waiting for confirmation from the user.
574 WaitingForConfirmation {
575 options: PermissionOptions,
576 respond_tx: oneshot::Sender<SelectedPermissionOutcome>,
577 },
578 /// The tool call is currently running.
579 InProgress,
580 /// The tool call completed successfully.
581 Completed,
582 /// The tool call failed.
583 Failed,
584 /// The user rejected the tool call.
585 Rejected,
586 /// The user canceled generation so the tool call was canceled.
587 Canceled,
588}
589
590impl From<acp::ToolCallStatus> for ToolCallStatus {
591 fn from(status: acp::ToolCallStatus) -> Self {
592 match status {
593 acp::ToolCallStatus::Pending => Self::Pending,
594 acp::ToolCallStatus::InProgress => Self::InProgress,
595 acp::ToolCallStatus::Completed => Self::Completed,
596 acp::ToolCallStatus::Failed => Self::Failed,
597 _ => Self::Pending,
598 }
599 }
600}
601
602impl Display for ToolCallStatus {
603 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
604 write!(
605 f,
606 "{}",
607 match self {
608 ToolCallStatus::Pending => "Pending",
609 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
610 ToolCallStatus::InProgress => "In Progress",
611 ToolCallStatus::Completed => "Completed",
612 ToolCallStatus::Failed => "Failed",
613 ToolCallStatus::Rejected => "Rejected",
614 ToolCallStatus::Canceled => "Canceled",
615 }
616 )
617 }
618}
619
620#[derive(Debug, PartialEq, Clone)]
621pub enum ContentBlock {
622 Empty,
623 Markdown { markdown: Entity<Markdown> },
624 ResourceLink { resource_link: acp::ResourceLink },
625 Image { image: Arc<gpui::Image> },
626}
627
628impl ContentBlock {
629 pub fn new(
630 block: acp::ContentBlock,
631 language_registry: &Arc<LanguageRegistry>,
632 path_style: PathStyle,
633 cx: &mut App,
634 ) -> Self {
635 let mut this = Self::Empty;
636 this.append(block, language_registry, path_style, cx);
637 this
638 }
639
640 pub fn new_combined(
641 blocks: impl IntoIterator<Item = acp::ContentBlock>,
642 language_registry: Arc<LanguageRegistry>,
643 path_style: PathStyle,
644 cx: &mut App,
645 ) -> Self {
646 let mut this = Self::Empty;
647 for block in blocks {
648 this.append(block, &language_registry, path_style, cx);
649 }
650 this
651 }
652
653 pub fn append(
654 &mut self,
655 block: acp::ContentBlock,
656 language_registry: &Arc<LanguageRegistry>,
657 path_style: PathStyle,
658 cx: &mut App,
659 ) {
660 match (&mut *self, &block) {
661 (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
662 *self = ContentBlock::ResourceLink {
663 resource_link: resource_link.clone(),
664 };
665 }
666 (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
667 if let Some(image) = Self::decode_image(image_content) {
668 *self = ContentBlock::Image { image };
669 } else {
670 let new_content = Self::image_md(image_content);
671 *self = Self::create_markdown_block(new_content, language_registry, cx);
672 }
673 }
674 (ContentBlock::Empty, _) => {
675 let new_content = Self::block_string_contents(&block, path_style);
676 *self = Self::create_markdown_block(new_content, language_registry, cx);
677 }
678 (ContentBlock::Markdown { markdown }, _) => {
679 let new_content = Self::block_string_contents(&block, path_style);
680 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
681 }
682 (ContentBlock::ResourceLink { resource_link }, _) => {
683 let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
684 let new_content = Self::block_string_contents(&block, path_style);
685 let combined = format!("{}\n{}", existing_content, new_content);
686 *self = Self::create_markdown_block(combined, language_registry, cx);
687 }
688 (ContentBlock::Image { .. }, _) => {
689 let new_content = Self::block_string_contents(&block, path_style);
690 let combined = format!("`Image`\n{}", new_content);
691 *self = Self::create_markdown_block(combined, language_registry, cx);
692 }
693 }
694 }
695
696 fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
697 use base64::Engine as _;
698
699 let bytes = base64::engine::general_purpose::STANDARD
700 .decode(image_content.data.as_bytes())
701 .ok()?;
702 let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
703 Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
704 }
705
706 fn create_markdown_block(
707 content: String,
708 language_registry: &Arc<LanguageRegistry>,
709 cx: &mut App,
710 ) -> ContentBlock {
711 ContentBlock::Markdown {
712 markdown: cx
713 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
714 }
715 }
716
717 fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
718 match block {
719 acp::ContentBlock::Text(text_content) => text_content.text.clone(),
720 acp::ContentBlock::ResourceLink(resource_link) => {
721 Self::resource_link_md(&resource_link.uri, path_style)
722 }
723 acp::ContentBlock::Resource(acp::EmbeddedResource {
724 resource:
725 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
726 uri,
727 ..
728 }),
729 ..
730 }) => Self::resource_link_md(uri, path_style),
731 acp::ContentBlock::Image(image) => Self::image_md(image),
732 _ => String::new(),
733 }
734 }
735
736 fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
737 if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
738 uri.as_link().to_string()
739 } else {
740 uri.to_string()
741 }
742 }
743
744 fn image_md(_image: &acp::ImageContent) -> String {
745 "`Image`".into()
746 }
747
748 pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
749 match self {
750 ContentBlock::Empty => "",
751 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
752 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
753 ContentBlock::Image { .. } => "`Image`",
754 }
755 }
756
757 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
758 match self {
759 ContentBlock::Empty => None,
760 ContentBlock::Markdown { markdown } => Some(markdown),
761 ContentBlock::ResourceLink { .. } => None,
762 ContentBlock::Image { .. } => None,
763 }
764 }
765
766 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
767 match self {
768 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
769 _ => None,
770 }
771 }
772
773 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
774 match self {
775 ContentBlock::Image { image } => Some(image),
776 _ => None,
777 }
778 }
779}
780
781#[derive(Debug)]
782pub enum ToolCallContent {
783 ContentBlock(ContentBlock),
784 Diff(Entity<Diff>),
785 Terminal(Entity<Terminal>),
786}
787
788impl ToolCallContent {
789 pub fn from_acp(
790 content: acp::ToolCallContent,
791 language_registry: Arc<LanguageRegistry>,
792 path_style: PathStyle,
793 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
794 cx: &mut App,
795 ) -> Result<Option<Self>> {
796 match content {
797 acp::ToolCallContent::Content(acp::Content { content, .. }) => {
798 Ok(Some(Self::ContentBlock(ContentBlock::new(
799 content,
800 &language_registry,
801 path_style,
802 cx,
803 ))))
804 }
805 acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
806 Diff::finalized(
807 diff.path.to_string_lossy().into_owned(),
808 diff.old_text,
809 diff.new_text,
810 language_registry,
811 cx,
812 )
813 })))),
814 acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
815 .get(&terminal_id)
816 .cloned()
817 .map(|terminal| Some(Self::Terminal(terminal)))
818 .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
819 _ => Ok(None),
820 }
821 }
822
823 pub fn update_from_acp(
824 &mut self,
825 new: acp::ToolCallContent,
826 language_registry: Arc<LanguageRegistry>,
827 path_style: PathStyle,
828 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
829 cx: &mut App,
830 ) -> Result<bool> {
831 let needs_update = match (&self, &new) {
832 (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
833 old_diff.read(cx).needs_update(
834 new_diff.old_text.as_deref().unwrap_or(""),
835 &new_diff.new_text,
836 cx,
837 )
838 }
839 _ => true,
840 };
841
842 if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
843 if needs_update {
844 *self = update;
845 }
846 Ok(true)
847 } else {
848 Ok(false)
849 }
850 }
851
852 pub fn to_markdown(&self, cx: &App) -> String {
853 match self {
854 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
855 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
856 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
857 }
858 }
859
860 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
861 match self {
862 Self::ContentBlock(content) => content.image(),
863 _ => None,
864 }
865 }
866}
867
868#[derive(Debug, PartialEq)]
869pub enum ToolCallUpdate {
870 UpdateFields(acp::ToolCallUpdate),
871 UpdateDiff(ToolCallUpdateDiff),
872 UpdateTerminal(ToolCallUpdateTerminal),
873}
874
875impl ToolCallUpdate {
876 fn id(&self) -> &acp::ToolCallId {
877 match self {
878 Self::UpdateFields(update) => &update.tool_call_id,
879 Self::UpdateDiff(diff) => &diff.id,
880 Self::UpdateTerminal(terminal) => &terminal.id,
881 }
882 }
883}
884
885impl From<acp::ToolCallUpdate> for ToolCallUpdate {
886 fn from(update: acp::ToolCallUpdate) -> Self {
887 Self::UpdateFields(update)
888 }
889}
890
891impl From<ToolCallUpdateDiff> for ToolCallUpdate {
892 fn from(diff: ToolCallUpdateDiff) -> Self {
893 Self::UpdateDiff(diff)
894 }
895}
896
897#[derive(Debug, PartialEq)]
898pub struct ToolCallUpdateDiff {
899 pub id: acp::ToolCallId,
900 pub diff: Entity<Diff>,
901}
902
903impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
904 fn from(terminal: ToolCallUpdateTerminal) -> Self {
905 Self::UpdateTerminal(terminal)
906 }
907}
908
909#[derive(Debug, PartialEq)]
910pub struct ToolCallUpdateTerminal {
911 pub id: acp::ToolCallId,
912 pub terminal: Entity<Terminal>,
913}
914
915#[derive(Debug, Default)]
916pub struct Plan {
917 pub entries: Vec<PlanEntry>,
918}
919
920#[derive(Debug)]
921pub struct PlanStats<'a> {
922 pub in_progress_entry: Option<&'a PlanEntry>,
923 pub pending: u32,
924 pub completed: u32,
925}
926
927impl Plan {
928 pub fn is_empty(&self) -> bool {
929 self.entries.is_empty()
930 }
931
932 pub fn stats(&self) -> PlanStats<'_> {
933 let mut stats = PlanStats {
934 in_progress_entry: None,
935 pending: 0,
936 completed: 0,
937 };
938
939 for entry in &self.entries {
940 match &entry.status {
941 acp::PlanEntryStatus::Pending => {
942 stats.pending += 1;
943 }
944 acp::PlanEntryStatus::InProgress => {
945 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
946 stats.pending += 1;
947 }
948 acp::PlanEntryStatus::Completed => {
949 stats.completed += 1;
950 }
951 _ => {}
952 }
953 }
954
955 stats
956 }
957}
958
959#[derive(Debug)]
960pub struct PlanEntry {
961 pub content: Entity<Markdown>,
962 pub priority: acp::PlanEntryPriority,
963 pub status: acp::PlanEntryStatus,
964}
965
966impl PlanEntry {
967 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
968 Self {
969 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
970 priority: entry.priority,
971 status: entry.status,
972 }
973 }
974}
975
976#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
977pub struct TokenUsage {
978 pub max_tokens: u64,
979 pub used_tokens: u64,
980 pub input_tokens: u64,
981 pub output_tokens: u64,
982 pub max_output_tokens: Option<u64>,
983}
984
985#[derive(Debug, Clone)]
986pub struct SessionCost {
987 pub amount: f64,
988 pub currency: SharedString,
989}
990
991pub const TOKEN_USAGE_WARNING_THRESHOLD: f32 = 0.8;
992
993impl TokenUsage {
994 pub fn ratio(&self) -> TokenUsageRatio {
995 #[cfg(debug_assertions)]
996 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
997 .unwrap_or(TOKEN_USAGE_WARNING_THRESHOLD.to_string())
998 .parse()
999 .unwrap();
1000 #[cfg(not(debug_assertions))]
1001 let warning_threshold: f32 = TOKEN_USAGE_WARNING_THRESHOLD;
1002
1003 // When the maximum is unknown because there is no selected model,
1004 // avoid showing the token limit warning.
1005 if self.max_tokens == 0 {
1006 TokenUsageRatio::Normal
1007 } else if self.used_tokens >= self.max_tokens {
1008 TokenUsageRatio::Exceeded
1009 } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
1010 TokenUsageRatio::Warning
1011 } else {
1012 TokenUsageRatio::Normal
1013 }
1014 }
1015}
1016
1017#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
1018pub enum TokenUsageRatio {
1019 Normal,
1020 Warning,
1021 Exceeded,
1022}
1023
1024#[derive(Debug, Clone)]
1025pub struct RetryStatus {
1026 pub last_error: SharedString,
1027 pub attempt: usize,
1028 pub max_attempts: usize,
1029 pub started_at: Instant,
1030 pub duration: Duration,
1031}
1032
1033struct RunningTurn {
1034 id: u32,
1035 send_task: Task<()>,
1036}
1037
1038pub struct AcpThread {
1039 session_id: acp::SessionId,
1040 work_dirs: Option<PathList>,
1041 parent_session_id: Option<acp::SessionId>,
1042 title: Option<SharedString>,
1043 provisional_title: Option<SharedString>,
1044 entries: Vec<AgentThreadEntry>,
1045 plan: Plan,
1046 project: Entity<Project>,
1047 action_log: Entity<ActionLog>,
1048 shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
1049 turn_id: u32,
1050 running_turn: Option<RunningTurn>,
1051 connection: Rc<dyn AgentConnection>,
1052 token_usage: Option<TokenUsage>,
1053 cost: Option<SessionCost>,
1054 prompt_capabilities: acp::PromptCapabilities,
1055 available_commands: Vec<acp::AvailableCommand>,
1056 _observe_prompt_capabilities: Task<anyhow::Result<()>>,
1057 terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
1058 pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
1059 pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
1060 had_error: bool,
1061 /// The user's unsent prompt text, persisted so it can be restored when reloading the thread.
1062 draft_prompt: Option<Vec<acp::ContentBlock>>,
1063 /// The initial scroll position for the thread view, set during session registration.
1064 ui_scroll_position: Option<gpui::ListOffset>,
1065 /// Buffer for smooth text streaming. Holds text that has been received from
1066 /// the model but not yet revealed in the UI. A timer task drains this buffer
1067 /// gradually to create a fluid typing effect instead of choppy chunk-at-a-time
1068 /// updates.
1069 streaming_text_buffer: Option<StreamingTextBuffer>,
1070}
1071
1072struct StreamingTextBuffer {
1073 /// Text received from the model but not yet appended to the Markdown source.
1074 pending: String,
1075 /// The number of bytes to reveal per timer turn.
1076 bytes_to_reveal_per_tick: usize,
1077 /// The Markdown entity being streamed into.
1078 target: Entity<Markdown>,
1079 /// Timer task that periodically moves text from `pending` into `source`.
1080 _reveal_task: Task<()>,
1081}
1082
1083impl StreamingTextBuffer {
1084 /// The number of milliseconds between each timer tick, controlling how quickly
1085 /// text is revealed.
1086 const TASK_UPDATE_MS: u64 = 16;
1087 /// The time in milliseconds to reveal the entire pending text.
1088 const REVEAL_TARGET: f32 = 200.0;
1089}
1090
1091impl From<&AcpThread> for ActionLogTelemetry {
1092 fn from(value: &AcpThread) -> Self {
1093 Self {
1094 agent_telemetry_id: value.connection().telemetry_id(),
1095 session_id: value.session_id.0.clone(),
1096 }
1097 }
1098}
1099
1100#[derive(Debug)]
1101pub enum AcpThreadEvent {
1102 NewEntry,
1103 TitleUpdated,
1104 TokenUsageUpdated,
1105 EntryUpdated(usize),
1106 EntriesRemoved(Range<usize>),
1107 ToolAuthorizationRequested(acp::ToolCallId),
1108 ToolAuthorizationReceived(acp::ToolCallId),
1109 Retry(RetryStatus),
1110 SubagentSpawned(acp::SessionId),
1111 Stopped(acp::StopReason),
1112 Error,
1113 LoadError(LoadError),
1114 PromptCapabilitiesUpdated,
1115 Refusal,
1116 AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
1117 ModeUpdated(acp::SessionModeId),
1118 ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
1119 WorkingDirectoriesUpdated,
1120}
1121
1122impl EventEmitter<AcpThreadEvent> for AcpThread {}
1123
1124#[derive(Debug, Clone)]
1125pub enum TerminalProviderEvent {
1126 Created {
1127 terminal_id: acp::TerminalId,
1128 label: String,
1129 cwd: Option<PathBuf>,
1130 output_byte_limit: Option<u64>,
1131 terminal: Entity<::terminal::Terminal>,
1132 },
1133 Output {
1134 terminal_id: acp::TerminalId,
1135 data: Vec<u8>,
1136 },
1137 TitleChanged {
1138 terminal_id: acp::TerminalId,
1139 title: String,
1140 },
1141 Exit {
1142 terminal_id: acp::TerminalId,
1143 status: acp::TerminalExitStatus,
1144 },
1145}
1146
1147#[derive(Debug, Clone)]
1148pub enum TerminalProviderCommand {
1149 WriteInput {
1150 terminal_id: acp::TerminalId,
1151 bytes: Vec<u8>,
1152 },
1153 Resize {
1154 terminal_id: acp::TerminalId,
1155 cols: u16,
1156 rows: u16,
1157 },
1158 Close {
1159 terminal_id: acp::TerminalId,
1160 },
1161}
1162
1163#[derive(PartialEq, Eq, Debug)]
1164pub enum ThreadStatus {
1165 Idle,
1166 Generating,
1167}
1168
1169#[derive(Debug, Clone)]
1170pub enum LoadError {
1171 Unsupported {
1172 command: SharedString,
1173 current_version: SharedString,
1174 minimum_version: SharedString,
1175 },
1176 FailedToInstall(SharedString),
1177 Exited {
1178 status: ExitStatus,
1179 },
1180 Other(SharedString),
1181}
1182
1183impl Display for LoadError {
1184 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1185 match self {
1186 LoadError::Unsupported {
1187 command: path,
1188 current_version,
1189 minimum_version,
1190 } => {
1191 write!(
1192 f,
1193 "version {current_version} from {path} is not supported (need at least {minimum_version})"
1194 )
1195 }
1196 LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1197 LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1198 LoadError::Other(msg) => write!(f, "{msg}"),
1199 }
1200 }
1201}
1202
1203impl Error for LoadError {}
1204
1205impl AcpThread {
1206 pub fn new(
1207 parent_session_id: Option<acp::SessionId>,
1208 title: Option<SharedString>,
1209 work_dirs: Option<PathList>,
1210 connection: Rc<dyn AgentConnection>,
1211 project: Entity<Project>,
1212 action_log: Entity<ActionLog>,
1213 session_id: acp::SessionId,
1214 mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1215 cx: &mut Context<Self>,
1216 ) -> Self {
1217 let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1218 let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1219 loop {
1220 let caps = prompt_capabilities_rx.recv().await?;
1221 this.update(cx, |this, cx| {
1222 this.prompt_capabilities = caps;
1223 cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1224 })?;
1225 }
1226 });
1227
1228 Self {
1229 parent_session_id,
1230 work_dirs,
1231 action_log,
1232 shared_buffers: Default::default(),
1233 entries: Default::default(),
1234 plan: Default::default(),
1235 title,
1236 provisional_title: None,
1237 project,
1238 running_turn: None,
1239 turn_id: 0,
1240 connection,
1241 session_id,
1242 token_usage: None,
1243 cost: None,
1244 prompt_capabilities,
1245 available_commands: Vec::new(),
1246 _observe_prompt_capabilities: task,
1247 terminals: HashMap::default(),
1248 pending_terminal_output: HashMap::default(),
1249 pending_terminal_exit: HashMap::default(),
1250 had_error: false,
1251 draft_prompt: None,
1252 ui_scroll_position: None,
1253 streaming_text_buffer: None,
1254 }
1255 }
1256
1257 pub fn parent_session_id(&self) -> Option<&acp::SessionId> {
1258 self.parent_session_id.as_ref()
1259 }
1260
1261 pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
1262 self.prompt_capabilities.clone()
1263 }
1264
1265 pub fn available_commands(&self) -> &[acp::AvailableCommand] {
1266 &self.available_commands
1267 }
1268
1269 pub fn draft_prompt(&self) -> Option<&[acp::ContentBlock]> {
1270 self.draft_prompt.as_deref()
1271 }
1272
1273 pub fn set_draft_prompt(&mut self, prompt: Option<Vec<acp::ContentBlock>>) {
1274 self.draft_prompt = prompt;
1275 }
1276
1277 pub fn ui_scroll_position(&self) -> Option<gpui::ListOffset> {
1278 self.ui_scroll_position
1279 }
1280
1281 pub fn set_ui_scroll_position(&mut self, position: Option<gpui::ListOffset>) {
1282 self.ui_scroll_position = position;
1283 }
1284
1285 pub fn connection(&self) -> &Rc<dyn AgentConnection> {
1286 &self.connection
1287 }
1288
1289 pub fn action_log(&self) -> &Entity<ActionLog> {
1290 &self.action_log
1291 }
1292
1293 pub fn project(&self) -> &Entity<Project> {
1294 &self.project
1295 }
1296
1297 pub fn title(&self) -> Option<SharedString> {
1298 self.title
1299 .clone()
1300 .or_else(|| self.provisional_title.clone())
1301 }
1302
1303 pub fn has_provisional_title(&self) -> bool {
1304 self.provisional_title.is_some()
1305 }
1306
1307 pub fn entries(&self) -> &[AgentThreadEntry] {
1308 &self.entries
1309 }
1310
1311 pub fn session_id(&self) -> &acp::SessionId {
1312 &self.session_id
1313 }
1314
1315 pub fn supports_truncate(&self, cx: &App) -> bool {
1316 self.connection.truncate(&self.session_id, cx).is_some()
1317 }
1318
1319 pub fn work_dirs(&self) -> Option<&PathList> {
1320 self.work_dirs.as_ref()
1321 }
1322
1323 pub fn set_work_dirs(&mut self, work_dirs: PathList, cx: &mut Context<Self>) {
1324 self.work_dirs = Some(work_dirs);
1325 cx.emit(AcpThreadEvent::WorkingDirectoriesUpdated)
1326 }
1327
1328 pub fn status(&self) -> ThreadStatus {
1329 if self.running_turn.is_some() {
1330 ThreadStatus::Generating
1331 } else {
1332 ThreadStatus::Idle
1333 }
1334 }
1335
1336 pub fn had_error(&self) -> bool {
1337 self.had_error
1338 }
1339
1340 pub fn is_waiting_for_confirmation(&self) -> bool {
1341 for entry in self.entries.iter().rev() {
1342 match entry {
1343 AgentThreadEntry::UserMessage(_) => return false,
1344 AgentThreadEntry::ToolCall(ToolCall {
1345 status: ToolCallStatus::WaitingForConfirmation { .. },
1346 ..
1347 }) => return true,
1348 AgentThreadEntry::ToolCall(_)
1349 | AgentThreadEntry::AssistantMessage(_)
1350 | AgentThreadEntry::CompletedPlan(_) => {}
1351 }
1352 }
1353 false
1354 }
1355
1356 pub fn token_usage(&self) -> Option<&TokenUsage> {
1357 self.token_usage.as_ref()
1358 }
1359
1360 pub fn cost(&self) -> Option<&SessionCost> {
1361 self.cost.as_ref()
1362 }
1363
1364 pub fn has_pending_edit_tool_calls(&self) -> bool {
1365 for entry in self.entries.iter().rev() {
1366 match entry {
1367 AgentThreadEntry::UserMessage(_) => return false,
1368 AgentThreadEntry::ToolCall(
1369 call @ ToolCall {
1370 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1371 ..
1372 },
1373 ) if call.diffs().next().is_some() => {
1374 return true;
1375 }
1376 AgentThreadEntry::ToolCall(_)
1377 | AgentThreadEntry::AssistantMessage(_)
1378 | AgentThreadEntry::CompletedPlan(_) => {}
1379 }
1380 }
1381
1382 false
1383 }
1384
1385 pub fn has_in_progress_tool_calls(&self) -> bool {
1386 for entry in self.entries.iter().rev() {
1387 match entry {
1388 AgentThreadEntry::UserMessage(_) => return false,
1389 AgentThreadEntry::ToolCall(ToolCall {
1390 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1391 ..
1392 }) => {
1393 return true;
1394 }
1395 AgentThreadEntry::ToolCall(_)
1396 | AgentThreadEntry::AssistantMessage(_)
1397 | AgentThreadEntry::CompletedPlan(_) => {}
1398 }
1399 }
1400
1401 false
1402 }
1403
1404 pub fn used_tools_since_last_user_message(&self) -> bool {
1405 for entry in self.entries.iter().rev() {
1406 match entry {
1407 AgentThreadEntry::UserMessage(..) => return false,
1408 AgentThreadEntry::AssistantMessage(..) | AgentThreadEntry::CompletedPlan(..) => {
1409 continue;
1410 }
1411 AgentThreadEntry::ToolCall(..) => return true,
1412 }
1413 }
1414
1415 false
1416 }
1417
1418 pub fn handle_session_update(
1419 &mut self,
1420 update: acp::SessionUpdate,
1421 cx: &mut Context<Self>,
1422 ) -> Result<(), acp::Error> {
1423 match update {
1424 acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1425 // We optimistically add the full user prompt before calling `prompt`.
1426 // Some ACP servers echo user chunks back over updates. Skip the chunk if
1427 // it's already present in the current user message to avoid duplicating content.
1428 let already_in_user_message = self
1429 .entries
1430 .last()
1431 .and_then(|entry| entry.user_message())
1432 .is_some_and(|message| message.chunks.contains(&content));
1433 if !already_in_user_message {
1434 self.push_user_content_block(None, content, cx);
1435 }
1436 }
1437 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1438 self.push_assistant_content_block(content, false, cx);
1439 }
1440 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1441 self.push_assistant_content_block(content, true, cx);
1442 }
1443 acp::SessionUpdate::ToolCall(tool_call) => {
1444 self.upsert_tool_call(tool_call, cx)?;
1445 }
1446 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1447 self.update_tool_call(tool_call_update, cx)?;
1448 }
1449 acp::SessionUpdate::Plan(plan) => {
1450 self.update_plan(plan, cx);
1451 }
1452 acp::SessionUpdate::SessionInfoUpdate(info_update) => {
1453 if let acp::MaybeUndefined::Value(title) = info_update.title {
1454 let had_provisional = self.provisional_title.take().is_some();
1455 let title: SharedString = title.into();
1456 if self.title.as_ref() != Some(&title) {
1457 self.title = Some(title);
1458 cx.emit(AcpThreadEvent::TitleUpdated);
1459 } else if had_provisional {
1460 cx.emit(AcpThreadEvent::TitleUpdated);
1461 }
1462 }
1463 }
1464 acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1465 available_commands,
1466 ..
1467 }) => {
1468 self.available_commands = available_commands.clone();
1469 cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands));
1470 }
1471 acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1472 current_mode_id,
1473 ..
1474 }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1475 acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1476 config_options,
1477 ..
1478 }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1479 acp::SessionUpdate::UsageUpdate(update) if cx.has_flag::<AcpBetaFeatureFlag>() => {
1480 let usage = self.token_usage.get_or_insert_with(Default::default);
1481 usage.max_tokens = update.size;
1482 usage.used_tokens = update.used;
1483 if let Some(cost) = update.cost {
1484 self.cost = Some(SessionCost {
1485 amount: cost.amount,
1486 currency: cost.currency.into(),
1487 });
1488 }
1489 cx.emit(AcpThreadEvent::TokenUsageUpdated);
1490 }
1491 _ => {}
1492 }
1493 Ok(())
1494 }
1495
1496 pub fn push_user_content_block(
1497 &mut self,
1498 message_id: Option<UserMessageId>,
1499 chunk: acp::ContentBlock,
1500 cx: &mut Context<Self>,
1501 ) {
1502 self.push_user_content_block_with_indent(message_id, chunk, false, cx)
1503 }
1504
1505 pub fn push_user_content_block_with_indent(
1506 &mut self,
1507 message_id: Option<UserMessageId>,
1508 chunk: acp::ContentBlock,
1509 indented: bool,
1510 cx: &mut Context<Self>,
1511 ) {
1512 let language_registry = self.project.read(cx).languages().clone();
1513 let path_style = self.project.read(cx).path_style(cx);
1514 let entries_len = self.entries.len();
1515
1516 if let Some(last_entry) = self.entries.last_mut()
1517 && let AgentThreadEntry::UserMessage(UserMessage {
1518 id,
1519 content,
1520 chunks,
1521 indented: existing_indented,
1522 ..
1523 }) = last_entry
1524 && *existing_indented == indented
1525 {
1526 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1527 *id = message_id.or(id.take());
1528 content.append(chunk.clone(), &language_registry, path_style, cx);
1529 chunks.push(chunk);
1530 let idx = entries_len - 1;
1531 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1532 } else {
1533 let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1534 self.push_entry(
1535 AgentThreadEntry::UserMessage(UserMessage {
1536 id: message_id,
1537 content,
1538 chunks: vec![chunk],
1539 checkpoint: None,
1540 indented,
1541 }),
1542 cx,
1543 );
1544 }
1545 }
1546
1547 pub fn push_assistant_content_block(
1548 &mut self,
1549 chunk: acp::ContentBlock,
1550 is_thought: bool,
1551 cx: &mut Context<Self>,
1552 ) {
1553 self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
1554 }
1555
1556 pub fn push_assistant_content_block_with_indent(
1557 &mut self,
1558 chunk: acp::ContentBlock,
1559 is_thought: bool,
1560 indented: bool,
1561 cx: &mut Context<Self>,
1562 ) {
1563 let path_style = self.project.read(cx).path_style(cx);
1564
1565 // For text chunks going to an existing Markdown block, buffer for smooth
1566 // streaming instead of appending all at once which may feel more choppy.
1567 if let acp::ContentBlock::Text(text_content) = &chunk {
1568 if let Some(markdown) = self.streaming_markdown_target(is_thought, indented) {
1569 let entries_len = self.entries.len();
1570 cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
1571 self.buffer_streaming_text(&markdown, text_content.text.clone(), cx);
1572 return;
1573 }
1574 }
1575
1576 let language_registry = self.project.read(cx).languages().clone();
1577 let entries_len = self.entries.len();
1578 if let Some(last_entry) = self.entries.last_mut()
1579 && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1580 chunks,
1581 indented: existing_indented,
1582 is_subagent_output: _,
1583 }) = last_entry
1584 && *existing_indented == indented
1585 {
1586 let idx = entries_len - 1;
1587 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1588 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1589 match (chunks.last_mut(), is_thought) {
1590 (Some(AssistantMessageChunk::Message { block }), false)
1591 | (Some(AssistantMessageChunk::Thought { block }), true) => {
1592 block.append(chunk, &language_registry, path_style, cx)
1593 }
1594 _ => {
1595 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1596 if is_thought {
1597 chunks.push(AssistantMessageChunk::Thought { block })
1598 } else {
1599 chunks.push(AssistantMessageChunk::Message { block })
1600 }
1601 }
1602 }
1603 } else {
1604 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1605 let chunk = if is_thought {
1606 AssistantMessageChunk::Thought { block }
1607 } else {
1608 AssistantMessageChunk::Message { block }
1609 };
1610
1611 self.push_entry(
1612 AgentThreadEntry::AssistantMessage(AssistantMessage {
1613 chunks: vec![chunk],
1614 indented,
1615 is_subagent_output: false,
1616 }),
1617 cx,
1618 );
1619 }
1620 }
1621
1622 fn streaming_markdown_target(
1623 &self,
1624 is_thought: bool,
1625 indented: bool,
1626 ) -> Option<Entity<Markdown>> {
1627 let last_entry = self.entries.last()?;
1628 if let AgentThreadEntry::AssistantMessage(AssistantMessage {
1629 chunks,
1630 indented: existing_indented,
1631 ..
1632 }) = last_entry
1633 && *existing_indented == indented
1634 && let [.., chunk] = chunks.as_slice()
1635 {
1636 match (chunk, is_thought) {
1637 (
1638 AssistantMessageChunk::Message {
1639 block: ContentBlock::Markdown { markdown },
1640 },
1641 false,
1642 )
1643 | (
1644 AssistantMessageChunk::Thought {
1645 block: ContentBlock::Markdown { markdown },
1646 },
1647 true,
1648 ) => Some(markdown.clone()),
1649 _ => None,
1650 }
1651 } else {
1652 None
1653 }
1654 }
1655
1656 /// Add text to the streaming buffer. If the target changed (e.g. switching
1657 /// from thoughts to message text), flush the old buffer first.
1658 fn buffer_streaming_text(
1659 &mut self,
1660 markdown: &Entity<Markdown>,
1661 text: String,
1662 cx: &mut Context<Self>,
1663 ) {
1664 if let Some(buffer) = &mut self.streaming_text_buffer {
1665 if buffer.target.entity_id() == markdown.entity_id() {
1666 buffer.pending.push_str(&text);
1667
1668 buffer.bytes_to_reveal_per_tick = (buffer.pending.len() as f32
1669 / StreamingTextBuffer::REVEAL_TARGET
1670 * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1671 .ceil() as usize;
1672 return;
1673 }
1674 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1675 }
1676
1677 let target = markdown.clone();
1678 let _reveal_task = self.start_streaming_reveal(cx);
1679 let pending_len = text.len();
1680 let bytes_to_reveal = (pending_len as f32 / StreamingTextBuffer::REVEAL_TARGET
1681 * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1682 .ceil() as usize;
1683 self.streaming_text_buffer = Some(StreamingTextBuffer {
1684 pending: text,
1685 bytes_to_reveal_per_tick: bytes_to_reveal,
1686 target,
1687 _reveal_task,
1688 });
1689 }
1690
1691 /// Flush all buffered streaming text into the Markdown entity immediately.
1692 fn flush_streaming_text(
1693 streaming_text_buffer: &mut Option<StreamingTextBuffer>,
1694 cx: &mut Context<Self>,
1695 ) {
1696 if let Some(buffer) = streaming_text_buffer.take() {
1697 if !buffer.pending.is_empty() {
1698 buffer
1699 .target
1700 .update(cx, |markdown, cx| markdown.append(&buffer.pending, cx));
1701 }
1702 }
1703 }
1704
1705 /// Spawns a foreground task that periodically drains
1706 /// `streaming_text_buffer.pending` into the target `Markdown` entity,
1707 /// producing smooth, continuous text output.
1708 fn start_streaming_reveal(&self, cx: &mut Context<Self>) -> Task<()> {
1709 cx.spawn(async move |this, cx| {
1710 loop {
1711 cx.background_executor()
1712 .timer(Duration::from_millis(StreamingTextBuffer::TASK_UPDATE_MS))
1713 .await;
1714
1715 let should_continue = this
1716 .update(cx, |this, cx| {
1717 let Some(buffer) = &mut this.streaming_text_buffer else {
1718 return false;
1719 };
1720
1721 if buffer.pending.is_empty() {
1722 return true;
1723 }
1724
1725 let pending_len = buffer.pending.len();
1726
1727 let byte_boundary = buffer
1728 .pending
1729 .ceil_char_boundary(buffer.bytes_to_reveal_per_tick)
1730 .min(pending_len);
1731
1732 buffer.target.update(cx, |markdown: &mut Markdown, cx| {
1733 markdown.append(&buffer.pending[..byte_boundary], cx);
1734 buffer.pending.drain(..byte_boundary);
1735 });
1736
1737 true
1738 })
1739 .unwrap_or(false);
1740
1741 if !should_continue {
1742 break;
1743 }
1744 }
1745 })
1746 }
1747
1748 fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
1749 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1750 self.entries.push(entry);
1751 cx.emit(AcpThreadEvent::NewEntry);
1752 }
1753
1754 pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1755 self.connection.set_title(&self.session_id, cx).is_some()
1756 }
1757
1758 pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1759 let had_provisional = self.provisional_title.take().is_some();
1760 if self.title.as_ref() != Some(&title) {
1761 self.title = Some(title.clone());
1762 cx.emit(AcpThreadEvent::TitleUpdated);
1763 if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1764 return set_title.run(title, cx);
1765 }
1766 } else if had_provisional {
1767 cx.emit(AcpThreadEvent::TitleUpdated);
1768 }
1769 Task::ready(Ok(()))
1770 }
1771
1772 /// Sets a provisional display title without propagating back to the
1773 /// underlying agent connection. This is used for quick preview titles
1774 /// (e.g. first 20 chars of the user message) that should be shown
1775 /// immediately but replaced once the LLM generates a proper title via
1776 /// `set_title`.
1777 pub fn set_provisional_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
1778 self.provisional_title = Some(title);
1779 cx.emit(AcpThreadEvent::TitleUpdated);
1780 }
1781
1782 pub fn subagent_spawned(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
1783 cx.emit(AcpThreadEvent::SubagentSpawned(session_id));
1784 }
1785
1786 pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1787 if usage.is_none() {
1788 self.cost = None;
1789 }
1790 self.token_usage = usage;
1791 cx.emit(AcpThreadEvent::TokenUsageUpdated);
1792 }
1793
1794 pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1795 cx.emit(AcpThreadEvent::Retry(status));
1796 }
1797
1798 pub fn update_tool_call(
1799 &mut self,
1800 update: impl Into<ToolCallUpdate>,
1801 cx: &mut Context<Self>,
1802 ) -> Result<()> {
1803 let update = update.into();
1804 let languages = self.project.read(cx).languages().clone();
1805 let path_style = self.project.read(cx).path_style(cx);
1806
1807 let ix = match self.index_for_tool_call(update.id()) {
1808 Some(ix) => ix,
1809 None => {
1810 // Tool call not found - create a failed tool call entry
1811 let failed_tool_call = ToolCall {
1812 id: update.id().clone(),
1813 label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1814 kind: acp::ToolKind::Fetch,
1815 content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1816 "Tool call not found".into(),
1817 &languages,
1818 path_style,
1819 cx,
1820 ))],
1821 status: ToolCallStatus::Failed,
1822 locations: Vec::new(),
1823 resolved_locations: Vec::new(),
1824 raw_input: None,
1825 raw_input_markdown: None,
1826 raw_output: None,
1827 tool_name: None,
1828 subagent_session_info: None,
1829 };
1830 self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1831 return Ok(());
1832 }
1833 };
1834 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1835 unreachable!()
1836 };
1837
1838 match update {
1839 ToolCallUpdate::UpdateFields(update) => {
1840 let location_updated = update.fields.locations.is_some();
1841 call.update_fields(
1842 update.fields,
1843 update.meta,
1844 languages,
1845 path_style,
1846 &self.terminals,
1847 cx,
1848 )?;
1849 if location_updated {
1850 self.resolve_locations(update.tool_call_id, cx);
1851 }
1852 }
1853 ToolCallUpdate::UpdateDiff(update) => {
1854 call.content.clear();
1855 call.content.push(ToolCallContent::Diff(update.diff));
1856 }
1857 ToolCallUpdate::UpdateTerminal(update) => {
1858 call.content.clear();
1859 call.content
1860 .push(ToolCallContent::Terminal(update.terminal));
1861 }
1862 }
1863
1864 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1865
1866 Ok(())
1867 }
1868
1869 /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1870 pub fn upsert_tool_call(
1871 &mut self,
1872 tool_call: acp::ToolCall,
1873 cx: &mut Context<Self>,
1874 ) -> Result<(), acp::Error> {
1875 let status = tool_call.status.into();
1876 self.upsert_tool_call_inner(tool_call.into(), status, cx)
1877 }
1878
1879 /// Fails if id does not match an existing entry.
1880 pub fn upsert_tool_call_inner(
1881 &mut self,
1882 update: acp::ToolCallUpdate,
1883 status: ToolCallStatus,
1884 cx: &mut Context<Self>,
1885 ) -> Result<(), acp::Error> {
1886 let language_registry = self.project.read(cx).languages().clone();
1887 let path_style = self.project.read(cx).path_style(cx);
1888 let id = update.tool_call_id.clone();
1889
1890 let agent_telemetry_id = self.connection().telemetry_id();
1891 let session = self.session_id();
1892 let parent_session_id = self.parent_session_id();
1893 if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1894 let status = if matches!(status, ToolCallStatus::Completed) {
1895 "completed"
1896 } else {
1897 "failed"
1898 };
1899 telemetry::event!(
1900 "Agent Tool Call Completed",
1901 agent_telemetry_id,
1902 session,
1903 parent_session_id,
1904 status
1905 );
1906 }
1907
1908 if let Some(ix) = self.index_for_tool_call(&id) {
1909 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1910 unreachable!()
1911 };
1912
1913 call.update_fields(
1914 update.fields,
1915 update.meta,
1916 language_registry,
1917 path_style,
1918 &self.terminals,
1919 cx,
1920 )?;
1921 call.status = status;
1922
1923 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1924 } else {
1925 let call = ToolCall::from_acp(
1926 update.try_into()?,
1927 status,
1928 language_registry,
1929 self.project.read(cx).path_style(cx),
1930 &self.terminals,
1931 cx,
1932 )?;
1933 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1934 };
1935
1936 self.resolve_locations(id, cx);
1937 Ok(())
1938 }
1939
1940 fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1941 self.entries
1942 .iter()
1943 .enumerate()
1944 .rev()
1945 .find_map(|(index, entry)| {
1946 if let AgentThreadEntry::ToolCall(tool_call) = entry
1947 && &tool_call.id == id
1948 {
1949 Some(index)
1950 } else {
1951 None
1952 }
1953 })
1954 }
1955
1956 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1957 // The tool call we are looking for is typically the last one, or very close to the end.
1958 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1959 self.entries
1960 .iter_mut()
1961 .enumerate()
1962 .rev()
1963 .find_map(|(index, tool_call)| {
1964 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1965 && &tool_call.id == id
1966 {
1967 Some((index, tool_call))
1968 } else {
1969 None
1970 }
1971 })
1972 }
1973
1974 pub fn tool_call(&self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1975 self.entries
1976 .iter()
1977 .enumerate()
1978 .rev()
1979 .find_map(|(index, tool_call)| {
1980 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1981 && &tool_call.id == id
1982 {
1983 Some((index, tool_call))
1984 } else {
1985 None
1986 }
1987 })
1988 }
1989
1990 pub fn tool_call_for_subagent(&self, session_id: &acp::SessionId) -> Option<&ToolCall> {
1991 self.entries.iter().find_map(|entry| match entry {
1992 AgentThreadEntry::ToolCall(tool_call) => {
1993 if let Some(subagent_session_info) = &tool_call.subagent_session_info
1994 && &subagent_session_info.session_id == session_id
1995 {
1996 Some(tool_call)
1997 } else {
1998 None
1999 }
2000 }
2001 _ => None,
2002 })
2003 }
2004
2005 pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
2006 let project = self.project.clone();
2007 let should_update_agent_location = self.parent_session_id.is_none();
2008 let Some((_, tool_call)) = self.tool_call_mut(&id) else {
2009 return;
2010 };
2011 let task = tool_call.resolve_locations(project, cx);
2012 cx.spawn(async move |this, cx| {
2013 let resolved_locations = task.await;
2014
2015 this.update(cx, |this, cx| {
2016 let project = this.project.clone();
2017
2018 for location in resolved_locations.iter().flatten() {
2019 this.shared_buffers
2020 .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
2021 }
2022 let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
2023 return;
2024 };
2025
2026 if let Some(Some(location)) = resolved_locations.last() {
2027 project.update(cx, |project, cx| {
2028 let should_ignore = if let Some(agent_location) = project
2029 .agent_location()
2030 .filter(|agent_location| agent_location.buffer == location.buffer)
2031 {
2032 let snapshot = location.buffer.read(cx).snapshot();
2033 let old_position = agent_location.position.to_point(&snapshot);
2034 let new_position = location.position.to_point(&snapshot);
2035
2036 // ignore this so that when we get updates from the edit tool
2037 // the position doesn't reset to the startof line
2038 old_position.row == new_position.row
2039 && old_position.column > new_position.column
2040 } else {
2041 false
2042 };
2043 if !should_ignore && should_update_agent_location {
2044 project.set_agent_location(Some(location.into()), cx);
2045 }
2046 });
2047 }
2048
2049 let resolved_locations = resolved_locations
2050 .iter()
2051 .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
2052 .collect::<Vec<_>>();
2053
2054 if tool_call.resolved_locations != resolved_locations {
2055 tool_call.resolved_locations = resolved_locations;
2056 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2057 }
2058 })
2059 })
2060 .detach();
2061 }
2062
2063 pub fn request_tool_call_authorization(
2064 &mut self,
2065 tool_call: acp::ToolCallUpdate,
2066 options: PermissionOptions,
2067 cx: &mut Context<Self>,
2068 ) -> Result<Task<RequestPermissionOutcome>> {
2069 let (tx, rx) = oneshot::channel();
2070
2071 let status = ToolCallStatus::WaitingForConfirmation {
2072 options,
2073 respond_tx: tx,
2074 };
2075
2076 let tool_call_id = tool_call.tool_call_id.clone();
2077 self.upsert_tool_call_inner(tool_call, status, cx)?;
2078 cx.emit(AcpThreadEvent::ToolAuthorizationRequested(
2079 tool_call_id.clone(),
2080 ));
2081
2082 Ok(cx.spawn(async move |this, cx| {
2083 let outcome = match rx.await {
2084 Ok(outcome) => RequestPermissionOutcome::Selected(outcome),
2085 Err(oneshot::Canceled) => RequestPermissionOutcome::Cancelled,
2086 };
2087 this.update(cx, |_this, cx| {
2088 cx.emit(AcpThreadEvent::ToolAuthorizationReceived(tool_call_id))
2089 })
2090 .ok();
2091 outcome
2092 }))
2093 }
2094
2095 pub fn authorize_tool_call(
2096 &mut self,
2097 id: acp::ToolCallId,
2098 outcome: SelectedPermissionOutcome,
2099 cx: &mut Context<Self>,
2100 ) {
2101 let Some((ix, call)) = self.tool_call_mut(&id) else {
2102 return;
2103 };
2104
2105 let new_status = match outcome.option_kind {
2106 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
2107 ToolCallStatus::Rejected
2108 }
2109 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
2110 ToolCallStatus::InProgress
2111 }
2112 _ => ToolCallStatus::InProgress,
2113 };
2114
2115 let curr_status = mem::replace(&mut call.status, new_status);
2116
2117 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
2118 respond_tx.send(outcome).log_err();
2119 } else if cfg!(debug_assertions) {
2120 panic!("tried to authorize an already authorized tool call");
2121 }
2122
2123 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2124 }
2125
2126 pub fn plan(&self) -> &Plan {
2127 &self.plan
2128 }
2129
2130 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
2131 let new_entries_len = request.entries.len();
2132 let mut new_entries = request.entries.into_iter();
2133
2134 // Reuse existing markdown to prevent flickering
2135 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
2136 let PlanEntry {
2137 content,
2138 priority,
2139 status,
2140 } = old;
2141 content.update(cx, |old, cx| {
2142 old.replace(new.content, cx);
2143 });
2144 *priority = new.priority;
2145 *status = new.status;
2146 }
2147 for new in new_entries {
2148 self.plan.entries.push(PlanEntry::from_acp(new, cx))
2149 }
2150 self.plan.entries.truncate(new_entries_len);
2151
2152 cx.notify();
2153 }
2154
2155 pub fn snapshot_completed_plan(&mut self, cx: &mut Context<Self>) {
2156 if !self.plan.is_empty() && self.plan.stats().pending == 0 {
2157 let completed_entries = std::mem::take(&mut self.plan.entries);
2158 self.push_entry(AgentThreadEntry::CompletedPlan(completed_entries), cx);
2159 }
2160 }
2161
2162 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
2163 self.plan
2164 .entries
2165 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
2166 cx.notify();
2167 }
2168
2169 pub fn clear_plan(&mut self, cx: &mut Context<Self>) {
2170 self.plan.entries.clear();
2171 cx.notify();
2172 }
2173
2174 #[cfg(any(test, feature = "test-support"))]
2175 pub fn send_raw(
2176 &mut self,
2177 message: &str,
2178 cx: &mut Context<Self>,
2179 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2180 self.send(vec![message.into()], cx)
2181 }
2182
2183 pub fn send(
2184 &mut self,
2185 message: Vec<acp::ContentBlock>,
2186 cx: &mut Context<Self>,
2187 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2188 let block = ContentBlock::new_combined(
2189 message.clone(),
2190 self.project.read(cx).languages().clone(),
2191 self.project.read(cx).path_style(cx),
2192 cx,
2193 );
2194 let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
2195 let git_store = self.project.read(cx).git_store().clone();
2196
2197 let message_id = UserMessageId::new();
2198
2199 self.run_turn(cx, async move |this, cx| {
2200 this.update(cx, |this, cx| {
2201 this.push_entry(
2202 AgentThreadEntry::UserMessage(UserMessage {
2203 id: Some(message_id.clone()),
2204 content: block,
2205 chunks: message,
2206 checkpoint: None,
2207 indented: false,
2208 }),
2209 cx,
2210 );
2211 })
2212 .ok();
2213
2214 let old_checkpoint = git_store
2215 .update(cx, |git, cx| git.checkpoint(cx))
2216 .await
2217 .context("failed to get old checkpoint")
2218 .log_err();
2219 this.update(cx, |this, cx| {
2220 if let Some((_ix, message)) = this.last_user_message() {
2221 message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
2222 git_checkpoint,
2223 show: false,
2224 });
2225 }
2226 this.connection.prompt(message_id, request, cx)
2227 })?
2228 .await
2229 })
2230 }
2231
2232 pub fn can_retry(&self, cx: &App) -> bool {
2233 self.connection.retry(&self.session_id, cx).is_some()
2234 }
2235
2236 pub fn retry(
2237 &mut self,
2238 cx: &mut Context<Self>,
2239 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2240 self.run_turn(cx, async move |this, cx| {
2241 this.update(cx, |this, cx| {
2242 this.connection
2243 .retry(&this.session_id, cx)
2244 .map(|retry| retry.run(cx))
2245 })?
2246 .context("retrying a session is not supported")?
2247 .await
2248 })
2249 }
2250
2251 fn run_turn(
2252 &mut self,
2253 cx: &mut Context<Self>,
2254 f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
2255 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2256 self.clear_completed_plan_entries(cx);
2257 self.had_error = false;
2258
2259 let (tx, rx) = oneshot::channel();
2260 let cancel_task = self.cancel(cx);
2261
2262 self.turn_id += 1;
2263 let turn_id = self.turn_id;
2264 self.running_turn = Some(RunningTurn {
2265 id: turn_id,
2266 send_task: cx.spawn(async move |this, cx| {
2267 cancel_task.await;
2268 tx.send(f(this, cx).await).ok();
2269 }),
2270 });
2271
2272 cx.spawn(async move |this, cx| {
2273 let response = rx.await;
2274
2275 this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
2276 .await?;
2277
2278 this.update(cx, |this, cx| {
2279 if this.parent_session_id.is_none() {
2280 this.project
2281 .update(cx, |project, cx| project.set_agent_location(None, cx));
2282 }
2283 let Ok(response) = response else {
2284 // tx dropped, just return
2285 return Ok(None);
2286 };
2287
2288 let is_same_turn = this
2289 .running_turn
2290 .as_ref()
2291 .is_some_and(|turn| turn_id == turn.id);
2292
2293 // If the user submitted a follow up message, running_turn might
2294 // already point to a different turn. Therefore we only want to
2295 // take the task if it's the same turn.
2296 if is_same_turn {
2297 this.running_turn.take();
2298 }
2299
2300 match response {
2301 Ok(r) => {
2302 Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2303
2304 if r.stop_reason == acp::StopReason::MaxTokens {
2305 this.had_error = true;
2306 cx.emit(AcpThreadEvent::Error);
2307 log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2308
2309 let exceeded_max_output_tokens =
2310 this.token_usage.as_ref().is_some_and(|u| {
2311 u.max_output_tokens
2312 .is_some_and(|max| u.output_tokens >= max)
2313 });
2314
2315 if exceeded_max_output_tokens {
2316 log::error!(
2317 "Max output tokens reached. Usage: {:?}",
2318 this.token_usage
2319 );
2320 } else {
2321 log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2322 }
2323 return Err(anyhow!(MaxOutputTokensError));
2324 }
2325
2326 let canceled = matches!(r.stop_reason, acp::StopReason::Cancelled);
2327 if canceled {
2328 this.mark_pending_tools_as_canceled();
2329 }
2330
2331 if !canceled {
2332 this.snapshot_completed_plan(cx);
2333 }
2334
2335 // Handle refusal - distinguish between user prompt and tool call refusals
2336 if let acp::StopReason::Refusal = r.stop_reason {
2337 this.had_error = true;
2338 if let Some((user_msg_ix, _)) = this.last_user_message() {
2339 // Check if there's a completed tool call with results after the last user message
2340 // This indicates the refusal is in response to tool output, not the user's prompt
2341 let has_completed_tool_call_after_user_msg =
2342 this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
2343 if let AgentThreadEntry::ToolCall(tool_call) = entry {
2344 // Check if the tool call has completed and has output
2345 matches!(tool_call.status, ToolCallStatus::Completed)
2346 && tool_call.raw_output.is_some()
2347 } else {
2348 false
2349 }
2350 });
2351
2352 if has_completed_tool_call_after_user_msg {
2353 // Refusal is due to tool output - don't truncate, just notify
2354 // The model refused based on what the tool returned
2355 cx.emit(AcpThreadEvent::Refusal);
2356 } else {
2357 // User prompt was refused - truncate back to before the user message
2358 let range = user_msg_ix..this.entries.len();
2359 if range.start < range.end {
2360 this.entries.truncate(user_msg_ix);
2361 cx.emit(AcpThreadEvent::EntriesRemoved(range));
2362 }
2363 cx.emit(AcpThreadEvent::Refusal);
2364 }
2365 } else {
2366 // No user message found, treat as general refusal
2367 cx.emit(AcpThreadEvent::Refusal);
2368 }
2369 }
2370
2371 if cx.has_flag::<AcpBetaFeatureFlag>()
2372 && let Some(response_usage) = &r.usage
2373 {
2374 let usage = this.token_usage.get_or_insert_with(Default::default);
2375 usage.input_tokens = response_usage.input_tokens;
2376 usage.output_tokens = response_usage.output_tokens;
2377 cx.emit(AcpThreadEvent::TokenUsageUpdated);
2378 }
2379
2380 cx.emit(AcpThreadEvent::Stopped(r.stop_reason));
2381 Ok(Some(r))
2382 }
2383 Err(e) => {
2384 Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2385
2386 this.had_error = true;
2387 cx.emit(AcpThreadEvent::Error);
2388 log::error!("Error in run turn: {:?}", e);
2389 Err(e)
2390 }
2391 }
2392 })?
2393 })
2394 .boxed()
2395 }
2396
2397 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2398 let Some(turn) = self.running_turn.take() else {
2399 return Task::ready(());
2400 };
2401 self.connection.cancel(&self.session_id, cx);
2402
2403 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2404 self.mark_pending_tools_as_canceled();
2405
2406 // Wait for the send task to complete
2407 cx.background_spawn(turn.send_task)
2408 }
2409
2410 fn mark_pending_tools_as_canceled(&mut self) {
2411 for entry in self.entries.iter_mut() {
2412 if let AgentThreadEntry::ToolCall(call) = entry {
2413 let cancel = matches!(
2414 call.status,
2415 ToolCallStatus::Pending
2416 | ToolCallStatus::WaitingForConfirmation { .. }
2417 | ToolCallStatus::InProgress
2418 );
2419
2420 if cancel {
2421 call.status = ToolCallStatus::Canceled;
2422 }
2423 }
2424 }
2425 }
2426
2427 /// Restores the git working tree to the state at the given checkpoint (if one exists)
2428 pub fn restore_checkpoint(
2429 &mut self,
2430 id: UserMessageId,
2431 cx: &mut Context<Self>,
2432 ) -> Task<Result<()>> {
2433 let Some((_, message)) = self.user_message_mut(&id) else {
2434 return Task::ready(Err(anyhow!("message not found")));
2435 };
2436
2437 let checkpoint = message
2438 .checkpoint
2439 .as_ref()
2440 .map(|c| c.git_checkpoint.clone());
2441
2442 // Cancel any in-progress generation before restoring
2443 let cancel_task = self.cancel(cx);
2444 let rewind = self.rewind(id.clone(), cx);
2445 let git_store = self.project.read(cx).git_store().clone();
2446
2447 cx.spawn(async move |_, cx| {
2448 cancel_task.await;
2449 rewind.await?;
2450 if let Some(checkpoint) = checkpoint {
2451 git_store
2452 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
2453 .await?;
2454 }
2455
2456 Ok(())
2457 })
2458 }
2459
2460 /// Rewinds this thread to before the entry at `index`, removing it and all
2461 /// subsequent entries while rejecting any action_log changes made from that point.
2462 /// Unlike `restore_checkpoint`, this method does not restore from git.
2463 pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
2464 let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
2465 return Task::ready(Err(anyhow!("not supported")));
2466 };
2467
2468 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2469 let telemetry = ActionLogTelemetry::from(&*self);
2470 cx.spawn(async move |this, cx| {
2471 cx.update(|cx| truncate.run(id.clone(), cx)).await?;
2472 this.update(cx, |this, cx| {
2473 if let Some((ix, _)) = this.user_message_mut(&id) {
2474 // Collect all terminals from entries that will be removed
2475 let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
2476 .iter()
2477 .flat_map(|entry| entry.terminals())
2478 .filter_map(|terminal| terminal.read(cx).id().clone().into())
2479 .collect();
2480
2481 let range = ix..this.entries.len();
2482 this.entries.truncate(ix);
2483 cx.emit(AcpThreadEvent::EntriesRemoved(range));
2484
2485 // Kill and remove the terminals
2486 for terminal_id in terminals_to_remove {
2487 if let Some(terminal) = this.terminals.remove(&terminal_id) {
2488 terminal.update(cx, |terminal, cx| {
2489 terminal.kill(cx);
2490 });
2491 }
2492 }
2493 }
2494 this.action_log().update(cx, |action_log, cx| {
2495 action_log.reject_all_edits(Some(telemetry), cx)
2496 })
2497 })?
2498 .await;
2499 Ok(())
2500 })
2501 }
2502
2503 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
2504 let git_store = self.project.read(cx).git_store().clone();
2505
2506 let Some((_, message)) = self.last_user_message() else {
2507 return Task::ready(Ok(()));
2508 };
2509 let Some(user_message_id) = message.id.clone() else {
2510 return Task::ready(Ok(()));
2511 };
2512 let Some(checkpoint) = message.checkpoint.as_ref() else {
2513 return Task::ready(Ok(()));
2514 };
2515 let old_checkpoint = checkpoint.git_checkpoint.clone();
2516
2517 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2518 cx.spawn(async move |this, cx| {
2519 let Some(new_checkpoint) = new_checkpoint
2520 .await
2521 .context("failed to get new checkpoint")
2522 .log_err()
2523 else {
2524 return Ok(());
2525 };
2526
2527 let equal = git_store
2528 .update(cx, |git, cx| {
2529 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2530 })
2531 .await
2532 .unwrap_or(true);
2533
2534 this.update(cx, |this, cx| {
2535 if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
2536 if let Some(checkpoint) = message.checkpoint.as_mut() {
2537 checkpoint.show = !equal;
2538 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2539 }
2540 }
2541 })?;
2542
2543 Ok(())
2544 })
2545 }
2546
2547 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2548 self.entries
2549 .iter_mut()
2550 .enumerate()
2551 .rev()
2552 .find_map(|(ix, entry)| {
2553 if let AgentThreadEntry::UserMessage(message) = entry {
2554 Some((ix, message))
2555 } else {
2556 None
2557 }
2558 })
2559 }
2560
2561 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2562 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2563 if let AgentThreadEntry::UserMessage(message) = entry {
2564 if message.id.as_ref() == Some(id) {
2565 Some((ix, message))
2566 } else {
2567 None
2568 }
2569 } else {
2570 None
2571 }
2572 })
2573 }
2574
2575 pub fn read_text_file(
2576 &self,
2577 path: PathBuf,
2578 line: Option<u32>,
2579 limit: Option<u32>,
2580 reuse_shared_snapshot: bool,
2581 cx: &mut Context<Self>,
2582 ) -> Task<Result<String, acp::Error>> {
2583 // Args are 1-based, move to 0-based
2584 let line = line.unwrap_or_default().saturating_sub(1);
2585 let limit = limit.unwrap_or(u32::MAX);
2586 let project = self.project.clone();
2587 let action_log = self.action_log.clone();
2588 let should_update_agent_location = self.parent_session_id.is_none();
2589 cx.spawn(async move |this, cx| {
2590 let load = project.update(cx, |project, cx| {
2591 let path = project
2592 .project_path_for_absolute_path(&path, cx)
2593 .ok_or_else(|| {
2594 acp::Error::resource_not_found(Some(path.display().to_string()))
2595 })?;
2596 Ok::<_, acp::Error>(project.open_buffer(path, cx))
2597 })?;
2598
2599 let buffer = load.await?;
2600
2601 let snapshot = if reuse_shared_snapshot {
2602 this.read_with(cx, |this, _| {
2603 this.shared_buffers.get(&buffer.clone()).cloned()
2604 })
2605 .log_err()
2606 .flatten()
2607 } else {
2608 None
2609 };
2610
2611 let snapshot = if let Some(snapshot) = snapshot {
2612 snapshot
2613 } else {
2614 action_log.update(cx, |action_log, cx| {
2615 action_log.buffer_read(buffer.clone(), cx);
2616 });
2617
2618 let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
2619 this.update(cx, |this, _| {
2620 this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2621 })?;
2622 snapshot
2623 };
2624
2625 let max_point = snapshot.max_point();
2626 let start_position = Point::new(line, 0);
2627
2628 if start_position > max_point {
2629 return Err(acp::Error::invalid_params().data(format!(
2630 "Attempting to read beyond the end of the file, line {}:{}",
2631 max_point.row + 1,
2632 max_point.column
2633 )));
2634 }
2635
2636 let start = snapshot.anchor_before(start_position);
2637 let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2638
2639 if should_update_agent_location {
2640 project.update(cx, |project, cx| {
2641 project.set_agent_location(
2642 Some(AgentLocation {
2643 buffer: buffer.downgrade(),
2644 position: start,
2645 }),
2646 cx,
2647 );
2648 });
2649 }
2650
2651 Ok(snapshot.text_for_range(start..end).collect::<String>())
2652 })
2653 }
2654
2655 pub fn write_text_file(
2656 &self,
2657 path: PathBuf,
2658 content: String,
2659 cx: &mut Context<Self>,
2660 ) -> Task<Result<()>> {
2661 let project = self.project.clone();
2662 let action_log = self.action_log.clone();
2663 let should_update_agent_location = self.parent_session_id.is_none();
2664 cx.spawn(async move |this, cx| {
2665 let load = project.update(cx, |project, cx| {
2666 let path = project
2667 .project_path_for_absolute_path(&path, cx)
2668 .context("invalid path")?;
2669 anyhow::Ok(project.open_buffer(path, cx))
2670 });
2671 let buffer = load?.await?;
2672 let snapshot = this.update(cx, |this, cx| {
2673 this.shared_buffers
2674 .get(&buffer)
2675 .cloned()
2676 .unwrap_or_else(|| buffer.read(cx).snapshot())
2677 })?;
2678 let edits = cx
2679 .background_executor()
2680 .spawn(async move {
2681 let old_text = snapshot.text();
2682 text_diff(old_text.as_str(), &content)
2683 .into_iter()
2684 .map(|(range, replacement)| {
2685 (snapshot.anchor_range_inside(range), replacement)
2686 })
2687 .collect::<Vec<_>>()
2688 })
2689 .await;
2690
2691 if should_update_agent_location {
2692 project.update(cx, |project, cx| {
2693 project.set_agent_location(
2694 Some(AgentLocation {
2695 buffer: buffer.downgrade(),
2696 position: edits
2697 .last()
2698 .map(|(range, _)| range.end)
2699 .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
2700 }),
2701 cx,
2702 );
2703 });
2704 }
2705
2706 let format_on_save = cx.update(|cx| {
2707 action_log.update(cx, |action_log, cx| {
2708 action_log.buffer_read(buffer.clone(), cx);
2709 });
2710
2711 let format_on_save = buffer.update(cx, |buffer, cx| {
2712 buffer.edit(edits, None, cx);
2713
2714 let settings =
2715 language::language_settings::LanguageSettings::for_buffer(buffer, cx);
2716
2717 settings.format_on_save != FormatOnSave::Off
2718 });
2719 action_log.update(cx, |action_log, cx| {
2720 action_log.buffer_edited(buffer.clone(), cx);
2721 });
2722 format_on_save
2723 });
2724
2725 if format_on_save {
2726 let format_task = project.update(cx, |project, cx| {
2727 project.format(
2728 HashSet::from_iter([buffer.clone()]),
2729 LspFormatTarget::Buffers,
2730 false,
2731 FormatTrigger::Save,
2732 cx,
2733 )
2734 });
2735 format_task.await.log_err();
2736
2737 action_log.update(cx, |action_log, cx| {
2738 action_log.buffer_edited(buffer.clone(), cx);
2739 });
2740 }
2741
2742 project
2743 .update(cx, |project, cx| project.save_buffer(buffer, cx))
2744 .await
2745 })
2746 }
2747
2748 pub fn create_terminal(
2749 &self,
2750 command: String,
2751 args: Vec<String>,
2752 extra_env: Vec<acp::EnvVariable>,
2753 cwd: Option<PathBuf>,
2754 output_byte_limit: Option<u64>,
2755 cx: &mut Context<Self>,
2756 ) -> Task<Result<Entity<Terminal>>> {
2757 let env = match &cwd {
2758 Some(dir) => self.project.update(cx, |project, cx| {
2759 project.environment().update(cx, |env, cx| {
2760 env.directory_environment(dir.as_path().into(), cx)
2761 })
2762 }),
2763 None => Task::ready(None).shared(),
2764 };
2765 let env = cx.spawn(async move |_, _| {
2766 let mut env = env.await.unwrap_or_default();
2767 // Disables paging for `git` and hopefully other commands
2768 env.insert("PAGER".into(), "".into());
2769 for var in extra_env {
2770 env.insert(var.name, var.value);
2771 }
2772 env
2773 });
2774
2775 let project = self.project.clone();
2776 let language_registry = project.read(cx).languages().clone();
2777 let is_windows = project.read(cx).path_style(cx).is_windows();
2778
2779 let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
2780 let terminal_task = cx.spawn({
2781 let terminal_id = terminal_id.clone();
2782 async move |_this, cx| {
2783 let env = env.await;
2784 let shell = project
2785 .update(cx, |project, cx| {
2786 project
2787 .remote_client()
2788 .and_then(|r| r.read(cx).default_system_shell())
2789 })
2790 .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2791 let (task_command, task_args) =
2792 ShellBuilder::new(&Shell::Program(shell), is_windows)
2793 .redirect_stdin_to_dev_null()
2794 .build(Some(command.clone()), &args);
2795 let terminal = project
2796 .update(cx, |project, cx| {
2797 project.create_terminal_task(
2798 task::SpawnInTerminal {
2799 command: Some(task_command),
2800 args: task_args,
2801 cwd: cwd.clone(),
2802 env,
2803 ..Default::default()
2804 },
2805 cx,
2806 )
2807 })
2808 .await?;
2809
2810 anyhow::Ok(cx.new(|cx| {
2811 Terminal::new(
2812 terminal_id,
2813 &format!("{} {}", command, args.join(" ")),
2814 cwd,
2815 output_byte_limit.map(|l| l as usize),
2816 terminal,
2817 language_registry,
2818 cx,
2819 )
2820 }))
2821 }
2822 });
2823
2824 cx.spawn(async move |this, cx| {
2825 let terminal = terminal_task.await?;
2826 this.update(cx, |this, _cx| {
2827 this.terminals.insert(terminal_id, terminal.clone());
2828 terminal
2829 })
2830 })
2831 }
2832
2833 pub fn kill_terminal(
2834 &mut self,
2835 terminal_id: acp::TerminalId,
2836 cx: &mut Context<Self>,
2837 ) -> Result<()> {
2838 self.terminals
2839 .get(&terminal_id)
2840 .context("Terminal not found")?
2841 .update(cx, |terminal, cx| {
2842 terminal.kill(cx);
2843 });
2844
2845 Ok(())
2846 }
2847
2848 pub fn release_terminal(
2849 &mut self,
2850 terminal_id: acp::TerminalId,
2851 cx: &mut Context<Self>,
2852 ) -> Result<()> {
2853 self.terminals
2854 .remove(&terminal_id)
2855 .context("Terminal not found")?
2856 .update(cx, |terminal, cx| {
2857 terminal.kill(cx);
2858 });
2859
2860 Ok(())
2861 }
2862
2863 pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2864 self.terminals
2865 .get(&terminal_id)
2866 .context("Terminal not found")
2867 .cloned()
2868 }
2869
2870 pub fn to_markdown(&self, cx: &App) -> String {
2871 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2872 }
2873
2874 pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2875 cx.emit(AcpThreadEvent::LoadError(error));
2876 }
2877
2878 pub fn register_terminal_created(
2879 &mut self,
2880 terminal_id: acp::TerminalId,
2881 command_label: String,
2882 working_dir: Option<PathBuf>,
2883 output_byte_limit: Option<u64>,
2884 terminal: Entity<::terminal::Terminal>,
2885 cx: &mut Context<Self>,
2886 ) -> Entity<Terminal> {
2887 let language_registry = self.project.read(cx).languages().clone();
2888
2889 let entity = cx.new(|cx| {
2890 Terminal::new(
2891 terminal_id.clone(),
2892 &command_label,
2893 working_dir.clone(),
2894 output_byte_limit.map(|l| l as usize),
2895 terminal,
2896 language_registry,
2897 cx,
2898 )
2899 });
2900 self.terminals.insert(terminal_id.clone(), entity.clone());
2901 entity
2902 }
2903
2904 pub fn mark_as_subagent_output(&mut self, cx: &mut Context<Self>) {
2905 for entry in self.entries.iter_mut().rev() {
2906 if let AgentThreadEntry::AssistantMessage(assistant_message) = entry {
2907 assistant_message.is_subagent_output = true;
2908 cx.notify();
2909 return;
2910 }
2911 }
2912 }
2913
2914 pub fn on_terminal_provider_event(
2915 &mut self,
2916 event: TerminalProviderEvent,
2917 cx: &mut Context<Self>,
2918 ) {
2919 match event {
2920 TerminalProviderEvent::Created {
2921 terminal_id,
2922 label,
2923 cwd,
2924 output_byte_limit,
2925 terminal,
2926 } => {
2927 let entity = self.register_terminal_created(
2928 terminal_id.clone(),
2929 label,
2930 cwd,
2931 output_byte_limit,
2932 terminal,
2933 cx,
2934 );
2935
2936 if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
2937 for data in chunks.drain(..) {
2938 entity.update(cx, |term, cx| {
2939 term.inner().update(cx, |inner, cx| {
2940 inner.write_output(&data, cx);
2941 })
2942 });
2943 }
2944 }
2945
2946 if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
2947 entity.update(cx, |_term, cx| {
2948 cx.notify();
2949 });
2950 }
2951
2952 cx.notify();
2953 }
2954 TerminalProviderEvent::Output { terminal_id, data } => {
2955 if let Some(entity) = self.terminals.get(&terminal_id) {
2956 entity.update(cx, |term, cx| {
2957 term.inner().update(cx, |inner, cx| {
2958 inner.write_output(&data, cx);
2959 })
2960 });
2961 } else {
2962 self.pending_terminal_output
2963 .entry(terminal_id)
2964 .or_default()
2965 .push(data);
2966 }
2967 }
2968 TerminalProviderEvent::TitleChanged { terminal_id, title } => {
2969 if let Some(entity) = self.terminals.get(&terminal_id) {
2970 entity.update(cx, |term, cx| {
2971 term.inner().update(cx, |inner, cx| {
2972 inner.breadcrumb_text = title;
2973 cx.emit(::terminal::Event::BreadcrumbsChanged);
2974 })
2975 });
2976 }
2977 }
2978 TerminalProviderEvent::Exit {
2979 terminal_id,
2980 status,
2981 } => {
2982 if let Some(entity) = self.terminals.get(&terminal_id) {
2983 entity.update(cx, |_term, cx| {
2984 cx.notify();
2985 });
2986 } else {
2987 self.pending_terminal_exit.insert(terminal_id, status);
2988 }
2989 }
2990 }
2991 }
2992}
2993
2994fn markdown_for_raw_output(
2995 raw_output: &serde_json::Value,
2996 language_registry: &Arc<LanguageRegistry>,
2997 cx: &mut App,
2998) -> Option<Entity<Markdown>> {
2999 match raw_output {
3000 serde_json::Value::Null => None,
3001 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
3002 Markdown::new(
3003 value.to_string().into(),
3004 Some(language_registry.clone()),
3005 None,
3006 cx,
3007 )
3008 })),
3009 serde_json::Value::Number(value) => Some(cx.new(|cx| {
3010 Markdown::new(
3011 value.to_string().into(),
3012 Some(language_registry.clone()),
3013 None,
3014 cx,
3015 )
3016 })),
3017 serde_json::Value::String(value) => Some(cx.new(|cx| {
3018 Markdown::new(
3019 value.clone().into(),
3020 Some(language_registry.clone()),
3021 None,
3022 cx,
3023 )
3024 })),
3025 value => Some(cx.new(|cx| {
3026 let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
3027
3028 Markdown::new(
3029 format!("```json\n{}\n```", pretty_json).into(),
3030 Some(language_registry.clone()),
3031 None,
3032 cx,
3033 )
3034 })),
3035 }
3036}
3037
3038#[cfg(test)]
3039mod tests {
3040 use super::*;
3041 use anyhow::anyhow;
3042 use futures::{channel::mpsc, future::LocalBoxFuture, select};
3043 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
3044 use indoc::indoc;
3045 use project::{AgentId, FakeFs, Fs};
3046 use rand::{distr, prelude::*};
3047 use serde_json::json;
3048 use settings::SettingsStore;
3049 use smol::stream::StreamExt as _;
3050 use std::{
3051 any::Any,
3052 cell::RefCell,
3053 path::Path,
3054 rc::Rc,
3055 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
3056 time::Duration,
3057 };
3058 use util::{path, path_list::PathList};
3059
3060 fn init_test(cx: &mut TestAppContext) {
3061 env_logger::try_init().ok();
3062 cx.update(|cx| {
3063 let settings_store = SettingsStore::test(cx);
3064 cx.set_global(settings_store);
3065 });
3066 }
3067
3068 #[gpui::test]
3069 async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
3070 init_test(cx);
3071
3072 let fs = FakeFs::new(cx.executor());
3073 let project = Project::test(fs, [], cx).await;
3074 let connection = Rc::new(FakeAgentConnection::new());
3075 let thread = cx
3076 .update(|cx| {
3077 connection.new_session(
3078 project,
3079 PathList::new(&[std::path::Path::new(path!("/test"))]),
3080 cx,
3081 )
3082 })
3083 .await
3084 .unwrap();
3085
3086 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3087
3088 // Send Output BEFORE Created - should be buffered by acp_thread
3089 thread.update(cx, |thread, cx| {
3090 thread.on_terminal_provider_event(
3091 TerminalProviderEvent::Output {
3092 terminal_id: terminal_id.clone(),
3093 data: b"hello buffered".to_vec(),
3094 },
3095 cx,
3096 );
3097 });
3098
3099 // Create a display-only terminal and then send Created
3100 let lower = cx.new(|cx| {
3101 let builder = ::terminal::TerminalBuilder::new_display_only(
3102 ::terminal::terminal_settings::CursorShape::default(),
3103 ::terminal::terminal_settings::AlternateScroll::On,
3104 None,
3105 0,
3106 cx.background_executor(),
3107 PathStyle::local(),
3108 )
3109 .unwrap();
3110 builder.subscribe(cx)
3111 });
3112
3113 thread.update(cx, |thread, cx| {
3114 thread.on_terminal_provider_event(
3115 TerminalProviderEvent::Created {
3116 terminal_id: terminal_id.clone(),
3117 label: "Buffered Test".to_string(),
3118 cwd: None,
3119 output_byte_limit: None,
3120 terminal: lower.clone(),
3121 },
3122 cx,
3123 );
3124 });
3125
3126 // After Created, buffered Output should have been flushed into the renderer
3127 let content = thread.read_with(cx, |thread, cx| {
3128 let term = thread.terminal(terminal_id.clone()).unwrap();
3129 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3130 });
3131
3132 assert!(
3133 content.contains("hello buffered"),
3134 "expected buffered output to render, got: {content}"
3135 );
3136 }
3137
3138 #[gpui::test]
3139 async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
3140 init_test(cx);
3141
3142 let fs = FakeFs::new(cx.executor());
3143 let project = Project::test(fs, [], cx).await;
3144 let connection = Rc::new(FakeAgentConnection::new());
3145 let thread = cx
3146 .update(|cx| {
3147 connection.new_session(
3148 project,
3149 PathList::new(&[std::path::Path::new(path!("/test"))]),
3150 cx,
3151 )
3152 })
3153 .await
3154 .unwrap();
3155
3156 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3157
3158 // Send Output BEFORE Created
3159 thread.update(cx, |thread, cx| {
3160 thread.on_terminal_provider_event(
3161 TerminalProviderEvent::Output {
3162 terminal_id: terminal_id.clone(),
3163 data: b"pre-exit data".to_vec(),
3164 },
3165 cx,
3166 );
3167 });
3168
3169 // Send Exit BEFORE Created
3170 thread.update(cx, |thread, cx| {
3171 thread.on_terminal_provider_event(
3172 TerminalProviderEvent::Exit {
3173 terminal_id: terminal_id.clone(),
3174 status: acp::TerminalExitStatus::new().exit_code(0),
3175 },
3176 cx,
3177 );
3178 });
3179
3180 // Now create a display-only lower-level terminal and send Created
3181 let lower = cx.new(|cx| {
3182 let builder = ::terminal::TerminalBuilder::new_display_only(
3183 ::terminal::terminal_settings::CursorShape::default(),
3184 ::terminal::terminal_settings::AlternateScroll::On,
3185 None,
3186 0,
3187 cx.background_executor(),
3188 PathStyle::local(),
3189 )
3190 .unwrap();
3191 builder.subscribe(cx)
3192 });
3193
3194 thread.update(cx, |thread, cx| {
3195 thread.on_terminal_provider_event(
3196 TerminalProviderEvent::Created {
3197 terminal_id: terminal_id.clone(),
3198 label: "Buffered Exit Test".to_string(),
3199 cwd: None,
3200 output_byte_limit: None,
3201 terminal: lower.clone(),
3202 },
3203 cx,
3204 );
3205 });
3206
3207 // Output should be present after Created (flushed from buffer)
3208 let content = thread.read_with(cx, |thread, cx| {
3209 let term = thread.terminal(terminal_id.clone()).unwrap();
3210 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3211 });
3212
3213 assert!(
3214 content.contains("pre-exit data"),
3215 "expected pre-exit data to render, got: {content}"
3216 );
3217 }
3218
3219 /// Test that killing a terminal via Terminal::kill properly:
3220 /// 1. Causes wait_for_exit to complete (doesn't hang forever)
3221 /// 2. The underlying terminal still has the output that was written before the kill
3222 ///
3223 /// This test verifies that the fix to kill_active_task (which now also kills
3224 /// the shell process in addition to the foreground process) properly allows
3225 /// wait_for_exit to complete instead of hanging indefinitely.
3226 #[cfg(unix)]
3227 #[gpui::test]
3228 async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
3229 use std::collections::HashMap;
3230 use task::Shell;
3231 use util::shell_builder::ShellBuilder;
3232
3233 init_test(cx);
3234 cx.executor().allow_parking();
3235
3236 let fs = FakeFs::new(cx.executor());
3237 let project = Project::test(fs, [], cx).await;
3238 let connection = Rc::new(FakeAgentConnection::new());
3239 let thread = cx
3240 .update(|cx| {
3241 connection.new_session(
3242 project.clone(),
3243 PathList::new(&[Path::new(path!("/test"))]),
3244 cx,
3245 )
3246 })
3247 .await
3248 .unwrap();
3249
3250 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3251
3252 // Create a real PTY terminal that runs a command which prints output then sleeps
3253 // We use printf instead of echo and chain with && sleep to ensure proper execution
3254 let (completion_tx, _completion_rx) = smol::channel::unbounded();
3255 let (program, args) = ShellBuilder::new(&Shell::System, false).build(
3256 Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
3257 &[],
3258 );
3259
3260 let builder = cx
3261 .update(|cx| {
3262 ::terminal::TerminalBuilder::new(
3263 None,
3264 None,
3265 task::Shell::WithArguments {
3266 program,
3267 args,
3268 title_override: None,
3269 },
3270 HashMap::default(),
3271 ::terminal::terminal_settings::CursorShape::default(),
3272 ::terminal::terminal_settings::AlternateScroll::On,
3273 None,
3274 vec![],
3275 0,
3276 false,
3277 0,
3278 Some(completion_tx),
3279 cx,
3280 vec![],
3281 PathStyle::local(),
3282 )
3283 })
3284 .await
3285 .unwrap();
3286
3287 let lower_terminal = cx.new(|cx| builder.subscribe(cx));
3288
3289 // Create the acp_thread Terminal wrapper
3290 thread.update(cx, |thread, cx| {
3291 thread.on_terminal_provider_event(
3292 TerminalProviderEvent::Created {
3293 terminal_id: terminal_id.clone(),
3294 label: "printf output_before_kill && sleep 60".to_string(),
3295 cwd: None,
3296 output_byte_limit: None,
3297 terminal: lower_terminal.clone(),
3298 },
3299 cx,
3300 );
3301 });
3302
3303 // Poll until the printf command produces output, rather than using a
3304 // fixed sleep which is flaky on loaded machines.
3305 let deadline = std::time::Instant::now() + Duration::from_secs(10);
3306 loop {
3307 let has_output = thread.read_with(cx, |thread, cx| {
3308 let term = thread
3309 .terminals
3310 .get(&terminal_id)
3311 .expect("terminal not found");
3312 let content = term.read(cx).inner().read(cx).get_content();
3313 content.contains("output_before_kill")
3314 });
3315 if has_output {
3316 break;
3317 }
3318 assert!(
3319 std::time::Instant::now() < deadline,
3320 "Timed out waiting for printf output to appear in terminal",
3321 );
3322 cx.executor().timer(Duration::from_millis(50)).await;
3323 }
3324
3325 // Get the acp_thread Terminal and kill it
3326 let wait_for_exit = thread.update(cx, |thread, cx| {
3327 let term = thread.terminals.get(&terminal_id).unwrap();
3328 let wait_for_exit = term.read(cx).wait_for_exit();
3329 term.update(cx, |term, cx| {
3330 term.kill(cx);
3331 });
3332 wait_for_exit
3333 });
3334
3335 // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
3336 // Before the fix to kill_active_task, this would hang forever because
3337 // only the foreground process was killed, not the shell, so the PTY
3338 // child never exited and wait_for_completed_task never completed.
3339 let exit_result = futures::select! {
3340 result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
3341 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
3342 };
3343
3344 assert!(
3345 exit_result.is_some(),
3346 "wait_for_exit should complete after kill, but it timed out. \
3347 This indicates kill_active_task is not properly killing the shell process."
3348 );
3349
3350 // Give the system a chance to process any pending updates
3351 cx.run_until_parked();
3352
3353 // Verify that the underlying terminal still has the output that was
3354 // written before the kill. This verifies that killing doesn't lose output.
3355 let inner_content = thread.read_with(cx, |thread, cx| {
3356 let term = thread.terminals.get(&terminal_id).unwrap();
3357 term.read(cx).inner().read(cx).get_content()
3358 });
3359
3360 assert!(
3361 inner_content.contains("output_before_kill"),
3362 "Underlying terminal should contain output from before kill, got: {}",
3363 inner_content
3364 );
3365 }
3366
3367 #[gpui::test]
3368 async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
3369 init_test(cx);
3370
3371 let fs = FakeFs::new(cx.executor());
3372 let project = Project::test(fs, [], cx).await;
3373 let connection = Rc::new(FakeAgentConnection::new());
3374 let thread = cx
3375 .update(|cx| {
3376 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3377 })
3378 .await
3379 .unwrap();
3380
3381 // Test creating a new user message
3382 thread.update(cx, |thread, cx| {
3383 thread.push_user_content_block(None, "Hello, ".into(), cx);
3384 });
3385
3386 thread.update(cx, |thread, cx| {
3387 assert_eq!(thread.entries.len(), 1);
3388 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3389 assert_eq!(user_msg.id, None);
3390 assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
3391 } else {
3392 panic!("Expected UserMessage");
3393 }
3394 });
3395
3396 // Test appending to existing user message
3397 let message_1_id = UserMessageId::new();
3398 thread.update(cx, |thread, cx| {
3399 thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
3400 });
3401
3402 thread.update(cx, |thread, cx| {
3403 assert_eq!(thread.entries.len(), 1);
3404 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3405 assert_eq!(user_msg.id, Some(message_1_id));
3406 assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
3407 } else {
3408 panic!("Expected UserMessage");
3409 }
3410 });
3411
3412 // Test creating new user message after assistant message
3413 thread.update(cx, |thread, cx| {
3414 thread.push_assistant_content_block("Assistant response".into(), false, cx);
3415 });
3416
3417 let message_2_id = UserMessageId::new();
3418 thread.update(cx, |thread, cx| {
3419 thread.push_user_content_block(
3420 Some(message_2_id.clone()),
3421 "New user message".into(),
3422 cx,
3423 );
3424 });
3425
3426 thread.update(cx, |thread, cx| {
3427 assert_eq!(thread.entries.len(), 3);
3428 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
3429 assert_eq!(user_msg.id, Some(message_2_id));
3430 assert_eq!(user_msg.content.to_markdown(cx), "New user message");
3431 } else {
3432 panic!("Expected UserMessage at index 2");
3433 }
3434 });
3435 }
3436
3437 #[gpui::test]
3438 async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
3439 init_test(cx);
3440
3441 let fs = FakeFs::new(cx.executor());
3442 let project = Project::test(fs, [], cx).await;
3443 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3444 |_, thread, mut cx| {
3445 async move {
3446 thread.update(&mut cx, |thread, cx| {
3447 thread
3448 .handle_session_update(
3449 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3450 "Thinking ".into(),
3451 )),
3452 cx,
3453 )
3454 .unwrap();
3455 thread
3456 .handle_session_update(
3457 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3458 "hard!".into(),
3459 )),
3460 cx,
3461 )
3462 .unwrap();
3463 })?;
3464 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3465 }
3466 .boxed_local()
3467 },
3468 ));
3469
3470 let thread = cx
3471 .update(|cx| {
3472 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3473 })
3474 .await
3475 .unwrap();
3476
3477 thread
3478 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
3479 .await
3480 .unwrap();
3481
3482 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
3483 assert_eq!(
3484 output,
3485 indoc! {r#"
3486 ## User
3487
3488 Hello from Zed!
3489
3490 ## Assistant
3491
3492 <thinking>
3493 Thinking hard!
3494 </thinking>
3495
3496 "#}
3497 );
3498 }
3499
3500 #[gpui::test]
3501 async fn test_ignore_echoed_user_message_chunks_during_active_turn(
3502 cx: &mut gpui::TestAppContext,
3503 ) {
3504 init_test(cx);
3505
3506 let fs = FakeFs::new(cx.executor());
3507 let project = Project::test(fs, [], cx).await;
3508 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3509 |request, thread, mut cx| {
3510 async move {
3511 let prompt = request.prompt.first().cloned().unwrap_or_else(|| "".into());
3512
3513 thread.update(&mut cx, |thread, cx| {
3514 thread
3515 .handle_session_update(
3516 acp::SessionUpdate::UserMessageChunk(acp::ContentChunk::new(
3517 prompt,
3518 )),
3519 cx,
3520 )
3521 .unwrap();
3522 })?;
3523
3524 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3525 }
3526 .boxed_local()
3527 },
3528 ));
3529
3530 let thread = cx
3531 .update(|cx| {
3532 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3533 })
3534 .await
3535 .unwrap();
3536
3537 thread
3538 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
3539 .await
3540 .unwrap();
3541
3542 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
3543 assert_eq!(output.matches("Hello from Zed!").count(), 1);
3544 }
3545
3546 #[gpui::test]
3547 async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
3548 init_test(cx);
3549
3550 let fs = FakeFs::new(cx.executor());
3551 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
3552 .await;
3553 let project = Project::test(fs.clone(), [], cx).await;
3554 let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
3555 let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
3556 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3557 move |_, thread, mut cx| {
3558 let read_file_tx = read_file_tx.clone();
3559 async move {
3560 let content = thread
3561 .update(&mut cx, |thread, cx| {
3562 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3563 })
3564 .unwrap()
3565 .await
3566 .unwrap();
3567 assert_eq!(content, "one\ntwo\nthree\n");
3568 read_file_tx.take().unwrap().send(()).unwrap();
3569 thread
3570 .update(&mut cx, |thread, cx| {
3571 thread.write_text_file(
3572 path!("/tmp/foo").into(),
3573 "one\ntwo\nthree\nfour\nfive\n".to_string(),
3574 cx,
3575 )
3576 })
3577 .unwrap()
3578 .await
3579 .unwrap();
3580 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3581 }
3582 .boxed_local()
3583 },
3584 ));
3585
3586 let (worktree, pathbuf) = project
3587 .update(cx, |project, cx| {
3588 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3589 })
3590 .await
3591 .unwrap();
3592 let buffer = project
3593 .update(cx, |project, cx| {
3594 project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
3595 })
3596 .await
3597 .unwrap();
3598
3599 let thread = cx
3600 .update(|cx| {
3601 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3602 })
3603 .await
3604 .unwrap();
3605
3606 let request = thread.update(cx, |thread, cx| {
3607 thread.send_raw("Extend the count in /tmp/foo", cx)
3608 });
3609 read_file_rx.await.ok();
3610 buffer.update(cx, |buffer, cx| {
3611 buffer.edit([(0..0, "zero\n".to_string())], None, cx);
3612 });
3613 cx.run_until_parked();
3614 assert_eq!(
3615 buffer.read_with(cx, |buffer, _| buffer.text()),
3616 "zero\none\ntwo\nthree\nfour\nfive\n"
3617 );
3618 assert_eq!(
3619 String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
3620 "zero\none\ntwo\nthree\nfour\nfive\n"
3621 );
3622 request.await.unwrap();
3623 }
3624
3625 #[gpui::test]
3626 async fn test_reading_from_line(cx: &mut TestAppContext) {
3627 init_test(cx);
3628
3629 let fs = FakeFs::new(cx.executor());
3630 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
3631 .await;
3632 let project = Project::test(fs.clone(), [], cx).await;
3633 project
3634 .update(cx, |project, cx| {
3635 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3636 })
3637 .await
3638 .unwrap();
3639
3640 let connection = Rc::new(FakeAgentConnection::new());
3641
3642 let thread = cx
3643 .update(|cx| {
3644 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3645 })
3646 .await
3647 .unwrap();
3648
3649 // Whole file
3650 let content = thread
3651 .update(cx, |thread, cx| {
3652 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3653 })
3654 .await
3655 .unwrap();
3656
3657 assert_eq!(content, "one\ntwo\nthree\nfour\n");
3658
3659 // Only start line
3660 let content = thread
3661 .update(cx, |thread, cx| {
3662 thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
3663 })
3664 .await
3665 .unwrap();
3666
3667 assert_eq!(content, "three\nfour\n");
3668
3669 // Only limit
3670 let content = thread
3671 .update(cx, |thread, cx| {
3672 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3673 })
3674 .await
3675 .unwrap();
3676
3677 assert_eq!(content, "one\ntwo\n");
3678
3679 // Range
3680 let content = thread
3681 .update(cx, |thread, cx| {
3682 thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
3683 })
3684 .await
3685 .unwrap();
3686
3687 assert_eq!(content, "two\nthree\n");
3688
3689 // Invalid
3690 let err = thread
3691 .update(cx, |thread, cx| {
3692 thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
3693 })
3694 .await
3695 .unwrap_err();
3696
3697 assert_eq!(
3698 err.to_string(),
3699 "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
3700 );
3701 }
3702
3703 #[gpui::test]
3704 async fn test_reading_empty_file(cx: &mut TestAppContext) {
3705 init_test(cx);
3706
3707 let fs = FakeFs::new(cx.executor());
3708 fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
3709 let project = Project::test(fs.clone(), [], cx).await;
3710 project
3711 .update(cx, |project, cx| {
3712 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3713 })
3714 .await
3715 .unwrap();
3716
3717 let connection = Rc::new(FakeAgentConnection::new());
3718
3719 let thread = cx
3720 .update(|cx| {
3721 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3722 })
3723 .await
3724 .unwrap();
3725
3726 // Whole file
3727 let content = thread
3728 .update(cx, |thread, cx| {
3729 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3730 })
3731 .await
3732 .unwrap();
3733
3734 assert_eq!(content, "");
3735
3736 // Only start line
3737 let content = thread
3738 .update(cx, |thread, cx| {
3739 thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
3740 })
3741 .await
3742 .unwrap();
3743
3744 assert_eq!(content, "");
3745
3746 // Only limit
3747 let content = thread
3748 .update(cx, |thread, cx| {
3749 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3750 })
3751 .await
3752 .unwrap();
3753
3754 assert_eq!(content, "");
3755
3756 // Range
3757 let content = thread
3758 .update(cx, |thread, cx| {
3759 thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
3760 })
3761 .await
3762 .unwrap();
3763
3764 assert_eq!(content, "");
3765
3766 // Invalid
3767 let err = thread
3768 .update(cx, |thread, cx| {
3769 thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
3770 })
3771 .await
3772 .unwrap_err();
3773
3774 assert_eq!(
3775 err.to_string(),
3776 "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
3777 );
3778 }
3779 #[gpui::test]
3780 async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
3781 init_test(cx);
3782
3783 let fs = FakeFs::new(cx.executor());
3784 fs.insert_tree(path!("/tmp"), json!({})).await;
3785 let project = Project::test(fs.clone(), [], cx).await;
3786 project
3787 .update(cx, |project, cx| {
3788 project.find_or_create_worktree(path!("/tmp"), true, cx)
3789 })
3790 .await
3791 .unwrap();
3792
3793 let connection = Rc::new(FakeAgentConnection::new());
3794
3795 let thread = cx
3796 .update(|cx| {
3797 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3798 })
3799 .await
3800 .unwrap();
3801
3802 // Out of project file
3803 let err = thread
3804 .update(cx, |thread, cx| {
3805 thread.read_text_file(path!("/foo").into(), None, None, false, cx)
3806 })
3807 .await
3808 .unwrap_err();
3809
3810 assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
3811 }
3812
3813 #[gpui::test]
3814 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
3815 init_test(cx);
3816
3817 let fs = FakeFs::new(cx.executor());
3818 let project = Project::test(fs, [], cx).await;
3819 let id = acp::ToolCallId::new("test");
3820
3821 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3822 let id = id.clone();
3823 move |_, thread, mut cx| {
3824 let id = id.clone();
3825 async move {
3826 thread
3827 .update(&mut cx, |thread, cx| {
3828 thread.handle_session_update(
3829 acp::SessionUpdate::ToolCall(
3830 acp::ToolCall::new(id.clone(), "Label")
3831 .kind(acp::ToolKind::Fetch)
3832 .status(acp::ToolCallStatus::InProgress),
3833 ),
3834 cx,
3835 )
3836 })
3837 .unwrap()
3838 .unwrap();
3839 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3840 }
3841 .boxed_local()
3842 }
3843 }));
3844
3845 let thread = cx
3846 .update(|cx| {
3847 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3848 })
3849 .await
3850 .unwrap();
3851
3852 let request = thread.update(cx, |thread, cx| {
3853 thread.send_raw("Fetch https://example.com", cx)
3854 });
3855
3856 run_until_first_tool_call(&thread, cx).await;
3857
3858 thread.read_with(cx, |thread, _| {
3859 assert!(matches!(
3860 thread.entries[1],
3861 AgentThreadEntry::ToolCall(ToolCall {
3862 status: ToolCallStatus::InProgress,
3863 ..
3864 })
3865 ));
3866 });
3867
3868 thread.update(cx, |thread, cx| thread.cancel(cx)).await;
3869
3870 thread.read_with(cx, |thread, _| {
3871 assert!(matches!(
3872 &thread.entries[1],
3873 AgentThreadEntry::ToolCall(ToolCall {
3874 status: ToolCallStatus::Canceled,
3875 ..
3876 })
3877 ));
3878 });
3879
3880 thread
3881 .update(cx, |thread, cx| {
3882 thread.handle_session_update(
3883 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3884 id,
3885 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3886 )),
3887 cx,
3888 )
3889 })
3890 .unwrap();
3891
3892 request.await.unwrap();
3893
3894 thread.read_with(cx, |thread, _| {
3895 assert!(matches!(
3896 thread.entries[1],
3897 AgentThreadEntry::ToolCall(ToolCall {
3898 status: ToolCallStatus::Completed,
3899 ..
3900 })
3901 ));
3902 });
3903 }
3904
3905 #[gpui::test]
3906 async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3907 init_test(cx);
3908 let fs = FakeFs::new(cx.background_executor.clone());
3909 fs.insert_tree(path!("/test"), json!({})).await;
3910 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3911
3912 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3913 move |_, thread, mut cx| {
3914 async move {
3915 thread
3916 .update(&mut cx, |thread, cx| {
3917 thread.handle_session_update(
3918 acp::SessionUpdate::ToolCall(
3919 acp::ToolCall::new("test", "Label")
3920 .kind(acp::ToolKind::Edit)
3921 .status(acp::ToolCallStatus::Completed)
3922 .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
3923 "/test/test.txt",
3924 "foo",
3925 ))]),
3926 ),
3927 cx,
3928 )
3929 })
3930 .unwrap()
3931 .unwrap();
3932 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3933 }
3934 .boxed_local()
3935 }
3936 }));
3937
3938 let thread = cx
3939 .update(|cx| {
3940 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3941 })
3942 .await
3943 .unwrap();
3944
3945 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3946 .await
3947 .unwrap();
3948
3949 assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3950 }
3951
3952 #[gpui::test(iterations = 10)]
3953 async fn test_checkpoints(cx: &mut TestAppContext) {
3954 init_test(cx);
3955 let fs = FakeFs::new(cx.background_executor.clone());
3956 fs.insert_tree(
3957 path!("/test"),
3958 json!({
3959 ".git": {}
3960 }),
3961 )
3962 .await;
3963 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3964
3965 let simulate_changes = Arc::new(AtomicBool::new(true));
3966 let next_filename = Arc::new(AtomicUsize::new(0));
3967 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3968 let simulate_changes = simulate_changes.clone();
3969 let next_filename = next_filename.clone();
3970 let fs = fs.clone();
3971 move |request, thread, mut cx| {
3972 let fs = fs.clone();
3973 let simulate_changes = simulate_changes.clone();
3974 let next_filename = next_filename.clone();
3975 async move {
3976 if simulate_changes.load(SeqCst) {
3977 let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3978 fs.write(Path::new(&filename), b"").await?;
3979 }
3980
3981 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3982 panic!("expected text content block");
3983 };
3984 thread.update(&mut cx, |thread, cx| {
3985 thread
3986 .handle_session_update(
3987 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3988 content.text.to_uppercase().into(),
3989 )),
3990 cx,
3991 )
3992 .unwrap();
3993 })?;
3994 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3995 }
3996 .boxed_local()
3997 }
3998 }));
3999 let thread = cx
4000 .update(|cx| {
4001 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4002 })
4003 .await
4004 .unwrap();
4005
4006 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
4007 .await
4008 .unwrap();
4009 thread.read_with(cx, |thread, cx| {
4010 assert_eq!(
4011 thread.to_markdown(cx),
4012 indoc! {"
4013 ## User (checkpoint)
4014
4015 Lorem
4016
4017 ## Assistant
4018
4019 LOREM
4020
4021 "}
4022 );
4023 });
4024 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
4025
4026 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
4027 .await
4028 .unwrap();
4029 thread.read_with(cx, |thread, cx| {
4030 assert_eq!(
4031 thread.to_markdown(cx),
4032 indoc! {"
4033 ## User (checkpoint)
4034
4035 Lorem
4036
4037 ## Assistant
4038
4039 LOREM
4040
4041 ## User (checkpoint)
4042
4043 ipsum
4044
4045 ## Assistant
4046
4047 IPSUM
4048
4049 "}
4050 );
4051 });
4052 assert_eq!(
4053 fs.files(),
4054 vec![
4055 Path::new(path!("/test/file-0")),
4056 Path::new(path!("/test/file-1"))
4057 ]
4058 );
4059
4060 // Checkpoint isn't stored when there are no changes.
4061 simulate_changes.store(false, SeqCst);
4062 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
4063 .await
4064 .unwrap();
4065 thread.read_with(cx, |thread, cx| {
4066 assert_eq!(
4067 thread.to_markdown(cx),
4068 indoc! {"
4069 ## User (checkpoint)
4070
4071 Lorem
4072
4073 ## Assistant
4074
4075 LOREM
4076
4077 ## User (checkpoint)
4078
4079 ipsum
4080
4081 ## Assistant
4082
4083 IPSUM
4084
4085 ## User
4086
4087 dolor
4088
4089 ## Assistant
4090
4091 DOLOR
4092
4093 "}
4094 );
4095 });
4096 assert_eq!(
4097 fs.files(),
4098 vec![
4099 Path::new(path!("/test/file-0")),
4100 Path::new(path!("/test/file-1"))
4101 ]
4102 );
4103
4104 // Rewinding the conversation truncates the history and restores the checkpoint.
4105 thread
4106 .update(cx, |thread, cx| {
4107 let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
4108 panic!("unexpected entries {:?}", thread.entries)
4109 };
4110 thread.restore_checkpoint(message.id.clone().unwrap(), cx)
4111 })
4112 .await
4113 .unwrap();
4114 thread.read_with(cx, |thread, cx| {
4115 assert_eq!(
4116 thread.to_markdown(cx),
4117 indoc! {"
4118 ## User (checkpoint)
4119
4120 Lorem
4121
4122 ## Assistant
4123
4124 LOREM
4125
4126 "}
4127 );
4128 });
4129 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
4130 }
4131
4132 #[gpui::test]
4133 async fn test_tool_result_refusal(cx: &mut TestAppContext) {
4134 use std::sync::atomic::AtomicUsize;
4135 init_test(cx);
4136
4137 let fs = FakeFs::new(cx.executor());
4138 let project = Project::test(fs, None, cx).await;
4139
4140 // Create a connection that simulates refusal after tool result
4141 let prompt_count = Arc::new(AtomicUsize::new(0));
4142 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4143 let prompt_count = prompt_count.clone();
4144 move |_request, thread, mut cx| {
4145 let count = prompt_count.fetch_add(1, SeqCst);
4146 async move {
4147 if count == 0 {
4148 // First prompt: Generate a tool call with result
4149 thread.update(&mut cx, |thread, cx| {
4150 thread
4151 .handle_session_update(
4152 acp::SessionUpdate::ToolCall(
4153 acp::ToolCall::new("tool1", "Test Tool")
4154 .kind(acp::ToolKind::Fetch)
4155 .status(acp::ToolCallStatus::Completed)
4156 .raw_input(serde_json::json!({"query": "test"}))
4157 .raw_output(serde_json::json!({"result": "inappropriate content"})),
4158 ),
4159 cx,
4160 )
4161 .unwrap();
4162 })?;
4163
4164 // Now return refusal because of the tool result
4165 Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
4166 } else {
4167 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4168 }
4169 }
4170 .boxed_local()
4171 }
4172 }));
4173
4174 let thread = cx
4175 .update(|cx| {
4176 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4177 })
4178 .await
4179 .unwrap();
4180
4181 // Track if we see a Refusal event
4182 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4183 let saw_refusal_event_captured = saw_refusal_event.clone();
4184 thread.update(cx, |_thread, cx| {
4185 cx.subscribe(
4186 &thread,
4187 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4188 if matches!(event, AcpThreadEvent::Refusal) {
4189 *saw_refusal_event_captured.lock().unwrap() = true;
4190 }
4191 },
4192 )
4193 .detach();
4194 });
4195
4196 // Send a user message - this will trigger tool call and then refusal
4197 let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
4198 cx.background_executor.spawn(send_task).detach();
4199 cx.run_until_parked();
4200
4201 // Verify that:
4202 // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
4203 // 2. The user message was NOT truncated
4204 assert!(
4205 *saw_refusal_event.lock().unwrap(),
4206 "Refusal event should be emitted for tool result refusals"
4207 );
4208
4209 thread.read_with(cx, |thread, _| {
4210 let entries = thread.entries();
4211 assert!(entries.len() >= 2, "Should have user message and tool call");
4212
4213 // Verify user message is still there
4214 assert!(
4215 matches!(entries[0], AgentThreadEntry::UserMessage(_)),
4216 "User message should not be truncated"
4217 );
4218
4219 // Verify tool call is there with result
4220 if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
4221 assert!(
4222 tool_call.raw_output.is_some(),
4223 "Tool call should have output"
4224 );
4225 } else {
4226 panic!("Expected tool call at index 1");
4227 }
4228 });
4229 }
4230
4231 #[gpui::test]
4232 async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
4233 init_test(cx);
4234
4235 let fs = FakeFs::new(cx.executor());
4236 let project = Project::test(fs, None, cx).await;
4237
4238 let refuse_next = Arc::new(AtomicBool::new(false));
4239 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4240 let refuse_next = refuse_next.clone();
4241 move |_request, _thread, _cx| {
4242 if refuse_next.load(SeqCst) {
4243 async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
4244 .boxed_local()
4245 } else {
4246 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
4247 .boxed_local()
4248 }
4249 }
4250 }));
4251
4252 let thread = cx
4253 .update(|cx| {
4254 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4255 })
4256 .await
4257 .unwrap();
4258
4259 // Track if we see a Refusal event
4260 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4261 let saw_refusal_event_captured = saw_refusal_event.clone();
4262 thread.update(cx, |_thread, cx| {
4263 cx.subscribe(
4264 &thread,
4265 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4266 if matches!(event, AcpThreadEvent::Refusal) {
4267 *saw_refusal_event_captured.lock().unwrap() = true;
4268 }
4269 },
4270 )
4271 .detach();
4272 });
4273
4274 // Send a message that will be refused
4275 refuse_next.store(true, SeqCst);
4276 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4277 .await
4278 .unwrap();
4279
4280 // Verify that a Refusal event WAS emitted for user prompt refusal
4281 assert!(
4282 *saw_refusal_event.lock().unwrap(),
4283 "Refusal event should be emitted for user prompt refusals"
4284 );
4285
4286 // Verify the message was truncated (user prompt refusal)
4287 thread.read_with(cx, |thread, cx| {
4288 assert_eq!(thread.to_markdown(cx), "");
4289 });
4290 }
4291
4292 #[gpui::test]
4293 async fn test_refusal(cx: &mut TestAppContext) {
4294 init_test(cx);
4295 let fs = FakeFs::new(cx.background_executor.clone());
4296 fs.insert_tree(path!("/"), json!({})).await;
4297 let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
4298
4299 let refuse_next = Arc::new(AtomicBool::new(false));
4300 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4301 let refuse_next = refuse_next.clone();
4302 move |request, thread, mut cx| {
4303 let refuse_next = refuse_next.clone();
4304 async move {
4305 if refuse_next.load(SeqCst) {
4306 return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
4307 }
4308
4309 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
4310 panic!("expected text content block");
4311 };
4312 thread.update(&mut cx, |thread, cx| {
4313 thread
4314 .handle_session_update(
4315 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
4316 content.text.to_uppercase().into(),
4317 )),
4318 cx,
4319 )
4320 .unwrap();
4321 })?;
4322 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4323 }
4324 .boxed_local()
4325 }
4326 }));
4327 let thread = cx
4328 .update(|cx| {
4329 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4330 })
4331 .await
4332 .unwrap();
4333
4334 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4335 .await
4336 .unwrap();
4337 thread.read_with(cx, |thread, cx| {
4338 assert_eq!(
4339 thread.to_markdown(cx),
4340 indoc! {"
4341 ## User
4342
4343 hello
4344
4345 ## Assistant
4346
4347 HELLO
4348
4349 "}
4350 );
4351 });
4352
4353 // Simulate refusing the second message. The message should be truncated
4354 // when a user prompt is refused.
4355 refuse_next.store(true, SeqCst);
4356 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
4357 .await
4358 .unwrap();
4359 thread.read_with(cx, |thread, cx| {
4360 assert_eq!(
4361 thread.to_markdown(cx),
4362 indoc! {"
4363 ## User
4364
4365 hello
4366
4367 ## Assistant
4368
4369 HELLO
4370
4371 "}
4372 );
4373 });
4374 }
4375
4376 async fn run_until_first_tool_call(
4377 thread: &Entity<AcpThread>,
4378 cx: &mut TestAppContext,
4379 ) -> usize {
4380 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
4381
4382 let subscription = cx.update(|cx| {
4383 cx.subscribe(thread, move |thread, _, cx| {
4384 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
4385 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
4386 return tx.try_send(ix).unwrap();
4387 }
4388 }
4389 })
4390 });
4391
4392 select! {
4393 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
4394 panic!("Timeout waiting for tool call")
4395 }
4396 ix = rx.next().fuse() => {
4397 drop(subscription);
4398 ix.unwrap()
4399 }
4400 }
4401 }
4402
4403 #[derive(Clone, Default)]
4404 struct FakeAgentConnection {
4405 auth_methods: Vec<acp::AuthMethod>,
4406 supports_truncate: bool,
4407 sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
4408 set_title_calls: Rc<RefCell<Vec<SharedString>>>,
4409 on_user_message: Option<
4410 Rc<
4411 dyn Fn(
4412 acp::PromptRequest,
4413 WeakEntity<AcpThread>,
4414 AsyncApp,
4415 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4416 + 'static,
4417 >,
4418 >,
4419 }
4420
4421 impl FakeAgentConnection {
4422 fn new() -> Self {
4423 Self {
4424 auth_methods: Vec::new(),
4425 supports_truncate: true,
4426 on_user_message: None,
4427 sessions: Arc::default(),
4428 set_title_calls: Default::default(),
4429 }
4430 }
4431
4432 fn without_truncate_support(mut self) -> Self {
4433 self.supports_truncate = false;
4434 self
4435 }
4436
4437 #[expect(unused)]
4438 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
4439 self.auth_methods = auth_methods;
4440 self
4441 }
4442
4443 fn on_user_message(
4444 mut self,
4445 handler: impl Fn(
4446 acp::PromptRequest,
4447 WeakEntity<AcpThread>,
4448 AsyncApp,
4449 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4450 + 'static,
4451 ) -> Self {
4452 self.on_user_message.replace(Rc::new(handler));
4453 self
4454 }
4455 }
4456
4457 impl AgentConnection for FakeAgentConnection {
4458 fn agent_id(&self) -> AgentId {
4459 AgentId::new("fake")
4460 }
4461
4462 fn telemetry_id(&self) -> SharedString {
4463 "fake".into()
4464 }
4465
4466 fn auth_methods(&self) -> &[acp::AuthMethod] {
4467 &self.auth_methods
4468 }
4469
4470 fn new_session(
4471 self: Rc<Self>,
4472 project: Entity<Project>,
4473 work_dirs: PathList,
4474 cx: &mut App,
4475 ) -> Task<gpui::Result<Entity<AcpThread>>> {
4476 let session_id = acp::SessionId::new(
4477 rand::rng()
4478 .sample_iter(&distr::Alphanumeric)
4479 .take(7)
4480 .map(char::from)
4481 .collect::<String>(),
4482 );
4483 let action_log = cx.new(|_| ActionLog::new(project.clone()));
4484 let thread = cx.new(|cx| {
4485 AcpThread::new(
4486 None,
4487 None,
4488 Some(work_dirs),
4489 self.clone(),
4490 project,
4491 action_log,
4492 session_id.clone(),
4493 watch::Receiver::constant(
4494 acp::PromptCapabilities::new()
4495 .image(true)
4496 .audio(true)
4497 .embedded_context(true),
4498 ),
4499 cx,
4500 )
4501 });
4502 self.sessions.lock().insert(session_id, thread.downgrade());
4503 Task::ready(Ok(thread))
4504 }
4505
4506 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
4507 if self.auth_methods().iter().any(|m| m.id() == &method) {
4508 Task::ready(Ok(()))
4509 } else {
4510 Task::ready(Err(anyhow!("Invalid Auth Method")))
4511 }
4512 }
4513
4514 fn prompt(
4515 &self,
4516 _id: UserMessageId,
4517 params: acp::PromptRequest,
4518 cx: &mut App,
4519 ) -> Task<gpui::Result<acp::PromptResponse>> {
4520 let sessions = self.sessions.lock();
4521 let thread = sessions.get(¶ms.session_id).unwrap();
4522 if let Some(handler) = &self.on_user_message {
4523 let handler = handler.clone();
4524 let thread = thread.clone();
4525 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
4526 } else {
4527 Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
4528 }
4529 }
4530
4531 fn cancel(&self, _session_id: &acp::SessionId, _cx: &mut App) {}
4532
4533 fn truncate(
4534 &self,
4535 session_id: &acp::SessionId,
4536 _cx: &App,
4537 ) -> Option<Rc<dyn AgentSessionTruncate>> {
4538 self.supports_truncate.then(|| {
4539 Rc::new(FakeAgentSessionEditor {
4540 _session_id: session_id.clone(),
4541 }) as Rc<dyn AgentSessionTruncate>
4542 })
4543 }
4544
4545 fn set_title(
4546 &self,
4547 _session_id: &acp::SessionId,
4548 _cx: &App,
4549 ) -> Option<Rc<dyn AgentSessionSetTitle>> {
4550 Some(Rc::new(FakeAgentSessionSetTitle {
4551 calls: self.set_title_calls.clone(),
4552 }))
4553 }
4554
4555 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
4556 self
4557 }
4558 }
4559
4560 struct FakeAgentSessionSetTitle {
4561 calls: Rc<RefCell<Vec<SharedString>>>,
4562 }
4563
4564 impl AgentSessionSetTitle for FakeAgentSessionSetTitle {
4565 fn run(&self, title: SharedString, _cx: &mut App) -> Task<Result<()>> {
4566 self.calls.borrow_mut().push(title);
4567 Task::ready(Ok(()))
4568 }
4569 }
4570
4571 struct FakeAgentSessionEditor {
4572 _session_id: acp::SessionId,
4573 }
4574
4575 impl AgentSessionTruncate for FakeAgentSessionEditor {
4576 fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
4577 Task::ready(Ok(()))
4578 }
4579 }
4580
4581 #[gpui::test]
4582 async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
4583 init_test(cx);
4584
4585 let fs = FakeFs::new(cx.executor());
4586 let project = Project::test(fs, [], cx).await;
4587 let connection = Rc::new(FakeAgentConnection::new());
4588 let thread = cx
4589 .update(|cx| {
4590 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4591 })
4592 .await
4593 .unwrap();
4594
4595 // Try to update a tool call that doesn't exist
4596 let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
4597 thread.update(cx, |thread, cx| {
4598 let result = thread.handle_session_update(
4599 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4600 nonexistent_id.clone(),
4601 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4602 )),
4603 cx,
4604 );
4605
4606 // The update should succeed (not return an error)
4607 assert!(result.is_ok());
4608
4609 // There should now be exactly one entry in the thread
4610 assert_eq!(thread.entries.len(), 1);
4611
4612 // The entry should be a failed tool call
4613 if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
4614 assert_eq!(tool_call.id, nonexistent_id);
4615 assert!(matches!(tool_call.status, ToolCallStatus::Failed));
4616 assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
4617
4618 // Check that the content contains the error message
4619 assert_eq!(tool_call.content.len(), 1);
4620 if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
4621 match content_block {
4622 ContentBlock::Markdown { markdown } => {
4623 let markdown_text = markdown.read(cx).source();
4624 assert!(markdown_text.contains("Tool call not found"));
4625 }
4626 ContentBlock::Empty => panic!("Expected markdown content, got empty"),
4627 ContentBlock::ResourceLink { .. } => {
4628 panic!("Expected markdown content, got resource link")
4629 }
4630 ContentBlock::Image { .. } => {
4631 panic!("Expected markdown content, got image")
4632 }
4633 }
4634 } else {
4635 panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
4636 }
4637 } else {
4638 panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
4639 }
4640 });
4641 }
4642
4643 /// Tests that restoring a checkpoint properly cleans up terminals that were
4644 /// created after that checkpoint, and cancels any in-progress generation.
4645 ///
4646 /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
4647 /// that were started after that checkpoint should be terminated, and any in-progress
4648 /// AI generation should be canceled.
4649 #[gpui::test]
4650 async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
4651 init_test(cx);
4652
4653 let fs = FakeFs::new(cx.executor());
4654 let project = Project::test(fs, [], cx).await;
4655 let connection = Rc::new(FakeAgentConnection::new());
4656 let thread = cx
4657 .update(|cx| {
4658 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4659 })
4660 .await
4661 .unwrap();
4662
4663 // Send first user message to create a checkpoint
4664 cx.update(|cx| {
4665 thread.update(cx, |thread, cx| {
4666 thread.send(vec!["first message".into()], cx)
4667 })
4668 })
4669 .await
4670 .unwrap();
4671
4672 // Send second message (creates another checkpoint) - we'll restore to this one
4673 cx.update(|cx| {
4674 thread.update(cx, |thread, cx| {
4675 thread.send(vec!["second message".into()], cx)
4676 })
4677 })
4678 .await
4679 .unwrap();
4680
4681 // Create 2 terminals BEFORE the checkpoint that have completed running
4682 let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4683 let mock_terminal_1 = cx.new(|cx| {
4684 let builder = ::terminal::TerminalBuilder::new_display_only(
4685 ::terminal::terminal_settings::CursorShape::default(),
4686 ::terminal::terminal_settings::AlternateScroll::On,
4687 None,
4688 0,
4689 cx.background_executor(),
4690 PathStyle::local(),
4691 )
4692 .unwrap();
4693 builder.subscribe(cx)
4694 });
4695
4696 thread.update(cx, |thread, cx| {
4697 thread.on_terminal_provider_event(
4698 TerminalProviderEvent::Created {
4699 terminal_id: terminal_id_1.clone(),
4700 label: "echo 'first'".to_string(),
4701 cwd: Some(PathBuf::from("/test")),
4702 output_byte_limit: None,
4703 terminal: mock_terminal_1.clone(),
4704 },
4705 cx,
4706 );
4707 });
4708
4709 thread.update(cx, |thread, cx| {
4710 thread.on_terminal_provider_event(
4711 TerminalProviderEvent::Output {
4712 terminal_id: terminal_id_1.clone(),
4713 data: b"first\n".to_vec(),
4714 },
4715 cx,
4716 );
4717 });
4718
4719 thread.update(cx, |thread, cx| {
4720 thread.on_terminal_provider_event(
4721 TerminalProviderEvent::Exit {
4722 terminal_id: terminal_id_1.clone(),
4723 status: acp::TerminalExitStatus::new().exit_code(0),
4724 },
4725 cx,
4726 );
4727 });
4728
4729 let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4730 let mock_terminal_2 = cx.new(|cx| {
4731 let builder = ::terminal::TerminalBuilder::new_display_only(
4732 ::terminal::terminal_settings::CursorShape::default(),
4733 ::terminal::terminal_settings::AlternateScroll::On,
4734 None,
4735 0,
4736 cx.background_executor(),
4737 PathStyle::local(),
4738 )
4739 .unwrap();
4740 builder.subscribe(cx)
4741 });
4742
4743 thread.update(cx, |thread, cx| {
4744 thread.on_terminal_provider_event(
4745 TerminalProviderEvent::Created {
4746 terminal_id: terminal_id_2.clone(),
4747 label: "echo 'second'".to_string(),
4748 cwd: Some(PathBuf::from("/test")),
4749 output_byte_limit: None,
4750 terminal: mock_terminal_2.clone(),
4751 },
4752 cx,
4753 );
4754 });
4755
4756 thread.update(cx, |thread, cx| {
4757 thread.on_terminal_provider_event(
4758 TerminalProviderEvent::Output {
4759 terminal_id: terminal_id_2.clone(),
4760 data: b"second\n".to_vec(),
4761 },
4762 cx,
4763 );
4764 });
4765
4766 thread.update(cx, |thread, cx| {
4767 thread.on_terminal_provider_event(
4768 TerminalProviderEvent::Exit {
4769 terminal_id: terminal_id_2.clone(),
4770 status: acp::TerminalExitStatus::new().exit_code(0),
4771 },
4772 cx,
4773 );
4774 });
4775
4776 // Get the second message ID to restore to
4777 let second_message_id = thread.read_with(cx, |thread, _| {
4778 // At this point we have:
4779 // - Index 0: First user message (with checkpoint)
4780 // - Index 1: Second user message (with checkpoint)
4781 // No assistant responses because FakeAgentConnection just returns EndTurn
4782 let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
4783 panic!("expected user message at index 1");
4784 };
4785 message.id.clone().unwrap()
4786 });
4787
4788 // Create a terminal AFTER the checkpoint we'll restore to.
4789 // This simulates the AI agent starting a long-running terminal command.
4790 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4791 let mock_terminal = cx.new(|cx| {
4792 let builder = ::terminal::TerminalBuilder::new_display_only(
4793 ::terminal::terminal_settings::CursorShape::default(),
4794 ::terminal::terminal_settings::AlternateScroll::On,
4795 None,
4796 0,
4797 cx.background_executor(),
4798 PathStyle::local(),
4799 )
4800 .unwrap();
4801 builder.subscribe(cx)
4802 });
4803
4804 // Register the terminal as created
4805 thread.update(cx, |thread, cx| {
4806 thread.on_terminal_provider_event(
4807 TerminalProviderEvent::Created {
4808 terminal_id: terminal_id.clone(),
4809 label: "sleep 1000".to_string(),
4810 cwd: Some(PathBuf::from("/test")),
4811 output_byte_limit: None,
4812 terminal: mock_terminal.clone(),
4813 },
4814 cx,
4815 );
4816 });
4817
4818 // Simulate the terminal producing output (still running)
4819 thread.update(cx, |thread, cx| {
4820 thread.on_terminal_provider_event(
4821 TerminalProviderEvent::Output {
4822 terminal_id: terminal_id.clone(),
4823 data: b"terminal is running...\n".to_vec(),
4824 },
4825 cx,
4826 );
4827 });
4828
4829 // Create a tool call entry that references this terminal
4830 // This represents the agent requesting a terminal command
4831 thread.update(cx, |thread, cx| {
4832 thread
4833 .handle_session_update(
4834 acp::SessionUpdate::ToolCall(
4835 acp::ToolCall::new("terminal-tool-1", "Running command")
4836 .kind(acp::ToolKind::Execute)
4837 .status(acp::ToolCallStatus::InProgress)
4838 .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
4839 terminal_id.clone(),
4840 ))])
4841 .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
4842 ),
4843 cx,
4844 )
4845 .unwrap();
4846 });
4847
4848 // Verify terminal exists and is in the thread
4849 let terminal_exists_before =
4850 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4851 assert!(
4852 terminal_exists_before,
4853 "Terminal should exist before checkpoint restore"
4854 );
4855
4856 // Verify the terminal's underlying task is still running (not completed)
4857 let terminal_running_before = thread.read_with(cx, |thread, _cx| {
4858 let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
4859 terminal_entity.read_with(cx, |term, _cx| {
4860 term.output().is_none() // output is None means it's still running
4861 })
4862 });
4863 assert!(
4864 terminal_running_before,
4865 "Terminal should be running before checkpoint restore"
4866 );
4867
4868 // Verify we have the expected entries before restore
4869 let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
4870 assert!(
4871 entry_count_before > 1,
4872 "Should have multiple entries before restore"
4873 );
4874
4875 // Restore the checkpoint to the second message.
4876 // This should:
4877 // 1. Cancel any in-progress generation (via the cancel() call)
4878 // 2. Remove the terminal that was created after that point
4879 thread
4880 .update(cx, |thread, cx| {
4881 thread.restore_checkpoint(second_message_id, cx)
4882 })
4883 .await
4884 .unwrap();
4885
4886 // Verify that no send_task is in progress after restore
4887 // (cancel() clears the send_task)
4888 let has_send_task_after = thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4889 assert!(
4890 !has_send_task_after,
4891 "Should not have a send_task after restore (cancel should have cleared it)"
4892 );
4893
4894 // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
4895 let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
4896 assert_eq!(
4897 entry_count, 1,
4898 "Should have 1 entry after restore (only the first user message)"
4899 );
4900
4901 // Verify the 2 completed terminals from before the checkpoint still exist
4902 let terminal_1_exists = thread.read_with(cx, |thread, _| {
4903 thread.terminals.contains_key(&terminal_id_1)
4904 });
4905 assert!(
4906 terminal_1_exists,
4907 "Terminal 1 (from before checkpoint) should still exist"
4908 );
4909
4910 let terminal_2_exists = thread.read_with(cx, |thread, _| {
4911 thread.terminals.contains_key(&terminal_id_2)
4912 });
4913 assert!(
4914 terminal_2_exists,
4915 "Terminal 2 (from before checkpoint) should still exist"
4916 );
4917
4918 // Verify they're still in completed state
4919 let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
4920 let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
4921 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4922 });
4923 assert!(terminal_1_completed, "Terminal 1 should still be completed");
4924
4925 let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
4926 let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
4927 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4928 });
4929 assert!(terminal_2_completed, "Terminal 2 should still be completed");
4930
4931 // Verify the running terminal (created after checkpoint) was removed
4932 let terminal_3_exists =
4933 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4934 assert!(
4935 !terminal_3_exists,
4936 "Terminal 3 (created after checkpoint) should have been removed"
4937 );
4938
4939 // Verify total count is 2 (the two from before the checkpoint)
4940 let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
4941 assert_eq!(
4942 terminal_count, 2,
4943 "Should have exactly 2 terminals (the completed ones from before checkpoint)"
4944 );
4945 }
4946
4947 /// Tests that update_last_checkpoint correctly updates the original message's checkpoint
4948 /// even when a new user message is added while the async checkpoint comparison is in progress.
4949 ///
4950 /// This is a regression test for a bug where update_last_checkpoint would fail with
4951 /// "no checkpoint" if a new user message (without a checkpoint) was added between when
4952 /// update_last_checkpoint started and when its async closure ran.
4953 #[gpui::test]
4954 async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
4955 init_test(cx);
4956
4957 let fs = FakeFs::new(cx.executor());
4958 fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
4959 .await;
4960 let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;
4961
4962 let handler_done = Arc::new(AtomicBool::new(false));
4963 let handler_done_clone = handler_done.clone();
4964 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4965 move |_, _thread, _cx| {
4966 handler_done_clone.store(true, SeqCst);
4967 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
4968 },
4969 ));
4970
4971 let thread = cx
4972 .update(|cx| {
4973 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4974 })
4975 .await
4976 .unwrap();
4977
4978 let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
4979 let send_task = cx.background_executor.spawn(send_future);
4980
4981 // Tick until handler completes, then a few more to let update_last_checkpoint start
4982 while !handler_done.load(SeqCst) {
4983 cx.executor().tick();
4984 }
4985 for _ in 0..5 {
4986 cx.executor().tick();
4987 }
4988
4989 thread.update(cx, |thread, cx| {
4990 thread.push_entry(
4991 AgentThreadEntry::UserMessage(UserMessage {
4992 id: Some(UserMessageId::new()),
4993 content: ContentBlock::Empty,
4994 chunks: vec!["Injected message (no checkpoint)".into()],
4995 checkpoint: None,
4996 indented: false,
4997 }),
4998 cx,
4999 );
5000 });
5001
5002 cx.run_until_parked();
5003 let result = send_task.await;
5004
5005 assert!(
5006 result.is_ok(),
5007 "send should succeed even when new message added during update_last_checkpoint: {:?}",
5008 result.err()
5009 );
5010 }
5011
5012 /// Tests that when a follow-up message is sent during generation,
5013 /// the first turn completing does NOT clear `running_turn` because
5014 /// it now belongs to the second turn.
5015 #[gpui::test]
5016 async fn test_follow_up_message_during_generation_does_not_clear_turn(cx: &mut TestAppContext) {
5017 init_test(cx);
5018
5019 let fs = FakeFs::new(cx.executor());
5020 let project = Project::test(fs, [], cx).await;
5021
5022 // First handler waits for this signal before completing
5023 let (first_complete_tx, first_complete_rx) = futures::channel::oneshot::channel::<()>();
5024 let first_complete_rx = RefCell::new(Some(first_complete_rx));
5025
5026 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
5027 move |params, _thread, _cx| {
5028 let first_complete_rx = first_complete_rx.borrow_mut().take();
5029 let is_first = params
5030 .prompt
5031 .iter()
5032 .any(|c| matches!(c, acp::ContentBlock::Text(t) if t.text.contains("first")));
5033
5034 async move {
5035 if is_first {
5036 // First handler waits until signaled
5037 if let Some(rx) = first_complete_rx {
5038 rx.await.ok();
5039 }
5040 }
5041 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
5042 }
5043 .boxed_local()
5044 }
5045 }));
5046
5047 let thread = cx
5048 .update(|cx| {
5049 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5050 })
5051 .await
5052 .unwrap();
5053
5054 // Send first message (turn_id=1) - handler will block
5055 let first_request = thread.update(cx, |thread, cx| thread.send_raw("first", cx));
5056 assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 1);
5057
5058 // Send second message (turn_id=2) while first is still blocked
5059 // This calls cancel() which takes turn 1's running_turn and sets turn 2's
5060 let second_request = thread.update(cx, |thread, cx| thread.send_raw("second", cx));
5061 assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 2);
5062
5063 let running_turn_after_second_send =
5064 thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
5065 assert_eq!(
5066 running_turn_after_second_send,
5067 Some(2),
5068 "running_turn should be set to turn 2 after sending second message"
5069 );
5070
5071 // Now signal first handler to complete
5072 first_complete_tx.send(()).ok();
5073
5074 // First request completes - should NOT clear running_turn
5075 // because running_turn now belongs to turn 2
5076 first_request.await.unwrap();
5077
5078 let running_turn_after_first =
5079 thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
5080 assert_eq!(
5081 running_turn_after_first,
5082 Some(2),
5083 "first turn completing should not clear running_turn (belongs to turn 2)"
5084 );
5085
5086 // Second request completes - SHOULD clear running_turn
5087 second_request.await.unwrap();
5088
5089 let running_turn_after_second =
5090 thread.read_with(cx, |thread, _| thread.running_turn.is_some());
5091 assert!(
5092 !running_turn_after_second,
5093 "second turn completing should clear running_turn"
5094 );
5095 }
5096
5097 #[gpui::test]
5098 async fn test_send_assigns_message_id_without_truncate_support(cx: &mut TestAppContext) {
5099 init_test(cx);
5100
5101 let fs = FakeFs::new(cx.executor());
5102 let project = Project::test(fs, [], cx).await;
5103
5104 let connection = Rc::new(FakeAgentConnection::new().without_truncate_support());
5105 let thread = cx
5106 .update(|cx| {
5107 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5108 })
5109 .await
5110 .unwrap();
5111
5112 let response = thread
5113 .update(cx, |thread, cx| thread.send_raw("test message", cx))
5114 .await;
5115
5116 assert!(response.is_ok(), "send should not fail: {response:?}");
5117 thread.read_with(cx, |thread, _| {
5118 let AgentThreadEntry::UserMessage(message) = &thread.entries[0] else {
5119 panic!("expected first entry to be a user message")
5120 };
5121 assert!(
5122 message.id.is_some(),
5123 "user message should always have an id"
5124 );
5125 });
5126 }
5127
5128 #[gpui::test]
5129 async fn test_send_returns_cancelled_response_and_marks_tools_as_cancelled(
5130 cx: &mut TestAppContext,
5131 ) {
5132 init_test(cx);
5133
5134 let fs = FakeFs::new(cx.executor());
5135 let project = Project::test(fs, [], cx).await;
5136
5137 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
5138 move |_params, thread, mut cx| {
5139 async move {
5140 thread
5141 .update(&mut cx, |thread, cx| {
5142 thread.handle_session_update(
5143 acp::SessionUpdate::ToolCall(
5144 acp::ToolCall::new(
5145 acp::ToolCallId::new("test-tool"),
5146 "Test Tool",
5147 )
5148 .kind(acp::ToolKind::Fetch)
5149 .status(acp::ToolCallStatus::InProgress),
5150 ),
5151 cx,
5152 )
5153 })
5154 .unwrap()
5155 .unwrap();
5156
5157 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
5158 }
5159 .boxed_local()
5160 },
5161 ));
5162
5163 let thread = cx
5164 .update(|cx| {
5165 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5166 })
5167 .await
5168 .unwrap();
5169
5170 let response = thread
5171 .update(cx, |thread, cx| thread.send_raw("test message", cx))
5172 .await;
5173
5174 let response = response
5175 .expect("send should succeed")
5176 .expect("should have response");
5177 assert_eq!(
5178 response.stop_reason,
5179 acp::StopReason::Cancelled,
5180 "response should have Cancelled stop_reason"
5181 );
5182
5183 thread.read_with(cx, |thread, _| {
5184 let tool_entry = thread
5185 .entries
5186 .iter()
5187 .find_map(|e| {
5188 if let AgentThreadEntry::ToolCall(call) = e {
5189 Some(call)
5190 } else {
5191 None
5192 }
5193 })
5194 .expect("should have tool call entry");
5195
5196 assert!(
5197 matches!(tool_entry.status, ToolCallStatus::Canceled),
5198 "tool should be marked as Canceled when response is Cancelled, got {:?}",
5199 tool_entry.status
5200 );
5201 });
5202 }
5203
5204 #[gpui::test]
5205 async fn test_provisional_title_replaced_by_real_title(cx: &mut TestAppContext) {
5206 init_test(cx);
5207
5208 let fs = FakeFs::new(cx.executor());
5209 let project = Project::test(fs, [], cx).await;
5210 let connection = Rc::new(FakeAgentConnection::new());
5211 let set_title_calls = connection.set_title_calls.clone();
5212
5213 let thread = cx
5214 .update(|cx| {
5215 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5216 })
5217 .await
5218 .unwrap();
5219
5220 // Initial title is the default.
5221 thread.read_with(cx, |thread, _| {
5222 assert_eq!(thread.title(), None);
5223 });
5224
5225 // Setting a provisional title updates the display title.
5226 thread.update(cx, |thread, cx| {
5227 thread.set_provisional_title("Hello, can you help…".into(), cx);
5228 });
5229 thread.read_with(cx, |thread, _| {
5230 assert_eq!(
5231 thread.title().as_ref().map(|s| s.as_str()),
5232 Some("Hello, can you help…")
5233 );
5234 });
5235
5236 // The provisional title should NOT have propagated to the connection.
5237 assert_eq!(
5238 set_title_calls.borrow().len(),
5239 0,
5240 "provisional title should not propagate to the connection"
5241 );
5242
5243 // When the real title arrives via set_title, it replaces the
5244 // provisional title and propagates to the connection.
5245 let task = thread.update(cx, |thread, cx| {
5246 thread.set_title("Helping with Rust question".into(), cx)
5247 });
5248 task.await.expect("set_title should succeed");
5249 thread.read_with(cx, |thread, _| {
5250 assert_eq!(
5251 thread.title().as_ref().map(|s| s.as_str()),
5252 Some("Helping with Rust question")
5253 );
5254 });
5255 assert_eq!(
5256 set_title_calls.borrow().as_slice(),
5257 &[SharedString::from("Helping with Rust question")],
5258 "real title should propagate to the connection"
5259 );
5260 }
5261
5262 #[gpui::test]
5263 async fn test_session_info_update_replaces_provisional_title_and_emits_event(
5264 cx: &mut TestAppContext,
5265 ) {
5266 init_test(cx);
5267
5268 let fs = FakeFs::new(cx.executor());
5269 let project = Project::test(fs, [], cx).await;
5270 let connection = Rc::new(FakeAgentConnection::new());
5271
5272 let thread = cx
5273 .update(|cx| {
5274 connection.clone().new_session(
5275 project,
5276 PathList::new(&[Path::new(path!("/test"))]),
5277 cx,
5278 )
5279 })
5280 .await
5281 .unwrap();
5282
5283 let title_updated_events = Rc::new(RefCell::new(0usize));
5284 let title_updated_events_for_subscription = title_updated_events.clone();
5285 thread.update(cx, |_thread, cx| {
5286 cx.subscribe(
5287 &thread,
5288 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
5289 if matches!(event, AcpThreadEvent::TitleUpdated) {
5290 *title_updated_events_for_subscription.borrow_mut() += 1;
5291 }
5292 },
5293 )
5294 .detach();
5295 });
5296
5297 thread.update(cx, |thread, cx| {
5298 thread.set_provisional_title("Hello, can you help…".into(), cx);
5299 });
5300 assert_eq!(
5301 *title_updated_events.borrow(),
5302 1,
5303 "setting a provisional title should emit TitleUpdated"
5304 );
5305
5306 let result = thread.update(cx, |thread, cx| {
5307 thread.handle_session_update(
5308 acp::SessionUpdate::SessionInfoUpdate(
5309 acp::SessionInfoUpdate::new().title("Helping with Rust question"),
5310 ),
5311 cx,
5312 )
5313 });
5314 result.expect("session info update should succeed");
5315
5316 thread.read_with(cx, |thread, _| {
5317 assert_eq!(
5318 thread.title().as_ref().map(|s| s.as_str()),
5319 Some("Helping with Rust question")
5320 );
5321 assert!(
5322 !thread.has_provisional_title(),
5323 "session info title update should clear provisional title"
5324 );
5325 });
5326
5327 assert_eq!(
5328 *title_updated_events.borrow(),
5329 2,
5330 "session info title update should emit TitleUpdated"
5331 );
5332 assert!(
5333 connection.set_title_calls.borrow().is_empty(),
5334 "session info title update should not propagate back to the connection"
5335 );
5336 }
5337
5338 #[gpui::test]
5339 async fn test_usage_update_populates_token_usage_and_cost(cx: &mut TestAppContext) {
5340 init_test(cx);
5341
5342 let fs = FakeFs::new(cx.executor());
5343 let project = Project::test(fs, [], cx).await;
5344 let connection = Rc::new(FakeAgentConnection::new());
5345 let thread = cx
5346 .update(|cx| {
5347 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5348 })
5349 .await
5350 .unwrap();
5351
5352 thread.update(cx, |thread, cx| {
5353 thread
5354 .handle_session_update(
5355 acp::SessionUpdate::UsageUpdate(
5356 acp::UsageUpdate::new(5000, 10000).cost(acp::Cost::new(0.42, "USD")),
5357 ),
5358 cx,
5359 )
5360 .unwrap();
5361 });
5362
5363 thread.read_with(cx, |thread, _| {
5364 let usage = thread.token_usage().expect("token_usage should be set");
5365 assert_eq!(usage.max_tokens, 10000);
5366 assert_eq!(usage.used_tokens, 5000);
5367
5368 let cost = thread.cost().expect("cost should be set");
5369 assert!((cost.amount - 0.42).abs() < f64::EPSILON);
5370 assert_eq!(cost.currency.as_ref(), "USD");
5371 });
5372 }
5373
5374 #[gpui::test]
5375 async fn test_usage_update_without_cost_preserves_existing_cost(cx: &mut TestAppContext) {
5376 init_test(cx);
5377
5378 let fs = FakeFs::new(cx.executor());
5379 let project = Project::test(fs, [], cx).await;
5380 let connection = Rc::new(FakeAgentConnection::new());
5381 let thread = cx
5382 .update(|cx| {
5383 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5384 })
5385 .await
5386 .unwrap();
5387
5388 thread.update(cx, |thread, cx| {
5389 thread
5390 .handle_session_update(
5391 acp::SessionUpdate::UsageUpdate(
5392 acp::UsageUpdate::new(1000, 10000).cost(acp::Cost::new(0.10, "USD")),
5393 ),
5394 cx,
5395 )
5396 .unwrap();
5397
5398 thread
5399 .handle_session_update(
5400 acp::SessionUpdate::UsageUpdate(acp::UsageUpdate::new(2000, 10000)),
5401 cx,
5402 )
5403 .unwrap();
5404 });
5405
5406 thread.read_with(cx, |thread, _| {
5407 let usage = thread.token_usage().expect("token_usage should be set");
5408 assert_eq!(usage.used_tokens, 2000);
5409
5410 let cost = thread.cost().expect("cost should be preserved");
5411 assert!((cost.amount - 0.10).abs() < f64::EPSILON);
5412 });
5413 }
5414
5415 #[gpui::test]
5416 async fn test_response_usage_does_not_clobber_session_usage(cx: &mut TestAppContext) {
5417 init_test(cx);
5418
5419 let fs = FakeFs::new(cx.executor());
5420 let project = Project::test(fs, [], cx).await;
5421 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
5422 move |_, thread, mut cx| {
5423 async move {
5424 thread.update(&mut cx, |thread, cx| {
5425 thread
5426 .handle_session_update(
5427 acp::SessionUpdate::UsageUpdate(
5428 acp::UsageUpdate::new(3000, 10000)
5429 .cost(acp::Cost::new(0.05, "EUR")),
5430 ),
5431 cx,
5432 )
5433 .unwrap();
5434 })?;
5435 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)
5436 .usage(acp::Usage::new(500, 200, 300)))
5437 }
5438 .boxed_local()
5439 },
5440 ));
5441
5442 let thread = cx
5443 .update(|cx| {
5444 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5445 })
5446 .await
5447 .unwrap();
5448
5449 thread
5450 .update(cx, |thread, cx| thread.send_raw("hello", cx))
5451 .await
5452 .unwrap();
5453
5454 thread.read_with(cx, |thread, _| {
5455 let usage = thread.token_usage().expect("token_usage should be set");
5456 assert_eq!(usage.max_tokens, 10000, "max_tokens from UsageUpdate");
5457 assert_eq!(usage.used_tokens, 3000, "used_tokens from UsageUpdate");
5458 assert_eq!(usage.input_tokens, 200, "input_tokens from response usage");
5459 assert_eq!(
5460 usage.output_tokens, 300,
5461 "output_tokens from response usage"
5462 );
5463
5464 let cost = thread.cost().expect("cost should be set");
5465 assert!((cost.amount - 0.05).abs() < f64::EPSILON);
5466 assert_eq!(cost.currency.as_ref(), "EUR");
5467 });
5468 }
5469
5470 #[gpui::test]
5471 async fn test_clearing_token_usage_also_clears_cost(cx: &mut TestAppContext) {
5472 init_test(cx);
5473
5474 let fs = FakeFs::new(cx.executor());
5475 let project = Project::test(fs, [], cx).await;
5476 let connection = Rc::new(FakeAgentConnection::new());
5477 let thread = cx
5478 .update(|cx| {
5479 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5480 })
5481 .await
5482 .unwrap();
5483
5484 thread.update(cx, |thread, cx| {
5485 thread
5486 .handle_session_update(
5487 acp::SessionUpdate::UsageUpdate(
5488 acp::UsageUpdate::new(1000, 10000).cost(acp::Cost::new(0.25, "USD")),
5489 ),
5490 cx,
5491 )
5492 .unwrap();
5493
5494 assert!(thread.token_usage().is_some());
5495 assert!(thread.cost().is_some());
5496
5497 thread.update_token_usage(None, cx);
5498
5499 assert!(thread.token_usage().is_none());
5500 assert!(
5501 thread.cost().is_none(),
5502 "cost should be cleared when token usage is cleared"
5503 );
5504 });
5505 }
5506}