acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5use action_log::{ActionLog, ActionLogTelemetry};
   6use agent_client_protocol::{self as acp};
   7use anyhow::{Context as _, Result, anyhow};
   8use collections::HashSet;
   9pub use connection::*;
  10pub use diff::*;
  11use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
  12use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  13use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  14use itertools::Itertools;
  15use language::language_settings::FormatOnSave;
  16use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  17use markdown::Markdown;
  18pub use mention::*;
  19use project::lsp_store::{FormatTrigger, LspFormatTarget};
  20use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  21use serde::{Deserialize, Serialize};
  22use serde_json::to_string_pretty;
  23use std::collections::HashMap;
  24use std::error::Error;
  25use std::fmt::{Formatter, Write};
  26use std::ops::Range;
  27use std::process::ExitStatus;
  28use std::rc::Rc;
  29use std::time::{Duration, Instant};
  30use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  31use task::{Shell, ShellBuilder};
  32pub use terminal::*;
  33use text::Bias;
  34use ui::App;
  35use util::markdown::MarkdownEscaped;
  36use util::path_list::PathList;
  37use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
  38use uuid::Uuid;
  39
  40/// Returned when the model stops because it exhausted its output token budget.
  41#[derive(Debug)]
  42pub struct MaxOutputTokensError;
  43
  44impl std::fmt::Display for MaxOutputTokensError {
  45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  46        write!(f, "output token limit reached")
  47    }
  48}
  49
  50impl std::error::Error for MaxOutputTokensError {}
  51
/// Key used in ACP ToolCall meta to store the tool's programmatic name.
/// This is a workaround since ACP's ToolCall doesn't have a dedicated name field.
///
/// Read by [`tool_name_from_meta`] and written by [`meta_with_tool_name`].
pub const TOOL_NAME_META_KEY: &str = "tool_name";
  55
  56/// Helper to extract tool name from ACP meta
  57pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
  58    meta.as_ref()
  59        .and_then(|m| m.get(TOOL_NAME_META_KEY))
  60        .and_then(|v| v.as_str())
  61        .map(|s| SharedString::from(s.to_owned()))
  62}
  63
  64/// Helper to create meta with tool name
  65pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
  66    acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
  67}
  68
/// Key used in ACP ToolCall meta to store the session id and message indexes.
///
/// The value stored under this key is deserialized into a
/// [`SubagentSessionInfo`] by [`subagent_session_info_from_meta`].
pub const SUBAGENT_SESSION_INFO_META_KEY: &str = "subagent_session_info";
  71
/// Subagent session details carried in ACP ToolCall meta under
/// [`SUBAGENT_SESSION_INFO_META_KEY`].
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SubagentSessionInfo {
    /// The session id of the subagent session that was spawned
    pub session_id: acp::SessionId,
    /// The index of the message of the start of the "turn" run by this tool call
    pub message_start_index: usize,
    /// The index of the output of the message that the subagent has returned.
    /// Left out of the serialized form when it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message_end_index: Option<usize>,
}
  82
  83/// Helper to extract subagent session id from ACP meta
  84pub fn subagent_session_info_from_meta(meta: &Option<acp::Meta>) -> Option<SubagentSessionInfo> {
  85    meta.as_ref()
  86        .and_then(|m| m.get(SUBAGENT_SESSION_INFO_META_KEY))
  87        .and_then(|v| serde_json::from_value(v.clone()).ok())
  88}
  89
/// A user message entry in the thread (see `AgentThreadEntry::UserMessage`).
#[derive(Debug)]
pub struct UserMessage {
    /// Optional identifier of this message.
    pub id: Option<UserMessageId>,
    /// Content block that is written out when the message is exported to markdown.
    pub content: ContentBlock,
    /// ACP content blocks kept alongside `content`.
    // NOTE(review): presumably the raw blocks `content` was built from — confirm.
    pub chunks: Vec<acp::ContentBlock>,
    /// Optional checkpoint attached to this message; when its `show` flag is
    /// set, the markdown heading is labelled `(checkpoint)`.
    pub checkpoint: Option<Checkpoint>,
    /// Value reported by `AgentThreadEntry::is_indented` for this entry.
    pub indented: bool,
}
  98
/// A git-store checkpoint attached to a [`UserMessage`].
#[derive(Debug)]
pub struct Checkpoint {
    /// The underlying git store checkpoint.
    // NOTE(review): presumably the state a restore would return to — confirm.
    git_checkpoint: GitStoreCheckpoint,
    /// When `true`, the owning message's markdown heading reads
    /// `## User (checkpoint)` instead of `## User`.
    pub show: bool,
}
 104
 105impl UserMessage {
 106    fn to_markdown(&self, cx: &App) -> String {
 107        let mut markdown = String::new();
 108        if self
 109            .checkpoint
 110            .as_ref()
 111            .is_some_and(|checkpoint| checkpoint.show)
 112        {
 113            writeln!(markdown, "## User (checkpoint)").unwrap();
 114        } else {
 115            writeln!(markdown, "## User").unwrap();
 116        }
 117        writeln!(markdown).unwrap();
 118        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
 119        writeln!(markdown).unwrap();
 120        markdown
 121    }
 122}
 123
/// An assistant message entry in the thread (see `AgentThreadEntry::AssistantMessage`).
#[derive(Debug, PartialEq)]
pub struct AssistantMessage {
    /// The chunks making up the message; the markdown export joins them with
    /// blank lines.
    pub chunks: Vec<AssistantMessageChunk>,
    /// Value reported by `AgentThreadEntry::is_indented` for this entry.
    pub indented: bool,
    // NOTE(review): presumably marks a message produced by a subagent — confirm against callers.
    pub is_subagent_output: bool,
}
 130
 131impl AssistantMessage {
 132    pub fn to_markdown(&self, cx: &App) -> String {
 133        format!(
 134            "## Assistant\n\n{}\n\n",
 135            self.chunks
 136                .iter()
 137                .map(|chunk| chunk.to_markdown(cx))
 138                .join("\n\n")
 139        )
 140    }
 141}
 142
/// One piece of an [`AssistantMessage`].
#[derive(Debug, PartialEq)]
pub enum AssistantMessageChunk {
    /// Regular message content; exported to markdown as-is.
    Message { block: ContentBlock },
    /// Thought content; wrapped in `<thinking>` tags when exported to markdown.
    Thought { block: ContentBlock },
}
 148
 149impl AssistantMessageChunk {
 150    pub fn from_str(
 151        chunk: &str,
 152        language_registry: &Arc<LanguageRegistry>,
 153        path_style: PathStyle,
 154        cx: &mut App,
 155    ) -> Self {
 156        Self::Message {
 157            block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
 158        }
 159    }
 160
 161    fn to_markdown(&self, cx: &App) -> String {
 162        match self {
 163            Self::Message { block } => block.to_markdown(cx).to_string(),
 164            Self::Thought { block } => {
 165                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 166            }
 167        }
 168    }
 169}
 170
/// A single entry of an agent thread.
#[derive(Debug)]
pub enum AgentThreadEntry {
    /// A user message.
    UserMessage(UserMessage),
    /// An assistant message.
    AssistantMessage(AssistantMessage),
    /// A tool call.
    ToolCall(ToolCall),
    /// A completed plan; exported to markdown as a list of checked items.
    CompletedPlan(Vec<PlanEntry>),
}
 178
 179impl AgentThreadEntry {
 180    pub fn is_indented(&self) -> bool {
 181        match self {
 182            Self::UserMessage(message) => message.indented,
 183            Self::AssistantMessage(message) => message.indented,
 184            Self::ToolCall(_) => false,
 185            Self::CompletedPlan(_) => false,
 186        }
 187    }
 188
 189    pub fn to_markdown(&self, cx: &App) -> String {
 190        match self {
 191            Self::UserMessage(message) => message.to_markdown(cx),
 192            Self::AssistantMessage(message) => message.to_markdown(cx),
 193            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 194            Self::CompletedPlan(entries) => {
 195                let mut md = String::from("## Plan\n\n");
 196                for entry in entries {
 197                    let source = entry.content.read(cx).source().to_string();
 198                    md.push_str(&format!("- [x] {}\n", source));
 199                }
 200                md
 201            }
 202        }
 203    }
 204
 205    pub fn user_message(&self) -> Option<&UserMessage> {
 206        if let AgentThreadEntry::UserMessage(message) = self {
 207            Some(message)
 208        } else {
 209            None
 210        }
 211    }
 212
 213    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 214        if let AgentThreadEntry::ToolCall(call) = self {
 215            itertools::Either::Left(call.diffs())
 216        } else {
 217            itertools::Either::Right(std::iter::empty())
 218        }
 219    }
 220
 221    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 222        if let AgentThreadEntry::ToolCall(call) = self {
 223            itertools::Either::Left(call.terminals())
 224        } else {
 225            itertools::Either::Right(std::iter::empty())
 226        }
 227    }
 228
 229    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 230        if let AgentThreadEntry::ToolCall(ToolCall {
 231            locations,
 232            resolved_locations,
 233            ..
 234        }) = self
 235        {
 236            Some((
 237                locations.get(ix)?.clone(),
 238                resolved_locations.get(ix)?.clone()?,
 239            ))
 240        } else {
 241            None
 242        }
 243    }
 244}
 245
/// A tool call entry in the thread, built from an `acp::ToolCall` and kept up
/// to date through `update_fields`.
#[derive(Debug)]
pub struct ToolCall {
    /// The ACP id of this tool call.
    pub id: acp::ToolCallId,
    /// Markdown label derived from the tool call's title.
    pub label: Entity<Markdown>,
    /// The ACP tool kind; it influences how the title is turned into the label.
    pub kind: acp::ToolKind,
    /// Content items (content blocks, diffs, terminals) produced by the tool call.
    pub content: Vec<ToolCallContent>,
    /// Current status of the tool call.
    pub status: ToolCallStatus,
    /// Locations reported for this tool call.
    pub locations: Vec<acp::ToolCallLocation>,
    /// Resolved counterparts of `locations`, looked up by the same index in
    /// `AgentThreadEntry::location`; starts out empty.
    pub resolved_locations: Vec<Option<AgentLocation>>,
    /// Raw input value, if one was provided.
    pub raw_input: Option<serde_json::Value>,
    /// Markdown produced from `raw_input` via `markdown_for_raw_output`.
    pub raw_input_markdown: Option<Entity<Markdown>>,
    /// Raw output value, if one was provided.
    pub raw_output: Option<serde_json::Value>,
    /// Tool name read from the ACP meta (see `tool_name_from_meta`).
    pub tool_name: Option<SharedString>,
    /// Subagent session info read from the ACP meta
    /// (see `subagent_session_info_from_meta`).
    pub subagent_session_info: Option<SubagentSessionInfo>,
}
 261
 262impl ToolCall {
 263    fn from_acp(
 264        tool_call: acp::ToolCall,
 265        status: ToolCallStatus,
 266        language_registry: Arc<LanguageRegistry>,
 267        path_style: PathStyle,
 268        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 269        cx: &mut App,
 270    ) -> Result<Self> {
 271        let title = if tool_call.kind == acp::ToolKind::Execute {
 272            tool_call.title
 273        } else if tool_call.kind == acp::ToolKind::Edit {
 274            MarkdownEscaped(tool_call.title.as_str()).to_string()
 275        } else if let Some((first_line, _)) = tool_call.title.split_once("\n") {
 276            first_line.to_owned() + ""
 277        } else {
 278            tool_call.title
 279        };
 280        let mut content = Vec::with_capacity(tool_call.content.len());
 281        for item in tool_call.content {
 282            if let Some(item) = ToolCallContent::from_acp(
 283                item,
 284                language_registry.clone(),
 285                path_style,
 286                terminals,
 287                cx,
 288            )? {
 289                content.push(item);
 290            }
 291        }
 292
 293        let raw_input_markdown = tool_call
 294            .raw_input
 295            .as_ref()
 296            .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
 297
 298        let tool_name = tool_name_from_meta(&tool_call.meta);
 299
 300        let subagent_session_info = subagent_session_info_from_meta(&tool_call.meta);
 301
 302        let result = Self {
 303            id: tool_call.tool_call_id,
 304            label: cx
 305                .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
 306            kind: tool_call.kind,
 307            content,
 308            locations: tool_call.locations,
 309            resolved_locations: Vec::default(),
 310            status,
 311            raw_input: tool_call.raw_input,
 312            raw_input_markdown,
 313            raw_output: tool_call.raw_output,
 314            tool_name,
 315            subagent_session_info,
 316        };
 317        Ok(result)
 318    }
 319
 320    fn update_fields(
 321        &mut self,
 322        fields: acp::ToolCallUpdateFields,
 323        meta: Option<acp::Meta>,
 324        language_registry: Arc<LanguageRegistry>,
 325        path_style: PathStyle,
 326        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 327        cx: &mut App,
 328    ) -> Result<()> {
 329        let acp::ToolCallUpdateFields {
 330            kind,
 331            status,
 332            title,
 333            content,
 334            locations,
 335            raw_input,
 336            raw_output,
 337            ..
 338        } = fields;
 339
 340        if let Some(kind) = kind {
 341            self.kind = kind;
 342        }
 343
 344        if let Some(status) = status {
 345            self.status = status.into();
 346        }
 347
 348        if let Some(subagent_session_info) = subagent_session_info_from_meta(&meta) {
 349            self.subagent_session_info = Some(subagent_session_info);
 350        }
 351
 352        if let Some(title) = title {
 353            if self.kind == acp::ToolKind::Execute {
 354                for terminal in self.terminals() {
 355                    terminal.update(cx, |terminal, cx| {
 356                        terminal.update_command_label(&title, cx);
 357                    });
 358                }
 359            }
 360            self.label.update(cx, |label, cx| {
 361                if self.kind == acp::ToolKind::Execute {
 362                    label.replace(title, cx);
 363                } else if self.kind == acp::ToolKind::Edit {
 364                    label.replace(MarkdownEscaped(&title).to_string(), cx)
 365                } else if let Some((first_line, _)) = title.split_once("\n") {
 366                    label.replace(first_line.to_owned() + "", cx);
 367                } else {
 368                    label.replace(title, cx);
 369                }
 370            });
 371        }
 372
 373        if let Some(content) = content {
 374            let mut new_content_len = content.len();
 375            let mut content = content.into_iter();
 376
 377            // Reuse existing content if we can
 378            for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
 379                let valid_content =
 380                    old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
 381                if !valid_content {
 382                    new_content_len -= 1;
 383                }
 384            }
 385            for new in content {
 386                if let Some(new) = ToolCallContent::from_acp(
 387                    new,
 388                    language_registry.clone(),
 389                    path_style,
 390                    terminals,
 391                    cx,
 392                )? {
 393                    self.content.push(new);
 394                } else {
 395                    new_content_len -= 1;
 396                }
 397            }
 398            self.content.truncate(new_content_len);
 399        }
 400
 401        if let Some(locations) = locations {
 402            self.locations = locations;
 403        }
 404
 405        if let Some(raw_input) = raw_input {
 406            self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
 407            self.raw_input = Some(raw_input);
 408        }
 409
 410        if let Some(raw_output) = raw_output {
 411            if self.content.is_empty()
 412                && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 413            {
 414                self.content
 415                    .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 416                        markdown,
 417                    }));
 418            }
 419            self.raw_output = Some(raw_output);
 420        }
 421        Ok(())
 422    }
 423
 424    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 425        self.content.iter().filter_map(|content| match content {
 426            ToolCallContent::Diff(diff) => Some(diff),
 427            ToolCallContent::ContentBlock(_) => None,
 428            ToolCallContent::Terminal(_) => None,
 429        })
 430    }
 431
 432    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 433        self.content.iter().filter_map(|content| match content {
 434            ToolCallContent::Terminal(terminal) => Some(terminal),
 435            ToolCallContent::ContentBlock(_) => None,
 436            ToolCallContent::Diff(_) => None,
 437        })
 438    }
 439
 440    pub fn is_subagent(&self) -> bool {
 441        self.tool_name.as_ref().is_some_and(|s| s == "spawn_agent")
 442            || self.subagent_session_info.is_some()
 443    }
 444
 445    pub fn to_markdown(&self, cx: &App) -> String {
 446        let mut markdown = format!(
 447            "**Tool Call: {}**\nStatus: {}\n\n",
 448            self.label.read(cx).source(),
 449            self.status
 450        );
 451        for content in &self.content {
 452            markdown.push_str(content.to_markdown(cx).as_str());
 453            markdown.push_str("\n\n");
 454        }
 455        markdown
 456    }
 457
 458    async fn resolve_location(
 459        location: acp::ToolCallLocation,
 460        project: WeakEntity<Project>,
 461        cx: &mut AsyncApp,
 462    ) -> Option<ResolvedLocation> {
 463        let buffer = project
 464            .update(cx, |project, cx| {
 465                project
 466                    .project_path_for_absolute_path(&location.path, cx)
 467                    .map(|path| project.open_buffer(path, cx))
 468            })
 469            .ok()??;
 470        let buffer = buffer.await.log_err()?;
 471        let position = buffer.update(cx, |buffer, _| {
 472            let snapshot = buffer.snapshot();
 473            if let Some(row) = location.line {
 474                let column = snapshot.indent_size_for_line(row).len;
 475                let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 476                snapshot.anchor_before(point)
 477            } else {
 478                Anchor::min_for_buffer(snapshot.remote_id())
 479            }
 480        });
 481
 482        Some(ResolvedLocation { buffer, position })
 483    }
 484
 485    fn resolve_locations(
 486        &self,
 487        project: Entity<Project>,
 488        cx: &mut App,
 489    ) -> Task<Vec<Option<ResolvedLocation>>> {
 490        let locations = self.locations.clone();
 491        project.update(cx, |_, cx| {
 492            cx.spawn(async move |project, cx| {
 493                let mut new_locations = Vec::new();
 494                for location in locations {
 495                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 496                }
 497                new_locations
 498            })
 499        })
 500    }
 501}
 502
// Kept separate from `AgentLocation` so we can hold a strong reference to the
// buffer for saving on the thread; converting to `AgentLocation` downgrades
// that reference.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ResolvedLocation {
    // Strong handle to the buffer the location points into.
    buffer: Entity<Buffer>,
    // Anchor within `buffer`.
    position: Anchor,
}
 510
 511impl From<&ResolvedLocation> for AgentLocation {
 512    fn from(value: &ResolvedLocation) -> Self {
 513        Self {
 514            buffer: value.buffer.downgrade(),
 515            position: value.position,
 516        }
 517    }
 518}
 519
/// Extra parameters attached to a selected permission option.
#[derive(Debug, Clone)]
pub enum SelectedPermissionParams {
    /// Parameters for a terminal permission.
    // NOTE(review): `patterns` are presumably command patterns the choice applies to — confirm.
    Terminal { patterns: Vec<String> },
}
 524
/// The permission option that was selected, plus optional extra parameters.
///
/// Converting into `acp::SelectedPermissionOutcome` keeps only `option_id`.
#[derive(Debug)]
pub struct SelectedPermissionOutcome {
    /// Id of the selected option.
    pub option_id: acp::PermissionOptionId,
    /// Kind of the selected option.
    pub option_kind: acp::PermissionOptionKind,
    /// Optional extra parameters; `None` unless set through `params`.
    pub params: Option<SelectedPermissionParams>,
}
 531
 532impl SelectedPermissionOutcome {
 533    pub fn new(option_id: acp::PermissionOptionId, option_kind: acp::PermissionOptionKind) -> Self {
 534        Self {
 535            option_id,
 536            option_kind,
 537            params: None,
 538        }
 539    }
 540
 541    pub fn params(mut self, params: Option<SelectedPermissionParams>) -> Self {
 542        self.params = params;
 543        self
 544    }
 545}
 546
 547impl From<SelectedPermissionOutcome> for acp::SelectedPermissionOutcome {
 548    fn from(value: SelectedPermissionOutcome) -> Self {
 549        Self::new(value.option_id)
 550    }
 551}
 552
/// Result of a permission request.
#[derive(Debug)]
pub enum RequestPermissionOutcome {
    /// The request was cancelled without a selection.
    Cancelled,
    /// An option was selected.
    Selected(SelectedPermissionOutcome),
}
 558
 559impl From<RequestPermissionOutcome> for acp::RequestPermissionOutcome {
 560    fn from(value: RequestPermissionOutcome) -> Self {
 561        match value {
 562            RequestPermissionOutcome::Cancelled => Self::Cancelled,
 563            RequestPermissionOutcome::Selected(outcome) => Self::Selected(outcome.into()),
 564        }
 565    }
 566}
 567
/// Status of a tool call as tracked by the thread.
///
/// `Pending`, `InProgress`, `Completed` and `Failed` can be produced from the
/// corresponding `acp::ToolCallStatus` values; the remaining variants have no
/// counterpart in that conversion.
#[derive(Debug)]
pub enum ToolCallStatus {
    /// The tool call hasn't started running yet, but we start showing it to
    /// the user.
    Pending,
    /// The tool call is waiting for confirmation from the user.
    WaitingForConfirmation {
        /// The permission options offered to the user.
        options: PermissionOptions,
        /// Sending half of the one-shot channel that carries the selected outcome.
        respond_tx: oneshot::Sender<SelectedPermissionOutcome>,
    },
    /// The tool call is currently running.
    InProgress,
    /// The tool call completed successfully.
    Completed,
    /// The tool call failed.
    Failed,
    /// The user rejected the tool call.
    Rejected,
    /// The user canceled generation so the tool call was canceled.
    Canceled,
}
 589
 590impl From<acp::ToolCallStatus> for ToolCallStatus {
 591    fn from(status: acp::ToolCallStatus) -> Self {
 592        match status {
 593            acp::ToolCallStatus::Pending => Self::Pending,
 594            acp::ToolCallStatus::InProgress => Self::InProgress,
 595            acp::ToolCallStatus::Completed => Self::Completed,
 596            acp::ToolCallStatus::Failed => Self::Failed,
 597            _ => Self::Pending,
 598        }
 599    }
 600}
 601
 602impl Display for ToolCallStatus {
 603    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 604        write!(
 605            f,
 606            "{}",
 607            match self {
 608                ToolCallStatus::Pending => "Pending",
 609                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 610                ToolCallStatus::InProgress => "In Progress",
 611                ToolCallStatus::Completed => "Completed",
 612                ToolCallStatus::Failed => "Failed",
 613                ToolCallStatus::Rejected => "Rejected",
 614                ToolCallStatus::Canceled => "Canceled",
 615            }
 616        )
 617    }
 618}
 619
/// Displayable content accumulated from ACP content blocks.
///
/// A block starts out `Empty` and changes variant as ACP blocks are appended
/// to it (see `ContentBlock::append`).
#[derive(Debug, PartialEq, Clone)]
pub enum ContentBlock {
    /// Nothing has been appended yet.
    Empty,
    /// Markdown content.
    Markdown { markdown: Entity<Markdown> },
    /// A single resource link.
    ResourceLink { resource_link: acp::ResourceLink },
    /// A single decoded image.
    Image { image: Arc<gpui::Image> },
}
 627
 628impl ContentBlock {
 629    pub fn new(
 630        block: acp::ContentBlock,
 631        language_registry: &Arc<LanguageRegistry>,
 632        path_style: PathStyle,
 633        cx: &mut App,
 634    ) -> Self {
 635        let mut this = Self::Empty;
 636        this.append(block, language_registry, path_style, cx);
 637        this
 638    }
 639
 640    pub fn new_combined(
 641        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 642        language_registry: Arc<LanguageRegistry>,
 643        path_style: PathStyle,
 644        cx: &mut App,
 645    ) -> Self {
 646        let mut this = Self::Empty;
 647        for block in blocks {
 648            this.append(block, &language_registry, path_style, cx);
 649        }
 650        this
 651    }
 652
    /// Appends an ACP content block to this block, changing the variant of
    /// `self` where necessary.
    ///
    /// - An empty block becomes a resource link or an image when given one
    ///   (an image that cannot be decoded becomes markdown with a placeholder),
    ///   and markdown for anything else.
    /// - A markdown block has the new block's text appended to it.
    /// - A resource link or image block is converted to markdown containing
    ///   its own textual form, a newline, and the new block's text.
    pub fn append(
        &mut self,
        block: acp::ContentBlock,
        language_registry: &Arc<LanguageRegistry>,
        path_style: PathStyle,
        cx: &mut App,
    ) {
        match (&mut *self, &block) {
            // First block is a resource link: keep it structured.
            (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
                *self = ContentBlock::ResourceLink {
                    resource_link: resource_link.clone(),
                };
            }
            // First block is an image: keep the decoded image, or fall back to
            // the markdown placeholder when decoding fails.
            (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
                if let Some(image) = Self::decode_image(image_content) {
                    *self = ContentBlock::Image { image };
                } else {
                    let new_content = Self::image_md(image_content);
                    *self = Self::create_markdown_block(new_content, language_registry, cx);
                }
            }
            // Any other first block starts a markdown block.
            (ContentBlock::Empty, _) => {
                let new_content = Self::block_string_contents(&block, path_style);
                *self = Self::create_markdown_block(new_content, language_registry, cx);
            }
            // Already markdown: append the text to the existing entity.
            (ContentBlock::Markdown { markdown }, _) => {
                let new_content = Self::block_string_contents(&block, path_style);
                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
            }
            // A lone resource link followed by more content: convert both to markdown.
            (ContentBlock::ResourceLink { resource_link }, _) => {
                let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
                let new_content = Self::block_string_contents(&block, path_style);
                let combined = format!("{}\n{}", existing_content, new_content);
                *self = Self::create_markdown_block(combined, language_registry, cx);
            }
            // A lone image followed by more content: the image is replaced by
            // its markdown placeholder.
            (ContentBlock::Image { .. }, _) => {
                let new_content = Self::block_string_contents(&block, path_style);
                let combined = format!("`Image`\n{}", new_content);
                *self = Self::create_markdown_block(combined, language_registry, cx);
            }
        }
    }
 695
 696    fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
 697        use base64::Engine as _;
 698
 699        let bytes = base64::engine::general_purpose::STANDARD
 700            .decode(image_content.data.as_bytes())
 701            .ok()?;
 702        let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
 703        Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
 704    }
 705
 706    fn create_markdown_block(
 707        content: String,
 708        language_registry: &Arc<LanguageRegistry>,
 709        cx: &mut App,
 710    ) -> ContentBlock {
 711        ContentBlock::Markdown {
 712            markdown: cx
 713                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 714        }
 715    }
 716
 717    fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
 718        match block {
 719            acp::ContentBlock::Text(text_content) => text_content.text.clone(),
 720            acp::ContentBlock::ResourceLink(resource_link) => {
 721                Self::resource_link_md(&resource_link.uri, path_style)
 722            }
 723            acp::ContentBlock::Resource(acp::EmbeddedResource {
 724                resource:
 725                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 726                        uri,
 727                        ..
 728                    }),
 729                ..
 730            }) => Self::resource_link_md(uri, path_style),
 731            acp::ContentBlock::Image(image) => Self::image_md(image),
 732            _ => String::new(),
 733        }
 734    }
 735
 736    fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
 737        if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
 738            uri.as_link().to_string()
 739        } else {
 740            uri.to_string()
 741        }
 742    }
 743
 744    fn image_md(_image: &acp::ImageContent) -> String {
 745        "`Image`".into()
 746    }
 747
 748    pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 749        match self {
 750            ContentBlock::Empty => "",
 751            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 752            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 753            ContentBlock::Image { .. } => "`Image`",
 754        }
 755    }
 756
 757    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 758        match self {
 759            ContentBlock::Empty => None,
 760            ContentBlock::Markdown { markdown } => Some(markdown),
 761            ContentBlock::ResourceLink { .. } => None,
 762            ContentBlock::Image { .. } => None,
 763        }
 764    }
 765
 766    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 767        match self {
 768            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 769            _ => None,
 770        }
 771    }
 772
 773    pub fn image(&self) -> Option<&Arc<gpui::Image>> {
 774        match self {
 775            ContentBlock::Image { image } => Some(image),
 776            _ => None,
 777        }
 778    }
 779}
 780
/// One content item of a tool call.
#[derive(Debug)]
pub enum ToolCallContent {
    /// A regular content block.
    ContentBlock(ContentBlock),
    /// A diff.
    Diff(Entity<Diff>),
    /// A terminal, looked up by id among the known terminals when created
    /// from ACP content.
    Terminal(Entity<Terminal>),
}
 787
 788impl ToolCallContent {
 789    pub fn from_acp(
 790        content: acp::ToolCallContent,
 791        language_registry: Arc<LanguageRegistry>,
 792        path_style: PathStyle,
 793        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 794        cx: &mut App,
 795    ) -> Result<Option<Self>> {
 796        match content {
 797            acp::ToolCallContent::Content(acp::Content { content, .. }) => {
 798                Ok(Some(Self::ContentBlock(ContentBlock::new(
 799                    content,
 800                    &language_registry,
 801                    path_style,
 802                    cx,
 803                ))))
 804            }
 805            acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
 806                Diff::finalized(
 807                    diff.path.to_string_lossy().into_owned(),
 808                    diff.old_text,
 809                    diff.new_text,
 810                    language_registry,
 811                    cx,
 812                )
 813            })))),
 814            acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
 815                .get(&terminal_id)
 816                .cloned()
 817                .map(|terminal| Some(Self::Terminal(terminal)))
 818                .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
 819            _ => Ok(None),
 820        }
 821    }
 822
 823    pub fn update_from_acp(
 824        &mut self,
 825        new: acp::ToolCallContent,
 826        language_registry: Arc<LanguageRegistry>,
 827        path_style: PathStyle,
 828        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 829        cx: &mut App,
 830    ) -> Result<bool> {
 831        let needs_update = match (&self, &new) {
 832            (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
 833                old_diff.read(cx).needs_update(
 834                    new_diff.old_text.as_deref().unwrap_or(""),
 835                    &new_diff.new_text,
 836                    cx,
 837                )
 838            }
 839            _ => true,
 840        };
 841
 842        if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
 843            if needs_update {
 844                *self = update;
 845            }
 846            Ok(true)
 847        } else {
 848            Ok(false)
 849        }
 850    }
 851
 852    pub fn to_markdown(&self, cx: &App) -> String {
 853        match self {
 854            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 855            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 856            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 857        }
 858    }
 859
 860    pub fn image(&self) -> Option<&Arc<gpui::Image>> {
 861        match self {
 862            Self::ContentBlock(content) => content.image(),
 863            _ => None,
 864        }
 865    }
 866}
 867
/// A change to a tool call; every variant carries the id of the tool call it targets.
#[derive(Debug, PartialEq)]
pub enum ToolCallUpdate {
    /// An update expressed as the ACP protocol's own update type.
    UpdateFields(acp::ToolCallUpdate),
    /// An update that carries a [`Diff`] entity.
    UpdateDiff(ToolCallUpdateDiff),
    /// An update that carries a [`Terminal`] entity.
    UpdateTerminal(ToolCallUpdateTerminal),
}
 874
 875impl ToolCallUpdate {
 876    fn id(&self) -> &acp::ToolCallId {
 877        match self {
 878            Self::UpdateFields(update) => &update.tool_call_id,
 879            Self::UpdateDiff(diff) => &diff.id,
 880            Self::UpdateTerminal(terminal) => &terminal.id,
 881        }
 882    }
 883}
 884
 885impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 886    fn from(update: acp::ToolCallUpdate) -> Self {
 887        Self::UpdateFields(update)
 888    }
 889}
 890
 891impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 892    fn from(diff: ToolCallUpdateDiff) -> Self {
 893        Self::UpdateDiff(diff)
 894    }
 895}
 896
/// Payload of [`ToolCallUpdate::UpdateDiff`].
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateDiff {
    /// Id of the tool call the diff belongs to.
    pub id: acp::ToolCallId,
    /// The diff entity carried by the update.
    pub diff: Entity<Diff>,
}
 902
 903impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 904    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 905        Self::UpdateTerminal(terminal)
 906    }
 907}
 908
/// Payload of [`ToolCallUpdate::UpdateTerminal`].
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateTerminal {
    /// Id of the tool call the terminal belongs to.
    pub id: acp::ToolCallId,
    /// The terminal entity carried by the update.
    pub terminal: Entity<Terminal>,
}
 914
/// An ordered list of plan entries; the default plan is empty.
#[derive(Debug, Default)]
pub struct Plan {
    /// The plan's entries, in the order they are stored.
    pub entries: Vec<PlanEntry>,
}
 919
/// Summary counts over a [`Plan`], as produced by [`Plan::stats`].
#[derive(Debug)]
pub struct PlanStats<'a> {
    /// The first entry whose status is in progress, if any.
    pub in_progress_entry: Option<&'a PlanEntry>,
    /// Number of entries that are pending or in progress.
    pub pending: u32,
    /// Number of completed entries.
    pub completed: u32,
}
 926
 927impl Plan {
 928    pub fn is_empty(&self) -> bool {
 929        self.entries.is_empty()
 930    }
 931
 932    pub fn stats(&self) -> PlanStats<'_> {
 933        let mut stats = PlanStats {
 934            in_progress_entry: None,
 935            pending: 0,
 936            completed: 0,
 937        };
 938
 939        for entry in &self.entries {
 940            match &entry.status {
 941                acp::PlanEntryStatus::Pending => {
 942                    stats.pending += 1;
 943                }
 944                acp::PlanEntryStatus::InProgress => {
 945                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 946                    stats.pending += 1;
 947                }
 948                acp::PlanEntryStatus::Completed => {
 949                    stats.completed += 1;
 950                }
 951                _ => {}
 952            }
 953        }
 954
 955        stats
 956    }
 957}
 958
/// A single entry of a [`Plan`].
#[derive(Debug)]
pub struct PlanEntry {
    /// The entry's text, held as a Markdown entity.
    pub content: Entity<Markdown>,
    /// Priority as reported by the ACP plan entry.
    pub priority: acp::PlanEntryPriority,
    /// Status as reported by the ACP plan entry.
    pub status: acp::PlanEntryStatus,
}
 965
 966impl PlanEntry {
 967    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 968        Self {
 969            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 970            priority: entry.priority,
 971            status: entry.status,
 972        }
 973    }
 974}
 975
/// Token accounting for a thread.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct TokenUsage {
    /// Upper bound on tokens; `0` is treated as "unknown" by [`TokenUsage::ratio`].
    pub max_tokens: u64,
    /// Tokens used so far; compared against `max_tokens` by [`TokenUsage::ratio`].
    pub used_tokens: u64,
    /// Input token count.
    pub input_tokens: u64,
    /// Output token count.
    pub output_tokens: u64,
    /// Output token limit, when one is known.
    pub max_output_tokens: Option<u64>,
}
 984
/// A monetary cost reported for a session.
#[derive(Debug, Clone)]
pub struct SessionCost {
    /// The cost amount, expressed in `currency`.
    pub amount: f64,
    /// The currency identifier, stored as reported.
    pub currency: SharedString,
}
 990
/// Fraction of `max_tokens` at or above which [`TokenUsage::ratio`] reports a warning.
pub const TOKEN_USAGE_WARNING_THRESHOLD: f32 = 0.8;
 992
 993impl TokenUsage {
 994    pub fn ratio(&self) -> TokenUsageRatio {
 995        #[cfg(debug_assertions)]
 996        let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
 997            .unwrap_or(TOKEN_USAGE_WARNING_THRESHOLD.to_string())
 998            .parse()
 999            .unwrap();
1000        #[cfg(not(debug_assertions))]
1001        let warning_threshold: f32 = TOKEN_USAGE_WARNING_THRESHOLD;
1002
1003        // When the maximum is unknown because there is no selected model,
1004        // avoid showing the token limit warning.
1005        if self.max_tokens == 0 {
1006            TokenUsageRatio::Normal
1007        } else if self.used_tokens >= self.max_tokens {
1008            TokenUsageRatio::Exceeded
1009        } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
1010            TokenUsageRatio::Warning
1011        } else {
1012            TokenUsageRatio::Normal
1013        }
1014    }
1015}
1016
/// Result of [`TokenUsage::ratio`].
///
/// Variants are declared in increasing severity, which is the order the
/// derived `PartialOrd`/`Ord` implementations follow.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum TokenUsageRatio {
    /// Usage is below the warning threshold, or the maximum is unknown.
    Normal,
    /// Usage is at or above the warning threshold but below the maximum.
    Warning,
    /// Usage has reached or passed the maximum.
    Exceeded,
}
1023
/// Details about a retry, carried by [`AcpThreadEvent::Retry`].
#[derive(Debug, Clone)]
pub struct RetryStatus {
    /// Message of the error that led to the retry.
    pub last_error: SharedString,
    /// The attempt number. NOTE(review): presumably 1-based — confirm against the producer.
    pub attempt: usize,
    /// The maximum number of attempts.
    pub max_attempts: usize,
    /// When the retry started.
    pub started_at: Instant,
    /// NOTE(review): presumably the delay before the next attempt — confirm.
    pub duration: Duration,
}
1032
/// State stored in `AcpThread::running_turn`; its presence makes
/// `AcpThread::status` report `ThreadStatus::Generating`.
struct RunningTurn {
    /// NOTE(review): presumably matches a value of `AcpThread::turn_id` — confirm.
    id: u32,
    /// Task kept alive for as long as this value is held.
    send_task: Task<()>,
}
1037
/// State of a single agent session: its entries, plan, title, terminals and
/// usage information, plus the connection used to talk to the agent.
pub struct AcpThread {
    session_id: acp::SessionId,
    work_dirs: Option<PathList>,
    parent_session_id: Option<acp::SessionId>,
    /// Title set via `set_title` or a session info update; preferred by `title()`.
    title: Option<SharedString>,
    /// Fallback title shown by `title()` while `title` is unset.
    provisional_title: Option<SharedString>,
    entries: Vec<AgentThreadEntry>,
    plan: Plan,
    project: Entity<Project>,
    action_log: Entity<ActionLog>,
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    turn_id: u32,
    /// `Some` while a turn is running; drives `status()`.
    running_turn: Option<RunningTurn>,
    connection: Rc<dyn AgentConnection>,
    token_usage: Option<TokenUsage>,
    cost: Option<SessionCost>,
    prompt_capabilities: acp::PromptCapabilities,
    available_commands: Vec<acp::AvailableCommand>,
    /// Task that copies capability updates into `prompt_capabilities`.
    _observe_prompt_capabilities: Task<anyhow::Result<()>>,
    terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
    pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
    pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
    had_error: bool,
    /// The user's unsent prompt text, persisted so it can be restored when reloading the thread.
    draft_prompt: Option<Vec<acp::ContentBlock>>,
    /// The initial scroll position for the thread view, set during session registration.
    ui_scroll_position: Option<gpui::ListOffset>,
    /// Buffer for smooth text streaming. Holds text that has been received from
    /// the model but not yet revealed in the UI. A timer task drains this buffer
    /// gradually to create a fluid typing effect instead of choppy chunk-at-a-time
    /// updates.
    streaming_text_buffer: Option<StreamingTextBuffer>,
}
1071
/// Text waiting to be revealed in a Markdown entity, together with the timer
/// task that reveals it a little at a time.
struct StreamingTextBuffer {
    /// Text received from the model but not yet appended to the Markdown source.
    pending: String,
    /// The number of bytes to reveal per timer tick (rounded up to a char boundary when applied).
    bytes_to_reveal_per_tick: usize,
    /// The Markdown entity being streamed into.
    target: Entity<Markdown>,
    /// Timer task that periodically moves text from `pending` into `target`.
    _reveal_task: Task<()>,
}
1082
impl StreamingTextBuffer {
    /// The number of milliseconds between each timer tick, controlling how quickly
    /// text is revealed.
    const TASK_UPDATE_MS: u64 = 16;
    /// The time in milliseconds over which the entire pending text should be
    /// revealed; the per-tick byte count is derived from it and `TASK_UPDATE_MS`.
    const REVEAL_TARGET: f32 = 200.0;
}
1090
1091impl From<&AcpThread> for ActionLogTelemetry {
1092    fn from(value: &AcpThread) -> Self {
1093        Self {
1094            agent_telemetry_id: value.connection().telemetry_id(),
1095            session_id: value.session_id.0.clone(),
1096        }
1097    }
1098}
1099
/// Events emitted by [`AcpThread`].
#[derive(Debug)]
pub enum AcpThreadEvent {
    /// A new entry was appended to the thread.
    NewEntry,
    /// The title or provisional title changed.
    TitleUpdated,
    /// Token usage (and possibly cost) was updated.
    TokenUsageUpdated,
    /// The entry at the given index was modified.
    EntryUpdated(usize),
    EntriesRemoved(Range<usize>),
    ToolAuthorizationRequested(acp::ToolCallId),
    ToolAuthorizationReceived(acp::ToolCallId),
    Retry(RetryStatus),
    SubagentSpawned(acp::SessionId),
    Stopped(acp::StopReason),
    Error,
    LoadError(LoadError),
    /// The agent's prompt capabilities changed.
    PromptCapabilitiesUpdated,
    Refusal,
    /// The list of available commands was replaced with the given one.
    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
    /// The session's current mode changed to the given mode.
    ModeUpdated(acp::SessionModeId),
    /// The session's config options were updated to the given list.
    ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
    /// The thread's working directories were replaced.
    WorkingDirectoriesUpdated,
}
1121
// Lets `AcpThread` emit `AcpThreadEvent`s through its context.
impl EventEmitter<AcpThreadEvent> for AcpThread {}
1123
/// Events about a terminal, each tagged with the terminal's id.
#[derive(Debug, Clone)]
pub enum TerminalProviderEvent {
    /// A terminal was created.
    Created {
        terminal_id: acp::TerminalId,
        label: String,
        cwd: Option<PathBuf>,
        output_byte_limit: Option<u64>,
        terminal: Entity<::terminal::Terminal>,
    },
    /// The terminal produced output bytes.
    Output {
        terminal_id: acp::TerminalId,
        data: Vec<u8>,
    },
    /// The terminal's title changed.
    TitleChanged {
        terminal_id: acp::TerminalId,
        title: String,
    },
    /// The terminal exited with the given status.
    Exit {
        terminal_id: acp::TerminalId,
        status: acp::TerminalExitStatus,
    },
}
1146
/// Commands addressed to a terminal, each tagged with the terminal's id.
#[derive(Debug, Clone)]
pub enum TerminalProviderCommand {
    /// Write the given bytes to the terminal's input.
    WriteInput {
        terminal_id: acp::TerminalId,
        bytes: Vec<u8>,
    },
    /// Resize the terminal to the given number of columns and rows.
    Resize {
        terminal_id: acp::TerminalId,
        cols: u16,
        rows: u16,
    },
    /// Close the terminal.
    Close {
        terminal_id: acp::TerminalId,
    },
}
1162
/// Whether a thread currently has a running turn; see `AcpThread::status`.
#[derive(PartialEq, Eq, Debug)]
pub enum ThreadStatus {
    /// No turn is running.
    Idle,
    /// A turn is running.
    Generating,
}
1168
/// Errors reported while loading an agent; see the `Display` impl for the
/// message each variant produces.
#[derive(Debug, Clone)]
pub enum LoadError {
    /// The found version is older than the minimum supported one.
    Unsupported {
        command: SharedString,
        current_version: SharedString,
        minimum_version: SharedString,
    },
    /// Installation failed; carries the failure message.
    FailedToInstall(SharedString),
    /// The server process exited with the given status.
    Exited {
        status: ExitStatus,
    },
    /// Any other failure, described by its message.
    Other(SharedString),
}
1182
1183impl Display for LoadError {
1184    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1185        match self {
1186            LoadError::Unsupported {
1187                command: path,
1188                current_version,
1189                minimum_version,
1190            } => {
1191                write!(
1192                    f,
1193                    "version {current_version} from {path} is not supported (need at least {minimum_version})"
1194                )
1195            }
1196            LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1197            LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1198            LoadError::Other(msg) => write!(f, "{msg}"),
1199        }
1200    }
1201}
1202
// Marks `LoadError` as a standard error type; no trait methods are overridden.
impl Error for LoadError {}
1204
1205impl AcpThread {
1206    pub fn new(
1207        parent_session_id: Option<acp::SessionId>,
1208        title: Option<SharedString>,
1209        work_dirs: Option<PathList>,
1210        connection: Rc<dyn AgentConnection>,
1211        project: Entity<Project>,
1212        action_log: Entity<ActionLog>,
1213        session_id: acp::SessionId,
1214        mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1215        cx: &mut Context<Self>,
1216    ) -> Self {
1217        let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1218        let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1219            loop {
1220                let caps = prompt_capabilities_rx.recv().await?;
1221                this.update(cx, |this, cx| {
1222                    this.prompt_capabilities = caps;
1223                    cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1224                })?;
1225            }
1226        });
1227
1228        Self {
1229            parent_session_id,
1230            work_dirs,
1231            action_log,
1232            shared_buffers: Default::default(),
1233            entries: Default::default(),
1234            plan: Default::default(),
1235            title,
1236            provisional_title: None,
1237            project,
1238            running_turn: None,
1239            turn_id: 0,
1240            connection,
1241            session_id,
1242            token_usage: None,
1243            cost: None,
1244            prompt_capabilities,
1245            available_commands: Vec::new(),
1246            _observe_prompt_capabilities: task,
1247            terminals: HashMap::default(),
1248            pending_terminal_output: HashMap::default(),
1249            pending_terminal_exit: HashMap::default(),
1250            had_error: false,
1251            draft_prompt: None,
1252            ui_scroll_position: None,
1253            streaming_text_buffer: None,
1254        }
1255    }
1256
    /// Returns the parent session's id, if this thread was created with one.
    pub fn parent_session_id(&self) -> Option<&acp::SessionId> {
        self.parent_session_id.as_ref()
    }
1260
    /// Returns a copy of the most recently observed prompt capabilities.
    pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
        self.prompt_capabilities.clone()
    }
1264
    /// Returns the commands from the latest available-commands update (empty until one arrives).
    pub fn available_commands(&self) -> &[acp::AvailableCommand] {
        &self.available_commands
    }
1268
    /// Returns the stored draft (unsent) prompt, if any.
    pub fn draft_prompt(&self) -> Option<&[acp::ContentBlock]> {
        self.draft_prompt.as_deref()
    }
1272
    /// Replaces the stored draft prompt; `None` clears it. No event is emitted.
    pub fn set_draft_prompt(&mut self, prompt: Option<Vec<acp::ContentBlock>>) {
        self.draft_prompt = prompt;
    }
1276
    /// Returns the stored scroll position for the thread view, if any.
    pub fn ui_scroll_position(&self) -> Option<gpui::ListOffset> {
        self.ui_scroll_position
    }
1280
    /// Replaces the stored scroll position; `None` clears it. No event is emitted.
    pub fn set_ui_scroll_position(&mut self, position: Option<gpui::ListOffset>) {
        self.ui_scroll_position = position;
    }
1284
    /// Returns the agent connection this thread was created with.
    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
        &self.connection
    }
1288
    /// Returns the action log entity this thread was created with.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
1292
    /// Returns the project entity this thread was created with.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }
1296
1297    pub fn title(&self) -> Option<SharedString> {
1298        self.title
1299            .clone()
1300            .or_else(|| self.provisional_title.clone())
1301    }
1302
    /// Returns `true` while a provisional title is stored (it is cleared when a real title is set).
    pub fn has_provisional_title(&self) -> bool {
        self.provisional_title.is_some()
    }
1306
    /// Returns all entries of the thread, oldest first.
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
1310
    /// Returns the id of the session this thread represents.
    pub fn session_id(&self) -> &acp::SessionId {
        &self.session_id
    }
1314
    /// Returns `true` when the connection provides a truncate handle for this session.
    pub fn supports_truncate(&self, cx: &App) -> bool {
        self.connection.truncate(&self.session_id, cx).is_some()
    }
1318
    /// Returns the thread's working directories, if any have been set.
    pub fn work_dirs(&self) -> Option<&PathList> {
        self.work_dirs.as_ref()
    }
1322
    /// Replaces the working directories and emits
    /// [`AcpThreadEvent::WorkingDirectoriesUpdated`], even if the value is unchanged.
    pub fn set_work_dirs(&mut self, work_dirs: PathList, cx: &mut Context<Self>) {
        self.work_dirs = Some(work_dirs);
        cx.emit(AcpThreadEvent::WorkingDirectoriesUpdated)
    }
1327
1328    pub fn status(&self) -> ThreadStatus {
1329        if self.running_turn.is_some() {
1330            ThreadStatus::Generating
1331        } else {
1332            ThreadStatus::Idle
1333        }
1334    }
1335
    /// Returns the thread's stored error flag.
    pub fn had_error(&self) -> bool {
        self.had_error
    }
1339
1340    pub fn is_waiting_for_confirmation(&self) -> bool {
1341        for entry in self.entries.iter().rev() {
1342            match entry {
1343                AgentThreadEntry::UserMessage(_) => return false,
1344                AgentThreadEntry::ToolCall(ToolCall {
1345                    status: ToolCallStatus::WaitingForConfirmation { .. },
1346                    ..
1347                }) => return true,
1348                AgentThreadEntry::ToolCall(_)
1349                | AgentThreadEntry::AssistantMessage(_)
1350                | AgentThreadEntry::CompletedPlan(_) => {}
1351            }
1352        }
1353        false
1354    }
1355
    /// Returns the recorded token usage, if any has been reported.
    pub fn token_usage(&self) -> Option<&TokenUsage> {
        self.token_usage.as_ref()
    }
1359
    /// Returns the recorded session cost, if any has been reported.
    pub fn cost(&self) -> Option<&SessionCost> {
        self.cost.as_ref()
    }
1363
1364    pub fn has_pending_edit_tool_calls(&self) -> bool {
1365        for entry in self.entries.iter().rev() {
1366            match entry {
1367                AgentThreadEntry::UserMessage(_) => return false,
1368                AgentThreadEntry::ToolCall(
1369                    call @ ToolCall {
1370                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1371                        ..
1372                    },
1373                ) if call.diffs().next().is_some() => {
1374                    return true;
1375                }
1376                AgentThreadEntry::ToolCall(_)
1377                | AgentThreadEntry::AssistantMessage(_)
1378                | AgentThreadEntry::CompletedPlan(_) => {}
1379            }
1380        }
1381
1382        false
1383    }
1384
1385    pub fn has_in_progress_tool_calls(&self) -> bool {
1386        for entry in self.entries.iter().rev() {
1387            match entry {
1388                AgentThreadEntry::UserMessage(_) => return false,
1389                AgentThreadEntry::ToolCall(ToolCall {
1390                    status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1391                    ..
1392                }) => {
1393                    return true;
1394                }
1395                AgentThreadEntry::ToolCall(_)
1396                | AgentThreadEntry::AssistantMessage(_)
1397                | AgentThreadEntry::CompletedPlan(_) => {}
1398            }
1399        }
1400
1401        false
1402    }
1403
1404    pub fn used_tools_since_last_user_message(&self) -> bool {
1405        for entry in self.entries.iter().rev() {
1406            match entry {
1407                AgentThreadEntry::UserMessage(..) => return false,
1408                AgentThreadEntry::AssistantMessage(..) | AgentThreadEntry::CompletedPlan(..) => {
1409                    continue;
1410                }
1411                AgentThreadEntry::ToolCall(..) => return true,
1412            }
1413        }
1414
1415        false
1416    }
1417
1418    pub fn handle_session_update(
1419        &mut self,
1420        update: acp::SessionUpdate,
1421        cx: &mut Context<Self>,
1422    ) -> Result<(), acp::Error> {
1423        match update {
1424            acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1425                // We optimistically add the full user prompt before calling `prompt`.
1426                // Some ACP servers echo user chunks back over updates. Skip the chunk if
1427                // it's already present in the current user message to avoid duplicating content.
1428                let already_in_user_message = self
1429                    .entries
1430                    .last()
1431                    .and_then(|entry| entry.user_message())
1432                    .is_some_and(|message| message.chunks.contains(&content));
1433                if !already_in_user_message {
1434                    self.push_user_content_block(None, content, cx);
1435                }
1436            }
1437            acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1438                self.push_assistant_content_block(content, false, cx);
1439            }
1440            acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1441                self.push_assistant_content_block(content, true, cx);
1442            }
1443            acp::SessionUpdate::ToolCall(tool_call) => {
1444                self.upsert_tool_call(tool_call, cx)?;
1445            }
1446            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1447                self.update_tool_call(tool_call_update, cx)?;
1448            }
1449            acp::SessionUpdate::Plan(plan) => {
1450                self.update_plan(plan, cx);
1451            }
1452            acp::SessionUpdate::SessionInfoUpdate(info_update) => {
1453                if let acp::MaybeUndefined::Value(title) = info_update.title {
1454                    let had_provisional = self.provisional_title.take().is_some();
1455                    let title: SharedString = title.into();
1456                    if self.title.as_ref() != Some(&title) {
1457                        self.title = Some(title);
1458                        cx.emit(AcpThreadEvent::TitleUpdated);
1459                    } else if had_provisional {
1460                        cx.emit(AcpThreadEvent::TitleUpdated);
1461                    }
1462                }
1463            }
1464            acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1465                available_commands,
1466                ..
1467            }) => {
1468                self.available_commands = available_commands.clone();
1469                cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands));
1470            }
1471            acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1472                current_mode_id,
1473                ..
1474            }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1475            acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1476                config_options,
1477                ..
1478            }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1479            acp::SessionUpdate::UsageUpdate(update) if cx.has_flag::<AcpBetaFeatureFlag>() => {
1480                let usage = self.token_usage.get_or_insert_with(Default::default);
1481                usage.max_tokens = update.size;
1482                usage.used_tokens = update.used;
1483                if let Some(cost) = update.cost {
1484                    self.cost = Some(SessionCost {
1485                        amount: cost.amount,
1486                        currency: cost.currency.into(),
1487                    });
1488                }
1489                cx.emit(AcpThreadEvent::TokenUsageUpdated);
1490            }
1491            _ => {}
1492        }
1493        Ok(())
1494    }
1495
    /// Appends a user content chunk without indentation.
    ///
    /// Shorthand for [`Self::push_user_content_block_with_indent`] with
    /// `indented` set to `false`.
    pub fn push_user_content_block(
        &mut self,
        message_id: Option<UserMessageId>,
        chunk: acp::ContentBlock,
        cx: &mut Context<Self>,
    ) {
        self.push_user_content_block_with_indent(message_id, chunk, false, cx)
    }
1504
1505    pub fn push_user_content_block_with_indent(
1506        &mut self,
1507        message_id: Option<UserMessageId>,
1508        chunk: acp::ContentBlock,
1509        indented: bool,
1510        cx: &mut Context<Self>,
1511    ) {
1512        let language_registry = self.project.read(cx).languages().clone();
1513        let path_style = self.project.read(cx).path_style(cx);
1514        let entries_len = self.entries.len();
1515
1516        if let Some(last_entry) = self.entries.last_mut()
1517            && let AgentThreadEntry::UserMessage(UserMessage {
1518                id,
1519                content,
1520                chunks,
1521                indented: existing_indented,
1522                ..
1523            }) = last_entry
1524            && *existing_indented == indented
1525        {
1526            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1527            *id = message_id.or(id.take());
1528            content.append(chunk.clone(), &language_registry, path_style, cx);
1529            chunks.push(chunk);
1530            let idx = entries_len - 1;
1531            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1532        } else {
1533            let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1534            self.push_entry(
1535                AgentThreadEntry::UserMessage(UserMessage {
1536                    id: message_id,
1537                    content,
1538                    chunks: vec![chunk],
1539                    checkpoint: None,
1540                    indented,
1541                }),
1542                cx,
1543            );
1544        }
1545    }
1546
    /// Appends an assistant content chunk without indentation.
    ///
    /// Shorthand for [`Self::push_assistant_content_block_with_indent`] with
    /// `indented` set to `false`.
    pub fn push_assistant_content_block(
        &mut self,
        chunk: acp::ContentBlock,
        is_thought: bool,
        cx: &mut Context<Self>,
    ) {
        self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
    }
1555
1556    pub fn push_assistant_content_block_with_indent(
1557        &mut self,
1558        chunk: acp::ContentBlock,
1559        is_thought: bool,
1560        indented: bool,
1561        cx: &mut Context<Self>,
1562    ) {
1563        let path_style = self.project.read(cx).path_style(cx);
1564
1565        // For text chunks going to an existing Markdown block, buffer for smooth
1566        // streaming instead of appending all at once which may feel more choppy.
1567        if let acp::ContentBlock::Text(text_content) = &chunk {
1568            if let Some(markdown) = self.streaming_markdown_target(is_thought, indented) {
1569                let entries_len = self.entries.len();
1570                cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
1571                self.buffer_streaming_text(&markdown, text_content.text.clone(), cx);
1572                return;
1573            }
1574        }
1575
1576        let language_registry = self.project.read(cx).languages().clone();
1577        let entries_len = self.entries.len();
1578        if let Some(last_entry) = self.entries.last_mut()
1579            && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1580                chunks,
1581                indented: existing_indented,
1582                is_subagent_output: _,
1583            }) = last_entry
1584            && *existing_indented == indented
1585        {
1586            let idx = entries_len - 1;
1587            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1588            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1589            match (chunks.last_mut(), is_thought) {
1590                (Some(AssistantMessageChunk::Message { block }), false)
1591                | (Some(AssistantMessageChunk::Thought { block }), true) => {
1592                    block.append(chunk, &language_registry, path_style, cx)
1593                }
1594                _ => {
1595                    let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1596                    if is_thought {
1597                        chunks.push(AssistantMessageChunk::Thought { block })
1598                    } else {
1599                        chunks.push(AssistantMessageChunk::Message { block })
1600                    }
1601                }
1602            }
1603        } else {
1604            let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1605            let chunk = if is_thought {
1606                AssistantMessageChunk::Thought { block }
1607            } else {
1608                AssistantMessageChunk::Message { block }
1609            };
1610
1611            self.push_entry(
1612                AgentThreadEntry::AssistantMessage(AssistantMessage {
1613                    chunks: vec![chunk],
1614                    indented,
1615                    is_subagent_output: false,
1616                }),
1617                cx,
1618            );
1619        }
1620    }
1621
1622    fn streaming_markdown_target(
1623        &self,
1624        is_thought: bool,
1625        indented: bool,
1626    ) -> Option<Entity<Markdown>> {
1627        let last_entry = self.entries.last()?;
1628        if let AgentThreadEntry::AssistantMessage(AssistantMessage {
1629            chunks,
1630            indented: existing_indented,
1631            ..
1632        }) = last_entry
1633            && *existing_indented == indented
1634            && let [.., chunk] = chunks.as_slice()
1635        {
1636            match (chunk, is_thought) {
1637                (
1638                    AssistantMessageChunk::Message {
1639                        block: ContentBlock::Markdown { markdown },
1640                    },
1641                    false,
1642                )
1643                | (
1644                    AssistantMessageChunk::Thought {
1645                        block: ContentBlock::Markdown { markdown },
1646                    },
1647                    true,
1648                ) => Some(markdown.clone()),
1649                _ => None,
1650            }
1651        } else {
1652            None
1653        }
1654    }
1655
1656    /// Add text to the streaming buffer. If the target changed (e.g. switching
1657    /// from thoughts to message text), flush the old buffer first.
1658    fn buffer_streaming_text(
1659        &mut self,
1660        markdown: &Entity<Markdown>,
1661        text: String,
1662        cx: &mut Context<Self>,
1663    ) {
1664        if let Some(buffer) = &mut self.streaming_text_buffer {
1665            if buffer.target.entity_id() == markdown.entity_id() {
1666                buffer.pending.push_str(&text);
1667
1668                buffer.bytes_to_reveal_per_tick = (buffer.pending.len() as f32
1669                    / StreamingTextBuffer::REVEAL_TARGET
1670                    * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1671                    .ceil() as usize;
1672                return;
1673            }
1674            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1675        }
1676
1677        let target = markdown.clone();
1678        let _reveal_task = self.start_streaming_reveal(cx);
1679        let pending_len = text.len();
1680        let bytes_to_reveal = (pending_len as f32 / StreamingTextBuffer::REVEAL_TARGET
1681            * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1682            .ceil() as usize;
1683        self.streaming_text_buffer = Some(StreamingTextBuffer {
1684            pending: text,
1685            bytes_to_reveal_per_tick: bytes_to_reveal,
1686            target,
1687            _reveal_task,
1688        });
1689    }
1690
1691    /// Flush all buffered streaming text into the Markdown entity immediately.
1692    fn flush_streaming_text(
1693        streaming_text_buffer: &mut Option<StreamingTextBuffer>,
1694        cx: &mut Context<Self>,
1695    ) {
1696        if let Some(buffer) = streaming_text_buffer.take() {
1697            if !buffer.pending.is_empty() {
1698                buffer
1699                    .target
1700                    .update(cx, |markdown, cx| markdown.append(&buffer.pending, cx));
1701            }
1702        }
1703    }
1704
1705    /// Spawns a foreground task that periodically drains
1706    /// `streaming_text_buffer.pending` into the target `Markdown` entity,
1707    /// producing smooth, continuous text output.
1708    fn start_streaming_reveal(&self, cx: &mut Context<Self>) -> Task<()> {
1709        cx.spawn(async move |this, cx| {
1710            loop {
1711                cx.background_executor()
1712                    .timer(Duration::from_millis(StreamingTextBuffer::TASK_UPDATE_MS))
1713                    .await;
1714
1715                let should_continue = this
1716                    .update(cx, |this, cx| {
1717                        let Some(buffer) = &mut this.streaming_text_buffer else {
1718                            return false;
1719                        };
1720
1721                        if buffer.pending.is_empty() {
1722                            return true;
1723                        }
1724
1725                        let pending_len = buffer.pending.len();
1726
1727                        let byte_boundary = buffer
1728                            .pending
1729                            .ceil_char_boundary(buffer.bytes_to_reveal_per_tick)
1730                            .min(pending_len);
1731
1732                        buffer.target.update(cx, |markdown: &mut Markdown, cx| {
1733                            markdown.append(&buffer.pending[..byte_boundary], cx);
1734                            buffer.pending.drain(..byte_boundary);
1735                        });
1736
1737                        true
1738                    })
1739                    .unwrap_or(false);
1740
1741                if !should_continue {
1742                    break;
1743                }
1744            }
1745        })
1746    }
1747
1748    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
1749        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1750        self.entries.push(entry);
1751        cx.emit(AcpThreadEvent::NewEntry);
1752    }
1753
1754    pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1755        self.connection.set_title(&self.session_id, cx).is_some()
1756    }
1757
1758    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1759        let had_provisional = self.provisional_title.take().is_some();
1760        if self.title.as_ref() != Some(&title) {
1761            self.title = Some(title.clone());
1762            cx.emit(AcpThreadEvent::TitleUpdated);
1763            if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1764                return set_title.run(title, cx);
1765            }
1766        } else if had_provisional {
1767            cx.emit(AcpThreadEvent::TitleUpdated);
1768        }
1769        Task::ready(Ok(()))
1770    }
1771
1772    /// Sets a provisional display title without propagating back to the
1773    /// underlying agent connection. This is used for quick preview titles
1774    /// (e.g. first 20 chars of the user message) that should be shown
1775    /// immediately but replaced once the LLM generates a proper title via
1776    /// `set_title`.
1777    pub fn set_provisional_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
1778        self.provisional_title = Some(title);
1779        cx.emit(AcpThreadEvent::TitleUpdated);
1780    }
1781
1782    pub fn subagent_spawned(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
1783        cx.emit(AcpThreadEvent::SubagentSpawned(session_id));
1784    }
1785
1786    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1787        if usage.is_none() {
1788            self.cost = None;
1789        }
1790        self.token_usage = usage;
1791        cx.emit(AcpThreadEvent::TokenUsageUpdated);
1792    }
1793
1794    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1795        cx.emit(AcpThreadEvent::Retry(status));
1796    }
1797
1798    pub fn update_tool_call(
1799        &mut self,
1800        update: impl Into<ToolCallUpdate>,
1801        cx: &mut Context<Self>,
1802    ) -> Result<()> {
1803        let update = update.into();
1804        let languages = self.project.read(cx).languages().clone();
1805        let path_style = self.project.read(cx).path_style(cx);
1806
1807        let ix = match self.index_for_tool_call(update.id()) {
1808            Some(ix) => ix,
1809            None => {
1810                // Tool call not found - create a failed tool call entry
1811                let failed_tool_call = ToolCall {
1812                    id: update.id().clone(),
1813                    label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1814                    kind: acp::ToolKind::Fetch,
1815                    content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1816                        "Tool call not found".into(),
1817                        &languages,
1818                        path_style,
1819                        cx,
1820                    ))],
1821                    status: ToolCallStatus::Failed,
1822                    locations: Vec::new(),
1823                    resolved_locations: Vec::new(),
1824                    raw_input: None,
1825                    raw_input_markdown: None,
1826                    raw_output: None,
1827                    tool_name: None,
1828                    subagent_session_info: None,
1829                };
1830                self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1831                return Ok(());
1832            }
1833        };
1834        let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1835            unreachable!()
1836        };
1837
1838        match update {
1839            ToolCallUpdate::UpdateFields(update) => {
1840                let location_updated = update.fields.locations.is_some();
1841                call.update_fields(
1842                    update.fields,
1843                    update.meta,
1844                    languages,
1845                    path_style,
1846                    &self.terminals,
1847                    cx,
1848                )?;
1849                if location_updated {
1850                    self.resolve_locations(update.tool_call_id, cx);
1851                }
1852            }
1853            ToolCallUpdate::UpdateDiff(update) => {
1854                call.content.clear();
1855                call.content.push(ToolCallContent::Diff(update.diff));
1856            }
1857            ToolCallUpdate::UpdateTerminal(update) => {
1858                call.content.clear();
1859                call.content
1860                    .push(ToolCallContent::Terminal(update.terminal));
1861            }
1862        }
1863
1864        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1865
1866        Ok(())
1867    }
1868
1869    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1870    pub fn upsert_tool_call(
1871        &mut self,
1872        tool_call: acp::ToolCall,
1873        cx: &mut Context<Self>,
1874    ) -> Result<(), acp::Error> {
1875        let status = tool_call.status.into();
1876        self.upsert_tool_call_inner(tool_call.into(), status, cx)
1877    }
1878
1879    /// Fails if id does not match an existing entry.
1880    pub fn upsert_tool_call_inner(
1881        &mut self,
1882        update: acp::ToolCallUpdate,
1883        status: ToolCallStatus,
1884        cx: &mut Context<Self>,
1885    ) -> Result<(), acp::Error> {
1886        let language_registry = self.project.read(cx).languages().clone();
1887        let path_style = self.project.read(cx).path_style(cx);
1888        let id = update.tool_call_id.clone();
1889
1890        let agent_telemetry_id = self.connection().telemetry_id();
1891        let session = self.session_id();
1892        let parent_session_id = self.parent_session_id();
1893        if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1894            let status = if matches!(status, ToolCallStatus::Completed) {
1895                "completed"
1896            } else {
1897                "failed"
1898            };
1899            telemetry::event!(
1900                "Agent Tool Call Completed",
1901                agent_telemetry_id,
1902                session,
1903                parent_session_id,
1904                status
1905            );
1906        }
1907
1908        if let Some(ix) = self.index_for_tool_call(&id) {
1909            let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1910                unreachable!()
1911            };
1912
1913            call.update_fields(
1914                update.fields,
1915                update.meta,
1916                language_registry,
1917                path_style,
1918                &self.terminals,
1919                cx,
1920            )?;
1921            call.status = status;
1922
1923            cx.emit(AcpThreadEvent::EntryUpdated(ix));
1924        } else {
1925            let call = ToolCall::from_acp(
1926                update.try_into()?,
1927                status,
1928                language_registry,
1929                self.project.read(cx).path_style(cx),
1930                &self.terminals,
1931                cx,
1932            )?;
1933            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1934        };
1935
1936        self.resolve_locations(id, cx);
1937        Ok(())
1938    }
1939
1940    fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1941        self.entries
1942            .iter()
1943            .enumerate()
1944            .rev()
1945            .find_map(|(index, entry)| {
1946                if let AgentThreadEntry::ToolCall(tool_call) = entry
1947                    && &tool_call.id == id
1948                {
1949                    Some(index)
1950                } else {
1951                    None
1952                }
1953            })
1954    }
1955
1956    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1957        // The tool call we are looking for is typically the last one, or very close to the end.
1958        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1959        self.entries
1960            .iter_mut()
1961            .enumerate()
1962            .rev()
1963            .find_map(|(index, tool_call)| {
1964                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1965                    && &tool_call.id == id
1966                {
1967                    Some((index, tool_call))
1968                } else {
1969                    None
1970                }
1971            })
1972    }
1973
1974    pub fn tool_call(&self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1975        self.entries
1976            .iter()
1977            .enumerate()
1978            .rev()
1979            .find_map(|(index, tool_call)| {
1980                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1981                    && &tool_call.id == id
1982                {
1983                    Some((index, tool_call))
1984                } else {
1985                    None
1986                }
1987            })
1988    }
1989
1990    pub fn tool_call_for_subagent(&self, session_id: &acp::SessionId) -> Option<&ToolCall> {
1991        self.entries.iter().find_map(|entry| match entry {
1992            AgentThreadEntry::ToolCall(tool_call) => {
1993                if let Some(subagent_session_info) = &tool_call.subagent_session_info
1994                    && &subagent_session_info.session_id == session_id
1995                {
1996                    Some(tool_call)
1997                } else {
1998                    None
1999                }
2000            }
2001            _ => None,
2002        })
2003    }
2004
2005    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
2006        let project = self.project.clone();
2007        let should_update_agent_location = self.parent_session_id.is_none();
2008        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
2009            return;
2010        };
2011        let task = tool_call.resolve_locations(project, cx);
2012        cx.spawn(async move |this, cx| {
2013            let resolved_locations = task.await;
2014
2015            this.update(cx, |this, cx| {
2016                let project = this.project.clone();
2017
2018                for location in resolved_locations.iter().flatten() {
2019                    this.shared_buffers
2020                        .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
2021                }
2022                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
2023                    return;
2024                };
2025
2026                if let Some(Some(location)) = resolved_locations.last() {
2027                    project.update(cx, |project, cx| {
2028                        let should_ignore = if let Some(agent_location) = project
2029                            .agent_location()
2030                            .filter(|agent_location| agent_location.buffer == location.buffer)
2031                        {
2032                            let snapshot = location.buffer.read(cx).snapshot();
2033                            let old_position = agent_location.position.to_point(&snapshot);
2034                            let new_position = location.position.to_point(&snapshot);
2035
2036                            // ignore this so that when we get updates from the edit tool
2037                            // the position doesn't reset to the startof line
2038                            old_position.row == new_position.row
2039                                && old_position.column > new_position.column
2040                        } else {
2041                            false
2042                        };
2043                        if !should_ignore && should_update_agent_location {
2044                            project.set_agent_location(Some(location.into()), cx);
2045                        }
2046                    });
2047                }
2048
2049                let resolved_locations = resolved_locations
2050                    .iter()
2051                    .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
2052                    .collect::<Vec<_>>();
2053
2054                if tool_call.resolved_locations != resolved_locations {
2055                    tool_call.resolved_locations = resolved_locations;
2056                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
2057                }
2058            })
2059        })
2060        .detach();
2061    }
2062
2063    pub fn request_tool_call_authorization(
2064        &mut self,
2065        tool_call: acp::ToolCallUpdate,
2066        options: PermissionOptions,
2067        cx: &mut Context<Self>,
2068    ) -> Result<Task<RequestPermissionOutcome>> {
2069        let (tx, rx) = oneshot::channel();
2070
2071        let status = ToolCallStatus::WaitingForConfirmation {
2072            options,
2073            respond_tx: tx,
2074        };
2075
2076        let tool_call_id = tool_call.tool_call_id.clone();
2077        self.upsert_tool_call_inner(tool_call, status, cx)?;
2078        cx.emit(AcpThreadEvent::ToolAuthorizationRequested(
2079            tool_call_id.clone(),
2080        ));
2081
2082        Ok(cx.spawn(async move |this, cx| {
2083            let outcome = match rx.await {
2084                Ok(outcome) => RequestPermissionOutcome::Selected(outcome),
2085                Err(oneshot::Canceled) => RequestPermissionOutcome::Cancelled,
2086            };
2087            this.update(cx, |_this, cx| {
2088                cx.emit(AcpThreadEvent::ToolAuthorizationReceived(tool_call_id))
2089            })
2090            .ok();
2091            outcome
2092        }))
2093    }
2094
2095    pub fn authorize_tool_call(
2096        &mut self,
2097        id: acp::ToolCallId,
2098        outcome: SelectedPermissionOutcome,
2099        cx: &mut Context<Self>,
2100    ) {
2101        let Some((ix, call)) = self.tool_call_mut(&id) else {
2102            return;
2103        };
2104
2105        let new_status = match outcome.option_kind {
2106            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
2107                ToolCallStatus::Rejected
2108            }
2109            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
2110                ToolCallStatus::InProgress
2111            }
2112            _ => ToolCallStatus::InProgress,
2113        };
2114
2115        let curr_status = mem::replace(&mut call.status, new_status);
2116
2117        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
2118            respond_tx.send(outcome).log_err();
2119        } else if cfg!(debug_assertions) {
2120            panic!("tried to authorize an already authorized tool call");
2121        }
2122
2123        cx.emit(AcpThreadEvent::EntryUpdated(ix));
2124    }
2125
2126    pub fn plan(&self) -> &Plan {
2127        &self.plan
2128    }
2129
2130    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
2131        let new_entries_len = request.entries.len();
2132        let mut new_entries = request.entries.into_iter();
2133
2134        // Reuse existing markdown to prevent flickering
2135        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
2136            let PlanEntry {
2137                content,
2138                priority,
2139                status,
2140            } = old;
2141            content.update(cx, |old, cx| {
2142                old.replace(new.content, cx);
2143            });
2144            *priority = new.priority;
2145            *status = new.status;
2146        }
2147        for new in new_entries {
2148            self.plan.entries.push(PlanEntry::from_acp(new, cx))
2149        }
2150        self.plan.entries.truncate(new_entries_len);
2151
2152        cx.notify();
2153    }
2154
2155    pub fn snapshot_completed_plan(&mut self, cx: &mut Context<Self>) {
2156        if !self.plan.is_empty() && self.plan.stats().pending == 0 {
2157            let completed_entries = std::mem::take(&mut self.plan.entries);
2158            self.push_entry(AgentThreadEntry::CompletedPlan(completed_entries), cx);
2159        }
2160    }
2161
2162    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
2163        self.plan
2164            .entries
2165            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
2166        cx.notify();
2167    }
2168
2169    pub fn clear_plan(&mut self, cx: &mut Context<Self>) {
2170        self.plan.entries.clear();
2171        cx.notify();
2172    }
2173
2174    #[cfg(any(test, feature = "test-support"))]
2175    pub fn send_raw(
2176        &mut self,
2177        message: &str,
2178        cx: &mut Context<Self>,
2179    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2180        self.send(vec![message.into()], cx)
2181    }
2182
2183    pub fn send(
2184        &mut self,
2185        message: Vec<acp::ContentBlock>,
2186        cx: &mut Context<Self>,
2187    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2188        let block = ContentBlock::new_combined(
2189            message.clone(),
2190            self.project.read(cx).languages().clone(),
2191            self.project.read(cx).path_style(cx),
2192            cx,
2193        );
2194        let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
2195        let git_store = self.project.read(cx).git_store().clone();
2196
2197        let message_id = UserMessageId::new();
2198
2199        self.run_turn(cx, async move |this, cx| {
2200            this.update(cx, |this, cx| {
2201                this.push_entry(
2202                    AgentThreadEntry::UserMessage(UserMessage {
2203                        id: Some(message_id.clone()),
2204                        content: block,
2205                        chunks: message,
2206                        checkpoint: None,
2207                        indented: false,
2208                    }),
2209                    cx,
2210                );
2211            })
2212            .ok();
2213
2214            let old_checkpoint = git_store
2215                .update(cx, |git, cx| git.checkpoint(cx))
2216                .await
2217                .context("failed to get old checkpoint")
2218                .log_err();
2219            this.update(cx, |this, cx| {
2220                if let Some((_ix, message)) = this.last_user_message() {
2221                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
2222                        git_checkpoint,
2223                        show: false,
2224                    });
2225                }
2226                this.connection.prompt(message_id, request, cx)
2227            })?
2228            .await
2229        })
2230    }
2231
2232    pub fn can_retry(&self, cx: &App) -> bool {
2233        self.connection.retry(&self.session_id, cx).is_some()
2234    }
2235
2236    pub fn retry(
2237        &mut self,
2238        cx: &mut Context<Self>,
2239    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2240        self.run_turn(cx, async move |this, cx| {
2241            this.update(cx, |this, cx| {
2242                this.connection
2243                    .retry(&this.session_id, cx)
2244                    .map(|retry| retry.run(cx))
2245            })?
2246            .context("retrying a session is not supported")?
2247            .await
2248        })
2249    }
2250
2251    fn run_turn(
2252        &mut self,
2253        cx: &mut Context<Self>,
2254        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
2255    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2256        self.clear_completed_plan_entries(cx);
2257        self.had_error = false;
2258
2259        let (tx, rx) = oneshot::channel();
2260        let cancel_task = self.cancel(cx);
2261
2262        self.turn_id += 1;
2263        let turn_id = self.turn_id;
2264        self.running_turn = Some(RunningTurn {
2265            id: turn_id,
2266            send_task: cx.spawn(async move |this, cx| {
2267                cancel_task.await;
2268                tx.send(f(this, cx).await).ok();
2269            }),
2270        });
2271
2272        cx.spawn(async move |this, cx| {
2273            let response = rx.await;
2274
2275            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
2276                .await?;
2277
2278            this.update(cx, |this, cx| {
2279                if this.parent_session_id.is_none() {
2280                    this.project
2281                        .update(cx, |project, cx| project.set_agent_location(None, cx));
2282                }
2283                let Ok(response) = response else {
2284                    // tx dropped, just return
2285                    return Ok(None);
2286                };
2287
2288                let is_same_turn = this
2289                    .running_turn
2290                    .as_ref()
2291                    .is_some_and(|turn| turn_id == turn.id);
2292
2293                // If the user submitted a follow up message, running_turn might
2294                // already point to a different turn. Therefore we only want to
2295                // take the task if it's the same turn.
2296                if is_same_turn {
2297                    this.running_turn.take();
2298                }
2299
2300                match response {
2301                    Ok(r) => {
2302                        Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2303
2304                        if r.stop_reason == acp::StopReason::MaxTokens {
2305                            this.had_error = true;
2306                            cx.emit(AcpThreadEvent::Error);
2307                            log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2308
2309                            let exceeded_max_output_tokens =
2310                                this.token_usage.as_ref().is_some_and(|u| {
2311                                    u.max_output_tokens
2312                                        .is_some_and(|max| u.output_tokens >= max)
2313                                });
2314
2315                            if exceeded_max_output_tokens {
2316                                log::error!(
2317                                    "Max output tokens reached. Usage: {:?}",
2318                                    this.token_usage
2319                                );
2320                            } else {
2321                                log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2322                            }
2323                            return Err(anyhow!(MaxOutputTokensError));
2324                        }
2325
2326                        let canceled = matches!(r.stop_reason, acp::StopReason::Cancelled);
2327                        if canceled {
2328                            this.mark_pending_tools_as_canceled();
2329                        }
2330
2331                        if !canceled {
2332                            this.snapshot_completed_plan(cx);
2333                        }
2334
2335                        // Handle refusal - distinguish between user prompt and tool call refusals
2336                        if let acp::StopReason::Refusal = r.stop_reason {
2337                            this.had_error = true;
2338                            if let Some((user_msg_ix, _)) = this.last_user_message() {
2339                                // Check if there's a completed tool call with results after the last user message
2340                                // This indicates the refusal is in response to tool output, not the user's prompt
2341                                let has_completed_tool_call_after_user_msg =
2342                                    this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
2343                                        if let AgentThreadEntry::ToolCall(tool_call) = entry {
2344                                            // Check if the tool call has completed and has output
2345                                            matches!(tool_call.status, ToolCallStatus::Completed)
2346                                                && tool_call.raw_output.is_some()
2347                                        } else {
2348                                            false
2349                                        }
2350                                    });
2351
2352                                if has_completed_tool_call_after_user_msg {
2353                                    // Refusal is due to tool output - don't truncate, just notify
2354                                    // The model refused based on what the tool returned
2355                                    cx.emit(AcpThreadEvent::Refusal);
2356                                } else {
2357                                    // User prompt was refused - truncate back to before the user message
2358                                    let range = user_msg_ix..this.entries.len();
2359                                    if range.start < range.end {
2360                                        this.entries.truncate(user_msg_ix);
2361                                        cx.emit(AcpThreadEvent::EntriesRemoved(range));
2362                                    }
2363                                    cx.emit(AcpThreadEvent::Refusal);
2364                                }
2365                            } else {
2366                                // No user message found, treat as general refusal
2367                                cx.emit(AcpThreadEvent::Refusal);
2368                            }
2369                        }
2370
2371                        if cx.has_flag::<AcpBetaFeatureFlag>()
2372                            && let Some(response_usage) = &r.usage
2373                        {
2374                            let usage = this.token_usage.get_or_insert_with(Default::default);
2375                            usage.input_tokens = response_usage.input_tokens;
2376                            usage.output_tokens = response_usage.output_tokens;
2377                            cx.emit(AcpThreadEvent::TokenUsageUpdated);
2378                        }
2379
2380                        cx.emit(AcpThreadEvent::Stopped(r.stop_reason));
2381                        Ok(Some(r))
2382                    }
2383                    Err(e) => {
2384                        Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2385
2386                        this.had_error = true;
2387                        cx.emit(AcpThreadEvent::Error);
2388                        log::error!("Error in run turn: {:?}", e);
2389                        Err(e)
2390                    }
2391                }
2392            })?
2393        })
2394        .boxed()
2395    }
2396
2397    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2398        let Some(turn) = self.running_turn.take() else {
2399            return Task::ready(());
2400        };
2401        self.connection.cancel(&self.session_id, cx);
2402
2403        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2404        self.mark_pending_tools_as_canceled();
2405
2406        // Wait for the send task to complete
2407        cx.background_spawn(turn.send_task)
2408    }
2409
2410    fn mark_pending_tools_as_canceled(&mut self) {
2411        for entry in self.entries.iter_mut() {
2412            if let AgentThreadEntry::ToolCall(call) = entry {
2413                let cancel = matches!(
2414                    call.status,
2415                    ToolCallStatus::Pending
2416                        | ToolCallStatus::WaitingForConfirmation { .. }
2417                        | ToolCallStatus::InProgress
2418                );
2419
2420                if cancel {
2421                    call.status = ToolCallStatus::Canceled;
2422                }
2423            }
2424        }
2425    }
2426
2427    /// Restores the git working tree to the state at the given checkpoint (if one exists)
2428    pub fn restore_checkpoint(
2429        &mut self,
2430        id: UserMessageId,
2431        cx: &mut Context<Self>,
2432    ) -> Task<Result<()>> {
2433        let Some((_, message)) = self.user_message_mut(&id) else {
2434            return Task::ready(Err(anyhow!("message not found")));
2435        };
2436
2437        let checkpoint = message
2438            .checkpoint
2439            .as_ref()
2440            .map(|c| c.git_checkpoint.clone());
2441
2442        // Cancel any in-progress generation before restoring
2443        let cancel_task = self.cancel(cx);
2444        let rewind = self.rewind(id.clone(), cx);
2445        let git_store = self.project.read(cx).git_store().clone();
2446
2447        cx.spawn(async move |_, cx| {
2448            cancel_task.await;
2449            rewind.await?;
2450            if let Some(checkpoint) = checkpoint {
2451                git_store
2452                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
2453                    .await?;
2454            }
2455
2456            Ok(())
2457        })
2458    }
2459
2460    /// Rewinds this thread to before the entry at `index`, removing it and all
2461    /// subsequent entries while rejecting any action_log changes made from that point.
2462    /// Unlike `restore_checkpoint`, this method does not restore from git.
2463    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
2464        let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
2465            return Task::ready(Err(anyhow!("not supported")));
2466        };
2467
2468        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2469        let telemetry = ActionLogTelemetry::from(&*self);
2470        cx.spawn(async move |this, cx| {
2471            cx.update(|cx| truncate.run(id.clone(), cx)).await?;
2472            this.update(cx, |this, cx| {
2473                if let Some((ix, _)) = this.user_message_mut(&id) {
2474                    // Collect all terminals from entries that will be removed
2475                    let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
2476                        .iter()
2477                        .flat_map(|entry| entry.terminals())
2478                        .filter_map(|terminal| terminal.read(cx).id().clone().into())
2479                        .collect();
2480
2481                    let range = ix..this.entries.len();
2482                    this.entries.truncate(ix);
2483                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
2484
2485                    // Kill and remove the terminals
2486                    for terminal_id in terminals_to_remove {
2487                        if let Some(terminal) = this.terminals.remove(&terminal_id) {
2488                            terminal.update(cx, |terminal, cx| {
2489                                terminal.kill(cx);
2490                            });
2491                        }
2492                    }
2493                }
2494                this.action_log().update(cx, |action_log, cx| {
2495                    action_log.reject_all_edits(Some(telemetry), cx)
2496                })
2497            })?
2498            .await;
2499            Ok(())
2500        })
2501    }
2502
2503    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
2504        let git_store = self.project.read(cx).git_store().clone();
2505
2506        let Some((_, message)) = self.last_user_message() else {
2507            return Task::ready(Ok(()));
2508        };
2509        let Some(user_message_id) = message.id.clone() else {
2510            return Task::ready(Ok(()));
2511        };
2512        let Some(checkpoint) = message.checkpoint.as_ref() else {
2513            return Task::ready(Ok(()));
2514        };
2515        let old_checkpoint = checkpoint.git_checkpoint.clone();
2516
2517        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2518        cx.spawn(async move |this, cx| {
2519            let Some(new_checkpoint) = new_checkpoint
2520                .await
2521                .context("failed to get new checkpoint")
2522                .log_err()
2523            else {
2524                return Ok(());
2525            };
2526
2527            let equal = git_store
2528                .update(cx, |git, cx| {
2529                    git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2530                })
2531                .await
2532                .unwrap_or(true);
2533
2534            this.update(cx, |this, cx| {
2535                if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
2536                    if let Some(checkpoint) = message.checkpoint.as_mut() {
2537                        checkpoint.show = !equal;
2538                        cx.emit(AcpThreadEvent::EntryUpdated(ix));
2539                    }
2540                }
2541            })?;
2542
2543            Ok(())
2544        })
2545    }
2546
2547    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2548        self.entries
2549            .iter_mut()
2550            .enumerate()
2551            .rev()
2552            .find_map(|(ix, entry)| {
2553                if let AgentThreadEntry::UserMessage(message) = entry {
2554                    Some((ix, message))
2555                } else {
2556                    None
2557                }
2558            })
2559    }
2560
2561    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2562        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2563            if let AgentThreadEntry::UserMessage(message) = entry {
2564                if message.id.as_ref() == Some(id) {
2565                    Some((ix, message))
2566                } else {
2567                    None
2568                }
2569            } else {
2570                None
2571            }
2572        })
2573    }
2574
2575    pub fn read_text_file(
2576        &self,
2577        path: PathBuf,
2578        line: Option<u32>,
2579        limit: Option<u32>,
2580        reuse_shared_snapshot: bool,
2581        cx: &mut Context<Self>,
2582    ) -> Task<Result<String, acp::Error>> {
2583        // Args are 1-based, move to 0-based
2584        let line = line.unwrap_or_default().saturating_sub(1);
2585        let limit = limit.unwrap_or(u32::MAX);
2586        let project = self.project.clone();
2587        let action_log = self.action_log.clone();
2588        let should_update_agent_location = self.parent_session_id.is_none();
2589        cx.spawn(async move |this, cx| {
2590            let load = project.update(cx, |project, cx| {
2591                let path = project
2592                    .project_path_for_absolute_path(&path, cx)
2593                    .ok_or_else(|| {
2594                        acp::Error::resource_not_found(Some(path.display().to_string()))
2595                    })?;
2596                Ok::<_, acp::Error>(project.open_buffer(path, cx))
2597            })?;
2598
2599            let buffer = load.await?;
2600
2601            let snapshot = if reuse_shared_snapshot {
2602                this.read_with(cx, |this, _| {
2603                    this.shared_buffers.get(&buffer.clone()).cloned()
2604                })
2605                .log_err()
2606                .flatten()
2607            } else {
2608                None
2609            };
2610
2611            let snapshot = if let Some(snapshot) = snapshot {
2612                snapshot
2613            } else {
2614                action_log.update(cx, |action_log, cx| {
2615                    action_log.buffer_read(buffer.clone(), cx);
2616                });
2617
2618                let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
2619                this.update(cx, |this, _| {
2620                    this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2621                })?;
2622                snapshot
2623            };
2624
2625            let max_point = snapshot.max_point();
2626            let start_position = Point::new(line, 0);
2627
2628            if start_position > max_point {
2629                return Err(acp::Error::invalid_params().data(format!(
2630                    "Attempting to read beyond the end of the file, line {}:{}",
2631                    max_point.row + 1,
2632                    max_point.column
2633                )));
2634            }
2635
2636            let start = snapshot.anchor_before(start_position);
2637            let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2638
2639            if should_update_agent_location {
2640                project.update(cx, |project, cx| {
2641                    project.set_agent_location(
2642                        Some(AgentLocation {
2643                            buffer: buffer.downgrade(),
2644                            position: start,
2645                        }),
2646                        cx,
2647                    );
2648                });
2649            }
2650
2651            Ok(snapshot.text_for_range(start..end).collect::<String>())
2652        })
2653    }
2654
2655    pub fn write_text_file(
2656        &self,
2657        path: PathBuf,
2658        content: String,
2659        cx: &mut Context<Self>,
2660    ) -> Task<Result<()>> {
2661        let project = self.project.clone();
2662        let action_log = self.action_log.clone();
2663        let should_update_agent_location = self.parent_session_id.is_none();
2664        cx.spawn(async move |this, cx| {
2665            let load = project.update(cx, |project, cx| {
2666                let path = project
2667                    .project_path_for_absolute_path(&path, cx)
2668                    .context("invalid path")?;
2669                anyhow::Ok(project.open_buffer(path, cx))
2670            });
2671            let buffer = load?.await?;
2672            let snapshot = this.update(cx, |this, cx| {
2673                this.shared_buffers
2674                    .get(&buffer)
2675                    .cloned()
2676                    .unwrap_or_else(|| buffer.read(cx).snapshot())
2677            })?;
2678            let edits = cx
2679                .background_executor()
2680                .spawn(async move {
2681                    let old_text = snapshot.text();
2682                    text_diff(old_text.as_str(), &content)
2683                        .into_iter()
2684                        .map(|(range, replacement)| {
2685                            (snapshot.anchor_range_inside(range), replacement)
2686                        })
2687                        .collect::<Vec<_>>()
2688                })
2689                .await;
2690
2691            if should_update_agent_location {
2692                project.update(cx, |project, cx| {
2693                    project.set_agent_location(
2694                        Some(AgentLocation {
2695                            buffer: buffer.downgrade(),
2696                            position: edits
2697                                .last()
2698                                .map(|(range, _)| range.end)
2699                                .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
2700                        }),
2701                        cx,
2702                    );
2703                });
2704            }
2705
2706            let format_on_save = cx.update(|cx| {
2707                action_log.update(cx, |action_log, cx| {
2708                    action_log.buffer_read(buffer.clone(), cx);
2709                });
2710
2711                let format_on_save = buffer.update(cx, |buffer, cx| {
2712                    buffer.edit(edits, None, cx);
2713
2714                    let settings =
2715                        language::language_settings::LanguageSettings::for_buffer(buffer, cx);
2716
2717                    settings.format_on_save != FormatOnSave::Off
2718                });
2719                action_log.update(cx, |action_log, cx| {
2720                    action_log.buffer_edited(buffer.clone(), cx);
2721                });
2722                format_on_save
2723            });
2724
2725            if format_on_save {
2726                let format_task = project.update(cx, |project, cx| {
2727                    project.format(
2728                        HashSet::from_iter([buffer.clone()]),
2729                        LspFormatTarget::Buffers,
2730                        false,
2731                        FormatTrigger::Save,
2732                        cx,
2733                    )
2734                });
2735                format_task.await.log_err();
2736
2737                action_log.update(cx, |action_log, cx| {
2738                    action_log.buffer_edited(buffer.clone(), cx);
2739                });
2740            }
2741
2742            project
2743                .update(cx, |project, cx| project.save_buffer(buffer, cx))
2744                .await
2745        })
2746    }
2747
2748    pub fn create_terminal(
2749        &self,
2750        command: String,
2751        args: Vec<String>,
2752        extra_env: Vec<acp::EnvVariable>,
2753        cwd: Option<PathBuf>,
2754        output_byte_limit: Option<u64>,
2755        cx: &mut Context<Self>,
2756    ) -> Task<Result<Entity<Terminal>>> {
2757        let env = match &cwd {
2758            Some(dir) => self.project.update(cx, |project, cx| {
2759                project.environment().update(cx, |env, cx| {
2760                    env.directory_environment(dir.as_path().into(), cx)
2761                })
2762            }),
2763            None => Task::ready(None).shared(),
2764        };
2765        let env = cx.spawn(async move |_, _| {
2766            let mut env = env.await.unwrap_or_default();
2767            // Disables paging for `git` and hopefully other commands
2768            env.insert("PAGER".into(), "".into());
2769            for var in extra_env {
2770                env.insert(var.name, var.value);
2771            }
2772            env
2773        });
2774
2775        let project = self.project.clone();
2776        let language_registry = project.read(cx).languages().clone();
2777        let is_windows = project.read(cx).path_style(cx).is_windows();
2778
2779        let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
2780        let terminal_task = cx.spawn({
2781            let terminal_id = terminal_id.clone();
2782            async move |_this, cx| {
2783                let env = env.await;
2784                let shell = project
2785                    .update(cx, |project, cx| {
2786                        project
2787                            .remote_client()
2788                            .and_then(|r| r.read(cx).default_system_shell())
2789                    })
2790                    .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2791                let (task_command, task_args) =
2792                    ShellBuilder::new(&Shell::Program(shell), is_windows)
2793                        .redirect_stdin_to_dev_null()
2794                        .build(Some(command.clone()), &args);
2795                let terminal = project
2796                    .update(cx, |project, cx| {
2797                        project.create_terminal_task(
2798                            task::SpawnInTerminal {
2799                                command: Some(task_command),
2800                                args: task_args,
2801                                cwd: cwd.clone(),
2802                                env,
2803                                ..Default::default()
2804                            },
2805                            cx,
2806                        )
2807                    })
2808                    .await?;
2809
2810                anyhow::Ok(cx.new(|cx| {
2811                    Terminal::new(
2812                        terminal_id,
2813                        &format!("{} {}", command, args.join(" ")),
2814                        cwd,
2815                        output_byte_limit.map(|l| l as usize),
2816                        terminal,
2817                        language_registry,
2818                        cx,
2819                    )
2820                }))
2821            }
2822        });
2823
2824        cx.spawn(async move |this, cx| {
2825            let terminal = terminal_task.await?;
2826            this.update(cx, |this, _cx| {
2827                this.terminals.insert(terminal_id, terminal.clone());
2828                terminal
2829            })
2830        })
2831    }
2832
2833    pub fn kill_terminal(
2834        &mut self,
2835        terminal_id: acp::TerminalId,
2836        cx: &mut Context<Self>,
2837    ) -> Result<()> {
2838        self.terminals
2839            .get(&terminal_id)
2840            .context("Terminal not found")?
2841            .update(cx, |terminal, cx| {
2842                terminal.kill(cx);
2843            });
2844
2845        Ok(())
2846    }
2847
2848    pub fn release_terminal(
2849        &mut self,
2850        terminal_id: acp::TerminalId,
2851        cx: &mut Context<Self>,
2852    ) -> Result<()> {
2853        self.terminals
2854            .remove(&terminal_id)
2855            .context("Terminal not found")?
2856            .update(cx, |terminal, cx| {
2857                terminal.kill(cx);
2858            });
2859
2860        Ok(())
2861    }
2862
2863    pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2864        self.terminals
2865            .get(&terminal_id)
2866            .context("Terminal not found")
2867            .cloned()
2868    }
2869
2870    pub fn to_markdown(&self, cx: &App) -> String {
2871        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2872    }
2873
    /// Emits an `AcpThreadEvent::LoadError` event carrying `error`.
    pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::LoadError(error));
    }
2877
2878    pub fn register_terminal_created(
2879        &mut self,
2880        terminal_id: acp::TerminalId,
2881        command_label: String,
2882        working_dir: Option<PathBuf>,
2883        output_byte_limit: Option<u64>,
2884        terminal: Entity<::terminal::Terminal>,
2885        cx: &mut Context<Self>,
2886    ) -> Entity<Terminal> {
2887        let language_registry = self.project.read(cx).languages().clone();
2888
2889        let entity = cx.new(|cx| {
2890            Terminal::new(
2891                terminal_id.clone(),
2892                &command_label,
2893                working_dir.clone(),
2894                output_byte_limit.map(|l| l as usize),
2895                terminal,
2896                language_registry,
2897                cx,
2898            )
2899        });
2900        self.terminals.insert(terminal_id.clone(), entity.clone());
2901        entity
2902    }
2903
2904    pub fn mark_as_subagent_output(&mut self, cx: &mut Context<Self>) {
2905        for entry in self.entries.iter_mut().rev() {
2906            if let AgentThreadEntry::AssistantMessage(assistant_message) = entry {
2907                assistant_message.is_subagent_output = true;
2908                cx.notify();
2909                return;
2910            }
2911        }
2912    }
2913
2914    pub fn on_terminal_provider_event(
2915        &mut self,
2916        event: TerminalProviderEvent,
2917        cx: &mut Context<Self>,
2918    ) {
2919        match event {
2920            TerminalProviderEvent::Created {
2921                terminal_id,
2922                label,
2923                cwd,
2924                output_byte_limit,
2925                terminal,
2926            } => {
2927                let entity = self.register_terminal_created(
2928                    terminal_id.clone(),
2929                    label,
2930                    cwd,
2931                    output_byte_limit,
2932                    terminal,
2933                    cx,
2934                );
2935
2936                if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
2937                    for data in chunks.drain(..) {
2938                        entity.update(cx, |term, cx| {
2939                            term.inner().update(cx, |inner, cx| {
2940                                inner.write_output(&data, cx);
2941                            })
2942                        });
2943                    }
2944                }
2945
2946                if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
2947                    entity.update(cx, |_term, cx| {
2948                        cx.notify();
2949                    });
2950                }
2951
2952                cx.notify();
2953            }
2954            TerminalProviderEvent::Output { terminal_id, data } => {
2955                if let Some(entity) = self.terminals.get(&terminal_id) {
2956                    entity.update(cx, |term, cx| {
2957                        term.inner().update(cx, |inner, cx| {
2958                            inner.write_output(&data, cx);
2959                        })
2960                    });
2961                } else {
2962                    self.pending_terminal_output
2963                        .entry(terminal_id)
2964                        .or_default()
2965                        .push(data);
2966                }
2967            }
2968            TerminalProviderEvent::TitleChanged { terminal_id, title } => {
2969                if let Some(entity) = self.terminals.get(&terminal_id) {
2970                    entity.update(cx, |term, cx| {
2971                        term.inner().update(cx, |inner, cx| {
2972                            inner.breadcrumb_text = title;
2973                            cx.emit(::terminal::Event::BreadcrumbsChanged);
2974                        })
2975                    });
2976                }
2977            }
2978            TerminalProviderEvent::Exit {
2979                terminal_id,
2980                status,
2981            } => {
2982                if let Some(entity) = self.terminals.get(&terminal_id) {
2983                    entity.update(cx, |_term, cx| {
2984                        cx.notify();
2985                    });
2986                } else {
2987                    self.pending_terminal_exit.insert(terminal_id, status);
2988                }
2989            }
2990        }
2991    }
2992}
2993
2994fn markdown_for_raw_output(
2995    raw_output: &serde_json::Value,
2996    language_registry: &Arc<LanguageRegistry>,
2997    cx: &mut App,
2998) -> Option<Entity<Markdown>> {
2999    match raw_output {
3000        serde_json::Value::Null => None,
3001        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
3002            Markdown::new(
3003                value.to_string().into(),
3004                Some(language_registry.clone()),
3005                None,
3006                cx,
3007            )
3008        })),
3009        serde_json::Value::Number(value) => Some(cx.new(|cx| {
3010            Markdown::new(
3011                value.to_string().into(),
3012                Some(language_registry.clone()),
3013                None,
3014                cx,
3015            )
3016        })),
3017        serde_json::Value::String(value) => Some(cx.new(|cx| {
3018            Markdown::new(
3019                value.clone().into(),
3020                Some(language_registry.clone()),
3021                None,
3022                cx,
3023            )
3024        })),
3025        value => Some(cx.new(|cx| {
3026            let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
3027
3028            Markdown::new(
3029                format!("```json\n{}\n```", pretty_json).into(),
3030                Some(language_registry.clone()),
3031                None,
3032                cx,
3033            )
3034        })),
3035    }
3036}
3037
3038#[cfg(test)]
3039mod tests {
3040    use super::*;
3041    use anyhow::anyhow;
3042    use futures::{channel::mpsc, future::LocalBoxFuture, select};
3043    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
3044    use indoc::indoc;
3045    use project::{AgentId, FakeFs, Fs};
3046    use rand::{distr, prelude::*};
3047    use serde_json::json;
3048    use settings::SettingsStore;
3049    use smol::stream::StreamExt as _;
3050    use std::{
3051        any::Any,
3052        cell::RefCell,
3053        path::Path,
3054        rc::Rc,
3055        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
3056        time::Duration,
3057    };
3058    use util::{path, path_list::PathList};
3059
3060    fn init_test(cx: &mut TestAppContext) {
3061        env_logger::try_init().ok();
3062        cx.update(|cx| {
3063            let settings_store = SettingsStore::test(cx);
3064            cx.set_global(settings_store);
3065        });
3066    }
3067
3068    #[gpui::test]
3069    async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
3070        init_test(cx);
3071
3072        let fs = FakeFs::new(cx.executor());
3073        let project = Project::test(fs, [], cx).await;
3074        let connection = Rc::new(FakeAgentConnection::new());
3075        let thread = cx
3076            .update(|cx| {
3077                connection.new_session(
3078                    project,
3079                    PathList::new(&[std::path::Path::new(path!("/test"))]),
3080                    cx,
3081                )
3082            })
3083            .await
3084            .unwrap();
3085
3086        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3087
3088        // Send Output BEFORE Created - should be buffered by acp_thread
3089        thread.update(cx, |thread, cx| {
3090            thread.on_terminal_provider_event(
3091                TerminalProviderEvent::Output {
3092                    terminal_id: terminal_id.clone(),
3093                    data: b"hello buffered".to_vec(),
3094                },
3095                cx,
3096            );
3097        });
3098
3099        // Create a display-only terminal and then send Created
3100        let lower = cx.new(|cx| {
3101            let builder = ::terminal::TerminalBuilder::new_display_only(
3102                ::terminal::terminal_settings::CursorShape::default(),
3103                ::terminal::terminal_settings::AlternateScroll::On,
3104                None,
3105                0,
3106                cx.background_executor(),
3107                PathStyle::local(),
3108            )
3109            .unwrap();
3110            builder.subscribe(cx)
3111        });
3112
3113        thread.update(cx, |thread, cx| {
3114            thread.on_terminal_provider_event(
3115                TerminalProviderEvent::Created {
3116                    terminal_id: terminal_id.clone(),
3117                    label: "Buffered Test".to_string(),
3118                    cwd: None,
3119                    output_byte_limit: None,
3120                    terminal: lower.clone(),
3121                },
3122                cx,
3123            );
3124        });
3125
3126        // After Created, buffered Output should have been flushed into the renderer
3127        let content = thread.read_with(cx, |thread, cx| {
3128            let term = thread.terminal(terminal_id.clone()).unwrap();
3129            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3130        });
3131
3132        assert!(
3133            content.contains("hello buffered"),
3134            "expected buffered output to render, got: {content}"
3135        );
3136    }
3137
3138    #[gpui::test]
3139    async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
3140        init_test(cx);
3141
3142        let fs = FakeFs::new(cx.executor());
3143        let project = Project::test(fs, [], cx).await;
3144        let connection = Rc::new(FakeAgentConnection::new());
3145        let thread = cx
3146            .update(|cx| {
3147                connection.new_session(
3148                    project,
3149                    PathList::new(&[std::path::Path::new(path!("/test"))]),
3150                    cx,
3151                )
3152            })
3153            .await
3154            .unwrap();
3155
3156        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3157
3158        // Send Output BEFORE Created
3159        thread.update(cx, |thread, cx| {
3160            thread.on_terminal_provider_event(
3161                TerminalProviderEvent::Output {
3162                    terminal_id: terminal_id.clone(),
3163                    data: b"pre-exit data".to_vec(),
3164                },
3165                cx,
3166            );
3167        });
3168
3169        // Send Exit BEFORE Created
3170        thread.update(cx, |thread, cx| {
3171            thread.on_terminal_provider_event(
3172                TerminalProviderEvent::Exit {
3173                    terminal_id: terminal_id.clone(),
3174                    status: acp::TerminalExitStatus::new().exit_code(0),
3175                },
3176                cx,
3177            );
3178        });
3179
3180        // Now create a display-only lower-level terminal and send Created
3181        let lower = cx.new(|cx| {
3182            let builder = ::terminal::TerminalBuilder::new_display_only(
3183                ::terminal::terminal_settings::CursorShape::default(),
3184                ::terminal::terminal_settings::AlternateScroll::On,
3185                None,
3186                0,
3187                cx.background_executor(),
3188                PathStyle::local(),
3189            )
3190            .unwrap();
3191            builder.subscribe(cx)
3192        });
3193
3194        thread.update(cx, |thread, cx| {
3195            thread.on_terminal_provider_event(
3196                TerminalProviderEvent::Created {
3197                    terminal_id: terminal_id.clone(),
3198                    label: "Buffered Exit Test".to_string(),
3199                    cwd: None,
3200                    output_byte_limit: None,
3201                    terminal: lower.clone(),
3202                },
3203                cx,
3204            );
3205        });
3206
3207        // Output should be present after Created (flushed from buffer)
3208        let content = thread.read_with(cx, |thread, cx| {
3209            let term = thread.terminal(terminal_id.clone()).unwrap();
3210            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3211        });
3212
3213        assert!(
3214            content.contains("pre-exit data"),
3215            "expected pre-exit data to render, got: {content}"
3216        );
3217    }
3218
3219    /// Test that killing a terminal via Terminal::kill properly:
3220    /// 1. Causes wait_for_exit to complete (doesn't hang forever)
3221    /// 2. The underlying terminal still has the output that was written before the kill
3222    ///
3223    /// This test verifies that the fix to kill_active_task (which now also kills
3224    /// the shell process in addition to the foreground process) properly allows
3225    /// wait_for_exit to complete instead of hanging indefinitely.
3226    #[cfg(unix)]
3227    #[gpui::test]
3228    async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
3229        use std::collections::HashMap;
3230        use task::Shell;
3231        use util::shell_builder::ShellBuilder;
3232
3233        init_test(cx);
3234        cx.executor().allow_parking();
3235
3236        let fs = FakeFs::new(cx.executor());
3237        let project = Project::test(fs, [], cx).await;
3238        let connection = Rc::new(FakeAgentConnection::new());
3239        let thread = cx
3240            .update(|cx| {
3241                connection.new_session(
3242                    project.clone(),
3243                    PathList::new(&[Path::new(path!("/test"))]),
3244                    cx,
3245                )
3246            })
3247            .await
3248            .unwrap();
3249
3250        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3251
3252        // Create a real PTY terminal that runs a command which prints output then sleeps
3253        // We use printf instead of echo and chain with && sleep to ensure proper execution
3254        let (completion_tx, _completion_rx) = smol::channel::unbounded();
3255        let (program, args) = ShellBuilder::new(&Shell::System, false).build(
3256            Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
3257            &[],
3258        );
3259
3260        let builder = cx
3261            .update(|cx| {
3262                ::terminal::TerminalBuilder::new(
3263                    None,
3264                    None,
3265                    task::Shell::WithArguments {
3266                        program,
3267                        args,
3268                        title_override: None,
3269                    },
3270                    HashMap::default(),
3271                    ::terminal::terminal_settings::CursorShape::default(),
3272                    ::terminal::terminal_settings::AlternateScroll::On,
3273                    None,
3274                    vec![],
3275                    0,
3276                    false,
3277                    0,
3278                    Some(completion_tx),
3279                    cx,
3280                    vec![],
3281                    PathStyle::local(),
3282                )
3283            })
3284            .await
3285            .unwrap();
3286
3287        let lower_terminal = cx.new(|cx| builder.subscribe(cx));
3288
3289        // Create the acp_thread Terminal wrapper
3290        thread.update(cx, |thread, cx| {
3291            thread.on_terminal_provider_event(
3292                TerminalProviderEvent::Created {
3293                    terminal_id: terminal_id.clone(),
3294                    label: "printf output_before_kill && sleep 60".to_string(),
3295                    cwd: None,
3296                    output_byte_limit: None,
3297                    terminal: lower_terminal.clone(),
3298                },
3299                cx,
3300            );
3301        });
3302
3303        // Poll until the printf command produces output, rather than using a
3304        // fixed sleep which is flaky on loaded machines.
3305        let deadline = std::time::Instant::now() + Duration::from_secs(10);
3306        loop {
3307            let has_output = thread.read_with(cx, |thread, cx| {
3308                let term = thread
3309                    .terminals
3310                    .get(&terminal_id)
3311                    .expect("terminal not found");
3312                let content = term.read(cx).inner().read(cx).get_content();
3313                content.contains("output_before_kill")
3314            });
3315            if has_output {
3316                break;
3317            }
3318            assert!(
3319                std::time::Instant::now() < deadline,
3320                "Timed out waiting for printf output to appear in terminal",
3321            );
3322            cx.executor().timer(Duration::from_millis(50)).await;
3323        }
3324
3325        // Get the acp_thread Terminal and kill it
3326        let wait_for_exit = thread.update(cx, |thread, cx| {
3327            let term = thread.terminals.get(&terminal_id).unwrap();
3328            let wait_for_exit = term.read(cx).wait_for_exit();
3329            term.update(cx, |term, cx| {
3330                term.kill(cx);
3331            });
3332            wait_for_exit
3333        });
3334
3335        // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
3336        // Before the fix to kill_active_task, this would hang forever because
3337        // only the foreground process was killed, not the shell, so the PTY
3338        // child never exited and wait_for_completed_task never completed.
3339        let exit_result = futures::select! {
3340            result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
3341            _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
3342        };
3343
3344        assert!(
3345            exit_result.is_some(),
3346            "wait_for_exit should complete after kill, but it timed out. \
3347            This indicates kill_active_task is not properly killing the shell process."
3348        );
3349
3350        // Give the system a chance to process any pending updates
3351        cx.run_until_parked();
3352
3353        // Verify that the underlying terminal still has the output that was
3354        // written before the kill. This verifies that killing doesn't lose output.
3355        let inner_content = thread.read_with(cx, |thread, cx| {
3356            let term = thread.terminals.get(&terminal_id).unwrap();
3357            term.read(cx).inner().read(cx).get_content()
3358        });
3359
3360        assert!(
3361            inner_content.contains("output_before_kill"),
3362            "Underlying terminal should contain output from before kill, got: {}",
3363            inner_content
3364        );
3365    }
3366
3367    #[gpui::test]
3368    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
3369        init_test(cx);
3370
3371        let fs = FakeFs::new(cx.executor());
3372        let project = Project::test(fs, [], cx).await;
3373        let connection = Rc::new(FakeAgentConnection::new());
3374        let thread = cx
3375            .update(|cx| {
3376                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3377            })
3378            .await
3379            .unwrap();
3380
3381        // Test creating a new user message
3382        thread.update(cx, |thread, cx| {
3383            thread.push_user_content_block(None, "Hello, ".into(), cx);
3384        });
3385
3386        thread.update(cx, |thread, cx| {
3387            assert_eq!(thread.entries.len(), 1);
3388            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3389                assert_eq!(user_msg.id, None);
3390                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
3391            } else {
3392                panic!("Expected UserMessage");
3393            }
3394        });
3395
3396        // Test appending to existing user message
3397        let message_1_id = UserMessageId::new();
3398        thread.update(cx, |thread, cx| {
3399            thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
3400        });
3401
3402        thread.update(cx, |thread, cx| {
3403            assert_eq!(thread.entries.len(), 1);
3404            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3405                assert_eq!(user_msg.id, Some(message_1_id));
3406                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
3407            } else {
3408                panic!("Expected UserMessage");
3409            }
3410        });
3411
3412        // Test creating new user message after assistant message
3413        thread.update(cx, |thread, cx| {
3414            thread.push_assistant_content_block("Assistant response".into(), false, cx);
3415        });
3416
3417        let message_2_id = UserMessageId::new();
3418        thread.update(cx, |thread, cx| {
3419            thread.push_user_content_block(
3420                Some(message_2_id.clone()),
3421                "New user message".into(),
3422                cx,
3423            );
3424        });
3425
3426        thread.update(cx, |thread, cx| {
3427            assert_eq!(thread.entries.len(), 3);
3428            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
3429                assert_eq!(user_msg.id, Some(message_2_id));
3430                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
3431            } else {
3432                panic!("Expected UserMessage at index 2");
3433            }
3434        });
3435    }
3436
3437    #[gpui::test]
3438    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
3439        init_test(cx);
3440
3441        let fs = FakeFs::new(cx.executor());
3442        let project = Project::test(fs, [], cx).await;
3443        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3444            |_, thread, mut cx| {
3445                async move {
3446                    thread.update(&mut cx, |thread, cx| {
3447                        thread
3448                            .handle_session_update(
3449                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3450                                    "Thinking ".into(),
3451                                )),
3452                                cx,
3453                            )
3454                            .unwrap();
3455                        thread
3456                            .handle_session_update(
3457                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3458                                    "hard!".into(),
3459                                )),
3460                                cx,
3461                            )
3462                            .unwrap();
3463                    })?;
3464                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3465                }
3466                .boxed_local()
3467            },
3468        ));
3469
3470        let thread = cx
3471            .update(|cx| {
3472                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3473            })
3474            .await
3475            .unwrap();
3476
3477        thread
3478            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
3479            .await
3480            .unwrap();
3481
3482        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
3483        assert_eq!(
3484            output,
3485            indoc! {r#"
3486            ## User
3487
3488            Hello from Zed!
3489
3490            ## Assistant
3491
3492            <thinking>
3493            Thinking hard!
3494            </thinking>
3495
3496            "#}
3497        );
3498    }
3499
3500    #[gpui::test]
3501    async fn test_ignore_echoed_user_message_chunks_during_active_turn(
3502        cx: &mut gpui::TestAppContext,
3503    ) {
3504        init_test(cx);
3505
3506        let fs = FakeFs::new(cx.executor());
3507        let project = Project::test(fs, [], cx).await;
3508        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3509            |request, thread, mut cx| {
3510                async move {
3511                    let prompt = request.prompt.first().cloned().unwrap_or_else(|| "".into());
3512
3513                    thread.update(&mut cx, |thread, cx| {
3514                        thread
3515                            .handle_session_update(
3516                                acp::SessionUpdate::UserMessageChunk(acp::ContentChunk::new(
3517                                    prompt,
3518                                )),
3519                                cx,
3520                            )
3521                            .unwrap();
3522                    })?;
3523
3524                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3525                }
3526                .boxed_local()
3527            },
3528        ));
3529
3530        let thread = cx
3531            .update(|cx| {
3532                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3533            })
3534            .await
3535            .unwrap();
3536
3537        thread
3538            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
3539            .await
3540            .unwrap();
3541
3542        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
3543        assert_eq!(output.matches("Hello from Zed!").count(), 1);
3544    }
3545
3546    #[gpui::test]
3547    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
3548        init_test(cx);
3549
3550        let fs = FakeFs::new(cx.executor());
3551        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
3552            .await;
3553        let project = Project::test(fs.clone(), [], cx).await;
3554        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
3555        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
3556        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3557            move |_, thread, mut cx| {
3558                let read_file_tx = read_file_tx.clone();
3559                async move {
3560                    let content = thread
3561                        .update(&mut cx, |thread, cx| {
3562                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3563                        })
3564                        .unwrap()
3565                        .await
3566                        .unwrap();
3567                    assert_eq!(content, "one\ntwo\nthree\n");
3568                    read_file_tx.take().unwrap().send(()).unwrap();
3569                    thread
3570                        .update(&mut cx, |thread, cx| {
3571                            thread.write_text_file(
3572                                path!("/tmp/foo").into(),
3573                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
3574                                cx,
3575                            )
3576                        })
3577                        .unwrap()
3578                        .await
3579                        .unwrap();
3580                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3581                }
3582                .boxed_local()
3583            },
3584        ));
3585
3586        let (worktree, pathbuf) = project
3587            .update(cx, |project, cx| {
3588                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3589            })
3590            .await
3591            .unwrap();
3592        let buffer = project
3593            .update(cx, |project, cx| {
3594                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
3595            })
3596            .await
3597            .unwrap();
3598
3599        let thread = cx
3600            .update(|cx| {
3601                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3602            })
3603            .await
3604            .unwrap();
3605
3606        let request = thread.update(cx, |thread, cx| {
3607            thread.send_raw("Extend the count in /tmp/foo", cx)
3608        });
3609        read_file_rx.await.ok();
3610        buffer.update(cx, |buffer, cx| {
3611            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
3612        });
3613        cx.run_until_parked();
3614        assert_eq!(
3615            buffer.read_with(cx, |buffer, _| buffer.text()),
3616            "zero\none\ntwo\nthree\nfour\nfive\n"
3617        );
3618        assert_eq!(
3619            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
3620            "zero\none\ntwo\nthree\nfour\nfive\n"
3621        );
3622        request.await.unwrap();
3623    }
3624
3625    #[gpui::test]
3626    async fn test_reading_from_line(cx: &mut TestAppContext) {
3627        init_test(cx);
3628
3629        let fs = FakeFs::new(cx.executor());
3630        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
3631            .await;
3632        let project = Project::test(fs.clone(), [], cx).await;
3633        project
3634            .update(cx, |project, cx| {
3635                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3636            })
3637            .await
3638            .unwrap();
3639
3640        let connection = Rc::new(FakeAgentConnection::new());
3641
3642        let thread = cx
3643            .update(|cx| {
3644                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3645            })
3646            .await
3647            .unwrap();
3648
3649        // Whole file
3650        let content = thread
3651            .update(cx, |thread, cx| {
3652                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3653            })
3654            .await
3655            .unwrap();
3656
3657        assert_eq!(content, "one\ntwo\nthree\nfour\n");
3658
3659        // Only start line
3660        let content = thread
3661            .update(cx, |thread, cx| {
3662                thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
3663            })
3664            .await
3665            .unwrap();
3666
3667        assert_eq!(content, "three\nfour\n");
3668
3669        // Only limit
3670        let content = thread
3671            .update(cx, |thread, cx| {
3672                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3673            })
3674            .await
3675            .unwrap();
3676
3677        assert_eq!(content, "one\ntwo\n");
3678
3679        // Range
3680        let content = thread
3681            .update(cx, |thread, cx| {
3682                thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
3683            })
3684            .await
3685            .unwrap();
3686
3687        assert_eq!(content, "two\nthree\n");
3688
3689        // Invalid
3690        let err = thread
3691            .update(cx, |thread, cx| {
3692                thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
3693            })
3694            .await
3695            .unwrap_err();
3696
3697        assert_eq!(
3698            err.to_string(),
3699            "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
3700        );
3701    }
3702
3703    #[gpui::test]
3704    async fn test_reading_empty_file(cx: &mut TestAppContext) {
3705        init_test(cx);
3706
3707        let fs = FakeFs::new(cx.executor());
3708        fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
3709        let project = Project::test(fs.clone(), [], cx).await;
3710        project
3711            .update(cx, |project, cx| {
3712                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3713            })
3714            .await
3715            .unwrap();
3716
3717        let connection = Rc::new(FakeAgentConnection::new());
3718
3719        let thread = cx
3720            .update(|cx| {
3721                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3722            })
3723            .await
3724            .unwrap();
3725
3726        // Whole file
3727        let content = thread
3728            .update(cx, |thread, cx| {
3729                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3730            })
3731            .await
3732            .unwrap();
3733
3734        assert_eq!(content, "");
3735
3736        // Only start line
3737        let content = thread
3738            .update(cx, |thread, cx| {
3739                thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
3740            })
3741            .await
3742            .unwrap();
3743
3744        assert_eq!(content, "");
3745
3746        // Only limit
3747        let content = thread
3748            .update(cx, |thread, cx| {
3749                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3750            })
3751            .await
3752            .unwrap();
3753
3754        assert_eq!(content, "");
3755
3756        // Range
3757        let content = thread
3758            .update(cx, |thread, cx| {
3759                thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
3760            })
3761            .await
3762            .unwrap();
3763
3764        assert_eq!(content, "");
3765
3766        // Invalid
3767        let err = thread
3768            .update(cx, |thread, cx| {
3769                thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
3770            })
3771            .await
3772            .unwrap_err();
3773
3774        assert_eq!(
3775            err.to_string(),
3776            "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
3777        );
3778    }
3779    #[gpui::test]
3780    async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
3781        init_test(cx);
3782
3783        let fs = FakeFs::new(cx.executor());
3784        fs.insert_tree(path!("/tmp"), json!({})).await;
3785        let project = Project::test(fs.clone(), [], cx).await;
3786        project
3787            .update(cx, |project, cx| {
3788                project.find_or_create_worktree(path!("/tmp"), true, cx)
3789            })
3790            .await
3791            .unwrap();
3792
3793        let connection = Rc::new(FakeAgentConnection::new());
3794
3795        let thread = cx
3796            .update(|cx| {
3797                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3798            })
3799            .await
3800            .unwrap();
3801
3802        // Out of project file
3803        let err = thread
3804            .update(cx, |thread, cx| {
3805                thread.read_text_file(path!("/foo").into(), None, None, false, cx)
3806            })
3807            .await
3808            .unwrap_err();
3809
3810        assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
3811    }
3812
3813    #[gpui::test]
3814    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
3815        init_test(cx);
3816
3817        let fs = FakeFs::new(cx.executor());
3818        let project = Project::test(fs, [], cx).await;
3819        let id = acp::ToolCallId::new("test");
3820
3821        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3822            let id = id.clone();
3823            move |_, thread, mut cx| {
3824                let id = id.clone();
3825                async move {
3826                    thread
3827                        .update(&mut cx, |thread, cx| {
3828                            thread.handle_session_update(
3829                                acp::SessionUpdate::ToolCall(
3830                                    acp::ToolCall::new(id.clone(), "Label")
3831                                        .kind(acp::ToolKind::Fetch)
3832                                        .status(acp::ToolCallStatus::InProgress),
3833                                ),
3834                                cx,
3835                            )
3836                        })
3837                        .unwrap()
3838                        .unwrap();
3839                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3840                }
3841                .boxed_local()
3842            }
3843        }));
3844
3845        let thread = cx
3846            .update(|cx| {
3847                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3848            })
3849            .await
3850            .unwrap();
3851
3852        let request = thread.update(cx, |thread, cx| {
3853            thread.send_raw("Fetch https://example.com", cx)
3854        });
3855
3856        run_until_first_tool_call(&thread, cx).await;
3857
3858        thread.read_with(cx, |thread, _| {
3859            assert!(matches!(
3860                thread.entries[1],
3861                AgentThreadEntry::ToolCall(ToolCall {
3862                    status: ToolCallStatus::InProgress,
3863                    ..
3864                })
3865            ));
3866        });
3867
3868        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
3869
3870        thread.read_with(cx, |thread, _| {
3871            assert!(matches!(
3872                &thread.entries[1],
3873                AgentThreadEntry::ToolCall(ToolCall {
3874                    status: ToolCallStatus::Canceled,
3875                    ..
3876                })
3877            ));
3878        });
3879
3880        thread
3881            .update(cx, |thread, cx| {
3882                thread.handle_session_update(
3883                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3884                        id,
3885                        acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3886                    )),
3887                    cx,
3888                )
3889            })
3890            .unwrap();
3891
3892        request.await.unwrap();
3893
3894        thread.read_with(cx, |thread, _| {
3895            assert!(matches!(
3896                thread.entries[1],
3897                AgentThreadEntry::ToolCall(ToolCall {
3898                    status: ToolCallStatus::Completed,
3899                    ..
3900                })
3901            ));
3902        });
3903    }
3904
3905    #[gpui::test]
3906    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3907        init_test(cx);
3908        let fs = FakeFs::new(cx.background_executor.clone());
3909        fs.insert_tree(path!("/test"), json!({})).await;
3910        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3911
3912        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3913            move |_, thread, mut cx| {
3914                async move {
3915                    thread
3916                        .update(&mut cx, |thread, cx| {
3917                            thread.handle_session_update(
3918                                acp::SessionUpdate::ToolCall(
3919                                    acp::ToolCall::new("test", "Label")
3920                                        .kind(acp::ToolKind::Edit)
3921                                        .status(acp::ToolCallStatus::Completed)
3922                                        .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
3923                                            "/test/test.txt",
3924                                            "foo",
3925                                        ))]),
3926                                ),
3927                                cx,
3928                            )
3929                        })
3930                        .unwrap()
3931                        .unwrap();
3932                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3933                }
3934                .boxed_local()
3935            }
3936        }));
3937
3938        let thread = cx
3939            .update(|cx| {
3940                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3941            })
3942            .await
3943            .unwrap();
3944
3945        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3946            .await
3947            .unwrap();
3948
3949        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3950    }
3951
3952    #[gpui::test(iterations = 10)]
3953    async fn test_checkpoints(cx: &mut TestAppContext) {
3954        init_test(cx);
3955        let fs = FakeFs::new(cx.background_executor.clone());
3956        fs.insert_tree(
3957            path!("/test"),
3958            json!({
3959                ".git": {}
3960            }),
3961        )
3962        .await;
3963        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3964
3965        let simulate_changes = Arc::new(AtomicBool::new(true));
3966        let next_filename = Arc::new(AtomicUsize::new(0));
3967        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3968            let simulate_changes = simulate_changes.clone();
3969            let next_filename = next_filename.clone();
3970            let fs = fs.clone();
3971            move |request, thread, mut cx| {
3972                let fs = fs.clone();
3973                let simulate_changes = simulate_changes.clone();
3974                let next_filename = next_filename.clone();
3975                async move {
3976                    if simulate_changes.load(SeqCst) {
3977                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3978                        fs.write(Path::new(&filename), b"").await?;
3979                    }
3980
3981                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3982                        panic!("expected text content block");
3983                    };
3984                    thread.update(&mut cx, |thread, cx| {
3985                        thread
3986                            .handle_session_update(
3987                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3988                                    content.text.to_uppercase().into(),
3989                                )),
3990                                cx,
3991                            )
3992                            .unwrap();
3993                    })?;
3994                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3995                }
3996                .boxed_local()
3997            }
3998        }));
3999        let thread = cx
4000            .update(|cx| {
4001                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4002            })
4003            .await
4004            .unwrap();
4005
4006        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
4007            .await
4008            .unwrap();
4009        thread.read_with(cx, |thread, cx| {
4010            assert_eq!(
4011                thread.to_markdown(cx),
4012                indoc! {"
4013                    ## User (checkpoint)
4014
4015                    Lorem
4016
4017                    ## Assistant
4018
4019                    LOREM
4020
4021                "}
4022            );
4023        });
4024        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
4025
4026        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
4027            .await
4028            .unwrap();
4029        thread.read_with(cx, |thread, cx| {
4030            assert_eq!(
4031                thread.to_markdown(cx),
4032                indoc! {"
4033                    ## User (checkpoint)
4034
4035                    Lorem
4036
4037                    ## Assistant
4038
4039                    LOREM
4040
4041                    ## User (checkpoint)
4042
4043                    ipsum
4044
4045                    ## Assistant
4046
4047                    IPSUM
4048
4049                "}
4050            );
4051        });
4052        assert_eq!(
4053            fs.files(),
4054            vec![
4055                Path::new(path!("/test/file-0")),
4056                Path::new(path!("/test/file-1"))
4057            ]
4058        );
4059
4060        // Checkpoint isn't stored when there are no changes.
4061        simulate_changes.store(false, SeqCst);
4062        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
4063            .await
4064            .unwrap();
4065        thread.read_with(cx, |thread, cx| {
4066            assert_eq!(
4067                thread.to_markdown(cx),
4068                indoc! {"
4069                    ## User (checkpoint)
4070
4071                    Lorem
4072
4073                    ## Assistant
4074
4075                    LOREM
4076
4077                    ## User (checkpoint)
4078
4079                    ipsum
4080
4081                    ## Assistant
4082
4083                    IPSUM
4084
4085                    ## User
4086
4087                    dolor
4088
4089                    ## Assistant
4090
4091                    DOLOR
4092
4093                "}
4094            );
4095        });
4096        assert_eq!(
4097            fs.files(),
4098            vec![
4099                Path::new(path!("/test/file-0")),
4100                Path::new(path!("/test/file-1"))
4101            ]
4102        );
4103
4104        // Rewinding the conversation truncates the history and restores the checkpoint.
4105        thread
4106            .update(cx, |thread, cx| {
4107                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
4108                    panic!("unexpected entries {:?}", thread.entries)
4109                };
4110                thread.restore_checkpoint(message.id.clone().unwrap(), cx)
4111            })
4112            .await
4113            .unwrap();
4114        thread.read_with(cx, |thread, cx| {
4115            assert_eq!(
4116                thread.to_markdown(cx),
4117                indoc! {"
4118                    ## User (checkpoint)
4119
4120                    Lorem
4121
4122                    ## Assistant
4123
4124                    LOREM
4125
4126                "}
4127            );
4128        });
4129        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
4130    }
4131
4132    #[gpui::test]
4133    async fn test_tool_result_refusal(cx: &mut TestAppContext) {
4134        use std::sync::atomic::AtomicUsize;
4135        init_test(cx);
4136
4137        let fs = FakeFs::new(cx.executor());
4138        let project = Project::test(fs, None, cx).await;
4139
4140        // Create a connection that simulates refusal after tool result
4141        let prompt_count = Arc::new(AtomicUsize::new(0));
4142        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4143            let prompt_count = prompt_count.clone();
4144            move |_request, thread, mut cx| {
4145                let count = prompt_count.fetch_add(1, SeqCst);
4146                async move {
4147                    if count == 0 {
4148                        // First prompt: Generate a tool call with result
4149                        thread.update(&mut cx, |thread, cx| {
4150                            thread
4151                                .handle_session_update(
4152                                    acp::SessionUpdate::ToolCall(
4153                                        acp::ToolCall::new("tool1", "Test Tool")
4154                                            .kind(acp::ToolKind::Fetch)
4155                                            .status(acp::ToolCallStatus::Completed)
4156                                            .raw_input(serde_json::json!({"query": "test"}))
4157                                            .raw_output(serde_json::json!({"result": "inappropriate content"})),
4158                                    ),
4159                                    cx,
4160                                )
4161                                .unwrap();
4162                        })?;
4163
4164                        // Now return refusal because of the tool result
4165                        Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
4166                    } else {
4167                        Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4168                    }
4169                }
4170                .boxed_local()
4171            }
4172        }));
4173
4174        let thread = cx
4175            .update(|cx| {
4176                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4177            })
4178            .await
4179            .unwrap();
4180
4181        // Track if we see a Refusal event
4182        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4183        let saw_refusal_event_captured = saw_refusal_event.clone();
4184        thread.update(cx, |_thread, cx| {
4185            cx.subscribe(
4186                &thread,
4187                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4188                    if matches!(event, AcpThreadEvent::Refusal) {
4189                        *saw_refusal_event_captured.lock().unwrap() = true;
4190                    }
4191                },
4192            )
4193            .detach();
4194        });
4195
4196        // Send a user message - this will trigger tool call and then refusal
4197        let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
4198        cx.background_executor.spawn(send_task).detach();
4199        cx.run_until_parked();
4200
4201        // Verify that:
4202        // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
4203        // 2. The user message was NOT truncated
4204        assert!(
4205            *saw_refusal_event.lock().unwrap(),
4206            "Refusal event should be emitted for tool result refusals"
4207        );
4208
4209        thread.read_with(cx, |thread, _| {
4210            let entries = thread.entries();
4211            assert!(entries.len() >= 2, "Should have user message and tool call");
4212
4213            // Verify user message is still there
4214            assert!(
4215                matches!(entries[0], AgentThreadEntry::UserMessage(_)),
4216                "User message should not be truncated"
4217            );
4218
4219            // Verify tool call is there with result
4220            if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
4221                assert!(
4222                    tool_call.raw_output.is_some(),
4223                    "Tool call should have output"
4224                );
4225            } else {
4226                panic!("Expected tool call at index 1");
4227            }
4228        });
4229    }
4230
4231    #[gpui::test]
4232    async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
4233        init_test(cx);
4234
4235        let fs = FakeFs::new(cx.executor());
4236        let project = Project::test(fs, None, cx).await;
4237
4238        let refuse_next = Arc::new(AtomicBool::new(false));
4239        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4240            let refuse_next = refuse_next.clone();
4241            move |_request, _thread, _cx| {
4242                if refuse_next.load(SeqCst) {
4243                    async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
4244                        .boxed_local()
4245                } else {
4246                    async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
4247                        .boxed_local()
4248                }
4249            }
4250        }));
4251
4252        let thread = cx
4253            .update(|cx| {
4254                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4255            })
4256            .await
4257            .unwrap();
4258
4259        // Track if we see a Refusal event
4260        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4261        let saw_refusal_event_captured = saw_refusal_event.clone();
4262        thread.update(cx, |_thread, cx| {
4263            cx.subscribe(
4264                &thread,
4265                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4266                    if matches!(event, AcpThreadEvent::Refusal) {
4267                        *saw_refusal_event_captured.lock().unwrap() = true;
4268                    }
4269                },
4270            )
4271            .detach();
4272        });
4273
4274        // Send a message that will be refused
4275        refuse_next.store(true, SeqCst);
4276        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4277            .await
4278            .unwrap();
4279
4280        // Verify that a Refusal event WAS emitted for user prompt refusal
4281        assert!(
4282            *saw_refusal_event.lock().unwrap(),
4283            "Refusal event should be emitted for user prompt refusals"
4284        );
4285
4286        // Verify the message was truncated (user prompt refusal)
4287        thread.read_with(cx, |thread, cx| {
4288            assert_eq!(thread.to_markdown(cx), "");
4289        });
4290    }
4291
4292    #[gpui::test]
4293    async fn test_refusal(cx: &mut TestAppContext) {
4294        init_test(cx);
4295        let fs = FakeFs::new(cx.background_executor.clone());
4296        fs.insert_tree(path!("/"), json!({})).await;
4297        let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
4298
4299        let refuse_next = Arc::new(AtomicBool::new(false));
4300        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4301            let refuse_next = refuse_next.clone();
4302            move |request, thread, mut cx| {
4303                let refuse_next = refuse_next.clone();
4304                async move {
4305                    if refuse_next.load(SeqCst) {
4306                        return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
4307                    }
4308
4309                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
4310                        panic!("expected text content block");
4311                    };
4312                    thread.update(&mut cx, |thread, cx| {
4313                        thread
4314                            .handle_session_update(
4315                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
4316                                    content.text.to_uppercase().into(),
4317                                )),
4318                                cx,
4319                            )
4320                            .unwrap();
4321                    })?;
4322                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4323                }
4324                .boxed_local()
4325            }
4326        }));
4327        let thread = cx
4328            .update(|cx| {
4329                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4330            })
4331            .await
4332            .unwrap();
4333
4334        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4335            .await
4336            .unwrap();
4337        thread.read_with(cx, |thread, cx| {
4338            assert_eq!(
4339                thread.to_markdown(cx),
4340                indoc! {"
4341                    ## User
4342
4343                    hello
4344
4345                    ## Assistant
4346
4347                    HELLO
4348
4349                "}
4350            );
4351        });
4352
4353        // Simulate refusing the second message. The message should be truncated
4354        // when a user prompt is refused.
4355        refuse_next.store(true, SeqCst);
4356        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
4357            .await
4358            .unwrap();
4359        thread.read_with(cx, |thread, cx| {
4360            assert_eq!(
4361                thread.to_markdown(cx),
4362                indoc! {"
4363                    ## User
4364
4365                    hello
4366
4367                    ## Assistant
4368
4369                    HELLO
4370
4371                "}
4372            );
4373        });
4374    }
4375
4376    async fn run_until_first_tool_call(
4377        thread: &Entity<AcpThread>,
4378        cx: &mut TestAppContext,
4379    ) -> usize {
4380        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
4381
4382        let subscription = cx.update(|cx| {
4383            cx.subscribe(thread, move |thread, _, cx| {
4384                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
4385                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
4386                        return tx.try_send(ix).unwrap();
4387                    }
4388                }
4389            })
4390        });
4391
4392        select! {
4393            _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
4394                panic!("Timeout waiting for tool call")
4395            }
4396            ix = rx.next().fuse() => {
4397                drop(subscription);
4398                ix.unwrap()
4399            }
4400        }
4401    }
4402
4403    #[derive(Clone, Default)]
4404    struct FakeAgentConnection {
4405        auth_methods: Vec<acp::AuthMethod>,
4406        supports_truncate: bool,
4407        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
4408        set_title_calls: Rc<RefCell<Vec<SharedString>>>,
4409        on_user_message: Option<
4410            Rc<
4411                dyn Fn(
4412                        acp::PromptRequest,
4413                        WeakEntity<AcpThread>,
4414                        AsyncApp,
4415                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4416                    + 'static,
4417            >,
4418        >,
4419    }
4420
4421    impl FakeAgentConnection {
4422        fn new() -> Self {
4423            Self {
4424                auth_methods: Vec::new(),
4425                supports_truncate: true,
4426                on_user_message: None,
4427                sessions: Arc::default(),
4428                set_title_calls: Default::default(),
4429            }
4430        }
4431
4432        fn without_truncate_support(mut self) -> Self {
4433            self.supports_truncate = false;
4434            self
4435        }
4436
4437        #[expect(unused)]
4438        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
4439            self.auth_methods = auth_methods;
4440            self
4441        }
4442
4443        fn on_user_message(
4444            mut self,
4445            handler: impl Fn(
4446                acp::PromptRequest,
4447                WeakEntity<AcpThread>,
4448                AsyncApp,
4449            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4450            + 'static,
4451        ) -> Self {
4452            self.on_user_message.replace(Rc::new(handler));
4453            self
4454        }
4455    }
4456
4457    impl AgentConnection for FakeAgentConnection {
4458        fn agent_id(&self) -> AgentId {
4459            AgentId::new("fake")
4460        }
4461
4462        fn telemetry_id(&self) -> SharedString {
4463            "fake".into()
4464        }
4465
4466        fn auth_methods(&self) -> &[acp::AuthMethod] {
4467            &self.auth_methods
4468        }
4469
4470        fn new_session(
4471            self: Rc<Self>,
4472            project: Entity<Project>,
4473            work_dirs: PathList,
4474            cx: &mut App,
4475        ) -> Task<gpui::Result<Entity<AcpThread>>> {
4476            let session_id = acp::SessionId::new(
4477                rand::rng()
4478                    .sample_iter(&distr::Alphanumeric)
4479                    .take(7)
4480                    .map(char::from)
4481                    .collect::<String>(),
4482            );
4483            let action_log = cx.new(|_| ActionLog::new(project.clone()));
4484            let thread = cx.new(|cx| {
4485                AcpThread::new(
4486                    None,
4487                    None,
4488                    Some(work_dirs),
4489                    self.clone(),
4490                    project,
4491                    action_log,
4492                    session_id.clone(),
4493                    watch::Receiver::constant(
4494                        acp::PromptCapabilities::new()
4495                            .image(true)
4496                            .audio(true)
4497                            .embedded_context(true),
4498                    ),
4499                    cx,
4500                )
4501            });
4502            self.sessions.lock().insert(session_id, thread.downgrade());
4503            Task::ready(Ok(thread))
4504        }
4505
4506        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
4507            if self.auth_methods().iter().any(|m| m.id() == &method) {
4508                Task::ready(Ok(()))
4509            } else {
4510                Task::ready(Err(anyhow!("Invalid Auth Method")))
4511            }
4512        }
4513
4514        fn prompt(
4515            &self,
4516            _id: UserMessageId,
4517            params: acp::PromptRequest,
4518            cx: &mut App,
4519        ) -> Task<gpui::Result<acp::PromptResponse>> {
4520            let sessions = self.sessions.lock();
4521            let thread = sessions.get(&params.session_id).unwrap();
4522            if let Some(handler) = &self.on_user_message {
4523                let handler = handler.clone();
4524                let thread = thread.clone();
4525                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
4526            } else {
4527                Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
4528            }
4529        }
4530
4531        fn cancel(&self, _session_id: &acp::SessionId, _cx: &mut App) {}
4532
4533        fn truncate(
4534            &self,
4535            session_id: &acp::SessionId,
4536            _cx: &App,
4537        ) -> Option<Rc<dyn AgentSessionTruncate>> {
4538            self.supports_truncate.then(|| {
4539                Rc::new(FakeAgentSessionEditor {
4540                    _session_id: session_id.clone(),
4541                }) as Rc<dyn AgentSessionTruncate>
4542            })
4543        }
4544
4545        fn set_title(
4546            &self,
4547            _session_id: &acp::SessionId,
4548            _cx: &App,
4549        ) -> Option<Rc<dyn AgentSessionSetTitle>> {
4550            Some(Rc::new(FakeAgentSessionSetTitle {
4551                calls: self.set_title_calls.clone(),
4552            }))
4553        }
4554
4555        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
4556            self
4557        }
4558    }
4559
4560    struct FakeAgentSessionSetTitle {
4561        calls: Rc<RefCell<Vec<SharedString>>>,
4562    }
4563
4564    impl AgentSessionSetTitle for FakeAgentSessionSetTitle {
4565        fn run(&self, title: SharedString, _cx: &mut App) -> Task<Result<()>> {
4566            self.calls.borrow_mut().push(title);
4567            Task::ready(Ok(()))
4568        }
4569    }
4570
4571    struct FakeAgentSessionEditor {
4572        _session_id: acp::SessionId,
4573    }
4574
4575    impl AgentSessionTruncate for FakeAgentSessionEditor {
4576        fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
4577            Task::ready(Ok(()))
4578        }
4579    }
4580
4581    #[gpui::test]
4582    async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
4583        init_test(cx);
4584
4585        let fs = FakeFs::new(cx.executor());
4586        let project = Project::test(fs, [], cx).await;
4587        let connection = Rc::new(FakeAgentConnection::new());
4588        let thread = cx
4589            .update(|cx| {
4590                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4591            })
4592            .await
4593            .unwrap();
4594
4595        // Try to update a tool call that doesn't exist
4596        let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
4597        thread.update(cx, |thread, cx| {
4598            let result = thread.handle_session_update(
4599                acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4600                    nonexistent_id.clone(),
4601                    acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4602                )),
4603                cx,
4604            );
4605
4606            // The update should succeed (not return an error)
4607            assert!(result.is_ok());
4608
4609            // There should now be exactly one entry in the thread
4610            assert_eq!(thread.entries.len(), 1);
4611
4612            // The entry should be a failed tool call
4613            if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
4614                assert_eq!(tool_call.id, nonexistent_id);
4615                assert!(matches!(tool_call.status, ToolCallStatus::Failed));
4616                assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
4617
4618                // Check that the content contains the error message
4619                assert_eq!(tool_call.content.len(), 1);
4620                if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
4621                    match content_block {
4622                        ContentBlock::Markdown { markdown } => {
4623                            let markdown_text = markdown.read(cx).source();
4624                            assert!(markdown_text.contains("Tool call not found"));
4625                        }
4626                        ContentBlock::Empty => panic!("Expected markdown content, got empty"),
4627                        ContentBlock::ResourceLink { .. } => {
4628                            panic!("Expected markdown content, got resource link")
4629                        }
4630                        ContentBlock::Image { .. } => {
4631                            panic!("Expected markdown content, got image")
4632                        }
4633                    }
4634                } else {
4635                    panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
4636                }
4637            } else {
4638                panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
4639            }
4640        });
4641    }
4642
4643    /// Tests that restoring a checkpoint properly cleans up terminals that were
4644    /// created after that checkpoint, and cancels any in-progress generation.
4645    ///
4646    /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
4647    /// that were started after that checkpoint should be terminated, and any in-progress
4648    /// AI generation should be canceled.
4649    #[gpui::test]
4650    async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
4651        init_test(cx);
4652
4653        let fs = FakeFs::new(cx.executor());
4654        let project = Project::test(fs, [], cx).await;
4655        let connection = Rc::new(FakeAgentConnection::new());
4656        let thread = cx
4657            .update(|cx| {
4658                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4659            })
4660            .await
4661            .unwrap();
4662
4663        // Send first user message to create a checkpoint
4664        cx.update(|cx| {
4665            thread.update(cx, |thread, cx| {
4666                thread.send(vec!["first message".into()], cx)
4667            })
4668        })
4669        .await
4670        .unwrap();
4671
4672        // Send second message (creates another checkpoint) - we'll restore to this one
4673        cx.update(|cx| {
4674            thread.update(cx, |thread, cx| {
4675                thread.send(vec!["second message".into()], cx)
4676            })
4677        })
4678        .await
4679        .unwrap();
4680
4681        // Create 2 terminals BEFORE the checkpoint that have completed running
4682        let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4683        let mock_terminal_1 = cx.new(|cx| {
4684            let builder = ::terminal::TerminalBuilder::new_display_only(
4685                ::terminal::terminal_settings::CursorShape::default(),
4686                ::terminal::terminal_settings::AlternateScroll::On,
4687                None,
4688                0,
4689                cx.background_executor(),
4690                PathStyle::local(),
4691            )
4692            .unwrap();
4693            builder.subscribe(cx)
4694        });
4695
4696        thread.update(cx, |thread, cx| {
4697            thread.on_terminal_provider_event(
4698                TerminalProviderEvent::Created {
4699                    terminal_id: terminal_id_1.clone(),
4700                    label: "echo 'first'".to_string(),
4701                    cwd: Some(PathBuf::from("/test")),
4702                    output_byte_limit: None,
4703                    terminal: mock_terminal_1.clone(),
4704                },
4705                cx,
4706            );
4707        });
4708
4709        thread.update(cx, |thread, cx| {
4710            thread.on_terminal_provider_event(
4711                TerminalProviderEvent::Output {
4712                    terminal_id: terminal_id_1.clone(),
4713                    data: b"first\n".to_vec(),
4714                },
4715                cx,
4716            );
4717        });
4718
4719        thread.update(cx, |thread, cx| {
4720            thread.on_terminal_provider_event(
4721                TerminalProviderEvent::Exit {
4722                    terminal_id: terminal_id_1.clone(),
4723                    status: acp::TerminalExitStatus::new().exit_code(0),
4724                },
4725                cx,
4726            );
4727        });
4728
4729        let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4730        let mock_terminal_2 = cx.new(|cx| {
4731            let builder = ::terminal::TerminalBuilder::new_display_only(
4732                ::terminal::terminal_settings::CursorShape::default(),
4733                ::terminal::terminal_settings::AlternateScroll::On,
4734                None,
4735                0,
4736                cx.background_executor(),
4737                PathStyle::local(),
4738            )
4739            .unwrap();
4740            builder.subscribe(cx)
4741        });
4742
4743        thread.update(cx, |thread, cx| {
4744            thread.on_terminal_provider_event(
4745                TerminalProviderEvent::Created {
4746                    terminal_id: terminal_id_2.clone(),
4747                    label: "echo 'second'".to_string(),
4748                    cwd: Some(PathBuf::from("/test")),
4749                    output_byte_limit: None,
4750                    terminal: mock_terminal_2.clone(),
4751                },
4752                cx,
4753            );
4754        });
4755
4756        thread.update(cx, |thread, cx| {
4757            thread.on_terminal_provider_event(
4758                TerminalProviderEvent::Output {
4759                    terminal_id: terminal_id_2.clone(),
4760                    data: b"second\n".to_vec(),
4761                },
4762                cx,
4763            );
4764        });
4765
4766        thread.update(cx, |thread, cx| {
4767            thread.on_terminal_provider_event(
4768                TerminalProviderEvent::Exit {
4769                    terminal_id: terminal_id_2.clone(),
4770                    status: acp::TerminalExitStatus::new().exit_code(0),
4771                },
4772                cx,
4773            );
4774        });
4775
4776        // Get the second message ID to restore to
4777        let second_message_id = thread.read_with(cx, |thread, _| {
4778            // At this point we have:
4779            // - Index 0: First user message (with checkpoint)
4780            // - Index 1: Second user message (with checkpoint)
4781            // No assistant responses because FakeAgentConnection just returns EndTurn
4782            let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
4783                panic!("expected user message at index 1");
4784            };
4785            message.id.clone().unwrap()
4786        });
4787
4788        // Create a terminal AFTER the checkpoint we'll restore to.
4789        // This simulates the AI agent starting a long-running terminal command.
4790        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4791        let mock_terminal = cx.new(|cx| {
4792            let builder = ::terminal::TerminalBuilder::new_display_only(
4793                ::terminal::terminal_settings::CursorShape::default(),
4794                ::terminal::terminal_settings::AlternateScroll::On,
4795                None,
4796                0,
4797                cx.background_executor(),
4798                PathStyle::local(),
4799            )
4800            .unwrap();
4801            builder.subscribe(cx)
4802        });
4803
4804        // Register the terminal as created
4805        thread.update(cx, |thread, cx| {
4806            thread.on_terminal_provider_event(
4807                TerminalProviderEvent::Created {
4808                    terminal_id: terminal_id.clone(),
4809                    label: "sleep 1000".to_string(),
4810                    cwd: Some(PathBuf::from("/test")),
4811                    output_byte_limit: None,
4812                    terminal: mock_terminal.clone(),
4813                },
4814                cx,
4815            );
4816        });
4817
4818        // Simulate the terminal producing output (still running)
4819        thread.update(cx, |thread, cx| {
4820            thread.on_terminal_provider_event(
4821                TerminalProviderEvent::Output {
4822                    terminal_id: terminal_id.clone(),
4823                    data: b"terminal is running...\n".to_vec(),
4824                },
4825                cx,
4826            );
4827        });
4828
4829        // Create a tool call entry that references this terminal
4830        // This represents the agent requesting a terminal command
4831        thread.update(cx, |thread, cx| {
4832            thread
4833                .handle_session_update(
4834                    acp::SessionUpdate::ToolCall(
4835                        acp::ToolCall::new("terminal-tool-1", "Running command")
4836                            .kind(acp::ToolKind::Execute)
4837                            .status(acp::ToolCallStatus::InProgress)
4838                            .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
4839                                terminal_id.clone(),
4840                            ))])
4841                            .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
4842                    ),
4843                    cx,
4844                )
4845                .unwrap();
4846        });
4847
4848        // Verify terminal exists and is in the thread
4849        let terminal_exists_before =
4850            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4851        assert!(
4852            terminal_exists_before,
4853            "Terminal should exist before checkpoint restore"
4854        );
4855
4856        // Verify the terminal's underlying task is still running (not completed)
4857        let terminal_running_before = thread.read_with(cx, |thread, _cx| {
4858            let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
4859            terminal_entity.read_with(cx, |term, _cx| {
4860                term.output().is_none() // output is None means it's still running
4861            })
4862        });
4863        assert!(
4864            terminal_running_before,
4865            "Terminal should be running before checkpoint restore"
4866        );
4867
4868        // Verify we have the expected entries before restore
4869        let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
4870        assert!(
4871            entry_count_before > 1,
4872            "Should have multiple entries before restore"
4873        );
4874
4875        // Restore the checkpoint to the second message.
4876        // This should:
4877        // 1. Cancel any in-progress generation (via the cancel() call)
4878        // 2. Remove the terminal that was created after that point
4879        thread
4880            .update(cx, |thread, cx| {
4881                thread.restore_checkpoint(second_message_id, cx)
4882            })
4883            .await
4884            .unwrap();
4885
4886        // Verify that no send_task is in progress after restore
4887        // (cancel() clears the send_task)
4888        let has_send_task_after = thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4889        assert!(
4890            !has_send_task_after,
4891            "Should not have a send_task after restore (cancel should have cleared it)"
4892        );
4893
4894        // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
4895        let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
4896        assert_eq!(
4897            entry_count, 1,
4898            "Should have 1 entry after restore (only the first user message)"
4899        );
4900
4901        // Verify the 2 completed terminals from before the checkpoint still exist
4902        let terminal_1_exists = thread.read_with(cx, |thread, _| {
4903            thread.terminals.contains_key(&terminal_id_1)
4904        });
4905        assert!(
4906            terminal_1_exists,
4907            "Terminal 1 (from before checkpoint) should still exist"
4908        );
4909
4910        let terminal_2_exists = thread.read_with(cx, |thread, _| {
4911            thread.terminals.contains_key(&terminal_id_2)
4912        });
4913        assert!(
4914            terminal_2_exists,
4915            "Terminal 2 (from before checkpoint) should still exist"
4916        );
4917
4918        // Verify they're still in completed state
4919        let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
4920            let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
4921            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4922        });
4923        assert!(terminal_1_completed, "Terminal 1 should still be completed");
4924
4925        let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
4926            let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
4927            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4928        });
4929        assert!(terminal_2_completed, "Terminal 2 should still be completed");
4930
4931        // Verify the running terminal (created after checkpoint) was removed
4932        let terminal_3_exists =
4933            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4934        assert!(
4935            !terminal_3_exists,
4936            "Terminal 3 (created after checkpoint) should have been removed"
4937        );
4938
4939        // Verify total count is 2 (the two from before the checkpoint)
4940        let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
4941        assert_eq!(
4942            terminal_count, 2,
4943            "Should have exactly 2 terminals (the completed ones from before checkpoint)"
4944        );
4945    }
4946
4947    /// Tests that update_last_checkpoint correctly updates the original message's checkpoint
4948    /// even when a new user message is added while the async checkpoint comparison is in progress.
4949    ///
4950    /// This is a regression test for a bug where update_last_checkpoint would fail with
4951    /// "no checkpoint" if a new user message (without a checkpoint) was added between when
4952    /// update_last_checkpoint started and when its async closure ran.
4953    #[gpui::test]
4954    async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
4955        init_test(cx);
4956
4957        let fs = FakeFs::new(cx.executor());
4958        fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
4959            .await;
4960        let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;
4961
4962        let handler_done = Arc::new(AtomicBool::new(false));
4963        let handler_done_clone = handler_done.clone();
4964        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4965            move |_, _thread, _cx| {
4966                handler_done_clone.store(true, SeqCst);
4967                async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
4968            },
4969        ));
4970
4971        let thread = cx
4972            .update(|cx| {
4973                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4974            })
4975            .await
4976            .unwrap();
4977
4978        let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
4979        let send_task = cx.background_executor.spawn(send_future);
4980
4981        // Tick until handler completes, then a few more to let update_last_checkpoint start
4982        while !handler_done.load(SeqCst) {
4983            cx.executor().tick();
4984        }
4985        for _ in 0..5 {
4986            cx.executor().tick();
4987        }
4988
4989        thread.update(cx, |thread, cx| {
4990            thread.push_entry(
4991                AgentThreadEntry::UserMessage(UserMessage {
4992                    id: Some(UserMessageId::new()),
4993                    content: ContentBlock::Empty,
4994                    chunks: vec!["Injected message (no checkpoint)".into()],
4995                    checkpoint: None,
4996                    indented: false,
4997                }),
4998                cx,
4999            );
5000        });
5001
5002        cx.run_until_parked();
5003        let result = send_task.await;
5004
5005        assert!(
5006            result.is_ok(),
5007            "send should succeed even when new message added during update_last_checkpoint: {:?}",
5008            result.err()
5009        );
5010    }
5011
5012    /// Tests that when a follow-up message is sent during generation,
5013    /// the first turn completing does NOT clear `running_turn` because
5014    /// it now belongs to the second turn.
5015    #[gpui::test]
5016    async fn test_follow_up_message_during_generation_does_not_clear_turn(cx: &mut TestAppContext) {
5017        init_test(cx);
5018
5019        let fs = FakeFs::new(cx.executor());
5020        let project = Project::test(fs, [], cx).await;
5021
5022        // First handler waits for this signal before completing
5023        let (first_complete_tx, first_complete_rx) = futures::channel::oneshot::channel::<()>();
5024        let first_complete_rx = RefCell::new(Some(first_complete_rx));
5025
5026        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
5027            move |params, _thread, _cx| {
5028                let first_complete_rx = first_complete_rx.borrow_mut().take();
5029                let is_first = params
5030                    .prompt
5031                    .iter()
5032                    .any(|c| matches!(c, acp::ContentBlock::Text(t) if t.text.contains("first")));
5033
5034                async move {
5035                    if is_first {
5036                        // First handler waits until signaled
5037                        if let Some(rx) = first_complete_rx {
5038                            rx.await.ok();
5039                        }
5040                    }
5041                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
5042                }
5043                .boxed_local()
5044            }
5045        }));
5046
5047        let thread = cx
5048            .update(|cx| {
5049                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5050            })
5051            .await
5052            .unwrap();
5053
5054        // Send first message (turn_id=1) - handler will block
5055        let first_request = thread.update(cx, |thread, cx| thread.send_raw("first", cx));
5056        assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 1);
5057
5058        // Send second message (turn_id=2) while first is still blocked
5059        // This calls cancel() which takes turn 1's running_turn and sets turn 2's
5060        let second_request = thread.update(cx, |thread, cx| thread.send_raw("second", cx));
5061        assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 2);
5062
5063        let running_turn_after_second_send =
5064            thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
5065        assert_eq!(
5066            running_turn_after_second_send,
5067            Some(2),
5068            "running_turn should be set to turn 2 after sending second message"
5069        );
5070
5071        // Now signal first handler to complete
5072        first_complete_tx.send(()).ok();
5073
5074        // First request completes - should NOT clear running_turn
5075        // because running_turn now belongs to turn 2
5076        first_request.await.unwrap();
5077
5078        let running_turn_after_first =
5079            thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
5080        assert_eq!(
5081            running_turn_after_first,
5082            Some(2),
5083            "first turn completing should not clear running_turn (belongs to turn 2)"
5084        );
5085
5086        // Second request completes - SHOULD clear running_turn
5087        second_request.await.unwrap();
5088
5089        let running_turn_after_second =
5090            thread.read_with(cx, |thread, _| thread.running_turn.is_some());
5091        assert!(
5092            !running_turn_after_second,
5093            "second turn completing should clear running_turn"
5094        );
5095    }
5096
5097    #[gpui::test]
5098    async fn test_send_assigns_message_id_without_truncate_support(cx: &mut TestAppContext) {
5099        init_test(cx);
5100
5101        let fs = FakeFs::new(cx.executor());
5102        let project = Project::test(fs, [], cx).await;
5103
5104        let connection = Rc::new(FakeAgentConnection::new().without_truncate_support());
5105        let thread = cx
5106            .update(|cx| {
5107                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5108            })
5109            .await
5110            .unwrap();
5111
5112        let response = thread
5113            .update(cx, |thread, cx| thread.send_raw("test message", cx))
5114            .await;
5115
5116        assert!(response.is_ok(), "send should not fail: {response:?}");
5117        thread.read_with(cx, |thread, _| {
5118            let AgentThreadEntry::UserMessage(message) = &thread.entries[0] else {
5119                panic!("expected first entry to be a user message")
5120            };
5121            assert!(
5122                message.id.is_some(),
5123                "user message should always have an id"
5124            );
5125        });
5126    }
5127
5128    #[gpui::test]
5129    async fn test_send_returns_cancelled_response_and_marks_tools_as_cancelled(
5130        cx: &mut TestAppContext,
5131    ) {
5132        init_test(cx);
5133
5134        let fs = FakeFs::new(cx.executor());
5135        let project = Project::test(fs, [], cx).await;
5136
5137        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
5138            move |_params, thread, mut cx| {
5139                async move {
5140                    thread
5141                        .update(&mut cx, |thread, cx| {
5142                            thread.handle_session_update(
5143                                acp::SessionUpdate::ToolCall(
5144                                    acp::ToolCall::new(
5145                                        acp::ToolCallId::new("test-tool"),
5146                                        "Test Tool",
5147                                    )
5148                                    .kind(acp::ToolKind::Fetch)
5149                                    .status(acp::ToolCallStatus::InProgress),
5150                                ),
5151                                cx,
5152                            )
5153                        })
5154                        .unwrap()
5155                        .unwrap();
5156
5157                    Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
5158                }
5159                .boxed_local()
5160            },
5161        ));
5162
5163        let thread = cx
5164            .update(|cx| {
5165                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5166            })
5167            .await
5168            .unwrap();
5169
5170        let response = thread
5171            .update(cx, |thread, cx| thread.send_raw("test message", cx))
5172            .await;
5173
5174        let response = response
5175            .expect("send should succeed")
5176            .expect("should have response");
5177        assert_eq!(
5178            response.stop_reason,
5179            acp::StopReason::Cancelled,
5180            "response should have Cancelled stop_reason"
5181        );
5182
5183        thread.read_with(cx, |thread, _| {
5184            let tool_entry = thread
5185                .entries
5186                .iter()
5187                .find_map(|e| {
5188                    if let AgentThreadEntry::ToolCall(call) = e {
5189                        Some(call)
5190                    } else {
5191                        None
5192                    }
5193                })
5194                .expect("should have tool call entry");
5195
5196            assert!(
5197                matches!(tool_entry.status, ToolCallStatus::Canceled),
5198                "tool should be marked as Canceled when response is Cancelled, got {:?}",
5199                tool_entry.status
5200            );
5201        });
5202    }
5203
5204    #[gpui::test]
5205    async fn test_provisional_title_replaced_by_real_title(cx: &mut TestAppContext) {
5206        init_test(cx);
5207
5208        let fs = FakeFs::new(cx.executor());
5209        let project = Project::test(fs, [], cx).await;
5210        let connection = Rc::new(FakeAgentConnection::new());
5211        let set_title_calls = connection.set_title_calls.clone();
5212
5213        let thread = cx
5214            .update(|cx| {
5215                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5216            })
5217            .await
5218            .unwrap();
5219
5220        // Initial title is the default.
5221        thread.read_with(cx, |thread, _| {
5222            assert_eq!(thread.title(), None);
5223        });
5224
5225        // Setting a provisional title updates the display title.
5226        thread.update(cx, |thread, cx| {
5227            thread.set_provisional_title("Hello, can you help…".into(), cx);
5228        });
5229        thread.read_with(cx, |thread, _| {
5230            assert_eq!(
5231                thread.title().as_ref().map(|s| s.as_str()),
5232                Some("Hello, can you help…")
5233            );
5234        });
5235
5236        // The provisional title should NOT have propagated to the connection.
5237        assert_eq!(
5238            set_title_calls.borrow().len(),
5239            0,
5240            "provisional title should not propagate to the connection"
5241        );
5242
5243        // When the real title arrives via set_title, it replaces the
5244        // provisional title and propagates to the connection.
5245        let task = thread.update(cx, |thread, cx| {
5246            thread.set_title("Helping with Rust question".into(), cx)
5247        });
5248        task.await.expect("set_title should succeed");
5249        thread.read_with(cx, |thread, _| {
5250            assert_eq!(
5251                thread.title().as_ref().map(|s| s.as_str()),
5252                Some("Helping with Rust question")
5253            );
5254        });
5255        assert_eq!(
5256            set_title_calls.borrow().as_slice(),
5257            &[SharedString::from("Helping with Rust question")],
5258            "real title should propagate to the connection"
5259        );
5260    }
5261
5262    #[gpui::test]
5263    async fn test_session_info_update_replaces_provisional_title_and_emits_event(
5264        cx: &mut TestAppContext,
5265    ) {
5266        init_test(cx);
5267
5268        let fs = FakeFs::new(cx.executor());
5269        let project = Project::test(fs, [], cx).await;
5270        let connection = Rc::new(FakeAgentConnection::new());
5271
5272        let thread = cx
5273            .update(|cx| {
5274                connection.clone().new_session(
5275                    project,
5276                    PathList::new(&[Path::new(path!("/test"))]),
5277                    cx,
5278                )
5279            })
5280            .await
5281            .unwrap();
5282
5283        let title_updated_events = Rc::new(RefCell::new(0usize));
5284        let title_updated_events_for_subscription = title_updated_events.clone();
5285        thread.update(cx, |_thread, cx| {
5286            cx.subscribe(
5287                &thread,
5288                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
5289                    if matches!(event, AcpThreadEvent::TitleUpdated) {
5290                        *title_updated_events_for_subscription.borrow_mut() += 1;
5291                    }
5292                },
5293            )
5294            .detach();
5295        });
5296
5297        thread.update(cx, |thread, cx| {
5298            thread.set_provisional_title("Hello, can you help…".into(), cx);
5299        });
5300        assert_eq!(
5301            *title_updated_events.borrow(),
5302            1,
5303            "setting a provisional title should emit TitleUpdated"
5304        );
5305
5306        let result = thread.update(cx, |thread, cx| {
5307            thread.handle_session_update(
5308                acp::SessionUpdate::SessionInfoUpdate(
5309                    acp::SessionInfoUpdate::new().title("Helping with Rust question"),
5310                ),
5311                cx,
5312            )
5313        });
5314        result.expect("session info update should succeed");
5315
5316        thread.read_with(cx, |thread, _| {
5317            assert_eq!(
5318                thread.title().as_ref().map(|s| s.as_str()),
5319                Some("Helping with Rust question")
5320            );
5321            assert!(
5322                !thread.has_provisional_title(),
5323                "session info title update should clear provisional title"
5324            );
5325        });
5326
5327        assert_eq!(
5328            *title_updated_events.borrow(),
5329            2,
5330            "session info title update should emit TitleUpdated"
5331        );
5332        assert!(
5333            connection.set_title_calls.borrow().is_empty(),
5334            "session info title update should not propagate back to the connection"
5335        );
5336    }
5337
5338    #[gpui::test]
5339    async fn test_usage_update_populates_token_usage_and_cost(cx: &mut TestAppContext) {
5340        init_test(cx);
5341
5342        let fs = FakeFs::new(cx.executor());
5343        let project = Project::test(fs, [], cx).await;
5344        let connection = Rc::new(FakeAgentConnection::new());
5345        let thread = cx
5346            .update(|cx| {
5347                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5348            })
5349            .await
5350            .unwrap();
5351
5352        thread.update(cx, |thread, cx| {
5353            thread
5354                .handle_session_update(
5355                    acp::SessionUpdate::UsageUpdate(
5356                        acp::UsageUpdate::new(5000, 10000).cost(acp::Cost::new(0.42, "USD")),
5357                    ),
5358                    cx,
5359                )
5360                .unwrap();
5361        });
5362
5363        thread.read_with(cx, |thread, _| {
5364            let usage = thread.token_usage().expect("token_usage should be set");
5365            assert_eq!(usage.max_tokens, 10000);
5366            assert_eq!(usage.used_tokens, 5000);
5367
5368            let cost = thread.cost().expect("cost should be set");
5369            assert!((cost.amount - 0.42).abs() < f64::EPSILON);
5370            assert_eq!(cost.currency.as_ref(), "USD");
5371        });
5372    }
5373
5374    #[gpui::test]
5375    async fn test_usage_update_without_cost_preserves_existing_cost(cx: &mut TestAppContext) {
5376        init_test(cx);
5377
5378        let fs = FakeFs::new(cx.executor());
5379        let project = Project::test(fs, [], cx).await;
5380        let connection = Rc::new(FakeAgentConnection::new());
5381        let thread = cx
5382            .update(|cx| {
5383                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5384            })
5385            .await
5386            .unwrap();
5387
5388        thread.update(cx, |thread, cx| {
5389            thread
5390                .handle_session_update(
5391                    acp::SessionUpdate::UsageUpdate(
5392                        acp::UsageUpdate::new(1000, 10000).cost(acp::Cost::new(0.10, "USD")),
5393                    ),
5394                    cx,
5395                )
5396                .unwrap();
5397
5398            thread
5399                .handle_session_update(
5400                    acp::SessionUpdate::UsageUpdate(acp::UsageUpdate::new(2000, 10000)),
5401                    cx,
5402                )
5403                .unwrap();
5404        });
5405
5406        thread.read_with(cx, |thread, _| {
5407            let usage = thread.token_usage().expect("token_usage should be set");
5408            assert_eq!(usage.used_tokens, 2000);
5409
5410            let cost = thread.cost().expect("cost should be preserved");
5411            assert!((cost.amount - 0.10).abs() < f64::EPSILON);
5412        });
5413    }
5414
5415    #[gpui::test]
5416    async fn test_response_usage_does_not_clobber_session_usage(cx: &mut TestAppContext) {
5417        init_test(cx);
5418
5419        let fs = FakeFs::new(cx.executor());
5420        let project = Project::test(fs, [], cx).await;
5421        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
5422            move |_, thread, mut cx| {
5423                async move {
5424                    thread.update(&mut cx, |thread, cx| {
5425                        thread
5426                            .handle_session_update(
5427                                acp::SessionUpdate::UsageUpdate(
5428                                    acp::UsageUpdate::new(3000, 10000)
5429                                        .cost(acp::Cost::new(0.05, "EUR")),
5430                                ),
5431                                cx,
5432                            )
5433                            .unwrap();
5434                    })?;
5435                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)
5436                        .usage(acp::Usage::new(500, 200, 300)))
5437                }
5438                .boxed_local()
5439            },
5440        ));
5441
5442        let thread = cx
5443            .update(|cx| {
5444                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5445            })
5446            .await
5447            .unwrap();
5448
5449        thread
5450            .update(cx, |thread, cx| thread.send_raw("hello", cx))
5451            .await
5452            .unwrap();
5453
5454        thread.read_with(cx, |thread, _| {
5455            let usage = thread.token_usage().expect("token_usage should be set");
5456            assert_eq!(usage.max_tokens, 10000, "max_tokens from UsageUpdate");
5457            assert_eq!(usage.used_tokens, 3000, "used_tokens from UsageUpdate");
5458            assert_eq!(usage.input_tokens, 200, "input_tokens from response usage");
5459            assert_eq!(
5460                usage.output_tokens, 300,
5461                "output_tokens from response usage"
5462            );
5463
5464            let cost = thread.cost().expect("cost should be set");
5465            assert!((cost.amount - 0.05).abs() < f64::EPSILON);
5466            assert_eq!(cost.currency.as_ref(), "EUR");
5467        });
5468    }
5469
5470    #[gpui::test]
5471    async fn test_clearing_token_usage_also_clears_cost(cx: &mut TestAppContext) {
5472        init_test(cx);
5473
5474        let fs = FakeFs::new(cx.executor());
5475        let project = Project::test(fs, [], cx).await;
5476        let connection = Rc::new(FakeAgentConnection::new());
5477        let thread = cx
5478            .update(|cx| {
5479                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5480            })
5481            .await
5482            .unwrap();
5483
5484        thread.update(cx, |thread, cx| {
5485            thread
5486                .handle_session_update(
5487                    acp::SessionUpdate::UsageUpdate(
5488                        acp::UsageUpdate::new(1000, 10000).cost(acp::Cost::new(0.25, "USD")),
5489                    ),
5490                    cx,
5491                )
5492                .unwrap();
5493
5494            assert!(thread.token_usage().is_some());
5495            assert!(thread.cost().is_some());
5496
5497            thread.update_token_usage(None, cx);
5498
5499            assert!(thread.token_usage().is_none());
5500            assert!(
5501                thread.cost().is_none(),
5502                "cost should be cleared when token usage is cleared"
5503            );
5504        });
5505    }
5506}