1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5
/// Meta key under which a tool call's programmatic name is stored.
///
/// ACP's `ToolCall` has no dedicated name field, so the name travels in the
/// call's meta map under this key as a workaround.
pub const TOOL_NAME_META_KEY: &str = "tool_name";

/// Meta key under which the session id of a spawned subagent is stored on an
/// ACP `ToolCall`.
pub const SUBAGENT_SESSION_ID_META_KEY: &str = "subagent_session_id";
12
13/// Helper to extract tool name from ACP meta
14pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
15 meta.as_ref()
16 .and_then(|m| m.get(TOOL_NAME_META_KEY))
17 .and_then(|v| v.as_str())
18 .map(|s| SharedString::from(s.to_owned()))
19}
20
21/// Helper to extract subagent session id from ACP meta
22pub fn subagent_session_id_from_meta(meta: &Option<acp::Meta>) -> Option<acp::SessionId> {
23 meta.as_ref()
24 .and_then(|m| m.get(SUBAGENT_SESSION_ID_META_KEY))
25 .and_then(|v| v.as_str())
26 .map(|s| acp::SessionId::from(s.to_string()))
27}
28
29/// Helper to create meta with tool name
30pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
31 acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
32}
33use collections::HashSet;
34pub use connection::*;
35pub use diff::*;
36use language::language_settings::FormatOnSave;
37pub use mention::*;
38use project::lsp_store::{FormatTrigger, LspFormatTarget};
39use serde::{Deserialize, Serialize};
40use serde_json::to_string_pretty;
41
42use task::{Shell, ShellBuilder};
43pub use terminal::*;
44
45use action_log::{ActionLog, ActionLogTelemetry};
46use agent_client_protocol::{self as acp};
47use anyhow::{Context as _, Result, anyhow};
48use futures::{FutureExt, channel::oneshot, future::BoxFuture};
49use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
50use itertools::Itertools;
51use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
52use markdown::Markdown;
53use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
54use std::collections::HashMap;
55use std::error::Error;
56use std::fmt::{Formatter, Write};
57use std::ops::Range;
58use std::process::ExitStatus;
59use std::rc::Rc;
60use std::time::{Duration, Instant};
61use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
62use text::Bias;
63use ui::App;
64use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
65use uuid::Uuid;
66
67#[derive(Debug)]
68pub struct UserMessage {
69 pub id: Option<UserMessageId>,
70 pub content: ContentBlock,
71 pub chunks: Vec<acp::ContentBlock>,
72 pub checkpoint: Option<Checkpoint>,
73 pub indented: bool,
74}
75
76#[derive(Debug)]
77pub struct Checkpoint {
78 git_checkpoint: GitStoreCheckpoint,
79 pub show: bool,
80}
81
82impl UserMessage {
83 fn to_markdown(&self, cx: &App) -> String {
84 let mut markdown = String::new();
85 if self
86 .checkpoint
87 .as_ref()
88 .is_some_and(|checkpoint| checkpoint.show)
89 {
90 writeln!(markdown, "## User (checkpoint)").unwrap();
91 } else {
92 writeln!(markdown, "## User").unwrap();
93 }
94 writeln!(markdown).unwrap();
95 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
96 writeln!(markdown).unwrap();
97 markdown
98 }
99}
100
101#[derive(Debug, PartialEq)]
102pub struct AssistantMessage {
103 pub chunks: Vec<AssistantMessageChunk>,
104 pub indented: bool,
105}
106
107impl AssistantMessage {
108 pub fn to_markdown(&self, cx: &App) -> String {
109 format!(
110 "## Assistant\n\n{}\n\n",
111 self.chunks
112 .iter()
113 .map(|chunk| chunk.to_markdown(cx))
114 .join("\n\n")
115 )
116 }
117}
118
119#[derive(Debug, PartialEq)]
120pub enum AssistantMessageChunk {
121 Message { block: ContentBlock },
122 Thought { block: ContentBlock },
123}
124
125impl AssistantMessageChunk {
126 pub fn from_str(
127 chunk: &str,
128 language_registry: &Arc<LanguageRegistry>,
129 path_style: PathStyle,
130 cx: &mut App,
131 ) -> Self {
132 Self::Message {
133 block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
134 }
135 }
136
137 fn to_markdown(&self, cx: &App) -> String {
138 match self {
139 Self::Message { block } => block.to_markdown(cx).to_string(),
140 Self::Thought { block } => {
141 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
142 }
143 }
144 }
145}
146
147#[derive(Debug)]
148pub enum AgentThreadEntry {
149 UserMessage(UserMessage),
150 AssistantMessage(AssistantMessage),
151 ToolCall(ToolCall),
152}
153
154impl AgentThreadEntry {
155 pub fn is_indented(&self) -> bool {
156 match self {
157 Self::UserMessage(message) => message.indented,
158 Self::AssistantMessage(message) => message.indented,
159 Self::ToolCall(_) => false,
160 }
161 }
162
163 pub fn to_markdown(&self, cx: &App) -> String {
164 match self {
165 Self::UserMessage(message) => message.to_markdown(cx),
166 Self::AssistantMessage(message) => message.to_markdown(cx),
167 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
168 }
169 }
170
171 pub fn user_message(&self) -> Option<&UserMessage> {
172 if let AgentThreadEntry::UserMessage(message) = self {
173 Some(message)
174 } else {
175 None
176 }
177 }
178
179 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
180 if let AgentThreadEntry::ToolCall(call) = self {
181 itertools::Either::Left(call.diffs())
182 } else {
183 itertools::Either::Right(std::iter::empty())
184 }
185 }
186
187 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
188 if let AgentThreadEntry::ToolCall(call) = self {
189 itertools::Either::Left(call.terminals())
190 } else {
191 itertools::Either::Right(std::iter::empty())
192 }
193 }
194
195 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
196 if let AgentThreadEntry::ToolCall(ToolCall {
197 locations,
198 resolved_locations,
199 ..
200 }) = self
201 {
202 Some((
203 locations.get(ix)?.clone(),
204 resolved_locations.get(ix)?.clone()?,
205 ))
206 } else {
207 None
208 }
209 }
210}
211
212#[derive(Debug)]
213pub struct ToolCall {
214 pub id: acp::ToolCallId,
215 pub label: Entity<Markdown>,
216 pub kind: acp::ToolKind,
217 pub content: Vec<ToolCallContent>,
218 pub status: ToolCallStatus,
219 pub locations: Vec<acp::ToolCallLocation>,
220 pub resolved_locations: Vec<Option<AgentLocation>>,
221 pub raw_input: Option<serde_json::Value>,
222 pub raw_input_markdown: Option<Entity<Markdown>>,
223 pub raw_output: Option<serde_json::Value>,
224 pub tool_name: Option<SharedString>,
225 pub subagent_session_id: Option<acp::SessionId>,
226}
227
228impl ToolCall {
229 fn from_acp(
230 tool_call: acp::ToolCall,
231 status: ToolCallStatus,
232 language_registry: Arc<LanguageRegistry>,
233 path_style: PathStyle,
234 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
235 cx: &mut App,
236 ) -> Result<Self> {
237 let title = if tool_call.kind == acp::ToolKind::Execute {
238 tool_call.title
239 } else if let Some((first_line, _)) = tool_call.title.split_once("\n") {
240 first_line.to_owned() + "…"
241 } else {
242 tool_call.title
243 };
244 let mut content = Vec::with_capacity(tool_call.content.len());
245 for item in tool_call.content {
246 if let Some(item) = ToolCallContent::from_acp(
247 item,
248 language_registry.clone(),
249 path_style,
250 terminals,
251 cx,
252 )? {
253 content.push(item);
254 }
255 }
256
257 let raw_input_markdown = tool_call
258 .raw_input
259 .as_ref()
260 .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
261
262 let tool_name = tool_name_from_meta(&tool_call.meta);
263
264 let subagent_session = subagent_session_id_from_meta(&tool_call.meta);
265
266 let result = Self {
267 id: tool_call.tool_call_id,
268 label: cx
269 .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
270 kind: tool_call.kind,
271 content,
272 locations: tool_call.locations,
273 resolved_locations: Vec::default(),
274 status,
275 raw_input: tool_call.raw_input,
276 raw_input_markdown,
277 raw_output: tool_call.raw_output,
278 tool_name,
279 subagent_session_id: subagent_session,
280 };
281 Ok(result)
282 }
283
284 fn update_fields(
285 &mut self,
286 fields: acp::ToolCallUpdateFields,
287 meta: Option<acp::Meta>,
288 language_registry: Arc<LanguageRegistry>,
289 path_style: PathStyle,
290 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
291 cx: &mut App,
292 ) -> Result<()> {
293 let acp::ToolCallUpdateFields {
294 kind,
295 status,
296 title,
297 content,
298 locations,
299 raw_input,
300 raw_output,
301 ..
302 } = fields;
303
304 if let Some(kind) = kind {
305 self.kind = kind;
306 }
307
308 if let Some(status) = status {
309 self.status = status.into();
310 }
311
312 if let Some(subagent_session_id) = subagent_session_id_from_meta(&meta) {
313 self.subagent_session_id = Some(subagent_session_id);
314 }
315
316 if let Some(title) = title {
317 if self.kind == acp::ToolKind::Execute {
318 for terminal in self.terminals() {
319 terminal.update(cx, |terminal, cx| {
320 terminal.update_command_label(&title, cx);
321 });
322 }
323 }
324 self.label.update(cx, |label, cx| {
325 if self.kind == acp::ToolKind::Execute {
326 label.replace(title, cx);
327 } else if let Some((first_line, _)) = title.split_once("\n") {
328 label.replace(first_line.to_owned() + "…", cx);
329 } else {
330 label.replace(title, cx);
331 }
332 });
333 }
334
335 if let Some(content) = content {
336 let mut new_content_len = content.len();
337 let mut content = content.into_iter();
338
339 // Reuse existing content if we can
340 for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
341 let valid_content =
342 old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
343 if !valid_content {
344 new_content_len -= 1;
345 }
346 }
347 for new in content {
348 if let Some(new) = ToolCallContent::from_acp(
349 new,
350 language_registry.clone(),
351 path_style,
352 terminals,
353 cx,
354 )? {
355 self.content.push(new);
356 } else {
357 new_content_len -= 1;
358 }
359 }
360 self.content.truncate(new_content_len);
361 }
362
363 if let Some(locations) = locations {
364 self.locations = locations;
365 }
366
367 if let Some(raw_input) = raw_input {
368 self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
369 self.raw_input = Some(raw_input);
370 }
371
372 if let Some(raw_output) = raw_output {
373 if self.content.is_empty()
374 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
375 {
376 self.content
377 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
378 markdown,
379 }));
380 }
381 self.raw_output = Some(raw_output);
382 }
383 Ok(())
384 }
385
386 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
387 self.content.iter().filter_map(|content| match content {
388 ToolCallContent::Diff(diff) => Some(diff),
389 ToolCallContent::ContentBlock(_) => None,
390 ToolCallContent::Terminal(_) => None,
391 })
392 }
393
394 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
395 self.content.iter().filter_map(|content| match content {
396 ToolCallContent::Terminal(terminal) => Some(terminal),
397 ToolCallContent::ContentBlock(_) => None,
398 ToolCallContent::Diff(_) => None,
399 })
400 }
401
402 pub fn is_subagent(&self) -> bool {
403 self.tool_name.as_ref().is_some_and(|s| s == "subagent")
404 || self.subagent_session_id.is_some()
405 }
406
407 pub fn to_markdown(&self, cx: &App) -> String {
408 let mut markdown = format!(
409 "**Tool Call: {}**\nStatus: {}\n\n",
410 self.label.read(cx).source(),
411 self.status
412 );
413 for content in &self.content {
414 markdown.push_str(content.to_markdown(cx).as_str());
415 markdown.push_str("\n\n");
416 }
417 markdown
418 }
419
420 async fn resolve_location(
421 location: acp::ToolCallLocation,
422 project: WeakEntity<Project>,
423 cx: &mut AsyncApp,
424 ) -> Option<ResolvedLocation> {
425 let buffer = project
426 .update(cx, |project, cx| {
427 project
428 .project_path_for_absolute_path(&location.path, cx)
429 .map(|path| project.open_buffer(path, cx))
430 })
431 .ok()??;
432 let buffer = buffer.await.log_err()?;
433 let position = buffer.update(cx, |buffer, _| {
434 let snapshot = buffer.snapshot();
435 if let Some(row) = location.line {
436 let column = snapshot.indent_size_for_line(row).len;
437 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
438 snapshot.anchor_before(point)
439 } else {
440 Anchor::min_for_buffer(snapshot.remote_id())
441 }
442 });
443
444 Some(ResolvedLocation { buffer, position })
445 }
446
447 fn resolve_locations(
448 &self,
449 project: Entity<Project>,
450 cx: &mut App,
451 ) -> Task<Vec<Option<ResolvedLocation>>> {
452 let locations = self.locations.clone();
453 project.update(cx, |_, cx| {
454 cx.spawn(async move |project, cx| {
455 let mut new_locations = Vec::new();
456 for location in locations {
457 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
458 }
459 new_locations
460 })
461 })
462 }
463}
464
465// Separate so we can hold a strong reference to the buffer
466// for saving on the thread
467#[derive(Clone, Debug, PartialEq, Eq)]
468struct ResolvedLocation {
469 buffer: Entity<Buffer>,
470 position: Anchor,
471}
472
473impl From<&ResolvedLocation> for AgentLocation {
474 fn from(value: &ResolvedLocation) -> Self {
475 Self {
476 buffer: value.buffer.downgrade(),
477 position: value.position,
478 }
479 }
480}
481
482#[derive(Debug)]
483pub enum ToolCallStatus {
484 /// The tool call hasn't started running yet, but we start showing it to
485 /// the user.
486 Pending,
487 /// The tool call is waiting for confirmation from the user.
488 WaitingForConfirmation {
489 options: PermissionOptions,
490 respond_tx: oneshot::Sender<acp::PermissionOptionId>,
491 },
492 /// The tool call is currently running.
493 InProgress,
494 /// The tool call completed successfully.
495 Completed,
496 /// The tool call failed.
497 Failed,
498 /// The user rejected the tool call.
499 Rejected,
500 /// The user canceled generation so the tool call was canceled.
501 Canceled,
502}
503
504impl From<acp::ToolCallStatus> for ToolCallStatus {
505 fn from(status: acp::ToolCallStatus) -> Self {
506 match status {
507 acp::ToolCallStatus::Pending => Self::Pending,
508 acp::ToolCallStatus::InProgress => Self::InProgress,
509 acp::ToolCallStatus::Completed => Self::Completed,
510 acp::ToolCallStatus::Failed => Self::Failed,
511 _ => Self::Pending,
512 }
513 }
514}
515
516impl Display for ToolCallStatus {
517 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
518 write!(
519 f,
520 "{}",
521 match self {
522 ToolCallStatus::Pending => "Pending",
523 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
524 ToolCallStatus::InProgress => "In Progress",
525 ToolCallStatus::Completed => "Completed",
526 ToolCallStatus::Failed => "Failed",
527 ToolCallStatus::Rejected => "Rejected",
528 ToolCallStatus::Canceled => "Canceled",
529 }
530 )
531 }
532}
533
534#[derive(Debug, PartialEq, Clone)]
535pub enum ContentBlock {
536 Empty,
537 Markdown { markdown: Entity<Markdown> },
538 ResourceLink { resource_link: acp::ResourceLink },
539 Image { image: Arc<gpui::Image> },
540}
541
542impl ContentBlock {
543 pub fn new(
544 block: acp::ContentBlock,
545 language_registry: &Arc<LanguageRegistry>,
546 path_style: PathStyle,
547 cx: &mut App,
548 ) -> Self {
549 let mut this = Self::Empty;
550 this.append(block, language_registry, path_style, cx);
551 this
552 }
553
554 pub fn new_combined(
555 blocks: impl IntoIterator<Item = acp::ContentBlock>,
556 language_registry: Arc<LanguageRegistry>,
557 path_style: PathStyle,
558 cx: &mut App,
559 ) -> Self {
560 let mut this = Self::Empty;
561 for block in blocks {
562 this.append(block, &language_registry, path_style, cx);
563 }
564 this
565 }
566
567 pub fn append(
568 &mut self,
569 block: acp::ContentBlock,
570 language_registry: &Arc<LanguageRegistry>,
571 path_style: PathStyle,
572 cx: &mut App,
573 ) {
574 match (&mut *self, &block) {
575 (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
576 *self = ContentBlock::ResourceLink {
577 resource_link: resource_link.clone(),
578 };
579 }
580 (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
581 if let Some(image) = Self::decode_image(image_content) {
582 *self = ContentBlock::Image { image };
583 } else {
584 let new_content = Self::image_md(image_content);
585 *self = Self::create_markdown_block(new_content, language_registry, cx);
586 }
587 }
588 (ContentBlock::Empty, _) => {
589 let new_content = Self::block_string_contents(&block, path_style);
590 *self = Self::create_markdown_block(new_content, language_registry, cx);
591 }
592 (ContentBlock::Markdown { markdown }, _) => {
593 let new_content = Self::block_string_contents(&block, path_style);
594 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
595 }
596 (ContentBlock::ResourceLink { resource_link }, _) => {
597 let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
598 let new_content = Self::block_string_contents(&block, path_style);
599 let combined = format!("{}\n{}", existing_content, new_content);
600 *self = Self::create_markdown_block(combined, language_registry, cx);
601 }
602 (ContentBlock::Image { .. }, _) => {
603 let new_content = Self::block_string_contents(&block, path_style);
604 let combined = format!("`Image`\n{}", new_content);
605 *self = Self::create_markdown_block(combined, language_registry, cx);
606 }
607 }
608 }
609
610 fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
611 use base64::Engine as _;
612
613 let bytes = base64::engine::general_purpose::STANDARD
614 .decode(image_content.data.as_bytes())
615 .ok()?;
616 let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
617 Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
618 }
619
620 fn create_markdown_block(
621 content: String,
622 language_registry: &Arc<LanguageRegistry>,
623 cx: &mut App,
624 ) -> ContentBlock {
625 ContentBlock::Markdown {
626 markdown: cx
627 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
628 }
629 }
630
631 fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
632 match block {
633 acp::ContentBlock::Text(text_content) => text_content.text.clone(),
634 acp::ContentBlock::ResourceLink(resource_link) => {
635 Self::resource_link_md(&resource_link.uri, path_style)
636 }
637 acp::ContentBlock::Resource(acp::EmbeddedResource {
638 resource:
639 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
640 uri,
641 ..
642 }),
643 ..
644 }) => Self::resource_link_md(uri, path_style),
645 acp::ContentBlock::Image(image) => Self::image_md(image),
646 _ => String::new(),
647 }
648 }
649
650 fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
651 if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
652 uri.as_link().to_string()
653 } else {
654 uri.to_string()
655 }
656 }
657
658 fn image_md(_image: &acp::ImageContent) -> String {
659 "`Image`".into()
660 }
661
662 pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
663 match self {
664 ContentBlock::Empty => "",
665 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
666 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
667 ContentBlock::Image { .. } => "`Image`",
668 }
669 }
670
671 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
672 match self {
673 ContentBlock::Empty => None,
674 ContentBlock::Markdown { markdown } => Some(markdown),
675 ContentBlock::ResourceLink { .. } => None,
676 ContentBlock::Image { .. } => None,
677 }
678 }
679
680 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
681 match self {
682 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
683 _ => None,
684 }
685 }
686
687 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
688 match self {
689 ContentBlock::Image { image } => Some(image),
690 _ => None,
691 }
692 }
693}
694
695#[derive(Debug)]
696pub enum ToolCallContent {
697 ContentBlock(ContentBlock),
698 Diff(Entity<Diff>),
699 Terminal(Entity<Terminal>),
700}
701
702impl ToolCallContent {
703 pub fn from_acp(
704 content: acp::ToolCallContent,
705 language_registry: Arc<LanguageRegistry>,
706 path_style: PathStyle,
707 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
708 cx: &mut App,
709 ) -> Result<Option<Self>> {
710 match content {
711 acp::ToolCallContent::Content(acp::Content { content, .. }) => {
712 Ok(Some(Self::ContentBlock(ContentBlock::new(
713 content,
714 &language_registry,
715 path_style,
716 cx,
717 ))))
718 }
719 acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
720 Diff::finalized(
721 diff.path.to_string_lossy().into_owned(),
722 diff.old_text,
723 diff.new_text,
724 language_registry,
725 cx,
726 )
727 })))),
728 acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
729 .get(&terminal_id)
730 .cloned()
731 .map(|terminal| Some(Self::Terminal(terminal)))
732 .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
733 _ => Ok(None),
734 }
735 }
736
737 pub fn update_from_acp(
738 &mut self,
739 new: acp::ToolCallContent,
740 language_registry: Arc<LanguageRegistry>,
741 path_style: PathStyle,
742 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
743 cx: &mut App,
744 ) -> Result<bool> {
745 let needs_update = match (&self, &new) {
746 (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
747 old_diff.read(cx).needs_update(
748 new_diff.old_text.as_deref().unwrap_or(""),
749 &new_diff.new_text,
750 cx,
751 )
752 }
753 _ => true,
754 };
755
756 if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
757 if needs_update {
758 *self = update;
759 }
760 Ok(true)
761 } else {
762 Ok(false)
763 }
764 }
765
766 pub fn to_markdown(&self, cx: &App) -> String {
767 match self {
768 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
769 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
770 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
771 }
772 }
773
774 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
775 match self {
776 Self::ContentBlock(content) => content.image(),
777 _ => None,
778 }
779 }
780}
781
782#[derive(Debug, PartialEq)]
783pub enum ToolCallUpdate {
784 UpdateFields(acp::ToolCallUpdate),
785 UpdateDiff(ToolCallUpdateDiff),
786 UpdateTerminal(ToolCallUpdateTerminal),
787}
788
789impl ToolCallUpdate {
790 fn id(&self) -> &acp::ToolCallId {
791 match self {
792 Self::UpdateFields(update) => &update.tool_call_id,
793 Self::UpdateDiff(diff) => &diff.id,
794 Self::UpdateTerminal(terminal) => &terminal.id,
795 }
796 }
797}
798
799impl From<acp::ToolCallUpdate> for ToolCallUpdate {
800 fn from(update: acp::ToolCallUpdate) -> Self {
801 Self::UpdateFields(update)
802 }
803}
804
805impl From<ToolCallUpdateDiff> for ToolCallUpdate {
806 fn from(diff: ToolCallUpdateDiff) -> Self {
807 Self::UpdateDiff(diff)
808 }
809}
810
811#[derive(Debug, PartialEq)]
812pub struct ToolCallUpdateDiff {
813 pub id: acp::ToolCallId,
814 pub diff: Entity<Diff>,
815}
816
817impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
818 fn from(terminal: ToolCallUpdateTerminal) -> Self {
819 Self::UpdateTerminal(terminal)
820 }
821}
822
823#[derive(Debug, PartialEq)]
824pub struct ToolCallUpdateTerminal {
825 pub id: acp::ToolCallId,
826 pub terminal: Entity<Terminal>,
827}
828
829#[derive(Debug, Default)]
830pub struct Plan {
831 pub entries: Vec<PlanEntry>,
832}
833
834#[derive(Debug)]
835pub struct PlanStats<'a> {
836 pub in_progress_entry: Option<&'a PlanEntry>,
837 pub pending: u32,
838 pub completed: u32,
839}
840
841impl Plan {
842 pub fn is_empty(&self) -> bool {
843 self.entries.is_empty()
844 }
845
846 pub fn stats(&self) -> PlanStats<'_> {
847 let mut stats = PlanStats {
848 in_progress_entry: None,
849 pending: 0,
850 completed: 0,
851 };
852
853 for entry in &self.entries {
854 match &entry.status {
855 acp::PlanEntryStatus::Pending => {
856 stats.pending += 1;
857 }
858 acp::PlanEntryStatus::InProgress => {
859 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
860 }
861 acp::PlanEntryStatus::Completed => {
862 stats.completed += 1;
863 }
864 _ => {}
865 }
866 }
867
868 stats
869 }
870}
871
872#[derive(Debug)]
873pub struct PlanEntry {
874 pub content: Entity<Markdown>,
875 pub priority: acp::PlanEntryPriority,
876 pub status: acp::PlanEntryStatus,
877}
878
879impl PlanEntry {
880 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
881 Self {
882 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
883 priority: entry.priority,
884 status: entry.status,
885 }
886 }
887}
888
889#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
890pub struct TokenUsage {
891 pub max_tokens: u64,
892 pub used_tokens: u64,
893 pub input_tokens: u64,
894 pub output_tokens: u64,
895}
896
897impl TokenUsage {
898 pub fn ratio(&self) -> TokenUsageRatio {
899 #[cfg(debug_assertions)]
900 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
901 .unwrap_or("0.8".to_string())
902 .parse()
903 .unwrap();
904 #[cfg(not(debug_assertions))]
905 let warning_threshold: f32 = 0.8;
906
907 // When the maximum is unknown because there is no selected model,
908 // avoid showing the token limit warning.
909 if self.max_tokens == 0 {
910 TokenUsageRatio::Normal
911 } else if self.used_tokens >= self.max_tokens {
912 TokenUsageRatio::Exceeded
913 } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
914 TokenUsageRatio::Warning
915 } else {
916 TokenUsageRatio::Normal
917 }
918 }
919}
920
/// Classification of token usage relative to the limit, ordered from least to
/// most severe.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum TokenUsageRatio {
    /// Usage is below the warning threshold, or the limit is unknown.
    Normal,
    /// Usage is at or above the warning threshold but below the limit.
    Warning,
    /// Usage has reached or passed the limit.
    Exceeded,
}
927
928#[derive(Debug, Clone)]
929pub struct RetryStatus {
930 pub last_error: SharedString,
931 pub attempt: usize,
932 pub max_attempts: usize,
933 pub started_at: Instant,
934 pub duration: Duration,
935}
936
937pub struct AcpThread {
938 parent_session_id: Option<acp::SessionId>,
939 title: SharedString,
940 entries: Vec<AgentThreadEntry>,
941 plan: Plan,
942 project: Entity<Project>,
943 action_log: Entity<ActionLog>,
944 shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
945 send_task: Option<Task<()>>,
946 connection: Rc<dyn AgentConnection>,
947 session_id: acp::SessionId,
948 token_usage: Option<TokenUsage>,
949 prompt_capabilities: acp::PromptCapabilities,
950 _observe_prompt_capabilities: Task<anyhow::Result<()>>,
951 terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
952 pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
953 pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
954 // subagent cancellation fields
955 user_stopped: Arc<std::sync::atomic::AtomicBool>,
956 user_stop_tx: watch::Sender<bool>,
957}
958
959impl From<&AcpThread> for ActionLogTelemetry {
960 fn from(value: &AcpThread) -> Self {
961 Self {
962 agent_telemetry_id: value.connection().telemetry_id(),
963 session_id: value.session_id.0.clone(),
964 }
965 }
966}
967
968#[derive(Debug)]
969pub enum AcpThreadEvent {
970 NewEntry,
971 TitleUpdated,
972 TokenUsageUpdated,
973 EntryUpdated(usize),
974 EntriesRemoved(Range<usize>),
975 ToolAuthorizationRequired,
976 Retry(RetryStatus),
977 SubagentSpawned(acp::SessionId),
978 Stopped,
979 Error,
980 LoadError(LoadError),
981 PromptCapabilitiesUpdated,
982 Refusal,
983 AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
984 ModeUpdated(acp::SessionModeId),
985 ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
986}
987
988impl EventEmitter<AcpThreadEvent> for AcpThread {}
989
990#[derive(Debug, Clone)]
991pub enum TerminalProviderEvent {
992 Created {
993 terminal_id: acp::TerminalId,
994 label: String,
995 cwd: Option<PathBuf>,
996 output_byte_limit: Option<u64>,
997 terminal: Entity<::terminal::Terminal>,
998 },
999 Output {
1000 terminal_id: acp::TerminalId,
1001 data: Vec<u8>,
1002 },
1003 TitleChanged {
1004 terminal_id: acp::TerminalId,
1005 title: String,
1006 },
1007 Exit {
1008 terminal_id: acp::TerminalId,
1009 status: acp::TerminalExitStatus,
1010 },
1011}
1012
1013#[derive(Debug, Clone)]
1014pub enum TerminalProviderCommand {
1015 WriteInput {
1016 terminal_id: acp::TerminalId,
1017 bytes: Vec<u8>,
1018 },
1019 Resize {
1020 terminal_id: acp::TerminalId,
1021 cols: u16,
1022 rows: u16,
1023 },
1024 Close {
1025 terminal_id: acp::TerminalId,
1026 },
1027}
1028
1029impl AcpThread {
1030 pub fn on_terminal_provider_event(
1031 &mut self,
1032 event: TerminalProviderEvent,
1033 cx: &mut Context<Self>,
1034 ) {
1035 match event {
1036 TerminalProviderEvent::Created {
1037 terminal_id,
1038 label,
1039 cwd,
1040 output_byte_limit,
1041 terminal,
1042 } => {
1043 let entity = self.register_terminal_created(
1044 terminal_id.clone(),
1045 label,
1046 cwd,
1047 output_byte_limit,
1048 terminal,
1049 cx,
1050 );
1051
1052 if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
1053 for data in chunks.drain(..) {
1054 entity.update(cx, |term, cx| {
1055 term.inner().update(cx, |inner, cx| {
1056 inner.write_output(&data, cx);
1057 })
1058 });
1059 }
1060 }
1061
1062 if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
1063 entity.update(cx, |_term, cx| {
1064 cx.notify();
1065 });
1066 }
1067
1068 cx.notify();
1069 }
1070 TerminalProviderEvent::Output { terminal_id, data } => {
1071 if let Some(entity) = self.terminals.get(&terminal_id) {
1072 entity.update(cx, |term, cx| {
1073 term.inner().update(cx, |inner, cx| {
1074 inner.write_output(&data, cx);
1075 })
1076 });
1077 } else {
1078 self.pending_terminal_output
1079 .entry(terminal_id)
1080 .or_default()
1081 .push(data);
1082 }
1083 }
1084 TerminalProviderEvent::TitleChanged { terminal_id, title } => {
1085 if let Some(entity) = self.terminals.get(&terminal_id) {
1086 entity.update(cx, |term, cx| {
1087 term.inner().update(cx, |inner, cx| {
1088 inner.breadcrumb_text = title;
1089 cx.emit(::terminal::Event::BreadcrumbsChanged);
1090 })
1091 });
1092 }
1093 }
1094 TerminalProviderEvent::Exit {
1095 terminal_id,
1096 status,
1097 } => {
1098 if let Some(entity) = self.terminals.get(&terminal_id) {
1099 entity.update(cx, |_term, cx| {
1100 cx.notify();
1101 });
1102 } else {
1103 self.pending_terminal_exit.insert(terminal_id, status);
1104 }
1105 }
1106 }
1107 }
1108}
1109
/// Whether a thread is currently idle or generating.
#[derive(PartialEq, Eq, Debug)]
pub enum ThreadStatus {
    /// Nothing is being generated.
    Idle,
    /// A response is being generated.
    Generating,
}
1115
1116#[derive(Debug, Clone)]
1117pub enum LoadError {
1118 Unsupported {
1119 command: SharedString,
1120 current_version: SharedString,
1121 minimum_version: SharedString,
1122 },
1123 FailedToInstall(SharedString),
1124 Exited {
1125 status: ExitStatus,
1126 },
1127 Other(SharedString),
1128}
1129
1130impl Display for LoadError {
1131 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1132 match self {
1133 LoadError::Unsupported {
1134 command: path,
1135 current_version,
1136 minimum_version,
1137 } => {
1138 write!(
1139 f,
1140 "version {current_version} from {path} is not supported (need at least {minimum_version})"
1141 )
1142 }
1143 LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1144 LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1145 LoadError::Other(msg) => write!(f, "{msg}"),
1146 }
1147 }
1148}
1149
1150impl Error for LoadError {}
1151
1152impl AcpThread {
1153 pub fn new(
1154 parent_session_id: Option<acp::SessionId>,
1155 title: impl Into<SharedString>,
1156 connection: Rc<dyn AgentConnection>,
1157 project: Entity<Project>,
1158 action_log: Entity<ActionLog>,
1159 session_id: acp::SessionId,
1160 mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1161 cx: &mut Context<Self>,
1162 ) -> Self {
1163 let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1164 let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1165 loop {
1166 let caps = prompt_capabilities_rx.recv().await?;
1167 this.update(cx, |this, cx| {
1168 this.prompt_capabilities = caps;
1169 cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1170 })?;
1171 }
1172 });
1173
1174 let (user_stop_tx, _user_stop_rx) = watch::channel(false);
1175
1176 Self {
1177 parent_session_id,
1178 action_log,
1179 shared_buffers: Default::default(),
1180 entries: Default::default(),
1181 plan: Default::default(),
1182 title: title.into(),
1183 project,
1184 send_task: None,
1185 connection,
1186 session_id,
1187 token_usage: None,
1188 prompt_capabilities,
1189 _observe_prompt_capabilities: task,
1190 terminals: HashMap::default(),
1191 pending_terminal_output: HashMap::default(),
1192 pending_terminal_exit: HashMap::default(),
1193 user_stopped: Arc::new(std::sync::atomic::AtomicBool::new(false)),
1194 user_stop_tx,
1195 }
1196 }
1197
    /// Returns the parent session id this thread was created with, if any.
    pub fn parent_session_id(&self) -> Option<&acp::SessionId> {
        self.parent_session_id.as_ref()
    }
1201
    /// Returns a clone of the prompt capabilities currently stored on the thread.
    pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
        self.prompt_capabilities.clone()
    }
1205
1206 /// Marks this thread as stopped by user action and signals any listeners.
1207 pub fn stop_by_user(&mut self) {
1208 self.user_stopped
1209 .store(true, std::sync::atomic::Ordering::SeqCst);
1210 self.user_stop_tx.send(true).ok();
1211 self.send_task.take();
1212 }
1213
    /// Returns the current value of the flag set by `stop_by_user`.
    pub fn was_stopped_by_user(&self) -> bool {
        self.user_stopped.load(std::sync::atomic::Ordering::SeqCst)
    }
1217
    /// Returns a receiver for the channel on which `stop_by_user` sends `true`.
    pub fn user_stop_receiver(&self) -> watch::Receiver<bool> {
        self.user_stop_tx.receiver()
    }
1221
    /// Returns the agent connection this thread talks to.
    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
        &self.connection
    }
1225
    /// Returns the action log entity associated with this thread.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
1229
    /// Returns the project entity this thread operates on.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }
1233
    /// Returns a clone of the thread's current title.
    pub fn title(&self) -> SharedString {
        self.title.clone()
    }
1237
    /// Returns all entries of the thread, in the order they were pushed.
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
1241
    /// Returns the id of the session this thread belongs to.
    pub fn session_id(&self) -> &acp::SessionId {
        &self.session_id
    }
1245
1246 pub fn status(&self) -> ThreadStatus {
1247 if self.send_task.is_some() {
1248 ThreadStatus::Generating
1249 } else {
1250 ThreadStatus::Idle
1251 }
1252 }
1253
    /// Returns the last token usage stored via `update_token_usage`, if any.
    pub fn token_usage(&self) -> Option<&TokenUsage> {
        self.token_usage.as_ref()
    }
1257
1258 pub fn has_pending_edit_tool_calls(&self) -> bool {
1259 for entry in self.entries.iter().rev() {
1260 match entry {
1261 AgentThreadEntry::UserMessage(_) => return false,
1262 AgentThreadEntry::ToolCall(
1263 call @ ToolCall {
1264 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1265 ..
1266 },
1267 ) if call.diffs().next().is_some() => {
1268 return true;
1269 }
1270 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1271 }
1272 }
1273
1274 false
1275 }
1276
1277 pub fn has_in_progress_tool_calls(&self) -> bool {
1278 for entry in self.entries.iter().rev() {
1279 match entry {
1280 AgentThreadEntry::UserMessage(_) => return false,
1281 AgentThreadEntry::ToolCall(ToolCall {
1282 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1283 ..
1284 }) => {
1285 return true;
1286 }
1287 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1288 }
1289 }
1290
1291 false
1292 }
1293
1294 pub fn used_tools_since_last_user_message(&self) -> bool {
1295 for entry in self.entries.iter().rev() {
1296 match entry {
1297 AgentThreadEntry::UserMessage(..) => return false,
1298 AgentThreadEntry::AssistantMessage(..) => continue,
1299 AgentThreadEntry::ToolCall(..) => return true,
1300 }
1301 }
1302
1303 false
1304 }
1305
1306 pub fn handle_session_update(
1307 &mut self,
1308 update: acp::SessionUpdate,
1309 cx: &mut Context<Self>,
1310 ) -> Result<(), acp::Error> {
1311 match update {
1312 acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1313 self.push_user_content_block(None, content, cx);
1314 }
1315 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1316 self.push_assistant_content_block(content, false, cx);
1317 }
1318 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1319 self.push_assistant_content_block(content, true, cx);
1320 }
1321 acp::SessionUpdate::ToolCall(tool_call) => {
1322 self.upsert_tool_call(tool_call, cx)?;
1323 }
1324 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1325 self.update_tool_call(tool_call_update, cx)?;
1326 }
1327 acp::SessionUpdate::Plan(plan) => {
1328 self.update_plan(plan, cx);
1329 }
1330 acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1331 available_commands,
1332 ..
1333 }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1334 acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1335 current_mode_id,
1336 ..
1337 }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1338 acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1339 config_options,
1340 ..
1341 }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1342 _ => {}
1343 }
1344 Ok(())
1345 }
1346
    /// Appends a chunk of user content without indentation.
    ///
    /// Delegates to `push_user_content_block_with_indent` with `indented` set to `false`.
    pub fn push_user_content_block(
        &mut self,
        message_id: Option<UserMessageId>,
        chunk: acp::ContentBlock,
        cx: &mut Context<Self>,
    ) {
        self.push_user_content_block_with_indent(message_id, chunk, false, cx)
    }
1355
1356 pub fn push_user_content_block_with_indent(
1357 &mut self,
1358 message_id: Option<UserMessageId>,
1359 chunk: acp::ContentBlock,
1360 indented: bool,
1361 cx: &mut Context<Self>,
1362 ) {
1363 let language_registry = self.project.read(cx).languages().clone();
1364 let path_style = self.project.read(cx).path_style(cx);
1365 let entries_len = self.entries.len();
1366
1367 if let Some(last_entry) = self.entries.last_mut()
1368 && let AgentThreadEntry::UserMessage(UserMessage {
1369 id,
1370 content,
1371 chunks,
1372 indented: existing_indented,
1373 ..
1374 }) = last_entry
1375 && *existing_indented == indented
1376 {
1377 *id = message_id.or(id.take());
1378 content.append(chunk.clone(), &language_registry, path_style, cx);
1379 chunks.push(chunk);
1380 let idx = entries_len - 1;
1381 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1382 } else {
1383 let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1384 self.push_entry(
1385 AgentThreadEntry::UserMessage(UserMessage {
1386 id: message_id,
1387 content,
1388 chunks: vec![chunk],
1389 checkpoint: None,
1390 indented,
1391 }),
1392 cx,
1393 );
1394 }
1395 }
1396
    /// Appends a chunk of assistant content (a thought when `is_thought` is set) without
    /// indentation.
    ///
    /// Delegates to `push_assistant_content_block_with_indent` with `indented` set to `false`.
    pub fn push_assistant_content_block(
        &mut self,
        chunk: acp::ContentBlock,
        is_thought: bool,
        cx: &mut Context<Self>,
    ) {
        self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
    }
1405
1406 pub fn push_assistant_content_block_with_indent(
1407 &mut self,
1408 chunk: acp::ContentBlock,
1409 is_thought: bool,
1410 indented: bool,
1411 cx: &mut Context<Self>,
1412 ) {
1413 let language_registry = self.project.read(cx).languages().clone();
1414 let path_style = self.project.read(cx).path_style(cx);
1415 let entries_len = self.entries.len();
1416 if let Some(last_entry) = self.entries.last_mut()
1417 && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1418 chunks,
1419 indented: existing_indented,
1420 }) = last_entry
1421 && *existing_indented == indented
1422 {
1423 let idx = entries_len - 1;
1424 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1425 match (chunks.last_mut(), is_thought) {
1426 (Some(AssistantMessageChunk::Message { block }), false)
1427 | (Some(AssistantMessageChunk::Thought { block }), true) => {
1428 block.append(chunk, &language_registry, path_style, cx)
1429 }
1430 _ => {
1431 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1432 if is_thought {
1433 chunks.push(AssistantMessageChunk::Thought { block })
1434 } else {
1435 chunks.push(AssistantMessageChunk::Message { block })
1436 }
1437 }
1438 }
1439 } else {
1440 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1441 let chunk = if is_thought {
1442 AssistantMessageChunk::Thought { block }
1443 } else {
1444 AssistantMessageChunk::Message { block }
1445 };
1446
1447 self.push_entry(
1448 AgentThreadEntry::AssistantMessage(AssistantMessage {
1449 chunks: vec![chunk],
1450 indented,
1451 }),
1452 cx,
1453 );
1454 }
1455 }
1456
    /// Appends `entry` to the thread and emits `AcpThreadEvent::NewEntry`.
    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
1461
    /// Returns `true` when the connection offers a title setter for this session.
    pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
        self.connection.set_title(&self.session_id, cx).is_some()
    }
1465
1466 pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1467 if title != self.title {
1468 self.title = title.clone();
1469 cx.emit(AcpThreadEvent::TitleUpdated);
1470 if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1471 return set_title.run(title, cx);
1472 }
1473 }
1474 Task::ready(Ok(()))
1475 }
1476
    /// Emits `AcpThreadEvent::SubagentSpawned` carrying `session_id`; no thread state changes.
    pub fn subagent_spawned(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::SubagentSpawned(session_id));
    }
1480
    /// Replaces the stored token usage (which may be cleared with `None`) and emits
    /// `AcpThreadEvent::TokenUsageUpdated`.
    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
        self.token_usage = usage;
        cx.emit(AcpThreadEvent::TokenUsageUpdated);
    }
1485
    /// Emits `AcpThreadEvent::Retry` carrying `status`; nothing is stored on the thread.
    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::Retry(status));
    }
1489
1490 pub fn update_tool_call(
1491 &mut self,
1492 update: impl Into<ToolCallUpdate>,
1493 cx: &mut Context<Self>,
1494 ) -> Result<()> {
1495 let update = update.into();
1496 let languages = self.project.read(cx).languages().clone();
1497 let path_style = self.project.read(cx).path_style(cx);
1498
1499 let ix = match self.index_for_tool_call(update.id()) {
1500 Some(ix) => ix,
1501 None => {
1502 // Tool call not found - create a failed tool call entry
1503 let failed_tool_call = ToolCall {
1504 id: update.id().clone(),
1505 label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1506 kind: acp::ToolKind::Fetch,
1507 content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1508 "Tool call not found".into(),
1509 &languages,
1510 path_style,
1511 cx,
1512 ))],
1513 status: ToolCallStatus::Failed,
1514 locations: Vec::new(),
1515 resolved_locations: Vec::new(),
1516 raw_input: None,
1517 raw_input_markdown: None,
1518 raw_output: None,
1519 tool_name: None,
1520 subagent_session_id: None,
1521 };
1522 self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1523 return Ok(());
1524 }
1525 };
1526 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1527 unreachable!()
1528 };
1529
1530 match update {
1531 ToolCallUpdate::UpdateFields(update) => {
1532 let location_updated = update.fields.locations.is_some();
1533 call.update_fields(
1534 update.fields,
1535 update.meta,
1536 languages,
1537 path_style,
1538 &self.terminals,
1539 cx,
1540 )?;
1541 if location_updated {
1542 self.resolve_locations(update.tool_call_id, cx);
1543 }
1544 }
1545 ToolCallUpdate::UpdateDiff(update) => {
1546 call.content.clear();
1547 call.content.push(ToolCallContent::Diff(update.diff));
1548 }
1549 ToolCallUpdate::UpdateTerminal(update) => {
1550 call.content.clear();
1551 call.content
1552 .push(ToolCallContent::Terminal(update.terminal));
1553 }
1554 }
1555
1556 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1557
1558 Ok(())
1559 }
1560
    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
    ///
    /// The entry's status is derived from `tool_call.status`; the actual work is done by
    /// `upsert_tool_call_inner`.
    pub fn upsert_tool_call(
        &mut self,
        tool_call: acp::ToolCall,
        cx: &mut Context<Self>,
    ) -> Result<(), acp::Error> {
        // Read the status first: `tool_call` is consumed by the conversion on the next line.
        let status = tool_call.status.into();
        self.upsert_tool_call_inner(tool_call.into(), status, cx)
    }
1570
1571 /// Fails if id does not match an existing entry.
1572 pub fn upsert_tool_call_inner(
1573 &mut self,
1574 update: acp::ToolCallUpdate,
1575 status: ToolCallStatus,
1576 cx: &mut Context<Self>,
1577 ) -> Result<(), acp::Error> {
1578 let language_registry = self.project.read(cx).languages().clone();
1579 let path_style = self.project.read(cx).path_style(cx);
1580 let id = update.tool_call_id.clone();
1581
1582 let agent_telemetry_id = self.connection().telemetry_id();
1583 let session = self.session_id();
1584 if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1585 let status = if matches!(status, ToolCallStatus::Completed) {
1586 "completed"
1587 } else {
1588 "failed"
1589 };
1590 telemetry::event!(
1591 "Agent Tool Call Completed",
1592 agent_telemetry_id,
1593 session,
1594 status
1595 );
1596 }
1597
1598 if let Some(ix) = self.index_for_tool_call(&id) {
1599 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1600 unreachable!()
1601 };
1602
1603 call.update_fields(
1604 update.fields,
1605 update.meta,
1606 language_registry,
1607 path_style,
1608 &self.terminals,
1609 cx,
1610 )?;
1611 call.status = status;
1612
1613 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1614 } else {
1615 let call = ToolCall::from_acp(
1616 update.try_into()?,
1617 status,
1618 language_registry,
1619 self.project.read(cx).path_style(cx),
1620 &self.terminals,
1621 cx,
1622 )?;
1623 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1624 };
1625
1626 self.resolve_locations(id, cx);
1627 Ok(())
1628 }
1629
1630 fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1631 self.entries
1632 .iter()
1633 .enumerate()
1634 .rev()
1635 .find_map(|(index, entry)| {
1636 if let AgentThreadEntry::ToolCall(tool_call) = entry
1637 && &tool_call.id == id
1638 {
1639 Some(index)
1640 } else {
1641 None
1642 }
1643 })
1644 }
1645
1646 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1647 // The tool call we are looking for is typically the last one, or very close to the end.
1648 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1649 self.entries
1650 .iter_mut()
1651 .enumerate()
1652 .rev()
1653 .find_map(|(index, tool_call)| {
1654 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1655 && &tool_call.id == id
1656 {
1657 Some((index, tool_call))
1658 } else {
1659 None
1660 }
1661 })
1662 }
1663
1664 pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1665 self.entries
1666 .iter()
1667 .enumerate()
1668 .rev()
1669 .find_map(|(index, tool_call)| {
1670 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1671 && &tool_call.id == id
1672 {
1673 Some((index, tool_call))
1674 } else {
1675 None
1676 }
1677 })
1678 }
1679
    /// Resolves the locations of the tool call with the given `id` in the background.
    ///
    /// Does nothing if no such tool call exists. Once resolution finishes, a snapshot of
    /// every resolved buffer is stored in `shared_buffers`, the project's agent location is
    /// moved to the last resolved location (with one exception, see below), and the tool
    /// call's `resolved_locations` are replaced — emitting `EntryUpdated` — if they changed.
    /// The spawned task is detached.
    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
        let project = self.project.clone();
        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
            return;
        };
        let task = tool_call.resolve_locations(project, cx);
        cx.spawn(async move |this, cx| {
            let resolved_locations = task.await;

            this.update(cx, |this, cx| {
                let project = this.project.clone();

                // Unresolved (`None`) locations are skipped by `flatten`.
                for location in resolved_locations.iter().flatten() {
                    this.shared_buffers
                        .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
                }
                // The tool call is looked up again: entries may have changed while resolving.
                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
                    return;
                };

                if let Some(Some(location)) = resolved_locations.last() {
                    project.update(cx, |project, cx| {
                        let should_ignore = if let Some(agent_location) = project
                            .agent_location()
                            .filter(|agent_location| agent_location.buffer == location.buffer)
                        {
                            let snapshot = location.buffer.read(cx).snapshot();
                            let old_position = agent_location.position.to_point(&snapshot);
                            let new_position = location.position.to_point(&snapshot);

                            // Ignore this so that when we get updates from the edit tool
                            // the position doesn't reset to the start of the line.
                            old_position.row == new_position.row
                                && old_position.column > new_position.column
                        } else {
                            false
                        };
                        if !should_ignore {
                            project.set_agent_location(Some(location.into()), cx);
                        }
                    });
                }

                let resolved_locations = resolved_locations
                    .iter()
                    .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
                    .collect::<Vec<_>>();

                // Only touch the entry (and notify) when the resolved locations really changed.
                if tool_call.resolved_locations != resolved_locations {
                    tool_call.resolved_locations = resolved_locations;
                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
                }
            })
        })
        .detach();
    }
1736
1737 pub fn request_tool_call_authorization(
1738 &mut self,
1739 tool_call: acp::ToolCallUpdate,
1740 options: PermissionOptions,
1741 cx: &mut Context<Self>,
1742 ) -> Result<BoxFuture<'static, acp::RequestPermissionOutcome>> {
1743 let (tx, rx) = oneshot::channel();
1744
1745 let status = ToolCallStatus::WaitingForConfirmation {
1746 options,
1747 respond_tx: tx,
1748 };
1749
1750 self.upsert_tool_call_inner(tool_call, status, cx)?;
1751 cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1752
1753 let fut = async {
1754 match rx.await {
1755 Ok(option) => acp::RequestPermissionOutcome::Selected(
1756 acp::SelectedPermissionOutcome::new(option),
1757 ),
1758 Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1759 }
1760 }
1761 .boxed();
1762
1763 Ok(fut)
1764 }
1765
1766 pub fn authorize_tool_call(
1767 &mut self,
1768 id: acp::ToolCallId,
1769 option_id: acp::PermissionOptionId,
1770 option_kind: acp::PermissionOptionKind,
1771 cx: &mut Context<Self>,
1772 ) {
1773 let Some((ix, call)) = self.tool_call_mut(&id) else {
1774 return;
1775 };
1776
1777 let new_status = match option_kind {
1778 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1779 ToolCallStatus::Rejected
1780 }
1781 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1782 ToolCallStatus::InProgress
1783 }
1784 _ => ToolCallStatus::InProgress,
1785 };
1786
1787 let curr_status = mem::replace(&mut call.status, new_status);
1788
1789 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1790 respond_tx.send(option_id).log_err();
1791 } else if cfg!(debug_assertions) {
1792 panic!("tried to authorize an already authorized tool call");
1793 }
1794
1795 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1796 }
1797
1798 pub fn first_tool_awaiting_confirmation(&self) -> Option<&ToolCall> {
1799 let mut first_tool_call = None;
1800
1801 for entry in self.entries.iter().rev() {
1802 match &entry {
1803 AgentThreadEntry::ToolCall(call) => {
1804 if let ToolCallStatus::WaitingForConfirmation { .. } = call.status {
1805 first_tool_call = Some(call);
1806 } else {
1807 continue;
1808 }
1809 }
1810 AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1811 // Reached the beginning of the turn.
1812 // If we had pending permission requests in the previous turn, they have been cancelled.
1813 break;
1814 }
1815 }
1816 }
1817
1818 first_tool_call
1819 }
1820
    /// Returns the thread's current plan.
    pub fn plan(&self) -> &Plan {
        &self.plan
    }
1824
1825 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1826 let new_entries_len = request.entries.len();
1827 let mut new_entries = request.entries.into_iter();
1828
1829 // Reuse existing markdown to prevent flickering
1830 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1831 let PlanEntry {
1832 content,
1833 priority,
1834 status,
1835 } = old;
1836 content.update(cx, |old, cx| {
1837 old.replace(new.content, cx);
1838 });
1839 *priority = new.priority;
1840 *status = new.status;
1841 }
1842 for new in new_entries {
1843 self.plan.entries.push(PlanEntry::from_acp(new, cx))
1844 }
1845 self.plan.entries.truncate(new_entries_len);
1846
1847 cx.notify();
1848 }
1849
    /// Removes every plan entry whose status is `Completed` and notifies observers.
    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
        self.plan
            .entries
            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
        cx.notify();
    }
1856
    /// Test-support convenience: sends `message` as a single content block via `send`.
    #[cfg(any(test, feature = "test-support"))]
    pub fn send_raw(
        &mut self,
        message: &str,
        cx: &mut Context<Self>,
    ) -> BoxFuture<'static, Result<()>> {
        self.send(vec![message.into()], cx)
    }
1865
1866 pub fn send(
1867 &mut self,
1868 message: Vec<acp::ContentBlock>,
1869 cx: &mut Context<Self>,
1870 ) -> BoxFuture<'static, Result<()>> {
1871 let block = ContentBlock::new_combined(
1872 message.clone(),
1873 self.project.read(cx).languages().clone(),
1874 self.project.read(cx).path_style(cx),
1875 cx,
1876 );
1877 let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
1878 let git_store = self.project.read(cx).git_store().clone();
1879
1880 let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
1881 Some(UserMessageId::new())
1882 } else {
1883 None
1884 };
1885
1886 self.run_turn(cx, async move |this, cx| {
1887 this.update(cx, |this, cx| {
1888 this.push_entry(
1889 AgentThreadEntry::UserMessage(UserMessage {
1890 id: message_id.clone(),
1891 content: block,
1892 chunks: message,
1893 checkpoint: None,
1894 indented: false,
1895 }),
1896 cx,
1897 );
1898 })
1899 .ok();
1900
1901 let old_checkpoint = git_store
1902 .update(cx, |git, cx| git.checkpoint(cx))
1903 .await
1904 .context("failed to get old checkpoint")
1905 .log_err();
1906 this.update(cx, |this, cx| {
1907 if let Some((_ix, message)) = this.last_user_message() {
1908 message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1909 git_checkpoint,
1910 show: false,
1911 });
1912 }
1913 this.connection.prompt(message_id, request, cx)
1914 })?
1915 .await
1916 })
1917 }
1918
    /// Returns `true` when the connection offers a retry handle for this session.
    pub fn can_retry(&self, cx: &App) -> bool {
        self.connection.retry(&self.session_id, cx).is_some()
    }
1922
    /// Starts a new turn that asks the connection to retry this session.
    ///
    /// The returned future fails with "retrying a session is not supported" when the
    /// connection offers no retry handle (see `can_retry`).
    pub fn retry(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
        self.run_turn(cx, async move |this, cx| {
            this.update(cx, |this, cx| {
                this.connection
                    .retry(&this.session_id, cx)
                    .map(|retry| retry.run(cx))
            })?
            // `None` here means the connection has no retry support.
            .context("retrying a session is not supported")?
            .await
        })
    }
1934
    /// Runs one turn: cancels any turn in flight, executes `f`, and post-processes its result.
    ///
    /// Completed plan entries are cleared first. `f` runs inside the task stored in
    /// `send_task`, after the previous turn's cancellation has finished. A second task waits
    /// for the response, refreshes the last checkpoint, clears the project's agent location
    /// and then:
    /// - on an error from `f`: emits `Error` and resolves the returned future with that error;
    /// - on a refusal: emits `Refusal`, truncating the thread back to before the last user
    ///   message unless a completed tool call with output follows that message;
    /// - in every non-error case: emits `Stopped` and resolves with `Ok(())`.
    fn run_turn(
        &mut self,
        cx: &mut Context<Self>,
        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
    ) -> BoxFuture<'static, Result<()>> {
        self.clear_completed_plan_entries(cx);

        let (tx, rx) = oneshot::channel();
        // Takes the previous `send_task` (if any) and asks the connection to cancel it.
        let cancel_task = self.cancel(cx);

        self.send_task = Some(cx.spawn(async move |this, cx| {
            cancel_task.await;
            tx.send(f(this, cx).await).ok();
        }));

        cx.spawn(async move |this, cx| {
            // `Err` here means the sender was dropped before a result was sent.
            let response = rx.await;

            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
                .await?;

            this.update(cx, |this, cx| {
                this.project
                    .update(cx, |project, cx| project.set_agent_location(None, cx));
                match response {
                    Ok(Err(e)) => {
                        this.send_task.take();
                        cx.emit(AcpThreadEvent::Error);
                        Err(e)
                    }
                    result => {
                        let canceled = matches!(
                            result,
                            Ok(Ok(acp::PromptResponse {
                                stop_reason: acp::StopReason::Cancelled,
                                ..
                            }))
                        );

                        // We only take the task if the current prompt wasn't canceled.
                        //
                        // This prompt may have been canceled because another one was sent
                        // while it was still generating. In these cases, dropping `send_task`
                        // would cause the next generation to be canceled.
                        if !canceled {
                            this.send_task.take();
                        }

                        // Handle refusal - distinguish between user prompt and tool call refusals
                        if let Ok(Ok(acp::PromptResponse {
                            stop_reason: acp::StopReason::Refusal,
                            ..
                        })) = result
                        {
                            if let Some((user_msg_ix, _)) = this.last_user_message() {
                                // Check if there's a completed tool call with results after the last user message
                                // This indicates the refusal is in response to tool output, not the user's prompt
                                let has_completed_tool_call_after_user_msg =
                                    this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
                                        if let AgentThreadEntry::ToolCall(tool_call) = entry {
                                            // Check if the tool call has completed and has output
                                            matches!(tool_call.status, ToolCallStatus::Completed)
                                                && tool_call.raw_output.is_some()
                                        } else {
                                            false
                                        }
                                    });

                                if has_completed_tool_call_after_user_msg {
                                    // Refusal is due to tool output - don't truncate, just notify
                                    // The model refused based on what the tool returned
                                    cx.emit(AcpThreadEvent::Refusal);
                                } else {
                                    // User prompt was refused - truncate back to before the user message
                                    let range = user_msg_ix..this.entries.len();
                                    if range.start < range.end {
                                        this.entries.truncate(user_msg_ix);
                                        cx.emit(AcpThreadEvent::EntriesRemoved(range));
                                    }
                                    cx.emit(AcpThreadEvent::Refusal);
                                }
                            } else {
                                // No user message found, treat as general refusal
                                cx.emit(AcpThreadEvent::Refusal);
                            }
                        }

                        cx.emit(AcpThreadEvent::Stopped);
                        Ok(())
                    }
                }
            })?
        })
        .boxed()
    }
2030
2031 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2032 let Some(send_task) = self.send_task.take() else {
2033 return Task::ready(());
2034 };
2035
2036 for entry in self.entries.iter_mut() {
2037 if let AgentThreadEntry::ToolCall(call) = entry {
2038 let cancel = matches!(
2039 call.status,
2040 ToolCallStatus::Pending
2041 | ToolCallStatus::WaitingForConfirmation { .. }
2042 | ToolCallStatus::InProgress
2043 );
2044
2045 if cancel {
2046 call.status = ToolCallStatus::Canceled;
2047 }
2048 }
2049 }
2050
2051 self.connection.cancel(&self.session_id, cx);
2052
2053 // Wait for the send task to complete
2054 cx.foreground_executor().spawn(send_task)
2055 }
2056
2057 /// Restores the git working tree to the state at the given checkpoint (if one exists)
2058 pub fn restore_checkpoint(
2059 &mut self,
2060 id: UserMessageId,
2061 cx: &mut Context<Self>,
2062 ) -> Task<Result<()>> {
2063 let Some((_, message)) = self.user_message_mut(&id) else {
2064 return Task::ready(Err(anyhow!("message not found")));
2065 };
2066
2067 let checkpoint = message
2068 .checkpoint
2069 .as_ref()
2070 .map(|c| c.git_checkpoint.clone());
2071
2072 // Cancel any in-progress generation before restoring
2073 let cancel_task = self.cancel(cx);
2074 let rewind = self.rewind(id.clone(), cx);
2075 let git_store = self.project.read(cx).git_store().clone();
2076
2077 cx.spawn(async move |_, cx| {
2078 cancel_task.await;
2079 rewind.await?;
2080 if let Some(checkpoint) = checkpoint {
2081 git_store
2082 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
2083 .await?;
2084 }
2085
2086 Ok(())
2087 })
2088 }
2089
2090 /// Rewinds this thread to before the entry at `index`, removing it and all
2091 /// subsequent entries while rejecting any action_log changes made from that point.
2092 /// Unlike `restore_checkpoint`, this method does not restore from git.
2093 pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
2094 let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
2095 return Task::ready(Err(anyhow!("not supported")));
2096 };
2097
2098 let telemetry = ActionLogTelemetry::from(&*self);
2099 cx.spawn(async move |this, cx| {
2100 cx.update(|cx| truncate.run(id.clone(), cx)).await?;
2101 this.update(cx, |this, cx| {
2102 if let Some((ix, _)) = this.user_message_mut(&id) {
2103 // Collect all terminals from entries that will be removed
2104 let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
2105 .iter()
2106 .flat_map(|entry| entry.terminals())
2107 .filter_map(|terminal| terminal.read(cx).id().clone().into())
2108 .collect();
2109
2110 let range = ix..this.entries.len();
2111 this.entries.truncate(ix);
2112 cx.emit(AcpThreadEvent::EntriesRemoved(range));
2113
2114 // Kill and remove the terminals
2115 for terminal_id in terminals_to_remove {
2116 if let Some(terminal) = this.terminals.remove(&terminal_id) {
2117 terminal.update(cx, |terminal, cx| {
2118 terminal.kill(cx);
2119 });
2120 }
2121 }
2122 }
2123 this.action_log().update(cx, |action_log, cx| {
2124 action_log.reject_all_edits(Some(telemetry), cx)
2125 })
2126 })?
2127 .await;
2128 Ok(())
2129 })
2130 }
2131
2132 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
2133 let git_store = self.project.read(cx).git_store().clone();
2134
2135 let Some((_, message)) = self.last_user_message() else {
2136 return Task::ready(Ok(()));
2137 };
2138 let Some(user_message_id) = message.id.clone() else {
2139 return Task::ready(Ok(()));
2140 };
2141 let Some(checkpoint) = message.checkpoint.as_ref() else {
2142 return Task::ready(Ok(()));
2143 };
2144 let old_checkpoint = checkpoint.git_checkpoint.clone();
2145
2146 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2147 cx.spawn(async move |this, cx| {
2148 let Some(new_checkpoint) = new_checkpoint
2149 .await
2150 .context("failed to get new checkpoint")
2151 .log_err()
2152 else {
2153 return Ok(());
2154 };
2155
2156 let equal = git_store
2157 .update(cx, |git, cx| {
2158 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2159 })
2160 .await
2161 .unwrap_or(true);
2162
2163 this.update(cx, |this, cx| {
2164 if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
2165 if let Some(checkpoint) = message.checkpoint.as_mut() {
2166 checkpoint.show = !equal;
2167 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2168 }
2169 }
2170 })?;
2171
2172 Ok(())
2173 })
2174 }
2175
2176 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2177 self.entries
2178 .iter_mut()
2179 .enumerate()
2180 .rev()
2181 .find_map(|(ix, entry)| {
2182 if let AgentThreadEntry::UserMessage(message) = entry {
2183 Some((ix, message))
2184 } else {
2185 None
2186 }
2187 })
2188 }
2189
2190 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2191 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2192 if let AgentThreadEntry::UserMessage(message) = entry {
2193 if message.id.as_ref() == Some(id) {
2194 Some((ix, message))
2195 } else {
2196 None
2197 }
2198 } else {
2199 None
2200 }
2201 })
2202 }
2203
    /// Reads text from the project file at the absolute `path`.
    ///
    /// `line` is the 1-based first line to read (missing or `0` means the first line) and
    /// `limit` the maximum number of lines (missing means no limit). With
    /// `reuse_shared_snapshot`, a snapshot already stored in `shared_buffers` for the buffer
    /// is read instead of the buffer's current contents; otherwise the read is recorded in
    /// the action log and the fresh snapshot is stored in `shared_buffers`. The project's
    /// agent location is moved to the start of the range that was read.
    ///
    /// # Errors
    /// - `resource_not_found` when `path` does not map to a project path;
    /// - `invalid_params` when the requested start lies beyond the end of the file;
    /// - failures from opening the buffer or updating the thread are propagated.
    pub fn read_text_file(
        &self,
        path: PathBuf,
        line: Option<u32>,
        limit: Option<u32>,
        reuse_shared_snapshot: bool,
        cx: &mut Context<Self>,
    ) -> Task<Result<String, acp::Error>> {
        // Args are 1-based, move to 0-based
        let line = line.unwrap_or_default().saturating_sub(1);
        let limit = limit.unwrap_or(u32::MAX);
        let project = self.project.clone();
        let action_log = self.action_log.clone();
        cx.spawn(async move |this, cx| {
            let load = project.update(cx, |project, cx| {
                let path = project
                    .project_path_for_absolute_path(&path, cx)
                    .ok_or_else(|| {
                        acp::Error::resource_not_found(Some(path.display().to_string()))
                    })?;
                Ok::<_, acp::Error>(project.open_buffer(path, cx))
            })?;

            let buffer = load.await?;

            // Prefer a previously shared snapshot when the caller asked for it; a failure to
            // read the thread is logged and treated like "no shared snapshot".
            let snapshot = if reuse_shared_snapshot {
                this.read_with(cx, |this, _| {
                    this.shared_buffers.get(&buffer.clone()).cloned()
                })
                .log_err()
                .flatten()
            } else {
                None
            };

            let snapshot = if let Some(snapshot) = snapshot {
                snapshot
            } else {
                // Fresh read: record it in the action log and remember the snapshot.
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_read(buffer.clone(), cx);
                });

                let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
                this.update(cx, |this, _| {
                    this.shared_buffers.insert(buffer.clone(), snapshot.clone());
                })?;
                snapshot
            };

            let max_point = snapshot.max_point();
            let start_position = Point::new(line, 0);

            if start_position > max_point {
                // The message reports the end of the file with a 1-based line number.
                return Err(acp::Error::invalid_params().data(format!(
                    "Attempting to read beyond the end of the file, line {}:{}",
                    max_point.row + 1,
                    max_point.column
                )));
            }

            let start = snapshot.anchor_before(start_position);
            // `saturating_add` keeps the "no limit" default (`u32::MAX`) from overflowing.
            let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));

            project.update(cx, |project, cx| {
                project.set_agent_location(
                    Some(AgentLocation {
                        buffer: buffer.downgrade(),
                        position: start,
                    }),
                    cx,
                );
            });

            Ok(snapshot.text_for_range(start..end).collect::<String>())
        })
    }
2280
2281 pub fn write_text_file(
2282 &self,
2283 path: PathBuf,
2284 content: String,
2285 cx: &mut Context<Self>,
2286 ) -> Task<Result<()>> {
2287 let project = self.project.clone();
2288 let action_log = self.action_log.clone();
2289 cx.spawn(async move |this, cx| {
2290 let load = project.update(cx, |project, cx| {
2291 let path = project
2292 .project_path_for_absolute_path(&path, cx)
2293 .context("invalid path")?;
2294 anyhow::Ok(project.open_buffer(path, cx))
2295 });
2296 let buffer = load?.await?;
2297 let snapshot = this.update(cx, |this, cx| {
2298 this.shared_buffers
2299 .get(&buffer)
2300 .cloned()
2301 .unwrap_or_else(|| buffer.read(cx).snapshot())
2302 })?;
2303 let edits = cx
2304 .background_executor()
2305 .spawn(async move {
2306 let old_text = snapshot.text();
2307 text_diff(old_text.as_str(), &content)
2308 .into_iter()
2309 .map(|(range, replacement)| {
2310 (
2311 snapshot.anchor_after(range.start)
2312 ..snapshot.anchor_before(range.end),
2313 replacement,
2314 )
2315 })
2316 .collect::<Vec<_>>()
2317 })
2318 .await;
2319
2320 project.update(cx, |project, cx| {
2321 project.set_agent_location(
2322 Some(AgentLocation {
2323 buffer: buffer.downgrade(),
2324 position: edits
2325 .last()
2326 .map(|(range, _)| range.end)
2327 .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
2328 }),
2329 cx,
2330 );
2331 });
2332
2333 let format_on_save = cx.update(|cx| {
2334 action_log.update(cx, |action_log, cx| {
2335 action_log.buffer_read(buffer.clone(), cx);
2336 });
2337
2338 let format_on_save = buffer.update(cx, |buffer, cx| {
2339 buffer.edit(edits, None, cx);
2340
2341 let settings = language::language_settings::language_settings(
2342 buffer.language().map(|l| l.name()),
2343 buffer.file(),
2344 cx,
2345 );
2346
2347 settings.format_on_save != FormatOnSave::Off
2348 });
2349 action_log.update(cx, |action_log, cx| {
2350 action_log.buffer_edited(buffer.clone(), cx);
2351 });
2352 format_on_save
2353 });
2354
2355 if format_on_save {
2356 let format_task = project.update(cx, |project, cx| {
2357 project.format(
2358 HashSet::from_iter([buffer.clone()]),
2359 LspFormatTarget::Buffers,
2360 false,
2361 FormatTrigger::Save,
2362 cx,
2363 )
2364 });
2365 format_task.await.log_err();
2366
2367 action_log.update(cx, |action_log, cx| {
2368 action_log.buffer_edited(buffer.clone(), cx);
2369 });
2370 }
2371
2372 project
2373 .update(cx, |project, cx| project.save_buffer(buffer, cx))
2374 .await
2375 })
2376 }
2377
2378 pub fn create_terminal(
2379 &self,
2380 command: String,
2381 args: Vec<String>,
2382 extra_env: Vec<acp::EnvVariable>,
2383 cwd: Option<PathBuf>,
2384 output_byte_limit: Option<u64>,
2385 cx: &mut Context<Self>,
2386 ) -> Task<Result<Entity<Terminal>>> {
2387 let env = match &cwd {
2388 Some(dir) => self.project.update(cx, |project, cx| {
2389 project.environment().update(cx, |env, cx| {
2390 env.directory_environment(dir.as_path().into(), cx)
2391 })
2392 }),
2393 None => Task::ready(None).shared(),
2394 };
2395 let env = cx.spawn(async move |_, _| {
2396 let mut env = env.await.unwrap_or_default();
2397 // Disables paging for `git` and hopefully other commands
2398 env.insert("PAGER".into(), "".into());
2399 for var in extra_env {
2400 env.insert(var.name, var.value);
2401 }
2402 env
2403 });
2404
2405 let project = self.project.clone();
2406 let language_registry = project.read(cx).languages().clone();
2407 let is_windows = project.read(cx).path_style(cx).is_windows();
2408
2409 let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
2410 let terminal_task = cx.spawn({
2411 let terminal_id = terminal_id.clone();
2412 async move |_this, cx| {
2413 let env = env.await;
2414 let shell = project
2415 .update(cx, |project, cx| {
2416 project
2417 .remote_client()
2418 .and_then(|r| r.read(cx).default_system_shell())
2419 })
2420 .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2421 let (task_command, task_args) =
2422 ShellBuilder::new(&Shell::Program(shell), is_windows)
2423 .redirect_stdin_to_dev_null()
2424 .build(Some(command.clone()), &args);
2425 let terminal = project
2426 .update(cx, |project, cx| {
2427 project.create_terminal_task(
2428 task::SpawnInTerminal {
2429 command: Some(task_command),
2430 args: task_args,
2431 cwd: cwd.clone(),
2432 env,
2433 ..Default::default()
2434 },
2435 cx,
2436 )
2437 })
2438 .await?;
2439
2440 anyhow::Ok(cx.new(|cx| {
2441 Terminal::new(
2442 terminal_id,
2443 &format!("{} {}", command, args.join(" ")),
2444 cwd,
2445 output_byte_limit.map(|l| l as usize),
2446 terminal,
2447 language_registry,
2448 cx,
2449 )
2450 }))
2451 }
2452 });
2453
2454 cx.spawn(async move |this, cx| {
2455 let terminal = terminal_task.await?;
2456 this.update(cx, |this, _cx| {
2457 this.terminals.insert(terminal_id, terminal.clone());
2458 terminal
2459 })
2460 })
2461 }
2462
2463 pub fn kill_terminal(
2464 &mut self,
2465 terminal_id: acp::TerminalId,
2466 cx: &mut Context<Self>,
2467 ) -> Result<()> {
2468 self.terminals
2469 .get(&terminal_id)
2470 .context("Terminal not found")?
2471 .update(cx, |terminal, cx| {
2472 terminal.kill(cx);
2473 });
2474
2475 Ok(())
2476 }
2477
2478 pub fn release_terminal(
2479 &mut self,
2480 terminal_id: acp::TerminalId,
2481 cx: &mut Context<Self>,
2482 ) -> Result<()> {
2483 self.terminals
2484 .remove(&terminal_id)
2485 .context("Terminal not found")?
2486 .update(cx, |terminal, cx| {
2487 terminal.kill(cx);
2488 });
2489
2490 Ok(())
2491 }
2492
2493 pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2494 self.terminals
2495 .get(&terminal_id)
2496 .context("Terminal not found")
2497 .cloned()
2498 }
2499
2500 pub fn to_markdown(&self, cx: &App) -> String {
2501 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2502 }
2503
2504 pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2505 cx.emit(AcpThreadEvent::LoadError(error));
2506 }
2507
2508 pub fn register_terminal_created(
2509 &mut self,
2510 terminal_id: acp::TerminalId,
2511 command_label: String,
2512 working_dir: Option<PathBuf>,
2513 output_byte_limit: Option<u64>,
2514 terminal: Entity<::terminal::Terminal>,
2515 cx: &mut Context<Self>,
2516 ) -> Entity<Terminal> {
2517 let language_registry = self.project.read(cx).languages().clone();
2518
2519 let entity = cx.new(|cx| {
2520 Terminal::new(
2521 terminal_id.clone(),
2522 &command_label,
2523 working_dir.clone(),
2524 output_byte_limit.map(|l| l as usize),
2525 terminal,
2526 language_registry,
2527 cx,
2528 )
2529 });
2530 self.terminals.insert(terminal_id.clone(), entity.clone());
2531 entity
2532 }
2533}
2534
2535fn markdown_for_raw_output(
2536 raw_output: &serde_json::Value,
2537 language_registry: &Arc<LanguageRegistry>,
2538 cx: &mut App,
2539) -> Option<Entity<Markdown>> {
2540 match raw_output {
2541 serde_json::Value::Null => None,
2542 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2543 Markdown::new(
2544 value.to_string().into(),
2545 Some(language_registry.clone()),
2546 None,
2547 cx,
2548 )
2549 })),
2550 serde_json::Value::Number(value) => Some(cx.new(|cx| {
2551 Markdown::new(
2552 value.to_string().into(),
2553 Some(language_registry.clone()),
2554 None,
2555 cx,
2556 )
2557 })),
2558 serde_json::Value::String(value) => Some(cx.new(|cx| {
2559 Markdown::new(
2560 value.clone().into(),
2561 Some(language_registry.clone()),
2562 None,
2563 cx,
2564 )
2565 })),
2566 value => Some(cx.new(|cx| {
2567 let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
2568
2569 Markdown::new(
2570 format!("```json\n{}\n```", pretty_json).into(),
2571 Some(language_registry.clone()),
2572 None,
2573 cx,
2574 )
2575 })),
2576 }
2577}
2578
2579#[cfg(test)]
2580mod tests {
2581 use super::*;
2582 use anyhow::anyhow;
2583 use futures::{channel::mpsc, future::LocalBoxFuture, select};
2584 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2585 use indoc::indoc;
2586 use project::{FakeFs, Fs};
2587 use rand::{distr, prelude::*};
2588 use serde_json::json;
2589 use settings::SettingsStore;
2590 use smol::stream::StreamExt as _;
2591 use std::{
2592 any::Any,
2593 cell::RefCell,
2594 path::Path,
2595 rc::Rc,
2596 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2597 time::Duration,
2598 };
2599 use util::path;
2600
2601 fn init_test(cx: &mut TestAppContext) {
2602 env_logger::try_init().ok();
2603 cx.update(|cx| {
2604 let settings_store = SettingsStore::test(cx);
2605 cx.set_global(settings_store);
2606 });
2607 }
2608
2609 #[gpui::test]
2610 async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2611 init_test(cx);
2612
2613 let fs = FakeFs::new(cx.executor());
2614 let project = Project::test(fs, [], cx).await;
2615 let connection = Rc::new(FakeAgentConnection::new());
2616 let thread = cx
2617 .update(|cx| connection.new_session(project, std::path::Path::new(path!("/test")), cx))
2618 .await
2619 .unwrap();
2620
2621 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2622
2623 // Send Output BEFORE Created - should be buffered by acp_thread
2624 thread.update(cx, |thread, cx| {
2625 thread.on_terminal_provider_event(
2626 TerminalProviderEvent::Output {
2627 terminal_id: terminal_id.clone(),
2628 data: b"hello buffered".to_vec(),
2629 },
2630 cx,
2631 );
2632 });
2633
2634 // Create a display-only terminal and then send Created
2635 let lower = cx.new(|cx| {
2636 let builder = ::terminal::TerminalBuilder::new_display_only(
2637 ::terminal::terminal_settings::CursorShape::default(),
2638 ::terminal::terminal_settings::AlternateScroll::On,
2639 None,
2640 0,
2641 cx.background_executor(),
2642 PathStyle::local(),
2643 )
2644 .unwrap();
2645 builder.subscribe(cx)
2646 });
2647
2648 thread.update(cx, |thread, cx| {
2649 thread.on_terminal_provider_event(
2650 TerminalProviderEvent::Created {
2651 terminal_id: terminal_id.clone(),
2652 label: "Buffered Test".to_string(),
2653 cwd: None,
2654 output_byte_limit: None,
2655 terminal: lower.clone(),
2656 },
2657 cx,
2658 );
2659 });
2660
2661 // After Created, buffered Output should have been flushed into the renderer
2662 let content = thread.read_with(cx, |thread, cx| {
2663 let term = thread.terminal(terminal_id.clone()).unwrap();
2664 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2665 });
2666
2667 assert!(
2668 content.contains("hello buffered"),
2669 "expected buffered output to render, got: {content}"
2670 );
2671 }
2672
2673 #[gpui::test]
2674 async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2675 init_test(cx);
2676
2677 let fs = FakeFs::new(cx.executor());
2678 let project = Project::test(fs, [], cx).await;
2679 let connection = Rc::new(FakeAgentConnection::new());
2680 let thread = cx
2681 .update(|cx| connection.new_session(project, std::path::Path::new(path!("/test")), cx))
2682 .await
2683 .unwrap();
2684
2685 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2686
2687 // Send Output BEFORE Created
2688 thread.update(cx, |thread, cx| {
2689 thread.on_terminal_provider_event(
2690 TerminalProviderEvent::Output {
2691 terminal_id: terminal_id.clone(),
2692 data: b"pre-exit data".to_vec(),
2693 },
2694 cx,
2695 );
2696 });
2697
2698 // Send Exit BEFORE Created
2699 thread.update(cx, |thread, cx| {
2700 thread.on_terminal_provider_event(
2701 TerminalProviderEvent::Exit {
2702 terminal_id: terminal_id.clone(),
2703 status: acp::TerminalExitStatus::new().exit_code(0),
2704 },
2705 cx,
2706 );
2707 });
2708
2709 // Now create a display-only lower-level terminal and send Created
2710 let lower = cx.new(|cx| {
2711 let builder = ::terminal::TerminalBuilder::new_display_only(
2712 ::terminal::terminal_settings::CursorShape::default(),
2713 ::terminal::terminal_settings::AlternateScroll::On,
2714 None,
2715 0,
2716 cx.background_executor(),
2717 PathStyle::local(),
2718 )
2719 .unwrap();
2720 builder.subscribe(cx)
2721 });
2722
2723 thread.update(cx, |thread, cx| {
2724 thread.on_terminal_provider_event(
2725 TerminalProviderEvent::Created {
2726 terminal_id: terminal_id.clone(),
2727 label: "Buffered Exit Test".to_string(),
2728 cwd: None,
2729 output_byte_limit: None,
2730 terminal: lower.clone(),
2731 },
2732 cx,
2733 );
2734 });
2735
2736 // Output should be present after Created (flushed from buffer)
2737 let content = thread.read_with(cx, |thread, cx| {
2738 let term = thread.terminal(terminal_id.clone()).unwrap();
2739 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2740 });
2741
2742 assert!(
2743 content.contains("pre-exit data"),
2744 "expected pre-exit data to render, got: {content}"
2745 );
2746 }
2747
2748 /// Test that killing a terminal via Terminal::kill properly:
2749 /// 1. Causes wait_for_exit to complete (doesn't hang forever)
2750 /// 2. The underlying terminal still has the output that was written before the kill
2751 ///
2752 /// This test verifies that the fix to kill_active_task (which now also kills
2753 /// the shell process in addition to the foreground process) properly allows
2754 /// wait_for_exit to complete instead of hanging indefinitely.
2755 #[cfg(unix)]
2756 #[gpui::test]
2757 async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
2758 use std::collections::HashMap;
2759 use task::Shell;
2760 use util::shell_builder::ShellBuilder;
2761
2762 init_test(cx);
2763 cx.executor().allow_parking();
2764
2765 let fs = FakeFs::new(cx.executor());
2766 let project = Project::test(fs, [], cx).await;
2767 let connection = Rc::new(FakeAgentConnection::new());
2768 let thread = cx
2769 .update(|cx| connection.new_session(project.clone(), Path::new(path!("/test")), cx))
2770 .await
2771 .unwrap();
2772
2773 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2774
2775 // Create a real PTY terminal that runs a command which prints output then sleeps
2776 // We use printf instead of echo and chain with && sleep to ensure proper execution
2777 let (completion_tx, _completion_rx) = smol::channel::unbounded();
2778 let (program, args) = ShellBuilder::new(&Shell::System, false).build(
2779 Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
2780 &[],
2781 );
2782
2783 let builder = cx
2784 .update(|cx| {
2785 ::terminal::TerminalBuilder::new(
2786 None,
2787 None,
2788 task::Shell::WithArguments {
2789 program,
2790 args,
2791 title_override: None,
2792 },
2793 HashMap::default(),
2794 ::terminal::terminal_settings::CursorShape::default(),
2795 ::terminal::terminal_settings::AlternateScroll::On,
2796 None,
2797 vec![],
2798 0,
2799 false,
2800 0,
2801 Some(completion_tx),
2802 cx,
2803 vec![],
2804 PathStyle::local(),
2805 )
2806 })
2807 .await
2808 .unwrap();
2809
2810 let lower_terminal = cx.new(|cx| builder.subscribe(cx));
2811
2812 // Create the acp_thread Terminal wrapper
2813 thread.update(cx, |thread, cx| {
2814 thread.on_terminal_provider_event(
2815 TerminalProviderEvent::Created {
2816 terminal_id: terminal_id.clone(),
2817 label: "printf output_before_kill && sleep 60".to_string(),
2818 cwd: None,
2819 output_byte_limit: None,
2820 terminal: lower_terminal.clone(),
2821 },
2822 cx,
2823 );
2824 });
2825
2826 // Wait for the printf command to execute and produce output
2827 // Use real time since parking is enabled
2828 cx.executor().timer(Duration::from_millis(500)).await;
2829
2830 // Get the acp_thread Terminal and kill it
2831 let wait_for_exit = thread.update(cx, |thread, cx| {
2832 let term = thread.terminals.get(&terminal_id).unwrap();
2833 let wait_for_exit = term.read(cx).wait_for_exit();
2834 term.update(cx, |term, cx| {
2835 term.kill(cx);
2836 });
2837 wait_for_exit
2838 });
2839
2840 // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
2841 // Before the fix to kill_active_task, this would hang forever because
2842 // only the foreground process was killed, not the shell, so the PTY
2843 // child never exited and wait_for_completed_task never completed.
2844 let exit_result = futures::select! {
2845 result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
2846 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
2847 };
2848
2849 assert!(
2850 exit_result.is_some(),
2851 "wait_for_exit should complete after kill, but it timed out. \
2852 This indicates kill_active_task is not properly killing the shell process."
2853 );
2854
2855 // Give the system a chance to process any pending updates
2856 cx.run_until_parked();
2857
2858 // Verify that the underlying terminal still has the output that was
2859 // written before the kill. This verifies that killing doesn't lose output.
2860 let inner_content = thread.read_with(cx, |thread, cx| {
2861 let term = thread.terminals.get(&terminal_id).unwrap();
2862 term.read(cx).inner().read(cx).get_content()
2863 });
2864
2865 assert!(
2866 inner_content.contains("output_before_kill"),
2867 "Underlying terminal should contain output from before kill, got: {}",
2868 inner_content
2869 );
2870 }
2871
2872 #[gpui::test]
2873 async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
2874 init_test(cx);
2875
2876 let fs = FakeFs::new(cx.executor());
2877 let project = Project::test(fs, [], cx).await;
2878 let connection = Rc::new(FakeAgentConnection::new());
2879 let thread = cx
2880 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
2881 .await
2882 .unwrap();
2883
2884 // Test creating a new user message
2885 thread.update(cx, |thread, cx| {
2886 thread.push_user_content_block(None, "Hello, ".into(), cx);
2887 });
2888
2889 thread.update(cx, |thread, cx| {
2890 assert_eq!(thread.entries.len(), 1);
2891 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2892 assert_eq!(user_msg.id, None);
2893 assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
2894 } else {
2895 panic!("Expected UserMessage");
2896 }
2897 });
2898
2899 // Test appending to existing user message
2900 let message_1_id = UserMessageId::new();
2901 thread.update(cx, |thread, cx| {
2902 thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
2903 });
2904
2905 thread.update(cx, |thread, cx| {
2906 assert_eq!(thread.entries.len(), 1);
2907 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2908 assert_eq!(user_msg.id, Some(message_1_id));
2909 assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
2910 } else {
2911 panic!("Expected UserMessage");
2912 }
2913 });
2914
2915 // Test creating new user message after assistant message
2916 thread.update(cx, |thread, cx| {
2917 thread.push_assistant_content_block("Assistant response".into(), false, cx);
2918 });
2919
2920 let message_2_id = UserMessageId::new();
2921 thread.update(cx, |thread, cx| {
2922 thread.push_user_content_block(
2923 Some(message_2_id.clone()),
2924 "New user message".into(),
2925 cx,
2926 );
2927 });
2928
2929 thread.update(cx, |thread, cx| {
2930 assert_eq!(thread.entries.len(), 3);
2931 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
2932 assert_eq!(user_msg.id, Some(message_2_id));
2933 assert_eq!(user_msg.content.to_markdown(cx), "New user message");
2934 } else {
2935 panic!("Expected UserMessage at index 2");
2936 }
2937 });
2938 }
2939
2940 #[gpui::test]
2941 async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
2942 init_test(cx);
2943
2944 let fs = FakeFs::new(cx.executor());
2945 let project = Project::test(fs, [], cx).await;
2946 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2947 |_, thread, mut cx| {
2948 async move {
2949 thread.update(&mut cx, |thread, cx| {
2950 thread
2951 .handle_session_update(
2952 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
2953 "Thinking ".into(),
2954 )),
2955 cx,
2956 )
2957 .unwrap();
2958 thread
2959 .handle_session_update(
2960 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
2961 "hard!".into(),
2962 )),
2963 cx,
2964 )
2965 .unwrap();
2966 })?;
2967 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
2968 }
2969 .boxed_local()
2970 },
2971 ));
2972
2973 let thread = cx
2974 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
2975 .await
2976 .unwrap();
2977
2978 thread
2979 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
2980 .await
2981 .unwrap();
2982
2983 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
2984 assert_eq!(
2985 output,
2986 indoc! {r#"
2987 ## User
2988
2989 Hello from Zed!
2990
2991 ## Assistant
2992
2993 <thinking>
2994 Thinking hard!
2995 </thinking>
2996
2997 "#}
2998 );
2999 }
3000
3001 #[gpui::test]
3002 async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
3003 init_test(cx);
3004
3005 let fs = FakeFs::new(cx.executor());
3006 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
3007 .await;
3008 let project = Project::test(fs.clone(), [], cx).await;
3009 let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
3010 let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
3011 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3012 move |_, thread, mut cx| {
3013 let read_file_tx = read_file_tx.clone();
3014 async move {
3015 let content = thread
3016 .update(&mut cx, |thread, cx| {
3017 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3018 })
3019 .unwrap()
3020 .await
3021 .unwrap();
3022 assert_eq!(content, "one\ntwo\nthree\n");
3023 read_file_tx.take().unwrap().send(()).unwrap();
3024 thread
3025 .update(&mut cx, |thread, cx| {
3026 thread.write_text_file(
3027 path!("/tmp/foo").into(),
3028 "one\ntwo\nthree\nfour\nfive\n".to_string(),
3029 cx,
3030 )
3031 })
3032 .unwrap()
3033 .await
3034 .unwrap();
3035 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3036 }
3037 .boxed_local()
3038 },
3039 ));
3040
3041 let (worktree, pathbuf) = project
3042 .update(cx, |project, cx| {
3043 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3044 })
3045 .await
3046 .unwrap();
3047 let buffer = project
3048 .update(cx, |project, cx| {
3049 project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
3050 })
3051 .await
3052 .unwrap();
3053
3054 let thread = cx
3055 .update(|cx| connection.new_session(project, Path::new(path!("/tmp")), cx))
3056 .await
3057 .unwrap();
3058
3059 let request = thread.update(cx, |thread, cx| {
3060 thread.send_raw("Extend the count in /tmp/foo", cx)
3061 });
3062 read_file_rx.await.ok();
3063 buffer.update(cx, |buffer, cx| {
3064 buffer.edit([(0..0, "zero\n".to_string())], None, cx);
3065 });
3066 cx.run_until_parked();
3067 assert_eq!(
3068 buffer.read_with(cx, |buffer, _| buffer.text()),
3069 "zero\none\ntwo\nthree\nfour\nfive\n"
3070 );
3071 assert_eq!(
3072 String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
3073 "zero\none\ntwo\nthree\nfour\nfive\n"
3074 );
3075 request.await.unwrap();
3076 }
3077
3078 #[gpui::test]
3079 async fn test_reading_from_line(cx: &mut TestAppContext) {
3080 init_test(cx);
3081
3082 let fs = FakeFs::new(cx.executor());
3083 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
3084 .await;
3085 let project = Project::test(fs.clone(), [], cx).await;
3086 project
3087 .update(cx, |project, cx| {
3088 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3089 })
3090 .await
3091 .unwrap();
3092
3093 let connection = Rc::new(FakeAgentConnection::new());
3094
3095 let thread = cx
3096 .update(|cx| connection.new_session(project, Path::new(path!("/tmp")), cx))
3097 .await
3098 .unwrap();
3099
3100 // Whole file
3101 let content = thread
3102 .update(cx, |thread, cx| {
3103 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3104 })
3105 .await
3106 .unwrap();
3107
3108 assert_eq!(content, "one\ntwo\nthree\nfour\n");
3109
3110 // Only start line
3111 let content = thread
3112 .update(cx, |thread, cx| {
3113 thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
3114 })
3115 .await
3116 .unwrap();
3117
3118 assert_eq!(content, "three\nfour\n");
3119
3120 // Only limit
3121 let content = thread
3122 .update(cx, |thread, cx| {
3123 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3124 })
3125 .await
3126 .unwrap();
3127
3128 assert_eq!(content, "one\ntwo\n");
3129
3130 // Range
3131 let content = thread
3132 .update(cx, |thread, cx| {
3133 thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
3134 })
3135 .await
3136 .unwrap();
3137
3138 assert_eq!(content, "two\nthree\n");
3139
3140 // Invalid
3141 let err = thread
3142 .update(cx, |thread, cx| {
3143 thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
3144 })
3145 .await
3146 .unwrap_err();
3147
3148 assert_eq!(
3149 err.to_string(),
3150 "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
3151 );
3152 }
3153
3154 #[gpui::test]
3155 async fn test_reading_empty_file(cx: &mut TestAppContext) {
3156 init_test(cx);
3157
3158 let fs = FakeFs::new(cx.executor());
3159 fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
3160 let project = Project::test(fs.clone(), [], cx).await;
3161 project
3162 .update(cx, |project, cx| {
3163 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3164 })
3165 .await
3166 .unwrap();
3167
3168 let connection = Rc::new(FakeAgentConnection::new());
3169
3170 let thread = cx
3171 .update(|cx| connection.new_session(project, Path::new(path!("/tmp")), cx))
3172 .await
3173 .unwrap();
3174
3175 // Whole file
3176 let content = thread
3177 .update(cx, |thread, cx| {
3178 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3179 })
3180 .await
3181 .unwrap();
3182
3183 assert_eq!(content, "");
3184
3185 // Only start line
3186 let content = thread
3187 .update(cx, |thread, cx| {
3188 thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
3189 })
3190 .await
3191 .unwrap();
3192
3193 assert_eq!(content, "");
3194
3195 // Only limit
3196 let content = thread
3197 .update(cx, |thread, cx| {
3198 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3199 })
3200 .await
3201 .unwrap();
3202
3203 assert_eq!(content, "");
3204
3205 // Range
3206 let content = thread
3207 .update(cx, |thread, cx| {
3208 thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
3209 })
3210 .await
3211 .unwrap();
3212
3213 assert_eq!(content, "");
3214
3215 // Invalid
3216 let err = thread
3217 .update(cx, |thread, cx| {
3218 thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
3219 })
3220 .await
3221 .unwrap_err();
3222
3223 assert_eq!(
3224 err.to_string(),
3225 "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
3226 );
3227 }
3228 #[gpui::test]
3229 async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
3230 init_test(cx);
3231
3232 let fs = FakeFs::new(cx.executor());
3233 fs.insert_tree(path!("/tmp"), json!({})).await;
3234 let project = Project::test(fs.clone(), [], cx).await;
3235 project
3236 .update(cx, |project, cx| {
3237 project.find_or_create_worktree(path!("/tmp"), true, cx)
3238 })
3239 .await
3240 .unwrap();
3241
3242 let connection = Rc::new(FakeAgentConnection::new());
3243
3244 let thread = cx
3245 .update(|cx| connection.new_session(project, Path::new(path!("/tmp")), cx))
3246 .await
3247 .unwrap();
3248
3249 // Out of project file
3250 let err = thread
3251 .update(cx, |thread, cx| {
3252 thread.read_text_file(path!("/foo").into(), None, None, false, cx)
3253 })
3254 .await
3255 .unwrap_err();
3256
3257 assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
3258 }
3259
3260 #[gpui::test]
3261 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
3262 init_test(cx);
3263
3264 let fs = FakeFs::new(cx.executor());
3265 let project = Project::test(fs, [], cx).await;
3266 let id = acp::ToolCallId::new("test");
3267
3268 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3269 let id = id.clone();
3270 move |_, thread, mut cx| {
3271 let id = id.clone();
3272 async move {
3273 thread
3274 .update(&mut cx, |thread, cx| {
3275 thread.handle_session_update(
3276 acp::SessionUpdate::ToolCall(
3277 acp::ToolCall::new(id.clone(), "Label")
3278 .kind(acp::ToolKind::Fetch)
3279 .status(acp::ToolCallStatus::InProgress),
3280 ),
3281 cx,
3282 )
3283 })
3284 .unwrap()
3285 .unwrap();
3286 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3287 }
3288 .boxed_local()
3289 }
3290 }));
3291
3292 let thread = cx
3293 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3294 .await
3295 .unwrap();
3296
3297 let request = thread.update(cx, |thread, cx| {
3298 thread.send_raw("Fetch https://example.com", cx)
3299 });
3300
3301 run_until_first_tool_call(&thread, cx).await;
3302
3303 thread.read_with(cx, |thread, _| {
3304 assert!(matches!(
3305 thread.entries[1],
3306 AgentThreadEntry::ToolCall(ToolCall {
3307 status: ToolCallStatus::InProgress,
3308 ..
3309 })
3310 ));
3311 });
3312
3313 thread.update(cx, |thread, cx| thread.cancel(cx)).await;
3314
3315 thread.read_with(cx, |thread, _| {
3316 assert!(matches!(
3317 &thread.entries[1],
3318 AgentThreadEntry::ToolCall(ToolCall {
3319 status: ToolCallStatus::Canceled,
3320 ..
3321 })
3322 ));
3323 });
3324
3325 thread
3326 .update(cx, |thread, cx| {
3327 thread.handle_session_update(
3328 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3329 id,
3330 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3331 )),
3332 cx,
3333 )
3334 })
3335 .unwrap();
3336
3337 request.await.unwrap();
3338
3339 thread.read_with(cx, |thread, _| {
3340 assert!(matches!(
3341 thread.entries[1],
3342 AgentThreadEntry::ToolCall(ToolCall {
3343 status: ToolCallStatus::Completed,
3344 ..
3345 })
3346 ));
3347 });
3348 }
3349
3350 #[gpui::test]
3351 async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3352 init_test(cx);
3353 let fs = FakeFs::new(cx.background_executor.clone());
3354 fs.insert_tree(path!("/test"), json!({})).await;
3355 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3356
3357 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3358 move |_, thread, mut cx| {
3359 async move {
3360 thread
3361 .update(&mut cx, |thread, cx| {
3362 thread.handle_session_update(
3363 acp::SessionUpdate::ToolCall(
3364 acp::ToolCall::new("test", "Label")
3365 .kind(acp::ToolKind::Edit)
3366 .status(acp::ToolCallStatus::Completed)
3367 .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
3368 "/test/test.txt",
3369 "foo",
3370 ))]),
3371 ),
3372 cx,
3373 )
3374 })
3375 .unwrap()
3376 .unwrap();
3377 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3378 }
3379 .boxed_local()
3380 }
3381 }));
3382
3383 let thread = cx
3384 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3385 .await
3386 .unwrap();
3387
3388 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3389 .await
3390 .unwrap();
3391
3392 assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3393 }
3394
3395 #[gpui::test(iterations = 10)]
3396 async fn test_checkpoints(cx: &mut TestAppContext) {
3397 init_test(cx);
3398 let fs = FakeFs::new(cx.background_executor.clone());
3399 fs.insert_tree(
3400 path!("/test"),
3401 json!({
3402 ".git": {}
3403 }),
3404 )
3405 .await;
3406 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3407
3408 let simulate_changes = Arc::new(AtomicBool::new(true));
3409 let next_filename = Arc::new(AtomicUsize::new(0));
3410 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3411 let simulate_changes = simulate_changes.clone();
3412 let next_filename = next_filename.clone();
3413 let fs = fs.clone();
3414 move |request, thread, mut cx| {
3415 let fs = fs.clone();
3416 let simulate_changes = simulate_changes.clone();
3417 let next_filename = next_filename.clone();
3418 async move {
3419 if simulate_changes.load(SeqCst) {
3420 let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3421 fs.write(Path::new(&filename), b"").await?;
3422 }
3423
3424 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3425 panic!("expected text content block");
3426 };
3427 thread.update(&mut cx, |thread, cx| {
3428 thread
3429 .handle_session_update(
3430 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3431 content.text.to_uppercase().into(),
3432 )),
3433 cx,
3434 )
3435 .unwrap();
3436 })?;
3437 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3438 }
3439 .boxed_local()
3440 }
3441 }));
3442 let thread = cx
3443 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3444 .await
3445 .unwrap();
3446
3447 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3448 .await
3449 .unwrap();
3450 thread.read_with(cx, |thread, cx| {
3451 assert_eq!(
3452 thread.to_markdown(cx),
3453 indoc! {"
3454 ## User (checkpoint)
3455
3456 Lorem
3457
3458 ## Assistant
3459
3460 LOREM
3461
3462 "}
3463 );
3464 });
3465 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3466
3467 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3468 .await
3469 .unwrap();
3470 thread.read_with(cx, |thread, cx| {
3471 assert_eq!(
3472 thread.to_markdown(cx),
3473 indoc! {"
3474 ## User (checkpoint)
3475
3476 Lorem
3477
3478 ## Assistant
3479
3480 LOREM
3481
3482 ## User (checkpoint)
3483
3484 ipsum
3485
3486 ## Assistant
3487
3488 IPSUM
3489
3490 "}
3491 );
3492 });
3493 assert_eq!(
3494 fs.files(),
3495 vec![
3496 Path::new(path!("/test/file-0")),
3497 Path::new(path!("/test/file-1"))
3498 ]
3499 );
3500
3501 // Checkpoint isn't stored when there are no changes.
3502 simulate_changes.store(false, SeqCst);
3503 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3504 .await
3505 .unwrap();
3506 thread.read_with(cx, |thread, cx| {
3507 assert_eq!(
3508 thread.to_markdown(cx),
3509 indoc! {"
3510 ## User (checkpoint)
3511
3512 Lorem
3513
3514 ## Assistant
3515
3516 LOREM
3517
3518 ## User (checkpoint)
3519
3520 ipsum
3521
3522 ## Assistant
3523
3524 IPSUM
3525
3526 ## User
3527
3528 dolor
3529
3530 ## Assistant
3531
3532 DOLOR
3533
3534 "}
3535 );
3536 });
3537 assert_eq!(
3538 fs.files(),
3539 vec![
3540 Path::new(path!("/test/file-0")),
3541 Path::new(path!("/test/file-1"))
3542 ]
3543 );
3544
3545 // Rewinding the conversation truncates the history and restores the checkpoint.
3546 thread
3547 .update(cx, |thread, cx| {
3548 let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3549 panic!("unexpected entries {:?}", thread.entries)
3550 };
3551 thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3552 })
3553 .await
3554 .unwrap();
3555 thread.read_with(cx, |thread, cx| {
3556 assert_eq!(
3557 thread.to_markdown(cx),
3558 indoc! {"
3559 ## User (checkpoint)
3560
3561 Lorem
3562
3563 ## Assistant
3564
3565 LOREM
3566
3567 "}
3568 );
3569 });
3570 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3571 }
3572
3573 #[gpui::test]
3574 async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3575 use std::sync::atomic::AtomicUsize;
3576 init_test(cx);
3577
3578 let fs = FakeFs::new(cx.executor());
3579 let project = Project::test(fs, None, cx).await;
3580
3581 // Create a connection that simulates refusal after tool result
3582 let prompt_count = Arc::new(AtomicUsize::new(0));
3583 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3584 let prompt_count = prompt_count.clone();
3585 move |_request, thread, mut cx| {
3586 let count = prompt_count.fetch_add(1, SeqCst);
3587 async move {
3588 if count == 0 {
3589 // First prompt: Generate a tool call with result
3590 thread.update(&mut cx, |thread, cx| {
3591 thread
3592 .handle_session_update(
3593 acp::SessionUpdate::ToolCall(
3594 acp::ToolCall::new("tool1", "Test Tool")
3595 .kind(acp::ToolKind::Fetch)
3596 .status(acp::ToolCallStatus::Completed)
3597 .raw_input(serde_json::json!({"query": "test"}))
3598 .raw_output(serde_json::json!({"result": "inappropriate content"})),
3599 ),
3600 cx,
3601 )
3602 .unwrap();
3603 })?;
3604
3605 // Now return refusal because of the tool result
3606 Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
3607 } else {
3608 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3609 }
3610 }
3611 .boxed_local()
3612 }
3613 }));
3614
3615 let thread = cx
3616 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3617 .await
3618 .unwrap();
3619
3620 // Track if we see a Refusal event
3621 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3622 let saw_refusal_event_captured = saw_refusal_event.clone();
3623 thread.update(cx, |_thread, cx| {
3624 cx.subscribe(
3625 &thread,
3626 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3627 if matches!(event, AcpThreadEvent::Refusal) {
3628 *saw_refusal_event_captured.lock().unwrap() = true;
3629 }
3630 },
3631 )
3632 .detach();
3633 });
3634
3635 // Send a user message - this will trigger tool call and then refusal
3636 let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
3637 cx.background_executor.spawn(send_task).detach();
3638 cx.run_until_parked();
3639
3640 // Verify that:
3641 // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3642 // 2. The user message was NOT truncated
3643 assert!(
3644 *saw_refusal_event.lock().unwrap(),
3645 "Refusal event should be emitted for tool result refusals"
3646 );
3647
3648 thread.read_with(cx, |thread, _| {
3649 let entries = thread.entries();
3650 assert!(entries.len() >= 2, "Should have user message and tool call");
3651
3652 // Verify user message is still there
3653 assert!(
3654 matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3655 "User message should not be truncated"
3656 );
3657
3658 // Verify tool call is there with result
3659 if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3660 assert!(
3661 tool_call.raw_output.is_some(),
3662 "Tool call should have output"
3663 );
3664 } else {
3665 panic!("Expected tool call at index 1");
3666 }
3667 });
3668 }
3669
3670 #[gpui::test]
3671 async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3672 init_test(cx);
3673
3674 let fs = FakeFs::new(cx.executor());
3675 let project = Project::test(fs, None, cx).await;
3676
3677 let refuse_next = Arc::new(AtomicBool::new(false));
3678 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3679 let refuse_next = refuse_next.clone();
3680 move |_request, _thread, _cx| {
3681 if refuse_next.load(SeqCst) {
3682 async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
3683 .boxed_local()
3684 } else {
3685 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
3686 .boxed_local()
3687 }
3688 }
3689 }));
3690
3691 let thread = cx
3692 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3693 .await
3694 .unwrap();
3695
3696 // Track if we see a Refusal event
3697 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3698 let saw_refusal_event_captured = saw_refusal_event.clone();
3699 thread.update(cx, |_thread, cx| {
3700 cx.subscribe(
3701 &thread,
3702 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3703 if matches!(event, AcpThreadEvent::Refusal) {
3704 *saw_refusal_event_captured.lock().unwrap() = true;
3705 }
3706 },
3707 )
3708 .detach();
3709 });
3710
3711 // Send a message that will be refused
3712 refuse_next.store(true, SeqCst);
3713 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3714 .await
3715 .unwrap();
3716
3717 // Verify that a Refusal event WAS emitted for user prompt refusal
3718 assert!(
3719 *saw_refusal_event.lock().unwrap(),
3720 "Refusal event should be emitted for user prompt refusals"
3721 );
3722
3723 // Verify the message was truncated (user prompt refusal)
3724 thread.read_with(cx, |thread, cx| {
3725 assert_eq!(thread.to_markdown(cx), "");
3726 });
3727 }
3728
3729 #[gpui::test]
3730 async fn test_refusal(cx: &mut TestAppContext) {
3731 init_test(cx);
3732 let fs = FakeFs::new(cx.background_executor.clone());
3733 fs.insert_tree(path!("/"), json!({})).await;
3734 let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
3735
3736 let refuse_next = Arc::new(AtomicBool::new(false));
3737 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3738 let refuse_next = refuse_next.clone();
3739 move |request, thread, mut cx| {
3740 let refuse_next = refuse_next.clone();
3741 async move {
3742 if refuse_next.load(SeqCst) {
3743 return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
3744 }
3745
3746 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3747 panic!("expected text content block");
3748 };
3749 thread.update(&mut cx, |thread, cx| {
3750 thread
3751 .handle_session_update(
3752 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3753 content.text.to_uppercase().into(),
3754 )),
3755 cx,
3756 )
3757 .unwrap();
3758 })?;
3759 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3760 }
3761 .boxed_local()
3762 }
3763 }));
3764 let thread = cx
3765 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3766 .await
3767 .unwrap();
3768
3769 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3770 .await
3771 .unwrap();
3772 thread.read_with(cx, |thread, cx| {
3773 assert_eq!(
3774 thread.to_markdown(cx),
3775 indoc! {"
3776 ## User
3777
3778 hello
3779
3780 ## Assistant
3781
3782 HELLO
3783
3784 "}
3785 );
3786 });
3787
3788 // Simulate refusing the second message. The message should be truncated
3789 // when a user prompt is refused.
3790 refuse_next.store(true, SeqCst);
3791 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
3792 .await
3793 .unwrap();
3794 thread.read_with(cx, |thread, cx| {
3795 assert_eq!(
3796 thread.to_markdown(cx),
3797 indoc! {"
3798 ## User
3799
3800 hello
3801
3802 ## Assistant
3803
3804 HELLO
3805
3806 "}
3807 );
3808 });
3809 }
3810
3811 async fn run_until_first_tool_call(
3812 thread: &Entity<AcpThread>,
3813 cx: &mut TestAppContext,
3814 ) -> usize {
3815 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
3816
3817 let subscription = cx.update(|cx| {
3818 cx.subscribe(thread, move |thread, _, cx| {
3819 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
3820 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
3821 return tx.try_send(ix).unwrap();
3822 }
3823 }
3824 })
3825 });
3826
3827 select! {
3828 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
3829 panic!("Timeout waiting for tool call")
3830 }
3831 ix = rx.next().fuse() => {
3832 drop(subscription);
3833 ix.unwrap()
3834 }
3835 }
3836 }
3837
3838 #[derive(Clone, Default)]
3839 struct FakeAgentConnection {
3840 auth_methods: Vec<acp::AuthMethod>,
3841 sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
3842 on_user_message: Option<
3843 Rc<
3844 dyn Fn(
3845 acp::PromptRequest,
3846 WeakEntity<AcpThread>,
3847 AsyncApp,
3848 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3849 + 'static,
3850 >,
3851 >,
3852 }
3853
3854 impl FakeAgentConnection {
3855 fn new() -> Self {
3856 Self {
3857 auth_methods: Vec::new(),
3858 on_user_message: None,
3859 sessions: Arc::default(),
3860 }
3861 }
3862
3863 #[expect(unused)]
3864 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
3865 self.auth_methods = auth_methods;
3866 self
3867 }
3868
3869 fn on_user_message(
3870 mut self,
3871 handler: impl Fn(
3872 acp::PromptRequest,
3873 WeakEntity<AcpThread>,
3874 AsyncApp,
3875 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3876 + 'static,
3877 ) -> Self {
3878 self.on_user_message.replace(Rc::new(handler));
3879 self
3880 }
3881 }
3882
3883 impl AgentConnection for FakeAgentConnection {
3884 fn telemetry_id(&self) -> SharedString {
3885 "fake".into()
3886 }
3887
3888 fn auth_methods(&self) -> &[acp::AuthMethod] {
3889 &self.auth_methods
3890 }
3891
3892 fn new_session(
3893 self: Rc<Self>,
3894 project: Entity<Project>,
3895 _cwd: &Path,
3896 cx: &mut App,
3897 ) -> Task<gpui::Result<Entity<AcpThread>>> {
3898 let session_id = acp::SessionId::new(
3899 rand::rng()
3900 .sample_iter(&distr::Alphanumeric)
3901 .take(7)
3902 .map(char::from)
3903 .collect::<String>(),
3904 );
3905 let action_log = cx.new(|_| ActionLog::new(project.clone()));
3906 let thread = cx.new(|cx| {
3907 AcpThread::new(
3908 None,
3909 "Test",
3910 self.clone(),
3911 project,
3912 action_log,
3913 session_id.clone(),
3914 watch::Receiver::constant(
3915 acp::PromptCapabilities::new()
3916 .image(true)
3917 .audio(true)
3918 .embedded_context(true),
3919 ),
3920 cx,
3921 )
3922 });
3923 self.sessions.lock().insert(session_id, thread.downgrade());
3924 Task::ready(Ok(thread))
3925 }
3926
3927 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
3928 if self.auth_methods().iter().any(|m| m.id == method) {
3929 Task::ready(Ok(()))
3930 } else {
3931 Task::ready(Err(anyhow!("Invalid Auth Method")))
3932 }
3933 }
3934
3935 fn prompt(
3936 &self,
3937 _id: Option<UserMessageId>,
3938 params: acp::PromptRequest,
3939 cx: &mut App,
3940 ) -> Task<gpui::Result<acp::PromptResponse>> {
3941 let sessions = self.sessions.lock();
3942 let thread = sessions.get(¶ms.session_id).unwrap();
3943 if let Some(handler) = &self.on_user_message {
3944 let handler = handler.clone();
3945 let thread = thread.clone();
3946 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
3947 } else {
3948 Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
3949 }
3950 }
3951
3952 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
3953 let sessions = self.sessions.lock();
3954 let thread = sessions.get(session_id).unwrap().clone();
3955
3956 cx.spawn(async move |cx| {
3957 thread
3958 .update(cx, |thread, cx| thread.cancel(cx))
3959 .unwrap()
3960 .await
3961 })
3962 .detach();
3963 }
3964
3965 fn truncate(
3966 &self,
3967 session_id: &acp::SessionId,
3968 _cx: &App,
3969 ) -> Option<Rc<dyn AgentSessionTruncate>> {
3970 Some(Rc::new(FakeAgentSessionEditor {
3971 _session_id: session_id.clone(),
3972 }))
3973 }
3974
3975 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
3976 self
3977 }
3978 }
3979
/// Truncation handle handed out by `FakeAgentConnection::truncate`.
struct FakeAgentSessionEditor {
    /// Session the handle was created for.
    _session_id: acp::SessionId,
}
3983
impl AgentSessionTruncate for FakeAgentSessionEditor {
    /// Ignores both arguments and returns an already-resolved `Ok(())` task.
    fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
        Task::ready(Ok(()))
    }
}
3989
3990 #[gpui::test]
3991 async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
3992 init_test(cx);
3993
3994 let fs = FakeFs::new(cx.executor());
3995 let project = Project::test(fs, [], cx).await;
3996 let connection = Rc::new(FakeAgentConnection::new());
3997 let thread = cx
3998 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3999 .await
4000 .unwrap();
4001
4002 // Try to update a tool call that doesn't exist
4003 let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
4004 thread.update(cx, |thread, cx| {
4005 let result = thread.handle_session_update(
4006 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4007 nonexistent_id.clone(),
4008 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4009 )),
4010 cx,
4011 );
4012
4013 // The update should succeed (not return an error)
4014 assert!(result.is_ok());
4015
4016 // There should now be exactly one entry in the thread
4017 assert_eq!(thread.entries.len(), 1);
4018
4019 // The entry should be a failed tool call
4020 if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
4021 assert_eq!(tool_call.id, nonexistent_id);
4022 assert!(matches!(tool_call.status, ToolCallStatus::Failed));
4023 assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
4024
4025 // Check that the content contains the error message
4026 assert_eq!(tool_call.content.len(), 1);
4027 if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
4028 match content_block {
4029 ContentBlock::Markdown { markdown } => {
4030 let markdown_text = markdown.read(cx).source();
4031 assert!(markdown_text.contains("Tool call not found"));
4032 }
4033 ContentBlock::Empty => panic!("Expected markdown content, got empty"),
4034 ContentBlock::ResourceLink { .. } => {
4035 panic!("Expected markdown content, got resource link")
4036 }
4037 ContentBlock::Image { .. } => {
4038 panic!("Expected markdown content, got image")
4039 }
4040 }
4041 } else {
4042 panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
4043 }
4044 } else {
4045 panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
4046 }
4047 });
4048 }
4049
4050 /// Tests that restoring a checkpoint properly cleans up terminals that were
4051 /// created after that checkpoint, and cancels any in-progress generation.
4052 ///
4053 /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
4054 /// that were started after that checkpoint should be terminated, and any in-progress
4055 /// AI generation should be canceled.
4056 #[gpui::test]
4057 async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
4058 init_test(cx);
4059
4060 let fs = FakeFs::new(cx.executor());
4061 let project = Project::test(fs, [], cx).await;
4062 let connection = Rc::new(FakeAgentConnection::new());
4063 let thread = cx
4064 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
4065 .await
4066 .unwrap();
4067
4068 // Send first user message to create a checkpoint
4069 cx.update(|cx| {
4070 thread.update(cx, |thread, cx| {
4071 thread.send(vec!["first message".into()], cx)
4072 })
4073 })
4074 .await
4075 .unwrap();
4076
4077 // Send second message (creates another checkpoint) - we'll restore to this one
4078 cx.update(|cx| {
4079 thread.update(cx, |thread, cx| {
4080 thread.send(vec!["second message".into()], cx)
4081 })
4082 })
4083 .await
4084 .unwrap();
4085
4086 // Create 2 terminals BEFORE the checkpoint that have completed running
4087 let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4088 let mock_terminal_1 = cx.new(|cx| {
4089 let builder = ::terminal::TerminalBuilder::new_display_only(
4090 ::terminal::terminal_settings::CursorShape::default(),
4091 ::terminal::terminal_settings::AlternateScroll::On,
4092 None,
4093 0,
4094 cx.background_executor(),
4095 PathStyle::local(),
4096 )
4097 .unwrap();
4098 builder.subscribe(cx)
4099 });
4100
4101 thread.update(cx, |thread, cx| {
4102 thread.on_terminal_provider_event(
4103 TerminalProviderEvent::Created {
4104 terminal_id: terminal_id_1.clone(),
4105 label: "echo 'first'".to_string(),
4106 cwd: Some(PathBuf::from("/test")),
4107 output_byte_limit: None,
4108 terminal: mock_terminal_1.clone(),
4109 },
4110 cx,
4111 );
4112 });
4113
4114 thread.update(cx, |thread, cx| {
4115 thread.on_terminal_provider_event(
4116 TerminalProviderEvent::Output {
4117 terminal_id: terminal_id_1.clone(),
4118 data: b"first\n".to_vec(),
4119 },
4120 cx,
4121 );
4122 });
4123
4124 thread.update(cx, |thread, cx| {
4125 thread.on_terminal_provider_event(
4126 TerminalProviderEvent::Exit {
4127 terminal_id: terminal_id_1.clone(),
4128 status: acp::TerminalExitStatus::new().exit_code(0),
4129 },
4130 cx,
4131 );
4132 });
4133
4134 let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4135 let mock_terminal_2 = cx.new(|cx| {
4136 let builder = ::terminal::TerminalBuilder::new_display_only(
4137 ::terminal::terminal_settings::CursorShape::default(),
4138 ::terminal::terminal_settings::AlternateScroll::On,
4139 None,
4140 0,
4141 cx.background_executor(),
4142 PathStyle::local(),
4143 )
4144 .unwrap();
4145 builder.subscribe(cx)
4146 });
4147
4148 thread.update(cx, |thread, cx| {
4149 thread.on_terminal_provider_event(
4150 TerminalProviderEvent::Created {
4151 terminal_id: terminal_id_2.clone(),
4152 label: "echo 'second'".to_string(),
4153 cwd: Some(PathBuf::from("/test")),
4154 output_byte_limit: None,
4155 terminal: mock_terminal_2.clone(),
4156 },
4157 cx,
4158 );
4159 });
4160
4161 thread.update(cx, |thread, cx| {
4162 thread.on_terminal_provider_event(
4163 TerminalProviderEvent::Output {
4164 terminal_id: terminal_id_2.clone(),
4165 data: b"second\n".to_vec(),
4166 },
4167 cx,
4168 );
4169 });
4170
4171 thread.update(cx, |thread, cx| {
4172 thread.on_terminal_provider_event(
4173 TerminalProviderEvent::Exit {
4174 terminal_id: terminal_id_2.clone(),
4175 status: acp::TerminalExitStatus::new().exit_code(0),
4176 },
4177 cx,
4178 );
4179 });
4180
4181 // Get the second message ID to restore to
4182 let second_message_id = thread.read_with(cx, |thread, _| {
4183 // At this point we have:
4184 // - Index 0: First user message (with checkpoint)
4185 // - Index 1: Second user message (with checkpoint)
4186 // No assistant responses because FakeAgentConnection just returns EndTurn
4187 let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
4188 panic!("expected user message at index 1");
4189 };
4190 message.id.clone().unwrap()
4191 });
4192
4193 // Create a terminal AFTER the checkpoint we'll restore to.
4194 // This simulates the AI agent starting a long-running terminal command.
4195 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4196 let mock_terminal = cx.new(|cx| {
4197 let builder = ::terminal::TerminalBuilder::new_display_only(
4198 ::terminal::terminal_settings::CursorShape::default(),
4199 ::terminal::terminal_settings::AlternateScroll::On,
4200 None,
4201 0,
4202 cx.background_executor(),
4203 PathStyle::local(),
4204 )
4205 .unwrap();
4206 builder.subscribe(cx)
4207 });
4208
4209 // Register the terminal as created
4210 thread.update(cx, |thread, cx| {
4211 thread.on_terminal_provider_event(
4212 TerminalProviderEvent::Created {
4213 terminal_id: terminal_id.clone(),
4214 label: "sleep 1000".to_string(),
4215 cwd: Some(PathBuf::from("/test")),
4216 output_byte_limit: None,
4217 terminal: mock_terminal.clone(),
4218 },
4219 cx,
4220 );
4221 });
4222
4223 // Simulate the terminal producing output (still running)
4224 thread.update(cx, |thread, cx| {
4225 thread.on_terminal_provider_event(
4226 TerminalProviderEvent::Output {
4227 terminal_id: terminal_id.clone(),
4228 data: b"terminal is running...\n".to_vec(),
4229 },
4230 cx,
4231 );
4232 });
4233
4234 // Create a tool call entry that references this terminal
4235 // This represents the agent requesting a terminal command
4236 thread.update(cx, |thread, cx| {
4237 thread
4238 .handle_session_update(
4239 acp::SessionUpdate::ToolCall(
4240 acp::ToolCall::new("terminal-tool-1", "Running command")
4241 .kind(acp::ToolKind::Execute)
4242 .status(acp::ToolCallStatus::InProgress)
4243 .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
4244 terminal_id.clone(),
4245 ))])
4246 .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
4247 ),
4248 cx,
4249 )
4250 .unwrap();
4251 });
4252
4253 // Verify terminal exists and is in the thread
4254 let terminal_exists_before =
4255 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4256 assert!(
4257 terminal_exists_before,
4258 "Terminal should exist before checkpoint restore"
4259 );
4260
4261 // Verify the terminal's underlying task is still running (not completed)
4262 let terminal_running_before = thread.read_with(cx, |thread, _cx| {
4263 let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
4264 terminal_entity.read_with(cx, |term, _cx| {
4265 term.output().is_none() // output is None means it's still running
4266 })
4267 });
4268 assert!(
4269 terminal_running_before,
4270 "Terminal should be running before checkpoint restore"
4271 );
4272
4273 // Verify we have the expected entries before restore
4274 let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
4275 assert!(
4276 entry_count_before > 1,
4277 "Should have multiple entries before restore"
4278 );
4279
4280 // Restore the checkpoint to the second message.
4281 // This should:
4282 // 1. Cancel any in-progress generation (via the cancel() call)
4283 // 2. Remove the terminal that was created after that point
4284 thread
4285 .update(cx, |thread, cx| {
4286 thread.restore_checkpoint(second_message_id, cx)
4287 })
4288 .await
4289 .unwrap();
4290
4291 // Verify that no send_task is in progress after restore
4292 // (cancel() clears the send_task)
4293 let has_send_task_after = thread.read_with(cx, |thread, _| thread.send_task.is_some());
4294 assert!(
4295 !has_send_task_after,
4296 "Should not have a send_task after restore (cancel should have cleared it)"
4297 );
4298
4299 // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
4300 let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
4301 assert_eq!(
4302 entry_count, 1,
4303 "Should have 1 entry after restore (only the first user message)"
4304 );
4305
4306 // Verify the 2 completed terminals from before the checkpoint still exist
4307 let terminal_1_exists = thread.read_with(cx, |thread, _| {
4308 thread.terminals.contains_key(&terminal_id_1)
4309 });
4310 assert!(
4311 terminal_1_exists,
4312 "Terminal 1 (from before checkpoint) should still exist"
4313 );
4314
4315 let terminal_2_exists = thread.read_with(cx, |thread, _| {
4316 thread.terminals.contains_key(&terminal_id_2)
4317 });
4318 assert!(
4319 terminal_2_exists,
4320 "Terminal 2 (from before checkpoint) should still exist"
4321 );
4322
4323 // Verify they're still in completed state
4324 let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
4325 let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
4326 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4327 });
4328 assert!(terminal_1_completed, "Terminal 1 should still be completed");
4329
4330 let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
4331 let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
4332 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4333 });
4334 assert!(terminal_2_completed, "Terminal 2 should still be completed");
4335
4336 // Verify the running terminal (created after checkpoint) was removed
4337 let terminal_3_exists =
4338 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4339 assert!(
4340 !terminal_3_exists,
4341 "Terminal 3 (created after checkpoint) should have been removed"
4342 );
4343
4344 // Verify total count is 2 (the two from before the checkpoint)
4345 let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
4346 assert_eq!(
4347 terminal_count, 2,
4348 "Should have exactly 2 terminals (the completed ones from before checkpoint)"
4349 );
4350 }
4351
4352 /// Tests that update_last_checkpoint correctly updates the original message's checkpoint
4353 /// even when a new user message is added while the async checkpoint comparison is in progress.
4354 ///
4355 /// This is a regression test for a bug where update_last_checkpoint would fail with
4356 /// "no checkpoint" if a new user message (without a checkpoint) was added between when
4357 /// update_last_checkpoint started and when its async closure ran.
4358 #[gpui::test]
4359 async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
4360 init_test(cx);
4361
4362 let fs = FakeFs::new(cx.executor());
4363 fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
4364 .await;
4365 let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;
4366
4367 let handler_done = Arc::new(AtomicBool::new(false));
4368 let handler_done_clone = handler_done.clone();
4369 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4370 move |_, _thread, _cx| {
4371 handler_done_clone.store(true, SeqCst);
4372 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
4373 },
4374 ));
4375
4376 let thread = cx
4377 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
4378 .await
4379 .unwrap();
4380
4381 let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
4382 let send_task = cx.background_executor.spawn(send_future);
4383
4384 // Tick until handler completes, then a few more to let update_last_checkpoint start
4385 while !handler_done.load(SeqCst) {
4386 cx.executor().tick();
4387 }
4388 for _ in 0..5 {
4389 cx.executor().tick();
4390 }
4391
4392 thread.update(cx, |thread, cx| {
4393 thread.push_entry(
4394 AgentThreadEntry::UserMessage(UserMessage {
4395 id: Some(UserMessageId::new()),
4396 content: ContentBlock::Empty,
4397 chunks: vec!["Injected message (no checkpoint)".into()],
4398 checkpoint: None,
4399 indented: false,
4400 }),
4401 cx,
4402 );
4403 });
4404
4405 cx.run_until_parked();
4406 let result = send_task.await;
4407
4408 assert!(
4409 result.is_ok(),
4410 "send should succeed even when new message added during update_last_checkpoint: {:?}",
4411 result.err()
4412 );
4413 }
4414}