1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5use action_log::{ActionLog, ActionLogTelemetry};
6use agent_client_protocol::{self as acp};
7use anyhow::{Context as _, Result, anyhow};
8use collections::HashSet;
9pub use connection::*;
10pub use diff::*;
11use futures::{FutureExt, channel::oneshot, future::BoxFuture};
12use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
13use itertools::Itertools;
14use language::language_settings::FormatOnSave;
15use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
16use markdown::Markdown;
17pub use mention::*;
18use project::lsp_store::{FormatTrigger, LspFormatTarget};
19use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
20use serde::{Deserialize, Serialize};
21use serde_json::to_string_pretty;
22use std::collections::HashMap;
23use std::error::Error;
24use std::fmt::{Formatter, Write};
25use std::ops::Range;
26use std::process::ExitStatus;
27use std::rc::Rc;
28use std::time::{Duration, Instant};
29use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
30use task::{Shell, ShellBuilder};
31pub use terminal::*;
32use text::Bias;
33use ui::App;
34use util::markdown::MarkdownEscaped;
35use util::path_list::PathList;
36use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
37use uuid::Uuid;
38
39/// Key used in ACP ToolCall meta to store the tool's programmatic name.
40/// This is a workaround since ACP's ToolCall doesn't have a dedicated name field.
41pub const TOOL_NAME_META_KEY: &str = "tool_name";
42
43/// Helper to extract tool name from ACP meta
44pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
45 meta.as_ref()
46 .and_then(|m| m.get(TOOL_NAME_META_KEY))
47 .and_then(|v| v.as_str())
48 .map(|s| SharedString::from(s.to_owned()))
49}
50
51/// Helper to create meta with tool name
52pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
53 acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
54}
55
56/// Key used in ACP ToolCall meta to store the session id and message indexes
57pub const SUBAGENT_SESSION_INFO_META_KEY: &str = "subagent_session_info";
58
59#[derive(Clone, Debug, Deserialize, Serialize)]
60pub struct SubagentSessionInfo {
61 /// The session id of the subagent sessiont that was spawned
62 pub session_id: acp::SessionId,
63 /// The index of the message of the start of the "turn" run by this tool call
64 pub message_start_index: usize,
65 /// The index of the output of the message that the subagent has returned
66 #[serde(skip_serializing_if = "Option::is_none")]
67 pub message_end_index: Option<usize>,
68}
69
70/// Helper to extract subagent session id from ACP meta
71pub fn subagent_session_info_from_meta(meta: &Option<acp::Meta>) -> Option<SubagentSessionInfo> {
72 meta.as_ref()
73 .and_then(|m| m.get(SUBAGENT_SESSION_INFO_META_KEY))
74 .and_then(|v| serde_json::from_value(v.clone()).ok())
75}
76
77#[derive(Debug)]
78pub struct UserMessage {
79 pub id: Option<UserMessageId>,
80 pub content: ContentBlock,
81 pub chunks: Vec<acp::ContentBlock>,
82 pub checkpoint: Option<Checkpoint>,
83 pub indented: bool,
84}
85
86#[derive(Debug)]
87pub struct Checkpoint {
88 git_checkpoint: GitStoreCheckpoint,
89 pub show: bool,
90}
91
92impl UserMessage {
93 fn to_markdown(&self, cx: &App) -> String {
94 let mut markdown = String::new();
95 if self
96 .checkpoint
97 .as_ref()
98 .is_some_and(|checkpoint| checkpoint.show)
99 {
100 writeln!(markdown, "## User (checkpoint)").unwrap();
101 } else {
102 writeln!(markdown, "## User").unwrap();
103 }
104 writeln!(markdown).unwrap();
105 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
106 writeln!(markdown).unwrap();
107 markdown
108 }
109}
110
111#[derive(Debug, PartialEq)]
112pub struct AssistantMessage {
113 pub chunks: Vec<AssistantMessageChunk>,
114 pub indented: bool,
115 pub is_subagent_output: bool,
116}
117
118impl AssistantMessage {
119 pub fn to_markdown(&self, cx: &App) -> String {
120 format!(
121 "## Assistant\n\n{}\n\n",
122 self.chunks
123 .iter()
124 .map(|chunk| chunk.to_markdown(cx))
125 .join("\n\n")
126 )
127 }
128}
129
130#[derive(Debug, PartialEq)]
131pub enum AssistantMessageChunk {
132 Message { block: ContentBlock },
133 Thought { block: ContentBlock },
134}
135
136impl AssistantMessageChunk {
137 pub fn from_str(
138 chunk: &str,
139 language_registry: &Arc<LanguageRegistry>,
140 path_style: PathStyle,
141 cx: &mut App,
142 ) -> Self {
143 Self::Message {
144 block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
145 }
146 }
147
148 fn to_markdown(&self, cx: &App) -> String {
149 match self {
150 Self::Message { block } => block.to_markdown(cx).to_string(),
151 Self::Thought { block } => {
152 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
153 }
154 }
155 }
156}
157
158#[derive(Debug)]
159pub enum AgentThreadEntry {
160 UserMessage(UserMessage),
161 AssistantMessage(AssistantMessage),
162 ToolCall(ToolCall),
163 CompletedPlan(Vec<PlanEntry>),
164}
165
166impl AgentThreadEntry {
167 pub fn is_indented(&self) -> bool {
168 match self {
169 Self::UserMessage(message) => message.indented,
170 Self::AssistantMessage(message) => message.indented,
171 Self::ToolCall(_) => false,
172 Self::CompletedPlan(_) => false,
173 }
174 }
175
176 pub fn to_markdown(&self, cx: &App) -> String {
177 match self {
178 Self::UserMessage(message) => message.to_markdown(cx),
179 Self::AssistantMessage(message) => message.to_markdown(cx),
180 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
181 Self::CompletedPlan(entries) => {
182 let mut md = String::from("## Plan\n\n");
183 for entry in entries {
184 let source = entry.content.read(cx).source().to_string();
185 md.push_str(&format!("- [x] {}\n", source));
186 }
187 md
188 }
189 }
190 }
191
192 pub fn user_message(&self) -> Option<&UserMessage> {
193 if let AgentThreadEntry::UserMessage(message) = self {
194 Some(message)
195 } else {
196 None
197 }
198 }
199
200 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
201 if let AgentThreadEntry::ToolCall(call) = self {
202 itertools::Either::Left(call.diffs())
203 } else {
204 itertools::Either::Right(std::iter::empty())
205 }
206 }
207
208 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
209 if let AgentThreadEntry::ToolCall(call) = self {
210 itertools::Either::Left(call.terminals())
211 } else {
212 itertools::Either::Right(std::iter::empty())
213 }
214 }
215
216 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
217 if let AgentThreadEntry::ToolCall(ToolCall {
218 locations,
219 resolved_locations,
220 ..
221 }) = self
222 {
223 Some((
224 locations.get(ix)?.clone(),
225 resolved_locations.get(ix)?.clone()?,
226 ))
227 } else {
228 None
229 }
230 }
231}
232
233#[derive(Debug)]
234pub struct ToolCall {
235 pub id: acp::ToolCallId,
236 pub label: Entity<Markdown>,
237 pub kind: acp::ToolKind,
238 pub content: Vec<ToolCallContent>,
239 pub status: ToolCallStatus,
240 pub locations: Vec<acp::ToolCallLocation>,
241 pub resolved_locations: Vec<Option<AgentLocation>>,
242 pub raw_input: Option<serde_json::Value>,
243 pub raw_input_markdown: Option<Entity<Markdown>>,
244 pub raw_output: Option<serde_json::Value>,
245 pub tool_name: Option<SharedString>,
246 pub subagent_session_info: Option<SubagentSessionInfo>,
247}
248
249impl ToolCall {
250 fn from_acp(
251 tool_call: acp::ToolCall,
252 status: ToolCallStatus,
253 language_registry: Arc<LanguageRegistry>,
254 path_style: PathStyle,
255 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
256 cx: &mut App,
257 ) -> Result<Self> {
258 let title = if tool_call.kind == acp::ToolKind::Execute {
259 tool_call.title
260 } else if tool_call.kind == acp::ToolKind::Edit {
261 MarkdownEscaped(tool_call.title.as_str()).to_string()
262 } else if let Some((first_line, _)) = tool_call.title.split_once("\n") {
263 first_line.to_owned() + "…"
264 } else {
265 tool_call.title
266 };
267 let mut content = Vec::with_capacity(tool_call.content.len());
268 for item in tool_call.content {
269 if let Some(item) = ToolCallContent::from_acp(
270 item,
271 language_registry.clone(),
272 path_style,
273 terminals,
274 cx,
275 )? {
276 content.push(item);
277 }
278 }
279
280 let raw_input_markdown = tool_call
281 .raw_input
282 .as_ref()
283 .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
284
285 let tool_name = tool_name_from_meta(&tool_call.meta);
286
287 let subagent_session_info = subagent_session_info_from_meta(&tool_call.meta);
288
289 let result = Self {
290 id: tool_call.tool_call_id,
291 label: cx
292 .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
293 kind: tool_call.kind,
294 content,
295 locations: tool_call.locations,
296 resolved_locations: Vec::default(),
297 status,
298 raw_input: tool_call.raw_input,
299 raw_input_markdown,
300 raw_output: tool_call.raw_output,
301 tool_name,
302 subagent_session_info,
303 };
304 Ok(result)
305 }
306
307 fn update_fields(
308 &mut self,
309 fields: acp::ToolCallUpdateFields,
310 meta: Option<acp::Meta>,
311 language_registry: Arc<LanguageRegistry>,
312 path_style: PathStyle,
313 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
314 cx: &mut App,
315 ) -> Result<()> {
316 let acp::ToolCallUpdateFields {
317 kind,
318 status,
319 title,
320 content,
321 locations,
322 raw_input,
323 raw_output,
324 ..
325 } = fields;
326
327 if let Some(kind) = kind {
328 self.kind = kind;
329 }
330
331 if let Some(status) = status {
332 self.status = status.into();
333 }
334
335 if let Some(subagent_session_info) = subagent_session_info_from_meta(&meta) {
336 self.subagent_session_info = Some(subagent_session_info);
337 }
338
339 if let Some(title) = title {
340 if self.kind == acp::ToolKind::Execute {
341 for terminal in self.terminals() {
342 terminal.update(cx, |terminal, cx| {
343 terminal.update_command_label(&title, cx);
344 });
345 }
346 }
347 self.label.update(cx, |label, cx| {
348 if self.kind == acp::ToolKind::Execute {
349 label.replace(title, cx);
350 } else if self.kind == acp::ToolKind::Edit {
351 label.replace(MarkdownEscaped(&title).to_string(), cx)
352 } else if let Some((first_line, _)) = title.split_once("\n") {
353 label.replace(first_line.to_owned() + "…", cx);
354 } else {
355 label.replace(title, cx);
356 }
357 });
358 }
359
360 if let Some(content) = content {
361 let mut new_content_len = content.len();
362 let mut content = content.into_iter();
363
364 // Reuse existing content if we can
365 for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
366 let valid_content =
367 old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
368 if !valid_content {
369 new_content_len -= 1;
370 }
371 }
372 for new in content {
373 if let Some(new) = ToolCallContent::from_acp(
374 new,
375 language_registry.clone(),
376 path_style,
377 terminals,
378 cx,
379 )? {
380 self.content.push(new);
381 } else {
382 new_content_len -= 1;
383 }
384 }
385 self.content.truncate(new_content_len);
386 }
387
388 if let Some(locations) = locations {
389 self.locations = locations;
390 }
391
392 if let Some(raw_input) = raw_input {
393 self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
394 self.raw_input = Some(raw_input);
395 }
396
397 if let Some(raw_output) = raw_output {
398 if self.content.is_empty()
399 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
400 {
401 self.content
402 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
403 markdown,
404 }));
405 }
406 self.raw_output = Some(raw_output);
407 }
408 Ok(())
409 }
410
411 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
412 self.content.iter().filter_map(|content| match content {
413 ToolCallContent::Diff(diff) => Some(diff),
414 ToolCallContent::ContentBlock(_) => None,
415 ToolCallContent::Terminal(_) => None,
416 })
417 }
418
419 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
420 self.content.iter().filter_map(|content| match content {
421 ToolCallContent::Terminal(terminal) => Some(terminal),
422 ToolCallContent::ContentBlock(_) => None,
423 ToolCallContent::Diff(_) => None,
424 })
425 }
426
427 pub fn is_subagent(&self) -> bool {
428 self.tool_name.as_ref().is_some_and(|s| s == "spawn_agent")
429 || self.subagent_session_info.is_some()
430 }
431
432 pub fn to_markdown(&self, cx: &App) -> String {
433 let mut markdown = format!(
434 "**Tool Call: {}**\nStatus: {}\n\n",
435 self.label.read(cx).source(),
436 self.status
437 );
438 for content in &self.content {
439 markdown.push_str(content.to_markdown(cx).as_str());
440 markdown.push_str("\n\n");
441 }
442 markdown
443 }
444
445 async fn resolve_location(
446 location: acp::ToolCallLocation,
447 project: WeakEntity<Project>,
448 cx: &mut AsyncApp,
449 ) -> Option<ResolvedLocation> {
450 let buffer = project
451 .update(cx, |project, cx| {
452 project
453 .project_path_for_absolute_path(&location.path, cx)
454 .map(|path| project.open_buffer(path, cx))
455 })
456 .ok()??;
457 let buffer = buffer.await.log_err()?;
458 let position = buffer.update(cx, |buffer, _| {
459 let snapshot = buffer.snapshot();
460 if let Some(row) = location.line {
461 let column = snapshot.indent_size_for_line(row).len;
462 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
463 snapshot.anchor_before(point)
464 } else {
465 Anchor::min_for_buffer(snapshot.remote_id())
466 }
467 });
468
469 Some(ResolvedLocation { buffer, position })
470 }
471
472 fn resolve_locations(
473 &self,
474 project: Entity<Project>,
475 cx: &mut App,
476 ) -> Task<Vec<Option<ResolvedLocation>>> {
477 let locations = self.locations.clone();
478 project.update(cx, |_, cx| {
479 cx.spawn(async move |project, cx| {
480 let mut new_locations = Vec::new();
481 for location in locations {
482 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
483 }
484 new_locations
485 })
486 })
487 }
488}
489
490// Separate so we can hold a strong reference to the buffer
491// for saving on the thread
492#[derive(Clone, Debug, PartialEq, Eq)]
493struct ResolvedLocation {
494 buffer: Entity<Buffer>,
495 position: Anchor,
496}
497
498impl From<&ResolvedLocation> for AgentLocation {
499 fn from(value: &ResolvedLocation) -> Self {
500 Self {
501 buffer: value.buffer.downgrade(),
502 position: value.position,
503 }
504 }
505}
506
507#[derive(Debug, Clone)]
508pub enum SelectedPermissionParams {
509 Terminal { patterns: Vec<String> },
510}
511
512#[derive(Debug)]
513pub struct SelectedPermissionOutcome {
514 pub option_id: acp::PermissionOptionId,
515 pub option_kind: acp::PermissionOptionKind,
516 pub params: Option<SelectedPermissionParams>,
517}
518
519impl SelectedPermissionOutcome {
520 pub fn new(option_id: acp::PermissionOptionId, option_kind: acp::PermissionOptionKind) -> Self {
521 Self {
522 option_id,
523 option_kind,
524 params: None,
525 }
526 }
527
528 pub fn params(mut self, params: Option<SelectedPermissionParams>) -> Self {
529 self.params = params;
530 self
531 }
532}
533
534impl From<SelectedPermissionOutcome> for acp::SelectedPermissionOutcome {
535 fn from(value: SelectedPermissionOutcome) -> Self {
536 Self::new(value.option_id)
537 }
538}
539
540#[derive(Debug)]
541pub enum RequestPermissionOutcome {
542 Cancelled,
543 Selected(SelectedPermissionOutcome),
544}
545
546impl From<RequestPermissionOutcome> for acp::RequestPermissionOutcome {
547 fn from(value: RequestPermissionOutcome) -> Self {
548 match value {
549 RequestPermissionOutcome::Cancelled => Self::Cancelled,
550 RequestPermissionOutcome::Selected(outcome) => Self::Selected(outcome.into()),
551 }
552 }
553}
554
555#[derive(Debug)]
556pub enum ToolCallStatus {
557 /// The tool call hasn't started running yet, but we start showing it to
558 /// the user.
559 Pending,
560 /// The tool call is waiting for confirmation from the user.
561 WaitingForConfirmation {
562 options: PermissionOptions,
563 respond_tx: oneshot::Sender<SelectedPermissionOutcome>,
564 },
565 /// The tool call is currently running.
566 InProgress,
567 /// The tool call completed successfully.
568 Completed,
569 /// The tool call failed.
570 Failed,
571 /// The user rejected the tool call.
572 Rejected,
573 /// The user canceled generation so the tool call was canceled.
574 Canceled,
575}
576
577impl From<acp::ToolCallStatus> for ToolCallStatus {
578 fn from(status: acp::ToolCallStatus) -> Self {
579 match status {
580 acp::ToolCallStatus::Pending => Self::Pending,
581 acp::ToolCallStatus::InProgress => Self::InProgress,
582 acp::ToolCallStatus::Completed => Self::Completed,
583 acp::ToolCallStatus::Failed => Self::Failed,
584 _ => Self::Pending,
585 }
586 }
587}
588
589impl Display for ToolCallStatus {
590 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
591 write!(
592 f,
593 "{}",
594 match self {
595 ToolCallStatus::Pending => "Pending",
596 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
597 ToolCallStatus::InProgress => "In Progress",
598 ToolCallStatus::Completed => "Completed",
599 ToolCallStatus::Failed => "Failed",
600 ToolCallStatus::Rejected => "Rejected",
601 ToolCallStatus::Canceled => "Canceled",
602 }
603 )
604 }
605}
606
607#[derive(Debug, PartialEq, Clone)]
608pub enum ContentBlock {
609 Empty,
610 Markdown { markdown: Entity<Markdown> },
611 ResourceLink { resource_link: acp::ResourceLink },
612 Image { image: Arc<gpui::Image> },
613}
614
615impl ContentBlock {
616 pub fn new(
617 block: acp::ContentBlock,
618 language_registry: &Arc<LanguageRegistry>,
619 path_style: PathStyle,
620 cx: &mut App,
621 ) -> Self {
622 let mut this = Self::Empty;
623 this.append(block, language_registry, path_style, cx);
624 this
625 }
626
627 pub fn new_combined(
628 blocks: impl IntoIterator<Item = acp::ContentBlock>,
629 language_registry: Arc<LanguageRegistry>,
630 path_style: PathStyle,
631 cx: &mut App,
632 ) -> Self {
633 let mut this = Self::Empty;
634 for block in blocks {
635 this.append(block, &language_registry, path_style, cx);
636 }
637 this
638 }
639
640 pub fn append(
641 &mut self,
642 block: acp::ContentBlock,
643 language_registry: &Arc<LanguageRegistry>,
644 path_style: PathStyle,
645 cx: &mut App,
646 ) {
647 match (&mut *self, &block) {
648 (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
649 *self = ContentBlock::ResourceLink {
650 resource_link: resource_link.clone(),
651 };
652 }
653 (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
654 if let Some(image) = Self::decode_image(image_content) {
655 *self = ContentBlock::Image { image };
656 } else {
657 let new_content = Self::image_md(image_content);
658 *self = Self::create_markdown_block(new_content, language_registry, cx);
659 }
660 }
661 (ContentBlock::Empty, _) => {
662 let new_content = Self::block_string_contents(&block, path_style);
663 *self = Self::create_markdown_block(new_content, language_registry, cx);
664 }
665 (ContentBlock::Markdown { markdown }, _) => {
666 let new_content = Self::block_string_contents(&block, path_style);
667 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
668 }
669 (ContentBlock::ResourceLink { resource_link }, _) => {
670 let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
671 let new_content = Self::block_string_contents(&block, path_style);
672 let combined = format!("{}\n{}", existing_content, new_content);
673 *self = Self::create_markdown_block(combined, language_registry, cx);
674 }
675 (ContentBlock::Image { .. }, _) => {
676 let new_content = Self::block_string_contents(&block, path_style);
677 let combined = format!("`Image`\n{}", new_content);
678 *self = Self::create_markdown_block(combined, language_registry, cx);
679 }
680 }
681 }
682
683 fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
684 use base64::Engine as _;
685
686 let bytes = base64::engine::general_purpose::STANDARD
687 .decode(image_content.data.as_bytes())
688 .ok()?;
689 let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
690 Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
691 }
692
693 fn create_markdown_block(
694 content: String,
695 language_registry: &Arc<LanguageRegistry>,
696 cx: &mut App,
697 ) -> ContentBlock {
698 ContentBlock::Markdown {
699 markdown: cx
700 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
701 }
702 }
703
704 fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
705 match block {
706 acp::ContentBlock::Text(text_content) => text_content.text.clone(),
707 acp::ContentBlock::ResourceLink(resource_link) => {
708 Self::resource_link_md(&resource_link.uri, path_style)
709 }
710 acp::ContentBlock::Resource(acp::EmbeddedResource {
711 resource:
712 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
713 uri,
714 ..
715 }),
716 ..
717 }) => Self::resource_link_md(uri, path_style),
718 acp::ContentBlock::Image(image) => Self::image_md(image),
719 _ => String::new(),
720 }
721 }
722
723 fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
724 if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
725 uri.as_link().to_string()
726 } else {
727 uri.to_string()
728 }
729 }
730
731 fn image_md(_image: &acp::ImageContent) -> String {
732 "`Image`".into()
733 }
734
735 pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
736 match self {
737 ContentBlock::Empty => "",
738 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
739 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
740 ContentBlock::Image { .. } => "`Image`",
741 }
742 }
743
744 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
745 match self {
746 ContentBlock::Empty => None,
747 ContentBlock::Markdown { markdown } => Some(markdown),
748 ContentBlock::ResourceLink { .. } => None,
749 ContentBlock::Image { .. } => None,
750 }
751 }
752
753 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
754 match self {
755 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
756 _ => None,
757 }
758 }
759
760 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
761 match self {
762 ContentBlock::Image { image } => Some(image),
763 _ => None,
764 }
765 }
766}
767
768#[derive(Debug)]
769pub enum ToolCallContent {
770 ContentBlock(ContentBlock),
771 Diff(Entity<Diff>),
772 Terminal(Entity<Terminal>),
773}
774
775impl ToolCallContent {
776 pub fn from_acp(
777 content: acp::ToolCallContent,
778 language_registry: Arc<LanguageRegistry>,
779 path_style: PathStyle,
780 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
781 cx: &mut App,
782 ) -> Result<Option<Self>> {
783 match content {
784 acp::ToolCallContent::Content(acp::Content { content, .. }) => {
785 Ok(Some(Self::ContentBlock(ContentBlock::new(
786 content,
787 &language_registry,
788 path_style,
789 cx,
790 ))))
791 }
792 acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
793 Diff::finalized(
794 diff.path.to_string_lossy().into_owned(),
795 diff.old_text,
796 diff.new_text,
797 language_registry,
798 cx,
799 )
800 })))),
801 acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
802 .get(&terminal_id)
803 .cloned()
804 .map(|terminal| Some(Self::Terminal(terminal)))
805 .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
806 _ => Ok(None),
807 }
808 }
809
810 pub fn update_from_acp(
811 &mut self,
812 new: acp::ToolCallContent,
813 language_registry: Arc<LanguageRegistry>,
814 path_style: PathStyle,
815 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
816 cx: &mut App,
817 ) -> Result<bool> {
818 let needs_update = match (&self, &new) {
819 (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
820 old_diff.read(cx).needs_update(
821 new_diff.old_text.as_deref().unwrap_or(""),
822 &new_diff.new_text,
823 cx,
824 )
825 }
826 _ => true,
827 };
828
829 if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
830 if needs_update {
831 *self = update;
832 }
833 Ok(true)
834 } else {
835 Ok(false)
836 }
837 }
838
839 pub fn to_markdown(&self, cx: &App) -> String {
840 match self {
841 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
842 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
843 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
844 }
845 }
846
847 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
848 match self {
849 Self::ContentBlock(content) => content.image(),
850 _ => None,
851 }
852 }
853}
854
855#[derive(Debug, PartialEq)]
856pub enum ToolCallUpdate {
857 UpdateFields(acp::ToolCallUpdate),
858 UpdateDiff(ToolCallUpdateDiff),
859 UpdateTerminal(ToolCallUpdateTerminal),
860}
861
862impl ToolCallUpdate {
863 fn id(&self) -> &acp::ToolCallId {
864 match self {
865 Self::UpdateFields(update) => &update.tool_call_id,
866 Self::UpdateDiff(diff) => &diff.id,
867 Self::UpdateTerminal(terminal) => &terminal.id,
868 }
869 }
870}
871
872impl From<acp::ToolCallUpdate> for ToolCallUpdate {
873 fn from(update: acp::ToolCallUpdate) -> Self {
874 Self::UpdateFields(update)
875 }
876}
877
878impl From<ToolCallUpdateDiff> for ToolCallUpdate {
879 fn from(diff: ToolCallUpdateDiff) -> Self {
880 Self::UpdateDiff(diff)
881 }
882}
883
884#[derive(Debug, PartialEq)]
885pub struct ToolCallUpdateDiff {
886 pub id: acp::ToolCallId,
887 pub diff: Entity<Diff>,
888}
889
890impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
891 fn from(terminal: ToolCallUpdateTerminal) -> Self {
892 Self::UpdateTerminal(terminal)
893 }
894}
895
896#[derive(Debug, PartialEq)]
897pub struct ToolCallUpdateTerminal {
898 pub id: acp::ToolCallId,
899 pub terminal: Entity<Terminal>,
900}
901
902#[derive(Debug, Default)]
903pub struct Plan {
904 pub entries: Vec<PlanEntry>,
905}
906
907#[derive(Debug)]
908pub struct PlanStats<'a> {
909 pub in_progress_entry: Option<&'a PlanEntry>,
910 pub pending: u32,
911 pub completed: u32,
912}
913
914impl Plan {
915 pub fn is_empty(&self) -> bool {
916 self.entries.is_empty()
917 }
918
919 pub fn stats(&self) -> PlanStats<'_> {
920 let mut stats = PlanStats {
921 in_progress_entry: None,
922 pending: 0,
923 completed: 0,
924 };
925
926 for entry in &self.entries {
927 match &entry.status {
928 acp::PlanEntryStatus::Pending => {
929 stats.pending += 1;
930 }
931 acp::PlanEntryStatus::InProgress => {
932 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
933 stats.pending += 1;
934 }
935 acp::PlanEntryStatus::Completed => {
936 stats.completed += 1;
937 }
938 _ => {}
939 }
940 }
941
942 stats
943 }
944}
945
946#[derive(Debug)]
947pub struct PlanEntry {
948 pub content: Entity<Markdown>,
949 pub priority: acp::PlanEntryPriority,
950 pub status: acp::PlanEntryStatus,
951}
952
953impl PlanEntry {
954 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
955 Self {
956 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
957 priority: entry.priority,
958 status: entry.status,
959 }
960 }
961}
962
963#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
964pub struct TokenUsage {
965 pub max_tokens: u64,
966 pub used_tokens: u64,
967 pub input_tokens: u64,
968 pub output_tokens: u64,
969 pub max_output_tokens: Option<u64>,
970}
971
972pub const TOKEN_USAGE_WARNING_THRESHOLD: f32 = 0.8;
973
974impl TokenUsage {
975 pub fn ratio(&self) -> TokenUsageRatio {
976 #[cfg(debug_assertions)]
977 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
978 .unwrap_or(TOKEN_USAGE_WARNING_THRESHOLD.to_string())
979 .parse()
980 .unwrap();
981 #[cfg(not(debug_assertions))]
982 let warning_threshold: f32 = TOKEN_USAGE_WARNING_THRESHOLD;
983
984 // When the maximum is unknown because there is no selected model,
985 // avoid showing the token limit warning.
986 if self.max_tokens == 0 {
987 TokenUsageRatio::Normal
988 } else if self.used_tokens >= self.max_tokens {
989 TokenUsageRatio::Exceeded
990 } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
991 TokenUsageRatio::Warning
992 } else {
993 TokenUsageRatio::Normal
994 }
995 }
996}
997
998#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
999pub enum TokenUsageRatio {
1000 Normal,
1001 Warning,
1002 Exceeded,
1003}
1004
1005#[derive(Debug, Clone)]
1006pub struct RetryStatus {
1007 pub last_error: SharedString,
1008 pub attempt: usize,
1009 pub max_attempts: usize,
1010 pub started_at: Instant,
1011 pub duration: Duration,
1012}
1013
1014struct RunningTurn {
1015 id: u32,
1016 send_task: Task<()>,
1017}
1018
1019pub struct AcpThread {
1020 session_id: acp::SessionId,
1021 work_dirs: Option<PathList>,
1022 parent_session_id: Option<acp::SessionId>,
1023 title: Option<SharedString>,
1024 provisional_title: Option<SharedString>,
1025 entries: Vec<AgentThreadEntry>,
1026 plan: Plan,
1027 project: Entity<Project>,
1028 action_log: Entity<ActionLog>,
1029 shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
1030 turn_id: u32,
1031 running_turn: Option<RunningTurn>,
1032 connection: Rc<dyn AgentConnection>,
1033 token_usage: Option<TokenUsage>,
1034 prompt_capabilities: acp::PromptCapabilities,
1035 available_commands: Vec<acp::AvailableCommand>,
1036 _observe_prompt_capabilities: Task<anyhow::Result<()>>,
1037 terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
1038 pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
1039 pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
1040 had_error: bool,
1041 /// The user's unsent prompt text, persisted so it can be restored when reloading the thread.
1042 draft_prompt: Option<Vec<acp::ContentBlock>>,
1043 /// The initial scroll position for the thread view, set during session registration.
1044 ui_scroll_position: Option<gpui::ListOffset>,
1045 /// Buffer for smooth text streaming. Holds text that has been received from
1046 /// the model but not yet revealed in the UI. A timer task drains this buffer
1047 /// gradually to create a fluid typing effect instead of choppy chunk-at-a-time
1048 /// updates.
1049 streaming_text_buffer: Option<StreamingTextBuffer>,
1050}
1051
1052struct StreamingTextBuffer {
1053 /// Text received from the model but not yet appended to the Markdown source.
1054 pending: String,
1055 /// The number of bytes to reveal per timer turn.
1056 bytes_to_reveal_per_tick: usize,
1057 /// The Markdown entity being streamed into.
1058 target: Entity<Markdown>,
1059 /// Timer task that periodically moves text from `pending` into `source`.
1060 _reveal_task: Task<()>,
1061}
1062
1063impl StreamingTextBuffer {
1064 /// The number of milliseconds between each timer tick, controlling how quickly
1065 /// text is revealed.
1066 const TASK_UPDATE_MS: u64 = 16;
1067 /// The time in milliseconds to reveal the entire pending text.
1068 const REVEAL_TARGET: f32 = 200.0;
1069}
1070
1071impl From<&AcpThread> for ActionLogTelemetry {
1072 fn from(value: &AcpThread) -> Self {
1073 Self {
1074 agent_telemetry_id: value.connection().telemetry_id(),
1075 session_id: value.session_id.0.clone(),
1076 }
1077 }
1078}
1079
1080#[derive(Debug)]
1081pub enum AcpThreadEvent {
1082 NewEntry,
1083 TitleUpdated,
1084 TokenUsageUpdated,
1085 EntryUpdated(usize),
1086 EntriesRemoved(Range<usize>),
1087 ToolAuthorizationRequested(acp::ToolCallId),
1088 ToolAuthorizationReceived(acp::ToolCallId),
1089 Retry(RetryStatus),
1090 SubagentSpawned(acp::SessionId),
1091 Stopped(acp::StopReason),
1092 Error,
1093 LoadError(LoadError),
1094 PromptCapabilitiesUpdated,
1095 Refusal,
1096 AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
1097 ModeUpdated(acp::SessionModeId),
1098 ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
1099 WorkingDirectoriesUpdated,
1100}
1101
1102impl EventEmitter<AcpThreadEvent> for AcpThread {}
1103
1104#[derive(Debug, Clone)]
1105pub enum TerminalProviderEvent {
1106 Created {
1107 terminal_id: acp::TerminalId,
1108 label: String,
1109 cwd: Option<PathBuf>,
1110 output_byte_limit: Option<u64>,
1111 terminal: Entity<::terminal::Terminal>,
1112 },
1113 Output {
1114 terminal_id: acp::TerminalId,
1115 data: Vec<u8>,
1116 },
1117 TitleChanged {
1118 terminal_id: acp::TerminalId,
1119 title: String,
1120 },
1121 Exit {
1122 terminal_id: acp::TerminalId,
1123 status: acp::TerminalExitStatus,
1124 },
1125}
1126
1127#[derive(Debug, Clone)]
1128pub enum TerminalProviderCommand {
1129 WriteInput {
1130 terminal_id: acp::TerminalId,
1131 bytes: Vec<u8>,
1132 },
1133 Resize {
1134 terminal_id: acp::TerminalId,
1135 cols: u16,
1136 rows: u16,
1137 },
1138 Close {
1139 terminal_id: acp::TerminalId,
1140 },
1141}
1142
1143#[derive(PartialEq, Eq, Debug)]
1144pub enum ThreadStatus {
1145 Idle,
1146 Generating,
1147}
1148
1149#[derive(Debug, Clone)]
1150pub enum LoadError {
1151 Unsupported {
1152 command: SharedString,
1153 current_version: SharedString,
1154 minimum_version: SharedString,
1155 },
1156 FailedToInstall(SharedString),
1157 Exited {
1158 status: ExitStatus,
1159 },
1160 Other(SharedString),
1161}
1162
1163impl Display for LoadError {
1164 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1165 match self {
1166 LoadError::Unsupported {
1167 command: path,
1168 current_version,
1169 minimum_version,
1170 } => {
1171 write!(
1172 f,
1173 "version {current_version} from {path} is not supported (need at least {minimum_version})"
1174 )
1175 }
1176 LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1177 LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1178 LoadError::Other(msg) => write!(f, "{msg}"),
1179 }
1180 }
1181}
1182
1183impl Error for LoadError {}
1184
1185impl AcpThread {
1186 pub fn new(
1187 parent_session_id: Option<acp::SessionId>,
1188 title: Option<SharedString>,
1189 work_dirs: Option<PathList>,
1190 connection: Rc<dyn AgentConnection>,
1191 project: Entity<Project>,
1192 action_log: Entity<ActionLog>,
1193 session_id: acp::SessionId,
1194 mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1195 cx: &mut Context<Self>,
1196 ) -> Self {
1197 let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1198 let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1199 loop {
1200 let caps = prompt_capabilities_rx.recv().await?;
1201 this.update(cx, |this, cx| {
1202 this.prompt_capabilities = caps;
1203 cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1204 })?;
1205 }
1206 });
1207
1208 Self {
1209 parent_session_id,
1210 work_dirs,
1211 action_log,
1212 shared_buffers: Default::default(),
1213 entries: Default::default(),
1214 plan: Default::default(),
1215 title,
1216 provisional_title: None,
1217 project,
1218 running_turn: None,
1219 turn_id: 0,
1220 connection,
1221 session_id,
1222 token_usage: None,
1223 prompt_capabilities,
1224 available_commands: Vec::new(),
1225 _observe_prompt_capabilities: task,
1226 terminals: HashMap::default(),
1227 pending_terminal_output: HashMap::default(),
1228 pending_terminal_exit: HashMap::default(),
1229 had_error: false,
1230 draft_prompt: None,
1231 ui_scroll_position: None,
1232 streaming_text_buffer: None,
1233 }
1234 }
1235
1236 pub fn parent_session_id(&self) -> Option<&acp::SessionId> {
1237 self.parent_session_id.as_ref()
1238 }
1239
1240 pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
1241 self.prompt_capabilities.clone()
1242 }
1243
1244 pub fn available_commands(&self) -> &[acp::AvailableCommand] {
1245 &self.available_commands
1246 }
1247
1248 pub fn draft_prompt(&self) -> Option<&[acp::ContentBlock]> {
1249 self.draft_prompt.as_deref()
1250 }
1251
1252 pub fn set_draft_prompt(&mut self, prompt: Option<Vec<acp::ContentBlock>>) {
1253 self.draft_prompt = prompt;
1254 }
1255
1256 pub fn ui_scroll_position(&self) -> Option<gpui::ListOffset> {
1257 self.ui_scroll_position
1258 }
1259
1260 pub fn set_ui_scroll_position(&mut self, position: Option<gpui::ListOffset>) {
1261 self.ui_scroll_position = position;
1262 }
1263
1264 pub fn connection(&self) -> &Rc<dyn AgentConnection> {
1265 &self.connection
1266 }
1267
1268 pub fn action_log(&self) -> &Entity<ActionLog> {
1269 &self.action_log
1270 }
1271
1272 pub fn project(&self) -> &Entity<Project> {
1273 &self.project
1274 }
1275
1276 pub fn title(&self) -> Option<SharedString> {
1277 self.title
1278 .clone()
1279 .or_else(|| self.provisional_title.clone())
1280 }
1281
1282 pub fn has_provisional_title(&self) -> bool {
1283 self.provisional_title.is_some()
1284 }
1285
1286 pub fn entries(&self) -> &[AgentThreadEntry] {
1287 &self.entries
1288 }
1289
1290 pub fn session_id(&self) -> &acp::SessionId {
1291 &self.session_id
1292 }
1293
1294 pub fn work_dirs(&self) -> Option<&PathList> {
1295 self.work_dirs.as_ref()
1296 }
1297
1298 pub fn set_work_dirs(&mut self, work_dirs: PathList, cx: &mut Context<Self>) {
1299 self.work_dirs = Some(work_dirs);
1300 cx.emit(AcpThreadEvent::WorkingDirectoriesUpdated)
1301 }
1302
1303 pub fn status(&self) -> ThreadStatus {
1304 if self.running_turn.is_some() {
1305 ThreadStatus::Generating
1306 } else {
1307 ThreadStatus::Idle
1308 }
1309 }
1310
1311 pub fn had_error(&self) -> bool {
1312 self.had_error
1313 }
1314
1315 pub fn is_waiting_for_confirmation(&self) -> bool {
1316 for entry in self.entries.iter().rev() {
1317 match entry {
1318 AgentThreadEntry::UserMessage(_) => return false,
1319 AgentThreadEntry::ToolCall(ToolCall {
1320 status: ToolCallStatus::WaitingForConfirmation { .. },
1321 ..
1322 }) => return true,
1323 AgentThreadEntry::ToolCall(_)
1324 | AgentThreadEntry::AssistantMessage(_)
1325 | AgentThreadEntry::CompletedPlan(_) => {}
1326 }
1327 }
1328 false
1329 }
1330
1331 pub fn token_usage(&self) -> Option<&TokenUsage> {
1332 self.token_usage.as_ref()
1333 }
1334
1335 pub fn has_pending_edit_tool_calls(&self) -> bool {
1336 for entry in self.entries.iter().rev() {
1337 match entry {
1338 AgentThreadEntry::UserMessage(_) => return false,
1339 AgentThreadEntry::ToolCall(
1340 call @ ToolCall {
1341 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1342 ..
1343 },
1344 ) if call.diffs().next().is_some() => {
1345 return true;
1346 }
1347 AgentThreadEntry::ToolCall(_)
1348 | AgentThreadEntry::AssistantMessage(_)
1349 | AgentThreadEntry::CompletedPlan(_) => {}
1350 }
1351 }
1352
1353 false
1354 }
1355
1356 pub fn has_in_progress_tool_calls(&self) -> bool {
1357 for entry in self.entries.iter().rev() {
1358 match entry {
1359 AgentThreadEntry::UserMessage(_) => return false,
1360 AgentThreadEntry::ToolCall(ToolCall {
1361 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1362 ..
1363 }) => {
1364 return true;
1365 }
1366 AgentThreadEntry::ToolCall(_)
1367 | AgentThreadEntry::AssistantMessage(_)
1368 | AgentThreadEntry::CompletedPlan(_) => {}
1369 }
1370 }
1371
1372 false
1373 }
1374
1375 pub fn used_tools_since_last_user_message(&self) -> bool {
1376 for entry in self.entries.iter().rev() {
1377 match entry {
1378 AgentThreadEntry::UserMessage(..) => return false,
1379 AgentThreadEntry::AssistantMessage(..) | AgentThreadEntry::CompletedPlan(..) => {
1380 continue;
1381 }
1382 AgentThreadEntry::ToolCall(..) => return true,
1383 }
1384 }
1385
1386 false
1387 }
1388
1389 pub fn handle_session_update(
1390 &mut self,
1391 update: acp::SessionUpdate,
1392 cx: &mut Context<Self>,
1393 ) -> Result<(), acp::Error> {
1394 match update {
1395 acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1396 self.push_user_content_block(None, content, cx);
1397 }
1398 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1399 self.push_assistant_content_block(content, false, cx);
1400 }
1401 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1402 self.push_assistant_content_block(content, true, cx);
1403 }
1404 acp::SessionUpdate::ToolCall(tool_call) => {
1405 self.upsert_tool_call(tool_call, cx)?;
1406 }
1407 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1408 self.update_tool_call(tool_call_update, cx)?;
1409 }
1410 acp::SessionUpdate::Plan(plan) => {
1411 self.update_plan(plan, cx);
1412 }
1413 acp::SessionUpdate::SessionInfoUpdate(info_update) => {
1414 if let acp::MaybeUndefined::Value(title) = info_update.title {
1415 let had_provisional = self.provisional_title.take().is_some();
1416 let title: SharedString = title.into();
1417 if self.title.as_ref() != Some(&title) {
1418 self.title = Some(title);
1419 cx.emit(AcpThreadEvent::TitleUpdated);
1420 } else if had_provisional {
1421 cx.emit(AcpThreadEvent::TitleUpdated);
1422 }
1423 }
1424 }
1425 acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1426 available_commands,
1427 ..
1428 }) => {
1429 self.available_commands = available_commands.clone();
1430 cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands));
1431 }
1432 acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1433 current_mode_id,
1434 ..
1435 }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1436 acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1437 config_options,
1438 ..
1439 }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1440 _ => {}
1441 }
1442 Ok(())
1443 }
1444
1445 pub fn push_user_content_block(
1446 &mut self,
1447 message_id: Option<UserMessageId>,
1448 chunk: acp::ContentBlock,
1449 cx: &mut Context<Self>,
1450 ) {
1451 self.push_user_content_block_with_indent(message_id, chunk, false, cx)
1452 }
1453
1454 pub fn push_user_content_block_with_indent(
1455 &mut self,
1456 message_id: Option<UserMessageId>,
1457 chunk: acp::ContentBlock,
1458 indented: bool,
1459 cx: &mut Context<Self>,
1460 ) {
1461 let language_registry = self.project.read(cx).languages().clone();
1462 let path_style = self.project.read(cx).path_style(cx);
1463 let entries_len = self.entries.len();
1464
1465 if let Some(last_entry) = self.entries.last_mut()
1466 && let AgentThreadEntry::UserMessage(UserMessage {
1467 id,
1468 content,
1469 chunks,
1470 indented: existing_indented,
1471 ..
1472 }) = last_entry
1473 && *existing_indented == indented
1474 {
1475 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1476 *id = message_id.or(id.take());
1477 content.append(chunk.clone(), &language_registry, path_style, cx);
1478 chunks.push(chunk);
1479 let idx = entries_len - 1;
1480 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1481 } else {
1482 let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1483 self.push_entry(
1484 AgentThreadEntry::UserMessage(UserMessage {
1485 id: message_id,
1486 content,
1487 chunks: vec![chunk],
1488 checkpoint: None,
1489 indented,
1490 }),
1491 cx,
1492 );
1493 }
1494 }
1495
1496 pub fn push_assistant_content_block(
1497 &mut self,
1498 chunk: acp::ContentBlock,
1499 is_thought: bool,
1500 cx: &mut Context<Self>,
1501 ) {
1502 self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
1503 }
1504
1505 pub fn push_assistant_content_block_with_indent(
1506 &mut self,
1507 chunk: acp::ContentBlock,
1508 is_thought: bool,
1509 indented: bool,
1510 cx: &mut Context<Self>,
1511 ) {
1512 let path_style = self.project.read(cx).path_style(cx);
1513
1514 // For text chunks going to an existing Markdown block, buffer for smooth
1515 // streaming instead of appending all at once which may feel more choppy.
1516 if let acp::ContentBlock::Text(text_content) = &chunk {
1517 if let Some(markdown) = self.streaming_markdown_target(is_thought, indented) {
1518 let entries_len = self.entries.len();
1519 cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
1520 self.buffer_streaming_text(&markdown, text_content.text.clone(), cx);
1521 return;
1522 }
1523 }
1524
1525 let language_registry = self.project.read(cx).languages().clone();
1526 let entries_len = self.entries.len();
1527 if let Some(last_entry) = self.entries.last_mut()
1528 && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1529 chunks,
1530 indented: existing_indented,
1531 is_subagent_output: _,
1532 }) = last_entry
1533 && *existing_indented == indented
1534 {
1535 let idx = entries_len - 1;
1536 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1537 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1538 match (chunks.last_mut(), is_thought) {
1539 (Some(AssistantMessageChunk::Message { block }), false)
1540 | (Some(AssistantMessageChunk::Thought { block }), true) => {
1541 block.append(chunk, &language_registry, path_style, cx)
1542 }
1543 _ => {
1544 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1545 if is_thought {
1546 chunks.push(AssistantMessageChunk::Thought { block })
1547 } else {
1548 chunks.push(AssistantMessageChunk::Message { block })
1549 }
1550 }
1551 }
1552 } else {
1553 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1554 let chunk = if is_thought {
1555 AssistantMessageChunk::Thought { block }
1556 } else {
1557 AssistantMessageChunk::Message { block }
1558 };
1559
1560 self.push_entry(
1561 AgentThreadEntry::AssistantMessage(AssistantMessage {
1562 chunks: vec![chunk],
1563 indented,
1564 is_subagent_output: false,
1565 }),
1566 cx,
1567 );
1568 }
1569 }
1570
1571 fn streaming_markdown_target(
1572 &self,
1573 is_thought: bool,
1574 indented: bool,
1575 ) -> Option<Entity<Markdown>> {
1576 let last_entry = self.entries.last()?;
1577 if let AgentThreadEntry::AssistantMessage(AssistantMessage {
1578 chunks,
1579 indented: existing_indented,
1580 ..
1581 }) = last_entry
1582 && *existing_indented == indented
1583 && let [.., chunk] = chunks.as_slice()
1584 {
1585 match (chunk, is_thought) {
1586 (
1587 AssistantMessageChunk::Message {
1588 block: ContentBlock::Markdown { markdown },
1589 },
1590 false,
1591 )
1592 | (
1593 AssistantMessageChunk::Thought {
1594 block: ContentBlock::Markdown { markdown },
1595 },
1596 true,
1597 ) => Some(markdown.clone()),
1598 _ => None,
1599 }
1600 } else {
1601 None
1602 }
1603 }
1604
1605 /// Add text to the streaming buffer. If the target changed (e.g. switching
1606 /// from thoughts to message text), flush the old buffer first.
1607 fn buffer_streaming_text(
1608 &mut self,
1609 markdown: &Entity<Markdown>,
1610 text: String,
1611 cx: &mut Context<Self>,
1612 ) {
1613 if let Some(buffer) = &mut self.streaming_text_buffer {
1614 if buffer.target.entity_id() == markdown.entity_id() {
1615 buffer.pending.push_str(&text);
1616
1617 buffer.bytes_to_reveal_per_tick = (buffer.pending.len() as f32
1618 / StreamingTextBuffer::REVEAL_TARGET
1619 * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1620 .ceil() as usize;
1621 return;
1622 }
1623 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1624 }
1625
1626 let target = markdown.clone();
1627 let _reveal_task = self.start_streaming_reveal(cx);
1628 let pending_len = text.len();
1629 let bytes_to_reveal = (pending_len as f32 / StreamingTextBuffer::REVEAL_TARGET
1630 * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1631 .ceil() as usize;
1632 self.streaming_text_buffer = Some(StreamingTextBuffer {
1633 pending: text,
1634 bytes_to_reveal_per_tick: bytes_to_reveal,
1635 target,
1636 _reveal_task,
1637 });
1638 }
1639
1640 /// Flush all buffered streaming text into the Markdown entity immediately.
1641 fn flush_streaming_text(
1642 streaming_text_buffer: &mut Option<StreamingTextBuffer>,
1643 cx: &mut Context<Self>,
1644 ) {
1645 if let Some(buffer) = streaming_text_buffer.take() {
1646 if !buffer.pending.is_empty() {
1647 buffer
1648 .target
1649 .update(cx, |markdown, cx| markdown.append(&buffer.pending, cx));
1650 }
1651 }
1652 }
1653
1654 /// Spawns a foreground task that periodically drains
1655 /// `streaming_text_buffer.pending` into the target `Markdown` entity,
1656 /// producing smooth, continuous text output.
1657 fn start_streaming_reveal(&self, cx: &mut Context<Self>) -> Task<()> {
1658 cx.spawn(async move |this, cx| {
1659 loop {
1660 cx.background_executor()
1661 .timer(Duration::from_millis(StreamingTextBuffer::TASK_UPDATE_MS))
1662 .await;
1663
1664 let should_continue = this
1665 .update(cx, |this, cx| {
1666 let Some(buffer) = &mut this.streaming_text_buffer else {
1667 return false;
1668 };
1669
1670 if buffer.pending.is_empty() {
1671 return true;
1672 }
1673
1674 let pending_len = buffer.pending.len();
1675
1676 let byte_boundary = buffer
1677 .pending
1678 .ceil_char_boundary(buffer.bytes_to_reveal_per_tick)
1679 .min(pending_len);
1680
1681 buffer.target.update(cx, |markdown: &mut Markdown, cx| {
1682 markdown.append(&buffer.pending[..byte_boundary], cx);
1683 buffer.pending.drain(..byte_boundary);
1684 });
1685
1686 true
1687 })
1688 .unwrap_or(false);
1689
1690 if !should_continue {
1691 break;
1692 }
1693 }
1694 })
1695 }
1696
1697 fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
1698 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1699 self.entries.push(entry);
1700 cx.emit(AcpThreadEvent::NewEntry);
1701 }
1702
1703 pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1704 self.connection.set_title(&self.session_id, cx).is_some()
1705 }
1706
1707 pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1708 let had_provisional = self.provisional_title.take().is_some();
1709 if self.title.as_ref() != Some(&title) {
1710 self.title = Some(title.clone());
1711 cx.emit(AcpThreadEvent::TitleUpdated);
1712 if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1713 return set_title.run(title, cx);
1714 }
1715 } else if had_provisional {
1716 cx.emit(AcpThreadEvent::TitleUpdated);
1717 }
1718 Task::ready(Ok(()))
1719 }
1720
1721 /// Sets a provisional display title without propagating back to the
1722 /// underlying agent connection. This is used for quick preview titles
1723 /// (e.g. first 20 chars of the user message) that should be shown
1724 /// immediately but replaced once the LLM generates a proper title via
1725 /// `set_title`.
1726 pub fn set_provisional_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
1727 self.provisional_title = Some(title);
1728 cx.emit(AcpThreadEvent::TitleUpdated);
1729 }
1730
1731 pub fn subagent_spawned(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
1732 cx.emit(AcpThreadEvent::SubagentSpawned(session_id));
1733 }
1734
1735 pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1736 self.token_usage = usage;
1737 cx.emit(AcpThreadEvent::TokenUsageUpdated);
1738 }
1739
1740 pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1741 cx.emit(AcpThreadEvent::Retry(status));
1742 }
1743
1744 pub fn update_tool_call(
1745 &mut self,
1746 update: impl Into<ToolCallUpdate>,
1747 cx: &mut Context<Self>,
1748 ) -> Result<()> {
1749 let update = update.into();
1750 let languages = self.project.read(cx).languages().clone();
1751 let path_style = self.project.read(cx).path_style(cx);
1752
1753 let ix = match self.index_for_tool_call(update.id()) {
1754 Some(ix) => ix,
1755 None => {
1756 // Tool call not found - create a failed tool call entry
1757 let failed_tool_call = ToolCall {
1758 id: update.id().clone(),
1759 label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1760 kind: acp::ToolKind::Fetch,
1761 content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1762 "Tool call not found".into(),
1763 &languages,
1764 path_style,
1765 cx,
1766 ))],
1767 status: ToolCallStatus::Failed,
1768 locations: Vec::new(),
1769 resolved_locations: Vec::new(),
1770 raw_input: None,
1771 raw_input_markdown: None,
1772 raw_output: None,
1773 tool_name: None,
1774 subagent_session_info: None,
1775 };
1776 self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1777 return Ok(());
1778 }
1779 };
1780 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1781 unreachable!()
1782 };
1783
1784 match update {
1785 ToolCallUpdate::UpdateFields(update) => {
1786 let location_updated = update.fields.locations.is_some();
1787 call.update_fields(
1788 update.fields,
1789 update.meta,
1790 languages,
1791 path_style,
1792 &self.terminals,
1793 cx,
1794 )?;
1795 if location_updated {
1796 self.resolve_locations(update.tool_call_id, cx);
1797 }
1798 }
1799 ToolCallUpdate::UpdateDiff(update) => {
1800 call.content.clear();
1801 call.content.push(ToolCallContent::Diff(update.diff));
1802 }
1803 ToolCallUpdate::UpdateTerminal(update) => {
1804 call.content.clear();
1805 call.content
1806 .push(ToolCallContent::Terminal(update.terminal));
1807 }
1808 }
1809
1810 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1811
1812 Ok(())
1813 }
1814
1815 /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1816 pub fn upsert_tool_call(
1817 &mut self,
1818 tool_call: acp::ToolCall,
1819 cx: &mut Context<Self>,
1820 ) -> Result<(), acp::Error> {
1821 let status = tool_call.status.into();
1822 self.upsert_tool_call_inner(tool_call.into(), status, cx)
1823 }
1824
1825 /// Fails if id does not match an existing entry.
1826 pub fn upsert_tool_call_inner(
1827 &mut self,
1828 update: acp::ToolCallUpdate,
1829 status: ToolCallStatus,
1830 cx: &mut Context<Self>,
1831 ) -> Result<(), acp::Error> {
1832 let language_registry = self.project.read(cx).languages().clone();
1833 let path_style = self.project.read(cx).path_style(cx);
1834 let id = update.tool_call_id.clone();
1835
1836 let agent_telemetry_id = self.connection().telemetry_id();
1837 let session = self.session_id();
1838 let parent_session_id = self.parent_session_id();
1839 if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1840 let status = if matches!(status, ToolCallStatus::Completed) {
1841 "completed"
1842 } else {
1843 "failed"
1844 };
1845 telemetry::event!(
1846 "Agent Tool Call Completed",
1847 agent_telemetry_id,
1848 session,
1849 parent_session_id,
1850 status
1851 );
1852 }
1853
1854 if let Some(ix) = self.index_for_tool_call(&id) {
1855 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1856 unreachable!()
1857 };
1858
1859 call.update_fields(
1860 update.fields,
1861 update.meta,
1862 language_registry,
1863 path_style,
1864 &self.terminals,
1865 cx,
1866 )?;
1867 call.status = status;
1868
1869 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1870 } else {
1871 let call = ToolCall::from_acp(
1872 update.try_into()?,
1873 status,
1874 language_registry,
1875 self.project.read(cx).path_style(cx),
1876 &self.terminals,
1877 cx,
1878 )?;
1879 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1880 };
1881
1882 self.resolve_locations(id, cx);
1883 Ok(())
1884 }
1885
1886 fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1887 self.entries
1888 .iter()
1889 .enumerate()
1890 .rev()
1891 .find_map(|(index, entry)| {
1892 if let AgentThreadEntry::ToolCall(tool_call) = entry
1893 && &tool_call.id == id
1894 {
1895 Some(index)
1896 } else {
1897 None
1898 }
1899 })
1900 }
1901
1902 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1903 // The tool call we are looking for is typically the last one, or very close to the end.
1904 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1905 self.entries
1906 .iter_mut()
1907 .enumerate()
1908 .rev()
1909 .find_map(|(index, tool_call)| {
1910 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1911 && &tool_call.id == id
1912 {
1913 Some((index, tool_call))
1914 } else {
1915 None
1916 }
1917 })
1918 }
1919
1920 pub fn tool_call(&self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1921 self.entries
1922 .iter()
1923 .enumerate()
1924 .rev()
1925 .find_map(|(index, tool_call)| {
1926 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1927 && &tool_call.id == id
1928 {
1929 Some((index, tool_call))
1930 } else {
1931 None
1932 }
1933 })
1934 }
1935
1936 pub fn tool_call_for_subagent(&self, session_id: &acp::SessionId) -> Option<&ToolCall> {
1937 self.entries.iter().find_map(|entry| match entry {
1938 AgentThreadEntry::ToolCall(tool_call) => {
1939 if let Some(subagent_session_info) = &tool_call.subagent_session_info
1940 && &subagent_session_info.session_id == session_id
1941 {
1942 Some(tool_call)
1943 } else {
1944 None
1945 }
1946 }
1947 _ => None,
1948 })
1949 }
1950
1951 pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1952 let project = self.project.clone();
1953 let should_update_agent_location = self.parent_session_id.is_none();
1954 let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1955 return;
1956 };
1957 let task = tool_call.resolve_locations(project, cx);
1958 cx.spawn(async move |this, cx| {
1959 let resolved_locations = task.await;
1960
1961 this.update(cx, |this, cx| {
1962 let project = this.project.clone();
1963
1964 for location in resolved_locations.iter().flatten() {
1965 this.shared_buffers
1966 .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
1967 }
1968 let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1969 return;
1970 };
1971
1972 if let Some(Some(location)) = resolved_locations.last() {
1973 project.update(cx, |project, cx| {
1974 let should_ignore = if let Some(agent_location) = project
1975 .agent_location()
1976 .filter(|agent_location| agent_location.buffer == location.buffer)
1977 {
1978 let snapshot = location.buffer.read(cx).snapshot();
1979 let old_position = agent_location.position.to_point(&snapshot);
1980 let new_position = location.position.to_point(&snapshot);
1981
1982 // ignore this so that when we get updates from the edit tool
1983 // the position doesn't reset to the startof line
1984 old_position.row == new_position.row
1985 && old_position.column > new_position.column
1986 } else {
1987 false
1988 };
1989 if !should_ignore && should_update_agent_location {
1990 project.set_agent_location(Some(location.into()), cx);
1991 }
1992 });
1993 }
1994
1995 let resolved_locations = resolved_locations
1996 .iter()
1997 .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
1998 .collect::<Vec<_>>();
1999
2000 if tool_call.resolved_locations != resolved_locations {
2001 tool_call.resolved_locations = resolved_locations;
2002 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2003 }
2004 })
2005 })
2006 .detach();
2007 }
2008
2009 pub fn request_tool_call_authorization(
2010 &mut self,
2011 tool_call: acp::ToolCallUpdate,
2012 options: PermissionOptions,
2013 cx: &mut Context<Self>,
2014 ) -> Result<Task<RequestPermissionOutcome>> {
2015 let (tx, rx) = oneshot::channel();
2016
2017 let status = ToolCallStatus::WaitingForConfirmation {
2018 options,
2019 respond_tx: tx,
2020 };
2021
2022 let tool_call_id = tool_call.tool_call_id.clone();
2023 self.upsert_tool_call_inner(tool_call, status, cx)?;
2024 cx.emit(AcpThreadEvent::ToolAuthorizationRequested(
2025 tool_call_id.clone(),
2026 ));
2027
2028 Ok(cx.spawn(async move |this, cx| {
2029 let outcome = match rx.await {
2030 Ok(outcome) => RequestPermissionOutcome::Selected(outcome),
2031 Err(oneshot::Canceled) => RequestPermissionOutcome::Cancelled,
2032 };
2033 this.update(cx, |_this, cx| {
2034 cx.emit(AcpThreadEvent::ToolAuthorizationReceived(tool_call_id))
2035 })
2036 .ok();
2037 outcome
2038 }))
2039 }
2040
2041 pub fn authorize_tool_call(
2042 &mut self,
2043 id: acp::ToolCallId,
2044 outcome: SelectedPermissionOutcome,
2045 cx: &mut Context<Self>,
2046 ) {
2047 let Some((ix, call)) = self.tool_call_mut(&id) else {
2048 return;
2049 };
2050
2051 let new_status = match outcome.option_kind {
2052 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
2053 ToolCallStatus::Rejected
2054 }
2055 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
2056 ToolCallStatus::InProgress
2057 }
2058 _ => ToolCallStatus::InProgress,
2059 };
2060
2061 let curr_status = mem::replace(&mut call.status, new_status);
2062
2063 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
2064 respond_tx.send(outcome).log_err();
2065 } else if cfg!(debug_assertions) {
2066 panic!("tried to authorize an already authorized tool call");
2067 }
2068
2069 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2070 }
2071
2072 pub fn plan(&self) -> &Plan {
2073 &self.plan
2074 }
2075
2076 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
2077 let new_entries_len = request.entries.len();
2078 let mut new_entries = request.entries.into_iter();
2079
2080 // Reuse existing markdown to prevent flickering
2081 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
2082 let PlanEntry {
2083 content,
2084 priority,
2085 status,
2086 } = old;
2087 content.update(cx, |old, cx| {
2088 old.replace(new.content, cx);
2089 });
2090 *priority = new.priority;
2091 *status = new.status;
2092 }
2093 for new in new_entries {
2094 self.plan.entries.push(PlanEntry::from_acp(new, cx))
2095 }
2096 self.plan.entries.truncate(new_entries_len);
2097
2098 cx.notify();
2099 }
2100
2101 pub fn snapshot_completed_plan(&mut self, cx: &mut Context<Self>) {
2102 if !self.plan.is_empty() && self.plan.stats().pending == 0 {
2103 let completed_entries = std::mem::take(&mut self.plan.entries);
2104 self.push_entry(AgentThreadEntry::CompletedPlan(completed_entries), cx);
2105 }
2106 }
2107
2108 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
2109 self.plan
2110 .entries
2111 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
2112 cx.notify();
2113 }
2114
2115 pub fn clear_plan(&mut self, cx: &mut Context<Self>) {
2116 self.plan.entries.clear();
2117 cx.notify();
2118 }
2119
2120 #[cfg(any(test, feature = "test-support"))]
2121 pub fn send_raw(
2122 &mut self,
2123 message: &str,
2124 cx: &mut Context<Self>,
2125 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2126 self.send(vec![message.into()], cx)
2127 }
2128
2129 pub fn send(
2130 &mut self,
2131 message: Vec<acp::ContentBlock>,
2132 cx: &mut Context<Self>,
2133 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2134 let block = ContentBlock::new_combined(
2135 message.clone(),
2136 self.project.read(cx).languages().clone(),
2137 self.project.read(cx).path_style(cx),
2138 cx,
2139 );
2140 let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
2141 let git_store = self.project.read(cx).git_store().clone();
2142
2143 let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
2144 Some(UserMessageId::new())
2145 } else {
2146 None
2147 };
2148
2149 self.run_turn(cx, async move |this, cx| {
2150 this.update(cx, |this, cx| {
2151 this.push_entry(
2152 AgentThreadEntry::UserMessage(UserMessage {
2153 id: message_id.clone(),
2154 content: block,
2155 chunks: message,
2156 checkpoint: None,
2157 indented: false,
2158 }),
2159 cx,
2160 );
2161 })
2162 .ok();
2163
2164 let old_checkpoint = git_store
2165 .update(cx, |git, cx| git.checkpoint(cx))
2166 .await
2167 .context("failed to get old checkpoint")
2168 .log_err();
2169 this.update(cx, |this, cx| {
2170 if let Some((_ix, message)) = this.last_user_message() {
2171 message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
2172 git_checkpoint,
2173 show: false,
2174 });
2175 }
2176 this.connection.prompt(message_id, request, cx)
2177 })?
2178 .await
2179 })
2180 }
2181
2182 pub fn can_retry(&self, cx: &App) -> bool {
2183 self.connection.retry(&self.session_id, cx).is_some()
2184 }
2185
2186 pub fn retry(
2187 &mut self,
2188 cx: &mut Context<Self>,
2189 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2190 self.run_turn(cx, async move |this, cx| {
2191 this.update(cx, |this, cx| {
2192 this.connection
2193 .retry(&this.session_id, cx)
2194 .map(|retry| retry.run(cx))
2195 })?
2196 .context("retrying a session is not supported")?
2197 .await
2198 })
2199 }
2200
2201 fn run_turn(
2202 &mut self,
2203 cx: &mut Context<Self>,
2204 f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
2205 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2206 self.clear_completed_plan_entries(cx);
2207 self.had_error = false;
2208
2209 let (tx, rx) = oneshot::channel();
2210 let cancel_task = self.cancel(cx);
2211
2212 self.turn_id += 1;
2213 let turn_id = self.turn_id;
2214 self.running_turn = Some(RunningTurn {
2215 id: turn_id,
2216 send_task: cx.spawn(async move |this, cx| {
2217 cancel_task.await;
2218 tx.send(f(this, cx).await).ok();
2219 }),
2220 });
2221
2222 cx.spawn(async move |this, cx| {
2223 let response = rx.await;
2224
2225 this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
2226 .await?;
2227
2228 this.update(cx, |this, cx| {
2229 if this.parent_session_id.is_none() {
2230 this.project
2231 .update(cx, |project, cx| project.set_agent_location(None, cx));
2232 }
2233 let Ok(response) = response else {
2234 // tx dropped, just return
2235 return Ok(None);
2236 };
2237
2238 let is_same_turn = this
2239 .running_turn
2240 .as_ref()
2241 .is_some_and(|turn| turn_id == turn.id);
2242
2243 // If the user submitted a follow up message, running_turn might
2244 // already point to a different turn. Therefore we only want to
2245 // take the task if it's the same turn.
2246 if is_same_turn {
2247 this.running_turn.take();
2248 }
2249
2250 match response {
2251 Ok(r) => {
2252 Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2253
2254 if r.stop_reason == acp::StopReason::MaxTokens {
2255 this.had_error = true;
2256 cx.emit(AcpThreadEvent::Error);
2257 log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2258
2259 let exceeded_max_output_tokens =
2260 this.token_usage.as_ref().is_some_and(|u| {
2261 u.max_output_tokens
2262 .is_some_and(|max| u.output_tokens >= max)
2263 });
2264
2265 let message = if exceeded_max_output_tokens {
2266 log::error!(
2267 "Max output tokens reached. Usage: {:?}",
2268 this.token_usage
2269 );
2270 "Maximum output tokens reached"
2271 } else {
2272 log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2273 "Maximum tokens reached"
2274 };
2275 return Err(anyhow!(message));
2276 }
2277
2278 let canceled = matches!(r.stop_reason, acp::StopReason::Cancelled);
2279 if canceled {
2280 this.mark_pending_tools_as_canceled();
2281 }
2282
2283 if !canceled {
2284 this.snapshot_completed_plan(cx);
2285 }
2286
2287 // Handle refusal - distinguish between user prompt and tool call refusals
2288 if let acp::StopReason::Refusal = r.stop_reason {
2289 this.had_error = true;
2290 if let Some((user_msg_ix, _)) = this.last_user_message() {
2291 // Check if there's a completed tool call with results after the last user message
2292 // This indicates the refusal is in response to tool output, not the user's prompt
2293 let has_completed_tool_call_after_user_msg =
2294 this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
2295 if let AgentThreadEntry::ToolCall(tool_call) = entry {
2296 // Check if the tool call has completed and has output
2297 matches!(tool_call.status, ToolCallStatus::Completed)
2298 && tool_call.raw_output.is_some()
2299 } else {
2300 false
2301 }
2302 });
2303
2304 if has_completed_tool_call_after_user_msg {
2305 // Refusal is due to tool output - don't truncate, just notify
2306 // The model refused based on what the tool returned
2307 cx.emit(AcpThreadEvent::Refusal);
2308 } else {
2309 // User prompt was refused - truncate back to before the user message
2310 let range = user_msg_ix..this.entries.len();
2311 if range.start < range.end {
2312 this.entries.truncate(user_msg_ix);
2313 cx.emit(AcpThreadEvent::EntriesRemoved(range));
2314 }
2315 cx.emit(AcpThreadEvent::Refusal);
2316 }
2317 } else {
2318 // No user message found, treat as general refusal
2319 cx.emit(AcpThreadEvent::Refusal);
2320 }
2321 }
2322
2323 cx.emit(AcpThreadEvent::Stopped(r.stop_reason));
2324 Ok(Some(r))
2325 }
2326 Err(e) => {
2327 Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2328
2329 this.had_error = true;
2330 cx.emit(AcpThreadEvent::Error);
2331 log::error!("Error in run turn: {:?}", e);
2332 Err(e)
2333 }
2334 }
2335 })?
2336 })
2337 .boxed()
2338 }
2339
2340 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2341 let Some(turn) = self.running_turn.take() else {
2342 return Task::ready(());
2343 };
2344 self.connection.cancel(&self.session_id, cx);
2345
2346 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2347 self.mark_pending_tools_as_canceled();
2348
2349 // Wait for the send task to complete
2350 cx.background_spawn(turn.send_task)
2351 }
2352
2353 fn mark_pending_tools_as_canceled(&mut self) {
2354 for entry in self.entries.iter_mut() {
2355 if let AgentThreadEntry::ToolCall(call) = entry {
2356 let cancel = matches!(
2357 call.status,
2358 ToolCallStatus::Pending
2359 | ToolCallStatus::WaitingForConfirmation { .. }
2360 | ToolCallStatus::InProgress
2361 );
2362
2363 if cancel {
2364 call.status = ToolCallStatus::Canceled;
2365 }
2366 }
2367 }
2368 }
2369
2370 /// Restores the git working tree to the state at the given checkpoint (if one exists)
2371 pub fn restore_checkpoint(
2372 &mut self,
2373 id: UserMessageId,
2374 cx: &mut Context<Self>,
2375 ) -> Task<Result<()>> {
2376 let Some((_, message)) = self.user_message_mut(&id) else {
2377 return Task::ready(Err(anyhow!("message not found")));
2378 };
2379
2380 let checkpoint = message
2381 .checkpoint
2382 .as_ref()
2383 .map(|c| c.git_checkpoint.clone());
2384
2385 // Cancel any in-progress generation before restoring
2386 let cancel_task = self.cancel(cx);
2387 let rewind = self.rewind(id.clone(), cx);
2388 let git_store = self.project.read(cx).git_store().clone();
2389
2390 cx.spawn(async move |_, cx| {
2391 cancel_task.await;
2392 rewind.await?;
2393 if let Some(checkpoint) = checkpoint {
2394 git_store
2395 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
2396 .await?;
2397 }
2398
2399 Ok(())
2400 })
2401 }
2402
2403 /// Rewinds this thread to before the entry at `index`, removing it and all
2404 /// subsequent entries while rejecting any action_log changes made from that point.
2405 /// Unlike `restore_checkpoint`, this method does not restore from git.
2406 pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
2407 let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
2408 return Task::ready(Err(anyhow!("not supported")));
2409 };
2410
2411 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2412 let telemetry = ActionLogTelemetry::from(&*self);
2413 cx.spawn(async move |this, cx| {
2414 cx.update(|cx| truncate.run(id.clone(), cx)).await?;
2415 this.update(cx, |this, cx| {
2416 if let Some((ix, _)) = this.user_message_mut(&id) {
2417 // Collect all terminals from entries that will be removed
2418 let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
2419 .iter()
2420 .flat_map(|entry| entry.terminals())
2421 .filter_map(|terminal| terminal.read(cx).id().clone().into())
2422 .collect();
2423
2424 let range = ix..this.entries.len();
2425 this.entries.truncate(ix);
2426 cx.emit(AcpThreadEvent::EntriesRemoved(range));
2427
2428 // Kill and remove the terminals
2429 for terminal_id in terminals_to_remove {
2430 if let Some(terminal) = this.terminals.remove(&terminal_id) {
2431 terminal.update(cx, |terminal, cx| {
2432 terminal.kill(cx);
2433 });
2434 }
2435 }
2436 }
2437 this.action_log().update(cx, |action_log, cx| {
2438 action_log.reject_all_edits(Some(telemetry), cx)
2439 })
2440 })?
2441 .await;
2442 Ok(())
2443 })
2444 }
2445
2446 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
2447 let git_store = self.project.read(cx).git_store().clone();
2448
2449 let Some((_, message)) = self.last_user_message() else {
2450 return Task::ready(Ok(()));
2451 };
2452 let Some(user_message_id) = message.id.clone() else {
2453 return Task::ready(Ok(()));
2454 };
2455 let Some(checkpoint) = message.checkpoint.as_ref() else {
2456 return Task::ready(Ok(()));
2457 };
2458 let old_checkpoint = checkpoint.git_checkpoint.clone();
2459
2460 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2461 cx.spawn(async move |this, cx| {
2462 let Some(new_checkpoint) = new_checkpoint
2463 .await
2464 .context("failed to get new checkpoint")
2465 .log_err()
2466 else {
2467 return Ok(());
2468 };
2469
2470 let equal = git_store
2471 .update(cx, |git, cx| {
2472 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2473 })
2474 .await
2475 .unwrap_or(true);
2476
2477 this.update(cx, |this, cx| {
2478 if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
2479 if let Some(checkpoint) = message.checkpoint.as_mut() {
2480 checkpoint.show = !equal;
2481 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2482 }
2483 }
2484 })?;
2485
2486 Ok(())
2487 })
2488 }
2489
2490 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2491 self.entries
2492 .iter_mut()
2493 .enumerate()
2494 .rev()
2495 .find_map(|(ix, entry)| {
2496 if let AgentThreadEntry::UserMessage(message) = entry {
2497 Some((ix, message))
2498 } else {
2499 None
2500 }
2501 })
2502 }
2503
2504 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2505 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2506 if let AgentThreadEntry::UserMessage(message) = entry {
2507 if message.id.as_ref() == Some(id) {
2508 Some((ix, message))
2509 } else {
2510 None
2511 }
2512 } else {
2513 None
2514 }
2515 })
2516 }
2517
2518 pub fn read_text_file(
2519 &self,
2520 path: PathBuf,
2521 line: Option<u32>,
2522 limit: Option<u32>,
2523 reuse_shared_snapshot: bool,
2524 cx: &mut Context<Self>,
2525 ) -> Task<Result<String, acp::Error>> {
2526 // Args are 1-based, move to 0-based
2527 let line = line.unwrap_or_default().saturating_sub(1);
2528 let limit = limit.unwrap_or(u32::MAX);
2529 let project = self.project.clone();
2530 let action_log = self.action_log.clone();
2531 let should_update_agent_location = self.parent_session_id.is_none();
2532 cx.spawn(async move |this, cx| {
2533 let load = project.update(cx, |project, cx| {
2534 let path = project
2535 .project_path_for_absolute_path(&path, cx)
2536 .ok_or_else(|| {
2537 acp::Error::resource_not_found(Some(path.display().to_string()))
2538 })?;
2539 Ok::<_, acp::Error>(project.open_buffer(path, cx))
2540 })?;
2541
2542 let buffer = load.await?;
2543
2544 let snapshot = if reuse_shared_snapshot {
2545 this.read_with(cx, |this, _| {
2546 this.shared_buffers.get(&buffer.clone()).cloned()
2547 })
2548 .log_err()
2549 .flatten()
2550 } else {
2551 None
2552 };
2553
2554 let snapshot = if let Some(snapshot) = snapshot {
2555 snapshot
2556 } else {
2557 action_log.update(cx, |action_log, cx| {
2558 action_log.buffer_read(buffer.clone(), cx);
2559 });
2560
2561 let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
2562 this.update(cx, |this, _| {
2563 this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2564 })?;
2565 snapshot
2566 };
2567
2568 let max_point = snapshot.max_point();
2569 let start_position = Point::new(line, 0);
2570
2571 if start_position > max_point {
2572 return Err(acp::Error::invalid_params().data(format!(
2573 "Attempting to read beyond the end of the file, line {}:{}",
2574 max_point.row + 1,
2575 max_point.column
2576 )));
2577 }
2578
2579 let start = snapshot.anchor_before(start_position);
2580 let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2581
2582 if should_update_agent_location {
2583 project.update(cx, |project, cx| {
2584 project.set_agent_location(
2585 Some(AgentLocation {
2586 buffer: buffer.downgrade(),
2587 position: start,
2588 }),
2589 cx,
2590 );
2591 });
2592 }
2593
2594 Ok(snapshot.text_for_range(start..end).collect::<String>())
2595 })
2596 }
2597
2598 pub fn write_text_file(
2599 &self,
2600 path: PathBuf,
2601 content: String,
2602 cx: &mut Context<Self>,
2603 ) -> Task<Result<()>> {
2604 let project = self.project.clone();
2605 let action_log = self.action_log.clone();
2606 let should_update_agent_location = self.parent_session_id.is_none();
2607 cx.spawn(async move |this, cx| {
2608 let load = project.update(cx, |project, cx| {
2609 let path = project
2610 .project_path_for_absolute_path(&path, cx)
2611 .context("invalid path")?;
2612 anyhow::Ok(project.open_buffer(path, cx))
2613 });
2614 let buffer = load?.await?;
2615 let snapshot = this.update(cx, |this, cx| {
2616 this.shared_buffers
2617 .get(&buffer)
2618 .cloned()
2619 .unwrap_or_else(|| buffer.read(cx).snapshot())
2620 })?;
2621 let edits = cx
2622 .background_executor()
2623 .spawn(async move {
2624 let old_text = snapshot.text();
2625 text_diff(old_text.as_str(), &content)
2626 .into_iter()
2627 .map(|(range, replacement)| {
2628 (snapshot.anchor_range_inside(range), replacement)
2629 })
2630 .collect::<Vec<_>>()
2631 })
2632 .await;
2633
2634 if should_update_agent_location {
2635 project.update(cx, |project, cx| {
2636 project.set_agent_location(
2637 Some(AgentLocation {
2638 buffer: buffer.downgrade(),
2639 position: edits
2640 .last()
2641 .map(|(range, _)| range.end)
2642 .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
2643 }),
2644 cx,
2645 );
2646 });
2647 }
2648
2649 let format_on_save = cx.update(|cx| {
2650 action_log.update(cx, |action_log, cx| {
2651 action_log.buffer_read(buffer.clone(), cx);
2652 });
2653
2654 let format_on_save = buffer.update(cx, |buffer, cx| {
2655 buffer.edit(edits, None, cx);
2656
2657 let settings =
2658 language::language_settings::LanguageSettings::for_buffer(buffer, cx);
2659
2660 settings.format_on_save != FormatOnSave::Off
2661 });
2662 action_log.update(cx, |action_log, cx| {
2663 action_log.buffer_edited(buffer.clone(), cx);
2664 });
2665 format_on_save
2666 });
2667
2668 if format_on_save {
2669 let format_task = project.update(cx, |project, cx| {
2670 project.format(
2671 HashSet::from_iter([buffer.clone()]),
2672 LspFormatTarget::Buffers,
2673 false,
2674 FormatTrigger::Save,
2675 cx,
2676 )
2677 });
2678 format_task.await.log_err();
2679
2680 action_log.update(cx, |action_log, cx| {
2681 action_log.buffer_edited(buffer.clone(), cx);
2682 });
2683 }
2684
2685 project
2686 .update(cx, |project, cx| project.save_buffer(buffer, cx))
2687 .await
2688 })
2689 }
2690
2691 pub fn create_terminal(
2692 &self,
2693 command: String,
2694 args: Vec<String>,
2695 extra_env: Vec<acp::EnvVariable>,
2696 cwd: Option<PathBuf>,
2697 output_byte_limit: Option<u64>,
2698 cx: &mut Context<Self>,
2699 ) -> Task<Result<Entity<Terminal>>> {
2700 let env = match &cwd {
2701 Some(dir) => self.project.update(cx, |project, cx| {
2702 project.environment().update(cx, |env, cx| {
2703 env.directory_environment(dir.as_path().into(), cx)
2704 })
2705 }),
2706 None => Task::ready(None).shared(),
2707 };
2708 let env = cx.spawn(async move |_, _| {
2709 let mut env = env.await.unwrap_or_default();
2710 // Disables paging for `git` and hopefully other commands
2711 env.insert("PAGER".into(), "".into());
2712 for var in extra_env {
2713 env.insert(var.name, var.value);
2714 }
2715 env
2716 });
2717
2718 let project = self.project.clone();
2719 let language_registry = project.read(cx).languages().clone();
2720 let is_windows = project.read(cx).path_style(cx).is_windows();
2721
2722 let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
2723 let terminal_task = cx.spawn({
2724 let terminal_id = terminal_id.clone();
2725 async move |_this, cx| {
2726 let env = env.await;
2727 let shell = project
2728 .update(cx, |project, cx| {
2729 project
2730 .remote_client()
2731 .and_then(|r| r.read(cx).default_system_shell())
2732 })
2733 .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2734 let (task_command, task_args) =
2735 ShellBuilder::new(&Shell::Program(shell), is_windows)
2736 .redirect_stdin_to_dev_null()
2737 .build(Some(command.clone()), &args);
2738 let terminal = project
2739 .update(cx, |project, cx| {
2740 project.create_terminal_task(
2741 task::SpawnInTerminal {
2742 command: Some(task_command),
2743 args: task_args,
2744 cwd: cwd.clone(),
2745 env,
2746 ..Default::default()
2747 },
2748 cx,
2749 )
2750 })
2751 .await?;
2752
2753 anyhow::Ok(cx.new(|cx| {
2754 Terminal::new(
2755 terminal_id,
2756 &format!("{} {}", command, args.join(" ")),
2757 cwd,
2758 output_byte_limit.map(|l| l as usize),
2759 terminal,
2760 language_registry,
2761 cx,
2762 )
2763 }))
2764 }
2765 });
2766
2767 cx.spawn(async move |this, cx| {
2768 let terminal = terminal_task.await?;
2769 this.update(cx, |this, _cx| {
2770 this.terminals.insert(terminal_id, terminal.clone());
2771 terminal
2772 })
2773 })
2774 }
2775
2776 pub fn kill_terminal(
2777 &mut self,
2778 terminal_id: acp::TerminalId,
2779 cx: &mut Context<Self>,
2780 ) -> Result<()> {
2781 self.terminals
2782 .get(&terminal_id)
2783 .context("Terminal not found")?
2784 .update(cx, |terminal, cx| {
2785 terminal.kill(cx);
2786 });
2787
2788 Ok(())
2789 }
2790
2791 pub fn release_terminal(
2792 &mut self,
2793 terminal_id: acp::TerminalId,
2794 cx: &mut Context<Self>,
2795 ) -> Result<()> {
2796 self.terminals
2797 .remove(&terminal_id)
2798 .context("Terminal not found")?
2799 .update(cx, |terminal, cx| {
2800 terminal.kill(cx);
2801 });
2802
2803 Ok(())
2804 }
2805
2806 pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2807 self.terminals
2808 .get(&terminal_id)
2809 .context("Terminal not found")
2810 .cloned()
2811 }
2812
2813 pub fn to_markdown(&self, cx: &App) -> String {
2814 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2815 }
2816
2817 pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2818 cx.emit(AcpThreadEvent::LoadError(error));
2819 }
2820
2821 pub fn register_terminal_created(
2822 &mut self,
2823 terminal_id: acp::TerminalId,
2824 command_label: String,
2825 working_dir: Option<PathBuf>,
2826 output_byte_limit: Option<u64>,
2827 terminal: Entity<::terminal::Terminal>,
2828 cx: &mut Context<Self>,
2829 ) -> Entity<Terminal> {
2830 let language_registry = self.project.read(cx).languages().clone();
2831
2832 let entity = cx.new(|cx| {
2833 Terminal::new(
2834 terminal_id.clone(),
2835 &command_label,
2836 working_dir.clone(),
2837 output_byte_limit.map(|l| l as usize),
2838 terminal,
2839 language_registry,
2840 cx,
2841 )
2842 });
2843 self.terminals.insert(terminal_id.clone(), entity.clone());
2844 entity
2845 }
2846
2847 pub fn mark_as_subagent_output(&mut self, cx: &mut Context<Self>) {
2848 for entry in self.entries.iter_mut().rev() {
2849 if let AgentThreadEntry::AssistantMessage(assistant_message) = entry {
2850 assistant_message.is_subagent_output = true;
2851 cx.notify();
2852 return;
2853 }
2854 }
2855 }
2856
2857 pub fn on_terminal_provider_event(
2858 &mut self,
2859 event: TerminalProviderEvent,
2860 cx: &mut Context<Self>,
2861 ) {
2862 match event {
2863 TerminalProviderEvent::Created {
2864 terminal_id,
2865 label,
2866 cwd,
2867 output_byte_limit,
2868 terminal,
2869 } => {
2870 let entity = self.register_terminal_created(
2871 terminal_id.clone(),
2872 label,
2873 cwd,
2874 output_byte_limit,
2875 terminal,
2876 cx,
2877 );
2878
2879 if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
2880 for data in chunks.drain(..) {
2881 entity.update(cx, |term, cx| {
2882 term.inner().update(cx, |inner, cx| {
2883 inner.write_output(&data, cx);
2884 })
2885 });
2886 }
2887 }
2888
2889 if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
2890 entity.update(cx, |_term, cx| {
2891 cx.notify();
2892 });
2893 }
2894
2895 cx.notify();
2896 }
2897 TerminalProviderEvent::Output { terminal_id, data } => {
2898 if let Some(entity) = self.terminals.get(&terminal_id) {
2899 entity.update(cx, |term, cx| {
2900 term.inner().update(cx, |inner, cx| {
2901 inner.write_output(&data, cx);
2902 })
2903 });
2904 } else {
2905 self.pending_terminal_output
2906 .entry(terminal_id)
2907 .or_default()
2908 .push(data);
2909 }
2910 }
2911 TerminalProviderEvent::TitleChanged { terminal_id, title } => {
2912 if let Some(entity) = self.terminals.get(&terminal_id) {
2913 entity.update(cx, |term, cx| {
2914 term.inner().update(cx, |inner, cx| {
2915 inner.breadcrumb_text = title;
2916 cx.emit(::terminal::Event::BreadcrumbsChanged);
2917 })
2918 });
2919 }
2920 }
2921 TerminalProviderEvent::Exit {
2922 terminal_id,
2923 status,
2924 } => {
2925 if let Some(entity) = self.terminals.get(&terminal_id) {
2926 entity.update(cx, |_term, cx| {
2927 cx.notify();
2928 });
2929 } else {
2930 self.pending_terminal_exit.insert(terminal_id, status);
2931 }
2932 }
2933 }
2934 }
2935}
2936
2937fn markdown_for_raw_output(
2938 raw_output: &serde_json::Value,
2939 language_registry: &Arc<LanguageRegistry>,
2940 cx: &mut App,
2941) -> Option<Entity<Markdown>> {
2942 match raw_output {
2943 serde_json::Value::Null => None,
2944 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2945 Markdown::new(
2946 value.to_string().into(),
2947 Some(language_registry.clone()),
2948 None,
2949 cx,
2950 )
2951 })),
2952 serde_json::Value::Number(value) => Some(cx.new(|cx| {
2953 Markdown::new(
2954 value.to_string().into(),
2955 Some(language_registry.clone()),
2956 None,
2957 cx,
2958 )
2959 })),
2960 serde_json::Value::String(value) => Some(cx.new(|cx| {
2961 Markdown::new(
2962 value.clone().into(),
2963 Some(language_registry.clone()),
2964 None,
2965 cx,
2966 )
2967 })),
2968 value => Some(cx.new(|cx| {
2969 let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
2970
2971 Markdown::new(
2972 format!("```json\n{}\n```", pretty_json).into(),
2973 Some(language_registry.clone()),
2974 None,
2975 cx,
2976 )
2977 })),
2978 }
2979}
2980
2981#[cfg(test)]
2982mod tests {
2983 use super::*;
2984 use anyhow::anyhow;
2985 use futures::{channel::mpsc, future::LocalBoxFuture, select};
2986 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2987 use indoc::indoc;
2988 use project::{AgentId, FakeFs, Fs};
2989 use rand::{distr, prelude::*};
2990 use serde_json::json;
2991 use settings::SettingsStore;
2992 use smol::stream::StreamExt as _;
2993 use std::{
2994 any::Any,
2995 cell::RefCell,
2996 path::Path,
2997 rc::Rc,
2998 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2999 time::Duration,
3000 };
3001 use util::{path, path_list::PathList};
3002
3003 fn init_test(cx: &mut TestAppContext) {
3004 env_logger::try_init().ok();
3005 cx.update(|cx| {
3006 let settings_store = SettingsStore::test(cx);
3007 cx.set_global(settings_store);
3008 });
3009 }
3010
3011 #[gpui::test]
3012 async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
3013 init_test(cx);
3014
3015 let fs = FakeFs::new(cx.executor());
3016 let project = Project::test(fs, [], cx).await;
3017 let connection = Rc::new(FakeAgentConnection::new());
3018 let thread = cx
3019 .update(|cx| {
3020 connection.new_session(
3021 project,
3022 PathList::new(&[std::path::Path::new(path!("/test"))]),
3023 cx,
3024 )
3025 })
3026 .await
3027 .unwrap();
3028
3029 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3030
3031 // Send Output BEFORE Created - should be buffered by acp_thread
3032 thread.update(cx, |thread, cx| {
3033 thread.on_terminal_provider_event(
3034 TerminalProviderEvent::Output {
3035 terminal_id: terminal_id.clone(),
3036 data: b"hello buffered".to_vec(),
3037 },
3038 cx,
3039 );
3040 });
3041
3042 // Create a display-only terminal and then send Created
3043 let lower = cx.new(|cx| {
3044 let builder = ::terminal::TerminalBuilder::new_display_only(
3045 ::terminal::terminal_settings::CursorShape::default(),
3046 ::terminal::terminal_settings::AlternateScroll::On,
3047 None,
3048 0,
3049 cx.background_executor(),
3050 PathStyle::local(),
3051 )
3052 .unwrap();
3053 builder.subscribe(cx)
3054 });
3055
3056 thread.update(cx, |thread, cx| {
3057 thread.on_terminal_provider_event(
3058 TerminalProviderEvent::Created {
3059 terminal_id: terminal_id.clone(),
3060 label: "Buffered Test".to_string(),
3061 cwd: None,
3062 output_byte_limit: None,
3063 terminal: lower.clone(),
3064 },
3065 cx,
3066 );
3067 });
3068
3069 // After Created, buffered Output should have been flushed into the renderer
3070 let content = thread.read_with(cx, |thread, cx| {
3071 let term = thread.terminal(terminal_id.clone()).unwrap();
3072 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3073 });
3074
3075 assert!(
3076 content.contains("hello buffered"),
3077 "expected buffered output to render, got: {content}"
3078 );
3079 }
3080
3081 #[gpui::test]
3082 async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
3083 init_test(cx);
3084
3085 let fs = FakeFs::new(cx.executor());
3086 let project = Project::test(fs, [], cx).await;
3087 let connection = Rc::new(FakeAgentConnection::new());
3088 let thread = cx
3089 .update(|cx| {
3090 connection.new_session(
3091 project,
3092 PathList::new(&[std::path::Path::new(path!("/test"))]),
3093 cx,
3094 )
3095 })
3096 .await
3097 .unwrap();
3098
3099 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3100
3101 // Send Output BEFORE Created
3102 thread.update(cx, |thread, cx| {
3103 thread.on_terminal_provider_event(
3104 TerminalProviderEvent::Output {
3105 terminal_id: terminal_id.clone(),
3106 data: b"pre-exit data".to_vec(),
3107 },
3108 cx,
3109 );
3110 });
3111
3112 // Send Exit BEFORE Created
3113 thread.update(cx, |thread, cx| {
3114 thread.on_terminal_provider_event(
3115 TerminalProviderEvent::Exit {
3116 terminal_id: terminal_id.clone(),
3117 status: acp::TerminalExitStatus::new().exit_code(0),
3118 },
3119 cx,
3120 );
3121 });
3122
3123 // Now create a display-only lower-level terminal and send Created
3124 let lower = cx.new(|cx| {
3125 let builder = ::terminal::TerminalBuilder::new_display_only(
3126 ::terminal::terminal_settings::CursorShape::default(),
3127 ::terminal::terminal_settings::AlternateScroll::On,
3128 None,
3129 0,
3130 cx.background_executor(),
3131 PathStyle::local(),
3132 )
3133 .unwrap();
3134 builder.subscribe(cx)
3135 });
3136
3137 thread.update(cx, |thread, cx| {
3138 thread.on_terminal_provider_event(
3139 TerminalProviderEvent::Created {
3140 terminal_id: terminal_id.clone(),
3141 label: "Buffered Exit Test".to_string(),
3142 cwd: None,
3143 output_byte_limit: None,
3144 terminal: lower.clone(),
3145 },
3146 cx,
3147 );
3148 });
3149
3150 // Output should be present after Created (flushed from buffer)
3151 let content = thread.read_with(cx, |thread, cx| {
3152 let term = thread.terminal(terminal_id.clone()).unwrap();
3153 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3154 });
3155
3156 assert!(
3157 content.contains("pre-exit data"),
3158 "expected pre-exit data to render, got: {content}"
3159 );
3160 }
3161
3162 /// Test that killing a terminal via Terminal::kill properly:
3163 /// 1. Causes wait_for_exit to complete (doesn't hang forever)
3164 /// 2. The underlying terminal still has the output that was written before the kill
3165 ///
3166 /// This test verifies that the fix to kill_active_task (which now also kills
3167 /// the shell process in addition to the foreground process) properly allows
3168 /// wait_for_exit to complete instead of hanging indefinitely.
3169 #[cfg(unix)]
3170 #[gpui::test]
3171 async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
3172 use std::collections::HashMap;
3173 use task::Shell;
3174 use util::shell_builder::ShellBuilder;
3175
3176 init_test(cx);
3177 cx.executor().allow_parking();
3178
3179 let fs = FakeFs::new(cx.executor());
3180 let project = Project::test(fs, [], cx).await;
3181 let connection = Rc::new(FakeAgentConnection::new());
3182 let thread = cx
3183 .update(|cx| {
3184 connection.new_session(
3185 project.clone(),
3186 PathList::new(&[Path::new(path!("/test"))]),
3187 cx,
3188 )
3189 })
3190 .await
3191 .unwrap();
3192
3193 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3194
3195 // Create a real PTY terminal that runs a command which prints output then sleeps
3196 // We use printf instead of echo and chain with && sleep to ensure proper execution
3197 let (completion_tx, _completion_rx) = smol::channel::unbounded();
3198 let (program, args) = ShellBuilder::new(&Shell::System, false).build(
3199 Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
3200 &[],
3201 );
3202
3203 let builder = cx
3204 .update(|cx| {
3205 ::terminal::TerminalBuilder::new(
3206 None,
3207 None,
3208 task::Shell::WithArguments {
3209 program,
3210 args,
3211 title_override: None,
3212 },
3213 HashMap::default(),
3214 ::terminal::terminal_settings::CursorShape::default(),
3215 ::terminal::terminal_settings::AlternateScroll::On,
3216 None,
3217 vec![],
3218 0,
3219 false,
3220 0,
3221 Some(completion_tx),
3222 cx,
3223 vec![],
3224 PathStyle::local(),
3225 )
3226 })
3227 .await
3228 .unwrap();
3229
3230 let lower_terminal = cx.new(|cx| builder.subscribe(cx));
3231
3232 // Create the acp_thread Terminal wrapper
3233 thread.update(cx, |thread, cx| {
3234 thread.on_terminal_provider_event(
3235 TerminalProviderEvent::Created {
3236 terminal_id: terminal_id.clone(),
3237 label: "printf output_before_kill && sleep 60".to_string(),
3238 cwd: None,
3239 output_byte_limit: None,
3240 terminal: lower_terminal.clone(),
3241 },
3242 cx,
3243 );
3244 });
3245
3246 // Poll until the printf command produces output, rather than using a
3247 // fixed sleep which is flaky on loaded machines.
3248 let deadline = std::time::Instant::now() + Duration::from_secs(10);
3249 loop {
3250 let has_output = thread.read_with(cx, |thread, cx| {
3251 let term = thread
3252 .terminals
3253 .get(&terminal_id)
3254 .expect("terminal not found");
3255 let content = term.read(cx).inner().read(cx).get_content();
3256 content.contains("output_before_kill")
3257 });
3258 if has_output {
3259 break;
3260 }
3261 assert!(
3262 std::time::Instant::now() < deadline,
3263 "Timed out waiting for printf output to appear in terminal",
3264 );
3265 cx.executor().timer(Duration::from_millis(50)).await;
3266 }
3267
3268 // Get the acp_thread Terminal and kill it
3269 let wait_for_exit = thread.update(cx, |thread, cx| {
3270 let term = thread.terminals.get(&terminal_id).unwrap();
3271 let wait_for_exit = term.read(cx).wait_for_exit();
3272 term.update(cx, |term, cx| {
3273 term.kill(cx);
3274 });
3275 wait_for_exit
3276 });
3277
3278 // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
3279 // Before the fix to kill_active_task, this would hang forever because
3280 // only the foreground process was killed, not the shell, so the PTY
3281 // child never exited and wait_for_completed_task never completed.
3282 let exit_result = futures::select! {
3283 result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
3284 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
3285 };
3286
3287 assert!(
3288 exit_result.is_some(),
3289 "wait_for_exit should complete after kill, but it timed out. \
3290 This indicates kill_active_task is not properly killing the shell process."
3291 );
3292
3293 // Give the system a chance to process any pending updates
3294 cx.run_until_parked();
3295
3296 // Verify that the underlying terminal still has the output that was
3297 // written before the kill. This verifies that killing doesn't lose output.
3298 let inner_content = thread.read_with(cx, |thread, cx| {
3299 let term = thread.terminals.get(&terminal_id).unwrap();
3300 term.read(cx).inner().read(cx).get_content()
3301 });
3302
3303 assert!(
3304 inner_content.contains("output_before_kill"),
3305 "Underlying terminal should contain output from before kill, got: {}",
3306 inner_content
3307 );
3308 }
3309
3310 #[gpui::test]
3311 async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
3312 init_test(cx);
3313
3314 let fs = FakeFs::new(cx.executor());
3315 let project = Project::test(fs, [], cx).await;
3316 let connection = Rc::new(FakeAgentConnection::new());
3317 let thread = cx
3318 .update(|cx| {
3319 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3320 })
3321 .await
3322 .unwrap();
3323
3324 // Test creating a new user message
3325 thread.update(cx, |thread, cx| {
3326 thread.push_user_content_block(None, "Hello, ".into(), cx);
3327 });
3328
3329 thread.update(cx, |thread, cx| {
3330 assert_eq!(thread.entries.len(), 1);
3331 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3332 assert_eq!(user_msg.id, None);
3333 assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
3334 } else {
3335 panic!("Expected UserMessage");
3336 }
3337 });
3338
3339 // Test appending to existing user message
3340 let message_1_id = UserMessageId::new();
3341 thread.update(cx, |thread, cx| {
3342 thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
3343 });
3344
3345 thread.update(cx, |thread, cx| {
3346 assert_eq!(thread.entries.len(), 1);
3347 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3348 assert_eq!(user_msg.id, Some(message_1_id));
3349 assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
3350 } else {
3351 panic!("Expected UserMessage");
3352 }
3353 });
3354
3355 // Test creating new user message after assistant message
3356 thread.update(cx, |thread, cx| {
3357 thread.push_assistant_content_block("Assistant response".into(), false, cx);
3358 });
3359
3360 let message_2_id = UserMessageId::new();
3361 thread.update(cx, |thread, cx| {
3362 thread.push_user_content_block(
3363 Some(message_2_id.clone()),
3364 "New user message".into(),
3365 cx,
3366 );
3367 });
3368
3369 thread.update(cx, |thread, cx| {
3370 assert_eq!(thread.entries.len(), 3);
3371 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
3372 assert_eq!(user_msg.id, Some(message_2_id));
3373 assert_eq!(user_msg.content.to_markdown(cx), "New user message");
3374 } else {
3375 panic!("Expected UserMessage at index 2");
3376 }
3377 });
3378 }
3379
3380 #[gpui::test]
3381 async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
3382 init_test(cx);
3383
3384 let fs = FakeFs::new(cx.executor());
3385 let project = Project::test(fs, [], cx).await;
3386 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3387 |_, thread, mut cx| {
3388 async move {
3389 thread.update(&mut cx, |thread, cx| {
3390 thread
3391 .handle_session_update(
3392 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3393 "Thinking ".into(),
3394 )),
3395 cx,
3396 )
3397 .unwrap();
3398 thread
3399 .handle_session_update(
3400 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3401 "hard!".into(),
3402 )),
3403 cx,
3404 )
3405 .unwrap();
3406 })?;
3407 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3408 }
3409 .boxed_local()
3410 },
3411 ));
3412
3413 let thread = cx
3414 .update(|cx| {
3415 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3416 })
3417 .await
3418 .unwrap();
3419
3420 thread
3421 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
3422 .await
3423 .unwrap();
3424
3425 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
3426 assert_eq!(
3427 output,
3428 indoc! {r#"
3429 ## User
3430
3431 Hello from Zed!
3432
3433 ## Assistant
3434
3435 <thinking>
3436 Thinking hard!
3437 </thinking>
3438
3439 "#}
3440 );
3441 }
3442
3443 #[gpui::test]
3444 async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
3445 init_test(cx);
3446
3447 let fs = FakeFs::new(cx.executor());
3448 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
3449 .await;
3450 let project = Project::test(fs.clone(), [], cx).await;
3451 let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
3452 let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
3453 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3454 move |_, thread, mut cx| {
3455 let read_file_tx = read_file_tx.clone();
3456 async move {
3457 let content = thread
3458 .update(&mut cx, |thread, cx| {
3459 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3460 })
3461 .unwrap()
3462 .await
3463 .unwrap();
3464 assert_eq!(content, "one\ntwo\nthree\n");
3465 read_file_tx.take().unwrap().send(()).unwrap();
3466 thread
3467 .update(&mut cx, |thread, cx| {
3468 thread.write_text_file(
3469 path!("/tmp/foo").into(),
3470 "one\ntwo\nthree\nfour\nfive\n".to_string(),
3471 cx,
3472 )
3473 })
3474 .unwrap()
3475 .await
3476 .unwrap();
3477 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3478 }
3479 .boxed_local()
3480 },
3481 ));
3482
3483 let (worktree, pathbuf) = project
3484 .update(cx, |project, cx| {
3485 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3486 })
3487 .await
3488 .unwrap();
3489 let buffer = project
3490 .update(cx, |project, cx| {
3491 project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
3492 })
3493 .await
3494 .unwrap();
3495
3496 let thread = cx
3497 .update(|cx| {
3498 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3499 })
3500 .await
3501 .unwrap();
3502
3503 let request = thread.update(cx, |thread, cx| {
3504 thread.send_raw("Extend the count in /tmp/foo", cx)
3505 });
3506 read_file_rx.await.ok();
3507 buffer.update(cx, |buffer, cx| {
3508 buffer.edit([(0..0, "zero\n".to_string())], None, cx);
3509 });
3510 cx.run_until_parked();
3511 assert_eq!(
3512 buffer.read_with(cx, |buffer, _| buffer.text()),
3513 "zero\none\ntwo\nthree\nfour\nfive\n"
3514 );
3515 assert_eq!(
3516 String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
3517 "zero\none\ntwo\nthree\nfour\nfive\n"
3518 );
3519 request.await.unwrap();
3520 }
3521
3522 #[gpui::test]
3523 async fn test_reading_from_line(cx: &mut TestAppContext) {
3524 init_test(cx);
3525
3526 let fs = FakeFs::new(cx.executor());
3527 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
3528 .await;
3529 let project = Project::test(fs.clone(), [], cx).await;
3530 project
3531 .update(cx, |project, cx| {
3532 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3533 })
3534 .await
3535 .unwrap();
3536
3537 let connection = Rc::new(FakeAgentConnection::new());
3538
3539 let thread = cx
3540 .update(|cx| {
3541 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3542 })
3543 .await
3544 .unwrap();
3545
3546 // Whole file
3547 let content = thread
3548 .update(cx, |thread, cx| {
3549 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3550 })
3551 .await
3552 .unwrap();
3553
3554 assert_eq!(content, "one\ntwo\nthree\nfour\n");
3555
3556 // Only start line
3557 let content = thread
3558 .update(cx, |thread, cx| {
3559 thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
3560 })
3561 .await
3562 .unwrap();
3563
3564 assert_eq!(content, "three\nfour\n");
3565
3566 // Only limit
3567 let content = thread
3568 .update(cx, |thread, cx| {
3569 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3570 })
3571 .await
3572 .unwrap();
3573
3574 assert_eq!(content, "one\ntwo\n");
3575
3576 // Range
3577 let content = thread
3578 .update(cx, |thread, cx| {
3579 thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
3580 })
3581 .await
3582 .unwrap();
3583
3584 assert_eq!(content, "two\nthree\n");
3585
3586 // Invalid
3587 let err = thread
3588 .update(cx, |thread, cx| {
3589 thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
3590 })
3591 .await
3592 .unwrap_err();
3593
3594 assert_eq!(
3595 err.to_string(),
3596 "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
3597 );
3598 }
3599
3600 #[gpui::test]
3601 async fn test_reading_empty_file(cx: &mut TestAppContext) {
3602 init_test(cx);
3603
3604 let fs = FakeFs::new(cx.executor());
3605 fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
3606 let project = Project::test(fs.clone(), [], cx).await;
3607 project
3608 .update(cx, |project, cx| {
3609 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3610 })
3611 .await
3612 .unwrap();
3613
3614 let connection = Rc::new(FakeAgentConnection::new());
3615
3616 let thread = cx
3617 .update(|cx| {
3618 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3619 })
3620 .await
3621 .unwrap();
3622
3623 // Whole file
3624 let content = thread
3625 .update(cx, |thread, cx| {
3626 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3627 })
3628 .await
3629 .unwrap();
3630
3631 assert_eq!(content, "");
3632
3633 // Only start line
3634 let content = thread
3635 .update(cx, |thread, cx| {
3636 thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
3637 })
3638 .await
3639 .unwrap();
3640
3641 assert_eq!(content, "");
3642
3643 // Only limit
3644 let content = thread
3645 .update(cx, |thread, cx| {
3646 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3647 })
3648 .await
3649 .unwrap();
3650
3651 assert_eq!(content, "");
3652
3653 // Range
3654 let content = thread
3655 .update(cx, |thread, cx| {
3656 thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
3657 })
3658 .await
3659 .unwrap();
3660
3661 assert_eq!(content, "");
3662
3663 // Invalid
3664 let err = thread
3665 .update(cx, |thread, cx| {
3666 thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
3667 })
3668 .await
3669 .unwrap_err();
3670
3671 assert_eq!(
3672 err.to_string(),
3673 "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
3674 );
3675 }
3676 #[gpui::test]
3677 async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
3678 init_test(cx);
3679
3680 let fs = FakeFs::new(cx.executor());
3681 fs.insert_tree(path!("/tmp"), json!({})).await;
3682 let project = Project::test(fs.clone(), [], cx).await;
3683 project
3684 .update(cx, |project, cx| {
3685 project.find_or_create_worktree(path!("/tmp"), true, cx)
3686 })
3687 .await
3688 .unwrap();
3689
3690 let connection = Rc::new(FakeAgentConnection::new());
3691
3692 let thread = cx
3693 .update(|cx| {
3694 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3695 })
3696 .await
3697 .unwrap();
3698
3699 // Out of project file
3700 let err = thread
3701 .update(cx, |thread, cx| {
3702 thread.read_text_file(path!("/foo").into(), None, None, false, cx)
3703 })
3704 .await
3705 .unwrap_err();
3706
3707 assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
3708 }
3709
3710 #[gpui::test]
3711 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
3712 init_test(cx);
3713
3714 let fs = FakeFs::new(cx.executor());
3715 let project = Project::test(fs, [], cx).await;
3716 let id = acp::ToolCallId::new("test");
3717
3718 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3719 let id = id.clone();
3720 move |_, thread, mut cx| {
3721 let id = id.clone();
3722 async move {
3723 thread
3724 .update(&mut cx, |thread, cx| {
3725 thread.handle_session_update(
3726 acp::SessionUpdate::ToolCall(
3727 acp::ToolCall::new(id.clone(), "Label")
3728 .kind(acp::ToolKind::Fetch)
3729 .status(acp::ToolCallStatus::InProgress),
3730 ),
3731 cx,
3732 )
3733 })
3734 .unwrap()
3735 .unwrap();
3736 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3737 }
3738 .boxed_local()
3739 }
3740 }));
3741
3742 let thread = cx
3743 .update(|cx| {
3744 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3745 })
3746 .await
3747 .unwrap();
3748
3749 let request = thread.update(cx, |thread, cx| {
3750 thread.send_raw("Fetch https://example.com", cx)
3751 });
3752
3753 run_until_first_tool_call(&thread, cx).await;
3754
3755 thread.read_with(cx, |thread, _| {
3756 assert!(matches!(
3757 thread.entries[1],
3758 AgentThreadEntry::ToolCall(ToolCall {
3759 status: ToolCallStatus::InProgress,
3760 ..
3761 })
3762 ));
3763 });
3764
3765 thread.update(cx, |thread, cx| thread.cancel(cx)).await;
3766
3767 thread.read_with(cx, |thread, _| {
3768 assert!(matches!(
3769 &thread.entries[1],
3770 AgentThreadEntry::ToolCall(ToolCall {
3771 status: ToolCallStatus::Canceled,
3772 ..
3773 })
3774 ));
3775 });
3776
3777 thread
3778 .update(cx, |thread, cx| {
3779 thread.handle_session_update(
3780 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3781 id,
3782 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3783 )),
3784 cx,
3785 )
3786 })
3787 .unwrap();
3788
3789 request.await.unwrap();
3790
3791 thread.read_with(cx, |thread, _| {
3792 assert!(matches!(
3793 thread.entries[1],
3794 AgentThreadEntry::ToolCall(ToolCall {
3795 status: ToolCallStatus::Completed,
3796 ..
3797 })
3798 ));
3799 });
3800 }
3801
3802 #[gpui::test]
3803 async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3804 init_test(cx);
3805 let fs = FakeFs::new(cx.background_executor.clone());
3806 fs.insert_tree(path!("/test"), json!({})).await;
3807 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3808
3809 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3810 move |_, thread, mut cx| {
3811 async move {
3812 thread
3813 .update(&mut cx, |thread, cx| {
3814 thread.handle_session_update(
3815 acp::SessionUpdate::ToolCall(
3816 acp::ToolCall::new("test", "Label")
3817 .kind(acp::ToolKind::Edit)
3818 .status(acp::ToolCallStatus::Completed)
3819 .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
3820 "/test/test.txt",
3821 "foo",
3822 ))]),
3823 ),
3824 cx,
3825 )
3826 })
3827 .unwrap()
3828 .unwrap();
3829 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3830 }
3831 .boxed_local()
3832 }
3833 }));
3834
3835 let thread = cx
3836 .update(|cx| {
3837 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3838 })
3839 .await
3840 .unwrap();
3841
3842 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3843 .await
3844 .unwrap();
3845
3846 assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3847 }
3848
3849 #[gpui::test(iterations = 10)]
3850 async fn test_checkpoints(cx: &mut TestAppContext) {
3851 init_test(cx);
3852 let fs = FakeFs::new(cx.background_executor.clone());
3853 fs.insert_tree(
3854 path!("/test"),
3855 json!({
3856 ".git": {}
3857 }),
3858 )
3859 .await;
3860 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3861
3862 let simulate_changes = Arc::new(AtomicBool::new(true));
3863 let next_filename = Arc::new(AtomicUsize::new(0));
3864 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3865 let simulate_changes = simulate_changes.clone();
3866 let next_filename = next_filename.clone();
3867 let fs = fs.clone();
3868 move |request, thread, mut cx| {
3869 let fs = fs.clone();
3870 let simulate_changes = simulate_changes.clone();
3871 let next_filename = next_filename.clone();
3872 async move {
3873 if simulate_changes.load(SeqCst) {
3874 let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3875 fs.write(Path::new(&filename), b"").await?;
3876 }
3877
3878 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3879 panic!("expected text content block");
3880 };
3881 thread.update(&mut cx, |thread, cx| {
3882 thread
3883 .handle_session_update(
3884 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3885 content.text.to_uppercase().into(),
3886 )),
3887 cx,
3888 )
3889 .unwrap();
3890 })?;
3891 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3892 }
3893 .boxed_local()
3894 }
3895 }));
3896 let thread = cx
3897 .update(|cx| {
3898 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3899 })
3900 .await
3901 .unwrap();
3902
3903 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3904 .await
3905 .unwrap();
3906 thread.read_with(cx, |thread, cx| {
3907 assert_eq!(
3908 thread.to_markdown(cx),
3909 indoc! {"
3910 ## User (checkpoint)
3911
3912 Lorem
3913
3914 ## Assistant
3915
3916 LOREM
3917
3918 "}
3919 );
3920 });
3921 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3922
3923 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3924 .await
3925 .unwrap();
3926 thread.read_with(cx, |thread, cx| {
3927 assert_eq!(
3928 thread.to_markdown(cx),
3929 indoc! {"
3930 ## User (checkpoint)
3931
3932 Lorem
3933
3934 ## Assistant
3935
3936 LOREM
3937
3938 ## User (checkpoint)
3939
3940 ipsum
3941
3942 ## Assistant
3943
3944 IPSUM
3945
3946 "}
3947 );
3948 });
3949 assert_eq!(
3950 fs.files(),
3951 vec![
3952 Path::new(path!("/test/file-0")),
3953 Path::new(path!("/test/file-1"))
3954 ]
3955 );
3956
3957 // Checkpoint isn't stored when there are no changes.
3958 simulate_changes.store(false, SeqCst);
3959 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3960 .await
3961 .unwrap();
3962 thread.read_with(cx, |thread, cx| {
3963 assert_eq!(
3964 thread.to_markdown(cx),
3965 indoc! {"
3966 ## User (checkpoint)
3967
3968 Lorem
3969
3970 ## Assistant
3971
3972 LOREM
3973
3974 ## User (checkpoint)
3975
3976 ipsum
3977
3978 ## Assistant
3979
3980 IPSUM
3981
3982 ## User
3983
3984 dolor
3985
3986 ## Assistant
3987
3988 DOLOR
3989
3990 "}
3991 );
3992 });
3993 assert_eq!(
3994 fs.files(),
3995 vec![
3996 Path::new(path!("/test/file-0")),
3997 Path::new(path!("/test/file-1"))
3998 ]
3999 );
4000
4001 // Rewinding the conversation truncates the history and restores the checkpoint.
4002 thread
4003 .update(cx, |thread, cx| {
4004 let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
4005 panic!("unexpected entries {:?}", thread.entries)
4006 };
4007 thread.restore_checkpoint(message.id.clone().unwrap(), cx)
4008 })
4009 .await
4010 .unwrap();
4011 thread.read_with(cx, |thread, cx| {
4012 assert_eq!(
4013 thread.to_markdown(cx),
4014 indoc! {"
4015 ## User (checkpoint)
4016
4017 Lorem
4018
4019 ## Assistant
4020
4021 LOREM
4022
4023 "}
4024 );
4025 });
4026 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
4027 }
4028
4029 #[gpui::test]
4030 async fn test_tool_result_refusal(cx: &mut TestAppContext) {
4031 use std::sync::atomic::AtomicUsize;
4032 init_test(cx);
4033
4034 let fs = FakeFs::new(cx.executor());
4035 let project = Project::test(fs, None, cx).await;
4036
4037 // Create a connection that simulates refusal after tool result
4038 let prompt_count = Arc::new(AtomicUsize::new(0));
4039 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4040 let prompt_count = prompt_count.clone();
4041 move |_request, thread, mut cx| {
4042 let count = prompt_count.fetch_add(1, SeqCst);
4043 async move {
4044 if count == 0 {
4045 // First prompt: Generate a tool call with result
4046 thread.update(&mut cx, |thread, cx| {
4047 thread
4048 .handle_session_update(
4049 acp::SessionUpdate::ToolCall(
4050 acp::ToolCall::new("tool1", "Test Tool")
4051 .kind(acp::ToolKind::Fetch)
4052 .status(acp::ToolCallStatus::Completed)
4053 .raw_input(serde_json::json!({"query": "test"}))
4054 .raw_output(serde_json::json!({"result": "inappropriate content"})),
4055 ),
4056 cx,
4057 )
4058 .unwrap();
4059 })?;
4060
4061 // Now return refusal because of the tool result
4062 Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
4063 } else {
4064 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4065 }
4066 }
4067 .boxed_local()
4068 }
4069 }));
4070
4071 let thread = cx
4072 .update(|cx| {
4073 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4074 })
4075 .await
4076 .unwrap();
4077
4078 // Track if we see a Refusal event
4079 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4080 let saw_refusal_event_captured = saw_refusal_event.clone();
4081 thread.update(cx, |_thread, cx| {
4082 cx.subscribe(
4083 &thread,
4084 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4085 if matches!(event, AcpThreadEvent::Refusal) {
4086 *saw_refusal_event_captured.lock().unwrap() = true;
4087 }
4088 },
4089 )
4090 .detach();
4091 });
4092
4093 // Send a user message - this will trigger tool call and then refusal
4094 let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
4095 cx.background_executor.spawn(send_task).detach();
4096 cx.run_until_parked();
4097
4098 // Verify that:
4099 // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
4100 // 2. The user message was NOT truncated
4101 assert!(
4102 *saw_refusal_event.lock().unwrap(),
4103 "Refusal event should be emitted for tool result refusals"
4104 );
4105
4106 thread.read_with(cx, |thread, _| {
4107 let entries = thread.entries();
4108 assert!(entries.len() >= 2, "Should have user message and tool call");
4109
4110 // Verify user message is still there
4111 assert!(
4112 matches!(entries[0], AgentThreadEntry::UserMessage(_)),
4113 "User message should not be truncated"
4114 );
4115
4116 // Verify tool call is there with result
4117 if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
4118 assert!(
4119 tool_call.raw_output.is_some(),
4120 "Tool call should have output"
4121 );
4122 } else {
4123 panic!("Expected tool call at index 1");
4124 }
4125 });
4126 }
4127
4128 #[gpui::test]
4129 async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
4130 init_test(cx);
4131
4132 let fs = FakeFs::new(cx.executor());
4133 let project = Project::test(fs, None, cx).await;
4134
4135 let refuse_next = Arc::new(AtomicBool::new(false));
4136 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4137 let refuse_next = refuse_next.clone();
4138 move |_request, _thread, _cx| {
4139 if refuse_next.load(SeqCst) {
4140 async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
4141 .boxed_local()
4142 } else {
4143 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
4144 .boxed_local()
4145 }
4146 }
4147 }));
4148
4149 let thread = cx
4150 .update(|cx| {
4151 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4152 })
4153 .await
4154 .unwrap();
4155
4156 // Track if we see a Refusal event
4157 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4158 let saw_refusal_event_captured = saw_refusal_event.clone();
4159 thread.update(cx, |_thread, cx| {
4160 cx.subscribe(
4161 &thread,
4162 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4163 if matches!(event, AcpThreadEvent::Refusal) {
4164 *saw_refusal_event_captured.lock().unwrap() = true;
4165 }
4166 },
4167 )
4168 .detach();
4169 });
4170
4171 // Send a message that will be refused
4172 refuse_next.store(true, SeqCst);
4173 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4174 .await
4175 .unwrap();
4176
4177 // Verify that a Refusal event WAS emitted for user prompt refusal
4178 assert!(
4179 *saw_refusal_event.lock().unwrap(),
4180 "Refusal event should be emitted for user prompt refusals"
4181 );
4182
4183 // Verify the message was truncated (user prompt refusal)
4184 thread.read_with(cx, |thread, cx| {
4185 assert_eq!(thread.to_markdown(cx), "");
4186 });
4187 }
4188
4189 #[gpui::test]
4190 async fn test_refusal(cx: &mut TestAppContext) {
4191 init_test(cx);
4192 let fs = FakeFs::new(cx.background_executor.clone());
4193 fs.insert_tree(path!("/"), json!({})).await;
4194 let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
4195
4196 let refuse_next = Arc::new(AtomicBool::new(false));
4197 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4198 let refuse_next = refuse_next.clone();
4199 move |request, thread, mut cx| {
4200 let refuse_next = refuse_next.clone();
4201 async move {
4202 if refuse_next.load(SeqCst) {
4203 return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
4204 }
4205
4206 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
4207 panic!("expected text content block");
4208 };
4209 thread.update(&mut cx, |thread, cx| {
4210 thread
4211 .handle_session_update(
4212 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
4213 content.text.to_uppercase().into(),
4214 )),
4215 cx,
4216 )
4217 .unwrap();
4218 })?;
4219 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4220 }
4221 .boxed_local()
4222 }
4223 }));
4224 let thread = cx
4225 .update(|cx| {
4226 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4227 })
4228 .await
4229 .unwrap();
4230
4231 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4232 .await
4233 .unwrap();
4234 thread.read_with(cx, |thread, cx| {
4235 assert_eq!(
4236 thread.to_markdown(cx),
4237 indoc! {"
4238 ## User
4239
4240 hello
4241
4242 ## Assistant
4243
4244 HELLO
4245
4246 "}
4247 );
4248 });
4249
4250 // Simulate refusing the second message. The message should be truncated
4251 // when a user prompt is refused.
4252 refuse_next.store(true, SeqCst);
4253 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
4254 .await
4255 .unwrap();
4256 thread.read_with(cx, |thread, cx| {
4257 assert_eq!(
4258 thread.to_markdown(cx),
4259 indoc! {"
4260 ## User
4261
4262 hello
4263
4264 ## Assistant
4265
4266 HELLO
4267
4268 "}
4269 );
4270 });
4271 }
4272
4273 async fn run_until_first_tool_call(
4274 thread: &Entity<AcpThread>,
4275 cx: &mut TestAppContext,
4276 ) -> usize {
4277 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
4278
4279 let subscription = cx.update(|cx| {
4280 cx.subscribe(thread, move |thread, _, cx| {
4281 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
4282 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
4283 return tx.try_send(ix).unwrap();
4284 }
4285 }
4286 })
4287 });
4288
4289 select! {
4290 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
4291 panic!("Timeout waiting for tool call")
4292 }
4293 ix = rx.next().fuse() => {
4294 drop(subscription);
4295 ix.unwrap()
4296 }
4297 }
4298 }
4299
4300 #[derive(Clone, Default)]
4301 struct FakeAgentConnection {
4302 auth_methods: Vec<acp::AuthMethod>,
4303 sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
4304 set_title_calls: Rc<RefCell<Vec<SharedString>>>,
4305 on_user_message: Option<
4306 Rc<
4307 dyn Fn(
4308 acp::PromptRequest,
4309 WeakEntity<AcpThread>,
4310 AsyncApp,
4311 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4312 + 'static,
4313 >,
4314 >,
4315 }
4316
4317 impl FakeAgentConnection {
4318 fn new() -> Self {
4319 Self {
4320 auth_methods: Vec::new(),
4321 on_user_message: None,
4322 sessions: Arc::default(),
4323 set_title_calls: Default::default(),
4324 }
4325 }
4326
4327 #[expect(unused)]
4328 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
4329 self.auth_methods = auth_methods;
4330 self
4331 }
4332
4333 fn on_user_message(
4334 mut self,
4335 handler: impl Fn(
4336 acp::PromptRequest,
4337 WeakEntity<AcpThread>,
4338 AsyncApp,
4339 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4340 + 'static,
4341 ) -> Self {
4342 self.on_user_message.replace(Rc::new(handler));
4343 self
4344 }
4345 }
4346
4347 impl AgentConnection for FakeAgentConnection {
4348 fn agent_id(&self) -> AgentId {
4349 AgentId::new("fake")
4350 }
4351
4352 fn telemetry_id(&self) -> SharedString {
4353 "fake".into()
4354 }
4355
4356 fn auth_methods(&self) -> &[acp::AuthMethod] {
4357 &self.auth_methods
4358 }
4359
4360 fn new_session(
4361 self: Rc<Self>,
4362 project: Entity<Project>,
4363 work_dirs: PathList,
4364 cx: &mut App,
4365 ) -> Task<gpui::Result<Entity<AcpThread>>> {
4366 let session_id = acp::SessionId::new(
4367 rand::rng()
4368 .sample_iter(&distr::Alphanumeric)
4369 .take(7)
4370 .map(char::from)
4371 .collect::<String>(),
4372 );
4373 let action_log = cx.new(|_| ActionLog::new(project.clone()));
4374 let thread = cx.new(|cx| {
4375 AcpThread::new(
4376 None,
4377 None,
4378 Some(work_dirs),
4379 self.clone(),
4380 project,
4381 action_log,
4382 session_id.clone(),
4383 watch::Receiver::constant(
4384 acp::PromptCapabilities::new()
4385 .image(true)
4386 .audio(true)
4387 .embedded_context(true),
4388 ),
4389 cx,
4390 )
4391 });
4392 self.sessions.lock().insert(session_id, thread.downgrade());
4393 Task::ready(Ok(thread))
4394 }
4395
4396 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
4397 if self.auth_methods().iter().any(|m| m.id() == &method) {
4398 Task::ready(Ok(()))
4399 } else {
4400 Task::ready(Err(anyhow!("Invalid Auth Method")))
4401 }
4402 }
4403
4404 fn prompt(
4405 &self,
4406 _id: Option<UserMessageId>,
4407 params: acp::PromptRequest,
4408 cx: &mut App,
4409 ) -> Task<gpui::Result<acp::PromptResponse>> {
4410 let sessions = self.sessions.lock();
4411 let thread = sessions.get(¶ms.session_id).unwrap();
4412 if let Some(handler) = &self.on_user_message {
4413 let handler = handler.clone();
4414 let thread = thread.clone();
4415 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
4416 } else {
4417 Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
4418 }
4419 }
4420
4421 fn cancel(&self, _session_id: &acp::SessionId, _cx: &mut App) {}
4422
4423 fn truncate(
4424 &self,
4425 session_id: &acp::SessionId,
4426 _cx: &App,
4427 ) -> Option<Rc<dyn AgentSessionTruncate>> {
4428 Some(Rc::new(FakeAgentSessionEditor {
4429 _session_id: session_id.clone(),
4430 }))
4431 }
4432
4433 fn set_title(
4434 &self,
4435 _session_id: &acp::SessionId,
4436 _cx: &App,
4437 ) -> Option<Rc<dyn AgentSessionSetTitle>> {
4438 Some(Rc::new(FakeAgentSessionSetTitle {
4439 calls: self.set_title_calls.clone(),
4440 }))
4441 }
4442
4443 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
4444 self
4445 }
4446 }
4447
4448 struct FakeAgentSessionSetTitle {
4449 calls: Rc<RefCell<Vec<SharedString>>>,
4450 }
4451
4452 impl AgentSessionSetTitle for FakeAgentSessionSetTitle {
4453 fn run(&self, title: SharedString, _cx: &mut App) -> Task<Result<()>> {
4454 self.calls.borrow_mut().push(title);
4455 Task::ready(Ok(()))
4456 }
4457 }
4458
4459 struct FakeAgentSessionEditor {
4460 _session_id: acp::SessionId,
4461 }
4462
4463 impl AgentSessionTruncate for FakeAgentSessionEditor {
4464 fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
4465 Task::ready(Ok(()))
4466 }
4467 }
4468
4469 #[gpui::test]
4470 async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
4471 init_test(cx);
4472
4473 let fs = FakeFs::new(cx.executor());
4474 let project = Project::test(fs, [], cx).await;
4475 let connection = Rc::new(FakeAgentConnection::new());
4476 let thread = cx
4477 .update(|cx| {
4478 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4479 })
4480 .await
4481 .unwrap();
4482
4483 // Try to update a tool call that doesn't exist
4484 let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
4485 thread.update(cx, |thread, cx| {
4486 let result = thread.handle_session_update(
4487 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4488 nonexistent_id.clone(),
4489 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4490 )),
4491 cx,
4492 );
4493
4494 // The update should succeed (not return an error)
4495 assert!(result.is_ok());
4496
4497 // There should now be exactly one entry in the thread
4498 assert_eq!(thread.entries.len(), 1);
4499
4500 // The entry should be a failed tool call
4501 if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
4502 assert_eq!(tool_call.id, nonexistent_id);
4503 assert!(matches!(tool_call.status, ToolCallStatus::Failed));
4504 assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
4505
4506 // Check that the content contains the error message
4507 assert_eq!(tool_call.content.len(), 1);
4508 if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
4509 match content_block {
4510 ContentBlock::Markdown { markdown } => {
4511 let markdown_text = markdown.read(cx).source();
4512 assert!(markdown_text.contains("Tool call not found"));
4513 }
4514 ContentBlock::Empty => panic!("Expected markdown content, got empty"),
4515 ContentBlock::ResourceLink { .. } => {
4516 panic!("Expected markdown content, got resource link")
4517 }
4518 ContentBlock::Image { .. } => {
4519 panic!("Expected markdown content, got image")
4520 }
4521 }
4522 } else {
4523 panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
4524 }
4525 } else {
4526 panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
4527 }
4528 });
4529 }
4530
4531 /// Tests that restoring a checkpoint properly cleans up terminals that were
4532 /// created after that checkpoint, and cancels any in-progress generation.
4533 ///
4534 /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
4535 /// that were started after that checkpoint should be terminated, and any in-progress
4536 /// AI generation should be canceled.
4537 #[gpui::test]
4538 async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
4539 init_test(cx);
4540
4541 let fs = FakeFs::new(cx.executor());
4542 let project = Project::test(fs, [], cx).await;
4543 let connection = Rc::new(FakeAgentConnection::new());
4544 let thread = cx
4545 .update(|cx| {
4546 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4547 })
4548 .await
4549 .unwrap();
4550
4551 // Send first user message to create a checkpoint
4552 cx.update(|cx| {
4553 thread.update(cx, |thread, cx| {
4554 thread.send(vec!["first message".into()], cx)
4555 })
4556 })
4557 .await
4558 .unwrap();
4559
4560 // Send second message (creates another checkpoint) - we'll restore to this one
4561 cx.update(|cx| {
4562 thread.update(cx, |thread, cx| {
4563 thread.send(vec!["second message".into()], cx)
4564 })
4565 })
4566 .await
4567 .unwrap();
4568
4569 // Create 2 terminals BEFORE the checkpoint that have completed running
4570 let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4571 let mock_terminal_1 = cx.new(|cx| {
4572 let builder = ::terminal::TerminalBuilder::new_display_only(
4573 ::terminal::terminal_settings::CursorShape::default(),
4574 ::terminal::terminal_settings::AlternateScroll::On,
4575 None,
4576 0,
4577 cx.background_executor(),
4578 PathStyle::local(),
4579 )
4580 .unwrap();
4581 builder.subscribe(cx)
4582 });
4583
4584 thread.update(cx, |thread, cx| {
4585 thread.on_terminal_provider_event(
4586 TerminalProviderEvent::Created {
4587 terminal_id: terminal_id_1.clone(),
4588 label: "echo 'first'".to_string(),
4589 cwd: Some(PathBuf::from("/test")),
4590 output_byte_limit: None,
4591 terminal: mock_terminal_1.clone(),
4592 },
4593 cx,
4594 );
4595 });
4596
4597 thread.update(cx, |thread, cx| {
4598 thread.on_terminal_provider_event(
4599 TerminalProviderEvent::Output {
4600 terminal_id: terminal_id_1.clone(),
4601 data: b"first\n".to_vec(),
4602 },
4603 cx,
4604 );
4605 });
4606
4607 thread.update(cx, |thread, cx| {
4608 thread.on_terminal_provider_event(
4609 TerminalProviderEvent::Exit {
4610 terminal_id: terminal_id_1.clone(),
4611 status: acp::TerminalExitStatus::new().exit_code(0),
4612 },
4613 cx,
4614 );
4615 });
4616
4617 let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4618 let mock_terminal_2 = cx.new(|cx| {
4619 let builder = ::terminal::TerminalBuilder::new_display_only(
4620 ::terminal::terminal_settings::CursorShape::default(),
4621 ::terminal::terminal_settings::AlternateScroll::On,
4622 None,
4623 0,
4624 cx.background_executor(),
4625 PathStyle::local(),
4626 )
4627 .unwrap();
4628 builder.subscribe(cx)
4629 });
4630
4631 thread.update(cx, |thread, cx| {
4632 thread.on_terminal_provider_event(
4633 TerminalProviderEvent::Created {
4634 terminal_id: terminal_id_2.clone(),
4635 label: "echo 'second'".to_string(),
4636 cwd: Some(PathBuf::from("/test")),
4637 output_byte_limit: None,
4638 terminal: mock_terminal_2.clone(),
4639 },
4640 cx,
4641 );
4642 });
4643
4644 thread.update(cx, |thread, cx| {
4645 thread.on_terminal_provider_event(
4646 TerminalProviderEvent::Output {
4647 terminal_id: terminal_id_2.clone(),
4648 data: b"second\n".to_vec(),
4649 },
4650 cx,
4651 );
4652 });
4653
4654 thread.update(cx, |thread, cx| {
4655 thread.on_terminal_provider_event(
4656 TerminalProviderEvent::Exit {
4657 terminal_id: terminal_id_2.clone(),
4658 status: acp::TerminalExitStatus::new().exit_code(0),
4659 },
4660 cx,
4661 );
4662 });
4663
4664 // Get the second message ID to restore to
4665 let second_message_id = thread.read_with(cx, |thread, _| {
4666 // At this point we have:
4667 // - Index 0: First user message (with checkpoint)
4668 // - Index 1: Second user message (with checkpoint)
4669 // No assistant responses because FakeAgentConnection just returns EndTurn
4670 let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
4671 panic!("expected user message at index 1");
4672 };
4673 message.id.clone().unwrap()
4674 });
4675
4676 // Create a terminal AFTER the checkpoint we'll restore to.
4677 // This simulates the AI agent starting a long-running terminal command.
4678 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4679 let mock_terminal = cx.new(|cx| {
4680 let builder = ::terminal::TerminalBuilder::new_display_only(
4681 ::terminal::terminal_settings::CursorShape::default(),
4682 ::terminal::terminal_settings::AlternateScroll::On,
4683 None,
4684 0,
4685 cx.background_executor(),
4686 PathStyle::local(),
4687 )
4688 .unwrap();
4689 builder.subscribe(cx)
4690 });
4691
4692 // Register the terminal as created
4693 thread.update(cx, |thread, cx| {
4694 thread.on_terminal_provider_event(
4695 TerminalProviderEvent::Created {
4696 terminal_id: terminal_id.clone(),
4697 label: "sleep 1000".to_string(),
4698 cwd: Some(PathBuf::from("/test")),
4699 output_byte_limit: None,
4700 terminal: mock_terminal.clone(),
4701 },
4702 cx,
4703 );
4704 });
4705
4706 // Simulate the terminal producing output (still running)
4707 thread.update(cx, |thread, cx| {
4708 thread.on_terminal_provider_event(
4709 TerminalProviderEvent::Output {
4710 terminal_id: terminal_id.clone(),
4711 data: b"terminal is running...\n".to_vec(),
4712 },
4713 cx,
4714 );
4715 });
4716
4717 // Create a tool call entry that references this terminal
4718 // This represents the agent requesting a terminal command
4719 thread.update(cx, |thread, cx| {
4720 thread
4721 .handle_session_update(
4722 acp::SessionUpdate::ToolCall(
4723 acp::ToolCall::new("terminal-tool-1", "Running command")
4724 .kind(acp::ToolKind::Execute)
4725 .status(acp::ToolCallStatus::InProgress)
4726 .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
4727 terminal_id.clone(),
4728 ))])
4729 .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
4730 ),
4731 cx,
4732 )
4733 .unwrap();
4734 });
4735
4736 // Verify terminal exists and is in the thread
4737 let terminal_exists_before =
4738 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4739 assert!(
4740 terminal_exists_before,
4741 "Terminal should exist before checkpoint restore"
4742 );
4743
4744 // Verify the terminal's underlying task is still running (not completed)
4745 let terminal_running_before = thread.read_with(cx, |thread, _cx| {
4746 let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
4747 terminal_entity.read_with(cx, |term, _cx| {
4748 term.output().is_none() // output is None means it's still running
4749 })
4750 });
4751 assert!(
4752 terminal_running_before,
4753 "Terminal should be running before checkpoint restore"
4754 );
4755
4756 // Verify we have the expected entries before restore
4757 let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
4758 assert!(
4759 entry_count_before > 1,
4760 "Should have multiple entries before restore"
4761 );
4762
4763 // Restore the checkpoint to the second message.
4764 // This should:
4765 // 1. Cancel any in-progress generation (via the cancel() call)
4766 // 2. Remove the terminal that was created after that point
4767 thread
4768 .update(cx, |thread, cx| {
4769 thread.restore_checkpoint(second_message_id, cx)
4770 })
4771 .await
4772 .unwrap();
4773
4774 // Verify that no send_task is in progress after restore
4775 // (cancel() clears the send_task)
4776 let has_send_task_after = thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4777 assert!(
4778 !has_send_task_after,
4779 "Should not have a send_task after restore (cancel should have cleared it)"
4780 );
4781
4782 // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
4783 let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
4784 assert_eq!(
4785 entry_count, 1,
4786 "Should have 1 entry after restore (only the first user message)"
4787 );
4788
4789 // Verify the 2 completed terminals from before the checkpoint still exist
4790 let terminal_1_exists = thread.read_with(cx, |thread, _| {
4791 thread.terminals.contains_key(&terminal_id_1)
4792 });
4793 assert!(
4794 terminal_1_exists,
4795 "Terminal 1 (from before checkpoint) should still exist"
4796 );
4797
4798 let terminal_2_exists = thread.read_with(cx, |thread, _| {
4799 thread.terminals.contains_key(&terminal_id_2)
4800 });
4801 assert!(
4802 terminal_2_exists,
4803 "Terminal 2 (from before checkpoint) should still exist"
4804 );
4805
4806 // Verify they're still in completed state
4807 let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
4808 let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
4809 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4810 });
4811 assert!(terminal_1_completed, "Terminal 1 should still be completed");
4812
4813 let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
4814 let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
4815 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4816 });
4817 assert!(terminal_2_completed, "Terminal 2 should still be completed");
4818
4819 // Verify the running terminal (created after checkpoint) was removed
4820 let terminal_3_exists =
4821 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4822 assert!(
4823 !terminal_3_exists,
4824 "Terminal 3 (created after checkpoint) should have been removed"
4825 );
4826
4827 // Verify total count is 2 (the two from before the checkpoint)
4828 let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
4829 assert_eq!(
4830 terminal_count, 2,
4831 "Should have exactly 2 terminals (the completed ones from before checkpoint)"
4832 );
4833 }
4834
4835 /// Tests that update_last_checkpoint correctly updates the original message's checkpoint
4836 /// even when a new user message is added while the async checkpoint comparison is in progress.
4837 ///
4838 /// This is a regression test for a bug where update_last_checkpoint would fail with
4839 /// "no checkpoint" if a new user message (without a checkpoint) was added between when
4840 /// update_last_checkpoint started and when its async closure ran.
4841 #[gpui::test]
4842 async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
4843 init_test(cx);
4844
4845 let fs = FakeFs::new(cx.executor());
4846 fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
4847 .await;
4848 let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;
4849
4850 let handler_done = Arc::new(AtomicBool::new(false));
4851 let handler_done_clone = handler_done.clone();
4852 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4853 move |_, _thread, _cx| {
4854 handler_done_clone.store(true, SeqCst);
4855 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
4856 },
4857 ));
4858
4859 let thread = cx
4860 .update(|cx| {
4861 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4862 })
4863 .await
4864 .unwrap();
4865
4866 let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
4867 let send_task = cx.background_executor.spawn(send_future);
4868
4869 // Tick until handler completes, then a few more to let update_last_checkpoint start
4870 while !handler_done.load(SeqCst) {
4871 cx.executor().tick();
4872 }
4873 for _ in 0..5 {
4874 cx.executor().tick();
4875 }
4876
4877 thread.update(cx, |thread, cx| {
4878 thread.push_entry(
4879 AgentThreadEntry::UserMessage(UserMessage {
4880 id: Some(UserMessageId::new()),
4881 content: ContentBlock::Empty,
4882 chunks: vec!["Injected message (no checkpoint)".into()],
4883 checkpoint: None,
4884 indented: false,
4885 }),
4886 cx,
4887 );
4888 });
4889
4890 cx.run_until_parked();
4891 let result = send_task.await;
4892
4893 assert!(
4894 result.is_ok(),
4895 "send should succeed even when new message added during update_last_checkpoint: {:?}",
4896 result.err()
4897 );
4898 }
4899
4900 /// Tests that when a follow-up message is sent during generation,
4901 /// the first turn completing does NOT clear `running_turn` because
4902 /// it now belongs to the second turn.
4903 #[gpui::test]
4904 async fn test_follow_up_message_during_generation_does_not_clear_turn(cx: &mut TestAppContext) {
4905 init_test(cx);
4906
4907 let fs = FakeFs::new(cx.executor());
4908 let project = Project::test(fs, [], cx).await;
4909
4910 // First handler waits for this signal before completing
4911 let (first_complete_tx, first_complete_rx) = futures::channel::oneshot::channel::<()>();
4912 let first_complete_rx = RefCell::new(Some(first_complete_rx));
4913
4914 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4915 move |params, _thread, _cx| {
4916 let first_complete_rx = first_complete_rx.borrow_mut().take();
4917 let is_first = params
4918 .prompt
4919 .iter()
4920 .any(|c| matches!(c, acp::ContentBlock::Text(t) if t.text.contains("first")));
4921
4922 async move {
4923 if is_first {
4924 // First handler waits until signaled
4925 if let Some(rx) = first_complete_rx {
4926 rx.await.ok();
4927 }
4928 }
4929 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4930 }
4931 .boxed_local()
4932 }
4933 }));
4934
4935 let thread = cx
4936 .update(|cx| {
4937 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4938 })
4939 .await
4940 .unwrap();
4941
4942 // Send first message (turn_id=1) - handler will block
4943 let first_request = thread.update(cx, |thread, cx| thread.send_raw("first", cx));
4944 assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 1);
4945
4946 // Send second message (turn_id=2) while first is still blocked
4947 // This calls cancel() which takes turn 1's running_turn and sets turn 2's
4948 let second_request = thread.update(cx, |thread, cx| thread.send_raw("second", cx));
4949 assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 2);
4950
4951 let running_turn_after_second_send =
4952 thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
4953 assert_eq!(
4954 running_turn_after_second_send,
4955 Some(2),
4956 "running_turn should be set to turn 2 after sending second message"
4957 );
4958
4959 // Now signal first handler to complete
4960 first_complete_tx.send(()).ok();
4961
4962 // First request completes - should NOT clear running_turn
4963 // because running_turn now belongs to turn 2
4964 first_request.await.unwrap();
4965
4966 let running_turn_after_first =
4967 thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
4968 assert_eq!(
4969 running_turn_after_first,
4970 Some(2),
4971 "first turn completing should not clear running_turn (belongs to turn 2)"
4972 );
4973
4974 // Second request completes - SHOULD clear running_turn
4975 second_request.await.unwrap();
4976
4977 let running_turn_after_second =
4978 thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4979 assert!(
4980 !running_turn_after_second,
4981 "second turn completing should clear running_turn"
4982 );
4983 }
4984
4985 #[gpui::test]
4986 async fn test_send_returns_cancelled_response_and_marks_tools_as_cancelled(
4987 cx: &mut TestAppContext,
4988 ) {
4989 init_test(cx);
4990
4991 let fs = FakeFs::new(cx.executor());
4992 let project = Project::test(fs, [], cx).await;
4993
4994 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4995 move |_params, thread, mut cx| {
4996 async move {
4997 thread
4998 .update(&mut cx, |thread, cx| {
4999 thread.handle_session_update(
5000 acp::SessionUpdate::ToolCall(
5001 acp::ToolCall::new(
5002 acp::ToolCallId::new("test-tool"),
5003 "Test Tool",
5004 )
5005 .kind(acp::ToolKind::Fetch)
5006 .status(acp::ToolCallStatus::InProgress),
5007 ),
5008 cx,
5009 )
5010 })
5011 .unwrap()
5012 .unwrap();
5013
5014 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
5015 }
5016 .boxed_local()
5017 },
5018 ));
5019
5020 let thread = cx
5021 .update(|cx| {
5022 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5023 })
5024 .await
5025 .unwrap();
5026
5027 let response = thread
5028 .update(cx, |thread, cx| thread.send_raw("test message", cx))
5029 .await;
5030
5031 let response = response
5032 .expect("send should succeed")
5033 .expect("should have response");
5034 assert_eq!(
5035 response.stop_reason,
5036 acp::StopReason::Cancelled,
5037 "response should have Cancelled stop_reason"
5038 );
5039
5040 thread.read_with(cx, |thread, _| {
5041 let tool_entry = thread
5042 .entries
5043 .iter()
5044 .find_map(|e| {
5045 if let AgentThreadEntry::ToolCall(call) = e {
5046 Some(call)
5047 } else {
5048 None
5049 }
5050 })
5051 .expect("should have tool call entry");
5052
5053 assert!(
5054 matches!(tool_entry.status, ToolCallStatus::Canceled),
5055 "tool should be marked as Canceled when response is Cancelled, got {:?}",
5056 tool_entry.status
5057 );
5058 });
5059 }
5060
5061 #[gpui::test]
5062 async fn test_provisional_title_replaced_by_real_title(cx: &mut TestAppContext) {
5063 init_test(cx);
5064
5065 let fs = FakeFs::new(cx.executor());
5066 let project = Project::test(fs, [], cx).await;
5067 let connection = Rc::new(FakeAgentConnection::new());
5068 let set_title_calls = connection.set_title_calls.clone();
5069
5070 let thread = cx
5071 .update(|cx| {
5072 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5073 })
5074 .await
5075 .unwrap();
5076
5077 // Initial title is the default.
5078 thread.read_with(cx, |thread, _| {
5079 assert_eq!(thread.title(), None);
5080 });
5081
5082 // Setting a provisional title updates the display title.
5083 thread.update(cx, |thread, cx| {
5084 thread.set_provisional_title("Hello, can you help…".into(), cx);
5085 });
5086 thread.read_with(cx, |thread, _| {
5087 assert_eq!(
5088 thread.title().as_ref().map(|s| s.as_str()),
5089 Some("Hello, can you help…")
5090 );
5091 });
5092
5093 // The provisional title should NOT have propagated to the connection.
5094 assert_eq!(
5095 set_title_calls.borrow().len(),
5096 0,
5097 "provisional title should not propagate to the connection"
5098 );
5099
5100 // When the real title arrives via set_title, it replaces the
5101 // provisional title and propagates to the connection.
5102 let task = thread.update(cx, |thread, cx| {
5103 thread.set_title("Helping with Rust question".into(), cx)
5104 });
5105 task.await.expect("set_title should succeed");
5106 thread.read_with(cx, |thread, _| {
5107 assert_eq!(
5108 thread.title().as_ref().map(|s| s.as_str()),
5109 Some("Helping with Rust question")
5110 );
5111 });
5112 assert_eq!(
5113 set_title_calls.borrow().as_slice(),
5114 &[SharedString::from("Helping with Rust question")],
5115 "real title should propagate to the connection"
5116 );
5117 }
5118
5119 #[gpui::test]
5120 async fn test_session_info_update_replaces_provisional_title_and_emits_event(
5121 cx: &mut TestAppContext,
5122 ) {
5123 init_test(cx);
5124
5125 let fs = FakeFs::new(cx.executor());
5126 let project = Project::test(fs, [], cx).await;
5127 let connection = Rc::new(FakeAgentConnection::new());
5128
5129 let thread = cx
5130 .update(|cx| {
5131 connection.clone().new_session(
5132 project,
5133 PathList::new(&[Path::new(path!("/test"))]),
5134 cx,
5135 )
5136 })
5137 .await
5138 .unwrap();
5139
5140 let title_updated_events = Rc::new(RefCell::new(0usize));
5141 let title_updated_events_for_subscription = title_updated_events.clone();
5142 thread.update(cx, |_thread, cx| {
5143 cx.subscribe(
5144 &thread,
5145 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
5146 if matches!(event, AcpThreadEvent::TitleUpdated) {
5147 *title_updated_events_for_subscription.borrow_mut() += 1;
5148 }
5149 },
5150 )
5151 .detach();
5152 });
5153
5154 thread.update(cx, |thread, cx| {
5155 thread.set_provisional_title("Hello, can you help…".into(), cx);
5156 });
5157 assert_eq!(
5158 *title_updated_events.borrow(),
5159 1,
5160 "setting a provisional title should emit TitleUpdated"
5161 );
5162
5163 let result = thread.update(cx, |thread, cx| {
5164 thread.handle_session_update(
5165 acp::SessionUpdate::SessionInfoUpdate(
5166 acp::SessionInfoUpdate::new().title("Helping with Rust question"),
5167 ),
5168 cx,
5169 )
5170 });
5171 result.expect("session info update should succeed");
5172
5173 thread.read_with(cx, |thread, _| {
5174 assert_eq!(
5175 thread.title().as_ref().map(|s| s.as_str()),
5176 Some("Helping with Rust question")
5177 );
5178 assert!(
5179 !thread.has_provisional_title(),
5180 "session info title update should clear provisional title"
5181 );
5182 });
5183
5184 assert_eq!(
5185 *title_updated_events.borrow(),
5186 2,
5187 "session info title update should emit TitleUpdated"
5188 );
5189 assert!(
5190 connection.set_title_calls.borrow().is_empty(),
5191 "session info title update should not propagate back to the connection"
5192 );
5193 }
5194}