1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5
6use agent_settings::AgentSettings;
7
/// Key used in ACP ToolCall meta to store the tool's programmatic name.
/// This is a workaround since ACP's ToolCall doesn't have a dedicated name field.
///
/// Written by [`meta_with_tool_name`] and read back by [`tool_name_from_meta`].
pub const TOOL_NAME_META_KEY: &str = "tool_name";

/// The tool name used for subagent spawning.
///
/// [`ToolCall::is_subagent`] compares a tool call's name against this value.
pub const SUBAGENT_TOOL_NAME: &str = "subagent";
14
15/// Helper to extract tool name from ACP meta
16pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
17 meta.as_ref()
18 .and_then(|m| m.get(TOOL_NAME_META_KEY))
19 .and_then(|v| v.as_str())
20 .map(|s| SharedString::from(s.to_owned()))
21}
22
23/// Helper to create meta with tool name
24pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
25 acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
26}
27use collections::HashSet;
28pub use connection::*;
29pub use diff::*;
30use language::language_settings::FormatOnSave;
31pub use mention::*;
32use project::lsp_store::{FormatTrigger, LspFormatTarget};
33use serde::{Deserialize, Serialize};
34use serde_json::to_string_pretty;
35use settings::Settings as _;
36use task::{Shell, ShellBuilder};
37pub use terminal::*;
38
39use action_log::{ActionLog, ActionLogTelemetry};
40use agent_client_protocol::{self as acp};
41use anyhow::{Context as _, Result, anyhow};
42use editor::Bias;
43use futures::{FutureExt, channel::oneshot, future::BoxFuture};
44use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
45use itertools::Itertools;
46use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
47use markdown::Markdown;
48use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
49use std::collections::HashMap;
50use std::error::Error;
51use std::fmt::{Formatter, Write};
52use std::ops::Range;
53use std::process::ExitStatus;
54use std::rc::Rc;
55use std::time::{Duration, Instant};
56use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
57use ui::App;
58use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
59use uuid::Uuid;
60
/// A user-authored entry in an agent thread.
#[derive(Debug)]
pub struct UserMessage {
    /// Identifier of the message, when one is available.
    pub id: Option<UserMessageId>,
    /// Content that `to_markdown` writes out for this message.
    pub content: ContentBlock,
    /// ACP content blocks associated with the message.
    // NOTE(review): presumably the raw blocks `content` was built from — confirm
    // against the code that constructs `UserMessage`.
    pub chunks: Vec<acp::ContentBlock>,
    /// Checkpoint attached to this message, if any.
    pub checkpoint: Option<Checkpoint>,
    /// Value reported by `AgentThreadEntry::is_indented` for this entry.
    pub indented: bool,
}
69
/// A git checkpoint attached to a [`UserMessage`].
#[derive(Debug)]
pub struct Checkpoint {
    /// The underlying checkpoint handle from the project's git store.
    git_checkpoint: GitStoreCheckpoint,
    /// When `true`, the Markdown export labels the message heading with
    /// `(checkpoint)`.
    pub show: bool,
}
75
76impl UserMessage {
77 fn to_markdown(&self, cx: &App) -> String {
78 let mut markdown = String::new();
79 if self
80 .checkpoint
81 .as_ref()
82 .is_some_and(|checkpoint| checkpoint.show)
83 {
84 writeln!(markdown, "## User (checkpoint)").unwrap();
85 } else {
86 writeln!(markdown, "## User").unwrap();
87 }
88 writeln!(markdown).unwrap();
89 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
90 writeln!(markdown).unwrap();
91 markdown
92 }
93}
94
/// An assistant-authored entry in an agent thread, made of one or more chunks.
#[derive(Debug, PartialEq)]
pub struct AssistantMessage {
    /// The message and thought chunks, in the order they are rendered.
    pub chunks: Vec<AssistantMessageChunk>,
    /// Value reported by `AgentThreadEntry::is_indented` for this entry.
    pub indented: bool,
}
100
101impl AssistantMessage {
102 pub fn to_markdown(&self, cx: &App) -> String {
103 format!(
104 "## Assistant\n\n{}\n\n",
105 self.chunks
106 .iter()
107 .map(|chunk| chunk.to_markdown(cx))
108 .join("\n\n")
109 )
110 }
111}
112
/// One piece of an [`AssistantMessage`].
#[derive(Debug, PartialEq)]
pub enum AssistantMessageChunk {
    /// Regular assistant output; exported to Markdown as-is.
    Message { block: ContentBlock },
    /// Assistant reasoning; exported to Markdown wrapped in `<thinking>` tags.
    Thought { block: ContentBlock },
}
118
119impl AssistantMessageChunk {
120 pub fn from_str(
121 chunk: &str,
122 language_registry: &Arc<LanguageRegistry>,
123 path_style: PathStyle,
124 cx: &mut App,
125 ) -> Self {
126 Self::Message {
127 block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
128 }
129 }
130
131 fn to_markdown(&self, cx: &App) -> String {
132 match self {
133 Self::Message { block } => block.to_markdown(cx).to_string(),
134 Self::Thought { block } => {
135 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
136 }
137 }
138 }
139}
140
/// A single entry in an agent thread: a user message, an assistant message, or a
/// tool call.
#[derive(Debug)]
pub enum AgentThreadEntry {
    UserMessage(UserMessage),
    AssistantMessage(AssistantMessage),
    ToolCall(ToolCall),
}
147
148impl AgentThreadEntry {
149 pub fn is_indented(&self) -> bool {
150 match self {
151 Self::UserMessage(message) => message.indented,
152 Self::AssistantMessage(message) => message.indented,
153 Self::ToolCall(_) => false,
154 }
155 }
156
157 pub fn to_markdown(&self, cx: &App) -> String {
158 match self {
159 Self::UserMessage(message) => message.to_markdown(cx),
160 Self::AssistantMessage(message) => message.to_markdown(cx),
161 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
162 }
163 }
164
165 pub fn user_message(&self) -> Option<&UserMessage> {
166 if let AgentThreadEntry::UserMessage(message) = self {
167 Some(message)
168 } else {
169 None
170 }
171 }
172
173 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
174 if let AgentThreadEntry::ToolCall(call) = self {
175 itertools::Either::Left(call.diffs())
176 } else {
177 itertools::Either::Right(std::iter::empty())
178 }
179 }
180
181 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
182 if let AgentThreadEntry::ToolCall(call) = self {
183 itertools::Either::Left(call.terminals())
184 } else {
185 itertools::Either::Right(std::iter::empty())
186 }
187 }
188
189 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
190 if let AgentThreadEntry::ToolCall(ToolCall {
191 locations,
192 resolved_locations,
193 ..
194 }) = self
195 {
196 Some((
197 locations.get(ix)?.clone(),
198 resolved_locations.get(ix)?.clone()?,
199 ))
200 } else {
201 None
202 }
203 }
204}
205
/// A tool invocation shown in an agent thread.
#[derive(Debug)]
pub struct ToolCall {
    /// ACP identifier of the tool call.
    pub id: acp::ToolCallId,
    /// Markdown label built from the first line of the tool call's title.
    pub label: Entity<Markdown>,
    pub kind: acp::ToolKind,
    /// Content produced by the tool call (content blocks, diffs, terminals, or a
    /// subagent thread).
    pub content: Vec<ToolCallContent>,
    pub status: ToolCallStatus,
    /// Locations reported for the tool call.
    pub locations: Vec<acp::ToolCallLocation>,
    /// Resolved counterparts of `locations`, looked up by the same index in
    /// `AgentThreadEntry::location`; starts out empty.
    pub resolved_locations: Vec<Option<AgentLocation>>,
    /// Raw input exactly as received over ACP.
    pub raw_input: Option<serde_json::Value>,
    /// Markdown rendering of `raw_input`, when one could be produced.
    pub raw_input_markdown: Option<Entity<Markdown>>,
    /// Raw output exactly as received over ACP.
    pub raw_output: Option<serde_json::Value>,
    /// Programmatic tool name read from the ACP meta (see `tool_name_from_meta`).
    pub tool_name: Option<SharedString>,
}
220
221impl ToolCall {
222 fn from_acp(
223 tool_call: acp::ToolCall,
224 status: ToolCallStatus,
225 language_registry: Arc<LanguageRegistry>,
226 path_style: PathStyle,
227 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
228 cx: &mut App,
229 ) -> Result<Self> {
230 let title = if let Some((first_line, _)) = tool_call.title.split_once("\n") {
231 first_line.to_owned() + "…"
232 } else {
233 tool_call.title
234 };
235 let mut content = Vec::with_capacity(tool_call.content.len());
236 for item in tool_call.content {
237 if let Some(item) = ToolCallContent::from_acp(
238 item,
239 language_registry.clone(),
240 path_style,
241 terminals,
242 cx,
243 )? {
244 content.push(item);
245 }
246 }
247
248 let raw_input_markdown = tool_call
249 .raw_input
250 .as_ref()
251 .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
252
253 let tool_name = tool_name_from_meta(&tool_call.meta);
254
255 let result = Self {
256 id: tool_call.tool_call_id,
257 label: cx
258 .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
259 kind: tool_call.kind,
260 content,
261 locations: tool_call.locations,
262 resolved_locations: Vec::default(),
263 status,
264 raw_input: tool_call.raw_input,
265 raw_input_markdown,
266 raw_output: tool_call.raw_output,
267 tool_name,
268 };
269 Ok(result)
270 }
271
272 fn update_fields(
273 &mut self,
274 fields: acp::ToolCallUpdateFields,
275 language_registry: Arc<LanguageRegistry>,
276 path_style: PathStyle,
277 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
278 cx: &mut App,
279 ) -> Result<()> {
280 let acp::ToolCallUpdateFields {
281 kind,
282 status,
283 title,
284 content,
285 locations,
286 raw_input,
287 raw_output,
288 ..
289 } = fields;
290
291 if let Some(kind) = kind {
292 self.kind = kind;
293 }
294
295 if let Some(status) = status {
296 self.status = status.into();
297 }
298
299 if let Some(title) = title {
300 self.label.update(cx, |label, cx| {
301 if let Some((first_line, _)) = title.split_once("\n") {
302 label.replace(first_line.to_owned() + "…", cx)
303 } else {
304 label.replace(title, cx);
305 }
306 });
307 }
308
309 if let Some(content) = content {
310 let mut new_content_len = content.len();
311 let mut content = content.into_iter();
312
313 // Reuse existing content if we can
314 for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
315 let valid_content =
316 old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
317 if !valid_content {
318 new_content_len -= 1;
319 }
320 }
321 for new in content {
322 if let Some(new) = ToolCallContent::from_acp(
323 new,
324 language_registry.clone(),
325 path_style,
326 terminals,
327 cx,
328 )? {
329 self.content.push(new);
330 } else {
331 new_content_len -= 1;
332 }
333 }
334 self.content.truncate(new_content_len);
335 }
336
337 if let Some(locations) = locations {
338 self.locations = locations;
339 }
340
341 if let Some(raw_input) = raw_input {
342 self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
343 self.raw_input = Some(raw_input);
344 }
345
346 if let Some(raw_output) = raw_output {
347 if self.content.is_empty()
348 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
349 {
350 self.content
351 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
352 markdown,
353 }));
354 }
355 self.raw_output = Some(raw_output);
356 }
357 Ok(())
358 }
359
360 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
361 self.content.iter().filter_map(|content| match content {
362 ToolCallContent::Diff(diff) => Some(diff),
363 ToolCallContent::ContentBlock(_) => None,
364 ToolCallContent::Terminal(_) => None,
365 ToolCallContent::SubagentThread(_) => None,
366 })
367 }
368
369 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
370 self.content.iter().filter_map(|content| match content {
371 ToolCallContent::Terminal(terminal) => Some(terminal),
372 ToolCallContent::ContentBlock(_) => None,
373 ToolCallContent::Diff(_) => None,
374 ToolCallContent::SubagentThread(_) => None,
375 })
376 }
377
378 pub fn subagent_thread(&self) -> Option<&Entity<AcpThread>> {
379 self.content.iter().find_map(|content| match content {
380 ToolCallContent::SubagentThread(thread) => Some(thread),
381 _ => None,
382 })
383 }
384
385 pub fn is_subagent(&self) -> bool {
386 matches!(self.kind, acp::ToolKind::Other)
387 && self
388 .tool_name
389 .as_ref()
390 .map(|n| n.as_ref() == SUBAGENT_TOOL_NAME)
391 .unwrap_or(false)
392 }
393
394 fn to_markdown(&self, cx: &App) -> String {
395 let mut markdown = format!(
396 "**Tool Call: {}**\nStatus: {}\n\n",
397 self.label.read(cx).source(),
398 self.status
399 );
400 for content in &self.content {
401 markdown.push_str(content.to_markdown(cx).as_str());
402 markdown.push_str("\n\n");
403 }
404 markdown
405 }
406
407 async fn resolve_location(
408 location: acp::ToolCallLocation,
409 project: WeakEntity<Project>,
410 cx: &mut AsyncApp,
411 ) -> Option<ResolvedLocation> {
412 let buffer = project
413 .update(cx, |project, cx| {
414 project
415 .project_path_for_absolute_path(&location.path, cx)
416 .map(|path| project.open_buffer(path, cx))
417 })
418 .ok()??;
419 let buffer = buffer.await.log_err()?;
420 let position = buffer.update(cx, |buffer, _| {
421 let snapshot = buffer.snapshot();
422 if let Some(row) = location.line {
423 let column = snapshot.indent_size_for_line(row).len;
424 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
425 snapshot.anchor_before(point)
426 } else {
427 Anchor::min_for_buffer(snapshot.remote_id())
428 }
429 });
430
431 Some(ResolvedLocation { buffer, position })
432 }
433
434 fn resolve_locations(
435 &self,
436 project: Entity<Project>,
437 cx: &mut App,
438 ) -> Task<Vec<Option<ResolvedLocation>>> {
439 let locations = self.locations.clone();
440 project.update(cx, |_, cx| {
441 cx.spawn(async move |project, cx| {
442 let mut new_locations = Vec::new();
443 for location in locations {
444 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
445 }
446 new_locations
447 })
448 })
449 }
450}
451
// Separate so we can hold a strong reference to the buffer
// for saving on the thread
/// A tool call location resolved to an open buffer and an anchor within it.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ResolvedLocation {
    /// Strong handle to the buffer containing the location.
    buffer: Entity<Buffer>,
    /// Anchor of the location inside `buffer`.
    position: Anchor,
}
459
460impl From<&ResolvedLocation> for AgentLocation {
461 fn from(value: &ResolvedLocation) -> Self {
462 Self {
463 buffer: value.buffer.downgrade(),
464 position: value.position,
465 }
466 }
467}
468
/// Lifecycle state of a [`ToolCall`] as tracked by the thread.
#[derive(Debug)]
pub enum ToolCallStatus {
    /// The tool call hasn't started running yet, but we start showing it to
    /// the user.
    Pending,
    /// The tool call is waiting for confirmation from the user.
    WaitingForConfirmation {
        /// The permission options offered to the user.
        options: Vec<acp::PermissionOption>,
        /// Channel on which the id of the chosen option is sent.
        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
    },
    /// The tool call is currently running.
    InProgress,
    /// The tool call completed successfully.
    Completed,
    /// The tool call failed.
    Failed,
    /// The user rejected the tool call.
    Rejected,
    /// The user canceled generation so the tool call was canceled.
    Canceled,
}
490
491impl From<acp::ToolCallStatus> for ToolCallStatus {
492 fn from(status: acp::ToolCallStatus) -> Self {
493 match status {
494 acp::ToolCallStatus::Pending => Self::Pending,
495 acp::ToolCallStatus::InProgress => Self::InProgress,
496 acp::ToolCallStatus::Completed => Self::Completed,
497 acp::ToolCallStatus::Failed => Self::Failed,
498 _ => Self::Pending,
499 }
500 }
501}
502
503impl Display for ToolCallStatus {
504 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
505 write!(
506 f,
507 "{}",
508 match self {
509 ToolCallStatus::Pending => "Pending",
510 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
511 ToolCallStatus::InProgress => "In Progress",
512 ToolCallStatus::Completed => "Completed",
513 ToolCallStatus::Failed => "Failed",
514 ToolCallStatus::Rejected => "Rejected",
515 ToolCallStatus::Canceled => "Canceled",
516 }
517 )
518 }
519}
520
/// Displayable content accumulated from one or more ACP content blocks.
#[derive(Debug, PartialEq, Clone)]
pub enum ContentBlock {
    /// Nothing has been appended yet.
    Empty,
    /// Markdown text; further appended blocks are added to this text.
    Markdown { markdown: Entity<Markdown> },
    /// A single resource link, kept structured until something else is appended.
    ResourceLink { resource_link: acp::ResourceLink },
    /// A single successfully decoded image.
    Image { image: Arc<gpui::Image> },
}
528
529impl ContentBlock {
530 pub fn new(
531 block: acp::ContentBlock,
532 language_registry: &Arc<LanguageRegistry>,
533 path_style: PathStyle,
534 cx: &mut App,
535 ) -> Self {
536 let mut this = Self::Empty;
537 this.append(block, language_registry, path_style, cx);
538 this
539 }
540
541 pub fn new_combined(
542 blocks: impl IntoIterator<Item = acp::ContentBlock>,
543 language_registry: Arc<LanguageRegistry>,
544 path_style: PathStyle,
545 cx: &mut App,
546 ) -> Self {
547 let mut this = Self::Empty;
548 for block in blocks {
549 this.append(block, &language_registry, path_style, cx);
550 }
551 this
552 }
553
554 pub fn append(
555 &mut self,
556 block: acp::ContentBlock,
557 language_registry: &Arc<LanguageRegistry>,
558 path_style: PathStyle,
559 cx: &mut App,
560 ) {
561 match (&mut *self, &block) {
562 (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
563 *self = ContentBlock::ResourceLink {
564 resource_link: resource_link.clone(),
565 };
566 }
567 (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
568 if let Some(image) = Self::decode_image(image_content) {
569 *self = ContentBlock::Image { image };
570 } else {
571 let new_content = Self::image_md(image_content);
572 *self = Self::create_markdown_block(new_content, language_registry, cx);
573 }
574 }
575 (ContentBlock::Empty, _) => {
576 let new_content = Self::block_string_contents(&block, path_style);
577 *self = Self::create_markdown_block(new_content, language_registry, cx);
578 }
579 (ContentBlock::Markdown { markdown }, _) => {
580 let new_content = Self::block_string_contents(&block, path_style);
581 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
582 }
583 (ContentBlock::ResourceLink { resource_link }, _) => {
584 let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
585 let new_content = Self::block_string_contents(&block, path_style);
586 let combined = format!("{}\n{}", existing_content, new_content);
587 *self = Self::create_markdown_block(combined, language_registry, cx);
588 }
589 (ContentBlock::Image { .. }, _) => {
590 let new_content = Self::block_string_contents(&block, path_style);
591 let combined = format!("`Image`\n{}", new_content);
592 *self = Self::create_markdown_block(combined, language_registry, cx);
593 }
594 }
595 }
596
597 fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
598 use base64::Engine as _;
599
600 let bytes = base64::engine::general_purpose::STANDARD
601 .decode(image_content.data.as_bytes())
602 .ok()?;
603 let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
604 Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
605 }
606
607 fn create_markdown_block(
608 content: String,
609 language_registry: &Arc<LanguageRegistry>,
610 cx: &mut App,
611 ) -> ContentBlock {
612 ContentBlock::Markdown {
613 markdown: cx
614 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
615 }
616 }
617
618 fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
619 match block {
620 acp::ContentBlock::Text(text_content) => text_content.text.clone(),
621 acp::ContentBlock::ResourceLink(resource_link) => {
622 Self::resource_link_md(&resource_link.uri, path_style)
623 }
624 acp::ContentBlock::Resource(acp::EmbeddedResource {
625 resource:
626 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
627 uri,
628 ..
629 }),
630 ..
631 }) => Self::resource_link_md(uri, path_style),
632 acp::ContentBlock::Image(image) => Self::image_md(image),
633 _ => String::new(),
634 }
635 }
636
637 fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
638 if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
639 uri.as_link().to_string()
640 } else {
641 uri.to_string()
642 }
643 }
644
645 fn image_md(_image: &acp::ImageContent) -> String {
646 "`Image`".into()
647 }
648
649 pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
650 match self {
651 ContentBlock::Empty => "",
652 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
653 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
654 ContentBlock::Image { .. } => "`Image`",
655 }
656 }
657
658 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
659 match self {
660 ContentBlock::Empty => None,
661 ContentBlock::Markdown { markdown } => Some(markdown),
662 ContentBlock::ResourceLink { .. } => None,
663 ContentBlock::Image { .. } => None,
664 }
665 }
666
667 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
668 match self {
669 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
670 _ => None,
671 }
672 }
673
674 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
675 match self {
676 ContentBlock::Image { image } => Some(image),
677 _ => None,
678 }
679 }
680}
681
/// One item of content attached to a [`ToolCall`].
#[derive(Debug)]
pub enum ToolCallContent {
    /// Plain content (text, resource link, or image).
    ContentBlock(ContentBlock),
    /// A diff.
    Diff(Entity<Diff>),
    /// A terminal.
    Terminal(Entity<Terminal>),
    /// The thread of a subagent.
    SubagentThread(Entity<AcpThread>),
}
689
690impl ToolCallContent {
691 pub fn from_acp(
692 content: acp::ToolCallContent,
693 language_registry: Arc<LanguageRegistry>,
694 path_style: PathStyle,
695 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
696 cx: &mut App,
697 ) -> Result<Option<Self>> {
698 match content {
699 acp::ToolCallContent::Content(acp::Content { content, .. }) => {
700 Ok(Some(Self::ContentBlock(ContentBlock::new(
701 content,
702 &language_registry,
703 path_style,
704 cx,
705 ))))
706 }
707 acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
708 Diff::finalized(
709 diff.path.to_string_lossy().into_owned(),
710 diff.old_text,
711 diff.new_text,
712 language_registry,
713 cx,
714 )
715 })))),
716 acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
717 .get(&terminal_id)
718 .cloned()
719 .map(|terminal| Some(Self::Terminal(terminal)))
720 .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
721 _ => Ok(None),
722 }
723 }
724
725 pub fn update_from_acp(
726 &mut self,
727 new: acp::ToolCallContent,
728 language_registry: Arc<LanguageRegistry>,
729 path_style: PathStyle,
730 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
731 cx: &mut App,
732 ) -> Result<bool> {
733 let needs_update = match (&self, &new) {
734 (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
735 old_diff.read(cx).needs_update(
736 new_diff.old_text.as_deref().unwrap_or(""),
737 &new_diff.new_text,
738 cx,
739 )
740 }
741 _ => true,
742 };
743
744 if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
745 if needs_update {
746 *self = update;
747 }
748 Ok(true)
749 } else {
750 Ok(false)
751 }
752 }
753
754 pub fn to_markdown(&self, cx: &App) -> String {
755 match self {
756 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
757 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
758 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
759 Self::SubagentThread(thread) => thread.read(cx).to_markdown(cx),
760 }
761 }
762
763 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
764 match self {
765 Self::ContentBlock(content) => content.image(),
766 _ => None,
767 }
768 }
769
770 pub fn subagent_thread(&self) -> Option<&Entity<AcpThread>> {
771 match self {
772 Self::SubagentThread(thread) => Some(thread),
773 _ => None,
774 }
775 }
776}
777
/// An update targeting an existing tool call, identified by its tool call id.
#[derive(Debug, PartialEq)]
pub enum ToolCallUpdate {
    /// A field update as defined by ACP.
    UpdateFields(acp::ToolCallUpdate),
    /// Carries a diff entity for the tool call.
    UpdateDiff(ToolCallUpdateDiff),
    /// Carries a terminal entity for the tool call.
    UpdateTerminal(ToolCallUpdateTerminal),
    /// Carries a subagent thread entity for the tool call.
    UpdateSubagentThread(ToolCallUpdateSubagentThread),
}
785
786impl ToolCallUpdate {
787 fn id(&self) -> &acp::ToolCallId {
788 match self {
789 Self::UpdateFields(update) => &update.tool_call_id,
790 Self::UpdateDiff(diff) => &diff.id,
791 Self::UpdateTerminal(terminal) => &terminal.id,
792 Self::UpdateSubagentThread(subagent) => &subagent.id,
793 }
794 }
795}
796
impl From<acp::ToolCallUpdate> for ToolCallUpdate {
    /// Wraps an ACP update as [`ToolCallUpdate::UpdateFields`].
    fn from(update: acp::ToolCallUpdate) -> Self {
        Self::UpdateFields(update)
    }
}
802
impl From<ToolCallUpdateDiff> for ToolCallUpdate {
    /// Wraps a diff update as [`ToolCallUpdate::UpdateDiff`].
    fn from(diff: ToolCallUpdateDiff) -> Self {
        Self::UpdateDiff(diff)
    }
}
808
/// Payload of [`ToolCallUpdate::UpdateDiff`].
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateDiff {
    /// Id of the tool call the diff belongs to.
    pub id: acp::ToolCallId,
    pub diff: Entity<Diff>,
}
814
impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
    /// Wraps a terminal update as [`ToolCallUpdate::UpdateTerminal`].
    fn from(terminal: ToolCallUpdateTerminal) -> Self {
        Self::UpdateTerminal(terminal)
    }
}
820
/// Payload of [`ToolCallUpdate::UpdateTerminal`].
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateTerminal {
    /// Id of the tool call the terminal belongs to.
    pub id: acp::ToolCallId,
    pub terminal: Entity<Terminal>,
}
826
impl From<ToolCallUpdateSubagentThread> for ToolCallUpdate {
    /// Wraps a subagent thread update as [`ToolCallUpdate::UpdateSubagentThread`].
    fn from(subagent: ToolCallUpdateSubagentThread) -> Self {
        Self::UpdateSubagentThread(subagent)
    }
}
832
/// Payload of [`ToolCallUpdate::UpdateSubagentThread`].
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateSubagentThread {
    /// Id of the tool call the subagent thread belongs to.
    pub id: acp::ToolCallId,
    pub thread: Entity<AcpThread>,
}
838
/// The agent's plan: an ordered list of entries. Empty by default.
#[derive(Debug, Default)]
pub struct Plan {
    pub entries: Vec<PlanEntry>,
}
843
/// Summary of a [`Plan`], as computed by [`Plan::stats`].
#[derive(Debug)]
pub struct PlanStats<'a> {
    /// The first entry, in plan order, whose status is in progress.
    pub in_progress_entry: Option<&'a PlanEntry>,
    /// Number of entries whose status is pending.
    pub pending: u32,
    /// Number of entries whose status is completed.
    pub completed: u32,
}
850
851impl Plan {
852 pub fn is_empty(&self) -> bool {
853 self.entries.is_empty()
854 }
855
856 pub fn stats(&self) -> PlanStats<'_> {
857 let mut stats = PlanStats {
858 in_progress_entry: None,
859 pending: 0,
860 completed: 0,
861 };
862
863 for entry in &self.entries {
864 match &entry.status {
865 acp::PlanEntryStatus::Pending => {
866 stats.pending += 1;
867 }
868 acp::PlanEntryStatus::InProgress => {
869 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
870 }
871 acp::PlanEntryStatus::Completed => {
872 stats.completed += 1;
873 }
874 _ => {}
875 }
876 }
877
878 stats
879 }
880}
881
/// A single step of a [`Plan`].
#[derive(Debug)]
pub struct PlanEntry {
    /// The entry's text, held as a Markdown entity.
    pub content: Entity<Markdown>,
    pub priority: acp::PlanEntryPriority,
    pub status: acp::PlanEntryStatus,
}
888
889impl PlanEntry {
890 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
891 Self {
892 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
893 priority: entry.priority,
894 status: entry.status,
895 }
896 }
897}
898
/// Token accounting for a thread.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TokenUsage {
    /// Maximum number of tokens; `0` means the limit is unknown (see `ratio`).
    pub max_tokens: u64,
    /// Number of tokens used so far; compared against `max_tokens` by `ratio`.
    pub used_tokens: u64,
    /// Number of output tokens.
    pub output_tokens: u64,
}
905
906impl TokenUsage {
907 pub fn ratio(&self) -> TokenUsageRatio {
908 #[cfg(debug_assertions)]
909 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
910 .unwrap_or("0.8".to_string())
911 .parse()
912 .unwrap();
913 #[cfg(not(debug_assertions))]
914 let warning_threshold: f32 = 0.8;
915
916 // When the maximum is unknown because there is no selected model,
917 // avoid showing the token limit warning.
918 if self.max_tokens == 0 {
919 TokenUsageRatio::Normal
920 } else if self.used_tokens >= self.max_tokens {
921 TokenUsageRatio::Exceeded
922 } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
923 TokenUsageRatio::Warning
924 } else {
925 TokenUsageRatio::Normal
926 }
927 }
928}
929
/// Classification returned by [`TokenUsage::ratio`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TokenUsageRatio {
    /// Usage is below the warning threshold, or the limit is unknown.
    Normal,
    /// Usage has reached the warning threshold but not the limit.
    Warning,
    /// Usage has reached or passed the limit.
    Exceeded,
}
936
/// Details carried by [`AcpThreadEvent::Retry`].
// NOTE(review): field meanings below are inferred from their names and types
// only; confirm against the code that emits the event.
#[derive(Debug, Clone)]
pub struct RetryStatus {
    /// Message of the most recent error.
    pub last_error: SharedString,
    /// Current attempt number.
    pub attempt: usize,
    /// Maximum number of attempts.
    pub max_attempts: usize,
    pub started_at: Instant,
    pub duration: Duration,
}
945
/// State of a single agent conversation over ACP.
pub struct AcpThread {
    title: SharedString,
    /// The thread's entries, in order.
    entries: Vec<AgentThreadEntry>,
    plan: Plan,
    project: Entity<Project>,
    action_log: Entity<ActionLog>,
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    /// `Some` while the thread reports `ThreadStatus::Generating`.
    send_task: Option<Task<()>>,
    connection: Rc<dyn AgentConnection>,
    session_id: acp::SessionId,
    token_usage: Option<TokenUsage>,
    /// Latest prompt capabilities received from the watch channel given to `new`.
    prompt_capabilities: acp::PromptCapabilities,
    /// Background task that keeps `prompt_capabilities` up to date.
    _observe_prompt_capabilities: Task<anyhow::Result<()>>,
    /// Terminals known to this thread, by id.
    terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
    /// Output chunks received for terminals that are not registered yet; replayed
    /// when the terminal's `Created` event arrives.
    pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
    /// Exit statuses received for terminals that are not registered yet.
    pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
}
963
964impl From<&AcpThread> for ActionLogTelemetry {
965 fn from(value: &AcpThread) -> Self {
966 Self {
967 agent_telemetry_id: value.connection().telemetry_id(),
968 session_id: value.session_id.0.clone(),
969 }
970 }
971}
972
/// Events emitted by [`AcpThread`].
#[derive(Debug)]
pub enum AcpThreadEvent {
    NewEntry,
    TitleUpdated,
    TokenUsageUpdated,
    /// Carries a `usize`; presumably the index of the updated entry — confirm at
    /// the emit site.
    EntryUpdated(usize),
    /// Carries a range; presumably the indices of the removed entries — confirm
    /// at the emit site.
    EntriesRemoved(Range<usize>),
    ToolAuthorizationRequired,
    Retry(RetryStatus),
    Stopped,
    Error,
    LoadError(LoadError),
    /// Emitted after the thread's prompt capabilities were replaced with a new
    /// value from the watch channel.
    PromptCapabilitiesUpdated,
    Refusal,
    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
    ModeUpdated(acp::SessionModeId),
    ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
}
991
// Lets `AcpThread` emit `AcpThreadEvent`s through its context.
impl EventEmitter<AcpThreadEvent> for AcpThread {}
993
/// Events about terminals, handled by `AcpThread::on_terminal_provider_event`.
#[derive(Debug, Clone)]
pub enum TerminalProviderEvent {
    /// A terminal was created and should be registered with the thread. Output
    /// and exit events buffered for the same id are consumed at this point.
    Created {
        terminal_id: acp::TerminalId,
        label: String,
        cwd: Option<PathBuf>,
        output_byte_limit: Option<u64>,
        terminal: Entity<::terminal::Terminal>,
    },
    /// Output bytes for a terminal; buffered if the terminal is not registered
    /// yet.
    Output {
        terminal_id: acp::TerminalId,
        data: Vec<u8>,
    },
    /// The terminal's title changed; ignored if the terminal is not registered.
    TitleChanged {
        terminal_id: acp::TerminalId,
        title: String,
    },
    /// The terminal exited; the status is buffered if the terminal is not
    /// registered yet.
    Exit {
        terminal_id: acp::TerminalId,
        status: acp::TerminalExitStatus,
    },
}
1016
/// Commands addressed to a terminal by id.
// NOTE(review): no consumer of these commands is visible in this part of the
// file; the variant descriptions follow the variant and field names only.
#[derive(Debug, Clone)]
pub enum TerminalProviderCommand {
    /// Input bytes for the terminal.
    WriteInput {
        terminal_id: acp::TerminalId,
        bytes: Vec<u8>,
    },
    /// New terminal dimensions, as columns and rows.
    Resize {
        terminal_id: acp::TerminalId,
        cols: u16,
        rows: u16,
    },
    Close {
        terminal_id: acp::TerminalId,
    },
}
1032
1033impl AcpThread {
1034 pub fn on_terminal_provider_event(
1035 &mut self,
1036 event: TerminalProviderEvent,
1037 cx: &mut Context<Self>,
1038 ) {
1039 match event {
1040 TerminalProviderEvent::Created {
1041 terminal_id,
1042 label,
1043 cwd,
1044 output_byte_limit,
1045 terminal,
1046 } => {
1047 let entity = self.register_terminal_created(
1048 terminal_id.clone(),
1049 label,
1050 cwd,
1051 output_byte_limit,
1052 terminal,
1053 cx,
1054 );
1055
1056 if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
1057 for data in chunks.drain(..) {
1058 entity.update(cx, |term, cx| {
1059 term.inner().update(cx, |inner, cx| {
1060 inner.write_output(&data, cx);
1061 })
1062 });
1063 }
1064 }
1065
1066 if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
1067 entity.update(cx, |_term, cx| {
1068 cx.notify();
1069 });
1070 }
1071
1072 cx.notify();
1073 }
1074 TerminalProviderEvent::Output { terminal_id, data } => {
1075 if let Some(entity) = self.terminals.get(&terminal_id) {
1076 entity.update(cx, |term, cx| {
1077 term.inner().update(cx, |inner, cx| {
1078 inner.write_output(&data, cx);
1079 })
1080 });
1081 } else {
1082 self.pending_terminal_output
1083 .entry(terminal_id)
1084 .or_default()
1085 .push(data);
1086 }
1087 }
1088 TerminalProviderEvent::TitleChanged { terminal_id, title } => {
1089 if let Some(entity) = self.terminals.get(&terminal_id) {
1090 entity.update(cx, |term, cx| {
1091 term.inner().update(cx, |inner, cx| {
1092 inner.breadcrumb_text = title;
1093 cx.emit(::terminal::Event::BreadcrumbsChanged);
1094 })
1095 });
1096 }
1097 }
1098 TerminalProviderEvent::Exit {
1099 terminal_id,
1100 status,
1101 } => {
1102 if let Some(entity) = self.terminals.get(&terminal_id) {
1103 entity.update(cx, |_term, cx| {
1104 cx.notify();
1105 });
1106 } else {
1107 self.pending_terminal_exit.insert(terminal_id, status);
1108 }
1109 }
1110 }
1111 }
1112}
1113
/// Whether a thread is currently generating, as reported by `AcpThread::status`.
#[derive(PartialEq, Eq, Debug)]
pub enum ThreadStatus {
    /// No send task is stored on the thread.
    Idle,
    /// A send task is stored on the thread.
    Generating,
}
1119
/// Failure reported through [`AcpThreadEvent::LoadError`].
#[derive(Debug, Clone)]
pub enum LoadError {
    /// The version found at `command` is older than the minimum supported one.
    Unsupported {
        command: SharedString,
        current_version: SharedString,
        minimum_version: SharedString,
    },
    /// Installation failed; carries the failure message.
    FailedToInstall(SharedString),
    /// The server process exited with the given status.
    Exited {
        status: ExitStatus,
    },
    /// Any other failure; carries the message to display.
    Other(SharedString),
}
1133
1134impl Display for LoadError {
1135 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1136 match self {
1137 LoadError::Unsupported {
1138 command: path,
1139 current_version,
1140 minimum_version,
1141 } => {
1142 write!(
1143 f,
1144 "version {current_version} from {path} is not supported (need at least {minimum_version})"
1145 )
1146 }
1147 LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1148 LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1149 LoadError::Other(msg) => write!(f, "{msg}"),
1150 }
1151 }
1152}
1153
// `LoadError` relies on the default `Error` methods; it wraps no source error.
impl Error for LoadError {}
1155
1156impl AcpThread {
1157 pub fn new(
1158 title: impl Into<SharedString>,
1159 connection: Rc<dyn AgentConnection>,
1160 project: Entity<Project>,
1161 action_log: Entity<ActionLog>,
1162 session_id: acp::SessionId,
1163 mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1164 cx: &mut Context<Self>,
1165 ) -> Self {
1166 let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1167 let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1168 loop {
1169 let caps = prompt_capabilities_rx.recv().await?;
1170 this.update(cx, |this, cx| {
1171 this.prompt_capabilities = caps;
1172 cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1173 })?;
1174 }
1175 });
1176
1177 Self {
1178 action_log,
1179 shared_buffers: Default::default(),
1180 entries: Default::default(),
1181 plan: Default::default(),
1182 title: title.into(),
1183 project,
1184 send_task: None,
1185 connection,
1186 session_id,
1187 token_usage: None,
1188 prompt_capabilities,
1189 _observe_prompt_capabilities: task,
1190 terminals: HashMap::default(),
1191 pending_terminal_output: HashMap::default(),
1192 pending_terminal_exit: HashMap::default(),
1193 }
1194 }
1195
/// Returns a clone of the prompt capabilities currently stored on the thread.
pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
    self.prompt_capabilities.clone()
}
1199
/// Returns the agent connection this thread was created with.
pub fn connection(&self) -> &Rc<dyn AgentConnection> {
    &self.connection
}
1203
/// Returns the action log entity associated with this thread.
pub fn action_log(&self) -> &Entity<ActionLog> {
    &self.action_log
}
1207
/// Returns the project entity this thread operates on.
pub fn project(&self) -> &Entity<Project> {
    &self.project
}
1211
/// Returns a clone of the thread's current title.
pub fn title(&self) -> SharedString {
    self.title.clone()
}
1215
1216 pub fn entries(&self) -> &[AgentThreadEntry] {
1217 &self.entries
1218 }
1219
/// Returns the id of the session this thread belongs to.
pub fn session_id(&self) -> &acp::SessionId {
    &self.session_id
}
1223
1224 pub fn status(&self) -> ThreadStatus {
1225 if self.send_task.is_some() {
1226 ThreadStatus::Generating
1227 } else {
1228 ThreadStatus::Idle
1229 }
1230 }
1231
/// Returns the most recently recorded token usage, if any has been set.
pub fn token_usage(&self) -> Option<&TokenUsage> {
    self.token_usage.as_ref()
}
1235
1236 pub fn has_pending_edit_tool_calls(&self) -> bool {
1237 for entry in self.entries.iter().rev() {
1238 match entry {
1239 AgentThreadEntry::UserMessage(_) => return false,
1240 AgentThreadEntry::ToolCall(
1241 call @ ToolCall {
1242 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1243 ..
1244 },
1245 ) if call.diffs().next().is_some() => {
1246 return true;
1247 }
1248 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1249 }
1250 }
1251
1252 false
1253 }
1254
1255 pub fn has_in_progress_tool_calls(&self) -> bool {
1256 for entry in self.entries.iter().rev() {
1257 match entry {
1258 AgentThreadEntry::UserMessage(_) => return false,
1259 AgentThreadEntry::ToolCall(ToolCall {
1260 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1261 ..
1262 }) => {
1263 return true;
1264 }
1265 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1266 }
1267 }
1268
1269 false
1270 }
1271
1272 pub fn used_tools_since_last_user_message(&self) -> bool {
1273 for entry in self.entries.iter().rev() {
1274 match entry {
1275 AgentThreadEntry::UserMessage(..) => return false,
1276 AgentThreadEntry::AssistantMessage(..) => continue,
1277 AgentThreadEntry::ToolCall(..) => return true,
1278 }
1279 }
1280
1281 false
1282 }
1283
1284 pub fn handle_session_update(
1285 &mut self,
1286 update: acp::SessionUpdate,
1287 cx: &mut Context<Self>,
1288 ) -> Result<(), acp::Error> {
1289 match update {
1290 acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1291 self.push_user_content_block(None, content, cx);
1292 }
1293 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1294 self.push_assistant_content_block(content, false, cx);
1295 }
1296 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1297 self.push_assistant_content_block(content, true, cx);
1298 }
1299 acp::SessionUpdate::ToolCall(tool_call) => {
1300 self.upsert_tool_call(tool_call, cx)?;
1301 }
1302 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1303 self.update_tool_call(tool_call_update, cx)?;
1304 }
1305 acp::SessionUpdate::Plan(plan) => {
1306 self.update_plan(plan, cx);
1307 }
1308 acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1309 available_commands,
1310 ..
1311 }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1312 acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1313 current_mode_id,
1314 ..
1315 }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1316 acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1317 config_options,
1318 ..
1319 }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1320 _ => {}
1321 }
1322 Ok(())
1323 }
1324
/// Appends a chunk of user content to the thread without indentation.
///
/// Shorthand for `push_user_content_block_with_indent` with `indented`
/// set to `false`.
pub fn push_user_content_block(
    &mut self,
    message_id: Option<UserMessageId>,
    chunk: acp::ContentBlock,
    cx: &mut Context<Self>,
) {
    self.push_user_content_block_with_indent(message_id, chunk, false, cx)
}
1333
1334 pub fn push_user_content_block_with_indent(
1335 &mut self,
1336 message_id: Option<UserMessageId>,
1337 chunk: acp::ContentBlock,
1338 indented: bool,
1339 cx: &mut Context<Self>,
1340 ) {
1341 let language_registry = self.project.read(cx).languages().clone();
1342 let path_style = self.project.read(cx).path_style(cx);
1343 let entries_len = self.entries.len();
1344
1345 if let Some(last_entry) = self.entries.last_mut()
1346 && let AgentThreadEntry::UserMessage(UserMessage {
1347 id,
1348 content,
1349 chunks,
1350 indented: existing_indented,
1351 ..
1352 }) = last_entry
1353 && *existing_indented == indented
1354 {
1355 *id = message_id.or(id.take());
1356 content.append(chunk.clone(), &language_registry, path_style, cx);
1357 chunks.push(chunk);
1358 let idx = entries_len - 1;
1359 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1360 } else {
1361 let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1362 self.push_entry(
1363 AgentThreadEntry::UserMessage(UserMessage {
1364 id: message_id,
1365 content,
1366 chunks: vec![chunk],
1367 checkpoint: None,
1368 indented,
1369 }),
1370 cx,
1371 );
1372 }
1373 }
1374
/// Appends a chunk of assistant content (a message, or a thought when
/// `is_thought` is set) to the thread without indentation.
///
/// Shorthand for `push_assistant_content_block_with_indent` with `indented`
/// set to `false`.
pub fn push_assistant_content_block(
    &mut self,
    chunk: acp::ContentBlock,
    is_thought: bool,
    cx: &mut Context<Self>,
) {
    self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
}
1383
1384 pub fn push_assistant_content_block_with_indent(
1385 &mut self,
1386 chunk: acp::ContentBlock,
1387 is_thought: bool,
1388 indented: bool,
1389 cx: &mut Context<Self>,
1390 ) {
1391 let language_registry = self.project.read(cx).languages().clone();
1392 let path_style = self.project.read(cx).path_style(cx);
1393 let entries_len = self.entries.len();
1394 if let Some(last_entry) = self.entries.last_mut()
1395 && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1396 chunks,
1397 indented: existing_indented,
1398 }) = last_entry
1399 && *existing_indented == indented
1400 {
1401 let idx = entries_len - 1;
1402 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1403 match (chunks.last_mut(), is_thought) {
1404 (Some(AssistantMessageChunk::Message { block }), false)
1405 | (Some(AssistantMessageChunk::Thought { block }), true) => {
1406 block.append(chunk, &language_registry, path_style, cx)
1407 }
1408 _ => {
1409 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1410 if is_thought {
1411 chunks.push(AssistantMessageChunk::Thought { block })
1412 } else {
1413 chunks.push(AssistantMessageChunk::Message { block })
1414 }
1415 }
1416 }
1417 } else {
1418 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1419 let chunk = if is_thought {
1420 AssistantMessageChunk::Thought { block }
1421 } else {
1422 AssistantMessageChunk::Message { block }
1423 };
1424
1425 self.push_entry(
1426 AgentThreadEntry::AssistantMessage(AssistantMessage {
1427 chunks: vec![chunk],
1428 indented,
1429 }),
1430 cx,
1431 );
1432 }
1433 }
1434
/// Appends `entry` to the thread and emits `AcpThreadEvent::NewEntry`.
fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
    self.entries.push(entry);
    cx.emit(AcpThreadEvent::NewEntry);
}
1439
/// Returns `true` if the connection offers a title-setting handle for this
/// session (i.e. `connection.set_title` returns `Some`).
pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
    self.connection.set_title(&self.session_id, cx).is_some()
}
1443
1444 pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1445 if title != self.title {
1446 self.title = title.clone();
1447 cx.emit(AcpThreadEvent::TitleUpdated);
1448 if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1449 return set_title.run(title, cx);
1450 }
1451 }
1452 Task::ready(Ok(()))
1453 }
1454
/// Replaces the stored token usage (possibly with `None`) and emits
/// `AcpThreadEvent::TokenUsageUpdated`.
pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
    self.token_usage = usage;
    cx.emit(AcpThreadEvent::TokenUsageUpdated);
}
1459
/// Emits `AcpThreadEvent::Retry` carrying `status`; nothing is stored on the
/// thread itself.
pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
    cx.emit(AcpThreadEvent::Retry(status));
}
1463
1464 pub fn update_tool_call(
1465 &mut self,
1466 update: impl Into<ToolCallUpdate>,
1467 cx: &mut Context<Self>,
1468 ) -> Result<()> {
1469 let update = update.into();
1470 let languages = self.project.read(cx).languages().clone();
1471 let path_style = self.project.read(cx).path_style(cx);
1472
1473 let ix = match self.index_for_tool_call(update.id()) {
1474 Some(ix) => ix,
1475 None => {
1476 // Tool call not found - create a failed tool call entry
1477 let failed_tool_call = ToolCall {
1478 id: update.id().clone(),
1479 label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1480 kind: acp::ToolKind::Fetch,
1481 content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1482 "Tool call not found".into(),
1483 &languages,
1484 path_style,
1485 cx,
1486 ))],
1487 status: ToolCallStatus::Failed,
1488 locations: Vec::new(),
1489 resolved_locations: Vec::new(),
1490 raw_input: None,
1491 raw_input_markdown: None,
1492 raw_output: None,
1493 tool_name: None,
1494 };
1495 self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1496 return Ok(());
1497 }
1498 };
1499 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1500 unreachable!()
1501 };
1502
1503 match update {
1504 ToolCallUpdate::UpdateFields(update) => {
1505 let location_updated = update.fields.locations.is_some();
1506 call.update_fields(update.fields, languages, path_style, &self.terminals, cx)?;
1507 if location_updated {
1508 self.resolve_locations(update.tool_call_id, cx);
1509 }
1510 }
1511 ToolCallUpdate::UpdateDiff(update) => {
1512 call.content.clear();
1513 call.content.push(ToolCallContent::Diff(update.diff));
1514 }
1515 ToolCallUpdate::UpdateTerminal(update) => {
1516 call.content.clear();
1517 call.content
1518 .push(ToolCallContent::Terminal(update.terminal));
1519 }
1520 ToolCallUpdate::UpdateSubagentThread(update) => {
1521 call.content.clear();
1522 call.content
1523 .push(ToolCallContent::SubagentThread(update.thread));
1524 }
1525 }
1526
1527 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1528
1529 Ok(())
1530 }
1531
/// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
///
/// The status carried by `tool_call` is converted and passed along with the
/// converted call to `upsert_tool_call_inner`.
pub fn upsert_tool_call(
    &mut self,
    tool_call: acp::ToolCall,
    cx: &mut Context<Self>,
) -> Result<(), acp::Error> {
    // Read the status first: `tool_call` is consumed by the conversion below.
    let status = tool_call.status.into();
    self.upsert_tool_call_inner(tool_call.into(), status, cx)
}
1541
/// Applies `update` with the given `status` to the tool call that has the
/// same id, or inserts a new tool call when no such entry exists.
///
/// When inserting, the update is first converted into a full tool call
/// (`update.try_into()?`); that conversion, `ToolCall::from_acp`, and
/// `ToolCall::update_fields` are the points at which this can fail.
/// In both cases locations are re-resolved afterwards.
pub fn upsert_tool_call_inner(
    &mut self,
    update: acp::ToolCallUpdate,
    status: ToolCallStatus,
    cx: &mut Context<Self>,
) -> Result<(), acp::Error> {
    let language_registry = self.project.read(cx).languages().clone();
    let path_style = self.project.read(cx).path_style(cx);
    let id = update.tool_call_id.clone();

    let agent_telemetry_id = self.connection().telemetry_id();
    let session = self.session_id();
    // Report terminal statuses to telemetry before the entry is touched.
    if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
        let status = if matches!(status, ToolCallStatus::Completed) {
            "completed"
        } else {
            "failed"
        };
        telemetry::event!(
            "Agent Tool Call Completed",
            agent_telemetry_id,
            session,
            status
        );
    }

    if let Some(ix) = self.index_for_tool_call(&id) {
        // Existing entry: merge the fields and overwrite the status.
        let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
            unreachable!()
        };

        call.update_fields(
            update.fields,
            language_registry,
            path_style,
            &self.terminals,
            cx,
        )?;
        call.status = status;

        cx.emit(AcpThreadEvent::EntryUpdated(ix));
    } else {
        // No entry with this id yet: build a new tool call from the update.
        let call = ToolCall::from_acp(
            update.try_into()?,
            status,
            language_registry,
            self.project.read(cx).path_style(cx),
            &self.terminals,
            cx,
        )?;
        self.push_entry(AgentThreadEntry::ToolCall(call), cx);
    };

    self.resolve_locations(id, cx);
    Ok(())
}
1599
1600 fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1601 self.entries
1602 .iter()
1603 .enumerate()
1604 .rev()
1605 .find_map(|(index, entry)| {
1606 if let AgentThreadEntry::ToolCall(tool_call) = entry
1607 && &tool_call.id == id
1608 {
1609 Some(index)
1610 } else {
1611 None
1612 }
1613 })
1614 }
1615
1616 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1617 // The tool call we are looking for is typically the last one, or very close to the end.
1618 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1619 self.entries
1620 .iter_mut()
1621 .enumerate()
1622 .rev()
1623 .find_map(|(index, tool_call)| {
1624 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1625 && &tool_call.id == id
1626 {
1627 Some((index, tool_call))
1628 } else {
1629 None
1630 }
1631 })
1632 }
1633
1634 pub fn tool_call(&mut self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1635 self.entries
1636 .iter()
1637 .enumerate()
1638 .rev()
1639 .find_map(|(index, tool_call)| {
1640 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1641 && &tool_call.id == id
1642 {
1643 Some((index, tool_call))
1644 } else {
1645 None
1646 }
1647 })
1648 }
1649
/// Resolves the locations of the tool call with the given `id` in a detached
/// background task and stores the result on the tool call.
///
/// Once resolved, a snapshot of every resolved buffer is recorded in
/// `shared_buffers`, the project's agent location is moved to the last
/// resolved location (subject to the same-row check below), and
/// `EntryUpdated` is emitted if the resolved locations changed.
/// Does nothing if no tool call with `id` exists.
pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
    let project = self.project.clone();
    let Some((_, tool_call)) = self.tool_call_mut(&id) else {
        return;
    };
    let task = tool_call.resolve_locations(project, cx);
    cx.spawn(async move |this, cx| {
        let resolved_locations = task.await;

        this.update(cx, |this, cx| {
            let project = this.project.clone();

            for location in resolved_locations.iter().flatten() {
                this.shared_buffers
                    .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
            }
            // The tool call may have been removed while resolution was running.
            let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
                return;
            };

            if let Some(Some(location)) = resolved_locations.last() {
                project.update(cx, |project, cx| {
                    let should_ignore = if let Some(agent_location) = project
                        .agent_location()
                        .filter(|agent_location| agent_location.buffer == location.buffer)
                    {
                        let snapshot = location.buffer.read(cx).snapshot();
                        let old_position = agent_location.position.to_point(&snapshot);
                        let new_position = location.position.to_point(&snapshot);

                        // ignore this so that when we get updates from the edit tool
                        // the position doesn't reset to the start of line
                        old_position.row == new_position.row
                            && old_position.column > new_position.column
                    } else {
                        false
                    };
                    if !should_ignore {
                        project.set_agent_location(Some(location.into()), cx);
                    }
                });
            }

            let resolved_locations = resolved_locations
                .iter()
                .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
                .collect::<Vec<_>>();

            // Only notify when something actually changed.
            if tool_call.resolved_locations != resolved_locations {
                tool_call.resolved_locations = resolved_locations;
                cx.emit(AcpThreadEvent::EntryUpdated(ix));
            }
        })
    })
    .detach();
}
1706
1707 pub fn request_tool_call_authorization(
1708 &mut self,
1709 tool_call: acp::ToolCallUpdate,
1710 options: Vec<acp::PermissionOption>,
1711 respect_always_allow_setting: bool,
1712 cx: &mut Context<Self>,
1713 ) -> Result<BoxFuture<'static, acp::RequestPermissionOutcome>> {
1714 let (tx, rx) = oneshot::channel();
1715
1716 if respect_always_allow_setting && AgentSettings::get_global(cx).always_allow_tool_actions {
1717 // Don't use AllowAlways, because then if you were to turn off always_allow_tool_actions,
1718 // some tools would (incorrectly) continue to auto-accept.
1719 if let Some(allow_once_option) = options.iter().find_map(|option| {
1720 if matches!(option.kind, acp::PermissionOptionKind::AllowOnce) {
1721 Some(option.option_id.clone())
1722 } else {
1723 None
1724 }
1725 }) {
1726 self.upsert_tool_call_inner(tool_call, ToolCallStatus::Pending, cx)?;
1727 return Ok(async {
1728 acp::RequestPermissionOutcome::Selected(acp::SelectedPermissionOutcome::new(
1729 allow_once_option,
1730 ))
1731 }
1732 .boxed());
1733 }
1734 }
1735
1736 let status = ToolCallStatus::WaitingForConfirmation {
1737 options,
1738 respond_tx: tx,
1739 };
1740
1741 self.upsert_tool_call_inner(tool_call, status, cx)?;
1742 cx.emit(AcpThreadEvent::ToolAuthorizationRequired);
1743
1744 let fut = async {
1745 match rx.await {
1746 Ok(option) => acp::RequestPermissionOutcome::Selected(
1747 acp::SelectedPermissionOutcome::new(option),
1748 ),
1749 Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1750 }
1751 }
1752 .boxed();
1753
1754 Ok(fut)
1755 }
1756
1757 pub fn authorize_tool_call(
1758 &mut self,
1759 id: acp::ToolCallId,
1760 option_id: acp::PermissionOptionId,
1761 option_kind: acp::PermissionOptionKind,
1762 cx: &mut Context<Self>,
1763 ) {
1764 let Some((ix, call)) = self.tool_call_mut(&id) else {
1765 return;
1766 };
1767
1768 let new_status = match option_kind {
1769 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1770 ToolCallStatus::Rejected
1771 }
1772 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1773 ToolCallStatus::InProgress
1774 }
1775 _ => ToolCallStatus::InProgress,
1776 };
1777
1778 let curr_status = mem::replace(&mut call.status, new_status);
1779
1780 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1781 respond_tx.send(option_id).log_err();
1782 } else if cfg!(debug_assertions) {
1783 panic!("tried to authorize an already authorized tool call");
1784 }
1785
1786 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1787 }
1788
1789 pub fn first_tool_awaiting_confirmation(&self) -> Option<&ToolCall> {
1790 let mut first_tool_call = None;
1791
1792 for entry in self.entries.iter().rev() {
1793 match &entry {
1794 AgentThreadEntry::ToolCall(call) => {
1795 if let ToolCallStatus::WaitingForConfirmation { .. } = call.status {
1796 first_tool_call = Some(call);
1797 } else {
1798 continue;
1799 }
1800 }
1801 AgentThreadEntry::UserMessage(_) | AgentThreadEntry::AssistantMessage(_) => {
1802 // Reached the beginning of the turn.
1803 // If we had pending permission requests in the previous turn, they have been cancelled.
1804 break;
1805 }
1806 }
1807 }
1808
1809 first_tool_call
1810 }
1811
/// Returns the thread's current plan.
pub fn plan(&self) -> &Plan {
    &self.plan
}
1815
1816 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1817 let new_entries_len = request.entries.len();
1818 let mut new_entries = request.entries.into_iter();
1819
1820 // Reuse existing markdown to prevent flickering
1821 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1822 let PlanEntry {
1823 content,
1824 priority,
1825 status,
1826 } = old;
1827 content.update(cx, |old, cx| {
1828 old.replace(new.content, cx);
1829 });
1830 *priority = new.priority;
1831 *status = new.status;
1832 }
1833 for new in new_entries {
1834 self.plan.entries.push(PlanEntry::from_acp(new, cx))
1835 }
1836 self.plan.entries.truncate(new_entries_len);
1837
1838 cx.notify();
1839 }
1840
/// Drops every plan entry whose status is `Completed` and notifies observers.
fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
    self.plan
        .entries
        .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
    cx.notify();
}
1847
/// Test-only convenience: sends `message` as a single content block via `send`.
#[cfg(any(test, feature = "test-support"))]
pub fn send_raw(
    &mut self,
    message: &str,
    cx: &mut Context<Self>,
) -> BoxFuture<'static, Result<()>> {
    self.send(vec![message.into()], cx)
}
1856
/// Sends `message` as a new user turn.
///
/// Inside the turn, the user message is appended to the thread, a git
/// checkpoint is attempted and (if obtained) attached to the last user
/// message with `show: false`, and the prompt is then forwarded to the
/// connection. The returned future is the one produced by `run_turn`.
pub fn send(
    &mut self,
    message: Vec<acp::ContentBlock>,
    cx: &mut Context<Self>,
) -> BoxFuture<'static, Result<()>> {
    let block = ContentBlock::new_combined(
        message.clone(),
        self.project.read(cx).languages().clone(),
        self.project.read(cx).path_style(cx),
        cx,
    );
    let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
    let git_store = self.project.read(cx).git_store().clone();

    // A message id is only assigned when the connection offers `truncate`.
    let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
        Some(UserMessageId::new())
    } else {
        None
    };

    self.run_turn(cx, async move |this, cx| {
        this.update(cx, |this, cx| {
            this.push_entry(
                AgentThreadEntry::UserMessage(UserMessage {
                    id: message_id.clone(),
                    content: block,
                    chunks: message,
                    checkpoint: None,
                    indented: false,
                }),
                cx,
            );
        })
        .ok();

        // A checkpoint failure is logged and does not abort the turn.
        let old_checkpoint = git_store
            .update(cx, |git, cx| git.checkpoint(cx))
            .await
            .context("failed to get old checkpoint")
            .log_err();
        this.update(cx, |this, cx| {
            if let Some((_ix, message)) = this.last_user_message() {
                message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
                    git_checkpoint,
                    show: false,
                });
            }
            this.connection.prompt(message_id, request, cx)
        })?
        .await
    })
}
1909
/// Returns `true` if the connection offers a resume handle for this session
/// (i.e. `connection.resume` returns `Some`).
pub fn can_resume(&self, cx: &App) -> bool {
    self.connection.resume(&self.session_id, cx).is_some()
}
1913
/// Resumes the session as a new turn (see `run_turn`).
///
/// The turn fails with "resuming a session is not supported" when the
/// connection does not provide a resume handle.
pub fn resume(&mut self, cx: &mut Context<Self>) -> BoxFuture<'static, Result<()>> {
    self.run_turn(cx, async move |this, cx| {
        this.update(cx, |this, cx| {
            this.connection
                .resume(&this.session_id, cx)
                .map(|resume| resume.run(cx))
        })?
        .context("resuming a session is not supported")?
        .await
    })
}
1925
/// Runs one turn: clears completed plan entries, cancels any turn that is
/// still in flight, then runs `f` as the new `send_task`.
///
/// The returned future waits for `f` to finish, refreshes the checkpoint of
/// the last user message, clears the project's agent location, and then
/// interprets the response:
/// - an error from `f` clears `send_task`, emits `Error`, and is returned;
/// - a refusal emits `Refusal`, truncating back to the last user message
///   unless a completed tool call with output follows it;
/// - every non-error outcome ends by emitting `Stopped`.
fn run_turn(
    &mut self,
    cx: &mut Context<Self>,
    f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
) -> BoxFuture<'static, Result<()>> {
    self.clear_completed_plan_entries(cx);

    let (tx, rx) = oneshot::channel();
    let cancel_task = self.cancel(cx);

    // The new turn only starts once the previous one has finished cancelling.
    self.send_task = Some(cx.spawn(async move |this, cx| {
        cancel_task.await;
        tx.send(f(this, cx).await).ok();
    }));

    cx.spawn(async move |this, cx| {
        let response = rx.await;

        this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
            .await?;

        this.update(cx, |this, cx| {
            this.project
                .update(cx, |project, cx| project.set_agent_location(None, cx));
            match response {
                Ok(Err(e)) => {
                    this.send_task.take();
                    cx.emit(AcpThreadEvent::Error);
                    Err(e)
                }
                result => {
                    let canceled = matches!(
                        result,
                        Ok(Ok(acp::PromptResponse {
                            stop_reason: acp::StopReason::Cancelled,
                            ..
                        }))
                    );

                    // We only take the task if the current prompt wasn't canceled.
                    //
                    // This prompt may have been canceled because another one was sent
                    // while it was still generating. In these cases, dropping `send_task`
                    // would cause the next generation to be canceled.
                    if !canceled {
                        this.send_task.take();
                    }

                    // Handle refusal - distinguish between user prompt and tool call refusals
                    if let Ok(Ok(acp::PromptResponse {
                        stop_reason: acp::StopReason::Refusal,
                        ..
                    })) = result
                    {
                        if let Some((user_msg_ix, _)) = this.last_user_message() {
                            // Check if there's a completed tool call with results after the last user message
                            // This indicates the refusal is in response to tool output, not the user's prompt
                            let has_completed_tool_call_after_user_msg =
                                this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
                                    if let AgentThreadEntry::ToolCall(tool_call) = entry {
                                        // Check if the tool call has completed and has output
                                        matches!(tool_call.status, ToolCallStatus::Completed)
                                            && tool_call.raw_output.is_some()
                                    } else {
                                        false
                                    }
                                });

                            if has_completed_tool_call_after_user_msg {
                                // Refusal is due to tool output - don't truncate, just notify
                                // The model refused based on what the tool returned
                                cx.emit(AcpThreadEvent::Refusal);
                            } else {
                                // User prompt was refused - truncate back to before the user message
                                let range = user_msg_ix..this.entries.len();
                                if range.start < range.end {
                                    this.entries.truncate(user_msg_ix);
                                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
                                }
                                cx.emit(AcpThreadEvent::Refusal);
                            }
                        } else {
                            // No user message found, treat as general refusal
                            cx.emit(AcpThreadEvent::Refusal);
                        }
                    }

                    cx.emit(AcpThreadEvent::Stopped);
                    Ok(())
                }
            }
        })?
    })
    .boxed()
}
2021
2022 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2023 let Some(send_task) = self.send_task.take() else {
2024 return Task::ready(());
2025 };
2026
2027 for entry in self.entries.iter_mut() {
2028 if let AgentThreadEntry::ToolCall(call) = entry {
2029 let cancel = matches!(
2030 call.status,
2031 ToolCallStatus::Pending
2032 | ToolCallStatus::WaitingForConfirmation { .. }
2033 | ToolCallStatus::InProgress
2034 );
2035
2036 if cancel {
2037 call.status = ToolCallStatus::Canceled;
2038 }
2039 }
2040 }
2041
2042 self.connection.cancel(&self.session_id, cx);
2043
2044 // Wait for the send task to complete
2045 cx.foreground_executor().spawn(send_task)
2046 }
2047
/// Restores the git working tree to the state at the given checkpoint (if one exists)
///
/// The thread is first cancelled and rewound to before the user message
/// `id`; the git restore only runs if that message has a checkpoint.
/// Returns a "message not found" error when no user message has that id.
pub fn restore_checkpoint(
    &mut self,
    id: UserMessageId,
    cx: &mut Context<Self>,
) -> Task<Result<()>> {
    let Some((_, message)) = self.user_message_mut(&id) else {
        return Task::ready(Err(anyhow!("message not found")));
    };

    // Capture the checkpoint now: the message is removed by the rewind below.
    let checkpoint = message
        .checkpoint
        .as_ref()
        .map(|c| c.git_checkpoint.clone());

    // Cancel any in-progress generation before restoring
    let cancel_task = self.cancel(cx);
    let rewind = self.rewind(id.clone(), cx);
    let git_store = self.project.read(cx).git_store().clone();

    cx.spawn(async move |_, cx| {
        cancel_task.await;
        rewind.await?;
        if let Some(checkpoint) = checkpoint {
            git_store
                .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
                .await?;
        }

        Ok(())
    })
}
2080
/// Rewinds this thread to before the user message with the given `id`,
/// removing that message and all subsequent entries, killing the terminals
/// those entries referenced, and then rejecting all edits tracked by the
/// action log.
/// Unlike `restore_checkpoint`, this method does not restore from git.
///
/// Returns a "not supported" error when the connection does not offer
/// `truncate` for this session.
pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
    let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
        return Task::ready(Err(anyhow!("not supported")));
    };

    let telemetry = ActionLogTelemetry::from(&*self);
    cx.spawn(async move |this, cx| {
        // Truncate on the connection first; local entries are only removed if that succeeds.
        cx.update(|cx| truncate.run(id.clone(), cx)).await?;
        this.update(cx, |this, cx| {
            if let Some((ix, _)) = this.user_message_mut(&id) {
                // Collect all terminals from entries that will be removed
                let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
                    .iter()
                    .flat_map(|entry| entry.terminals())
                    .filter_map(|terminal| terminal.read(cx).id().clone().into())
                    .collect();

                let range = ix..this.entries.len();
                this.entries.truncate(ix);
                cx.emit(AcpThreadEvent::EntriesRemoved(range));

                // Kill and remove the terminals
                for terminal_id in terminals_to_remove {
                    if let Some(terminal) = this.terminals.remove(&terminal_id) {
                        terminal.update(cx, |terminal, cx| {
                            terminal.kill(cx);
                        });
                    }
                }
            }
            // Edits are rejected even when the message was not found locally.
            this.action_log().update(cx, |action_log, cx| {
                action_log.reject_all_edits(Some(telemetry), cx)
            })
        })?
        .await;
        Ok(())
    })
}
2122
2123 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
2124 let git_store = self.project.read(cx).git_store().clone();
2125
2126 let Some((_, message)) = self.last_user_message() else {
2127 return Task::ready(Ok(()));
2128 };
2129 let Some(user_message_id) = message.id.clone() else {
2130 return Task::ready(Ok(()));
2131 };
2132 let Some(checkpoint) = message.checkpoint.as_ref() else {
2133 return Task::ready(Ok(()));
2134 };
2135 let old_checkpoint = checkpoint.git_checkpoint.clone();
2136
2137 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2138 cx.spawn(async move |this, cx| {
2139 let Some(new_checkpoint) = new_checkpoint
2140 .await
2141 .context("failed to get new checkpoint")
2142 .log_err()
2143 else {
2144 return Ok(());
2145 };
2146
2147 let equal = git_store
2148 .update(cx, |git, cx| {
2149 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2150 })
2151 .await
2152 .unwrap_or(true);
2153
2154 this.update(cx, |this, cx| {
2155 if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
2156 if let Some(checkpoint) = message.checkpoint.as_mut() {
2157 checkpoint.show = !equal;
2158 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2159 }
2160 }
2161 })?;
2162
2163 Ok(())
2164 })
2165 }
2166
2167 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2168 self.entries
2169 .iter_mut()
2170 .enumerate()
2171 .rev()
2172 .find_map(|(ix, entry)| {
2173 if let AgentThreadEntry::UserMessage(message) = entry {
2174 Some((ix, message))
2175 } else {
2176 None
2177 }
2178 })
2179 }
2180
2181 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2182 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2183 if let AgentThreadEntry::UserMessage(message) = entry {
2184 if message.id.as_ref() == Some(id) {
2185 Some((ix, message))
2186 } else {
2187 None
2188 }
2189 } else {
2190 None
2191 }
2192 })
2193 }
2194
/// Reads text from the file at the absolute `path` through the project.
///
/// `line` is the 1-based first line to read (defaults to the start of the
/// file) and `limit` the maximum number of lines (defaults to `u32::MAX`).
/// When `reuse_shared_snapshot` is set and a snapshot of the buffer is
/// already recorded in `shared_buffers`, that snapshot is read; otherwise the
/// read is recorded in the action log and a fresh snapshot is taken and
/// stored. The project's agent location is moved to the start of the read.
///
/// # Errors
///
/// - `resource_not_found` if `path` does not map to a project path;
/// - `invalid_params` if the start line lies beyond the end of the file;
/// - any failure from opening the buffer or from the thread having been dropped.
pub fn read_text_file(
    &self,
    path: PathBuf,
    line: Option<u32>,
    limit: Option<u32>,
    reuse_shared_snapshot: bool,
    cx: &mut Context<Self>,
) -> Task<Result<String, acp::Error>> {
    // Args are 1-based, move to 0-based
    let line = line.unwrap_or_default().saturating_sub(1);
    let limit = limit.unwrap_or(u32::MAX);
    let project = self.project.clone();
    let action_log = self.action_log.clone();
    cx.spawn(async move |this, cx| {
        let load = project.update(cx, |project, cx| {
            let path = project
                .project_path_for_absolute_path(&path, cx)
                .ok_or_else(|| {
                    acp::Error::resource_not_found(Some(path.display().to_string()))
                })?;
            Ok::<_, acp::Error>(project.open_buffer(path, cx))
        })?;

        let buffer = load.await?;

        // Prefer an already shared snapshot when the caller allows it.
        let snapshot = if reuse_shared_snapshot {
            this.read_with(cx, |this, _| {
                this.shared_buffers.get(&buffer.clone()).cloned()
            })
            .log_err()
            .flatten()
        } else {
            None
        };

        let snapshot = if let Some(snapshot) = snapshot {
            snapshot
        } else {
            // No reusable snapshot: record the read and share a fresh one.
            action_log.update(cx, |action_log, cx| {
                action_log.buffer_read(buffer.clone(), cx);
            });

            let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
            this.update(cx, |this, _| {
                this.shared_buffers.insert(buffer.clone(), snapshot.clone());
            })?;
            snapshot
        };

        let max_point = snapshot.max_point();
        let start_position = Point::new(line, 0);

        if start_position > max_point {
            return Err(acp::Error::invalid_params().data(format!(
                "Attempting to read beyond the end of the file, line {}:{}",
                max_point.row + 1,
                max_point.column
            )));
        }

        let start = snapshot.anchor_before(start_position);
        // `saturating_add` keeps the default `u32::MAX` limit from overflowing.
        let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));

        project.update(cx, |project, cx| {
            project.set_agent_location(
                Some(AgentLocation {
                    buffer: buffer.downgrade(),
                    position: start,
                }),
                cx,
            );
        });

        Ok(snapshot.text_for_range(start..end).collect::<String>())
    })
}
2271
    /// Replaces the contents of the file at `path` with `content` and saves it.
    ///
    /// The new text is diffed against a snapshot of the buffer (the one stored in
    /// `shared_buffers` if present, otherwise the buffer's current snapshot) and
    /// the resulting edits are applied to the buffer. The edit is reported to the
    /// action log, the buffer is formatted when the language's `format_on_save`
    /// setting is not `Off`, and finally the buffer is saved through the project.
    ///
    /// # Errors
    ///
    /// Fails with "invalid path" when `path` cannot be mapped to a project path,
    /// and propagates errors from opening or saving the buffer. Formatting
    /// failures are only logged.
    pub fn write_text_file(
        &self,
        path: PathBuf,
        content: String,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let project = self.project.clone();
        let action_log = self.action_log.clone();
        cx.spawn(async move |this, cx| {
            let load = project.update(cx, |project, cx| {
                let path = project
                    .project_path_for_absolute_path(&path, cx)
                    .context("invalid path")?;
                anyhow::Ok(project.open_buffer(path, cx))
            });
            let buffer = load?.await?;
            // Diff against the stored snapshot when there is one rather than the
            // live buffer text. NOTE(review): presumably so that edits made to the
            // buffer since that snapshot was taken are kept (compare
            // `test_edits_concurrently_to_user`) — confirm.
            let snapshot = this.update(cx, |this, cx| {
                this.shared_buffers
                    .get(&buffer)
                    .cloned()
                    .unwrap_or_else(|| buffer.read(cx).snapshot())
            })?;
            // The text diff is computed on the background executor; each changed
            // range is converted to anchors in the snapshot it was diffed against.
            let edits = cx
                .background_executor()
                .spawn(async move {
                    let old_text = snapshot.text();
                    text_diff(old_text.as_str(), &content)
                        .into_iter()
                        .map(|(range, replacement)| {
                            (
                                snapshot.anchor_after(range.start)
                                    ..snapshot.anchor_before(range.end),
                                replacement,
                            )
                        })
                        .collect::<Vec<_>>()
                })
                .await;

            // Move the agent location to the end of the last edit, or to the
            // buffer's minimum anchor when the diff produced no edits.
            project.update(cx, |project, cx| {
                project.set_agent_location(
                    Some(AgentLocation {
                        buffer: buffer.downgrade(),
                        position: edits
                            .last()
                            .map(|(range, _)| range.end)
                            .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
                    }),
                    cx,
                );
            });

            let format_on_save = cx.update(|cx| {
                // The action log is told about a read before the edit is applied
                // and about the edit afterwards.
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_read(buffer.clone(), cx);
                });

                let format_on_save = buffer.update(cx, |buffer, cx| {
                    buffer.edit(edits, None, cx);

                    let settings = language::language_settings::language_settings(
                        buffer.language().map(|l| l.name()),
                        buffer.file(),
                        cx,
                    );

                    settings.format_on_save != FormatOnSave::Off
                });
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_edited(buffer.clone(), cx);
                });
                format_on_save
            });

            if format_on_save {
                let format_task = project.update(cx, |project, cx| {
                    project.format(
                        HashSet::from_iter([buffer.clone()]),
                        LspFormatTarget::Buffers,
                        false,
                        FormatTrigger::Save,
                        cx,
                    )
                });
                // A formatting failure is logged and does not abort the write.
                format_task.await.log_err();

                // Report the buffer as edited again once formatting has run.
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_edited(buffer.clone(), cx);
                });
            }

            project
                .update(cx, |project, cx| project.save_buffer(buffer, cx))
                .await
        })
    }
2368
    /// Spawns `command` with `args` in a new terminal and registers it on this thread.
    ///
    /// The process environment is the directory environment for `cwd` (or the
    /// default map when `cwd` is `None` or no environment is found), with `PAGER`
    /// set to an empty string and `extra_env` applied on top. The command is
    /// wrapped by a `ShellBuilder` using the remote client's default system shell
    /// when one is available, falling back to
    /// `get_default_system_shell_preferring_bash()`.
    ///
    /// The returned task resolves to the new [`Terminal`] entity after it has been
    /// inserted into `self.terminals` under a freshly generated UUID.
    ///
    /// # Errors
    ///
    /// Propagates the error from `create_terminal_task`, and fails if this thread
    /// can no longer be updated when the terminal is ready.
    pub fn create_terminal(
        &self,
        command: String,
        args: Vec<String>,
        extra_env: Vec<acp::EnvVariable>,
        cwd: Option<PathBuf>,
        output_byte_limit: Option<u64>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Terminal>>> {
        // Start loading the environment for the working directory, if any.
        let env = match &cwd {
            Some(dir) => self.project.update(cx, |project, cx| {
                project.environment().update(cx, |env, cx| {
                    env.directory_environment(dir.as_path().into(), cx)
                })
            }),
            None => Task::ready(None).shared(),
        };
        let env = cx.spawn(async move |_, _| {
            let mut env = env.await.unwrap_or_default();
            // Disables paging for `git` and hopefully other commands
            env.insert("PAGER".into(), "".into());
            // Inserted last, so caller-supplied variables override the ones above.
            for var in extra_env {
                env.insert(var.name, var.value);
            }
            env
        });

        let project = self.project.clone();
        let language_registry = project.read(cx).languages().clone();
        let is_windows = project.read(cx).path_style(cx).is_windows();

        let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
        let terminal_task = cx.spawn({
            let terminal_id = terminal_id.clone();
            async move |_this, cx| {
                let env = env.await;
                // Prefer the remote client's default shell when there is one.
                let shell = project
                    .update(cx, |project, cx| {
                        project
                            .remote_client()
                            .and_then(|r| r.read(cx).default_system_shell())
                    })
                    .unwrap_or_else(|| get_default_system_shell_preferring_bash());
                let (task_command, task_args) =
                    ShellBuilder::new(&Shell::Program(shell), is_windows)
                        .redirect_stdin_to_dev_null()
                        .build(Some(command.clone()), &args);
                let terminal = project
                    .update(cx, |project, cx| {
                        project.create_terminal_task(
                            task::SpawnInTerminal {
                                command: Some(task_command),
                                args: task_args,
                                cwd: cwd.clone(),
                                env,
                                ..Default::default()
                            },
                            cx,
                        )
                    })
                    .await?;

                // The label passed to `Terminal::new` is the original command and
                // arguments, not the shell-wrapped invocation built above.
                anyhow::Ok(cx.new(|cx| {
                    Terminal::new(
                        terminal_id,
                        &format!("{} {}", command, args.join(" ")),
                        cwd,
                        output_byte_limit.map(|l| l as usize),
                        terminal,
                        language_registry,
                        cx,
                    )
                }))
            }
        });

        // Register the terminal on this thread once it has been created.
        cx.spawn(async move |this, cx| {
            let terminal = terminal_task.await?;
            this.update(cx, |this, _cx| {
                this.terminals.insert(terminal_id, terminal.clone());
                terminal
            })
        })
    }
2453
2454 pub fn kill_terminal(
2455 &mut self,
2456 terminal_id: acp::TerminalId,
2457 cx: &mut Context<Self>,
2458 ) -> Result<()> {
2459 self.terminals
2460 .get(&terminal_id)
2461 .context("Terminal not found")?
2462 .update(cx, |terminal, cx| {
2463 terminal.kill(cx);
2464 });
2465
2466 Ok(())
2467 }
2468
2469 pub fn release_terminal(
2470 &mut self,
2471 terminal_id: acp::TerminalId,
2472 cx: &mut Context<Self>,
2473 ) -> Result<()> {
2474 self.terminals
2475 .remove(&terminal_id)
2476 .context("Terminal not found")?
2477 .update(cx, |terminal, cx| {
2478 terminal.kill(cx);
2479 });
2480
2481 Ok(())
2482 }
2483
2484 pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2485 self.terminals
2486 .get(&terminal_id)
2487 .context("Terminal not found")
2488 .cloned()
2489 }
2490
2491 pub fn to_markdown(&self, cx: &App) -> String {
2492 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2493 }
2494
2495 pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2496 cx.emit(AcpThreadEvent::LoadError(error));
2497 }
2498
2499 pub fn register_terminal_created(
2500 &mut self,
2501 terminal_id: acp::TerminalId,
2502 command_label: String,
2503 working_dir: Option<PathBuf>,
2504 output_byte_limit: Option<u64>,
2505 terminal: Entity<::terminal::Terminal>,
2506 cx: &mut Context<Self>,
2507 ) -> Entity<Terminal> {
2508 let language_registry = self.project.read(cx).languages().clone();
2509
2510 let entity = cx.new(|cx| {
2511 Terminal::new(
2512 terminal_id.clone(),
2513 &command_label,
2514 working_dir.clone(),
2515 output_byte_limit.map(|l| l as usize),
2516 terminal,
2517 language_registry,
2518 cx,
2519 )
2520 });
2521 self.terminals.insert(terminal_id.clone(), entity.clone());
2522 entity
2523 }
2524}
2525
2526fn markdown_for_raw_output(
2527 raw_output: &serde_json::Value,
2528 language_registry: &Arc<LanguageRegistry>,
2529 cx: &mut App,
2530) -> Option<Entity<Markdown>> {
2531 match raw_output {
2532 serde_json::Value::Null => None,
2533 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2534 Markdown::new(
2535 value.to_string().into(),
2536 Some(language_registry.clone()),
2537 None,
2538 cx,
2539 )
2540 })),
2541 serde_json::Value::Number(value) => Some(cx.new(|cx| {
2542 Markdown::new(
2543 value.to_string().into(),
2544 Some(language_registry.clone()),
2545 None,
2546 cx,
2547 )
2548 })),
2549 serde_json::Value::String(value) => Some(cx.new(|cx| {
2550 Markdown::new(
2551 value.clone().into(),
2552 Some(language_registry.clone()),
2553 None,
2554 cx,
2555 )
2556 })),
2557 value => Some(cx.new(|cx| {
2558 let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
2559
2560 Markdown::new(
2561 format!("```json\n{}\n```", pretty_json).into(),
2562 Some(language_registry.clone()),
2563 None,
2564 cx,
2565 )
2566 })),
2567 }
2568}
2569
2570#[cfg(test)]
2571mod tests {
2572 use super::*;
2573 use anyhow::anyhow;
2574 use futures::{channel::mpsc, future::LocalBoxFuture, select};
2575 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2576 use indoc::indoc;
2577 use project::{FakeFs, Fs};
2578 use rand::{distr, prelude::*};
2579 use serde_json::json;
2580 use settings::SettingsStore;
2581 use smol::stream::StreamExt as _;
2582 use std::{
2583 any::Any,
2584 cell::RefCell,
2585 path::Path,
2586 rc::Rc,
2587 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2588 time::Duration,
2589 };
2590 use util::path;
2591
2592 fn init_test(cx: &mut TestAppContext) {
2593 env_logger::try_init().ok();
2594 cx.update(|cx| {
2595 let settings_store = SettingsStore::test(cx);
2596 cx.set_global(settings_store);
2597 });
2598 }
2599
2600 #[gpui::test]
2601 async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2602 init_test(cx);
2603
2604 let fs = FakeFs::new(cx.executor());
2605 let project = Project::test(fs, [], cx).await;
2606 let connection = Rc::new(FakeAgentConnection::new());
2607 let thread = cx
2608 .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2609 .await
2610 .unwrap();
2611
2612 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2613
2614 // Send Output BEFORE Created - should be buffered by acp_thread
2615 thread.update(cx, |thread, cx| {
2616 thread.on_terminal_provider_event(
2617 TerminalProviderEvent::Output {
2618 terminal_id: terminal_id.clone(),
2619 data: b"hello buffered".to_vec(),
2620 },
2621 cx,
2622 );
2623 });
2624
2625 // Create a display-only terminal and then send Created
2626 let lower = cx.new(|cx| {
2627 let builder = ::terminal::TerminalBuilder::new_display_only(
2628 ::terminal::terminal_settings::CursorShape::default(),
2629 ::terminal::terminal_settings::AlternateScroll::On,
2630 None,
2631 0,
2632 )
2633 .unwrap();
2634 builder.subscribe(cx)
2635 });
2636
2637 thread.update(cx, |thread, cx| {
2638 thread.on_terminal_provider_event(
2639 TerminalProviderEvent::Created {
2640 terminal_id: terminal_id.clone(),
2641 label: "Buffered Test".to_string(),
2642 cwd: None,
2643 output_byte_limit: None,
2644 terminal: lower.clone(),
2645 },
2646 cx,
2647 );
2648 });
2649
2650 // After Created, buffered Output should have been flushed into the renderer
2651 let content = thread.read_with(cx, |thread, cx| {
2652 let term = thread.terminal(terminal_id.clone()).unwrap();
2653 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2654 });
2655
2656 assert!(
2657 content.contains("hello buffered"),
2658 "expected buffered output to render, got: {content}"
2659 );
2660 }
2661
2662 #[gpui::test]
2663 async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2664 init_test(cx);
2665
2666 let fs = FakeFs::new(cx.executor());
2667 let project = Project::test(fs, [], cx).await;
2668 let connection = Rc::new(FakeAgentConnection::new());
2669 let thread = cx
2670 .update(|cx| connection.new_thread(project, std::path::Path::new(path!("/test")), cx))
2671 .await
2672 .unwrap();
2673
2674 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2675
2676 // Send Output BEFORE Created
2677 thread.update(cx, |thread, cx| {
2678 thread.on_terminal_provider_event(
2679 TerminalProviderEvent::Output {
2680 terminal_id: terminal_id.clone(),
2681 data: b"pre-exit data".to_vec(),
2682 },
2683 cx,
2684 );
2685 });
2686
2687 // Send Exit BEFORE Created
2688 thread.update(cx, |thread, cx| {
2689 thread.on_terminal_provider_event(
2690 TerminalProviderEvent::Exit {
2691 terminal_id: terminal_id.clone(),
2692 status: acp::TerminalExitStatus::new().exit_code(0),
2693 },
2694 cx,
2695 );
2696 });
2697
2698 // Now create a display-only lower-level terminal and send Created
2699 let lower = cx.new(|cx| {
2700 let builder = ::terminal::TerminalBuilder::new_display_only(
2701 ::terminal::terminal_settings::CursorShape::default(),
2702 ::terminal::terminal_settings::AlternateScroll::On,
2703 None,
2704 0,
2705 )
2706 .unwrap();
2707 builder.subscribe(cx)
2708 });
2709
2710 thread.update(cx, |thread, cx| {
2711 thread.on_terminal_provider_event(
2712 TerminalProviderEvent::Created {
2713 terminal_id: terminal_id.clone(),
2714 label: "Buffered Exit Test".to_string(),
2715 cwd: None,
2716 output_byte_limit: None,
2717 terminal: lower.clone(),
2718 },
2719 cx,
2720 );
2721 });
2722
2723 // Output should be present after Created (flushed from buffer)
2724 let content = thread.read_with(cx, |thread, cx| {
2725 let term = thread.terminal(terminal_id.clone()).unwrap();
2726 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2727 });
2728
2729 assert!(
2730 content.contains("pre-exit data"),
2731 "expected pre-exit data to render, got: {content}"
2732 );
2733 }
2734
    /// Test that killing a terminal via Terminal::kill properly:
    /// 1. Causes wait_for_exit to complete (doesn't hang forever)
    /// 2. The underlying terminal still has the output that was written before the kill
    ///
    /// This test verifies that the fix to kill_active_task (which now also kills
    /// the shell process in addition to the foreground process) properly allows
    /// wait_for_exit to complete instead of hanging indefinitely.
    ///
    /// Unix-only: the test spawns a real PTY running `printf` and `sleep`
    /// through the system shell.
    #[cfg(unix)]
    #[gpui::test]
    async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
        use std::collections::HashMap;
        use task::Shell;
        use util::shell_builder::ShellBuilder;

        init_test(cx);
        // A real child process is involved, so the executor must be allowed to park.
        cx.executor().allow_parking();

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let connection = Rc::new(FakeAgentConnection::new());
        let thread = cx
            .update(|cx| connection.new_thread(project.clone(), Path::new(path!("/test")), cx))
            .await
            .unwrap();

        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());

        // Create a real PTY terminal that runs a command which prints output then sleeps
        // We use printf instead of echo and chain with && sleep to ensure proper execution
        // The receiver is bound to a named variable so it stays alive until the
        // end of the test instead of being dropped immediately.
        let (completion_tx, _completion_rx) = smol::channel::unbounded();
        let (program, args) = ShellBuilder::new(&Shell::System, false).build(
            Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
            &[],
        );

        let builder = cx
            .update(|cx| {
                ::terminal::TerminalBuilder::new(
                    None,
                    None,
                    task::Shell::WithArguments {
                        program,
                        args,
                        title_override: None,
                    },
                    HashMap::default(),
                    ::terminal::terminal_settings::CursorShape::default(),
                    ::terminal::terminal_settings::AlternateScroll::On,
                    None,
                    vec![],
                    0,
                    false,
                    0,
                    Some(completion_tx),
                    cx,
                    vec![],
                )
            })
            .await
            .unwrap();

        let lower_terminal = cx.new(|cx| builder.subscribe(cx));

        // Create the acp_thread Terminal wrapper
        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Created {
                    terminal_id: terminal_id.clone(),
                    label: "printf output_before_kill && sleep 60".to_string(),
                    cwd: None,
                    output_byte_limit: None,
                    terminal: lower_terminal.clone(),
                },
                cx,
            );
        });

        // Wait for the printf command to execute and produce output
        // Use real time since parking is enabled
        cx.executor().timer(Duration::from_millis(500)).await;

        // Get the acp_thread Terminal and kill it
        // The exit future is obtained before `kill` is called and awaited afterwards.
        let wait_for_exit = thread.update(cx, |thread, cx| {
            let term = thread.terminals.get(&terminal_id).unwrap();
            let wait_for_exit = term.read(cx).wait_for_exit();
            term.update(cx, |term, cx| {
                term.kill(cx);
            });
            wait_for_exit
        });

        // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
        // Before the fix to kill_active_task, this would hang forever because
        // only the foreground process was killed, not the shell, so the PTY
        // child never exited and wait_for_completed_task never completed.
        // Race the exit future against a 5 second timer; `None` means the timer won.
        let exit_result = futures::select! {
            result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
            _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
        };

        assert!(
            exit_result.is_some(),
            "wait_for_exit should complete after kill, but it timed out. \
            This indicates kill_active_task is not properly killing the shell process."
        );

        // Give the system a chance to process any pending updates
        cx.run_until_parked();

        // Verify that the underlying terminal still has the output that was
        // written before the kill. This verifies that killing doesn't lose output.
        let inner_content = thread.read_with(cx, |thread, cx| {
            let term = thread.terminals.get(&terminal_id).unwrap();
            term.read(cx).inner().read(cx).get_content()
        });

        assert!(
            inner_content.contains("output_before_kill"),
            "Underlying terminal should contain output from before kill, got: {}",
            inner_content
        );
    }
2857
2858 #[gpui::test]
2859 async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
2860 init_test(cx);
2861
2862 let fs = FakeFs::new(cx.executor());
2863 let project = Project::test(fs, [], cx).await;
2864 let connection = Rc::new(FakeAgentConnection::new());
2865 let thread = cx
2866 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2867 .await
2868 .unwrap();
2869
2870 // Test creating a new user message
2871 thread.update(cx, |thread, cx| {
2872 thread.push_user_content_block(None, "Hello, ".into(), cx);
2873 });
2874
2875 thread.update(cx, |thread, cx| {
2876 assert_eq!(thread.entries.len(), 1);
2877 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2878 assert_eq!(user_msg.id, None);
2879 assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
2880 } else {
2881 panic!("Expected UserMessage");
2882 }
2883 });
2884
2885 // Test appending to existing user message
2886 let message_1_id = UserMessageId::new();
2887 thread.update(cx, |thread, cx| {
2888 thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
2889 });
2890
2891 thread.update(cx, |thread, cx| {
2892 assert_eq!(thread.entries.len(), 1);
2893 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
2894 assert_eq!(user_msg.id, Some(message_1_id));
2895 assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
2896 } else {
2897 panic!("Expected UserMessage");
2898 }
2899 });
2900
2901 // Test creating new user message after assistant message
2902 thread.update(cx, |thread, cx| {
2903 thread.push_assistant_content_block("Assistant response".into(), false, cx);
2904 });
2905
2906 let message_2_id = UserMessageId::new();
2907 thread.update(cx, |thread, cx| {
2908 thread.push_user_content_block(
2909 Some(message_2_id.clone()),
2910 "New user message".into(),
2911 cx,
2912 );
2913 });
2914
2915 thread.update(cx, |thread, cx| {
2916 assert_eq!(thread.entries.len(), 3);
2917 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
2918 assert_eq!(user_msg.id, Some(message_2_id));
2919 assert_eq!(user_msg.content.to_markdown(cx), "New user message");
2920 } else {
2921 panic!("Expected UserMessage at index 2");
2922 }
2923 });
2924 }
2925
2926 #[gpui::test]
2927 async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
2928 init_test(cx);
2929
2930 let fs = FakeFs::new(cx.executor());
2931 let project = Project::test(fs, [], cx).await;
2932 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
2933 |_, thread, mut cx| {
2934 async move {
2935 thread.update(&mut cx, |thread, cx| {
2936 thread
2937 .handle_session_update(
2938 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
2939 "Thinking ".into(),
2940 )),
2941 cx,
2942 )
2943 .unwrap();
2944 thread
2945 .handle_session_update(
2946 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
2947 "hard!".into(),
2948 )),
2949 cx,
2950 )
2951 .unwrap();
2952 })?;
2953 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
2954 }
2955 .boxed_local()
2956 },
2957 ));
2958
2959 let thread = cx
2960 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
2961 .await
2962 .unwrap();
2963
2964 thread
2965 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
2966 .await
2967 .unwrap();
2968
2969 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
2970 assert_eq!(
2971 output,
2972 indoc! {r#"
2973 ## User
2974
2975 Hello from Zed!
2976
2977 ## Assistant
2978
2979 <thinking>
2980 Thinking hard!
2981 </thinking>
2982
2983 "#}
2984 );
2985 }
2986
    /// Checks that an agent write keeps an edit made to the buffer between the
    /// agent's read and its write.
    ///
    /// The fake agent reads `/tmp/foo`, signals the test, and then writes the
    /// file with two extra lines. Between those two steps the test inserts
    /// `"zero\n"` at the start of the buffer. Both the buffer and the file on the
    /// fake file system are expected to contain the inserted line and the
    /// agent's additions.
    #[gpui::test]
    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
            .await;
        let project = Project::test(fs.clone(), [], cx).await;
        // One-shot signal sent by the agent right after it has read the file.
        // The sender sits in an `Rc<RefCell<Option<_>>>` so the handler closure
        // can take it out when it runs.
        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
            move |_, thread, mut cx| {
                let read_file_tx = read_file_tx.clone();
                async move {
                    let content = thread
                        .update(&mut cx, |thread, cx| {
                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
                        })
                        .unwrap()
                        .await
                        .unwrap();
                    assert_eq!(content, "one\ntwo\nthree\n");
                    // Tell the test the read is done before starting the write.
                    read_file_tx.take().unwrap().send(()).unwrap();
                    thread
                        .update(&mut cx, |thread, cx| {
                            thread.write_text_file(
                                path!("/tmp/foo").into(),
                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
                                cx,
                            )
                        })
                        .unwrap()
                        .await
                        .unwrap();
                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
                }
                .boxed_local()
            },
        ));

        // Open the same file from the test side to act as the user's buffer.
        let (worktree, pathbuf) = project
            .update(cx, |project, cx| {
                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
            })
            .await
            .unwrap();
        let buffer = project
            .update(cx, |project, cx| {
                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
            })
            .await
            .unwrap();

        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
            .await
            .unwrap();

        let request = thread.update(cx, |thread, cx| {
            thread.send_raw("Extend the count in /tmp/foo", cx)
        });
        // Wait until the agent has read the file, then edit the buffer.
        read_file_rx.await.ok();
        buffer.update(cx, |buffer, cx| {
            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
        });
        // Run pending work; the assertions below expect the agent's write to
        // have been applied and saved by then.
        cx.run_until_parked();
        assert_eq!(
            buffer.read_with(cx, |buffer, _| buffer.text()),
            "zero\none\ntwo\nthree\nfour\nfive\n"
        );
        assert_eq!(
            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
            "zero\none\ntwo\nthree\nfour\nfive\n"
        );
        request.await.unwrap();
    }
3063
3064 #[gpui::test]
3065 async fn test_reading_from_line(cx: &mut TestAppContext) {
3066 init_test(cx);
3067
3068 let fs = FakeFs::new(cx.executor());
3069 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
3070 .await;
3071 let project = Project::test(fs.clone(), [], cx).await;
3072 project
3073 .update(cx, |project, cx| {
3074 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3075 })
3076 .await
3077 .unwrap();
3078
3079 let connection = Rc::new(FakeAgentConnection::new());
3080
3081 let thread = cx
3082 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
3083 .await
3084 .unwrap();
3085
3086 // Whole file
3087 let content = thread
3088 .update(cx, |thread, cx| {
3089 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3090 })
3091 .await
3092 .unwrap();
3093
3094 assert_eq!(content, "one\ntwo\nthree\nfour\n");
3095
3096 // Only start line
3097 let content = thread
3098 .update(cx, |thread, cx| {
3099 thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
3100 })
3101 .await
3102 .unwrap();
3103
3104 assert_eq!(content, "three\nfour\n");
3105
3106 // Only limit
3107 let content = thread
3108 .update(cx, |thread, cx| {
3109 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3110 })
3111 .await
3112 .unwrap();
3113
3114 assert_eq!(content, "one\ntwo\n");
3115
3116 // Range
3117 let content = thread
3118 .update(cx, |thread, cx| {
3119 thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
3120 })
3121 .await
3122 .unwrap();
3123
3124 assert_eq!(content, "two\nthree\n");
3125
3126 // Invalid
3127 let err = thread
3128 .update(cx, |thread, cx| {
3129 thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
3130 })
3131 .await
3132 .unwrap_err();
3133
3134 assert_eq!(
3135 err.to_string(),
3136 "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
3137 );
3138 }
3139
3140 #[gpui::test]
3141 async fn test_reading_empty_file(cx: &mut TestAppContext) {
3142 init_test(cx);
3143
3144 let fs = FakeFs::new(cx.executor());
3145 fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
3146 let project = Project::test(fs.clone(), [], cx).await;
3147 project
3148 .update(cx, |project, cx| {
3149 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3150 })
3151 .await
3152 .unwrap();
3153
3154 let connection = Rc::new(FakeAgentConnection::new());
3155
3156 let thread = cx
3157 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
3158 .await
3159 .unwrap();
3160
3161 // Whole file
3162 let content = thread
3163 .update(cx, |thread, cx| {
3164 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3165 })
3166 .await
3167 .unwrap();
3168
3169 assert_eq!(content, "");
3170
3171 // Only start line
3172 let content = thread
3173 .update(cx, |thread, cx| {
3174 thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
3175 })
3176 .await
3177 .unwrap();
3178
3179 assert_eq!(content, "");
3180
3181 // Only limit
3182 let content = thread
3183 .update(cx, |thread, cx| {
3184 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3185 })
3186 .await
3187 .unwrap();
3188
3189 assert_eq!(content, "");
3190
3191 // Range
3192 let content = thread
3193 .update(cx, |thread, cx| {
3194 thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
3195 })
3196 .await
3197 .unwrap();
3198
3199 assert_eq!(content, "");
3200
3201 // Invalid
3202 let err = thread
3203 .update(cx, |thread, cx| {
3204 thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
3205 })
3206 .await
3207 .unwrap_err();
3208
3209 assert_eq!(
3210 err.to_string(),
3211 "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
3212 );
3213 }
3214 #[gpui::test]
3215 async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
3216 init_test(cx);
3217
3218 let fs = FakeFs::new(cx.executor());
3219 fs.insert_tree(path!("/tmp"), json!({})).await;
3220 let project = Project::test(fs.clone(), [], cx).await;
3221 project
3222 .update(cx, |project, cx| {
3223 project.find_or_create_worktree(path!("/tmp"), true, cx)
3224 })
3225 .await
3226 .unwrap();
3227
3228 let connection = Rc::new(FakeAgentConnection::new());
3229
3230 let thread = cx
3231 .update(|cx| connection.new_thread(project, Path::new(path!("/tmp")), cx))
3232 .await
3233 .unwrap();
3234
3235 // Out of project file
3236 let err = thread
3237 .update(cx, |thread, cx| {
3238 thread.read_text_file(path!("/foo").into(), None, None, false, cx)
3239 })
3240 .await
3241 .unwrap_err();
3242
3243 assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
3244 }
3245
    /// Checks the status transitions of a tool call around cancellation.
    ///
    /// The fake agent reports an in-progress tool call. The test asserts that
    /// the entry is `InProgress`, that it is `Canceled` after `cancel`, and that
    /// it becomes `Completed` when a later update for the same id reports
    /// `Completed`.
    #[gpui::test]
    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let id = acp::ToolCallId::new("test");

        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
            let id = id.clone();
            move |_, thread, mut cx| {
                let id = id.clone();
                async move {
                    // Report a single in-progress tool call, then end the turn.
                    thread
                        .update(&mut cx, |thread, cx| {
                            thread.handle_session_update(
                                acp::SessionUpdate::ToolCall(
                                    acp::ToolCall::new(id.clone(), "Label")
                                        .kind(acp::ToolKind::Fetch)
                                        .status(acp::ToolCallStatus::InProgress),
                                ),
                                cx,
                            )
                        })
                        .unwrap()
                        .unwrap();
                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
                }
                .boxed_local()
            }
        }));

        let thread = cx
            .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
            .await
            .unwrap();

        let request = thread.update(cx, |thread, cx| {
            thread.send_raw("Fetch https://example.com", cx)
        });

        run_until_first_tool_call(&thread, cx).await;

        // Entry 0 is the user message; the tool call is entry 1.
        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::InProgress,
                    ..
                })
            ));
        });

        thread.update(cx, |thread, cx| thread.cancel(cx)).await;

        // Cancelling the thread marks the in-progress tool call as canceled.
        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                &thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::Canceled,
                    ..
                })
            ));
        });

        // An update arriving after the cancellation reports the call as completed.
        thread
            .update(cx, |thread, cx| {
                thread.handle_session_update(
                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
                        id,
                        acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
                    )),
                    cx,
                )
            })
            .unwrap();

        request.await.unwrap();

        // The late update replaces the canceled status.
        thread.read_with(cx, |thread, _| {
            assert!(matches!(
                thread.entries[1],
                AgentThreadEntry::ToolCall(ToolCall {
                    status: ToolCallStatus::Completed,
                    ..
                })
            ));
        });
    }
3335
3336 #[gpui::test]
3337 async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3338 init_test(cx);
3339 let fs = FakeFs::new(cx.background_executor.clone());
3340 fs.insert_tree(path!("/test"), json!({})).await;
3341 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3342
3343 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3344 move |_, thread, mut cx| {
3345 async move {
3346 thread
3347 .update(&mut cx, |thread, cx| {
3348 thread.handle_session_update(
3349 acp::SessionUpdate::ToolCall(
3350 acp::ToolCall::new("test", "Label")
3351 .kind(acp::ToolKind::Edit)
3352 .status(acp::ToolCallStatus::Completed)
3353 .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
3354 "/test/test.txt",
3355 "foo",
3356 ))]),
3357 ),
3358 cx,
3359 )
3360 })
3361 .unwrap()
3362 .unwrap();
3363 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3364 }
3365 .boxed_local()
3366 }
3367 }));
3368
3369 let thread = cx
3370 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3371 .await
3372 .unwrap();
3373
3374 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3375 .await
3376 .unwrap();
3377
3378 assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3379 }
3380
3381 #[gpui::test(iterations = 10)]
3382 async fn test_checkpoints(cx: &mut TestAppContext) {
3383 init_test(cx);
3384 let fs = FakeFs::new(cx.background_executor.clone());
3385 fs.insert_tree(
3386 path!("/test"),
3387 json!({
3388 ".git": {}
3389 }),
3390 )
3391 .await;
3392 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3393
3394 let simulate_changes = Arc::new(AtomicBool::new(true));
3395 let next_filename = Arc::new(AtomicUsize::new(0));
3396 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3397 let simulate_changes = simulate_changes.clone();
3398 let next_filename = next_filename.clone();
3399 let fs = fs.clone();
3400 move |request, thread, mut cx| {
3401 let fs = fs.clone();
3402 let simulate_changes = simulate_changes.clone();
3403 let next_filename = next_filename.clone();
3404 async move {
3405 if simulate_changes.load(SeqCst) {
3406 let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3407 fs.write(Path::new(&filename), b"").await?;
3408 }
3409
3410 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3411 panic!("expected text content block");
3412 };
3413 thread.update(&mut cx, |thread, cx| {
3414 thread
3415 .handle_session_update(
3416 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3417 content.text.to_uppercase().into(),
3418 )),
3419 cx,
3420 )
3421 .unwrap();
3422 })?;
3423 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3424 }
3425 .boxed_local()
3426 }
3427 }));
3428 let thread = cx
3429 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3430 .await
3431 .unwrap();
3432
3433 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3434 .await
3435 .unwrap();
3436 thread.read_with(cx, |thread, cx| {
3437 assert_eq!(
3438 thread.to_markdown(cx),
3439 indoc! {"
3440 ## User (checkpoint)
3441
3442 Lorem
3443
3444 ## Assistant
3445
3446 LOREM
3447
3448 "}
3449 );
3450 });
3451 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3452
3453 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3454 .await
3455 .unwrap();
3456 thread.read_with(cx, |thread, cx| {
3457 assert_eq!(
3458 thread.to_markdown(cx),
3459 indoc! {"
3460 ## User (checkpoint)
3461
3462 Lorem
3463
3464 ## Assistant
3465
3466 LOREM
3467
3468 ## User (checkpoint)
3469
3470 ipsum
3471
3472 ## Assistant
3473
3474 IPSUM
3475
3476 "}
3477 );
3478 });
3479 assert_eq!(
3480 fs.files(),
3481 vec![
3482 Path::new(path!("/test/file-0")),
3483 Path::new(path!("/test/file-1"))
3484 ]
3485 );
3486
3487 // Checkpoint isn't stored when there are no changes.
3488 simulate_changes.store(false, SeqCst);
3489 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3490 .await
3491 .unwrap();
3492 thread.read_with(cx, |thread, cx| {
3493 assert_eq!(
3494 thread.to_markdown(cx),
3495 indoc! {"
3496 ## User (checkpoint)
3497
3498 Lorem
3499
3500 ## Assistant
3501
3502 LOREM
3503
3504 ## User (checkpoint)
3505
3506 ipsum
3507
3508 ## Assistant
3509
3510 IPSUM
3511
3512 ## User
3513
3514 dolor
3515
3516 ## Assistant
3517
3518 DOLOR
3519
3520 "}
3521 );
3522 });
3523 assert_eq!(
3524 fs.files(),
3525 vec![
3526 Path::new(path!("/test/file-0")),
3527 Path::new(path!("/test/file-1"))
3528 ]
3529 );
3530
3531 // Rewinding the conversation truncates the history and restores the checkpoint.
3532 thread
3533 .update(cx, |thread, cx| {
3534 let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3535 panic!("unexpected entries {:?}", thread.entries)
3536 };
3537 thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3538 })
3539 .await
3540 .unwrap();
3541 thread.read_with(cx, |thread, cx| {
3542 assert_eq!(
3543 thread.to_markdown(cx),
3544 indoc! {"
3545 ## User (checkpoint)
3546
3547 Lorem
3548
3549 ## Assistant
3550
3551 LOREM
3552
3553 "}
3554 );
3555 });
3556 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3557 }
3558
3559 #[gpui::test]
3560 async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3561 use std::sync::atomic::AtomicUsize;
3562 init_test(cx);
3563
3564 let fs = FakeFs::new(cx.executor());
3565 let project = Project::test(fs, None, cx).await;
3566
3567 // Create a connection that simulates refusal after tool result
3568 let prompt_count = Arc::new(AtomicUsize::new(0));
3569 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3570 let prompt_count = prompt_count.clone();
3571 move |_request, thread, mut cx| {
3572 let count = prompt_count.fetch_add(1, SeqCst);
3573 async move {
3574 if count == 0 {
3575 // First prompt: Generate a tool call with result
3576 thread.update(&mut cx, |thread, cx| {
3577 thread
3578 .handle_session_update(
3579 acp::SessionUpdate::ToolCall(
3580 acp::ToolCall::new("tool1", "Test Tool")
3581 .kind(acp::ToolKind::Fetch)
3582 .status(acp::ToolCallStatus::Completed)
3583 .raw_input(serde_json::json!({"query": "test"}))
3584 .raw_output(serde_json::json!({"result": "inappropriate content"})),
3585 ),
3586 cx,
3587 )
3588 .unwrap();
3589 })?;
3590
3591 // Now return refusal because of the tool result
3592 Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
3593 } else {
3594 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3595 }
3596 }
3597 .boxed_local()
3598 }
3599 }));
3600
3601 let thread = cx
3602 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3603 .await
3604 .unwrap();
3605
3606 // Track if we see a Refusal event
3607 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3608 let saw_refusal_event_captured = saw_refusal_event.clone();
3609 thread.update(cx, |_thread, cx| {
3610 cx.subscribe(
3611 &thread,
3612 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3613 if matches!(event, AcpThreadEvent::Refusal) {
3614 *saw_refusal_event_captured.lock().unwrap() = true;
3615 }
3616 },
3617 )
3618 .detach();
3619 });
3620
3621 // Send a user message - this will trigger tool call and then refusal
3622 let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
3623 cx.background_executor.spawn(send_task).detach();
3624 cx.run_until_parked();
3625
3626 // Verify that:
3627 // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3628 // 2. The user message was NOT truncated
3629 assert!(
3630 *saw_refusal_event.lock().unwrap(),
3631 "Refusal event should be emitted for tool result refusals"
3632 );
3633
3634 thread.read_with(cx, |thread, _| {
3635 let entries = thread.entries();
3636 assert!(entries.len() >= 2, "Should have user message and tool call");
3637
3638 // Verify user message is still there
3639 assert!(
3640 matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3641 "User message should not be truncated"
3642 );
3643
3644 // Verify tool call is there with result
3645 if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3646 assert!(
3647 tool_call.raw_output.is_some(),
3648 "Tool call should have output"
3649 );
3650 } else {
3651 panic!("Expected tool call at index 1");
3652 }
3653 });
3654 }
3655
3656 #[gpui::test]
3657 async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3658 init_test(cx);
3659
3660 let fs = FakeFs::new(cx.executor());
3661 let project = Project::test(fs, None, cx).await;
3662
3663 let refuse_next = Arc::new(AtomicBool::new(false));
3664 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3665 let refuse_next = refuse_next.clone();
3666 move |_request, _thread, _cx| {
3667 if refuse_next.load(SeqCst) {
3668 async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
3669 .boxed_local()
3670 } else {
3671 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
3672 .boxed_local()
3673 }
3674 }
3675 }));
3676
3677 let thread = cx
3678 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3679 .await
3680 .unwrap();
3681
3682 // Track if we see a Refusal event
3683 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3684 let saw_refusal_event_captured = saw_refusal_event.clone();
3685 thread.update(cx, |_thread, cx| {
3686 cx.subscribe(
3687 &thread,
3688 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3689 if matches!(event, AcpThreadEvent::Refusal) {
3690 *saw_refusal_event_captured.lock().unwrap() = true;
3691 }
3692 },
3693 )
3694 .detach();
3695 });
3696
3697 // Send a message that will be refused
3698 refuse_next.store(true, SeqCst);
3699 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3700 .await
3701 .unwrap();
3702
3703 // Verify that a Refusal event WAS emitted for user prompt refusal
3704 assert!(
3705 *saw_refusal_event.lock().unwrap(),
3706 "Refusal event should be emitted for user prompt refusals"
3707 );
3708
3709 // Verify the message was truncated (user prompt refusal)
3710 thread.read_with(cx, |thread, cx| {
3711 assert_eq!(thread.to_markdown(cx), "");
3712 });
3713 }
3714
3715 #[gpui::test]
3716 async fn test_refusal(cx: &mut TestAppContext) {
3717 init_test(cx);
3718 let fs = FakeFs::new(cx.background_executor.clone());
3719 fs.insert_tree(path!("/"), json!({})).await;
3720 let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
3721
3722 let refuse_next = Arc::new(AtomicBool::new(false));
3723 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3724 let refuse_next = refuse_next.clone();
3725 move |request, thread, mut cx| {
3726 let refuse_next = refuse_next.clone();
3727 async move {
3728 if refuse_next.load(SeqCst) {
3729 return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
3730 }
3731
3732 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3733 panic!("expected text content block");
3734 };
3735 thread.update(&mut cx, |thread, cx| {
3736 thread
3737 .handle_session_update(
3738 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3739 content.text.to_uppercase().into(),
3740 )),
3741 cx,
3742 )
3743 .unwrap();
3744 })?;
3745 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3746 }
3747 .boxed_local()
3748 }
3749 }));
3750 let thread = cx
3751 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3752 .await
3753 .unwrap();
3754
3755 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
3756 .await
3757 .unwrap();
3758 thread.read_with(cx, |thread, cx| {
3759 assert_eq!(
3760 thread.to_markdown(cx),
3761 indoc! {"
3762 ## User
3763
3764 hello
3765
3766 ## Assistant
3767
3768 HELLO
3769
3770 "}
3771 );
3772 });
3773
3774 // Simulate refusing the second message. The message should be truncated
3775 // when a user prompt is refused.
3776 refuse_next.store(true, SeqCst);
3777 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
3778 .await
3779 .unwrap();
3780 thread.read_with(cx, |thread, cx| {
3781 assert_eq!(
3782 thread.to_markdown(cx),
3783 indoc! {"
3784 ## User
3785
3786 hello
3787
3788 ## Assistant
3789
3790 HELLO
3791
3792 "}
3793 );
3794 });
3795 }
3796
3797 async fn run_until_first_tool_call(
3798 thread: &Entity<AcpThread>,
3799 cx: &mut TestAppContext,
3800 ) -> usize {
3801 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
3802
3803 let subscription = cx.update(|cx| {
3804 cx.subscribe(thread, move |thread, _, cx| {
3805 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
3806 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
3807 return tx.try_send(ix).unwrap();
3808 }
3809 }
3810 })
3811 });
3812
3813 select! {
3814 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
3815 panic!("Timeout waiting for tool call")
3816 }
3817 ix = rx.next().fuse() => {
3818 drop(subscription);
3819 ix.unwrap()
3820 }
3821 }
3822 }
3823
/// In-memory `AgentConnection` used by the tests in this module.
#[derive(Clone, Default)]
struct FakeAgentConnection {
    /// Auth methods reported by `auth_methods` and accepted by `authenticate`.
    auth_methods: Vec<acp::AuthMethod>,
    /// Threads created through `new_thread`, keyed by their session id and
    /// held weakly.
    sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
    /// Optional callback that produces the response to each prompt. When it
    /// is `None`, `prompt` immediately answers with `StopReason::EndTurn`.
    on_user_message: Option<
        Rc<
            dyn Fn(
                    acp::PromptRequest,
                    WeakEntity<AcpThread>,
                    AsyncApp,
                ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
                + 'static,
        >,
    >,
}
3839
3840 impl FakeAgentConnection {
3841 fn new() -> Self {
3842 Self {
3843 auth_methods: Vec::new(),
3844 on_user_message: None,
3845 sessions: Arc::default(),
3846 }
3847 }
3848
3849 #[expect(unused)]
3850 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
3851 self.auth_methods = auth_methods;
3852 self
3853 }
3854
3855 fn on_user_message(
3856 mut self,
3857 handler: impl Fn(
3858 acp::PromptRequest,
3859 WeakEntity<AcpThread>,
3860 AsyncApp,
3861 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
3862 + 'static,
3863 ) -> Self {
3864 self.on_user_message.replace(Rc::new(handler));
3865 self
3866 }
3867 }
3868
3869 impl AgentConnection for FakeAgentConnection {
3870 fn telemetry_id(&self) -> SharedString {
3871 "fake".into()
3872 }
3873
3874 fn auth_methods(&self) -> &[acp::AuthMethod] {
3875 &self.auth_methods
3876 }
3877
3878 fn new_thread(
3879 self: Rc<Self>,
3880 project: Entity<Project>,
3881 _cwd: &Path,
3882 cx: &mut App,
3883 ) -> Task<gpui::Result<Entity<AcpThread>>> {
3884 let session_id = acp::SessionId::new(
3885 rand::rng()
3886 .sample_iter(&distr::Alphanumeric)
3887 .take(7)
3888 .map(char::from)
3889 .collect::<String>(),
3890 );
3891 let action_log = cx.new(|_| ActionLog::new(project.clone()));
3892 let thread = cx.new(|cx| {
3893 AcpThread::new(
3894 "Test",
3895 self.clone(),
3896 project,
3897 action_log,
3898 session_id.clone(),
3899 watch::Receiver::constant(
3900 acp::PromptCapabilities::new()
3901 .image(true)
3902 .audio(true)
3903 .embedded_context(true),
3904 ),
3905 cx,
3906 )
3907 });
3908 self.sessions.lock().insert(session_id, thread.downgrade());
3909 Task::ready(Ok(thread))
3910 }
3911
3912 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
3913 if self.auth_methods().iter().any(|m| m.id == method) {
3914 Task::ready(Ok(()))
3915 } else {
3916 Task::ready(Err(anyhow!("Invalid Auth Method")))
3917 }
3918 }
3919
3920 fn prompt(
3921 &self,
3922 _id: Option<UserMessageId>,
3923 params: acp::PromptRequest,
3924 cx: &mut App,
3925 ) -> Task<gpui::Result<acp::PromptResponse>> {
3926 let sessions = self.sessions.lock();
3927 let thread = sessions.get(¶ms.session_id).unwrap();
3928 if let Some(handler) = &self.on_user_message {
3929 let handler = handler.clone();
3930 let thread = thread.clone();
3931 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
3932 } else {
3933 Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
3934 }
3935 }
3936
3937 fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
3938 let sessions = self.sessions.lock();
3939 let thread = sessions.get(session_id).unwrap().clone();
3940
3941 cx.spawn(async move |cx| {
3942 thread
3943 .update(cx, |thread, cx| thread.cancel(cx))
3944 .unwrap()
3945 .await
3946 })
3947 .detach();
3948 }
3949
3950 fn truncate(
3951 &self,
3952 session_id: &acp::SessionId,
3953 _cx: &App,
3954 ) -> Option<Rc<dyn AgentSessionTruncate>> {
3955 Some(Rc::new(FakeAgentSessionEditor {
3956 _session_id: session_id.clone(),
3957 }))
3958 }
3959
3960 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
3961 self
3962 }
3963 }
3964
/// Truncation handle returned by `FakeAgentConnection::truncate`.
struct FakeAgentSessionEditor {
    /// Session this editor was created for; stored but never read.
    _session_id: acp::SessionId,
}
3968
impl AgentSessionTruncate for FakeAgentSessionEditor {
    /// No-op truncation: ignores the message id and reports success immediately.
    fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
        Task::ready(Ok(()))
    }
}
3974
3975 #[gpui::test]
3976 async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
3977 init_test(cx);
3978
3979 let fs = FakeFs::new(cx.executor());
3980 let project = Project::test(fs, [], cx).await;
3981 let connection = Rc::new(FakeAgentConnection::new());
3982 let thread = cx
3983 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
3984 .await
3985 .unwrap();
3986
3987 // Try to update a tool call that doesn't exist
3988 let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
3989 thread.update(cx, |thread, cx| {
3990 let result = thread.handle_session_update(
3991 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3992 nonexistent_id.clone(),
3993 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3994 )),
3995 cx,
3996 );
3997
3998 // The update should succeed (not return an error)
3999 assert!(result.is_ok());
4000
4001 // There should now be exactly one entry in the thread
4002 assert_eq!(thread.entries.len(), 1);
4003
4004 // The entry should be a failed tool call
4005 if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
4006 assert_eq!(tool_call.id, nonexistent_id);
4007 assert!(matches!(tool_call.status, ToolCallStatus::Failed));
4008 assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
4009
4010 // Check that the content contains the error message
4011 assert_eq!(tool_call.content.len(), 1);
4012 if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
4013 match content_block {
4014 ContentBlock::Markdown { markdown } => {
4015 let markdown_text = markdown.read(cx).source();
4016 assert!(markdown_text.contains("Tool call not found"));
4017 }
4018 ContentBlock::Empty => panic!("Expected markdown content, got empty"),
4019 ContentBlock::ResourceLink { .. } => {
4020 panic!("Expected markdown content, got resource link")
4021 }
4022 ContentBlock::Image { .. } => {
4023 panic!("Expected markdown content, got image")
4024 }
4025 }
4026 } else {
4027 panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
4028 }
4029 } else {
4030 panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
4031 }
4032 });
4033 }
4034
4035 /// Tests that restoring a checkpoint properly cleans up terminals that were
4036 /// created after that checkpoint, and cancels any in-progress generation.
4037 ///
4038 /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
4039 /// that were started after that checkpoint should be terminated, and any in-progress
4040 /// AI generation should be canceled.
4041 #[gpui::test]
4042 async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
4043 init_test(cx);
4044
4045 let fs = FakeFs::new(cx.executor());
4046 let project = Project::test(fs, [], cx).await;
4047 let connection = Rc::new(FakeAgentConnection::new());
4048 let thread = cx
4049 .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
4050 .await
4051 .unwrap();
4052
4053 // Send first user message to create a checkpoint
4054 cx.update(|cx| {
4055 thread.update(cx, |thread, cx| {
4056 thread.send(vec!["first message".into()], cx)
4057 })
4058 })
4059 .await
4060 .unwrap();
4061
4062 // Send second message (creates another checkpoint) - we'll restore to this one
4063 cx.update(|cx| {
4064 thread.update(cx, |thread, cx| {
4065 thread.send(vec!["second message".into()], cx)
4066 })
4067 })
4068 .await
4069 .unwrap();
4070
4071 // Create 2 terminals BEFORE the checkpoint that have completed running
4072 let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4073 let mock_terminal_1 = cx.new(|cx| {
4074 let builder = ::terminal::TerminalBuilder::new_display_only(
4075 ::terminal::terminal_settings::CursorShape::default(),
4076 ::terminal::terminal_settings::AlternateScroll::On,
4077 None,
4078 0,
4079 )
4080 .unwrap();
4081 builder.subscribe(cx)
4082 });
4083
4084 thread.update(cx, |thread, cx| {
4085 thread.on_terminal_provider_event(
4086 TerminalProviderEvent::Created {
4087 terminal_id: terminal_id_1.clone(),
4088 label: "echo 'first'".to_string(),
4089 cwd: Some(PathBuf::from("/test")),
4090 output_byte_limit: None,
4091 terminal: mock_terminal_1.clone(),
4092 },
4093 cx,
4094 );
4095 });
4096
4097 thread.update(cx, |thread, cx| {
4098 thread.on_terminal_provider_event(
4099 TerminalProviderEvent::Output {
4100 terminal_id: terminal_id_1.clone(),
4101 data: b"first\n".to_vec(),
4102 },
4103 cx,
4104 );
4105 });
4106
4107 thread.update(cx, |thread, cx| {
4108 thread.on_terminal_provider_event(
4109 TerminalProviderEvent::Exit {
4110 terminal_id: terminal_id_1.clone(),
4111 status: acp::TerminalExitStatus::new().exit_code(0),
4112 },
4113 cx,
4114 );
4115 });
4116
4117 let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4118 let mock_terminal_2 = cx.new(|cx| {
4119 let builder = ::terminal::TerminalBuilder::new_display_only(
4120 ::terminal::terminal_settings::CursorShape::default(),
4121 ::terminal::terminal_settings::AlternateScroll::On,
4122 None,
4123 0,
4124 )
4125 .unwrap();
4126 builder.subscribe(cx)
4127 });
4128
4129 thread.update(cx, |thread, cx| {
4130 thread.on_terminal_provider_event(
4131 TerminalProviderEvent::Created {
4132 terminal_id: terminal_id_2.clone(),
4133 label: "echo 'second'".to_string(),
4134 cwd: Some(PathBuf::from("/test")),
4135 output_byte_limit: None,
4136 terminal: mock_terminal_2.clone(),
4137 },
4138 cx,
4139 );
4140 });
4141
4142 thread.update(cx, |thread, cx| {
4143 thread.on_terminal_provider_event(
4144 TerminalProviderEvent::Output {
4145 terminal_id: terminal_id_2.clone(),
4146 data: b"second\n".to_vec(),
4147 },
4148 cx,
4149 );
4150 });
4151
4152 thread.update(cx, |thread, cx| {
4153 thread.on_terminal_provider_event(
4154 TerminalProviderEvent::Exit {
4155 terminal_id: terminal_id_2.clone(),
4156 status: acp::TerminalExitStatus::new().exit_code(0),
4157 },
4158 cx,
4159 );
4160 });
4161
4162 // Get the second message ID to restore to
4163 let second_message_id = thread.read_with(cx, |thread, _| {
4164 // At this point we have:
4165 // - Index 0: First user message (with checkpoint)
4166 // - Index 1: Second user message (with checkpoint)
4167 // No assistant responses because FakeAgentConnection just returns EndTurn
4168 let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
4169 panic!("expected user message at index 1");
4170 };
4171 message.id.clone().unwrap()
4172 });
4173
4174 // Create a terminal AFTER the checkpoint we'll restore to.
4175 // This simulates the AI agent starting a long-running terminal command.
4176 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4177 let mock_terminal = cx.new(|cx| {
4178 let builder = ::terminal::TerminalBuilder::new_display_only(
4179 ::terminal::terminal_settings::CursorShape::default(),
4180 ::terminal::terminal_settings::AlternateScroll::On,
4181 None,
4182 0,
4183 )
4184 .unwrap();
4185 builder.subscribe(cx)
4186 });
4187
4188 // Register the terminal as created
4189 thread.update(cx, |thread, cx| {
4190 thread.on_terminal_provider_event(
4191 TerminalProviderEvent::Created {
4192 terminal_id: terminal_id.clone(),
4193 label: "sleep 1000".to_string(),
4194 cwd: Some(PathBuf::from("/test")),
4195 output_byte_limit: None,
4196 terminal: mock_terminal.clone(),
4197 },
4198 cx,
4199 );
4200 });
4201
4202 // Simulate the terminal producing output (still running)
4203 thread.update(cx, |thread, cx| {
4204 thread.on_terminal_provider_event(
4205 TerminalProviderEvent::Output {
4206 terminal_id: terminal_id.clone(),
4207 data: b"terminal is running...\n".to_vec(),
4208 },
4209 cx,
4210 );
4211 });
4212
4213 // Create a tool call entry that references this terminal
4214 // This represents the agent requesting a terminal command
4215 thread.update(cx, |thread, cx| {
4216 thread
4217 .handle_session_update(
4218 acp::SessionUpdate::ToolCall(
4219 acp::ToolCall::new("terminal-tool-1", "Running command")
4220 .kind(acp::ToolKind::Execute)
4221 .status(acp::ToolCallStatus::InProgress)
4222 .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
4223 terminal_id.clone(),
4224 ))])
4225 .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
4226 ),
4227 cx,
4228 )
4229 .unwrap();
4230 });
4231
4232 // Verify terminal exists and is in the thread
4233 let terminal_exists_before =
4234 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4235 assert!(
4236 terminal_exists_before,
4237 "Terminal should exist before checkpoint restore"
4238 );
4239
4240 // Verify the terminal's underlying task is still running (not completed)
4241 let terminal_running_before = thread.read_with(cx, |thread, _cx| {
4242 let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
4243 terminal_entity.read_with(cx, |term, _cx| {
4244 term.output().is_none() // output is None means it's still running
4245 })
4246 });
4247 assert!(
4248 terminal_running_before,
4249 "Terminal should be running before checkpoint restore"
4250 );
4251
4252 // Verify we have the expected entries before restore
4253 let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
4254 assert!(
4255 entry_count_before > 1,
4256 "Should have multiple entries before restore"
4257 );
4258
4259 // Restore the checkpoint to the second message.
4260 // This should:
4261 // 1. Cancel any in-progress generation (via the cancel() call)
4262 // 2. Remove the terminal that was created after that point
4263 thread
4264 .update(cx, |thread, cx| {
4265 thread.restore_checkpoint(second_message_id, cx)
4266 })
4267 .await
4268 .unwrap();
4269
4270 // Verify that no send_task is in progress after restore
4271 // (cancel() clears the send_task)
4272 let has_send_task_after = thread.read_with(cx, |thread, _| thread.send_task.is_some());
4273 assert!(
4274 !has_send_task_after,
4275 "Should not have a send_task after restore (cancel should have cleared it)"
4276 );
4277
4278 // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
4279 let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
4280 assert_eq!(
4281 entry_count, 1,
4282 "Should have 1 entry after restore (only the first user message)"
4283 );
4284
4285 // Verify the 2 completed terminals from before the checkpoint still exist
4286 let terminal_1_exists = thread.read_with(cx, |thread, _| {
4287 thread.terminals.contains_key(&terminal_id_1)
4288 });
4289 assert!(
4290 terminal_1_exists,
4291 "Terminal 1 (from before checkpoint) should still exist"
4292 );
4293
4294 let terminal_2_exists = thread.read_with(cx, |thread, _| {
4295 thread.terminals.contains_key(&terminal_id_2)
4296 });
4297 assert!(
4298 terminal_2_exists,
4299 "Terminal 2 (from before checkpoint) should still exist"
4300 );
4301
4302 // Verify they're still in completed state
4303 let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
4304 let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
4305 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4306 });
4307 assert!(terminal_1_completed, "Terminal 1 should still be completed");
4308
4309 let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
4310 let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
4311 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4312 });
4313 assert!(terminal_2_completed, "Terminal 2 should still be completed");
4314
4315 // Verify the running terminal (created after checkpoint) was removed
4316 let terminal_3_exists =
4317 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4318 assert!(
4319 !terminal_3_exists,
4320 "Terminal 3 (created after checkpoint) should have been removed"
4321 );
4322
4323 // Verify total count is 2 (the two from before the checkpoint)
4324 let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
4325 assert_eq!(
4326 terminal_count, 2,
4327 "Should have exactly 2 terminals (the completed ones from before checkpoint)"
4328 );
4329 }
4330
/// Tests that update_last_checkpoint correctly updates the original message's checkpoint
/// even when a new user message is added while the async checkpoint comparison is in progress.
///
/// This is a regression test for a bug where update_last_checkpoint would fail with
/// "no checkpoint" if a new user message (without a checkpoint) was added between when
/// update_last_checkpoint started and when its async closure ran.
#[gpui::test]
async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
    init_test(cx);

    let fs = FakeFs::new(cx.executor());
    fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
        .await;
    let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;

    // The handler raises `handler_done` as soon as it is invoked, so the test
    // can tell when the prompt has reached the agent.
    let handler_done = Arc::new(AtomicBool::new(false));
    let handler_done_clone = handler_done.clone();
    let connection = Rc::new(FakeAgentConnection::new().on_user_message(
        move |_, _thread, _cx| {
            handler_done_clone.store(true, SeqCst);
            async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
        },
    ));

    let thread = cx
        .update(|cx| connection.new_thread(project, Path::new(path!("/test")), cx))
        .await
        .unwrap();

    // Start the send on the background executor without awaiting it, so the
    // test can interleave its own work while the send is in flight.
    let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
    let send_task = cx.background_executor.spawn(send_future);

    // Tick until handler completes, then a few more to let update_last_checkpoint start
    while !handler_done.load(SeqCst) {
        cx.executor().tick();
    }
    for _ in 0..5 {
        cx.executor().tick();
    }

    // Inject a second user message that has no checkpoint while the first
    // send is still being finalized.
    thread.update(cx, |thread, cx| {
        thread.push_entry(
            AgentThreadEntry::UserMessage(UserMessage {
                id: Some(UserMessageId::new()),
                content: ContentBlock::Empty,
                chunks: vec!["Injected message (no checkpoint)".into()],
                checkpoint: None,
                indented: false,
            }),
            cx,
        );
    });

    // Let all remaining work finish, then collect the outcome of the original send.
    cx.run_until_parked();
    let result = send_task.await;

    assert!(
        result.is_ok(),
        "send should succeed even when new message added during update_last_checkpoint: {:?}",
        result.err()
    );
}
4393}