1mod connection;
2mod diff;
3mod mention;
4mod terminal;
5use action_log::{ActionLog, ActionLogTelemetry};
6use agent_client_protocol::{self as acp};
7use anyhow::{Context as _, Result, anyhow};
8use collections::HashSet;
9pub use connection::*;
10pub use diff::*;
11use futures::{FutureExt, channel::oneshot, future::BoxFuture};
12use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
13use itertools::Itertools;
14use language::language_settings::FormatOnSave;
15use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
16use markdown::Markdown;
17pub use mention::*;
18use project::lsp_store::{FormatTrigger, LspFormatTarget};
19use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
20use serde::{Deserialize, Serialize};
21use serde_json::to_string_pretty;
22use std::collections::HashMap;
23use std::error::Error;
24use std::fmt::{Formatter, Write};
25use std::ops::Range;
26use std::process::ExitStatus;
27use std::rc::Rc;
28use std::time::{Duration, Instant};
29use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
30use task::{Shell, ShellBuilder};
31pub use terminal::*;
32use text::Bias;
33use ui::App;
34use util::markdown::MarkdownEscaped;
35use util::path_list::PathList;
36use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
37use uuid::Uuid;
38
/// Key used in ACP ToolCall meta to store the tool's programmatic name.
/// This is a workaround since ACP's ToolCall doesn't have a dedicated name field.
///
/// Written by [`meta_with_tool_name`] and read back by [`tool_name_from_meta`].
pub const TOOL_NAME_META_KEY: &str = "tool_name";
42
43/// Helper to extract tool name from ACP meta
44pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
45 meta.as_ref()
46 .and_then(|m| m.get(TOOL_NAME_META_KEY))
47 .and_then(|v| v.as_str())
48 .map(|s| SharedString::from(s.to_owned()))
49}
50
51/// Helper to create meta with tool name
52pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
53 acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
54}
55
/// Key used in ACP ToolCall meta to store the session id and message indexes
/// of a subagent (see [`SubagentSessionInfo`]).
pub const SUBAGENT_SESSION_INFO_META_KEY: &str = "subagent_session_info";
58
/// Describes the subagent session associated with a tool call.
///
/// Read from ACP meta under [`SUBAGENT_SESSION_INFO_META_KEY`] by
/// [`subagent_session_info_from_meta`].
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SubagentSessionInfo {
    /// The session id of the subagent session that was spawned
    pub session_id: acp::SessionId,
    /// The index of the message of the start of the "turn" run by this tool call
    pub message_start_index: usize,
    /// The index of the output of the message that the subagent has returned.
    /// Omitted from the serialized form while it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message_end_index: Option<usize>,
}
69
70/// Helper to extract subagent session id from ACP meta
71pub fn subagent_session_info_from_meta(meta: &Option<acp::Meta>) -> Option<SubagentSessionInfo> {
72 meta.as_ref()
73 .and_then(|m| m.get(SUBAGENT_SESSION_INFO_META_KEY))
74 .and_then(|v| serde_json::from_value(v.clone()).ok())
75}
76
/// A user entry in an agent thread (see [`AgentThreadEntry::UserMessage`]).
#[derive(Debug)]
pub struct UserMessage {
    /// Optional identifier of this message.
    pub id: Option<UserMessageId>,
    /// The message content; this is what `to_markdown` exports.
    pub content: ContentBlock,
    /// ACP content blocks for this message.
    // NOTE(review): presumably the raw blocks `content` was built from — confirm at the construction site.
    pub chunks: Vec<acp::ContentBlock>,
    /// Checkpoint associated with this message, if any.
    pub checkpoint: Option<Checkpoint>,
    /// Reported by [`AgentThreadEntry::is_indented`].
    pub indented: bool,
}
85
/// A git checkpoint attached to a [`UserMessage`].
#[derive(Debug)]
pub struct Checkpoint {
    /// The underlying git store checkpoint.
    git_checkpoint: GitStoreCheckpoint,
    /// When `true`, the owning message is exported under the
    /// `## User (checkpoint)` heading instead of `## User`.
    pub show: bool,
}
91
92impl UserMessage {
93 fn to_markdown(&self, cx: &App) -> String {
94 let mut markdown = String::new();
95 if self
96 .checkpoint
97 .as_ref()
98 .is_some_and(|checkpoint| checkpoint.show)
99 {
100 writeln!(markdown, "## User (checkpoint)").unwrap();
101 } else {
102 writeln!(markdown, "## User").unwrap();
103 }
104 writeln!(markdown).unwrap();
105 writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
106 writeln!(markdown).unwrap();
107 markdown
108 }
109}
110
/// An assistant entry in an agent thread (see [`AgentThreadEntry::AssistantMessage`]).
#[derive(Debug, PartialEq)]
pub struct AssistantMessage {
    /// The message body as an ordered list of message/thought chunks.
    pub chunks: Vec<AssistantMessageChunk>,
    /// Reported by [`AgentThreadEntry::is_indented`].
    pub indented: bool,
    // NOTE(review): presumably marks messages that carry a subagent's returned output — confirm with the code that sets it.
    pub is_subagent_output: bool,
}
117
118impl AssistantMessage {
119 pub fn to_markdown(&self, cx: &App) -> String {
120 format!(
121 "## Assistant\n\n{}\n\n",
122 self.chunks
123 .iter()
124 .map(|chunk| chunk.to_markdown(cx))
125 .join("\n\n")
126 )
127 }
128}
129
/// One piece of an [`AssistantMessage`].
#[derive(Debug, PartialEq)]
pub enum AssistantMessageChunk {
    /// Regular message content; exported to Markdown as-is.
    Message { block: ContentBlock },
    /// Thought content; exported to Markdown wrapped in `<thinking>` tags.
    Thought { block: ContentBlock },
}
135
136impl AssistantMessageChunk {
137 pub fn from_str(
138 chunk: &str,
139 language_registry: &Arc<LanguageRegistry>,
140 path_style: PathStyle,
141 cx: &mut App,
142 ) -> Self {
143 Self::Message {
144 block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
145 }
146 }
147
148 fn to_markdown(&self, cx: &App) -> String {
149 match self {
150 Self::Message { block } => block.to_markdown(cx).to_string(),
151 Self::Thought { block } => {
152 format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
153 }
154 }
155 }
156}
157
/// A single entry in an agent thread: a user message, an assistant message,
/// or a tool call.
#[derive(Debug)]
pub enum AgentThreadEntry {
    UserMessage(UserMessage),
    AssistantMessage(AssistantMessage),
    ToolCall(ToolCall),
}
164
165impl AgentThreadEntry {
166 pub fn is_indented(&self) -> bool {
167 match self {
168 Self::UserMessage(message) => message.indented,
169 Self::AssistantMessage(message) => message.indented,
170 Self::ToolCall(_) => false,
171 }
172 }
173
174 pub fn to_markdown(&self, cx: &App) -> String {
175 match self {
176 Self::UserMessage(message) => message.to_markdown(cx),
177 Self::AssistantMessage(message) => message.to_markdown(cx),
178 Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
179 }
180 }
181
182 pub fn user_message(&self) -> Option<&UserMessage> {
183 if let AgentThreadEntry::UserMessage(message) = self {
184 Some(message)
185 } else {
186 None
187 }
188 }
189
190 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
191 if let AgentThreadEntry::ToolCall(call) = self {
192 itertools::Either::Left(call.diffs())
193 } else {
194 itertools::Either::Right(std::iter::empty())
195 }
196 }
197
198 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
199 if let AgentThreadEntry::ToolCall(call) = self {
200 itertools::Either::Left(call.terminals())
201 } else {
202 itertools::Either::Right(std::iter::empty())
203 }
204 }
205
206 pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
207 if let AgentThreadEntry::ToolCall(ToolCall {
208 locations,
209 resolved_locations,
210 ..
211 }) = self
212 {
213 Some((
214 locations.get(ix)?.clone(),
215 resolved_locations.get(ix)?.clone()?,
216 ))
217 } else {
218 None
219 }
220 }
221}
222
/// A tool call entry in an agent thread, built from an ACP tool call and kept
/// current through tool call updates.
#[derive(Debug)]
pub struct ToolCall {
    /// The ACP identifier of this tool call.
    pub id: acp::ToolCallId,
    /// Markdown entity holding the (possibly escaped or shortened) title.
    pub label: Entity<Markdown>,
    /// The ACP kind of tool; affects how the title is turned into `label`.
    pub kind: acp::ToolKind,
    /// Content produced by the tool call (content blocks, diffs, terminals).
    pub content: Vec<ToolCallContent>,
    /// Current status of the tool call.
    pub status: ToolCallStatus,
    /// Locations reported for this tool call.
    pub locations: Vec<acp::ToolCallLocation>,
    /// Resolved counterparts of `locations`; `AgentThreadEntry::location`
    /// reads both lists at the same index. Starts out empty.
    pub resolved_locations: Vec<Option<AgentLocation>>,
    /// Raw JSON input of the tool call, if provided.
    pub raw_input: Option<serde_json::Value>,
    /// Markdown rendering of `raw_input`, produced by `markdown_for_raw_output`.
    pub raw_input_markdown: Option<Entity<Markdown>>,
    /// Raw JSON output of the tool call, if provided.
    pub raw_output: Option<serde_json::Value>,
    /// Programmatic tool name read from meta (see [`TOOL_NAME_META_KEY`]).
    pub tool_name: Option<SharedString>,
    /// Subagent session info read from meta
    /// (see [`SUBAGENT_SESSION_INFO_META_KEY`]).
    pub subagent_session_info: Option<SubagentSessionInfo>,
}
238
239impl ToolCall {
240 fn from_acp(
241 tool_call: acp::ToolCall,
242 status: ToolCallStatus,
243 language_registry: Arc<LanguageRegistry>,
244 path_style: PathStyle,
245 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
246 cx: &mut App,
247 ) -> Result<Self> {
248 let title = if tool_call.kind == acp::ToolKind::Execute {
249 tool_call.title
250 } else if tool_call.kind == acp::ToolKind::Edit {
251 MarkdownEscaped(tool_call.title.as_str()).to_string()
252 } else if let Some((first_line, _)) = tool_call.title.split_once("\n") {
253 first_line.to_owned() + "…"
254 } else {
255 tool_call.title
256 };
257 let mut content = Vec::with_capacity(tool_call.content.len());
258 for item in tool_call.content {
259 if let Some(item) = ToolCallContent::from_acp(
260 item,
261 language_registry.clone(),
262 path_style,
263 terminals,
264 cx,
265 )? {
266 content.push(item);
267 }
268 }
269
270 let raw_input_markdown = tool_call
271 .raw_input
272 .as_ref()
273 .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
274
275 let tool_name = tool_name_from_meta(&tool_call.meta);
276
277 let subagent_session_info = subagent_session_info_from_meta(&tool_call.meta);
278
279 let result = Self {
280 id: tool_call.tool_call_id,
281 label: cx
282 .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
283 kind: tool_call.kind,
284 content,
285 locations: tool_call.locations,
286 resolved_locations: Vec::default(),
287 status,
288 raw_input: tool_call.raw_input,
289 raw_input_markdown,
290 raw_output: tool_call.raw_output,
291 tool_name,
292 subagent_session_info,
293 };
294 Ok(result)
295 }
296
297 fn update_fields(
298 &mut self,
299 fields: acp::ToolCallUpdateFields,
300 meta: Option<acp::Meta>,
301 language_registry: Arc<LanguageRegistry>,
302 path_style: PathStyle,
303 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
304 cx: &mut App,
305 ) -> Result<()> {
306 let acp::ToolCallUpdateFields {
307 kind,
308 status,
309 title,
310 content,
311 locations,
312 raw_input,
313 raw_output,
314 ..
315 } = fields;
316
317 if let Some(kind) = kind {
318 self.kind = kind;
319 }
320
321 if let Some(status) = status {
322 self.status = status.into();
323 }
324
325 if let Some(subagent_session_info) = subagent_session_info_from_meta(&meta) {
326 self.subagent_session_info = Some(subagent_session_info);
327 }
328
329 if let Some(title) = title {
330 if self.kind == acp::ToolKind::Execute {
331 for terminal in self.terminals() {
332 terminal.update(cx, |terminal, cx| {
333 terminal.update_command_label(&title, cx);
334 });
335 }
336 }
337 self.label.update(cx, |label, cx| {
338 if self.kind == acp::ToolKind::Execute {
339 label.replace(title, cx);
340 } else if self.kind == acp::ToolKind::Edit {
341 label.replace(MarkdownEscaped(&title).to_string(), cx)
342 } else if let Some((first_line, _)) = title.split_once("\n") {
343 label.replace(first_line.to_owned() + "…", cx);
344 } else {
345 label.replace(title, cx);
346 }
347 });
348 }
349
350 if let Some(content) = content {
351 let mut new_content_len = content.len();
352 let mut content = content.into_iter();
353
354 // Reuse existing content if we can
355 for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
356 let valid_content =
357 old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
358 if !valid_content {
359 new_content_len -= 1;
360 }
361 }
362 for new in content {
363 if let Some(new) = ToolCallContent::from_acp(
364 new,
365 language_registry.clone(),
366 path_style,
367 terminals,
368 cx,
369 )? {
370 self.content.push(new);
371 } else {
372 new_content_len -= 1;
373 }
374 }
375 self.content.truncate(new_content_len);
376 }
377
378 if let Some(locations) = locations {
379 self.locations = locations;
380 }
381
382 if let Some(raw_input) = raw_input {
383 self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
384 self.raw_input = Some(raw_input);
385 }
386
387 if let Some(raw_output) = raw_output {
388 if self.content.is_empty()
389 && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
390 {
391 self.content
392 .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
393 markdown,
394 }));
395 }
396 self.raw_output = Some(raw_output);
397 }
398 Ok(())
399 }
400
401 pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
402 self.content.iter().filter_map(|content| match content {
403 ToolCallContent::Diff(diff) => Some(diff),
404 ToolCallContent::ContentBlock(_) => None,
405 ToolCallContent::Terminal(_) => None,
406 })
407 }
408
409 pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
410 self.content.iter().filter_map(|content| match content {
411 ToolCallContent::Terminal(terminal) => Some(terminal),
412 ToolCallContent::ContentBlock(_) => None,
413 ToolCallContent::Diff(_) => None,
414 })
415 }
416
417 pub fn is_subagent(&self) -> bool {
418 self.tool_name.as_ref().is_some_and(|s| s == "spawn_agent")
419 || self.subagent_session_info.is_some()
420 }
421
422 pub fn to_markdown(&self, cx: &App) -> String {
423 let mut markdown = format!(
424 "**Tool Call: {}**\nStatus: {}\n\n",
425 self.label.read(cx).source(),
426 self.status
427 );
428 for content in &self.content {
429 markdown.push_str(content.to_markdown(cx).as_str());
430 markdown.push_str("\n\n");
431 }
432 markdown
433 }
434
435 async fn resolve_location(
436 location: acp::ToolCallLocation,
437 project: WeakEntity<Project>,
438 cx: &mut AsyncApp,
439 ) -> Option<ResolvedLocation> {
440 let buffer = project
441 .update(cx, |project, cx| {
442 project
443 .project_path_for_absolute_path(&location.path, cx)
444 .map(|path| project.open_buffer(path, cx))
445 })
446 .ok()??;
447 let buffer = buffer.await.log_err()?;
448 let position = buffer.update(cx, |buffer, _| {
449 let snapshot = buffer.snapshot();
450 if let Some(row) = location.line {
451 let column = snapshot.indent_size_for_line(row).len;
452 let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
453 snapshot.anchor_before(point)
454 } else {
455 Anchor::min_for_buffer(snapshot.remote_id())
456 }
457 });
458
459 Some(ResolvedLocation { buffer, position })
460 }
461
462 fn resolve_locations(
463 &self,
464 project: Entity<Project>,
465 cx: &mut App,
466 ) -> Task<Vec<Option<ResolvedLocation>>> {
467 let locations = self.locations.clone();
468 project.update(cx, |_, cx| {
469 cx.spawn(async move |project, cx| {
470 let mut new_locations = Vec::new();
471 for location in locations {
472 new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
473 }
474 new_locations
475 })
476 })
477 }
478}
479
// Separate so we can hold a strong reference to the buffer
// for saving on the thread
/// A tool call location resolved to a buffer and an anchor within it.
/// Converts into an [`AgentLocation`], which holds the buffer weakly.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ResolvedLocation {
    /// Strong handle to the opened buffer.
    buffer: Entity<Buffer>,
    /// Anchor within `buffer`.
    position: Anchor,
}
487
488impl From<&ResolvedLocation> for AgentLocation {
489 fn from(value: &ResolvedLocation) -> Self {
490 Self {
491 buffer: value.buffer.downgrade(),
492 position: value.position,
493 }
494 }
495}
496
/// Status of a [`ToolCall`].
///
/// A superset of `acp::ToolCallStatus`: it adds the confirmation, rejection
/// and cancellation states.
#[derive(Debug)]
pub enum ToolCallStatus {
    /// The tool call hasn't started running yet, but we start showing it to
    /// the user.
    Pending,
    /// The tool call is waiting for confirmation from the user.
    WaitingForConfirmation {
        /// The permission options for this confirmation.
        options: PermissionOptions,
        /// Channel used to send back the id of the selected permission option.
        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
    },
    /// The tool call is currently running.
    InProgress,
    /// The tool call completed successfully.
    Completed,
    /// The tool call failed.
    Failed,
    /// The user rejected the tool call.
    Rejected,
    /// The user canceled generation so the tool call was canceled.
    Canceled,
}
518
519impl From<acp::ToolCallStatus> for ToolCallStatus {
520 fn from(status: acp::ToolCallStatus) -> Self {
521 match status {
522 acp::ToolCallStatus::Pending => Self::Pending,
523 acp::ToolCallStatus::InProgress => Self::InProgress,
524 acp::ToolCallStatus::Completed => Self::Completed,
525 acp::ToolCallStatus::Failed => Self::Failed,
526 _ => Self::Pending,
527 }
528 }
529}
530
531impl Display for ToolCallStatus {
532 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
533 write!(
534 f,
535 "{}",
536 match self {
537 ToolCallStatus::Pending => "Pending",
538 ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
539 ToolCallStatus::InProgress => "In Progress",
540 ToolCallStatus::Completed => "Completed",
541 ToolCallStatus::Failed => "Failed",
542 ToolCallStatus::Rejected => "Rejected",
543 ToolCallStatus::Canceled => "Canceled",
544 }
545 )
546 }
547}
548
/// Displayable content accumulated from one or more ACP content blocks.
#[derive(Debug, PartialEq, Clone)]
pub enum ContentBlock {
    /// Nothing has been appended yet.
    Empty,
    /// Markdown text; appended blocks extend the Markdown source.
    Markdown { markdown: Entity<Markdown> },
    /// A single resource link that has not been combined with other content.
    ResourceLink { resource_link: acp::ResourceLink },
    /// A single successfully decoded image.
    Image { image: Arc<gpui::Image> },
}
556
557impl ContentBlock {
558 pub fn new(
559 block: acp::ContentBlock,
560 language_registry: &Arc<LanguageRegistry>,
561 path_style: PathStyle,
562 cx: &mut App,
563 ) -> Self {
564 let mut this = Self::Empty;
565 this.append(block, language_registry, path_style, cx);
566 this
567 }
568
569 pub fn new_combined(
570 blocks: impl IntoIterator<Item = acp::ContentBlock>,
571 language_registry: Arc<LanguageRegistry>,
572 path_style: PathStyle,
573 cx: &mut App,
574 ) -> Self {
575 let mut this = Self::Empty;
576 for block in blocks {
577 this.append(block, &language_registry, path_style, cx);
578 }
579 this
580 }
581
582 pub fn append(
583 &mut self,
584 block: acp::ContentBlock,
585 language_registry: &Arc<LanguageRegistry>,
586 path_style: PathStyle,
587 cx: &mut App,
588 ) {
589 match (&mut *self, &block) {
590 (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
591 *self = ContentBlock::ResourceLink {
592 resource_link: resource_link.clone(),
593 };
594 }
595 (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
596 if let Some(image) = Self::decode_image(image_content) {
597 *self = ContentBlock::Image { image };
598 } else {
599 let new_content = Self::image_md(image_content);
600 *self = Self::create_markdown_block(new_content, language_registry, cx);
601 }
602 }
603 (ContentBlock::Empty, _) => {
604 let new_content = Self::block_string_contents(&block, path_style);
605 *self = Self::create_markdown_block(new_content, language_registry, cx);
606 }
607 (ContentBlock::Markdown { markdown }, _) => {
608 let new_content = Self::block_string_contents(&block, path_style);
609 markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
610 }
611 (ContentBlock::ResourceLink { resource_link }, _) => {
612 let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
613 let new_content = Self::block_string_contents(&block, path_style);
614 let combined = format!("{}\n{}", existing_content, new_content);
615 *self = Self::create_markdown_block(combined, language_registry, cx);
616 }
617 (ContentBlock::Image { .. }, _) => {
618 let new_content = Self::block_string_contents(&block, path_style);
619 let combined = format!("`Image`\n{}", new_content);
620 *self = Self::create_markdown_block(combined, language_registry, cx);
621 }
622 }
623 }
624
625 fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
626 use base64::Engine as _;
627
628 let bytes = base64::engine::general_purpose::STANDARD
629 .decode(image_content.data.as_bytes())
630 .ok()?;
631 let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
632 Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
633 }
634
635 fn create_markdown_block(
636 content: String,
637 language_registry: &Arc<LanguageRegistry>,
638 cx: &mut App,
639 ) -> ContentBlock {
640 ContentBlock::Markdown {
641 markdown: cx
642 .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
643 }
644 }
645
646 fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
647 match block {
648 acp::ContentBlock::Text(text_content) => text_content.text.clone(),
649 acp::ContentBlock::ResourceLink(resource_link) => {
650 Self::resource_link_md(&resource_link.uri, path_style)
651 }
652 acp::ContentBlock::Resource(acp::EmbeddedResource {
653 resource:
654 acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
655 uri,
656 ..
657 }),
658 ..
659 }) => Self::resource_link_md(uri, path_style),
660 acp::ContentBlock::Image(image) => Self::image_md(image),
661 _ => String::new(),
662 }
663 }
664
665 fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
666 if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
667 uri.as_link().to_string()
668 } else {
669 uri.to_string()
670 }
671 }
672
673 fn image_md(_image: &acp::ImageContent) -> String {
674 "`Image`".into()
675 }
676
677 pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
678 match self {
679 ContentBlock::Empty => "",
680 ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
681 ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
682 ContentBlock::Image { .. } => "`Image`",
683 }
684 }
685
686 pub fn markdown(&self) -> Option<&Entity<Markdown>> {
687 match self {
688 ContentBlock::Empty => None,
689 ContentBlock::Markdown { markdown } => Some(markdown),
690 ContentBlock::ResourceLink { .. } => None,
691 ContentBlock::Image { .. } => None,
692 }
693 }
694
695 pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
696 match self {
697 ContentBlock::ResourceLink { resource_link } => Some(resource_link),
698 _ => None,
699 }
700 }
701
702 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
703 match self {
704 ContentBlock::Image { image } => Some(image),
705 _ => None,
706 }
707 }
708}
709
/// One item of content attached to a [`ToolCall`].
#[derive(Debug)]
pub enum ToolCallContent {
    /// Regular content (Markdown, resource link, image).
    ContentBlock(ContentBlock),
    /// A diff entity.
    Diff(Entity<Diff>),
    /// A terminal entity.
    Terminal(Entity<Terminal>),
}
716
717impl ToolCallContent {
718 pub fn from_acp(
719 content: acp::ToolCallContent,
720 language_registry: Arc<LanguageRegistry>,
721 path_style: PathStyle,
722 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
723 cx: &mut App,
724 ) -> Result<Option<Self>> {
725 match content {
726 acp::ToolCallContent::Content(acp::Content { content, .. }) => {
727 Ok(Some(Self::ContentBlock(ContentBlock::new(
728 content,
729 &language_registry,
730 path_style,
731 cx,
732 ))))
733 }
734 acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
735 Diff::finalized(
736 diff.path.to_string_lossy().into_owned(),
737 diff.old_text,
738 diff.new_text,
739 language_registry,
740 cx,
741 )
742 })))),
743 acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
744 .get(&terminal_id)
745 .cloned()
746 .map(|terminal| Some(Self::Terminal(terminal)))
747 .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
748 _ => Ok(None),
749 }
750 }
751
752 pub fn update_from_acp(
753 &mut self,
754 new: acp::ToolCallContent,
755 language_registry: Arc<LanguageRegistry>,
756 path_style: PathStyle,
757 terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
758 cx: &mut App,
759 ) -> Result<bool> {
760 let needs_update = match (&self, &new) {
761 (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
762 old_diff.read(cx).needs_update(
763 new_diff.old_text.as_deref().unwrap_or(""),
764 &new_diff.new_text,
765 cx,
766 )
767 }
768 _ => true,
769 };
770
771 if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
772 if needs_update {
773 *self = update;
774 }
775 Ok(true)
776 } else {
777 Ok(false)
778 }
779 }
780
781 pub fn to_markdown(&self, cx: &App) -> String {
782 match self {
783 Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
784 Self::Diff(diff) => diff.read(cx).to_markdown(cx),
785 Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
786 }
787 }
788
789 pub fn image(&self) -> Option<&Arc<gpui::Image>> {
790 match self {
791 Self::ContentBlock(content) => content.image(),
792 _ => None,
793 }
794 }
795}
796
/// An update targeting an existing tool call, identified by its tool call id.
#[derive(Debug, PartialEq)]
pub enum ToolCallUpdate {
    /// An ACP field update.
    UpdateFields(acp::ToolCallUpdate),
    /// An update carrying a diff entity.
    UpdateDiff(ToolCallUpdateDiff),
    /// An update carrying a terminal entity.
    UpdateTerminal(ToolCallUpdateTerminal),
}
803
804impl ToolCallUpdate {
805 fn id(&self) -> &acp::ToolCallId {
806 match self {
807 Self::UpdateFields(update) => &update.tool_call_id,
808 Self::UpdateDiff(diff) => &diff.id,
809 Self::UpdateTerminal(terminal) => &terminal.id,
810 }
811 }
812}
813
814impl From<acp::ToolCallUpdate> for ToolCallUpdate {
815 fn from(update: acp::ToolCallUpdate) -> Self {
816 Self::UpdateFields(update)
817 }
818}
819
820impl From<ToolCallUpdateDiff> for ToolCallUpdate {
821 fn from(diff: ToolCallUpdateDiff) -> Self {
822 Self::UpdateDiff(diff)
823 }
824}
825
/// Payload of [`ToolCallUpdate::UpdateDiff`].
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateDiff {
    /// Id of the tool call being updated.
    pub id: acp::ToolCallId,
    /// The diff entity carried by the update.
    pub diff: Entity<Diff>,
}
831
832impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
833 fn from(terminal: ToolCallUpdateTerminal) -> Self {
834 Self::UpdateTerminal(terminal)
835 }
836}
837
/// Payload of [`ToolCallUpdate::UpdateTerminal`].
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateTerminal {
    /// Id of the tool call being updated.
    pub id: acp::ToolCallId,
    /// The terminal entity carried by the update.
    pub terminal: Entity<Terminal>,
}
843
/// A plan made of an ordered list of entries. Defaults to an empty plan.
#[derive(Debug, Default)]
pub struct Plan {
    /// The plan entries, in order.
    pub entries: Vec<PlanEntry>,
}
848
/// Summary of a [`Plan`], computed by [`Plan::stats`].
#[derive(Debug)]
pub struct PlanStats<'a> {
    /// The first entry whose status is in progress, if any.
    pub in_progress_entry: Option<&'a PlanEntry>,
    /// Number of entries whose status is pending.
    pub pending: u32,
    /// Number of entries whose status is completed.
    pub completed: u32,
}
855
856impl Plan {
857 pub fn is_empty(&self) -> bool {
858 self.entries.is_empty()
859 }
860
861 pub fn stats(&self) -> PlanStats<'_> {
862 let mut stats = PlanStats {
863 in_progress_entry: None,
864 pending: 0,
865 completed: 0,
866 };
867
868 for entry in &self.entries {
869 match &entry.status {
870 acp::PlanEntryStatus::Pending => {
871 stats.pending += 1;
872 }
873 acp::PlanEntryStatus::InProgress => {
874 stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
875 }
876 acp::PlanEntryStatus::Completed => {
877 stats.completed += 1;
878 }
879 _ => {}
880 }
881 }
882
883 stats
884 }
885}
886
/// One entry of a [`Plan`].
#[derive(Debug)]
pub struct PlanEntry {
    /// The entry text as a Markdown entity.
    pub content: Entity<Markdown>,
    /// The ACP priority of the entry.
    pub priority: acp::PlanEntryPriority,
    /// The ACP status of the entry.
    pub status: acp::PlanEntryStatus,
}
893
894impl PlanEntry {
895 pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
896 Self {
897 content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
898 priority: entry.priority,
899 status: entry.status,
900 }
901 }
902}
903
/// Token accounting for a thread.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TokenUsage {
    /// Maximum number of tokens; `0` is treated by [`TokenUsage::ratio`] as
    /// "unknown" (no model selected).
    pub max_tokens: u64,
    /// Number of tokens used; compared against `max_tokens` by
    /// [`TokenUsage::ratio`].
    pub used_tokens: u64,
    /// Number of input tokens.
    pub input_tokens: u64,
    /// Number of output tokens.
    pub output_tokens: u64,
    /// Maximum number of output tokens, if known.
    pub max_output_tokens: Option<u64>,
}
912
/// Fraction of `max_tokens` at or above which [`TokenUsage::ratio`] reports
/// [`TokenUsageRatio::Warning`].
pub const TOKEN_USAGE_WARNING_THRESHOLD: f32 = 0.8;
914
915impl TokenUsage {
916 pub fn ratio(&self) -> TokenUsageRatio {
917 #[cfg(debug_assertions)]
918 let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
919 .unwrap_or(TOKEN_USAGE_WARNING_THRESHOLD.to_string())
920 .parse()
921 .unwrap();
922 #[cfg(not(debug_assertions))]
923 let warning_threshold: f32 = TOKEN_USAGE_WARNING_THRESHOLD;
924
925 // When the maximum is unknown because there is no selected model,
926 // avoid showing the token limit warning.
927 if self.max_tokens == 0 {
928 TokenUsageRatio::Normal
929 } else if self.used_tokens >= self.max_tokens {
930 TokenUsageRatio::Exceeded
931 } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
932 TokenUsageRatio::Warning
933 } else {
934 TokenUsageRatio::Normal
935 }
936 }
937}
938
/// Result of [`TokenUsage::ratio`]. The derived ordering is
/// `Normal < Warning < Exceeded`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum TokenUsageRatio {
    /// Usage is below the warning threshold, or the limit is unknown.
    Normal,
    /// Usage is at or above the warning threshold but below the limit.
    Warning,
    /// Usage is at or above the limit.
    Exceeded,
}
945
/// Information about a retry, carried by [`AcpThreadEvent::Retry`].
#[derive(Debug, Clone)]
pub struct RetryStatus {
    /// Message of the most recent error.
    pub last_error: SharedString,
    /// Attempt number.
    // NOTE(review): whether this is 0- or 1-based is not visible here — confirm with the producer.
    pub attempt: usize,
    /// Maximum number of attempts.
    pub max_attempts: usize,
    /// Instant at which the retry status started.
    pub started_at: Instant,
    // NOTE(review): presumably the wait before the next attempt — confirm with the producer.
    pub duration: Duration,
}
954
/// State of the turn that is currently running (see `AcpThread::running_turn`).
struct RunningTurn {
    /// Identifier of the turn.
    // NOTE(review): presumably taken from `AcpThread::turn_id` — confirm where turns are started.
    id: u32,
    /// Task owned by this turn.
    // NOTE(review): presumably the task that sends the prompt; dropping it would cancel the work — confirm.
    send_task: Task<()>,
}
959
/// A conversation thread with an agent, identified by an ACP session id.
///
/// Holds the thread entries, the plan, terminals, token usage and related UI
/// state, and emits [`AcpThreadEvent`]s.
pub struct AcpThread {
    /// The ACP session this thread belongs to.
    session_id: acp::SessionId,
    /// Working directories of the session, if any.
    work_dirs: Option<PathList>,
    /// Session id of the parent thread, if any.
    parent_session_id: Option<acp::SessionId>,
    /// The thread title.
    title: SharedString,
    // NOTE(review): presumably a temporary title used before a final one exists — confirm with its setter.
    provisional_title: Option<SharedString>,
    /// The thread entries, in order.
    entries: Vec<AgentThreadEntry>,
    /// The current plan.
    plan: Plan,
    /// The project this thread operates on.
    project: Entity<Project>,
    /// The action log for this thread.
    action_log: Entity<ActionLog>,
    // NOTE(review): presumably snapshots of buffers as last shared with the agent — confirm at the use sites.
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    /// Turn counter; starts at 0.
    turn_id: u32,
    /// The turn currently running, if any.
    running_turn: Option<RunningTurn>,
    /// The agent connection backing this thread.
    connection: Rc<dyn AgentConnection>,
    /// Latest known token usage, if any.
    token_usage: Option<TokenUsage>,
    /// Latest prompt capabilities received from the watch channel given to `new`.
    prompt_capabilities: acp::PromptCapabilities,
    /// Task that keeps `prompt_capabilities` up to date and emits
    /// `PromptCapabilitiesUpdated`; held so it lives as long as the thread.
    _observe_prompt_capabilities: Task<anyhow::Result<()>>,
    /// Terminals known to this thread, by id.
    terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
    // NOTE(review): presumably output received before the terminal was registered — confirm at the use sites.
    pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
    // NOTE(review): presumably exit statuses received before the terminal was registered — confirm at the use sites.
    pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
    /// Error flag; starts as `false`.
    had_error: bool,
    /// The user's unsent prompt text, persisted so it can be restored when reloading the thread.
    draft_prompt: Option<Vec<acp::ContentBlock>>,
    /// The initial scroll position for the thread view, set during session registration.
    ui_scroll_position: Option<gpui::ListOffset>,
    /// Buffer for smooth text streaming. Holds text that has been received from
    /// the model but not yet revealed in the UI. A timer task drains this buffer
    /// gradually to create a fluid typing effect instead of choppy chunk-at-a-time
    /// updates.
    streaming_text_buffer: Option<StreamingTextBuffer>,
}
991
/// State for gradually revealing streamed text in a Markdown entity.
struct StreamingTextBuffer {
    /// Text received from the model but not yet appended to the Markdown source.
    pending: String,
    /// The number of bytes to reveal per timer turn.
    bytes_to_reveal_per_tick: usize,
    /// The Markdown entity being streamed into.
    target: Entity<Markdown>,
    /// Timer task that periodically moves text from `pending` into the
    /// Markdown source of `target`.
    _reveal_task: Task<()>,
}
1002
// Timing constants for the reveal timer.
impl StreamingTextBuffer {
    /// The number of milliseconds between each timer tick, controlling how quickly
    /// text is revealed.
    const TASK_UPDATE_MS: u64 = 16;
    /// The time in milliseconds to reveal the entire pending text.
    const REVEAL_TARGET: f32 = 200.0;
}
1010
1011impl From<&AcpThread> for ActionLogTelemetry {
1012 fn from(value: &AcpThread) -> Self {
1013 Self {
1014 agent_telemetry_id: value.connection().telemetry_id(),
1015 session_id: value.session_id.0.clone(),
1016 }
1017 }
1018}
1019
/// Events emitted by [`AcpThread`].
// NOTE(review): the emit sites for most variants are outside this view; the
// per-variant notes below are inferred from names and payload types.
#[derive(Debug)]
pub enum AcpThreadEvent {
    NewEntry,
    TitleUpdated,
    TokenUsageUpdated,
    // Payload: presumably the index of the updated entry — confirm.
    EntryUpdated(usize),
    // Payload: presumably the index range of the removed entries — confirm.
    EntriesRemoved(Range<usize>),
    ToolAuthorizationRequested(acp::ToolCallId),
    ToolAuthorizationReceived(acp::ToolCallId),
    Retry(RetryStatus),
    SubagentSpawned(acp::SessionId),
    Stopped(acp::StopReason),
    Error,
    LoadError(LoadError),
    /// Emitted after `prompt_capabilities` is replaced with a value received
    /// from the watch channel given to `AcpThread::new`.
    PromptCapabilitiesUpdated,
    Refusal,
    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
    ModeUpdated(acp::SessionModeId),
    ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
}
1040
// Marks `AcpThread` as an emitter of `AcpThreadEvent`s.
impl EventEmitter<AcpThreadEvent> for AcpThread {}
1042
/// Terminal lifecycle events, each tagged with the id of the terminal it
/// concerns.
// NOTE(review): presumably sent by an external terminal provider to the thread — confirm with the consumer.
#[derive(Debug, Clone)]
pub enum TerminalProviderEvent {
    /// A terminal was created.
    Created {
        terminal_id: acp::TerminalId,
        /// Label for the terminal.
        label: String,
        /// Working directory, if any.
        cwd: Option<PathBuf>,
        /// Output size limit in bytes, if any.
        output_byte_limit: Option<u64>,
        /// The underlying terminal entity.
        terminal: Entity<::terminal::Terminal>,
    },
    /// Raw output bytes for a terminal.
    Output {
        terminal_id: acp::TerminalId,
        data: Vec<u8>,
    },
    /// The terminal's title changed.
    TitleChanged {
        terminal_id: acp::TerminalId,
        title: String,
    },
    /// The terminal exited with the given status.
    Exit {
        terminal_id: acp::TerminalId,
        status: acp::TerminalExitStatus,
    },
}
1065
/// Commands addressed to a terminal, each tagged with the target terminal id.
// NOTE(review): presumably sent from the thread to an external terminal provider — confirm with the consumer.
#[derive(Debug, Clone)]
pub enum TerminalProviderCommand {
    /// Write the given bytes to the terminal's input.
    WriteInput {
        terminal_id: acp::TerminalId,
        bytes: Vec<u8>,
    },
    /// Resize the terminal to the given number of columns and rows.
    Resize {
        terminal_id: acp::TerminalId,
        cols: u16,
        rows: u16,
    },
    /// Close the terminal.
    Close {
        terminal_id: acp::TerminalId,
    },
}
1081
/// Coarse status of a thread.
#[derive(PartialEq, Eq, Debug)]
pub enum ThreadStatus {
    // NOTE(review): presumably "no turn is running" — confirm with the method that computes the status.
    Idle,
    // NOTE(review): presumably "a turn is running" — confirm with the method that computes the status.
    Generating,
}
1087
/// Errors reported when loading an agent fails. The `Display` implementation
/// produces the user-facing message for each variant.
#[derive(Debug, Clone)]
pub enum LoadError {
    /// The found version is older than the minimum supported version.
    Unsupported {
        /// Where the version came from; printed after "from" in the message.
        command: SharedString,
        /// The version that was found.
        current_version: SharedString,
        /// The minimum version that is required.
        minimum_version: SharedString,
    },
    /// Installation failed; carries the failure message.
    FailedToInstall(SharedString),
    /// The server exited with the given status.
    Exited {
        status: ExitStatus,
    },
    /// Any other failure; carries the message verbatim.
    Other(SharedString),
}
1101
1102impl Display for LoadError {
1103 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1104 match self {
1105 LoadError::Unsupported {
1106 command: path,
1107 current_version,
1108 minimum_version,
1109 } => {
1110 write!(
1111 f,
1112 "version {current_version} from {path} is not supported (need at least {minimum_version})"
1113 )
1114 }
1115 LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1116 LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1117 LoadError::Other(msg) => write!(f, "{msg}"),
1118 }
1119 }
1120}
1121
// `LoadError` is a standard error type; the default `Error` methods suffice.
impl Error for LoadError {}
1123
1124impl AcpThread {
1125 pub fn new(
1126 parent_session_id: Option<acp::SessionId>,
1127 title: impl Into<SharedString>,
1128 work_dirs: Option<PathList>,
1129 connection: Rc<dyn AgentConnection>,
1130 project: Entity<Project>,
1131 action_log: Entity<ActionLog>,
1132 session_id: acp::SessionId,
1133 mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1134 cx: &mut Context<Self>,
1135 ) -> Self {
1136 let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1137 let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1138 loop {
1139 let caps = prompt_capabilities_rx.recv().await?;
1140 this.update(cx, |this, cx| {
1141 this.prompt_capabilities = caps;
1142 cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1143 })?;
1144 }
1145 });
1146
1147 Self {
1148 parent_session_id,
1149 work_dirs,
1150 action_log,
1151 shared_buffers: Default::default(),
1152 entries: Default::default(),
1153 plan: Default::default(),
1154 title: title.into(),
1155 provisional_title: None,
1156 project,
1157 running_turn: None,
1158 turn_id: 0,
1159 connection,
1160 session_id,
1161 token_usage: None,
1162 prompt_capabilities,
1163 _observe_prompt_capabilities: task,
1164 terminals: HashMap::default(),
1165 pending_terminal_output: HashMap::default(),
1166 pending_terminal_exit: HashMap::default(),
1167 had_error: false,
1168 draft_prompt: None,
1169 ui_scroll_position: None,
1170 streaming_text_buffer: None,
1171 }
1172 }
1173
    /// Returns the parent session's id, or `None` if this thread has no parent.
    pub fn parent_session_id(&self) -> Option<&acp::SessionId> {
        self.parent_session_id.as_ref()
    }
1177
    /// Returns a clone of the most recently stored prompt capabilities.
    pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
        self.prompt_capabilities.clone()
    }
1181
    /// Returns the stored draft prompt blocks, if any.
    pub fn draft_prompt(&self) -> Option<&[acp::ContentBlock]> {
        self.draft_prompt.as_deref()
    }
1185
    /// Replaces the stored draft prompt; `None` clears it. Emits no event.
    pub fn set_draft_prompt(&mut self, prompt: Option<Vec<acp::ContentBlock>>) {
        self.draft_prompt = prompt;
    }
1189
    /// Returns the stored UI scroll position, if any.
    pub fn ui_scroll_position(&self) -> Option<gpui::ListOffset> {
        self.ui_scroll_position
    }
1193
    /// Replaces the stored UI scroll position; `None` clears it. Emits no event.
    pub fn set_ui_scroll_position(&mut self, position: Option<gpui::ListOffset>) {
        self.ui_scroll_position = position;
    }
1197
    /// Returns the agent connection this thread talks to.
    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
        &self.connection
    }
1201
    /// Returns the action log entity associated with this thread.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
1205
    /// Returns the project entity this thread operates on.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }
1209
1210 pub fn title(&self) -> SharedString {
1211 self.provisional_title
1212 .clone()
1213 .unwrap_or_else(|| self.title.clone())
1214 }
1215
    /// Returns `true` while a provisional title is set.
    pub fn has_provisional_title(&self) -> bool {
        self.provisional_title.is_some()
    }
1219
    /// Returns all entries of the thread, oldest first.
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
1223
    /// Returns the id of the session this thread belongs to.
    pub fn session_id(&self) -> &acp::SessionId {
        &self.session_id
    }
1227
    /// Returns the working directories given at construction, if any.
    pub fn work_dirs(&self) -> Option<&PathList> {
        self.work_dirs.as_ref()
    }
1231
1232 pub fn status(&self) -> ThreadStatus {
1233 if self.running_turn.is_some() {
1234 ThreadStatus::Generating
1235 } else {
1236 ThreadStatus::Idle
1237 }
1238 }
1239
    /// Returns the error flag. It is reset when a new turn starts and set when
    /// a turn ends with an error, a refusal, or by hitting the token limit.
    pub fn had_error(&self) -> bool {
        self.had_error
    }
1243
1244 pub fn is_waiting_for_confirmation(&self) -> bool {
1245 for entry in self.entries.iter().rev() {
1246 match entry {
1247 AgentThreadEntry::UserMessage(_) => return false,
1248 AgentThreadEntry::ToolCall(ToolCall {
1249 status: ToolCallStatus::WaitingForConfirmation { .. },
1250 ..
1251 }) => return true,
1252 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1253 }
1254 }
1255 false
1256 }
1257
    /// Returns the most recently recorded token usage, if any.
    pub fn token_usage(&self) -> Option<&TokenUsage> {
        self.token_usage.as_ref()
    }
1261
1262 pub fn has_pending_edit_tool_calls(&self) -> bool {
1263 for entry in self.entries.iter().rev() {
1264 match entry {
1265 AgentThreadEntry::UserMessage(_) => return false,
1266 AgentThreadEntry::ToolCall(
1267 call @ ToolCall {
1268 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1269 ..
1270 },
1271 ) if call.diffs().next().is_some() => {
1272 return true;
1273 }
1274 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1275 }
1276 }
1277
1278 false
1279 }
1280
1281 pub fn has_in_progress_tool_calls(&self) -> bool {
1282 for entry in self.entries.iter().rev() {
1283 match entry {
1284 AgentThreadEntry::UserMessage(_) => return false,
1285 AgentThreadEntry::ToolCall(ToolCall {
1286 status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1287 ..
1288 }) => {
1289 return true;
1290 }
1291 AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1292 }
1293 }
1294
1295 false
1296 }
1297
1298 pub fn used_tools_since_last_user_message(&self) -> bool {
1299 for entry in self.entries.iter().rev() {
1300 match entry {
1301 AgentThreadEntry::UserMessage(..) => return false,
1302 AgentThreadEntry::AssistantMessage(..) => continue,
1303 AgentThreadEntry::ToolCall(..) => return true,
1304 }
1305 }
1306
1307 false
1308 }
1309
1310 pub fn handle_session_update(
1311 &mut self,
1312 update: acp::SessionUpdate,
1313 cx: &mut Context<Self>,
1314 ) -> Result<(), acp::Error> {
1315 match update {
1316 acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1317 self.push_user_content_block(None, content, cx);
1318 }
1319 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1320 self.push_assistant_content_block(content, false, cx);
1321 }
1322 acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1323 self.push_assistant_content_block(content, true, cx);
1324 }
1325 acp::SessionUpdate::ToolCall(tool_call) => {
1326 self.upsert_tool_call(tool_call, cx)?;
1327 }
1328 acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1329 self.update_tool_call(tool_call_update, cx)?;
1330 }
1331 acp::SessionUpdate::Plan(plan) => {
1332 self.update_plan(plan, cx);
1333 }
1334 acp::SessionUpdate::SessionInfoUpdate(info_update) => {
1335 if let acp::MaybeUndefined::Value(title) = info_update.title {
1336 let had_provisional = self.provisional_title.take().is_some();
1337 let title: SharedString = title.into();
1338 if title != self.title {
1339 self.title = title;
1340 cx.emit(AcpThreadEvent::TitleUpdated);
1341 } else if had_provisional {
1342 cx.emit(AcpThreadEvent::TitleUpdated);
1343 }
1344 }
1345 }
1346 acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1347 available_commands,
1348 ..
1349 }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1350 acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1351 current_mode_id,
1352 ..
1353 }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1354 acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1355 config_options,
1356 ..
1357 }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1358 _ => {}
1359 }
1360 Ok(())
1361 }
1362
    /// Appends a chunk of user content without indentation; shorthand for
    /// `push_user_content_block_with_indent` with `indented = false`.
    pub fn push_user_content_block(
        &mut self,
        message_id: Option<UserMessageId>,
        chunk: acp::ContentBlock,
        cx: &mut Context<Self>,
    ) {
        self.push_user_content_block_with_indent(message_id, chunk, false, cx)
    }
1371
1372 pub fn push_user_content_block_with_indent(
1373 &mut self,
1374 message_id: Option<UserMessageId>,
1375 chunk: acp::ContentBlock,
1376 indented: bool,
1377 cx: &mut Context<Self>,
1378 ) {
1379 let language_registry = self.project.read(cx).languages().clone();
1380 let path_style = self.project.read(cx).path_style(cx);
1381 let entries_len = self.entries.len();
1382
1383 if let Some(last_entry) = self.entries.last_mut()
1384 && let AgentThreadEntry::UserMessage(UserMessage {
1385 id,
1386 content,
1387 chunks,
1388 indented: existing_indented,
1389 ..
1390 }) = last_entry
1391 && *existing_indented == indented
1392 {
1393 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1394 *id = message_id.or(id.take());
1395 content.append(chunk.clone(), &language_registry, path_style, cx);
1396 chunks.push(chunk);
1397 let idx = entries_len - 1;
1398 cx.emit(AcpThreadEvent::EntryUpdated(idx));
1399 } else {
1400 let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1401 self.push_entry(
1402 AgentThreadEntry::UserMessage(UserMessage {
1403 id: message_id,
1404 content,
1405 chunks: vec![chunk],
1406 checkpoint: None,
1407 indented,
1408 }),
1409 cx,
1410 );
1411 }
1412 }
1413
    /// Appends a chunk of assistant content without indentation; shorthand for
    /// `push_assistant_content_block_with_indent` with `indented = false`.
    pub fn push_assistant_content_block(
        &mut self,
        chunk: acp::ContentBlock,
        is_thought: bool,
        cx: &mut Context<Self>,
    ) {
        self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
    }
1422
    /// Appends a chunk of assistant output (a thought when `is_thought` is
    /// set, regular message text otherwise).
    ///
    /// Three cases, in order:
    /// 1. A text chunk that continues the trailing Markdown block of the last
    ///    assistant message is buffered for gradual reveal.
    /// 2. Otherwise, if the last entry is an assistant message with the same
    ///    `indented` flag, the chunk is appended to its last block when the
    ///    kind (message/thought) matches, or added as a new block.
    /// 3. Otherwise a new assistant message entry is pushed.
    pub fn push_assistant_content_block_with_indent(
        &mut self,
        chunk: acp::ContentBlock,
        is_thought: bool,
        indented: bool,
        cx: &mut Context<Self>,
    ) {
        let path_style = self.project.read(cx).path_style(cx);

        // For text chunks going to an existing Markdown block, buffer for smooth
        // streaming instead of appending all at once which may feel more choppy.
        if let acp::ContentBlock::Text(text_content) = &chunk {
            if let Some(markdown) = self.streaming_markdown_target(is_thought, indented) {
                // A streaming target only exists when there is a last entry,
                // so `entries_len - 1` cannot underflow here.
                let entries_len = self.entries.len();
                cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
                self.buffer_streaming_text(&markdown, text_content.text.clone(), cx);
                return;
            }
        }

        let language_registry = self.project.read(cx).languages().clone();
        let entries_len = self.entries.len();
        if let Some(last_entry) = self.entries.last_mut()
            && let AgentThreadEntry::AssistantMessage(AssistantMessage {
                chunks,
                indented: existing_indented,
                is_subagent_output: _,
            }) = last_entry
            && *existing_indented == indented
        {
            let idx = entries_len - 1;
            // Write out any buffered text first so content stays in order.
            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
            cx.emit(AcpThreadEvent::EntryUpdated(idx));
            match (chunks.last_mut(), is_thought) {
                // Same kind as the trailing block: extend it.
                (Some(AssistantMessageChunk::Message { block }), false)
                | (Some(AssistantMessageChunk::Thought { block }), true) => {
                    block.append(chunk, &language_registry, path_style, cx)
                }
                // Kind switched (or no blocks yet): start a new block.
                _ => {
                    let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
                    if is_thought {
                        chunks.push(AssistantMessageChunk::Thought { block })
                    } else {
                        chunks.push(AssistantMessageChunk::Message { block })
                    }
                }
            }
        } else {
            let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
            let chunk = if is_thought {
                AssistantMessageChunk::Thought { block }
            } else {
                AssistantMessageChunk::Message { block }
            };

            self.push_entry(
                AgentThreadEntry::AssistantMessage(AssistantMessage {
                    chunks: vec![chunk],
                    indented,
                    is_subagent_output: false,
                }),
                cx,
            );
        }
    }
1488
1489 fn streaming_markdown_target(
1490 &self,
1491 is_thought: bool,
1492 indented: bool,
1493 ) -> Option<Entity<Markdown>> {
1494 let last_entry = self.entries.last()?;
1495 if let AgentThreadEntry::AssistantMessage(AssistantMessage {
1496 chunks,
1497 indented: existing_indented,
1498 ..
1499 }) = last_entry
1500 && *existing_indented == indented
1501 && let [.., chunk] = chunks.as_slice()
1502 {
1503 match (chunk, is_thought) {
1504 (
1505 AssistantMessageChunk::Message {
1506 block: ContentBlock::Markdown { markdown },
1507 },
1508 false,
1509 )
1510 | (
1511 AssistantMessageChunk::Thought {
1512 block: ContentBlock::Markdown { markdown },
1513 },
1514 true,
1515 ) => Some(markdown.clone()),
1516 _ => None,
1517 }
1518 } else {
1519 None
1520 }
1521 }
1522
1523 /// Add text to the streaming buffer. If the target changed (e.g. switching
1524 /// from thoughts to message text), flush the old buffer first.
1525 fn buffer_streaming_text(
1526 &mut self,
1527 markdown: &Entity<Markdown>,
1528 text: String,
1529 cx: &mut Context<Self>,
1530 ) {
1531 if let Some(buffer) = &mut self.streaming_text_buffer {
1532 if buffer.target.entity_id() == markdown.entity_id() {
1533 buffer.pending.push_str(&text);
1534
1535 buffer.bytes_to_reveal_per_tick = (buffer.pending.len() as f32
1536 / StreamingTextBuffer::REVEAL_TARGET
1537 * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1538 .ceil() as usize;
1539 return;
1540 }
1541 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1542 }
1543
1544 let target = markdown.clone();
1545 let _reveal_task = self.start_streaming_reveal(cx);
1546 let pending_len = text.len();
1547 let bytes_to_reveal = (pending_len as f32 / StreamingTextBuffer::REVEAL_TARGET
1548 * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1549 .ceil() as usize;
1550 self.streaming_text_buffer = Some(StreamingTextBuffer {
1551 pending: text,
1552 bytes_to_reveal_per_tick: bytes_to_reveal,
1553 target,
1554 _reveal_task,
1555 });
1556 }
1557
1558 /// Flush all buffered streaming text into the Markdown entity immediately.
1559 fn flush_streaming_text(
1560 streaming_text_buffer: &mut Option<StreamingTextBuffer>,
1561 cx: &mut Context<Self>,
1562 ) {
1563 if let Some(buffer) = streaming_text_buffer.take() {
1564 if !buffer.pending.is_empty() {
1565 buffer
1566 .target
1567 .update(cx, |markdown, cx| markdown.append(&buffer.pending, cx));
1568 }
1569 }
1570 }
1571
    /// Spawns a foreground task that periodically drains
    /// `streaming_text_buffer.pending` into the target `Markdown` entity,
    /// producing smooth, continuous text output.
    ///
    /// Every `StreamingTextBuffer::TASK_UPDATE_MS` milliseconds the task moves
    /// up to `bytes_to_reveal_per_tick` bytes (rounded up to a character
    /// boundary) from the pending text into the target. It stops once the
    /// thread can no longer be updated or no streaming buffer is active.
    fn start_streaming_reveal(&self, cx: &mut Context<Self>) -> Task<()> {
        cx.spawn(async move |this, cx| {
            loop {
                cx.background_executor()
                    .timer(Duration::from_millis(StreamingTextBuffer::TASK_UPDATE_MS))
                    .await;

                let should_continue = this
                    .update(cx, |this, cx| {
                        // Buffer was flushed or replaced: nothing left to do.
                        let Some(buffer) = &mut this.streaming_text_buffer else {
                            return false;
                        };

                        // Nothing to reveal right now, but more may arrive.
                        if buffer.pending.is_empty() {
                            return true;
                        }

                        let pending_len = buffer.pending.len();

                        // Never split a multi-byte character, and never read
                        // past the end of the pending text.
                        let byte_boundary = buffer
                            .pending
                            .ceil_char_boundary(buffer.bytes_to_reveal_per_tick)
                            .min(pending_len);

                        buffer.target.update(cx, |markdown: &mut Markdown, cx| {
                            markdown.append(&buffer.pending[..byte_boundary], cx);
                            buffer.pending.drain(..byte_boundary);
                        });

                        true
                    })
                    // The thread is gone: stop the task.
                    .unwrap_or(false);

                if !should_continue {
                    break;
                }
            }
        })
    }
1614
    /// Appends `entry` to the thread and emits `NewEntry`.
    ///
    /// Any buffered streaming text is flushed first, so the previous entry is
    /// complete before the new one is added.
    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
1620
    /// Returns `true` if the connection offers a way to set the title for this
    /// session.
    pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
        self.connection.set_title(&self.session_id, cx).is_some()
    }
1624
1625 pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1626 let had_provisional = self.provisional_title.take().is_some();
1627 if title != self.title {
1628 self.title = title.clone();
1629 cx.emit(AcpThreadEvent::TitleUpdated);
1630 if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1631 return set_title.run(title, cx);
1632 }
1633 } else if had_provisional {
1634 cx.emit(AcpThreadEvent::TitleUpdated);
1635 }
1636 Task::ready(Ok(()))
1637 }
1638
    /// Sets a provisional display title without propagating back to the
    /// underlying agent connection. This is used for quick preview titles
    /// (e.g. first 20 chars of the user message) that should be shown
    /// immediately but replaced once the LLM generates a proper title via
    /// `set_title`.
    ///
    /// Always emits `TitleUpdated`.
    pub fn set_provisional_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
        self.provisional_title = Some(title);
        cx.emit(AcpThreadEvent::TitleUpdated);
    }
1648
    /// Emits `SubagentSpawned` for `session_id`; no thread state is changed.
    pub fn subagent_spawned(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::SubagentSpawned(session_id));
    }
1652
    /// Replaces the recorded token usage (`None` clears it) and emits
    /// `TokenUsageUpdated`.
    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
        self.token_usage = usage;
        cx.emit(AcpThreadEvent::TokenUsageUpdated);
    }
1657
    /// Emits `Retry` carrying `status`; no thread state is changed.
    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::Retry(status));
    }
1661
    /// Applies `update` to the existing tool call with the same id and emits
    /// `EntryUpdated` for it.
    ///
    /// If no tool call with that id exists, a placeholder entry with status
    /// `Failed` and the text "Tool call not found" is pushed instead, and
    /// `Ok(())` is returned.
    ///
    /// # Errors
    ///
    /// Returns an error if applying a field update to the tool call fails.
    pub fn update_tool_call(
        &mut self,
        update: impl Into<ToolCallUpdate>,
        cx: &mut Context<Self>,
    ) -> Result<()> {
        let update = update.into();
        let languages = self.project.read(cx).languages().clone();
        let path_style = self.project.read(cx).path_style(cx);

        let ix = match self.index_for_tool_call(update.id()) {
            Some(ix) => ix,
            None => {
                // Tool call not found - create a failed tool call entry
                let failed_tool_call = ToolCall {
                    id: update.id().clone(),
                    label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
                    kind: acp::ToolKind::Fetch,
                    content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
                        "Tool call not found".into(),
                        &languages,
                        path_style,
                        cx,
                    ))],
                    status: ToolCallStatus::Failed,
                    locations: Vec::new(),
                    resolved_locations: Vec::new(),
                    raw_input: None,
                    raw_input_markdown: None,
                    raw_output: None,
                    tool_name: None,
                    subagent_session_info: None,
                };
                self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
                return Ok(());
            }
        };
        // `index_for_tool_call` only ever returns indices of tool call entries.
        let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
            unreachable!()
        };

        match update {
            ToolCallUpdate::UpdateFields(update) => {
                // Remember this before `update.fields` is moved below.
                let location_updated = update.fields.locations.is_some();
                call.update_fields(
                    update.fields,
                    update.meta,
                    languages,
                    path_style,
                    &self.terminals,
                    cx,
                )?;
                if location_updated {
                    self.resolve_locations(update.tool_call_id, cx);
                }
            }
            // Diff and terminal updates replace the call's content entirely.
            ToolCallUpdate::UpdateDiff(update) => {
                call.content.clear();
                call.content.push(ToolCallContent::Diff(update.diff));
            }
            ToolCallUpdate::UpdateTerminal(update) => {
                call.content.clear();
                call.content
                    .push(ToolCallContent::Terminal(update.terminal));
            }
        }

        cx.emit(AcpThreadEvent::EntryUpdated(ix));

        Ok(())
    }
1732
    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
    ///
    /// The status is taken from `tool_call` itself.
    ///
    /// # Errors
    ///
    /// Propagates any error from `upsert_tool_call_inner`.
    pub fn upsert_tool_call(
        &mut self,
        tool_call: acp::ToolCall,
        cx: &mut Context<Self>,
    ) -> Result<(), acp::Error> {
        // Read the status before `tool_call` is consumed by the conversion below.
        let status = tool_call.status.into();
        self.upsert_tool_call_inner(tool_call.into(), status, cx)
    }
1742
1743 /// Fails if id does not match an existing entry.
1744 pub fn upsert_tool_call_inner(
1745 &mut self,
1746 update: acp::ToolCallUpdate,
1747 status: ToolCallStatus,
1748 cx: &mut Context<Self>,
1749 ) -> Result<(), acp::Error> {
1750 let language_registry = self.project.read(cx).languages().clone();
1751 let path_style = self.project.read(cx).path_style(cx);
1752 let id = update.tool_call_id.clone();
1753
1754 let agent_telemetry_id = self.connection().telemetry_id();
1755 let session = self.session_id();
1756 let parent_session_id = self.parent_session_id();
1757 if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1758 let status = if matches!(status, ToolCallStatus::Completed) {
1759 "completed"
1760 } else {
1761 "failed"
1762 };
1763 telemetry::event!(
1764 "Agent Tool Call Completed",
1765 agent_telemetry_id,
1766 session,
1767 parent_session_id,
1768 status
1769 );
1770 }
1771
1772 if let Some(ix) = self.index_for_tool_call(&id) {
1773 let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1774 unreachable!()
1775 };
1776
1777 call.update_fields(
1778 update.fields,
1779 update.meta,
1780 language_registry,
1781 path_style,
1782 &self.terminals,
1783 cx,
1784 )?;
1785 call.status = status;
1786
1787 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1788 } else {
1789 let call = ToolCall::from_acp(
1790 update.try_into()?,
1791 status,
1792 language_registry,
1793 self.project.read(cx).path_style(cx),
1794 &self.terminals,
1795 cx,
1796 )?;
1797 self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1798 };
1799
1800 self.resolve_locations(id, cx);
1801 Ok(())
1802 }
1803
1804 fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1805 self.entries
1806 .iter()
1807 .enumerate()
1808 .rev()
1809 .find_map(|(index, entry)| {
1810 if let AgentThreadEntry::ToolCall(tool_call) = entry
1811 && &tool_call.id == id
1812 {
1813 Some(index)
1814 } else {
1815 None
1816 }
1817 })
1818 }
1819
1820 fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1821 // The tool call we are looking for is typically the last one, or very close to the end.
1822 // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1823 self.entries
1824 .iter_mut()
1825 .enumerate()
1826 .rev()
1827 .find_map(|(index, tool_call)| {
1828 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1829 && &tool_call.id == id
1830 {
1831 Some((index, tool_call))
1832 } else {
1833 None
1834 }
1835 })
1836 }
1837
1838 pub fn tool_call(&self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1839 self.entries
1840 .iter()
1841 .enumerate()
1842 .rev()
1843 .find_map(|(index, tool_call)| {
1844 if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1845 && &tool_call.id == id
1846 {
1847 Some((index, tool_call))
1848 } else {
1849 None
1850 }
1851 })
1852 }
1853
1854 pub fn tool_call_for_subagent(&self, session_id: &acp::SessionId) -> Option<&ToolCall> {
1855 self.entries.iter().find_map(|entry| match entry {
1856 AgentThreadEntry::ToolCall(tool_call) => {
1857 if let Some(subagent_session_info) = &tool_call.subagent_session_info
1858 && &subagent_session_info.session_id == session_id
1859 {
1860 Some(tool_call)
1861 } else {
1862 None
1863 }
1864 }
1865 _ => None,
1866 })
1867 }
1868
    /// Resolves the locations of the tool call with the given id in the
    /// background and stores the result on the tool call.
    ///
    /// Once resolution finishes:
    /// - a snapshot of every resolved buffer is recorded in `shared_buffers`;
    /// - for threads without a parent session, the project's agent location is
    ///   moved to the last resolved location (unless that would move it
    ///   backwards within the same row, see below);
    /// - `EntryUpdated` is emitted if the resolved locations changed.
    ///
    /// Does nothing if no tool call with that id exists.
    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
        let project = self.project.clone();
        // Only threads without a parent session move the agent location.
        let should_update_agent_location = self.parent_session_id.is_none();
        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
            return;
        };
        let task = tool_call.resolve_locations(project, cx);
        cx.spawn(async move |this, cx| {
            let resolved_locations = task.await;

            this.update(cx, |this, cx| {
                let project = this.project.clone();

                for location in resolved_locations.iter().flatten() {
                    this.shared_buffers
                        .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
                }
                // The entry may have been removed while resolution was running.
                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
                    return;
                };

                if let Some(Some(location)) = resolved_locations.last() {
                    project.update(cx, |project, cx| {
                        let should_ignore = if let Some(agent_location) = project
                            .agent_location()
                            .filter(|agent_location| agent_location.buffer == location.buffer)
                        {
                            let snapshot = location.buffer.read(cx).snapshot();
                            let old_position = agent_location.position.to_point(&snapshot);
                            let new_position = location.position.to_point(&snapshot);

                            // ignore this so that when we get updates from the edit tool
                            // the position doesn't reset to the start of the line
                            old_position.row == new_position.row
                                && old_position.column > new_position.column
                        } else {
                            false
                        };
                        if !should_ignore && should_update_agent_location {
                            project.set_agent_location(Some(location.into()), cx);
                        }
                    });
                }

                let resolved_locations = resolved_locations
                    .iter()
                    .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
                    .collect::<Vec<_>>();

                // Only notify observers when something actually changed.
                if tool_call.resolved_locations != resolved_locations {
                    tool_call.resolved_locations = resolved_locations;
                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
                }
            })
        })
        .detach();
    }
1926
    /// Puts a tool call into the `WaitingForConfirmation` state and returns a
    /// task that resolves with the user's decision.
    ///
    /// The tool call is upserted with the given permission `options`, and
    /// `ToolAuthorizationRequested` is emitted. The returned task resolves to
    /// `Selected` with the chosen option, or to `Cancelled` if the response
    /// channel is dropped without an answer. `ToolAuthorizationReceived` is
    /// emitted before the task resolves (if the thread still exists).
    ///
    /// # Errors
    ///
    /// Returns an error if upserting the tool call fails.
    pub fn request_tool_call_authorization(
        &mut self,
        tool_call: acp::ToolCallUpdate,
        options: PermissionOptions,
        cx: &mut Context<Self>,
    ) -> Result<Task<acp::RequestPermissionOutcome>> {
        let (tx, rx) = oneshot::channel();

        // The sender lives inside the status; whoever resolves or replaces the
        // status thereby answers or cancels the request.
        let status = ToolCallStatus::WaitingForConfirmation {
            options,
            respond_tx: tx,
        };

        let tool_call_id = tool_call.tool_call_id.clone();
        self.upsert_tool_call_inner(tool_call, status, cx)?;
        cx.emit(AcpThreadEvent::ToolAuthorizationRequested(
            tool_call_id.clone(),
        ));

        Ok(cx.spawn(async move |this, cx| {
            let outcome = match rx.await {
                Ok(option) => acp::RequestPermissionOutcome::Selected(
                    acp::SelectedPermissionOutcome::new(option),
                ),
                Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
            };
            // Best effort: the thread may already be gone.
            this.update(cx, |_this, cx| {
                cx.emit(AcpThreadEvent::ToolAuthorizationReceived(tool_call_id))
            })
            .ok();
            outcome
        }))
    }
1960
1961 pub fn authorize_tool_call(
1962 &mut self,
1963 id: acp::ToolCallId,
1964 option_id: acp::PermissionOptionId,
1965 option_kind: acp::PermissionOptionKind,
1966 cx: &mut Context<Self>,
1967 ) {
1968 let Some((ix, call)) = self.tool_call_mut(&id) else {
1969 return;
1970 };
1971
1972 let new_status = match option_kind {
1973 acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1974 ToolCallStatus::Rejected
1975 }
1976 acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1977 ToolCallStatus::InProgress
1978 }
1979 _ => ToolCallStatus::InProgress,
1980 };
1981
1982 let curr_status = mem::replace(&mut call.status, new_status);
1983
1984 if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1985 respond_tx.send(option_id).log_err();
1986 } else if cfg!(debug_assertions) {
1987 panic!("tried to authorize an already authorized tool call");
1988 }
1989
1990 cx.emit(AcpThreadEvent::EntryUpdated(ix));
1991 }
1992
    /// Returns the thread's current plan.
    pub fn plan(&self) -> &Plan {
        &self.plan
    }
1996
1997 pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1998 let new_entries_len = request.entries.len();
1999 let mut new_entries = request.entries.into_iter();
2000
2001 // Reuse existing markdown to prevent flickering
2002 for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
2003 let PlanEntry {
2004 content,
2005 priority,
2006 status,
2007 } = old;
2008 content.update(cx, |old, cx| {
2009 old.replace(new.content, cx);
2010 });
2011 *priority = new.priority;
2012 *status = new.status;
2013 }
2014 for new in new_entries {
2015 self.plan.entries.push(PlanEntry::from_acp(new, cx))
2016 }
2017 self.plan.entries.truncate(new_entries_len);
2018
2019 cx.notify();
2020 }
2021
2022 fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
2023 self.plan
2024 .entries
2025 .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
2026 cx.notify();
2027 }
2028
    /// Test helper: sends `message` as a single content block via `send`.
    #[cfg(any(test, feature = "test-support"))]
    pub fn send_raw(
        &mut self,
        message: &str,
        cx: &mut Context<Self>,
    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
        self.send(vec![message.into()], cx)
    }
2037
    /// Sends a user message to the agent as a new turn.
    ///
    /// Within the turn, the message is first pushed as a user message entry,
    /// then a git checkpoint is taken and attached to that entry (hidden at
    /// first), and finally the prompt is sent through the connection.
    ///
    /// The returned future resolves as described on `run_turn`.
    pub fn send(
        &mut self,
        message: Vec<acp::ContentBlock>,
        cx: &mut Context<Self>,
    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
        let block = ContentBlock::new_combined(
            message.clone(),
            self.project.read(cx).languages().clone(),
            self.project.read(cx).path_style(cx),
            cx,
        );
        let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
        let git_store = self.project.read(cx).git_store().clone();

        // Messages only get an id when the connection supports truncation.
        let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
            Some(UserMessageId::new())
        } else {
            None
        };

        self.run_turn(cx, async move |this, cx| {
            this.update(cx, |this, cx| {
                this.push_entry(
                    AgentThreadEntry::UserMessage(UserMessage {
                        id: message_id.clone(),
                        content: block,
                        chunks: message,
                        checkpoint: None,
                        indented: false,
                    }),
                    cx,
                );
            })
            .ok();

            // A failed checkpoint is logged but does not prevent sending.
            let old_checkpoint = git_store
                .update(cx, |git, cx| git.checkpoint(cx))
                .await
                .context("failed to get old checkpoint")
                .log_err();
            this.update(cx, |this, cx| {
                if let Some((_ix, message)) = this.last_user_message() {
                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
                        git_checkpoint,
                        show: false,
                    });
                }
                this.connection.prompt(message_id, request, cx)
            })?
            .await
        })
    }
2090
    /// Returns `true` if the connection supports retrying this session.
    pub fn can_retry(&self, cx: &App) -> bool {
        self.connection.retry(&self.session_id, cx).is_some()
    }
2094
    /// Retries the session as a new turn.
    ///
    /// The returned future resolves as described on `run_turn`; it fails with
    /// "retrying a session is not supported" if the connection has no retry
    /// support.
    pub fn retry(
        &mut self,
        cx: &mut Context<Self>,
    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
        self.run_turn(cx, async move |this, cx| {
            this.update(cx, |this, cx| {
                this.connection
                    .retry(&this.session_id, cx)
                    .map(|retry| retry.run(cx))
            })?
            .context("retrying a session is not supported")?
            .await
        })
    }
2109
    /// Runs `f` as a new turn, cancelling any turn that is still running.
    ///
    /// Before the turn starts, completed plan entries are removed and the
    /// error flag is reset. `f` only starts after the previous turn's
    /// cancellation has finished.
    ///
    /// The returned future resolves to:
    /// - `Ok(Some(response))` when the turn finished (a `Stopped` event is
    ///   emitted; a refusal additionally emits `Refusal` and may truncate the
    ///   entries back to before the refused user message);
    /// - `Ok(None)` when the turn's task was dropped before producing a
    ///   response (e.g. because it was cancelled);
    /// - `Err(_)` when `f` failed or the agent stopped because the token limit
    ///   was reached (an `Error` event is emitted in both cases).
    fn run_turn(
        &mut self,
        cx: &mut Context<Self>,
        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
        self.clear_completed_plan_entries(cx);
        self.had_error = false;

        let (tx, rx) = oneshot::channel();
        let cancel_task = self.cancel(cx);

        self.turn_id += 1;
        let turn_id = self.turn_id;
        self.running_turn = Some(RunningTurn {
            id: turn_id,
            send_task: cx.spawn(async move |this, cx| {
                // Let the previous turn wind down before starting this one.
                cancel_task.await;
                tx.send(f(this, cx).await).ok();
            }),
        });

        cx.spawn(async move |this, cx| {
            let response = rx.await;

            // NOTE(review): if this fails, the closure below never runs, so
            // `running_turn` is not cleared here. Confirm that is intended.
            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
                .await?;

            this.update(cx, |this, cx| {
                // Only threads without a parent session manage the agent location.
                if this.parent_session_id.is_none() {
                    this.project
                        .update(cx, |project, cx| project.set_agent_location(None, cx));
                }
                let Ok(response) = response else {
                    // tx dropped, just return
                    return Ok(None);
                };

                let is_same_turn = this
                    .running_turn
                    .as_ref()
                    .is_some_and(|turn| turn_id == turn.id);

                // If the user submitted a follow up message, running_turn might
                // already point to a different turn. Therefore we only want to
                // take the task if it's the same turn.
                if is_same_turn {
                    this.running_turn.take();
                }

                match response {
                    Ok(r) => {
                        Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);

                        // Hitting the token limit is reported as an error;
                        // no `Stopped` event is emitted in that case.
                        if r.stop_reason == acp::StopReason::MaxTokens {
                            this.had_error = true;
                            cx.emit(AcpThreadEvent::Error);
                            log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
                            return Err(anyhow!("Max tokens reached"));
                        }

                        let canceled = matches!(r.stop_reason, acp::StopReason::Cancelled);
                        if canceled {
                            this.mark_pending_tools_as_canceled();
                        }

                        // Handle refusal - distinguish between user prompt and tool call refusals
                        if let acp::StopReason::Refusal = r.stop_reason {
                            this.had_error = true;
                            if let Some((user_msg_ix, _)) = this.last_user_message() {
                                // Check if there's a completed tool call with results after the last user message
                                // This indicates the refusal is in response to tool output, not the user's prompt
                                let has_completed_tool_call_after_user_msg =
                                    this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
                                        if let AgentThreadEntry::ToolCall(tool_call) = entry {
                                            // Check if the tool call has completed and has output
                                            matches!(tool_call.status, ToolCallStatus::Completed)
                                                && tool_call.raw_output.is_some()
                                        } else {
                                            false
                                        }
                                    });

                                if has_completed_tool_call_after_user_msg {
                                    // Refusal is due to tool output - don't truncate, just notify
                                    // The model refused based on what the tool returned
                                    cx.emit(AcpThreadEvent::Refusal);
                                } else {
                                    // User prompt was refused - truncate back to before the user message
                                    let range = user_msg_ix..this.entries.len();
                                    if range.start < range.end {
                                        this.entries.truncate(user_msg_ix);
                                        cx.emit(AcpThreadEvent::EntriesRemoved(range));
                                    }
                                    cx.emit(AcpThreadEvent::Refusal);
                                }
                            } else {
                                // No user message found, treat as general refusal
                                cx.emit(AcpThreadEvent::Refusal);
                            }
                        }

                        cx.emit(AcpThreadEvent::Stopped(r.stop_reason));
                        Ok(Some(r))
                    }
                    Err(e) => {
                        Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);

                        this.had_error = true;
                        cx.emit(AcpThreadEvent::Error);
                        log::error!("Error in run turn: {:?}", e);
                        Err(e)
                    }
                }
            })?
        })
        .boxed()
    }
2227
2228 pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2229 let Some(turn) = self.running_turn.take() else {
2230 return Task::ready(());
2231 };
2232 self.connection.cancel(&self.session_id, cx);
2233
2234 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2235 self.mark_pending_tools_as_canceled();
2236
2237 // Wait for the send task to complete
2238 cx.background_spawn(turn.send_task)
2239 }
2240
2241 fn mark_pending_tools_as_canceled(&mut self) {
2242 for entry in self.entries.iter_mut() {
2243 if let AgentThreadEntry::ToolCall(call) = entry {
2244 let cancel = matches!(
2245 call.status,
2246 ToolCallStatus::Pending
2247 | ToolCallStatus::WaitingForConfirmation { .. }
2248 | ToolCallStatus::InProgress
2249 );
2250
2251 if cancel {
2252 call.status = ToolCallStatus::Canceled;
2253 }
2254 }
2255 }
2256 }
2257
2258 /// Restores the git working tree to the state at the given checkpoint (if one exists)
2259 pub fn restore_checkpoint(
2260 &mut self,
2261 id: UserMessageId,
2262 cx: &mut Context<Self>,
2263 ) -> Task<Result<()>> {
2264 let Some((_, message)) = self.user_message_mut(&id) else {
2265 return Task::ready(Err(anyhow!("message not found")));
2266 };
2267
2268 let checkpoint = message
2269 .checkpoint
2270 .as_ref()
2271 .map(|c| c.git_checkpoint.clone());
2272
2273 // Cancel any in-progress generation before restoring
2274 let cancel_task = self.cancel(cx);
2275 let rewind = self.rewind(id.clone(), cx);
2276 let git_store = self.project.read(cx).git_store().clone();
2277
2278 cx.spawn(async move |_, cx| {
2279 cancel_task.await;
2280 rewind.await?;
2281 if let Some(checkpoint) = checkpoint {
2282 git_store
2283 .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
2284 .await?;
2285 }
2286
2287 Ok(())
2288 })
2289 }
2290
2291 /// Rewinds this thread to before the entry at `index`, removing it and all
2292 /// subsequent entries while rejecting any action_log changes made from that point.
2293 /// Unlike `restore_checkpoint`, this method does not restore from git.
2294 pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
2295 let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
2296 return Task::ready(Err(anyhow!("not supported")));
2297 };
2298
2299 Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2300 let telemetry = ActionLogTelemetry::from(&*self);
2301 cx.spawn(async move |this, cx| {
2302 cx.update(|cx| truncate.run(id.clone(), cx)).await?;
2303 this.update(cx, |this, cx| {
2304 if let Some((ix, _)) = this.user_message_mut(&id) {
2305 // Collect all terminals from entries that will be removed
2306 let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
2307 .iter()
2308 .flat_map(|entry| entry.terminals())
2309 .filter_map(|terminal| terminal.read(cx).id().clone().into())
2310 .collect();
2311
2312 let range = ix..this.entries.len();
2313 this.entries.truncate(ix);
2314 cx.emit(AcpThreadEvent::EntriesRemoved(range));
2315
2316 // Kill and remove the terminals
2317 for terminal_id in terminals_to_remove {
2318 if let Some(terminal) = this.terminals.remove(&terminal_id) {
2319 terminal.update(cx, |terminal, cx| {
2320 terminal.kill(cx);
2321 });
2322 }
2323 }
2324 }
2325 this.action_log().update(cx, |action_log, cx| {
2326 action_log.reject_all_edits(Some(telemetry), cx)
2327 })
2328 })?
2329 .await;
2330 Ok(())
2331 })
2332 }
2333
2334 fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
2335 let git_store = self.project.read(cx).git_store().clone();
2336
2337 let Some((_, message)) = self.last_user_message() else {
2338 return Task::ready(Ok(()));
2339 };
2340 let Some(user_message_id) = message.id.clone() else {
2341 return Task::ready(Ok(()));
2342 };
2343 let Some(checkpoint) = message.checkpoint.as_ref() else {
2344 return Task::ready(Ok(()));
2345 };
2346 let old_checkpoint = checkpoint.git_checkpoint.clone();
2347
2348 let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2349 cx.spawn(async move |this, cx| {
2350 let Some(new_checkpoint) = new_checkpoint
2351 .await
2352 .context("failed to get new checkpoint")
2353 .log_err()
2354 else {
2355 return Ok(());
2356 };
2357
2358 let equal = git_store
2359 .update(cx, |git, cx| {
2360 git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2361 })
2362 .await
2363 .unwrap_or(true);
2364
2365 this.update(cx, |this, cx| {
2366 if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
2367 if let Some(checkpoint) = message.checkpoint.as_mut() {
2368 checkpoint.show = !equal;
2369 cx.emit(AcpThreadEvent::EntryUpdated(ix));
2370 }
2371 }
2372 })?;
2373
2374 Ok(())
2375 })
2376 }
2377
2378 fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2379 self.entries
2380 .iter_mut()
2381 .enumerate()
2382 .rev()
2383 .find_map(|(ix, entry)| {
2384 if let AgentThreadEntry::UserMessage(message) = entry {
2385 Some((ix, message))
2386 } else {
2387 None
2388 }
2389 })
2390 }
2391
2392 fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2393 self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2394 if let AgentThreadEntry::UserMessage(message) = entry {
2395 if message.id.as_ref() == Some(id) {
2396 Some((ix, message))
2397 } else {
2398 None
2399 }
2400 } else {
2401 None
2402 }
2403 })
2404 }
2405
2406 pub fn read_text_file(
2407 &self,
2408 path: PathBuf,
2409 line: Option<u32>,
2410 limit: Option<u32>,
2411 reuse_shared_snapshot: bool,
2412 cx: &mut Context<Self>,
2413 ) -> Task<Result<String, acp::Error>> {
2414 // Args are 1-based, move to 0-based
2415 let line = line.unwrap_or_default().saturating_sub(1);
2416 let limit = limit.unwrap_or(u32::MAX);
2417 let project = self.project.clone();
2418 let action_log = self.action_log.clone();
2419 let should_update_agent_location = self.parent_session_id.is_none();
2420 cx.spawn(async move |this, cx| {
2421 let load = project.update(cx, |project, cx| {
2422 let path = project
2423 .project_path_for_absolute_path(&path, cx)
2424 .ok_or_else(|| {
2425 acp::Error::resource_not_found(Some(path.display().to_string()))
2426 })?;
2427 Ok::<_, acp::Error>(project.open_buffer(path, cx))
2428 })?;
2429
2430 let buffer = load.await?;
2431
2432 let snapshot = if reuse_shared_snapshot {
2433 this.read_with(cx, |this, _| {
2434 this.shared_buffers.get(&buffer.clone()).cloned()
2435 })
2436 .log_err()
2437 .flatten()
2438 } else {
2439 None
2440 };
2441
2442 let snapshot = if let Some(snapshot) = snapshot {
2443 snapshot
2444 } else {
2445 action_log.update(cx, |action_log, cx| {
2446 action_log.buffer_read(buffer.clone(), cx);
2447 });
2448
2449 let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
2450 this.update(cx, |this, _| {
2451 this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2452 })?;
2453 snapshot
2454 };
2455
2456 let max_point = snapshot.max_point();
2457 let start_position = Point::new(line, 0);
2458
2459 if start_position > max_point {
2460 return Err(acp::Error::invalid_params().data(format!(
2461 "Attempting to read beyond the end of the file, line {}:{}",
2462 max_point.row + 1,
2463 max_point.column
2464 )));
2465 }
2466
2467 let start = snapshot.anchor_before(start_position);
2468 let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2469
2470 if should_update_agent_location {
2471 project.update(cx, |project, cx| {
2472 project.set_agent_location(
2473 Some(AgentLocation {
2474 buffer: buffer.downgrade(),
2475 position: start,
2476 }),
2477 cx,
2478 );
2479 });
2480 }
2481
2482 Ok(snapshot.text_for_range(start..end).collect::<String>())
2483 })
2484 }
2485
    /// Replaces the contents of the project file at `path` with `content` and
    /// saves the buffer.
    ///
    /// The new text is diffed against a snapshot of the buffer (the one stored
    /// in `shared_buffers` when present, otherwise the buffer's current state)
    /// and the resulting edits are applied. The read and the edit are reported
    /// to the action log, and unless the language settings turn format-on-save
    /// off the buffer is formatted before it is saved.
    ///
    /// Returns an error when `path` does not map to a project path, or when
    /// opening the buffer, updating the thread, or saving fails.
    pub fn write_text_file(
        &self,
        path: PathBuf,
        content: String,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let project = self.project.clone();
        let action_log = self.action_log.clone();
        // Only threads without a parent session move the project's agent location.
        let should_update_agent_location = self.parent_session_id.is_none();
        cx.spawn(async move |this, cx| {
            let load = project.update(cx, |project, cx| {
                let path = project
                    .project_path_for_absolute_path(&path, cx)
                    .context("invalid path")?;
                anyhow::Ok(project.open_buffer(path, cx))
            });
            let buffer = load?.await?;
            // Diff against the shared snapshot when there is one, else the live buffer.
            let snapshot = this.update(cx, |this, cx| {
                this.shared_buffers
                    .get(&buffer)
                    .cloned()
                    .unwrap_or_else(|| buffer.read(cx).snapshot())
            })?;
            // The text diff runs on the background executor; its ranges are
            // converted to anchors in the snapshot they were computed against.
            let edits = cx
                .background_executor()
                .spawn(async move {
                    let old_text = snapshot.text();
                    text_diff(old_text.as_str(), &content)
                        .into_iter()
                        .map(|(range, replacement)| {
                            (snapshot.anchor_range_around(range), replacement)
                        })
                        .collect::<Vec<_>>()
                })
                .await;

            if should_update_agent_location {
                project.update(cx, |project, cx| {
                    project.set_agent_location(
                        Some(AgentLocation {
                            buffer: buffer.downgrade(),
                            // End of the last edit, or `Anchor::min_for_buffer`
                            // when the diff produced no edits.
                            position: edits
                                .last()
                                .map(|(range, _)| range.end)
                                .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
                        }),
                        cx,
                    );
                });
            }

            let format_on_save = cx.update(|cx| {
                // Record the read before the edit, then the edit itself.
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_read(buffer.clone(), cx);
                });

                let format_on_save = buffer.update(cx, |buffer, cx| {
                    buffer.edit(edits, None, cx);

                    let settings = language::language_settings::language_settings(
                        buffer.language().map(|l| l.name()),
                        buffer.file(),
                        cx,
                    );

                    settings.format_on_save != FormatOnSave::Off
                });
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_edited(buffer.clone(), cx);
                });
                format_on_save
            });

            if format_on_save {
                let format_task = project.update(cx, |project, cx| {
                    project.format(
                        HashSet::from_iter([buffer.clone()]),
                        LspFormatTarget::Buffers,
                        false,
                        FormatTrigger::Save,
                        cx,
                    )
                });
                // A formatting failure is logged but does not abort the write.
                format_task.await.log_err();

                // Report the buffer as edited again after the format pass.
                action_log.update(cx, |action_log, cx| {
                    action_log.buffer_edited(buffer.clone(), cx);
                });
            }

            project
                .update(cx, |project, cx| project.save_buffer(buffer, cx))
                .await
        })
    }
2581
2582 pub fn create_terminal(
2583 &self,
2584 command: String,
2585 args: Vec<String>,
2586 extra_env: Vec<acp::EnvVariable>,
2587 cwd: Option<PathBuf>,
2588 output_byte_limit: Option<u64>,
2589 cx: &mut Context<Self>,
2590 ) -> Task<Result<Entity<Terminal>>> {
2591 let env = match &cwd {
2592 Some(dir) => self.project.update(cx, |project, cx| {
2593 project.environment().update(cx, |env, cx| {
2594 env.directory_environment(dir.as_path().into(), cx)
2595 })
2596 }),
2597 None => Task::ready(None).shared(),
2598 };
2599 let env = cx.spawn(async move |_, _| {
2600 let mut env = env.await.unwrap_or_default();
2601 // Disables paging for `git` and hopefully other commands
2602 env.insert("PAGER".into(), "".into());
2603 for var in extra_env {
2604 env.insert(var.name, var.value);
2605 }
2606 env
2607 });
2608
2609 let project = self.project.clone();
2610 let language_registry = project.read(cx).languages().clone();
2611 let is_windows = project.read(cx).path_style(cx).is_windows();
2612
2613 let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
2614 let terminal_task = cx.spawn({
2615 let terminal_id = terminal_id.clone();
2616 async move |_this, cx| {
2617 let env = env.await;
2618 let shell = project
2619 .update(cx, |project, cx| {
2620 project
2621 .remote_client()
2622 .and_then(|r| r.read(cx).default_system_shell())
2623 })
2624 .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2625 let (task_command, task_args) =
2626 ShellBuilder::new(&Shell::Program(shell), is_windows)
2627 .redirect_stdin_to_dev_null()
2628 .build(Some(command.clone()), &args);
2629 let terminal = project
2630 .update(cx, |project, cx| {
2631 project.create_terminal_task(
2632 task::SpawnInTerminal {
2633 command: Some(task_command),
2634 args: task_args,
2635 cwd: cwd.clone(),
2636 env,
2637 ..Default::default()
2638 },
2639 cx,
2640 )
2641 })
2642 .await?;
2643
2644 anyhow::Ok(cx.new(|cx| {
2645 Terminal::new(
2646 terminal_id,
2647 &format!("{} {}", command, args.join(" ")),
2648 cwd,
2649 output_byte_limit.map(|l| l as usize),
2650 terminal,
2651 language_registry,
2652 cx,
2653 )
2654 }))
2655 }
2656 });
2657
2658 cx.spawn(async move |this, cx| {
2659 let terminal = terminal_task.await?;
2660 this.update(cx, |this, _cx| {
2661 this.terminals.insert(terminal_id, terminal.clone());
2662 terminal
2663 })
2664 })
2665 }
2666
2667 pub fn kill_terminal(
2668 &mut self,
2669 terminal_id: acp::TerminalId,
2670 cx: &mut Context<Self>,
2671 ) -> Result<()> {
2672 self.terminals
2673 .get(&terminal_id)
2674 .context("Terminal not found")?
2675 .update(cx, |terminal, cx| {
2676 terminal.kill(cx);
2677 });
2678
2679 Ok(())
2680 }
2681
2682 pub fn release_terminal(
2683 &mut self,
2684 terminal_id: acp::TerminalId,
2685 cx: &mut Context<Self>,
2686 ) -> Result<()> {
2687 self.terminals
2688 .remove(&terminal_id)
2689 .context("Terminal not found")?
2690 .update(cx, |terminal, cx| {
2691 terminal.kill(cx);
2692 });
2693
2694 Ok(())
2695 }
2696
2697 pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2698 self.terminals
2699 .get(&terminal_id)
2700 .context("Terminal not found")
2701 .cloned()
2702 }
2703
2704 pub fn to_markdown(&self, cx: &App) -> String {
2705 self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2706 }
2707
2708 pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2709 cx.emit(AcpThreadEvent::LoadError(error));
2710 }
2711
2712 pub fn register_terminal_created(
2713 &mut self,
2714 terminal_id: acp::TerminalId,
2715 command_label: String,
2716 working_dir: Option<PathBuf>,
2717 output_byte_limit: Option<u64>,
2718 terminal: Entity<::terminal::Terminal>,
2719 cx: &mut Context<Self>,
2720 ) -> Entity<Terminal> {
2721 let language_registry = self.project.read(cx).languages().clone();
2722
2723 let entity = cx.new(|cx| {
2724 Terminal::new(
2725 terminal_id.clone(),
2726 &command_label,
2727 working_dir.clone(),
2728 output_byte_limit.map(|l| l as usize),
2729 terminal,
2730 language_registry,
2731 cx,
2732 )
2733 });
2734 self.terminals.insert(terminal_id.clone(), entity.clone());
2735 entity
2736 }
2737
2738 pub fn mark_as_subagent_output(&mut self, cx: &mut Context<Self>) {
2739 for entry in self.entries.iter_mut().rev() {
2740 if let AgentThreadEntry::AssistantMessage(assistant_message) = entry {
2741 assistant_message.is_subagent_output = true;
2742 cx.notify();
2743 return;
2744 }
2745 }
2746 }
2747
    /// Applies a `TerminalProviderEvent` to the terminals tracked by this
    /// thread.
    ///
    /// * `Created` registers the terminal, replays any output that was buffered
    ///   for its id, clears a buffered exit for its id, and notifies.
    /// * `Output` is written to the terminal when it is registered, and
    ///   buffered in `pending_terminal_output` otherwise.
    /// * `TitleChanged` updates the inner terminal's breadcrumb text; it is
    ///   ignored for unknown terminal ids.
    /// * `Exit` notifies the terminal when it is registered, and stores the
    ///   status in `pending_terminal_exit` otherwise.
    pub fn on_terminal_provider_event(
        &mut self,
        event: TerminalProviderEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            TerminalProviderEvent::Created {
                terminal_id,
                label,
                cwd,
                output_byte_limit,
                terminal,
            } => {
                let entity = self.register_terminal_created(
                    terminal_id.clone(),
                    label,
                    cwd,
                    output_byte_limit,
                    terminal,
                    cx,
                );

                // Replay output that arrived before this terminal was registered,
                // in the order it was received.
                if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
                    for data in chunks.drain(..) {
                        entity.update(cx, |term, cx| {
                            term.inner().update(cx, |inner, cx| {
                                inner.write_output(&data, cx);
                            })
                        });
                    }
                }

                // A buffered exit only triggers a notify here; the stored status
                // value itself is dropped.
                if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
                    entity.update(cx, |_term, cx| {
                        cx.notify();
                    });
                }

                cx.notify();
            }
            TerminalProviderEvent::Output { terminal_id, data } => {
                if let Some(entity) = self.terminals.get(&terminal_id) {
                    entity.update(cx, |term, cx| {
                        term.inner().update(cx, |inner, cx| {
                            inner.write_output(&data, cx);
                        })
                    });
                } else {
                    // Terminal not registered yet: keep the chunk until `Created` arrives.
                    self.pending_terminal_output
                        .entry(terminal_id)
                        .or_default()
                        .push(data);
                }
            }
            TerminalProviderEvent::TitleChanged { terminal_id, title } => {
                if let Some(entity) = self.terminals.get(&terminal_id) {
                    entity.update(cx, |term, cx| {
                        term.inner().update(cx, |inner, cx| {
                            inner.breadcrumb_text = title;
                            cx.emit(::terminal::Event::BreadcrumbsChanged);
                        })
                    });
                }
            }
            TerminalProviderEvent::Exit {
                terminal_id,
                status,
            } => {
                if let Some(entity) = self.terminals.get(&terminal_id) {
                    entity.update(cx, |_term, cx| {
                        cx.notify();
                    });
                } else {
                    // Terminal not registered yet: remember the exit until `Created` arrives.
                    self.pending_terminal_exit.insert(terminal_id, status);
                }
            }
        }
    }
2826}
2827
2828fn markdown_for_raw_output(
2829 raw_output: &serde_json::Value,
2830 language_registry: &Arc<LanguageRegistry>,
2831 cx: &mut App,
2832) -> Option<Entity<Markdown>> {
2833 match raw_output {
2834 serde_json::Value::Null => None,
2835 serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2836 Markdown::new(
2837 value.to_string().into(),
2838 Some(language_registry.clone()),
2839 None,
2840 cx,
2841 )
2842 })),
2843 serde_json::Value::Number(value) => Some(cx.new(|cx| {
2844 Markdown::new(
2845 value.to_string().into(),
2846 Some(language_registry.clone()),
2847 None,
2848 cx,
2849 )
2850 })),
2851 serde_json::Value::String(value) => Some(cx.new(|cx| {
2852 Markdown::new(
2853 value.clone().into(),
2854 Some(language_registry.clone()),
2855 None,
2856 cx,
2857 )
2858 })),
2859 value => Some(cx.new(|cx| {
2860 let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
2861
2862 Markdown::new(
2863 format!("```json\n{}\n```", pretty_json).into(),
2864 Some(language_registry.clone()),
2865 None,
2866 cx,
2867 )
2868 })),
2869 }
2870}
2871
2872#[cfg(test)]
2873mod tests {
2874 use super::*;
2875 use anyhow::anyhow;
2876 use futures::{channel::mpsc, future::LocalBoxFuture, select};
2877 use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2878 use indoc::indoc;
2879 use project::{AgentId, FakeFs, Fs};
2880 use rand::{distr, prelude::*};
2881 use serde_json::json;
2882 use settings::SettingsStore;
2883 use smol::stream::StreamExt as _;
2884 use std::{
2885 any::Any,
2886 cell::RefCell,
2887 path::Path,
2888 rc::Rc,
2889 sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2890 time::Duration,
2891 };
2892 use util::{path, path_list::PathList};
2893
2894 fn init_test(cx: &mut TestAppContext) {
2895 env_logger::try_init().ok();
2896 cx.update(|cx| {
2897 let settings_store = SettingsStore::test(cx);
2898 cx.set_global(settings_store);
2899 });
2900 }
2901
    /// Output that arrives before the `Created` event for its terminal id must
    /// be buffered by the thread and show up in the terminal's content once
    /// the terminal is registered.
    #[gpui::test]
    async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let connection = Rc::new(FakeAgentConnection::new());
        let thread = cx
            .update(|cx| {
                connection.new_session(
                    project,
                    PathList::new(&[std::path::Path::new(path!("/test"))]),
                    cx,
                )
            })
            .await
            .unwrap();

        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());

        // Send Output BEFORE Created - should be buffered by acp_thread
        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Output {
                    terminal_id: terminal_id.clone(),
                    data: b"hello buffered".to_vec(),
                },
                cx,
            );
        });

        // Create a display-only terminal and then send Created
        let lower = cx.new(|cx| {
            let builder = ::terminal::TerminalBuilder::new_display_only(
                ::terminal::terminal_settings::CursorShape::default(),
                ::terminal::terminal_settings::AlternateScroll::On,
                None,
                0,
                cx.background_executor(),
                PathStyle::local(),
            )
            .unwrap();
            builder.subscribe(cx)
        });

        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Created {
                    terminal_id: terminal_id.clone(),
                    label: "Buffered Test".to_string(),
                    cwd: None,
                    output_byte_limit: None,
                    terminal: lower.clone(),
                },
                cx,
            );
        });

        // After Created, buffered Output should have been flushed into the renderer
        let content = thread.read_with(cx, |thread, cx| {
            let term = thread.terminal(terminal_id.clone()).unwrap();
            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
        });

        assert!(
            content.contains("hello buffered"),
            "expected buffered output to render, got: {content}"
        );
    }
2971
    /// Both `Output` and `Exit` may arrive before `Created` for the same
    /// terminal id; the buffered output must still be rendered once the
    /// terminal is registered.
    ///
    /// NOTE(review): only the rendered output is asserted; nothing here checks
    /// what happens to the buffered exit status.
    #[gpui::test]
    async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let connection = Rc::new(FakeAgentConnection::new());
        let thread = cx
            .update(|cx| {
                connection.new_session(
                    project,
                    PathList::new(&[std::path::Path::new(path!("/test"))]),
                    cx,
                )
            })
            .await
            .unwrap();

        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());

        // Send Output BEFORE Created
        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Output {
                    terminal_id: terminal_id.clone(),
                    data: b"pre-exit data".to_vec(),
                },
                cx,
            );
        });

        // Send Exit BEFORE Created
        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Exit {
                    terminal_id: terminal_id.clone(),
                    status: acp::TerminalExitStatus::new().exit_code(0),
                },
                cx,
            );
        });

        // Now create a display-only lower-level terminal and send Created
        let lower = cx.new(|cx| {
            let builder = ::terminal::TerminalBuilder::new_display_only(
                ::terminal::terminal_settings::CursorShape::default(),
                ::terminal::terminal_settings::AlternateScroll::On,
                None,
                0,
                cx.background_executor(),
                PathStyle::local(),
            )
            .unwrap();
            builder.subscribe(cx)
        });

        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Created {
                    terminal_id: terminal_id.clone(),
                    label: "Buffered Exit Test".to_string(),
                    cwd: None,
                    output_byte_limit: None,
                    terminal: lower.clone(),
                },
                cx,
            );
        });

        // Output should be present after Created (flushed from buffer)
        let content = thread.read_with(cx, |thread, cx| {
            let term = thread.terminal(terminal_id.clone()).unwrap();
            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
        });

        assert!(
            content.contains("pre-exit data"),
            "expected pre-exit data to render, got: {content}"
        );
    }
3052
    /// Test that killing a terminal via Terminal::kill properly:
    /// 1. Causes wait_for_exit to complete (doesn't hang forever)
    /// 2. The underlying terminal still has the output that was written before the kill
    ///
    /// This test verifies that the fix to kill_active_task (which now also kills
    /// the shell process in addition to the foreground process) properly allows
    /// wait_for_exit to complete instead of hanging indefinitely.
    ///
    /// Unix-only: the command line relies on `printf` and `sleep`.
    #[cfg(unix)]
    #[gpui::test]
    async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
        use std::collections::HashMap;
        use task::Shell;
        use util::shell_builder::ShellBuilder;

        init_test(cx);
        // This test waits on a real process and real timers below.
        cx.executor().allow_parking();

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let connection = Rc::new(FakeAgentConnection::new());
        let thread = cx
            .update(|cx| {
                connection.new_session(
                    project.clone(),
                    PathList::new(&[Path::new(path!("/test"))]),
                    cx,
                )
            })
            .await
            .unwrap();

        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());

        // Create a real PTY terminal that runs a command which prints output then sleeps
        // We use printf instead of echo and chain with && sleep to ensure proper execution
        // The receiver is bound to a named variable so it stays alive until the
        // end of the test instead of being dropped immediately.
        let (completion_tx, _completion_rx) = smol::channel::unbounded();
        let (program, args) = ShellBuilder::new(&Shell::System, false).build(
            Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
            &[],
        );

        let builder = cx
            .update(|cx| {
                ::terminal::TerminalBuilder::new(
                    None,
                    None,
                    task::Shell::WithArguments {
                        program,
                        args,
                        title_override: None,
                    },
                    HashMap::default(),
                    ::terminal::terminal_settings::CursorShape::default(),
                    ::terminal::terminal_settings::AlternateScroll::On,
                    None,
                    vec![],
                    0,
                    false,
                    0,
                    Some(completion_tx),
                    cx,
                    vec![],
                    PathStyle::local(),
                )
            })
            .await
            .unwrap();

        let lower_terminal = cx.new(|cx| builder.subscribe(cx));

        // Create the acp_thread Terminal wrapper
        thread.update(cx, |thread, cx| {
            thread.on_terminal_provider_event(
                TerminalProviderEvent::Created {
                    terminal_id: terminal_id.clone(),
                    label: "printf output_before_kill && sleep 60".to_string(),
                    cwd: None,
                    output_byte_limit: None,
                    terminal: lower_terminal.clone(),
                },
                cx,
            );
        });

        // Wait for the printf command to execute and produce output
        // Use real time since parking is enabled
        cx.executor().timer(Duration::from_millis(500)).await;

        // Get the acp_thread Terminal and kill it
        // The exit future is obtained before the kill is issued.
        let wait_for_exit = thread.update(cx, |thread, cx| {
            let term = thread.terminals.get(&terminal_id).unwrap();
            let wait_for_exit = term.read(cx).wait_for_exit();
            term.update(cx, |term, cx| {
                term.kill(cx);
            });
            wait_for_exit
        });

        // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
        // Before the fix to kill_active_task, this would hang forever because
        // only the foreground process was killed, not the shell, so the PTY
        // child never exited and wait_for_completed_task never completed.
        // `None` means the 5 second timeout won the race.
        let exit_result = futures::select! {
            result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
            _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
        };

        assert!(
            exit_result.is_some(),
            "wait_for_exit should complete after kill, but it timed out. \
            This indicates kill_active_task is not properly killing the shell process."
        );

        // Give the system a chance to process any pending updates
        cx.run_until_parked();

        // Verify that the underlying terminal still has the output that was
        // written before the kill. This verifies that killing doesn't lose output.
        let inner_content = thread.read_with(cx, |thread, cx| {
            let term = thread.terminals.get(&terminal_id).unwrap();
            term.read(cx).inner().read(cx).get_content()
        });

        assert!(
            inner_content.contains("output_before_kill"),
            "Underlying terminal should contain output from before kill, got: {}",
            inner_content
        );
    }
3182
    /// Exercises `push_user_content_block`: consecutive user chunks are merged
    /// into one entry (which takes on the id supplied with the later chunk),
    /// while a user chunk pushed after an assistant message starts a new entry.
    #[gpui::test]
    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        let connection = Rc::new(FakeAgentConnection::new());
        let thread = cx
            .update(|cx| {
                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
            })
            .await
            .unwrap();

        // Test creating a new user message
        thread.update(cx, |thread, cx| {
            thread.push_user_content_block(None, "Hello, ".into(), cx);
        });

        thread.update(cx, |thread, cx| {
            assert_eq!(thread.entries.len(), 1);
            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
                assert_eq!(user_msg.id, None);
                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
            } else {
                panic!("Expected UserMessage");
            }
        });

        // Test appending to existing user message
        let message_1_id = UserMessageId::new();
        thread.update(cx, |thread, cx| {
            thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
        });

        // Still a single entry: the text was appended and the id was adopted.
        thread.update(cx, |thread, cx| {
            assert_eq!(thread.entries.len(), 1);
            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
                assert_eq!(user_msg.id, Some(message_1_id));
                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
            } else {
                panic!("Expected UserMessage");
            }
        });

        // Test creating new user message after assistant message
        thread.update(cx, |thread, cx| {
            thread.push_assistant_content_block("Assistant response".into(), false, cx);
        });

        let message_2_id = UserMessageId::new();
        thread.update(cx, |thread, cx| {
            thread.push_user_content_block(
                Some(message_2_id.clone()),
                "New user message".into(),
                cx,
            );
        });

        // Entries are now: user, assistant, user.
        thread.update(cx, |thread, cx| {
            assert_eq!(thread.entries.len(), 3);
            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
                assert_eq!(user_msg.id, Some(message_2_id));
                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
            } else {
                panic!("Expected UserMessage at index 2");
            }
        });
    }
3252
    /// Two consecutive `AgentThoughtChunk` updates must be concatenated into a
    /// single `<thinking>` block in the thread's markdown output.
    #[gpui::test]
    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, [], cx).await;
        // The fake agent answers every user message with two thought chunks
        // and then ends the turn.
        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
            |_, thread, mut cx| {
                async move {
                    thread.update(&mut cx, |thread, cx| {
                        thread
                            .handle_session_update(
                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
                                    "Thinking ".into(),
                                )),
                                cx,
                            )
                            .unwrap();
                        thread
                            .handle_session_update(
                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
                                    "hard!".into(),
                                )),
                                cx,
                            )
                            .unwrap();
                    })?;
                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
                }
                .boxed_local()
            },
        ));

        let thread = cx
            .update(|cx| {
                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
            })
            .await
            .unwrap();

        thread
            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
            .await
            .unwrap();

        // Both chunks are expected inside one `<thinking>` block.
        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
        assert_eq!(
            output,
            indoc! {r#"
                ## User

                Hello from Zed!

                ## Assistant

                <thinking>
                Thinking hard!
                </thinking>

            "#}
        );
    }
3315
3316 #[gpui::test]
3317 async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
3318 init_test(cx);
3319
3320 let fs = FakeFs::new(cx.executor());
3321 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
3322 .await;
3323 let project = Project::test(fs.clone(), [], cx).await;
3324 let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
3325 let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
3326 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3327 move |_, thread, mut cx| {
3328 let read_file_tx = read_file_tx.clone();
3329 async move {
3330 let content = thread
3331 .update(&mut cx, |thread, cx| {
3332 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3333 })
3334 .unwrap()
3335 .await
3336 .unwrap();
3337 assert_eq!(content, "one\ntwo\nthree\n");
3338 read_file_tx.take().unwrap().send(()).unwrap();
3339 thread
3340 .update(&mut cx, |thread, cx| {
3341 thread.write_text_file(
3342 path!("/tmp/foo").into(),
3343 "one\ntwo\nthree\nfour\nfive\n".to_string(),
3344 cx,
3345 )
3346 })
3347 .unwrap()
3348 .await
3349 .unwrap();
3350 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3351 }
3352 .boxed_local()
3353 },
3354 ));
3355
3356 let (worktree, pathbuf) = project
3357 .update(cx, |project, cx| {
3358 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3359 })
3360 .await
3361 .unwrap();
3362 let buffer = project
3363 .update(cx, |project, cx| {
3364 project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
3365 })
3366 .await
3367 .unwrap();
3368
3369 let thread = cx
3370 .update(|cx| {
3371 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3372 })
3373 .await
3374 .unwrap();
3375
3376 let request = thread.update(cx, |thread, cx| {
3377 thread.send_raw("Extend the count in /tmp/foo", cx)
3378 });
3379 read_file_rx.await.ok();
3380 buffer.update(cx, |buffer, cx| {
3381 buffer.edit([(0..0, "zero\n".to_string())], None, cx);
3382 });
3383 cx.run_until_parked();
3384 assert_eq!(
3385 buffer.read_with(cx, |buffer, _| buffer.text()),
3386 "zero\none\ntwo\nthree\nfour\nfive\n"
3387 );
3388 assert_eq!(
3389 String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
3390 "zero\none\ntwo\nthree\nfour\nfive\n"
3391 );
3392 request.await.unwrap();
3393 }
3394
3395 #[gpui::test]
3396 async fn test_reading_from_line(cx: &mut TestAppContext) {
3397 init_test(cx);
3398
3399 let fs = FakeFs::new(cx.executor());
3400 fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
3401 .await;
3402 let project = Project::test(fs.clone(), [], cx).await;
3403 project
3404 .update(cx, |project, cx| {
3405 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3406 })
3407 .await
3408 .unwrap();
3409
3410 let connection = Rc::new(FakeAgentConnection::new());
3411
3412 let thread = cx
3413 .update(|cx| {
3414 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3415 })
3416 .await
3417 .unwrap();
3418
3419 // Whole file
3420 let content = thread
3421 .update(cx, |thread, cx| {
3422 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3423 })
3424 .await
3425 .unwrap();
3426
3427 assert_eq!(content, "one\ntwo\nthree\nfour\n");
3428
3429 // Only start line
3430 let content = thread
3431 .update(cx, |thread, cx| {
3432 thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
3433 })
3434 .await
3435 .unwrap();
3436
3437 assert_eq!(content, "three\nfour\n");
3438
3439 // Only limit
3440 let content = thread
3441 .update(cx, |thread, cx| {
3442 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3443 })
3444 .await
3445 .unwrap();
3446
3447 assert_eq!(content, "one\ntwo\n");
3448
3449 // Range
3450 let content = thread
3451 .update(cx, |thread, cx| {
3452 thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
3453 })
3454 .await
3455 .unwrap();
3456
3457 assert_eq!(content, "two\nthree\n");
3458
3459 // Invalid
3460 let err = thread
3461 .update(cx, |thread, cx| {
3462 thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
3463 })
3464 .await
3465 .unwrap_err();
3466
3467 assert_eq!(
3468 err.to_string(),
3469 "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
3470 );
3471 }
3472
3473 #[gpui::test]
3474 async fn test_reading_empty_file(cx: &mut TestAppContext) {
3475 init_test(cx);
3476
3477 let fs = FakeFs::new(cx.executor());
3478 fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
3479 let project = Project::test(fs.clone(), [], cx).await;
3480 project
3481 .update(cx, |project, cx| {
3482 project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3483 })
3484 .await
3485 .unwrap();
3486
3487 let connection = Rc::new(FakeAgentConnection::new());
3488
3489 let thread = cx
3490 .update(|cx| {
3491 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3492 })
3493 .await
3494 .unwrap();
3495
3496 // Whole file
3497 let content = thread
3498 .update(cx, |thread, cx| {
3499 thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3500 })
3501 .await
3502 .unwrap();
3503
3504 assert_eq!(content, "");
3505
3506 // Only start line
3507 let content = thread
3508 .update(cx, |thread, cx| {
3509 thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
3510 })
3511 .await
3512 .unwrap();
3513
3514 assert_eq!(content, "");
3515
3516 // Only limit
3517 let content = thread
3518 .update(cx, |thread, cx| {
3519 thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3520 })
3521 .await
3522 .unwrap();
3523
3524 assert_eq!(content, "");
3525
3526 // Range
3527 let content = thread
3528 .update(cx, |thread, cx| {
3529 thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
3530 })
3531 .await
3532 .unwrap();
3533
3534 assert_eq!(content, "");
3535
3536 // Invalid
3537 let err = thread
3538 .update(cx, |thread, cx| {
3539 thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
3540 })
3541 .await
3542 .unwrap_err();
3543
3544 assert_eq!(
3545 err.to_string(),
3546 "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
3547 );
3548 }
3549 #[gpui::test]
3550 async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
3551 init_test(cx);
3552
3553 let fs = FakeFs::new(cx.executor());
3554 fs.insert_tree(path!("/tmp"), json!({})).await;
3555 let project = Project::test(fs.clone(), [], cx).await;
3556 project
3557 .update(cx, |project, cx| {
3558 project.find_or_create_worktree(path!("/tmp"), true, cx)
3559 })
3560 .await
3561 .unwrap();
3562
3563 let connection = Rc::new(FakeAgentConnection::new());
3564
3565 let thread = cx
3566 .update(|cx| {
3567 connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3568 })
3569 .await
3570 .unwrap();
3571
3572 // Out of project file
3573 let err = thread
3574 .update(cx, |thread, cx| {
3575 thread.read_text_file(path!("/foo").into(), None, None, false, cx)
3576 })
3577 .await
3578 .unwrap_err();
3579
3580 assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
3581 }
3582
3583 #[gpui::test]
3584 async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
3585 init_test(cx);
3586
3587 let fs = FakeFs::new(cx.executor());
3588 let project = Project::test(fs, [], cx).await;
3589 let id = acp::ToolCallId::new("test");
3590
3591 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3592 let id = id.clone();
3593 move |_, thread, mut cx| {
3594 let id = id.clone();
3595 async move {
3596 thread
3597 .update(&mut cx, |thread, cx| {
3598 thread.handle_session_update(
3599 acp::SessionUpdate::ToolCall(
3600 acp::ToolCall::new(id.clone(), "Label")
3601 .kind(acp::ToolKind::Fetch)
3602 .status(acp::ToolCallStatus::InProgress),
3603 ),
3604 cx,
3605 )
3606 })
3607 .unwrap()
3608 .unwrap();
3609 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3610 }
3611 .boxed_local()
3612 }
3613 }));
3614
3615 let thread = cx
3616 .update(|cx| {
3617 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3618 })
3619 .await
3620 .unwrap();
3621
3622 let request = thread.update(cx, |thread, cx| {
3623 thread.send_raw("Fetch https://example.com", cx)
3624 });
3625
3626 run_until_first_tool_call(&thread, cx).await;
3627
3628 thread.read_with(cx, |thread, _| {
3629 assert!(matches!(
3630 thread.entries[1],
3631 AgentThreadEntry::ToolCall(ToolCall {
3632 status: ToolCallStatus::InProgress,
3633 ..
3634 })
3635 ));
3636 });
3637
3638 thread.update(cx, |thread, cx| thread.cancel(cx)).await;
3639
3640 thread.read_with(cx, |thread, _| {
3641 assert!(matches!(
3642 &thread.entries[1],
3643 AgentThreadEntry::ToolCall(ToolCall {
3644 status: ToolCallStatus::Canceled,
3645 ..
3646 })
3647 ));
3648 });
3649
3650 thread
3651 .update(cx, |thread, cx| {
3652 thread.handle_session_update(
3653 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3654 id,
3655 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3656 )),
3657 cx,
3658 )
3659 })
3660 .unwrap();
3661
3662 request.await.unwrap();
3663
3664 thread.read_with(cx, |thread, _| {
3665 assert!(matches!(
3666 thread.entries[1],
3667 AgentThreadEntry::ToolCall(ToolCall {
3668 status: ToolCallStatus::Completed,
3669 ..
3670 })
3671 ));
3672 });
3673 }
3674
3675 #[gpui::test]
3676 async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3677 init_test(cx);
3678 let fs = FakeFs::new(cx.background_executor.clone());
3679 fs.insert_tree(path!("/test"), json!({})).await;
3680 let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3681
3682 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3683 move |_, thread, mut cx| {
3684 async move {
3685 thread
3686 .update(&mut cx, |thread, cx| {
3687 thread.handle_session_update(
3688 acp::SessionUpdate::ToolCall(
3689 acp::ToolCall::new("test", "Label")
3690 .kind(acp::ToolKind::Edit)
3691 .status(acp::ToolCallStatus::Completed)
3692 .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
3693 "/test/test.txt",
3694 "foo",
3695 ))]),
3696 ),
3697 cx,
3698 )
3699 })
3700 .unwrap()
3701 .unwrap();
3702 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3703 }
3704 .boxed_local()
3705 }
3706 }));
3707
3708 let thread = cx
3709 .update(|cx| {
3710 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3711 })
3712 .await
3713 .unwrap();
3714
3715 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3716 .await
3717 .unwrap();
3718
3719 assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3720 }
3721
3722 #[gpui::test(iterations = 10)]
3723 async fn test_checkpoints(cx: &mut TestAppContext) {
3724 init_test(cx);
3725 let fs = FakeFs::new(cx.background_executor.clone());
3726 fs.insert_tree(
3727 path!("/test"),
3728 json!({
3729 ".git": {}
3730 }),
3731 )
3732 .await;
3733 let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3734
3735 let simulate_changes = Arc::new(AtomicBool::new(true));
3736 let next_filename = Arc::new(AtomicUsize::new(0));
3737 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3738 let simulate_changes = simulate_changes.clone();
3739 let next_filename = next_filename.clone();
3740 let fs = fs.clone();
3741 move |request, thread, mut cx| {
3742 let fs = fs.clone();
3743 let simulate_changes = simulate_changes.clone();
3744 let next_filename = next_filename.clone();
3745 async move {
3746 if simulate_changes.load(SeqCst) {
3747 let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3748 fs.write(Path::new(&filename), b"").await?;
3749 }
3750
3751 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3752 panic!("expected text content block");
3753 };
3754 thread.update(&mut cx, |thread, cx| {
3755 thread
3756 .handle_session_update(
3757 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3758 content.text.to_uppercase().into(),
3759 )),
3760 cx,
3761 )
3762 .unwrap();
3763 })?;
3764 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3765 }
3766 .boxed_local()
3767 }
3768 }));
3769 let thread = cx
3770 .update(|cx| {
3771 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3772 })
3773 .await
3774 .unwrap();
3775
3776 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3777 .await
3778 .unwrap();
3779 thread.read_with(cx, |thread, cx| {
3780 assert_eq!(
3781 thread.to_markdown(cx),
3782 indoc! {"
3783 ## User (checkpoint)
3784
3785 Lorem
3786
3787 ## Assistant
3788
3789 LOREM
3790
3791 "}
3792 );
3793 });
3794 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3795
3796 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3797 .await
3798 .unwrap();
3799 thread.read_with(cx, |thread, cx| {
3800 assert_eq!(
3801 thread.to_markdown(cx),
3802 indoc! {"
3803 ## User (checkpoint)
3804
3805 Lorem
3806
3807 ## Assistant
3808
3809 LOREM
3810
3811 ## User (checkpoint)
3812
3813 ipsum
3814
3815 ## Assistant
3816
3817 IPSUM
3818
3819 "}
3820 );
3821 });
3822 assert_eq!(
3823 fs.files(),
3824 vec![
3825 Path::new(path!("/test/file-0")),
3826 Path::new(path!("/test/file-1"))
3827 ]
3828 );
3829
3830 // Checkpoint isn't stored when there are no changes.
3831 simulate_changes.store(false, SeqCst);
3832 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3833 .await
3834 .unwrap();
3835 thread.read_with(cx, |thread, cx| {
3836 assert_eq!(
3837 thread.to_markdown(cx),
3838 indoc! {"
3839 ## User (checkpoint)
3840
3841 Lorem
3842
3843 ## Assistant
3844
3845 LOREM
3846
3847 ## User (checkpoint)
3848
3849 ipsum
3850
3851 ## Assistant
3852
3853 IPSUM
3854
3855 ## User
3856
3857 dolor
3858
3859 ## Assistant
3860
3861 DOLOR
3862
3863 "}
3864 );
3865 });
3866 assert_eq!(
3867 fs.files(),
3868 vec![
3869 Path::new(path!("/test/file-0")),
3870 Path::new(path!("/test/file-1"))
3871 ]
3872 );
3873
3874 // Rewinding the conversation truncates the history and restores the checkpoint.
3875 thread
3876 .update(cx, |thread, cx| {
3877 let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3878 panic!("unexpected entries {:?}", thread.entries)
3879 };
3880 thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3881 })
3882 .await
3883 .unwrap();
3884 thread.read_with(cx, |thread, cx| {
3885 assert_eq!(
3886 thread.to_markdown(cx),
3887 indoc! {"
3888 ## User (checkpoint)
3889
3890 Lorem
3891
3892 ## Assistant
3893
3894 LOREM
3895
3896 "}
3897 );
3898 });
3899 assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3900 }
3901
3902 #[gpui::test]
3903 async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3904 use std::sync::atomic::AtomicUsize;
3905 init_test(cx);
3906
3907 let fs = FakeFs::new(cx.executor());
3908 let project = Project::test(fs, None, cx).await;
3909
3910 // Create a connection that simulates refusal after tool result
3911 let prompt_count = Arc::new(AtomicUsize::new(0));
3912 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3913 let prompt_count = prompt_count.clone();
3914 move |_request, thread, mut cx| {
3915 let count = prompt_count.fetch_add(1, SeqCst);
3916 async move {
3917 if count == 0 {
3918 // First prompt: Generate a tool call with result
3919 thread.update(&mut cx, |thread, cx| {
3920 thread
3921 .handle_session_update(
3922 acp::SessionUpdate::ToolCall(
3923 acp::ToolCall::new("tool1", "Test Tool")
3924 .kind(acp::ToolKind::Fetch)
3925 .status(acp::ToolCallStatus::Completed)
3926 .raw_input(serde_json::json!({"query": "test"}))
3927 .raw_output(serde_json::json!({"result": "inappropriate content"})),
3928 ),
3929 cx,
3930 )
3931 .unwrap();
3932 })?;
3933
3934 // Now return refusal because of the tool result
3935 Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
3936 } else {
3937 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3938 }
3939 }
3940 .boxed_local()
3941 }
3942 }));
3943
3944 let thread = cx
3945 .update(|cx| {
3946 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3947 })
3948 .await
3949 .unwrap();
3950
3951 // Track if we see a Refusal event
3952 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3953 let saw_refusal_event_captured = saw_refusal_event.clone();
3954 thread.update(cx, |_thread, cx| {
3955 cx.subscribe(
3956 &thread,
3957 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3958 if matches!(event, AcpThreadEvent::Refusal) {
3959 *saw_refusal_event_captured.lock().unwrap() = true;
3960 }
3961 },
3962 )
3963 .detach();
3964 });
3965
3966 // Send a user message - this will trigger tool call and then refusal
3967 let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
3968 cx.background_executor.spawn(send_task).detach();
3969 cx.run_until_parked();
3970
3971 // Verify that:
3972 // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3973 // 2. The user message was NOT truncated
3974 assert!(
3975 *saw_refusal_event.lock().unwrap(),
3976 "Refusal event should be emitted for tool result refusals"
3977 );
3978
3979 thread.read_with(cx, |thread, _| {
3980 let entries = thread.entries();
3981 assert!(entries.len() >= 2, "Should have user message and tool call");
3982
3983 // Verify user message is still there
3984 assert!(
3985 matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3986 "User message should not be truncated"
3987 );
3988
3989 // Verify tool call is there with result
3990 if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3991 assert!(
3992 tool_call.raw_output.is_some(),
3993 "Tool call should have output"
3994 );
3995 } else {
3996 panic!("Expected tool call at index 1");
3997 }
3998 });
3999 }
4000
4001 #[gpui::test]
4002 async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
4003 init_test(cx);
4004
4005 let fs = FakeFs::new(cx.executor());
4006 let project = Project::test(fs, None, cx).await;
4007
4008 let refuse_next = Arc::new(AtomicBool::new(false));
4009 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4010 let refuse_next = refuse_next.clone();
4011 move |_request, _thread, _cx| {
4012 if refuse_next.load(SeqCst) {
4013 async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
4014 .boxed_local()
4015 } else {
4016 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
4017 .boxed_local()
4018 }
4019 }
4020 }));
4021
4022 let thread = cx
4023 .update(|cx| {
4024 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4025 })
4026 .await
4027 .unwrap();
4028
4029 // Track if we see a Refusal event
4030 let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4031 let saw_refusal_event_captured = saw_refusal_event.clone();
4032 thread.update(cx, |_thread, cx| {
4033 cx.subscribe(
4034 &thread,
4035 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4036 if matches!(event, AcpThreadEvent::Refusal) {
4037 *saw_refusal_event_captured.lock().unwrap() = true;
4038 }
4039 },
4040 )
4041 .detach();
4042 });
4043
4044 // Send a message that will be refused
4045 refuse_next.store(true, SeqCst);
4046 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4047 .await
4048 .unwrap();
4049
4050 // Verify that a Refusal event WAS emitted for user prompt refusal
4051 assert!(
4052 *saw_refusal_event.lock().unwrap(),
4053 "Refusal event should be emitted for user prompt refusals"
4054 );
4055
4056 // Verify the message was truncated (user prompt refusal)
4057 thread.read_with(cx, |thread, cx| {
4058 assert_eq!(thread.to_markdown(cx), "");
4059 });
4060 }
4061
4062 #[gpui::test]
4063 async fn test_refusal(cx: &mut TestAppContext) {
4064 init_test(cx);
4065 let fs = FakeFs::new(cx.background_executor.clone());
4066 fs.insert_tree(path!("/"), json!({})).await;
4067 let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
4068
4069 let refuse_next = Arc::new(AtomicBool::new(false));
4070 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4071 let refuse_next = refuse_next.clone();
4072 move |request, thread, mut cx| {
4073 let refuse_next = refuse_next.clone();
4074 async move {
4075 if refuse_next.load(SeqCst) {
4076 return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
4077 }
4078
4079 let acp::ContentBlock::Text(content) = &request.prompt[0] else {
4080 panic!("expected text content block");
4081 };
4082 thread.update(&mut cx, |thread, cx| {
4083 thread
4084 .handle_session_update(
4085 acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
4086 content.text.to_uppercase().into(),
4087 )),
4088 cx,
4089 )
4090 .unwrap();
4091 })?;
4092 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4093 }
4094 .boxed_local()
4095 }
4096 }));
4097 let thread = cx
4098 .update(|cx| {
4099 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4100 })
4101 .await
4102 .unwrap();
4103
4104 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4105 .await
4106 .unwrap();
4107 thread.read_with(cx, |thread, cx| {
4108 assert_eq!(
4109 thread.to_markdown(cx),
4110 indoc! {"
4111 ## User
4112
4113 hello
4114
4115 ## Assistant
4116
4117 HELLO
4118
4119 "}
4120 );
4121 });
4122
4123 // Simulate refusing the second message. The message should be truncated
4124 // when a user prompt is refused.
4125 refuse_next.store(true, SeqCst);
4126 cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
4127 .await
4128 .unwrap();
4129 thread.read_with(cx, |thread, cx| {
4130 assert_eq!(
4131 thread.to_markdown(cx),
4132 indoc! {"
4133 ## User
4134
4135 hello
4136
4137 ## Assistant
4138
4139 HELLO
4140
4141 "}
4142 );
4143 });
4144 }
4145
4146 async fn run_until_first_tool_call(
4147 thread: &Entity<AcpThread>,
4148 cx: &mut TestAppContext,
4149 ) -> usize {
4150 let (mut tx, mut rx) = mpsc::channel::<usize>(1);
4151
4152 let subscription = cx.update(|cx| {
4153 cx.subscribe(thread, move |thread, _, cx| {
4154 for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
4155 if matches!(entry, AgentThreadEntry::ToolCall(_)) {
4156 return tx.try_send(ix).unwrap();
4157 }
4158 }
4159 })
4160 });
4161
4162 select! {
4163 _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
4164 panic!("Timeout waiting for tool call")
4165 }
4166 ix = rx.next().fuse() => {
4167 drop(subscription);
4168 ix.unwrap()
4169 }
4170 }
4171 }
4172
4173 #[derive(Clone, Default)]
4174 struct FakeAgentConnection {
4175 auth_methods: Vec<acp::AuthMethod>,
4176 sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
4177 set_title_calls: Rc<RefCell<Vec<SharedString>>>,
4178 on_user_message: Option<
4179 Rc<
4180 dyn Fn(
4181 acp::PromptRequest,
4182 WeakEntity<AcpThread>,
4183 AsyncApp,
4184 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4185 + 'static,
4186 >,
4187 >,
4188 }
4189
4190 impl FakeAgentConnection {
4191 fn new() -> Self {
4192 Self {
4193 auth_methods: Vec::new(),
4194 on_user_message: None,
4195 sessions: Arc::default(),
4196 set_title_calls: Default::default(),
4197 }
4198 }
4199
4200 #[expect(unused)]
4201 fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
4202 self.auth_methods = auth_methods;
4203 self
4204 }
4205
4206 fn on_user_message(
4207 mut self,
4208 handler: impl Fn(
4209 acp::PromptRequest,
4210 WeakEntity<AcpThread>,
4211 AsyncApp,
4212 ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4213 + 'static,
4214 ) -> Self {
4215 self.on_user_message.replace(Rc::new(handler));
4216 self
4217 }
4218 }
4219
4220 impl AgentConnection for FakeAgentConnection {
4221 fn agent_id(&self) -> AgentId {
4222 AgentId::new("fake")
4223 }
4224
4225 fn telemetry_id(&self) -> SharedString {
4226 "fake".into()
4227 }
4228
4229 fn auth_methods(&self) -> &[acp::AuthMethod] {
4230 &self.auth_methods
4231 }
4232
4233 fn new_session(
4234 self: Rc<Self>,
4235 project: Entity<Project>,
4236 work_dirs: PathList,
4237 cx: &mut App,
4238 ) -> Task<gpui::Result<Entity<AcpThread>>> {
4239 let session_id = acp::SessionId::new(
4240 rand::rng()
4241 .sample_iter(&distr::Alphanumeric)
4242 .take(7)
4243 .map(char::from)
4244 .collect::<String>(),
4245 );
4246 let action_log = cx.new(|_| ActionLog::new(project.clone()));
4247 let thread = cx.new(|cx| {
4248 AcpThread::new(
4249 None,
4250 "Test",
4251 Some(work_dirs),
4252 self.clone(),
4253 project,
4254 action_log,
4255 session_id.clone(),
4256 watch::Receiver::constant(
4257 acp::PromptCapabilities::new()
4258 .image(true)
4259 .audio(true)
4260 .embedded_context(true),
4261 ),
4262 cx,
4263 )
4264 });
4265 self.sessions.lock().insert(session_id, thread.downgrade());
4266 Task::ready(Ok(thread))
4267 }
4268
4269 fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
4270 if self.auth_methods().iter().any(|m| m.id() == &method) {
4271 Task::ready(Ok(()))
4272 } else {
4273 Task::ready(Err(anyhow!("Invalid Auth Method")))
4274 }
4275 }
4276
4277 fn prompt(
4278 &self,
4279 _id: Option<UserMessageId>,
4280 params: acp::PromptRequest,
4281 cx: &mut App,
4282 ) -> Task<gpui::Result<acp::PromptResponse>> {
4283 let sessions = self.sessions.lock();
4284 let thread = sessions.get(¶ms.session_id).unwrap();
4285 if let Some(handler) = &self.on_user_message {
4286 let handler = handler.clone();
4287 let thread = thread.clone();
4288 cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
4289 } else {
4290 Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
4291 }
4292 }
4293
4294 fn cancel(&self, _session_id: &acp::SessionId, _cx: &mut App) {}
4295
4296 fn truncate(
4297 &self,
4298 session_id: &acp::SessionId,
4299 _cx: &App,
4300 ) -> Option<Rc<dyn AgentSessionTruncate>> {
4301 Some(Rc::new(FakeAgentSessionEditor {
4302 _session_id: session_id.clone(),
4303 }))
4304 }
4305
4306 fn set_title(
4307 &self,
4308 _session_id: &acp::SessionId,
4309 _cx: &App,
4310 ) -> Option<Rc<dyn AgentSessionSetTitle>> {
4311 Some(Rc::new(FakeAgentSessionSetTitle {
4312 calls: self.set_title_calls.clone(),
4313 }))
4314 }
4315
4316 fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
4317 self
4318 }
4319 }
4320
4321 struct FakeAgentSessionSetTitle {
4322 calls: Rc<RefCell<Vec<SharedString>>>,
4323 }
4324
4325 impl AgentSessionSetTitle for FakeAgentSessionSetTitle {
4326 fn run(&self, title: SharedString, _cx: &mut App) -> Task<Result<()>> {
4327 self.calls.borrow_mut().push(title);
4328 Task::ready(Ok(()))
4329 }
4330 }
4331
4332 struct FakeAgentSessionEditor {
4333 _session_id: acp::SessionId,
4334 }
4335
4336 impl AgentSessionTruncate for FakeAgentSessionEditor {
4337 fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
4338 Task::ready(Ok(()))
4339 }
4340 }
4341
4342 #[gpui::test]
4343 async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
4344 init_test(cx);
4345
4346 let fs = FakeFs::new(cx.executor());
4347 let project = Project::test(fs, [], cx).await;
4348 let connection = Rc::new(FakeAgentConnection::new());
4349 let thread = cx
4350 .update(|cx| {
4351 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4352 })
4353 .await
4354 .unwrap();
4355
4356 // Try to update a tool call that doesn't exist
4357 let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
4358 thread.update(cx, |thread, cx| {
4359 let result = thread.handle_session_update(
4360 acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4361 nonexistent_id.clone(),
4362 acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4363 )),
4364 cx,
4365 );
4366
4367 // The update should succeed (not return an error)
4368 assert!(result.is_ok());
4369
4370 // There should now be exactly one entry in the thread
4371 assert_eq!(thread.entries.len(), 1);
4372
4373 // The entry should be a failed tool call
4374 if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
4375 assert_eq!(tool_call.id, nonexistent_id);
4376 assert!(matches!(tool_call.status, ToolCallStatus::Failed));
4377 assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
4378
4379 // Check that the content contains the error message
4380 assert_eq!(tool_call.content.len(), 1);
4381 if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
4382 match content_block {
4383 ContentBlock::Markdown { markdown } => {
4384 let markdown_text = markdown.read(cx).source();
4385 assert!(markdown_text.contains("Tool call not found"));
4386 }
4387 ContentBlock::Empty => panic!("Expected markdown content, got empty"),
4388 ContentBlock::ResourceLink { .. } => {
4389 panic!("Expected markdown content, got resource link")
4390 }
4391 ContentBlock::Image { .. } => {
4392 panic!("Expected markdown content, got image")
4393 }
4394 }
4395 } else {
4396 panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
4397 }
4398 } else {
4399 panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
4400 }
4401 });
4402 }
4403
4404 /// Tests that restoring a checkpoint properly cleans up terminals that were
4405 /// created after that checkpoint, and cancels any in-progress generation.
4406 ///
4407 /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
4408 /// that were started after that checkpoint should be terminated, and any in-progress
4409 /// AI generation should be canceled.
4410 #[gpui::test]
4411 async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
4412 init_test(cx);
4413
4414 let fs = FakeFs::new(cx.executor());
4415 let project = Project::test(fs, [], cx).await;
4416 let connection = Rc::new(FakeAgentConnection::new());
4417 let thread = cx
4418 .update(|cx| {
4419 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4420 })
4421 .await
4422 .unwrap();
4423
4424 // Send first user message to create a checkpoint
4425 cx.update(|cx| {
4426 thread.update(cx, |thread, cx| {
4427 thread.send(vec!["first message".into()], cx)
4428 })
4429 })
4430 .await
4431 .unwrap();
4432
4433 // Send second message (creates another checkpoint) - we'll restore to this one
4434 cx.update(|cx| {
4435 thread.update(cx, |thread, cx| {
4436 thread.send(vec!["second message".into()], cx)
4437 })
4438 })
4439 .await
4440 .unwrap();
4441
4442 // Create 2 terminals BEFORE the checkpoint that have completed running
4443 let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4444 let mock_terminal_1 = cx.new(|cx| {
4445 let builder = ::terminal::TerminalBuilder::new_display_only(
4446 ::terminal::terminal_settings::CursorShape::default(),
4447 ::terminal::terminal_settings::AlternateScroll::On,
4448 None,
4449 0,
4450 cx.background_executor(),
4451 PathStyle::local(),
4452 )
4453 .unwrap();
4454 builder.subscribe(cx)
4455 });
4456
4457 thread.update(cx, |thread, cx| {
4458 thread.on_terminal_provider_event(
4459 TerminalProviderEvent::Created {
4460 terminal_id: terminal_id_1.clone(),
4461 label: "echo 'first'".to_string(),
4462 cwd: Some(PathBuf::from("/test")),
4463 output_byte_limit: None,
4464 terminal: mock_terminal_1.clone(),
4465 },
4466 cx,
4467 );
4468 });
4469
4470 thread.update(cx, |thread, cx| {
4471 thread.on_terminal_provider_event(
4472 TerminalProviderEvent::Output {
4473 terminal_id: terminal_id_1.clone(),
4474 data: b"first\n".to_vec(),
4475 },
4476 cx,
4477 );
4478 });
4479
4480 thread.update(cx, |thread, cx| {
4481 thread.on_terminal_provider_event(
4482 TerminalProviderEvent::Exit {
4483 terminal_id: terminal_id_1.clone(),
4484 status: acp::TerminalExitStatus::new().exit_code(0),
4485 },
4486 cx,
4487 );
4488 });
4489
4490 let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4491 let mock_terminal_2 = cx.new(|cx| {
4492 let builder = ::terminal::TerminalBuilder::new_display_only(
4493 ::terminal::terminal_settings::CursorShape::default(),
4494 ::terminal::terminal_settings::AlternateScroll::On,
4495 None,
4496 0,
4497 cx.background_executor(),
4498 PathStyle::local(),
4499 )
4500 .unwrap();
4501 builder.subscribe(cx)
4502 });
4503
4504 thread.update(cx, |thread, cx| {
4505 thread.on_terminal_provider_event(
4506 TerminalProviderEvent::Created {
4507 terminal_id: terminal_id_2.clone(),
4508 label: "echo 'second'".to_string(),
4509 cwd: Some(PathBuf::from("/test")),
4510 output_byte_limit: None,
4511 terminal: mock_terminal_2.clone(),
4512 },
4513 cx,
4514 );
4515 });
4516
4517 thread.update(cx, |thread, cx| {
4518 thread.on_terminal_provider_event(
4519 TerminalProviderEvent::Output {
4520 terminal_id: terminal_id_2.clone(),
4521 data: b"second\n".to_vec(),
4522 },
4523 cx,
4524 );
4525 });
4526
4527 thread.update(cx, |thread, cx| {
4528 thread.on_terminal_provider_event(
4529 TerminalProviderEvent::Exit {
4530 terminal_id: terminal_id_2.clone(),
4531 status: acp::TerminalExitStatus::new().exit_code(0),
4532 },
4533 cx,
4534 );
4535 });
4536
4537 // Get the second message ID to restore to
4538 let second_message_id = thread.read_with(cx, |thread, _| {
4539 // At this point we have:
4540 // - Index 0: First user message (with checkpoint)
4541 // - Index 1: Second user message (with checkpoint)
4542 // No assistant responses because FakeAgentConnection just returns EndTurn
4543 let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
4544 panic!("expected user message at index 1");
4545 };
4546 message.id.clone().unwrap()
4547 });
4548
4549 // Create a terminal AFTER the checkpoint we'll restore to.
4550 // This simulates the AI agent starting a long-running terminal command.
4551 let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4552 let mock_terminal = cx.new(|cx| {
4553 let builder = ::terminal::TerminalBuilder::new_display_only(
4554 ::terminal::terminal_settings::CursorShape::default(),
4555 ::terminal::terminal_settings::AlternateScroll::On,
4556 None,
4557 0,
4558 cx.background_executor(),
4559 PathStyle::local(),
4560 )
4561 .unwrap();
4562 builder.subscribe(cx)
4563 });
4564
4565 // Register the terminal as created
4566 thread.update(cx, |thread, cx| {
4567 thread.on_terminal_provider_event(
4568 TerminalProviderEvent::Created {
4569 terminal_id: terminal_id.clone(),
4570 label: "sleep 1000".to_string(),
4571 cwd: Some(PathBuf::from("/test")),
4572 output_byte_limit: None,
4573 terminal: mock_terminal.clone(),
4574 },
4575 cx,
4576 );
4577 });
4578
4579 // Simulate the terminal producing output (still running)
4580 thread.update(cx, |thread, cx| {
4581 thread.on_terminal_provider_event(
4582 TerminalProviderEvent::Output {
4583 terminal_id: terminal_id.clone(),
4584 data: b"terminal is running...\n".to_vec(),
4585 },
4586 cx,
4587 );
4588 });
4589
4590 // Create a tool call entry that references this terminal
4591 // This represents the agent requesting a terminal command
4592 thread.update(cx, |thread, cx| {
4593 thread
4594 .handle_session_update(
4595 acp::SessionUpdate::ToolCall(
4596 acp::ToolCall::new("terminal-tool-1", "Running command")
4597 .kind(acp::ToolKind::Execute)
4598 .status(acp::ToolCallStatus::InProgress)
4599 .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
4600 terminal_id.clone(),
4601 ))])
4602 .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
4603 ),
4604 cx,
4605 )
4606 .unwrap();
4607 });
4608
4609 // Verify terminal exists and is in the thread
4610 let terminal_exists_before =
4611 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4612 assert!(
4613 terminal_exists_before,
4614 "Terminal should exist before checkpoint restore"
4615 );
4616
4617 // Verify the terminal's underlying task is still running (not completed)
4618 let terminal_running_before = thread.read_with(cx, |thread, _cx| {
4619 let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
4620 terminal_entity.read_with(cx, |term, _cx| {
4621 term.output().is_none() // output is None means it's still running
4622 })
4623 });
4624 assert!(
4625 terminal_running_before,
4626 "Terminal should be running before checkpoint restore"
4627 );
4628
4629 // Verify we have the expected entries before restore
4630 let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
4631 assert!(
4632 entry_count_before > 1,
4633 "Should have multiple entries before restore"
4634 );
4635
4636 // Restore the checkpoint to the second message.
4637 // This should:
4638 // 1. Cancel any in-progress generation (via the cancel() call)
4639 // 2. Remove the terminal that was created after that point
4640 thread
4641 .update(cx, |thread, cx| {
4642 thread.restore_checkpoint(second_message_id, cx)
4643 })
4644 .await
4645 .unwrap();
4646
4647 // Verify that no send_task is in progress after restore
4648 // (cancel() clears the send_task)
4649 let has_send_task_after = thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4650 assert!(
4651 !has_send_task_after,
4652 "Should not have a send_task after restore (cancel should have cleared it)"
4653 );
4654
4655 // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
4656 let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
4657 assert_eq!(
4658 entry_count, 1,
4659 "Should have 1 entry after restore (only the first user message)"
4660 );
4661
4662 // Verify the 2 completed terminals from before the checkpoint still exist
4663 let terminal_1_exists = thread.read_with(cx, |thread, _| {
4664 thread.terminals.contains_key(&terminal_id_1)
4665 });
4666 assert!(
4667 terminal_1_exists,
4668 "Terminal 1 (from before checkpoint) should still exist"
4669 );
4670
4671 let terminal_2_exists = thread.read_with(cx, |thread, _| {
4672 thread.terminals.contains_key(&terminal_id_2)
4673 });
4674 assert!(
4675 terminal_2_exists,
4676 "Terminal 2 (from before checkpoint) should still exist"
4677 );
4678
4679 // Verify they're still in completed state
4680 let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
4681 let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
4682 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4683 });
4684 assert!(terminal_1_completed, "Terminal 1 should still be completed");
4685
4686 let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
4687 let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
4688 terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4689 });
4690 assert!(terminal_2_completed, "Terminal 2 should still be completed");
4691
4692 // Verify the running terminal (created after checkpoint) was removed
4693 let terminal_3_exists =
4694 thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4695 assert!(
4696 !terminal_3_exists,
4697 "Terminal 3 (created after checkpoint) should have been removed"
4698 );
4699
4700 // Verify total count is 2 (the two from before the checkpoint)
4701 let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
4702 assert_eq!(
4703 terminal_count, 2,
4704 "Should have exactly 2 terminals (the completed ones from before checkpoint)"
4705 );
4706 }
4707
4708 /// Tests that update_last_checkpoint correctly updates the original message's checkpoint
4709 /// even when a new user message is added while the async checkpoint comparison is in progress.
4710 ///
4711 /// This is a regression test for a bug where update_last_checkpoint would fail with
4712 /// "no checkpoint" if a new user message (without a checkpoint) was added between when
4713 /// update_last_checkpoint started and when its async closure ran.
4714 #[gpui::test]
4715 async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
4716 init_test(cx);
4717
4718 let fs = FakeFs::new(cx.executor());
4719 fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
4720 .await;
4721 let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;
4722
4723 let handler_done = Arc::new(AtomicBool::new(false));
4724 let handler_done_clone = handler_done.clone();
4725 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4726 move |_, _thread, _cx| {
4727 handler_done_clone.store(true, SeqCst);
4728 async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
4729 },
4730 ));
4731
4732 let thread = cx
4733 .update(|cx| {
4734 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4735 })
4736 .await
4737 .unwrap();
4738
4739 let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
4740 let send_task = cx.background_executor.spawn(send_future);
4741
4742 // Tick until handler completes, then a few more to let update_last_checkpoint start
4743 while !handler_done.load(SeqCst) {
4744 cx.executor().tick();
4745 }
4746 for _ in 0..5 {
4747 cx.executor().tick();
4748 }
4749
4750 thread.update(cx, |thread, cx| {
4751 thread.push_entry(
4752 AgentThreadEntry::UserMessage(UserMessage {
4753 id: Some(UserMessageId::new()),
4754 content: ContentBlock::Empty,
4755 chunks: vec!["Injected message (no checkpoint)".into()],
4756 checkpoint: None,
4757 indented: false,
4758 }),
4759 cx,
4760 );
4761 });
4762
4763 cx.run_until_parked();
4764 let result = send_task.await;
4765
4766 assert!(
4767 result.is_ok(),
4768 "send should succeed even when new message added during update_last_checkpoint: {:?}",
4769 result.err()
4770 );
4771 }
4772
4773 /// Tests that when a follow-up message is sent during generation,
4774 /// the first turn completing does NOT clear `running_turn` because
4775 /// it now belongs to the second turn.
4776 #[gpui::test]
4777 async fn test_follow_up_message_during_generation_does_not_clear_turn(cx: &mut TestAppContext) {
4778 init_test(cx);
4779
4780 let fs = FakeFs::new(cx.executor());
4781 let project = Project::test(fs, [], cx).await;
4782
4783 // First handler waits for this signal before completing
4784 let (first_complete_tx, first_complete_rx) = futures::channel::oneshot::channel::<()>();
4785 let first_complete_rx = RefCell::new(Some(first_complete_rx));
4786
4787 let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4788 move |params, _thread, _cx| {
4789 let first_complete_rx = first_complete_rx.borrow_mut().take();
4790 let is_first = params
4791 .prompt
4792 .iter()
4793 .any(|c| matches!(c, acp::ContentBlock::Text(t) if t.text.contains("first")));
4794
4795 async move {
4796 if is_first {
4797 // First handler waits until signaled
4798 if let Some(rx) = first_complete_rx {
4799 rx.await.ok();
4800 }
4801 }
4802 Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4803 }
4804 .boxed_local()
4805 }
4806 }));
4807
4808 let thread = cx
4809 .update(|cx| {
4810 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4811 })
4812 .await
4813 .unwrap();
4814
4815 // Send first message (turn_id=1) - handler will block
4816 let first_request = thread.update(cx, |thread, cx| thread.send_raw("first", cx));
4817 assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 1);
4818
4819 // Send second message (turn_id=2) while first is still blocked
4820 // This calls cancel() which takes turn 1's running_turn and sets turn 2's
4821 let second_request = thread.update(cx, |thread, cx| thread.send_raw("second", cx));
4822 assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 2);
4823
4824 let running_turn_after_second_send =
4825 thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
4826 assert_eq!(
4827 running_turn_after_second_send,
4828 Some(2),
4829 "running_turn should be set to turn 2 after sending second message"
4830 );
4831
4832 // Now signal first handler to complete
4833 first_complete_tx.send(()).ok();
4834
4835 // First request completes - should NOT clear running_turn
4836 // because running_turn now belongs to turn 2
4837 first_request.await.unwrap();
4838
4839 let running_turn_after_first =
4840 thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
4841 assert_eq!(
4842 running_turn_after_first,
4843 Some(2),
4844 "first turn completing should not clear running_turn (belongs to turn 2)"
4845 );
4846
4847 // Second request completes - SHOULD clear running_turn
4848 second_request.await.unwrap();
4849
4850 let running_turn_after_second =
4851 thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4852 assert!(
4853 !running_turn_after_second,
4854 "second turn completing should clear running_turn"
4855 );
4856 }
4857
4858 #[gpui::test]
4859 async fn test_send_returns_cancelled_response_and_marks_tools_as_cancelled(
4860 cx: &mut TestAppContext,
4861 ) {
4862 init_test(cx);
4863
4864 let fs = FakeFs::new(cx.executor());
4865 let project = Project::test(fs, [], cx).await;
4866
4867 let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4868 move |_params, thread, mut cx| {
4869 async move {
4870 thread
4871 .update(&mut cx, |thread, cx| {
4872 thread.handle_session_update(
4873 acp::SessionUpdate::ToolCall(
4874 acp::ToolCall::new(
4875 acp::ToolCallId::new("test-tool"),
4876 "Test Tool",
4877 )
4878 .kind(acp::ToolKind::Fetch)
4879 .status(acp::ToolCallStatus::InProgress),
4880 ),
4881 cx,
4882 )
4883 })
4884 .unwrap()
4885 .unwrap();
4886
4887 Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
4888 }
4889 .boxed_local()
4890 },
4891 ));
4892
4893 let thread = cx
4894 .update(|cx| {
4895 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4896 })
4897 .await
4898 .unwrap();
4899
4900 let response = thread
4901 .update(cx, |thread, cx| thread.send_raw("test message", cx))
4902 .await;
4903
4904 let response = response
4905 .expect("send should succeed")
4906 .expect("should have response");
4907 assert_eq!(
4908 response.stop_reason,
4909 acp::StopReason::Cancelled,
4910 "response should have Cancelled stop_reason"
4911 );
4912
4913 thread.read_with(cx, |thread, _| {
4914 let tool_entry = thread
4915 .entries
4916 .iter()
4917 .find_map(|e| {
4918 if let AgentThreadEntry::ToolCall(call) = e {
4919 Some(call)
4920 } else {
4921 None
4922 }
4923 })
4924 .expect("should have tool call entry");
4925
4926 assert!(
4927 matches!(tool_entry.status, ToolCallStatus::Canceled),
4928 "tool should be marked as Canceled when response is Cancelled, got {:?}",
4929 tool_entry.status
4930 );
4931 });
4932 }
4933
4934 #[gpui::test]
4935 async fn test_provisional_title_replaced_by_real_title(cx: &mut TestAppContext) {
4936 init_test(cx);
4937
4938 let fs = FakeFs::new(cx.executor());
4939 let project = Project::test(fs, [], cx).await;
4940 let connection = Rc::new(FakeAgentConnection::new());
4941 let set_title_calls = connection.set_title_calls.clone();
4942
4943 let thread = cx
4944 .update(|cx| {
4945 connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4946 })
4947 .await
4948 .unwrap();
4949
4950 // Initial title is the default.
4951 thread.read_with(cx, |thread, _| {
4952 assert_eq!(thread.title().as_ref(), "Test");
4953 });
4954
4955 // Setting a provisional title updates the display title.
4956 thread.update(cx, |thread, cx| {
4957 thread.set_provisional_title("Hello, can you help…".into(), cx);
4958 });
4959 thread.read_with(cx, |thread, _| {
4960 assert_eq!(thread.title().as_ref(), "Hello, can you help…");
4961 });
4962
4963 // The provisional title should NOT have propagated to the connection.
4964 assert_eq!(
4965 set_title_calls.borrow().len(),
4966 0,
4967 "provisional title should not propagate to the connection"
4968 );
4969
4970 // When the real title arrives via set_title, it replaces the
4971 // provisional title and propagates to the connection.
4972 let task = thread.update(cx, |thread, cx| {
4973 thread.set_title("Helping with Rust question".into(), cx)
4974 });
4975 task.await.expect("set_title should succeed");
4976 thread.read_with(cx, |thread, _| {
4977 assert_eq!(thread.title().as_ref(), "Helping with Rust question");
4978 });
4979 assert_eq!(
4980 set_title_calls.borrow().as_slice(),
4981 &[SharedString::from("Helping with Rust question")],
4982 "real title should propagate to the connection"
4983 );
4984 }
4985
4986 #[gpui::test]
4987 async fn test_session_info_update_replaces_provisional_title_and_emits_event(
4988 cx: &mut TestAppContext,
4989 ) {
4990 init_test(cx);
4991
4992 let fs = FakeFs::new(cx.executor());
4993 let project = Project::test(fs, [], cx).await;
4994 let connection = Rc::new(FakeAgentConnection::new());
4995
4996 let thread = cx
4997 .update(|cx| {
4998 connection.clone().new_session(
4999 project,
5000 PathList::new(&[Path::new(path!("/test"))]),
5001 cx,
5002 )
5003 })
5004 .await
5005 .unwrap();
5006
5007 let title_updated_events = Rc::new(RefCell::new(0usize));
5008 let title_updated_events_for_subscription = title_updated_events.clone();
5009 thread.update(cx, |_thread, cx| {
5010 cx.subscribe(
5011 &thread,
5012 move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
5013 if matches!(event, AcpThreadEvent::TitleUpdated) {
5014 *title_updated_events_for_subscription.borrow_mut() += 1;
5015 }
5016 },
5017 )
5018 .detach();
5019 });
5020
5021 thread.update(cx, |thread, cx| {
5022 thread.set_provisional_title("Hello, can you help…".into(), cx);
5023 });
5024 assert_eq!(
5025 *title_updated_events.borrow(),
5026 1,
5027 "setting a provisional title should emit TitleUpdated"
5028 );
5029
5030 let result = thread.update(cx, |thread, cx| {
5031 thread.handle_session_update(
5032 acp::SessionUpdate::SessionInfoUpdate(
5033 acp::SessionInfoUpdate::new().title("Helping with Rust question"),
5034 ),
5035 cx,
5036 )
5037 });
5038 result.expect("session info update should succeed");
5039
5040 thread.read_with(cx, |thread, _| {
5041 assert_eq!(thread.title().as_ref(), "Helping with Rust question");
5042 assert!(
5043 !thread.has_provisional_title(),
5044 "session info title update should clear provisional title"
5045 );
5046 });
5047
5048 assert_eq!(
5049 *title_updated_events.borrow(),
5050 2,
5051 "session info title update should emit TitleUpdated"
5052 );
5053 assert!(
5054 connection.set_title_calls.borrow().is_empty(),
5055 "session info title update should not propagate back to the connection"
5056 );
5057 }
5058}