1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5use action_log::{ActionLog, ActionLogTelemetry};
6use agent_client_protocol::{self as acp};
7use anyhow::{Context as _, Result, anyhow};
8use collections::HashSet;
9pub use connection::*;
10pub use diff::*;
11use futures::{FutureExt, channel::oneshot, future::BoxFuture};
12use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
13use itertools::Itertools;
14use language::language_settings::FormatOnSave;
15use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
16use markdown::Markdown;
17pub use mention::*;
18use project::lsp_store::{FormatTrigger, LspFormatTarget};
19use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
20use serde::{Deserialize, Serialize};
21use serde_json::to_string_pretty;
22use std::collections::HashMap;
23use std::error::Error;
24use std::fmt::{Formatter, Write};
25use std::ops::Range;
26use std::process::ExitStatus;
27use std::rc::Rc;
28use std::time::{Duration, Instant};
29use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
30use task::{Shell, ShellBuilder};
31pub use terminal::*;
32use text::Bias;
33use ui::App;
34use util::markdown::MarkdownEscaped;
35use util::path_list::PathList;
36use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
37use uuid::Uuid;
38
39/// A unique identifier for a conversation thread, allocated by Zed.
40///
41/// This is the internal identity type used throughout the application for
42/// bookkeeping. It is distinct from `acp::SessionId`, which is allocated
43/// by the agent and used for the wire protocol.
44#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
45pub struct ThreadId(pub Uuid);
46
47impl ThreadId {
48 pub fn new() -> Self {
49 Self(Uuid::new_v4())
50 }
51}
52
53impl std::fmt::Display for ThreadId {
54 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55 write!(f, "{}", self.0)
56 }
57}
58
59/// Key used in ACP ToolCall meta to store the tool's programmatic name.
60/// This is a workaround since ACP's ToolCall doesn't have a dedicated name field.
61pub const TOOL_NAME_META_KEY: &str = "tool_name";
62
63/// Helper to extract tool name from ACP meta
64pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
65 meta.as_ref()
66 .and_then(|m| m.get(TOOL_NAME_META_KEY))
67 .and_then(|v| v.as_str())
68 .map(|s| SharedString::from(s.to_owned()))
69}
70
71/// Helper to create meta with tool name
72pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
73 acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
74}
75
76/// Key used in ACP ToolCall meta to store the session id and message indexes
77pub const SUBAGENT_SESSION_INFO_META_KEY: &str = "subagent_session_info";
78
79#[derive(Clone, Debug, Deserialize, Serialize)]
80pub struct SubagentSessionInfo {
81 /// The session id of the subagent sessiont that was spawned
82 pub session_id: acp::SessionId,
83 /// The index of the message of the start of the "turn" run by this tool call
84 pub message_start_index: usize,
85 /// The index of the output of the message that the subagent has returned
86 #[serde(skip_serializing_if = "Option::is_none")]
87 pub message_end_index: Option<usize>,
88}
89
90/// Helper to extract subagent session id from ACP meta
91pub fn subagent_session_info_from_meta(meta: &Option<acp::Meta>) -> Option<SubagentSessionInfo> {
92 meta.as_ref()
93 .and_then(|m| m.get(SUBAGENT_SESSION_INFO_META_KEY))
94 .and_then(|v| serde_json::from_value(v.clone()).ok())
95}
96
97#[derive(Debug)]
98pub struct UserMessage {
99 pub id: Option<UserMessageId>,
100 pub content: ContentBlock,
101 pub chunks: Vec<acp::ContentBlock>,
102 pub checkpoint: Option<Checkpoint>,
103 pub indented: bool,
104}
105
106#[derive(Debug)]
107pub struct Checkpoint {
108 git_checkpoint: GitStoreCheckpoint,
109 pub show: bool,
110}
111
112impl UserMessage {
113 fn to_markdown(&self, cx: &App) -> String {
114 let mut markdown = String::new();
115 if self
116 .checkpoint
117 .as_ref()
118 .is_some_and(|checkpoint| checkpoint.show)
119 {
120 writeln!(markdown, "## User (checkpoint)").unwrap();
121 } else {
122 writeln!(markdown, "## User").unwrap();
123 }
124 writeln!(markdown).unwrap();
125 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
126 writeln!(markdown).unwrap();
127 markdown
128 }
129}
130
131#[derive(Debug, PartialEq)]
132pub struct AssistantMessage {
133 pub chunks: Vec<AssistantMessageChunk>,
134 pub indented: bool,
135 pub is_subagent_output: bool,
136}
137
138impl AssistantMessage {
139 pub fn to_markdown(&self, cx: &App) -> String {
140 format!(
141 "## Assistant\n\n{}\n\n",
142 self.chunks
143 .iter()
144 .map(|chunk| chunk.to_markdown(cx))
145 .join("\n\n")
146 )
147 }
148}
149
150#[derive(Debug, PartialEq)]
151pub enum AssistantMessageChunk {
152 Message { block: ContentBlock },
153 Thought { block: ContentBlock },
154}
155
156impl AssistantMessageChunk {
157 pub fn from_str(
158 chunk: &str,
159 language_registry: &Arc<LanguageRegistry>,
160 path_style: PathStyle,
161 cx: &mut App,
162 ) -> Self {
163 Self::Message {
164 block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
165 }
166 }
167
168 fn to_markdown(&self, cx: &App) -> String {
169 match self {
170 Self::Message { block } => block.to_markdown(cx).to_string(),
171 Self::Thought { block } => {
172 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
173 }
174 }
175 }
176}
177
178#[derive(Debug)]
179pub enum AgentThreadEntry {
180 UserMessage(UserMessage),
181 AssistantMessage(AssistantMessage),
182 ToolCall(ToolCall),
183 CompletedPlan(Vec<PlanEntry>),
184}
185
186impl AgentThreadEntry {
187 pub fn is_indented(&self) -> bool {
188 match self {
189 Self::UserMessage(message) => message.indented,
190 Self::AssistantMessage(message) => message.indented,
191 Self::ToolCall(_) => false,
192 Self::CompletedPlan(_) => false,
193 }
194 }
195
196 pub fn to_markdown(&self, cx: &App) -> String {
197 match self {
198 Self::UserMessage(message) => message.to_markdown(cx),
199 Self::AssistantMessage(message) => message.to_markdown(cx),
200 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
201 Self::CompletedPlan(entries) => {
202 let mut md = String::from("## Plan\n\n");
203 for entry in entries {
204 let source = entry.content.read(cx).source().to_string();
205 md.push_str(&format!("- [x] {}\n", source));
206 }
207 md
208 }
209 }
210 }
211
212 pub fn user_message(&self) -> Option<&UserMessage> {
213 if let AgentThreadEntry::UserMessage(message) = self {
214 Some(message)
215 } else {
216 None
217 }
218 }
219
220 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
221 if let AgentThreadEntry::ToolCall(call) = self {
222 itertools::Either::Left(call.diffs())
223 } else {
224 itertools::Either::Right(std::iter::empty())
225 }
226 }
227
228 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
229 if let AgentThreadEntry::ToolCall(call) = self {
230 itertools::Either::Left(call.terminals())
231 } else {
232 itertools::Either::Right(std::iter::empty())
233 }
234 }
235
236 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
237 if let AgentThreadEntry::ToolCall(ToolCall {
238 locations,
239 resolved_locations,
240 ..
241 }) = self
242 {
243 Some((
244 locations.get(ix)?.clone(),
245 resolved_locations.get(ix)?.clone()?,
246 ))
247 } else {
248 None
249 }
250 }
251}
252
253#[derive(Debug)]
254pub struct ToolCall {
255 pub id: acp::ToolCallId,
256 pub label: Entity<Markdown>,
257 pub kind: acp::ToolKind,
258 pub content: Vec<ToolCallContent>,
259 pub status: ToolCallStatus,
260 pub locations: Vec<acp::ToolCallLocation>,
261 pub resolved_locations: Vec<Option<AgentLocation>>,
262 pub raw_input: Option<serde_json::Value>,
263 pub raw_input_markdown: Option<Entity<Markdown>>,
264 pub raw_output: Option<serde_json::Value>,
265 pub tool_name: Option<SharedString>,
266 pub subagent_session_info: Option<SubagentSessionInfo>,
267}
268
269impl ToolCall {
270 fn from_acp(
271 tool_call: acp::ToolCall,
272 status: ToolCallStatus,
273 language_registry: Arc<LanguageRegistry>,
274 path_style: PathStyle,
275 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
276 cx: &mut App,
277 ) -> Result<Self> {
278 let title = if tool_call.kind == acp::ToolKind::Execute {
279 tool_call.title
280 } else if tool_call.kind == acp::ToolKind::Edit {
281 MarkdownEscaped(tool_call.title.as_str()).to_string()
282 } else if let Some((first_line, _)) = tool_call.title.split_once("\n") {
283 first_line.to_owned() + "…"
284 } else {
285 tool_call.title
286 };
287 let mut content = Vec::with_capacity(tool_call.content.len());
288 for item in tool_call.content {
289 if let Some(item) = ToolCallContent::from_acp(
290 item,
291 language_registry.clone(),
292 path_style,
293 terminals,
294 cx,
295 )? {
296 content.push(item);
297 }
298 }
299
300 let raw_input_markdown = tool_call
301 .raw_input
302 .as_ref()
303 .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
304
305 let tool_name = tool_name_from_meta(&tool_call.meta);
306
307 let subagent_session_info = subagent_session_info_from_meta(&tool_call.meta);
308
309 let result = Self {
310 id: tool_call.tool_call_id,
311 label: cx
312 .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
313 kind: tool_call.kind,
314 content,
315 locations: tool_call.locations,
316 resolved_locations: Vec::default(),
317 status,
318 raw_input: tool_call.raw_input,
319 raw_input_markdown,
320 raw_output: tool_call.raw_output,
321 tool_name,
322 subagent_session_info,
323 };
324 Ok(result)
325 }
326
327 fn update_fields(
328 &mut self,
329 fields: acp::ToolCallUpdateFields,
330 meta: Option<acp::Meta>,
331 language_registry: Arc<LanguageRegistry>,
332 path_style: PathStyle,
333 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
334 cx: &mut App,
335 ) -> Result<()> {
336 let acp::ToolCallUpdateFields {
337 kind,
338 status,
339 title,
340 content,
341 locations,
342 raw_input,
343 raw_output,
344 ..
345 } = fields;
346
347 if let Some(kind) = kind {
348 self.kind = kind;
349 }
350
351 if let Some(status) = status {
352 self.status = status.into();
353 }
354
355 if let Some(subagent_session_info) = subagent_session_info_from_meta(&meta) {
356 self.subagent_session_info = Some(subagent_session_info);
357 }
358
359 if let Some(title) = title {
360 if self.kind == acp::ToolKind::Execute {
361 for terminal in self.terminals() {
362 terminal.update(cx, |terminal, cx| {
363 terminal.update_command_label(&title, cx);
364 });
365 }
366 }
367 self.label.update(cx, |label, cx| {
368 if self.kind == acp::ToolKind::Execute {
369 label.replace(title, cx);
370 } else if self.kind == acp::ToolKind::Edit {
371 label.replace(MarkdownEscaped(&title).to_string(), cx)
372 } else if let Some((first_line, _)) = title.split_once("\n") {
373 label.replace(first_line.to_owned() + "…", cx);
374 } else {
375 label.replace(title, cx);
376 }
377 });
378 }
379
380 if let Some(content) = content {
381 let mut new_content_len = content.len();
382 let mut content = content.into_iter();
383
384 // Reuse existing content if we can
385 for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
386 let valid_content =
387 old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
388 if !valid_content {
389 new_content_len -= 1;
390 }
391 }
392 for new in content {
393 if let Some(new) = ToolCallContent::from_acp(
394 new,
395 language_registry.clone(),
396 path_style,
397 terminals,
398 cx,
399 )? {
400 self.content.push(new);
401 } else {
402 new_content_len -= 1;
403 }
404 }
405 self.content.truncate(new_content_len);
406 }
407
408 if let Some(locations) = locations {
409 self.locations = locations;
410 }
411
412 if let Some(raw_input) = raw_input {
413 self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
414 self.raw_input = Some(raw_input);
415 }
416
417 if let Some(raw_output) = raw_output {
418 if self.content.is_empty()
419 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
420 {
421 self.content
422 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
423 markdown,
424 }));
425 }
426 self.raw_output = Some(raw_output);
427 }
428 Ok(())
429 }
430
431 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
432 self.content.iter().filter_map(|content| match content {
433 ToolCallContent::Diff(diff) => Some(diff),
434 ToolCallContent::ContentBlock(_) => None,
435 ToolCallContent::Terminal(_) => None,
436 })
437 }
438
439 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
440 self.content.iter().filter_map(|content| match content {
441 ToolCallContent::Terminal(terminal) => Some(terminal),
442 ToolCallContent::ContentBlock(_) => None,
443 ToolCallContent::Diff(_) => None,
444 })
445 }
446
447 pub fn is_subagent(&self) -> bool {
448 self.tool_name.as_ref().is_some_and(|s| s == "spawn_agent")
449 || self.subagent_session_info.is_some()
450 }
451
452 pub fn to_markdown(&self, cx: &App) -> String {
453 let mut markdown = format!(
454 "**Tool Call: {}**\nStatus: {}\n\n",
455 self.label.read(cx).source(),
456 self.status
457 );
458 for content in &self.content {
459 markdown.push_str(content.to_markdown(cx).as_str());
460 markdown.push_str("\n\n");
461 }
462 markdown
463 }
464
465 async fn resolve_location(
466 location: acp::ToolCallLocation,
467 project: WeakEntity<Project>,
468 cx: &mut AsyncApp,
469 ) -> Option<ResolvedLocation> {
470 let buffer = project
471 .update(cx, |project, cx| {
472 project
473 .project_path_for_absolute_path(&location.path, cx)
474 .map(|path| project.open_buffer(path, cx))
475 })
476 .ok()??;
477 let buffer = buffer.await.log_err()?;
478 let position = buffer.update(cx, |buffer, _| {
479 let snapshot = buffer.snapshot();
480 if let Some(row) = location.line {
481 let column = snapshot.indent_size_for_line(row).len;
482 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
483 snapshot.anchor_before(point)
484 } else {
485 Anchor::min_for_buffer(snapshot.remote_id())
486 }
487 });
488
489 Some(ResolvedLocation { buffer, position })
490 }
491
492 fn resolve_locations(
493 &self,
494 project: Entity<Project>,
495 cx: &mut App,
496 ) -> Task<Vec<Option<ResolvedLocation>>> {
497 let locations = self.locations.clone();
498 project.update(cx, |_, cx| {
499 cx.spawn(async move |project, cx| {
500 let mut new_locations = Vec::new();
501 for location in locations {
502 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
503 }
504 new_locations
505 })
506 })
507 }
508}
509
510// Separate so we can hold a strong reference to the buffer
511// for saving on the thread
512#[derive(Clone, Debug, PartialEq, Eq)]
513struct ResolvedLocation {
514 buffer: Entity<Buffer>,
515 position: Anchor,
516}
517
518impl From<&ResolvedLocation> for AgentLocation {
519 fn from(value: &ResolvedLocation) -> Self {
520 Self {
521 buffer: value.buffer.downgrade(),
522 position: value.position,
523 }
524 }
525}
526
527#[derive(Debug, Clone)]
528pub enum SelectedPermissionParams {
529 Terminal { patterns: Vec<String> },
530}
531
532#[derive(Debug)]
533pub struct SelectedPermissionOutcome {
534 pub option_id: acp::PermissionOptionId,
535 pub option_kind: acp::PermissionOptionKind,
536 pub params: Option<SelectedPermissionParams>,
537}
538
539impl SelectedPermissionOutcome {
540 pub fn new(option_id: acp::PermissionOptionId, option_kind: acp::PermissionOptionKind) -> Self {
541 Self {
542 option_id,
543 option_kind,
544 params: None,
545 }
546 }
547
548 pub fn params(mut self, params: Option<SelectedPermissionParams>) -> Self {
549 self.params = params;
550 self
551 }
552}
553
554impl From<SelectedPermissionOutcome> for acp::SelectedPermissionOutcome {
555 fn from(value: SelectedPermissionOutcome) -> Self {
556 Self::new(value.option_id)
557 }
558}
559
560#[derive(Debug)]
561pub enum RequestPermissionOutcome {
562 Cancelled,
563 Selected(SelectedPermissionOutcome),
564}
565
566impl From<RequestPermissionOutcome> for acp::RequestPermissionOutcome {
567 fn from(value: RequestPermissionOutcome) -> Self {
568 match value {
569 RequestPermissionOutcome::Cancelled => Self::Cancelled,
570 RequestPermissionOutcome::Selected(outcome) => Self::Selected(outcome.into()),
571 }
572 }
573}
574
575#[derive(Debug)]
576pub enum ToolCallStatus {
577 /// The tool call hasn't started running yet, but we start showing it to
578 /// the user.
579 Pending,
580 /// The tool call is waiting for confirmation from the user.
581 WaitingForConfirmation {
582 options: PermissionOptions,
583 respond_tx: oneshot::Sender<SelectedPermissionOutcome>,
584 },
585 /// The tool call is currently running.
586 InProgress,
587 /// The tool call completed successfully.
588 Completed,
589 /// The tool call failed.
590 Failed,
591 /// The user rejected the tool call.
592 Rejected,
593 /// The user canceled generation so the tool call was canceled.
594 Canceled,
595}
596
597impl From<acp::ToolCallStatus> for ToolCallStatus {
598 fn from(status: acp::ToolCallStatus) -> Self {
599 match status {
600 acp::ToolCallStatus::Pending => Self::Pending,
601 acp::ToolCallStatus::InProgress => Self::InProgress,
602 acp::ToolCallStatus::Completed => Self::Completed,
603 acp::ToolCallStatus::Failed => Self::Failed,
604 _ => Self::Pending,
605 }
606 }
607}
608
609impl Display for ToolCallStatus {
610 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
611 write!(
612 f,
613 "{}",
614 match self {
615 ToolCallStatus::Pending => "Pending",
616 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
617 ToolCallStatus::InProgress => "In Progress",
618 ToolCallStatus::Completed => "Completed",
619 ToolCallStatus::Failed => "Failed",
620 ToolCallStatus::Rejected => "Rejected",
621 ToolCallStatus::Canceled => "Canceled",
622 }
623 )
624 }
625}
626
627#[derive(Debug, PartialEq, Clone)]
628pub enum ContentBlock {
629 Empty,
630 Markdown { markdown: Entity<Markdown> },
631 ResourceLink { resource_link: acp::ResourceLink },
632 Image { image: Arc<gpui::Image> },
633}
634
635impl ContentBlock {
636 pub fn new(
637 block: acp::ContentBlock,
638 language_registry: &Arc<LanguageRegistry>,
639 path_style: PathStyle,
640 cx: &mut App,
641 ) -> Self {
642 let mut this = Self::Empty;
643 this.append(block, language_registry, path_style, cx);
644 this
645 }
646
647 pub fn new_combined(
648 blocks: impl IntoIterator<Item = acp::ContentBlock>,
649 language_registry: Arc<LanguageRegistry>,
650 path_style: PathStyle,
651 cx: &mut App,
652 ) -> Self {
653 let mut this = Self::Empty;
654 for block in blocks {
655 this.append(block, &language_registry, path_style, cx);
656 }
657 this
658 }
659
660 pub fn append(
661 &mut self,
662 block: acp::ContentBlock,
663 language_registry: &Arc<LanguageRegistry>,
664 path_style: PathStyle,
665 cx: &mut App,
666 ) {
667 match (&mut *self, &block) {
668 (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
669 *self = ContentBlock::ResourceLink {
670 resource_link: resource_link.clone(),
671 };
672 }
673 (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
674 if let Some(image) = Self::decode_image(image_content) {
675 *self = ContentBlock::Image { image };
676 } else {
677 let new_content = Self::image_md(image_content);
678 *self = Self::create_markdown_block(new_content, language_registry, cx);
679 }
680 }
681 (ContentBlock::Empty, _) => {
682 let new_content = Self::block_string_contents(&block, path_style);
683 *self = Self::create_markdown_block(new_content, language_registry, cx);
684 }
685 (ContentBlock::Markdown { markdown }, _) => {
686 let new_content = Self::block_string_contents(&block, path_style);
687 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
688 }
689 (ContentBlock::ResourceLink { resource_link }, _) => {
690 let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
691 let new_content = Self::block_string_contents(&block, path_style);
692 let combined = format!("{}\n{}", existing_content, new_content);
693 *self = Self::create_markdown_block(combined, language_registry, cx);
694 }
695 (ContentBlock::Image { .. }, _) => {
696 let new_content = Self::block_string_contents(&block, path_style);
697 let combined = format!("`Image`\n{}", new_content);
698 *self = Self::create_markdown_block(combined, language_registry, cx);
699 }
700 }
701 }
702
703 fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
704 use base64::Engine as _;
705
706 let bytes = base64::engine::general_purpose::STANDARD
707 .decode(image_content.data.as_bytes())
708 .ok()?;
709 let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
710 Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
711 }
712
713 fn create_markdown_block(
714 content: String,
715 language_registry: &Arc<LanguageRegistry>,
716 cx: &mut App,
717 ) -> ContentBlock {
718 ContentBlock::Markdown {
719 markdown: cx
720 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
721 }
722 }
723
724 fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
725 match block {
726 acp::ContentBlock::Text(text_content) => text_content.text.clone(),
727 acp::ContentBlock::ResourceLink(resource_link) => {
728 Self::resource_link_md(&resource_link.uri, path_style)
729 }
730 acp::ContentBlock::Resource(acp::EmbeddedResource {
731 resource:
732 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
733 uri,
734 ..
735 }),
736 ..
737 }) => Self::resource_link_md(uri, path_style),
738 acp::ContentBlock::Image(image) => Self::image_md(image),
739 _ => String::new(),
740 }
741 }
742
743 fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
744 if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
745 uri.as_link().to_string()
746 } else {
747 uri.to_string()
748 }
749 }
750
751 fn image_md(_image: &acp::ImageContent) -> String {
752 "`Image`".into()
753 }
754
755 pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
756 match self {
757 ContentBlock::Empty => "",
758 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
759 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
760 ContentBlock::Image { .. } => "`Image`",
761 }
762 }
763
764 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
765 match self {
766 ContentBlock::Empty => None,
767 ContentBlock::Markdown { markdown } => Some(markdown),
768 ContentBlock::ResourceLink { .. } => None,
769 ContentBlock::Image { .. } => None,
770 }
771 }
772
773 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
774 match self {
775 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
776 _ => None,
777 }
778 }
779
780 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
781 match self {
782 ContentBlock::Image { image } => Some(image),
783 _ => None,
784 }
785 }
786}
787
788#[derive(Debug)]
789pub enum ToolCallContent {
790 ContentBlock(ContentBlock),
791 Diff(Entity<Diff>),
792 Terminal(Entity<Terminal>),
793}
794
795impl ToolCallContent {
796 pub fn from_acp(
797 content: acp::ToolCallContent,
798 language_registry: Arc<LanguageRegistry>,
799 path_style: PathStyle,
800 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
801 cx: &mut App,
802 ) -> Result<Option<Self>> {
803 match content {
804 acp::ToolCallContent::Content(acp::Content { content, .. }) => {
805 Ok(Some(Self::ContentBlock(ContentBlock::new(
806 content,
807 &language_registry,
808 path_style,
809 cx,
810 ))))
811 }
812 acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
813 Diff::finalized(
814 diff.path.to_string_lossy().into_owned(),
815 diff.old_text,
816 diff.new_text,
817 language_registry,
818 cx,
819 )
820 })))),
821 acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
822 .get(&terminal_id)
823 .cloned()
824 .map(|terminal| Some(Self::Terminal(terminal)))
825 .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
826 _ => Ok(None),
827 }
828 }
829
830 pub fn update_from_acp(
831 &mut self,
832 new: acp::ToolCallContent,
833 language_registry: Arc<LanguageRegistry>,
834 path_style: PathStyle,
835 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
836 cx: &mut App,
837 ) -> Result<bool> {
838 let needs_update = match (&self, &new) {
839 (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
840 old_diff.read(cx).needs_update(
841 new_diff.old_text.as_deref().unwrap_or(""),
842 &new_diff.new_text,
843 cx,
844 )
845 }
846 _ => true,
847 };
848
849 if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
850 if needs_update {
851 *self = update;
852 }
853 Ok(true)
854 } else {
855 Ok(false)
856 }
857 }
858
859 pub fn to_markdown(&self, cx: &App) -> String {
860 match self {
861 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
862 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
863 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
864 }
865 }
866
867 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
868 match self {
869 Self::ContentBlock(content) => content.image(),
870 _ => None,
871 }
872 }
873}
874
875#[derive(Debug, PartialEq)]
876pub enum ToolCallUpdate {
877 UpdateFields(acp::ToolCallUpdate),
878 UpdateDiff(ToolCallUpdateDiff),
879 UpdateTerminal(ToolCallUpdateTerminal),
880}
881
882impl ToolCallUpdate {
883 fn id(&self) -> &acp::ToolCallId {
884 match self {
885 Self::UpdateFields(update) => &update.tool_call_id,
886 Self::UpdateDiff(diff) => &diff.id,
887 Self::UpdateTerminal(terminal) => &terminal.id,
888 }
889 }
890}
891
892impl From<acp::ToolCallUpdate> for ToolCallUpdate {
893 fn from(update: acp::ToolCallUpdate) -> Self {
894 Self::UpdateFields(update)
895 }
896}
897
898impl From<ToolCallUpdateDiff> for ToolCallUpdate {
899 fn from(diff: ToolCallUpdateDiff) -> Self {
900 Self::UpdateDiff(diff)
901 }
902}
903
904#[derive(Debug, PartialEq)]
905pub struct ToolCallUpdateDiff {
906 pub id: acp::ToolCallId,
907 pub diff: Entity<Diff>,
908}
909
910impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
911 fn from(terminal: ToolCallUpdateTerminal) -> Self {
912 Self::UpdateTerminal(terminal)
913 }
914}
915
916#[derive(Debug, PartialEq)]
917pub struct ToolCallUpdateTerminal {
918 pub id: acp::ToolCallId,
919 pub terminal: Entity<Terminal>,
920}
921
922#[derive(Debug, Default)]
923pub struct Plan {
924 pub entries: Vec<PlanEntry>,
925}
926
927#[derive(Debug)]
928pub struct PlanStats<'a> {
929 pub in_progress_entry: Option<&'a PlanEntry>,
930 pub pending: u32,
931 pub completed: u32,
932}
933
934impl Plan {
935 pub fn is_empty(&self) -> bool {
936 self.entries.is_empty()
937 }
938
939 pub fn stats(&self) -> PlanStats<'_> {
940 let mut stats = PlanStats {
941 in_progress_entry: None,
942 pending: 0,
943 completed: 0,
944 };
945
946 for entry in &self.entries {
947 match &entry.status {
948 acp::PlanEntryStatus::Pending => {
949 stats.pending += 1;
950 }
951 acp::PlanEntryStatus::InProgress => {
952 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
953 stats.pending += 1;
954 }
955 acp::PlanEntryStatus::Completed => {
956 stats.completed += 1;
957 }
958 _ => {}
959 }
960 }
961
962 stats
963 }
964}
965
966#[derive(Debug)]
967pub struct PlanEntry {
968 pub content: Entity<Markdown>,
969 pub priority: acp::PlanEntryPriority,
970 pub status: acp::PlanEntryStatus,
971}
972
973impl PlanEntry {
974 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
975 Self {
976 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
977 priority: entry.priority,
978 status: entry.status,
979 }
980 }
981}
982
983#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
984pub struct TokenUsage {
985 pub max_tokens: u64,
986 pub used_tokens: u64,
987 pub input_tokens: u64,
988 pub output_tokens: u64,
989 pub max_output_tokens: Option<u64>,
990}
991
992pub const TOKEN_USAGE_WARNING_THRESHOLD: f32 = 0.8;
993
994impl TokenUsage {
995 pub fn ratio(&self) -> TokenUsageRatio {
996 #[cfg(debug_assertions)]
997 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
998 .unwrap_or(TOKEN_USAGE_WARNING_THRESHOLD.to_string())
999 .parse()
1000 .unwrap();
1001 #[cfg(not(debug_assertions))]
1002 let warning_threshold: f32 = TOKEN_USAGE_WARNING_THRESHOLD;
1003
1004 // When the maximum is unknown because there is no selected model,
1005 // avoid showing the token limit warning.
1006 if self.max_tokens == 0 {
1007 TokenUsageRatio::Normal
1008 } else if self.used_tokens >= self.max_tokens {
1009 TokenUsageRatio::Exceeded
1010 } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
1011 TokenUsageRatio::Warning
1012 } else {
1013 TokenUsageRatio::Normal
1014 }
1015 }
1016}
1017
1018#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
1019pub enum TokenUsageRatio {
1020 Normal,
1021 Warning,
1022 Exceeded,
1023}
1024
1025#[derive(Debug, Clone)]
1026pub struct RetryStatus {
1027 pub last_error: SharedString,
1028 pub attempt: usize,
1029 pub max_attempts: usize,
1030 pub started_at: Instant,
1031 pub duration: Duration,
1032}
1033
1034struct RunningTurn {
1035 id: u32,
1036 send_task: Task<()>,
1037}
1038
1039pub struct AcpThread {
1040 thread_id: ThreadId,
1041 session_id: acp::SessionId,
1042 work_dirs: Option<PathList>,
1043 parent_thread_id: Option<ThreadId>,
1044 title: Option<SharedString>,
1045 provisional_title: Option<SharedString>,
1046 entries: Vec<AgentThreadEntry>,
1047 plan: Plan,
1048 project: Entity<Project>,
1049 action_log: Entity<ActionLog>,
1050 shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
1051 turn_id: u32,
1052 running_turn: Option<RunningTurn>,
1053 connection: Rc<dyn AgentConnection>,
1054 token_usage: Option<TokenUsage>,
1055 prompt_capabilities: acp::PromptCapabilities,
1056 available_commands: Vec<acp::AvailableCommand>,
1057 _observe_prompt_capabilities: Task<anyhow::Result<()>>,
1058 terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
1059 pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
1060 pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
1061 had_error: bool,
1062 /// The user's unsent prompt text, persisted so it can be restored when reloading the thread.
1063 draft_prompt: Option<Vec<acp::ContentBlock>>,
1064 /// The initial scroll position for the thread view, set during session registration.
1065 ui_scroll_position: Option<gpui::ListOffset>,
1066 /// Buffer for smooth text streaming. Holds text that has been received from
1067 /// the model but not yet revealed in the UI. A timer task drains this buffer
1068 /// gradually to create a fluid typing effect instead of choppy chunk-at-a-time
1069 /// updates.
1070 streaming_text_buffer: Option<StreamingTextBuffer>,
1071}
1072
1073struct StreamingTextBuffer {
1074 /// Text received from the model but not yet appended to the Markdown source.
1075 pending: String,
1076 /// The number of bytes to reveal per timer turn.
1077 bytes_to_reveal_per_tick: usize,
1078 /// The Markdown entity being streamed into.
1079 target: Entity<Markdown>,
1080 /// Timer task that periodically moves text from `pending` into `source`.
1081 _reveal_task: Task<()>,
1082}
1083
1084impl StreamingTextBuffer {
1085 /// The number of milliseconds between each timer tick, controlling how quickly
1086 /// text is revealed.
1087 const TASK_UPDATE_MS: u64 = 16;
1088 /// The time in milliseconds to reveal the entire pending text.
1089 const REVEAL_TARGET: f32 = 200.0;
1090}
1091
1092impl From<&AcpThread> for ActionLogTelemetry {
1093 fn from(value: &AcpThread) -> Self {
1094 Self {
1095 agent_telemetry_id: value.connection().telemetry_id(),
1096 session_id: value.session_id.0.clone(),
1097 }
1098 }
1099}
1100
1101#[derive(Debug)]
1102pub enum AcpThreadEvent {
1103 NewEntry,
1104 TitleUpdated,
1105 TokenUsageUpdated,
1106 EntryUpdated(usize),
1107 EntriesRemoved(Range<usize>),
1108 ToolAuthorizationRequested(acp::ToolCallId),
1109 ToolAuthorizationReceived(acp::ToolCallId),
1110 Retry(RetryStatus),
1111 SubagentSpawned(acp::SessionId),
1112 Stopped(acp::StopReason),
1113 Error,
1114 LoadError(LoadError),
1115 PromptCapabilitiesUpdated,
1116 Refusal,
1117 AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
1118 ModeUpdated(acp::SessionModeId),
1119 ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
1120 WorkingDirectoriesUpdated,
1121}
1122
1123impl EventEmitter<AcpThreadEvent> for AcpThread {}
1124
1125#[derive(Debug, Clone)]
1126pub enum TerminalProviderEvent {
1127 Created {
1128 terminal_id: acp::TerminalId,
1129 label: String,
1130 cwd: Option<PathBuf>,
1131 output_byte_limit: Option<u64>,
1132 terminal: Entity<::terminal::Terminal>,
1133 },
1134 Output {
1135 terminal_id: acp::TerminalId,
1136 data: Vec<u8>,
1137 },
1138 TitleChanged {
1139 terminal_id: acp::TerminalId,
1140 title: String,
1141 },
1142 Exit {
1143 terminal_id: acp::TerminalId,
1144 status: acp::TerminalExitStatus,
1145 },
1146}
1147
1148#[derive(Debug, Clone)]
1149pub enum TerminalProviderCommand {
1150 WriteInput {
1151 terminal_id: acp::TerminalId,
1152 bytes: Vec<u8>,
1153 },
1154 Resize {
1155 terminal_id: acp::TerminalId,
1156 cols: u16,
1157 rows: u16,
1158 },
1159 Close {
1160 terminal_id: acp::TerminalId,
1161 },
1162}
1163
1164#[derive(PartialEq, Eq, Debug)]
1165pub enum ThreadStatus {
1166 Idle,
1167 Generating,
1168}
1169
1170#[derive(Debug, Clone)]
1171pub enum LoadError {
1172 Unsupported {
1173 command: SharedString,
1174 current_version: SharedString,
1175 minimum_version: SharedString,
1176 },
1177 FailedToInstall(SharedString),
1178 Exited {
1179 status: ExitStatus,
1180 },
1181 Other(SharedString),
1182}
1183
1184impl Display for LoadError {
1185 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1186 match self {
1187 LoadError::Unsupported {
1188 command: path,
1189 current_version,
1190 minimum_version,
1191 } => {
1192 write!(
1193 f,
1194 "version {current_version} from {path} is not supported (need at least {minimum_version})"
1195 )
1196 }
1197 LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1198 LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1199 LoadError::Other(msg) => write!(f, "{msg}"),
1200 }
1201 }
1202}
1203
1204impl Error for LoadError {}
1205
1206impl AcpThread {
1207 pub fn new(
1208 parent_thread_id: Option<ThreadId>,
1209 title: Option<SharedString>,
1210 work_dirs: Option<PathList>,
1211 connection: Rc<dyn AgentConnection>,
1212 project: Entity<Project>,
1213 action_log: Entity<ActionLog>,
1214 thread_id: ThreadId,
1215 session_id: acp::SessionId,
1216 mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1217 cx: &mut Context<Self>,
1218 ) -> Self {
1219 let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1220 let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1221 loop {
1222 let caps = prompt_capabilities_rx.recv().await?;
1223 this.update(cx, |this, cx| {
1224 this.prompt_capabilities = caps;
1225 cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1226 })?;
1227 }
1228 });
1229
1230 Self {
1231 thread_id,
1232 parent_thread_id,
1233 work_dirs,
1234 action_log,
1235 shared_buffers: Default::default(),
1236 entries: Default::default(),
1237 plan: Default::default(),
1238 title,
1239 provisional_title: None,
1240 project,
1241 running_turn: None,
1242 turn_id: 0,
1243 connection,
1244 session_id,
1245 token_usage: None,
1246 prompt_capabilities,
1247 available_commands: Vec::new(),
1248 _observe_prompt_capabilities: task,
1249 terminals: HashMap::default(),
1250 pending_terminal_output: HashMap::default(),
1251 pending_terminal_exit: HashMap::default(),
1252 had_error: false,
1253 draft_prompt: None,
1254 ui_scroll_position: None,
1255 streaming_text_buffer: None,
1256 }
1257 }
1258
1259 pub fn thread_id(&self) -> ThreadId {
1260 self.thread_id
1261 }
1262
1263 pub fn parent_thread_id(&self) -> Option<ThreadId> {
1264 self.parent_thread_id
1265 }
1266
1267 pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
1268 self.prompt_capabilities.clone()
1269 }
1270
1271 pub fn available_commands(&self) -> &[acp::AvailableCommand] {
1272 &self.available_commands
1273 }
1274
1275 pub fn draft_prompt(&self) -> Option<&[acp::ContentBlock]> {
1276 self.draft_prompt.as_deref()
1277 }
1278
1279 pub fn set_draft_prompt(&mut self, prompt: Option<Vec<acp::ContentBlock>>) {
1280 self.draft_prompt = prompt;
1281 }
1282
1283 pub fn ui_scroll_position(&self) -> Option<gpui::ListOffset> {
1284 self.ui_scroll_position
1285 }
1286
1287 pub fn set_ui_scroll_position(&mut self, position: Option<gpui::ListOffset>) {
1288 self.ui_scroll_position = position;
1289 }
1290
1291 pub fn connection(&self) -> &Rc<dyn AgentConnection> {
1292 &self.connection
1293 }
1294
1295 pub fn action_log(&self) -> &Entity<ActionLog> {
1296 &self.action_log
1297 }
1298
1299 pub fn project(&self) -> &Entity<Project> {
1300 &self.project
1301 }
1302
1303 pub fn title(&self) -> Option<SharedString> {
1304 self.title
1305 .clone()
1306 .or_else(|| self.provisional_title.clone())
1307 }
1308
1309 pub fn has_provisional_title(&self) -> bool {
1310 self.provisional_title.is_some()
1311 }
1312
1313 pub fn entries(&self) -> &[AgentThreadEntry] {
1314 &self.entries
1315 }
1316
1317 pub fn session_id(&self) -> &acp::SessionId {
1318 &self.session_id
1319 }
1320
1321 pub fn work_dirs(&self) -> Option<&PathList> {
1322 self.work_dirs.as_ref()
1323 }
1324
1325 pub fn set_work_dirs(&mut self, work_dirs: PathList, cx: &mut Context<Self>) {
1326 self.work_dirs = Some(work_dirs);
1327 cx.emit(AcpThreadEvent::WorkingDirectoriesUpdated)
1328 }
1329
1330 pub fn status(&self) -> ThreadStatus {
1331 if self.running_turn.is_some() {
1332 ThreadStatus::Generating
1333 } else {
1334 ThreadStatus::Idle
1335 }
1336 }
1337
1338 pub fn had_error(&self) -> bool {
1339 self.had_error
1340 }
1341
1342 pub fn is_waiting_for_confirmation(&self) -> bool {
1343 for entry in self.entries.iter().rev() {
1344 match entry {
1345 AgentThreadEntry::UserMessage(_) => return false,
1346 AgentThreadEntry::ToolCall(ToolCall {
1347 status: ToolCallStatus::WaitingForConfirmation { .. },
1348 ..
1349 }) => return true,
1350 AgentThreadEntry::ToolCall(_)
1351 | AgentThreadEntry::AssistantMessage(_)
1352 | AgentThreadEntry::CompletedPlan(_) => {}
1353 }
1354 }
1355 false
1356 }
1357
1358 pub fn token_usage(&self) -> Option<&TokenUsage> {
1359 self.token_usage.as_ref()
1360 }
1361
1362 pub fn has_pending_edit_tool_calls(&self) -> bool {
1363 for entry in self.entries.iter().rev() {
1364 match entry {
1365 AgentThreadEntry::UserMessage(_) => return false,
1366 AgentThreadEntry::ToolCall(
1367 call @ ToolCall {
1368 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1369 ..
1370 },
1371 ) if call.diffs().next().is_some() => {
1372 return true;
1373 }
1374 AgentThreadEntry::ToolCall(_)
1375 | AgentThreadEntry::AssistantMessage(_)
1376 | AgentThreadEntry::CompletedPlan(_) => {}
1377 }
1378 }
1379
1380 false
1381 }
1382
1383 pub fn has_in_progress_tool_calls(&self) -> bool {
1384 for entry in self.entries.iter().rev() {
1385 match entry {
1386 AgentThreadEntry::UserMessage(_) => return false,
1387 AgentThreadEntry::ToolCall(ToolCall {
1388 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1389 ..
1390 }) => {
1391 return true;
1392 }
1393 AgentThreadEntry::ToolCall(_)
1394 | AgentThreadEntry::AssistantMessage(_)
1395 | AgentThreadEntry::CompletedPlan(_) => {}
1396 }
1397 }
1398
1399 false
1400 }
1401
1402 pub fn used_tools_since_last_user_message(&self) -> bool {
1403 for entry in self.entries.iter().rev() {
1404 match entry {
1405 AgentThreadEntry::UserMessage(..) => return false,
1406 AgentThreadEntry::AssistantMessage(..) | AgentThreadEntry::CompletedPlan(..) => {
1407 continue;
1408 }
1409 AgentThreadEntry::ToolCall(..) => return true,
1410 }
1411 }
1412
1413 false
1414 }
1415
1416 pub fn handle_session_update(
1417 &mut self,
1418 update: acp::SessionUpdate,
1419 cx: &mut Context<Self>,
1420 ) -> Result<(), acp::Error> {
1421 match update {
1422 acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1423 // We optimistically add the full user prompt before calling `prompt`.
1424 // Some ACP servers echo user chunks back over updates. Skip the chunk if
1425 // it's already present in the current user message to avoid duplicating content.
1426 let already_in_user_message = self
1427 .entries
1428 .last()
1429 .and_then(|entry| entry.user_message())
1430 .is_some_and(|message| message.chunks.contains(&content));
1431 if !already_in_user_message {
1432 self.push_user_content_block(None, content, cx);
1433 }
1434 }
1435 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1436 self.push_assistant_content_block(content, false, cx);
1437 }
1438 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1439 self.push_assistant_content_block(content, true, cx);
1440 }
1441 acp::SessionUpdate::ToolCall(tool_call) => {
1442 self.upsert_tool_call(tool_call, cx)?;
1443 }
1444 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1445 self.update_tool_call(tool_call_update, cx)?;
1446 }
1447 acp::SessionUpdate::Plan(plan) => {
1448 self.update_plan(plan, cx);
1449 }
1450 acp::SessionUpdate::SessionInfoUpdate(info_update) => {
1451 if let acp::MaybeUndefined::Value(title) = info_update.title {
1452 let had_provisional = self.provisional_title.take().is_some();
1453 let title: SharedString = title.into();
1454 if self.title.as_ref() != Some(&title) {
1455 self.title = Some(title);
1456 cx.emit(AcpThreadEvent::TitleUpdated);
1457 } else if had_provisional {
1458 cx.emit(AcpThreadEvent::TitleUpdated);
1459 }
1460 }
1461 }
1462 acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1463 available_commands,
1464 ..
1465 }) => {
1466 self.available_commands = available_commands.clone();
1467 cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands));
1468 }
1469 acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1470 current_mode_id,
1471 ..
1472 }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1473 acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1474 config_options,
1475 ..
1476 }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1477 _ => {}
1478 }
1479 Ok(())
1480 }
1481
1482 pub fn push_user_content_block(
1483 &mut self,
1484 message_id: Option<UserMessageId>,
1485 chunk: acp::ContentBlock,
1486 cx: &mut Context<Self>,
1487 ) {
1488 self.push_user_content_block_with_indent(message_id, chunk, false, cx)
1489 }
1490
1491 pub fn push_user_content_block_with_indent(
1492 &mut self,
1493 message_id: Option<UserMessageId>,
1494 chunk: acp::ContentBlock,
1495 indented: bool,
1496 cx: &mut Context<Self>,
1497 ) {
1498 let language_registry = self.project.read(cx).languages().clone();
1499 let path_style = self.project.read(cx).path_style(cx);
1500 let entries_len = self.entries.len();
1501
1502 if let Some(last_entry) = self.entries.last_mut()
1503 && let AgentThreadEntry::UserMessage(UserMessage {
1504 id,
1505 content,
1506 chunks,
1507 indented: existing_indented,
1508 ..
1509 }) = last_entry
1510 && *existing_indented == indented
1511 {
1512 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1513 *id = message_id.or(id.take());
1514 content.append(chunk.clone(), &language_registry, path_style, cx);
1515 chunks.push(chunk);
1516 let idx = entries_len - 1;
1517 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1518 } else {
1519 let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1520 self.push_entry(
1521 AgentThreadEntry::UserMessage(UserMessage {
1522 id: message_id,
1523 content,
1524 chunks: vec![chunk],
1525 checkpoint: None,
1526 indented,
1527 }),
1528 cx,
1529 );
1530 }
1531 }
1532
1533 pub fn push_assistant_content_block(
1534 &mut self,
1535 chunk: acp::ContentBlock,
1536 is_thought: bool,
1537 cx: &mut Context<Self>,
1538 ) {
1539 self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
1540 }
1541
1542 pub fn push_assistant_content_block_with_indent(
1543 &mut self,
1544 chunk: acp::ContentBlock,
1545 is_thought: bool,
1546 indented: bool,
1547 cx: &mut Context<Self>,
1548 ) {
1549 let path_style = self.project.read(cx).path_style(cx);
1550
1551 // For text chunks going to an existing Markdown block, buffer for smooth
1552 // streaming instead of appending all at once which may feel more choppy.
1553 if let acp::ContentBlock::Text(text_content) = &chunk {
1554 if let Some(markdown) = self.streaming_markdown_target(is_thought, indented) {
1555 let entries_len = self.entries.len();
1556 cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
1557 self.buffer_streaming_text(&markdown, text_content.text.clone(), cx);
1558 return;
1559 }
1560 }
1561
1562 let language_registry = self.project.read(cx).languages().clone();
1563 let entries_len = self.entries.len();
1564 if let Some(last_entry) = self.entries.last_mut()
1565 && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1566 chunks,
1567 indented: existing_indented,
1568 is_subagent_output: _,
1569 }) = last_entry
1570 && *existing_indented == indented
1571 {
1572 let idx = entries_len - 1;
1573 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1574 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1575 match (chunks.last_mut(), is_thought) {
1576 (Some(AssistantMessageChunk::Message { block }), false)
1577 | (Some(AssistantMessageChunk::Thought { block }), true) => {
1578 block.append(chunk, &language_registry, path_style, cx)
1579 }
1580 _ => {
1581 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1582 if is_thought {
1583 chunks.push(AssistantMessageChunk::Thought { block })
1584 } else {
1585 chunks.push(AssistantMessageChunk::Message { block })
1586 }
1587 }
1588 }
1589 } else {
1590 let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1591 let chunk = if is_thought {
1592 AssistantMessageChunk::Thought { block }
1593 } else {
1594 AssistantMessageChunk::Message { block }
1595 };
1596
1597 self.push_entry(
1598 AgentThreadEntry::AssistantMessage(AssistantMessage {
1599 chunks: vec![chunk],
1600 indented,
1601 is_subagent_output: false,
1602 }),
1603 cx,
1604 );
1605 }
1606 }
1607
1608 fn streaming_markdown_target(
1609 &self,
1610 is_thought: bool,
1611 indented: bool,
1612 ) -> Option<Entity<Markdown>> {
1613 let last_entry = self.entries.last()?;
1614 if let AgentThreadEntry::AssistantMessage(AssistantMessage {
1615 chunks,
1616 indented: existing_indented,
1617 ..
1618 }) = last_entry
1619 && *existing_indented == indented
1620 && let [.., chunk] = chunks.as_slice()
1621 {
1622 match (chunk, is_thought) {
1623 (
1624 AssistantMessageChunk::Message {
1625 block: ContentBlock::Markdown { markdown },
1626 },
1627 false,
1628 )
1629 | (
1630 AssistantMessageChunk::Thought {
1631 block: ContentBlock::Markdown { markdown },
1632 },
1633 true,
1634 ) => Some(markdown.clone()),
1635 _ => None,
1636 }
1637 } else {
1638 None
1639 }
1640 }
1641
1642 /// Add text to the streaming buffer. If the target changed (e.g. switching
1643 /// from thoughts to message text), flush the old buffer first.
1644 fn buffer_streaming_text(
1645 &mut self,
1646 markdown: &Entity<Markdown>,
1647 text: String,
1648 cx: &mut Context<Self>,
1649 ) {
1650 if let Some(buffer) = &mut self.streaming_text_buffer {
1651 if buffer.target.entity_id() == markdown.entity_id() {
1652 buffer.pending.push_str(&text);
1653
1654 buffer.bytes_to_reveal_per_tick = (buffer.pending.len() as f32
1655 / StreamingTextBuffer::REVEAL_TARGET
1656 * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1657 .ceil() as usize;
1658 return;
1659 }
1660 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1661 }
1662
1663 let target = markdown.clone();
1664 let _reveal_task = self.start_streaming_reveal(cx);
1665 let pending_len = text.len();
1666 let bytes_to_reveal = (pending_len as f32 / StreamingTextBuffer::REVEAL_TARGET
1667 * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1668 .ceil() as usize;
1669 self.streaming_text_buffer = Some(StreamingTextBuffer {
1670 pending: text,
1671 bytes_to_reveal_per_tick: bytes_to_reveal,
1672 target,
1673 _reveal_task,
1674 });
1675 }
1676
1677 /// Flush all buffered streaming text into the Markdown entity immediately.
1678 fn flush_streaming_text(
1679 streaming_text_buffer: &mut Option<StreamingTextBuffer>,
1680 cx: &mut Context<Self>,
1681 ) {
1682 if let Some(buffer) = streaming_text_buffer.take() {
1683 if !buffer.pending.is_empty() {
1684 buffer
1685 .target
1686 .update(cx, |markdown, cx| markdown.append(&buffer.pending, cx));
1687 }
1688 }
1689 }
1690
1691 /// Spawns a foreground task that periodically drains
1692 /// `streaming_text_buffer.pending` into the target `Markdown` entity,
1693 /// producing smooth, continuous text output.
1694 fn start_streaming_reveal(&self, cx: &mut Context<Self>) -> Task<()> {
1695 cx.spawn(async move |this, cx| {
1696 loop {
1697 cx.background_executor()
1698 .timer(Duration::from_millis(StreamingTextBuffer::TASK_UPDATE_MS))
1699 .await;
1700
1701 let should_continue = this
1702 .update(cx, |this, cx| {
1703 let Some(buffer) = &mut this.streaming_text_buffer else {
1704 return false;
1705 };
1706
1707 if buffer.pending.is_empty() {
1708 return true;
1709 }
1710
1711 let pending_len = buffer.pending.len();
1712
1713 let byte_boundary = buffer
1714 .pending
1715 .ceil_char_boundary(buffer.bytes_to_reveal_per_tick)
1716 .min(pending_len);
1717
1718 buffer.target.update(cx, |markdown: &mut Markdown, cx| {
1719 markdown.append(&buffer.pending[..byte_boundary], cx);
1720 buffer.pending.drain(..byte_boundary);
1721 });
1722
1723 true
1724 })
1725 .unwrap_or(false);
1726
1727 if !should_continue {
1728 break;
1729 }
1730 }
1731 })
1732 }
1733
1734 fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
1735 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1736 self.entries.push(entry);
1737 cx.emit(AcpThreadEvent::NewEntry);
1738 }
1739
1740 pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1741 self.connection.set_title(&self.session_id, cx).is_some()
1742 }
1743
1744 pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1745 let had_provisional = self.provisional_title.take().is_some();
1746 if self.title.as_ref() != Some(&title) {
1747 self.title = Some(title.clone());
1748 cx.emit(AcpThreadEvent::TitleUpdated);
1749 if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1750 return set_title.run(title, cx);
1751 }
1752 } else if had_provisional {
1753 cx.emit(AcpThreadEvent::TitleUpdated);
1754 }
1755 Task::ready(Ok(()))
1756 }
1757
1758 /// Sets a provisional display title without propagating back to the
1759 /// underlying agent connection. This is used for quick preview titles
1760 /// (e.g. first 20 chars of the user message) that should be shown
1761 /// immediately but replaced once the LLM generates a proper title via
1762 /// `set_title`.
1763 pub fn set_provisional_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
1764 self.provisional_title = Some(title);
1765 cx.emit(AcpThreadEvent::TitleUpdated);
1766 }
1767
1768 pub fn subagent_spawned(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
1769 cx.emit(AcpThreadEvent::SubagentSpawned(session_id));
1770 }
1771
1772 pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1773 self.token_usage = usage;
1774 cx.emit(AcpThreadEvent::TokenUsageUpdated);
1775 }
1776
1777 pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1778 cx.emit(AcpThreadEvent::Retry(status));
1779 }
1780
1781 pub fn update_tool_call(
1782 &mut self,
1783 update: impl Into<ToolCallUpdate>,
1784 cx: &mut Context<Self>,
1785 ) -> Result<()> {
1786 let update = update.into();
1787 let languages = self.project.read(cx).languages().clone();
1788 let path_style = self.project.read(cx).path_style(cx);
1789
1790 let ix = match self.index_for_tool_call(update.id()) {
1791 Some(ix) => ix,
1792 None => {
1793 // Tool call not found - create a failed tool call entry
1794 let failed_tool_call = ToolCall {
1795 id: update.id().clone(),
1796 label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1797 kind: acp::ToolKind::Fetch,
1798 content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1799 "Tool call not found".into(),
1800 &languages,
1801 path_style,
1802 cx,
1803 ))],
1804 status: ToolCallStatus::Failed,
1805 locations: Vec::new(),
1806 resolved_locations: Vec::new(),
1807 raw_input: None,
1808 raw_input_markdown: None,
1809 raw_output: None,
1810 tool_name: None,
1811 subagent_session_info: None,
1812 };
1813 self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1814 return Ok(());
1815 }
1816 };
1817 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1818 unreachable!()
1819 };
1820
1821 match update {
1822 ToolCallUpdate::UpdateFields(update) => {
1823 let location_updated = update.fields.locations.is_some();
1824 call.update_fields(
1825 update.fields,
1826 update.meta,
1827 languages,
1828 path_style,
1829 &self.terminals,
1830 cx,
1831 )?;
1832 if location_updated {
1833 self.resolve_locations(update.tool_call_id, cx);
1834 }
1835 }
1836 ToolCallUpdate::UpdateDiff(update) => {
1837 call.content.clear();
1838 call.content.push(ToolCallContent::Diff(update.diff));
1839 }
1840 ToolCallUpdate::UpdateTerminal(update) => {
1841 call.content.clear();
1842 call.content
1843 .push(ToolCallContent::Terminal(update.terminal));
1844 }
1845 }
1846
1847 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1848
1849 Ok(())
1850 }
1851
1852 /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1853 pub fn upsert_tool_call(
1854 &mut self,
1855 tool_call: acp::ToolCall,
1856 cx: &mut Context<Self>,
1857 ) -> Result<(), acp::Error> {
1858 let status = tool_call.status.into();
1859 self.upsert_tool_call_inner(tool_call.into(), status, cx)
1860 }
1861
1862 /// Fails if id does not match an existing entry.
1863 pub fn upsert_tool_call_inner(
1864 &mut self,
1865 update: acp::ToolCallUpdate,
1866 status: ToolCallStatus,
1867 cx: &mut Context<Self>,
1868 ) -> Result<(), acp::Error> {
1869 let language_registry = self.project.read(cx).languages().clone();
1870 let path_style = self.project.read(cx).path_style(cx);
1871 let id = update.tool_call_id.clone();
1872
1873 let agent_telemetry_id = self.connection().telemetry_id();
1874 let session = self.session_id();
1875 let parent_thread_id = self.parent_thread_id();
1876 if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1877 let status = if matches!(status, ToolCallStatus::Completed) {
1878 "completed"
1879 } else {
1880 "failed"
1881 };
1882 telemetry::event!(
1883 "Agent Tool Call Completed",
1884 agent_telemetry_id,
1885 session,
1886 parent_thread_id,
1887 status
1888 );
1889 }
1890
1891 if let Some(ix) = self.index_for_tool_call(&id) {
1892 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1893 unreachable!()
1894 };
1895
1896 call.update_fields(
1897 update.fields,
1898 update.meta,
1899 language_registry,
1900 path_style,
1901 &self.terminals,
1902 cx,
1903 )?;
1904 call.status = status;
1905
1906 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1907 } else {
1908 let call = ToolCall::from_acp(
1909 update.try_into()?,
1910 status,
1911 language_registry,
1912 self.project.read(cx).path_style(cx),
1913 &self.terminals,
1914 cx,
1915 )?;
1916 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1917 };
1918
1919 self.resolve_locations(id, cx);
1920 Ok(())
1921 }
1922
1923 fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1924 self.entries
1925 .iter()
1926 .enumerate()
1927 .rev()
1928 .find_map(|(index, entry)| {
1929 if let AgentThreadEntry::ToolCall(tool_call) = entry
1930 && &tool_call.id == id
1931 {
1932 Some(index)
1933 } else {
1934 None
1935 }
1936 })
1937 }
1938
1939 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1940 // The tool call we are looking for is typically the last one, or very close to the end.
1941 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1942 self.entries
1943 .iter_mut()
1944 .enumerate()
1945 .rev()
1946 .find_map(|(index, tool_call)| {
1947 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1948 && &tool_call.id == id
1949 {
1950 Some((index, tool_call))
1951 } else {
1952 None
1953 }
1954 })
1955 }
1956
1957 pub fn tool_call(&self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1958 self.entries
1959 .iter()
1960 .enumerate()
1961 .rev()
1962 .find_map(|(index, tool_call)| {
1963 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1964 && &tool_call.id == id
1965 {
1966 Some((index, tool_call))
1967 } else {
1968 None
1969 }
1970 })
1971 }
1972
1973 pub fn tool_call_for_subagent(&self, session_id: &acp::SessionId) -> Option<&ToolCall> {
1974 self.entries.iter().find_map(|entry| match entry {
1975 AgentThreadEntry::ToolCall(tool_call) => {
1976 if let Some(subagent_session_info) = &tool_call.subagent_session_info
1977 && &subagent_session_info.session_id == session_id
1978 {
1979 Some(tool_call)
1980 } else {
1981 None
1982 }
1983 }
1984 _ => None,
1985 })
1986 }
1987
1988 pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1989 let project = self.project.clone();
1990 let should_update_agent_location = self.parent_thread_id.is_none();
1991 let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1992 return;
1993 };
1994 let task = tool_call.resolve_locations(project, cx);
1995 cx.spawn(async move |this, cx| {
1996 let resolved_locations = task.await;
1997
1998 this.update(cx, |this, cx| {
1999 let project = this.project.clone();
2000
2001 for location in resolved_locations.iter().flatten() {
2002 this.shared_buffers
2003 .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
2004 }
2005 let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
2006 return;
2007 };
2008
2009 if let Some(Some(location)) = resolved_locations.last() {
2010 project.update(cx, |project, cx| {
2011 let should_ignore = if let Some(agent_location) = project
2012 .agent_location()
2013 .filter(|agent_location| agent_location.buffer == location.buffer)
2014 {
2015 let snapshot = location.buffer.read(cx).snapshot();
2016 let old_position = agent_location.position.to_point(&snapshot);
2017 let new_position = location.position.to_point(&snapshot);
2018
2019 // ignore this so that when we get updates from the edit tool
2020 // the position doesn't reset to the startof line
2021 old_position.row == new_position.row
2022 && old_position.column > new_position.column
2023 } else {
2024 false
2025 };
2026 if !should_ignore && should_update_agent_location {
2027 project.set_agent_location(Some(location.into()), cx);
2028 }
2029 });
2030 }
2031
2032 let resolved_locations = resolved_locations
2033 .iter()
2034 .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
2035 .collect::<Vec<_>>();
2036
2037 if tool_call.resolved_locations != resolved_locations {
2038 tool_call.resolved_locations = resolved_locations;
2039 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2040 }
2041 })
2042 })
2043 .detach();
2044 }
2045
2046 pub fn request_tool_call_authorization(
2047 &mut self,
2048 tool_call: acp::ToolCallUpdate,
2049 options: PermissionOptions,
2050 cx: &mut Context<Self>,
2051 ) -> Result<Task<RequestPermissionOutcome>> {
2052 let (tx, rx) = oneshot::channel();
2053
2054 let status = ToolCallStatus::WaitingForConfirmation {
2055 options,
2056 respond_tx: tx,
2057 };
2058
2059 let tool_call_id = tool_call.tool_call_id.clone();
2060 self.upsert_tool_call_inner(tool_call, status, cx)?;
2061 cx.emit(AcpThreadEvent::ToolAuthorizationRequested(
2062 tool_call_id.clone(),
2063 ));
2064
2065 Ok(cx.spawn(async move |this, cx| {
2066 let outcome = match rx.await {
2067 Ok(outcome) => RequestPermissionOutcome::Selected(outcome),
2068 Err(oneshot::Canceled) => RequestPermissionOutcome::Cancelled,
2069 };
2070 this.update(cx, |_this, cx| {
2071 cx.emit(AcpThreadEvent::ToolAuthorizationReceived(tool_call_id))
2072 })
2073 .ok();
2074 outcome
2075 }))
2076 }
2077
2078 pub fn authorize_tool_call(
2079 &mut self,
2080 id: acp::ToolCallId,
2081 outcome: SelectedPermissionOutcome,
2082 cx: &mut Context<Self>,
2083 ) {
2084 let Some((ix, call)) = self.tool_call_mut(&id) else {
2085 return;
2086 };
2087
2088 let new_status = match outcome.option_kind {
2089 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
2090 ToolCallStatus::Rejected
2091 }
2092 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
2093 ToolCallStatus::InProgress
2094 }
2095 _ => ToolCallStatus::InProgress,
2096 };
2097
2098 let curr_status = mem::replace(&mut call.status, new_status);
2099
2100 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
2101 respond_tx.send(outcome).log_err();
2102 } else if cfg!(debug_assertions) {
2103 panic!("tried to authorize an already authorized tool call");
2104 }
2105
2106 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2107 }
2108
2109 pub fn plan(&self) -> &Plan {
2110 &self.plan
2111 }
2112
2113 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
2114 let new_entries_len = request.entries.len();
2115 let mut new_entries = request.entries.into_iter();
2116
2117 // Reuse existing markdown to prevent flickering
2118 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
2119 let PlanEntry {
2120 content,
2121 priority,
2122 status,
2123 } = old;
2124 content.update(cx, |old, cx| {
2125 old.replace(new.content, cx);
2126 });
2127 *priority = new.priority;
2128 *status = new.status;
2129 }
2130 for new in new_entries {
2131 self.plan.entries.push(PlanEntry::from_acp(new, cx))
2132 }
2133 self.plan.entries.truncate(new_entries_len);
2134
2135 cx.notify();
2136 }
2137
2138 pub fn snapshot_completed_plan(&mut self, cx: &mut Context<Self>) {
2139 if !self.plan.is_empty() && self.plan.stats().pending == 0 {
2140 let completed_entries = std::mem::take(&mut self.plan.entries);
2141 self.push_entry(AgentThreadEntry::CompletedPlan(completed_entries), cx);
2142 }
2143 }
2144
2145 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
2146 self.plan
2147 .entries
2148 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
2149 cx.notify();
2150 }
2151
2152 pub fn clear_plan(&mut self, cx: &mut Context<Self>) {
2153 self.plan.entries.clear();
2154 cx.notify();
2155 }
2156
2157 #[cfg(any(test, feature = "test-support"))]
2158 pub fn send_raw(
2159 &mut self,
2160 message: &str,
2161 cx: &mut Context<Self>,
2162 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2163 self.send(vec![message.into()], cx)
2164 }
2165
2166 pub fn send(
2167 &mut self,
2168 message: Vec<acp::ContentBlock>,
2169 cx: &mut Context<Self>,
2170 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2171 let block = ContentBlock::new_combined(
2172 message.clone(),
2173 self.project.read(cx).languages().clone(),
2174 self.project.read(cx).path_style(cx),
2175 cx,
2176 );
2177 let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
2178 let git_store = self.project.read(cx).git_store().clone();
2179
2180 let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
2181 Some(UserMessageId::new())
2182 } else {
2183 None
2184 };
2185
2186 self.run_turn(cx, async move |this, cx| {
2187 this.update(cx, |this, cx| {
2188 this.push_entry(
2189 AgentThreadEntry::UserMessage(UserMessage {
2190 id: message_id.clone(),
2191 content: block,
2192 chunks: message,
2193 checkpoint: None,
2194 indented: false,
2195 }),
2196 cx,
2197 );
2198 })
2199 .ok();
2200
2201 let old_checkpoint = git_store
2202 .update(cx, |git, cx| git.checkpoint(cx))
2203 .await
2204 .context("failed to get old checkpoint")
2205 .log_err();
2206 this.update(cx, |this, cx| {
2207 if let Some((_ix, message)) = this.last_user_message() {
2208 message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
2209 git_checkpoint,
2210 show: false,
2211 });
2212 }
2213 this.connection.prompt(message_id, request, cx)
2214 })?
2215 .await
2216 })
2217 }
2218
2219 pub fn can_retry(&self, cx: &App) -> bool {
2220 self.connection.retry(&self.session_id, cx).is_some()
2221 }
2222
2223 pub fn retry(
2224 &mut self,
2225 cx: &mut Context<Self>,
2226 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2227 self.run_turn(cx, async move |this, cx| {
2228 this.update(cx, |this, cx| {
2229 this.connection
2230 .retry(&this.session_id, cx)
2231 .map(|retry| retry.run(cx))
2232 })?
2233 .context("retrying a session is not supported")?
2234 .await
2235 })
2236 }
2237
2238 fn run_turn(
2239 &mut self,
2240 cx: &mut Context<Self>,
2241 f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
2242 ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2243 self.clear_completed_plan_entries(cx);
2244 self.had_error = false;
2245
2246 let (tx, rx) = oneshot::channel();
2247 let cancel_task = self.cancel(cx);
2248
2249 self.turn_id += 1;
2250 let turn_id = self.turn_id;
2251 self.running_turn = Some(RunningTurn {
2252 id: turn_id,
2253 send_task: cx.spawn(async move |this, cx| {
2254 cancel_task.await;
2255 tx.send(f(this, cx).await).ok();
2256 }),
2257 });
2258
2259 cx.spawn(async move |this, cx| {
2260 let response = rx.await;
2261
2262 this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
2263 .await?;
2264
2265 this.update(cx, |this, cx| {
2266 if this.parent_thread_id.is_none() {
2267 this.project
2268 .update(cx, |project, cx| project.set_agent_location(None, cx));
2269 }
2270 let Ok(response) = response else {
2271 // tx dropped, just return
2272 return Ok(None);
2273 };
2274
2275 let is_same_turn = this
2276 .running_turn
2277 .as_ref()
2278 .is_some_and(|turn| turn_id == turn.id);
2279
2280 // If the user submitted a follow up message, running_turn might
2281 // already point to a different turn. Therefore we only want to
2282 // take the task if it's the same turn.
2283 if is_same_turn {
2284 this.running_turn.take();
2285 }
2286
2287 match response {
2288 Ok(r) => {
2289 Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2290
2291 if r.stop_reason == acp::StopReason::MaxTokens {
2292 this.had_error = true;
2293 cx.emit(AcpThreadEvent::Error);
2294 log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2295
2296 let exceeded_max_output_tokens =
2297 this.token_usage.as_ref().is_some_and(|u| {
2298 u.max_output_tokens
2299 .is_some_and(|max| u.output_tokens >= max)
2300 });
2301
2302 let message = if exceeded_max_output_tokens {
2303 log::error!(
2304 "Max output tokens reached. Usage: {:?}",
2305 this.token_usage
2306 );
2307 "Maximum output tokens reached"
2308 } else {
2309 log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2310 "Maximum tokens reached"
2311 };
2312 return Err(anyhow!(message));
2313 }
2314
2315 let canceled = matches!(r.stop_reason, acp::StopReason::Cancelled);
2316 if canceled {
2317 this.mark_pending_tools_as_canceled();
2318 }
2319
2320 if !canceled {
2321 this.snapshot_completed_plan(cx);
2322 }
2323
2324 // Handle refusal - distinguish between user prompt and tool call refusals
2325 if let acp::StopReason::Refusal = r.stop_reason {
2326 this.had_error = true;
2327 if let Some((user_msg_ix, _)) = this.last_user_message() {
2328 // Check if there's a completed tool call with results after the last user message
2329 // This indicates the refusal is in response to tool output, not the user's prompt
2330 let has_completed_tool_call_after_user_msg =
2331 this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
2332 if let AgentThreadEntry::ToolCall(tool_call) = entry {
2333 // Check if the tool call has completed and has output
2334 matches!(tool_call.status, ToolCallStatus::Completed)
2335 && tool_call.raw_output.is_some()
2336 } else {
2337 false
2338 }
2339 });
2340
2341 if has_completed_tool_call_after_user_msg {
2342 // Refusal is due to tool output - don't truncate, just notify
2343 // The model refused based on what the tool returned
2344 cx.emit(AcpThreadEvent::Refusal);
2345 } else {
2346 // User prompt was refused - truncate back to before the user message
2347 let range = user_msg_ix..this.entries.len();
2348 if range.start < range.end {
2349 this.entries.truncate(user_msg_ix);
2350 cx.emit(AcpThreadEvent::EntriesRemoved(range));
2351 }
2352 cx.emit(AcpThreadEvent::Refusal);
2353 }
2354 } else {
2355 // No user message found, treat as general refusal
2356 cx.emit(AcpThreadEvent::Refusal);
2357 }
2358 }
2359
2360 cx.emit(AcpThreadEvent::Stopped(r.stop_reason));
2361 Ok(Some(r))
2362 }
2363 Err(e) => {
2364 Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2365
2366 this.had_error = true;
2367 cx.emit(AcpThreadEvent::Error);
2368 log::error!("Error in run turn: {:?}", e);
2369 Err(e)
2370 }
2371 }
2372 })?
2373 })
2374 .boxed()
2375 }
2376
2377 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2378 let Some(turn) = self.running_turn.take() else {
2379 return Task::ready(());
2380 };
2381 self.connection.cancel(&self.session_id, cx);
2382
2383 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2384 self.mark_pending_tools_as_canceled();
2385
2386 // Wait for the send task to complete
2387 cx.background_spawn(turn.send_task)
2388 }
2389
2390 fn mark_pending_tools_as_canceled(&mut self) {
2391 for entry in self.entries.iter_mut() {
2392 if let AgentThreadEntry::ToolCall(call) = entry {
2393 let cancel = matches!(
2394 call.status,
2395 ToolCallStatus::Pending
2396 | ToolCallStatus::WaitingForConfirmation { .. }
2397 | ToolCallStatus::InProgress
2398 );
2399
2400 if cancel {
2401 call.status = ToolCallStatus::Canceled;
2402 }
2403 }
2404 }
2405 }
2406
2407 /// Restores the git working tree to the state at the given checkpoint (if one exists)
2408 pub fn restore_checkpoint(
2409 &mut self,
2410 id: UserMessageId,
2411 cx: &mut Context<Self>,
2412 ) -> Task<Result<()>> {
2413 let Some((_, message)) = self.user_message_mut(&id) else {
2414 return Task::ready(Err(anyhow!("message not found")));
2415 };
2416
2417 let checkpoint = message
2418 .checkpoint
2419 .as_ref()
2420 .map(|c| c.git_checkpoint.clone());
2421
2422 // Cancel any in-progress generation before restoring
2423 let cancel_task = self.cancel(cx);
2424 let rewind = self.rewind(id.clone(), cx);
2425 let git_store = self.project.read(cx).git_store().clone();
2426
2427 cx.spawn(async move |_, cx| {
2428 cancel_task.await;
2429 rewind.await?;
2430 if let Some(checkpoint) = checkpoint {
2431 git_store
2432 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
2433 .await?;
2434 }
2435
2436 Ok(())
2437 })
2438 }
2439
2440 /// Rewinds this thread to before the entry at `index`, removing it and all
2441 /// subsequent entries while rejecting any action_log changes made from that point.
2442 /// Unlike `restore_checkpoint`, this method does not restore from git.
2443 pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
2444 let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
2445 return Task::ready(Err(anyhow!("not supported")));
2446 };
2447
2448 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2449 let telemetry = ActionLogTelemetry::from(&*self);
2450 cx.spawn(async move |this, cx| {
2451 cx.update(|cx| truncate.run(id.clone(), cx)).await?;
2452 this.update(cx, |this, cx| {
2453 if let Some((ix, _)) = this.user_message_mut(&id) {
2454 // Collect all terminals from entries that will be removed
2455 let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
2456 .iter()
2457 .flat_map(|entry| entry.terminals())
2458 .filter_map(|terminal| terminal.read(cx).id().clone().into())
2459 .collect();
2460
2461 let range = ix..this.entries.len();
2462 this.entries.truncate(ix);
2463 cx.emit(AcpThreadEvent::EntriesRemoved(range));
2464
2465 // Kill and remove the terminals
2466 for terminal_id in terminals_to_remove {
2467 if let Some(terminal) = this.terminals.remove(&terminal_id) {
2468 terminal.update(cx, |terminal, cx| {
2469 terminal.kill(cx);
2470 });
2471 }
2472 }
2473 }
2474 this.action_log().update(cx, |action_log, cx| {
2475 action_log.reject_all_edits(Some(telemetry), cx)
2476 })
2477 })?
2478 .await;
2479 Ok(())
2480 })
2481 }
2482
2483 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
2484 let git_store = self.project.read(cx).git_store().clone();
2485
2486 let Some((_, message)) = self.last_user_message() else {
2487 return Task::ready(Ok(()));
2488 };
2489 let Some(user_message_id) = message.id.clone() else {
2490 return Task::ready(Ok(()));
2491 };
2492 let Some(checkpoint) = message.checkpoint.as_ref() else {
2493 return Task::ready(Ok(()));
2494 };
2495 let old_checkpoint = checkpoint.git_checkpoint.clone();
2496
2497 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2498 cx.spawn(async move |this, cx| {
2499 let Some(new_checkpoint) = new_checkpoint
2500 .await
2501 .context("failed to get new checkpoint")
2502 .log_err()
2503 else {
2504 return Ok(());
2505 };
2506
2507 let equal = git_store
2508 .update(cx, |git, cx| {
2509 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2510 })
2511 .await
2512 .unwrap_or(true);
2513
2514 this.update(cx, |this, cx| {
2515 if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
2516 if let Some(checkpoint) = message.checkpoint.as_mut() {
2517 checkpoint.show = !equal;
2518 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2519 }
2520 }
2521 })?;
2522
2523 Ok(())
2524 })
2525 }
2526
2527 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2528 self.entries
2529 .iter_mut()
2530 .enumerate()
2531 .rev()
2532 .find_map(|(ix, entry)| {
2533 if let AgentThreadEntry::UserMessage(message) = entry {
2534 Some((ix, message))
2535 } else {
2536 None
2537 }
2538 })
2539 }
2540
2541 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2542 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2543 if let AgentThreadEntry::UserMessage(message) = entry {
2544 if message.id.as_ref() == Some(id) {
2545 Some((ix, message))
2546 } else {
2547 None
2548 }
2549 } else {
2550 None
2551 }
2552 })
2553 }
2554
2555 pub fn read_text_file(
2556 &self,
2557 path: PathBuf,
2558 line: Option<u32>,
2559 limit: Option<u32>,
2560 reuse_shared_snapshot: bool,
2561 cx: &mut Context<Self>,
2562 ) -> Task<Result<String, acp::Error>> {
2563 // Args are 1-based, move to 0-based
2564 let line = line.unwrap_or_default().saturating_sub(1);
2565 let limit = limit.unwrap_or(u32::MAX);
2566 let project = self.project.clone();
2567 let action_log = self.action_log.clone();
2568 let should_update_agent_location = self.parent_thread_id.is_none();
2569 cx.spawn(async move |this, cx| {
2570 let load = project.update(cx, |project, cx| {
2571 let path = project
2572 .project_path_for_absolute_path(&path, cx)
2573 .ok_or_else(|| {
2574 acp::Error::resource_not_found(Some(path.display().to_string()))
2575 })?;
2576 Ok::<_, acp::Error>(project.open_buffer(path, cx))
2577 })?;
2578
2579 let buffer = load.await?;
2580
2581 let snapshot = if reuse_shared_snapshot {
2582 this.read_with(cx, |this, _| {
2583 this.shared_buffers.get(&buffer.clone()).cloned()
2584 })
2585 .log_err()
2586 .flatten()
2587 } else {
2588 None
2589 };
2590
2591 let snapshot = if let Some(snapshot) = snapshot {
2592 snapshot
2593 } else {
2594 action_log.update(cx, |action_log, cx| {
2595 action_log.buffer_read(buffer.clone(), cx);
2596 });
2597
2598 let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
2599 this.update(cx, |this, _| {
2600 this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2601 })?;
2602 snapshot
2603 };
2604
2605 let max_point = snapshot.max_point();
2606 let start_position = Point::new(line, 0);
2607
2608 if start_position > max_point {
2609 return Err(acp::Error::invalid_params().data(format!(
2610 "Attempting to read beyond the end of the file, line {}:{}",
2611 max_point.row + 1,
2612 max_point.column
2613 )));
2614 }
2615
2616 let start = snapshot.anchor_before(start_position);
2617 let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2618
2619 if should_update_agent_location {
2620 project.update(cx, |project, cx| {
2621 project.set_agent_location(
2622 Some(AgentLocation {
2623 buffer: buffer.downgrade(),
2624 position: start,
2625 }),
2626 cx,
2627 );
2628 });
2629 }
2630
2631 Ok(snapshot.text_for_range(start..end).collect::<String>())
2632 })
2633 }
2634
2635 pub fn write_text_file(
2636 &self,
2637 path: PathBuf,
2638 content: String,
2639 cx: &mut Context<Self>,
2640 ) -> Task<Result<()>> {
2641 let project = self.project.clone();
2642 let action_log = self.action_log.clone();
2643 let should_update_agent_location = self.parent_thread_id.is_none();
2644 cx.spawn(async move |this, cx| {
2645 let load = project.update(cx, |project, cx| {
2646 let path = project
2647 .project_path_for_absolute_path(&path, cx)
2648 .context("invalid path")?;
2649 anyhow::Ok(project.open_buffer(path, cx))
2650 });
2651 let buffer = load?.await?;
2652 let snapshot = this.update(cx, |this, cx| {
2653 this.shared_buffers
2654 .get(&buffer)
2655 .cloned()
2656 .unwrap_or_else(|| buffer.read(cx).snapshot())
2657 })?;
2658 let edits = cx
2659 .background_executor()
2660 .spawn(async move {
2661 let old_text = snapshot.text();
2662 text_diff(old_text.as_str(), &content)
2663 .into_iter()
2664 .map(|(range, replacement)| {
2665 (snapshot.anchor_range_inside(range), replacement)
2666 })
2667 .collect::<Vec<_>>()
2668 })
2669 .await;
2670
2671 if should_update_agent_location {
2672 project.update(cx, |project, cx| {
2673 project.set_agent_location(
2674 Some(AgentLocation {
2675 buffer: buffer.downgrade(),
2676 position: edits
2677 .last()
2678 .map(|(range, _)| range.end)
2679 .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
2680 }),
2681 cx,
2682 );
2683 });
2684 }
2685
2686 let format_on_save = cx.update(|cx| {
2687 action_log.update(cx, |action_log, cx| {
2688 action_log.buffer_read(buffer.clone(), cx);
2689 });
2690
2691 let format_on_save = buffer.update(cx, |buffer, cx| {
2692 buffer.edit(edits, None, cx);
2693
2694 let settings =
2695 language::language_settings::LanguageSettings::for_buffer(buffer, cx);
2696
2697 settings.format_on_save != FormatOnSave::Off
2698 });
2699 action_log.update(cx, |action_log, cx| {
2700 action_log.buffer_edited(buffer.clone(), cx);
2701 });
2702 format_on_save
2703 });
2704
2705 if format_on_save {
2706 let format_task = project.update(cx, |project, cx| {
2707 project.format(
2708 HashSet::from_iter([buffer.clone()]),
2709 LspFormatTarget::Buffers,
2710 false,
2711 FormatTrigger::Save,
2712 cx,
2713 )
2714 });
2715 format_task.await.log_err();
2716
2717 action_log.update(cx, |action_log, cx| {
2718 action_log.buffer_edited(buffer.clone(), cx);
2719 });
2720 }
2721
2722 project
2723 .update(cx, |project, cx| project.save_buffer(buffer, cx))
2724 .await
2725 })
2726 }
2727
2728 pub fn create_terminal(
2729 &self,
2730 command: String,
2731 args: Vec<String>,
2732 extra_env: Vec<acp::EnvVariable>,
2733 cwd: Option<PathBuf>,
2734 output_byte_limit: Option<u64>,
2735 cx: &mut Context<Self>,
2736 ) -> Task<Result<Entity<Terminal>>> {
2737 let env = match &cwd {
2738 Some(dir) => self.project.update(cx, |project, cx| {
2739 project.environment().update(cx, |env, cx| {
2740 env.directory_environment(dir.as_path().into(), cx)
2741 })
2742 }),
2743 None => Task::ready(None).shared(),
2744 };
2745 let env = cx.spawn(async move |_, _| {
2746 let mut env = env.await.unwrap_or_default();
2747 // Disables paging for `git` and hopefully other commands
2748 env.insert("PAGER".into(), "".into());
2749 for var in extra_env {
2750 env.insert(var.name, var.value);
2751 }
2752 env
2753 });
2754
2755 let project = self.project.clone();
2756 let language_registry = project.read(cx).languages().clone();
2757 let is_windows = project.read(cx).path_style(cx).is_windows();
2758
2759 let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
2760 let terminal_task = cx.spawn({
2761 let terminal_id = terminal_id.clone();
2762 async move |_this, cx| {
2763 let env = env.await;
2764 let shell = project
2765 .update(cx, |project, cx| {
2766 project
2767 .remote_client()
2768 .and_then(|r| r.read(cx).default_system_shell())
2769 })
2770 .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2771 let (task_command, task_args) =
2772 ShellBuilder::new(&Shell::Program(shell), is_windows)
2773 .redirect_stdin_to_dev_null()
2774 .build(Some(command.clone()), &args);
2775 let terminal = project
2776 .update(cx, |project, cx| {
2777 project.create_terminal_task(
2778 task::SpawnInTerminal {
2779 command: Some(task_command),
2780 args: task_args,
2781 cwd: cwd.clone(),
2782 env,
2783 ..Default::default()
2784 },
2785 cx,
2786 )
2787 })
2788 .await?;
2789
2790 anyhow::Ok(cx.new(|cx| {
2791 Terminal::new(
2792 terminal_id,
2793 &format!("{} {}", command, args.join(" ")),
2794 cwd,
2795 output_byte_limit.map(|l| l as usize),
2796 terminal,
2797 language_registry,
2798 cx,
2799 )
2800 }))
2801 }
2802 });
2803
2804 cx.spawn(async move |this, cx| {
2805 let terminal = terminal_task.await?;
2806 this.update(cx, |this, _cx| {
2807 this.terminals.insert(terminal_id, terminal.clone());
2808 terminal
2809 })
2810 })
2811 }
2812
2813 pub fn kill_terminal(
2814 &mut self,
2815 terminal_id: acp::TerminalId,
2816 cx: &mut Context<Self>,
2817 ) -> Result<()> {
2818 self.terminals
2819 .get(&terminal_id)
2820 .context("Terminal not found")?
2821 .update(cx, |terminal, cx| {
2822 terminal.kill(cx);
2823 });
2824
2825 Ok(())
2826 }
2827
2828 pub fn release_terminal(
2829 &mut self,
2830 terminal_id: acp::TerminalId,
2831 cx: &mut Context<Self>,
2832 ) -> Result<()> {
2833 self.terminals
2834 .remove(&terminal_id)
2835 .context("Terminal not found")?
2836 .update(cx, |terminal, cx| {
2837 terminal.kill(cx);
2838 });
2839
2840 Ok(())
2841 }
2842
2843 pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2844 self.terminals
2845 .get(&terminal_id)
2846 .context("Terminal not found")
2847 .cloned()
2848 }
2849
2850 pub fn to_markdown(&self, cx: &App) -> String {
2851 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2852 }
2853
2854 pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2855 cx.emit(AcpThreadEvent::LoadError(error));
2856 }
2857
2858 pub fn register_terminal_created(
2859 &mut self,
2860 terminal_id: acp::TerminalId,
2861 command_label: String,
2862 working_dir: Option<PathBuf>,
2863 output_byte_limit: Option<u64>,
2864 terminal: Entity<::terminal::Terminal>,
2865 cx: &mut Context<Self>,
2866 ) -> Entity<Terminal> {
2867 let language_registry = self.project.read(cx).languages().clone();
2868
2869 let entity = cx.new(|cx| {
2870 Terminal::new(
2871 terminal_id.clone(),
2872 &command_label,
2873 working_dir.clone(),
2874 output_byte_limit.map(|l| l as usize),
2875 terminal,
2876 language_registry,
2877 cx,
2878 )
2879 });
2880 self.terminals.insert(terminal_id.clone(), entity.clone());
2881 entity
2882 }
2883
2884 pub fn mark_as_subagent_output(&mut self, cx: &mut Context<Self>) {
2885 for entry in self.entries.iter_mut().rev() {
2886 if let AgentThreadEntry::AssistantMessage(assistant_message) = entry {
2887 assistant_message.is_subagent_output = true;
2888 cx.notify();
2889 return;
2890 }
2891 }
2892 }
2893
2894 pub fn on_terminal_provider_event(
2895 &mut self,
2896 event: TerminalProviderEvent,
2897 cx: &mut Context<Self>,
2898 ) {
2899 match event {
2900 TerminalProviderEvent::Created {
2901 terminal_id,
2902 label,
2903 cwd,
2904 output_byte_limit,
2905 terminal,
2906 } => {
2907 let entity = self.register_terminal_created(
2908 terminal_id.clone(),
2909 label,
2910 cwd,
2911 output_byte_limit,
2912 terminal,
2913 cx,
2914 );
2915
2916 if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
2917 for data in chunks.drain(..) {
2918 entity.update(cx, |term, cx| {
2919 term.inner().update(cx, |inner, cx| {
2920 inner.write_output(&data, cx);
2921 })
2922 });
2923 }
2924 }
2925
2926 if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
2927 entity.update(cx, |_term, cx| {
2928 cx.notify();
2929 });
2930 }
2931
2932 cx.notify();
2933 }
2934 TerminalProviderEvent::Output { terminal_id, data } => {
2935 if let Some(entity) = self.terminals.get(&terminal_id) {
2936 entity.update(cx, |term, cx| {
2937 term.inner().update(cx, |inner, cx| {
2938 inner.write_output(&data, cx);
2939 })
2940 });
2941 } else {
2942 self.pending_terminal_output
2943 .entry(terminal_id)
2944 .or_default()
2945 .push(data);
2946 }
2947 }
2948 TerminalProviderEvent::TitleChanged { terminal_id, title } => {
2949 if let Some(entity) = self.terminals.get(&terminal_id) {
2950 entity.update(cx, |term, cx| {
2951 term.inner().update(cx, |inner, cx| {
2952 inner.breadcrumb_text = title;
2953 cx.emit(::terminal::Event::BreadcrumbsChanged);
2954 })
2955 });
2956 }
2957 }
2958 TerminalProviderEvent::Exit {
2959 terminal_id,
2960 status,
2961 } => {
2962 if let Some(entity) = self.terminals.get(&terminal_id) {
2963 entity.update(cx, |_term, cx| {
2964 cx.notify();
2965 });
2966 } else {
2967 self.pending_terminal_exit.insert(terminal_id, status);
2968 }
2969 }
2970 }
2971 }
2972}
2973
2974fn markdown_for_raw_output(
2975 raw_output: &serde_json::Value,
2976 language_registry: &Arc<LanguageRegistry>,
2977 cx: &mut App,
2978) -> Option<Entity<Markdown>> {
2979 match raw_output {
2980 serde_json::Value::Null => None,
2981 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2982 Markdown::new(
2983 value.to_string().into(),
2984 Some(language_registry.clone()),
2985 None,
2986 cx,
2987 )
2988 })),
2989 serde_json::Value::Number(value) => Some(cx.new(|cx| {
2990 Markdown::new(
2991 value.to_string().into(),
2992 Some(language_registry.clone()),
2993 None,
2994 cx,
2995 )
2996 })),
2997 serde_json::Value::String(value) => Some(cx.new(|cx| {
2998 Markdown::new(
2999 value.clone().into(),
3000 Some(language_registry.clone()),
3001 None,
3002 cx,
3003 )
3004 })),
3005 value => Some(cx.new(|cx| {
3006 let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
3007
3008 Markdown::new(
3009 format!("```json\n{}\n```", pretty_json).into(),
3010 Some(language_registry.clone()),
3011 None,
3012 cx,
3013 )
3014 })),
3015 }
3016}
3017
3018#[cfg(test)]
3019mod tests {
3020 use super::*;
3021 use anyhow::anyhow;
3022 use futures::{channel::mpsc, future::LocalBoxFuture, select};
3023 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
3024 use indoc::indoc;
3025 use project::{AgentId, FakeFs, Fs};
3026 use rand::{distr, prelude::*};
3027 use serde_json::json;
3028 use settings::SettingsStore;
3029 use smol::stream::StreamExt as _;
3030 use std::{
3031 any::Any,
3032 cell::RefCell,
3033 path::Path,
3034 rc::Rc,
3035 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
3036 time::Duration,
3037 };
3038 use util::{path, path_list::PathList};
3039
3040 fn init_test(cx: &mut TestAppContext) {
3041 env_logger::try_init().ok();
3042 cx.update(|cx| {
3043 let settings_store = SettingsStore::test(cx);
3044 cx.set_global(settings_store);
3045 });
3046 }
3047
3048 #[gpui::test]
3049 async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
3050 init_test(cx);
3051
3052 let fs = FakeFs::new(cx.executor());
3053 let project = Project::test(fs, [], cx).await;
3054 let connection = Rc::new(FakeAgentConnection::new());
3055 let thread = cx
3056 .update(|cx| {
3057 connection.new_session(
3058 project,
3059 PathList::new(&[std::path::Path::new(path!("/test"))]),
3060 cx,
3061 )
3062 })
3063 .await
3064 .unwrap();
3065
3066 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3067
3068 // Send Output BEFORE Created - should be buffered by acp_thread
3069 thread.update(cx, |thread, cx| {
3070 thread.on_terminal_provider_event(
3071 TerminalProviderEvent::Output {
3072 terminal_id: terminal_id.clone(),
3073 data: b"hello buffered".to_vec(),
3074 },
3075 cx,
3076 );
3077 });
3078
3079 // Create a display-only terminal and then send Created
3080 let lower = cx.new(|cx| {
3081 let builder = ::terminal::TerminalBuilder::new_display_only(
3082 ::terminal::terminal_settings::CursorShape::default(),
3083 ::terminal::terminal_settings::AlternateScroll::On,
3084 None,
3085 0,
3086 cx.background_executor(),
3087 PathStyle::local(),
3088 )
3089 .unwrap();
3090 builder.subscribe(cx)
3091 });
3092
3093 thread.update(cx, |thread, cx| {
3094 thread.on_terminal_provider_event(
3095 TerminalProviderEvent::Created {
3096 terminal_id: terminal_id.clone(),
3097 label: "Buffered Test".to_string(),
3098 cwd: None,
3099 output_byte_limit: None,
3100 terminal: lower.clone(),
3101 },
3102 cx,
3103 );
3104 });
3105
3106 // After Created, buffered Output should have been flushed into the renderer
3107 let content = thread.read_with(cx, |thread, cx| {
3108 let term = thread.terminal(terminal_id.clone()).unwrap();
3109 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3110 });
3111
3112 assert!(
3113 content.contains("hello buffered"),
3114 "expected buffered output to render, got: {content}"
3115 );
3116 }
3117
3118 #[gpui::test]
3119 async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
3120 init_test(cx);
3121
3122 let fs = FakeFs::new(cx.executor());
3123 let project = Project::test(fs, [], cx).await;
3124 let connection = Rc::new(FakeAgentConnection::new());
3125 let thread = cx
3126 .update(|cx| {
3127 connection.new_session(
3128 project,
3129 PathList::new(&[std::path::Path::new(path!("/test"))]),
3130 cx,
3131 )
3132 })
3133 .await
3134 .unwrap();
3135
3136 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3137
3138 // Send Output BEFORE Created
3139 thread.update(cx, |thread, cx| {
3140 thread.on_terminal_provider_event(
3141 TerminalProviderEvent::Output {
3142 terminal_id: terminal_id.clone(),
3143 data: b"pre-exit data".to_vec(),
3144 },
3145 cx,
3146 );
3147 });
3148
3149 // Send Exit BEFORE Created
3150 thread.update(cx, |thread, cx| {
3151 thread.on_terminal_provider_event(
3152 TerminalProviderEvent::Exit {
3153 terminal_id: terminal_id.clone(),
3154 status: acp::TerminalExitStatus::new().exit_code(0),
3155 },
3156 cx,
3157 );
3158 });
3159
3160 // Now create a display-only lower-level terminal and send Created
3161 let lower = cx.new(|cx| {
3162 let builder = ::terminal::TerminalBuilder::new_display_only(
3163 ::terminal::terminal_settings::CursorShape::default(),
3164 ::terminal::terminal_settings::AlternateScroll::On,
3165 None,
3166 0,
3167 cx.background_executor(),
3168 PathStyle::local(),
3169 )
3170 .unwrap();
3171 builder.subscribe(cx)
3172 });
3173
3174 thread.update(cx, |thread, cx| {
3175 thread.on_terminal_provider_event(
3176 TerminalProviderEvent::Created {
3177 terminal_id: terminal_id.clone(),
3178 label: "Buffered Exit Test".to_string(),
3179 cwd: None,
3180 output_byte_limit: None,
3181 terminal: lower.clone(),
3182 },
3183 cx,
3184 );
3185 });
3186
3187 // Output should be present after Created (flushed from buffer)
3188 let content = thread.read_with(cx, |thread, cx| {
3189 let term = thread.terminal(terminal_id.clone()).unwrap();
3190 term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3191 });
3192
3193 assert!(
3194 content.contains("pre-exit data"),
3195 "expected pre-exit data to render, got: {content}"
3196 );
3197 }
3198
3199 /// Test that killing a terminal via Terminal::kill properly:
3200 /// 1. Causes wait_for_exit to complete (doesn't hang forever)
3201 /// 2. The underlying terminal still has the output that was written before the kill
3202 ///
3203 /// This test verifies that the fix to kill_active_task (which now also kills
3204 /// the shell process in addition to the foreground process) properly allows
3205 /// wait_for_exit to complete instead of hanging indefinitely.
3206 #[cfg(unix)]
3207 #[gpui::test]
3208 async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
3209 use std::collections::HashMap;
3210 use task::Shell;
3211 use util::shell_builder::ShellBuilder;
3212
3213 init_test(cx);
3214 cx.executor().allow_parking();
3215
3216 let fs = FakeFs::new(cx.executor());
3217 let project = Project::test(fs, [], cx).await;
3218 let connection = Rc::new(FakeAgentConnection::new());
3219 let thread = cx
3220 .update(|cx| {
3221 connection.new_session(
3222 project.clone(),
3223 PathList::new(&[Path::new(path!("/test"))]),
3224 cx,
3225 )
3226 })
3227 .await
3228 .unwrap();
3229
3230 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3231
3232 // Create a real PTY terminal that runs a command which prints output then sleeps
3233 // We use printf instead of echo and chain with && sleep to ensure proper execution
3234 let (completion_tx, _completion_rx) = smol::channel::unbounded();
3235 let (program, args) = ShellBuilder::new(&Shell::System, false).build(
3236 Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
3237 &[],
3238 );
3239
3240 let builder = cx
3241 .update(|cx| {
3242 ::terminal::TerminalBuilder::new(
3243 None,
3244 None,
3245 task::Shell::WithArguments {
3246 program,
3247 args,
3248 title_override: None,
3249 },
3250 HashMap::default(),
3251 ::terminal::terminal_settings::CursorShape::default(),
3252 ::terminal::terminal_settings::AlternateScroll::On,
3253 None,
3254 vec![],
3255 0,
3256 false,
3257 0,
3258 Some(completion_tx),
3259 cx,
3260 vec![],
3261 PathStyle::local(),
3262 )
3263 })
3264 .await
3265 .unwrap();
3266
3267 let lower_terminal = cx.new(|cx| builder.subscribe(cx));
3268
3269 // Create the acp_thread Terminal wrapper
3270 thread.update(cx, |thread, cx| {
3271 thread.on_terminal_provider_event(
3272 TerminalProviderEvent::Created {
3273 terminal_id: terminal_id.clone(),
3274 label: "printf output_before_kill && sleep 60".to_string(),
3275 cwd: None,
3276 output_byte_limit: None,
3277 terminal: lower_terminal.clone(),
3278 },
3279 cx,
3280 );
3281 });
3282
3283 // Poll until the printf command produces output, rather than using a
3284 // fixed sleep which is flaky on loaded machines.
3285 let deadline = std::time::Instant::now() + Duration::from_secs(10);
3286 loop {
3287 let has_output = thread.read_with(cx, |thread, cx| {
3288 let term = thread
3289 .terminals
3290 .get(&terminal_id)
3291 .expect("terminal not found");
3292 let content = term.read(cx).inner().read(cx).get_content();
3293 content.contains("output_before_kill")
3294 });
3295 if has_output {
3296 break;
3297 }
3298 assert!(
3299 std::time::Instant::now() < deadline,
3300 "Timed out waiting for printf output to appear in terminal",
3301 );
3302 cx.executor().timer(Duration::from_millis(50)).await;
3303 }
3304
3305 // Get the acp_thread Terminal and kill it
3306 let wait_for_exit = thread.update(cx, |thread, cx| {
3307 let term = thread.terminals.get(&terminal_id).unwrap();
3308 let wait_for_exit = term.read(cx).wait_for_exit();
3309 term.update(cx, |term, cx| {
3310 term.kill(cx);
3311 });
3312 wait_for_exit
3313 });
3314
3315 // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
3316 // Before the fix to kill_active_task, this would hang forever because
3317 // only the foreground process was killed, not the shell, so the PTY
3318 // child never exited and wait_for_completed_task never completed.
3319 let exit_result = futures::select! {
3320 result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
3321 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
3322 };
3323
3324 assert!(
3325 exit_result.is_some(),
3326 "wait_for_exit should complete after kill, but it timed out. \
3327 This indicates kill_active_task is not properly killing the shell process."
3328 );
3329
3330 // Give the system a chance to process any pending updates
3331 cx.run_until_parked();
3332
3333 // Verify that the underlying terminal still has the output that was
3334 // written before the kill. This verifies that killing doesn't lose output.
3335 let inner_content = thread.read_with(cx, |thread, cx| {
3336 let term = thread.terminals.get(&terminal_id).unwrap();
3337 term.read(cx).inner().read(cx).get_content()
3338 });
3339
3340 assert!(
3341 inner_content.contains("output_before_kill"),
3342 "Underlying terminal should contain output from before kill, got: {}",
3343 inner_content
3344 );
3345 }
3346
3347 #[gpui::test]
3348 async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
3349 init_test(cx);
3350
3351 let fs = FakeFs::new(cx.executor());
3352 let project = Project::test(fs, [], cx).await;
3353 let connection = Rc::new(FakeAgentConnection::new());
3354 let thread = cx
3355 .update(|cx| {
3356 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3357 })
3358 .await
3359 .unwrap();
3360
3361 // Test creating a new user message
3362 thread.update(cx, |thread, cx| {
3363 thread.push_user_content_block(None, "Hello, ".into(), cx);
3364 });
3365
3366 thread.update(cx, |thread, cx| {
3367 assert_eq!(thread.entries.len(), 1);
3368 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3369 assert_eq!(user_msg.id, None);
3370 assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
3371 } else {
3372 panic!("Expected UserMessage");
3373 }
3374 });
3375
3376 // Test appending to existing user message
3377 let message_1_id = UserMessageId::new();
3378 thread.update(cx, |thread, cx| {
3379 thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
3380 });
3381
3382 thread.update(cx, |thread, cx| {
3383 assert_eq!(thread.entries.len(), 1);
3384 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3385 assert_eq!(user_msg.id, Some(message_1_id));
3386 assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
3387 } else {
3388 panic!("Expected UserMessage");
3389 }
3390 });
3391
3392 // Test creating new user message after assistant message
3393 thread.update(cx, |thread, cx| {
3394 thread.push_assistant_content_block("Assistant response".into(), false, cx);
3395 });
3396
3397 let message_2_id = UserMessageId::new();
3398 thread.update(cx, |thread, cx| {
3399 thread.push_user_content_block(
3400 Some(message_2_id.clone()),
3401 "New user message".into(),
3402 cx,
3403 );
3404 });
3405
3406 thread.update(cx, |thread, cx| {
3407 assert_eq!(thread.entries.len(), 3);
3408 if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
3409 assert_eq!(user_msg.id, Some(message_2_id));
3410 assert_eq!(user_msg.content.to_markdown(cx), "New user message");
3411 } else {
3412 panic!("Expected UserMessage at index 2");
3413 }
3414 });
3415 }
3416
3417 #[gpui::test]
3418 async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
3419 init_test(cx);
3420
3421 let fs = FakeFs::new(cx.executor());
3422 let project = Project::test(fs, [], cx).await;
3423 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3424 |_, thread, mut cx| {
3425 async move {
3426 thread.update(&mut cx, |thread, cx| {
3427 thread
3428 .handle_session_update(
3429 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3430 "Thinking ".into(),
3431 )),
3432 cx,
3433 )
3434 .unwrap();
3435 thread
3436 .handle_session_update(
3437 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3438 "hard!".into(),
3439 )),
3440 cx,
3441 )
3442 .unwrap();
3443 })?;
3444 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3445 }
3446 .boxed_local()
3447 },
3448 ));
3449
3450 let thread = cx
3451 .update(|cx| {
3452 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3453 })
3454 .await
3455 .unwrap();
3456
3457 thread
3458 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
3459 .await
3460 .unwrap();
3461
3462 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
3463 assert_eq!(
3464 output,
3465 indoc! {r#"
3466 ## User
3467
3468 Hello from Zed!
3469
3470 ## Assistant
3471
3472 <thinking>
3473 Thinking hard!
3474 </thinking>
3475
3476 "#}
3477 );
3478 }
3479
3480 #[gpui::test]
3481 async fn test_ignore_echoed_user_message_chunks_during_active_turn(
3482 cx: &mut gpui::TestAppContext,
3483 ) {
3484 init_test(cx);
3485
3486 let fs = FakeFs::new(cx.executor());
3487 let project = Project::test(fs, [], cx).await;
3488 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3489 |request, thread, mut cx| {
3490 async move {
3491 let prompt = request.prompt.first().cloned().unwrap_or_else(|| "".into());
3492
3493 thread.update(&mut cx, |thread, cx| {
3494 thread
3495 .handle_session_update(
3496 acp::SessionUpdate::UserMessageChunk(acp::ContentChunk::new(
3497 prompt,
3498 )),
3499 cx,
3500 )
3501 .unwrap();
3502 })?;
3503
3504 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3505 }
3506 .boxed_local()
3507 },
3508 ));
3509
3510 let thread = cx
3511 .update(|cx| {
3512 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3513 })
3514 .await
3515 .unwrap();
3516
3517 thread
3518 .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
3519 .await
3520 .unwrap();
3521
3522 let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
3523 assert_eq!(output.matches("Hello from Zed!").count(), 1);
3524 }
3525
3526 #[gpui::test]
3527 async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
3528 init_test(cx);
3529
3530 let fs = FakeFs::new(cx.executor());
3531 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
3532 .await;
3533 let project = Project::test(fs.clone(), [], cx).await;
3534 let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
3535 let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
3536 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3537 move |_, thread, mut cx| {
3538 let read_file_tx = read_file_tx.clone();
3539 async move {
3540 let content = thread
3541 .update(&mut cx, |thread, cx| {
3542 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3543 })
3544 .unwrap()
3545 .await
3546 .unwrap();
3547 assert_eq!(content, "one\ntwo\nthree\n");
3548 read_file_tx.take().unwrap().send(()).unwrap();
3549 thread
3550 .update(&mut cx, |thread, cx| {
3551 thread.write_text_file(
3552 path!("/tmp/foo").into(),
3553 "one\ntwo\nthree\nfour\nfive\n".to_string(),
3554 cx,
3555 )
3556 })
3557 .unwrap()
3558 .await
3559 .unwrap();
3560 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3561 }
3562 .boxed_local()
3563 },
3564 ));
3565
3566 let (worktree, pathbuf) = project
3567 .update(cx, |project, cx| {
3568 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3569 })
3570 .await
3571 .unwrap();
3572 let buffer = project
3573 .update(cx, |project, cx| {
3574 project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
3575 })
3576 .await
3577 .unwrap();
3578
3579 let thread = cx
3580 .update(|cx| {
3581 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3582 })
3583 .await
3584 .unwrap();
3585
3586 let request = thread.update(cx, |thread, cx| {
3587 thread.send_raw("Extend the count in /tmp/foo", cx)
3588 });
3589 read_file_rx.await.ok();
3590 buffer.update(cx, |buffer, cx| {
3591 buffer.edit([(0..0, "zero\n".to_string())], None, cx);
3592 });
3593 cx.run_until_parked();
3594 assert_eq!(
3595 buffer.read_with(cx, |buffer, _| buffer.text()),
3596 "zero\none\ntwo\nthree\nfour\nfive\n"
3597 );
3598 assert_eq!(
3599 String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
3600 "zero\none\ntwo\nthree\nfour\nfive\n"
3601 );
3602 request.await.unwrap();
3603 }
3604
3605 #[gpui::test]
3606 async fn test_reading_from_line(cx: &mut TestAppContext) {
3607 init_test(cx);
3608
3609 let fs = FakeFs::new(cx.executor());
3610 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
3611 .await;
3612 let project = Project::test(fs.clone(), [], cx).await;
3613 project
3614 .update(cx, |project, cx| {
3615 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3616 })
3617 .await
3618 .unwrap();
3619
3620 let connection = Rc::new(FakeAgentConnection::new());
3621
3622 let thread = cx
3623 .update(|cx| {
3624 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3625 })
3626 .await
3627 .unwrap();
3628
3629 // Whole file
3630 let content = thread
3631 .update(cx, |thread, cx| {
3632 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3633 })
3634 .await
3635 .unwrap();
3636
3637 assert_eq!(content, "one\ntwo\nthree\nfour\n");
3638
3639 // Only start line
3640 let content = thread
3641 .update(cx, |thread, cx| {
3642 thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
3643 })
3644 .await
3645 .unwrap();
3646
3647 assert_eq!(content, "three\nfour\n");
3648
3649 // Only limit
3650 let content = thread
3651 .update(cx, |thread, cx| {
3652 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3653 })
3654 .await
3655 .unwrap();
3656
3657 assert_eq!(content, "one\ntwo\n");
3658
3659 // Range
3660 let content = thread
3661 .update(cx, |thread, cx| {
3662 thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
3663 })
3664 .await
3665 .unwrap();
3666
3667 assert_eq!(content, "two\nthree\n");
3668
3669 // Invalid
3670 let err = thread
3671 .update(cx, |thread, cx| {
3672 thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
3673 })
3674 .await
3675 .unwrap_err();
3676
3677 assert_eq!(
3678 err.to_string(),
3679 "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
3680 );
3681 }
3682
3683 #[gpui::test]
3684 async fn test_reading_empty_file(cx: &mut TestAppContext) {
3685 init_test(cx);
3686
3687 let fs = FakeFs::new(cx.executor());
3688 fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
3689 let project = Project::test(fs.clone(), [], cx).await;
3690 project
3691 .update(cx, |project, cx| {
3692 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3693 })
3694 .await
3695 .unwrap();
3696
3697 let connection = Rc::new(FakeAgentConnection::new());
3698
3699 let thread = cx
3700 .update(|cx| {
3701 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3702 })
3703 .await
3704 .unwrap();
3705
3706 // Whole file
3707 let content = thread
3708 .update(cx, |thread, cx| {
3709 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3710 })
3711 .await
3712 .unwrap();
3713
3714 assert_eq!(content, "");
3715
3716 // Only start line
3717 let content = thread
3718 .update(cx, |thread, cx| {
3719 thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
3720 })
3721 .await
3722 .unwrap();
3723
3724 assert_eq!(content, "");
3725
3726 // Only limit
3727 let content = thread
3728 .update(cx, |thread, cx| {
3729 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3730 })
3731 .await
3732 .unwrap();
3733
3734 assert_eq!(content, "");
3735
3736 // Range
3737 let content = thread
3738 .update(cx, |thread, cx| {
3739 thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
3740 })
3741 .await
3742 .unwrap();
3743
3744 assert_eq!(content, "");
3745
3746 // Invalid
3747 let err = thread
3748 .update(cx, |thread, cx| {
3749 thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
3750 })
3751 .await
3752 .unwrap_err();
3753
3754 assert_eq!(
3755 err.to_string(),
3756 "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
3757 );
3758 }
3759 #[gpui::test]
3760 async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
3761 init_test(cx);
3762
3763 let fs = FakeFs::new(cx.executor());
3764 fs.insert_tree(path!("/tmp"), json!({})).await;
3765 let project = Project::test(fs.clone(), [], cx).await;
3766 project
3767 .update(cx, |project, cx| {
3768 project.find_or_create_worktree(path!("/tmp"), true, cx)
3769 })
3770 .await
3771 .unwrap();
3772
3773 let connection = Rc::new(FakeAgentConnection::new());
3774
3775 let thread = cx
3776 .update(|cx| {
3777 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3778 })
3779 .await
3780 .unwrap();
3781
3782 // Out of project file
3783 let err = thread
3784 .update(cx, |thread, cx| {
3785 thread.read_text_file(path!("/foo").into(), None, None, false, cx)
3786 })
3787 .await
3788 .unwrap_err();
3789
3790 assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
3791 }
3792
3793 #[gpui::test]
3794 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
3795 init_test(cx);
3796
3797 let fs = FakeFs::new(cx.executor());
3798 let project = Project::test(fs, [], cx).await;
3799 let id = acp::ToolCallId::new("test");
3800
3801 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3802 let id = id.clone();
3803 move |_, thread, mut cx| {
3804 let id = id.clone();
3805 async move {
3806 thread
3807 .update(&mut cx, |thread, cx| {
3808 thread.handle_session_update(
3809 acp::SessionUpdate::ToolCall(
3810 acp::ToolCall::new(id.clone(), "Label")
3811 .kind(acp::ToolKind::Fetch)
3812 .status(acp::ToolCallStatus::InProgress),
3813 ),
3814 cx,
3815 )
3816 })
3817 .unwrap()
3818 .unwrap();
3819 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3820 }
3821 .boxed_local()
3822 }
3823 }));
3824
3825 let thread = cx
3826 .update(|cx| {
3827 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3828 })
3829 .await
3830 .unwrap();
3831
3832 let request = thread.update(cx, |thread, cx| {
3833 thread.send_raw("Fetch https://example.com", cx)
3834 });
3835
3836 run_until_first_tool_call(&thread, cx).await;
3837
3838 thread.read_with(cx, |thread, _| {
3839 assert!(matches!(
3840 thread.entries[1],
3841 AgentThreadEntry::ToolCall(ToolCall {
3842 status: ToolCallStatus::InProgress,
3843 ..
3844 })
3845 ));
3846 });
3847
3848 thread.update(cx, |thread, cx| thread.cancel(cx)).await;
3849
3850 thread.read_with(cx, |thread, _| {
3851 assert!(matches!(
3852 &thread.entries[1],
3853 AgentThreadEntry::ToolCall(ToolCall {
3854 status: ToolCallStatus::Canceled,
3855 ..
3856 })
3857 ));
3858 });
3859
3860 thread
3861 .update(cx, |thread, cx| {
3862 thread.handle_session_update(
3863 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3864 id,
3865 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3866 )),
3867 cx,
3868 )
3869 })
3870 .unwrap();
3871
3872 request.await.unwrap();
3873
3874 thread.read_with(cx, |thread, _| {
3875 assert!(matches!(
3876 thread.entries[1],
3877 AgentThreadEntry::ToolCall(ToolCall {
3878 status: ToolCallStatus::Completed,
3879 ..
3880 })
3881 ));
3882 });
3883 }
3884
3885 #[gpui::test]
3886 async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3887 init_test(cx);
3888 let fs = FakeFs::new(cx.background_executor.clone());
3889 fs.insert_tree(path!("/test"), json!({})).await;
3890 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3891
3892 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3893 move |_, thread, mut cx| {
3894 async move {
3895 thread
3896 .update(&mut cx, |thread, cx| {
3897 thread.handle_session_update(
3898 acp::SessionUpdate::ToolCall(
3899 acp::ToolCall::new("test", "Label")
3900 .kind(acp::ToolKind::Edit)
3901 .status(acp::ToolCallStatus::Completed)
3902 .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
3903 "/test/test.txt",
3904 "foo",
3905 ))]),
3906 ),
3907 cx,
3908 )
3909 })
3910 .unwrap()
3911 .unwrap();
3912 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3913 }
3914 .boxed_local()
3915 }
3916 }));
3917
3918 let thread = cx
3919 .update(|cx| {
3920 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3921 })
3922 .await
3923 .unwrap();
3924
3925 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3926 .await
3927 .unwrap();
3928
3929 assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3930 }
3931
3932 #[gpui::test(iterations = 10)]
3933 async fn test_checkpoints(cx: &mut TestAppContext) {
3934 init_test(cx);
3935 let fs = FakeFs::new(cx.background_executor.clone());
3936 fs.insert_tree(
3937 path!("/test"),
3938 json!({
3939 ".git": {}
3940 }),
3941 )
3942 .await;
3943 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3944
3945 let simulate_changes = Arc::new(AtomicBool::new(true));
3946 let next_filename = Arc::new(AtomicUsize::new(0));
3947 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3948 let simulate_changes = simulate_changes.clone();
3949 let next_filename = next_filename.clone();
3950 let fs = fs.clone();
3951 move |request, thread, mut cx| {
3952 let fs = fs.clone();
3953 let simulate_changes = simulate_changes.clone();
3954 let next_filename = next_filename.clone();
3955 async move {
3956 if simulate_changes.load(SeqCst) {
3957 let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3958 fs.write(Path::new(&filename), b"").await?;
3959 }
3960
3961 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3962 panic!("expected text content block");
3963 };
3964 thread.update(&mut cx, |thread, cx| {
3965 thread
3966 .handle_session_update(
3967 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3968 content.text.to_uppercase().into(),
3969 )),
3970 cx,
3971 )
3972 .unwrap();
3973 })?;
3974 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3975 }
3976 .boxed_local()
3977 }
3978 }));
3979 let thread = cx
3980 .update(|cx| {
3981 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3982 })
3983 .await
3984 .unwrap();
3985
3986 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3987 .await
3988 .unwrap();
3989 thread.read_with(cx, |thread, cx| {
3990 assert_eq!(
3991 thread.to_markdown(cx),
3992 indoc! {"
3993 ## User (checkpoint)
3994
3995 Lorem
3996
3997 ## Assistant
3998
3999 LOREM
4000
4001 "}
4002 );
4003 });
4004 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
4005
4006 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
4007 .await
4008 .unwrap();
4009 thread.read_with(cx, |thread, cx| {
4010 assert_eq!(
4011 thread.to_markdown(cx),
4012 indoc! {"
4013 ## User (checkpoint)
4014
4015 Lorem
4016
4017 ## Assistant
4018
4019 LOREM
4020
4021 ## User (checkpoint)
4022
4023 ipsum
4024
4025 ## Assistant
4026
4027 IPSUM
4028
4029 "}
4030 );
4031 });
4032 assert_eq!(
4033 fs.files(),
4034 vec![
4035 Path::new(path!("/test/file-0")),
4036 Path::new(path!("/test/file-1"))
4037 ]
4038 );
4039
4040 // Checkpoint isn't stored when there are no changes.
4041 simulate_changes.store(false, SeqCst);
4042 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
4043 .await
4044 .unwrap();
4045 thread.read_with(cx, |thread, cx| {
4046 assert_eq!(
4047 thread.to_markdown(cx),
4048 indoc! {"
4049 ## User (checkpoint)
4050
4051 Lorem
4052
4053 ## Assistant
4054
4055 LOREM
4056
4057 ## User (checkpoint)
4058
4059 ipsum
4060
4061 ## Assistant
4062
4063 IPSUM
4064
4065 ## User
4066
4067 dolor
4068
4069 ## Assistant
4070
4071 DOLOR
4072
4073 "}
4074 );
4075 });
4076 assert_eq!(
4077 fs.files(),
4078 vec![
4079 Path::new(path!("/test/file-0")),
4080 Path::new(path!("/test/file-1"))
4081 ]
4082 );
4083
4084 // Rewinding the conversation truncates the history and restores the checkpoint.
4085 thread
4086 .update(cx, |thread, cx| {
4087 let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
4088 panic!("unexpected entries {:?}", thread.entries)
4089 };
4090 thread.restore_checkpoint(message.id.clone().unwrap(), cx)
4091 })
4092 .await
4093 .unwrap();
4094 thread.read_with(cx, |thread, cx| {
4095 assert_eq!(
4096 thread.to_markdown(cx),
4097 indoc! {"
4098 ## User (checkpoint)
4099
4100 Lorem
4101
4102 ## Assistant
4103
4104 LOREM
4105
4106 "}
4107 );
4108 });
4109 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
4110 }
4111
4112 #[gpui::test]
4113 async fn test_tool_result_refusal(cx: &mut TestAppContext) {
4114 use std::sync::atomic::AtomicUsize;
4115 init_test(cx);
4116
4117 let fs = FakeFs::new(cx.executor());
4118 let project = Project::test(fs, None, cx).await;
4119
4120 // Create a connection that simulates refusal after tool result
4121 let prompt_count = Arc::new(AtomicUsize::new(0));
4122 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4123 let prompt_count = prompt_count.clone();
4124 move |_request, thread, mut cx| {
4125 let count = prompt_count.fetch_add(1, SeqCst);
4126 async move {
4127 if count == 0 {
4128 // First prompt: Generate a tool call with result
4129 thread.update(&mut cx, |thread, cx| {
4130 thread
4131 .handle_session_update(
4132 acp::SessionUpdate::ToolCall(
4133 acp::ToolCall::new("tool1", "Test Tool")
4134 .kind(acp::ToolKind::Fetch)
4135 .status(acp::ToolCallStatus::Completed)
4136 .raw_input(serde_json::json!({"query": "test"}))
4137 .raw_output(serde_json::json!({"result": "inappropriate content"})),
4138 ),
4139 cx,
4140 )
4141 .unwrap();
4142 })?;
4143
4144 // Now return refusal because of the tool result
4145 Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
4146 } else {
4147 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4148 }
4149 }
4150 .boxed_local()
4151 }
4152 }));
4153
4154 let thread = cx
4155 .update(|cx| {
4156 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4157 })
4158 .await
4159 .unwrap();
4160
4161 // Track if we see a Refusal event
4162 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4163 let saw_refusal_event_captured = saw_refusal_event.clone();
4164 thread.update(cx, |_thread, cx| {
4165 cx.subscribe(
4166 &thread,
4167 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4168 if matches!(event, AcpThreadEvent::Refusal) {
4169 *saw_refusal_event_captured.lock().unwrap() = true;
4170 }
4171 },
4172 )
4173 .detach();
4174 });
4175
4176 // Send a user message - this will trigger tool call and then refusal
4177 let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
4178 cx.background_executor.spawn(send_task).detach();
4179 cx.run_until_parked();
4180
4181 // Verify that:
4182 // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
4183 // 2. The user message was NOT truncated
4184 assert!(
4185 *saw_refusal_event.lock().unwrap(),
4186 "Refusal event should be emitted for tool result refusals"
4187 );
4188
4189 thread.read_with(cx, |thread, _| {
4190 let entries = thread.entries();
4191 assert!(entries.len() >= 2, "Should have user message and tool call");
4192
4193 // Verify user message is still there
4194 assert!(
4195 matches!(entries[0], AgentThreadEntry::UserMessage(_)),
4196 "User message should not be truncated"
4197 );
4198
4199 // Verify tool call is there with result
4200 if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
4201 assert!(
4202 tool_call.raw_output.is_some(),
4203 "Tool call should have output"
4204 );
4205 } else {
4206 panic!("Expected tool call at index 1");
4207 }
4208 });
4209 }
4210
4211 #[gpui::test]
4212 async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
4213 init_test(cx);
4214
4215 let fs = FakeFs::new(cx.executor());
4216 let project = Project::test(fs, None, cx).await;
4217
4218 let refuse_next = Arc::new(AtomicBool::new(false));
4219 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4220 let refuse_next = refuse_next.clone();
4221 move |_request, _thread, _cx| {
4222 if refuse_next.load(SeqCst) {
4223 async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
4224 .boxed_local()
4225 } else {
4226 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
4227 .boxed_local()
4228 }
4229 }
4230 }));
4231
4232 let thread = cx
4233 .update(|cx| {
4234 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4235 })
4236 .await
4237 .unwrap();
4238
4239 // Track if we see a Refusal event
4240 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4241 let saw_refusal_event_captured = saw_refusal_event.clone();
4242 thread.update(cx, |_thread, cx| {
4243 cx.subscribe(
4244 &thread,
4245 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4246 if matches!(event, AcpThreadEvent::Refusal) {
4247 *saw_refusal_event_captured.lock().unwrap() = true;
4248 }
4249 },
4250 )
4251 .detach();
4252 });
4253
4254 // Send a message that will be refused
4255 refuse_next.store(true, SeqCst);
4256 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4257 .await
4258 .unwrap();
4259
4260 // Verify that a Refusal event WAS emitted for user prompt refusal
4261 assert!(
4262 *saw_refusal_event.lock().unwrap(),
4263 "Refusal event should be emitted for user prompt refusals"
4264 );
4265
4266 // Verify the message was truncated (user prompt refusal)
4267 thread.read_with(cx, |thread, cx| {
4268 assert_eq!(thread.to_markdown(cx), "");
4269 });
4270 }
4271
4272 #[gpui::test]
4273 async fn test_refusal(cx: &mut TestAppContext) {
4274 init_test(cx);
4275 let fs = FakeFs::new(cx.background_executor.clone());
4276 fs.insert_tree(path!("/"), json!({})).await;
4277 let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
4278
4279 let refuse_next = Arc::new(AtomicBool::new(false));
4280 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4281 let refuse_next = refuse_next.clone();
4282 move |request, thread, mut cx| {
4283 let refuse_next = refuse_next.clone();
4284 async move {
4285 if refuse_next.load(SeqCst) {
4286 return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
4287 }
4288
4289 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
4290 panic!("expected text content block");
4291 };
4292 thread.update(&mut cx, |thread, cx| {
4293 thread
4294 .handle_session_update(
4295 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
4296 content.text.to_uppercase().into(),
4297 )),
4298 cx,
4299 )
4300 .unwrap();
4301 })?;
4302 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4303 }
4304 .boxed_local()
4305 }
4306 }));
4307 let thread = cx
4308 .update(|cx| {
4309 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4310 })
4311 .await
4312 .unwrap();
4313
4314 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4315 .await
4316 .unwrap();
4317 thread.read_with(cx, |thread, cx| {
4318 assert_eq!(
4319 thread.to_markdown(cx),
4320 indoc! {"
4321 ## User
4322
4323 hello
4324
4325 ## Assistant
4326
4327 HELLO
4328
4329 "}
4330 );
4331 });
4332
4333 // Simulate refusing the second message. The message should be truncated
4334 // when a user prompt is refused.
4335 refuse_next.store(true, SeqCst);
4336 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
4337 .await
4338 .unwrap();
4339 thread.read_with(cx, |thread, cx| {
4340 assert_eq!(
4341 thread.to_markdown(cx),
4342 indoc! {"
4343 ## User
4344
4345 hello
4346
4347 ## Assistant
4348
4349 HELLO
4350
4351 "}
4352 );
4353 });
4354 }
4355
4356 async fn run_until_first_tool_call(
4357 thread: &Entity<AcpThread>,
4358 cx: &mut TestAppContext,
4359 ) -> usize {
4360 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
4361
4362 let subscription = cx.update(|cx| {
4363 cx.subscribe(thread, move |thread, _, cx| {
4364 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
4365 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
4366 return tx.try_send(ix).unwrap();
4367 }
4368 }
4369 })
4370 });
4371
4372 select! {
4373 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
4374 panic!("Timeout waiting for tool call")
4375 }
4376 ix = rx.next().fuse() => {
4377 drop(subscription);
4378 ix.unwrap()
4379 }
4380 }
4381 }
4382
4383 #[derive(Clone, Default)]
4384 struct FakeAgentConnection {
4385 auth_methods: Vec<acp::AuthMethod>,
4386 sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
4387 set_title_calls: Rc<RefCell<Vec<SharedString>>>,
4388 on_user_message: Option<
4389 Rc<
4390 dyn Fn(
4391 acp::PromptRequest,
4392 WeakEntity<AcpThread>,
4393 AsyncApp,
4394 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4395 + 'static,
4396 >,
4397 >,
4398 }
4399
4400 impl FakeAgentConnection {
4401 fn new() -> Self {
4402 Self {
4403 auth_methods: Vec::new(),
4404 on_user_message: None,
4405 sessions: Arc::default(),
4406 set_title_calls: Default::default(),
4407 }
4408 }
4409
4410 #[expect(unused)]
4411 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
4412 self.auth_methods = auth_methods;
4413 self
4414 }
4415
4416 fn on_user_message(
4417 mut self,
4418 handler: impl Fn(
4419 acp::PromptRequest,
4420 WeakEntity<AcpThread>,
4421 AsyncApp,
4422 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4423 + 'static,
4424 ) -> Self {
4425 self.on_user_message.replace(Rc::new(handler));
4426 self
4427 }
4428 }
4429
4430 impl AgentConnection for FakeAgentConnection {
4431 fn agent_id(&self) -> AgentId {
4432 AgentId::new("fake")
4433 }
4434
4435 fn telemetry_id(&self) -> SharedString {
4436 "fake".into()
4437 }
4438
4439 fn auth_methods(&self) -> &[acp::AuthMethod] {
4440 &self.auth_methods
4441 }
4442
4443 fn new_session(
4444 self: Rc<Self>,
4445 project: Entity<Project>,
4446 work_dirs: PathList,
4447 cx: &mut App,
4448 ) -> Task<gpui::Result<Entity<AcpThread>>> {
4449 let session_id = acp::SessionId::new(
4450 rand::rng()
4451 .sample_iter(&distr::Alphanumeric)
4452 .take(7)
4453 .map(char::from)
4454 .collect::<String>(),
4455 );
4456 let action_log = cx.new(|_| ActionLog::new(project.clone()));
4457 let thread = cx.new(|cx| {
4458 AcpThread::new(
4459 None,
4460 None,
4461 Some(work_dirs),
4462 self.clone(),
4463 project,
4464 action_log,
4465 session_id.clone(),
4466 watch::Receiver::constant(
4467 acp::PromptCapabilities::new()
4468 .image(true)
4469 .audio(true)
4470 .embedded_context(true),
4471 ),
4472 cx,
4473 )
4474 });
4475 self.sessions.lock().insert(session_id, thread.downgrade());
4476 Task::ready(Ok(thread))
4477 }
4478
4479 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
4480 if self.auth_methods().iter().any(|m| m.id() == &method) {
4481 Task::ready(Ok(()))
4482 } else {
4483 Task::ready(Err(anyhow!("Invalid Auth Method")))
4484 }
4485 }
4486
4487 fn prompt(
4488 &self,
4489 _id: Option<UserMessageId>,
4490 params: acp::PromptRequest,
4491 cx: &mut App,
4492 ) -> Task<gpui::Result<acp::PromptResponse>> {
4493 let sessions = self.sessions.lock();
4494 let thread = sessions.get(¶ms.session_id).unwrap();
4495 if let Some(handler) = &self.on_user_message {
4496 let handler = handler.clone();
4497 let thread = thread.clone();
4498 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
4499 } else {
4500 Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
4501 }
4502 }
4503
4504 fn cancel(&self, _session_id: &acp::SessionId, _cx: &mut App) {}
4505
4506 fn truncate(
4507 &self,
4508 session_id: &acp::SessionId,
4509 _cx: &App,
4510 ) -> Option<Rc<dyn AgentSessionTruncate>> {
4511 Some(Rc::new(FakeAgentSessionEditor {
4512 _session_id: session_id.clone(),
4513 }))
4514 }
4515
4516 fn set_title(
4517 &self,
4518 _session_id: &acp::SessionId,
4519 _cx: &App,
4520 ) -> Option<Rc<dyn AgentSessionSetTitle>> {
4521 Some(Rc::new(FakeAgentSessionSetTitle {
4522 calls: self.set_title_calls.clone(),
4523 }))
4524 }
4525
4526 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
4527 self
4528 }
4529 }
4530
4531 struct FakeAgentSessionSetTitle {
4532 calls: Rc<RefCell<Vec<SharedString>>>,
4533 }
4534
4535 impl AgentSessionSetTitle for FakeAgentSessionSetTitle {
4536 fn run(&self, title: SharedString, _cx: &mut App) -> Task<Result<()>> {
4537 self.calls.borrow_mut().push(title);
4538 Task::ready(Ok(()))
4539 }
4540 }
4541
4542 struct FakeAgentSessionEditor {
4543 _session_id: acp::SessionId,
4544 }
4545
4546 impl AgentSessionTruncate for FakeAgentSessionEditor {
4547 fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
4548 Task::ready(Ok(()))
4549 }
4550 }
4551
4552 #[gpui::test]
4553 async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
4554 init_test(cx);
4555
4556 let fs = FakeFs::new(cx.executor());
4557 let project = Project::test(fs, [], cx).await;
4558 let connection = Rc::new(FakeAgentConnection::new());
4559 let thread = cx
4560 .update(|cx| {
4561 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4562 })
4563 .await
4564 .unwrap();
4565
4566 // Try to update a tool call that doesn't exist
4567 let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
4568 thread.update(cx, |thread, cx| {
4569 let result = thread.handle_session_update(
4570 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4571 nonexistent_id.clone(),
4572 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4573 )),
4574 cx,
4575 );
4576
4577 // The update should succeed (not return an error)
4578 assert!(result.is_ok());
4579
4580 // There should now be exactly one entry in the thread
4581 assert_eq!(thread.entries.len(), 1);
4582
4583 // The entry should be a failed tool call
4584 if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
4585 assert_eq!(tool_call.id, nonexistent_id);
4586 assert!(matches!(tool_call.status, ToolCallStatus::Failed));
4587 assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
4588
4589 // Check that the content contains the error message
4590 assert_eq!(tool_call.content.len(), 1);
4591 if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
4592 match content_block {
4593 ContentBlock::Markdown { markdown } => {
4594 let markdown_text = markdown.read(cx).source();
4595 assert!(markdown_text.contains("Tool call not found"));
4596 }
4597 ContentBlock::Empty => panic!("Expected markdown content, got empty"),
4598 ContentBlock::ResourceLink { .. } => {
4599 panic!("Expected markdown content, got resource link")
4600 }
4601 ContentBlock::Image { .. } => {
4602 panic!("Expected markdown content, got image")
4603 }
4604 }
4605 } else {
4606 panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
4607 }
4608 } else {
4609 panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
4610 }
4611 });
4612 }
4613
4614 /// Tests that restoring a checkpoint properly cleans up terminals that were
4615 /// created after that checkpoint, and cancels any in-progress generation.
4616 ///
4617 /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
4618 /// that were started after that checkpoint should be terminated, and any in-progress
4619 /// AI generation should be canceled.
4620 #[gpui::test]
4621 async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
4622 init_test(cx);
4623
4624 let fs = FakeFs::new(cx.executor());
4625 let project = Project::test(fs, [], cx).await;
4626 let connection = Rc::new(FakeAgentConnection::new());
4627 let thread = cx
4628 .update(|cx| {
4629 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4630 })
4631 .await
4632 .unwrap();
4633
4634 // Send first user message to create a checkpoint
4635 cx.update(|cx| {
4636 thread.update(cx, |thread, cx| {
4637 thread.send(vec!["first message".into()], cx)
4638 })
4639 })
4640 .await
4641 .unwrap();
4642
4643 // Send second message (creates another checkpoint) - we'll restore to this one
4644 cx.update(|cx| {
4645 thread.update(cx, |thread, cx| {
4646 thread.send(vec!["second message".into()], cx)
4647 })
4648 })
4649 .await
4650 .unwrap();
4651
4652 // Create 2 terminals BEFORE the checkpoint that have completed running
4653 let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4654 let mock_terminal_1 = cx.new(|cx| {
4655 let builder = ::terminal::TerminalBuilder::new_display_only(
4656 ::terminal::terminal_settings::CursorShape::default(),
4657 ::terminal::terminal_settings::AlternateScroll::On,
4658 None,
4659 0,
4660 cx.background_executor(),
4661 PathStyle::local(),
4662 )
4663 .unwrap();
4664 builder.subscribe(cx)
4665 });
4666
4667 thread.update(cx, |thread, cx| {
4668 thread.on_terminal_provider_event(
4669 TerminalProviderEvent::Created {
4670 terminal_id: terminal_id_1.clone(),
4671 label: "echo 'first'".to_string(),
4672 cwd: Some(PathBuf::from("/test")),
4673 output_byte_limit: None,
4674 terminal: mock_terminal_1.clone(),
4675 },
4676 cx,
4677 );
4678 });
4679
4680 thread.update(cx, |thread, cx| {
4681 thread.on_terminal_provider_event(
4682 TerminalProviderEvent::Output {
4683 terminal_id: terminal_id_1.clone(),
4684 data: b"first\n".to_vec(),
4685 },
4686 cx,
4687 );
4688 });
4689
4690 thread.update(cx, |thread, cx| {
4691 thread.on_terminal_provider_event(
4692 TerminalProviderEvent::Exit {
4693 terminal_id: terminal_id_1.clone(),
4694 status: acp::TerminalExitStatus::new().exit_code(0),
4695 },
4696 cx,
4697 );
4698 });
4699
4700 let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4701 let mock_terminal_2 = cx.new(|cx| {
4702 let builder = ::terminal::TerminalBuilder::new_display_only(
4703 ::terminal::terminal_settings::CursorShape::default(),
4704 ::terminal::terminal_settings::AlternateScroll::On,
4705 None,
4706 0,
4707 cx.background_executor(),
4708 PathStyle::local(),
4709 )
4710 .unwrap();
4711 builder.subscribe(cx)
4712 });
4713
4714 thread.update(cx, |thread, cx| {
4715 thread.on_terminal_provider_event(
4716 TerminalProviderEvent::Created {
4717 terminal_id: terminal_id_2.clone(),
4718 label: "echo 'second'".to_string(),
4719 cwd: Some(PathBuf::from("/test")),
4720 output_byte_limit: None,
4721 terminal: mock_terminal_2.clone(),
4722 },
4723 cx,
4724 );
4725 });
4726
4727 thread.update(cx, |thread, cx| {
4728 thread.on_terminal_provider_event(
4729 TerminalProviderEvent::Output {
4730 terminal_id: terminal_id_2.clone(),
4731 data: b"second\n".to_vec(),
4732 },
4733 cx,
4734 );
4735 });
4736
4737 thread.update(cx, |thread, cx| {
4738 thread.on_terminal_provider_event(
4739 TerminalProviderEvent::Exit {
4740 terminal_id: terminal_id_2.clone(),
4741 status: acp::TerminalExitStatus::new().exit_code(0),
4742 },
4743 cx,
4744 );
4745 });
4746
4747 // Get the second message ID to restore to
4748 let second_message_id = thread.read_with(cx, |thread, _| {
4749 // At this point we have:
4750 // - Index 0: First user message (with checkpoint)
4751 // - Index 1: Second user message (with checkpoint)
4752 // No assistant responses because FakeAgentConnection just returns EndTurn
4753 let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
4754 panic!("expected user message at index 1");
4755 };
4756 message.id.clone().unwrap()
4757 });
4758
4759 // Create a terminal AFTER the checkpoint we'll restore to.
4760 // This simulates the AI agent starting a long-running terminal command.
4761 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4762 let mock_terminal = cx.new(|cx| {
4763 let builder = ::terminal::TerminalBuilder::new_display_only(
4764 ::terminal::terminal_settings::CursorShape::default(),
4765 ::terminal::terminal_settings::AlternateScroll::On,
4766 None,
4767 0,
4768 cx.background_executor(),
4769 PathStyle::local(),
4770 )
4771 .unwrap();
4772 builder.subscribe(cx)
4773 });
4774
4775 // Register the terminal as created
4776 thread.update(cx, |thread, cx| {
4777 thread.on_terminal_provider_event(
4778 TerminalProviderEvent::Created {
4779 terminal_id: terminal_id.clone(),
4780 label: "sleep 1000".to_string(),
4781 cwd: Some(PathBuf::from("/test")),
4782 output_byte_limit: None,
4783 terminal: mock_terminal.clone(),
4784 },
4785 cx,
4786 );
4787 });
4788
4789 // Simulate the terminal producing output (still running)
4790 thread.update(cx, |thread, cx| {
4791 thread.on_terminal_provider_event(
4792 TerminalProviderEvent::Output {
4793 terminal_id: terminal_id.clone(),
4794 data: b"terminal is running...\n".to_vec(),
4795 },
4796 cx,
4797 );
4798 });
4799
4800 // Create a tool call entry that references this terminal
4801 // This represents the agent requesting a terminal command
4802 thread.update(cx, |thread, cx| {
4803 thread
4804 .handle_session_update(
4805 acp::SessionUpdate::ToolCall(
4806 acp::ToolCall::new("terminal-tool-1", "Running command")
4807 .kind(acp::ToolKind::Execute)
4808 .status(acp::ToolCallStatus::InProgress)
4809 .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
4810 terminal_id.clone(),
4811 ))])
4812 .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
4813 ),
4814 cx,
4815 )
4816 .unwrap();
4817 });
4818
4819 // Verify terminal exists and is in the thread
4820 let terminal_exists_before =
4821 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4822 assert!(
4823 terminal_exists_before,
4824 "Terminal should exist before checkpoint restore"
4825 );
4826
4827 // Verify the terminal's underlying task is still running (not completed)
4828 let terminal_running_before = thread.read_with(cx, |thread, _cx| {
4829 let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
4830 terminal_entity.read_with(cx, |term, _cx| {
4831 term.output().is_none() // output is None means it's still running
4832 })
4833 });
4834 assert!(
4835 terminal_running_before,
4836 "Terminal should be running before checkpoint restore"
4837 );
4838
4839 // Verify we have the expected entries before restore
4840 let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
4841 assert!(
4842 entry_count_before > 1,
4843 "Should have multiple entries before restore"
4844 );
4845
4846 // Restore the checkpoint to the second message.
4847 // This should:
4848 // 1. Cancel any in-progress generation (via the cancel() call)
4849 // 2. Remove the terminal that was created after that point
4850 thread
4851 .update(cx, |thread, cx| {
4852 thread.restore_checkpoint(second_message_id, cx)
4853 })
4854 .await
4855 .unwrap();
4856
4857 // Verify that no send_task is in progress after restore
4858 // (cancel() clears the send_task)
4859 let has_send_task_after = thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4860 assert!(
4861 !has_send_task_after,
4862 "Should not have a send_task after restore (cancel should have cleared it)"
4863 );
4864
4865 // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
4866 let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
4867 assert_eq!(
4868 entry_count, 1,
4869 "Should have 1 entry after restore (only the first user message)"
4870 );
4871
4872 // Verify the 2 completed terminals from before the checkpoint still exist
4873 let terminal_1_exists = thread.read_with(cx, |thread, _| {
4874 thread.terminals.contains_key(&terminal_id_1)
4875 });
4876 assert!(
4877 terminal_1_exists,
4878 "Terminal 1 (from before checkpoint) should still exist"
4879 );
4880
4881 let terminal_2_exists = thread.read_with(cx, |thread, _| {
4882 thread.terminals.contains_key(&terminal_id_2)
4883 });
4884 assert!(
4885 terminal_2_exists,
4886 "Terminal 2 (from before checkpoint) should still exist"
4887 );
4888
4889 // Verify they're still in completed state
4890 let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
4891 let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
4892 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4893 });
4894 assert!(terminal_1_completed, "Terminal 1 should still be completed");
4895
4896 let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
4897 let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
4898 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4899 });
4900 assert!(terminal_2_completed, "Terminal 2 should still be completed");
4901
4902 // Verify the running terminal (created after checkpoint) was removed
4903 let terminal_3_exists =
4904 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4905 assert!(
4906 !terminal_3_exists,
4907 "Terminal 3 (created after checkpoint) should have been removed"
4908 );
4909
4910 // Verify total count is 2 (the two from before the checkpoint)
4911 let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
4912 assert_eq!(
4913 terminal_count, 2,
4914 "Should have exactly 2 terminals (the completed ones from before checkpoint)"
4915 );
4916 }
4917
4918 /// Tests that update_last_checkpoint correctly updates the original message's checkpoint
4919 /// even when a new user message is added while the async checkpoint comparison is in progress.
4920 ///
4921 /// This is a regression test for a bug where update_last_checkpoint would fail with
4922 /// "no checkpoint" if a new user message (without a checkpoint) was added between when
4923 /// update_last_checkpoint started and when its async closure ran.
4924 #[gpui::test]
4925 async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
4926 init_test(cx);
4927
4928 let fs = FakeFs::new(cx.executor());
4929 fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
4930 .await;
4931 let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;
4932
4933 let handler_done = Arc::new(AtomicBool::new(false));
4934 let handler_done_clone = handler_done.clone();
4935 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4936 move |_, _thread, _cx| {
4937 handler_done_clone.store(true, SeqCst);
4938 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
4939 },
4940 ));
4941
4942 let thread = cx
4943 .update(|cx| {
4944 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4945 })
4946 .await
4947 .unwrap();
4948
4949 let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
4950 let send_task = cx.background_executor.spawn(send_future);
4951
4952 // Tick until handler completes, then a few more to let update_last_checkpoint start
4953 while !handler_done.load(SeqCst) {
4954 cx.executor().tick();
4955 }
4956 for _ in 0..5 {
4957 cx.executor().tick();
4958 }
4959
4960 thread.update(cx, |thread, cx| {
4961 thread.push_entry(
4962 AgentThreadEntry::UserMessage(UserMessage {
4963 id: Some(UserMessageId::new()),
4964 content: ContentBlock::Empty,
4965 chunks: vec!["Injected message (no checkpoint)".into()],
4966 checkpoint: None,
4967 indented: false,
4968 }),
4969 cx,
4970 );
4971 });
4972
4973 cx.run_until_parked();
4974 let result = send_task.await;
4975
4976 assert!(
4977 result.is_ok(),
4978 "send should succeed even when new message added during update_last_checkpoint: {:?}",
4979 result.err()
4980 );
4981 }
4982
4983 /// Tests that when a follow-up message is sent during generation,
4984 /// the first turn completing does NOT clear `running_turn` because
4985 /// it now belongs to the second turn.
4986 #[gpui::test]
4987 async fn test_follow_up_message_during_generation_does_not_clear_turn(cx: &mut TestAppContext) {
4988 init_test(cx);
4989
4990 let fs = FakeFs::new(cx.executor());
4991 let project = Project::test(fs, [], cx).await;
4992
4993 // First handler waits for this signal before completing
4994 let (first_complete_tx, first_complete_rx) = futures::channel::oneshot::channel::<()>();
4995 let first_complete_rx = RefCell::new(Some(first_complete_rx));
4996
4997 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4998 move |params, _thread, _cx| {
4999 let first_complete_rx = first_complete_rx.borrow_mut().take();
5000 let is_first = params
5001 .prompt
5002 .iter()
5003 .any(|c| matches!(c, acp::ContentBlock::Text(t) if t.text.contains("first")));
5004
5005 async move {
5006 if is_first {
5007 // First handler waits until signaled
5008 if let Some(rx) = first_complete_rx {
5009 rx.await.ok();
5010 }
5011 }
5012 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
5013 }
5014 .boxed_local()
5015 }
5016 }));
5017
5018 let thread = cx
5019 .update(|cx| {
5020 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5021 })
5022 .await
5023 .unwrap();
5024
5025 // Send first message (turn_id=1) - handler will block
5026 let first_request = thread.update(cx, |thread, cx| thread.send_raw("first", cx));
5027 assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 1);
5028
5029 // Send second message (turn_id=2) while first is still blocked
5030 // This calls cancel() which takes turn 1's running_turn and sets turn 2's
5031 let second_request = thread.update(cx, |thread, cx| thread.send_raw("second", cx));
5032 assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 2);
5033
5034 let running_turn_after_second_send =
5035 thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
5036 assert_eq!(
5037 running_turn_after_second_send,
5038 Some(2),
5039 "running_turn should be set to turn 2 after sending second message"
5040 );
5041
5042 // Now signal first handler to complete
5043 first_complete_tx.send(()).ok();
5044
5045 // First request completes - should NOT clear running_turn
5046 // because running_turn now belongs to turn 2
5047 first_request.await.unwrap();
5048
5049 let running_turn_after_first =
5050 thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
5051 assert_eq!(
5052 running_turn_after_first,
5053 Some(2),
5054 "first turn completing should not clear running_turn (belongs to turn 2)"
5055 );
5056
5057 // Second request completes - SHOULD clear running_turn
5058 second_request.await.unwrap();
5059
5060 let running_turn_after_second =
5061 thread.read_with(cx, |thread, _| thread.running_turn.is_some());
5062 assert!(
5063 !running_turn_after_second,
5064 "second turn completing should clear running_turn"
5065 );
5066 }
5067
5068 #[gpui::test]
5069 async fn test_send_returns_cancelled_response_and_marks_tools_as_cancelled(
5070 cx: &mut TestAppContext,
5071 ) {
5072 init_test(cx);
5073
5074 let fs = FakeFs::new(cx.executor());
5075 let project = Project::test(fs, [], cx).await;
5076
5077 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
5078 move |_params, thread, mut cx| {
5079 async move {
5080 thread
5081 .update(&mut cx, |thread, cx| {
5082 thread.handle_session_update(
5083 acp::SessionUpdate::ToolCall(
5084 acp::ToolCall::new(
5085 acp::ToolCallId::new("test-tool"),
5086 "Test Tool",
5087 )
5088 .kind(acp::ToolKind::Fetch)
5089 .status(acp::ToolCallStatus::InProgress),
5090 ),
5091 cx,
5092 )
5093 })
5094 .unwrap()
5095 .unwrap();
5096
5097 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
5098 }
5099 .boxed_local()
5100 },
5101 ));
5102
5103 let thread = cx
5104 .update(|cx| {
5105 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5106 })
5107 .await
5108 .unwrap();
5109
5110 let response = thread
5111 .update(cx, |thread, cx| thread.send_raw("test message", cx))
5112 .await;
5113
5114 let response = response
5115 .expect("send should succeed")
5116 .expect("should have response");
5117 assert_eq!(
5118 response.stop_reason,
5119 acp::StopReason::Cancelled,
5120 "response should have Cancelled stop_reason"
5121 );
5122
5123 thread.read_with(cx, |thread, _| {
5124 let tool_entry = thread
5125 .entries
5126 .iter()
5127 .find_map(|e| {
5128 if let AgentThreadEntry::ToolCall(call) = e {
5129 Some(call)
5130 } else {
5131 None
5132 }
5133 })
5134 .expect("should have tool call entry");
5135
5136 assert!(
5137 matches!(tool_entry.status, ToolCallStatus::Canceled),
5138 "tool should be marked as Canceled when response is Cancelled, got {:?}",
5139 tool_entry.status
5140 );
5141 });
5142 }
5143
5144 #[gpui::test]
5145 async fn test_provisional_title_replaced_by_real_title(cx: &mut TestAppContext) {
5146 init_test(cx);
5147
5148 let fs = FakeFs::new(cx.executor());
5149 let project = Project::test(fs, [], cx).await;
5150 let connection = Rc::new(FakeAgentConnection::new());
5151 let set_title_calls = connection.set_title_calls.clone();
5152
5153 let thread = cx
5154 .update(|cx| {
5155 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5156 })
5157 .await
5158 .unwrap();
5159
5160 // Initial title is the default.
5161 thread.read_with(cx, |thread, _| {
5162 assert_eq!(thread.title(), None);
5163 });
5164
5165 // Setting a provisional title updates the display title.
5166 thread.update(cx, |thread, cx| {
5167 thread.set_provisional_title("Hello, can you help…".into(), cx);
5168 });
5169 thread.read_with(cx, |thread, _| {
5170 assert_eq!(
5171 thread.title().as_ref().map(|s| s.as_str()),
5172 Some("Hello, can you help…")
5173 );
5174 });
5175
5176 // The provisional title should NOT have propagated to the connection.
5177 assert_eq!(
5178 set_title_calls.borrow().len(),
5179 0,
5180 "provisional title should not propagate to the connection"
5181 );
5182
5183 // When the real title arrives via set_title, it replaces the
5184 // provisional title and propagates to the connection.
5185 let task = thread.update(cx, |thread, cx| {
5186 thread.set_title("Helping with Rust question".into(), cx)
5187 });
5188 task.await.expect("set_title should succeed");
5189 thread.read_with(cx, |thread, _| {
5190 assert_eq!(
5191 thread.title().as_ref().map(|s| s.as_str()),
5192 Some("Helping with Rust question")
5193 );
5194 });
5195 assert_eq!(
5196 set_title_calls.borrow().as_slice(),
5197 &[SharedString::from("Helping with Rust question")],
5198 "real title should propagate to the connection"
5199 );
5200 }
5201
5202 #[gpui::test]
5203 async fn test_session_info_update_replaces_provisional_title_and_emits_event(
5204 cx: &mut TestAppContext,
5205 ) {
5206 init_test(cx);
5207
5208 let fs = FakeFs::new(cx.executor());
5209 let project = Project::test(fs, [], cx).await;
5210 let connection = Rc::new(FakeAgentConnection::new());
5211
5212 let thread = cx
5213 .update(|cx| {
5214 connection.clone().new_session(
5215 project,
5216 PathList::new(&[Path::new(path!("/test"))]),
5217 cx,
5218 )
5219 })
5220 .await
5221 .unwrap();
5222
5223 let title_updated_events = Rc::new(RefCell::new(0usize));
5224 let title_updated_events_for_subscription = title_updated_events.clone();
5225 thread.update(cx, |_thread, cx| {
5226 cx.subscribe(
5227 &thread,
5228 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
5229 if matches!(event, AcpThreadEvent::TitleUpdated) {
5230 *title_updated_events_for_subscription.borrow_mut() += 1;
5231 }
5232 },
5233 )
5234 .detach();
5235 });
5236
5237 thread.update(cx, |thread, cx| {
5238 thread.set_provisional_title("Hello, can you help…".into(), cx);
5239 });
5240 assert_eq!(
5241 *title_updated_events.borrow(),
5242 1,
5243 "setting a provisional title should emit TitleUpdated"
5244 );
5245
5246 let result = thread.update(cx, |thread, cx| {
5247 thread.handle_session_update(
5248 acp::SessionUpdate::SessionInfoUpdate(
5249 acp::SessionInfoUpdate::new().title("Helping with Rust question"),
5250 ),
5251 cx,
5252 )
5253 });
5254 result.expect("session info update should succeed");
5255
5256 thread.read_with(cx, |thread, _| {
5257 assert_eq!(
5258 thread.title().as_ref().map(|s| s.as_str()),
5259 Some("Helping with Rust question")
5260 );
5261 assert!(
5262 !thread.has_provisional_title(),
5263 "session info title update should clear provisional title"
5264 );
5265 });
5266
5267 assert_eq!(
5268 *title_updated_events.borrow(),
5269 2,
5270 "session info title update should emit TitleUpdated"
5271 );
5272 assert!(
5273 connection.set_title_calls.borrow().is_empty(),
5274 "session info title update should not propagate back to the connection"
5275 );
5276 }
5277}