1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5use action_log::{ActionLog, ActionLogTelemetry};
6use agent_client_protocol::{self as acp};
7use anyhow::{Context as _, Result, anyhow};
8use collections::HashSet;
9pub use connection::*;
10pub use diff::*;
11use futures::{FutureExt, channel::oneshot, future::BoxFuture};
12use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
13use itertools::Itertools;
14use language::language_settings::FormatOnSave;
15use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
16use markdown::Markdown;
17pub use mention::*;
18use project::lsp_store::{FormatTrigger, LspFormatTarget};
19use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
20use serde::{Deserialize, Serialize};
21use serde_json::to_string_pretty;
22use std::collections::HashMap;
23use std::error::Error;
24use std::fmt::{Formatter, Write};
25use std::ops::Range;
26use std::process::ExitStatus;
27use std::rc::Rc;
28use std::time::{Duration, Instant};
29use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
30use task::{Shell, ShellBuilder};
31pub use terminal::*;
32use text::Bias;
33use ui::App;
34use util::markdown::MarkdownEscaped;
35use util::path_list::PathList;
36use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
37use uuid::Uuid;
38
39/// Key used in ACP ToolCall meta to store the tool's programmatic name.
40/// This is a workaround since ACP's ToolCall doesn't have a dedicated name field.
41pub const TOOL_NAME_META_KEY: &str = "tool_name";
42
43/// Helper to extract tool name from ACP meta
44pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
45 meta.as_ref()
46 .and_then(|m| m.get(TOOL_NAME_META_KEY))
47 .and_then(|v| v.as_str())
48 .map(|s| SharedString::from(s.to_owned()))
49}
50
51/// Helper to create meta with tool name
52pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
53 acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
54}
55
56/// Key used in ACP ToolCall meta to store the session id and message indexes
57pub const SUBAGENT_SESSION_INFO_META_KEY: &str = "subagent_session_info";
58
59#[derive(Clone, Debug, Deserialize, Serialize)]
60pub struct SubagentSessionInfo {
61 /// The session id of the subagent sessiont that was spawned
62 pub session_id: acp::SessionId,
63 /// The index of the message of the start of the "turn" run by this tool call
64 pub message_start_index: usize,
65 /// The index of the output of the message that the subagent has returned
66 #[serde(skip_serializing_if = "Option::is_none")]
67 pub message_end_index: Option<usize>,
68}
69
70/// Helper to extract subagent session id from ACP meta
71pub fn subagent_session_info_from_meta(meta: &Option<acp::Meta>) -> Option<SubagentSessionInfo> {
72 meta.as_ref()
73 .and_then(|m| m.get(SUBAGENT_SESSION_INFO_META_KEY))
74 .and_then(|v| serde_json::from_value(v.clone()).ok())
75}
76
77#[derive(Debug)]
78pub struct UserMessage {
79 pub id: Option<UserMessageId>,
80 pub content: ContentBlock,
81 pub chunks: Vec<acp::ContentBlock>,
82 pub checkpoint: Option<Checkpoint>,
83 pub indented: bool,
84}
85
86#[derive(Debug)]
87pub struct Checkpoint {
88 git_checkpoint: GitStoreCheckpoint,
89 pub show: bool,
90}
91
92impl UserMessage {
93 fn to_markdown(&self, cx: &App) -> String {
94 let mut markdown = String::new();
95 if self
96 .checkpoint
97 .as_ref()
98 .is_some_and(|checkpoint| checkpoint.show)
99 {
100 writeln!(markdown, "## User (checkpoint)").unwrap();
101 } else {
102 writeln!(markdown, "## User").unwrap();
103 }
104 writeln!(markdown).unwrap();
105 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
106 writeln!(markdown).unwrap();
107 markdown
108 }
109}
110
111#[derive(Debug, PartialEq)]
112pub struct AssistantMessage {
113 pub chunks: Vec<AssistantMessageChunk>,
114 pub indented: bool,
115 pub is_subagent_output: bool,
116}
117
118impl AssistantMessage {
119 pub fn to_markdown(&self, cx: &App) -> String {
120 format!(
121 "## Assistant\n\n{}\n\n",
122 self.chunks
123 .iter()
124 .map(|chunk| chunk.to_markdown(cx))
125 .join("\n\n")
126 )
127 }
128}
129
130#[derive(Debug, PartialEq)]
131pub enum AssistantMessageChunk {
132 Message { block: ContentBlock },
133 Thought { block: ContentBlock },
134}
135
136impl AssistantMessageChunk {
137 pub fn from_str(
138 chunk: &str,
139 language_registry: &Arc<LanguageRegistry>,
140 path_style: PathStyle,
141 cx: &mut App,
142 ) -> Self {
143 Self::Message {
144 block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
145 }
146 }
147
148 fn to_markdown(&self, cx: &App) -> String {
149 match self {
150 Self::Message { block } => block.to_markdown(cx).to_string(),
151 Self::Thought { block } => {
152 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
153 }
154 }
155 }
156}
157
158#[derive(Debug)]
159pub enum AgentThreadEntry {
160 UserMessage(UserMessage),
161 AssistantMessage(AssistantMessage),
162 ToolCall(ToolCall),
163}
164
165impl AgentThreadEntry {
166 pub fn is_indented(&self) -> bool {
167 match self {
168 Self::UserMessage(message) => message.indented,
169 Self::AssistantMessage(message) => message.indented,
170 Self::ToolCall(_) => false,
171 }
172 }
173
174 pub fn to_markdown(&self, cx: &App) -> String {
175 match self {
176 Self::UserMessage(message) => message.to_markdown(cx),
177 Self::AssistantMessage(message) => message.to_markdown(cx),
178 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
179 }
180 }
181
182 pub fn user_message(&self) -> Option<&UserMessage> {
183 if let AgentThreadEntry::UserMessage(message) = self {
184 Some(message)
185 } else {
186 None
187 }
188 }
189
190 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
191 if let AgentThreadEntry::ToolCall(call) = self {
192 itertools::Either::Left(call.diffs())
193 } else {
194 itertools::Either::Right(std::iter::empty())
195 }
196 }
197
198 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
199 if let AgentThreadEntry::ToolCall(call) = self {
200 itertools::Either::Left(call.terminals())
201 } else {
202 itertools::Either::Right(std::iter::empty())
203 }
204 }
205
206 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
207 if let AgentThreadEntry::ToolCall(ToolCall {
208 locations,
209 resolved_locations,
210 ..
211 }) = self
212 {
213 Some((
214 locations.get(ix)?.clone(),
215 resolved_locations.get(ix)?.clone()?,
216 ))
217 } else {
218 None
219 }
220 }
221}
222
223#[derive(Debug)]
224pub struct ToolCall {
225 pub id: acp::ToolCallId,
226 pub label: Entity<Markdown>,
227 pub kind: acp::ToolKind,
228 pub content: Vec<ToolCallContent>,
229 pub status: ToolCallStatus,
230 pub locations: Vec<acp::ToolCallLocation>,
231 pub resolved_locations: Vec<Option<AgentLocation>>,
232 pub raw_input: Option<serde_json::Value>,
233 pub raw_input_markdown: Option<Entity<Markdown>>,
234 pub raw_output: Option<serde_json::Value>,
235 pub tool_name: Option<SharedString>,
236 pub subagent_session_info: Option<SubagentSessionInfo>,
237}
238
239impl ToolCall {
240 fn from_acp(
241 tool_call: acp::ToolCall,
242 status: ToolCallStatus,
243 language_registry: Arc<LanguageRegistry>,
244 path_style: PathStyle,
245 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
246 cx: &mut App,
247 ) -> Result<Self> {
248 let title = if tool_call.kind == acp::ToolKind::Execute {
249 tool_call.title
250 } else if tool_call.kind == acp::ToolKind::Edit {
251 MarkdownEscaped(tool_call.title.as_str()).to_string()
252 } else if let Some((first_line, _)) = tool_call.title.split_once("\n") {
253 first_line.to_owned() + "…"
254 } else {
255 tool_call.title
256 };
257 let mut content = Vec::with_capacity(tool_call.content.len());
258 for item in tool_call.content {
259 if let Some(item) = ToolCallContent::from_acp(
260 item,
261 language_registry.clone(),
262 path_style,
263 terminals,
264 cx,
265 )? {
266 content.push(item);
267 }
268 }
269
270 let raw_input_markdown = tool_call
271 .raw_input
272 .as_ref()
273 .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
274
275 let tool_name = tool_name_from_meta(&tool_call.meta);
276
277 let subagent_session_info = subagent_session_info_from_meta(&tool_call.meta);
278
279 let result = Self {
280 id: tool_call.tool_call_id,
281 label: cx
282 .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
283 kind: tool_call.kind,
284 content,
285 locations: tool_call.locations,
286 resolved_locations: Vec::default(),
287 status,
288 raw_input: tool_call.raw_input,
289 raw_input_markdown,
290 raw_output: tool_call.raw_output,
291 tool_name,
292 subagent_session_info,
293 };
294 Ok(result)
295 }
296
297 fn update_fields(
298 &mut self,
299 fields: acp::ToolCallUpdateFields,
300 meta: Option<acp::Meta>,
301 language_registry: Arc<LanguageRegistry>,
302 path_style: PathStyle,
303 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
304 cx: &mut App,
305 ) -> Result<()> {
306 let acp::ToolCallUpdateFields {
307 kind,
308 status,
309 title,
310 content,
311 locations,
312 raw_input,
313 raw_output,
314 ..
315 } = fields;
316
317 if let Some(kind) = kind {
318 self.kind = kind;
319 }
320
321 if let Some(status) = status {
322 self.status = status.into();
323 }
324
325 if let Some(subagent_session_info) = subagent_session_info_from_meta(&meta) {
326 self.subagent_session_info = Some(subagent_session_info);
327 }
328
329 if let Some(title) = title {
330 if self.kind == acp::ToolKind::Execute {
331 for terminal in self.terminals() {
332 terminal.update(cx, |terminal, cx| {
333 terminal.update_command_label(&title, cx);
334 });
335 }
336 }
337 self.label.update(cx, |label, cx| {
338 if self.kind == acp::ToolKind::Execute {
339 label.replace(title, cx);
340 } else if self.kind == acp::ToolKind::Edit {
341 label.replace(MarkdownEscaped(&title).to_string(), cx)
342 } else if let Some((first_line, _)) = title.split_once("\n") {
343 label.replace(first_line.to_owned() + "…", cx);
344 } else {
345 label.replace(title, cx);
346 }
347 });
348 }
349
350 if let Some(content) = content {
351 let mut new_content_len = content.len();
352 let mut content = content.into_iter();
353
354 // Reuse existing content if we can
355 for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
356 let valid_content =
357 old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
358 if !valid_content {
359 new_content_len -= 1;
360 }
361 }
362 for new in content {
363 if let Some(new) = ToolCallContent::from_acp(
364 new,
365 language_registry.clone(),
366 path_style,
367 terminals,
368 cx,
369 )? {
370 self.content.push(new);
371 } else {
372 new_content_len -= 1;
373 }
374 }
375 self.content.truncate(new_content_len);
376 }
377
378 if let Some(locations) = locations {
379 self.locations = locations;
380 }
381
382 if let Some(raw_input) = raw_input {
383 self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
384 self.raw_input = Some(raw_input);
385 }
386
387 if let Some(raw_output) = raw_output {
388 if self.content.is_empty()
389 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
390 {
391 self.content
392 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
393 markdown,
394 }));
395 }
396 self.raw_output = Some(raw_output);
397 }
398 Ok(())
399 }
400
401 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
402 self.content.iter().filter_map(|content| match content {
403 ToolCallContent::Diff(diff) => Some(diff),
404 ToolCallContent::ContentBlock(_) => None,
405 ToolCallContent::Terminal(_) => None,
406 })
407 }
408
409 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
410 self.content.iter().filter_map(|content| match content {
411 ToolCallContent::Terminal(terminal) => Some(terminal),
412 ToolCallContent::ContentBlock(_) => None,
413 ToolCallContent::Diff(_) => None,
414 })
415 }
416
417 pub fn is_subagent(&self) -> bool {
418 self.tool_name.as_ref().is_some_and(|s| s == "spawn_agent")
419 || self.subagent_session_info.is_some()
420 }
421
422 pub fn to_markdown(&self, cx: &App) -> String {
423 let mut markdown = format!(
424 "**Tool Call: {}**\nStatus: {}\n\n",
425 self.label.read(cx).source(),
426 self.status
427 );
428 for content in &self.content {
429 markdown.push_str(content.to_markdown(cx).as_str());
430 markdown.push_str("\n\n");
431 }
432 markdown
433 }
434
435 async fn resolve_location(
436 location: acp::ToolCallLocation,
437 project: WeakEntity<Project>,
438 cx: &mut AsyncApp,
439 ) -> Option<ResolvedLocation> {
440 let buffer = project
441 .update(cx, |project, cx| {
442 project
443 .project_path_for_absolute_path(&location.path, cx)
444 .map(|path| project.open_buffer(path, cx))
445 })
446 .ok()??;
447 let buffer = buffer.await.log_err()?;
448 let position = buffer.update(cx, |buffer, _| {
449 let snapshot = buffer.snapshot();
450 if let Some(row) = location.line {
451 let column = snapshot.indent_size_for_line(row).len;
452 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
453 snapshot.anchor_before(point)
454 } else {
455 Anchor::min_for_buffer(snapshot.remote_id())
456 }
457 });
458
459 Some(ResolvedLocation { buffer, position })
460 }
461
462 fn resolve_locations(
463 &self,
464 project: Entity<Project>,
465 cx: &mut App,
466 ) -> Task<Vec<Option<ResolvedLocation>>> {
467 let locations = self.locations.clone();
468 project.update(cx, |_, cx| {
469 cx.spawn(async move |project, cx| {
470 let mut new_locations = Vec::new();
471 for location in locations {
472 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
473 }
474 new_locations
475 })
476 })
477 }
478}
479
480// Separate so we can hold a strong reference to the buffer
481// for saving on the thread
482#[derive(Clone, Debug, PartialEq, Eq)]
483struct ResolvedLocation {
484 buffer: Entity<Buffer>,
485 position: Anchor,
486}
487
488impl From<&ResolvedLocation> for AgentLocation {
489 fn from(value: &ResolvedLocation) -> Self {
490 Self {
491 buffer: value.buffer.downgrade(),
492 position: value.position,
493 }
494 }
495}
496
497#[derive(Debug, Clone)]
498pub enum SelectedPermissionParams {
499 Terminal { patterns: Vec<String> },
500}
501
502#[derive(Debug)]
503pub struct SelectedPermissionOutcome {
504 pub option_id: acp::PermissionOptionId,
505 pub params: Option<SelectedPermissionParams>,
506}
507
508impl SelectedPermissionOutcome {
509 pub fn new(option_id: acp::PermissionOptionId) -> Self {
510 Self {
511 option_id,
512 params: None,
513 }
514 }
515
516 pub fn params(mut self, params: Option<SelectedPermissionParams>) -> Self {
517 self.params = params;
518 self
519 }
520}
521
522impl From<acp::PermissionOptionId> for SelectedPermissionOutcome {
523 fn from(option_id: acp::PermissionOptionId) -> Self {
524 Self::new(option_id)
525 }
526}
527
528impl From<SelectedPermissionOutcome> for acp::SelectedPermissionOutcome {
529 fn from(value: SelectedPermissionOutcome) -> Self {
530 Self::new(value.option_id)
531 }
532}
533
534#[derive(Debug)]
535pub enum RequestPermissionOutcome {
536 Cancelled,
537 Selected(SelectedPermissionOutcome),
538}
539
540impl From<RequestPermissionOutcome> for acp::RequestPermissionOutcome {
541 fn from(value: RequestPermissionOutcome) -> Self {
542 match value {
543 RequestPermissionOutcome::Cancelled => Self::Cancelled,
544 RequestPermissionOutcome::Selected(outcome) => Self::Selected(outcome.into()),
545 }
546 }
547}
548
549#[derive(Debug)]
550pub enum ToolCallStatus {
551 /// The tool call hasn't started running yet, but we start showing it to
552 /// the user.
553 Pending,
554 /// The tool call is waiting for confirmation from the user.
555 WaitingForConfirmation {
556 options: PermissionOptions,
557 respond_tx: oneshot::Sender<SelectedPermissionOutcome>,
558 },
559 /// The tool call is currently running.
560 InProgress,
561 /// The tool call completed successfully.
562 Completed,
563 /// The tool call failed.
564 Failed,
565 /// The user rejected the tool call.
566 Rejected,
567 /// The user canceled generation so the tool call was canceled.
568 Canceled,
569}
570
571impl From<acp::ToolCallStatus> for ToolCallStatus {
572 fn from(status: acp::ToolCallStatus) -> Self {
573 match status {
574 acp::ToolCallStatus::Pending => Self::Pending,
575 acp::ToolCallStatus::InProgress => Self::InProgress,
576 acp::ToolCallStatus::Completed => Self::Completed,
577 acp::ToolCallStatus::Failed => Self::Failed,
578 _ => Self::Pending,
579 }
580 }
581}
582
583impl Display for ToolCallStatus {
584 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
585 write!(
586 f,
587 "{}",
588 match self {
589 ToolCallStatus::Pending => "Pending",
590 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
591 ToolCallStatus::InProgress => "In Progress",
592 ToolCallStatus::Completed => "Completed",
593 ToolCallStatus::Failed => "Failed",
594 ToolCallStatus::Rejected => "Rejected",
595 ToolCallStatus::Canceled => "Canceled",
596 }
597 )
598 }
599}
600
601#[derive(Debug, PartialEq, Clone)]
602pub enum ContentBlock {
603 Empty,
604 Markdown { markdown: Entity<Markdown> },
605 ResourceLink { resource_link: acp::ResourceLink },
606 Image { image: Arc<gpui::Image> },
607}
608
609impl ContentBlock {
610 pub fn new(
611 block: acp::ContentBlock,
612 language_registry: &Arc<LanguageRegistry>,
613 path_style: PathStyle,
614 cx: &mut App,
615 ) -> Self {
616 let mut this = Self::Empty;
617 this.append(block, language_registry, path_style, cx);
618 this
619 }
620
621 pub fn new_combined(
622 blocks: impl IntoIterator<Item = acp::ContentBlock>,
623 language_registry: Arc<LanguageRegistry>,
624 path_style: PathStyle,
625 cx: &mut App,
626 ) -> Self {
627 let mut this = Self::Empty;
628 for block in blocks {
629 this.append(block, &language_registry, path_style, cx);
630 }
631 this
632 }
633
634 pub fn append(
635 &mut self,
636 block: acp::ContentBlock,
637 language_registry: &Arc<LanguageRegistry>,
638 path_style: PathStyle,
639 cx: &mut App,
640 ) {
641 match (&mut *self, &block) {
642 (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
643 *self = ContentBlock::ResourceLink {
644 resource_link: resource_link.clone(),
645 };
646 }
647 (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
648 if let Some(image) = Self::decode_image(image_content) {
649 *self = ContentBlock::Image { image };
650 } else {
651 let new_content = Self::image_md(image_content);
652 *self = Self::create_markdown_block(new_content, language_registry, cx);
653 }
654 }
655 (ContentBlock::Empty, _) => {
656 let new_content = Self::block_string_contents(&block, path_style);
657 *self = Self::create_markdown_block(new_content, language_registry, cx);
658 }
659 (ContentBlock::Markdown { markdown }, _) => {
660 let new_content = Self::block_string_contents(&block, path_style);
661 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
662 }
663 (ContentBlock::ResourceLink { resource_link }, _) => {
664 let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
665 let new_content = Self::block_string_contents(&block, path_style);
666 let combined = format!("{}\n{}", existing_content, new_content);
667 *self = Self::create_markdown_block(combined, language_registry, cx);
668 }
669 (ContentBlock::Image { .. }, _) => {
670 let new_content = Self::block_string_contents(&block, path_style);
671 let combined = format!("`Image`\n{}", new_content);
672 *self = Self::create_markdown_block(combined, language_registry, cx);
673 }
674 }
675 }
676
677 fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
678 use base64::Engine as _;
679
680 let bytes = base64::engine::general_purpose::STANDARD
681 .decode(image_content.data.as_bytes())
682 .ok()?;
683 let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
684 Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
685 }
686
687 fn create_markdown_block(
688 content: String,
689 language_registry: &Arc<LanguageRegistry>,
690 cx: &mut App,
691 ) -> ContentBlock {
692 ContentBlock::Markdown {
693 markdown: cx
694 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
695 }
696 }
697
698 fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
699 match block {
700 acp::ContentBlock::Text(text_content) => text_content.text.clone(),
701 acp::ContentBlock::ResourceLink(resource_link) => {
702 Self::resource_link_md(&resource_link.uri, path_style)
703 }
704 acp::ContentBlock::Resource(acp::EmbeddedResource {
705 resource:
706 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
707 uri,
708 ..
709 }),
710 ..
711 }) => Self::resource_link_md(uri, path_style),
712 acp::ContentBlock::Image(image) => Self::image_md(image),
713 _ => String::new(),
714 }
715 }
716
717 fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
718 if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
719 uri.as_link().to_string()
720 } else {
721 uri.to_string()
722 }
723 }
724
725 fn image_md(_image: &acp::ImageContent) -> String {
726 "`Image`".into()
727 }
728
729 pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
730 match self {
731 ContentBlock::Empty => "",
732 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
733 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
734 ContentBlock::Image { .. } => "`Image`",
735 }
736 }
737
738 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
739 match self {
740 ContentBlock::Empty => None,
741 ContentBlock::Markdown { markdown } => Some(markdown),
742 ContentBlock::ResourceLink { .. } => None,
743 ContentBlock::Image { .. } => None,
744 }
745 }
746
747 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
748 match self {
749 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
750 _ => None,
751 }
752 }
753
754 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
755 match self {
756 ContentBlock::Image { image } => Some(image),
757 _ => None,
758 }
759 }
760}
761
762#[derive(Debug)]
763pub enum ToolCallContent {
764 ContentBlock(ContentBlock),
765 Diff(Entity<Diff>),
766 Terminal(Entity<Terminal>),
767}
768
769impl ToolCallContent {
770 pub fn from_acp(
771 content: acp::ToolCallContent,
772 language_registry: Arc<LanguageRegistry>,
773 path_style: PathStyle,
774 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
775 cx: &mut App,
776 ) -> Result<Option<Self>> {
777 match content {
778 acp::ToolCallContent::Content(acp::Content { content, .. }) => {
779 Ok(Some(Self::ContentBlock(ContentBlock::new(
780 content,
781 &language_registry,
782 path_style,
783 cx,
784 ))))
785 }
786 acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
787 Diff::finalized(
788 diff.path.to_string_lossy().into_owned(),
789 diff.old_text,
790 diff.new_text,
791 language_registry,
792 cx,
793 )
794 })))),
795 acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
796 .get(&terminal_id)
797 .cloned()
798 .map(|terminal| Some(Self::Terminal(terminal)))
799 .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
800 _ => Ok(None),
801 }
802 }
803
804 pub fn update_from_acp(
805 &mut self,
806 new: acp::ToolCallContent,
807 language_registry: Arc<LanguageRegistry>,
808 path_style: PathStyle,
809 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
810 cx: &mut App,
811 ) -> Result<bool> {
812 let needs_update = match (&self, &new) {
813 (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
814 old_diff.read(cx).needs_update(
815 new_diff.old_text.as_deref().unwrap_or(""),
816 &new_diff.new_text,
817 cx,
818 )
819 }
820 _ => true,
821 };
822
823 if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
824 if needs_update {
825 *self = update;
826 }
827 Ok(true)
828 } else {
829 Ok(false)
830 }
831 }
832
833 pub fn to_markdown(&self, cx: &App) -> String {
834 match self {
835 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
836 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
837 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
838 }
839 }
840
841 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
842 match self {
843 Self::ContentBlock(content) => content.image(),
844 _ => None,
845 }
846 }
847}
848
849#[derive(Debug, PartialEq)]
850pub enum ToolCallUpdate {
851 UpdateFields(acp::ToolCallUpdate),
852 UpdateDiff(ToolCallUpdateDiff),
853 UpdateTerminal(ToolCallUpdateTerminal),
854}
855
856impl ToolCallUpdate {
857 fn id(&self) -> &acp::ToolCallId {
858 match self {
859 Self::UpdateFields(update) => &update.tool_call_id,
860 Self::UpdateDiff(diff) => &diff.id,
861 Self::UpdateTerminal(terminal) => &terminal.id,
862 }
863 }
864}
865
866impl From<acp::ToolCallUpdate> for ToolCallUpdate {
867 fn from(update: acp::ToolCallUpdate) -> Self {
868 Self::UpdateFields(update)
869 }
870}
871
872impl From<ToolCallUpdateDiff> for ToolCallUpdate {
873 fn from(diff: ToolCallUpdateDiff) -> Self {
874 Self::UpdateDiff(diff)
875 }
876}
877
878#[derive(Debug, PartialEq)]
879pub struct ToolCallUpdateDiff {
880 pub id: acp::ToolCallId,
881 pub diff: Entity<Diff>,
882}
883
884impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
885 fn from(terminal: ToolCallUpdateTerminal) -> Self {
886 Self::UpdateTerminal(terminal)
887 }
888}
889
890#[derive(Debug, PartialEq)]
891pub struct ToolCallUpdateTerminal {
892 pub id: acp::ToolCallId,
893 pub terminal: Entity<Terminal>,
894}
895
896#[derive(Debug, Default)]
897pub struct Plan {
898 pub entries: Vec<PlanEntry>,
899}
900
901#[derive(Debug)]
902pub struct PlanStats<'a> {
903 pub in_progress_entry: Option<&'a PlanEntry>,
904 pub pending: u32,
905 pub completed: u32,
906}
907
908impl Plan {
909 pub fn is_empty(&self) -> bool {
910 self.entries.is_empty()
911 }
912
913 pub fn stats(&self) -> PlanStats<'_> {
914 let mut stats = PlanStats {
915 in_progress_entry: None,
916 pending: 0,
917 completed: 0,
918 };
919
920 for entry in &self.entries {
921 match &entry.status {
922 acp::PlanEntryStatus::Pending => {
923 stats.pending += 1;
924 }
925 acp::PlanEntryStatus::InProgress => {
926 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
927 }
928 acp::PlanEntryStatus::Completed => {
929 stats.completed += 1;
930 }
931 _ => {}
932 }
933 }
934
935 stats
936 }
937}
938
939#[derive(Debug)]
940pub struct PlanEntry {
941 pub content: Entity<Markdown>,
942 pub priority: acp::PlanEntryPriority,
943 pub status: acp::PlanEntryStatus,
944}
945
946impl PlanEntry {
947 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
948 Self {
949 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
950 priority: entry.priority,
951 status: entry.status,
952 }
953 }
954}
955
956#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
957pub struct TokenUsage {
958 pub max_tokens: u64,
959 pub used_tokens: u64,
960 pub input_tokens: u64,
961 pub output_tokens: u64,
962 pub max_output_tokens: Option<u64>,
963}
964
965pub const TOKEN_USAGE_WARNING_THRESHOLD: f32 = 0.8;
966
967impl TokenUsage {
968 pub fn ratio(&self) -> TokenUsageRatio {
969 #[cfg(debug_assertions)]
970 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
971 .unwrap_or(TOKEN_USAGE_WARNING_THRESHOLD.to_string())
972 .parse()
973 .unwrap();
974 #[cfg(not(debug_assertions))]
975 let warning_threshold: f32 = TOKEN_USAGE_WARNING_THRESHOLD;
976
977 // When the maximum is unknown because there is no selected model,
978 // avoid showing the token limit warning.
979 if self.max_tokens == 0 {
980 TokenUsageRatio::Normal
981 } else if self.used_tokens >= self.max_tokens {
982 TokenUsageRatio::Exceeded
983 } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
984 TokenUsageRatio::Warning
985 } else {
986 TokenUsageRatio::Normal
987 }
988 }
989}
990
991#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
992pub enum TokenUsageRatio {
993 Normal,
994 Warning,
995 Exceeded,
996}
997
998#[derive(Debug, Clone)]
999pub struct RetryStatus {
1000 pub last_error: SharedString,
1001 pub attempt: usize,
1002 pub max_attempts: usize,
1003 pub started_at: Instant,
1004 pub duration: Duration,
1005}
1006
1007struct RunningTurn {
1008 id: u32,
1009 send_task: Task<()>,
1010}
1011
1012pub struct AcpThread {
1013 session_id: acp::SessionId,
1014 work_dirs: Option<PathList>,
1015 parent_session_id: Option<acp::SessionId>,
1016 title: SharedString,
1017 provisional_title: Option<SharedString>,
1018 entries: Vec<AgentThreadEntry>,
1019 plan: Plan,
1020 project: Entity<Project>,
1021 action_log: Entity<ActionLog>,
1022 shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
1023 turn_id: u32,
1024 running_turn: Option<RunningTurn>,
1025 connection: Rc<dyn AgentConnection>,
1026 token_usage: Option<TokenUsage>,
1027 prompt_capabilities: acp::PromptCapabilities,
1028 _observe_prompt_capabilities: Task<anyhow::Result<()>>,
1029 terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
1030 pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
1031 pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
1032 had_error: bool,
1033 /// The user's unsent prompt text, persisted so it can be restored when reloading the thread.
1034 draft_prompt: Option<Vec<acp::ContentBlock>>,
1035 /// The initial scroll position for the thread view, set during session registration.
1036 ui_scroll_position: Option<gpui::ListOffset>,
1037 /// Buffer for smooth text streaming. Holds text that has been received from
1038 /// the model but not yet revealed in the UI. A timer task drains this buffer
1039 /// gradually to create a fluid typing effect instead of choppy chunk-at-a-time
1040 /// updates.
1041 streaming_text_buffer: Option<StreamingTextBuffer>,
1042}
1043
1044struct StreamingTextBuffer {
1045 /// Text received from the model but not yet appended to the Markdown source.
1046 pending: String,
1047 /// The number of bytes to reveal per timer turn.
1048 bytes_to_reveal_per_tick: usize,
1049 /// The Markdown entity being streamed into.
1050 target: Entity<Markdown>,
1051 /// Timer task that periodically moves text from `pending` into `source`.
1052 _reveal_task: Task<()>,
1053}
1054
1055impl StreamingTextBuffer {
1056 /// The number of milliseconds between each timer tick, controlling how quickly
1057 /// text is revealed.
1058 const TASK_UPDATE_MS: u64 = 16;
1059 /// The time in milliseconds to reveal the entire pending text.
1060 const REVEAL_TARGET: f32 = 200.0;
1061}
1062
1063impl From<&AcpThread> for ActionLogTelemetry {
1064 fn from(value: &AcpThread) -> Self {
1065 Self {
1066 agent_telemetry_id: value.connection().telemetry_id(),
1067 session_id: value.session_id.0.clone(),
1068 }
1069 }
1070}
1071
1072#[derive(Debug)]
1073pub enum AcpThreadEvent {
1074 NewEntry,
1075 TitleUpdated,
1076 TokenUsageUpdated,
1077 EntryUpdated(usize),
1078 EntriesRemoved(Range<usize>),
1079 ToolAuthorizationRequested(acp::ToolCallId),
1080 ToolAuthorizationReceived(acp::ToolCallId),
1081 Retry(RetryStatus),
1082 SubagentSpawned(acp::SessionId),
1083 Stopped(acp::StopReason),
1084 Error,
1085 LoadError(LoadError),
1086 PromptCapabilitiesUpdated,
1087 Refusal,
1088 AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
1089 ModeUpdated(acp::SessionModeId),
1090 ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
1091}
1092
1093impl EventEmitter<AcpThreadEvent> for AcpThread {}
1094
1095#[derive(Debug, Clone)]
1096pub enum TerminalProviderEvent {
1097 Created {
1098 terminal_id: acp::TerminalId,
1099 label: String,
1100 cwd: Option<PathBuf>,
1101 output_byte_limit: Option<u64>,
1102 terminal: Entity<::terminal::Terminal>,
1103 },
1104 Output {
1105 terminal_id: acp::TerminalId,
1106 data: Vec<u8>,
1107 },
1108 TitleChanged {
1109 terminal_id: acp::TerminalId,
1110 title: String,
1111 },
1112 Exit {
1113 terminal_id: acp::TerminalId,
1114 status: acp::TerminalExitStatus,
1115 },
1116}
1117
1118#[derive(Debug, Clone)]
1119pub enum TerminalProviderCommand {
1120 WriteInput {
1121 terminal_id: acp::TerminalId,
1122 bytes: Vec<u8>,
1123 },
1124 Resize {
1125 terminal_id: acp::TerminalId,
1126 cols: u16,
1127 rows: u16,
1128 },
1129 Close {
1130 terminal_id: acp::TerminalId,
1131 },
1132}
1133
1134#[derive(PartialEq, Eq, Debug)]
1135pub enum ThreadStatus {
1136 Idle,
1137 Generating,
1138}
1139
1140#[derive(Debug, Clone)]
1141pub enum LoadError {
1142 Unsupported {
1143 command: SharedString,
1144 current_version: SharedString,
1145 minimum_version: SharedString,
1146 },
1147 FailedToInstall(SharedString),
1148 Exited {
1149 status: ExitStatus,
1150 },
1151 Other(SharedString),
1152}
1153
1154impl Display for LoadError {
1155 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1156 match self {
1157 LoadError::Unsupported {
1158 command: path,
1159 current_version,
1160 minimum_version,
1161 } => {
1162 write!(
1163 f,
1164 "version {current_version} from {path} is not supported (need at least {minimum_version})"
1165 )
1166 }
1167 LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1168 LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1169 LoadError::Other(msg) => write!(f, "{msg}"),
1170 }
1171 }
1172}
1173
1174impl Error for LoadError {}
1175
1176impl AcpThread {
1177 pub fn new(
1178 parent_session_id: Option<acp::SessionId>,
1179 title: impl Into<SharedString>,
1180 work_dirs: Option<PathList>,
1181 connection: Rc<dyn AgentConnection>,
1182 project: Entity<Project>,
1183 action_log: Entity<ActionLog>,
1184 session_id: acp::SessionId,
1185 mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1186 cx: &mut Context<Self>,
1187 ) -> Self {
1188 let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1189 let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1190 loop {
1191 let caps = prompt_capabilities_rx.recv().await?;
1192 this.update(cx, |this, cx| {
1193 this.prompt_capabilities = caps;
1194 cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1195 })?;
1196 }
1197 });
1198
1199 Self {
1200 parent_session_id,
1201 work_dirs,
1202 action_log,
1203 shared_buffers: Default::default(),
1204 entries: Default::default(),
1205 plan: Default::default(),
1206 title: title.into(),
1207 provisional_title: None,
1208 project,
1209 running_turn: None,
1210 turn_id: 0,
1211 connection,
1212 session_id,
1213 token_usage: None,
1214 prompt_capabilities,
1215 _observe_prompt_capabilities: task,
1216 terminals: HashMap::default(),
1217 pending_terminal_output: HashMap::default(),
1218 pending_terminal_exit: HashMap::default(),
1219 had_error: false,
1220 draft_prompt: None,
1221 ui_scroll_position: None,
1222 streaming_text_buffer: None,
1223 }
1224 }
1225
1226 pub fn parent_session_id(&self) -> Option<&acp::SessionId> {
1227 self.parent_session_id.as_ref()
1228 }
1229
1230 pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
1231 self.prompt_capabilities.clone()
1232 }
1233
1234 pub fn draft_prompt(&self) -> Option<&[acp::ContentBlock]> {
1235 self.draft_prompt.as_deref()
1236 }
1237
1238 pub fn set_draft_prompt(&mut self, prompt: Option<Vec<acp::ContentBlock>>) {
1239 self.draft_prompt = prompt;
1240 }
1241
1242 pub fn ui_scroll_position(&self) -> Option<gpui::ListOffset> {
1243 self.ui_scroll_position
1244 }
1245
1246 pub fn set_ui_scroll_position(&mut self, position: Option<gpui::ListOffset>) {
1247 self.ui_scroll_position = position;
1248 }
1249
1250 pub fn connection(&self) -> &Rc<dyn AgentConnection> {
1251 &self.connection
1252 }
1253
1254 pub fn action_log(&self) -> &Entity<ActionLog> {
1255 &self.action_log
1256 }
1257
1258 pub fn project(&self) -> &Entity<Project> {
1259 &self.project
1260 }
1261
1262 pub fn title(&self) -> SharedString {
1263 self.provisional_title
1264 .clone()
1265 .unwrap_or_else(|| self.title.clone())
1266 }
1267
1268 pub fn has_provisional_title(&self) -> bool {
1269 self.provisional_title.is_some()
1270 }
1271
1272 pub fn entries(&self) -> &[AgentThreadEntry] {
1273 &self.entries
1274 }
1275
1276 pub fn session_id(&self) -> &acp::SessionId {
1277 &self.session_id
1278 }
1279
1280 pub fn work_dirs(&self) -> Option<&PathList> {
1281 self.work_dirs.as_ref()
1282 }
1283
1284 pub fn status(&self) -> ThreadStatus {
1285 if self.running_turn.is_some() {
1286 ThreadStatus::Generating
1287 } else {
1288 ThreadStatus::Idle
1289 }
1290 }
1291
1292 pub fn had_error(&self) -> bool {
1293 self.had_error
1294 }
1295
1296 pub fn is_waiting_for_confirmation(&self) -> bool {
1297 for entry in self.entries.iter().rev() {
1298 match entry {
1299 AgentThreadEntry::UserMessage(_) => return false,
1300 AgentThreadEntry::ToolCall(ToolCall {
1301 status: ToolCallStatus::WaitingForConfirmation { .. },
1302 ..
1303 }) => return true,
1304 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1305 }
1306 }
1307 false
1308 }
1309
1310 pub fn token_usage(&self) -> Option<&TokenUsage> {
1311 self.token_usage.as_ref()
1312 }
1313
1314 pub fn has_pending_edit_tool_calls(&self) -> bool {
1315 for entry in self.entries.iter().rev() {
1316 match entry {
1317 AgentThreadEntry::UserMessage(_) => return false,
1318 AgentThreadEntry::ToolCall(
1319 call @ ToolCall {
1320 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1321 ..
1322 },
1323 ) if call.diffs().next().is_some() => {
1324 return true;
1325 }
1326 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1327 }
1328 }
1329
1330 false
1331 }
1332
1333 pub fn has_in_progress_tool_calls(&self) -> bool {
1334 for entry in self.entries.iter().rev() {
1335 match entry {
1336 AgentThreadEntry::UserMessage(_) => return false,
1337 AgentThreadEntry::ToolCall(ToolCall {
1338 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1339 ..
1340 }) => {
1341 return true;
1342 }
1343 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1344 }
1345 }
1346
1347 false
1348 }
1349
1350 pub fn used_tools_since_last_user_message(&self) -> bool {
1351 for entry in self.entries.iter().rev() {
1352 match entry {
1353 AgentThreadEntry::UserMessage(..) => return false,
1354 AgentThreadEntry::AssistantMessage(..) => continue,
1355 AgentThreadEntry::ToolCall(..) => return true,
1356 }
1357 }
1358
1359 false
1360 }
1361
1362 pub fn handle_session_update(
1363 &mut self,
1364 update: acp::SessionUpdate,
1365 cx: &mut Context<Self>,
1366 ) -> Result<(), acp::Error> {
1367 match update {
1368 acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1369 self.push_user_content_block(None, content, cx);
1370 }
1371 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1372 self.push_assistant_content_block(content, false, cx);
1373 }
1374 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1375 self.push_assistant_content_block(content, true, cx);
1376 }
1377 acp::SessionUpdate::ToolCall(tool_call) => {
1378 self.upsert_tool_call(tool_call, cx)?;
1379 }
1380 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1381 self.update_tool_call(tool_call_update, cx)?;
1382 }
1383 acp::SessionUpdate::Plan(plan) => {
1384 self.update_plan(plan, cx);
1385 }
1386 acp::SessionUpdate::SessionInfoUpdate(info_update) => {
1387 if let acp::MaybeUndefined::Value(title) = info_update.title {
1388 let had_provisional = self.provisional_title.take().is_some();
1389 let title: SharedString = title.into();
1390 if title != self.title {
1391 self.title = title;
1392 cx.emit(AcpThreadEvent::TitleUpdated);
1393 } else if had_provisional {
1394 cx.emit(AcpThreadEvent::TitleUpdated);
1395 }
1396 }
1397 }
1398 acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1399 available_commands,
1400 ..
1401 }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1402 acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1403 current_mode_id,
1404 ..
1405 }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1406 acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1407 config_options,
1408 ..
1409 }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1410 _ => {}
1411 }
1412 Ok(())
1413 }
1414
1415 pub fn push_user_content_block(
1416 &mut self,
1417 message_id: Option<UserMessageId>,
1418 chunk: acp::ContentBlock,
1419 cx: &mut Context<Self>,
1420 ) {
1421 self.push_user_content_block_with_indent(message_id, chunk, false, cx)
1422 }
1423
1424 pub fn push_user_content_block_with_indent(
1425 &mut self,
1426 message_id: Option<UserMessageId>,
1427 chunk: acp::ContentBlock,
1428 indented: bool,
1429 cx: &mut Context<Self>,
1430 ) {
1431 let language_registry = self.project.read(cx).languages().clone();
1432 let path_style = self.project.read(cx).path_style(cx);
1433 let entries_len = self.entries.len();
1434
1435 if let Some(last_entry) = self.entries.last_mut()
1436 && let AgentThreadEntry::UserMessage(UserMessage {
1437 id,
1438 content,
1439 chunks,
1440 indented: existing_indented,
1441 ..
1442 }) = last_entry
1443 && *existing_indented == indented
1444 {
1445 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1446 *id = message_id.or(id.take());
1447 content.append(chunk.clone(), &language_registry, path_style, cx);
1448 chunks.push(chunk);
1449 let idx = entries_len - 1;
1450 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1451 } else {
1452 let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1453 self.push_entry(
1454 AgentThreadEntry::UserMessage(UserMessage {
1455 id: message_id,
1456 content,
1457 chunks: vec![chunk],
1458 checkpoint: None,
1459 indented,
1460 }),
1461 cx,
1462 );
1463 }
1464 }
1465
1466 pub fn push_assistant_content_block(
1467 &mut self,
1468 chunk: acp::ContentBlock,
1469 is_thought: bool,
1470 cx: &mut Context<Self>,
1471 ) {
1472 self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
1473 }
1474
1475 pub fn push_assistant_content_block_with_indent(
1476 &mut self,
1477 chunk: acp::ContentBlock,
1478 is_thought: bool,
1479 indented: bool,
1480 cx: &mut Context<Self>,
1481 ) {
1482 let path_style = self.project.read(cx).path_style(cx);
1483
1484 // For text chunks going to an existing Markdown block, buffer for smooth
1485 // streaming instead of appending all at once which may feel more choppy.
1486 if let acp::ContentBlock::Text(text_content) = &chunk {
1487 if let Some(markdown) = self.streaming_markdown_target(is_thought, indented) {
1488 let entries_len = self.entries.len();
1489 cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
1490 self.buffer_streaming_text(&markdown, text_content.text.clone(), cx);
1491 return;
1492 }
1493 }
1494
1495 let language_registry = self.project.read(cx).languages().clone();
1496 let entries_len = self.entries.len();
1497 if let Some(last_entry) = self.entries.last_mut()
1498 && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1499 chunks,
1500 indented: existing_indented,
1501 is_subagent_output: _,
1502 }) = last_entry
1503 && *existing_indented == indented
1504 {
1505 let idx = entries_len - 1;
1506 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1507 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1508 match (chunks.last_mut(), is_thought) {
1509 (Some(AssistantMessageChunk::Message { block }), false)
1510 | (Some(AssistantMessageChunk::Thought { block }), true) => {
1511 block.append(chunk, &language_registry, path_style, cx)
1512 }
1513 _ => {
1514 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1515 if is_thought {
1516 chunks.push(AssistantMessageChunk::Thought { block })
1517 } else {
1518 chunks.push(AssistantMessageChunk::Message { block })
1519 }
1520 }
1521 }
1522 } else {
1523 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1524 let chunk = if is_thought {
1525 AssistantMessageChunk::Thought { block }
1526 } else {
1527 AssistantMessageChunk::Message { block }
1528 };
1529
1530 self.push_entry(
1531 AgentThreadEntry::AssistantMessage(AssistantMessage {
1532 chunks: vec![chunk],
1533 indented,
1534 is_subagent_output: false,
1535 }),
1536 cx,
1537 );
1538 }
1539 }
1540
1541 fn streaming_markdown_target(
1542 &self,
1543 is_thought: bool,
1544 indented: bool,
1545 ) -> Option<Entity<Markdown>> {
1546 let last_entry = self.entries.last()?;
1547 if let AgentThreadEntry::AssistantMessage(AssistantMessage {
1548 chunks,
1549 indented: existing_indented,
1550 ..
1551 }) = last_entry
1552 && *existing_indented == indented
1553 && let [.., chunk] = chunks.as_slice()
1554 {
1555 match (chunk, is_thought) {
1556 (
1557 AssistantMessageChunk::Message {
1558 block: ContentBlock::Markdown { markdown },
1559 },
1560 false,
1561 )
1562 | (
1563 AssistantMessageChunk::Thought {
1564 block: ContentBlock::Markdown { markdown },
1565 },
1566 true,
1567 ) => Some(markdown.clone()),
1568 _ => None,
1569 }
1570 } else {
1571 None
1572 }
1573 }
1574
1575 /// Add text to the streaming buffer. If the target changed (e.g. switching
1576 /// from thoughts to message text), flush the old buffer first.
1577 fn buffer_streaming_text(
1578 &mut self,
1579 markdown: &Entity<Markdown>,
1580 text: String,
1581 cx: &mut Context<Self>,
1582 ) {
1583 if let Some(buffer) = &mut self.streaming_text_buffer {
1584 if buffer.target.entity_id() == markdown.entity_id() {
1585 buffer.pending.push_str(&text);
1586
1587 buffer.bytes_to_reveal_per_tick = (buffer.pending.len() as f32
1588 / StreamingTextBuffer::REVEAL_TARGET
1589 * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1590 .ceil() as usize;
1591 return;
1592 }
1593 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1594 }
1595
1596 let target = markdown.clone();
1597 let _reveal_task = self.start_streaming_reveal(cx);
1598 let pending_len = text.len();
1599 let bytes_to_reveal = (pending_len as f32 / StreamingTextBuffer::REVEAL_TARGET
1600 * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1601 .ceil() as usize;
1602 self.streaming_text_buffer = Some(StreamingTextBuffer {
1603 pending: text,
1604 bytes_to_reveal_per_tick: bytes_to_reveal,
1605 target,
1606 _reveal_task,
1607 });
1608 }
1609
1610 /// Flush all buffered streaming text into the Markdown entity immediately.
1611 fn flush_streaming_text(
1612 streaming_text_buffer: &mut Option<StreamingTextBuffer>,
1613 cx: &mut Context<Self>,
1614 ) {
1615 if let Some(buffer) = streaming_text_buffer.take() {
1616 if !buffer.pending.is_empty() {
1617 buffer
1618 .target
1619 .update(cx, |markdown, cx| markdown.append(&buffer.pending, cx));
1620 }
1621 }
1622 }
1623
1624 /// Spawns a foreground task that periodically drains
1625 /// `streaming_text_buffer.pending` into the target `Markdown` entity,
1626 /// producing smooth, continuous text output.
1627 fn start_streaming_reveal(&self, cx: &mut Context<Self>) -> Task<()> {
1628 cx.spawn(async move |this, cx| {
1629 loop {
1630 cx.background_executor()
1631 .timer(Duration::from_millis(StreamingTextBuffer::TASK_UPDATE_MS))
1632 .await;
1633
1634 let should_continue = this
1635 .update(cx, |this, cx| {
1636 let Some(buffer) = &mut this.streaming_text_buffer else {
1637 return false;
1638 };
1639
1640 if buffer.pending.is_empty() {
1641 return true;
1642 }
1643
1644 let pending_len = buffer.pending.len();
1645
1646 let byte_boundary = buffer
1647 .pending
1648 .ceil_char_boundary(buffer.bytes_to_reveal_per_tick)
1649 .min(pending_len);
1650
1651 buffer.target.update(cx, |markdown: &mut Markdown, cx| {
1652 markdown.append(&buffer.pending[..byte_boundary], cx);
1653 buffer.pending.drain(..byte_boundary);
1654 });
1655
1656 true
1657 })
1658 .unwrap_or(false);
1659
1660 if !should_continue {
1661 break;
1662 }
1663 }
1664 })
1665 }
1666
1667 fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
1668 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1669 self.entries.push(entry);
1670 cx.emit(AcpThreadEvent::NewEntry);
1671 }
1672
1673 pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1674 self.connection.set_title(&self.session_id, cx).is_some()
1675 }
1676
1677 pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1678 let had_provisional = self.provisional_title.take().is_some();
1679 if title != self.title {
1680 self.title = title.clone();
1681 cx.emit(AcpThreadEvent::TitleUpdated);
1682 if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1683 return set_title.run(title, cx);
1684 }
1685 } else if had_provisional {
1686 cx.emit(AcpThreadEvent::TitleUpdated);
1687 }
1688 Task::ready(Ok(()))
1689 }
1690
1691 /// Sets a provisional display title without propagating back to the
1692 /// underlying agent connection. This is used for quick preview titles
1693 /// (e.g. first 20 chars of the user message) that should be shown
1694 /// immediately but replaced once the LLM generates a proper title via
1695 /// `set_title`.
1696 pub fn set_provisional_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
1697 self.provisional_title = Some(title);
1698 cx.emit(AcpThreadEvent::TitleUpdated);
1699 }
1700
1701 pub fn subagent_spawned(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
1702 cx.emit(AcpThreadEvent::SubagentSpawned(session_id));
1703 }
1704
1705 pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1706 self.token_usage = usage;
1707 cx.emit(AcpThreadEvent::TokenUsageUpdated);
1708 }
1709
1710 pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1711 cx.emit(AcpThreadEvent::Retry(status));
1712 }
1713
1714 pub fn update_tool_call(
1715 &mut self,
1716 update: impl Into<ToolCallUpdate>,
1717 cx: &mut Context<Self>,
1718 ) -> Result<()> {
1719 let update = update.into();
1720 let languages = self.project.read(cx).languages().clone();
1721 let path_style = self.project.read(cx).path_style(cx);
1722
1723 let ix = match self.index_for_tool_call(update.id()) {
1724 Some(ix) => ix,
1725 None => {
1726 // Tool call not found - create a failed tool call entry
1727 let failed_tool_call = ToolCall {
1728 id: update.id().clone(),
1729 label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1730 kind: acp::ToolKind::Fetch,
1731 content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1732 "Tool call not found".into(),
1733 &languages,
1734 path_style,
1735 cx,
1736 ))],
1737 status: ToolCallStatus::Failed,
1738 locations: Vec::new(),
1739 resolved_locations: Vec::new(),
1740 raw_input: None,
1741 raw_input_markdown: None,
1742 raw_output: None,
1743 tool_name: None,
1744 subagent_session_info: None,
1745 };
1746 self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1747 return Ok(());
1748 }
1749 };
1750 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1751 unreachable!()
1752 };
1753
1754 match update {
1755 ToolCallUpdate::UpdateFields(update) => {
1756 let location_updated = update.fields.locations.is_some();
1757 call.update_fields(
1758 update.fields,
1759 update.meta,
1760 languages,
1761 path_style,
1762 &self.terminals,
1763 cx,
1764 )?;
1765 if location_updated {
1766 self.resolve_locations(update.tool_call_id, cx);
1767 }
1768 }
1769 ToolCallUpdate::UpdateDiff(update) => {
1770 call.content.clear();
1771 call.content.push(ToolCallContent::Diff(update.diff));
1772 }
1773 ToolCallUpdate::UpdateTerminal(update) => {
1774 call.content.clear();
1775 call.content
1776 .push(ToolCallContent::Terminal(update.terminal));
1777 }
1778 }
1779
1780 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1781
1782 Ok(())
1783 }
1784
1785 /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1786 pub fn upsert_tool_call(
1787 &mut self,
1788 tool_call: acp::ToolCall,
1789 cx: &mut Context<Self>,
1790 ) -> Result<(), acp::Error> {
1791 let status = tool_call.status.into();
1792 self.upsert_tool_call_inner(tool_call.into(), status, cx)
1793 }
1794
1795 /// Fails if id does not match an existing entry.
1796 pub fn upsert_tool_call_inner(
1797 &mut self,
1798 update: acp::ToolCallUpdate,
1799 status: ToolCallStatus,
1800 cx: &mut Context<Self>,
1801 ) -> Result<(), acp::Error> {
1802 let language_registry = self.project.read(cx).languages().clone();
1803 let path_style = self.project.read(cx).path_style(cx);
1804 let id = update.tool_call_id.clone();
1805
1806 let agent_telemetry_id = self.connection().telemetry_id();
1807 let session = self.session_id();
1808 let parent_session_id = self.parent_session_id();
1809 if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1810 let status = if matches!(status, ToolCallStatus::Completed) {
1811 "completed"
1812 } else {
1813 "failed"
1814 };
1815 telemetry::event!(
1816 "Agent Tool Call Completed",
1817 agent_telemetry_id,
1818 session,
1819 parent_session_id,
1820 status
1821 );
1822 }
1823
1824 if let Some(ix) = self.index_for_tool_call(&id) {
1825 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1826 unreachable!()
1827 };
1828
1829 call.update_fields(
1830 update.fields,
1831 update.meta,
1832 language_registry,
1833 path_style,
1834 &self.terminals,
1835 cx,
1836 )?;
1837 call.status = status;
1838
1839 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1840 } else {
1841 let call = ToolCall::from_acp(
1842 update.try_into()?,
1843 status,
1844 language_registry,
1845 self.project.read(cx).path_style(cx),
1846 &self.terminals,
1847 cx,
1848 )?;
1849 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1850 };
1851
1852 self.resolve_locations(id, cx);
1853 Ok(())
1854 }
1855
1856 fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1857 self.entries
1858 .iter()
1859 .enumerate()
1860 .rev()
1861 .find_map(|(index, entry)| {
1862 if let AgentThreadEntry::ToolCall(tool_call) = entry
1863 && &tool_call.id == id
1864 {
1865 Some(index)
1866 } else {
1867 None
1868 }
1869 })
1870 }
1871
1872 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1873 // The tool call we are looking for is typically the last one, or very close to the end.
1874 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1875 self.entries
1876 .iter_mut()
1877 .enumerate()
1878 .rev()
1879 .find_map(|(index, tool_call)| {
1880 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1881 && &tool_call.id == id
1882 {
1883 Some((index, tool_call))
1884 } else {
1885 None
1886 }
1887 })
1888 }
1889
1890 pub fn tool_call(&self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1891 self.entries
1892 .iter()
1893 .enumerate()
1894 .rev()
1895 .find_map(|(index, tool_call)| {
1896 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1897 && &tool_call.id == id
1898 {
1899 Some((index, tool_call))
1900 } else {
1901 None
1902 }
1903 })
1904 }
1905
1906 pub fn tool_call_for_subagent(&self, session_id: &acp::SessionId) -> Option<&ToolCall> {
1907 self.entries.iter().find_map(|entry| match entry {
1908 AgentThreadEntry::ToolCall(tool_call) => {
1909 if let Some(subagent_session_info) = &tool_call.subagent_session_info
1910 && &subagent_session_info.session_id == session_id
1911 {
1912 Some(tool_call)
1913 } else {
1914 None
1915 }
1916 }
1917 _ => None,
1918 })
1919 }
1920
1921 pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1922 let project = self.project.clone();
1923 let should_update_agent_location = self.parent_session_id.is_none();
1924 let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1925 return;
1926 };
1927 let task = tool_call.resolve_locations(project, cx);
1928 cx.spawn(async move |this, cx| {
1929 let resolved_locations = task.await;
1930
1931 this.update(cx, |this, cx| {
1932 let project = this.project.clone();
1933
1934 for location in resolved_locations.iter().flatten() {
1935 this.shared_buffers
1936 .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
1937 }
1938 let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1939 return;
1940 };
1941
1942 if let Some(Some(location)) = resolved_locations.last() {
1943 project.update(cx, |project, cx| {
1944 let should_ignore = if let Some(agent_location) = project
1945 .agent_location()
1946 .filter(|agent_location| agent_location.buffer == location.buffer)
1947 {
1948 let snapshot = location.buffer.read(cx).snapshot();
1949 let old_position = agent_location.position.to_point(&snapshot);
1950 let new_position = location.position.to_point(&snapshot);
1951
1952 // ignore this so that when we get updates from the edit tool
1953 // the position doesn't reset to the startof line
1954 old_position.row == new_position.row
1955 && old_position.column > new_position.column
1956 } else {
1957 false
1958 };
1959 if !should_ignore && should_update_agent_location {
1960 project.set_agent_location(Some(location.into()), cx);
1961 }
1962 });
1963 }
1964
1965 let resolved_locations = resolved_locations
1966 .iter()
1967 .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
1968 .collect::<Vec<_>>();
1969
1970 if tool_call.resolved_locations != resolved_locations {
1971 tool_call.resolved_locations = resolved_locations;
1972 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1973 }
1974 })
1975 })
1976 .detach();
1977 }
1978
1979 pub fn request_tool_call_authorization(
1980 &mut self,
1981 tool_call: acp::ToolCallUpdate,
1982 options: PermissionOptions,
1983 cx: &mut Context<Self>,
1984 ) -> Result<Task<RequestPermissionOutcome>> {
1985 let (tx, rx) = oneshot::channel();
1986
1987 let status = ToolCallStatus::WaitingForConfirmation {
1988 options,
1989 respond_tx: tx,
1990 };
1991
1992 let tool_call_id = tool_call.tool_call_id.clone();
1993 self.upsert_tool_call_inner(tool_call, status, cx)?;
1994 cx.emit(AcpThreadEvent::ToolAuthorizationRequested(
1995 tool_call_id.clone(),
1996 ));
1997
1998 Ok(cx.spawn(async move |this, cx| {
1999 let outcome = match rx.await {
2000 Ok(outcome) => RequestPermissionOutcome::Selected(outcome),
2001 Err(oneshot::Canceled) => RequestPermissionOutcome::Cancelled,
2002 };
2003 this.update(cx, |_this, cx| {
2004 cx.emit(AcpThreadEvent::ToolAuthorizationReceived(tool_call_id))
2005 })
2006 .ok();
2007 outcome
2008 }))
2009 }
2010
2011 pub fn authorize_tool_call(
2012 &mut self,
2013 id: acp::ToolCallId,
2014 outcome: SelectedPermissionOutcome,
2015 option_kind: acp::PermissionOptionKind,
2016 cx: &mut Context<Self>,
2017 ) {
2018 let Some((ix, call)) = self.tool_call_mut(&id) else {
2019 return;
2020 };
2021
2022 let new_status = match option_kind {
2023 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
2024 ToolCallStatus::Rejected
2025 }
2026 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
2027 ToolCallStatus::InProgress
2028 }
2029 _ => ToolCallStatus::InProgress,
2030 };
2031
2032 let curr_status = mem::replace(&mut call.status, new_status);
2033
2034 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
2035 respond_tx.send(outcome).log_err();
2036 } else if cfg!(debug_assertions) {
2037 panic!("tried to authorize an already authorized tool call");
2038 }
2039
2040 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2041 }
2042
2043 pub fn plan(&self) -> &Plan {
2044 &self.plan
2045 }
2046
2047 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
2048 let new_entries_len = request.entries.len();
2049 let mut new_entries = request.entries.into_iter();
2050
2051 // Reuse existing markdown to prevent flickering
2052 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
2053 let PlanEntry {
2054 content,
2055 priority,
2056 status,
2057 } = old;
2058 content.update(cx, |old, cx| {
2059 old.replace(new.content, cx);
2060 });
2061 *priority = new.priority;
2062 *status = new.status;
2063 }
2064 for new in new_entries {
2065 self.plan.entries.push(PlanEntry::from_acp(new, cx))
2066 }
2067 self.plan.entries.truncate(new_entries_len);
2068
2069 cx.notify();
2070 }
2071
2072 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
2073 self.plan
2074 .entries
2075 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
2076 cx.notify();
2077 }
2078
2079 #[cfg(any(test, feature = "test-support"))]
2080 pub fn send_raw(
2081 &mut self,
2082 message: &str,
2083 cx: &mut Context<Self>,
2084 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2085 self.send(vec![message.into()], cx)
2086 }
2087
2088 pub fn send(
2089 &mut self,
2090 message: Vec<acp::ContentBlock>,
2091 cx: &mut Context<Self>,
2092 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2093 let block = ContentBlock::new_combined(
2094 message.clone(),
2095 self.project.read(cx).languages().clone(),
2096 self.project.read(cx).path_style(cx),
2097 cx,
2098 );
2099 let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
2100 let git_store = self.project.read(cx).git_store().clone();
2101
2102 let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
2103 Some(UserMessageId::new())
2104 } else {
2105 None
2106 };
2107
2108 self.run_turn(cx, async move |this, cx| {
2109 this.update(cx, |this, cx| {
2110 this.push_entry(
2111 AgentThreadEntry::UserMessage(UserMessage {
2112 id: message_id.clone(),
2113 content: block,
2114 chunks: message,
2115 checkpoint: None,
2116 indented: false,
2117 }),
2118 cx,
2119 );
2120 })
2121 .ok();
2122
2123 let old_checkpoint = git_store
2124 .update(cx, |git, cx| git.checkpoint(cx))
2125 .await
2126 .context("failed to get old checkpoint")
2127 .log_err();
2128 this.update(cx, |this, cx| {
2129 if let Some((_ix, message)) = this.last_user_message() {
2130 message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
2131 git_checkpoint,
2132 show: false,
2133 });
2134 }
2135 this.connection.prompt(message_id, request, cx)
2136 })?
2137 .await
2138 })
2139 }
2140
2141 pub fn can_retry(&self, cx: &App) -> bool {
2142 self.connection.retry(&self.session_id, cx).is_some()
2143 }
2144
2145 pub fn retry(
2146 &mut self,
2147 cx: &mut Context<Self>,
2148 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2149 self.run_turn(cx, async move |this, cx| {
2150 this.update(cx, |this, cx| {
2151 this.connection
2152 .retry(&this.session_id, cx)
2153 .map(|retry| retry.run(cx))
2154 })?
2155 .context("retrying a session is not supported")?
2156 .await
2157 })
2158 }
2159
2160 fn run_turn(
2161 &mut self,
2162 cx: &mut Context<Self>,
2163 f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
2164 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2165 self.clear_completed_plan_entries(cx);
2166 self.had_error = false;
2167
2168 let (tx, rx) = oneshot::channel();
2169 let cancel_task = self.cancel(cx);
2170
2171 self.turn_id += 1;
2172 let turn_id = self.turn_id;
2173 self.running_turn = Some(RunningTurn {
2174 id: turn_id,
2175 send_task: cx.spawn(async move |this, cx| {
2176 cancel_task.await;
2177 tx.send(f(this, cx).await).ok();
2178 }),
2179 });
2180
2181 cx.spawn(async move |this, cx| {
2182 let response = rx.await;
2183
2184 this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
2185 .await?;
2186
2187 this.update(cx, |this, cx| {
2188 if this.parent_session_id.is_none() {
2189 this.project
2190 .update(cx, |project, cx| project.set_agent_location(None, cx));
2191 }
2192 let Ok(response) = response else {
2193 // tx dropped, just return
2194 return Ok(None);
2195 };
2196
2197 let is_same_turn = this
2198 .running_turn
2199 .as_ref()
2200 .is_some_and(|turn| turn_id == turn.id);
2201
2202 // If the user submitted a follow up message, running_turn might
2203 // already point to a different turn. Therefore we only want to
2204 // take the task if it's the same turn.
2205 if is_same_turn {
2206 this.running_turn.take();
2207 }
2208
2209 match response {
2210 Ok(r) => {
2211 Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2212
2213 if r.stop_reason == acp::StopReason::MaxTokens {
2214 this.had_error = true;
2215 cx.emit(AcpThreadEvent::Error);
2216 log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2217 return Err(anyhow!("Max tokens reached"));
2218 }
2219
2220 let canceled = matches!(r.stop_reason, acp::StopReason::Cancelled);
2221 if canceled {
2222 this.mark_pending_tools_as_canceled();
2223 }
2224
2225 // Handle refusal - distinguish between user prompt and tool call refusals
2226 if let acp::StopReason::Refusal = r.stop_reason {
2227 this.had_error = true;
2228 if let Some((user_msg_ix, _)) = this.last_user_message() {
2229 // Check if there's a completed tool call with results after the last user message
2230 // This indicates the refusal is in response to tool output, not the user's prompt
2231 let has_completed_tool_call_after_user_msg =
2232 this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
2233 if let AgentThreadEntry::ToolCall(tool_call) = entry {
2234 // Check if the tool call has completed and has output
2235 matches!(tool_call.status, ToolCallStatus::Completed)
2236 && tool_call.raw_output.is_some()
2237 } else {
2238 false
2239 }
2240 });
2241
2242 if has_completed_tool_call_after_user_msg {
2243 // Refusal is due to tool output - don't truncate, just notify
2244 // The model refused based on what the tool returned
2245 cx.emit(AcpThreadEvent::Refusal);
2246 } else {
2247 // User prompt was refused - truncate back to before the user message
2248 let range = user_msg_ix..this.entries.len();
2249 if range.start < range.end {
2250 this.entries.truncate(user_msg_ix);
2251 cx.emit(AcpThreadEvent::EntriesRemoved(range));
2252 }
2253 cx.emit(AcpThreadEvent::Refusal);
2254 }
2255 } else {
2256 // No user message found, treat as general refusal
2257 cx.emit(AcpThreadEvent::Refusal);
2258 }
2259 }
2260
2261 cx.emit(AcpThreadEvent::Stopped(r.stop_reason));
2262 Ok(Some(r))
2263 }
2264 Err(e) => {
2265 Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2266
2267 this.had_error = true;
2268 cx.emit(AcpThreadEvent::Error);
2269 log::error!("Error in run turn: {:?}", e);
2270 Err(e)
2271 }
2272 }
2273 })?
2274 })
2275 .boxed()
2276 }
2277
2278 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2279 let Some(turn) = self.running_turn.take() else {
2280 return Task::ready(());
2281 };
2282 self.connection.cancel(&self.session_id, cx);
2283
2284 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2285 self.mark_pending_tools_as_canceled();
2286
2287 // Wait for the send task to complete
2288 cx.background_spawn(turn.send_task)
2289 }
2290
2291 fn mark_pending_tools_as_canceled(&mut self) {
2292 for entry in self.entries.iter_mut() {
2293 if let AgentThreadEntry::ToolCall(call) = entry {
2294 let cancel = matches!(
2295 call.status,
2296 ToolCallStatus::Pending
2297 | ToolCallStatus::WaitingForConfirmation { .. }
2298 | ToolCallStatus::InProgress
2299 );
2300
2301 if cancel {
2302 call.status = ToolCallStatus::Canceled;
2303 }
2304 }
2305 }
2306 }
2307
2308 /// Restores the git working tree to the state at the given checkpoint (if one exists)
2309 pub fn restore_checkpoint(
2310 &mut self,
2311 id: UserMessageId,
2312 cx: &mut Context<Self>,
2313 ) -> Task<Result<()>> {
2314 let Some((_, message)) = self.user_message_mut(&id) else {
2315 return Task::ready(Err(anyhow!("message not found")));
2316 };
2317
2318 let checkpoint = message
2319 .checkpoint
2320 .as_ref()
2321 .map(|c| c.git_checkpoint.clone());
2322
2323 // Cancel any in-progress generation before restoring
2324 let cancel_task = self.cancel(cx);
2325 let rewind = self.rewind(id.clone(), cx);
2326 let git_store = self.project.read(cx).git_store().clone();
2327
2328 cx.spawn(async move |_, cx| {
2329 cancel_task.await;
2330 rewind.await?;
2331 if let Some(checkpoint) = checkpoint {
2332 git_store
2333 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
2334 .await?;
2335 }
2336
2337 Ok(())
2338 })
2339 }
2340
2341 /// Rewinds this thread to before the entry at `index`, removing it and all
2342 /// subsequent entries while rejecting any action_log changes made from that point.
2343 /// Unlike `restore_checkpoint`, this method does not restore from git.
2344 pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
2345 let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
2346 return Task::ready(Err(anyhow!("not supported")));
2347 };
2348
2349 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2350 let telemetry = ActionLogTelemetry::from(&*self);
2351 cx.spawn(async move |this, cx| {
2352 cx.update(|cx| truncate.run(id.clone(), cx)).await?;
2353 this.update(cx, |this, cx| {
2354 if let Some((ix, _)) = this.user_message_mut(&id) {
2355 // Collect all terminals from entries that will be removed
2356 let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
2357 .iter()
2358 .flat_map(|entry| entry.terminals())
2359 .filter_map(|terminal| terminal.read(cx).id().clone().into())
2360 .collect();
2361
2362 let range = ix..this.entries.len();
2363 this.entries.truncate(ix);
2364 cx.emit(AcpThreadEvent::EntriesRemoved(range));
2365
2366 // Kill and remove the terminals
2367 for terminal_id in terminals_to_remove {
2368 if let Some(terminal) = this.terminals.remove(&terminal_id) {
2369 terminal.update(cx, |terminal, cx| {
2370 terminal.kill(cx);
2371 });
2372 }
2373 }
2374 }
2375 this.action_log().update(cx, |action_log, cx| {
2376 action_log.reject_all_edits(Some(telemetry), cx)
2377 })
2378 })?
2379 .await;
2380 Ok(())
2381 })
2382 }
2383
2384 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
2385 let git_store = self.project.read(cx).git_store().clone();
2386
2387 let Some((_, message)) = self.last_user_message() else {
2388 return Task::ready(Ok(()));
2389 };
2390 let Some(user_message_id) = message.id.clone() else {
2391 return Task::ready(Ok(()));
2392 };
2393 let Some(checkpoint) = message.checkpoint.as_ref() else {
2394 return Task::ready(Ok(()));
2395 };
2396 let old_checkpoint = checkpoint.git_checkpoint.clone();
2397
2398 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2399 cx.spawn(async move |this, cx| {
2400 let Some(new_checkpoint) = new_checkpoint
2401 .await
2402 .context("failed to get new checkpoint")
2403 .log_err()
2404 else {
2405 return Ok(());
2406 };
2407
2408 let equal = git_store
2409 .update(cx, |git, cx| {
2410 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2411 })
2412 .await
2413 .unwrap_or(true);
2414
2415 this.update(cx, |this, cx| {
2416 if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
2417 if let Some(checkpoint) = message.checkpoint.as_mut() {
2418 checkpoint.show = !equal;
2419 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2420 }
2421 }
2422 })?;
2423
2424 Ok(())
2425 })
2426 }
2427
2428 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2429 self.entries
2430 .iter_mut()
2431 .enumerate()
2432 .rev()
2433 .find_map(|(ix, entry)| {
2434 if let AgentThreadEntry::UserMessage(message) = entry {
2435 Some((ix, message))
2436 } else {
2437 None
2438 }
2439 })
2440 }
2441
2442 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2443 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2444 if let AgentThreadEntry::UserMessage(message) = entry {
2445 if message.id.as_ref() == Some(id) {
2446 Some((ix, message))
2447 } else {
2448 None
2449 }
2450 } else {
2451 None
2452 }
2453 })
2454 }
2455
2456 pub fn read_text_file(
2457 &self,
2458 path: PathBuf,
2459 line: Option<u32>,
2460 limit: Option<u32>,
2461 reuse_shared_snapshot: bool,
2462 cx: &mut Context<Self>,
2463 ) -> Task<Result<String, acp::Error>> {
2464 // Args are 1-based, move to 0-based
2465 let line = line.unwrap_or_default().saturating_sub(1);
2466 let limit = limit.unwrap_or(u32::MAX);
2467 let project = self.project.clone();
2468 let action_log = self.action_log.clone();
2469 let should_update_agent_location = self.parent_session_id.is_none();
2470 cx.spawn(async move |this, cx| {
2471 let load = project.update(cx, |project, cx| {
2472 let path = project
2473 .project_path_for_absolute_path(&path, cx)
2474 .ok_or_else(|| {
2475 acp::Error::resource_not_found(Some(path.display().to_string()))
2476 })?;
2477 Ok::<_, acp::Error>(project.open_buffer(path, cx))
2478 })?;
2479
2480 let buffer = load.await?;
2481
2482 let snapshot = if reuse_shared_snapshot {
2483 this.read_with(cx, |this, _| {
2484 this.shared_buffers.get(&buffer.clone()).cloned()
2485 })
2486 .log_err()
2487 .flatten()
2488 } else {
2489 None
2490 };
2491
2492 let snapshot = if let Some(snapshot) = snapshot {
2493 snapshot
2494 } else {
2495 action_log.update(cx, |action_log, cx| {
2496 action_log.buffer_read(buffer.clone(), cx);
2497 });
2498
2499 let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
2500 this.update(cx, |this, _| {
2501 this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2502 })?;
2503 snapshot
2504 };
2505
2506 let max_point = snapshot.max_point();
2507 let start_position = Point::new(line, 0);
2508
2509 if start_position > max_point {
2510 return Err(acp::Error::invalid_params().data(format!(
2511 "Attempting to read beyond the end of the file, line {}:{}",
2512 max_point.row + 1,
2513 max_point.column
2514 )));
2515 }
2516
2517 let start = snapshot.anchor_before(start_position);
2518 let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2519
2520 if should_update_agent_location {
2521 project.update(cx, |project, cx| {
2522 project.set_agent_location(
2523 Some(AgentLocation {
2524 buffer: buffer.downgrade(),
2525 position: start,
2526 }),
2527 cx,
2528 );
2529 });
2530 }
2531
2532 Ok(snapshot.text_for_range(start..end).collect::<String>())
2533 })
2534 }
2535
2536 pub fn write_text_file(
2537 &self,
2538 path: PathBuf,
2539 content: String,
2540 cx: &mut Context<Self>,
2541 ) -> Task<Result<()>> {
2542 let project = self.project.clone();
2543 let action_log = self.action_log.clone();
2544 let should_update_agent_location = self.parent_session_id.is_none();
2545 cx.spawn(async move |this, cx| {
2546 let load = project.update(cx, |project, cx| {
2547 let path = project
2548 .project_path_for_absolute_path(&path, cx)
2549 .context("invalid path")?;
2550 anyhow::Ok(project.open_buffer(path, cx))
2551 });
2552 let buffer = load?.await?;
2553 let snapshot = this.update(cx, |this, cx| {
2554 this.shared_buffers
2555 .get(&buffer)
2556 .cloned()
2557 .unwrap_or_else(|| buffer.read(cx).snapshot())
2558 })?;
2559 let edits = cx
2560 .background_executor()
2561 .spawn(async move {
2562 let old_text = snapshot.text();
2563 text_diff(old_text.as_str(), &content)
2564 .into_iter()
2565 .map(|(range, replacement)| {
2566 (snapshot.anchor_range_around(range), replacement)
2567 })
2568 .collect::<Vec<_>>()
2569 })
2570 .await;
2571
2572 if should_update_agent_location {
2573 project.update(cx, |project, cx| {
2574 project.set_agent_location(
2575 Some(AgentLocation {
2576 buffer: buffer.downgrade(),
2577 position: edits
2578 .last()
2579 .map(|(range, _)| range.end)
2580 .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
2581 }),
2582 cx,
2583 );
2584 });
2585 }
2586
2587 let format_on_save = cx.update(|cx| {
2588 action_log.update(cx, |action_log, cx| {
2589 action_log.buffer_read(buffer.clone(), cx);
2590 });
2591
2592 let format_on_save = buffer.update(cx, |buffer, cx| {
2593 buffer.edit(edits, None, cx);
2594
2595 let settings = language::language_settings::language_settings(
2596 buffer.language().map(|l| l.name()),
2597 buffer.file(),
2598 cx,
2599 );
2600
2601 settings.format_on_save != FormatOnSave::Off
2602 });
2603 action_log.update(cx, |action_log, cx| {
2604 action_log.buffer_edited(buffer.clone(), cx);
2605 });
2606 format_on_save
2607 });
2608
2609 if format_on_save {
2610 let format_task = project.update(cx, |project, cx| {
2611 project.format(
2612 HashSet::from_iter([buffer.clone()]),
2613 LspFormatTarget::Buffers,
2614 false,
2615 FormatTrigger::Save,
2616 cx,
2617 )
2618 });
2619 format_task.await.log_err();
2620
2621 action_log.update(cx, |action_log, cx| {
2622 action_log.buffer_edited(buffer.clone(), cx);
2623 });
2624 }
2625
2626 project
2627 .update(cx, |project, cx| project.save_buffer(buffer, cx))
2628 .await
2629 })
2630 }
2631
2632 pub fn create_terminal(
2633 &self,
2634 command: String,
2635 args: Vec<String>,
2636 extra_env: Vec<acp::EnvVariable>,
2637 cwd: Option<PathBuf>,
2638 output_byte_limit: Option<u64>,
2639 cx: &mut Context<Self>,
2640 ) -> Task<Result<Entity<Terminal>>> {
2641 let env = match &cwd {
2642 Some(dir) => self.project.update(cx, |project, cx| {
2643 project.environment().update(cx, |env, cx| {
2644 env.directory_environment(dir.as_path().into(), cx)
2645 })
2646 }),
2647 None => Task::ready(None).shared(),
2648 };
2649 let env = cx.spawn(async move |_, _| {
2650 let mut env = env.await.unwrap_or_default();
2651 // Disables paging for `git` and hopefully other commands
2652 env.insert("PAGER".into(), "".into());
2653 for var in extra_env {
2654 env.insert(var.name, var.value);
2655 }
2656 env
2657 });
2658
2659 let project = self.project.clone();
2660 let language_registry = project.read(cx).languages().clone();
2661 let is_windows = project.read(cx).path_style(cx).is_windows();
2662
2663 let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
2664 let terminal_task = cx.spawn({
2665 let terminal_id = terminal_id.clone();
2666 async move |_this, cx| {
2667 let env = env.await;
2668 let shell = project
2669 .update(cx, |project, cx| {
2670 project
2671 .remote_client()
2672 .and_then(|r| r.read(cx).default_system_shell())
2673 })
2674 .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2675 let (task_command, task_args) =
2676 ShellBuilder::new(&Shell::Program(shell), is_windows)
2677 .redirect_stdin_to_dev_null()
2678 .build(Some(command.clone()), &args);
2679 let terminal = project
2680 .update(cx, |project, cx| {
2681 project.create_terminal_task(
2682 task::SpawnInTerminal {
2683 command: Some(task_command),
2684 args: task_args,
2685 cwd: cwd.clone(),
2686 env,
2687 ..Default::default()
2688 },
2689 cx,
2690 )
2691 })
2692 .await?;
2693
2694 anyhow::Ok(cx.new(|cx| {
2695 Terminal::new(
2696 terminal_id,
2697 &format!("{} {}", command, args.join(" ")),
2698 cwd,
2699 output_byte_limit.map(|l| l as usize),
2700 terminal,
2701 language_registry,
2702 cx,
2703 )
2704 }))
2705 }
2706 });
2707
2708 cx.spawn(async move |this, cx| {
2709 let terminal = terminal_task.await?;
2710 this.update(cx, |this, _cx| {
2711 this.terminals.insert(terminal_id, terminal.clone());
2712 terminal
2713 })
2714 })
2715 }
2716
2717 pub fn kill_terminal(
2718 &mut self,
2719 terminal_id: acp::TerminalId,
2720 cx: &mut Context<Self>,
2721 ) -> Result<()> {
2722 self.terminals
2723 .get(&terminal_id)
2724 .context("Terminal not found")?
2725 .update(cx, |terminal, cx| {
2726 terminal.kill(cx);
2727 });
2728
2729 Ok(())
2730 }
2731
2732 pub fn release_terminal(
2733 &mut self,
2734 terminal_id: acp::TerminalId,
2735 cx: &mut Context<Self>,
2736 ) -> Result<()> {
2737 self.terminals
2738 .remove(&terminal_id)
2739 .context("Terminal not found")?
2740 .update(cx, |terminal, cx| {
2741 terminal.kill(cx);
2742 });
2743
2744 Ok(())
2745 }
2746
2747 pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2748 self.terminals
2749 .get(&terminal_id)
2750 .context("Terminal not found")
2751 .cloned()
2752 }
2753
2754 pub fn to_markdown(&self, cx: &App) -> String {
2755 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2756 }
2757
2758 pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2759 cx.emit(AcpThreadEvent::LoadError(error));
2760 }
2761
2762 pub fn register_terminal_created(
2763 &mut self,
2764 terminal_id: acp::TerminalId,
2765 command_label: String,
2766 working_dir: Option<PathBuf>,
2767 output_byte_limit: Option<u64>,
2768 terminal: Entity<::terminal::Terminal>,
2769 cx: &mut Context<Self>,
2770 ) -> Entity<Terminal> {
2771 let language_registry = self.project.read(cx).languages().clone();
2772
2773 let entity = cx.new(|cx| {
2774 Terminal::new(
2775 terminal_id.clone(),
2776 &command_label,
2777 working_dir.clone(),
2778 output_byte_limit.map(|l| l as usize),
2779 terminal,
2780 language_registry,
2781 cx,
2782 )
2783 });
2784 self.terminals.insert(terminal_id.clone(), entity.clone());
2785 entity
2786 }
2787
2788 pub fn mark_as_subagent_output(&mut self, cx: &mut Context<Self>) {
2789 for entry in self.entries.iter_mut().rev() {
2790 if let AgentThreadEntry::AssistantMessage(assistant_message) = entry {
2791 assistant_message.is_subagent_output = true;
2792 cx.notify();
2793 return;
2794 }
2795 }
2796 }
2797
2798 pub fn on_terminal_provider_event(
2799 &mut self,
2800 event: TerminalProviderEvent,
2801 cx: &mut Context<Self>,
2802 ) {
2803 match event {
2804 TerminalProviderEvent::Created {
2805 terminal_id,
2806 label,
2807 cwd,
2808 output_byte_limit,
2809 terminal,
2810 } => {
2811 let entity = self.register_terminal_created(
2812 terminal_id.clone(),
2813 label,
2814 cwd,
2815 output_byte_limit,
2816 terminal,
2817 cx,
2818 );
2819
2820 if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
2821 for data in chunks.drain(..) {
2822 entity.update(cx, |term, cx| {
2823 term.inner().update(cx, |inner, cx| {
2824 inner.write_output(&data, cx);
2825 })
2826 });
2827 }
2828 }
2829
2830 if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
2831 entity.update(cx, |_term, cx| {
2832 cx.notify();
2833 });
2834 }
2835
2836 cx.notify();
2837 }
2838 TerminalProviderEvent::Output { terminal_id, data } => {
2839 if let Some(entity) = self.terminals.get(&terminal_id) {
2840 entity.update(cx, |term, cx| {
2841 term.inner().update(cx, |inner, cx| {
2842 inner.write_output(&data, cx);
2843 })
2844 });
2845 } else {
2846 self.pending_terminal_output
2847 .entry(terminal_id)
2848 .or_default()
2849 .push(data);
2850 }
2851 }
2852 TerminalProviderEvent::TitleChanged { terminal_id, title } => {
2853 if let Some(entity) = self.terminals.get(&terminal_id) {
2854 entity.update(cx, |term, cx| {
2855 term.inner().update(cx, |inner, cx| {
2856 inner.breadcrumb_text = title;
2857 cx.emit(::terminal::Event::BreadcrumbsChanged);
2858 })
2859 });
2860 }
2861 }
2862 TerminalProviderEvent::Exit {
2863 terminal_id,
2864 status,
2865 } => {
2866 if let Some(entity) = self.terminals.get(&terminal_id) {
2867 entity.update(cx, |_term, cx| {
2868 cx.notify();
2869 });
2870 } else {
2871 self.pending_terminal_exit.insert(terminal_id, status);
2872 }
2873 }
2874 }
2875 }
2876}
2877
2878fn markdown_for_raw_output(
2879 raw_output: &serde_json::Value,
2880 language_registry: &Arc<LanguageRegistry>,
2881 cx: &mut App,
2882) -> Option<Entity<Markdown>> {
2883 match raw_output {
2884 serde_json::Value::Null => None,
2885 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2886 Markdown::new(
2887 value.to_string().into(),
2888 Some(language_registry.clone()),
2889 None,
2890 cx,
2891 )
2892 })),
2893 serde_json::Value::Number(value) => Some(cx.new(|cx| {
2894 Markdown::new(
2895 value.to_string().into(),
2896 Some(language_registry.clone()),
2897 None,
2898 cx,
2899 )
2900 })),
2901 serde_json::Value::String(value) => Some(cx.new(|cx| {
2902 Markdown::new(
2903 value.clone().into(),
2904 Some(language_registry.clone()),
2905 None,
2906 cx,
2907 )
2908 })),
2909 value => Some(cx.new(|cx| {
2910 let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
2911
2912 Markdown::new(
2913 format!("```json\n{}\n```", pretty_json).into(),
2914 Some(language_registry.clone()),
2915 None,
2916 cx,
2917 )
2918 })),
2919 }
2920}
2921
2922#[cfg(test)]
2923mod tests {
2924 use super::*;
2925 use anyhow::anyhow;
2926 use futures::{channel::mpsc, future::LocalBoxFuture, select};
2927 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2928 use indoc::indoc;
2929 use project::{AgentId, FakeFs, Fs};
2930 use rand::{distr, prelude::*};
2931 use serde_json::json;
2932 use settings::SettingsStore;
2933 use smol::stream::StreamExt as _;
2934 use std::{
2935 any::Any,
2936 cell::RefCell,
2937 path::Path,
2938 rc::Rc,
2939 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2940 time::Duration,
2941 };
2942 use util::{path, path_list::PathList};
2943
2944 fn init_test(cx: &mut TestAppContext) {
2945 env_logger::try_init().ok();
2946 cx.update(|cx| {
2947 let settings_store = SettingsStore::test(cx);
2948 cx.set_global(settings_store);
2949 });
2950 }
2951
2952 #[gpui::test]
2953 async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2954 init_test(cx);
2955
2956 let fs = FakeFs::new(cx.executor());
2957 let project = Project::test(fs, [], cx).await;
2958 let connection = Rc::new(FakeAgentConnection::new());
2959 let thread = cx
2960 .update(|cx| {
2961 connection.new_session(
2962 project,
2963 PathList::new(&[std::path::Path::new(path!("/test"))]),
2964 cx,
2965 )
2966 })
2967 .await
2968 .unwrap();
2969
2970 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2971
2972 // Send Output BEFORE Created - should be buffered by acp_thread
2973 thread.update(cx, |thread, cx| {
2974 thread.on_terminal_provider_event(
2975 TerminalProviderEvent::Output {
2976 terminal_id: terminal_id.clone(),
2977 data: b"hello buffered".to_vec(),
2978 },
2979 cx,
2980 );
2981 });
2982
2983 // Create a display-only terminal and then send Created
2984 let lower = cx.new(|cx| {
2985 let builder = ::terminal::TerminalBuilder::new_display_only(
2986 ::terminal::terminal_settings::CursorShape::default(),
2987 ::terminal::terminal_settings::AlternateScroll::On,
2988 None,
2989 0,
2990 cx.background_executor(),
2991 PathStyle::local(),
2992 )
2993 .unwrap();
2994 builder.subscribe(cx)
2995 });
2996
2997 thread.update(cx, |thread, cx| {
2998 thread.on_terminal_provider_event(
2999 TerminalProviderEvent::Created {
3000 terminal_id: terminal_id.clone(),
3001 label: "Buffered Test".to_string(),
3002 cwd: None,
3003 output_byte_limit: None,
3004 terminal: lower.clone(),
3005 },
3006 cx,
3007 );
3008 });
3009
3010 // After Created, buffered Output should have been flushed into the renderer
3011 let content = thread.read_with(cx, |thread, cx| {
3012 let term = thread.terminal(terminal_id.clone()).unwrap();
3013 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3014 });
3015
3016 assert!(
3017 content.contains("hello buffered"),
3018 "expected buffered output to render, got: {content}"
3019 );
3020 }
3021
3022 #[gpui::test]
3023 async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
3024 init_test(cx);
3025
3026 let fs = FakeFs::new(cx.executor());
3027 let project = Project::test(fs, [], cx).await;
3028 let connection = Rc::new(FakeAgentConnection::new());
3029 let thread = cx
3030 .update(|cx| {
3031 connection.new_session(
3032 project,
3033 PathList::new(&[std::path::Path::new(path!("/test"))]),
3034 cx,
3035 )
3036 })
3037 .await
3038 .unwrap();
3039
3040 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3041
3042 // Send Output BEFORE Created
3043 thread.update(cx, |thread, cx| {
3044 thread.on_terminal_provider_event(
3045 TerminalProviderEvent::Output {
3046 terminal_id: terminal_id.clone(),
3047 data: b"pre-exit data".to_vec(),
3048 },
3049 cx,
3050 );
3051 });
3052
3053 // Send Exit BEFORE Created
3054 thread.update(cx, |thread, cx| {
3055 thread.on_terminal_provider_event(
3056 TerminalProviderEvent::Exit {
3057 terminal_id: terminal_id.clone(),
3058 status: acp::TerminalExitStatus::new().exit_code(0),
3059 },
3060 cx,
3061 );
3062 });
3063
3064 // Now create a display-only lower-level terminal and send Created
3065 let lower = cx.new(|cx| {
3066 let builder = ::terminal::TerminalBuilder::new_display_only(
3067 ::terminal::terminal_settings::CursorShape::default(),
3068 ::terminal::terminal_settings::AlternateScroll::On,
3069 None,
3070 0,
3071 cx.background_executor(),
3072 PathStyle::local(),
3073 )
3074 .unwrap();
3075 builder.subscribe(cx)
3076 });
3077
3078 thread.update(cx, |thread, cx| {
3079 thread.on_terminal_provider_event(
3080 TerminalProviderEvent::Created {
3081 terminal_id: terminal_id.clone(),
3082 label: "Buffered Exit Test".to_string(),
3083 cwd: None,
3084 output_byte_limit: None,
3085 terminal: lower.clone(),
3086 },
3087 cx,
3088 );
3089 });
3090
3091 // Output should be present after Created (flushed from buffer)
3092 let content = thread.read_with(cx, |thread, cx| {
3093 let term = thread.terminal(terminal_id.clone()).unwrap();
3094 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3095 });
3096
3097 assert!(
3098 content.contains("pre-exit data"),
3099 "expected pre-exit data to render, got: {content}"
3100 );
3101 }
3102
3103 /// Test that killing a terminal via Terminal::kill properly:
3104 /// 1. Causes wait_for_exit to complete (doesn't hang forever)
3105 /// 2. The underlying terminal still has the output that was written before the kill
3106 ///
3107 /// This test verifies that the fix to kill_active_task (which now also kills
3108 /// the shell process in addition to the foreground process) properly allows
3109 /// wait_for_exit to complete instead of hanging indefinitely.
3110 #[cfg(unix)]
3111 #[gpui::test]
3112 async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
3113 use std::collections::HashMap;
3114 use task::Shell;
3115 use util::shell_builder::ShellBuilder;
3116
3117 init_test(cx);
3118 cx.executor().allow_parking();
3119
3120 let fs = FakeFs::new(cx.executor());
3121 let project = Project::test(fs, [], cx).await;
3122 let connection = Rc::new(FakeAgentConnection::new());
3123 let thread = cx
3124 .update(|cx| {
3125 connection.new_session(
3126 project.clone(),
3127 PathList::new(&[Path::new(path!("/test"))]),
3128 cx,
3129 )
3130 })
3131 .await
3132 .unwrap();
3133
3134 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3135
3136 // Create a real PTY terminal that runs a command which prints output then sleeps
3137 // We use printf instead of echo and chain with && sleep to ensure proper execution
3138 let (completion_tx, _completion_rx) = smol::channel::unbounded();
3139 let (program, args) = ShellBuilder::new(&Shell::System, false).build(
3140 Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
3141 &[],
3142 );
3143
3144 let builder = cx
3145 .update(|cx| {
3146 ::terminal::TerminalBuilder::new(
3147 None,
3148 None,
3149 task::Shell::WithArguments {
3150 program,
3151 args,
3152 title_override: None,
3153 },
3154 HashMap::default(),
3155 ::terminal::terminal_settings::CursorShape::default(),
3156 ::terminal::terminal_settings::AlternateScroll::On,
3157 None,
3158 vec![],
3159 0,
3160 false,
3161 0,
3162 Some(completion_tx),
3163 cx,
3164 vec![],
3165 PathStyle::local(),
3166 )
3167 })
3168 .await
3169 .unwrap();
3170
3171 let lower_terminal = cx.new(|cx| builder.subscribe(cx));
3172
3173 // Create the acp_thread Terminal wrapper
3174 thread.update(cx, |thread, cx| {
3175 thread.on_terminal_provider_event(
3176 TerminalProviderEvent::Created {
3177 terminal_id: terminal_id.clone(),
3178 label: "printf output_before_kill && sleep 60".to_string(),
3179 cwd: None,
3180 output_byte_limit: None,
3181 terminal: lower_terminal.clone(),
3182 },
3183 cx,
3184 );
3185 });
3186
3187 // Wait for the printf command to execute and produce output
3188 // Use real time since parking is enabled
3189 cx.executor().timer(Duration::from_millis(500)).await;
3190
3191 // Get the acp_thread Terminal and kill it
3192 let wait_for_exit = thread.update(cx, |thread, cx| {
3193 let term = thread.terminals.get(&terminal_id).unwrap();
3194 let wait_for_exit = term.read(cx).wait_for_exit();
3195 term.update(cx, |term, cx| {
3196 term.kill(cx);
3197 });
3198 wait_for_exit
3199 });
3200
3201 // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
3202 // Before the fix to kill_active_task, this would hang forever because
3203 // only the foreground process was killed, not the shell, so the PTY
3204 // child never exited and wait_for_completed_task never completed.
3205 let exit_result = futures::select! {
3206 result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
3207 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
3208 };
3209
3210 assert!(
3211 exit_result.is_some(),
3212 "wait_for_exit should complete after kill, but it timed out. \
3213 This indicates kill_active_task is not properly killing the shell process."
3214 );
3215
3216 // Give the system a chance to process any pending updates
3217 cx.run_until_parked();
3218
3219 // Verify that the underlying terminal still has the output that was
3220 // written before the kill. This verifies that killing doesn't lose output.
3221 let inner_content = thread.read_with(cx, |thread, cx| {
3222 let term = thread.terminals.get(&terminal_id).unwrap();
3223 term.read(cx).inner().read(cx).get_content()
3224 });
3225
3226 assert!(
3227 inner_content.contains("output_before_kill"),
3228 "Underlying terminal should contain output from before kill, got: {}",
3229 inner_content
3230 );
3231 }
3232
3233 #[gpui::test]
3234 async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
3235 init_test(cx);
3236
3237 let fs = FakeFs::new(cx.executor());
3238 let project = Project::test(fs, [], cx).await;
3239 let connection = Rc::new(FakeAgentConnection::new());
3240 let thread = cx
3241 .update(|cx| {
3242 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3243 })
3244 .await
3245 .unwrap();
3246
3247 // Test creating a new user message
3248 thread.update(cx, |thread, cx| {
3249 thread.push_user_content_block(None, "Hello, ".into(), cx);
3250 });
3251
3252 thread.update(cx, |thread, cx| {
3253 assert_eq!(thread.entries.len(), 1);
3254 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3255 assert_eq!(user_msg.id, None);
3256 assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
3257 } else {
3258 panic!("Expected UserMessage");
3259 }
3260 });
3261
3262 // Test appending to existing user message
3263 let message_1_id = UserMessageId::new();
3264 thread.update(cx, |thread, cx| {
3265 thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
3266 });
3267
3268 thread.update(cx, |thread, cx| {
3269 assert_eq!(thread.entries.len(), 1);
3270 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3271 assert_eq!(user_msg.id, Some(message_1_id));
3272 assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
3273 } else {
3274 panic!("Expected UserMessage");
3275 }
3276 });
3277
3278 // Test creating new user message after assistant message
3279 thread.update(cx, |thread, cx| {
3280 thread.push_assistant_content_block("Assistant response".into(), false, cx);
3281 });
3282
3283 let message_2_id = UserMessageId::new();
3284 thread.update(cx, |thread, cx| {
3285 thread.push_user_content_block(
3286 Some(message_2_id.clone()),
3287 "New user message".into(),
3288 cx,
3289 );
3290 });
3291
3292 thread.update(cx, |thread, cx| {
3293 assert_eq!(thread.entries.len(), 3);
3294 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
3295 assert_eq!(user_msg.id, Some(message_2_id));
3296 assert_eq!(user_msg.content.to_markdown(cx), "New user message");
3297 } else {
3298 panic!("Expected UserMessage at index 2");
3299 }
3300 });
3301 }
3302
3303 #[gpui::test]
3304 async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
3305 init_test(cx);
3306
3307 let fs = FakeFs::new(cx.executor());
3308 let project = Project::test(fs, [], cx).await;
3309 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3310 |_, thread, mut cx| {
3311 async move {
3312 thread.update(&mut cx, |thread, cx| {
3313 thread
3314 .handle_session_update(
3315 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3316 "Thinking ".into(),
3317 )),
3318 cx,
3319 )
3320 .unwrap();
3321 thread
3322 .handle_session_update(
3323 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3324 "hard!".into(),
3325 )),
3326 cx,
3327 )
3328 .unwrap();
3329 })?;
3330 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3331 }
3332 .boxed_local()
3333 },
3334 ));
3335
3336 let thread = cx
3337 .update(|cx| {
3338 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3339 })
3340 .await
3341 .unwrap();
3342
3343 thread
3344 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
3345 .await
3346 .unwrap();
3347
3348 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
3349 assert_eq!(
3350 output,
3351 indoc! {r#"
3352 ## User
3353
3354 Hello from Zed!
3355
3356 ## Assistant
3357
3358 <thinking>
3359 Thinking hard!
3360 </thinking>
3361
3362 "#}
3363 );
3364 }
3365
3366 #[gpui::test]
3367 async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
3368 init_test(cx);
3369
3370 let fs = FakeFs::new(cx.executor());
3371 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
3372 .await;
3373 let project = Project::test(fs.clone(), [], cx).await;
3374 let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
3375 let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
3376 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3377 move |_, thread, mut cx| {
3378 let read_file_tx = read_file_tx.clone();
3379 async move {
3380 let content = thread
3381 .update(&mut cx, |thread, cx| {
3382 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3383 })
3384 .unwrap()
3385 .await
3386 .unwrap();
3387 assert_eq!(content, "one\ntwo\nthree\n");
3388 read_file_tx.take().unwrap().send(()).unwrap();
3389 thread
3390 .update(&mut cx, |thread, cx| {
3391 thread.write_text_file(
3392 path!("/tmp/foo").into(),
3393 "one\ntwo\nthree\nfour\nfive\n".to_string(),
3394 cx,
3395 )
3396 })
3397 .unwrap()
3398 .await
3399 .unwrap();
3400 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3401 }
3402 .boxed_local()
3403 },
3404 ));
3405
3406 let (worktree, pathbuf) = project
3407 .update(cx, |project, cx| {
3408 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3409 })
3410 .await
3411 .unwrap();
3412 let buffer = project
3413 .update(cx, |project, cx| {
3414 project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
3415 })
3416 .await
3417 .unwrap();
3418
3419 let thread = cx
3420 .update(|cx| {
3421 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3422 })
3423 .await
3424 .unwrap();
3425
3426 let request = thread.update(cx, |thread, cx| {
3427 thread.send_raw("Extend the count in /tmp/foo", cx)
3428 });
3429 read_file_rx.await.ok();
3430 buffer.update(cx, |buffer, cx| {
3431 buffer.edit([(0..0, "zero\n".to_string())], None, cx);
3432 });
3433 cx.run_until_parked();
3434 assert_eq!(
3435 buffer.read_with(cx, |buffer, _| buffer.text()),
3436 "zero\none\ntwo\nthree\nfour\nfive\n"
3437 );
3438 assert_eq!(
3439 String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
3440 "zero\none\ntwo\nthree\nfour\nfive\n"
3441 );
3442 request.await.unwrap();
3443 }
3444
3445 #[gpui::test]
3446 async fn test_reading_from_line(cx: &mut TestAppContext) {
3447 init_test(cx);
3448
3449 let fs = FakeFs::new(cx.executor());
3450 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
3451 .await;
3452 let project = Project::test(fs.clone(), [], cx).await;
3453 project
3454 .update(cx, |project, cx| {
3455 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3456 })
3457 .await
3458 .unwrap();
3459
3460 let connection = Rc::new(FakeAgentConnection::new());
3461
3462 let thread = cx
3463 .update(|cx| {
3464 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3465 })
3466 .await
3467 .unwrap();
3468
3469 // Whole file
3470 let content = thread
3471 .update(cx, |thread, cx| {
3472 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3473 })
3474 .await
3475 .unwrap();
3476
3477 assert_eq!(content, "one\ntwo\nthree\nfour\n");
3478
3479 // Only start line
3480 let content = thread
3481 .update(cx, |thread, cx| {
3482 thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
3483 })
3484 .await
3485 .unwrap();
3486
3487 assert_eq!(content, "three\nfour\n");
3488
3489 // Only limit
3490 let content = thread
3491 .update(cx, |thread, cx| {
3492 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3493 })
3494 .await
3495 .unwrap();
3496
3497 assert_eq!(content, "one\ntwo\n");
3498
3499 // Range
3500 let content = thread
3501 .update(cx, |thread, cx| {
3502 thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
3503 })
3504 .await
3505 .unwrap();
3506
3507 assert_eq!(content, "two\nthree\n");
3508
3509 // Invalid
3510 let err = thread
3511 .update(cx, |thread, cx| {
3512 thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
3513 })
3514 .await
3515 .unwrap_err();
3516
3517 assert_eq!(
3518 err.to_string(),
3519 "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
3520 );
3521 }
3522
3523 #[gpui::test]
3524 async fn test_reading_empty_file(cx: &mut TestAppContext) {
3525 init_test(cx);
3526
3527 let fs = FakeFs::new(cx.executor());
3528 fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
3529 let project = Project::test(fs.clone(), [], cx).await;
3530 project
3531 .update(cx, |project, cx| {
3532 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3533 })
3534 .await
3535 .unwrap();
3536
3537 let connection = Rc::new(FakeAgentConnection::new());
3538
3539 let thread = cx
3540 .update(|cx| {
3541 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3542 })
3543 .await
3544 .unwrap();
3545
3546 // Whole file
3547 let content = thread
3548 .update(cx, |thread, cx| {
3549 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3550 })
3551 .await
3552 .unwrap();
3553
3554 assert_eq!(content, "");
3555
3556 // Only start line
3557 let content = thread
3558 .update(cx, |thread, cx| {
3559 thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
3560 })
3561 .await
3562 .unwrap();
3563
3564 assert_eq!(content, "");
3565
3566 // Only limit
3567 let content = thread
3568 .update(cx, |thread, cx| {
3569 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3570 })
3571 .await
3572 .unwrap();
3573
3574 assert_eq!(content, "");
3575
3576 // Range
3577 let content = thread
3578 .update(cx, |thread, cx| {
3579 thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
3580 })
3581 .await
3582 .unwrap();
3583
3584 assert_eq!(content, "");
3585
3586 // Invalid
3587 let err = thread
3588 .update(cx, |thread, cx| {
3589 thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
3590 })
3591 .await
3592 .unwrap_err();
3593
3594 assert_eq!(
3595 err.to_string(),
3596 "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
3597 );
3598 }
3599 #[gpui::test]
3600 async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
3601 init_test(cx);
3602
3603 let fs = FakeFs::new(cx.executor());
3604 fs.insert_tree(path!("/tmp"), json!({})).await;
3605 let project = Project::test(fs.clone(), [], cx).await;
3606 project
3607 .update(cx, |project, cx| {
3608 project.find_or_create_worktree(path!("/tmp"), true, cx)
3609 })
3610 .await
3611 .unwrap();
3612
3613 let connection = Rc::new(FakeAgentConnection::new());
3614
3615 let thread = cx
3616 .update(|cx| {
3617 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3618 })
3619 .await
3620 .unwrap();
3621
3622 // Out of project file
3623 let err = thread
3624 .update(cx, |thread, cx| {
3625 thread.read_text_file(path!("/foo").into(), None, None, false, cx)
3626 })
3627 .await
3628 .unwrap_err();
3629
3630 assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
3631 }
3632
3633 #[gpui::test]
3634 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
3635 init_test(cx);
3636
3637 let fs = FakeFs::new(cx.executor());
3638 let project = Project::test(fs, [], cx).await;
3639 let id = acp::ToolCallId::new("test");
3640
3641 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3642 let id = id.clone();
3643 move |_, thread, mut cx| {
3644 let id = id.clone();
3645 async move {
3646 thread
3647 .update(&mut cx, |thread, cx| {
3648 thread.handle_session_update(
3649 acp::SessionUpdate::ToolCall(
3650 acp::ToolCall::new(id.clone(), "Label")
3651 .kind(acp::ToolKind::Fetch)
3652 .status(acp::ToolCallStatus::InProgress),
3653 ),
3654 cx,
3655 )
3656 })
3657 .unwrap()
3658 .unwrap();
3659 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3660 }
3661 .boxed_local()
3662 }
3663 }));
3664
3665 let thread = cx
3666 .update(|cx| {
3667 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3668 })
3669 .await
3670 .unwrap();
3671
3672 let request = thread.update(cx, |thread, cx| {
3673 thread.send_raw("Fetch https://example.com", cx)
3674 });
3675
3676 run_until_first_tool_call(&thread, cx).await;
3677
3678 thread.read_with(cx, |thread, _| {
3679 assert!(matches!(
3680 thread.entries[1],
3681 AgentThreadEntry::ToolCall(ToolCall {
3682 status: ToolCallStatus::InProgress,
3683 ..
3684 })
3685 ));
3686 });
3687
3688 thread.update(cx, |thread, cx| thread.cancel(cx)).await;
3689
3690 thread.read_with(cx, |thread, _| {
3691 assert!(matches!(
3692 &thread.entries[1],
3693 AgentThreadEntry::ToolCall(ToolCall {
3694 status: ToolCallStatus::Canceled,
3695 ..
3696 })
3697 ));
3698 });
3699
3700 thread
3701 .update(cx, |thread, cx| {
3702 thread.handle_session_update(
3703 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3704 id,
3705 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3706 )),
3707 cx,
3708 )
3709 })
3710 .unwrap();
3711
3712 request.await.unwrap();
3713
3714 thread.read_with(cx, |thread, _| {
3715 assert!(matches!(
3716 thread.entries[1],
3717 AgentThreadEntry::ToolCall(ToolCall {
3718 status: ToolCallStatus::Completed,
3719 ..
3720 })
3721 ));
3722 });
3723 }
3724
3725 #[gpui::test]
3726 async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3727 init_test(cx);
3728 let fs = FakeFs::new(cx.background_executor.clone());
3729 fs.insert_tree(path!("/test"), json!({})).await;
3730 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3731
3732 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3733 move |_, thread, mut cx| {
3734 async move {
3735 thread
3736 .update(&mut cx, |thread, cx| {
3737 thread.handle_session_update(
3738 acp::SessionUpdate::ToolCall(
3739 acp::ToolCall::new("test", "Label")
3740 .kind(acp::ToolKind::Edit)
3741 .status(acp::ToolCallStatus::Completed)
3742 .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
3743 "/test/test.txt",
3744 "foo",
3745 ))]),
3746 ),
3747 cx,
3748 )
3749 })
3750 .unwrap()
3751 .unwrap();
3752 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3753 }
3754 .boxed_local()
3755 }
3756 }));
3757
3758 let thread = cx
3759 .update(|cx| {
3760 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3761 })
3762 .await
3763 .unwrap();
3764
3765 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3766 .await
3767 .unwrap();
3768
3769 assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3770 }
3771
3772 #[gpui::test(iterations = 10)]
3773 async fn test_checkpoints(cx: &mut TestAppContext) {
3774 init_test(cx);
3775 let fs = FakeFs::new(cx.background_executor.clone());
3776 fs.insert_tree(
3777 path!("/test"),
3778 json!({
3779 ".git": {}
3780 }),
3781 )
3782 .await;
3783 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3784
3785 let simulate_changes = Arc::new(AtomicBool::new(true));
3786 let next_filename = Arc::new(AtomicUsize::new(0));
3787 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3788 let simulate_changes = simulate_changes.clone();
3789 let next_filename = next_filename.clone();
3790 let fs = fs.clone();
3791 move |request, thread, mut cx| {
3792 let fs = fs.clone();
3793 let simulate_changes = simulate_changes.clone();
3794 let next_filename = next_filename.clone();
3795 async move {
3796 if simulate_changes.load(SeqCst) {
3797 let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3798 fs.write(Path::new(&filename), b"").await?;
3799 }
3800
3801 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3802 panic!("expected text content block");
3803 };
3804 thread.update(&mut cx, |thread, cx| {
3805 thread
3806 .handle_session_update(
3807 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3808 content.text.to_uppercase().into(),
3809 )),
3810 cx,
3811 )
3812 .unwrap();
3813 })?;
3814 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3815 }
3816 .boxed_local()
3817 }
3818 }));
3819 let thread = cx
3820 .update(|cx| {
3821 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3822 })
3823 .await
3824 .unwrap();
3825
3826 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3827 .await
3828 .unwrap();
3829 thread.read_with(cx, |thread, cx| {
3830 assert_eq!(
3831 thread.to_markdown(cx),
3832 indoc! {"
3833 ## User (checkpoint)
3834
3835 Lorem
3836
3837 ## Assistant
3838
3839 LOREM
3840
3841 "}
3842 );
3843 });
3844 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3845
3846 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3847 .await
3848 .unwrap();
3849 thread.read_with(cx, |thread, cx| {
3850 assert_eq!(
3851 thread.to_markdown(cx),
3852 indoc! {"
3853 ## User (checkpoint)
3854
3855 Lorem
3856
3857 ## Assistant
3858
3859 LOREM
3860
3861 ## User (checkpoint)
3862
3863 ipsum
3864
3865 ## Assistant
3866
3867 IPSUM
3868
3869 "}
3870 );
3871 });
3872 assert_eq!(
3873 fs.files(),
3874 vec![
3875 Path::new(path!("/test/file-0")),
3876 Path::new(path!("/test/file-1"))
3877 ]
3878 );
3879
3880 // Checkpoint isn't stored when there are no changes.
3881 simulate_changes.store(false, SeqCst);
3882 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3883 .await
3884 .unwrap();
3885 thread.read_with(cx, |thread, cx| {
3886 assert_eq!(
3887 thread.to_markdown(cx),
3888 indoc! {"
3889 ## User (checkpoint)
3890
3891 Lorem
3892
3893 ## Assistant
3894
3895 LOREM
3896
3897 ## User (checkpoint)
3898
3899 ipsum
3900
3901 ## Assistant
3902
3903 IPSUM
3904
3905 ## User
3906
3907 dolor
3908
3909 ## Assistant
3910
3911 DOLOR
3912
3913 "}
3914 );
3915 });
3916 assert_eq!(
3917 fs.files(),
3918 vec![
3919 Path::new(path!("/test/file-0")),
3920 Path::new(path!("/test/file-1"))
3921 ]
3922 );
3923
3924 // Rewinding the conversation truncates the history and restores the checkpoint.
3925 thread
3926 .update(cx, |thread, cx| {
3927 let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3928 panic!("unexpected entries {:?}", thread.entries)
3929 };
3930 thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3931 })
3932 .await
3933 .unwrap();
3934 thread.read_with(cx, |thread, cx| {
3935 assert_eq!(
3936 thread.to_markdown(cx),
3937 indoc! {"
3938 ## User (checkpoint)
3939
3940 Lorem
3941
3942 ## Assistant
3943
3944 LOREM
3945
3946 "}
3947 );
3948 });
3949 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3950 }
3951
3952 #[gpui::test]
3953 async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3954 use std::sync::atomic::AtomicUsize;
3955 init_test(cx);
3956
3957 let fs = FakeFs::new(cx.executor());
3958 let project = Project::test(fs, None, cx).await;
3959
3960 // Create a connection that simulates refusal after tool result
3961 let prompt_count = Arc::new(AtomicUsize::new(0));
3962 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3963 let prompt_count = prompt_count.clone();
3964 move |_request, thread, mut cx| {
3965 let count = prompt_count.fetch_add(1, SeqCst);
3966 async move {
3967 if count == 0 {
3968 // First prompt: Generate a tool call with result
3969 thread.update(&mut cx, |thread, cx| {
3970 thread
3971 .handle_session_update(
3972 acp::SessionUpdate::ToolCall(
3973 acp::ToolCall::new("tool1", "Test Tool")
3974 .kind(acp::ToolKind::Fetch)
3975 .status(acp::ToolCallStatus::Completed)
3976 .raw_input(serde_json::json!({"query": "test"}))
3977 .raw_output(serde_json::json!({"result": "inappropriate content"})),
3978 ),
3979 cx,
3980 )
3981 .unwrap();
3982 })?;
3983
3984 // Now return refusal because of the tool result
3985 Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
3986 } else {
3987 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3988 }
3989 }
3990 .boxed_local()
3991 }
3992 }));
3993
3994 let thread = cx
3995 .update(|cx| {
3996 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3997 })
3998 .await
3999 .unwrap();
4000
4001 // Track if we see a Refusal event
4002 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4003 let saw_refusal_event_captured = saw_refusal_event.clone();
4004 thread.update(cx, |_thread, cx| {
4005 cx.subscribe(
4006 &thread,
4007 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4008 if matches!(event, AcpThreadEvent::Refusal) {
4009 *saw_refusal_event_captured.lock().unwrap() = true;
4010 }
4011 },
4012 )
4013 .detach();
4014 });
4015
4016 // Send a user message - this will trigger tool call and then refusal
4017 let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
4018 cx.background_executor.spawn(send_task).detach();
4019 cx.run_until_parked();
4020
4021 // Verify that:
4022 // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
4023 // 2. The user message was NOT truncated
4024 assert!(
4025 *saw_refusal_event.lock().unwrap(),
4026 "Refusal event should be emitted for tool result refusals"
4027 );
4028
4029 thread.read_with(cx, |thread, _| {
4030 let entries = thread.entries();
4031 assert!(entries.len() >= 2, "Should have user message and tool call");
4032
4033 // Verify user message is still there
4034 assert!(
4035 matches!(entries[0], AgentThreadEntry::UserMessage(_)),
4036 "User message should not be truncated"
4037 );
4038
4039 // Verify tool call is there with result
4040 if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
4041 assert!(
4042 tool_call.raw_output.is_some(),
4043 "Tool call should have output"
4044 );
4045 } else {
4046 panic!("Expected tool call at index 1");
4047 }
4048 });
4049 }
4050
4051 #[gpui::test]
4052 async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
4053 init_test(cx);
4054
4055 let fs = FakeFs::new(cx.executor());
4056 let project = Project::test(fs, None, cx).await;
4057
4058 let refuse_next = Arc::new(AtomicBool::new(false));
4059 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4060 let refuse_next = refuse_next.clone();
4061 move |_request, _thread, _cx| {
4062 if refuse_next.load(SeqCst) {
4063 async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
4064 .boxed_local()
4065 } else {
4066 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
4067 .boxed_local()
4068 }
4069 }
4070 }));
4071
4072 let thread = cx
4073 .update(|cx| {
4074 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4075 })
4076 .await
4077 .unwrap();
4078
4079 // Track if we see a Refusal event
4080 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4081 let saw_refusal_event_captured = saw_refusal_event.clone();
4082 thread.update(cx, |_thread, cx| {
4083 cx.subscribe(
4084 &thread,
4085 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4086 if matches!(event, AcpThreadEvent::Refusal) {
4087 *saw_refusal_event_captured.lock().unwrap() = true;
4088 }
4089 },
4090 )
4091 .detach();
4092 });
4093
4094 // Send a message that will be refused
4095 refuse_next.store(true, SeqCst);
4096 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4097 .await
4098 .unwrap();
4099
4100 // Verify that a Refusal event WAS emitted for user prompt refusal
4101 assert!(
4102 *saw_refusal_event.lock().unwrap(),
4103 "Refusal event should be emitted for user prompt refusals"
4104 );
4105
4106 // Verify the message was truncated (user prompt refusal)
4107 thread.read_with(cx, |thread, cx| {
4108 assert_eq!(thread.to_markdown(cx), "");
4109 });
4110 }
4111
4112 #[gpui::test]
4113 async fn test_refusal(cx: &mut TestAppContext) {
4114 init_test(cx);
4115 let fs = FakeFs::new(cx.background_executor.clone());
4116 fs.insert_tree(path!("/"), json!({})).await;
4117 let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
4118
4119 let refuse_next = Arc::new(AtomicBool::new(false));
4120 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4121 let refuse_next = refuse_next.clone();
4122 move |request, thread, mut cx| {
4123 let refuse_next = refuse_next.clone();
4124 async move {
4125 if refuse_next.load(SeqCst) {
4126 return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
4127 }
4128
4129 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
4130 panic!("expected text content block");
4131 };
4132 thread.update(&mut cx, |thread, cx| {
4133 thread
4134 .handle_session_update(
4135 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
4136 content.text.to_uppercase().into(),
4137 )),
4138 cx,
4139 )
4140 .unwrap();
4141 })?;
4142 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4143 }
4144 .boxed_local()
4145 }
4146 }));
4147 let thread = cx
4148 .update(|cx| {
4149 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4150 })
4151 .await
4152 .unwrap();
4153
4154 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4155 .await
4156 .unwrap();
4157 thread.read_with(cx, |thread, cx| {
4158 assert_eq!(
4159 thread.to_markdown(cx),
4160 indoc! {"
4161 ## User
4162
4163 hello
4164
4165 ## Assistant
4166
4167 HELLO
4168
4169 "}
4170 );
4171 });
4172
4173 // Simulate refusing the second message. The message should be truncated
4174 // when a user prompt is refused.
4175 refuse_next.store(true, SeqCst);
4176 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
4177 .await
4178 .unwrap();
4179 thread.read_with(cx, |thread, cx| {
4180 assert_eq!(
4181 thread.to_markdown(cx),
4182 indoc! {"
4183 ## User
4184
4185 hello
4186
4187 ## Assistant
4188
4189 HELLO
4190
4191 "}
4192 );
4193 });
4194 }
4195
4196 async fn run_until_first_tool_call(
4197 thread: &Entity<AcpThread>,
4198 cx: &mut TestAppContext,
4199 ) -> usize {
4200 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
4201
4202 let subscription = cx.update(|cx| {
4203 cx.subscribe(thread, move |thread, _, cx| {
4204 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
4205 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
4206 return tx.try_send(ix).unwrap();
4207 }
4208 }
4209 })
4210 });
4211
4212 select! {
4213 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
4214 panic!("Timeout waiting for tool call")
4215 }
4216 ix = rx.next().fuse() => {
4217 drop(subscription);
4218 ix.unwrap()
4219 }
4220 }
4221 }
4222
4223 #[derive(Clone, Default)]
4224 struct FakeAgentConnection {
4225 auth_methods: Vec<acp::AuthMethod>,
4226 sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
4227 set_title_calls: Rc<RefCell<Vec<SharedString>>>,
4228 on_user_message: Option<
4229 Rc<
4230 dyn Fn(
4231 acp::PromptRequest,
4232 WeakEntity<AcpThread>,
4233 AsyncApp,
4234 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4235 + 'static,
4236 >,
4237 >,
4238 }
4239
4240 impl FakeAgentConnection {
4241 fn new() -> Self {
4242 Self {
4243 auth_methods: Vec::new(),
4244 on_user_message: None,
4245 sessions: Arc::default(),
4246 set_title_calls: Default::default(),
4247 }
4248 }
4249
4250 #[expect(unused)]
4251 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
4252 self.auth_methods = auth_methods;
4253 self
4254 }
4255
4256 fn on_user_message(
4257 mut self,
4258 handler: impl Fn(
4259 acp::PromptRequest,
4260 WeakEntity<AcpThread>,
4261 AsyncApp,
4262 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4263 + 'static,
4264 ) -> Self {
4265 self.on_user_message.replace(Rc::new(handler));
4266 self
4267 }
4268 }
4269
4270 impl AgentConnection for FakeAgentConnection {
4271 fn agent_id(&self) -> AgentId {
4272 AgentId::new("fake")
4273 }
4274
4275 fn telemetry_id(&self) -> SharedString {
4276 "fake".into()
4277 }
4278
4279 fn auth_methods(&self) -> &[acp::AuthMethod] {
4280 &self.auth_methods
4281 }
4282
4283 fn new_session(
4284 self: Rc<Self>,
4285 project: Entity<Project>,
4286 work_dirs: PathList,
4287 cx: &mut App,
4288 ) -> Task<gpui::Result<Entity<AcpThread>>> {
4289 let session_id = acp::SessionId::new(
4290 rand::rng()
4291 .sample_iter(&distr::Alphanumeric)
4292 .take(7)
4293 .map(char::from)
4294 .collect::<String>(),
4295 );
4296 let action_log = cx.new(|_| ActionLog::new(project.clone()));
4297 let thread = cx.new(|cx| {
4298 AcpThread::new(
4299 None,
4300 "Test",
4301 Some(work_dirs),
4302 self.clone(),
4303 project,
4304 action_log,
4305 session_id.clone(),
4306 watch::Receiver::constant(
4307 acp::PromptCapabilities::new()
4308 .image(true)
4309 .audio(true)
4310 .embedded_context(true),
4311 ),
4312 cx,
4313 )
4314 });
4315 self.sessions.lock().insert(session_id, thread.downgrade());
4316 Task::ready(Ok(thread))
4317 }
4318
4319 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
4320 if self.auth_methods().iter().any(|m| m.id() == &method) {
4321 Task::ready(Ok(()))
4322 } else {
4323 Task::ready(Err(anyhow!("Invalid Auth Method")))
4324 }
4325 }
4326
4327 fn prompt(
4328 &self,
4329 _id: Option<UserMessageId>,
4330 params: acp::PromptRequest,
4331 cx: &mut App,
4332 ) -> Task<gpui::Result<acp::PromptResponse>> {
4333 let sessions = self.sessions.lock();
4334 let thread = sessions.get(¶ms.session_id).unwrap();
4335 if let Some(handler) = &self.on_user_message {
4336 let handler = handler.clone();
4337 let thread = thread.clone();
4338 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
4339 } else {
4340 Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
4341 }
4342 }
4343
4344 fn cancel(&self, _session_id: &acp::SessionId, _cx: &mut App) {}
4345
4346 fn truncate(
4347 &self,
4348 session_id: &acp::SessionId,
4349 _cx: &App,
4350 ) -> Option<Rc<dyn AgentSessionTruncate>> {
4351 Some(Rc::new(FakeAgentSessionEditor {
4352 _session_id: session_id.clone(),
4353 }))
4354 }
4355
4356 fn set_title(
4357 &self,
4358 _session_id: &acp::SessionId,
4359 _cx: &App,
4360 ) -> Option<Rc<dyn AgentSessionSetTitle>> {
4361 Some(Rc::new(FakeAgentSessionSetTitle {
4362 calls: self.set_title_calls.clone(),
4363 }))
4364 }
4365
4366 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
4367 self
4368 }
4369 }
4370
4371 struct FakeAgentSessionSetTitle {
4372 calls: Rc<RefCell<Vec<SharedString>>>,
4373 }
4374
4375 impl AgentSessionSetTitle for FakeAgentSessionSetTitle {
4376 fn run(&self, title: SharedString, _cx: &mut App) -> Task<Result<()>> {
4377 self.calls.borrow_mut().push(title);
4378 Task::ready(Ok(()))
4379 }
4380 }
4381
4382 struct FakeAgentSessionEditor {
4383 _session_id: acp::SessionId,
4384 }
4385
4386 impl AgentSessionTruncate for FakeAgentSessionEditor {
4387 fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
4388 Task::ready(Ok(()))
4389 }
4390 }
4391
4392 #[gpui::test]
4393 async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
4394 init_test(cx);
4395
4396 let fs = FakeFs::new(cx.executor());
4397 let project = Project::test(fs, [], cx).await;
4398 let connection = Rc::new(FakeAgentConnection::new());
4399 let thread = cx
4400 .update(|cx| {
4401 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4402 })
4403 .await
4404 .unwrap();
4405
4406 // Try to update a tool call that doesn't exist
4407 let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
4408 thread.update(cx, |thread, cx| {
4409 let result = thread.handle_session_update(
4410 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4411 nonexistent_id.clone(),
4412 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4413 )),
4414 cx,
4415 );
4416
4417 // The update should succeed (not return an error)
4418 assert!(result.is_ok());
4419
4420 // There should now be exactly one entry in the thread
4421 assert_eq!(thread.entries.len(), 1);
4422
4423 // The entry should be a failed tool call
4424 if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
4425 assert_eq!(tool_call.id, nonexistent_id);
4426 assert!(matches!(tool_call.status, ToolCallStatus::Failed));
4427 assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
4428
4429 // Check that the content contains the error message
4430 assert_eq!(tool_call.content.len(), 1);
4431 if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
4432 match content_block {
4433 ContentBlock::Markdown { markdown } => {
4434 let markdown_text = markdown.read(cx).source();
4435 assert!(markdown_text.contains("Tool call not found"));
4436 }
4437 ContentBlock::Empty => panic!("Expected markdown content, got empty"),
4438 ContentBlock::ResourceLink { .. } => {
4439 panic!("Expected markdown content, got resource link")
4440 }
4441 ContentBlock::Image { .. } => {
4442 panic!("Expected markdown content, got image")
4443 }
4444 }
4445 } else {
4446 panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
4447 }
4448 } else {
4449 panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
4450 }
4451 });
4452 }
4453
4454 /// Tests that restoring a checkpoint properly cleans up terminals that were
4455 /// created after that checkpoint, and cancels any in-progress generation.
4456 ///
4457 /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
4458 /// that were started after that checkpoint should be terminated, and any in-progress
4459 /// AI generation should be canceled.
4460 #[gpui::test]
4461 async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
4462 init_test(cx);
4463
4464 let fs = FakeFs::new(cx.executor());
4465 let project = Project::test(fs, [], cx).await;
4466 let connection = Rc::new(FakeAgentConnection::new());
4467 let thread = cx
4468 .update(|cx| {
4469 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4470 })
4471 .await
4472 .unwrap();
4473
4474 // Send first user message to create a checkpoint
4475 cx.update(|cx| {
4476 thread.update(cx, |thread, cx| {
4477 thread.send(vec!["first message".into()], cx)
4478 })
4479 })
4480 .await
4481 .unwrap();
4482
4483 // Send second message (creates another checkpoint) - we'll restore to this one
4484 cx.update(|cx| {
4485 thread.update(cx, |thread, cx| {
4486 thread.send(vec!["second message".into()], cx)
4487 })
4488 })
4489 .await
4490 .unwrap();
4491
4492 // Create 2 terminals BEFORE the checkpoint that have completed running
4493 let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4494 let mock_terminal_1 = cx.new(|cx| {
4495 let builder = ::terminal::TerminalBuilder::new_display_only(
4496 ::terminal::terminal_settings::CursorShape::default(),
4497 ::terminal::terminal_settings::AlternateScroll::On,
4498 None,
4499 0,
4500 cx.background_executor(),
4501 PathStyle::local(),
4502 )
4503 .unwrap();
4504 builder.subscribe(cx)
4505 });
4506
4507 thread.update(cx, |thread, cx| {
4508 thread.on_terminal_provider_event(
4509 TerminalProviderEvent::Created {
4510 terminal_id: terminal_id_1.clone(),
4511 label: "echo 'first'".to_string(),
4512 cwd: Some(PathBuf::from("/test")),
4513 output_byte_limit: None,
4514 terminal: mock_terminal_1.clone(),
4515 },
4516 cx,
4517 );
4518 });
4519
4520 thread.update(cx, |thread, cx| {
4521 thread.on_terminal_provider_event(
4522 TerminalProviderEvent::Output {
4523 terminal_id: terminal_id_1.clone(),
4524 data: b"first\n".to_vec(),
4525 },
4526 cx,
4527 );
4528 });
4529
4530 thread.update(cx, |thread, cx| {
4531 thread.on_terminal_provider_event(
4532 TerminalProviderEvent::Exit {
4533 terminal_id: terminal_id_1.clone(),
4534 status: acp::TerminalExitStatus::new().exit_code(0),
4535 },
4536 cx,
4537 );
4538 });
4539
4540 let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4541 let mock_terminal_2 = cx.new(|cx| {
4542 let builder = ::terminal::TerminalBuilder::new_display_only(
4543 ::terminal::terminal_settings::CursorShape::default(),
4544 ::terminal::terminal_settings::AlternateScroll::On,
4545 None,
4546 0,
4547 cx.background_executor(),
4548 PathStyle::local(),
4549 )
4550 .unwrap();
4551 builder.subscribe(cx)
4552 });
4553
4554 thread.update(cx, |thread, cx| {
4555 thread.on_terminal_provider_event(
4556 TerminalProviderEvent::Created {
4557 terminal_id: terminal_id_2.clone(),
4558 label: "echo 'second'".to_string(),
4559 cwd: Some(PathBuf::from("/test")),
4560 output_byte_limit: None,
4561 terminal: mock_terminal_2.clone(),
4562 },
4563 cx,
4564 );
4565 });
4566
4567 thread.update(cx, |thread, cx| {
4568 thread.on_terminal_provider_event(
4569 TerminalProviderEvent::Output {
4570 terminal_id: terminal_id_2.clone(),
4571 data: b"second\n".to_vec(),
4572 },
4573 cx,
4574 );
4575 });
4576
4577 thread.update(cx, |thread, cx| {
4578 thread.on_terminal_provider_event(
4579 TerminalProviderEvent::Exit {
4580 terminal_id: terminal_id_2.clone(),
4581 status: acp::TerminalExitStatus::new().exit_code(0),
4582 },
4583 cx,
4584 );
4585 });
4586
4587 // Get the second message ID to restore to
4588 let second_message_id = thread.read_with(cx, |thread, _| {
4589 // At this point we have:
4590 // - Index 0: First user message (with checkpoint)
4591 // - Index 1: Second user message (with checkpoint)
4592 // No assistant responses because FakeAgentConnection just returns EndTurn
4593 let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
4594 panic!("expected user message at index 1");
4595 };
4596 message.id.clone().unwrap()
4597 });
4598
4599 // Create a terminal AFTER the checkpoint we'll restore to.
4600 // This simulates the AI agent starting a long-running terminal command.
4601 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4602 let mock_terminal = cx.new(|cx| {
4603 let builder = ::terminal::TerminalBuilder::new_display_only(
4604 ::terminal::terminal_settings::CursorShape::default(),
4605 ::terminal::terminal_settings::AlternateScroll::On,
4606 None,
4607 0,
4608 cx.background_executor(),
4609 PathStyle::local(),
4610 )
4611 .unwrap();
4612 builder.subscribe(cx)
4613 });
4614
4615 // Register the terminal as created
4616 thread.update(cx, |thread, cx| {
4617 thread.on_terminal_provider_event(
4618 TerminalProviderEvent::Created {
4619 terminal_id: terminal_id.clone(),
4620 label: "sleep 1000".to_string(),
4621 cwd: Some(PathBuf::from("/test")),
4622 output_byte_limit: None,
4623 terminal: mock_terminal.clone(),
4624 },
4625 cx,
4626 );
4627 });
4628
4629 // Simulate the terminal producing output (still running)
4630 thread.update(cx, |thread, cx| {
4631 thread.on_terminal_provider_event(
4632 TerminalProviderEvent::Output {
4633 terminal_id: terminal_id.clone(),
4634 data: b"terminal is running...\n".to_vec(),
4635 },
4636 cx,
4637 );
4638 });
4639
4640 // Create a tool call entry that references this terminal
4641 // This represents the agent requesting a terminal command
4642 thread.update(cx, |thread, cx| {
4643 thread
4644 .handle_session_update(
4645 acp::SessionUpdate::ToolCall(
4646 acp::ToolCall::new("terminal-tool-1", "Running command")
4647 .kind(acp::ToolKind::Execute)
4648 .status(acp::ToolCallStatus::InProgress)
4649 .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
4650 terminal_id.clone(),
4651 ))])
4652 .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
4653 ),
4654 cx,
4655 )
4656 .unwrap();
4657 });
4658
4659 // Verify terminal exists and is in the thread
4660 let terminal_exists_before =
4661 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4662 assert!(
4663 terminal_exists_before,
4664 "Terminal should exist before checkpoint restore"
4665 );
4666
4667 // Verify the terminal's underlying task is still running (not completed)
4668 let terminal_running_before = thread.read_with(cx, |thread, _cx| {
4669 let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
4670 terminal_entity.read_with(cx, |term, _cx| {
4671 term.output().is_none() // output is None means it's still running
4672 })
4673 });
4674 assert!(
4675 terminal_running_before,
4676 "Terminal should be running before checkpoint restore"
4677 );
4678
4679 // Verify we have the expected entries before restore
4680 let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
4681 assert!(
4682 entry_count_before > 1,
4683 "Should have multiple entries before restore"
4684 );
4685
4686 // Restore the checkpoint to the second message.
4687 // This should:
4688 // 1. Cancel any in-progress generation (via the cancel() call)
4689 // 2. Remove the terminal that was created after that point
4690 thread
4691 .update(cx, |thread, cx| {
4692 thread.restore_checkpoint(second_message_id, cx)
4693 })
4694 .await
4695 .unwrap();
4696
4697 // Verify that no send_task is in progress after restore
4698 // (cancel() clears the send_task)
4699 let has_send_task_after = thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4700 assert!(
4701 !has_send_task_after,
4702 "Should not have a send_task after restore (cancel should have cleared it)"
4703 );
4704
4705 // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
4706 let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
4707 assert_eq!(
4708 entry_count, 1,
4709 "Should have 1 entry after restore (only the first user message)"
4710 );
4711
4712 // Verify the 2 completed terminals from before the checkpoint still exist
4713 let terminal_1_exists = thread.read_with(cx, |thread, _| {
4714 thread.terminals.contains_key(&terminal_id_1)
4715 });
4716 assert!(
4717 terminal_1_exists,
4718 "Terminal 1 (from before checkpoint) should still exist"
4719 );
4720
4721 let terminal_2_exists = thread.read_with(cx, |thread, _| {
4722 thread.terminals.contains_key(&terminal_id_2)
4723 });
4724 assert!(
4725 terminal_2_exists,
4726 "Terminal 2 (from before checkpoint) should still exist"
4727 );
4728
4729 // Verify they're still in completed state
4730 let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
4731 let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
4732 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4733 });
4734 assert!(terminal_1_completed, "Terminal 1 should still be completed");
4735
4736 let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
4737 let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
4738 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4739 });
4740 assert!(terminal_2_completed, "Terminal 2 should still be completed");
4741
4742 // Verify the running terminal (created after checkpoint) was removed
4743 let terminal_3_exists =
4744 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4745 assert!(
4746 !terminal_3_exists,
4747 "Terminal 3 (created after checkpoint) should have been removed"
4748 );
4749
4750 // Verify total count is 2 (the two from before the checkpoint)
4751 let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
4752 assert_eq!(
4753 terminal_count, 2,
4754 "Should have exactly 2 terminals (the completed ones from before checkpoint)"
4755 );
4756 }
4757
4758 /// Tests that update_last_checkpoint correctly updates the original message's checkpoint
4759 /// even when a new user message is added while the async checkpoint comparison is in progress.
4760 ///
4761 /// This is a regression test for a bug where update_last_checkpoint would fail with
4762 /// "no checkpoint" if a new user message (without a checkpoint) was added between when
4763 /// update_last_checkpoint started and when its async closure ran.
4764 #[gpui::test]
4765 async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
4766 init_test(cx);
4767
4768 let fs = FakeFs::new(cx.executor());
4769 fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
4770 .await;
4771 let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;
4772
4773 let handler_done = Arc::new(AtomicBool::new(false));
4774 let handler_done_clone = handler_done.clone();
4775 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4776 move |_, _thread, _cx| {
4777 handler_done_clone.store(true, SeqCst);
4778 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
4779 },
4780 ));
4781
4782 let thread = cx
4783 .update(|cx| {
4784 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4785 })
4786 .await
4787 .unwrap();
4788
4789 let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
4790 let send_task = cx.background_executor.spawn(send_future);
4791
4792 // Tick until handler completes, then a few more to let update_last_checkpoint start
4793 while !handler_done.load(SeqCst) {
4794 cx.executor().tick();
4795 }
4796 for _ in 0..5 {
4797 cx.executor().tick();
4798 }
4799
4800 thread.update(cx, |thread, cx| {
4801 thread.push_entry(
4802 AgentThreadEntry::UserMessage(UserMessage {
4803 id: Some(UserMessageId::new()),
4804 content: ContentBlock::Empty,
4805 chunks: vec!["Injected message (no checkpoint)".into()],
4806 checkpoint: None,
4807 indented: false,
4808 }),
4809 cx,
4810 );
4811 });
4812
4813 cx.run_until_parked();
4814 let result = send_task.await;
4815
4816 assert!(
4817 result.is_ok(),
4818 "send should succeed even when new message added during update_last_checkpoint: {:?}",
4819 result.err()
4820 );
4821 }
4822
4823 /// Tests that when a follow-up message is sent during generation,
4824 /// the first turn completing does NOT clear `running_turn` because
4825 /// it now belongs to the second turn.
4826 #[gpui::test]
4827 async fn test_follow_up_message_during_generation_does_not_clear_turn(cx: &mut TestAppContext) {
4828 init_test(cx);
4829
4830 let fs = FakeFs::new(cx.executor());
4831 let project = Project::test(fs, [], cx).await;
4832
4833 // First handler waits for this signal before completing
4834 let (first_complete_tx, first_complete_rx) = futures::channel::oneshot::channel::<()>();
4835 let first_complete_rx = RefCell::new(Some(first_complete_rx));
4836
4837 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4838 move |params, _thread, _cx| {
4839 let first_complete_rx = first_complete_rx.borrow_mut().take();
4840 let is_first = params
4841 .prompt
4842 .iter()
4843 .any(|c| matches!(c, acp::ContentBlock::Text(t) if t.text.contains("first")));
4844
4845 async move {
4846 if is_first {
4847 // First handler waits until signaled
4848 if let Some(rx) = first_complete_rx {
4849 rx.await.ok();
4850 }
4851 }
4852 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4853 }
4854 .boxed_local()
4855 }
4856 }));
4857
4858 let thread = cx
4859 .update(|cx| {
4860 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4861 })
4862 .await
4863 .unwrap();
4864
4865 // Send first message (turn_id=1) - handler will block
4866 let first_request = thread.update(cx, |thread, cx| thread.send_raw("first", cx));
4867 assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 1);
4868
4869 // Send second message (turn_id=2) while first is still blocked
4870 // This calls cancel() which takes turn 1's running_turn and sets turn 2's
4871 let second_request = thread.update(cx, |thread, cx| thread.send_raw("second", cx));
4872 assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 2);
4873
4874 let running_turn_after_second_send =
4875 thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
4876 assert_eq!(
4877 running_turn_after_second_send,
4878 Some(2),
4879 "running_turn should be set to turn 2 after sending second message"
4880 );
4881
4882 // Now signal first handler to complete
4883 first_complete_tx.send(()).ok();
4884
4885 // First request completes - should NOT clear running_turn
4886 // because running_turn now belongs to turn 2
4887 first_request.await.unwrap();
4888
4889 let running_turn_after_first =
4890 thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
4891 assert_eq!(
4892 running_turn_after_first,
4893 Some(2),
4894 "first turn completing should not clear running_turn (belongs to turn 2)"
4895 );
4896
4897 // Second request completes - SHOULD clear running_turn
4898 second_request.await.unwrap();
4899
4900 let running_turn_after_second =
4901 thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4902 assert!(
4903 !running_turn_after_second,
4904 "second turn completing should clear running_turn"
4905 );
4906 }
4907
4908 #[gpui::test]
4909 async fn test_send_returns_cancelled_response_and_marks_tools_as_cancelled(
4910 cx: &mut TestAppContext,
4911 ) {
4912 init_test(cx);
4913
4914 let fs = FakeFs::new(cx.executor());
4915 let project = Project::test(fs, [], cx).await;
4916
4917 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4918 move |_params, thread, mut cx| {
4919 async move {
4920 thread
4921 .update(&mut cx, |thread, cx| {
4922 thread.handle_session_update(
4923 acp::SessionUpdate::ToolCall(
4924 acp::ToolCall::new(
4925 acp::ToolCallId::new("test-tool"),
4926 "Test Tool",
4927 )
4928 .kind(acp::ToolKind::Fetch)
4929 .status(acp::ToolCallStatus::InProgress),
4930 ),
4931 cx,
4932 )
4933 })
4934 .unwrap()
4935 .unwrap();
4936
4937 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
4938 }
4939 .boxed_local()
4940 },
4941 ));
4942
4943 let thread = cx
4944 .update(|cx| {
4945 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4946 })
4947 .await
4948 .unwrap();
4949
4950 let response = thread
4951 .update(cx, |thread, cx| thread.send_raw("test message", cx))
4952 .await;
4953
4954 let response = response
4955 .expect("send should succeed")
4956 .expect("should have response");
4957 assert_eq!(
4958 response.stop_reason,
4959 acp::StopReason::Cancelled,
4960 "response should have Cancelled stop_reason"
4961 );
4962
4963 thread.read_with(cx, |thread, _| {
4964 let tool_entry = thread
4965 .entries
4966 .iter()
4967 .find_map(|e| {
4968 if let AgentThreadEntry::ToolCall(call) = e {
4969 Some(call)
4970 } else {
4971 None
4972 }
4973 })
4974 .expect("should have tool call entry");
4975
4976 assert!(
4977 matches!(tool_entry.status, ToolCallStatus::Canceled),
4978 "tool should be marked as Canceled when response is Cancelled, got {:?}",
4979 tool_entry.status
4980 );
4981 });
4982 }
4983
4984 #[gpui::test]
4985 async fn test_provisional_title_replaced_by_real_title(cx: &mut TestAppContext) {
4986 init_test(cx);
4987
4988 let fs = FakeFs::new(cx.executor());
4989 let project = Project::test(fs, [], cx).await;
4990 let connection = Rc::new(FakeAgentConnection::new());
4991 let set_title_calls = connection.set_title_calls.clone();
4992
4993 let thread = cx
4994 .update(|cx| {
4995 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4996 })
4997 .await
4998 .unwrap();
4999
5000 // Initial title is the default.
5001 thread.read_with(cx, |thread, _| {
5002 assert_eq!(thread.title().as_ref(), "Test");
5003 });
5004
5005 // Setting a provisional title updates the display title.
5006 thread.update(cx, |thread, cx| {
5007 thread.set_provisional_title("Hello, can you help…".into(), cx);
5008 });
5009 thread.read_with(cx, |thread, _| {
5010 assert_eq!(thread.title().as_ref(), "Hello, can you help…");
5011 });
5012
5013 // The provisional title should NOT have propagated to the connection.
5014 assert_eq!(
5015 set_title_calls.borrow().len(),
5016 0,
5017 "provisional title should not propagate to the connection"
5018 );
5019
5020 // When the real title arrives via set_title, it replaces the
5021 // provisional title and propagates to the connection.
5022 let task = thread.update(cx, |thread, cx| {
5023 thread.set_title("Helping with Rust question".into(), cx)
5024 });
5025 task.await.expect("set_title should succeed");
5026 thread.read_with(cx, |thread, _| {
5027 assert_eq!(thread.title().as_ref(), "Helping with Rust question");
5028 });
5029 assert_eq!(
5030 set_title_calls.borrow().as_slice(),
5031 &[SharedString::from("Helping with Rust question")],
5032 "real title should propagate to the connection"
5033 );
5034 }
5035
5036 #[gpui::test]
5037 async fn test_session_info_update_replaces_provisional_title_and_emits_event(
5038 cx: &mut TestAppContext,
5039 ) {
5040 init_test(cx);
5041
5042 let fs = FakeFs::new(cx.executor());
5043 let project = Project::test(fs, [], cx).await;
5044 let connection = Rc::new(FakeAgentConnection::new());
5045
5046 let thread = cx
5047 .update(|cx| {
5048 connection.clone().new_session(
5049 project,
5050 PathList::new(&[Path::new(path!("/test"))]),
5051 cx,
5052 )
5053 })
5054 .await
5055 .unwrap();
5056
5057 let title_updated_events = Rc::new(RefCell::new(0usize));
5058 let title_updated_events_for_subscription = title_updated_events.clone();
5059 thread.update(cx, |_thread, cx| {
5060 cx.subscribe(
5061 &thread,
5062 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
5063 if matches!(event, AcpThreadEvent::TitleUpdated) {
5064 *title_updated_events_for_subscription.borrow_mut() += 1;
5065 }
5066 },
5067 )
5068 .detach();
5069 });
5070
5071 thread.update(cx, |thread, cx| {
5072 thread.set_provisional_title("Hello, can you help…".into(), cx);
5073 });
5074 assert_eq!(
5075 *title_updated_events.borrow(),
5076 1,
5077 "setting a provisional title should emit TitleUpdated"
5078 );
5079
5080 let result = thread.update(cx, |thread, cx| {
5081 thread.handle_session_update(
5082 acp::SessionUpdate::SessionInfoUpdate(
5083 acp::SessionInfoUpdate::new().title("Helping with Rust question"),
5084 ),
5085 cx,
5086 )
5087 });
5088 result.expect("session info update should succeed");
5089
5090 thread.read_with(cx, |thread, _| {
5091 assert_eq!(thread.title().as_ref(), "Helping with Rust question");
5092 assert!(
5093 !thread.has_provisional_title(),
5094 "session info title update should clear provisional title"
5095 );
5096 });
5097
5098 assert_eq!(
5099 *title_updated_events.borrow(),
5100 2,
5101 "session info title update should emit TitleUpdated"
5102 );
5103 assert!(
5104 connection.set_title_calls.borrow().is_empty(),
5105 "session info title update should not propagate back to the connection"
5106 );
5107 }
5108}