acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5use action_log::{ActionLog, ActionLogTelemetry};
   6use agent_client_protocol::{self as acp};
   7use anyhow::{Context as _, Result, anyhow};
   8use collections::HashSet;
   9pub use connection::*;
  10pub use diff::*;
  11use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
  12use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  13use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  14use itertools::Itertools;
  15use language::language_settings::FormatOnSave;
  16use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  17use markdown::Markdown;
  18pub use mention::*;
  19use project::lsp_store::{FormatTrigger, LspFormatTarget};
  20use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  21use serde::{Deserialize, Serialize};
  22use serde_json::to_string_pretty;
  23use std::collections::HashMap;
  24use std::error::Error;
  25use std::fmt::{Formatter, Write};
  26use std::ops::Range;
  27use std::process::ExitStatus;
  28use std::rc::Rc;
  29use std::time::{Duration, Instant};
  30use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  31use task::{Shell, ShellBuilder};
  32pub use terminal::*;
  33use text::Bias;
  34use ui::App;
  35use util::markdown::MarkdownEscaped;
  36use util::path_list::PathList;
  37use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
  38use uuid::Uuid;
  39
  40/// Returned when the model stops because it exhausted its output token budget.
  41#[derive(Debug)]
  42pub struct MaxOutputTokensError;
  43
  44impl std::fmt::Display for MaxOutputTokensError {
  45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  46        write!(f, "output token limit reached")
  47    }
  48}
  49
  50impl std::error::Error for MaxOutputTokensError {}
  51
  52/// Key used in ACP ToolCall meta to store the tool's programmatic name.
  53/// This is a workaround since ACP's ToolCall doesn't have a dedicated name field.
  54pub const TOOL_NAME_META_KEY: &str = "tool_name";
  55
  56/// Helper to extract tool name from ACP meta
  57pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
  58    meta.as_ref()
  59        .and_then(|m| m.get(TOOL_NAME_META_KEY))
  60        .and_then(|v| v.as_str())
  61        .map(|s| SharedString::from(s.to_owned()))
  62}
  63
  64/// Helper to create meta with tool name
  65pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
  66    acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
  67}
  68
  69/// Key used in ACP ToolCall meta to store the session id and message indexes
  70pub const SUBAGENT_SESSION_INFO_META_KEY: &str = "subagent_session_info";
  71
  72#[derive(Clone, Debug, Deserialize, Serialize)]
  73pub struct SubagentSessionInfo {
  74    /// The session id of the subagent sessiont that was spawned
  75    pub session_id: acp::SessionId,
  76    /// The index of the message of the start of the "turn" run by this tool call
  77    pub message_start_index: usize,
  78    /// The index of the output of the message that the subagent has returned
  79    #[serde(skip_serializing_if = "Option::is_none")]
  80    pub message_end_index: Option<usize>,
  81}
  82
  83/// Helper to extract subagent session id from ACP meta
  84pub fn subagent_session_info_from_meta(meta: &Option<acp::Meta>) -> Option<SubagentSessionInfo> {
  85    meta.as_ref()
  86        .and_then(|m| m.get(SUBAGENT_SESSION_INFO_META_KEY))
  87        .and_then(|v| serde_json::from_value(v.clone()).ok())
  88}
  89
  90#[derive(Debug)]
  91pub struct UserMessage {
  92    pub id: Option<UserMessageId>,
  93    pub content: ContentBlock,
  94    pub chunks: Vec<acp::ContentBlock>,
  95    pub checkpoint: Option<Checkpoint>,
  96    pub indented: bool,
  97}
  98
  99#[derive(Debug)]
 100pub struct Checkpoint {
 101    git_checkpoint: GitStoreCheckpoint,
 102    pub show: bool,
 103}
 104
 105impl UserMessage {
 106    fn to_markdown(&self, cx: &App) -> String {
 107        let mut markdown = String::new();
 108        if self
 109            .checkpoint
 110            .as_ref()
 111            .is_some_and(|checkpoint| checkpoint.show)
 112        {
 113            writeln!(markdown, "## User (checkpoint)").unwrap();
 114        } else {
 115            writeln!(markdown, "## User").unwrap();
 116        }
 117        writeln!(markdown).unwrap();
 118        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
 119        writeln!(markdown).unwrap();
 120        markdown
 121    }
 122}
 123
 124#[derive(Debug, PartialEq)]
 125pub struct AssistantMessage {
 126    pub chunks: Vec<AssistantMessageChunk>,
 127    pub indented: bool,
 128    pub is_subagent_output: bool,
 129}
 130
 131impl AssistantMessage {
 132    pub fn to_markdown(&self, cx: &App) -> String {
 133        format!(
 134            "## Assistant\n\n{}\n\n",
 135            self.chunks
 136                .iter()
 137                .map(|chunk| chunk.to_markdown(cx))
 138                .join("\n\n")
 139        )
 140    }
 141}
 142
 143#[derive(Debug, PartialEq)]
 144pub enum AssistantMessageChunk {
 145    Message { block: ContentBlock },
 146    Thought { block: ContentBlock },
 147}
 148
 149impl AssistantMessageChunk {
 150    pub fn from_str(
 151        chunk: &str,
 152        language_registry: &Arc<LanguageRegistry>,
 153        path_style: PathStyle,
 154        cx: &mut App,
 155    ) -> Self {
 156        Self::Message {
 157            block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
 158        }
 159    }
 160
 161    fn to_markdown(&self, cx: &App) -> String {
 162        match self {
 163            Self::Message { block } => block.to_markdown(cx).to_string(),
 164            Self::Thought { block } => {
 165                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 166            }
 167        }
 168    }
 169}
 170
 171#[derive(Debug)]
 172pub enum AgentThreadEntry {
 173    UserMessage(UserMessage),
 174    AssistantMessage(AssistantMessage),
 175    ToolCall(ToolCall),
 176    CompletedPlan(Vec<PlanEntry>),
 177}
 178
 179impl AgentThreadEntry {
 180    pub fn is_indented(&self) -> bool {
 181        match self {
 182            Self::UserMessage(message) => message.indented,
 183            Self::AssistantMessage(message) => message.indented,
 184            Self::ToolCall(_) => false,
 185            Self::CompletedPlan(_) => false,
 186        }
 187    }
 188
 189    pub fn to_markdown(&self, cx: &App) -> String {
 190        match self {
 191            Self::UserMessage(message) => message.to_markdown(cx),
 192            Self::AssistantMessage(message) => message.to_markdown(cx),
 193            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 194            Self::CompletedPlan(entries) => {
 195                let mut md = String::from("## Plan\n\n");
 196                for entry in entries {
 197                    let source = entry.content.read(cx).source().to_string();
 198                    md.push_str(&format!("- [x] {}\n", source));
 199                }
 200                md
 201            }
 202        }
 203    }
 204
 205    pub fn user_message(&self) -> Option<&UserMessage> {
 206        if let AgentThreadEntry::UserMessage(message) = self {
 207            Some(message)
 208        } else {
 209            None
 210        }
 211    }
 212
 213    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 214        if let AgentThreadEntry::ToolCall(call) = self {
 215            itertools::Either::Left(call.diffs())
 216        } else {
 217            itertools::Either::Right(std::iter::empty())
 218        }
 219    }
 220
 221    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 222        if let AgentThreadEntry::ToolCall(call) = self {
 223            itertools::Either::Left(call.terminals())
 224        } else {
 225            itertools::Either::Right(std::iter::empty())
 226        }
 227    }
 228
 229    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 230        if let AgentThreadEntry::ToolCall(ToolCall {
 231            locations,
 232            resolved_locations,
 233            ..
 234        }) = self
 235        {
 236            Some((
 237                locations.get(ix)?.clone(),
 238                resolved_locations.get(ix)?.clone()?,
 239            ))
 240        } else {
 241            None
 242        }
 243    }
 244}
 245
 246#[derive(Debug)]
 247pub struct ToolCall {
 248    pub id: acp::ToolCallId,
 249    pub label: Entity<Markdown>,
 250    pub kind: acp::ToolKind,
 251    pub content: Vec<ToolCallContent>,
 252    pub status: ToolCallStatus,
 253    pub locations: Vec<acp::ToolCallLocation>,
 254    pub resolved_locations: Vec<Option<AgentLocation>>,
 255    pub raw_input: Option<serde_json::Value>,
 256    pub raw_input_markdown: Option<Entity<Markdown>>,
 257    pub raw_output: Option<serde_json::Value>,
 258    pub tool_name: Option<SharedString>,
 259    pub subagent_session_info: Option<SubagentSessionInfo>,
 260}
 261
 262impl ToolCall {
 263    fn from_acp(
 264        tool_call: acp::ToolCall,
 265        status: ToolCallStatus,
 266        language_registry: Arc<LanguageRegistry>,
 267        path_style: PathStyle,
 268        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 269        cx: &mut App,
 270    ) -> Result<Self> {
 271        let title = if tool_call.kind == acp::ToolKind::Execute {
 272            tool_call.title
 273        } else if tool_call.kind == acp::ToolKind::Edit {
 274            MarkdownEscaped(tool_call.title.as_str()).to_string()
 275        } else if let Some((first_line, _)) = tool_call.title.split_once("\n") {
 276            first_line.to_owned() + ""
 277        } else {
 278            tool_call.title
 279        };
 280        let mut content = Vec::with_capacity(tool_call.content.len());
 281        for item in tool_call.content {
 282            if let Some(item) = ToolCallContent::from_acp(
 283                item,
 284                language_registry.clone(),
 285                path_style,
 286                terminals,
 287                cx,
 288            )? {
 289                content.push(item);
 290            }
 291        }
 292
 293        let raw_input_markdown = tool_call
 294            .raw_input
 295            .as_ref()
 296            .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
 297
 298        let tool_name = tool_name_from_meta(&tool_call.meta);
 299
 300        let subagent_session_info = subagent_session_info_from_meta(&tool_call.meta);
 301
 302        let result = Self {
 303            id: tool_call.tool_call_id,
 304            label: cx
 305                .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
 306            kind: tool_call.kind,
 307            content,
 308            locations: tool_call.locations,
 309            resolved_locations: Vec::default(),
 310            status,
 311            raw_input: tool_call.raw_input,
 312            raw_input_markdown,
 313            raw_output: tool_call.raw_output,
 314            tool_name,
 315            subagent_session_info,
 316        };
 317        Ok(result)
 318    }
 319
 320    fn update_fields(
 321        &mut self,
 322        fields: acp::ToolCallUpdateFields,
 323        meta: Option<acp::Meta>,
 324        language_registry: Arc<LanguageRegistry>,
 325        path_style: PathStyle,
 326        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 327        cx: &mut App,
 328    ) -> Result<()> {
 329        let acp::ToolCallUpdateFields {
 330            kind,
 331            status,
 332            title,
 333            content,
 334            locations,
 335            raw_input,
 336            raw_output,
 337            ..
 338        } = fields;
 339
 340        if let Some(kind) = kind {
 341            self.kind = kind;
 342        }
 343
 344        if let Some(status) = status {
 345            self.status = status.into();
 346        }
 347
 348        if let Some(subagent_session_info) = subagent_session_info_from_meta(&meta) {
 349            self.subagent_session_info = Some(subagent_session_info);
 350        }
 351
 352        if let Some(title) = title {
 353            if self.kind == acp::ToolKind::Execute {
 354                for terminal in self.terminals() {
 355                    terminal.update(cx, |terminal, cx| {
 356                        terminal.update_command_label(&title, cx);
 357                    });
 358                }
 359            }
 360            self.label.update(cx, |label, cx| {
 361                if self.kind == acp::ToolKind::Execute {
 362                    label.replace(title, cx);
 363                } else if self.kind == acp::ToolKind::Edit {
 364                    label.replace(MarkdownEscaped(&title).to_string(), cx)
 365                } else if let Some((first_line, _)) = title.split_once("\n") {
 366                    label.replace(first_line.to_owned() + "", cx);
 367                } else {
 368                    label.replace(title, cx);
 369                }
 370            });
 371        }
 372
 373        if let Some(content) = content {
 374            let mut new_content_len = content.len();
 375            let mut content = content.into_iter();
 376
 377            // Reuse existing content if we can
 378            for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
 379                let valid_content =
 380                    old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
 381                if !valid_content {
 382                    new_content_len -= 1;
 383                }
 384            }
 385            for new in content {
 386                if let Some(new) = ToolCallContent::from_acp(
 387                    new,
 388                    language_registry.clone(),
 389                    path_style,
 390                    terminals,
 391                    cx,
 392                )? {
 393                    self.content.push(new);
 394                } else {
 395                    new_content_len -= 1;
 396                }
 397            }
 398            self.content.truncate(new_content_len);
 399        }
 400
 401        if let Some(locations) = locations {
 402            self.locations = locations;
 403        }
 404
 405        if let Some(raw_input) = raw_input {
 406            self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
 407            self.raw_input = Some(raw_input);
 408        }
 409
 410        if let Some(raw_output) = raw_output {
 411            if self.content.is_empty()
 412                && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 413            {
 414                self.content
 415                    .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 416                        markdown,
 417                    }));
 418            }
 419            self.raw_output = Some(raw_output);
 420        }
 421        Ok(())
 422    }
 423
 424    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 425        self.content.iter().filter_map(|content| match content {
 426            ToolCallContent::Diff(diff) => Some(diff),
 427            ToolCallContent::ContentBlock(_) => None,
 428            ToolCallContent::Terminal(_) => None,
 429        })
 430    }
 431
 432    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 433        self.content.iter().filter_map(|content| match content {
 434            ToolCallContent::Terminal(terminal) => Some(terminal),
 435            ToolCallContent::ContentBlock(_) => None,
 436            ToolCallContent::Diff(_) => None,
 437        })
 438    }
 439
 440    pub fn is_subagent(&self) -> bool {
 441        self.tool_name.as_ref().is_some_and(|s| s == "spawn_agent")
 442            || self.subagent_session_info.is_some()
 443    }
 444
 445    pub fn to_markdown(&self, cx: &App) -> String {
 446        let mut markdown = format!(
 447            "**Tool Call: {}**\nStatus: {}\n\n",
 448            self.label.read(cx).source(),
 449            self.status
 450        );
 451        for content in &self.content {
 452            markdown.push_str(content.to_markdown(cx).as_str());
 453            markdown.push_str("\n\n");
 454        }
 455        markdown
 456    }
 457
 458    async fn resolve_location(
 459        location: acp::ToolCallLocation,
 460        project: WeakEntity<Project>,
 461        cx: &mut AsyncApp,
 462    ) -> Option<ResolvedLocation> {
 463        let buffer = project
 464            .update(cx, |project, cx| {
 465                project
 466                    .project_path_for_absolute_path(&location.path, cx)
 467                    .map(|path| project.open_buffer(path, cx))
 468            })
 469            .ok()??;
 470        let buffer = buffer.await.log_err()?;
 471        let position = buffer.update(cx, |buffer, _| {
 472            let snapshot = buffer.snapshot();
 473            if let Some(row) = location.line {
 474                let column = snapshot.indent_size_for_line(row).len;
 475                let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 476                snapshot.anchor_before(point)
 477            } else {
 478                Anchor::min_for_buffer(snapshot.remote_id())
 479            }
 480        });
 481
 482        Some(ResolvedLocation { buffer, position })
 483    }
 484
 485    fn resolve_locations(
 486        &self,
 487        project: Entity<Project>,
 488        cx: &mut App,
 489    ) -> Task<Vec<Option<ResolvedLocation>>> {
 490        let locations = self.locations.clone();
 491        project.update(cx, |_, cx| {
 492            cx.spawn(async move |project, cx| {
 493                let mut new_locations = Vec::new();
 494                for location in locations {
 495                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 496                }
 497                new_locations
 498            })
 499        })
 500    }
 501}
 502
 503// Separate so we can hold a strong reference to the buffer
 504// for saving on the thread
 505#[derive(Clone, Debug, PartialEq, Eq)]
 506struct ResolvedLocation {
 507    buffer: Entity<Buffer>,
 508    position: Anchor,
 509}
 510
 511impl From<&ResolvedLocation> for AgentLocation {
 512    fn from(value: &ResolvedLocation) -> Self {
 513        Self {
 514            buffer: value.buffer.downgrade(),
 515            position: value.position,
 516        }
 517    }
 518}
 519
 520#[derive(Debug, Clone)]
 521pub enum SelectedPermissionParams {
 522    Terminal { patterns: Vec<String> },
 523}
 524
 525#[derive(Debug)]
 526pub struct SelectedPermissionOutcome {
 527    pub option_id: acp::PermissionOptionId,
 528    pub option_kind: acp::PermissionOptionKind,
 529    pub params: Option<SelectedPermissionParams>,
 530}
 531
 532impl SelectedPermissionOutcome {
 533    pub fn new(option_id: acp::PermissionOptionId, option_kind: acp::PermissionOptionKind) -> Self {
 534        Self {
 535            option_id,
 536            option_kind,
 537            params: None,
 538        }
 539    }
 540
 541    pub fn params(mut self, params: Option<SelectedPermissionParams>) -> Self {
 542        self.params = params;
 543        self
 544    }
 545}
 546
 547impl From<SelectedPermissionOutcome> for acp::SelectedPermissionOutcome {
 548    fn from(value: SelectedPermissionOutcome) -> Self {
 549        Self::new(value.option_id)
 550    }
 551}
 552
 553#[derive(Debug)]
 554pub enum RequestPermissionOutcome {
 555    Cancelled,
 556    Selected(SelectedPermissionOutcome),
 557}
 558
 559impl From<RequestPermissionOutcome> for acp::RequestPermissionOutcome {
 560    fn from(value: RequestPermissionOutcome) -> Self {
 561        match value {
 562            RequestPermissionOutcome::Cancelled => Self::Cancelled,
 563            RequestPermissionOutcome::Selected(outcome) => Self::Selected(outcome.into()),
 564        }
 565    }
 566}
 567
 568#[derive(Debug)]
 569pub enum ToolCallStatus {
 570    /// The tool call hasn't started running yet, but we start showing it to
 571    /// the user.
 572    Pending,
 573    /// The tool call is waiting for confirmation from the user.
 574    WaitingForConfirmation {
 575        options: PermissionOptions,
 576        respond_tx: oneshot::Sender<SelectedPermissionOutcome>,
 577    },
 578    /// The tool call is currently running.
 579    InProgress,
 580    /// The tool call completed successfully.
 581    Completed,
 582    /// The tool call failed.
 583    Failed,
 584    /// The user rejected the tool call.
 585    Rejected,
 586    /// The user canceled generation so the tool call was canceled.
 587    Canceled,
 588}
 589
 590impl From<acp::ToolCallStatus> for ToolCallStatus {
 591    fn from(status: acp::ToolCallStatus) -> Self {
 592        match status {
 593            acp::ToolCallStatus::Pending => Self::Pending,
 594            acp::ToolCallStatus::InProgress => Self::InProgress,
 595            acp::ToolCallStatus::Completed => Self::Completed,
 596            acp::ToolCallStatus::Failed => Self::Failed,
 597            _ => Self::Pending,
 598        }
 599    }
 600}
 601
 602impl Display for ToolCallStatus {
 603    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 604        write!(
 605            f,
 606            "{}",
 607            match self {
 608                ToolCallStatus::Pending => "Pending",
 609                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 610                ToolCallStatus::InProgress => "In Progress",
 611                ToolCallStatus::Completed => "Completed",
 612                ToolCallStatus::Failed => "Failed",
 613                ToolCallStatus::Rejected => "Rejected",
 614                ToolCallStatus::Canceled => "Canceled",
 615            }
 616        )
 617    }
 618}
 619
 620#[derive(Debug, PartialEq, Clone)]
 621pub enum ContentBlock {
 622    Empty,
 623    Markdown { markdown: Entity<Markdown> },
 624    ResourceLink { resource_link: acp::ResourceLink },
 625    Image { image: Arc<gpui::Image> },
 626}
 627
 628impl ContentBlock {
 629    pub fn new(
 630        block: acp::ContentBlock,
 631        language_registry: &Arc<LanguageRegistry>,
 632        path_style: PathStyle,
 633        cx: &mut App,
 634    ) -> Self {
 635        let mut this = Self::Empty;
 636        this.append(block, language_registry, path_style, cx);
 637        this
 638    }
 639
 640    pub fn new_combined(
 641        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 642        language_registry: Arc<LanguageRegistry>,
 643        path_style: PathStyle,
 644        cx: &mut App,
 645    ) -> Self {
 646        let mut this = Self::Empty;
 647        for block in blocks {
 648            this.append(block, &language_registry, path_style, cx);
 649        }
 650        this
 651    }
 652
 653    pub fn append(
 654        &mut self,
 655        block: acp::ContentBlock,
 656        language_registry: &Arc<LanguageRegistry>,
 657        path_style: PathStyle,
 658        cx: &mut App,
 659    ) {
 660        match (&mut *self, &block) {
 661            (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
 662                *self = ContentBlock::ResourceLink {
 663                    resource_link: resource_link.clone(),
 664                };
 665            }
 666            (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
 667                if let Some(image) = Self::decode_image(image_content) {
 668                    *self = ContentBlock::Image { image };
 669                } else {
 670                    let new_content = Self::image_md(image_content);
 671                    *self = Self::create_markdown_block(new_content, language_registry, cx);
 672                }
 673            }
 674            (ContentBlock::Empty, _) => {
 675                let new_content = Self::block_string_contents(&block, path_style);
 676                *self = Self::create_markdown_block(new_content, language_registry, cx);
 677            }
 678            (ContentBlock::Markdown { markdown }, _) => {
 679                let new_content = Self::block_string_contents(&block, path_style);
 680                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 681            }
 682            (ContentBlock::ResourceLink { resource_link }, _) => {
 683                let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
 684                let new_content = Self::block_string_contents(&block, path_style);
 685                let combined = format!("{}\n{}", existing_content, new_content);
 686                *self = Self::create_markdown_block(combined, language_registry, cx);
 687            }
 688            (ContentBlock::Image { .. }, _) => {
 689                let new_content = Self::block_string_contents(&block, path_style);
 690                let combined = format!("`Image`\n{}", new_content);
 691                *self = Self::create_markdown_block(combined, language_registry, cx);
 692            }
 693        }
 694    }
 695
 696    fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
 697        use base64::Engine as _;
 698
 699        let bytes = base64::engine::general_purpose::STANDARD
 700            .decode(image_content.data.as_bytes())
 701            .ok()?;
 702        let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
 703        Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
 704    }
 705
 706    fn create_markdown_block(
 707        content: String,
 708        language_registry: &Arc<LanguageRegistry>,
 709        cx: &mut App,
 710    ) -> ContentBlock {
 711        ContentBlock::Markdown {
 712            markdown: cx
 713                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 714        }
 715    }
 716
 717    fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
 718        match block {
 719            acp::ContentBlock::Text(text_content) => text_content.text.clone(),
 720            acp::ContentBlock::ResourceLink(resource_link) => {
 721                Self::resource_link_md(&resource_link.uri, path_style)
 722            }
 723            acp::ContentBlock::Resource(acp::EmbeddedResource {
 724                resource:
 725                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 726                        uri,
 727                        ..
 728                    }),
 729                ..
 730            }) => Self::resource_link_md(uri, path_style),
 731            acp::ContentBlock::Image(image) => Self::image_md(image),
 732            _ => String::new(),
 733        }
 734    }
 735
 736    fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
 737        if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
 738            uri.as_link().to_string()
 739        } else {
 740            uri.to_string()
 741        }
 742    }
 743
 744    fn image_md(_image: &acp::ImageContent) -> String {
 745        "`Image`".into()
 746    }
 747
 748    pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 749        match self {
 750            ContentBlock::Empty => "",
 751            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 752            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 753            ContentBlock::Image { .. } => "`Image`",
 754        }
 755    }
 756
 757    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 758        match self {
 759            ContentBlock::Empty => None,
 760            ContentBlock::Markdown { markdown } => Some(markdown),
 761            ContentBlock::ResourceLink { .. } => None,
 762            ContentBlock::Image { .. } => None,
 763        }
 764    }
 765
 766    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 767        match self {
 768            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 769            _ => None,
 770        }
 771    }
 772
 773    pub fn image(&self) -> Option<&Arc<gpui::Image>> {
 774        match self {
 775            ContentBlock::Image { image } => Some(image),
 776            _ => None,
 777        }
 778    }
 779}
 780
 781#[derive(Debug)]
 782pub enum ToolCallContent {
 783    ContentBlock(ContentBlock),
 784    Diff(Entity<Diff>),
 785    Terminal(Entity<Terminal>),
 786}
 787
 788impl ToolCallContent {
 789    pub fn from_acp(
 790        content: acp::ToolCallContent,
 791        language_registry: Arc<LanguageRegistry>,
 792        path_style: PathStyle,
 793        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 794        cx: &mut App,
 795    ) -> Result<Option<Self>> {
 796        match content {
 797            acp::ToolCallContent::Content(acp::Content { content, .. }) => {
 798                Ok(Some(Self::ContentBlock(ContentBlock::new(
 799                    content,
 800                    &language_registry,
 801                    path_style,
 802                    cx,
 803                ))))
 804            }
 805            acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
 806                Diff::finalized(
 807                    diff.path.to_string_lossy().into_owned(),
 808                    diff.old_text,
 809                    diff.new_text,
 810                    language_registry,
 811                    cx,
 812                )
 813            })))),
 814            acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
 815                .get(&terminal_id)
 816                .cloned()
 817                .map(|terminal| Some(Self::Terminal(terminal)))
 818                .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
 819            _ => Ok(None),
 820        }
 821    }
 822
 823    pub fn update_from_acp(
 824        &mut self,
 825        new: acp::ToolCallContent,
 826        language_registry: Arc<LanguageRegistry>,
 827        path_style: PathStyle,
 828        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 829        cx: &mut App,
 830    ) -> Result<bool> {
 831        let needs_update = match (&self, &new) {
 832            (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
 833                old_diff.read(cx).needs_update(
 834                    new_diff.old_text.as_deref().unwrap_or(""),
 835                    &new_diff.new_text,
 836                    cx,
 837                )
 838            }
 839            _ => true,
 840        };
 841
 842        if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
 843            if needs_update {
 844                *self = update;
 845            }
 846            Ok(true)
 847        } else {
 848            Ok(false)
 849        }
 850    }
 851
 852    pub fn to_markdown(&self, cx: &App) -> String {
 853        match self {
 854            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 855            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 856            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 857        }
 858    }
 859
 860    pub fn image(&self) -> Option<&Arc<gpui::Image>> {
 861        match self {
 862            Self::ContentBlock(content) => content.image(),
 863            _ => None,
 864        }
 865    }
 866}
 867
 868#[derive(Debug, PartialEq)]
 869pub enum ToolCallUpdate {
 870    UpdateFields(acp::ToolCallUpdate),
 871    UpdateDiff(ToolCallUpdateDiff),
 872    UpdateTerminal(ToolCallUpdateTerminal),
 873}
 874
 875impl ToolCallUpdate {
 876    fn id(&self) -> &acp::ToolCallId {
 877        match self {
 878            Self::UpdateFields(update) => &update.tool_call_id,
 879            Self::UpdateDiff(diff) => &diff.id,
 880            Self::UpdateTerminal(terminal) => &terminal.id,
 881        }
 882    }
 883}
 884
 885impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 886    fn from(update: acp::ToolCallUpdate) -> Self {
 887        Self::UpdateFields(update)
 888    }
 889}
 890
 891impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 892    fn from(diff: ToolCallUpdateDiff) -> Self {
 893        Self::UpdateDiff(diff)
 894    }
 895}
 896
 897#[derive(Debug, PartialEq)]
 898pub struct ToolCallUpdateDiff {
 899    pub id: acp::ToolCallId,
 900    pub diff: Entity<Diff>,
 901}
 902
 903impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 904    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 905        Self::UpdateTerminal(terminal)
 906    }
 907}
 908
 909#[derive(Debug, PartialEq)]
 910pub struct ToolCallUpdateTerminal {
 911    pub id: acp::ToolCallId,
 912    pub terminal: Entity<Terminal>,
 913}
 914
 915#[derive(Debug, Default)]
 916pub struct Plan {
 917    pub entries: Vec<PlanEntry>,
 918}
 919
 920#[derive(Debug)]
 921pub struct PlanStats<'a> {
 922    pub in_progress_entry: Option<&'a PlanEntry>,
 923    pub pending: u32,
 924    pub completed: u32,
 925}
 926
 927impl Plan {
 928    pub fn is_empty(&self) -> bool {
 929        self.entries.is_empty()
 930    }
 931
 932    pub fn stats(&self) -> PlanStats<'_> {
 933        let mut stats = PlanStats {
 934            in_progress_entry: None,
 935            pending: 0,
 936            completed: 0,
 937        };
 938
 939        for entry in &self.entries {
 940            match &entry.status {
 941                acp::PlanEntryStatus::Pending => {
 942                    stats.pending += 1;
 943                }
 944                acp::PlanEntryStatus::InProgress => {
 945                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 946                    stats.pending += 1;
 947                }
 948                acp::PlanEntryStatus::Completed => {
 949                    stats.completed += 1;
 950                }
 951                _ => {}
 952            }
 953        }
 954
 955        stats
 956    }
 957}
 958
 959#[derive(Debug)]
 960pub struct PlanEntry {
 961    pub content: Entity<Markdown>,
 962    pub priority: acp::PlanEntryPriority,
 963    pub status: acp::PlanEntryStatus,
 964}
 965
 966impl PlanEntry {
 967    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 968        Self {
 969            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 970            priority: entry.priority,
 971            status: entry.status,
 972        }
 973    }
 974}
 975
 976#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
 977pub struct TokenUsage {
 978    pub max_tokens: u64,
 979    pub used_tokens: u64,
 980    pub input_tokens: u64,
 981    pub output_tokens: u64,
 982    pub max_output_tokens: Option<u64>,
 983}
 984
 985#[derive(Debug, Clone)]
 986pub struct SessionCost {
 987    pub amount: f64,
 988    pub currency: SharedString,
 989}
 990
 991pub const TOKEN_USAGE_WARNING_THRESHOLD: f32 = 0.8;
 992
 993impl TokenUsage {
 994    pub fn ratio(&self) -> TokenUsageRatio {
 995        #[cfg(debug_assertions)]
 996        let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
 997            .unwrap_or(TOKEN_USAGE_WARNING_THRESHOLD.to_string())
 998            .parse()
 999            .unwrap();
1000        #[cfg(not(debug_assertions))]
1001        let warning_threshold: f32 = TOKEN_USAGE_WARNING_THRESHOLD;
1002
1003        // When the maximum is unknown because there is no selected model,
1004        // avoid showing the token limit warning.
1005        if self.max_tokens == 0 {
1006            TokenUsageRatio::Normal
1007        } else if self.used_tokens >= self.max_tokens {
1008            TokenUsageRatio::Exceeded
1009        } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
1010            TokenUsageRatio::Warning
1011        } else {
1012            TokenUsageRatio::Normal
1013        }
1014    }
1015}
1016
1017#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
1018pub enum TokenUsageRatio {
1019    Normal,
1020    Warning,
1021    Exceeded,
1022}
1023
1024#[derive(Debug, Clone)]
1025pub struct RetryStatus {
1026    pub last_error: SharedString,
1027    pub attempt: usize,
1028    pub max_attempts: usize,
1029    pub started_at: Instant,
1030    pub duration: Duration,
1031}
1032
1033struct RunningTurn {
1034    id: u32,
1035    send_task: Task<()>,
1036}
1037
1038pub struct AcpThread {
1039    session_id: acp::SessionId,
1040    work_dirs: Option<PathList>,
1041    parent_session_id: Option<acp::SessionId>,
1042    title: Option<SharedString>,
1043    provisional_title: Option<SharedString>,
1044    entries: Vec<AgentThreadEntry>,
1045    plan: Plan,
1046    project: Entity<Project>,
1047    action_log: Entity<ActionLog>,
1048    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
1049    turn_id: u32,
1050    running_turn: Option<RunningTurn>,
1051    connection: Rc<dyn AgentConnection>,
1052    token_usage: Option<TokenUsage>,
1053    cost: Option<SessionCost>,
1054    prompt_capabilities: acp::PromptCapabilities,
1055    available_commands: Vec<acp::AvailableCommand>,
1056    _observe_prompt_capabilities: Task<anyhow::Result<()>>,
1057    terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
1058    pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
1059    pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
1060    had_error: bool,
1061    /// The user's unsent prompt text, persisted so it can be restored when reloading the thread.
1062    draft_prompt: Option<Vec<acp::ContentBlock>>,
1063    /// The initial scroll position for the thread view, set during session registration.
1064    ui_scroll_position: Option<gpui::ListOffset>,
1065    /// Buffer for smooth text streaming. Holds text that has been received from
1066    /// the model but not yet revealed in the UI. A timer task drains this buffer
1067    /// gradually to create a fluid typing effect instead of choppy chunk-at-a-time
1068    /// updates.
1069    streaming_text_buffer: Option<StreamingTextBuffer>,
1070}
1071
1072struct StreamingTextBuffer {
1073    /// Text received from the model but not yet appended to the Markdown source.
1074    pending: String,
1075    /// The number of bytes to reveal per timer turn.
1076    bytes_to_reveal_per_tick: usize,
1077    /// The Markdown entity being streamed into.
1078    target: Entity<Markdown>,
1079    /// Timer task that periodically moves text from `pending` into `source`.
1080    _reveal_task: Task<()>,
1081}
1082
1083impl StreamingTextBuffer {
1084    /// The number of milliseconds between each timer tick, controlling how quickly
1085    /// text is revealed.
1086    const TASK_UPDATE_MS: u64 = 16;
1087    /// The time in milliseconds to reveal the entire pending text.
1088    const REVEAL_TARGET: f32 = 200.0;
1089}
1090
1091impl From<&AcpThread> for ActionLogTelemetry {
1092    fn from(value: &AcpThread) -> Self {
1093        Self {
1094            agent_telemetry_id: value.connection().telemetry_id(),
1095            session_id: value.session_id.0.clone(),
1096        }
1097    }
1098}
1099
1100#[derive(Debug)]
1101pub enum AcpThreadEvent {
1102    PromptUpdated,
1103    NewEntry,
1104    TitleUpdated,
1105    TokenUsageUpdated,
1106    EntryUpdated(usize),
1107    EntriesRemoved(Range<usize>),
1108    ToolAuthorizationRequested(acp::ToolCallId),
1109    ToolAuthorizationReceived(acp::ToolCallId),
1110    Retry(RetryStatus),
1111    SubagentSpawned(acp::SessionId),
1112    Stopped(acp::StopReason),
1113    Error,
1114    LoadError(LoadError),
1115    PromptCapabilitiesUpdated,
1116    Refusal,
1117    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
1118    ModeUpdated(acp::SessionModeId),
1119    ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
1120    WorkingDirectoriesUpdated,
1121}
1122
1123impl EventEmitter<AcpThreadEvent> for AcpThread {}
1124
1125#[derive(Debug, Clone)]
1126pub enum TerminalProviderEvent {
1127    Created {
1128        terminal_id: acp::TerminalId,
1129        label: String,
1130        cwd: Option<PathBuf>,
1131        output_byte_limit: Option<u64>,
1132        terminal: Entity<::terminal::Terminal>,
1133    },
1134    Output {
1135        terminal_id: acp::TerminalId,
1136        data: Vec<u8>,
1137    },
1138    TitleChanged {
1139        terminal_id: acp::TerminalId,
1140        title: String,
1141    },
1142    Exit {
1143        terminal_id: acp::TerminalId,
1144        status: acp::TerminalExitStatus,
1145    },
1146}
1147
1148#[derive(Debug, Clone)]
1149pub enum TerminalProviderCommand {
1150    WriteInput {
1151        terminal_id: acp::TerminalId,
1152        bytes: Vec<u8>,
1153    },
1154    Resize {
1155        terminal_id: acp::TerminalId,
1156        cols: u16,
1157        rows: u16,
1158    },
1159    Close {
1160        terminal_id: acp::TerminalId,
1161    },
1162}
1163
1164#[derive(PartialEq, Eq, Debug)]
1165pub enum ThreadStatus {
1166    Idle,
1167    Generating,
1168}
1169
1170#[derive(Debug, Clone)]
1171pub enum LoadError {
1172    Unsupported {
1173        command: SharedString,
1174        current_version: SharedString,
1175        minimum_version: SharedString,
1176    },
1177    FailedToInstall(SharedString),
1178    Exited {
1179        status: ExitStatus,
1180    },
1181    Other(SharedString),
1182}
1183
1184impl Display for LoadError {
1185    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1186        match self {
1187            LoadError::Unsupported {
1188                command: path,
1189                current_version,
1190                minimum_version,
1191            } => {
1192                write!(
1193                    f,
1194                    "version {current_version} from {path} is not supported (need at least {minimum_version})"
1195                )
1196            }
1197            LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1198            LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1199            LoadError::Other(msg) => write!(f, "{msg}"),
1200        }
1201    }
1202}
1203
1204impl Error for LoadError {}
1205
1206impl AcpThread {
1207    pub fn new(
1208        parent_session_id: Option<acp::SessionId>,
1209        title: Option<SharedString>,
1210        work_dirs: Option<PathList>,
1211        connection: Rc<dyn AgentConnection>,
1212        project: Entity<Project>,
1213        action_log: Entity<ActionLog>,
1214        session_id: acp::SessionId,
1215        mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1216        cx: &mut Context<Self>,
1217    ) -> Self {
1218        let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1219        let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1220            loop {
1221                let caps = prompt_capabilities_rx.recv().await?;
1222                this.update(cx, |this, cx| {
1223                    this.prompt_capabilities = caps;
1224                    cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1225                })?;
1226            }
1227        });
1228
1229        Self {
1230            parent_session_id,
1231            work_dirs,
1232            action_log,
1233            shared_buffers: Default::default(),
1234            entries: Default::default(),
1235            plan: Default::default(),
1236            title,
1237            provisional_title: None,
1238            project,
1239            running_turn: None,
1240            turn_id: 0,
1241            connection,
1242            session_id,
1243            token_usage: None,
1244            cost: None,
1245            prompt_capabilities,
1246            available_commands: Vec::new(),
1247            _observe_prompt_capabilities: task,
1248            terminals: HashMap::default(),
1249            pending_terminal_output: HashMap::default(),
1250            pending_terminal_exit: HashMap::default(),
1251            had_error: false,
1252            draft_prompt: None,
1253            ui_scroll_position: None,
1254            streaming_text_buffer: None,
1255        }
1256    }
1257
1258    pub fn parent_session_id(&self) -> Option<&acp::SessionId> {
1259        self.parent_session_id.as_ref()
1260    }
1261
1262    pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
1263        self.prompt_capabilities.clone()
1264    }
1265
1266    pub fn available_commands(&self) -> &[acp::AvailableCommand] {
1267        &self.available_commands
1268    }
1269
1270    pub fn is_draft_thread(&self) -> bool {
1271        self.entries().is_empty()
1272    }
1273
1274    pub fn draft_prompt(&self) -> Option<&[acp::ContentBlock]> {
1275        self.draft_prompt.as_deref()
1276    }
1277
1278    pub fn set_draft_prompt(
1279        &mut self,
1280        prompt: Option<Vec<acp::ContentBlock>>,
1281        cx: &mut Context<Self>,
1282    ) {
1283        cx.emit(AcpThreadEvent::PromptUpdated);
1284        self.draft_prompt = prompt;
1285    }
1286
1287    pub fn ui_scroll_position(&self) -> Option<gpui::ListOffset> {
1288        self.ui_scroll_position
1289    }
1290
1291    pub fn set_ui_scroll_position(&mut self, position: Option<gpui::ListOffset>) {
1292        self.ui_scroll_position = position;
1293    }
1294
1295    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
1296        &self.connection
1297    }
1298
1299    pub fn action_log(&self) -> &Entity<ActionLog> {
1300        &self.action_log
1301    }
1302
1303    pub fn project(&self) -> &Entity<Project> {
1304        &self.project
1305    }
1306
1307    pub fn title(&self) -> Option<SharedString> {
1308        self.title
1309            .clone()
1310            .or_else(|| self.provisional_title.clone())
1311    }
1312
1313    pub fn has_provisional_title(&self) -> bool {
1314        self.provisional_title.is_some()
1315    }
1316
1317    pub fn entries(&self) -> &[AgentThreadEntry] {
1318        &self.entries
1319    }
1320
1321    pub fn session_id(&self) -> &acp::SessionId {
1322        &self.session_id
1323    }
1324
1325    pub fn supports_truncate(&self, cx: &App) -> bool {
1326        self.connection.truncate(&self.session_id, cx).is_some()
1327    }
1328
1329    pub fn work_dirs(&self) -> Option<&PathList> {
1330        self.work_dirs.as_ref()
1331    }
1332
1333    pub fn set_work_dirs(&mut self, work_dirs: PathList, cx: &mut Context<Self>) {
1334        self.work_dirs = Some(work_dirs);
1335        cx.emit(AcpThreadEvent::WorkingDirectoriesUpdated)
1336    }
1337
1338    pub fn status(&self) -> ThreadStatus {
1339        if self.running_turn.is_some() {
1340            ThreadStatus::Generating
1341        } else {
1342            ThreadStatus::Idle
1343        }
1344    }
1345
1346    pub fn had_error(&self) -> bool {
1347        self.had_error
1348    }
1349
1350    pub fn is_waiting_for_confirmation(&self) -> bool {
1351        for entry in self.entries.iter().rev() {
1352            match entry {
1353                AgentThreadEntry::UserMessage(_) => return false,
1354                AgentThreadEntry::ToolCall(ToolCall {
1355                    status: ToolCallStatus::WaitingForConfirmation { .. },
1356                    ..
1357                }) => return true,
1358                AgentThreadEntry::ToolCall(_)
1359                | AgentThreadEntry::AssistantMessage(_)
1360                | AgentThreadEntry::CompletedPlan(_) => {}
1361            }
1362        }
1363        false
1364    }
1365
1366    pub fn token_usage(&self) -> Option<&TokenUsage> {
1367        self.token_usage.as_ref()
1368    }
1369
1370    pub fn cost(&self) -> Option<&SessionCost> {
1371        self.cost.as_ref()
1372    }
1373
1374    pub fn has_pending_edit_tool_calls(&self) -> bool {
1375        for entry in self.entries.iter().rev() {
1376            match entry {
1377                AgentThreadEntry::UserMessage(_) => return false,
1378                AgentThreadEntry::ToolCall(
1379                    call @ ToolCall {
1380                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1381                        ..
1382                    },
1383                ) if call.diffs().next().is_some() => {
1384                    return true;
1385                }
1386                AgentThreadEntry::ToolCall(_)
1387                | AgentThreadEntry::AssistantMessage(_)
1388                | AgentThreadEntry::CompletedPlan(_) => {}
1389            }
1390        }
1391
1392        false
1393    }
1394
1395    pub fn has_in_progress_tool_calls(&self) -> bool {
1396        for entry in self.entries.iter().rev() {
1397            match entry {
1398                AgentThreadEntry::UserMessage(_) => return false,
1399                AgentThreadEntry::ToolCall(ToolCall {
1400                    status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1401                    ..
1402                }) => {
1403                    return true;
1404                }
1405                AgentThreadEntry::ToolCall(_)
1406                | AgentThreadEntry::AssistantMessage(_)
1407                | AgentThreadEntry::CompletedPlan(_) => {}
1408            }
1409        }
1410
1411        false
1412    }
1413
1414    pub fn used_tools_since_last_user_message(&self) -> bool {
1415        for entry in self.entries.iter().rev() {
1416            match entry {
1417                AgentThreadEntry::UserMessage(..) => return false,
1418                AgentThreadEntry::AssistantMessage(..) | AgentThreadEntry::CompletedPlan(..) => {
1419                    continue;
1420                }
1421                AgentThreadEntry::ToolCall(..) => return true,
1422            }
1423        }
1424
1425        false
1426    }
1427
1428    pub fn handle_session_update(
1429        &mut self,
1430        update: acp::SessionUpdate,
1431        cx: &mut Context<Self>,
1432    ) -> Result<(), acp::Error> {
1433        match update {
1434            acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1435                // We optimistically add the full user prompt before calling `prompt`.
1436                // Some ACP servers echo user chunks back over updates. Skip the chunk if
1437                // it's already present in the current user message to avoid duplicating content.
1438                let already_in_user_message = self
1439                    .entries
1440                    .last()
1441                    .and_then(|entry| entry.user_message())
1442                    .is_some_and(|message| message.chunks.contains(&content));
1443                if !already_in_user_message {
1444                    self.push_user_content_block(None, content, cx);
1445                }
1446            }
1447            acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1448                self.push_assistant_content_block(content, false, cx);
1449            }
1450            acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1451                self.push_assistant_content_block(content, true, cx);
1452            }
1453            acp::SessionUpdate::ToolCall(tool_call) => {
1454                self.upsert_tool_call(tool_call, cx)?;
1455            }
1456            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1457                self.update_tool_call(tool_call_update, cx)?;
1458            }
1459            acp::SessionUpdate::Plan(plan) => {
1460                self.update_plan(plan, cx);
1461            }
1462            acp::SessionUpdate::SessionInfoUpdate(info_update) => {
1463                if let acp::MaybeUndefined::Value(title) = info_update.title {
1464                    let had_provisional = self.provisional_title.take().is_some();
1465                    let title: SharedString = title.into();
1466                    if self.title.as_ref() != Some(&title) {
1467                        self.title = Some(title);
1468                        cx.emit(AcpThreadEvent::TitleUpdated);
1469                    } else if had_provisional {
1470                        cx.emit(AcpThreadEvent::TitleUpdated);
1471                    }
1472                }
1473            }
1474            acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1475                available_commands,
1476                ..
1477            }) => {
1478                self.available_commands = available_commands.clone();
1479                cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands));
1480            }
1481            acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1482                current_mode_id,
1483                ..
1484            }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1485            acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1486                config_options,
1487                ..
1488            }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1489            acp::SessionUpdate::UsageUpdate(update) if cx.has_flag::<AcpBetaFeatureFlag>() => {
1490                let usage = self.token_usage.get_or_insert_with(Default::default);
1491                usage.max_tokens = update.size;
1492                usage.used_tokens = update.used;
1493                if let Some(cost) = update.cost {
1494                    self.cost = Some(SessionCost {
1495                        amount: cost.amount,
1496                        currency: cost.currency.into(),
1497                    });
1498                }
1499                cx.emit(AcpThreadEvent::TokenUsageUpdated);
1500            }
1501            _ => {}
1502        }
1503        Ok(())
1504    }
1505
1506    pub fn push_user_content_block(
1507        &mut self,
1508        message_id: Option<UserMessageId>,
1509        chunk: acp::ContentBlock,
1510        cx: &mut Context<Self>,
1511    ) {
1512        self.push_user_content_block_with_indent(message_id, chunk, false, cx)
1513    }
1514
1515    pub fn push_user_content_block_with_indent(
1516        &mut self,
1517        message_id: Option<UserMessageId>,
1518        chunk: acp::ContentBlock,
1519        indented: bool,
1520        cx: &mut Context<Self>,
1521    ) {
1522        let language_registry = self.project.read(cx).languages().clone();
1523        let path_style = self.project.read(cx).path_style(cx);
1524        let entries_len = self.entries.len();
1525
1526        if let Some(last_entry) = self.entries.last_mut()
1527            && let AgentThreadEntry::UserMessage(UserMessage {
1528                id,
1529                content,
1530                chunks,
1531                indented: existing_indented,
1532                ..
1533            }) = last_entry
1534            && *existing_indented == indented
1535        {
1536            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1537            *id = message_id.or(id.take());
1538            content.append(chunk.clone(), &language_registry, path_style, cx);
1539            chunks.push(chunk);
1540            let idx = entries_len - 1;
1541            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1542        } else {
1543            let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1544            self.push_entry(
1545                AgentThreadEntry::UserMessage(UserMessage {
1546                    id: message_id,
1547                    content,
1548                    chunks: vec![chunk],
1549                    checkpoint: None,
1550                    indented,
1551                }),
1552                cx,
1553            );
1554        }
1555    }
1556
1557    pub fn push_assistant_content_block(
1558        &mut self,
1559        chunk: acp::ContentBlock,
1560        is_thought: bool,
1561        cx: &mut Context<Self>,
1562    ) {
1563        self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
1564    }
1565
1566    pub fn push_assistant_content_block_with_indent(
1567        &mut self,
1568        chunk: acp::ContentBlock,
1569        is_thought: bool,
1570        indented: bool,
1571        cx: &mut Context<Self>,
1572    ) {
1573        let path_style = self.project.read(cx).path_style(cx);
1574
1575        // For text chunks going to an existing Markdown block, buffer for smooth
1576        // streaming instead of appending all at once which may feel more choppy.
1577        if let acp::ContentBlock::Text(text_content) = &chunk {
1578            if let Some(markdown) = self.streaming_markdown_target(is_thought, indented) {
1579                let entries_len = self.entries.len();
1580                cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
1581                self.buffer_streaming_text(&markdown, text_content.text.clone(), cx);
1582                return;
1583            }
1584        }
1585
1586        let language_registry = self.project.read(cx).languages().clone();
1587        let entries_len = self.entries.len();
1588        if let Some(last_entry) = self.entries.last_mut()
1589            && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1590                chunks,
1591                indented: existing_indented,
1592                is_subagent_output: _,
1593            }) = last_entry
1594            && *existing_indented == indented
1595        {
1596            let idx = entries_len - 1;
1597            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1598            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1599            match (chunks.last_mut(), is_thought) {
1600                (Some(AssistantMessageChunk::Message { block }), false)
1601                | (Some(AssistantMessageChunk::Thought { block }), true) => {
1602                    block.append(chunk, &language_registry, path_style, cx)
1603                }
1604                _ => {
1605                    let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1606                    if is_thought {
1607                        chunks.push(AssistantMessageChunk::Thought { block })
1608                    } else {
1609                        chunks.push(AssistantMessageChunk::Message { block })
1610                    }
1611                }
1612            }
1613        } else {
1614            let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1615            let chunk = if is_thought {
1616                AssistantMessageChunk::Thought { block }
1617            } else {
1618                AssistantMessageChunk::Message { block }
1619            };
1620
1621            self.push_entry(
1622                AgentThreadEntry::AssistantMessage(AssistantMessage {
1623                    chunks: vec![chunk],
1624                    indented,
1625                    is_subagent_output: false,
1626                }),
1627                cx,
1628            );
1629        }
1630    }
1631
1632    fn streaming_markdown_target(
1633        &self,
1634        is_thought: bool,
1635        indented: bool,
1636    ) -> Option<Entity<Markdown>> {
1637        let last_entry = self.entries.last()?;
1638        if let AgentThreadEntry::AssistantMessage(AssistantMessage {
1639            chunks,
1640            indented: existing_indented,
1641            ..
1642        }) = last_entry
1643            && *existing_indented == indented
1644            && let [.., chunk] = chunks.as_slice()
1645        {
1646            match (chunk, is_thought) {
1647                (
1648                    AssistantMessageChunk::Message {
1649                        block: ContentBlock::Markdown { markdown },
1650                    },
1651                    false,
1652                )
1653                | (
1654                    AssistantMessageChunk::Thought {
1655                        block: ContentBlock::Markdown { markdown },
1656                    },
1657                    true,
1658                ) => Some(markdown.clone()),
1659                _ => None,
1660            }
1661        } else {
1662            None
1663        }
1664    }
1665
1666    /// Add text to the streaming buffer. If the target changed (e.g. switching
1667    /// from thoughts to message text), flush the old buffer first.
1668    fn buffer_streaming_text(
1669        &mut self,
1670        markdown: &Entity<Markdown>,
1671        text: String,
1672        cx: &mut Context<Self>,
1673    ) {
1674        if let Some(buffer) = &mut self.streaming_text_buffer {
1675            if buffer.target.entity_id() == markdown.entity_id() {
1676                buffer.pending.push_str(&text);
1677
1678                buffer.bytes_to_reveal_per_tick = (buffer.pending.len() as f32
1679                    / StreamingTextBuffer::REVEAL_TARGET
1680                    * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1681                    .ceil() as usize;
1682                return;
1683            }
1684            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1685        }
1686
1687        let target = markdown.clone();
1688        let _reveal_task = self.start_streaming_reveal(cx);
1689        let pending_len = text.len();
1690        let bytes_to_reveal = (pending_len as f32 / StreamingTextBuffer::REVEAL_TARGET
1691            * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1692            .ceil() as usize;
1693        self.streaming_text_buffer = Some(StreamingTextBuffer {
1694            pending: text,
1695            bytes_to_reveal_per_tick: bytes_to_reveal,
1696            target,
1697            _reveal_task,
1698        });
1699    }
1700
1701    /// Flush all buffered streaming text into the Markdown entity immediately.
1702    fn flush_streaming_text(
1703        streaming_text_buffer: &mut Option<StreamingTextBuffer>,
1704        cx: &mut Context<Self>,
1705    ) {
1706        if let Some(buffer) = streaming_text_buffer.take() {
1707            if !buffer.pending.is_empty() {
1708                buffer
1709                    .target
1710                    .update(cx, |markdown, cx| markdown.append(&buffer.pending, cx));
1711            }
1712        }
1713    }
1714
1715    /// Spawns a foreground task that periodically drains
1716    /// `streaming_text_buffer.pending` into the target `Markdown` entity,
1717    /// producing smooth, continuous text output.
1718    fn start_streaming_reveal(&self, cx: &mut Context<Self>) -> Task<()> {
1719        cx.spawn(async move |this, cx| {
1720            loop {
1721                cx.background_executor()
1722                    .timer(Duration::from_millis(StreamingTextBuffer::TASK_UPDATE_MS))
1723                    .await;
1724
1725                let should_continue = this
1726                    .update(cx, |this, cx| {
1727                        let Some(buffer) = &mut this.streaming_text_buffer else {
1728                            return false;
1729                        };
1730
1731                        if buffer.pending.is_empty() {
1732                            return true;
1733                        }
1734
1735                        let pending_len = buffer.pending.len();
1736
1737                        let byte_boundary = buffer
1738                            .pending
1739                            .ceil_char_boundary(buffer.bytes_to_reveal_per_tick)
1740                            .min(pending_len);
1741
1742                        buffer.target.update(cx, |markdown: &mut Markdown, cx| {
1743                            markdown.append(&buffer.pending[..byte_boundary], cx);
1744                            buffer.pending.drain(..byte_boundary);
1745                        });
1746
1747                        true
1748                    })
1749                    .unwrap_or(false);
1750
1751                if !should_continue {
1752                    break;
1753                }
1754            }
1755        })
1756    }
1757
1758    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
1759        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1760        self.entries.push(entry);
1761        cx.emit(AcpThreadEvent::NewEntry);
1762    }
1763
1764    pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1765        self.connection.set_title(&self.session_id, cx).is_some()
1766    }
1767
1768    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1769        let had_provisional = self.provisional_title.take().is_some();
1770        if self.title.as_ref() != Some(&title) {
1771            self.title = Some(title.clone());
1772            cx.emit(AcpThreadEvent::TitleUpdated);
1773            if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1774                return set_title.run(title, cx);
1775            }
1776        } else if had_provisional {
1777            cx.emit(AcpThreadEvent::TitleUpdated);
1778        }
1779        Task::ready(Ok(()))
1780    }
1781
1782    /// Sets a provisional display title without propagating back to the
1783    /// underlying agent connection. This is used for quick preview titles
1784    /// (e.g. first 20 chars of the user message) that should be shown
1785    /// immediately but replaced once the LLM generates a proper title via
1786    /// `set_title`.
1787    pub fn set_provisional_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
1788        self.provisional_title = Some(title);
1789        cx.emit(AcpThreadEvent::TitleUpdated);
1790    }
1791
1792    pub fn subagent_spawned(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
1793        cx.emit(AcpThreadEvent::SubagentSpawned(session_id));
1794    }
1795
1796    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1797        if usage.is_none() {
1798            self.cost = None;
1799        }
1800        self.token_usage = usage;
1801        cx.emit(AcpThreadEvent::TokenUsageUpdated);
1802    }
1803
1804    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1805        cx.emit(AcpThreadEvent::Retry(status));
1806    }
1807
1808    pub fn update_tool_call(
1809        &mut self,
1810        update: impl Into<ToolCallUpdate>,
1811        cx: &mut Context<Self>,
1812    ) -> Result<()> {
1813        let update = update.into();
1814        let languages = self.project.read(cx).languages().clone();
1815        let path_style = self.project.read(cx).path_style(cx);
1816
1817        let ix = match self.index_for_tool_call(update.id()) {
1818            Some(ix) => ix,
1819            None => {
1820                // Tool call not found - create a failed tool call entry
1821                let failed_tool_call = ToolCall {
1822                    id: update.id().clone(),
1823                    label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1824                    kind: acp::ToolKind::Fetch,
1825                    content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1826                        "Tool call not found".into(),
1827                        &languages,
1828                        path_style,
1829                        cx,
1830                    ))],
1831                    status: ToolCallStatus::Failed,
1832                    locations: Vec::new(),
1833                    resolved_locations: Vec::new(),
1834                    raw_input: None,
1835                    raw_input_markdown: None,
1836                    raw_output: None,
1837                    tool_name: None,
1838                    subagent_session_info: None,
1839                };
1840                self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1841                return Ok(());
1842            }
1843        };
1844        let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1845            unreachable!()
1846        };
1847
1848        match update {
1849            ToolCallUpdate::UpdateFields(update) => {
1850                let location_updated = update.fields.locations.is_some();
1851                call.update_fields(
1852                    update.fields,
1853                    update.meta,
1854                    languages,
1855                    path_style,
1856                    &self.terminals,
1857                    cx,
1858                )?;
1859                if location_updated {
1860                    self.resolve_locations(update.tool_call_id, cx);
1861                }
1862            }
1863            ToolCallUpdate::UpdateDiff(update) => {
1864                call.content.clear();
1865                call.content.push(ToolCallContent::Diff(update.diff));
1866            }
1867            ToolCallUpdate::UpdateTerminal(update) => {
1868                call.content.clear();
1869                call.content
1870                    .push(ToolCallContent::Terminal(update.terminal));
1871            }
1872        }
1873
1874        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1875
1876        Ok(())
1877    }
1878
1879    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1880    pub fn upsert_tool_call(
1881        &mut self,
1882        tool_call: acp::ToolCall,
1883        cx: &mut Context<Self>,
1884    ) -> Result<(), acp::Error> {
1885        let status = tool_call.status.into();
1886        self.upsert_tool_call_inner(tool_call.into(), status, cx)
1887    }
1888
1889    /// Fails if id does not match an existing entry.
1890    pub fn upsert_tool_call_inner(
1891        &mut self,
1892        update: acp::ToolCallUpdate,
1893        status: ToolCallStatus,
1894        cx: &mut Context<Self>,
1895    ) -> Result<(), acp::Error> {
1896        let language_registry = self.project.read(cx).languages().clone();
1897        let path_style = self.project.read(cx).path_style(cx);
1898        let id = update.tool_call_id.clone();
1899
1900        let agent_telemetry_id = self.connection().telemetry_id();
1901        let session = self.session_id();
1902        let parent_session_id = self.parent_session_id();
1903        if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1904            let status = if matches!(status, ToolCallStatus::Completed) {
1905                "completed"
1906            } else {
1907                "failed"
1908            };
1909            telemetry::event!(
1910                "Agent Tool Call Completed",
1911                agent_telemetry_id,
1912                session,
1913                parent_session_id,
1914                status
1915            );
1916        }
1917
1918        if let Some(ix) = self.index_for_tool_call(&id) {
1919            let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1920                unreachable!()
1921            };
1922
1923            call.update_fields(
1924                update.fields,
1925                update.meta,
1926                language_registry,
1927                path_style,
1928                &self.terminals,
1929                cx,
1930            )?;
1931            call.status = status;
1932
1933            cx.emit(AcpThreadEvent::EntryUpdated(ix));
1934        } else {
1935            let call = ToolCall::from_acp(
1936                update.try_into()?,
1937                status,
1938                language_registry,
1939                self.project.read(cx).path_style(cx),
1940                &self.terminals,
1941                cx,
1942            )?;
1943            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1944        };
1945
1946        self.resolve_locations(id, cx);
1947        Ok(())
1948    }
1949
1950    fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1951        self.entries
1952            .iter()
1953            .enumerate()
1954            .rev()
1955            .find_map(|(index, entry)| {
1956                if let AgentThreadEntry::ToolCall(tool_call) = entry
1957                    && &tool_call.id == id
1958                {
1959                    Some(index)
1960                } else {
1961                    None
1962                }
1963            })
1964    }
1965
1966    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1967        // The tool call we are looking for is typically the last one, or very close to the end.
1968        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1969        self.entries
1970            .iter_mut()
1971            .enumerate()
1972            .rev()
1973            .find_map(|(index, tool_call)| {
1974                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1975                    && &tool_call.id == id
1976                {
1977                    Some((index, tool_call))
1978                } else {
1979                    None
1980                }
1981            })
1982    }
1983
1984    pub fn tool_call(&self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1985        self.entries
1986            .iter()
1987            .enumerate()
1988            .rev()
1989            .find_map(|(index, tool_call)| {
1990                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1991                    && &tool_call.id == id
1992                {
1993                    Some((index, tool_call))
1994                } else {
1995                    None
1996                }
1997            })
1998    }
1999
2000    pub fn tool_call_for_subagent(&self, session_id: &acp::SessionId) -> Option<&ToolCall> {
2001        self.entries.iter().find_map(|entry| match entry {
2002            AgentThreadEntry::ToolCall(tool_call) => {
2003                if let Some(subagent_session_info) = &tool_call.subagent_session_info
2004                    && &subagent_session_info.session_id == session_id
2005                {
2006                    Some(tool_call)
2007                } else {
2008                    None
2009                }
2010            }
2011            _ => None,
2012        })
2013    }
2014
2015    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
2016        let project = self.project.clone();
2017        let should_update_agent_location = self.parent_session_id.is_none();
2018        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
2019            return;
2020        };
2021        let task = tool_call.resolve_locations(project, cx);
2022        cx.spawn(async move |this, cx| {
2023            let resolved_locations = task.await;
2024
2025            this.update(cx, |this, cx| {
2026                let project = this.project.clone();
2027
2028                for location in resolved_locations.iter().flatten() {
2029                    this.shared_buffers
2030                        .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
2031                }
2032                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
2033                    return;
2034                };
2035
2036                if let Some(Some(location)) = resolved_locations.last() {
2037                    project.update(cx, |project, cx| {
2038                        let should_ignore = if let Some(agent_location) = project
2039                            .agent_location()
2040                            .filter(|agent_location| agent_location.buffer == location.buffer)
2041                        {
2042                            let snapshot = location.buffer.read(cx).snapshot();
2043                            let old_position = agent_location.position.to_point(&snapshot);
2044                            let new_position = location.position.to_point(&snapshot);
2045
2046                            // ignore this so that when we get updates from the edit tool
2047                            // the position doesn't reset to the startof line
2048                            old_position.row == new_position.row
2049                                && old_position.column > new_position.column
2050                        } else {
2051                            false
2052                        };
2053                        if !should_ignore && should_update_agent_location {
2054                            project.set_agent_location(Some(location.into()), cx);
2055                        }
2056                    });
2057                }
2058
2059                let resolved_locations = resolved_locations
2060                    .iter()
2061                    .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
2062                    .collect::<Vec<_>>();
2063
2064                if tool_call.resolved_locations != resolved_locations {
2065                    tool_call.resolved_locations = resolved_locations;
2066                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
2067                }
2068            })
2069        })
2070        .detach();
2071    }
2072
2073    pub fn request_tool_call_authorization(
2074        &mut self,
2075        tool_call: acp::ToolCallUpdate,
2076        options: PermissionOptions,
2077        cx: &mut Context<Self>,
2078    ) -> Result<Task<RequestPermissionOutcome>> {
2079        let (tx, rx) = oneshot::channel();
2080
2081        let status = ToolCallStatus::WaitingForConfirmation {
2082            options,
2083            respond_tx: tx,
2084        };
2085
2086        let tool_call_id = tool_call.tool_call_id.clone();
2087        self.upsert_tool_call_inner(tool_call, status, cx)?;
2088        cx.emit(AcpThreadEvent::ToolAuthorizationRequested(
2089            tool_call_id.clone(),
2090        ));
2091
2092        Ok(cx.spawn(async move |this, cx| {
2093            let outcome = match rx.await {
2094                Ok(outcome) => RequestPermissionOutcome::Selected(outcome),
2095                Err(oneshot::Canceled) => RequestPermissionOutcome::Cancelled,
2096            };
2097            this.update(cx, |_this, cx| {
2098                cx.emit(AcpThreadEvent::ToolAuthorizationReceived(tool_call_id))
2099            })
2100            .ok();
2101            outcome
2102        }))
2103    }
2104
2105    pub fn authorize_tool_call(
2106        &mut self,
2107        id: acp::ToolCallId,
2108        outcome: SelectedPermissionOutcome,
2109        cx: &mut Context<Self>,
2110    ) {
2111        let Some((ix, call)) = self.tool_call_mut(&id) else {
2112            return;
2113        };
2114
2115        let new_status = match outcome.option_kind {
2116            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
2117                ToolCallStatus::Rejected
2118            }
2119            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
2120                ToolCallStatus::InProgress
2121            }
2122            _ => ToolCallStatus::InProgress,
2123        };
2124
2125        let curr_status = mem::replace(&mut call.status, new_status);
2126
2127        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
2128            respond_tx.send(outcome).log_err();
2129        } else if cfg!(debug_assertions) {
2130            panic!("tried to authorize an already authorized tool call");
2131        }
2132
2133        cx.emit(AcpThreadEvent::EntryUpdated(ix));
2134    }
2135
2136    pub fn plan(&self) -> &Plan {
2137        &self.plan
2138    }
2139
2140    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
2141        let new_entries_len = request.entries.len();
2142        let mut new_entries = request.entries.into_iter();
2143
2144        // Reuse existing markdown to prevent flickering
2145        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
2146            let PlanEntry {
2147                content,
2148                priority,
2149                status,
2150            } = old;
2151            content.update(cx, |old, cx| {
2152                old.replace(new.content, cx);
2153            });
2154            *priority = new.priority;
2155            *status = new.status;
2156        }
2157        for new in new_entries {
2158            self.plan.entries.push(PlanEntry::from_acp(new, cx))
2159        }
2160        self.plan.entries.truncate(new_entries_len);
2161
2162        cx.notify();
2163    }
2164
2165    pub fn snapshot_completed_plan(&mut self, cx: &mut Context<Self>) {
2166        if !self.plan.is_empty() && self.plan.stats().pending == 0 {
2167            let completed_entries = std::mem::take(&mut self.plan.entries);
2168            self.push_entry(AgentThreadEntry::CompletedPlan(completed_entries), cx);
2169        }
2170    }
2171
2172    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
2173        self.plan
2174            .entries
2175            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
2176        cx.notify();
2177    }
2178
2179    pub fn clear_plan(&mut self, cx: &mut Context<Self>) {
2180        self.plan.entries.clear();
2181        cx.notify();
2182    }
2183
2184    #[cfg(any(test, feature = "test-support"))]
2185    pub fn send_raw(
2186        &mut self,
2187        message: &str,
2188        cx: &mut Context<Self>,
2189    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2190        self.send(vec![message.into()], cx)
2191    }
2192
2193    pub fn send(
2194        &mut self,
2195        message: Vec<acp::ContentBlock>,
2196        cx: &mut Context<Self>,
2197    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2198        let block = ContentBlock::new_combined(
2199            message.clone(),
2200            self.project.read(cx).languages().clone(),
2201            self.project.read(cx).path_style(cx),
2202            cx,
2203        );
2204        let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
2205        let git_store = self.project.read(cx).git_store().clone();
2206
2207        let message_id = UserMessageId::new();
2208
2209        self.run_turn(cx, async move |this, cx| {
2210            this.update(cx, |this, cx| {
2211                this.push_entry(
2212                    AgentThreadEntry::UserMessage(UserMessage {
2213                        id: Some(message_id.clone()),
2214                        content: block,
2215                        chunks: message,
2216                        checkpoint: None,
2217                        indented: false,
2218                    }),
2219                    cx,
2220                );
2221            })
2222            .ok();
2223
2224            let old_checkpoint = git_store
2225                .update(cx, |git, cx| git.checkpoint(cx))
2226                .await
2227                .context("failed to get old checkpoint")
2228                .log_err();
2229            this.update(cx, |this, cx| {
2230                if let Some((_ix, message)) = this.last_user_message() {
2231                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
2232                        git_checkpoint,
2233                        show: false,
2234                    });
2235                }
2236                this.connection.prompt(message_id, request, cx)
2237            })?
2238            .await
2239        })
2240    }
2241
2242    pub fn can_retry(&self, cx: &App) -> bool {
2243        self.connection.retry(&self.session_id, cx).is_some()
2244    }
2245
2246    pub fn retry(
2247        &mut self,
2248        cx: &mut Context<Self>,
2249    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2250        self.run_turn(cx, async move |this, cx| {
2251            this.update(cx, |this, cx| {
2252                this.connection
2253                    .retry(&this.session_id, cx)
2254                    .map(|retry| retry.run(cx))
2255            })?
2256            .context("retrying a session is not supported")?
2257            .await
2258        })
2259    }
2260
2261    fn run_turn(
2262        &mut self,
2263        cx: &mut Context<Self>,
2264        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
2265    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2266        self.clear_completed_plan_entries(cx);
2267        self.had_error = false;
2268
2269        let (tx, rx) = oneshot::channel();
2270        let cancel_task = self.cancel(cx);
2271
2272        self.turn_id += 1;
2273        let turn_id = self.turn_id;
2274        self.running_turn = Some(RunningTurn {
2275            id: turn_id,
2276            send_task: cx.spawn(async move |this, cx| {
2277                cancel_task.await;
2278                tx.send(f(this, cx).await).ok();
2279            }),
2280        });
2281
2282        cx.spawn(async move |this, cx| {
2283            let response = rx.await;
2284
2285            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
2286                .await?;
2287
2288            this.update(cx, |this, cx| {
2289                if this.parent_session_id.is_none() {
2290                    this.project
2291                        .update(cx, |project, cx| project.set_agent_location(None, cx));
2292                }
2293                let Ok(response) = response else {
2294                    // tx dropped, just return
2295                    return Ok(None);
2296                };
2297
2298                let is_same_turn = this
2299                    .running_turn
2300                    .as_ref()
2301                    .is_some_and(|turn| turn_id == turn.id);
2302
2303                // If the user submitted a follow up message, running_turn might
2304                // already point to a different turn. Therefore we only want to
2305                // take the task if it's the same turn.
2306                if is_same_turn {
2307                    this.running_turn.take();
2308                }
2309
2310                match response {
2311                    Ok(r) => {
2312                        Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2313
2314                        if r.stop_reason == acp::StopReason::MaxTokens {
2315                            this.had_error = true;
2316                            cx.emit(AcpThreadEvent::Error);
2317                            log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2318
2319                            let exceeded_max_output_tokens =
2320                                this.token_usage.as_ref().is_some_and(|u| {
2321                                    u.max_output_tokens
2322                                        .is_some_and(|max| u.output_tokens >= max)
2323                                });
2324
2325                            if exceeded_max_output_tokens {
2326                                log::error!(
2327                                    "Max output tokens reached. Usage: {:?}",
2328                                    this.token_usage
2329                                );
2330                            } else {
2331                                log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2332                            }
2333                            return Err(anyhow!(MaxOutputTokensError));
2334                        }
2335
2336                        let canceled = matches!(r.stop_reason, acp::StopReason::Cancelled);
2337                        if canceled {
2338                            this.mark_pending_tools_as_canceled();
2339                        }
2340
2341                        if !canceled {
2342                            this.snapshot_completed_plan(cx);
2343                        }
2344
2345                        // Handle refusal - distinguish between user prompt and tool call refusals
2346                        if let acp::StopReason::Refusal = r.stop_reason {
2347                            this.had_error = true;
2348                            if let Some((user_msg_ix, _)) = this.last_user_message() {
2349                                // Check if there's a completed tool call with results after the last user message
2350                                // This indicates the refusal is in response to tool output, not the user's prompt
2351                                let has_completed_tool_call_after_user_msg =
2352                                    this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
2353                                        if let AgentThreadEntry::ToolCall(tool_call) = entry {
2354                                            // Check if the tool call has completed and has output
2355                                            matches!(tool_call.status, ToolCallStatus::Completed)
2356                                                && tool_call.raw_output.is_some()
2357                                        } else {
2358                                            false
2359                                        }
2360                                    });
2361
2362                                if has_completed_tool_call_after_user_msg {
2363                                    // Refusal is due to tool output - don't truncate, just notify
2364                                    // The model refused based on what the tool returned
2365                                    cx.emit(AcpThreadEvent::Refusal);
2366                                } else {
2367                                    // User prompt was refused - truncate back to before the user message
2368                                    let range = user_msg_ix..this.entries.len();
2369                                    if range.start < range.end {
2370                                        this.entries.truncate(user_msg_ix);
2371                                        cx.emit(AcpThreadEvent::EntriesRemoved(range));
2372                                    }
2373                                    cx.emit(AcpThreadEvent::Refusal);
2374                                }
2375                            } else {
2376                                // No user message found, treat as general refusal
2377                                cx.emit(AcpThreadEvent::Refusal);
2378                            }
2379                        }
2380
2381                        if cx.has_flag::<AcpBetaFeatureFlag>()
2382                            && let Some(response_usage) = &r.usage
2383                        {
2384                            let usage = this.token_usage.get_or_insert_with(Default::default);
2385                            usage.input_tokens = response_usage.input_tokens;
2386                            usage.output_tokens = response_usage.output_tokens;
2387                            cx.emit(AcpThreadEvent::TokenUsageUpdated);
2388                        }
2389
2390                        cx.emit(AcpThreadEvent::Stopped(r.stop_reason));
2391                        Ok(Some(r))
2392                    }
2393                    Err(e) => {
2394                        Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2395
2396                        this.had_error = true;
2397                        cx.emit(AcpThreadEvent::Error);
2398                        log::error!("Error in run turn: {:?}", e);
2399                        Err(e)
2400                    }
2401                }
2402            })?
2403        })
2404        .boxed()
2405    }
2406
2407    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2408        let Some(turn) = self.running_turn.take() else {
2409            return Task::ready(());
2410        };
2411        self.connection.cancel(&self.session_id, cx);
2412
2413        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2414        self.mark_pending_tools_as_canceled();
2415
2416        // Wait for the send task to complete
2417        cx.background_spawn(turn.send_task)
2418    }
2419
2420    fn mark_pending_tools_as_canceled(&mut self) {
2421        for entry in self.entries.iter_mut() {
2422            if let AgentThreadEntry::ToolCall(call) = entry {
2423                let cancel = matches!(
2424                    call.status,
2425                    ToolCallStatus::Pending
2426                        | ToolCallStatus::WaitingForConfirmation { .. }
2427                        | ToolCallStatus::InProgress
2428                );
2429
2430                if cancel {
2431                    call.status = ToolCallStatus::Canceled;
2432                }
2433            }
2434        }
2435    }
2436
2437    /// Restores the git working tree to the state at the given checkpoint (if one exists)
2438    pub fn restore_checkpoint(
2439        &mut self,
2440        id: UserMessageId,
2441        cx: &mut Context<Self>,
2442    ) -> Task<Result<()>> {
2443        let Some((_, message)) = self.user_message_mut(&id) else {
2444            return Task::ready(Err(anyhow!("message not found")));
2445        };
2446
2447        let checkpoint = message
2448            .checkpoint
2449            .as_ref()
2450            .map(|c| c.git_checkpoint.clone());
2451
2452        // Cancel any in-progress generation before restoring
2453        let cancel_task = self.cancel(cx);
2454        let rewind = self.rewind(id.clone(), cx);
2455        let git_store = self.project.read(cx).git_store().clone();
2456
2457        cx.spawn(async move |_, cx| {
2458            cancel_task.await;
2459            rewind.await?;
2460            if let Some(checkpoint) = checkpoint {
2461                git_store
2462                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
2463                    .await?;
2464            }
2465
2466            Ok(())
2467        })
2468    }
2469
2470    /// Rewinds this thread to before the entry at `index`, removing it and all
2471    /// subsequent entries while rejecting any action_log changes made from that point.
2472    /// Unlike `restore_checkpoint`, this method does not restore from git.
2473    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
2474        let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
2475            return Task::ready(Err(anyhow!("not supported")));
2476        };
2477
2478        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2479        let telemetry = ActionLogTelemetry::from(&*self);
2480        cx.spawn(async move |this, cx| {
2481            cx.update(|cx| truncate.run(id.clone(), cx)).await?;
2482            this.update(cx, |this, cx| {
2483                if let Some((ix, _)) = this.user_message_mut(&id) {
2484                    // Collect all terminals from entries that will be removed
2485                    let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
2486                        .iter()
2487                        .flat_map(|entry| entry.terminals())
2488                        .filter_map(|terminal| terminal.read(cx).id().clone().into())
2489                        .collect();
2490
2491                    let range = ix..this.entries.len();
2492                    this.entries.truncate(ix);
2493                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
2494
2495                    // Kill and remove the terminals
2496                    for terminal_id in terminals_to_remove {
2497                        if let Some(terminal) = this.terminals.remove(&terminal_id) {
2498                            terminal.update(cx, |terminal, cx| {
2499                                terminal.kill(cx);
2500                            });
2501                        }
2502                    }
2503                }
2504                this.action_log().update(cx, |action_log, cx| {
2505                    action_log.reject_all_edits(Some(telemetry), cx)
2506                })
2507            })?
2508            .await;
2509            Ok(())
2510        })
2511    }
2512
2513    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
2514        let git_store = self.project.read(cx).git_store().clone();
2515
2516        let Some((_, message)) = self.last_user_message() else {
2517            return Task::ready(Ok(()));
2518        };
2519        let Some(user_message_id) = message.id.clone() else {
2520            return Task::ready(Ok(()));
2521        };
2522        let Some(checkpoint) = message.checkpoint.as_ref() else {
2523            return Task::ready(Ok(()));
2524        };
2525        let old_checkpoint = checkpoint.git_checkpoint.clone();
2526
2527        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2528        cx.spawn(async move |this, cx| {
2529            let Some(new_checkpoint) = new_checkpoint
2530                .await
2531                .context("failed to get new checkpoint")
2532                .log_err()
2533            else {
2534                return Ok(());
2535            };
2536
2537            let equal = git_store
2538                .update(cx, |git, cx| {
2539                    git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2540                })
2541                .await
2542                .unwrap_or(true);
2543
2544            this.update(cx, |this, cx| {
2545                if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
2546                    if let Some(checkpoint) = message.checkpoint.as_mut() {
2547                        checkpoint.show = !equal;
2548                        cx.emit(AcpThreadEvent::EntryUpdated(ix));
2549                    }
2550                }
2551            })?;
2552
2553            Ok(())
2554        })
2555    }
2556
2557    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2558        self.entries
2559            .iter_mut()
2560            .enumerate()
2561            .rev()
2562            .find_map(|(ix, entry)| {
2563                if let AgentThreadEntry::UserMessage(message) = entry {
2564                    Some((ix, message))
2565                } else {
2566                    None
2567                }
2568            })
2569    }
2570
2571    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2572        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2573            if let AgentThreadEntry::UserMessage(message) = entry {
2574                if message.id.as_ref() == Some(id) {
2575                    Some((ix, message))
2576                } else {
2577                    None
2578                }
2579            } else {
2580                None
2581            }
2582        })
2583    }
2584
2585    pub fn read_text_file(
2586        &self,
2587        path: PathBuf,
2588        line: Option<u32>,
2589        limit: Option<u32>,
2590        reuse_shared_snapshot: bool,
2591        cx: &mut Context<Self>,
2592    ) -> Task<Result<String, acp::Error>> {
2593        // Args are 1-based, move to 0-based
2594        let line = line.unwrap_or_default().saturating_sub(1);
2595        let limit = limit.unwrap_or(u32::MAX);
2596        let project = self.project.clone();
2597        let action_log = self.action_log.clone();
2598        let should_update_agent_location = self.parent_session_id.is_none();
2599        cx.spawn(async move |this, cx| {
2600            let load = project.update(cx, |project, cx| {
2601                let path = project
2602                    .project_path_for_absolute_path(&path, cx)
2603                    .ok_or_else(|| {
2604                        acp::Error::resource_not_found(Some(path.display().to_string()))
2605                    })?;
2606                Ok::<_, acp::Error>(project.open_buffer(path, cx))
2607            })?;
2608
2609            let buffer = load.await?;
2610
2611            let snapshot = if reuse_shared_snapshot {
2612                this.read_with(cx, |this, _| {
2613                    this.shared_buffers.get(&buffer.clone()).cloned()
2614                })
2615                .log_err()
2616                .flatten()
2617            } else {
2618                None
2619            };
2620
2621            let snapshot = if let Some(snapshot) = snapshot {
2622                snapshot
2623            } else {
2624                action_log.update(cx, |action_log, cx| {
2625                    action_log.buffer_read(buffer.clone(), cx);
2626                });
2627
2628                let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
2629                this.update(cx, |this, _| {
2630                    this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2631                })?;
2632                snapshot
2633            };
2634
2635            let max_point = snapshot.max_point();
2636            let start_position = Point::new(line, 0);
2637
2638            if start_position > max_point {
2639                return Err(acp::Error::invalid_params().data(format!(
2640                    "Attempting to read beyond the end of the file, line {}:{}",
2641                    max_point.row + 1,
2642                    max_point.column
2643                )));
2644            }
2645
2646            let start = snapshot.anchor_before(start_position);
2647            let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2648
2649            if should_update_agent_location {
2650                project.update(cx, |project, cx| {
2651                    project.set_agent_location(
2652                        Some(AgentLocation {
2653                            buffer: buffer.downgrade(),
2654                            position: start,
2655                        }),
2656                        cx,
2657                    );
2658                });
2659            }
2660
2661            Ok(snapshot.text_for_range(start..end).collect::<String>())
2662        })
2663    }
2664
2665    pub fn write_text_file(
2666        &self,
2667        path: PathBuf,
2668        content: String,
2669        cx: &mut Context<Self>,
2670    ) -> Task<Result<()>> {
2671        let project = self.project.clone();
2672        let action_log = self.action_log.clone();
2673        let should_update_agent_location = self.parent_session_id.is_none();
2674        cx.spawn(async move |this, cx| {
2675            let load = project.update(cx, |project, cx| {
2676                let path = project
2677                    .project_path_for_absolute_path(&path, cx)
2678                    .context("invalid path")?;
2679                anyhow::Ok(project.open_buffer(path, cx))
2680            });
2681            let buffer = load?.await?;
2682            let snapshot = this.update(cx, |this, cx| {
2683                this.shared_buffers
2684                    .get(&buffer)
2685                    .cloned()
2686                    .unwrap_or_else(|| buffer.read(cx).snapshot())
2687            })?;
2688            let edits = cx
2689                .background_executor()
2690                .spawn(async move {
2691                    let old_text = snapshot.text();
2692                    text_diff(old_text.as_str(), &content)
2693                        .into_iter()
2694                        .map(|(range, replacement)| {
2695                            (snapshot.anchor_range_inside(range), replacement)
2696                        })
2697                        .collect::<Vec<_>>()
2698                })
2699                .await;
2700
2701            if should_update_agent_location {
2702                project.update(cx, |project, cx| {
2703                    project.set_agent_location(
2704                        Some(AgentLocation {
2705                            buffer: buffer.downgrade(),
2706                            position: edits
2707                                .last()
2708                                .map(|(range, _)| range.end)
2709                                .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
2710                        }),
2711                        cx,
2712                    );
2713                });
2714            }
2715
2716            let format_on_save = cx.update(|cx| {
2717                action_log.update(cx, |action_log, cx| {
2718                    action_log.buffer_read(buffer.clone(), cx);
2719                });
2720
2721                let format_on_save = buffer.update(cx, |buffer, cx| {
2722                    buffer.edit(edits, None, cx);
2723
2724                    let settings =
2725                        language::language_settings::LanguageSettings::for_buffer(buffer, cx);
2726
2727                    settings.format_on_save != FormatOnSave::Off
2728                });
2729                action_log.update(cx, |action_log, cx| {
2730                    action_log.buffer_edited(buffer.clone(), cx);
2731                });
2732                format_on_save
2733            });
2734
2735            if format_on_save {
2736                let format_task = project.update(cx, |project, cx| {
2737                    project.format(
2738                        HashSet::from_iter([buffer.clone()]),
2739                        LspFormatTarget::Buffers,
2740                        false,
2741                        FormatTrigger::Save,
2742                        cx,
2743                    )
2744                });
2745                format_task.await.log_err();
2746
2747                action_log.update(cx, |action_log, cx| {
2748                    action_log.buffer_edited(buffer.clone(), cx);
2749                });
2750            }
2751
2752            project
2753                .update(cx, |project, cx| project.save_buffer(buffer, cx))
2754                .await
2755        })
2756    }
2757
2758    pub fn create_terminal(
2759        &self,
2760        command: String,
2761        args: Vec<String>,
2762        extra_env: Vec<acp::EnvVariable>,
2763        cwd: Option<PathBuf>,
2764        output_byte_limit: Option<u64>,
2765        cx: &mut Context<Self>,
2766    ) -> Task<Result<Entity<Terminal>>> {
2767        let env = match &cwd {
2768            Some(dir) => self.project.update(cx, |project, cx| {
2769                project.environment().update(cx, |env, cx| {
2770                    env.directory_environment(dir.as_path().into(), cx)
2771                })
2772            }),
2773            None => Task::ready(None).shared(),
2774        };
2775        let env = cx.spawn(async move |_, _| {
2776            let mut env = env.await.unwrap_or_default();
2777            // Disables paging for `git` and hopefully other commands
2778            env.insert("PAGER".into(), "".into());
2779            for var in extra_env {
2780                env.insert(var.name, var.value);
2781            }
2782            env
2783        });
2784
2785        let project = self.project.clone();
2786        let language_registry = project.read(cx).languages().clone();
2787        let is_windows = project.read(cx).path_style(cx).is_windows();
2788
2789        let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
2790        let terminal_task = cx.spawn({
2791            let terminal_id = terminal_id.clone();
2792            async move |_this, cx| {
2793                let env = env.await;
2794                let shell = project
2795                    .update(cx, |project, cx| {
2796                        project
2797                            .remote_client()
2798                            .and_then(|r| r.read(cx).default_system_shell())
2799                    })
2800                    .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2801                let (task_command, task_args) =
2802                    ShellBuilder::new(&Shell::Program(shell), is_windows)
2803                        .redirect_stdin_to_dev_null()
2804                        .build(Some(command.clone()), &args);
2805                let terminal = project
2806                    .update(cx, |project, cx| {
2807                        project.create_terminal_task(
2808                            task::SpawnInTerminal {
2809                                command: Some(task_command),
2810                                args: task_args,
2811                                cwd: cwd.clone(),
2812                                env,
2813                                ..Default::default()
2814                            },
2815                            cx,
2816                        )
2817                    })
2818                    .await?;
2819
2820                anyhow::Ok(cx.new(|cx| {
2821                    Terminal::new(
2822                        terminal_id,
2823                        &format!("{} {}", command, args.join(" ")),
2824                        cwd,
2825                        output_byte_limit.map(|l| l as usize),
2826                        terminal,
2827                        language_registry,
2828                        cx,
2829                    )
2830                }))
2831            }
2832        });
2833
2834        cx.spawn(async move |this, cx| {
2835            let terminal = terminal_task.await?;
2836            this.update(cx, |this, _cx| {
2837                this.terminals.insert(terminal_id, terminal.clone());
2838                terminal
2839            })
2840        })
2841    }
2842
2843    pub fn kill_terminal(
2844        &mut self,
2845        terminal_id: acp::TerminalId,
2846        cx: &mut Context<Self>,
2847    ) -> Result<()> {
2848        self.terminals
2849            .get(&terminal_id)
2850            .context("Terminal not found")?
2851            .update(cx, |terminal, cx| {
2852                terminal.kill(cx);
2853            });
2854
2855        Ok(())
2856    }
2857
2858    pub fn release_terminal(
2859        &mut self,
2860        terminal_id: acp::TerminalId,
2861        cx: &mut Context<Self>,
2862    ) -> Result<()> {
2863        self.terminals
2864            .remove(&terminal_id)
2865            .context("Terminal not found")?
2866            .update(cx, |terminal, cx| {
2867                terminal.kill(cx);
2868            });
2869
2870        Ok(())
2871    }
2872
2873    pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2874        self.terminals
2875            .get(&terminal_id)
2876            .context("Terminal not found")
2877            .cloned()
2878    }
2879
2880    pub fn to_markdown(&self, cx: &App) -> String {
2881        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2882    }
2883
2884    pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2885        cx.emit(AcpThreadEvent::LoadError(error));
2886    }
2887
2888    pub fn register_terminal_created(
2889        &mut self,
2890        terminal_id: acp::TerminalId,
2891        command_label: String,
2892        working_dir: Option<PathBuf>,
2893        output_byte_limit: Option<u64>,
2894        terminal: Entity<::terminal::Terminal>,
2895        cx: &mut Context<Self>,
2896    ) -> Entity<Terminal> {
2897        let language_registry = self.project.read(cx).languages().clone();
2898
2899        let entity = cx.new(|cx| {
2900            Terminal::new(
2901                terminal_id.clone(),
2902                &command_label,
2903                working_dir.clone(),
2904                output_byte_limit.map(|l| l as usize),
2905                terminal,
2906                language_registry,
2907                cx,
2908            )
2909        });
2910        self.terminals.insert(terminal_id.clone(), entity.clone());
2911        entity
2912    }
2913
2914    pub fn mark_as_subagent_output(&mut self, cx: &mut Context<Self>) {
2915        for entry in self.entries.iter_mut().rev() {
2916            if let AgentThreadEntry::AssistantMessage(assistant_message) = entry {
2917                assistant_message.is_subagent_output = true;
2918                cx.notify();
2919                return;
2920            }
2921        }
2922    }
2923
2924    pub fn on_terminal_provider_event(
2925        &mut self,
2926        event: TerminalProviderEvent,
2927        cx: &mut Context<Self>,
2928    ) {
2929        match event {
2930            TerminalProviderEvent::Created {
2931                terminal_id,
2932                label,
2933                cwd,
2934                output_byte_limit,
2935                terminal,
2936            } => {
2937                let entity = self.register_terminal_created(
2938                    terminal_id.clone(),
2939                    label,
2940                    cwd,
2941                    output_byte_limit,
2942                    terminal,
2943                    cx,
2944                );
2945
2946                if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
2947                    for data in chunks.drain(..) {
2948                        entity.update(cx, |term, cx| {
2949                            term.inner().update(cx, |inner, cx| {
2950                                inner.write_output(&data, cx);
2951                            })
2952                        });
2953                    }
2954                }
2955
2956                if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
2957                    entity.update(cx, |_term, cx| {
2958                        cx.notify();
2959                    });
2960                }
2961
2962                cx.notify();
2963            }
2964            TerminalProviderEvent::Output { terminal_id, data } => {
2965                if let Some(entity) = self.terminals.get(&terminal_id) {
2966                    entity.update(cx, |term, cx| {
2967                        term.inner().update(cx, |inner, cx| {
2968                            inner.write_output(&data, cx);
2969                        })
2970                    });
2971                } else {
2972                    self.pending_terminal_output
2973                        .entry(terminal_id)
2974                        .or_default()
2975                        .push(data);
2976                }
2977            }
2978            TerminalProviderEvent::TitleChanged { terminal_id, title } => {
2979                if let Some(entity) = self.terminals.get(&terminal_id) {
2980                    entity.update(cx, |term, cx| {
2981                        term.inner().update(cx, |inner, cx| {
2982                            inner.breadcrumb_text = title;
2983                            cx.emit(::terminal::Event::BreadcrumbsChanged);
2984                        })
2985                    });
2986                }
2987            }
2988            TerminalProviderEvent::Exit {
2989                terminal_id,
2990                status,
2991            } => {
2992                if let Some(entity) = self.terminals.get(&terminal_id) {
2993                    entity.update(cx, |_term, cx| {
2994                        cx.notify();
2995                    });
2996                } else {
2997                    self.pending_terminal_exit.insert(terminal_id, status);
2998                }
2999            }
3000        }
3001    }
3002}
3003
3004fn markdown_for_raw_output(
3005    raw_output: &serde_json::Value,
3006    language_registry: &Arc<LanguageRegistry>,
3007    cx: &mut App,
3008) -> Option<Entity<Markdown>> {
3009    match raw_output {
3010        serde_json::Value::Null => None,
3011        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
3012            Markdown::new(
3013                value.to_string().into(),
3014                Some(language_registry.clone()),
3015                None,
3016                cx,
3017            )
3018        })),
3019        serde_json::Value::Number(value) => Some(cx.new(|cx| {
3020            Markdown::new(
3021                value.to_string().into(),
3022                Some(language_registry.clone()),
3023                None,
3024                cx,
3025            )
3026        })),
3027        serde_json::Value::String(value) => Some(cx.new(|cx| {
3028            Markdown::new(
3029                value.clone().into(),
3030                Some(language_registry.clone()),
3031                None,
3032                cx,
3033            )
3034        })),
3035        value => Some(cx.new(|cx| {
3036            let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
3037
3038            Markdown::new(
3039                format!("```json\n{}\n```", pretty_json).into(),
3040                Some(language_registry.clone()),
3041                None,
3042                cx,
3043            )
3044        })),
3045    }
3046}
3047
3048#[cfg(test)]
3049mod tests {
3050    use super::*;
3051    use anyhow::anyhow;
3052    use futures::{channel::mpsc, future::LocalBoxFuture, select};
3053    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
3054    use indoc::indoc;
3055    use project::{AgentId, FakeFs, Fs};
3056    use rand::{distr, prelude::*};
3057    use serde_json::json;
3058    use settings::SettingsStore;
3059    use smol::stream::StreamExt as _;
3060    use std::{
3061        any::Any,
3062        cell::RefCell,
3063        path::Path,
3064        rc::Rc,
3065        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
3066        time::Duration,
3067    };
3068    use util::{path, path_list::PathList};
3069
3070    fn init_test(cx: &mut TestAppContext) {
3071        env_logger::try_init().ok();
3072        cx.update(|cx| {
3073            let settings_store = SettingsStore::test(cx);
3074            cx.set_global(settings_store);
3075        });
3076    }
3077
3078    #[gpui::test]
3079    async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
3080        init_test(cx);
3081
3082        let fs = FakeFs::new(cx.executor());
3083        let project = Project::test(fs, [], cx).await;
3084        let connection = Rc::new(FakeAgentConnection::new());
3085        let thread = cx
3086            .update(|cx| {
3087                connection.new_session(
3088                    project,
3089                    PathList::new(&[std::path::Path::new(path!("/test"))]),
3090                    cx,
3091                )
3092            })
3093            .await
3094            .unwrap();
3095
3096        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3097
3098        // Send Output BEFORE Created - should be buffered by acp_thread
3099        thread.update(cx, |thread, cx| {
3100            thread.on_terminal_provider_event(
3101                TerminalProviderEvent::Output {
3102                    terminal_id: terminal_id.clone(),
3103                    data: b"hello buffered".to_vec(),
3104                },
3105                cx,
3106            );
3107        });
3108
3109        // Create a display-only terminal and then send Created
3110        let lower = cx.new(|cx| {
3111            let builder = ::terminal::TerminalBuilder::new_display_only(
3112                ::terminal::terminal_settings::CursorShape::default(),
3113                ::terminal::terminal_settings::AlternateScroll::On,
3114                None,
3115                0,
3116                cx.background_executor(),
3117                PathStyle::local(),
3118            )
3119            .unwrap();
3120            builder.subscribe(cx)
3121        });
3122
3123        thread.update(cx, |thread, cx| {
3124            thread.on_terminal_provider_event(
3125                TerminalProviderEvent::Created {
3126                    terminal_id: terminal_id.clone(),
3127                    label: "Buffered Test".to_string(),
3128                    cwd: None,
3129                    output_byte_limit: None,
3130                    terminal: lower.clone(),
3131                },
3132                cx,
3133            );
3134        });
3135
3136        // After Created, buffered Output should have been flushed into the renderer
3137        let content = thread.read_with(cx, |thread, cx| {
3138            let term = thread.terminal(terminal_id.clone()).unwrap();
3139            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3140        });
3141
3142        assert!(
3143            content.contains("hello buffered"),
3144            "expected buffered output to render, got: {content}"
3145        );
3146    }
3147
3148    #[gpui::test]
3149    async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
3150        init_test(cx);
3151
3152        let fs = FakeFs::new(cx.executor());
3153        let project = Project::test(fs, [], cx).await;
3154        let connection = Rc::new(FakeAgentConnection::new());
3155        let thread = cx
3156            .update(|cx| {
3157                connection.new_session(
3158                    project,
3159                    PathList::new(&[std::path::Path::new(path!("/test"))]),
3160                    cx,
3161                )
3162            })
3163            .await
3164            .unwrap();
3165
3166        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3167
3168        // Send Output BEFORE Created
3169        thread.update(cx, |thread, cx| {
3170            thread.on_terminal_provider_event(
3171                TerminalProviderEvent::Output {
3172                    terminal_id: terminal_id.clone(),
3173                    data: b"pre-exit data".to_vec(),
3174                },
3175                cx,
3176            );
3177        });
3178
3179        // Send Exit BEFORE Created
3180        thread.update(cx, |thread, cx| {
3181            thread.on_terminal_provider_event(
3182                TerminalProviderEvent::Exit {
3183                    terminal_id: terminal_id.clone(),
3184                    status: acp::TerminalExitStatus::new().exit_code(0),
3185                },
3186                cx,
3187            );
3188        });
3189
3190        // Now create a display-only lower-level terminal and send Created
3191        let lower = cx.new(|cx| {
3192            let builder = ::terminal::TerminalBuilder::new_display_only(
3193                ::terminal::terminal_settings::CursorShape::default(),
3194                ::terminal::terminal_settings::AlternateScroll::On,
3195                None,
3196                0,
3197                cx.background_executor(),
3198                PathStyle::local(),
3199            )
3200            .unwrap();
3201            builder.subscribe(cx)
3202        });
3203
3204        thread.update(cx, |thread, cx| {
3205            thread.on_terminal_provider_event(
3206                TerminalProviderEvent::Created {
3207                    terminal_id: terminal_id.clone(),
3208                    label: "Buffered Exit Test".to_string(),
3209                    cwd: None,
3210                    output_byte_limit: None,
3211                    terminal: lower.clone(),
3212                },
3213                cx,
3214            );
3215        });
3216
3217        // Output should be present after Created (flushed from buffer)
3218        let content = thread.read_with(cx, |thread, cx| {
3219            let term = thread.terminal(terminal_id.clone()).unwrap();
3220            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3221        });
3222
3223        assert!(
3224            content.contains("pre-exit data"),
3225            "expected pre-exit data to render, got: {content}"
3226        );
3227    }
3228
3229    /// Test that killing a terminal via Terminal::kill properly:
3230    /// 1. Causes wait_for_exit to complete (doesn't hang forever)
3231    /// 2. The underlying terminal still has the output that was written before the kill
3232    ///
3233    /// This test verifies that the fix to kill_active_task (which now also kills
3234    /// the shell process in addition to the foreground process) properly allows
3235    /// wait_for_exit to complete instead of hanging indefinitely.
3236    #[cfg(unix)]
3237    #[gpui::test]
3238    async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
3239        use std::collections::HashMap;
3240        use task::Shell;
3241        use util::shell_builder::ShellBuilder;
3242
3243        init_test(cx);
3244        cx.executor().allow_parking();
3245
3246        let fs = FakeFs::new(cx.executor());
3247        let project = Project::test(fs, [], cx).await;
3248        let connection = Rc::new(FakeAgentConnection::new());
3249        let thread = cx
3250            .update(|cx| {
3251                connection.new_session(
3252                    project.clone(),
3253                    PathList::new(&[Path::new(path!("/test"))]),
3254                    cx,
3255                )
3256            })
3257            .await
3258            .unwrap();
3259
3260        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3261
3262        // Create a real PTY terminal that runs a command which prints output then sleeps
3263        // We use printf instead of echo and chain with && sleep to ensure proper execution
3264        let (completion_tx, _completion_rx) = smol::channel::unbounded();
3265        let (program, args) = ShellBuilder::new(&Shell::System, false).build(
3266            Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
3267            &[],
3268        );
3269
3270        let builder = cx
3271            .update(|cx| {
3272                ::terminal::TerminalBuilder::new(
3273                    None,
3274                    None,
3275                    task::Shell::WithArguments {
3276                        program,
3277                        args,
3278                        title_override: None,
3279                    },
3280                    HashMap::default(),
3281                    ::terminal::terminal_settings::CursorShape::default(),
3282                    ::terminal::terminal_settings::AlternateScroll::On,
3283                    None,
3284                    vec![],
3285                    0,
3286                    false,
3287                    0,
3288                    Some(completion_tx),
3289                    cx,
3290                    vec![],
3291                    PathStyle::local(),
3292                )
3293            })
3294            .await
3295            .unwrap();
3296
3297        let lower_terminal = cx.new(|cx| builder.subscribe(cx));
3298
3299        // Create the acp_thread Terminal wrapper
3300        thread.update(cx, |thread, cx| {
3301            thread.on_terminal_provider_event(
3302                TerminalProviderEvent::Created {
3303                    terminal_id: terminal_id.clone(),
3304                    label: "printf output_before_kill && sleep 60".to_string(),
3305                    cwd: None,
3306                    output_byte_limit: None,
3307                    terminal: lower_terminal.clone(),
3308                },
3309                cx,
3310            );
3311        });
3312
3313        // Poll until the printf command produces output, rather than using a
3314        // fixed sleep which is flaky on loaded machines.
3315        let deadline = std::time::Instant::now() + Duration::from_secs(10);
3316        loop {
3317            let has_output = thread.read_with(cx, |thread, cx| {
3318                let term = thread
3319                    .terminals
3320                    .get(&terminal_id)
3321                    .expect("terminal not found");
3322                let content = term.read(cx).inner().read(cx).get_content();
3323                content.contains("output_before_kill")
3324            });
3325            if has_output {
3326                break;
3327            }
3328            assert!(
3329                std::time::Instant::now() < deadline,
3330                "Timed out waiting for printf output to appear in terminal",
3331            );
3332            cx.executor().timer(Duration::from_millis(50)).await;
3333        }
3334
3335        // Get the acp_thread Terminal and kill it
3336        let wait_for_exit = thread.update(cx, |thread, cx| {
3337            let term = thread.terminals.get(&terminal_id).unwrap();
3338            let wait_for_exit = term.read(cx).wait_for_exit();
3339            term.update(cx, |term, cx| {
3340                term.kill(cx);
3341            });
3342            wait_for_exit
3343        });
3344
3345        // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
3346        // Before the fix to kill_active_task, this would hang forever because
3347        // only the foreground process was killed, not the shell, so the PTY
3348        // child never exited and wait_for_completed_task never completed.
3349        let exit_result = futures::select! {
3350            result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
3351            _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
3352        };
3353
3354        assert!(
3355            exit_result.is_some(),
3356            "wait_for_exit should complete after kill, but it timed out. \
3357            This indicates kill_active_task is not properly killing the shell process."
3358        );
3359
3360        // Give the system a chance to process any pending updates
3361        cx.run_until_parked();
3362
3363        // Verify that the underlying terminal still has the output that was
3364        // written before the kill. This verifies that killing doesn't lose output.
3365        let inner_content = thread.read_with(cx, |thread, cx| {
3366            let term = thread.terminals.get(&terminal_id).unwrap();
3367            term.read(cx).inner().read(cx).get_content()
3368        });
3369
3370        assert!(
3371            inner_content.contains("output_before_kill"),
3372            "Underlying terminal should contain output from before kill, got: {}",
3373            inner_content
3374        );
3375    }
3376
3377    #[gpui::test]
3378    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
3379        init_test(cx);
3380
3381        let fs = FakeFs::new(cx.executor());
3382        let project = Project::test(fs, [], cx).await;
3383        let connection = Rc::new(FakeAgentConnection::new());
3384        let thread = cx
3385            .update(|cx| {
3386                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3387            })
3388            .await
3389            .unwrap();
3390
3391        // Test creating a new user message
3392        thread.update(cx, |thread, cx| {
3393            thread.push_user_content_block(None, "Hello, ".into(), cx);
3394        });
3395
3396        thread.update(cx, |thread, cx| {
3397            assert_eq!(thread.entries.len(), 1);
3398            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3399                assert_eq!(user_msg.id, None);
3400                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
3401            } else {
3402                panic!("Expected UserMessage");
3403            }
3404        });
3405
3406        // Test appending to existing user message
3407        let message_1_id = UserMessageId::new();
3408        thread.update(cx, |thread, cx| {
3409            thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
3410        });
3411
3412        thread.update(cx, |thread, cx| {
3413            assert_eq!(thread.entries.len(), 1);
3414            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3415                assert_eq!(user_msg.id, Some(message_1_id));
3416                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
3417            } else {
3418                panic!("Expected UserMessage");
3419            }
3420        });
3421
3422        // Test creating new user message after assistant message
3423        thread.update(cx, |thread, cx| {
3424            thread.push_assistant_content_block("Assistant response".into(), false, cx);
3425        });
3426
3427        let message_2_id = UserMessageId::new();
3428        thread.update(cx, |thread, cx| {
3429            thread.push_user_content_block(
3430                Some(message_2_id.clone()),
3431                "New user message".into(),
3432                cx,
3433            );
3434        });
3435
3436        thread.update(cx, |thread, cx| {
3437            assert_eq!(thread.entries.len(), 3);
3438            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
3439                assert_eq!(user_msg.id, Some(message_2_id));
3440                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
3441            } else {
3442                panic!("Expected UserMessage at index 2");
3443            }
3444        });
3445    }
3446
3447    #[gpui::test]
3448    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
3449        init_test(cx);
3450
3451        let fs = FakeFs::new(cx.executor());
3452        let project = Project::test(fs, [], cx).await;
3453        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3454            |_, thread, mut cx| {
3455                async move {
3456                    thread.update(&mut cx, |thread, cx| {
3457                        thread
3458                            .handle_session_update(
3459                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3460                                    "Thinking ".into(),
3461                                )),
3462                                cx,
3463                            )
3464                            .unwrap();
3465                        thread
3466                            .handle_session_update(
3467                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3468                                    "hard!".into(),
3469                                )),
3470                                cx,
3471                            )
3472                            .unwrap();
3473                    })?;
3474                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3475                }
3476                .boxed_local()
3477            },
3478        ));
3479
3480        let thread = cx
3481            .update(|cx| {
3482                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3483            })
3484            .await
3485            .unwrap();
3486
3487        thread
3488            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
3489            .await
3490            .unwrap();
3491
3492        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
3493        assert_eq!(
3494            output,
3495            indoc! {r#"
3496            ## User
3497
3498            Hello from Zed!
3499
3500            ## Assistant
3501
3502            <thinking>
3503            Thinking hard!
3504            </thinking>
3505
3506            "#}
3507        );
3508    }
3509
3510    #[gpui::test]
3511    async fn test_ignore_echoed_user_message_chunks_during_active_turn(
3512        cx: &mut gpui::TestAppContext,
3513    ) {
3514        init_test(cx);
3515
3516        let fs = FakeFs::new(cx.executor());
3517        let project = Project::test(fs, [], cx).await;
3518        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3519            |request, thread, mut cx| {
3520                async move {
3521                    let prompt = request.prompt.first().cloned().unwrap_or_else(|| "".into());
3522
3523                    thread.update(&mut cx, |thread, cx| {
3524                        thread
3525                            .handle_session_update(
3526                                acp::SessionUpdate::UserMessageChunk(acp::ContentChunk::new(
3527                                    prompt,
3528                                )),
3529                                cx,
3530                            )
3531                            .unwrap();
3532                    })?;
3533
3534                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3535                }
3536                .boxed_local()
3537            },
3538        ));
3539
3540        let thread = cx
3541            .update(|cx| {
3542                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3543            })
3544            .await
3545            .unwrap();
3546
3547        thread
3548            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
3549            .await
3550            .unwrap();
3551
3552        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
3553        assert_eq!(output.matches("Hello from Zed!").count(), 1);
3554    }
3555
3556    #[gpui::test]
3557    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
3558        init_test(cx);
3559
3560        let fs = FakeFs::new(cx.executor());
3561        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
3562            .await;
3563        let project = Project::test(fs.clone(), [], cx).await;
3564        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
3565        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
3566        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3567            move |_, thread, mut cx| {
3568                let read_file_tx = read_file_tx.clone();
3569                async move {
3570                    let content = thread
3571                        .update(&mut cx, |thread, cx| {
3572                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3573                        })
3574                        .unwrap()
3575                        .await
3576                        .unwrap();
3577                    assert_eq!(content, "one\ntwo\nthree\n");
3578                    read_file_tx.take().unwrap().send(()).unwrap();
3579                    thread
3580                        .update(&mut cx, |thread, cx| {
3581                            thread.write_text_file(
3582                                path!("/tmp/foo").into(),
3583                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
3584                                cx,
3585                            )
3586                        })
3587                        .unwrap()
3588                        .await
3589                        .unwrap();
3590                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3591                }
3592                .boxed_local()
3593            },
3594        ));
3595
3596        let (worktree, pathbuf) = project
3597            .update(cx, |project, cx| {
3598                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3599            })
3600            .await
3601            .unwrap();
3602        let buffer = project
3603            .update(cx, |project, cx| {
3604                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
3605            })
3606            .await
3607            .unwrap();
3608
3609        let thread = cx
3610            .update(|cx| {
3611                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3612            })
3613            .await
3614            .unwrap();
3615
3616        let request = thread.update(cx, |thread, cx| {
3617            thread.send_raw("Extend the count in /tmp/foo", cx)
3618        });
3619        read_file_rx.await.ok();
3620        buffer.update(cx, |buffer, cx| {
3621            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
3622        });
3623        cx.run_until_parked();
3624        assert_eq!(
3625            buffer.read_with(cx, |buffer, _| buffer.text()),
3626            "zero\none\ntwo\nthree\nfour\nfive\n"
3627        );
3628        assert_eq!(
3629            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
3630            "zero\none\ntwo\nthree\nfour\nfive\n"
3631        );
3632        request.await.unwrap();
3633    }
3634
3635    #[gpui::test]
3636    async fn test_reading_from_line(cx: &mut TestAppContext) {
3637        init_test(cx);
3638
3639        let fs = FakeFs::new(cx.executor());
3640        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
3641            .await;
3642        let project = Project::test(fs.clone(), [], cx).await;
3643        project
3644            .update(cx, |project, cx| {
3645                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3646            })
3647            .await
3648            .unwrap();
3649
3650        let connection = Rc::new(FakeAgentConnection::new());
3651
3652        let thread = cx
3653            .update(|cx| {
3654                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3655            })
3656            .await
3657            .unwrap();
3658
3659        // Whole file
3660        let content = thread
3661            .update(cx, |thread, cx| {
3662                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3663            })
3664            .await
3665            .unwrap();
3666
3667        assert_eq!(content, "one\ntwo\nthree\nfour\n");
3668
3669        // Only start line
3670        let content = thread
3671            .update(cx, |thread, cx| {
3672                thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
3673            })
3674            .await
3675            .unwrap();
3676
3677        assert_eq!(content, "three\nfour\n");
3678
3679        // Only limit
3680        let content = thread
3681            .update(cx, |thread, cx| {
3682                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3683            })
3684            .await
3685            .unwrap();
3686
3687        assert_eq!(content, "one\ntwo\n");
3688
3689        // Range
3690        let content = thread
3691            .update(cx, |thread, cx| {
3692                thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
3693            })
3694            .await
3695            .unwrap();
3696
3697        assert_eq!(content, "two\nthree\n");
3698
3699        // Invalid
3700        let err = thread
3701            .update(cx, |thread, cx| {
3702                thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
3703            })
3704            .await
3705            .unwrap_err();
3706
3707        assert_eq!(
3708            err.to_string(),
3709            "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
3710        );
3711    }
3712
3713    #[gpui::test]
3714    async fn test_reading_empty_file(cx: &mut TestAppContext) {
3715        init_test(cx);
3716
3717        let fs = FakeFs::new(cx.executor());
3718        fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
3719        let project = Project::test(fs.clone(), [], cx).await;
3720        project
3721            .update(cx, |project, cx| {
3722                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3723            })
3724            .await
3725            .unwrap();
3726
3727        let connection = Rc::new(FakeAgentConnection::new());
3728
3729        let thread = cx
3730            .update(|cx| {
3731                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3732            })
3733            .await
3734            .unwrap();
3735
3736        // Whole file
3737        let content = thread
3738            .update(cx, |thread, cx| {
3739                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3740            })
3741            .await
3742            .unwrap();
3743
3744        assert_eq!(content, "");
3745
3746        // Only start line
3747        let content = thread
3748            .update(cx, |thread, cx| {
3749                thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
3750            })
3751            .await
3752            .unwrap();
3753
3754        assert_eq!(content, "");
3755
3756        // Only limit
3757        let content = thread
3758            .update(cx, |thread, cx| {
3759                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3760            })
3761            .await
3762            .unwrap();
3763
3764        assert_eq!(content, "");
3765
3766        // Range
3767        let content = thread
3768            .update(cx, |thread, cx| {
3769                thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
3770            })
3771            .await
3772            .unwrap();
3773
3774        assert_eq!(content, "");
3775
3776        // Invalid
3777        let err = thread
3778            .update(cx, |thread, cx| {
3779                thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
3780            })
3781            .await
3782            .unwrap_err();
3783
3784        assert_eq!(
3785            err.to_string(),
3786            "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
3787        );
3788    }
3789    #[gpui::test]
3790    async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
3791        init_test(cx);
3792
3793        let fs = FakeFs::new(cx.executor());
3794        fs.insert_tree(path!("/tmp"), json!({})).await;
3795        let project = Project::test(fs.clone(), [], cx).await;
3796        project
3797            .update(cx, |project, cx| {
3798                project.find_or_create_worktree(path!("/tmp"), true, cx)
3799            })
3800            .await
3801            .unwrap();
3802
3803        let connection = Rc::new(FakeAgentConnection::new());
3804
3805        let thread = cx
3806            .update(|cx| {
3807                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3808            })
3809            .await
3810            .unwrap();
3811
3812        // Out of project file
3813        let err = thread
3814            .update(cx, |thread, cx| {
3815                thread.read_text_file(path!("/foo").into(), None, None, false, cx)
3816            })
3817            .await
3818            .unwrap_err();
3819
3820        assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
3821    }
3822
3823    #[gpui::test]
3824    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
3825        init_test(cx);
3826
3827        let fs = FakeFs::new(cx.executor());
3828        let project = Project::test(fs, [], cx).await;
3829        let id = acp::ToolCallId::new("test");
3830
3831        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3832            let id = id.clone();
3833            move |_, thread, mut cx| {
3834                let id = id.clone();
3835                async move {
3836                    thread
3837                        .update(&mut cx, |thread, cx| {
3838                            thread.handle_session_update(
3839                                acp::SessionUpdate::ToolCall(
3840                                    acp::ToolCall::new(id.clone(), "Label")
3841                                        .kind(acp::ToolKind::Fetch)
3842                                        .status(acp::ToolCallStatus::InProgress),
3843                                ),
3844                                cx,
3845                            )
3846                        })
3847                        .unwrap()
3848                        .unwrap();
3849                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3850                }
3851                .boxed_local()
3852            }
3853        }));
3854
3855        let thread = cx
3856            .update(|cx| {
3857                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3858            })
3859            .await
3860            .unwrap();
3861
3862        let request = thread.update(cx, |thread, cx| {
3863            thread.send_raw("Fetch https://example.com", cx)
3864        });
3865
3866        run_until_first_tool_call(&thread, cx).await;
3867
3868        thread.read_with(cx, |thread, _| {
3869            assert!(matches!(
3870                thread.entries[1],
3871                AgentThreadEntry::ToolCall(ToolCall {
3872                    status: ToolCallStatus::InProgress,
3873                    ..
3874                })
3875            ));
3876        });
3877
3878        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
3879
3880        thread.read_with(cx, |thread, _| {
3881            assert!(matches!(
3882                &thread.entries[1],
3883                AgentThreadEntry::ToolCall(ToolCall {
3884                    status: ToolCallStatus::Canceled,
3885                    ..
3886                })
3887            ));
3888        });
3889
3890        thread
3891            .update(cx, |thread, cx| {
3892                thread.handle_session_update(
3893                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3894                        id,
3895                        acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3896                    )),
3897                    cx,
3898                )
3899            })
3900            .unwrap();
3901
3902        request.await.unwrap();
3903
3904        thread.read_with(cx, |thread, _| {
3905            assert!(matches!(
3906                thread.entries[1],
3907                AgentThreadEntry::ToolCall(ToolCall {
3908                    status: ToolCallStatus::Completed,
3909                    ..
3910                })
3911            ));
3912        });
3913    }
3914
3915    #[gpui::test]
3916    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3917        init_test(cx);
3918        let fs = FakeFs::new(cx.background_executor.clone());
3919        fs.insert_tree(path!("/test"), json!({})).await;
3920        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3921
3922        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3923            move |_, thread, mut cx| {
3924                async move {
3925                    thread
3926                        .update(&mut cx, |thread, cx| {
3927                            thread.handle_session_update(
3928                                acp::SessionUpdate::ToolCall(
3929                                    acp::ToolCall::new("test", "Label")
3930                                        .kind(acp::ToolKind::Edit)
3931                                        .status(acp::ToolCallStatus::Completed)
3932                                        .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
3933                                            "/test/test.txt",
3934                                            "foo",
3935                                        ))]),
3936                                ),
3937                                cx,
3938                            )
3939                        })
3940                        .unwrap()
3941                        .unwrap();
3942                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3943                }
3944                .boxed_local()
3945            }
3946        }));
3947
3948        let thread = cx
3949            .update(|cx| {
3950                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3951            })
3952            .await
3953            .unwrap();
3954
3955        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3956            .await
3957            .unwrap();
3958
3959        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3960    }
3961
3962    #[gpui::test(iterations = 10)]
3963    async fn test_checkpoints(cx: &mut TestAppContext) {
3964        init_test(cx);
3965        let fs = FakeFs::new(cx.background_executor.clone());
3966        fs.insert_tree(
3967            path!("/test"),
3968            json!({
3969                ".git": {}
3970            }),
3971        )
3972        .await;
3973        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3974
3975        let simulate_changes = Arc::new(AtomicBool::new(true));
3976        let next_filename = Arc::new(AtomicUsize::new(0));
3977        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3978            let simulate_changes = simulate_changes.clone();
3979            let next_filename = next_filename.clone();
3980            let fs = fs.clone();
3981            move |request, thread, mut cx| {
3982                let fs = fs.clone();
3983                let simulate_changes = simulate_changes.clone();
3984                let next_filename = next_filename.clone();
3985                async move {
3986                    if simulate_changes.load(SeqCst) {
3987                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3988                        fs.write(Path::new(&filename), b"").await?;
3989                    }
3990
3991                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3992                        panic!("expected text content block");
3993                    };
3994                    thread.update(&mut cx, |thread, cx| {
3995                        thread
3996                            .handle_session_update(
3997                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3998                                    content.text.to_uppercase().into(),
3999                                )),
4000                                cx,
4001                            )
4002                            .unwrap();
4003                    })?;
4004                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4005                }
4006                .boxed_local()
4007            }
4008        }));
4009        let thread = cx
4010            .update(|cx| {
4011                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4012            })
4013            .await
4014            .unwrap();
4015
4016        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
4017            .await
4018            .unwrap();
4019        thread.read_with(cx, |thread, cx| {
4020            assert_eq!(
4021                thread.to_markdown(cx),
4022                indoc! {"
4023                    ## User (checkpoint)
4024
4025                    Lorem
4026
4027                    ## Assistant
4028
4029                    LOREM
4030
4031                "}
4032            );
4033        });
4034        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
4035
4036        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
4037            .await
4038            .unwrap();
4039        thread.read_with(cx, |thread, cx| {
4040            assert_eq!(
4041                thread.to_markdown(cx),
4042                indoc! {"
4043                    ## User (checkpoint)
4044
4045                    Lorem
4046
4047                    ## Assistant
4048
4049                    LOREM
4050
4051                    ## User (checkpoint)
4052
4053                    ipsum
4054
4055                    ## Assistant
4056
4057                    IPSUM
4058
4059                "}
4060            );
4061        });
4062        assert_eq!(
4063            fs.files(),
4064            vec![
4065                Path::new(path!("/test/file-0")),
4066                Path::new(path!("/test/file-1"))
4067            ]
4068        );
4069
4070        // Checkpoint isn't stored when there are no changes.
4071        simulate_changes.store(false, SeqCst);
4072        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
4073            .await
4074            .unwrap();
4075        thread.read_with(cx, |thread, cx| {
4076            assert_eq!(
4077                thread.to_markdown(cx),
4078                indoc! {"
4079                    ## User (checkpoint)
4080
4081                    Lorem
4082
4083                    ## Assistant
4084
4085                    LOREM
4086
4087                    ## User (checkpoint)
4088
4089                    ipsum
4090
4091                    ## Assistant
4092
4093                    IPSUM
4094
4095                    ## User
4096
4097                    dolor
4098
4099                    ## Assistant
4100
4101                    DOLOR
4102
4103                "}
4104            );
4105        });
4106        assert_eq!(
4107            fs.files(),
4108            vec![
4109                Path::new(path!("/test/file-0")),
4110                Path::new(path!("/test/file-1"))
4111            ]
4112        );
4113
4114        // Rewinding the conversation truncates the history and restores the checkpoint.
4115        thread
4116            .update(cx, |thread, cx| {
4117                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
4118                    panic!("unexpected entries {:?}", thread.entries)
4119                };
4120                thread.restore_checkpoint(message.id.clone().unwrap(), cx)
4121            })
4122            .await
4123            .unwrap();
4124        thread.read_with(cx, |thread, cx| {
4125            assert_eq!(
4126                thread.to_markdown(cx),
4127                indoc! {"
4128                    ## User (checkpoint)
4129
4130                    Lorem
4131
4132                    ## Assistant
4133
4134                    LOREM
4135
4136                "}
4137            );
4138        });
4139        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
4140    }
4141
4142    #[gpui::test]
4143    async fn test_tool_result_refusal(cx: &mut TestAppContext) {
4144        use std::sync::atomic::AtomicUsize;
4145        init_test(cx);
4146
4147        let fs = FakeFs::new(cx.executor());
4148        let project = Project::test(fs, None, cx).await;
4149
4150        // Create a connection that simulates refusal after tool result
4151        let prompt_count = Arc::new(AtomicUsize::new(0));
4152        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4153            let prompt_count = prompt_count.clone();
4154            move |_request, thread, mut cx| {
4155                let count = prompt_count.fetch_add(1, SeqCst);
4156                async move {
4157                    if count == 0 {
4158                        // First prompt: Generate a tool call with result
4159                        thread.update(&mut cx, |thread, cx| {
4160                            thread
4161                                .handle_session_update(
4162                                    acp::SessionUpdate::ToolCall(
4163                                        acp::ToolCall::new("tool1", "Test Tool")
4164                                            .kind(acp::ToolKind::Fetch)
4165                                            .status(acp::ToolCallStatus::Completed)
4166                                            .raw_input(serde_json::json!({"query": "test"}))
4167                                            .raw_output(serde_json::json!({"result": "inappropriate content"})),
4168                                    ),
4169                                    cx,
4170                                )
4171                                .unwrap();
4172                        })?;
4173
4174                        // Now return refusal because of the tool result
4175                        Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
4176                    } else {
4177                        Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4178                    }
4179                }
4180                .boxed_local()
4181            }
4182        }));
4183
4184        let thread = cx
4185            .update(|cx| {
4186                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4187            })
4188            .await
4189            .unwrap();
4190
4191        // Track if we see a Refusal event
4192        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4193        let saw_refusal_event_captured = saw_refusal_event.clone();
4194        thread.update(cx, |_thread, cx| {
4195            cx.subscribe(
4196                &thread,
4197                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4198                    if matches!(event, AcpThreadEvent::Refusal) {
4199                        *saw_refusal_event_captured.lock().unwrap() = true;
4200                    }
4201                },
4202            )
4203            .detach();
4204        });
4205
4206        // Send a user message - this will trigger tool call and then refusal
4207        let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
4208        cx.background_executor.spawn(send_task).detach();
4209        cx.run_until_parked();
4210
4211        // Verify that:
4212        // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
4213        // 2. The user message was NOT truncated
4214        assert!(
4215            *saw_refusal_event.lock().unwrap(),
4216            "Refusal event should be emitted for tool result refusals"
4217        );
4218
4219        thread.read_with(cx, |thread, _| {
4220            let entries = thread.entries();
4221            assert!(entries.len() >= 2, "Should have user message and tool call");
4222
4223            // Verify user message is still there
4224            assert!(
4225                matches!(entries[0], AgentThreadEntry::UserMessage(_)),
4226                "User message should not be truncated"
4227            );
4228
4229            // Verify tool call is there with result
4230            if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
4231                assert!(
4232                    tool_call.raw_output.is_some(),
4233                    "Tool call should have output"
4234                );
4235            } else {
4236                panic!("Expected tool call at index 1");
4237            }
4238        });
4239    }
4240
4241    #[gpui::test]
4242    async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
4243        init_test(cx);
4244
4245        let fs = FakeFs::new(cx.executor());
4246        let project = Project::test(fs, None, cx).await;
4247
4248        let refuse_next = Arc::new(AtomicBool::new(false));
4249        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4250            let refuse_next = refuse_next.clone();
4251            move |_request, _thread, _cx| {
4252                if refuse_next.load(SeqCst) {
4253                    async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
4254                        .boxed_local()
4255                } else {
4256                    async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
4257                        .boxed_local()
4258                }
4259            }
4260        }));
4261
4262        let thread = cx
4263            .update(|cx| {
4264                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4265            })
4266            .await
4267            .unwrap();
4268
4269        // Track if we see a Refusal event
4270        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4271        let saw_refusal_event_captured = saw_refusal_event.clone();
4272        thread.update(cx, |_thread, cx| {
4273            cx.subscribe(
4274                &thread,
4275                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4276                    if matches!(event, AcpThreadEvent::Refusal) {
4277                        *saw_refusal_event_captured.lock().unwrap() = true;
4278                    }
4279                },
4280            )
4281            .detach();
4282        });
4283
4284        // Send a message that will be refused
4285        refuse_next.store(true, SeqCst);
4286        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4287            .await
4288            .unwrap();
4289
4290        // Verify that a Refusal event WAS emitted for user prompt refusal
4291        assert!(
4292            *saw_refusal_event.lock().unwrap(),
4293            "Refusal event should be emitted for user prompt refusals"
4294        );
4295
4296        // Verify the message was truncated (user prompt refusal)
4297        thread.read_with(cx, |thread, cx| {
4298            assert_eq!(thread.to_markdown(cx), "");
4299        });
4300    }
4301
4302    #[gpui::test]
4303    async fn test_refusal(cx: &mut TestAppContext) {
4304        init_test(cx);
4305        let fs = FakeFs::new(cx.background_executor.clone());
4306        fs.insert_tree(path!("/"), json!({})).await;
4307        let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
4308
4309        let refuse_next = Arc::new(AtomicBool::new(false));
4310        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4311            let refuse_next = refuse_next.clone();
4312            move |request, thread, mut cx| {
4313                let refuse_next = refuse_next.clone();
4314                async move {
4315                    if refuse_next.load(SeqCst) {
4316                        return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
4317                    }
4318
4319                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
4320                        panic!("expected text content block");
4321                    };
4322                    thread.update(&mut cx, |thread, cx| {
4323                        thread
4324                            .handle_session_update(
4325                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
4326                                    content.text.to_uppercase().into(),
4327                                )),
4328                                cx,
4329                            )
4330                            .unwrap();
4331                    })?;
4332                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4333                }
4334                .boxed_local()
4335            }
4336        }));
4337        let thread = cx
4338            .update(|cx| {
4339                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4340            })
4341            .await
4342            .unwrap();
4343
4344        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4345            .await
4346            .unwrap();
4347        thread.read_with(cx, |thread, cx| {
4348            assert_eq!(
4349                thread.to_markdown(cx),
4350                indoc! {"
4351                    ## User
4352
4353                    hello
4354
4355                    ## Assistant
4356
4357                    HELLO
4358
4359                "}
4360            );
4361        });
4362
4363        // Simulate refusing the second message. The message should be truncated
4364        // when a user prompt is refused.
4365        refuse_next.store(true, SeqCst);
4366        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
4367            .await
4368            .unwrap();
4369        thread.read_with(cx, |thread, cx| {
4370            assert_eq!(
4371                thread.to_markdown(cx),
4372                indoc! {"
4373                    ## User
4374
4375                    hello
4376
4377                    ## Assistant
4378
4379                    HELLO
4380
4381                "}
4382            );
4383        });
4384    }
4385
4386    async fn run_until_first_tool_call(
4387        thread: &Entity<AcpThread>,
4388        cx: &mut TestAppContext,
4389    ) -> usize {
4390        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
4391
4392        let subscription = cx.update(|cx| {
4393            cx.subscribe(thread, move |thread, _, cx| {
4394                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
4395                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
4396                        return tx.try_send(ix).unwrap();
4397                    }
4398                }
4399            })
4400        });
4401
4402        select! {
4403            _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
4404                panic!("Timeout waiting for tool call")
4405            }
4406            ix = rx.next().fuse() => {
4407                drop(subscription);
4408                ix.unwrap()
4409            }
4410        }
4411    }
4412
4413    #[derive(Clone, Default)]
4414    struct FakeAgentConnection {
4415        auth_methods: Vec<acp::AuthMethod>,
4416        supports_truncate: bool,
4417        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
4418        set_title_calls: Rc<RefCell<Vec<SharedString>>>,
4419        on_user_message: Option<
4420            Rc<
4421                dyn Fn(
4422                        acp::PromptRequest,
4423                        WeakEntity<AcpThread>,
4424                        AsyncApp,
4425                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4426                    + 'static,
4427            >,
4428        >,
4429    }
4430
4431    impl FakeAgentConnection {
4432        fn new() -> Self {
4433            Self {
4434                auth_methods: Vec::new(),
4435                supports_truncate: true,
4436                on_user_message: None,
4437                sessions: Arc::default(),
4438                set_title_calls: Default::default(),
4439            }
4440        }
4441
4442        fn without_truncate_support(mut self) -> Self {
4443            self.supports_truncate = false;
4444            self
4445        }
4446
4447        #[expect(unused)]
4448        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
4449            self.auth_methods = auth_methods;
4450            self
4451        }
4452
4453        fn on_user_message(
4454            mut self,
4455            handler: impl Fn(
4456                acp::PromptRequest,
4457                WeakEntity<AcpThread>,
4458                AsyncApp,
4459            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4460            + 'static,
4461        ) -> Self {
4462            self.on_user_message.replace(Rc::new(handler));
4463            self
4464        }
4465    }
4466
4467    impl AgentConnection for FakeAgentConnection {
4468        fn agent_id(&self) -> AgentId {
4469            AgentId::new("fake")
4470        }
4471
4472        fn telemetry_id(&self) -> SharedString {
4473            "fake".into()
4474        }
4475
4476        fn auth_methods(&self) -> &[acp::AuthMethod] {
4477            &self.auth_methods
4478        }
4479
4480        fn new_session(
4481            self: Rc<Self>,
4482            project: Entity<Project>,
4483            work_dirs: PathList,
4484            cx: &mut App,
4485        ) -> Task<gpui::Result<Entity<AcpThread>>> {
4486            let session_id = acp::SessionId::new(
4487                rand::rng()
4488                    .sample_iter(&distr::Alphanumeric)
4489                    .take(7)
4490                    .map(char::from)
4491                    .collect::<String>(),
4492            );
4493            let action_log = cx.new(|_| ActionLog::new(project.clone()));
4494            let thread = cx.new(|cx| {
4495                AcpThread::new(
4496                    None,
4497                    None,
4498                    Some(work_dirs),
4499                    self.clone(),
4500                    project,
4501                    action_log,
4502                    session_id.clone(),
4503                    watch::Receiver::constant(
4504                        acp::PromptCapabilities::new()
4505                            .image(true)
4506                            .audio(true)
4507                            .embedded_context(true),
4508                    ),
4509                    cx,
4510                )
4511            });
4512            self.sessions.lock().insert(session_id, thread.downgrade());
4513            Task::ready(Ok(thread))
4514        }
4515
4516        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
4517            if self.auth_methods().iter().any(|m| m.id() == &method) {
4518                Task::ready(Ok(()))
4519            } else {
4520                Task::ready(Err(anyhow!("Invalid Auth Method")))
4521            }
4522        }
4523
4524        fn prompt(
4525            &self,
4526            _id: UserMessageId,
4527            params: acp::PromptRequest,
4528            cx: &mut App,
4529        ) -> Task<gpui::Result<acp::PromptResponse>> {
4530            let sessions = self.sessions.lock();
4531            let thread = sessions.get(&params.session_id).unwrap();
4532            if let Some(handler) = &self.on_user_message {
4533                let handler = handler.clone();
4534                let thread = thread.clone();
4535                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
4536            } else {
4537                Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
4538            }
4539        }
4540
4541        fn cancel(&self, _session_id: &acp::SessionId, _cx: &mut App) {}
4542
4543        fn truncate(
4544            &self,
4545            session_id: &acp::SessionId,
4546            _cx: &App,
4547        ) -> Option<Rc<dyn AgentSessionTruncate>> {
4548            self.supports_truncate.then(|| {
4549                Rc::new(FakeAgentSessionEditor {
4550                    _session_id: session_id.clone(),
4551                }) as Rc<dyn AgentSessionTruncate>
4552            })
4553        }
4554
4555        fn set_title(
4556            &self,
4557            _session_id: &acp::SessionId,
4558            _cx: &App,
4559        ) -> Option<Rc<dyn AgentSessionSetTitle>> {
4560            Some(Rc::new(FakeAgentSessionSetTitle {
4561                calls: self.set_title_calls.clone(),
4562            }))
4563        }
4564
4565        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
4566            self
4567        }
4568    }
4569
4570    struct FakeAgentSessionSetTitle {
4571        calls: Rc<RefCell<Vec<SharedString>>>,
4572    }
4573
4574    impl AgentSessionSetTitle for FakeAgentSessionSetTitle {
4575        fn run(&self, title: SharedString, _cx: &mut App) -> Task<Result<()>> {
4576            self.calls.borrow_mut().push(title);
4577            Task::ready(Ok(()))
4578        }
4579    }
4580
4581    struct FakeAgentSessionEditor {
4582        _session_id: acp::SessionId,
4583    }
4584
4585    impl AgentSessionTruncate for FakeAgentSessionEditor {
4586        fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
4587            Task::ready(Ok(()))
4588        }
4589    }
4590
4591    #[gpui::test]
4592    async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
4593        init_test(cx);
4594
4595        let fs = FakeFs::new(cx.executor());
4596        let project = Project::test(fs, [], cx).await;
4597        let connection = Rc::new(FakeAgentConnection::new());
4598        let thread = cx
4599            .update(|cx| {
4600                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4601            })
4602            .await
4603            .unwrap();
4604
4605        // Try to update a tool call that doesn't exist
4606        let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
4607        thread.update(cx, |thread, cx| {
4608            let result = thread.handle_session_update(
4609                acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4610                    nonexistent_id.clone(),
4611                    acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4612                )),
4613                cx,
4614            );
4615
4616            // The update should succeed (not return an error)
4617            assert!(result.is_ok());
4618
4619            // There should now be exactly one entry in the thread
4620            assert_eq!(thread.entries.len(), 1);
4621
4622            // The entry should be a failed tool call
4623            if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
4624                assert_eq!(tool_call.id, nonexistent_id);
4625                assert!(matches!(tool_call.status, ToolCallStatus::Failed));
4626                assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
4627
4628                // Check that the content contains the error message
4629                assert_eq!(tool_call.content.len(), 1);
4630                if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
4631                    match content_block {
4632                        ContentBlock::Markdown { markdown } => {
4633                            let markdown_text = markdown.read(cx).source();
4634                            assert!(markdown_text.contains("Tool call not found"));
4635                        }
4636                        ContentBlock::Empty => panic!("Expected markdown content, got empty"),
4637                        ContentBlock::ResourceLink { .. } => {
4638                            panic!("Expected markdown content, got resource link")
4639                        }
4640                        ContentBlock::Image { .. } => {
4641                            panic!("Expected markdown content, got image")
4642                        }
4643                    }
4644                } else {
4645                    panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
4646                }
4647            } else {
4648                panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
4649            }
4650        });
4651    }
4652
4653    /// Tests that restoring a checkpoint properly cleans up terminals that were
4654    /// created after that checkpoint, and cancels any in-progress generation.
4655    ///
4656    /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
4657    /// that were started after that checkpoint should be terminated, and any in-progress
4658    /// AI generation should be canceled.
4659    #[gpui::test]
4660    async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
4661        init_test(cx);
4662
4663        let fs = FakeFs::new(cx.executor());
4664        let project = Project::test(fs, [], cx).await;
4665        let connection = Rc::new(FakeAgentConnection::new());
4666        let thread = cx
4667            .update(|cx| {
4668                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4669            })
4670            .await
4671            .unwrap();
4672
4673        // Send first user message to create a checkpoint
4674        cx.update(|cx| {
4675            thread.update(cx, |thread, cx| {
4676                thread.send(vec!["first message".into()], cx)
4677            })
4678        })
4679        .await
4680        .unwrap();
4681
4682        // Send second message (creates another checkpoint) - we'll restore to this one
4683        cx.update(|cx| {
4684            thread.update(cx, |thread, cx| {
4685                thread.send(vec!["second message".into()], cx)
4686            })
4687        })
4688        .await
4689        .unwrap();
4690
4691        // Create 2 terminals BEFORE the checkpoint that have completed running
4692        let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4693        let mock_terminal_1 = cx.new(|cx| {
4694            let builder = ::terminal::TerminalBuilder::new_display_only(
4695                ::terminal::terminal_settings::CursorShape::default(),
4696                ::terminal::terminal_settings::AlternateScroll::On,
4697                None,
4698                0,
4699                cx.background_executor(),
4700                PathStyle::local(),
4701            )
4702            .unwrap();
4703            builder.subscribe(cx)
4704        });
4705
4706        thread.update(cx, |thread, cx| {
4707            thread.on_terminal_provider_event(
4708                TerminalProviderEvent::Created {
4709                    terminal_id: terminal_id_1.clone(),
4710                    label: "echo 'first'".to_string(),
4711                    cwd: Some(PathBuf::from("/test")),
4712                    output_byte_limit: None,
4713                    terminal: mock_terminal_1.clone(),
4714                },
4715                cx,
4716            );
4717        });
4718
4719        thread.update(cx, |thread, cx| {
4720            thread.on_terminal_provider_event(
4721                TerminalProviderEvent::Output {
4722                    terminal_id: terminal_id_1.clone(),
4723                    data: b"first\n".to_vec(),
4724                },
4725                cx,
4726            );
4727        });
4728
4729        thread.update(cx, |thread, cx| {
4730            thread.on_terminal_provider_event(
4731                TerminalProviderEvent::Exit {
4732                    terminal_id: terminal_id_1.clone(),
4733                    status: acp::TerminalExitStatus::new().exit_code(0),
4734                },
4735                cx,
4736            );
4737        });
4738
4739        let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4740        let mock_terminal_2 = cx.new(|cx| {
4741            let builder = ::terminal::TerminalBuilder::new_display_only(
4742                ::terminal::terminal_settings::CursorShape::default(),
4743                ::terminal::terminal_settings::AlternateScroll::On,
4744                None,
4745                0,
4746                cx.background_executor(),
4747                PathStyle::local(),
4748            )
4749            .unwrap();
4750            builder.subscribe(cx)
4751        });
4752
4753        thread.update(cx, |thread, cx| {
4754            thread.on_terminal_provider_event(
4755                TerminalProviderEvent::Created {
4756                    terminal_id: terminal_id_2.clone(),
4757                    label: "echo 'second'".to_string(),
4758                    cwd: Some(PathBuf::from("/test")),
4759                    output_byte_limit: None,
4760                    terminal: mock_terminal_2.clone(),
4761                },
4762                cx,
4763            );
4764        });
4765
4766        thread.update(cx, |thread, cx| {
4767            thread.on_terminal_provider_event(
4768                TerminalProviderEvent::Output {
4769                    terminal_id: terminal_id_2.clone(),
4770                    data: b"second\n".to_vec(),
4771                },
4772                cx,
4773            );
4774        });
4775
4776        thread.update(cx, |thread, cx| {
4777            thread.on_terminal_provider_event(
4778                TerminalProviderEvent::Exit {
4779                    terminal_id: terminal_id_2.clone(),
4780                    status: acp::TerminalExitStatus::new().exit_code(0),
4781                },
4782                cx,
4783            );
4784        });
4785
4786        // Get the second message ID to restore to
4787        let second_message_id = thread.read_with(cx, |thread, _| {
4788            // At this point we have:
4789            // - Index 0: First user message (with checkpoint)
4790            // - Index 1: Second user message (with checkpoint)
4791            // No assistant responses because FakeAgentConnection just returns EndTurn
4792            let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
4793                panic!("expected user message at index 1");
4794            };
4795            message.id.clone().unwrap()
4796        });
4797
4798        // Create a terminal AFTER the checkpoint we'll restore to.
4799        // This simulates the AI agent starting a long-running terminal command.
4800        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4801        let mock_terminal = cx.new(|cx| {
4802            let builder = ::terminal::TerminalBuilder::new_display_only(
4803                ::terminal::terminal_settings::CursorShape::default(),
4804                ::terminal::terminal_settings::AlternateScroll::On,
4805                None,
4806                0,
4807                cx.background_executor(),
4808                PathStyle::local(),
4809            )
4810            .unwrap();
4811            builder.subscribe(cx)
4812        });
4813
4814        // Register the terminal as created
4815        thread.update(cx, |thread, cx| {
4816            thread.on_terminal_provider_event(
4817                TerminalProviderEvent::Created {
4818                    terminal_id: terminal_id.clone(),
4819                    label: "sleep 1000".to_string(),
4820                    cwd: Some(PathBuf::from("/test")),
4821                    output_byte_limit: None,
4822                    terminal: mock_terminal.clone(),
4823                },
4824                cx,
4825            );
4826        });
4827
4828        // Simulate the terminal producing output (still running)
4829        thread.update(cx, |thread, cx| {
4830            thread.on_terminal_provider_event(
4831                TerminalProviderEvent::Output {
4832                    terminal_id: terminal_id.clone(),
4833                    data: b"terminal is running...\n".to_vec(),
4834                },
4835                cx,
4836            );
4837        });
4838
4839        // Create a tool call entry that references this terminal
4840        // This represents the agent requesting a terminal command
4841        thread.update(cx, |thread, cx| {
4842            thread
4843                .handle_session_update(
4844                    acp::SessionUpdate::ToolCall(
4845                        acp::ToolCall::new("terminal-tool-1", "Running command")
4846                            .kind(acp::ToolKind::Execute)
4847                            .status(acp::ToolCallStatus::InProgress)
4848                            .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
4849                                terminal_id.clone(),
4850                            ))])
4851                            .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
4852                    ),
4853                    cx,
4854                )
4855                .unwrap();
4856        });
4857
4858        // Verify terminal exists and is in the thread
4859        let terminal_exists_before =
4860            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4861        assert!(
4862            terminal_exists_before,
4863            "Terminal should exist before checkpoint restore"
4864        );
4865
4866        // Verify the terminal's underlying task is still running (not completed)
4867        let terminal_running_before = thread.read_with(cx, |thread, _cx| {
4868            let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
4869            terminal_entity.read_with(cx, |term, _cx| {
4870                term.output().is_none() // output is None means it's still running
4871            })
4872        });
4873        assert!(
4874            terminal_running_before,
4875            "Terminal should be running before checkpoint restore"
4876        );
4877
4878        // Verify we have the expected entries before restore
4879        let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
4880        assert!(
4881            entry_count_before > 1,
4882            "Should have multiple entries before restore"
4883        );
4884
4885        // Restore the checkpoint to the second message.
4886        // This should:
4887        // 1. Cancel any in-progress generation (via the cancel() call)
4888        // 2. Remove the terminal that was created after that point
4889        thread
4890            .update(cx, |thread, cx| {
4891                thread.restore_checkpoint(second_message_id, cx)
4892            })
4893            .await
4894            .unwrap();
4895
4896        // Verify that no send_task is in progress after restore
4897        // (cancel() clears the send_task)
4898        let has_send_task_after = thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4899        assert!(
4900            !has_send_task_after,
4901            "Should not have a send_task after restore (cancel should have cleared it)"
4902        );
4903
4904        // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
4905        let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
4906        assert_eq!(
4907            entry_count, 1,
4908            "Should have 1 entry after restore (only the first user message)"
4909        );
4910
4911        // Verify the 2 completed terminals from before the checkpoint still exist
4912        let terminal_1_exists = thread.read_with(cx, |thread, _| {
4913            thread.terminals.contains_key(&terminal_id_1)
4914        });
4915        assert!(
4916            terminal_1_exists,
4917            "Terminal 1 (from before checkpoint) should still exist"
4918        );
4919
4920        let terminal_2_exists = thread.read_with(cx, |thread, _| {
4921            thread.terminals.contains_key(&terminal_id_2)
4922        });
4923        assert!(
4924            terminal_2_exists,
4925            "Terminal 2 (from before checkpoint) should still exist"
4926        );
4927
4928        // Verify they're still in completed state
4929        let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
4930            let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
4931            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4932        });
4933        assert!(terminal_1_completed, "Terminal 1 should still be completed");
4934
4935        let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
4936            let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
4937            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4938        });
4939        assert!(terminal_2_completed, "Terminal 2 should still be completed");
4940
4941        // Verify the running terminal (created after checkpoint) was removed
4942        let terminal_3_exists =
4943            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4944        assert!(
4945            !terminal_3_exists,
4946            "Terminal 3 (created after checkpoint) should have been removed"
4947        );
4948
4949        // Verify total count is 2 (the two from before the checkpoint)
4950        let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
4951        assert_eq!(
4952            terminal_count, 2,
4953            "Should have exactly 2 terminals (the completed ones from before checkpoint)"
4954        );
4955    }
4956
4957    /// Tests that update_last_checkpoint correctly updates the original message's checkpoint
4958    /// even when a new user message is added while the async checkpoint comparison is in progress.
4959    ///
4960    /// This is a regression test for a bug where update_last_checkpoint would fail with
4961    /// "no checkpoint" if a new user message (without a checkpoint) was added between when
4962    /// update_last_checkpoint started and when its async closure ran.
4963    #[gpui::test]
4964    async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
4965        init_test(cx);
4966
4967        let fs = FakeFs::new(cx.executor());
4968        fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
4969            .await;
4970        let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;
4971
4972        let handler_done = Arc::new(AtomicBool::new(false));
4973        let handler_done_clone = handler_done.clone();
4974        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4975            move |_, _thread, _cx| {
4976                handler_done_clone.store(true, SeqCst);
4977                async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
4978            },
4979        ));
4980
4981        let thread = cx
4982            .update(|cx| {
4983                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4984            })
4985            .await
4986            .unwrap();
4987
4988        let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
4989        let send_task = cx.background_executor.spawn(send_future);
4990
4991        // Tick until handler completes, then a few more to let update_last_checkpoint start
4992        while !handler_done.load(SeqCst) {
4993            cx.executor().tick();
4994        }
4995        for _ in 0..5 {
4996            cx.executor().tick();
4997        }
4998
4999        thread.update(cx, |thread, cx| {
5000            thread.push_entry(
5001                AgentThreadEntry::UserMessage(UserMessage {
5002                    id: Some(UserMessageId::new()),
5003                    content: ContentBlock::Empty,
5004                    chunks: vec!["Injected message (no checkpoint)".into()],
5005                    checkpoint: None,
5006                    indented: false,
5007                }),
5008                cx,
5009            );
5010        });
5011
5012        cx.run_until_parked();
5013        let result = send_task.await;
5014
5015        assert!(
5016            result.is_ok(),
5017            "send should succeed even when new message added during update_last_checkpoint: {:?}",
5018            result.err()
5019        );
5020    }
5021
5022    /// Tests that when a follow-up message is sent during generation,
5023    /// the first turn completing does NOT clear `running_turn` because
5024    /// it now belongs to the second turn.
5025    #[gpui::test]
5026    async fn test_follow_up_message_during_generation_does_not_clear_turn(cx: &mut TestAppContext) {
5027        init_test(cx);
5028
5029        let fs = FakeFs::new(cx.executor());
5030        let project = Project::test(fs, [], cx).await;
5031
5032        // First handler waits for this signal before completing
5033        let (first_complete_tx, first_complete_rx) = futures::channel::oneshot::channel::<()>();
5034        let first_complete_rx = RefCell::new(Some(first_complete_rx));
5035
5036        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
5037            move |params, _thread, _cx| {
5038                let first_complete_rx = first_complete_rx.borrow_mut().take();
5039                let is_first = params
5040                    .prompt
5041                    .iter()
5042                    .any(|c| matches!(c, acp::ContentBlock::Text(t) if t.text.contains("first")));
5043
5044                async move {
5045                    if is_first {
5046                        // First handler waits until signaled
5047                        if let Some(rx) = first_complete_rx {
5048                            rx.await.ok();
5049                        }
5050                    }
5051                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
5052                }
5053                .boxed_local()
5054            }
5055        }));
5056
5057        let thread = cx
5058            .update(|cx| {
5059                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5060            })
5061            .await
5062            .unwrap();
5063
5064        // Send first message (turn_id=1) - handler will block
5065        let first_request = thread.update(cx, |thread, cx| thread.send_raw("first", cx));
5066        assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 1);
5067
5068        // Send second message (turn_id=2) while first is still blocked
5069        // This calls cancel() which takes turn 1's running_turn and sets turn 2's
5070        let second_request = thread.update(cx, |thread, cx| thread.send_raw("second", cx));
5071        assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 2);
5072
5073        let running_turn_after_second_send =
5074            thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
5075        assert_eq!(
5076            running_turn_after_second_send,
5077            Some(2),
5078            "running_turn should be set to turn 2 after sending second message"
5079        );
5080
5081        // Now signal first handler to complete
5082        first_complete_tx.send(()).ok();
5083
5084        // First request completes - should NOT clear running_turn
5085        // because running_turn now belongs to turn 2
5086        first_request.await.unwrap();
5087
5088        let running_turn_after_first =
5089            thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
5090        assert_eq!(
5091            running_turn_after_first,
5092            Some(2),
5093            "first turn completing should not clear running_turn (belongs to turn 2)"
5094        );
5095
5096        // Second request completes - SHOULD clear running_turn
5097        second_request.await.unwrap();
5098
5099        let running_turn_after_second =
5100            thread.read_with(cx, |thread, _| thread.running_turn.is_some());
5101        assert!(
5102            !running_turn_after_second,
5103            "second turn completing should clear running_turn"
5104        );
5105    }
5106
5107    #[gpui::test]
5108    async fn test_send_assigns_message_id_without_truncate_support(cx: &mut TestAppContext) {
5109        init_test(cx);
5110
5111        let fs = FakeFs::new(cx.executor());
5112        let project = Project::test(fs, [], cx).await;
5113
5114        let connection = Rc::new(FakeAgentConnection::new().without_truncate_support());
5115        let thread = cx
5116            .update(|cx| {
5117                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5118            })
5119            .await
5120            .unwrap();
5121
5122        let response = thread
5123            .update(cx, |thread, cx| thread.send_raw("test message", cx))
5124            .await;
5125
5126        assert!(response.is_ok(), "send should not fail: {response:?}");
5127        thread.read_with(cx, |thread, _| {
5128            let AgentThreadEntry::UserMessage(message) = &thread.entries[0] else {
5129                panic!("expected first entry to be a user message")
5130            };
5131            assert!(
5132                message.id.is_some(),
5133                "user message should always have an id"
5134            );
5135        });
5136    }
5137
5138    #[gpui::test]
5139    async fn test_send_returns_cancelled_response_and_marks_tools_as_cancelled(
5140        cx: &mut TestAppContext,
5141    ) {
5142        init_test(cx);
5143
5144        let fs = FakeFs::new(cx.executor());
5145        let project = Project::test(fs, [], cx).await;
5146
5147        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
5148            move |_params, thread, mut cx| {
5149                async move {
5150                    thread
5151                        .update(&mut cx, |thread, cx| {
5152                            thread.handle_session_update(
5153                                acp::SessionUpdate::ToolCall(
5154                                    acp::ToolCall::new(
5155                                        acp::ToolCallId::new("test-tool"),
5156                                        "Test Tool",
5157                                    )
5158                                    .kind(acp::ToolKind::Fetch)
5159                                    .status(acp::ToolCallStatus::InProgress),
5160                                ),
5161                                cx,
5162                            )
5163                        })
5164                        .unwrap()
5165                        .unwrap();
5166
5167                    Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
5168                }
5169                .boxed_local()
5170            },
5171        ));
5172
5173        let thread = cx
5174            .update(|cx| {
5175                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5176            })
5177            .await
5178            .unwrap();
5179
5180        let response = thread
5181            .update(cx, |thread, cx| thread.send_raw("test message", cx))
5182            .await;
5183
5184        let response = response
5185            .expect("send should succeed")
5186            .expect("should have response");
5187        assert_eq!(
5188            response.stop_reason,
5189            acp::StopReason::Cancelled,
5190            "response should have Cancelled stop_reason"
5191        );
5192
5193        thread.read_with(cx, |thread, _| {
5194            let tool_entry = thread
5195                .entries
5196                .iter()
5197                .find_map(|e| {
5198                    if let AgentThreadEntry::ToolCall(call) = e {
5199                        Some(call)
5200                    } else {
5201                        None
5202                    }
5203                })
5204                .expect("should have tool call entry");
5205
5206            assert!(
5207                matches!(tool_entry.status, ToolCallStatus::Canceled),
5208                "tool should be marked as Canceled when response is Cancelled, got {:?}",
5209                tool_entry.status
5210            );
5211        });
5212    }
5213
5214    #[gpui::test]
5215    async fn test_provisional_title_replaced_by_real_title(cx: &mut TestAppContext) {
5216        init_test(cx);
5217
5218        let fs = FakeFs::new(cx.executor());
5219        let project = Project::test(fs, [], cx).await;
5220        let connection = Rc::new(FakeAgentConnection::new());
5221        let set_title_calls = connection.set_title_calls.clone();
5222
5223        let thread = cx
5224            .update(|cx| {
5225                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5226            })
5227            .await
5228            .unwrap();
5229
5230        // Initial title is the default.
5231        thread.read_with(cx, |thread, _| {
5232            assert_eq!(thread.title(), None);
5233        });
5234
5235        // Setting a provisional title updates the display title.
5236        thread.update(cx, |thread, cx| {
5237            thread.set_provisional_title("Hello, can you help…".into(), cx);
5238        });
5239        thread.read_with(cx, |thread, _| {
5240            assert_eq!(
5241                thread.title().as_ref().map(|s| s.as_str()),
5242                Some("Hello, can you help…")
5243            );
5244        });
5245
5246        // The provisional title should NOT have propagated to the connection.
5247        assert_eq!(
5248            set_title_calls.borrow().len(),
5249            0,
5250            "provisional title should not propagate to the connection"
5251        );
5252
5253        // When the real title arrives via set_title, it replaces the
5254        // provisional title and propagates to the connection.
5255        let task = thread.update(cx, |thread, cx| {
5256            thread.set_title("Helping with Rust question".into(), cx)
5257        });
5258        task.await.expect("set_title should succeed");
5259        thread.read_with(cx, |thread, _| {
5260            assert_eq!(
5261                thread.title().as_ref().map(|s| s.as_str()),
5262                Some("Helping with Rust question")
5263            );
5264        });
5265        assert_eq!(
5266            set_title_calls.borrow().as_slice(),
5267            &[SharedString::from("Helping with Rust question")],
5268            "real title should propagate to the connection"
5269        );
5270    }
5271
5272    #[gpui::test]
5273    async fn test_session_info_update_replaces_provisional_title_and_emits_event(
5274        cx: &mut TestAppContext,
5275    ) {
5276        init_test(cx);
5277
5278        let fs = FakeFs::new(cx.executor());
5279        let project = Project::test(fs, [], cx).await;
5280        let connection = Rc::new(FakeAgentConnection::new());
5281
5282        let thread = cx
5283            .update(|cx| {
5284                connection.clone().new_session(
5285                    project,
5286                    PathList::new(&[Path::new(path!("/test"))]),
5287                    cx,
5288                )
5289            })
5290            .await
5291            .unwrap();
5292
5293        let title_updated_events = Rc::new(RefCell::new(0usize));
5294        let title_updated_events_for_subscription = title_updated_events.clone();
5295        thread.update(cx, |_thread, cx| {
5296            cx.subscribe(
5297                &thread,
5298                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
5299                    if matches!(event, AcpThreadEvent::TitleUpdated) {
5300                        *title_updated_events_for_subscription.borrow_mut() += 1;
5301                    }
5302                },
5303            )
5304            .detach();
5305        });
5306
5307        thread.update(cx, |thread, cx| {
5308            thread.set_provisional_title("Hello, can you help…".into(), cx);
5309        });
5310        assert_eq!(
5311            *title_updated_events.borrow(),
5312            1,
5313            "setting a provisional title should emit TitleUpdated"
5314        );
5315
5316        let result = thread.update(cx, |thread, cx| {
5317            thread.handle_session_update(
5318                acp::SessionUpdate::SessionInfoUpdate(
5319                    acp::SessionInfoUpdate::new().title("Helping with Rust question"),
5320                ),
5321                cx,
5322            )
5323        });
5324        result.expect("session info update should succeed");
5325
5326        thread.read_with(cx, |thread, _| {
5327            assert_eq!(
5328                thread.title().as_ref().map(|s| s.as_str()),
5329                Some("Helping with Rust question")
5330            );
5331            assert!(
5332                !thread.has_provisional_title(),
5333                "session info title update should clear provisional title"
5334            );
5335        });
5336
5337        assert_eq!(
5338            *title_updated_events.borrow(),
5339            2,
5340            "session info title update should emit TitleUpdated"
5341        );
5342        assert!(
5343            connection.set_title_calls.borrow().is_empty(),
5344            "session info title update should not propagate back to the connection"
5345        );
5346    }
5347
5348    #[gpui::test]
5349    async fn test_usage_update_populates_token_usage_and_cost(cx: &mut TestAppContext) {
5350        init_test(cx);
5351
5352        let fs = FakeFs::new(cx.executor());
5353        let project = Project::test(fs, [], cx).await;
5354        let connection = Rc::new(FakeAgentConnection::new());
5355        let thread = cx
5356            .update(|cx| {
5357                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5358            })
5359            .await
5360            .unwrap();
5361
5362        thread.update(cx, |thread, cx| {
5363            thread
5364                .handle_session_update(
5365                    acp::SessionUpdate::UsageUpdate(
5366                        acp::UsageUpdate::new(5000, 10000).cost(acp::Cost::new(0.42, "USD")),
5367                    ),
5368                    cx,
5369                )
5370                .unwrap();
5371        });
5372
5373        thread.read_with(cx, |thread, _| {
5374            let usage = thread.token_usage().expect("token_usage should be set");
5375            assert_eq!(usage.max_tokens, 10000);
5376            assert_eq!(usage.used_tokens, 5000);
5377
5378            let cost = thread.cost().expect("cost should be set");
5379            assert!((cost.amount - 0.42).abs() < f64::EPSILON);
5380            assert_eq!(cost.currency.as_ref(), "USD");
5381        });
5382    }
5383
5384    #[gpui::test]
5385    async fn test_usage_update_without_cost_preserves_existing_cost(cx: &mut TestAppContext) {
5386        init_test(cx);
5387
5388        let fs = FakeFs::new(cx.executor());
5389        let project = Project::test(fs, [], cx).await;
5390        let connection = Rc::new(FakeAgentConnection::new());
5391        let thread = cx
5392            .update(|cx| {
5393                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5394            })
5395            .await
5396            .unwrap();
5397
5398        thread.update(cx, |thread, cx| {
5399            thread
5400                .handle_session_update(
5401                    acp::SessionUpdate::UsageUpdate(
5402                        acp::UsageUpdate::new(1000, 10000).cost(acp::Cost::new(0.10, "USD")),
5403                    ),
5404                    cx,
5405                )
5406                .unwrap();
5407
5408            thread
5409                .handle_session_update(
5410                    acp::SessionUpdate::UsageUpdate(acp::UsageUpdate::new(2000, 10000)),
5411                    cx,
5412                )
5413                .unwrap();
5414        });
5415
5416        thread.read_with(cx, |thread, _| {
5417            let usage = thread.token_usage().expect("token_usage should be set");
5418            assert_eq!(usage.used_tokens, 2000);
5419
5420            let cost = thread.cost().expect("cost should be preserved");
5421            assert!((cost.amount - 0.10).abs() < f64::EPSILON);
5422        });
5423    }
5424
5425    #[gpui::test]
5426    async fn test_response_usage_does_not_clobber_session_usage(cx: &mut TestAppContext) {
5427        init_test(cx);
5428
5429        let fs = FakeFs::new(cx.executor());
5430        let project = Project::test(fs, [], cx).await;
5431        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
5432            move |_, thread, mut cx| {
5433                async move {
5434                    thread.update(&mut cx, |thread, cx| {
5435                        thread
5436                            .handle_session_update(
5437                                acp::SessionUpdate::UsageUpdate(
5438                                    acp::UsageUpdate::new(3000, 10000)
5439                                        .cost(acp::Cost::new(0.05, "EUR")),
5440                                ),
5441                                cx,
5442                            )
5443                            .unwrap();
5444                    })?;
5445                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)
5446                        .usage(acp::Usage::new(500, 200, 300)))
5447                }
5448                .boxed_local()
5449            },
5450        ));
5451
5452        let thread = cx
5453            .update(|cx| {
5454                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5455            })
5456            .await
5457            .unwrap();
5458
5459        thread
5460            .update(cx, |thread, cx| thread.send_raw("hello", cx))
5461            .await
5462            .unwrap();
5463
5464        thread.read_with(cx, |thread, _| {
5465            let usage = thread.token_usage().expect("token_usage should be set");
5466            assert_eq!(usage.max_tokens, 10000, "max_tokens from UsageUpdate");
5467            assert_eq!(usage.used_tokens, 3000, "used_tokens from UsageUpdate");
5468            assert_eq!(usage.input_tokens, 200, "input_tokens from response usage");
5469            assert_eq!(
5470                usage.output_tokens, 300,
5471                "output_tokens from response usage"
5472            );
5473
5474            let cost = thread.cost().expect("cost should be set");
5475            assert!((cost.amount - 0.05).abs() < f64::EPSILON);
5476            assert_eq!(cost.currency.as_ref(), "EUR");
5477        });
5478    }
5479
5480    #[gpui::test]
5481    async fn test_clearing_token_usage_also_clears_cost(cx: &mut TestAppContext) {
5482        init_test(cx);
5483
5484        let fs = FakeFs::new(cx.executor());
5485        let project = Project::test(fs, [], cx).await;
5486        let connection = Rc::new(FakeAgentConnection::new());
5487        let thread = cx
5488            .update(|cx| {
5489                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
5490            })
5491            .await
5492            .unwrap();
5493
5494        thread.update(cx, |thread, cx| {
5495            thread
5496                .handle_session_update(
5497                    acp::SessionUpdate::UsageUpdate(
5498                        acp::UsageUpdate::new(1000, 10000).cost(acp::Cost::new(0.25, "USD")),
5499                    ),
5500                    cx,
5501                )
5502                .unwrap();
5503
5504            assert!(thread.token_usage().is_some());
5505            assert!(thread.cost().is_some());
5506
5507            thread.update_token_usage(None, cx);
5508
5509            assert!(thread.token_usage().is_none());
5510            assert!(
5511                thread.cost().is_none(),
5512                "cost should be cleared when token usage is cleared"
5513            );
5514        });
5515    }
5516}