1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5
/// Meta key under which a tool's programmatic name is stored on an ACP `ToolCall`.
///
/// This is a workaround: ACP's `ToolCall` has no dedicated name field.
pub const TOOL_NAME_META_KEY: &str = "tool_name";
9
/// Meta key under which the session id is stored on an ACP `ToolCall` when a
/// subagent is spawned.
pub const SUBAGENT_SESSION_ID_META_KEY: &str = "subagent_session_id";
12
13/// Helper to extract tool name from ACP meta
14pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
15 meta.as_ref()
16 .and_then(|m| m.get(TOOL_NAME_META_KEY))
17 .and_then(|v| v.as_str())
18 .map(|s| SharedString::from(s.to_owned()))
19}
20
21/// Helper to extract subagent session id from ACP meta
22pub fn subagent_session_id_from_meta(meta: &Option<acp::Meta>) -> Option<acp::SessionId> {
23 meta.as_ref()
24 .and_then(|m| m.get(SUBAGENT_SESSION_ID_META_KEY))
25 .and_then(|v| v.as_str())
26 .map(|s| acp::SessionId::from(s.to_string()))
27}
28
29/// Helper to create meta with tool name
30pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
31 acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
32}
33use collections::HashSet;
34pub use connection::*;
35pub use diff::*;
36use language::language_settings::FormatOnSave;
37pub use mention::*;
38use project::lsp_store::{FormatTrigger, LspFormatTarget};
39use serde::{Deserialize, Serialize};
40use serde_json::to_string_pretty;
41
42use task::{Shell, ShellBuilder};
43pub use terminal::*;
44
45use action_log::{ActionLog, ActionLogTelemetry};
46use agent_client_protocol::{self as acp};
47use anyhow::{Context as _, Result, anyhow};
48use futures::{FutureExt, channel::oneshot, future::BoxFuture};
49use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
50use itertools::Itertools;
51use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
52use markdown::Markdown;
53use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
54use std::collections::HashMap;
55use std::error::Error;
56use std::fmt::{Formatter, Write};
57use std::ops::Range;
58use std::process::ExitStatus;
59use std::rc::Rc;
60use std::time::{Duration, Instant};
61use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
62use text::Bias;
63use ui::App;
64use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
65use uuid::Uuid;
66
67#[derive(Debug)]
68pub struct UserMessage {
69 pub id: Option<UserMessageId>,
70 pub content: ContentBlock,
71 pub chunks: Vec<acp::ContentBlock>,
72 pub checkpoint: Option<Checkpoint>,
73 pub indented: bool,
74}
75
76#[derive(Debug)]
77pub struct Checkpoint {
78 git_checkpoint: GitStoreCheckpoint,
79 pub show: bool,
80}
81
82impl UserMessage {
83 fn to_markdown(&self, cx: &App) -> String {
84 let mut markdown = String::new();
85 if self
86 .checkpoint
87 .as_ref()
88 .is_some_and(|checkpoint| checkpoint.show)
89 {
90 writeln!(markdown, "## User (checkpoint)").unwrap();
91 } else {
92 writeln!(markdown, "## User").unwrap();
93 }
94 writeln!(markdown).unwrap();
95 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
96 writeln!(markdown).unwrap();
97 markdown
98 }
99}
100
101#[derive(Debug, PartialEq)]
102pub struct AssistantMessage {
103 pub chunks: Vec<AssistantMessageChunk>,
104 pub indented: bool,
105}
106
107impl AssistantMessage {
108 pub fn to_markdown(&self, cx: &App) -> String {
109 format!(
110 "## Assistant\n\n{}\n\n",
111 self.chunks
112 .iter()
113 .map(|chunk| chunk.to_markdown(cx))
114 .join("\n\n")
115 )
116 }
117}
118
119#[derive(Debug, PartialEq)]
120pub enum AssistantMessageChunk {
121 Message { block: ContentBlock },
122 Thought { block: ContentBlock },
123}
124
125impl AssistantMessageChunk {
126 pub fn from_str(
127 chunk: &str,
128 language_registry: &Arc<LanguageRegistry>,
129 path_style: PathStyle,
130 cx: &mut App,
131 ) -> Self {
132 Self::Message {
133 block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
134 }
135 }
136
137 fn to_markdown(&self, cx: &App) -> String {
138 match self {
139 Self::Message { block } => block.to_markdown(cx).to_string(),
140 Self::Thought { block } => {
141 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
142 }
143 }
144 }
145}
146
147#[derive(Debug)]
148pub enum AgentThreadEntry {
149 UserMessage(UserMessage),
150 AssistantMessage(AssistantMessage),
151 ToolCall(ToolCall),
152}
153
154impl AgentThreadEntry {
155 pub fn is_indented(&self) -> bool {
156 match self {
157 Self::UserMessage(message) => message.indented,
158 Self::AssistantMessage(message) => message.indented,
159 Self::ToolCall(_) => false,
160 }
161 }
162
163 pub fn to_markdown(&self, cx: &App) -> String {
164 match self {
165 Self::UserMessage(message) => message.to_markdown(cx),
166 Self::AssistantMessage(message) => message.to_markdown(cx),
167 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
168 }
169 }
170
171 pub fn user_message(&self) -> Option<&UserMessage> {
172 if let AgentThreadEntry::UserMessage(message) = self {
173 Some(message)
174 } else {
175 None
176 }
177 }
178
179 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
180 if let AgentThreadEntry::ToolCall(call) = self {
181 itertools::Either::Left(call.diffs())
182 } else {
183 itertools::Either::Right(std::iter::empty())
184 }
185 }
186
187 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
188 if let AgentThreadEntry::ToolCall(call) = self {
189 itertools::Either::Left(call.terminals())
190 } else {
191 itertools::Either::Right(std::iter::empty())
192 }
193 }
194
195 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
196 if let AgentThreadEntry::ToolCall(ToolCall {
197 locations,
198 resolved_locations,
199 ..
200 }) = self
201 {
202 Some((
203 locations.get(ix)?.clone(),
204 resolved_locations.get(ix)?.clone()?,
205 ))
206 } else {
207 None
208 }
209 }
210}
211
212#[derive(Debug)]
213pub struct ToolCall {
214 pub id: acp::ToolCallId,
215 pub label: Entity<Markdown>,
216 pub kind: acp::ToolKind,
217 pub content: Vec<ToolCallContent>,
218 pub status: ToolCallStatus,
219 pub locations: Vec<acp::ToolCallLocation>,
220 pub resolved_locations: Vec<Option<AgentLocation>>,
221 pub raw_input: Option<serde_json::Value>,
222 pub raw_input_markdown: Option<Entity<Markdown>>,
223 pub raw_output: Option<serde_json::Value>,
224 pub tool_name: Option<SharedString>,
225 pub subagent_session_id: Option<acp::SessionId>,
226}
227
228impl ToolCall {
229 fn from_acp(
230 tool_call: acp::ToolCall,
231 status: ToolCallStatus,
232 language_registry: Arc<LanguageRegistry>,
233 path_style: PathStyle,
234 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
235 cx: &mut App,
236 ) -> Result<Self> {
237 let title = if tool_call.kind == acp::ToolKind::Execute {
238 tool_call.title
239 } else if let Some((first_line, _)) = tool_call.title.split_once("\n") {
240 first_line.to_owned() + "…"
241 } else {
242 tool_call.title
243 };
244 let mut content = Vec::with_capacity(tool_call.content.len());
245 for item in tool_call.content {
246 if let Some(item) = ToolCallContent::from_acp(
247 item,
248 language_registry.clone(),
249 path_style,
250 terminals,
251 cx,
252 )? {
253 content.push(item);
254 }
255 }
256
257 let raw_input_markdown = tool_call
258 .raw_input
259 .as_ref()
260 .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
261
262 let tool_name = tool_name_from_meta(&tool_call.meta);
263
264 let subagent_session = subagent_session_id_from_meta(&tool_call.meta);
265
266 let result = Self {
267 id: tool_call.tool_call_id,
268 label: cx
269 .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
270 kind: tool_call.kind,
271 content,
272 locations: tool_call.locations,
273 resolved_locations: Vec::default(),
274 status,
275 raw_input: tool_call.raw_input,
276 raw_input_markdown,
277 raw_output: tool_call.raw_output,
278 tool_name,
279 subagent_session_id: subagent_session,
280 };
281 Ok(result)
282 }
283
284 fn update_fields(
285 &mut self,
286 fields: acp::ToolCallUpdateFields,
287 meta: Option<acp::Meta>,
288 language_registry: Arc<LanguageRegistry>,
289 path_style: PathStyle,
290 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
291 cx: &mut App,
292 ) -> Result<()> {
293 let acp::ToolCallUpdateFields {
294 kind,
295 status,
296 title,
297 content,
298 locations,
299 raw_input,
300 raw_output,
301 ..
302 } = fields;
303
304 if let Some(kind) = kind {
305 self.kind = kind;
306 }
307
308 if let Some(status) = status {
309 self.status = status.into();
310 }
311
312 if let Some(subagent_session_id) = subagent_session_id_from_meta(&meta) {
313 self.subagent_session_id = Some(subagent_session_id);
314 }
315
316 if let Some(title) = title {
317 self.label.update(cx, |label, cx| {
318 if self.kind == acp::ToolKind::Execute {
319 label.replace(title, cx);
320 } else if let Some((first_line, _)) = title.split_once("\n") {
321 label.replace(first_line.to_owned() + "…", cx);
322 } else {
323 label.replace(title, cx);
324 }
325 });
326 }
327
328 if let Some(content) = content {
329 let mut new_content_len = content.len();
330 let mut content = content.into_iter();
331
332 // Reuse existing content if we can
333 for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
334 let valid_content =
335 old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
336 if !valid_content {
337 new_content_len -= 1;
338 }
339 }
340 for new in content {
341 if let Some(new) = ToolCallContent::from_acp(
342 new,
343 language_registry.clone(),
344 path_style,
345 terminals,
346 cx,
347 )? {
348 self.content.push(new);
349 } else {
350 new_content_len -= 1;
351 }
352 }
353 self.content.truncate(new_content_len);
354 }
355
356 if let Some(locations) = locations {
357 self.locations = locations;
358 }
359
360 if let Some(raw_input) = raw_input {
361 self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
362 self.raw_input = Some(raw_input);
363 }
364
365 if let Some(raw_output) = raw_output {
366 if self.content.is_empty()
367 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
368 {
369 self.content
370 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
371 markdown,
372 }));
373 }
374 self.raw_output = Some(raw_output);
375 }
376 Ok(())
377 }
378
379 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
380 self.content.iter().filter_map(|content| match content {
381 ToolCallContent::Diff(diff) => Some(diff),
382 ToolCallContent::ContentBlock(_) => None,
383 ToolCallContent::Terminal(_) => None,
384 })
385 }
386
387 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
388 self.content.iter().filter_map(|content| match content {
389 ToolCallContent::Terminal(terminal) => Some(terminal),
390 ToolCallContent::ContentBlock(_) => None,
391 ToolCallContent::Diff(_) => None,
392 })
393 }
394
395 pub fn is_subagent(&self) -> bool {
396 self.tool_name.as_ref().is_some_and(|s| s == "subagent")
397 || self.subagent_session_id.is_some()
398 }
399
400 pub fn to_markdown(&self, cx: &App) -> String {
401 let mut markdown = format!(
402 "**Tool Call: {}**\nStatus: {}\n\n",
403 self.label.read(cx).source(),
404 self.status
405 );
406 for content in &self.content {
407 markdown.push_str(content.to_markdown(cx).as_str());
408 markdown.push_str("\n\n");
409 }
410 markdown
411 }
412
413 async fn resolve_location(
414 location: acp::ToolCallLocation,
415 project: WeakEntity<Project>,
416 cx: &mut AsyncApp,
417 ) -> Option<ResolvedLocation> {
418 let buffer = project
419 .update(cx, |project, cx| {
420 project
421 .project_path_for_absolute_path(&location.path, cx)
422 .map(|path| project.open_buffer(path, cx))
423 })
424 .ok()??;
425 let buffer = buffer.await.log_err()?;
426 let position = buffer.update(cx, |buffer, _| {
427 let snapshot = buffer.snapshot();
428 if let Some(row) = location.line {
429 let column = snapshot.indent_size_for_line(row).len;
430 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
431 snapshot.anchor_before(point)
432 } else {
433 Anchor::min_for_buffer(snapshot.remote_id())
434 }
435 });
436
437 Some(ResolvedLocation { buffer, position })
438 }
439
440 fn resolve_locations(
441 &self,
442 project: Entity<Project>,
443 cx: &mut App,
444 ) -> Task<Vec<Option<ResolvedLocation>>> {
445 let locations = self.locations.clone();
446 project.update(cx, |_, cx| {
447 cx.spawn(async move |project, cx| {
448 let mut new_locations = Vec::new();
449 for location in locations {
450 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
451 }
452 new_locations
453 })
454 })
455 }
456}
457
458// Separate so we can hold a strong reference to the buffer
459// for saving on the thread
460#[derive(Clone, Debug, PartialEq, Eq)]
461struct ResolvedLocation {
462 buffer: Entity<Buffer>,
463 position: Anchor,
464}
465
466impl From<&ResolvedLocation> for AgentLocation {
467 fn from(value: &ResolvedLocation) -> Self {
468 Self {
469 buffer: value.buffer.downgrade(),
470 position: value.position,
471 }
472 }
473}
474
475#[derive(Debug)]
476pub enum ToolCallStatus {
477 /// The tool call hasn't started running yet, but we start showing it to
478 /// the user.
479 Pending,
480 /// The tool call is waiting for confirmation from the user.
481 WaitingForConfirmation {
482 options: PermissionOptions,
483 respond_tx: oneshot::Sender<acp::PermissionOptionId>,
484 },
485 /// The tool call is currently running.
486 InProgress,
487 /// The tool call completed successfully.
488 Completed,
489 /// The tool call failed.
490 Failed,
491 /// The user rejected the tool call.
492 Rejected,
493 /// The user canceled generation so the tool call was canceled.
494 Canceled,
495}
496
497impl From<acp::ToolCallStatus> for ToolCallStatus {
498 fn from(status: acp::ToolCallStatus) -> Self {
499 match status {
500 acp::ToolCallStatus::Pending => Self::Pending,
501 acp::ToolCallStatus::InProgress => Self::InProgress,
502 acp::ToolCallStatus::Completed => Self::Completed,
503 acp::ToolCallStatus::Failed => Self::Failed,
504 _ => Self::Pending,
505 }
506 }
507}
508
509impl Display for ToolCallStatus {
510 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
511 write!(
512 f,
513 "{}",
514 match self {
515 ToolCallStatus::Pending => "Pending",
516 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
517 ToolCallStatus::InProgress => "In Progress",
518 ToolCallStatus::Completed => "Completed",
519 ToolCallStatus::Failed => "Failed",
520 ToolCallStatus::Rejected => "Rejected",
521 ToolCallStatus::Canceled => "Canceled",
522 }
523 )
524 }
525}
526
527#[derive(Debug, PartialEq, Clone)]
528pub enum ContentBlock {
529 Empty,
530 Markdown { markdown: Entity<Markdown> },
531 ResourceLink { resource_link: acp::ResourceLink },
532 Image { image: Arc<gpui::Image> },
533}
534
535impl ContentBlock {
536 pub fn new(
537 block: acp::ContentBlock,
538 language_registry: &Arc<LanguageRegistry>,
539 path_style: PathStyle,
540 cx: &mut App,
541 ) -> Self {
542 let mut this = Self::Empty;
543 this.append(block, language_registry, path_style, cx);
544 this
545 }
546
547 pub fn new_combined(
548 blocks: impl IntoIterator<Item = acp::ContentBlock>,
549 language_registry: Arc<LanguageRegistry>,
550 path_style: PathStyle,
551 cx: &mut App,
552 ) -> Self {
553 let mut this = Self::Empty;
554 for block in blocks {
555 this.append(block, &language_registry, path_style, cx);
556 }
557 this
558 }
559
560 pub fn append(
561 &mut self,
562 block: acp::ContentBlock,
563 language_registry: &Arc<LanguageRegistry>,
564 path_style: PathStyle,
565 cx: &mut App,
566 ) {
567 match (&mut *self, &block) {
568 (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
569 *self = ContentBlock::ResourceLink {
570 resource_link: resource_link.clone(),
571 };
572 }
573 (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
574 if let Some(image) = Self::decode_image(image_content) {
575 *self = ContentBlock::Image { image };
576 } else {
577 let new_content = Self::image_md(image_content);
578 *self = Self::create_markdown_block(new_content, language_registry, cx);
579 }
580 }
581 (ContentBlock::Empty, _) => {
582 let new_content = Self::block_string_contents(&block, path_style);
583 *self = Self::create_markdown_block(new_content, language_registry, cx);
584 }
585 (ContentBlock::Markdown { markdown }, _) => {
586 let new_content = Self::block_string_contents(&block, path_style);
587 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
588 }
589 (ContentBlock::ResourceLink { resource_link }, _) => {
590 let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
591 let new_content = Self::block_string_contents(&block, path_style);
592 let combined = format!("{}\n{}", existing_content, new_content);
593 *self = Self::create_markdown_block(combined, language_registry, cx);
594 }
595 (ContentBlock::Image { .. }, _) => {
596 let new_content = Self::block_string_contents(&block, path_style);
597 let combined = format!("`Image`\n{}", new_content);
598 *self = Self::create_markdown_block(combined, language_registry, cx);
599 }
600 }
601 }
602
603 fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
604 use base64::Engine as _;
605
606 let bytes = base64::engine::general_purpose::STANDARD
607 .decode(image_content.data.as_bytes())
608 .ok()?;
609 let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
610 Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
611 }
612
613 fn create_markdown_block(
614 content: String,
615 language_registry: &Arc<LanguageRegistry>,
616 cx: &mut App,
617 ) -> ContentBlock {
618 ContentBlock::Markdown {
619 markdown: cx
620 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
621 }
622 }
623
624 fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
625 match block {
626 acp::ContentBlock::Text(text_content) => text_content.text.clone(),
627 acp::ContentBlock::ResourceLink(resource_link) => {
628 Self::resource_link_md(&resource_link.uri, path_style)
629 }
630 acp::ContentBlock::Resource(acp::EmbeddedResource {
631 resource:
632 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
633 uri,
634 ..
635 }),
636 ..
637 }) => Self::resource_link_md(uri, path_style),
638 acp::ContentBlock::Image(image) => Self::image_md(image),
639 _ => String::new(),
640 }
641 }
642
643 fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
644 if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
645 uri.as_link().to_string()
646 } else {
647 uri.to_string()
648 }
649 }
650
651 fn image_md(_image: &acp::ImageContent) -> String {
652 "`Image`".into()
653 }
654
655 pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
656 match self {
657 ContentBlock::Empty => "",
658 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
659 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
660 ContentBlock::Image { .. } => "`Image`",
661 }
662 }
663
664 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
665 match self {
666 ContentBlock::Empty => None,
667 ContentBlock::Markdown { markdown } => Some(markdown),
668 ContentBlock::ResourceLink { .. } => None,
669 ContentBlock::Image { .. } => None,
670 }
671 }
672
673 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
674 match self {
675 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
676 _ => None,
677 }
678 }
679
680 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
681 match self {
682 ContentBlock::Image { image } => Some(image),
683 _ => None,
684 }
685 }
686}
687
688#[derive(Debug)]
689pub enum ToolCallContent {
690 ContentBlock(ContentBlock),
691 Diff(Entity<Diff>),
692 Terminal(Entity<Terminal>),
693}
694
695impl ToolCallContent {
696 pub fn from_acp(
697 content: acp::ToolCallContent,
698 language_registry: Arc<LanguageRegistry>,
699 path_style: PathStyle,
700 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
701 cx: &mut App,
702 ) -> Result<Option<Self>> {
703 match content {
704 acp::ToolCallContent::Content(acp::Content { content, .. }) => {
705 Ok(Some(Self::ContentBlock(ContentBlock::new(
706 content,
707 &language_registry,
708 path_style,
709 cx,
710 ))))
711 }
712 acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
713 Diff::finalized(
714 diff.path.to_string_lossy().into_owned(),
715 diff.old_text,
716 diff.new_text,
717 language_registry,
718 cx,
719 )
720 })))),
721 acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
722 .get(&terminal_id)
723 .cloned()
724 .map(|terminal| Some(Self::Terminal(terminal)))
725 .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
726 _ => Ok(None),
727 }
728 }
729
730 pub fn update_from_acp(
731 &mut self,
732 new: acp::ToolCallContent,
733 language_registry: Arc<LanguageRegistry>,
734 path_style: PathStyle,
735 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
736 cx: &mut App,
737 ) -> Result<bool> {
738 let needs_update = match (&self, &new) {
739 (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
740 old_diff.read(cx).needs_update(
741 new_diff.old_text.as_deref().unwrap_or(""),
742 &new_diff.new_text,
743 cx,
744 )
745 }
746 _ => true,
747 };
748
749 if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
750 if needs_update {
751 *self = update;
752 }
753 Ok(true)
754 } else {
755 Ok(false)
756 }
757 }
758
759 pub fn to_markdown(&self, cx: &App) -> String {
760 match self {
761 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
762 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
763 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
764 }
765 }
766
767 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
768 match self {
769 Self::ContentBlock(content) => content.image(),
770 _ => None,
771 }
772 }
773}
774
775#[derive(Debug, PartialEq)]
776pub enum ToolCallUpdate {
777 UpdateFields(acp::ToolCallUpdate),
778 UpdateDiff(ToolCallUpdateDiff),
779 UpdateTerminal(ToolCallUpdateTerminal),
780}
781
782impl ToolCallUpdate {
783 fn id(&self) -> &acp::ToolCallId {
784 match self {
785 Self::UpdateFields(update) => &update.tool_call_id,
786 Self::UpdateDiff(diff) => &diff.id,
787 Self::UpdateTerminal(terminal) => &terminal.id,
788 }
789 }
790}
791
792impl From<acp::ToolCallUpdate> for ToolCallUpdate {
793 fn from(update: acp::ToolCallUpdate) -> Self {
794 Self::UpdateFields(update)
795 }
796}
797
798impl From<ToolCallUpdateDiff> for ToolCallUpdate {
799 fn from(diff: ToolCallUpdateDiff) -> Self {
800 Self::UpdateDiff(diff)
801 }
802}
803
804#[derive(Debug, PartialEq)]
805pub struct ToolCallUpdateDiff {
806 pub id: acp::ToolCallId,
807 pub diff: Entity<Diff>,
808}
809
810impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
811 fn from(terminal: ToolCallUpdateTerminal) -> Self {
812 Self::UpdateTerminal(terminal)
813 }
814}
815
816#[derive(Debug, PartialEq)]
817pub struct ToolCallUpdateTerminal {
818 pub id: acp::ToolCallId,
819 pub terminal: Entity<Terminal>,
820}
821
822#[derive(Debug, Default)]
823pub struct Plan {
824 pub entries: Vec<PlanEntry>,
825}
826
827#[derive(Debug)]
828pub struct PlanStats<'a> {
829 pub in_progress_entry: Option<&'a PlanEntry>,
830 pub pending: u32,
831 pub completed: u32,
832}
833
834impl Plan {
835 pub fn is_empty(&self) -> bool {
836 self.entries.is_empty()
837 }
838
839 pub fn stats(&self) -> PlanStats<'_> {
840 let mut stats = PlanStats {
841 in_progress_entry: None,
842 pending: 0,
843 completed: 0,
844 };
845
846 for entry in &self.entries {
847 match &entry.status {
848 acp::PlanEntryStatus::Pending => {
849 stats.pending += 1;
850 }
851 acp::PlanEntryStatus::InProgress => {
852 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
853 }
854 acp::PlanEntryStatus::Completed => {
855 stats.completed += 1;
856 }
857 _ => {}
858 }
859 }
860
861 stats
862 }
863}
864
865#[derive(Debug)]
866pub struct PlanEntry {
867 pub content: Entity<Markdown>,
868 pub priority: acp::PlanEntryPriority,
869 pub status: acp::PlanEntryStatus,
870}
871
872impl PlanEntry {
873 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
874 Self {
875 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
876 priority: entry.priority,
877 status: entry.status,
878 }
879 }
880}
881
882#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
883pub struct TokenUsage {
884 pub max_tokens: u64,
885 pub used_tokens: u64,
886 pub input_tokens: u64,
887 pub output_tokens: u64,
888}
889
890impl TokenUsage {
891 pub fn ratio(&self) -> TokenUsageRatio {
892 #[cfg(debug_assertions)]
893 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
894 .unwrap_or("0.8".to_string())
895 .parse()
896 .unwrap();
897 #[cfg(not(debug_assertions))]
898 let warning_threshold: f32 = 0.8;
899
900 // When the maximum is unknown because there is no selected model,
901 // avoid showing the token limit warning.
902 if self.max_tokens == 0 {
903 TokenUsageRatio::Normal
904 } else if self.used_tokens >= self.max_tokens {
905 TokenUsageRatio::Exceeded
906 } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
907 TokenUsageRatio::Warning
908 } else {
909 TokenUsageRatio::Normal
910 }
911 }
912}
913
/// Classification produced by `TokenUsage::ratio`, ordered from least to
/// most severe.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum TokenUsageRatio {
    /// Usage is below the warning threshold, or the maximum is unknown.
    Normal,
    /// Usage has reached the warning threshold but not the maximum.
    Warning,
    /// Usage has reached or passed the maximum.
    Exceeded,
}
920
921#[derive(Debug, Clone)]
922pub struct RetryStatus {
923 pub last_error: SharedString,
924 pub attempt: usize,
925 pub max_attempts: usize,
926 pub started_at: Instant,
927 pub duration: Duration,
928}
929
930pub struct AcpThread {
931 parent_session_id: Option<acp::SessionId>,
932 title: SharedString,
933 entries: Vec<AgentThreadEntry>,
934 plan: Plan,
935 project: Entity<Project>,
936 action_log: Entity<ActionLog>,
937 shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
938 send_task: Option<Task<()>>,
939 connection: Rc<dyn AgentConnection>,
940 session_id: acp::SessionId,
941 token_usage: Option<TokenUsage>,
942 prompt_capabilities: acp::PromptCapabilities,
943 _observe_prompt_capabilities: Task<anyhow::Result<()>>,
944 terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
945 pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
946 pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
947 // subagent cancellation fields
948 user_stopped: Arc<std::sync::atomic::AtomicBool>,
949 user_stop_tx: watch::Sender<bool>,
950}
951
952impl From<&AcpThread> for ActionLogTelemetry {
953 fn from(value: &AcpThread) -> Self {
954 Self {
955 agent_telemetry_id: value.connection().telemetry_id(),
956 session_id: value.session_id.0.clone(),
957 }
958 }
959}
960
961#[derive(Debug)]
962pub enum AcpThreadEvent {
963 NewEntry,
964 TitleUpdated,
965 TokenUsageUpdated,
966 EntryUpdated(usize),
967 EntriesRemoved(Range<usize>),
968 ToolAuthorizationRequired,
969 Retry(RetryStatus),
970 SubagentSpawned(acp::SessionId),
971 Stopped,
972 Error,
973 LoadError(LoadError),
974 PromptCapabilitiesUpdated,
975 Refusal,
976 AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
977 ModeUpdated(acp::SessionModeId),
978 ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
979}
980
981impl EventEmitter<AcpThreadEvent> for AcpThread {}
982
983#[derive(Debug, Clone)]
984pub enum TerminalProviderEvent {
985 Created {
986 terminal_id: acp::TerminalId,
987 label: String,
988 cwd: Option<PathBuf>,
989 output_byte_limit: Option<u64>,
990 terminal: Entity<::terminal::Terminal>,
991 },
992 Output {
993 terminal_id: acp::TerminalId,
994 data: Vec<u8>,
995 },
996 TitleChanged {
997 terminal_id: acp::TerminalId,
998 title: String,
999 },
1000 Exit {
1001 terminal_id: acp::TerminalId,
1002 status: acp::TerminalExitStatus,
1003 },
1004}
1005
1006#[derive(Debug, Clone)]
1007pub enum TerminalProviderCommand {
1008 WriteInput {
1009 terminal_id: acp::TerminalId,
1010 bytes: Vec<u8>,
1011 },
1012 Resize {
1013 terminal_id: acp::TerminalId,
1014 cols: u16,
1015 rows: u16,
1016 },
1017 Close {
1018 terminal_id: acp::TerminalId,
1019 },
1020}
1021
1022impl AcpThread {
1023 pub fn on_terminal_provider_event(
1024 &mut self,
1025 event: TerminalProviderEvent,
1026 cx: &mut Context<Self>,
1027 ) {
1028 match event {
1029 TerminalProviderEvent::Created {
1030 terminal_id,
1031 label,
1032 cwd,
1033 output_byte_limit,
1034 terminal,
1035 } => {
1036 let entity = self.register_terminal_created(
1037 terminal_id.clone(),
1038 label,
1039 cwd,
1040 output_byte_limit,
1041 terminal,
1042 cx,
1043 );
1044
1045 if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
1046 for data in chunks.drain(..) {
1047 entity.update(cx, |term, cx| {
1048 term.inner().update(cx, |inner, cx| {
1049 inner.write_output(&data, cx);
1050 })
1051 });
1052 }
1053 }
1054
1055 if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
1056 entity.update(cx, |_term, cx| {
1057 cx.notify();
1058 });
1059 }
1060
1061 cx.notify();
1062 }
1063 TerminalProviderEvent::Output { terminal_id, data } => {
1064 if let Some(entity) = self.terminals.get(&terminal_id) {
1065 entity.update(cx, |term, cx| {
1066 term.inner().update(cx, |inner, cx| {
1067 inner.write_output(&data, cx);
1068 })
1069 });
1070 } else {
1071 self.pending_terminal_output
1072 .entry(terminal_id)
1073 .or_default()
1074 .push(data);
1075 }
1076 }
1077 TerminalProviderEvent::TitleChanged { terminal_id, title } => {
1078 if let Some(entity) = self.terminals.get(&terminal_id) {
1079 entity.update(cx, |term, cx| {
1080 term.inner().update(cx, |inner, cx| {
1081 inner.breadcrumb_text = title;
1082 cx.emit(::terminal::Event::BreadcrumbsChanged);
1083 })
1084 });
1085 }
1086 }
1087 TerminalProviderEvent::Exit {
1088 terminal_id,
1089 status,
1090 } => {
1091 if let Some(entity) = self.terminals.get(&terminal_id) {
1092 entity.update(cx, |_term, cx| {
1093 cx.notify();
1094 });
1095 } else {
1096 self.pending_terminal_exit.insert(terminal_id, status);
1097 }
1098 }
1099 }
1100 }
1101}
1102
/// Status of a thread: idle or generating.
#[derive(PartialEq, Eq, Debug)]
pub enum ThreadStatus {
    Idle,
    Generating,
}
1108
1109#[derive(Debug, Clone)]
1110pub enum LoadError {
1111 Unsupported {
1112 command: SharedString,
1113 current_version: SharedString,
1114 minimum_version: SharedString,
1115 },
1116 FailedToInstall(SharedString),
1117 Exited {
1118 status: ExitStatus,
1119 },
1120 Other(SharedString),
1121}
1122
1123impl Display for LoadError {
1124 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1125 match self {
1126 LoadError::Unsupported {
1127 command: path,
1128 current_version,
1129 minimum_version,
1130 } => {
1131 write!(
1132 f,
1133 "version {current_version} from {path} is not supported (need at least {minimum_version})"
1134 )
1135 }
1136 LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1137 LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1138 LoadError::Other(msg) => write!(f, "{msg}"),
1139 }
1140 }
1141}
1142
1143impl Error for LoadError {}
1144
1145impl AcpThread {
1146 pub fn new(
1147 parent_session_id: Option<acp::SessionId>,
1148 title: impl Into<SharedString>,
1149 connection: Rc<dyn AgentConnection>,
1150 project: Entity<Project>,
1151 action_log: Entity<ActionLog>,
1152 session_id: acp::SessionId,
1153 mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1154 cx: &mut Context<Self>,
1155 ) -> Self {
1156 let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1157 let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1158 loop {
1159 let caps = prompt_capabilities_rx.recv().await?;
1160 this.update(cx, |this, cx| {
1161 this.prompt_capabilities = caps;
1162 cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1163 })?;
1164 }
1165 });
1166
1167 let (user_stop_tx, _user_stop_rx) = watch::channel(false);
1168
1169 Self {
1170 parent_session_id,
1171 action_log,
1172 shared_buffers: Default::default(),
1173 entries: Default::default(),
1174 plan: Default::default(),
1175 title: title.into(),
1176 project,
1177 send_task: None,
1178 connection,
1179 session_id,
1180 token_usage: None,
1181 prompt_capabilities,
1182 _observe_prompt_capabilities: task,
1183 terminals: HashMap::default(),
1184 pending_terminal_output: HashMap::default(),
1185 pending_terminal_exit: HashMap::default(),
1186 user_stopped: Arc::new(std::sync::atomic::AtomicBool::new(false)),
1187 user_stop_tx,
1188 }
1189 }
1190
1191 pub fn parent_session_id(&self) -> Option<&acp::SessionId> {
1192 self.parent_session_id.as_ref()
1193 }
1194
1195 pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
1196 self.prompt_capabilities.clone()
1197 }
1198
1199 /// Marks this thread as stopped by user action and signals any listeners.
1200 pub fn stop_by_user(&mut self) {
1201 self.user_stopped
1202 .store(true, std::sync::atomic::Ordering::SeqCst);
1203 self.user_stop_tx.send(true).ok();
1204 self.send_task.take();
1205 }
1206
1207 pub fn was_stopped_by_user(&self) -> bool {
1208 self.user_stopped.load(std::sync::atomic::Ordering::SeqCst)
1209 }
1210
1211 pub fn user_stop_receiver(&self) -> watch::Receiver<bool> {
1212 self.user_stop_tx.receiver()
1213 }
1214
1215 pub fn connection(&self) -> &Rc<dyn AgentConnection> {
1216 &self.connection
1217 }
1218
1219 pub fn action_log(&self) -> &Entity<ActionLog> {
1220 &self.action_log
1221 }
1222
1223 pub fn project(&self) -> &Entity<Project> {
1224 &self.project
1225 }
1226
1227 pub fn title(&self) -> SharedString {
1228 self.title.clone()
1229 }
1230
1231 pub fn entries(&self) -> &[AgentThreadEntry] {
1232 &self.entries
1233 }
1234
1235 pub fn session_id(&self) -> &acp::SessionId {
1236 &self.session_id
1237 }
1238
1239 pub fn status(&self) -> ThreadStatus {
1240 if self.send_task.is_some() {
1241 ThreadStatus::Generating
1242 } else {
1243 ThreadStatus::Idle
1244 }
1245 }
1246
1247 pub fn token_usage(&self) -> Option<&TokenUsage> {
1248 self.token_usage.as_ref()
1249 }
1250
1251 pub fn has_pending_edit_tool_calls(&self) -> bool {
1252 for entry in self.entries.iter().rev() {
1253 match entry {
1254 AgentThreadEntry::UserMessage(_) => return false,
1255 AgentThreadEntry::ToolCall(
1256 call @ ToolCall {
1257 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1258 ..
1259 },
1260 ) if call.diffs().next().is_some() => {
1261 return true;
1262 }
1263 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1264 }
1265 }
1266
1267 false
1268 }
1269
1270 pub fn has_in_progress_tool_calls(&self) -> bool {
1271 for entry in self.entries.iter().rev() {
1272 match entry {
1273 AgentThreadEntry::UserMessage(_) => return false,
1274 AgentThreadEntry::ToolCall(ToolCall {
1275 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1276 ..
1277 }) => {
1278 return true;
1279 }
1280 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1281 }
1282 }
1283
1284 false
1285 }
1286
1287 pub fn used_tools_since_last_user_message(&self) -> bool {
1288 for entry in self.entries.iter().rev() {
1289 match entry {
1290 AgentThreadEntry::UserMessage(..) => return false,
1291 AgentThreadEntry::AssistantMessage(..) => continue,
1292 AgentThreadEntry::ToolCall(..) => return true,
1293 }
1294 }
1295
1296 false
1297 }
1298
1299 pub fn handle_session_update(
1300 &mut self,
1301 update: acp::SessionUpdate,
1302 cx: &mut Context<Self>,
1303 ) -> Result<(), acp::Error> {
1304 match update {
1305 acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1306 self.push_user_content_block(None, content, cx);
1307 }
1308 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1309 self.push_assistant_content_block(content, false, cx);
1310 }
1311 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1312 self.push_assistant_content_block(content, true, cx);
1313 }
1314 acp::SessionUpdate::ToolCall(tool_call) => {
1315 self.upsert_tool_call(tool_call, cx)?;
1316 }
1317 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1318 self.update_tool_call(tool_call_update, cx)?;
1319 }
1320 acp::SessionUpdate::Plan(plan) => {
1321 self.update_plan(plan, cx);
1322 }
1323 acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1324 available_commands,
1325 ..
1326 }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1327 acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1328 current_mode_id,
1329 ..
1330 }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1331 acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1332 config_options,
1333 ..
1334 }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1335 _ => {}
1336 }
1337 Ok(())
1338 }
1339
1340 pub fn push_user_content_block(
1341 &mut self,
1342 message_id: Option<UserMessageId>,
1343 chunk: acp::ContentBlock,
1344 cx: &mut Context<Self>,
1345 ) {
1346 self.push_user_content_block_with_indent(message_id, chunk, false, cx)
1347 }
1348
1349 pub fn push_user_content_block_with_indent(
1350 &mut self,
1351 message_id: Option<UserMessageId>,
1352 chunk: acp::ContentBlock,
1353 indented: bool,
1354 cx: &mut Context<Self>,
1355 ) {
1356 let language_registry = self.project.read(cx).languages().clone();
1357 let path_style = self.project.read(cx).path_style(cx);
1358 let entries_len = self.entries.len();
1359
1360 if let Some(last_entry) = self.entries.last_mut()
1361 && let AgentThreadEntry::UserMessage(UserMessage {
1362 id,
1363 content,
1364 chunks,
1365 indented: existing_indented,
1366 ..
1367 }) = last_entry
1368 && *existing_indented == indented
1369 {
1370 *id = message_id.or(id.take());
1371 content.append(chunk.clone(), &language_registry, path_style, cx);
1372 chunks.push(chunk);
1373 let idx = entries_len - 1;
1374 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1375 } else {
1376 let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1377 self.push_entry(
1378 AgentThreadEntry::UserMessage(UserMessage {
1379 id: message_id,
1380 content,
1381 chunks: vec![chunk],
1382 checkpoint: None,
1383 indented,
1384 }),
1385 cx,
1386 );
1387 }
1388 }
1389
1390 pub fn push_assistant_content_block(
1391 &mut self,
1392 chunk: acp::ContentBlock,
1393 is_thought: bool,
1394 cx: &mut Context<Self>,
1395 ) {
1396 self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
1397 }
1398
1399 pub fn push_assistant_content_block_with_indent(
1400 &mut self,
1401 chunk: acp::ContentBlock,
1402 is_thought: bool,
1403 indented: bool,
1404 cx: &mut Context<Self>,
1405 ) {
1406 let language_registry = self.project.read(cx).languages().clone();
1407 let path_style = self.project.read(cx).path_style(cx);
1408 let entries_len = self.entries.len();
1409 if let Some(last_entry) = self.entries.last_mut()
1410 && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1411 chunks,
1412 indented: existing_indented,
1413 }) = last_entry
1414 && *existing_indented == indented
1415 {
1416 let idx = entries_len - 1;
1417 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1418 match (chunks.last_mut(), is_thought) {
1419 (Some(AssistantMessageChunk::Message { block }), false)
1420 | (Some(AssistantMessageChunk::Thought { block }), true) => {
1421 block.append(chunk, &language_registry, path_style, cx)
1422 }
1423 _ => {
1424 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1425 if is_thought {
1426 chunks.push(AssistantMessageChunk::Thought { block })
1427 } else {
1428 chunks.push(AssistantMessageChunk::Message { block })
1429 }
1430 }
1431 }
1432 } else {
1433 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1434 let chunk = if is_thought {
1435 AssistantMessageChunk::Thought { block }
1436 } else {
1437 AssistantMessageChunk::Message { block }
1438 };
1439
1440 self.push_entry(
1441 AgentThreadEntry::AssistantMessage(AssistantMessage {
1442 chunks: vec![chunk],
1443 indented,
1444 }),
1445 cx,
1446 );
1447 }
1448 }
1449
1450 fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
1451 self.entries.push(entry);
1452 cx.emit(AcpThreadEvent::NewEntry);
1453 }
1454
1455 pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1456 self.connection.set_title(&self.session_id, cx).is_some()
1457 }
1458
1459 pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1460 if title != self.title {
1461 self.title = title.clone();
1462 cx.emit(AcpThreadEvent::TitleUpdated);
1463 if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1464 return set_title.run(title, cx);
1465 }
1466 }
1467 Task::ready(Ok(()))
1468 }
1469
1470 pub fn subagent_spawned(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
1471 cx.emit(AcpThreadEvent::SubagentSpawned(session_id));
1472 }
1473
1474 pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1475 self.token_usage = usage;
1476 cx.emit(AcpThreadEvent::TokenUsageUpdated);
1477 }
1478
1479 pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1480 cx.emit(AcpThreadEvent::Retry(status));
1481 }
1482
1483 pub fn update_tool_call(
1484 &mut self,
1485 update: impl Into<ToolCallUpdate>,
1486 cx: &mut Context<Self>,
1487 ) -> Result<()> {
1488 let update = update.into();
1489 let languages = self.project.read(cx).languages().clone();
1490 let path_style = self.project.read(cx).path_style(cx);
1491
1492 let ix = match self.index_for_tool_call(update.id()) {
1493 Some(ix) => ix,
1494 None => {
1495 // Tool call not found - create a failed tool call entry
1496 let failed_tool_call = ToolCall {
1497 id: update.id().clone(),
1498 label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1499 kind: acp::ToolKind::Fetch,
1500 content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1501 "Tool call not found".into(),
1502 &languages,
1503 path_style,
1504 cx,
1505 ))],
1506 status: ToolCallStatus::Failed,
1507 locations: Vec::new(),
1508 resolved_locations: Vec::new(),
1509 raw_input: None,
1510 raw_input_markdown: None,
1511 raw_output: None,
1512 tool_name: None,
1513 subagent_session_id: None,
1514 };
1515 self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1516 return Ok(());
1517 }
1518 };
1519 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1520 unreachable!()
1521 };
1522
1523 match update {
1524 ToolCallUpdate::UpdateFields(update) => {
1525 let location_updated = update.fields.locations.is_some();
1526 call.update_fields(
1527 update.fields,
1528 update.meta,
1529 languages,
1530 path_style,
1531 &self.terminals,
1532 cx,
1533 )?;
1534 if location_updated {
1535 self.resolve_locations(update.tool_call_id, cx);
1536 }
1537 }
1538 ToolCallUpdate::UpdateDiff(update) => {
1539 call.content.clear();
1540 call.content.push(ToolCallContent::Diff(update.diff));
1541 }
1542 ToolCallUpdate::UpdateTerminal(update) => {
1543 call.content.clear();
1544 call.content
1545 .push(ToolCallContent::Terminal(update.terminal));
1546 }
1547 }
1548
1549 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1550
1551 Ok(())
1552 }
1553
1554 /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1555 pub fn upsert_tool_call(
1556 &mut self,
1557 tool_call: acp::ToolCall,
1558 cx: &mut Context<Self>,
1559 ) -> Result<(), acp::Error> {
1560 let status = tool_call.status.into();
1561 self.upsert_tool_call_inner(tool_call.into(), status, cx)
1562 }
1563
1564 /// Fails if id does not match an existing entry.
1565 pub fn upsert_tool_call_inner(
1566 &mut self,
1567 update: acp::ToolCallUpdate,
1568 status: ToolCallStatus,
1569 cx: &mut Context<Self>,
1570 ) -> Result<(), acp::Error> {
1571 let language_registry = self.project.read(cx).languages().clone();
1572 let path_style = self.project.read(cx).path_style(cx);
1573 let id = update.tool_call_id.clone();
1574
1575 let agent_telemetry_id = self.connection().telemetry_id();
1576 let session = self.session_id();
1577 if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1578 let status = if matches!(status, ToolCallStatus::Completed) {
1579 "completed"
1580 } else {
1581 "failed"
1582 };
1583 telemetry::event!(
1584 "Agent Tool Call Completed",
1585 agent_telemetry_id,
1586 session,
1587 status
1588 );
1589 }
1590
1591 if let Some(ix) = self.index_for_tool_call(&id) {
1592 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1593 unreachable!()
1594 };
1595
1596 call.update_fields(
1597 update.fields,
1598 update.meta,
1599 language_registry,
1600 path_style,
1601 &self.terminals,
1602 cx,
1603 )?;
1604 call.status = status;
1605
1606 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1607 } else {
1608 let call = ToolCall::from_acp(
1609 update.try_into()?,
1610 status,
1611 language_registry,
1612 self.project.read(cx).path_style(cx),
1613 &self.terminals,
1614 cx,
1615 )?;
1616 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1617 };
1618
1619 self.resolve_locations(id, cx);
1620 Ok(())
1621 }
1622
1623 fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1624 self.entries
1625 .iter()
1626 .enumerate()
1627 .rev()
1628 .find_map(|(index, entry)| {
1629 if let AgentThreadEntry::ToolCall(tool_call) = entry
1630 && &tool_call.id == id
1631 {
1632 Some(index)
1633 } else {
1634 None
1635 }
1636 })
1637 }
1638
1639 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1640 // The tool call we are looking for is typically the last one, or very close to the end.
1641 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1642 self.entries
1643 .iter_mut()
1644 .enumerate()
1645 .rev()
1646 .find_map(|(index, tool_call)| {
1647 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1648 && &tool_call.id == id
1649 {
1650 Some((index, tool_call))
1651 } else {
1652 None
1653 }
1654 })
1655 }
1656
1657 pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1658 self.entries
1659 .iter()
1660 .enumerate()
1661 .rev()
1662 .find_map(|(index, tool_call)| {
1663 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1664 && &tool_call.id == id
1665 {
1666 Some((index, tool_call))
1667 } else {
1668 None
1669 }
1670 })
1671 }
1672
    /// Resolves the locations of the tool call with the given id in the background.
    ///
    /// Does nothing if no tool call has that id. Otherwise a detached task
    /// awaits the tool call's location resolution and then:
    /// - stores a snapshot of every resolved location's buffer in `shared_buffers`,
    /// - moves the project's agent location to the last resolved location
    ///   (unless that would move it backwards on the same row, see below),
    /// - stores the resolved locations on the tool call and emits
    ///   `EntryUpdated` if they changed.
    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
        let project = self.project.clone();
        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
            return;
        };
        let task = tool_call.resolve_locations(project, cx);
        cx.spawn(async move |this, cx| {
            let resolved_locations = task.await;

            this.update(cx, |this, cx| {
                let project = this.project.clone();

                for location in resolved_locations.iter().flatten() {
                    this.shared_buffers
                        .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
                }
                // The tool call is looked up again after the await; bail out if it is gone.
                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
                    return;
                };

                if let Some(Some(location)) = resolved_locations.last() {
                    project.update(cx, |project, cx| {
                        let should_ignore = if let Some(agent_location) = project
                            .agent_location()
                            .filter(|agent_location| agent_location.buffer == location.buffer)
                        {
                            let snapshot = location.buffer.read(cx).snapshot();
                            let old_position = agent_location.position.to_point(&snapshot);
                            let new_position = location.position.to_point(&snapshot);

                            // ignore this so that when we get updates from the edit tool
                            // the position doesn't reset to the start of line
                            old_position.row == new_position.row
                                && old_position.column > new_position.column
                        } else {
                            false
                        };
                        if !should_ignore {
                            project.set_agent_location(Some(location.into()), cx);
                        }
                    });
                }

                let resolved_locations = resolved_locations
                    .iter()
                    .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
                    .collect::<Vec<_>>();

                // Only notify observers when the resolved locations actually changed.
                if tool_call.resolved_locations != resolved_locations {
                    tool_call.resolved_locations = resolved_locations;
                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
                }
            })
        })
        .detach();
    }
1729
1730 pub fn request_tool_call_authorization(
1731 &mut self,
1732 tool_call: acp::ToolCallUpdate,
1733 options: PermissionOptions,
1734 cx: &mut Context<Self>,
1735 ) -> Result<BoxFuture<'static, acp::RequestPermissionOutcome>> {
1736 let (tx, rx) = oneshot::channel();
1737
1738 let status = ToolCallStatus::WaitingForConfirmation {
1739 options,
1740 respond_tx: tx,
1741 };
1742
1743 self.upsert_tool_call_inner(tool_call, status, cx)?;
1744 cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1745
1746 let fut = async {
1747 match rx.await {
1748 Ok(option) => acp::RequestPermissionOutcome::Selected(
1749 acp::SelectedPermissionOutcome::new(option),
1750 ),
1751 Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1752 }
1753 }
1754 .boxed();
1755
1756 Ok(fut)
1757 }
1758
1759 pub fn authorize_tool_call(
1760 &mut self,
1761 id: acp::ToolCallId,
1762 option_id: acp::PermissionOptionId,
1763 option_kind: acp::PermissionOptionKind,
1764 cx: &mut Context<Self>,
1765 ) {
1766 let Some((ix, call)) = self.tool_call_mut(&id) else {
1767 return;
1768 };
1769
1770 let new_status = match option_kind {
1771 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1772 ToolCallStatus::Rejected
1773 }
1774 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1775 ToolCallStatus::InProgress
1776 }
1777 _ => ToolCallStatus::InProgress,
1778 };
1779
1780 let curr_status = mem::replace(&mut call.status, new_status);
1781
1782 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1783 respond_tx.send(option_id).log_err();
1784 } else if cfg!(debug_assertions) {
1785 panic!("tried to authorize an already authorized tool call");
1786 }
1787
1788 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1789 }
1790
1791 pub fn first_tool_awaiting_confirmation(&self) -> Option<&ToolCall> {
1792 let mut first_tool_call = None;
1793
1794 for entry in self.entries.iter().rev() {
1795 match &entry {
1796 AgentThreadEntry::ToolCall(call) => {
1797 if let ToolCallStatus::WaitingForConfirmation { .. } = call.status {
1798 first_tool_call = Some(call);
1799 } else {
1800 continue;
1801 }
1802 }
1803 AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1804 // Reached the beginning of the turn.
1805 // If we had pending permission requests in the previous turn, they have been cancelled.
1806 break;
1807 }
1808 }
1809 }
1810
1811 first_tool_call
1812 }
1813
1814 pub fn plan(&self) -> &Plan {
1815 &self.plan
1816 }
1817
1818 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1819 let new_entries_len = request.entries.len();
1820 let mut new_entries = request.entries.into_iter();
1821
1822 // Reuse existing markdown to prevent flickering
1823 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1824 let PlanEntry {
1825 content,
1826 priority,
1827 status,
1828 } = old;
1829 content.update(cx, |old, cx| {
1830 old.replace(new.content, cx);
1831 });
1832 *priority = new.priority;
1833 *status = new.status;
1834 }
1835 for new in new_entries {
1836 self.plan.entries.push(PlanEntry::from_acp(new, cx))
1837 }
1838 self.plan.entries.truncate(new_entries_len);
1839
1840 cx.notify();
1841 }
1842
1843 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
1844 self.plan
1845 .entries
1846 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
1847 cx.notify();
1848 }
1849
1850 #[cfg(any(test, feature = "test-support"))]
1851 pub fn send_raw(
1852 &mut self,
1853 message: &str,
1854 cx: &mut Context<Self>,
1855 ) -> BoxFuture<'static, Result<()>> {
1856 self.send(vec![message.into()], cx)
1857 }
1858
    /// Sends a user message to the agent as a new turn.
    ///
    /// Inside the turn, the message is first pushed as a user message entry.
    /// A git checkpoint is then requested and, if obtained, attached (hidden)
    /// to the last user message before the prompt goes to the connection.
    ///
    /// The user message gets an id only when the connection offers `truncate`
    /// for this session.
    ///
    /// The returned future resolves when the turn finishes (see `run_turn`).
    pub fn send(
        &mut self,
        message: Vec<acp::ContentBlock>,
        cx: &mut Context<Self>,
    ) -> BoxFuture<'static, Result<()>> {
        let block = ContentBlock::new_combined(
            message.clone(),
            self.project.read(cx).languages().clone(),
            self.project.read(cx).path_style(cx),
            cx,
        );
        let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
        let git_store = self.project.read(cx).git_store().clone();

        let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
            Some(UserMessageId::new())
        } else {
            None
        };

        self.run_turn(cx, async move |this, cx| {
            // Best effort: a failed update here is ignored.
            this.update(cx, |this, cx| {
                this.push_entry(
                    AgentThreadEntry::UserMessage(UserMessage {
                        id: message_id.clone(),
                        content: block,
                        chunks: message,
                        checkpoint: None,
                        indented: false,
                    }),
                    cx,
                );
            })
            .ok();

            // A checkpoint failure is logged and the turn continues without one.
            let old_checkpoint = git_store
                .update(cx, |git, cx| git.checkpoint(cx))
                .await
                .context("failed to get old checkpoint")
                .log_err();
            this.update(cx, |this, cx| {
                if let Some((_ix, message)) = this.last_user_message() {
                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
                        git_checkpoint,
                        show: false,
                    });
                }
                this.connection.prompt(message_id, request, cx)
            })?
            .await
        })
    }
1911
1912 pub fn can_retry(&self, cx: &App) -> bool {
1913 self.connection.retry(&self.session_id, cx).is_some()
1914 }
1915
1916 pub fn retry(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
1917 self.run_turn(cx, async move |this, cx| {
1918 this.update(cx, |this, cx| {
1919 this.connection
1920 .retry(&this.session_id, cx)
1921 .map(|retry| retry.run(cx))
1922 })?
1923 .context("retrying a session is not supported")?
1924 .await
1925 })
1926 }
1927
    /// Runs one turn: cancels the current turn, executes `f`, then post-processes its result.
    ///
    /// Completed plan entries are cleared first. `f` is run in a new
    /// `send_task` once the previous turn's cancellation has finished, and its
    /// result is handed to a second task over a oneshot channel. That second
    /// task, whose future is returned, then:
    /// - refreshes the last checkpoint and clears the project's agent location,
    /// - on an error response, or on a `MaxTokens` stop reason, clears
    ///   `send_task`, emits `Error` and returns an error,
    /// - otherwise clears `send_task` (unless the stop reason is `Cancelled`),
    ///   handles a `Refusal` stop reason, emits `Stopped` and returns `Ok(())`.
    fn run_turn(
        &mut self,
        cx: &mut Context<Self>,
        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
    ) -> BoxFuture<'static, Result<()>> {
        self.clear_completed_plan_entries(cx);

        let (tx, rx) = oneshot::channel();
        let cancel_task = self.cancel(cx);

        self.send_task = Some(cx.spawn(async move |this, cx| {
            // Let the previous turn finish cancelling before starting this one.
            cancel_task.await;
            tx.send(f(this, cx).await).ok();
        }));

        cx.spawn(async move |this, cx| {
            // Outer `Result`: whether the sender delivered a value; inner: the result of `f`.
            let response = rx.await;

            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
                .await?;

            this.update(cx, |this, cx| {
                this.project
                    .update(cx, |project, cx| project.set_agent_location(None, cx));
                match response {
                    Ok(Err(e)) => {
                        this.send_task.take();
                        cx.emit(AcpThreadEvent::Error);
                        Err(e)
                    }
                    Ok(Ok(r)) if r.stop_reason == acp::StopReason::MaxTokens => {
                        this.send_task.take();
                        cx.emit(AcpThreadEvent::Error);
                        Err(anyhow!("Max tokens reached"))
                    }
                    result => {
                        let canceled = matches!(
                            result,
                            Ok(Ok(acp::PromptResponse {
                                stop_reason: acp::StopReason::Cancelled,
                                ..
                            }))
                        );

                        // We only take the task if the current prompt wasn't canceled.
                        //
                        // This prompt may have been canceled because another one was sent
                        // while it was still generating. In these cases, dropping `send_task`
                        // would cause the next generation to be canceled.
                        if !canceled {
                            this.send_task.take();
                        }

                        // Handle refusal - distinguish between user prompt and tool call refusals
                        if let Ok(Ok(acp::PromptResponse {
                            stop_reason: acp::StopReason::Refusal,
                            ..
                        })) = result
                        {
                            if let Some((user_msg_ix, _)) = this.last_user_message() {
                                // Check if there's a completed tool call with results after the last user message
                                // This indicates the refusal is in response to tool output, not the user's prompt
                                let has_completed_tool_call_after_user_msg =
                                    this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
                                        if let AgentThreadEntry::ToolCall(tool_call) = entry {
                                            // Check if the tool call has completed and has output
                                            matches!(tool_call.status, ToolCallStatus::Completed)
                                                && tool_call.raw_output.is_some()
                                        } else {
                                            false
                                        }
                                    });

                                if has_completed_tool_call_after_user_msg {
                                    // Refusal is due to tool output - don't truncate, just notify
                                    // The model refused based on what the tool returned
                                    cx.emit(AcpThreadEvent::Refusal);
                                } else {
                                    // User prompt was refused - truncate back to before the user message
                                    let range = user_msg_ix..this.entries.len();
                                    if range.start < range.end {
                                        this.entries.truncate(user_msg_ix);
                                        cx.emit(AcpThreadEvent::EntriesRemoved(range));
                                    }
                                    cx.emit(AcpThreadEvent::Refusal);
                                }
                            } else {
                                // No user message found, treat as general refusal
                                cx.emit(AcpThreadEvent::Refusal);
                            }
                        }

                        cx.emit(AcpThreadEvent::Stopped);
                        Ok(())
                    }
                }
            })?
        })
        .boxed()
    }
2028
2029 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2030 let Some(send_task) = self.send_task.take() else {
2031 return Task::ready(());
2032 };
2033
2034 for entry in self.entries.iter_mut() {
2035 if let AgentThreadEntry::ToolCall(call) = entry {
2036 let cancel = matches!(
2037 call.status,
2038 ToolCallStatus::Pending
2039 | ToolCallStatus::WaitingForConfirmation { .. }
2040 | ToolCallStatus::InProgress
2041 );
2042
2043 if cancel {
2044 call.status = ToolCallStatus::Canceled;
2045 }
2046 }
2047 }
2048
2049 self.connection.cancel(&self.session_id, cx);
2050
2051 // Wait for the send task to complete
2052 cx.foreground_executor().spawn(send_task)
2053 }
2054
2055 /// Restores the git working tree to the state at the given checkpoint (if one exists)
2056 pub fn restore_checkpoint(
2057 &mut self,
2058 id: UserMessageId,
2059 cx: &mut Context<Self>,
2060 ) -> Task<Result<()>> {
2061 let Some((_, message)) = self.user_message_mut(&id) else {
2062 return Task::ready(Err(anyhow!("message not found")));
2063 };
2064
2065 let checkpoint = message
2066 .checkpoint
2067 .as_ref()
2068 .map(|c| c.git_checkpoint.clone());
2069
2070 // Cancel any in-progress generation before restoring
2071 let cancel_task = self.cancel(cx);
2072 let rewind = self.rewind(id.clone(), cx);
2073 let git_store = self.project.read(cx).git_store().clone();
2074
2075 cx.spawn(async move |_, cx| {
2076 cancel_task.await;
2077 rewind.await?;
2078 if let Some(checkpoint) = checkpoint {
2079 git_store
2080 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
2081 .await?;
2082 }
2083
2084 Ok(())
2085 })
2086 }
2087
    /// Rewinds this thread to before the user message with the given `id`, removing it
    /// and all subsequent entries while rejecting all edits tracked by the action log.
    /// Unlike `restore_checkpoint`, this method does not restore from git.
    ///
    /// Terminals referenced by the removed entries are killed and removed from
    /// the thread. Fails with "not supported" when the connection offers no
    /// `truncate` for this session.
    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
        let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
            return Task::ready(Err(anyhow!("not supported")));
        };

        let telemetry = ActionLogTelemetry::from(&*self);
        cx.spawn(async move |this, cx| {
            // Truncate on the connection side first; local entries are only removed if it succeeds.
            cx.update(|cx| truncate.run(id.clone(), cx)).await?;
            this.update(cx, |this, cx| {
                if let Some((ix, _)) = this.user_message_mut(&id) {
                    // Collect all terminals from entries that will be removed
                    let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
                        .iter()
                        .flat_map(|entry| entry.terminals())
                        .filter_map(|terminal| terminal.read(cx).id().clone().into())
                        .collect();

                    let range = ix..this.entries.len();
                    this.entries.truncate(ix);
                    cx.emit(AcpThreadEvent::EntriesRemoved(range));

                    // Kill and remove the terminals
                    for terminal_id in terminals_to_remove {
                        if let Some(terminal) = this.terminals.remove(&terminal_id) {
                            terminal.update(cx, |terminal, cx| {
                                terminal.kill(cx);
                            });
                        }
                    }
                }
                // Edits are rejected even if the user message was no longer found above.
                this.action_log().update(cx, |action_log, cx| {
                    action_log.reject_all_edits(Some(telemetry), cx)
                })
            })?
            .await;
            Ok(())
        })
    }
2129
    /// Decides whether the checkpoint on the last user message should be shown.
    ///
    /// Returns a ready `Ok` when there is no user message, or when the last one
    /// has no id or no checkpoint. Otherwise a new git checkpoint is taken and
    /// compared with the stored one; the stored checkpoint's `show` flag is set
    /// to whether they differ and `EntryUpdated` is emitted.
    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
        let git_store = self.project.read(cx).git_store().clone();

        let Some((_, message)) = self.last_user_message() else {
            return Task::ready(Ok(()));
        };
        let Some(user_message_id) = message.id.clone() else {
            return Task::ready(Ok(()));
        };
        let Some(checkpoint) = message.checkpoint.as_ref() else {
            return Task::ready(Ok(()));
        };
        let old_checkpoint = checkpoint.git_checkpoint.clone();

        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
        cx.spawn(async move |this, cx| {
            // Failing to take the new checkpoint is logged and treated as "nothing to do".
            let Some(new_checkpoint) = new_checkpoint
                .await
                .context("failed to get new checkpoint")
                .log_err()
            else {
                return Ok(());
            };

            // If the comparison itself fails, the checkpoints are treated as equal.
            let equal = git_store
                .update(cx, |git, cx| {
                    git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
                })
                .await
                .unwrap_or(true);

            this.update(cx, |this, cx| {
                // Look the message up again by id after the awaits.
                if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
                    if let Some(checkpoint) = message.checkpoint.as_mut() {
                        checkpoint.show = !equal;
                        cx.emit(AcpThreadEvent::EntryUpdated(ix));
                    }
                }
            })?;

            Ok(())
        })
    }
2173
2174 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2175 self.entries
2176 .iter_mut()
2177 .enumerate()
2178 .rev()
2179 .find_map(|(ix, entry)| {
2180 if let AgentThreadEntry::UserMessage(message) = entry {
2181 Some((ix, message))
2182 } else {
2183 None
2184 }
2185 })
2186 }
2187
2188 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2189 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2190 if let AgentThreadEntry::UserMessage(message) = entry {
2191 if message.id.as_ref() == Some(id) {
2192 Some((ix, message))
2193 } else {
2194 None
2195 }
2196 } else {
2197 None
2198 }
2199 })
2200 }
2201
2202 pub fn read_text_file(
2203 &self,
2204 path: PathBuf,
2205 line: Option<u32>,
2206 limit: Option<u32>,
2207 reuse_shared_snapshot: bool,
2208 cx: &mut Context<Self>,
2209 ) -> Task<Result<String, acp::Error>> {
2210 // Args are 1-based, move to 0-based
2211 let line = line.unwrap_or_default().saturating_sub(1);
2212 let limit = limit.unwrap_or(u32::MAX);
2213 let project = self.project.clone();
2214 let action_log = self.action_log.clone();
2215 cx.spawn(async move |this, cx| {
2216 let load = project.update(cx, |project, cx| {
2217 let path = project
2218 .project_path_for_absolute_path(&path, cx)
2219 .ok_or_else(|| {
2220 acp::Error::resource_not_found(Some(path.display().to_string()))
2221 })?;
2222 Ok::<_, acp::Error>(project.open_buffer(path, cx))
2223 })?;
2224
2225 let buffer = load.await?;
2226
2227 let snapshot = if reuse_shared_snapshot {
2228 this.read_with(cx, |this, _| {
2229 this.shared_buffers.get(&buffer.clone()).cloned()
2230 })
2231 .log_err()
2232 .flatten()
2233 } else {
2234 None
2235 };
2236
2237 let snapshot = if let Some(snapshot) = snapshot {
2238 snapshot
2239 } else {
2240 action_log.update(cx, |action_log, cx| {
2241 action_log.buffer_read(buffer.clone(), cx);
2242 });
2243
2244 let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
2245 this.update(cx, |this, _| {
2246 this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2247 })?;
2248 snapshot
2249 };
2250
2251 let max_point = snapshot.max_point();
2252 let start_position = Point::new(line, 0);
2253
2254 if start_position > max_point {
2255 return Err(acp::Error::invalid_params().data(format!(
2256 "Attempting to read beyond the end of the file, line {}:{}",
2257 max_point.row + 1,
2258 max_point.column
2259 )));
2260 }
2261
2262 let start = snapshot.anchor_before(start_position);
2263 let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2264
2265 project.update(cx, |project, cx| {
2266 project.set_agent_location(
2267 Some(AgentLocation {
2268 buffer: buffer.downgrade(),
2269 position: start,
2270 }),
2271 cx,
2272 );
2273 });
2274
2275 Ok(snapshot.text_for_range(start..end).collect::<String>())
2276 })
2277 }
2278
2279 pub fn write_text_file(
2280 &self,
2281 path: PathBuf,
2282 content: String,
2283 cx: &mut Context<Self>,
2284 ) -> Task<Result<()>> {
2285 let project = self.project.clone();
2286 let action_log = self.action_log.clone();
2287 cx.spawn(async move |this, cx| {
2288 let load = project.update(cx, |project, cx| {
2289 let path = project
2290 .project_path_for_absolute_path(&path, cx)
2291 .context("invalid path")?;
2292 anyhow::Ok(project.open_buffer(path, cx))
2293 });
2294 let buffer = load?.await?;
2295 let snapshot = this.update(cx, |this, cx| {
2296 this.shared_buffers
2297 .get(&buffer)
2298 .cloned()
2299 .unwrap_or_else(|| buffer.read(cx).snapshot())
2300 })?;
2301 let edits = cx
2302 .background_executor()
2303 .spawn(async move {
2304 let old_text = snapshot.text();
2305 text_diff(old_text.as_str(), &content)
2306 .into_iter()
2307 .map(|(range, replacement)| {
2308 (
2309 snapshot.anchor_after(range.start)
2310 ..snapshot.anchor_before(range.end),
2311 replacement,
2312 )
2313 })
2314 .collect::<Vec<_>>()
2315 })
2316 .await;
2317
2318 project.update(cx, |project, cx| {
2319 project.set_agent_location(
2320 Some(AgentLocation {
2321 buffer: buffer.downgrade(),
2322 position: edits
2323 .last()
2324 .map(|(range, _)| range.end)
2325 .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
2326 }),
2327 cx,
2328 );
2329 });
2330
2331 let format_on_save = cx.update(|cx| {
2332 action_log.update(cx, |action_log, cx| {
2333 action_log.buffer_read(buffer.clone(), cx);
2334 });
2335
2336 let format_on_save = buffer.update(cx, |buffer, cx| {
2337 buffer.edit(edits, None, cx);
2338
2339 let settings = language::language_settings::language_settings(
2340 buffer.language().map(|l| l.name()),
2341 buffer.file(),
2342 cx,
2343 );
2344
2345 settings.format_on_save != FormatOnSave::Off
2346 });
2347 action_log.update(cx, |action_log, cx| {
2348 action_log.buffer_edited(buffer.clone(), cx);
2349 });
2350 format_on_save
2351 });
2352
2353 if format_on_save {
2354 let format_task = project.update(cx, |project, cx| {
2355 project.format(
2356 HashSet::from_iter([buffer.clone()]),
2357 LspFormatTarget::Buffers,
2358 false,
2359 FormatTrigger::Save,
2360 cx,
2361 )
2362 });
2363 format_task.await.log_err();
2364
2365 action_log.update(cx, |action_log, cx| {
2366 action_log.buffer_edited(buffer.clone(), cx);
2367 });
2368 }
2369
2370 project
2371 .update(cx, |project, cx| project.save_buffer(buffer, cx))
2372 .await
2373 })
2374 }
2375
2376 pub fn create_terminal(
2377 &self,
2378 command: String,
2379 args: Vec<String>,
2380 extra_env: Vec<acp::EnvVariable>,
2381 cwd: Option<PathBuf>,
2382 output_byte_limit: Option<u64>,
2383 cx: &mut Context<Self>,
2384 ) -> Task<Result<Entity<Terminal>>> {
2385 let env = match &cwd {
2386 Some(dir) => self.project.update(cx, |project, cx| {
2387 project.environment().update(cx, |env, cx| {
2388 env.directory_environment(dir.as_path().into(), cx)
2389 })
2390 }),
2391 None => Task::ready(None).shared(),
2392 };
2393 let env = cx.spawn(async move |_, _| {
2394 let mut env = env.await.unwrap_or_default();
2395 // Disables paging for `git` and hopefully other commands
2396 env.insert("PAGER".into(), "".into());
2397 for var in extra_env {
2398 env.insert(var.name, var.value);
2399 }
2400 env
2401 });
2402
2403 let project = self.project.clone();
2404 let language_registry = project.read(cx).languages().clone();
2405 let is_windows = project.read(cx).path_style(cx).is_windows();
2406
2407 let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
2408 let terminal_task = cx.spawn({
2409 let terminal_id = terminal_id.clone();
2410 async move |_this, cx| {
2411 let env = env.await;
2412 let shell = project
2413 .update(cx, |project, cx| {
2414 project
2415 .remote_client()
2416 .and_then(|r| r.read(cx).default_system_shell())
2417 })
2418 .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2419 let (task_command, task_args) =
2420 ShellBuilder::new(&Shell::Program(shell), is_windows)
2421 .redirect_stdin_to_dev_null()
2422 .build(Some(command.clone()), &args);
2423 let terminal = project
2424 .update(cx, |project, cx| {
2425 project.create_terminal_task(
2426 task::SpawnInTerminal {
2427 command: Some(task_command),
2428 args: task_args,
2429 cwd: cwd.clone(),
2430 env,
2431 ..Default::default()
2432 },
2433 cx,
2434 )
2435 })
2436 .await?;
2437
2438 anyhow::Ok(cx.new(|cx| {
2439 Terminal::new(
2440 terminal_id,
2441 &format!("{} {}", command, args.join(" ")),
2442 cwd,
2443 output_byte_limit.map(|l| l as usize),
2444 terminal,
2445 language_registry,
2446 cx,
2447 )
2448 }))
2449 }
2450 });
2451
2452 cx.spawn(async move |this, cx| {
2453 let terminal = terminal_task.await?;
2454 this.update(cx, |this, _cx| {
2455 this.terminals.insert(terminal_id, terminal.clone());
2456 terminal
2457 })
2458 })
2459 }
2460
2461 pub fn kill_terminal(
2462 &mut self,
2463 terminal_id: acp::TerminalId,
2464 cx: &mut Context<Self>,
2465 ) -> Result<()> {
2466 self.terminals
2467 .get(&terminal_id)
2468 .context("Terminal not found")?
2469 .update(cx, |terminal, cx| {
2470 terminal.kill(cx);
2471 });
2472
2473 Ok(())
2474 }
2475
2476 pub fn release_terminal(
2477 &mut self,
2478 terminal_id: acp::TerminalId,
2479 cx: &mut Context<Self>,
2480 ) -> Result<()> {
2481 self.terminals
2482 .remove(&terminal_id)
2483 .context("Terminal not found")?
2484 .update(cx, |terminal, cx| {
2485 terminal.kill(cx);
2486 });
2487
2488 Ok(())
2489 }
2490
2491 pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2492 self.terminals
2493 .get(&terminal_id)
2494 .context("Terminal not found")
2495 .cloned()
2496 }
2497
2498 pub fn to_markdown(&self, cx: &App) -> String {
2499 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2500 }
2501
2502 pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2503 cx.emit(AcpThreadEvent::LoadError(error));
2504 }
2505
2506 pub fn register_terminal_created(
2507 &mut self,
2508 terminal_id: acp::TerminalId,
2509 command_label: String,
2510 working_dir: Option<PathBuf>,
2511 output_byte_limit: Option<u64>,
2512 terminal: Entity<::terminal::Terminal>,
2513 cx: &mut Context<Self>,
2514 ) -> Entity<Terminal> {
2515 let language_registry = self.project.read(cx).languages().clone();
2516
2517 let entity = cx.new(|cx| {
2518 Terminal::new(
2519 terminal_id.clone(),
2520 &command_label,
2521 working_dir.clone(),
2522 output_byte_limit.map(|l| l as usize),
2523 terminal,
2524 language_registry,
2525 cx,
2526 )
2527 });
2528 self.terminals.insert(terminal_id.clone(), entity.clone());
2529 entity
2530 }
2531}
2532
2533fn markdown_for_raw_output(
2534 raw_output: &serde_json::Value,
2535 language_registry: &Arc<LanguageRegistry>,
2536 cx: &mut App,
2537) -> Option<Entity<Markdown>> {
2538 match raw_output {
2539 serde_json::Value::Null => None,
2540 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2541 Markdown::new(
2542 value.to_string().into(),
2543 Some(language_registry.clone()),
2544 None,
2545 cx,
2546 )
2547 })),
2548 serde_json::Value::Number(value) => Some(cx.new(|cx| {
2549 Markdown::new(
2550 value.to_string().into(),
2551 Some(language_registry.clone()),
2552 None,
2553 cx,
2554 )
2555 })),
2556 serde_json::Value::String(value) => Some(cx.new(|cx| {
2557 Markdown::new(
2558 value.clone().into(),
2559 Some(language_registry.clone()),
2560 None,
2561 cx,
2562 )
2563 })),
2564 value => Some(cx.new(|cx| {
2565 let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
2566
2567 Markdown::new(
2568 format!("```json\n{}\n```", pretty_json).into(),
2569 Some(language_registry.clone()),
2570 None,
2571 cx,
2572 )
2573 })),
2574 }
2575}
2576
2577#[cfg(test)]
2578mod tests {
2579 use super::*;
2580 use anyhow::anyhow;
2581 use futures::{channel::mpsc, future::LocalBoxFuture, select};
2582 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2583 use indoc::indoc;
2584 use project::{FakeFs, Fs};
2585 use rand::{distr, prelude::*};
2586 use serde_json::json;
2587 use settings::SettingsStore;
2588 use smol::stream::StreamExt as _;
2589 use std::{
2590 any::Any,
2591 cell::RefCell,
2592 path::Path,
2593 rc::Rc,
2594 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2595 time::Duration,
2596 };
2597 use util::path;
2598
2599 fn init_test(cx: &mut TestAppContext) {
2600 env_logger::try_init().ok();
2601 cx.update(|cx| {
2602 let settings_store = SettingsStore::test(cx);
2603 cx.set_global(settings_store);
2604 });
2605 }
2606
2607 #[gpui::test]
2608 async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2609 init_test(cx);
2610
2611 let fs = FakeFs::new(cx.executor());
2612 let project = Project::test(fs, [], cx).await;
2613 let connection = Rc::new(FakeAgentConnection::new());
2614 let thread = cx
2615 .update(|cx| connection.new_session(project, std::path::Path::new(path!("/test")), cx))
2616 .await
2617 .unwrap();
2618
2619 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2620
2621 // Send Output BEFORE Created - should be buffered by acp_thread
2622 thread.update(cx, |thread, cx| {
2623 thread.on_terminal_provider_event(
2624 TerminalProviderEvent::Output {
2625 terminal_id: terminal_id.clone(),
2626 data: b"hello buffered".to_vec(),
2627 },
2628 cx,
2629 );
2630 });
2631
2632 // Create a display-only terminal and then send Created
2633 let lower = cx.new(|cx| {
2634 let builder = ::terminal::TerminalBuilder::new_display_only(
2635 ::terminal::terminal_settings::CursorShape::default(),
2636 ::terminal::terminal_settings::AlternateScroll::On,
2637 None,
2638 0,
2639 cx.background_executor(),
2640 PathStyle::local(),
2641 )
2642 .unwrap();
2643 builder.subscribe(cx)
2644 });
2645
2646 thread.update(cx, |thread, cx| {
2647 thread.on_terminal_provider_event(
2648 TerminalProviderEvent::Created {
2649 terminal_id: terminal_id.clone(),
2650 label: "Buffered Test".to_string(),
2651 cwd: None,
2652 output_byte_limit: None,
2653 terminal: lower.clone(),
2654 },
2655 cx,
2656 );
2657 });
2658
2659 // After Created, buffered Output should have been flushed into the renderer
2660 let content = thread.read_with(cx, |thread, cx| {
2661 let term = thread.terminal(terminal_id.clone()).unwrap();
2662 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2663 });
2664
2665 assert!(
2666 content.contains("hello buffered"),
2667 "expected buffered output to render, got: {content}"
2668 );
2669 }
2670
2671 #[gpui::test]
2672 async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2673 init_test(cx);
2674
2675 let fs = FakeFs::new(cx.executor());
2676 let project = Project::test(fs, [], cx).await;
2677 let connection = Rc::new(FakeAgentConnection::new());
2678 let thread = cx
2679 .update(|cx| connection.new_session(project, std::path::Path::new(path!("/test")), cx))
2680 .await
2681 .unwrap();
2682
2683 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2684
2685 // Send Output BEFORE Created
2686 thread.update(cx, |thread, cx| {
2687 thread.on_terminal_provider_event(
2688 TerminalProviderEvent::Output {
2689 terminal_id: terminal_id.clone(),
2690 data: b"pre-exit data".to_vec(),
2691 },
2692 cx,
2693 );
2694 });
2695
2696 // Send Exit BEFORE Created
2697 thread.update(cx, |thread, cx| {
2698 thread.on_terminal_provider_event(
2699 TerminalProviderEvent::Exit {
2700 terminal_id: terminal_id.clone(),
2701 status: acp::TerminalExitStatus::new().exit_code(0),
2702 },
2703 cx,
2704 );
2705 });
2706
2707 // Now create a display-only lower-level terminal and send Created
2708 let lower = cx.new(|cx| {
2709 let builder = ::terminal::TerminalBuilder::new_display_only(
2710 ::terminal::terminal_settings::CursorShape::default(),
2711 ::terminal::terminal_settings::AlternateScroll::On,
2712 None,
2713 0,
2714 cx.background_executor(),
2715 PathStyle::local(),
2716 )
2717 .unwrap();
2718 builder.subscribe(cx)
2719 });
2720
2721 thread.update(cx, |thread, cx| {
2722 thread.on_terminal_provider_event(
2723 TerminalProviderEvent::Created {
2724 terminal_id: terminal_id.clone(),
2725 label: "Buffered Exit Test".to_string(),
2726 cwd: None,
2727 output_byte_limit: None,
2728 terminal: lower.clone(),
2729 },
2730 cx,
2731 );
2732 });
2733
2734 // Output should be present after Created (flushed from buffer)
2735 let content = thread.read_with(cx, |thread, cx| {
2736 let term = thread.terminal(terminal_id.clone()).unwrap();
2737 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2738 });
2739
2740 assert!(
2741 content.contains("pre-exit data"),
2742 "expected pre-exit data to render, got: {content}"
2743 );
2744 }
2745
2746 /// Test that killing a terminal via Terminal::kill properly:
2747 /// 1. Causes wait_for_exit to complete (doesn't hang forever)
2748 /// 2. The underlying terminal still has the output that was written before the kill
2749 ///
2750 /// This test verifies that the fix to kill_active_task (which now also kills
2751 /// the shell process in addition to the foreground process) properly allows
2752 /// wait_for_exit to complete instead of hanging indefinitely.
2753 #[cfg(unix)]
2754 #[gpui::test]
2755 async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
2756 use std::collections::HashMap;
2757 use task::Shell;
2758 use util::shell_builder::ShellBuilder;
2759
2760 init_test(cx);
2761 cx.executor().allow_parking();
2762
2763 let fs = FakeFs::new(cx.executor());
2764 let project = Project::test(fs, [], cx).await;
2765 let connection = Rc::new(FakeAgentConnection::new());
2766 let thread = cx
2767 .update(|cx| connection.new_session(project.clone(), Path::new(path!("/test")), cx))
2768 .await
2769 .unwrap();
2770
2771 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2772
2773 // Create a real PTY terminal that runs a command which prints output then sleeps
2774 // We use printf instead of echo and chain with && sleep to ensure proper execution
2775 let (completion_tx, _completion_rx) = smol::channel::unbounded();
2776 let (program, args) = ShellBuilder::new(&Shell::System, false).build(
2777 Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
2778 &[],
2779 );
2780
2781 let builder = cx
2782 .update(|cx| {
2783 ::terminal::TerminalBuilder::new(
2784 None,
2785 None,
2786 task::Shell::WithArguments {
2787 program,
2788 args,
2789 title_override: None,
2790 },
2791 HashMap::default(),
2792 ::terminal::terminal_settings::CursorShape::default(),
2793 ::terminal::terminal_settings::AlternateScroll::On,
2794 None,
2795 vec![],
2796 0,
2797 false,
2798 0,
2799 Some(completion_tx),
2800 cx,
2801 vec![],
2802 PathStyle::local(),
2803 )
2804 })
2805 .await
2806 .unwrap();
2807
2808 let lower_terminal = cx.new(|cx| builder.subscribe(cx));
2809
2810 // Create the acp_thread Terminal wrapper
2811 thread.update(cx, |thread, cx| {
2812 thread.on_terminal_provider_event(
2813 TerminalProviderEvent::Created {
2814 terminal_id: terminal_id.clone(),
2815 label: "printf output_before_kill && sleep 60".to_string(),
2816 cwd: None,
2817 output_byte_limit: None,
2818 terminal: lower_terminal.clone(),
2819 },
2820 cx,
2821 );
2822 });
2823
2824 // Wait for the printf command to execute and produce output
2825 // Use real time since parking is enabled
2826 cx.executor().timer(Duration::from_millis(500)).await;
2827
2828 // Get the acp_thread Terminal and kill it
2829 let wait_for_exit = thread.update(cx, |thread, cx| {
2830 let term = thread.terminals.get(&terminal_id).unwrap();
2831 let wait_for_exit = term.read(cx).wait_for_exit();
2832 term.update(cx, |term, cx| {
2833 term.kill(cx);
2834 });
2835 wait_for_exit
2836 });
2837
2838 // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
2839 // Before the fix to kill_active_task, this would hang forever because
2840 // only the foreground process was killed, not the shell, so the PTY
2841 // child never exited and wait_for_completed_task never completed.
2842 let exit_result = futures::select! {
2843 result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
2844 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
2845 };
2846
2847 assert!(
2848 exit_result.is_some(),
2849 "wait_for_exit should complete after kill, but it timed out. \
2850 This indicates kill_active_task is not properly killing the shell process."
2851 );
2852
2853 // Give the system a chance to process any pending updates
2854 cx.run_until_parked();
2855
2856 // Verify that the underlying terminal still has the output that was
2857 // written before the kill. This verifies that killing doesn't lose output.
2858 let inner_content = thread.read_with(cx, |thread, cx| {
2859 let term = thread.terminals.get(&terminal_id).unwrap();
2860 term.read(cx).inner().read(cx).get_content()
2861 });
2862
2863 assert!(
2864 inner_content.contains("output_before_kill"),
2865 "Underlying terminal should contain output from before kill, got: {}",
2866 inner_content
2867 );
2868 }
2869
2870 #[gpui::test]
2871 async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
2872 init_test(cx);
2873
2874 let fs = FakeFs::new(cx.executor());
2875 let project = Project::test(fs, [], cx).await;
2876 let connection = Rc::new(FakeAgentConnection::new());
2877 let thread = cx
2878 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
2879 .await
2880 .unwrap();
2881
2882 // Test creating a new user message
2883 thread.update(cx, |thread, cx| {
2884 thread.push_user_content_block(None, "Hello, ".into(), cx);
2885 });
2886
2887 thread.update(cx, |thread, cx| {
2888 assert_eq!(thread.entries.len(), 1);
2889 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2890 assert_eq!(user_msg.id, None);
2891 assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
2892 } else {
2893 panic!("Expected UserMessage");
2894 }
2895 });
2896
2897 // Test appending to existing user message
2898 let message_1_id = UserMessageId::new();
2899 thread.update(cx, |thread, cx| {
2900 thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
2901 });
2902
2903 thread.update(cx, |thread, cx| {
2904 assert_eq!(thread.entries.len(), 1);
2905 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2906 assert_eq!(user_msg.id, Some(message_1_id));
2907 assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
2908 } else {
2909 panic!("Expected UserMessage");
2910 }
2911 });
2912
2913 // Test creating new user message after assistant message
2914 thread.update(cx, |thread, cx| {
2915 thread.push_assistant_content_block("Assistant response".into(), false, cx);
2916 });
2917
2918 let message_2_id = UserMessageId::new();
2919 thread.update(cx, |thread, cx| {
2920 thread.push_user_content_block(
2921 Some(message_2_id.clone()),
2922 "New user message".into(),
2923 cx,
2924 );
2925 });
2926
2927 thread.update(cx, |thread, cx| {
2928 assert_eq!(thread.entries.len(), 3);
2929 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
2930 assert_eq!(user_msg.id, Some(message_2_id));
2931 assert_eq!(user_msg.content.to_markdown(cx), "New user message");
2932 } else {
2933 panic!("Expected UserMessage at index 2");
2934 }
2935 });
2936 }
2937
2938 #[gpui::test]
2939 async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
2940 init_test(cx);
2941
2942 let fs = FakeFs::new(cx.executor());
2943 let project = Project::test(fs, [], cx).await;
2944 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2945 |_, thread, mut cx| {
2946 async move {
2947 thread.update(&mut cx, |thread, cx| {
2948 thread
2949 .handle_session_update(
2950 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
2951 "Thinking ".into(),
2952 )),
2953 cx,
2954 )
2955 .unwrap();
2956 thread
2957 .handle_session_update(
2958 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
2959 "hard!".into(),
2960 )),
2961 cx,
2962 )
2963 .unwrap();
2964 })?;
2965 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
2966 }
2967 .boxed_local()
2968 },
2969 ));
2970
2971 let thread = cx
2972 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
2973 .await
2974 .unwrap();
2975
2976 thread
2977 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
2978 .await
2979 .unwrap();
2980
2981 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
2982 assert_eq!(
2983 output,
2984 indoc! {r#"
2985 ## User
2986
2987 Hello from Zed!
2988
2989 ## Assistant
2990
2991 <thinking>
2992 Thinking hard!
2993 </thinking>
2994
2995 "#}
2996 );
2997 }
2998
2999 #[gpui::test]
3000 async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
3001 init_test(cx);
3002
3003 let fs = FakeFs::new(cx.executor());
3004 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
3005 .await;
3006 let project = Project::test(fs.clone(), [], cx).await;
3007 let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
3008 let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
3009 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3010 move |_, thread, mut cx| {
3011 let read_file_tx = read_file_tx.clone();
3012 async move {
3013 let content = thread
3014 .update(&mut cx, |thread, cx| {
3015 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3016 })
3017 .unwrap()
3018 .await
3019 .unwrap();
3020 assert_eq!(content, "one\ntwo\nthree\n");
3021 read_file_tx.take().unwrap().send(()).unwrap();
3022 thread
3023 .update(&mut cx, |thread, cx| {
3024 thread.write_text_file(
3025 path!("/tmp/foo").into(),
3026 "one\ntwo\nthree\nfour\nfive\n".to_string(),
3027 cx,
3028 )
3029 })
3030 .unwrap()
3031 .await
3032 .unwrap();
3033 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3034 }
3035 .boxed_local()
3036 },
3037 ));
3038
3039 let (worktree, pathbuf) = project
3040 .update(cx, |project, cx| {
3041 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3042 })
3043 .await
3044 .unwrap();
3045 let buffer = project
3046 .update(cx, |project, cx| {
3047 project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
3048 })
3049 .await
3050 .unwrap();
3051
3052 let thread = cx
3053 .update(|cx| connection.new_session(project, Path::new(path!("/tmp")), cx))
3054 .await
3055 .unwrap();
3056
3057 let request = thread.update(cx, |thread, cx| {
3058 thread.send_raw("Extend the count in /tmp/foo", cx)
3059 });
3060 read_file_rx.await.ok();
3061 buffer.update(cx, |buffer, cx| {
3062 buffer.edit([(0..0, "zero\n".to_string())], None, cx);
3063 });
3064 cx.run_until_parked();
3065 assert_eq!(
3066 buffer.read_with(cx, |buffer, _| buffer.text()),
3067 "zero\none\ntwo\nthree\nfour\nfive\n"
3068 );
3069 assert_eq!(
3070 String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
3071 "zero\none\ntwo\nthree\nfour\nfive\n"
3072 );
3073 request.await.unwrap();
3074 }
3075
3076 #[gpui::test]
3077 async fn test_reading_from_line(cx: &mut TestAppContext) {
3078 init_test(cx);
3079
3080 let fs = FakeFs::new(cx.executor());
3081 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
3082 .await;
3083 let project = Project::test(fs.clone(), [], cx).await;
3084 project
3085 .update(cx, |project, cx| {
3086 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3087 })
3088 .await
3089 .unwrap();
3090
3091 let connection = Rc::new(FakeAgentConnection::new());
3092
3093 let thread = cx
3094 .update(|cx| connection.new_session(project, Path::new(path!("/tmp")), cx))
3095 .await
3096 .unwrap();
3097
3098 // Whole file
3099 let content = thread
3100 .update(cx, |thread, cx| {
3101 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3102 })
3103 .await
3104 .unwrap();
3105
3106 assert_eq!(content, "one\ntwo\nthree\nfour\n");
3107
3108 // Only start line
3109 let content = thread
3110 .update(cx, |thread, cx| {
3111 thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
3112 })
3113 .await
3114 .unwrap();
3115
3116 assert_eq!(content, "three\nfour\n");
3117
3118 // Only limit
3119 let content = thread
3120 .update(cx, |thread, cx| {
3121 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3122 })
3123 .await
3124 .unwrap();
3125
3126 assert_eq!(content, "one\ntwo\n");
3127
3128 // Range
3129 let content = thread
3130 .update(cx, |thread, cx| {
3131 thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
3132 })
3133 .await
3134 .unwrap();
3135
3136 assert_eq!(content, "two\nthree\n");
3137
3138 // Invalid
3139 let err = thread
3140 .update(cx, |thread, cx| {
3141 thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
3142 })
3143 .await
3144 .unwrap_err();
3145
3146 assert_eq!(
3147 err.to_string(),
3148 "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
3149 );
3150 }
3151
3152 #[gpui::test]
3153 async fn test_reading_empty_file(cx: &mut TestAppContext) {
3154 init_test(cx);
3155
3156 let fs = FakeFs::new(cx.executor());
3157 fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
3158 let project = Project::test(fs.clone(), [], cx).await;
3159 project
3160 .update(cx, |project, cx| {
3161 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3162 })
3163 .await
3164 .unwrap();
3165
3166 let connection = Rc::new(FakeAgentConnection::new());
3167
3168 let thread = cx
3169 .update(|cx| connection.new_session(project, Path::new(path!("/tmp")), cx))
3170 .await
3171 .unwrap();
3172
3173 // Whole file
3174 let content = thread
3175 .update(cx, |thread, cx| {
3176 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3177 })
3178 .await
3179 .unwrap();
3180
3181 assert_eq!(content, "");
3182
3183 // Only start line
3184 let content = thread
3185 .update(cx, |thread, cx| {
3186 thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
3187 })
3188 .await
3189 .unwrap();
3190
3191 assert_eq!(content, "");
3192
3193 // Only limit
3194 let content = thread
3195 .update(cx, |thread, cx| {
3196 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3197 })
3198 .await
3199 .unwrap();
3200
3201 assert_eq!(content, "");
3202
3203 // Range
3204 let content = thread
3205 .update(cx, |thread, cx| {
3206 thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
3207 })
3208 .await
3209 .unwrap();
3210
3211 assert_eq!(content, "");
3212
3213 // Invalid
3214 let err = thread
3215 .update(cx, |thread, cx| {
3216 thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
3217 })
3218 .await
3219 .unwrap_err();
3220
3221 assert_eq!(
3222 err.to_string(),
3223 "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
3224 );
3225 }
3226 #[gpui::test]
3227 async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
3228 init_test(cx);
3229
3230 let fs = FakeFs::new(cx.executor());
3231 fs.insert_tree(path!("/tmp"), json!({})).await;
3232 let project = Project::test(fs.clone(), [], cx).await;
3233 project
3234 .update(cx, |project, cx| {
3235 project.find_or_create_worktree(path!("/tmp"), true, cx)
3236 })
3237 .await
3238 .unwrap();
3239
3240 let connection = Rc::new(FakeAgentConnection::new());
3241
3242 let thread = cx
3243 .update(|cx| connection.new_session(project, Path::new(path!("/tmp")), cx))
3244 .await
3245 .unwrap();
3246
3247 // Out of project file
3248 let err = thread
3249 .update(cx, |thread, cx| {
3250 thread.read_text_file(path!("/foo").into(), None, None, false, cx)
3251 })
3252 .await
3253 .unwrap_err();
3254
3255 assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
3256 }
3257
3258 #[gpui::test]
3259 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
3260 init_test(cx);
3261
3262 let fs = FakeFs::new(cx.executor());
3263 let project = Project::test(fs, [], cx).await;
3264 let id = acp::ToolCallId::new("test");
3265
3266 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3267 let id = id.clone();
3268 move |_, thread, mut cx| {
3269 let id = id.clone();
3270 async move {
3271 thread
3272 .update(&mut cx, |thread, cx| {
3273 thread.handle_session_update(
3274 acp::SessionUpdate::ToolCall(
3275 acp::ToolCall::new(id.clone(), "Label")
3276 .kind(acp::ToolKind::Fetch)
3277 .status(acp::ToolCallStatus::InProgress),
3278 ),
3279 cx,
3280 )
3281 })
3282 .unwrap()
3283 .unwrap();
3284 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3285 }
3286 .boxed_local()
3287 }
3288 }));
3289
3290 let thread = cx
3291 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3292 .await
3293 .unwrap();
3294
3295 let request = thread.update(cx, |thread, cx| {
3296 thread.send_raw("Fetch https://example.com", cx)
3297 });
3298
3299 run_until_first_tool_call(&thread, cx).await;
3300
3301 thread.read_with(cx, |thread, _| {
3302 assert!(matches!(
3303 thread.entries[1],
3304 AgentThreadEntry::ToolCall(ToolCall {
3305 status: ToolCallStatus::InProgress,
3306 ..
3307 })
3308 ));
3309 });
3310
3311 thread.update(cx, |thread, cx| thread.cancel(cx)).await;
3312
3313 thread.read_with(cx, |thread, _| {
3314 assert!(matches!(
3315 &thread.entries[1],
3316 AgentThreadEntry::ToolCall(ToolCall {
3317 status: ToolCallStatus::Canceled,
3318 ..
3319 })
3320 ));
3321 });
3322
3323 thread
3324 .update(cx, |thread, cx| {
3325 thread.handle_session_update(
3326 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3327 id,
3328 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3329 )),
3330 cx,
3331 )
3332 })
3333 .unwrap();
3334
3335 request.await.unwrap();
3336
3337 thread.read_with(cx, |thread, _| {
3338 assert!(matches!(
3339 thread.entries[1],
3340 AgentThreadEntry::ToolCall(ToolCall {
3341 status: ToolCallStatus::Completed,
3342 ..
3343 })
3344 ));
3345 });
3346 }
3347
3348 #[gpui::test]
3349 async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3350 init_test(cx);
3351 let fs = FakeFs::new(cx.background_executor.clone());
3352 fs.insert_tree(path!("/test"), json!({})).await;
3353 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3354
3355 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3356 move |_, thread, mut cx| {
3357 async move {
3358 thread
3359 .update(&mut cx, |thread, cx| {
3360 thread.handle_session_update(
3361 acp::SessionUpdate::ToolCall(
3362 acp::ToolCall::new("test", "Label")
3363 .kind(acp::ToolKind::Edit)
3364 .status(acp::ToolCallStatus::Completed)
3365 .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
3366 "/test/test.txt",
3367 "foo",
3368 ))]),
3369 ),
3370 cx,
3371 )
3372 })
3373 .unwrap()
3374 .unwrap();
3375 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3376 }
3377 .boxed_local()
3378 }
3379 }));
3380
3381 let thread = cx
3382 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3383 .await
3384 .unwrap();
3385
3386 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3387 .await
3388 .unwrap();
3389
3390 assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3391 }
3392
3393 #[gpui::test(iterations = 10)]
3394 async fn test_checkpoints(cx: &mut TestAppContext) {
3395 init_test(cx);
3396 let fs = FakeFs::new(cx.background_executor.clone());
3397 fs.insert_tree(
3398 path!("/test"),
3399 json!({
3400 ".git": {}
3401 }),
3402 )
3403 .await;
3404 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3405
3406 let simulate_changes = Arc::new(AtomicBool::new(true));
3407 let next_filename = Arc::new(AtomicUsize::new(0));
3408 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3409 let simulate_changes = simulate_changes.clone();
3410 let next_filename = next_filename.clone();
3411 let fs = fs.clone();
3412 move |request, thread, mut cx| {
3413 let fs = fs.clone();
3414 let simulate_changes = simulate_changes.clone();
3415 let next_filename = next_filename.clone();
3416 async move {
3417 if simulate_changes.load(SeqCst) {
3418 let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3419 fs.write(Path::new(&filename), b"").await?;
3420 }
3421
3422 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3423 panic!("expected text content block");
3424 };
3425 thread.update(&mut cx, |thread, cx| {
3426 thread
3427 .handle_session_update(
3428 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3429 content.text.to_uppercase().into(),
3430 )),
3431 cx,
3432 )
3433 .unwrap();
3434 })?;
3435 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3436 }
3437 .boxed_local()
3438 }
3439 }));
3440 let thread = cx
3441 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3442 .await
3443 .unwrap();
3444
3445 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3446 .await
3447 .unwrap();
3448 thread.read_with(cx, |thread, cx| {
3449 assert_eq!(
3450 thread.to_markdown(cx),
3451 indoc! {"
3452 ## User (checkpoint)
3453
3454 Lorem
3455
3456 ## Assistant
3457
3458 LOREM
3459
3460 "}
3461 );
3462 });
3463 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3464
3465 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3466 .await
3467 .unwrap();
3468 thread.read_with(cx, |thread, cx| {
3469 assert_eq!(
3470 thread.to_markdown(cx),
3471 indoc! {"
3472 ## User (checkpoint)
3473
3474 Lorem
3475
3476 ## Assistant
3477
3478 LOREM
3479
3480 ## User (checkpoint)
3481
3482 ipsum
3483
3484 ## Assistant
3485
3486 IPSUM
3487
3488 "}
3489 );
3490 });
3491 assert_eq!(
3492 fs.files(),
3493 vec![
3494 Path::new(path!("/test/file-0")),
3495 Path::new(path!("/test/file-1"))
3496 ]
3497 );
3498
3499 // Checkpoint isn't stored when there are no changes.
3500 simulate_changes.store(false, SeqCst);
3501 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3502 .await
3503 .unwrap();
3504 thread.read_with(cx, |thread, cx| {
3505 assert_eq!(
3506 thread.to_markdown(cx),
3507 indoc! {"
3508 ## User (checkpoint)
3509
3510 Lorem
3511
3512 ## Assistant
3513
3514 LOREM
3515
3516 ## User (checkpoint)
3517
3518 ipsum
3519
3520 ## Assistant
3521
3522 IPSUM
3523
3524 ## User
3525
3526 dolor
3527
3528 ## Assistant
3529
3530 DOLOR
3531
3532 "}
3533 );
3534 });
3535 assert_eq!(
3536 fs.files(),
3537 vec![
3538 Path::new(path!("/test/file-0")),
3539 Path::new(path!("/test/file-1"))
3540 ]
3541 );
3542
3543 // Rewinding the conversation truncates the history and restores the checkpoint.
3544 thread
3545 .update(cx, |thread, cx| {
3546 let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3547 panic!("unexpected entries {:?}", thread.entries)
3548 };
3549 thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3550 })
3551 .await
3552 .unwrap();
3553 thread.read_with(cx, |thread, cx| {
3554 assert_eq!(
3555 thread.to_markdown(cx),
3556 indoc! {"
3557 ## User (checkpoint)
3558
3559 Lorem
3560
3561 ## Assistant
3562
3563 LOREM
3564
3565 "}
3566 );
3567 });
3568 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3569 }
3570
3571 #[gpui::test]
3572 async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3573 use std::sync::atomic::AtomicUsize;
3574 init_test(cx);
3575
3576 let fs = FakeFs::new(cx.executor());
3577 let project = Project::test(fs, None, cx).await;
3578
3579 // Create a connection that simulates refusal after tool result
3580 let prompt_count = Arc::new(AtomicUsize::new(0));
3581 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3582 let prompt_count = prompt_count.clone();
3583 move |_request, thread, mut cx| {
3584 let count = prompt_count.fetch_add(1, SeqCst);
3585 async move {
3586 if count == 0 {
3587 // First prompt: Generate a tool call with result
3588 thread.update(&mut cx, |thread, cx| {
3589 thread
3590 .handle_session_update(
3591 acp::SessionUpdate::ToolCall(
3592 acp::ToolCall::new("tool1", "Test Tool")
3593 .kind(acp::ToolKind::Fetch)
3594 .status(acp::ToolCallStatus::Completed)
3595 .raw_input(serde_json::json!({"query": "test"}))
3596 .raw_output(serde_json::json!({"result": "inappropriate content"})),
3597 ),
3598 cx,
3599 )
3600 .unwrap();
3601 })?;
3602
3603 // Now return refusal because of the tool result
3604 Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
3605 } else {
3606 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3607 }
3608 }
3609 .boxed_local()
3610 }
3611 }));
3612
3613 let thread = cx
3614 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3615 .await
3616 .unwrap();
3617
3618 // Track if we see a Refusal event
3619 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3620 let saw_refusal_event_captured = saw_refusal_event.clone();
3621 thread.update(cx, |_thread, cx| {
3622 cx.subscribe(
3623 &thread,
3624 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3625 if matches!(event, AcpThreadEvent::Refusal) {
3626 *saw_refusal_event_captured.lock().unwrap() = true;
3627 }
3628 },
3629 )
3630 .detach();
3631 });
3632
3633 // Send a user message - this will trigger tool call and then refusal
3634 let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
3635 cx.background_executor.spawn(send_task).detach();
3636 cx.run_until_parked();
3637
3638 // Verify that:
3639 // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3640 // 2. The user message was NOT truncated
3641 assert!(
3642 *saw_refusal_event.lock().unwrap(),
3643 "Refusal event should be emitted for tool result refusals"
3644 );
3645
3646 thread.read_with(cx, |thread, _| {
3647 let entries = thread.entries();
3648 assert!(entries.len() >= 2, "Should have user message and tool call");
3649
3650 // Verify user message is still there
3651 assert!(
3652 matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3653 "User message should not be truncated"
3654 );
3655
3656 // Verify tool call is there with result
3657 if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3658 assert!(
3659 tool_call.raw_output.is_some(),
3660 "Tool call should have output"
3661 );
3662 } else {
3663 panic!("Expected tool call at index 1");
3664 }
3665 });
3666 }
3667
3668 #[gpui::test]
3669 async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3670 init_test(cx);
3671
3672 let fs = FakeFs::new(cx.executor());
3673 let project = Project::test(fs, None, cx).await;
3674
3675 let refuse_next = Arc::new(AtomicBool::new(false));
3676 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3677 let refuse_next = refuse_next.clone();
3678 move |_request, _thread, _cx| {
3679 if refuse_next.load(SeqCst) {
3680 async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
3681 .boxed_local()
3682 } else {
3683 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
3684 .boxed_local()
3685 }
3686 }
3687 }));
3688
3689 let thread = cx
3690 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3691 .await
3692 .unwrap();
3693
3694 // Track if we see a Refusal event
3695 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3696 let saw_refusal_event_captured = saw_refusal_event.clone();
3697 thread.update(cx, |_thread, cx| {
3698 cx.subscribe(
3699 &thread,
3700 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3701 if matches!(event, AcpThreadEvent::Refusal) {
3702 *saw_refusal_event_captured.lock().unwrap() = true;
3703 }
3704 },
3705 )
3706 .detach();
3707 });
3708
3709 // Send a message that will be refused
3710 refuse_next.store(true, SeqCst);
3711 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3712 .await
3713 .unwrap();
3714
3715 // Verify that a Refusal event WAS emitted for user prompt refusal
3716 assert!(
3717 *saw_refusal_event.lock().unwrap(),
3718 "Refusal event should be emitted for user prompt refusals"
3719 );
3720
3721 // Verify the message was truncated (user prompt refusal)
3722 thread.read_with(cx, |thread, cx| {
3723 assert_eq!(thread.to_markdown(cx), "");
3724 });
3725 }
3726
3727 #[gpui::test]
3728 async fn test_refusal(cx: &mut TestAppContext) {
3729 init_test(cx);
3730 let fs = FakeFs::new(cx.background_executor.clone());
3731 fs.insert_tree(path!("/"), json!({})).await;
3732 let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
3733
3734 let refuse_next = Arc::new(AtomicBool::new(false));
3735 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3736 let refuse_next = refuse_next.clone();
3737 move |request, thread, mut cx| {
3738 let refuse_next = refuse_next.clone();
3739 async move {
3740 if refuse_next.load(SeqCst) {
3741 return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
3742 }
3743
3744 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3745 panic!("expected text content block");
3746 };
3747 thread.update(&mut cx, |thread, cx| {
3748 thread
3749 .handle_session_update(
3750 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3751 content.text.to_uppercase().into(),
3752 )),
3753 cx,
3754 )
3755 .unwrap();
3756 })?;
3757 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3758 }
3759 .boxed_local()
3760 }
3761 }));
3762 let thread = cx
3763 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3764 .await
3765 .unwrap();
3766
3767 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3768 .await
3769 .unwrap();
3770 thread.read_with(cx, |thread, cx| {
3771 assert_eq!(
3772 thread.to_markdown(cx),
3773 indoc! {"
3774 ## User
3775
3776 hello
3777
3778 ## Assistant
3779
3780 HELLO
3781
3782 "}
3783 );
3784 });
3785
3786 // Simulate refusing the second message. The message should be truncated
3787 // when a user prompt is refused.
3788 refuse_next.store(true, SeqCst);
3789 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
3790 .await
3791 .unwrap();
3792 thread.read_with(cx, |thread, cx| {
3793 assert_eq!(
3794 thread.to_markdown(cx),
3795 indoc! {"
3796 ## User
3797
3798 hello
3799
3800 ## Assistant
3801
3802 HELLO
3803
3804 "}
3805 );
3806 });
3807 }
3808
3809 async fn run_until_first_tool_call(
3810 thread: &Entity<AcpThread>,
3811 cx: &mut TestAppContext,
3812 ) -> usize {
3813 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
3814
3815 let subscription = cx.update(|cx| {
3816 cx.subscribe(thread, move |thread, _, cx| {
3817 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
3818 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
3819 return tx.try_send(ix).unwrap();
3820 }
3821 }
3822 })
3823 });
3824
3825 select! {
3826 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
3827 panic!("Timeout waiting for tool call")
3828 }
3829 ix = rx.next().fuse() => {
3830 drop(subscription);
3831 ix.unwrap()
3832 }
3833 }
3834 }
3835
3836 #[derive(Clone, Default)]
3837 struct FakeAgentConnection {
3838 auth_methods: Vec<acp::AuthMethod>,
3839 sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
3840 on_user_message: Option<
3841 Rc<
3842 dyn Fn(
3843 acp::PromptRequest,
3844 WeakEntity<AcpThread>,
3845 AsyncApp,
3846 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3847 + 'static,
3848 >,
3849 >,
3850 }
3851
3852 impl FakeAgentConnection {
3853 fn new() -> Self {
3854 Self {
3855 auth_methods: Vec::new(),
3856 on_user_message: None,
3857 sessions: Arc::default(),
3858 }
3859 }
3860
3861 #[expect(unused)]
3862 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
3863 self.auth_methods = auth_methods;
3864 self
3865 }
3866
3867 fn on_user_message(
3868 mut self,
3869 handler: impl Fn(
3870 acp::PromptRequest,
3871 WeakEntity<AcpThread>,
3872 AsyncApp,
3873 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3874 + 'static,
3875 ) -> Self {
3876 self.on_user_message.replace(Rc::new(handler));
3877 self
3878 }
3879 }
3880
3881 impl AgentConnection for FakeAgentConnection {
3882 fn telemetry_id(&self) -> SharedString {
3883 "fake".into()
3884 }
3885
3886 fn auth_methods(&self) -> &[acp::AuthMethod] {
3887 &self.auth_methods
3888 }
3889
3890 fn new_session(
3891 self: Rc<Self>,
3892 project: Entity<Project>,
3893 _cwd: &Path,
3894 cx: &mut App,
3895 ) -> Task<gpui::Result<Entity<AcpThread>>> {
3896 let session_id = acp::SessionId::new(
3897 rand::rng()
3898 .sample_iter(&distr::Alphanumeric)
3899 .take(7)
3900 .map(char::from)
3901 .collect::<String>(),
3902 );
3903 let action_log = cx.new(|_| ActionLog::new(project.clone()));
3904 let thread = cx.new(|cx| {
3905 AcpThread::new(
3906 None,
3907 "Test",
3908 self.clone(),
3909 project,
3910 action_log,
3911 session_id.clone(),
3912 watch::Receiver::constant(
3913 acp::PromptCapabilities::new()
3914 .image(true)
3915 .audio(true)
3916 .embedded_context(true),
3917 ),
3918 cx,
3919 )
3920 });
3921 self.sessions.lock().insert(session_id, thread.downgrade());
3922 Task::ready(Ok(thread))
3923 }
3924
3925 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
3926 if self.auth_methods().iter().any(|m| m.id == method) {
3927 Task::ready(Ok(()))
3928 } else {
3929 Task::ready(Err(anyhow!("Invalid Auth Method")))
3930 }
3931 }
3932
3933 fn prompt(
3934 &self,
3935 _id: Option<UserMessageId>,
3936 params: acp::PromptRequest,
3937 cx: &mut App,
3938 ) -> Task<gpui::Result<acp::PromptResponse>> {
3939 let sessions = self.sessions.lock();
3940 let thread = sessions.get(¶ms.session_id).unwrap();
3941 if let Some(handler) = &self.on_user_message {
3942 let handler = handler.clone();
3943 let thread = thread.clone();
3944 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
3945 } else {
3946 Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
3947 }
3948 }
3949
3950 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
3951 let sessions = self.sessions.lock();
3952 let thread = sessions.get(session_id).unwrap().clone();
3953
3954 cx.spawn(async move |cx| {
3955 thread
3956 .update(cx, |thread, cx| thread.cancel(cx))
3957 .unwrap()
3958 .await
3959 })
3960 .detach();
3961 }
3962
3963 fn truncate(
3964 &self,
3965 session_id: &acp::SessionId,
3966 _cx: &App,
3967 ) -> Option<Rc<dyn AgentSessionTruncate>> {
3968 Some(Rc::new(FakeAgentSessionEditor {
3969 _session_id: session_id.clone(),
3970 }))
3971 }
3972
3973 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
3974 self
3975 }
3976 }
3977
3978 struct FakeAgentSessionEditor {
3979 _session_id: acp::SessionId,
3980 }
3981
3982 impl AgentSessionTruncate for FakeAgentSessionEditor {
3983 fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
3984 Task::ready(Ok(()))
3985 }
3986 }
3987
3988 #[gpui::test]
3989 async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
3990 init_test(cx);
3991
3992 let fs = FakeFs::new(cx.executor());
3993 let project = Project::test(fs, [], cx).await;
3994 let connection = Rc::new(FakeAgentConnection::new());
3995 let thread = cx
3996 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
3997 .await
3998 .unwrap();
3999
4000 // Try to update a tool call that doesn't exist
4001 let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
4002 thread.update(cx, |thread, cx| {
4003 let result = thread.handle_session_update(
4004 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4005 nonexistent_id.clone(),
4006 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4007 )),
4008 cx,
4009 );
4010
4011 // The update should succeed (not return an error)
4012 assert!(result.is_ok());
4013
4014 // There should now be exactly one entry in the thread
4015 assert_eq!(thread.entries.len(), 1);
4016
4017 // The entry should be a failed tool call
4018 if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
4019 assert_eq!(tool_call.id, nonexistent_id);
4020 assert!(matches!(tool_call.status, ToolCallStatus::Failed));
4021 assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
4022
4023 // Check that the content contains the error message
4024 assert_eq!(tool_call.content.len(), 1);
4025 if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
4026 match content_block {
4027 ContentBlock::Markdown { markdown } => {
4028 let markdown_text = markdown.read(cx).source();
4029 assert!(markdown_text.contains("Tool call not found"));
4030 }
4031 ContentBlock::Empty => panic!("Expected markdown content, got empty"),
4032 ContentBlock::ResourceLink { .. } => {
4033 panic!("Expected markdown content, got resource link")
4034 }
4035 ContentBlock::Image { .. } => {
4036 panic!("Expected markdown content, got image")
4037 }
4038 }
4039 } else {
4040 panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
4041 }
4042 } else {
4043 panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
4044 }
4045 });
4046 }
4047
4048 /// Tests that restoring a checkpoint properly cleans up terminals that were
4049 /// created after that checkpoint, and cancels any in-progress generation.
4050 ///
4051 /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
4052 /// that were started after that checkpoint should be terminated, and any in-progress
4053 /// AI generation should be canceled.
4054 #[gpui::test]
4055 async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
4056 init_test(cx);
4057
4058 let fs = FakeFs::new(cx.executor());
4059 let project = Project::test(fs, [], cx).await;
4060 let connection = Rc::new(FakeAgentConnection::new());
4061 let thread = cx
4062 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
4063 .await
4064 .unwrap();
4065
4066 // Send first user message to create a checkpoint
4067 cx.update(|cx| {
4068 thread.update(cx, |thread, cx| {
4069 thread.send(vec!["first message".into()], cx)
4070 })
4071 })
4072 .await
4073 .unwrap();
4074
4075 // Send second message (creates another checkpoint) - we'll restore to this one
4076 cx.update(|cx| {
4077 thread.update(cx, |thread, cx| {
4078 thread.send(vec!["second message".into()], cx)
4079 })
4080 })
4081 .await
4082 .unwrap();
4083
4084 // Create 2 terminals BEFORE the checkpoint that have completed running
4085 let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4086 let mock_terminal_1 = cx.new(|cx| {
4087 let builder = ::terminal::TerminalBuilder::new_display_only(
4088 ::terminal::terminal_settings::CursorShape::default(),
4089 ::terminal::terminal_settings::AlternateScroll::On,
4090 None,
4091 0,
4092 cx.background_executor(),
4093 PathStyle::local(),
4094 )
4095 .unwrap();
4096 builder.subscribe(cx)
4097 });
4098
4099 thread.update(cx, |thread, cx| {
4100 thread.on_terminal_provider_event(
4101 TerminalProviderEvent::Created {
4102 terminal_id: terminal_id_1.clone(),
4103 label: "echo 'first'".to_string(),
4104 cwd: Some(PathBuf::from("/test")),
4105 output_byte_limit: None,
4106 terminal: mock_terminal_1.clone(),
4107 },
4108 cx,
4109 );
4110 });
4111
4112 thread.update(cx, |thread, cx| {
4113 thread.on_terminal_provider_event(
4114 TerminalProviderEvent::Output {
4115 terminal_id: terminal_id_1.clone(),
4116 data: b"first\n".to_vec(),
4117 },
4118 cx,
4119 );
4120 });
4121
4122 thread.update(cx, |thread, cx| {
4123 thread.on_terminal_provider_event(
4124 TerminalProviderEvent::Exit {
4125 terminal_id: terminal_id_1.clone(),
4126 status: acp::TerminalExitStatus::new().exit_code(0),
4127 },
4128 cx,
4129 );
4130 });
4131
4132 let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4133 let mock_terminal_2 = cx.new(|cx| {
4134 let builder = ::terminal::TerminalBuilder::new_display_only(
4135 ::terminal::terminal_settings::CursorShape::default(),
4136 ::terminal::terminal_settings::AlternateScroll::On,
4137 None,
4138 0,
4139 cx.background_executor(),
4140 PathStyle::local(),
4141 )
4142 .unwrap();
4143 builder.subscribe(cx)
4144 });
4145
4146 thread.update(cx, |thread, cx| {
4147 thread.on_terminal_provider_event(
4148 TerminalProviderEvent::Created {
4149 terminal_id: terminal_id_2.clone(),
4150 label: "echo 'second'".to_string(),
4151 cwd: Some(PathBuf::from("/test")),
4152 output_byte_limit: None,
4153 terminal: mock_terminal_2.clone(),
4154 },
4155 cx,
4156 );
4157 });
4158
4159 thread.update(cx, |thread, cx| {
4160 thread.on_terminal_provider_event(
4161 TerminalProviderEvent::Output {
4162 terminal_id: terminal_id_2.clone(),
4163 data: b"second\n".to_vec(),
4164 },
4165 cx,
4166 );
4167 });
4168
4169 thread.update(cx, |thread, cx| {
4170 thread.on_terminal_provider_event(
4171 TerminalProviderEvent::Exit {
4172 terminal_id: terminal_id_2.clone(),
4173 status: acp::TerminalExitStatus::new().exit_code(0),
4174 },
4175 cx,
4176 );
4177 });
4178
4179 // Get the second message ID to restore to
4180 let second_message_id = thread.read_with(cx, |thread, _| {
4181 // At this point we have:
4182 // - Index 0: First user message (with checkpoint)
4183 // - Index 1: Second user message (with checkpoint)
4184 // No assistant responses because FakeAgentConnection just returns EndTurn
4185 let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
4186 panic!("expected user message at index 1");
4187 };
4188 message.id.clone().unwrap()
4189 });
4190
4191 // Create a terminal AFTER the checkpoint we'll restore to.
4192 // This simulates the AI agent starting a long-running terminal command.
4193 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4194 let mock_terminal = cx.new(|cx| {
4195 let builder = ::terminal::TerminalBuilder::new_display_only(
4196 ::terminal::terminal_settings::CursorShape::default(),
4197 ::terminal::terminal_settings::AlternateScroll::On,
4198 None,
4199 0,
4200 cx.background_executor(),
4201 PathStyle::local(),
4202 )
4203 .unwrap();
4204 builder.subscribe(cx)
4205 });
4206
4207 // Register the terminal as created
4208 thread.update(cx, |thread, cx| {
4209 thread.on_terminal_provider_event(
4210 TerminalProviderEvent::Created {
4211 terminal_id: terminal_id.clone(),
4212 label: "sleep 1000".to_string(),
4213 cwd: Some(PathBuf::from("/test")),
4214 output_byte_limit: None,
4215 terminal: mock_terminal.clone(),
4216 },
4217 cx,
4218 );
4219 });
4220
4221 // Simulate the terminal producing output (still running)
4222 thread.update(cx, |thread, cx| {
4223 thread.on_terminal_provider_event(
4224 TerminalProviderEvent::Output {
4225 terminal_id: terminal_id.clone(),
4226 data: b"terminal is running...\n".to_vec(),
4227 },
4228 cx,
4229 );
4230 });
4231
4232 // Create a tool call entry that references this terminal
4233 // This represents the agent requesting a terminal command
4234 thread.update(cx, |thread, cx| {
4235 thread
4236 .handle_session_update(
4237 acp::SessionUpdate::ToolCall(
4238 acp::ToolCall::new("terminal-tool-1", "Running command")
4239 .kind(acp::ToolKind::Execute)
4240 .status(acp::ToolCallStatus::InProgress)
4241 .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
4242 terminal_id.clone(),
4243 ))])
4244 .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
4245 ),
4246 cx,
4247 )
4248 .unwrap();
4249 });
4250
4251 // Verify terminal exists and is in the thread
4252 let terminal_exists_before =
4253 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4254 assert!(
4255 terminal_exists_before,
4256 "Terminal should exist before checkpoint restore"
4257 );
4258
4259 // Verify the terminal's underlying task is still running (not completed)
4260 let terminal_running_before = thread.read_with(cx, |thread, _cx| {
4261 let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
4262 terminal_entity.read_with(cx, |term, _cx| {
4263 term.output().is_none() // output is None means it's still running
4264 })
4265 });
4266 assert!(
4267 terminal_running_before,
4268 "Terminal should be running before checkpoint restore"
4269 );
4270
4271 // Verify we have the expected entries before restore
4272 let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
4273 assert!(
4274 entry_count_before > 1,
4275 "Should have multiple entries before restore"
4276 );
4277
4278 // Restore the checkpoint to the second message.
4279 // This should:
4280 // 1. Cancel any in-progress generation (via the cancel() call)
4281 // 2. Remove the terminal that was created after that point
4282 thread
4283 .update(cx, |thread, cx| {
4284 thread.restore_checkpoint(second_message_id, cx)
4285 })
4286 .await
4287 .unwrap();
4288
4289 // Verify that no send_task is in progress after restore
4290 // (cancel() clears the send_task)
4291 let has_send_task_after = thread.read_with(cx, |thread, _| thread.send_task.is_some());
4292 assert!(
4293 !has_send_task_after,
4294 "Should not have a send_task after restore (cancel should have cleared it)"
4295 );
4296
4297 // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
4298 let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
4299 assert_eq!(
4300 entry_count, 1,
4301 "Should have 1 entry after restore (only the first user message)"
4302 );
4303
4304 // Verify the 2 completed terminals from before the checkpoint still exist
4305 let terminal_1_exists = thread.read_with(cx, |thread, _| {
4306 thread.terminals.contains_key(&terminal_id_1)
4307 });
4308 assert!(
4309 terminal_1_exists,
4310 "Terminal 1 (from before checkpoint) should still exist"
4311 );
4312
4313 let terminal_2_exists = thread.read_with(cx, |thread, _| {
4314 thread.terminals.contains_key(&terminal_id_2)
4315 });
4316 assert!(
4317 terminal_2_exists,
4318 "Terminal 2 (from before checkpoint) should still exist"
4319 );
4320
4321 // Verify they're still in completed state
4322 let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
4323 let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
4324 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4325 });
4326 assert!(terminal_1_completed, "Terminal 1 should still be completed");
4327
4328 let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
4329 let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
4330 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4331 });
4332 assert!(terminal_2_completed, "Terminal 2 should still be completed");
4333
4334 // Verify the running terminal (created after checkpoint) was removed
4335 let terminal_3_exists =
4336 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4337 assert!(
4338 !terminal_3_exists,
4339 "Terminal 3 (created after checkpoint) should have been removed"
4340 );
4341
4342 // Verify total count is 2 (the two from before the checkpoint)
4343 let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
4344 assert_eq!(
4345 terminal_count, 2,
4346 "Should have exactly 2 terminals (the completed ones from before checkpoint)"
4347 );
4348 }
4349
4350 /// Tests that update_last_checkpoint correctly updates the original message's checkpoint
4351 /// even when a new user message is added while the async checkpoint comparison is in progress.
4352 ///
4353 /// This is a regression test for a bug where update_last_checkpoint would fail with
4354 /// "no checkpoint" if a new user message (without a checkpoint) was added between when
4355 /// update_last_checkpoint started and when its async closure ran.
4356 #[gpui::test]
4357 async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
4358 init_test(cx);
4359
4360 let fs = FakeFs::new(cx.executor());
4361 fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
4362 .await;
4363 let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;
4364
4365 let handler_done = Arc::new(AtomicBool::new(false));
4366 let handler_done_clone = handler_done.clone();
4367 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4368 move |_, _thread, _cx| {
4369 handler_done_clone.store(true, SeqCst);
4370 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
4371 },
4372 ));
4373
4374 let thread = cx
4375 .update(|cx| connection.new_session(project, Path::new(path!("/test")), cx))
4376 .await
4377 .unwrap();
4378
4379 let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
4380 let send_task = cx.background_executor.spawn(send_future);
4381
4382 // Tick until handler completes, then a few more to let update_last_checkpoint start
4383 while !handler_done.load(SeqCst) {
4384 cx.executor().tick();
4385 }
4386 for _ in 0..5 {
4387 cx.executor().tick();
4388 }
4389
4390 thread.update(cx, |thread, cx| {
4391 thread.push_entry(
4392 AgentThreadEntry::UserMessage(UserMessage {
4393 id: Some(UserMessageId::new()),
4394 content: ContentBlock::Empty,
4395 chunks: vec!["Injected message (no checkpoint)".into()],
4396 checkpoint: None,
4397 indented: false,
4398 }),
4399 cx,
4400 );
4401 });
4402
4403 cx.run_until_parked();
4404 let result = send_task.await;
4405
4406 assert!(
4407 result.is_ok(),
4408 "send should succeed even when new message added during update_last_checkpoint: {:?}",
4409 result.err()
4410 );
4411 }
4412}