1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5
6use agent_settings::AgentSettings;
7
8/// Key used in ACP ToolCall meta to store the tool's programmatic name.
9/// This is a workaround since ACP's ToolCall doesn't have a dedicated name field.
10pub const TOOL_NAME_META_KEY: &str = "tool_name";
11
12/// The tool name for subagent spawning
13pub const SUBAGENT_TOOL_NAME: &str = "subagent";
14
15/// Helper to extract tool name from ACP meta
16pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
17 meta.as_ref()
18 .and_then(|m| m.get(TOOL_NAME_META_KEY))
19 .and_then(|v| v.as_str())
20 .map(|s| SharedString::from(s.to_owned()))
21}
22
23/// Helper to create meta with tool name
24pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
25 acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
26}
27use collections::HashSet;
28pub use connection::*;
29pub use diff::*;
30use language::language_settings::FormatOnSave;
31pub use mention::*;
32use project::lsp_store::{FormatTrigger, LspFormatTarget};
33use serde::{Deserialize, Serialize};
34use serde_json::to_string_pretty;
35use settings::Settings as _;
36use task::{Shell, ShellBuilder};
37pub use terminal::*;
38
39use action_log::{ActionLog, ActionLogTelemetry};
40use agent_client_protocol::{self as acp};
41use anyhow::{Context as _, Result, anyhow};
42use editor::Bias;
43use futures::{FutureExt, channel::oneshot, future::BoxFuture};
44use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
45use itertools::Itertools;
46use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
47use markdown::Markdown;
48use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
49use std::collections::HashMap;
50use std::error::Error;
51use std::fmt::{Formatter, Write};
52use std::ops::Range;
53use std::process::ExitStatus;
54use std::rc::Rc;
55use std::time::{Duration, Instant};
56use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
57use ui::App;
58use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
59use uuid::Uuid;
60
61#[derive(Debug)]
62pub struct UserMessage {
63 pub id: Option<UserMessageId>,
64 pub content: ContentBlock,
65 pub chunks: Vec<acp::ContentBlock>,
66 pub checkpoint: Option<Checkpoint>,
67 pub indented: bool,
68}
69
70#[derive(Debug)]
71pub struct Checkpoint {
72 git_checkpoint: GitStoreCheckpoint,
73 pub show: bool,
74}
75
76impl UserMessage {
77 fn to_markdown(&self, cx: &App) -> String {
78 let mut markdown = String::new();
79 if self
80 .checkpoint
81 .as_ref()
82 .is_some_and(|checkpoint| checkpoint.show)
83 {
84 writeln!(markdown, "## User (checkpoint)").unwrap();
85 } else {
86 writeln!(markdown, "## User").unwrap();
87 }
88 writeln!(markdown).unwrap();
89 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
90 writeln!(markdown).unwrap();
91 markdown
92 }
93}
94
95#[derive(Debug, PartialEq)]
96pub struct AssistantMessage {
97 pub chunks: Vec<AssistantMessageChunk>,
98 pub indented: bool,
99}
100
101impl AssistantMessage {
102 pub fn to_markdown(&self, cx: &App) -> String {
103 format!(
104 "## Assistant\n\n{}\n\n",
105 self.chunks
106 .iter()
107 .map(|chunk| chunk.to_markdown(cx))
108 .join("\n\n")
109 )
110 }
111}
112
113#[derive(Debug, PartialEq)]
114pub enum AssistantMessageChunk {
115 Message { block: ContentBlock },
116 Thought { block: ContentBlock },
117}
118
119impl AssistantMessageChunk {
120 pub fn from_str(
121 chunk: &str,
122 language_registry: &Arc<LanguageRegistry>,
123 path_style: PathStyle,
124 cx: &mut App,
125 ) -> Self {
126 Self::Message {
127 block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
128 }
129 }
130
131 fn to_markdown(&self, cx: &App) -> String {
132 match self {
133 Self::Message { block } => block.to_markdown(cx).to_string(),
134 Self::Thought { block } => {
135 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
136 }
137 }
138 }
139}
140
141#[derive(Debug)]
142pub enum AgentThreadEntry {
143 UserMessage(UserMessage),
144 AssistantMessage(AssistantMessage),
145 ToolCall(ToolCall),
146}
147
148impl AgentThreadEntry {
149 pub fn is_indented(&self) -> bool {
150 match self {
151 Self::UserMessage(message) => message.indented,
152 Self::AssistantMessage(message) => message.indented,
153 Self::ToolCall(_) => false,
154 }
155 }
156
157 pub fn to_markdown(&self, cx: &App) -> String {
158 match self {
159 Self::UserMessage(message) => message.to_markdown(cx),
160 Self::AssistantMessage(message) => message.to_markdown(cx),
161 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
162 }
163 }
164
165 pub fn user_message(&self) -> Option<&UserMessage> {
166 if let AgentThreadEntry::UserMessage(message) = self {
167 Some(message)
168 } else {
169 None
170 }
171 }
172
173 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
174 if let AgentThreadEntry::ToolCall(call) = self {
175 itertools::Either::Left(call.diffs())
176 } else {
177 itertools::Either::Right(std::iter::empty())
178 }
179 }
180
181 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
182 if let AgentThreadEntry::ToolCall(call) = self {
183 itertools::Either::Left(call.terminals())
184 } else {
185 itertools::Either::Right(std::iter::empty())
186 }
187 }
188
189 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
190 if let AgentThreadEntry::ToolCall(ToolCall {
191 locations,
192 resolved_locations,
193 ..
194 }) = self
195 {
196 Some((
197 locations.get(ix)?.clone(),
198 resolved_locations.get(ix)?.clone()?,
199 ))
200 } else {
201 None
202 }
203 }
204}
205
206#[derive(Debug)]
207pub struct ToolCall {
208 pub id: acp::ToolCallId,
209 pub label: Entity<Markdown>,
210 pub kind: acp::ToolKind,
211 pub content: Vec<ToolCallContent>,
212 pub status: ToolCallStatus,
213 pub locations: Vec<acp::ToolCallLocation>,
214 pub resolved_locations: Vec<Option<AgentLocation>>,
215 pub raw_input: Option<serde_json::Value>,
216 pub raw_input_markdown: Option<Entity<Markdown>>,
217 pub raw_output: Option<serde_json::Value>,
218 pub tool_name: Option<SharedString>,
219}
220
221impl ToolCall {
222 fn from_acp(
223 tool_call: acp::ToolCall,
224 status: ToolCallStatus,
225 language_registry: Arc<LanguageRegistry>,
226 path_style: PathStyle,
227 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
228 cx: &mut App,
229 ) -> Result<Self> {
230 let title = if tool_call.kind == acp::ToolKind::Execute {
231 tool_call.title
232 } else if let Some((first_line, _)) = tool_call.title.split_once("\n") {
233 first_line.to_owned() + "…"
234 } else {
235 tool_call.title
236 };
237 let mut content = Vec::with_capacity(tool_call.content.len());
238 for item in tool_call.content {
239 if let Some(item) = ToolCallContent::from_acp(
240 item,
241 language_registry.clone(),
242 path_style,
243 terminals,
244 cx,
245 )? {
246 content.push(item);
247 }
248 }
249
250 let raw_input_markdown = tool_call
251 .raw_input
252 .as_ref()
253 .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
254
255 let tool_name = tool_name_from_meta(&tool_call.meta);
256
257 let result = Self {
258 id: tool_call.tool_call_id,
259 label: cx
260 .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
261 kind: tool_call.kind,
262 content,
263 locations: tool_call.locations,
264 resolved_locations: Vec::default(),
265 status,
266 raw_input: tool_call.raw_input,
267 raw_input_markdown,
268 raw_output: tool_call.raw_output,
269 tool_name,
270 };
271 Ok(result)
272 }
273
274 fn update_fields(
275 &mut self,
276 fields: acp::ToolCallUpdateFields,
277 language_registry: Arc<LanguageRegistry>,
278 path_style: PathStyle,
279 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
280 cx: &mut App,
281 ) -> Result<()> {
282 let acp::ToolCallUpdateFields {
283 kind,
284 status,
285 title,
286 content,
287 locations,
288 raw_input,
289 raw_output,
290 ..
291 } = fields;
292
293 if let Some(kind) = kind {
294 self.kind = kind;
295 }
296
297 if let Some(status) = status {
298 self.status = status.into();
299 }
300
301 if let Some(title) = title {
302 self.label.update(cx, |label, cx| {
303 if self.kind == acp::ToolKind::Execute {
304 label.replace(title, cx);
305 } else if let Some((first_line, _)) = title.split_once("\n") {
306 label.replace(first_line.to_owned() + "…", cx);
307 } else {
308 label.replace(title, cx);
309 }
310 });
311 }
312
313 if let Some(content) = content {
314 let mut new_content_len = content.len();
315 let mut content = content.into_iter();
316
317 // Reuse existing content if we can
318 for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
319 let valid_content =
320 old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
321 if !valid_content {
322 new_content_len -= 1;
323 }
324 }
325 for new in content {
326 if let Some(new) = ToolCallContent::from_acp(
327 new,
328 language_registry.clone(),
329 path_style,
330 terminals,
331 cx,
332 )? {
333 self.content.push(new);
334 } else {
335 new_content_len -= 1;
336 }
337 }
338 self.content.truncate(new_content_len);
339 }
340
341 if let Some(locations) = locations {
342 self.locations = locations;
343 }
344
345 if let Some(raw_input) = raw_input {
346 self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
347 self.raw_input = Some(raw_input);
348 }
349
350 if let Some(raw_output) = raw_output {
351 if self.content.is_empty()
352 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
353 {
354 self.content
355 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
356 markdown,
357 }));
358 }
359 self.raw_output = Some(raw_output);
360 }
361 Ok(())
362 }
363
364 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
365 self.content.iter().filter_map(|content| match content {
366 ToolCallContent::Diff(diff) => Some(diff),
367 ToolCallContent::ContentBlock(_) => None,
368 ToolCallContent::Terminal(_) => None,
369 ToolCallContent::SubagentThread(_) => None,
370 })
371 }
372
373 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
374 self.content.iter().filter_map(|content| match content {
375 ToolCallContent::Terminal(terminal) => Some(terminal),
376 ToolCallContent::ContentBlock(_) => None,
377 ToolCallContent::Diff(_) => None,
378 ToolCallContent::SubagentThread(_) => None,
379 })
380 }
381
382 pub fn subagent_thread(&self) -> Option<&Entity<AcpThread>> {
383 self.content.iter().find_map(|content| match content {
384 ToolCallContent::SubagentThread(thread) => Some(thread),
385 _ => None,
386 })
387 }
388
389 pub fn is_subagent(&self) -> bool {
390 matches!(self.kind, acp::ToolKind::Other)
391 && self
392 .tool_name
393 .as_ref()
394 .map(|n| n.as_ref() == SUBAGENT_TOOL_NAME)
395 .unwrap_or(false)
396 }
397
398 pub fn to_markdown(&self, cx: &App) -> String {
399 let mut markdown = format!(
400 "**Tool Call: {}**\nStatus: {}\n\n",
401 self.label.read(cx).source(),
402 self.status
403 );
404 for content in &self.content {
405 markdown.push_str(content.to_markdown(cx).as_str());
406 markdown.push_str("\n\n");
407 }
408 markdown
409 }
410
411 async fn resolve_location(
412 location: acp::ToolCallLocation,
413 project: WeakEntity<Project>,
414 cx: &mut AsyncApp,
415 ) -> Option<ResolvedLocation> {
416 let buffer = project
417 .update(cx, |project, cx| {
418 project
419 .project_path_for_absolute_path(&location.path, cx)
420 .map(|path| project.open_buffer(path, cx))
421 })
422 .ok()??;
423 let buffer = buffer.await.log_err()?;
424 let position = buffer.update(cx, |buffer, _| {
425 let snapshot = buffer.snapshot();
426 if let Some(row) = location.line {
427 let column = snapshot.indent_size_for_line(row).len;
428 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
429 snapshot.anchor_before(point)
430 } else {
431 Anchor::min_for_buffer(snapshot.remote_id())
432 }
433 });
434
435 Some(ResolvedLocation { buffer, position })
436 }
437
438 fn resolve_locations(
439 &self,
440 project: Entity<Project>,
441 cx: &mut App,
442 ) -> Task<Vec<Option<ResolvedLocation>>> {
443 let locations = self.locations.clone();
444 project.update(cx, |_, cx| {
445 cx.spawn(async move |project, cx| {
446 let mut new_locations = Vec::new();
447 for location in locations {
448 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
449 }
450 new_locations
451 })
452 })
453 }
454}
455
456// Separate so we can hold a strong reference to the buffer
457// for saving on the thread
458#[derive(Clone, Debug, PartialEq, Eq)]
459struct ResolvedLocation {
460 buffer: Entity<Buffer>,
461 position: Anchor,
462}
463
464impl From<&ResolvedLocation> for AgentLocation {
465 fn from(value: &ResolvedLocation) -> Self {
466 Self {
467 buffer: value.buffer.downgrade(),
468 position: value.position,
469 }
470 }
471}
472
473#[derive(Debug)]
474pub enum ToolCallStatus {
475 /// The tool call hasn't started running yet, but we start showing it to
476 /// the user.
477 Pending,
478 /// The tool call is waiting for confirmation from the user.
479 WaitingForConfirmation {
480 options: Vec<acp::PermissionOption>,
481 respond_tx: oneshot::Sender<acp::PermissionOptionId>,
482 },
483 /// The tool call is currently running.
484 InProgress,
485 /// The tool call completed successfully.
486 Completed,
487 /// The tool call failed.
488 Failed,
489 /// The user rejected the tool call.
490 Rejected,
491 /// The user canceled generation so the tool call was canceled.
492 Canceled,
493}
494
495impl From<acp::ToolCallStatus> for ToolCallStatus {
496 fn from(status: acp::ToolCallStatus) -> Self {
497 match status {
498 acp::ToolCallStatus::Pending => Self::Pending,
499 acp::ToolCallStatus::InProgress => Self::InProgress,
500 acp::ToolCallStatus::Completed => Self::Completed,
501 acp::ToolCallStatus::Failed => Self::Failed,
502 _ => Self::Pending,
503 }
504 }
505}
506
507impl Display for ToolCallStatus {
508 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
509 write!(
510 f,
511 "{}",
512 match self {
513 ToolCallStatus::Pending => "Pending",
514 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
515 ToolCallStatus::InProgress => "In Progress",
516 ToolCallStatus::Completed => "Completed",
517 ToolCallStatus::Failed => "Failed",
518 ToolCallStatus::Rejected => "Rejected",
519 ToolCallStatus::Canceled => "Canceled",
520 }
521 )
522 }
523}
524
525#[derive(Debug, PartialEq, Clone)]
526pub enum ContentBlock {
527 Empty,
528 Markdown { markdown: Entity<Markdown> },
529 ResourceLink { resource_link: acp::ResourceLink },
530 Image { image: Arc<gpui::Image> },
531}
532
533impl ContentBlock {
534 pub fn new(
535 block: acp::ContentBlock,
536 language_registry: &Arc<LanguageRegistry>,
537 path_style: PathStyle,
538 cx: &mut App,
539 ) -> Self {
540 let mut this = Self::Empty;
541 this.append(block, language_registry, path_style, cx);
542 this
543 }
544
545 pub fn new_combined(
546 blocks: impl IntoIterator<Item = acp::ContentBlock>,
547 language_registry: Arc<LanguageRegistry>,
548 path_style: PathStyle,
549 cx: &mut App,
550 ) -> Self {
551 let mut this = Self::Empty;
552 for block in blocks {
553 this.append(block, &language_registry, path_style, cx);
554 }
555 this
556 }
557
558 pub fn append(
559 &mut self,
560 block: acp::ContentBlock,
561 language_registry: &Arc<LanguageRegistry>,
562 path_style: PathStyle,
563 cx: &mut App,
564 ) {
565 match (&mut *self, &block) {
566 (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
567 *self = ContentBlock::ResourceLink {
568 resource_link: resource_link.clone(),
569 };
570 }
571 (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
572 if let Some(image) = Self::decode_image(image_content) {
573 *self = ContentBlock::Image { image };
574 } else {
575 let new_content = Self::image_md(image_content);
576 *self = Self::create_markdown_block(new_content, language_registry, cx);
577 }
578 }
579 (ContentBlock::Empty, _) => {
580 let new_content = Self::block_string_contents(&block, path_style);
581 *self = Self::create_markdown_block(new_content, language_registry, cx);
582 }
583 (ContentBlock::Markdown { markdown }, _) => {
584 let new_content = Self::block_string_contents(&block, path_style);
585 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
586 }
587 (ContentBlock::ResourceLink { resource_link }, _) => {
588 let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
589 let new_content = Self::block_string_contents(&block, path_style);
590 let combined = format!("{}\n{}", existing_content, new_content);
591 *self = Self::create_markdown_block(combined, language_registry, cx);
592 }
593 (ContentBlock::Image { .. }, _) => {
594 let new_content = Self::block_string_contents(&block, path_style);
595 let combined = format!("`Image`\n{}", new_content);
596 *self = Self::create_markdown_block(combined, language_registry, cx);
597 }
598 }
599 }
600
601 fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
602 use base64::Engine as _;
603
604 let bytes = base64::engine::general_purpose::STANDARD
605 .decode(image_content.data.as_bytes())
606 .ok()?;
607 let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
608 Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
609 }
610
611 fn create_markdown_block(
612 content: String,
613 language_registry: &Arc<LanguageRegistry>,
614 cx: &mut App,
615 ) -> ContentBlock {
616 ContentBlock::Markdown {
617 markdown: cx
618 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
619 }
620 }
621
622 fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
623 match block {
624 acp::ContentBlock::Text(text_content) => text_content.text.clone(),
625 acp::ContentBlock::ResourceLink(resource_link) => {
626 Self::resource_link_md(&resource_link.uri, path_style)
627 }
628 acp::ContentBlock::Resource(acp::EmbeddedResource {
629 resource:
630 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
631 uri,
632 ..
633 }),
634 ..
635 }) => Self::resource_link_md(uri, path_style),
636 acp::ContentBlock::Image(image) => Self::image_md(image),
637 _ => String::new(),
638 }
639 }
640
641 fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
642 if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
643 uri.as_link().to_string()
644 } else {
645 uri.to_string()
646 }
647 }
648
649 fn image_md(_image: &acp::ImageContent) -> String {
650 "`Image`".into()
651 }
652
653 pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
654 match self {
655 ContentBlock::Empty => "",
656 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
657 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
658 ContentBlock::Image { .. } => "`Image`",
659 }
660 }
661
662 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
663 match self {
664 ContentBlock::Empty => None,
665 ContentBlock::Markdown { markdown } => Some(markdown),
666 ContentBlock::ResourceLink { .. } => None,
667 ContentBlock::Image { .. } => None,
668 }
669 }
670
671 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
672 match self {
673 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
674 _ => None,
675 }
676 }
677
678 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
679 match self {
680 ContentBlock::Image { image } => Some(image),
681 _ => None,
682 }
683 }
684}
685
686#[derive(Debug)]
687pub enum ToolCallContent {
688 ContentBlock(ContentBlock),
689 Diff(Entity<Diff>),
690 Terminal(Entity<Terminal>),
691 SubagentThread(Entity<AcpThread>),
692}
693
694impl ToolCallContent {
695 pub fn from_acp(
696 content: acp::ToolCallContent,
697 language_registry: Arc<LanguageRegistry>,
698 path_style: PathStyle,
699 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
700 cx: &mut App,
701 ) -> Result<Option<Self>> {
702 match content {
703 acp::ToolCallContent::Content(acp::Content { content, .. }) => {
704 Ok(Some(Self::ContentBlock(ContentBlock::new(
705 content,
706 &language_registry,
707 path_style,
708 cx,
709 ))))
710 }
711 acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
712 Diff::finalized(
713 diff.path.to_string_lossy().into_owned(),
714 diff.old_text,
715 diff.new_text,
716 language_registry,
717 cx,
718 )
719 })))),
720 acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
721 .get(&terminal_id)
722 .cloned()
723 .map(|terminal| Some(Self::Terminal(terminal)))
724 .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
725 _ => Ok(None),
726 }
727 }
728
729 pub fn update_from_acp(
730 &mut self,
731 new: acp::ToolCallContent,
732 language_registry: Arc<LanguageRegistry>,
733 path_style: PathStyle,
734 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
735 cx: &mut App,
736 ) -> Result<bool> {
737 let needs_update = match (&self, &new) {
738 (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
739 old_diff.read(cx).needs_update(
740 new_diff.old_text.as_deref().unwrap_or(""),
741 &new_diff.new_text,
742 cx,
743 )
744 }
745 _ => true,
746 };
747
748 if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
749 if needs_update {
750 *self = update;
751 }
752 Ok(true)
753 } else {
754 Ok(false)
755 }
756 }
757
758 pub fn to_markdown(&self, cx: &App) -> String {
759 match self {
760 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
761 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
762 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
763 Self::SubagentThread(thread) => thread.read(cx).to_markdown(cx),
764 }
765 }
766
767 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
768 match self {
769 Self::ContentBlock(content) => content.image(),
770 _ => None,
771 }
772 }
773
774 pub fn subagent_thread(&self) -> Option<&Entity<AcpThread>> {
775 match self {
776 Self::SubagentThread(thread) => Some(thread),
777 _ => None,
778 }
779 }
780}
781
782#[derive(Debug, PartialEq)]
783pub enum ToolCallUpdate {
784 UpdateFields(acp::ToolCallUpdate),
785 UpdateDiff(ToolCallUpdateDiff),
786 UpdateTerminal(ToolCallUpdateTerminal),
787 UpdateSubagentThread(ToolCallUpdateSubagentThread),
788}
789
790impl ToolCallUpdate {
791 fn id(&self) -> &acp::ToolCallId {
792 match self {
793 Self::UpdateFields(update) => &update.tool_call_id,
794 Self::UpdateDiff(diff) => &diff.id,
795 Self::UpdateTerminal(terminal) => &terminal.id,
796 Self::UpdateSubagentThread(subagent) => &subagent.id,
797 }
798 }
799}
800
801impl From<acp::ToolCallUpdate> for ToolCallUpdate {
802 fn from(update: acp::ToolCallUpdate) -> Self {
803 Self::UpdateFields(update)
804 }
805}
806
807impl From<ToolCallUpdateDiff> for ToolCallUpdate {
808 fn from(diff: ToolCallUpdateDiff) -> Self {
809 Self::UpdateDiff(diff)
810 }
811}
812
813#[derive(Debug, PartialEq)]
814pub struct ToolCallUpdateDiff {
815 pub id: acp::ToolCallId,
816 pub diff: Entity<Diff>,
817}
818
819impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
820 fn from(terminal: ToolCallUpdateTerminal) -> Self {
821 Self::UpdateTerminal(terminal)
822 }
823}
824
825#[derive(Debug, PartialEq)]
826pub struct ToolCallUpdateTerminal {
827 pub id: acp::ToolCallId,
828 pub terminal: Entity<Terminal>,
829}
830
831impl From<ToolCallUpdateSubagentThread> for ToolCallUpdate {
832 fn from(subagent: ToolCallUpdateSubagentThread) -> Self {
833 Self::UpdateSubagentThread(subagent)
834 }
835}
836
837#[derive(Debug, PartialEq)]
838pub struct ToolCallUpdateSubagentThread {
839 pub id: acp::ToolCallId,
840 pub thread: Entity<AcpThread>,
841}
842
843#[derive(Debug, Default)]
844pub struct Plan {
845 pub entries: Vec<PlanEntry>,
846}
847
848#[derive(Debug)]
849pub struct PlanStats<'a> {
850 pub in_progress_entry: Option<&'a PlanEntry>,
851 pub pending: u32,
852 pub completed: u32,
853}
854
855impl Plan {
856 pub fn is_empty(&self) -> bool {
857 self.entries.is_empty()
858 }
859
860 pub fn stats(&self) -> PlanStats<'_> {
861 let mut stats = PlanStats {
862 in_progress_entry: None,
863 pending: 0,
864 completed: 0,
865 };
866
867 for entry in &self.entries {
868 match &entry.status {
869 acp::PlanEntryStatus::Pending => {
870 stats.pending += 1;
871 }
872 acp::PlanEntryStatus::InProgress => {
873 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
874 }
875 acp::PlanEntryStatus::Completed => {
876 stats.completed += 1;
877 }
878 _ => {}
879 }
880 }
881
882 stats
883 }
884}
885
886#[derive(Debug)]
887pub struct PlanEntry {
888 pub content: Entity<Markdown>,
889 pub priority: acp::PlanEntryPriority,
890 pub status: acp::PlanEntryStatus,
891}
892
893impl PlanEntry {
894 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
895 Self {
896 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
897 priority: entry.priority,
898 status: entry.status,
899 }
900 }
901}
902
903#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
904pub struct TokenUsage {
905 pub max_tokens: u64,
906 pub used_tokens: u64,
907 pub output_tokens: u64,
908}
909
910impl TokenUsage {
911 pub fn ratio(&self) -> TokenUsageRatio {
912 #[cfg(debug_assertions)]
913 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
914 .unwrap_or("0.8".to_string())
915 .parse()
916 .unwrap();
917 #[cfg(not(debug_assertions))]
918 let warning_threshold: f32 = 0.8;
919
920 // When the maximum is unknown because there is no selected model,
921 // avoid showing the token limit warning.
922 if self.max_tokens == 0 {
923 TokenUsageRatio::Normal
924 } else if self.used_tokens >= self.max_tokens {
925 TokenUsageRatio::Exceeded
926 } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
927 TokenUsageRatio::Warning
928 } else {
929 TokenUsageRatio::Normal
930 }
931 }
932}
933
934#[derive(Debug, Clone, PartialEq, Eq)]
935pub enum TokenUsageRatio {
936 Normal,
937 Warning,
938 Exceeded,
939}
940
941#[derive(Debug, Clone)]
942pub struct RetryStatus {
943 pub last_error: SharedString,
944 pub attempt: usize,
945 pub max_attempts: usize,
946 pub started_at: Instant,
947 pub duration: Duration,
948}
949
950pub struct AcpThread {
951 title: SharedString,
952 entries: Vec<AgentThreadEntry>,
953 plan: Plan,
954 project: Entity<Project>,
955 action_log: Entity<ActionLog>,
956 shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
957 send_task: Option<Task<()>>,
958 connection: Rc<dyn AgentConnection>,
959 session_id: acp::SessionId,
960 token_usage: Option<TokenUsage>,
961 prompt_capabilities: acp::PromptCapabilities,
962 _observe_prompt_capabilities: Task<anyhow::Result<()>>,
963 terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
964 pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
965 pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
966}
967
968impl From<&AcpThread> for ActionLogTelemetry {
969 fn from(value: &AcpThread) -> Self {
970 Self {
971 agent_telemetry_id: value.connection().telemetry_id(),
972 session_id: value.session_id.0.clone(),
973 }
974 }
975}
976
977#[derive(Debug)]
978pub enum AcpThreadEvent {
979 NewEntry,
980 TitleUpdated,
981 TokenUsageUpdated,
982 EntryUpdated(usize),
983 EntriesRemoved(Range<usize>),
984 ToolAuthorizationRequired,
985 Retry(RetryStatus),
986 Stopped,
987 Error,
988 LoadError(LoadError),
989 PromptCapabilitiesUpdated,
990 Refusal,
991 AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
992 ModeUpdated(acp::SessionModeId),
993 ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
994}
995
996impl EventEmitter<AcpThreadEvent> for AcpThread {}
997
998#[derive(Debug, Clone)]
999pub enum TerminalProviderEvent {
1000 Created {
1001 terminal_id: acp::TerminalId,
1002 label: String,
1003 cwd: Option<PathBuf>,
1004 output_byte_limit: Option<u64>,
1005 terminal: Entity<::terminal::Terminal>,
1006 },
1007 Output {
1008 terminal_id: acp::TerminalId,
1009 data: Vec<u8>,
1010 },
1011 TitleChanged {
1012 terminal_id: acp::TerminalId,
1013 title: String,
1014 },
1015 Exit {
1016 terminal_id: acp::TerminalId,
1017 status: acp::TerminalExitStatus,
1018 },
1019}
1020
1021#[derive(Debug, Clone)]
1022pub enum TerminalProviderCommand {
1023 WriteInput {
1024 terminal_id: acp::TerminalId,
1025 bytes: Vec<u8>,
1026 },
1027 Resize {
1028 terminal_id: acp::TerminalId,
1029 cols: u16,
1030 rows: u16,
1031 },
1032 Close {
1033 terminal_id: acp::TerminalId,
1034 },
1035}
1036
1037impl AcpThread {
1038 pub fn on_terminal_provider_event(
1039 &mut self,
1040 event: TerminalProviderEvent,
1041 cx: &mut Context<Self>,
1042 ) {
1043 match event {
1044 TerminalProviderEvent::Created {
1045 terminal_id,
1046 label,
1047 cwd,
1048 output_byte_limit,
1049 terminal,
1050 } => {
1051 let entity = self.register_terminal_created(
1052 terminal_id.clone(),
1053 label,
1054 cwd,
1055 output_byte_limit,
1056 terminal,
1057 cx,
1058 );
1059
1060 if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
1061 for data in chunks.drain(..) {
1062 entity.update(cx, |term, cx| {
1063 term.inner().update(cx, |inner, cx| {
1064 inner.write_output(&data, cx);
1065 })
1066 });
1067 }
1068 }
1069
1070 if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
1071 entity.update(cx, |_term, cx| {
1072 cx.notify();
1073 });
1074 }
1075
1076 cx.notify();
1077 }
1078 TerminalProviderEvent::Output { terminal_id, data } => {
1079 if let Some(entity) = self.terminals.get(&terminal_id) {
1080 entity.update(cx, |term, cx| {
1081 term.inner().update(cx, |inner, cx| {
1082 inner.write_output(&data, cx);
1083 })
1084 });
1085 } else {
1086 self.pending_terminal_output
1087 .entry(terminal_id)
1088 .or_default()
1089 .push(data);
1090 }
1091 }
1092 TerminalProviderEvent::TitleChanged { terminal_id, title } => {
1093 if let Some(entity) = self.terminals.get(&terminal_id) {
1094 entity.update(cx, |term, cx| {
1095 term.inner().update(cx, |inner, cx| {
1096 inner.breadcrumb_text = title;
1097 cx.emit(::terminal::Event::BreadcrumbsChanged);
1098 })
1099 });
1100 }
1101 }
1102 TerminalProviderEvent::Exit {
1103 terminal_id,
1104 status,
1105 } => {
1106 if let Some(entity) = self.terminals.get(&terminal_id) {
1107 entity.update(cx, |_term, cx| {
1108 cx.notify();
1109 });
1110 } else {
1111 self.pending_terminal_exit.insert(terminal_id, status);
1112 }
1113 }
1114 }
1115 }
1116}
1117
1118#[derive(PartialEq, Eq, Debug)]
1119pub enum ThreadStatus {
1120 Idle,
1121 Generating,
1122}
1123
1124#[derive(Debug, Clone)]
1125pub enum LoadError {
1126 Unsupported {
1127 command: SharedString,
1128 current_version: SharedString,
1129 minimum_version: SharedString,
1130 },
1131 FailedToInstall(SharedString),
1132 Exited {
1133 status: ExitStatus,
1134 },
1135 Other(SharedString),
1136}
1137
1138impl Display for LoadError {
1139 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1140 match self {
1141 LoadError::Unsupported {
1142 command: path,
1143 current_version,
1144 minimum_version,
1145 } => {
1146 write!(
1147 f,
1148 "version {current_version} from {path} is not supported (need at least {minimum_version})"
1149 )
1150 }
1151 LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1152 LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1153 LoadError::Other(msg) => write!(f, "{msg}"),
1154 }
1155 }
1156}
1157
1158impl Error for LoadError {}
1159
1160impl AcpThread {
1161 pub fn new(
1162 title: impl Into<SharedString>,
1163 connection: Rc<dyn AgentConnection>,
1164 project: Entity<Project>,
1165 action_log: Entity<ActionLog>,
1166 session_id: acp::SessionId,
1167 mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1168 cx: &mut Context<Self>,
1169 ) -> Self {
1170 let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1171 let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1172 loop {
1173 let caps = prompt_capabilities_rx.recv().await?;
1174 this.update(cx, |this, cx| {
1175 this.prompt_capabilities = caps;
1176 cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1177 })?;
1178 }
1179 });
1180
1181 Self {
1182 action_log,
1183 shared_buffers: Default::default(),
1184 entries: Default::default(),
1185 plan: Default::default(),
1186 title: title.into(),
1187 project,
1188 send_task: None,
1189 connection,
1190 session_id,
1191 token_usage: None,
1192 prompt_capabilities,
1193 _observe_prompt_capabilities: task,
1194 terminals: HashMap::default(),
1195 pending_terminal_output: HashMap::default(),
1196 pending_terminal_exit: HashMap::default(),
1197 }
1198 }
1199
1200 pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
1201 self.prompt_capabilities.clone()
1202 }
1203
1204 pub fn connection(&self) -> &Rc<dyn AgentConnection> {
1205 &self.connection
1206 }
1207
1208 pub fn action_log(&self) -> &Entity<ActionLog> {
1209 &self.action_log
1210 }
1211
1212 pub fn project(&self) -> &Entity<Project> {
1213 &self.project
1214 }
1215
1216 pub fn title(&self) -> SharedString {
1217 self.title.clone()
1218 }
1219
1220 pub fn entries(&self) -> &[AgentThreadEntry] {
1221 &self.entries
1222 }
1223
1224 pub fn session_id(&self) -> &acp::SessionId {
1225 &self.session_id
1226 }
1227
1228 pub fn status(&self) -> ThreadStatus {
1229 if self.send_task.is_some() {
1230 ThreadStatus::Generating
1231 } else {
1232 ThreadStatus::Idle
1233 }
1234 }
1235
1236 pub fn token_usage(&self) -> Option<&TokenUsage> {
1237 self.token_usage.as_ref()
1238 }
1239
1240 pub fn has_pending_edit_tool_calls(&self) -> bool {
1241 for entry in self.entries.iter().rev() {
1242 match entry {
1243 AgentThreadEntry::UserMessage(_) => return false,
1244 AgentThreadEntry::ToolCall(
1245 call @ ToolCall {
1246 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1247 ..
1248 },
1249 ) if call.diffs().next().is_some() => {
1250 return true;
1251 }
1252 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1253 }
1254 }
1255
1256 false
1257 }
1258
1259 pub fn has_in_progress_tool_calls(&self) -> bool {
1260 for entry in self.entries.iter().rev() {
1261 match entry {
1262 AgentThreadEntry::UserMessage(_) => return false,
1263 AgentThreadEntry::ToolCall(ToolCall {
1264 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1265 ..
1266 }) => {
1267 return true;
1268 }
1269 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1270 }
1271 }
1272
1273 false
1274 }
1275
1276 pub fn used_tools_since_last_user_message(&self) -> bool {
1277 for entry in self.entries.iter().rev() {
1278 match entry {
1279 AgentThreadEntry::UserMessage(..) => return false,
1280 AgentThreadEntry::AssistantMessage(..) => continue,
1281 AgentThreadEntry::ToolCall(..) => return true,
1282 }
1283 }
1284
1285 false
1286 }
1287
1288 pub fn handle_session_update(
1289 &mut self,
1290 update: acp::SessionUpdate,
1291 cx: &mut Context<Self>,
1292 ) -> Result<(), acp::Error> {
1293 match update {
1294 acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1295 self.push_user_content_block(None, content, cx);
1296 }
1297 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1298 self.push_assistant_content_block(content, false, cx);
1299 }
1300 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1301 self.push_assistant_content_block(content, true, cx);
1302 }
1303 acp::SessionUpdate::ToolCall(tool_call) => {
1304 self.upsert_tool_call(tool_call, cx)?;
1305 }
1306 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1307 self.update_tool_call(tool_call_update, cx)?;
1308 }
1309 acp::SessionUpdate::Plan(plan) => {
1310 self.update_plan(plan, cx);
1311 }
1312 acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1313 available_commands,
1314 ..
1315 }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1316 acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1317 current_mode_id,
1318 ..
1319 }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1320 acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1321 config_options,
1322 ..
1323 }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1324 _ => {}
1325 }
1326 Ok(())
1327 }
1328
1329 pub fn push_user_content_block(
1330 &mut self,
1331 message_id: Option<UserMessageId>,
1332 chunk: acp::ContentBlock,
1333 cx: &mut Context<Self>,
1334 ) {
1335 self.push_user_content_block_with_indent(message_id, chunk, false, cx)
1336 }
1337
1338 pub fn push_user_content_block_with_indent(
1339 &mut self,
1340 message_id: Option<UserMessageId>,
1341 chunk: acp::ContentBlock,
1342 indented: bool,
1343 cx: &mut Context<Self>,
1344 ) {
1345 let language_registry = self.project.read(cx).languages().clone();
1346 let path_style = self.project.read(cx).path_style(cx);
1347 let entries_len = self.entries.len();
1348
1349 if let Some(last_entry) = self.entries.last_mut()
1350 && let AgentThreadEntry::UserMessage(UserMessage {
1351 id,
1352 content,
1353 chunks,
1354 indented: existing_indented,
1355 ..
1356 }) = last_entry
1357 && *existing_indented == indented
1358 {
1359 *id = message_id.or(id.take());
1360 content.append(chunk.clone(), &language_registry, path_style, cx);
1361 chunks.push(chunk);
1362 let idx = entries_len - 1;
1363 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1364 } else {
1365 let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1366 self.push_entry(
1367 AgentThreadEntry::UserMessage(UserMessage {
1368 id: message_id,
1369 content,
1370 chunks: vec![chunk],
1371 checkpoint: None,
1372 indented,
1373 }),
1374 cx,
1375 );
1376 }
1377 }
1378
1379 pub fn push_assistant_content_block(
1380 &mut self,
1381 chunk: acp::ContentBlock,
1382 is_thought: bool,
1383 cx: &mut Context<Self>,
1384 ) {
1385 self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
1386 }
1387
1388 pub fn push_assistant_content_block_with_indent(
1389 &mut self,
1390 chunk: acp::ContentBlock,
1391 is_thought: bool,
1392 indented: bool,
1393 cx: &mut Context<Self>,
1394 ) {
1395 let language_registry = self.project.read(cx).languages().clone();
1396 let path_style = self.project.read(cx).path_style(cx);
1397 let entries_len = self.entries.len();
1398 if let Some(last_entry) = self.entries.last_mut()
1399 && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1400 chunks,
1401 indented: existing_indented,
1402 }) = last_entry
1403 && *existing_indented == indented
1404 {
1405 let idx = entries_len - 1;
1406 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1407 match (chunks.last_mut(), is_thought) {
1408 (Some(AssistantMessageChunk::Message { block }), false)
1409 | (Some(AssistantMessageChunk::Thought { block }), true) => {
1410 block.append(chunk, &language_registry, path_style, cx)
1411 }
1412 _ => {
1413 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1414 if is_thought {
1415 chunks.push(AssistantMessageChunk::Thought { block })
1416 } else {
1417 chunks.push(AssistantMessageChunk::Message { block })
1418 }
1419 }
1420 }
1421 } else {
1422 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1423 let chunk = if is_thought {
1424 AssistantMessageChunk::Thought { block }
1425 } else {
1426 AssistantMessageChunk::Message { block }
1427 };
1428
1429 self.push_entry(
1430 AgentThreadEntry::AssistantMessage(AssistantMessage {
1431 chunks: vec![chunk],
1432 indented,
1433 }),
1434 cx,
1435 );
1436 }
1437 }
1438
1439 fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
1440 self.entries.push(entry);
1441 cx.emit(AcpThreadEvent::NewEntry);
1442 }
1443
1444 pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1445 self.connection.set_title(&self.session_id, cx).is_some()
1446 }
1447
1448 pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1449 if title != self.title {
1450 self.title = title.clone();
1451 cx.emit(AcpThreadEvent::TitleUpdated);
1452 if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1453 return set_title.run(title, cx);
1454 }
1455 }
1456 Task::ready(Ok(()))
1457 }
1458
1459 pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1460 self.token_usage = usage;
1461 cx.emit(AcpThreadEvent::TokenUsageUpdated);
1462 }
1463
1464 pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1465 cx.emit(AcpThreadEvent::Retry(status));
1466 }
1467
1468 pub fn update_tool_call(
1469 &mut self,
1470 update: impl Into<ToolCallUpdate>,
1471 cx: &mut Context<Self>,
1472 ) -> Result<()> {
1473 let update = update.into();
1474 let languages = self.project.read(cx).languages().clone();
1475 let path_style = self.project.read(cx).path_style(cx);
1476
1477 let ix = match self.index_for_tool_call(update.id()) {
1478 Some(ix) => ix,
1479 None => {
1480 // Tool call not found - create a failed tool call entry
1481 let failed_tool_call = ToolCall {
1482 id: update.id().clone(),
1483 label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1484 kind: acp::ToolKind::Fetch,
1485 content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1486 "Tool call not found".into(),
1487 &languages,
1488 path_style,
1489 cx,
1490 ))],
1491 status: ToolCallStatus::Failed,
1492 locations: Vec::new(),
1493 resolved_locations: Vec::new(),
1494 raw_input: None,
1495 raw_input_markdown: None,
1496 raw_output: None,
1497 tool_name: None,
1498 };
1499 self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1500 return Ok(());
1501 }
1502 };
1503 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1504 unreachable!()
1505 };
1506
1507 match update {
1508 ToolCallUpdate::UpdateFields(update) => {
1509 let location_updated = update.fields.locations.is_some();
1510 call.update_fields(update.fields, languages, path_style, &self.terminals, cx)?;
1511 if location_updated {
1512 self.resolve_locations(update.tool_call_id, cx);
1513 }
1514 }
1515 ToolCallUpdate::UpdateDiff(update) => {
1516 call.content.clear();
1517 call.content.push(ToolCallContent::Diff(update.diff));
1518 }
1519 ToolCallUpdate::UpdateTerminal(update) => {
1520 call.content.clear();
1521 call.content
1522 .push(ToolCallContent::Terminal(update.terminal));
1523 }
1524 ToolCallUpdate::UpdateSubagentThread(update) => {
1525 debug_assert!(
1526 !call.content.iter().any(|c| {
1527 matches!(c, ToolCallContent::SubagentThread(existing) if existing == &update.thread)
1528 }),
1529 "Duplicate SubagentThread update for the same AcpThread entity"
1530 );
1531 call.content
1532 .push(ToolCallContent::SubagentThread(update.thread));
1533 }
1534 }
1535
1536 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1537
1538 Ok(())
1539 }
1540
1541 /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1542 pub fn upsert_tool_call(
1543 &mut self,
1544 tool_call: acp::ToolCall,
1545 cx: &mut Context<Self>,
1546 ) -> Result<(), acp::Error> {
1547 let status = tool_call.status.into();
1548 self.upsert_tool_call_inner(tool_call.into(), status, cx)
1549 }
1550
1551 /// Fails if id does not match an existing entry.
1552 pub fn upsert_tool_call_inner(
1553 &mut self,
1554 update: acp::ToolCallUpdate,
1555 status: ToolCallStatus,
1556 cx: &mut Context<Self>,
1557 ) -> Result<(), acp::Error> {
1558 let language_registry = self.project.read(cx).languages().clone();
1559 let path_style = self.project.read(cx).path_style(cx);
1560 let id = update.tool_call_id.clone();
1561
1562 let agent_telemetry_id = self.connection().telemetry_id();
1563 let session = self.session_id();
1564 if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1565 let status = if matches!(status, ToolCallStatus::Completed) {
1566 "completed"
1567 } else {
1568 "failed"
1569 };
1570 telemetry::event!(
1571 "Agent Tool Call Completed",
1572 agent_telemetry_id,
1573 session,
1574 status
1575 );
1576 }
1577
1578 if let Some(ix) = self.index_for_tool_call(&id) {
1579 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1580 unreachable!()
1581 };
1582
1583 call.update_fields(
1584 update.fields,
1585 language_registry,
1586 path_style,
1587 &self.terminals,
1588 cx,
1589 )?;
1590 call.status = status;
1591
1592 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1593 } else {
1594 let call = ToolCall::from_acp(
1595 update.try_into()?,
1596 status,
1597 language_registry,
1598 self.project.read(cx).path_style(cx),
1599 &self.terminals,
1600 cx,
1601 )?;
1602 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1603 };
1604
1605 self.resolve_locations(id, cx);
1606 Ok(())
1607 }
1608
1609 fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1610 self.entries
1611 .iter()
1612 .enumerate()
1613 .rev()
1614 .find_map(|(index, entry)| {
1615 if let AgentThreadEntry::ToolCall(tool_call) = entry
1616 && &tool_call.id == id
1617 {
1618 Some(index)
1619 } else {
1620 None
1621 }
1622 })
1623 }
1624
1625 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1626 // The tool call we are looking for is typically the last one, or very close to the end.
1627 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1628 self.entries
1629 .iter_mut()
1630 .enumerate()
1631 .rev()
1632 .find_map(|(index, tool_call)| {
1633 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1634 && &tool_call.id == id
1635 {
1636 Some((index, tool_call))
1637 } else {
1638 None
1639 }
1640 })
1641 }
1642
1643 pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1644 self.entries
1645 .iter()
1646 .enumerate()
1647 .rev()
1648 .find_map(|(index, tool_call)| {
1649 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1650 && &tool_call.id == id
1651 {
1652 Some((index, tool_call))
1653 } else {
1654 None
1655 }
1656 })
1657 }
1658
1659 pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1660 let project = self.project.clone();
1661 let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1662 return;
1663 };
1664 let task = tool_call.resolve_locations(project, cx);
1665 cx.spawn(async move |this, cx| {
1666 let resolved_locations = task.await;
1667
1668 this.update(cx, |this, cx| {
1669 let project = this.project.clone();
1670
1671 for location in resolved_locations.iter().flatten() {
1672 this.shared_buffers
1673 .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
1674 }
1675 let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1676 return;
1677 };
1678
1679 if let Some(Some(location)) = resolved_locations.last() {
1680 project.update(cx, |project, cx| {
1681 let should_ignore = if let Some(agent_location) = project
1682 .agent_location()
1683 .filter(|agent_location| agent_location.buffer == location.buffer)
1684 {
1685 let snapshot = location.buffer.read(cx).snapshot();
1686 let old_position = agent_location.position.to_point(&snapshot);
1687 let new_position = location.position.to_point(&snapshot);
1688
1689 // ignore this so that when we get updates from the edit tool
1690 // the position doesn't reset to the startof line
1691 old_position.row == new_position.row
1692 && old_position.column > new_position.column
1693 } else {
1694 false
1695 };
1696 if !should_ignore {
1697 project.set_agent_location(Some(location.into()), cx);
1698 }
1699 });
1700 }
1701
1702 let resolved_locations = resolved_locations
1703 .iter()
1704 .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
1705 .collect::<Vec<_>>();
1706
1707 if tool_call.resolved_locations != resolved_locations {
1708 tool_call.resolved_locations = resolved_locations;
1709 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1710 }
1711 })
1712 })
1713 .detach();
1714 }
1715
1716 pub fn request_tool_call_authorization(
1717 &mut self,
1718 tool_call: acp::ToolCallUpdate,
1719 options: Vec<acp::PermissionOption>,
1720 respect_always_allow_setting: bool,
1721 cx: &mut Context<Self>,
1722 ) -> Result<BoxFuture<'static, acp::RequestPermissionOutcome>> {
1723 let (tx, rx) = oneshot::channel();
1724
1725 if respect_always_allow_setting && AgentSettings::get_global(cx).always_allow_tool_actions {
1726 // Don't use AllowAlways, because then if you were to turn off always_allow_tool_actions,
1727 // some tools would (incorrectly) continue to auto-accept.
1728 if let Some(allow_once_option) = options.iter().find_map(|option| {
1729 if matches!(option.kind, acp::PermissionOptionKind::AllowOnce) {
1730 Some(option.option_id.clone())
1731 } else {
1732 None
1733 }
1734 }) {
1735 self.upsert_tool_call_inner(tool_call, ToolCallStatus::Pending, cx)?;
1736 return Ok(async {
1737 acp::RequestPermissionOutcome::Selected(acp::SelectedPermissionOutcome::new(
1738 allow_once_option,
1739 ))
1740 }
1741 .boxed());
1742 }
1743 }
1744
1745 let status = ToolCallStatus::WaitingForConfirmation {
1746 options,
1747 respond_tx: tx,
1748 };
1749
1750 self.upsert_tool_call_inner(tool_call, status, cx)?;
1751 cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1752
1753 let fut = async {
1754 match rx.await {
1755 Ok(option) => acp::RequestPermissionOutcome::Selected(
1756 acp::SelectedPermissionOutcome::new(option),
1757 ),
1758 Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1759 }
1760 }
1761 .boxed();
1762
1763 Ok(fut)
1764 }
1765
1766 pub fn authorize_tool_call(
1767 &mut self,
1768 id: acp::ToolCallId,
1769 option_id: acp::PermissionOptionId,
1770 option_kind: acp::PermissionOptionKind,
1771 cx: &mut Context<Self>,
1772 ) {
1773 let Some((ix, call)) = self.tool_call_mut(&id) else {
1774 return;
1775 };
1776
1777 let new_status = match option_kind {
1778 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1779 ToolCallStatus::Rejected
1780 }
1781 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1782 ToolCallStatus::InProgress
1783 }
1784 _ => ToolCallStatus::InProgress,
1785 };
1786
1787 let curr_status = mem::replace(&mut call.status, new_status);
1788
1789 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1790 respond_tx.send(option_id).log_err();
1791 } else if cfg!(debug_assertions) {
1792 panic!("tried to authorize an already authorized tool call");
1793 }
1794
1795 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1796 }
1797
1798 pub fn first_tool_awaiting_confirmation(&self) -> Option<&ToolCall> {
1799 let mut first_tool_call = None;
1800
1801 for entry in self.entries.iter().rev() {
1802 match &entry {
1803 AgentThreadEntry::ToolCall(call) => {
1804 if let ToolCallStatus::WaitingForConfirmation { .. } = call.status {
1805 first_tool_call = Some(call);
1806 } else {
1807 continue;
1808 }
1809 }
1810 AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1811 // Reached the beginning of the turn.
1812 // If we had pending permission requests in the previous turn, they have been cancelled.
1813 break;
1814 }
1815 }
1816 }
1817
1818 first_tool_call
1819 }
1820
1821 pub fn plan(&self) -> &Plan {
1822 &self.plan
1823 }
1824
1825 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1826 let new_entries_len = request.entries.len();
1827 let mut new_entries = request.entries.into_iter();
1828
1829 // Reuse existing markdown to prevent flickering
1830 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1831 let PlanEntry {
1832 content,
1833 priority,
1834 status,
1835 } = old;
1836 content.update(cx, |old, cx| {
1837 old.replace(new.content, cx);
1838 });
1839 *priority = new.priority;
1840 *status = new.status;
1841 }
1842 for new in new_entries {
1843 self.plan.entries.push(PlanEntry::from_acp(new, cx))
1844 }
1845 self.plan.entries.truncate(new_entries_len);
1846
1847 cx.notify();
1848 }
1849
1850 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1851 self.plan
1852 .entries
1853 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1854 cx.notify();
1855 }
1856
1857 #[cfg(any(test, feature = "test-support"))]
1858 pub fn send_raw(
1859 &mut self,
1860 message: &str,
1861 cx: &mut Context<Self>,
1862 ) -> BoxFuture<'static, Result<()>> {
1863 self.send(vec![message.into()], cx)
1864 }
1865
1866 pub fn send(
1867 &mut self,
1868 message: Vec<acp::ContentBlock>,
1869 cx: &mut Context<Self>,
1870 ) -> BoxFuture<'static, Result<()>> {
1871 let block = ContentBlock::new_combined(
1872 message.clone(),
1873 self.project.read(cx).languages().clone(),
1874 self.project.read(cx).path_style(cx),
1875 cx,
1876 );
1877 let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
1878 let git_store = self.project.read(cx).git_store().clone();
1879
1880 let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
1881 Some(UserMessageId::new())
1882 } else {
1883 None
1884 };
1885
1886 self.run_turn(cx, async move |this, cx| {
1887 this.update(cx, |this, cx| {
1888 this.push_entry(
1889 AgentThreadEntry::UserMessage(UserMessage {
1890 id: message_id.clone(),
1891 content: block,
1892 chunks: message,
1893 checkpoint: None,
1894 indented: false,
1895 }),
1896 cx,
1897 );
1898 })
1899 .ok();
1900
1901 let old_checkpoint = git_store
1902 .update(cx, |git, cx| git.checkpoint(cx))
1903 .await
1904 .context("failed to get old checkpoint")
1905 .log_err();
1906 this.update(cx, |this, cx| {
1907 if let Some((_ix, message)) = this.last_user_message() {
1908 message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
1909 git_checkpoint,
1910 show: false,
1911 });
1912 }
1913 this.connection.prompt(message_id, request, cx)
1914 })?
1915 .await
1916 })
1917 }
1918
1919 pub fn can_resume(&self, cx: &App) -> bool {
1920 self.connection.resume(&self.session_id, cx).is_some()
1921 }
1922
1923 pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1924 self.run_turn(cx, async move |this, cx| {
1925 this.update(cx, |this, cx| {
1926 this.connection
1927 .resume(&this.session_id, cx)
1928 .map(|resume| resume.run(cx))
1929 })?
1930 .context("resuming a session is not supported")?
1931 .await
1932 })
1933 }
1934
1935 fn run_turn(
1936 &mut self,
1937 cx: &mut Context<Self>,
1938 f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
1939 ) -> BoxFuture<'static, Result<()>> {
1940 self.clear_completed_plan_entries(cx);
1941
1942 let (tx, rx) = oneshot::channel();
1943 let cancel_task = self.cancel(cx);
1944
1945 self.send_task = Some(cx.spawn(async move |this, cx| {
1946 cancel_task.await;
1947 tx.send(f(this, cx).await).ok();
1948 }));
1949
1950 cx.spawn(async move |this, cx| {
1951 let response = rx.await;
1952
1953 this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
1954 .await?;
1955
1956 this.update(cx, |this, cx| {
1957 this.project
1958 .update(cx, |project, cx| project.set_agent_location(None, cx));
1959 match response {
1960 Ok(Err(e)) => {
1961 this.send_task.take();
1962 cx.emit(AcpThreadEvent::Error);
1963 Err(e)
1964 }
1965 result => {
1966 let canceled = matches!(
1967 result,
1968 Ok(Ok(acp::PromptResponse {
1969 stop_reason: acp::StopReason::Cancelled,
1970 ..
1971 }))
1972 );
1973
1974 // We only take the task if the current prompt wasn't canceled.
1975 //
1976 // This prompt may have been canceled because another one was sent
1977 // while it was still generating. In these cases, dropping `send_task`
1978 // would cause the next generation to be canceled.
1979 if !canceled {
1980 this.send_task.take();
1981 }
1982
1983 // Handle refusal - distinguish between user prompt and tool call refusals
1984 if let Ok(Ok(acp::PromptResponse {
1985 stop_reason: acp::StopReason::Refusal,
1986 ..
1987 })) = result
1988 {
1989 if let Some((user_msg_ix, _)) = this.last_user_message() {
1990 // Check if there's a completed tool call with results after the last user message
1991 // This indicates the refusal is in response to tool output, not the user's prompt
1992 let has_completed_tool_call_after_user_msg =
1993 this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
1994 if let AgentThreadEntry::ToolCall(tool_call) = entry {
1995 // Check if the tool call has completed and has output
1996 matches!(tool_call.status, ToolCallStatus::Completed)
1997 && tool_call.raw_output.is_some()
1998 } else {
1999 false
2000 }
2001 });
2002
2003 if has_completed_tool_call_after_user_msg {
2004 // Refusal is due to tool output - don't truncate, just notify
2005 // The model refused based on what the tool returned
2006 cx.emit(AcpThreadEvent::Refusal);
2007 } else {
2008 // User prompt was refused - truncate back to before the user message
2009 let range = user_msg_ix..this.entries.len();
2010 if range.start < range.end {
2011 this.entries.truncate(user_msg_ix);
2012 cx.emit(AcpThreadEvent::EntriesRemoved(range));
2013 }
2014 cx.emit(AcpThreadEvent::Refusal);
2015 }
2016 } else {
2017 // No user message found, treat as general refusal
2018 cx.emit(AcpThreadEvent::Refusal);
2019 }
2020 }
2021
2022 cx.emit(AcpThreadEvent::Stopped);
2023 Ok(())
2024 }
2025 }
2026 })?
2027 })
2028 .boxed()
2029 }
2030
2031 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2032 let Some(send_task) = self.send_task.take() else {
2033 return Task::ready(());
2034 };
2035
2036 for entry in self.entries.iter_mut() {
2037 if let AgentThreadEntry::ToolCall(call) = entry {
2038 let cancel = matches!(
2039 call.status,
2040 ToolCallStatus::Pending
2041 | ToolCallStatus::WaitingForConfirmation { .. }
2042 | ToolCallStatus::InProgress
2043 );
2044
2045 if cancel {
2046 call.status = ToolCallStatus::Canceled;
2047 }
2048 }
2049 }
2050
2051 self.connection.cancel(&self.session_id, cx);
2052
2053 // Wait for the send task to complete
2054 cx.foreground_executor().spawn(send_task)
2055 }
2056
2057 /// Restores the git working tree to the state at the given checkpoint (if one exists)
2058 pub fn restore_checkpoint(
2059 &mut self,
2060 id: UserMessageId,
2061 cx: &mut Context<Self>,
2062 ) -> Task<Result<()>> {
2063 let Some((_, message)) = self.user_message_mut(&id) else {
2064 return Task::ready(Err(anyhow!("message not found")));
2065 };
2066
2067 let checkpoint = message
2068 .checkpoint
2069 .as_ref()
2070 .map(|c| c.git_checkpoint.clone());
2071
2072 // Cancel any in-progress generation before restoring
2073 let cancel_task = self.cancel(cx);
2074 let rewind = self.rewind(id.clone(), cx);
2075 let git_store = self.project.read(cx).git_store().clone();
2076
2077 cx.spawn(async move |_, cx| {
2078 cancel_task.await;
2079 rewind.await?;
2080 if let Some(checkpoint) = checkpoint {
2081 git_store
2082 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
2083 .await?;
2084 }
2085
2086 Ok(())
2087 })
2088 }
2089
2090 /// Rewinds this thread to before the entry at `index`, removing it and all
2091 /// subsequent entries while rejecting any action_log changes made from that point.
2092 /// Unlike `restore_checkpoint`, this method does not restore from git.
2093 pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
2094 let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
2095 return Task::ready(Err(anyhow!("not supported")));
2096 };
2097
2098 let telemetry = ActionLogTelemetry::from(&*self);
2099 cx.spawn(async move |this, cx| {
2100 cx.update(|cx| truncate.run(id.clone(), cx)).await?;
2101 this.update(cx, |this, cx| {
2102 if let Some((ix, _)) = this.user_message_mut(&id) {
2103 // Collect all terminals from entries that will be removed
2104 let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
2105 .iter()
2106 .flat_map(|entry| entry.terminals())
2107 .filter_map(|terminal| terminal.read(cx).id().clone().into())
2108 .collect();
2109
2110 let range = ix..this.entries.len();
2111 this.entries.truncate(ix);
2112 cx.emit(AcpThreadEvent::EntriesRemoved(range));
2113
2114 // Kill and remove the terminals
2115 for terminal_id in terminals_to_remove {
2116 if let Some(terminal) = this.terminals.remove(&terminal_id) {
2117 terminal.update(cx, |terminal, cx| {
2118 terminal.kill(cx);
2119 });
2120 }
2121 }
2122 }
2123 this.action_log().update(cx, |action_log, cx| {
2124 action_log.reject_all_edits(Some(telemetry), cx)
2125 })
2126 })?
2127 .await;
2128 Ok(())
2129 })
2130 }
2131
2132 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
2133 let git_store = self.project.read(cx).git_store().clone();
2134
2135 let Some((_, message)) = self.last_user_message() else {
2136 return Task::ready(Ok(()));
2137 };
2138 let Some(user_message_id) = message.id.clone() else {
2139 return Task::ready(Ok(()));
2140 };
2141 let Some(checkpoint) = message.checkpoint.as_ref() else {
2142 return Task::ready(Ok(()));
2143 };
2144 let old_checkpoint = checkpoint.git_checkpoint.clone();
2145
2146 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2147 cx.spawn(async move |this, cx| {
2148 let Some(new_checkpoint) = new_checkpoint
2149 .await
2150 .context("failed to get new checkpoint")
2151 .log_err()
2152 else {
2153 return Ok(());
2154 };
2155
2156 let equal = git_store
2157 .update(cx, |git, cx| {
2158 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2159 })
2160 .await
2161 .unwrap_or(true);
2162
2163 this.update(cx, |this, cx| {
2164 if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
2165 if let Some(checkpoint) = message.checkpoint.as_mut() {
2166 checkpoint.show = !equal;
2167 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2168 }
2169 }
2170 })?;
2171
2172 Ok(())
2173 })
2174 }
2175
2176 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2177 self.entries
2178 .iter_mut()
2179 .enumerate()
2180 .rev()
2181 .find_map(|(ix, entry)| {
2182 if let AgentThreadEntry::UserMessage(message) = entry {
2183 Some((ix, message))
2184 } else {
2185 None
2186 }
2187 })
2188 }
2189
2190 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2191 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2192 if let AgentThreadEntry::UserMessage(message) = entry {
2193 if message.id.as_ref() == Some(id) {
2194 Some((ix, message))
2195 } else {
2196 None
2197 }
2198 } else {
2199 None
2200 }
2201 })
2202 }
2203
2204 pub fn read_text_file(
2205 &self,
2206 path: PathBuf,
2207 line: Option<u32>,
2208 limit: Option<u32>,
2209 reuse_shared_snapshot: bool,
2210 cx: &mut Context<Self>,
2211 ) -> Task<Result<String, acp::Error>> {
2212 // Args are 1-based, move to 0-based
2213 let line = line.unwrap_or_default().saturating_sub(1);
2214 let limit = limit.unwrap_or(u32::MAX);
2215 let project = self.project.clone();
2216 let action_log = self.action_log.clone();
2217 cx.spawn(async move |this, cx| {
2218 let load = project.update(cx, |project, cx| {
2219 let path = project
2220 .project_path_for_absolute_path(&path, cx)
2221 .ok_or_else(|| {
2222 acp::Error::resource_not_found(Some(path.display().to_string()))
2223 })?;
2224 Ok::<_, acp::Error>(project.open_buffer(path, cx))
2225 })?;
2226
2227 let buffer = load.await?;
2228
2229 let snapshot = if reuse_shared_snapshot {
2230 this.read_with(cx, |this, _| {
2231 this.shared_buffers.get(&buffer.clone()).cloned()
2232 })
2233 .log_err()
2234 .flatten()
2235 } else {
2236 None
2237 };
2238
2239 let snapshot = if let Some(snapshot) = snapshot {
2240 snapshot
2241 } else {
2242 action_log.update(cx, |action_log, cx| {
2243 action_log.buffer_read(buffer.clone(), cx);
2244 });
2245
2246 let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
2247 this.update(cx, |this, _| {
2248 this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2249 })?;
2250 snapshot
2251 };
2252
2253 let max_point = snapshot.max_point();
2254 let start_position = Point::new(line, 0);
2255
2256 if start_position > max_point {
2257 return Err(acp::Error::invalid_params().data(format!(
2258 "Attempting to read beyond the end of the file, line {}:{}",
2259 max_point.row + 1,
2260 max_point.column
2261 )));
2262 }
2263
2264 let start = snapshot.anchor_before(start_position);
2265 let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2266
2267 project.update(cx, |project, cx| {
2268 project.set_agent_location(
2269 Some(AgentLocation {
2270 buffer: buffer.downgrade(),
2271 position: start,
2272 }),
2273 cx,
2274 );
2275 });
2276
2277 Ok(snapshot.text_for_range(start..end).collect::<String>())
2278 })
2279 }
2280
2281 pub fn write_text_file(
2282 &self,
2283 path: PathBuf,
2284 content: String,
2285 cx: &mut Context<Self>,
2286 ) -> Task<Result<()>> {
2287 let project = self.project.clone();
2288 let action_log = self.action_log.clone();
2289 cx.spawn(async move |this, cx| {
2290 let load = project.update(cx, |project, cx| {
2291 let path = project
2292 .project_path_for_absolute_path(&path, cx)
2293 .context("invalid path")?;
2294 anyhow::Ok(project.open_buffer(path, cx))
2295 });
2296 let buffer = load?.await?;
2297 let snapshot = this.update(cx, |this, cx| {
2298 this.shared_buffers
2299 .get(&buffer)
2300 .cloned()
2301 .unwrap_or_else(|| buffer.read(cx).snapshot())
2302 })?;
2303 let edits = cx
2304 .background_executor()
2305 .spawn(async move {
2306 let old_text = snapshot.text();
2307 text_diff(old_text.as_str(), &content)
2308 .into_iter()
2309 .map(|(range, replacement)| {
2310 (
2311 snapshot.anchor_after(range.start)
2312 ..snapshot.anchor_before(range.end),
2313 replacement,
2314 )
2315 })
2316 .collect::<Vec<_>>()
2317 })
2318 .await;
2319
2320 project.update(cx, |project, cx| {
2321 project.set_agent_location(
2322 Some(AgentLocation {
2323 buffer: buffer.downgrade(),
2324 position: edits
2325 .last()
2326 .map(|(range, _)| range.end)
2327 .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
2328 }),
2329 cx,
2330 );
2331 });
2332
2333 let format_on_save = cx.update(|cx| {
2334 action_log.update(cx, |action_log, cx| {
2335 action_log.buffer_read(buffer.clone(), cx);
2336 });
2337
2338 let format_on_save = buffer.update(cx, |buffer, cx| {
2339 buffer.edit(edits, None, cx);
2340
2341 let settings = language::language_settings::language_settings(
2342 buffer.language().map(|l| l.name()),
2343 buffer.file(),
2344 cx,
2345 );
2346
2347 settings.format_on_save != FormatOnSave::Off
2348 });
2349 action_log.update(cx, |action_log, cx| {
2350 action_log.buffer_edited(buffer.clone(), cx);
2351 });
2352 format_on_save
2353 });
2354
2355 if format_on_save {
2356 let format_task = project.update(cx, |project, cx| {
2357 project.format(
2358 HashSet::from_iter([buffer.clone()]),
2359 LspFormatTarget::Buffers,
2360 false,
2361 FormatTrigger::Save,
2362 cx,
2363 )
2364 });
2365 format_task.await.log_err();
2366
2367 action_log.update(cx, |action_log, cx| {
2368 action_log.buffer_edited(buffer.clone(), cx);
2369 });
2370 }
2371
2372 project
2373 .update(cx, |project, cx| project.save_buffer(buffer, cx))
2374 .await
2375 })
2376 }
2377
2378 pub fn create_terminal(
2379 &self,
2380 command: String,
2381 args: Vec<String>,
2382 extra_env: Vec<acp::EnvVariable>,
2383 cwd: Option<PathBuf>,
2384 output_byte_limit: Option<u64>,
2385 cx: &mut Context<Self>,
2386 ) -> Task<Result<Entity<Terminal>>> {
2387 let env = match &cwd {
2388 Some(dir) => self.project.update(cx, |project, cx| {
2389 project.environment().update(cx, |env, cx| {
2390 env.directory_environment(dir.as_path().into(), cx)
2391 })
2392 }),
2393 None => Task::ready(None).shared(),
2394 };
2395 let env = cx.spawn(async move |_, _| {
2396 let mut env = env.await.unwrap_or_default();
2397 // Disables paging for `git` and hopefully other commands
2398 env.insert("PAGER".into(), "".into());
2399 for var in extra_env {
2400 env.insert(var.name, var.value);
2401 }
2402 env
2403 });
2404
2405 let project = self.project.clone();
2406 let language_registry = project.read(cx).languages().clone();
2407 let is_windows = project.read(cx).path_style(cx).is_windows();
2408
2409 let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
2410 let terminal_task = cx.spawn({
2411 let terminal_id = terminal_id.clone();
2412 async move |_this, cx| {
2413 let env = env.await;
2414 let shell = project
2415 .update(cx, |project, cx| {
2416 project
2417 .remote_client()
2418 .and_then(|r| r.read(cx).default_system_shell())
2419 })
2420 .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2421 let (task_command, task_args) =
2422 ShellBuilder::new(&Shell::Program(shell), is_windows)
2423 .redirect_stdin_to_dev_null()
2424 .build(Some(command.clone()), &args);
2425 let terminal = project
2426 .update(cx, |project, cx| {
2427 project.create_terminal_task(
2428 task::SpawnInTerminal {
2429 command: Some(task_command),
2430 args: task_args,
2431 cwd: cwd.clone(),
2432 env,
2433 ..Default::default()
2434 },
2435 cx,
2436 )
2437 })
2438 .await?;
2439
2440 anyhow::Ok(cx.new(|cx| {
2441 Terminal::new(
2442 terminal_id,
2443 &format!("{} {}", command, args.join(" ")),
2444 cwd,
2445 output_byte_limit.map(|l| l as usize),
2446 terminal,
2447 language_registry,
2448 cx,
2449 )
2450 }))
2451 }
2452 });
2453
2454 cx.spawn(async move |this, cx| {
2455 let terminal = terminal_task.await?;
2456 this.update(cx, |this, _cx| {
2457 this.terminals.insert(terminal_id, terminal.clone());
2458 terminal
2459 })
2460 })
2461 }
2462
2463 pub fn kill_terminal(
2464 &mut self,
2465 terminal_id: acp::TerminalId,
2466 cx: &mut Context<Self>,
2467 ) -> Result<()> {
2468 self.terminals
2469 .get(&terminal_id)
2470 .context("Terminal not found")?
2471 .update(cx, |terminal, cx| {
2472 terminal.kill(cx);
2473 });
2474
2475 Ok(())
2476 }
2477
2478 pub fn release_terminal(
2479 &mut self,
2480 terminal_id: acp::TerminalId,
2481 cx: &mut Context<Self>,
2482 ) -> Result<()> {
2483 self.terminals
2484 .remove(&terminal_id)
2485 .context("Terminal not found")?
2486 .update(cx, |terminal, cx| {
2487 terminal.kill(cx);
2488 });
2489
2490 Ok(())
2491 }
2492
2493 pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2494 self.terminals
2495 .get(&terminal_id)
2496 .context("Terminal not found")
2497 .cloned()
2498 }
2499
2500 pub fn to_markdown(&self, cx: &App) -> String {
2501 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2502 }
2503
2504 pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2505 cx.emit(AcpThreadEvent::LoadError(error));
2506 }
2507
2508 pub fn register_terminal_created(
2509 &mut self,
2510 terminal_id: acp::TerminalId,
2511 command_label: String,
2512 working_dir: Option<PathBuf>,
2513 output_byte_limit: Option<u64>,
2514 terminal: Entity<::terminal::Terminal>,
2515 cx: &mut Context<Self>,
2516 ) -> Entity<Terminal> {
2517 let language_registry = self.project.read(cx).languages().clone();
2518
2519 let entity = cx.new(|cx| {
2520 Terminal::new(
2521 terminal_id.clone(),
2522 &command_label,
2523 working_dir.clone(),
2524 output_byte_limit.map(|l| l as usize),
2525 terminal,
2526 language_registry,
2527 cx,
2528 )
2529 });
2530 self.terminals.insert(terminal_id.clone(), entity.clone());
2531 entity
2532 }
2533}
2534
2535fn markdown_for_raw_output(
2536 raw_output: &serde_json::Value,
2537 language_registry: &Arc<LanguageRegistry>,
2538 cx: &mut App,
2539) -> Option<Entity<Markdown>> {
2540 match raw_output {
2541 serde_json::Value::Null => None,
2542 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2543 Markdown::new(
2544 value.to_string().into(),
2545 Some(language_registry.clone()),
2546 None,
2547 cx,
2548 )
2549 })),
2550 serde_json::Value::Number(value) => Some(cx.new(|cx| {
2551 Markdown::new(
2552 value.to_string().into(),
2553 Some(language_registry.clone()),
2554 None,
2555 cx,
2556 )
2557 })),
2558 serde_json::Value::String(value) => Some(cx.new(|cx| {
2559 Markdown::new(
2560 value.clone().into(),
2561 Some(language_registry.clone()),
2562 None,
2563 cx,
2564 )
2565 })),
2566 value => Some(cx.new(|cx| {
2567 let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
2568
2569 Markdown::new(
2570 format!("```json\n{}\n```", pretty_json).into(),
2571 Some(language_registry.clone()),
2572 None,
2573 cx,
2574 )
2575 })),
2576 }
2577}
2578
2579#[cfg(test)]
2580mod tests {
2581 use super::*;
2582 use anyhow::anyhow;
2583 use futures::{channel::mpsc, future::LocalBoxFuture, select};
2584 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2585 use indoc::indoc;
2586 use project::{FakeFs, Fs};
2587 use rand::{distr, prelude::*};
2588 use serde_json::json;
2589 use settings::SettingsStore;
2590 use smol::stream::StreamExt as _;
2591 use std::{
2592 any::Any,
2593 cell::RefCell,
2594 path::Path,
2595 rc::Rc,
2596 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2597 time::Duration,
2598 };
2599 use util::path;
2600
2601 fn init_test(cx: &mut TestAppContext) {
2602 env_logger::try_init().ok();
2603 cx.update(|cx| {
2604 let settings_store = SettingsStore::test(cx);
2605 cx.set_global(settings_store);
2606 });
2607 }
2608
2609 #[gpui::test]
2610 async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2611 init_test(cx);
2612
2613 let fs = FakeFs::new(cx.executor());
2614 let project = Project::test(fs, [], cx).await;
2615 let connection = Rc::new(FakeAgentConnection::new());
2616 let thread = cx
2617 .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2618 .await
2619 .unwrap();
2620
2621 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2622
2623 // Send Output BEFORE Created - should be buffered by acp_thread
2624 thread.update(cx, |thread, cx| {
2625 thread.on_terminal_provider_event(
2626 TerminalProviderEvent::Output {
2627 terminal_id: terminal_id.clone(),
2628 data: b"hello buffered".to_vec(),
2629 },
2630 cx,
2631 );
2632 });
2633
2634 // Create a display-only terminal and then send Created
2635 let lower = cx.new(|cx| {
2636 let builder = ::terminal::TerminalBuilder::new_display_only(
2637 ::terminal::terminal_settings::CursorShape::default(),
2638 ::terminal::terminal_settings::AlternateScroll::On,
2639 None,
2640 0,
2641 )
2642 .unwrap();
2643 builder.subscribe(cx)
2644 });
2645
2646 thread.update(cx, |thread, cx| {
2647 thread.on_terminal_provider_event(
2648 TerminalProviderEvent::Created {
2649 terminal_id: terminal_id.clone(),
2650 label: "Buffered Test".to_string(),
2651 cwd: None,
2652 output_byte_limit: None,
2653 terminal: lower.clone(),
2654 },
2655 cx,
2656 );
2657 });
2658
2659 // After Created, buffered Output should have been flushed into the renderer
2660 let content = thread.read_with(cx, |thread, cx| {
2661 let term = thread.terminal(terminal_id.clone()).unwrap();
2662 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2663 });
2664
2665 assert!(
2666 content.contains("hello buffered"),
2667 "expected buffered output to render, got: {content}"
2668 );
2669 }
2670
2671 #[gpui::test]
2672 async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2673 init_test(cx);
2674
2675 let fs = FakeFs::new(cx.executor());
2676 let project = Project::test(fs, [], cx).await;
2677 let connection = Rc::new(FakeAgentConnection::new());
2678 let thread = cx
2679 .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2680 .await
2681 .unwrap();
2682
2683 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2684
2685 // Send Output BEFORE Created
2686 thread.update(cx, |thread, cx| {
2687 thread.on_terminal_provider_event(
2688 TerminalProviderEvent::Output {
2689 terminal_id: terminal_id.clone(),
2690 data: b"pre-exit data".to_vec(),
2691 },
2692 cx,
2693 );
2694 });
2695
2696 // Send Exit BEFORE Created
2697 thread.update(cx, |thread, cx| {
2698 thread.on_terminal_provider_event(
2699 TerminalProviderEvent::Exit {
2700 terminal_id: terminal_id.clone(),
2701 status: acp::TerminalExitStatus::new().exit_code(0),
2702 },
2703 cx,
2704 );
2705 });
2706
2707 // Now create a display-only lower-level terminal and send Created
2708 let lower = cx.new(|cx| {
2709 let builder = ::terminal::TerminalBuilder::new_display_only(
2710 ::terminal::terminal_settings::CursorShape::default(),
2711 ::terminal::terminal_settings::AlternateScroll::On,
2712 None,
2713 0,
2714 )
2715 .unwrap();
2716 builder.subscribe(cx)
2717 });
2718
2719 thread.update(cx, |thread, cx| {
2720 thread.on_terminal_provider_event(
2721 TerminalProviderEvent::Created {
2722 terminal_id: terminal_id.clone(),
2723 label: "Buffered Exit Test".to_string(),
2724 cwd: None,
2725 output_byte_limit: None,
2726 terminal: lower.clone(),
2727 },
2728 cx,
2729 );
2730 });
2731
2732 // Output should be present after Created (flushed from buffer)
2733 let content = thread.read_with(cx, |thread, cx| {
2734 let term = thread.terminal(terminal_id.clone()).unwrap();
2735 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2736 });
2737
2738 assert!(
2739 content.contains("pre-exit data"),
2740 "expected pre-exit data to render, got: {content}"
2741 );
2742 }
2743
2744 /// Test that killing a terminal via Terminal::kill properly:
2745 /// 1. Causes wait_for_exit to complete (doesn't hang forever)
2746 /// 2. The underlying terminal still has the output that was written before the kill
2747 ///
2748 /// This test verifies that the fix to kill_active_task (which now also kills
2749 /// the shell process in addition to the foreground process) properly allows
2750 /// wait_for_exit to complete instead of hanging indefinitely.
2751 #[cfg(unix)]
2752 #[gpui::test]
2753 async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
2754 use std::collections::HashMap;
2755 use task::Shell;
2756 use util::shell_builder::ShellBuilder;
2757
2758 init_test(cx);
2759 cx.executor().allow_parking();
2760
2761 let fs = FakeFs::new(cx.executor());
2762 let project = Project::test(fs, [], cx).await;
2763 let connection = Rc::new(FakeAgentConnection::new());
2764 let thread = cx
2765 .update(|cx| connection.new_thread(project.clone(), Path::new(path!("/test")), cx))
2766 .await
2767 .unwrap();
2768
2769 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2770
2771 // Create a real PTY terminal that runs a command which prints output then sleeps
2772 // We use printf instead of echo and chain with && sleep to ensure proper execution
2773 let (completion_tx, _completion_rx) = smol::channel::unbounded();
2774 let (program, args) = ShellBuilder::new(&Shell::System, false).build(
2775 Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
2776 &[],
2777 );
2778
2779 let builder = cx
2780 .update(|cx| {
2781 ::terminal::TerminalBuilder::new(
2782 None,
2783 None,
2784 task::Shell::WithArguments {
2785 program,
2786 args,
2787 title_override: None,
2788 },
2789 HashMap::default(),
2790 ::terminal::terminal_settings::CursorShape::default(),
2791 ::terminal::terminal_settings::AlternateScroll::On,
2792 None,
2793 vec![],
2794 0,
2795 false,
2796 0,
2797 Some(completion_tx),
2798 cx,
2799 vec![],
2800 )
2801 })
2802 .await
2803 .unwrap();
2804
2805 let lower_terminal = cx.new(|cx| builder.subscribe(cx));
2806
2807 // Create the acp_thread Terminal wrapper
2808 thread.update(cx, |thread, cx| {
2809 thread.on_terminal_provider_event(
2810 TerminalProviderEvent::Created {
2811 terminal_id: terminal_id.clone(),
2812 label: "printf output_before_kill && sleep 60".to_string(),
2813 cwd: None,
2814 output_byte_limit: None,
2815 terminal: lower_terminal.clone(),
2816 },
2817 cx,
2818 );
2819 });
2820
2821 // Wait for the printf command to execute and produce output
2822 // Use real time since parking is enabled
2823 cx.executor().timer(Duration::from_millis(500)).await;
2824
2825 // Get the acp_thread Terminal and kill it
2826 let wait_for_exit = thread.update(cx, |thread, cx| {
2827 let term = thread.terminals.get(&terminal_id).unwrap();
2828 let wait_for_exit = term.read(cx).wait_for_exit();
2829 term.update(cx, |term, cx| {
2830 term.kill(cx);
2831 });
2832 wait_for_exit
2833 });
2834
2835 // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
2836 // Before the fix to kill_active_task, this would hang forever because
2837 // only the foreground process was killed, not the shell, so the PTY
2838 // child never exited and wait_for_completed_task never completed.
2839 let exit_result = futures::select! {
2840 result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
2841 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
2842 };
2843
2844 assert!(
2845 exit_result.is_some(),
2846 "wait_for_exit should complete after kill, but it timed out. \
2847 This indicates kill_active_task is not properly killing the shell process."
2848 );
2849
2850 // Give the system a chance to process any pending updates
2851 cx.run_until_parked();
2852
2853 // Verify that the underlying terminal still has the output that was
2854 // written before the kill. This verifies that killing doesn't lose output.
2855 let inner_content = thread.read_with(cx, |thread, cx| {
2856 let term = thread.terminals.get(&terminal_id).unwrap();
2857 term.read(cx).inner().read(cx).get_content()
2858 });
2859
2860 assert!(
2861 inner_content.contains("output_before_kill"),
2862 "Underlying terminal should contain output from before kill, got: {}",
2863 inner_content
2864 );
2865 }
2866
2867 #[gpui::test]
2868 async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
2869 init_test(cx);
2870
2871 let fs = FakeFs::new(cx.executor());
2872 let project = Project::test(fs, [], cx).await;
2873 let connection = Rc::new(FakeAgentConnection::new());
2874 let thread = cx
2875 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2876 .await
2877 .unwrap();
2878
2879 // Test creating a new user message
2880 thread.update(cx, |thread, cx| {
2881 thread.push_user_content_block(None, "Hello, ".into(), cx);
2882 });
2883
2884 thread.update(cx, |thread, cx| {
2885 assert_eq!(thread.entries.len(), 1);
2886 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2887 assert_eq!(user_msg.id, None);
2888 assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
2889 } else {
2890 panic!("Expected UserMessage");
2891 }
2892 });
2893
2894 // Test appending to existing user message
2895 let message_1_id = UserMessageId::new();
2896 thread.update(cx, |thread, cx| {
2897 thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
2898 });
2899
2900 thread.update(cx, |thread, cx| {
2901 assert_eq!(thread.entries.len(), 1);
2902 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2903 assert_eq!(user_msg.id, Some(message_1_id));
2904 assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
2905 } else {
2906 panic!("Expected UserMessage");
2907 }
2908 });
2909
2910 // Test creating new user message after assistant message
2911 thread.update(cx, |thread, cx| {
2912 thread.push_assistant_content_block("Assistant response".into(), false, cx);
2913 });
2914
2915 let message_2_id = UserMessageId::new();
2916 thread.update(cx, |thread, cx| {
2917 thread.push_user_content_block(
2918 Some(message_2_id.clone()),
2919 "New user message".into(),
2920 cx,
2921 );
2922 });
2923
2924 thread.update(cx, |thread, cx| {
2925 assert_eq!(thread.entries.len(), 3);
2926 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
2927 assert_eq!(user_msg.id, Some(message_2_id));
2928 assert_eq!(user_msg.content.to_markdown(cx), "New user message");
2929 } else {
2930 panic!("Expected UserMessage at index 2");
2931 }
2932 });
2933 }
2934
2935 #[gpui::test]
2936 async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
2937 init_test(cx);
2938
2939 let fs = FakeFs::new(cx.executor());
2940 let project = Project::test(fs, [], cx).await;
2941 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2942 |_, thread, mut cx| {
2943 async move {
2944 thread.update(&mut cx, |thread, cx| {
2945 thread
2946 .handle_session_update(
2947 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
2948 "Thinking ".into(),
2949 )),
2950 cx,
2951 )
2952 .unwrap();
2953 thread
2954 .handle_session_update(
2955 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
2956 "hard!".into(),
2957 )),
2958 cx,
2959 )
2960 .unwrap();
2961 })?;
2962 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
2963 }
2964 .boxed_local()
2965 },
2966 ));
2967
2968 let thread = cx
2969 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2970 .await
2971 .unwrap();
2972
2973 thread
2974 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
2975 .await
2976 .unwrap();
2977
2978 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
2979 assert_eq!(
2980 output,
2981 indoc! {r#"
2982 ## User
2983
2984 Hello from Zed!
2985
2986 ## Assistant
2987
2988 <thinking>
2989 Thinking hard!
2990 </thinking>
2991
2992 "#}
2993 );
2994 }
2995
2996 #[gpui::test]
2997 async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
2998 init_test(cx);
2999
3000 let fs = FakeFs::new(cx.executor());
3001 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
3002 .await;
3003 let project = Project::test(fs.clone(), [], cx).await;
3004 let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
3005 let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
3006 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3007 move |_, thread, mut cx| {
3008 let read_file_tx = read_file_tx.clone();
3009 async move {
3010 let content = thread
3011 .update(&mut cx, |thread, cx| {
3012 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3013 })
3014 .unwrap()
3015 .await
3016 .unwrap();
3017 assert_eq!(content, "one\ntwo\nthree\n");
3018 read_file_tx.take().unwrap().send(()).unwrap();
3019 thread
3020 .update(&mut cx, |thread, cx| {
3021 thread.write_text_file(
3022 path!("/tmp/foo").into(),
3023 "one\ntwo\nthree\nfour\nfive\n".to_string(),
3024 cx,
3025 )
3026 })
3027 .unwrap()
3028 .await
3029 .unwrap();
3030 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3031 }
3032 .boxed_local()
3033 },
3034 ));
3035
3036 let (worktree, pathbuf) = project
3037 .update(cx, |project, cx| {
3038 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3039 })
3040 .await
3041 .unwrap();
3042 let buffer = project
3043 .update(cx, |project, cx| {
3044 project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
3045 })
3046 .await
3047 .unwrap();
3048
3049 let thread = cx
3050 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
3051 .await
3052 .unwrap();
3053
3054 let request = thread.update(cx, |thread, cx| {
3055 thread.send_raw("Extend the count in /tmp/foo", cx)
3056 });
3057 read_file_rx.await.ok();
3058 buffer.update(cx, |buffer, cx| {
3059 buffer.edit([(0..0, "zero\n".to_string())], None, cx);
3060 });
3061 cx.run_until_parked();
3062 assert_eq!(
3063 buffer.read_with(cx, |buffer, _| buffer.text()),
3064 "zero\none\ntwo\nthree\nfour\nfive\n"
3065 );
3066 assert_eq!(
3067 String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
3068 "zero\none\ntwo\nthree\nfour\nfive\n"
3069 );
3070 request.await.unwrap();
3071 }
3072
3073 #[gpui::test]
3074 async fn test_reading_from_line(cx: &mut TestAppContext) {
3075 init_test(cx);
3076
3077 let fs = FakeFs::new(cx.executor());
3078 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
3079 .await;
3080 let project = Project::test(fs.clone(), [], cx).await;
3081 project
3082 .update(cx, |project, cx| {
3083 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3084 })
3085 .await
3086 .unwrap();
3087
3088 let connection = Rc::new(FakeAgentConnection::new());
3089
3090 let thread = cx
3091 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
3092 .await
3093 .unwrap();
3094
3095 // Whole file
3096 let content = thread
3097 .update(cx, |thread, cx| {
3098 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3099 })
3100 .await
3101 .unwrap();
3102
3103 assert_eq!(content, "one\ntwo\nthree\nfour\n");
3104
3105 // Only start line
3106 let content = thread
3107 .update(cx, |thread, cx| {
3108 thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
3109 })
3110 .await
3111 .unwrap();
3112
3113 assert_eq!(content, "three\nfour\n");
3114
3115 // Only limit
3116 let content = thread
3117 .update(cx, |thread, cx| {
3118 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3119 })
3120 .await
3121 .unwrap();
3122
3123 assert_eq!(content, "one\ntwo\n");
3124
3125 // Range
3126 let content = thread
3127 .update(cx, |thread, cx| {
3128 thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
3129 })
3130 .await
3131 .unwrap();
3132
3133 assert_eq!(content, "two\nthree\n");
3134
3135 // Invalid
3136 let err = thread
3137 .update(cx, |thread, cx| {
3138 thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
3139 })
3140 .await
3141 .unwrap_err();
3142
3143 assert_eq!(
3144 err.to_string(),
3145 "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
3146 );
3147 }
3148
3149 #[gpui::test]
3150 async fn test_reading_empty_file(cx: &mut TestAppContext) {
3151 init_test(cx);
3152
3153 let fs = FakeFs::new(cx.executor());
3154 fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
3155 let project = Project::test(fs.clone(), [], cx).await;
3156 project
3157 .update(cx, |project, cx| {
3158 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3159 })
3160 .await
3161 .unwrap();
3162
3163 let connection = Rc::new(FakeAgentConnection::new());
3164
3165 let thread = cx
3166 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
3167 .await
3168 .unwrap();
3169
3170 // Whole file
3171 let content = thread
3172 .update(cx, |thread, cx| {
3173 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3174 })
3175 .await
3176 .unwrap();
3177
3178 assert_eq!(content, "");
3179
3180 // Only start line
3181 let content = thread
3182 .update(cx, |thread, cx| {
3183 thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
3184 })
3185 .await
3186 .unwrap();
3187
3188 assert_eq!(content, "");
3189
3190 // Only limit
3191 let content = thread
3192 .update(cx, |thread, cx| {
3193 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3194 })
3195 .await
3196 .unwrap();
3197
3198 assert_eq!(content, "");
3199
3200 // Range
3201 let content = thread
3202 .update(cx, |thread, cx| {
3203 thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
3204 })
3205 .await
3206 .unwrap();
3207
3208 assert_eq!(content, "");
3209
3210 // Invalid
3211 let err = thread
3212 .update(cx, |thread, cx| {
3213 thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
3214 })
3215 .await
3216 .unwrap_err();
3217
3218 assert_eq!(
3219 err.to_string(),
3220 "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
3221 );
3222 }
3223 #[gpui::test]
3224 async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
3225 init_test(cx);
3226
3227 let fs = FakeFs::new(cx.executor());
3228 fs.insert_tree(path!("/tmp"), json!({})).await;
3229 let project = Project::test(fs.clone(), [], cx).await;
3230 project
3231 .update(cx, |project, cx| {
3232 project.find_or_create_worktree(path!("/tmp"), true, cx)
3233 })
3234 .await
3235 .unwrap();
3236
3237 let connection = Rc::new(FakeAgentConnection::new());
3238
3239 let thread = cx
3240 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
3241 .await
3242 .unwrap();
3243
3244 // Out of project file
3245 let err = thread
3246 .update(cx, |thread, cx| {
3247 thread.read_text_file(path!("/foo").into(), None, None, false, cx)
3248 })
3249 .await
3250 .unwrap_err();
3251
3252 assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
3253 }
3254
3255 #[gpui::test]
3256 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
3257 init_test(cx);
3258
3259 let fs = FakeFs::new(cx.executor());
3260 let project = Project::test(fs, [], cx).await;
3261 let id = acp::ToolCallId::new("test");
3262
3263 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3264 let id = id.clone();
3265 move |_, thread, mut cx| {
3266 let id = id.clone();
3267 async move {
3268 thread
3269 .update(&mut cx, |thread, cx| {
3270 thread.handle_session_update(
3271 acp::SessionUpdate::ToolCall(
3272 acp::ToolCall::new(id.clone(), "Label")
3273 .kind(acp::ToolKind::Fetch)
3274 .status(acp::ToolCallStatus::InProgress),
3275 ),
3276 cx,
3277 )
3278 })
3279 .unwrap()
3280 .unwrap();
3281 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3282 }
3283 .boxed_local()
3284 }
3285 }));
3286
3287 let thread = cx
3288 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3289 .await
3290 .unwrap();
3291
3292 let request = thread.update(cx, |thread, cx| {
3293 thread.send_raw("Fetch https://example.com", cx)
3294 });
3295
3296 run_until_first_tool_call(&thread, cx).await;
3297
3298 thread.read_with(cx, |thread, _| {
3299 assert!(matches!(
3300 thread.entries[1],
3301 AgentThreadEntry::ToolCall(ToolCall {
3302 status: ToolCallStatus::InProgress,
3303 ..
3304 })
3305 ));
3306 });
3307
3308 thread.update(cx, |thread, cx| thread.cancel(cx)).await;
3309
3310 thread.read_with(cx, |thread, _| {
3311 assert!(matches!(
3312 &thread.entries[1],
3313 AgentThreadEntry::ToolCall(ToolCall {
3314 status: ToolCallStatus::Canceled,
3315 ..
3316 })
3317 ));
3318 });
3319
3320 thread
3321 .update(cx, |thread, cx| {
3322 thread.handle_session_update(
3323 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3324 id,
3325 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3326 )),
3327 cx,
3328 )
3329 })
3330 .unwrap();
3331
3332 request.await.unwrap();
3333
3334 thread.read_with(cx, |thread, _| {
3335 assert!(matches!(
3336 thread.entries[1],
3337 AgentThreadEntry::ToolCall(ToolCall {
3338 status: ToolCallStatus::Completed,
3339 ..
3340 })
3341 ));
3342 });
3343 }
3344
3345 #[gpui::test]
3346 async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3347 init_test(cx);
3348 let fs = FakeFs::new(cx.background_executor.clone());
3349 fs.insert_tree(path!("/test"), json!({})).await;
3350 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3351
3352 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3353 move |_, thread, mut cx| {
3354 async move {
3355 thread
3356 .update(&mut cx, |thread, cx| {
3357 thread.handle_session_update(
3358 acp::SessionUpdate::ToolCall(
3359 acp::ToolCall::new("test", "Label")
3360 .kind(acp::ToolKind::Edit)
3361 .status(acp::ToolCallStatus::Completed)
3362 .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
3363 "/test/test.txt",
3364 "foo",
3365 ))]),
3366 ),
3367 cx,
3368 )
3369 })
3370 .unwrap()
3371 .unwrap();
3372 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3373 }
3374 .boxed_local()
3375 }
3376 }));
3377
3378 let thread = cx
3379 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3380 .await
3381 .unwrap();
3382
3383 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3384 .await
3385 .unwrap();
3386
3387 assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3388 }
3389
3390 #[gpui::test(iterations = 10)]
3391 async fn test_checkpoints(cx: &mut TestAppContext) {
3392 init_test(cx);
3393 let fs = FakeFs::new(cx.background_executor.clone());
3394 fs.insert_tree(
3395 path!("/test"),
3396 json!({
3397 ".git": {}
3398 }),
3399 )
3400 .await;
3401 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3402
3403 let simulate_changes = Arc::new(AtomicBool::new(true));
3404 let next_filename = Arc::new(AtomicUsize::new(0));
3405 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3406 let simulate_changes = simulate_changes.clone();
3407 let next_filename = next_filename.clone();
3408 let fs = fs.clone();
3409 move |request, thread, mut cx| {
3410 let fs = fs.clone();
3411 let simulate_changes = simulate_changes.clone();
3412 let next_filename = next_filename.clone();
3413 async move {
3414 if simulate_changes.load(SeqCst) {
3415 let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3416 fs.write(Path::new(&filename), b"").await?;
3417 }
3418
3419 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3420 panic!("expected text content block");
3421 };
3422 thread.update(&mut cx, |thread, cx| {
3423 thread
3424 .handle_session_update(
3425 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3426 content.text.to_uppercase().into(),
3427 )),
3428 cx,
3429 )
3430 .unwrap();
3431 })?;
3432 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3433 }
3434 .boxed_local()
3435 }
3436 }));
3437 let thread = cx
3438 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3439 .await
3440 .unwrap();
3441
3442 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3443 .await
3444 .unwrap();
3445 thread.read_with(cx, |thread, cx| {
3446 assert_eq!(
3447 thread.to_markdown(cx),
3448 indoc! {"
3449 ## User (checkpoint)
3450
3451 Lorem
3452
3453 ## Assistant
3454
3455 LOREM
3456
3457 "}
3458 );
3459 });
3460 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3461
3462 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3463 .await
3464 .unwrap();
3465 thread.read_with(cx, |thread, cx| {
3466 assert_eq!(
3467 thread.to_markdown(cx),
3468 indoc! {"
3469 ## User (checkpoint)
3470
3471 Lorem
3472
3473 ## Assistant
3474
3475 LOREM
3476
3477 ## User (checkpoint)
3478
3479 ipsum
3480
3481 ## Assistant
3482
3483 IPSUM
3484
3485 "}
3486 );
3487 });
3488 assert_eq!(
3489 fs.files(),
3490 vec![
3491 Path::new(path!("/test/file-0")),
3492 Path::new(path!("/test/file-1"))
3493 ]
3494 );
3495
3496 // Checkpoint isn't stored when there are no changes.
3497 simulate_changes.store(false, SeqCst);
3498 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3499 .await
3500 .unwrap();
3501 thread.read_with(cx, |thread, cx| {
3502 assert_eq!(
3503 thread.to_markdown(cx),
3504 indoc! {"
3505 ## User (checkpoint)
3506
3507 Lorem
3508
3509 ## Assistant
3510
3511 LOREM
3512
3513 ## User (checkpoint)
3514
3515 ipsum
3516
3517 ## Assistant
3518
3519 IPSUM
3520
3521 ## User
3522
3523 dolor
3524
3525 ## Assistant
3526
3527 DOLOR
3528
3529 "}
3530 );
3531 });
3532 assert_eq!(
3533 fs.files(),
3534 vec![
3535 Path::new(path!("/test/file-0")),
3536 Path::new(path!("/test/file-1"))
3537 ]
3538 );
3539
3540 // Rewinding the conversation truncates the history and restores the checkpoint.
3541 thread
3542 .update(cx, |thread, cx| {
3543 let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3544 panic!("unexpected entries {:?}", thread.entries)
3545 };
3546 thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3547 })
3548 .await
3549 .unwrap();
3550 thread.read_with(cx, |thread, cx| {
3551 assert_eq!(
3552 thread.to_markdown(cx),
3553 indoc! {"
3554 ## User (checkpoint)
3555
3556 Lorem
3557
3558 ## Assistant
3559
3560 LOREM
3561
3562 "}
3563 );
3564 });
3565 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3566 }
3567
3568 #[gpui::test]
3569 async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3570 use std::sync::atomic::AtomicUsize;
3571 init_test(cx);
3572
3573 let fs = FakeFs::new(cx.executor());
3574 let project = Project::test(fs, None, cx).await;
3575
3576 // Create a connection that simulates refusal after tool result
3577 let prompt_count = Arc::new(AtomicUsize::new(0));
3578 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3579 let prompt_count = prompt_count.clone();
3580 move |_request, thread, mut cx| {
3581 let count = prompt_count.fetch_add(1, SeqCst);
3582 async move {
3583 if count == 0 {
3584 // First prompt: Generate a tool call with result
3585 thread.update(&mut cx, |thread, cx| {
3586 thread
3587 .handle_session_update(
3588 acp::SessionUpdate::ToolCall(
3589 acp::ToolCall::new("tool1", "Test Tool")
3590 .kind(acp::ToolKind::Fetch)
3591 .status(acp::ToolCallStatus::Completed)
3592 .raw_input(serde_json::json!({"query": "test"}))
3593 .raw_output(serde_json::json!({"result": "inappropriate content"})),
3594 ),
3595 cx,
3596 )
3597 .unwrap();
3598 })?;
3599
3600 // Now return refusal because of the tool result
3601 Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
3602 } else {
3603 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3604 }
3605 }
3606 .boxed_local()
3607 }
3608 }));
3609
3610 let thread = cx
3611 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3612 .await
3613 .unwrap();
3614
3615 // Track if we see a Refusal event
3616 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3617 let saw_refusal_event_captured = saw_refusal_event.clone();
3618 thread.update(cx, |_thread, cx| {
3619 cx.subscribe(
3620 &thread,
3621 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3622 if matches!(event, AcpThreadEvent::Refusal) {
3623 *saw_refusal_event_captured.lock().unwrap() = true;
3624 }
3625 },
3626 )
3627 .detach();
3628 });
3629
3630 // Send a user message - this will trigger tool call and then refusal
3631 let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
3632 cx.background_executor.spawn(send_task).detach();
3633 cx.run_until_parked();
3634
3635 // Verify that:
3636 // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3637 // 2. The user message was NOT truncated
3638 assert!(
3639 *saw_refusal_event.lock().unwrap(),
3640 "Refusal event should be emitted for tool result refusals"
3641 );
3642
3643 thread.read_with(cx, |thread, _| {
3644 let entries = thread.entries();
3645 assert!(entries.len() >= 2, "Should have user message and tool call");
3646
3647 // Verify user message is still there
3648 assert!(
3649 matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3650 "User message should not be truncated"
3651 );
3652
3653 // Verify tool call is there with result
3654 if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3655 assert!(
3656 tool_call.raw_output.is_some(),
3657 "Tool call should have output"
3658 );
3659 } else {
3660 panic!("Expected tool call at index 1");
3661 }
3662 });
3663 }
3664
3665 #[gpui::test]
3666 async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3667 init_test(cx);
3668
3669 let fs = FakeFs::new(cx.executor());
3670 let project = Project::test(fs, None, cx).await;
3671
3672 let refuse_next = Arc::new(AtomicBool::new(false));
3673 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3674 let refuse_next = refuse_next.clone();
3675 move |_request, _thread, _cx| {
3676 if refuse_next.load(SeqCst) {
3677 async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
3678 .boxed_local()
3679 } else {
3680 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
3681 .boxed_local()
3682 }
3683 }
3684 }));
3685
3686 let thread = cx
3687 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3688 .await
3689 .unwrap();
3690
3691 // Track if we see a Refusal event
3692 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3693 let saw_refusal_event_captured = saw_refusal_event.clone();
3694 thread.update(cx, |_thread, cx| {
3695 cx.subscribe(
3696 &thread,
3697 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3698 if matches!(event, AcpThreadEvent::Refusal) {
3699 *saw_refusal_event_captured.lock().unwrap() = true;
3700 }
3701 },
3702 )
3703 .detach();
3704 });
3705
3706 // Send a message that will be refused
3707 refuse_next.store(true, SeqCst);
3708 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3709 .await
3710 .unwrap();
3711
3712 // Verify that a Refusal event WAS emitted for user prompt refusal
3713 assert!(
3714 *saw_refusal_event.lock().unwrap(),
3715 "Refusal event should be emitted for user prompt refusals"
3716 );
3717
3718 // Verify the message was truncated (user prompt refusal)
3719 thread.read_with(cx, |thread, cx| {
3720 assert_eq!(thread.to_markdown(cx), "");
3721 });
3722 }
3723
3724 #[gpui::test]
3725 async fn test_refusal(cx: &mut TestAppContext) {
3726 init_test(cx);
3727 let fs = FakeFs::new(cx.background_executor.clone());
3728 fs.insert_tree(path!("/"), json!({})).await;
3729 let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
3730
3731 let refuse_next = Arc::new(AtomicBool::new(false));
3732 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3733 let refuse_next = refuse_next.clone();
3734 move |request, thread, mut cx| {
3735 let refuse_next = refuse_next.clone();
3736 async move {
3737 if refuse_next.load(SeqCst) {
3738 return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
3739 }
3740
3741 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3742 panic!("expected text content block");
3743 };
3744 thread.update(&mut cx, |thread, cx| {
3745 thread
3746 .handle_session_update(
3747 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3748 content.text.to_uppercase().into(),
3749 )),
3750 cx,
3751 )
3752 .unwrap();
3753 })?;
3754 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3755 }
3756 .boxed_local()
3757 }
3758 }));
3759 let thread = cx
3760 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3761 .await
3762 .unwrap();
3763
3764 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3765 .await
3766 .unwrap();
3767 thread.read_with(cx, |thread, cx| {
3768 assert_eq!(
3769 thread.to_markdown(cx),
3770 indoc! {"
3771 ## User
3772
3773 hello
3774
3775 ## Assistant
3776
3777 HELLO
3778
3779 "}
3780 );
3781 });
3782
3783 // Simulate refusing the second message. The message should be truncated
3784 // when a user prompt is refused.
3785 refuse_next.store(true, SeqCst);
3786 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
3787 .await
3788 .unwrap();
3789 thread.read_with(cx, |thread, cx| {
3790 assert_eq!(
3791 thread.to_markdown(cx),
3792 indoc! {"
3793 ## User
3794
3795 hello
3796
3797 ## Assistant
3798
3799 HELLO
3800
3801 "}
3802 );
3803 });
3804 }
3805
3806 async fn run_until_first_tool_call(
3807 thread: &Entity<AcpThread>,
3808 cx: &mut TestAppContext,
3809 ) -> usize {
3810 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
3811
3812 let subscription = cx.update(|cx| {
3813 cx.subscribe(thread, move |thread, _, cx| {
3814 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
3815 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
3816 return tx.try_send(ix).unwrap();
3817 }
3818 }
3819 })
3820 });
3821
3822 select! {
3823 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
3824 panic!("Timeout waiting for tool call")
3825 }
3826 ix = rx.next().fuse() => {
3827 drop(subscription);
3828 ix.unwrap()
3829 }
3830 }
3831 }
3832
3833 #[derive(Clone, Default)]
3834 struct FakeAgentConnection {
3835 auth_methods: Vec<acp::AuthMethod>,
3836 sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
3837 on_user_message: Option<
3838 Rc<
3839 dyn Fn(
3840 acp::PromptRequest,
3841 WeakEntity<AcpThread>,
3842 AsyncApp,
3843 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3844 + 'static,
3845 >,
3846 >,
3847 }
3848
3849 impl FakeAgentConnection {
3850 fn new() -> Self {
3851 Self {
3852 auth_methods: Vec::new(),
3853 on_user_message: None,
3854 sessions: Arc::default(),
3855 }
3856 }
3857
3858 #[expect(unused)]
3859 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
3860 self.auth_methods = auth_methods;
3861 self
3862 }
3863
3864 fn on_user_message(
3865 mut self,
3866 handler: impl Fn(
3867 acp::PromptRequest,
3868 WeakEntity<AcpThread>,
3869 AsyncApp,
3870 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3871 + 'static,
3872 ) -> Self {
3873 self.on_user_message.replace(Rc::new(handler));
3874 self
3875 }
3876 }
3877
3878 impl AgentConnection for FakeAgentConnection {
3879 fn telemetry_id(&self) -> SharedString {
3880 "fake".into()
3881 }
3882
3883 fn auth_methods(&self) -> &[acp::AuthMethod] {
3884 &self.auth_methods
3885 }
3886
3887 fn new_thread(
3888 self: Rc<Self>,
3889 project: Entity<Project>,
3890 _cwd: &Path,
3891 cx: &mut App,
3892 ) -> Task<gpui::Result<Entity<AcpThread>>> {
3893 let session_id = acp::SessionId::new(
3894 rand::rng()
3895 .sample_iter(&distr::Alphanumeric)
3896 .take(7)
3897 .map(char::from)
3898 .collect::<String>(),
3899 );
3900 let action_log = cx.new(|_| ActionLog::new(project.clone()));
3901 let thread = cx.new(|cx| {
3902 AcpThread::new(
3903 "Test",
3904 self.clone(),
3905 project,
3906 action_log,
3907 session_id.clone(),
3908 watch::Receiver::constant(
3909 acp::PromptCapabilities::new()
3910 .image(true)
3911 .audio(true)
3912 .embedded_context(true),
3913 ),
3914 cx,
3915 )
3916 });
3917 self.sessions.lock().insert(session_id, thread.downgrade());
3918 Task::ready(Ok(thread))
3919 }
3920
3921 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
3922 if self.auth_methods().iter().any(|m| m.id == method) {
3923 Task::ready(Ok(()))
3924 } else {
3925 Task::ready(Err(anyhow!("Invalid Auth Method")))
3926 }
3927 }
3928
3929 fn prompt(
3930 &self,
3931 _id: Option<UserMessageId>,
3932 params: acp::PromptRequest,
3933 cx: &mut App,
3934 ) -> Task<gpui::Result<acp::PromptResponse>> {
3935 let sessions = self.sessions.lock();
3936 let thread = sessions.get(¶ms.session_id).unwrap();
3937 if let Some(handler) = &self.on_user_message {
3938 let handler = handler.clone();
3939 let thread = thread.clone();
3940 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
3941 } else {
3942 Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
3943 }
3944 }
3945
3946 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
3947 let sessions = self.sessions.lock();
3948 let thread = sessions.get(session_id).unwrap().clone();
3949
3950 cx.spawn(async move |cx| {
3951 thread
3952 .update(cx, |thread, cx| thread.cancel(cx))
3953 .unwrap()
3954 .await
3955 })
3956 .detach();
3957 }
3958
3959 fn truncate(
3960 &self,
3961 session_id: &acp::SessionId,
3962 _cx: &App,
3963 ) -> Option<Rc<dyn AgentSessionTruncate>> {
3964 Some(Rc::new(FakeAgentSessionEditor {
3965 _session_id: session_id.clone(),
3966 }))
3967 }
3968
3969 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
3970 self
3971 }
3972 }
3973
3974 struct FakeAgentSessionEditor {
3975 _session_id: acp::SessionId,
3976 }
3977
3978 impl AgentSessionTruncate for FakeAgentSessionEditor {
3979 fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
3980 Task::ready(Ok(()))
3981 }
3982 }
3983
3984 #[gpui::test]
3985 async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
3986 init_test(cx);
3987
3988 let fs = FakeFs::new(cx.executor());
3989 let project = Project::test(fs, [], cx).await;
3990 let connection = Rc::new(FakeAgentConnection::new());
3991 let thread = cx
3992 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3993 .await
3994 .unwrap();
3995
3996 // Try to update a tool call that doesn't exist
3997 let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
3998 thread.update(cx, |thread, cx| {
3999 let result = thread.handle_session_update(
4000 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4001 nonexistent_id.clone(),
4002 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4003 )),
4004 cx,
4005 );
4006
4007 // The update should succeed (not return an error)
4008 assert!(result.is_ok());
4009
4010 // There should now be exactly one entry in the thread
4011 assert_eq!(thread.entries.len(), 1);
4012
4013 // The entry should be a failed tool call
4014 if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
4015 assert_eq!(tool_call.id, nonexistent_id);
4016 assert!(matches!(tool_call.status, ToolCallStatus::Failed));
4017 assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
4018
4019 // Check that the content contains the error message
4020 assert_eq!(tool_call.content.len(), 1);
4021 if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
4022 match content_block {
4023 ContentBlock::Markdown { markdown } => {
4024 let markdown_text = markdown.read(cx).source();
4025 assert!(markdown_text.contains("Tool call not found"));
4026 }
4027 ContentBlock::Empty => panic!("Expected markdown content, got empty"),
4028 ContentBlock::ResourceLink { .. } => {
4029 panic!("Expected markdown content, got resource link")
4030 }
4031 ContentBlock::Image { .. } => {
4032 panic!("Expected markdown content, got image")
4033 }
4034 }
4035 } else {
4036 panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
4037 }
4038 } else {
4039 panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
4040 }
4041 });
4042 }
4043
4044 /// Tests that restoring a checkpoint properly cleans up terminals that were
4045 /// created after that checkpoint, and cancels any in-progress generation.
4046 ///
4047 /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
4048 /// that were started after that checkpoint should be terminated, and any in-progress
4049 /// AI generation should be canceled.
4050 #[gpui::test]
4051 async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
4052 init_test(cx);
4053
4054 let fs = FakeFs::new(cx.executor());
4055 let project = Project::test(fs, [], cx).await;
4056 let connection = Rc::new(FakeAgentConnection::new());
4057 let thread = cx
4058 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
4059 .await
4060 .unwrap();
4061
4062 // Send first user message to create a checkpoint
4063 cx.update(|cx| {
4064 thread.update(cx, |thread, cx| {
4065 thread.send(vec!["first message".into()], cx)
4066 })
4067 })
4068 .await
4069 .unwrap();
4070
4071 // Send second message (creates another checkpoint) - we'll restore to this one
4072 cx.update(|cx| {
4073 thread.update(cx, |thread, cx| {
4074 thread.send(vec!["second message".into()], cx)
4075 })
4076 })
4077 .await
4078 .unwrap();
4079
4080 // Create 2 terminals BEFORE the checkpoint that have completed running
4081 let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4082 let mock_terminal_1 = cx.new(|cx| {
4083 let builder = ::terminal::TerminalBuilder::new_display_only(
4084 ::terminal::terminal_settings::CursorShape::default(),
4085 ::terminal::terminal_settings::AlternateScroll::On,
4086 None,
4087 0,
4088 )
4089 .unwrap();
4090 builder.subscribe(cx)
4091 });
4092
4093 thread.update(cx, |thread, cx| {
4094 thread.on_terminal_provider_event(
4095 TerminalProviderEvent::Created {
4096 terminal_id: terminal_id_1.clone(),
4097 label: "echo 'first'".to_string(),
4098 cwd: Some(PathBuf::from("/test")),
4099 output_byte_limit: None,
4100 terminal: mock_terminal_1.clone(),
4101 },
4102 cx,
4103 );
4104 });
4105
4106 thread.update(cx, |thread, cx| {
4107 thread.on_terminal_provider_event(
4108 TerminalProviderEvent::Output {
4109 terminal_id: terminal_id_1.clone(),
4110 data: b"first\n".to_vec(),
4111 },
4112 cx,
4113 );
4114 });
4115
4116 thread.update(cx, |thread, cx| {
4117 thread.on_terminal_provider_event(
4118 TerminalProviderEvent::Exit {
4119 terminal_id: terminal_id_1.clone(),
4120 status: acp::TerminalExitStatus::new().exit_code(0),
4121 },
4122 cx,
4123 );
4124 });
4125
4126 let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4127 let mock_terminal_2 = cx.new(|cx| {
4128 let builder = ::terminal::TerminalBuilder::new_display_only(
4129 ::terminal::terminal_settings::CursorShape::default(),
4130 ::terminal::terminal_settings::AlternateScroll::On,
4131 None,
4132 0,
4133 )
4134 .unwrap();
4135 builder.subscribe(cx)
4136 });
4137
4138 thread.update(cx, |thread, cx| {
4139 thread.on_terminal_provider_event(
4140 TerminalProviderEvent::Created {
4141 terminal_id: terminal_id_2.clone(),
4142 label: "echo 'second'".to_string(),
4143 cwd: Some(PathBuf::from("/test")),
4144 output_byte_limit: None,
4145 terminal: mock_terminal_2.clone(),
4146 },
4147 cx,
4148 );
4149 });
4150
4151 thread.update(cx, |thread, cx| {
4152 thread.on_terminal_provider_event(
4153 TerminalProviderEvent::Output {
4154 terminal_id: terminal_id_2.clone(),
4155 data: b"second\n".to_vec(),
4156 },
4157 cx,
4158 );
4159 });
4160
4161 thread.update(cx, |thread, cx| {
4162 thread.on_terminal_provider_event(
4163 TerminalProviderEvent::Exit {
4164 terminal_id: terminal_id_2.clone(),
4165 status: acp::TerminalExitStatus::new().exit_code(0),
4166 },
4167 cx,
4168 );
4169 });
4170
4171 // Get the second message ID to restore to
4172 let second_message_id = thread.read_with(cx, |thread, _| {
4173 // At this point we have:
4174 // - Index 0: First user message (with checkpoint)
4175 // - Index 1: Second user message (with checkpoint)
4176 // No assistant responses because FakeAgentConnection just returns EndTurn
4177 let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
4178 panic!("expected user message at index 1");
4179 };
4180 message.id.clone().unwrap()
4181 });
4182
4183 // Create a terminal AFTER the checkpoint we'll restore to.
4184 // This simulates the AI agent starting a long-running terminal command.
4185 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4186 let mock_terminal = cx.new(|cx| {
4187 let builder = ::terminal::TerminalBuilder::new_display_only(
4188 ::terminal::terminal_settings::CursorShape::default(),
4189 ::terminal::terminal_settings::AlternateScroll::On,
4190 None,
4191 0,
4192 )
4193 .unwrap();
4194 builder.subscribe(cx)
4195 });
4196
4197 // Register the terminal as created
4198 thread.update(cx, |thread, cx| {
4199 thread.on_terminal_provider_event(
4200 TerminalProviderEvent::Created {
4201 terminal_id: terminal_id.clone(),
4202 label: "sleep 1000".to_string(),
4203 cwd: Some(PathBuf::from("/test")),
4204 output_byte_limit: None,
4205 terminal: mock_terminal.clone(),
4206 },
4207 cx,
4208 );
4209 });
4210
4211 // Simulate the terminal producing output (still running)
4212 thread.update(cx, |thread, cx| {
4213 thread.on_terminal_provider_event(
4214 TerminalProviderEvent::Output {
4215 terminal_id: terminal_id.clone(),
4216 data: b"terminal is running...\n".to_vec(),
4217 },
4218 cx,
4219 );
4220 });
4221
4222 // Create a tool call entry that references this terminal
4223 // This represents the agent requesting a terminal command
4224 thread.update(cx, |thread, cx| {
4225 thread
4226 .handle_session_update(
4227 acp::SessionUpdate::ToolCall(
4228 acp::ToolCall::new("terminal-tool-1", "Running command")
4229 .kind(acp::ToolKind::Execute)
4230 .status(acp::ToolCallStatus::InProgress)
4231 .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
4232 terminal_id.clone(),
4233 ))])
4234 .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
4235 ),
4236 cx,
4237 )
4238 .unwrap();
4239 });
4240
4241 // Verify terminal exists and is in the thread
4242 let terminal_exists_before =
4243 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4244 assert!(
4245 terminal_exists_before,
4246 "Terminal should exist before checkpoint restore"
4247 );
4248
4249 // Verify the terminal's underlying task is still running (not completed)
4250 let terminal_running_before = thread.read_with(cx, |thread, _cx| {
4251 let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
4252 terminal_entity.read_with(cx, |term, _cx| {
4253 term.output().is_none() // output is None means it's still running
4254 })
4255 });
4256 assert!(
4257 terminal_running_before,
4258 "Terminal should be running before checkpoint restore"
4259 );
4260
4261 // Verify we have the expected entries before restore
4262 let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
4263 assert!(
4264 entry_count_before > 1,
4265 "Should have multiple entries before restore"
4266 );
4267
4268 // Restore the checkpoint to the second message.
4269 // This should:
4270 // 1. Cancel any in-progress generation (via the cancel() call)
4271 // 2. Remove the terminal that was created after that point
4272 thread
4273 .update(cx, |thread, cx| {
4274 thread.restore_checkpoint(second_message_id, cx)
4275 })
4276 .await
4277 .unwrap();
4278
4279 // Verify that no send_task is in progress after restore
4280 // (cancel() clears the send_task)
4281 let has_send_task_after = thread.read_with(cx, |thread, _| thread.send_task.is_some());
4282 assert!(
4283 !has_send_task_after,
4284 "Should not have a send_task after restore (cancel should have cleared it)"
4285 );
4286
4287 // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
4288 let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
4289 assert_eq!(
4290 entry_count, 1,
4291 "Should have 1 entry after restore (only the first user message)"
4292 );
4293
4294 // Verify the 2 completed terminals from before the checkpoint still exist
4295 let terminal_1_exists = thread.read_with(cx, |thread, _| {
4296 thread.terminals.contains_key(&terminal_id_1)
4297 });
4298 assert!(
4299 terminal_1_exists,
4300 "Terminal 1 (from before checkpoint) should still exist"
4301 );
4302
4303 let terminal_2_exists = thread.read_with(cx, |thread, _| {
4304 thread.terminals.contains_key(&terminal_id_2)
4305 });
4306 assert!(
4307 terminal_2_exists,
4308 "Terminal 2 (from before checkpoint) should still exist"
4309 );
4310
4311 // Verify they're still in completed state
4312 let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
4313 let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
4314 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4315 });
4316 assert!(terminal_1_completed, "Terminal 1 should still be completed");
4317
4318 let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
4319 let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
4320 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4321 });
4322 assert!(terminal_2_completed, "Terminal 2 should still be completed");
4323
4324 // Verify the running terminal (created after checkpoint) was removed
4325 let terminal_3_exists =
4326 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4327 assert!(
4328 !terminal_3_exists,
4329 "Terminal 3 (created after checkpoint) should have been removed"
4330 );
4331
4332 // Verify total count is 2 (the two from before the checkpoint)
4333 let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
4334 assert_eq!(
4335 terminal_count, 2,
4336 "Should have exactly 2 terminals (the completed ones from before checkpoint)"
4337 );
4338 }
4339
4340 /// Tests that update_last_checkpoint correctly updates the original message's checkpoint
4341 /// even when a new user message is added while the async checkpoint comparison is in progress.
4342 ///
4343 /// This is a regression test for a bug where update_last_checkpoint would fail with
4344 /// "no checkpoint" if a new user message (without a checkpoint) was added between when
4345 /// update_last_checkpoint started and when its async closure ran.
4346 #[gpui::test]
4347 async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
4348 init_test(cx);
4349
4350 let fs = FakeFs::new(cx.executor());
4351 fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
4352 .await;
4353 let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;
4354
4355 let handler_done = Arc::new(AtomicBool::new(false));
4356 let handler_done_clone = handler_done.clone();
4357 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4358 move |_, _thread, _cx| {
4359 handler_done_clone.store(true, SeqCst);
4360 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
4361 },
4362 ));
4363
4364 let thread = cx
4365 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
4366 .await
4367 .unwrap();
4368
4369 let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
4370 let send_task = cx.background_executor.spawn(send_future);
4371
4372 // Tick until handler completes, then a few more to let update_last_checkpoint start
4373 while !handler_done.load(SeqCst) {
4374 cx.executor().tick();
4375 }
4376 for _ in 0..5 {
4377 cx.executor().tick();
4378 }
4379
4380 thread.update(cx, |thread, cx| {
4381 thread.push_entry(
4382 AgentThreadEntry::UserMessage(UserMessage {
4383 id: Some(UserMessageId::new()),
4384 content: ContentBlock::Empty,
4385 chunks: vec!["Injected message (no checkpoint)".into()],
4386 checkpoint: None,
4387 indented: false,
4388 }),
4389 cx,
4390 );
4391 });
4392
4393 cx.run_until_parked();
4394 let result = send_task.await;
4395
4396 assert!(
4397 result.is_ok(),
4398 "send should succeed even when new message added during update_last_checkpoint: {:?}",
4399 result.err()
4400 );
4401 }
4402}