acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5use action_log::{ActionLog, ActionLogTelemetry};
   6use agent_client_protocol::{self as acp};
   7use anyhow::{Context as _, Result, anyhow};
   8use collections::HashSet;
   9pub use connection::*;
  10pub use diff::*;
  11use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  12use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  13use itertools::Itertools;
  14use language::language_settings::FormatOnSave;
  15use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  16use markdown::Markdown;
  17pub use mention::*;
  18use project::lsp_store::{FormatTrigger, LspFormatTarget};
  19use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  20use serde::{Deserialize, Serialize};
  21use serde_json::to_string_pretty;
  22use std::collections::HashMap;
  23use std::error::Error;
  24use std::fmt::{Formatter, Write};
  25use std::ops::Range;
  26use std::process::ExitStatus;
  27use std::rc::Rc;
  28use std::time::{Duration, Instant};
  29use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  30use task::{Shell, ShellBuilder};
  31pub use terminal::*;
  32use text::Bias;
  33use ui::App;
  34use util::markdown::MarkdownEscaped;
  35use util::path_list::PathList;
  36use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
  37use uuid::Uuid;
  38
/// Key used in ACP ToolCall meta to store the tool's programmatic name.
/// This is a workaround since ACP's ToolCall doesn't have a dedicated name field.
///
/// Read by [`tool_name_from_meta`] and written by [`meta_with_tool_name`].
pub const TOOL_NAME_META_KEY: &str = "tool_name";
  42
  43/// Helper to extract tool name from ACP meta
  44pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
  45    meta.as_ref()
  46        .and_then(|m| m.get(TOOL_NAME_META_KEY))
  47        .and_then(|v| v.as_str())
  48        .map(|s| SharedString::from(s.to_owned()))
  49}
  50
  51/// Helper to create meta with tool name
  52pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
  53    acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
  54}
  55
/// Key used in ACP ToolCall meta to store the session id and message indexes
/// of a subagent session (see [`SubagentSessionInfo`]).
pub const SUBAGENT_SESSION_INFO_META_KEY: &str = "subagent_session_info";
  58
/// Session id and message indexes of a subagent session, as stored in ACP
/// ToolCall meta under [`SUBAGENT_SESSION_INFO_META_KEY`].
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SubagentSessionInfo {
    /// The session id of the subagent session that was spawned
    pub session_id: acp::SessionId,
    /// The index of the message of the start of the "turn" run by this tool call
    pub message_start_index: usize,
    /// The index of the output of the message that the subagent has returned.
    /// Omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message_end_index: Option<usize>,
}
  69
  70/// Helper to extract subagent session id from ACP meta
  71pub fn subagent_session_info_from_meta(meta: &Option<acp::Meta>) -> Option<SubagentSessionInfo> {
  72    meta.as_ref()
  73        .and_then(|m| m.get(SUBAGENT_SESSION_INFO_META_KEY))
  74        .and_then(|v| serde_json::from_value(v.clone()).ok())
  75}
  76
/// A user message entry of the thread (see `AgentThreadEntry::UserMessage`).
#[derive(Debug)]
pub struct UserMessage {
    pub id: Option<UserMessageId>,
    /// Content rendered by `to_markdown` when the message is exported.
    pub content: ContentBlock,
    // NOTE(review): presumably the raw ACP blocks `content` was built from — confirm.
    pub chunks: Vec<acp::ContentBlock>,
    /// When present with `show == true`, the markdown heading becomes
    /// "## User (checkpoint)".
    pub checkpoint: Option<Checkpoint>,
    /// Reported by `AgentThreadEntry::is_indented`.
    pub indented: bool,
}
  85
/// A git checkpoint attached to a [`UserMessage`].
#[derive(Debug)]
pub struct Checkpoint {
    git_checkpoint: GitStoreCheckpoint,
    /// When true, `UserMessage::to_markdown` uses the "## User (checkpoint)" heading.
    pub show: bool,
}
  91
  92impl UserMessage {
  93    fn to_markdown(&self, cx: &App) -> String {
  94        let mut markdown = String::new();
  95        if self
  96            .checkpoint
  97            .as_ref()
  98            .is_some_and(|checkpoint| checkpoint.show)
  99        {
 100            writeln!(markdown, "## User (checkpoint)").unwrap();
 101        } else {
 102            writeln!(markdown, "## User").unwrap();
 103        }
 104        writeln!(markdown).unwrap();
 105        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
 106        writeln!(markdown).unwrap();
 107        markdown
 108    }
 109}
 110
/// An assistant entry of the thread, made up of message and thought chunks.
#[derive(Debug, PartialEq)]
pub struct AssistantMessage {
    /// Chunks in order; `to_markdown` joins them with blank lines.
    pub chunks: Vec<AssistantMessageChunk>,
    /// Reported by `AgentThreadEntry::is_indented`.
    pub indented: bool,
    // NOTE(review): presumably marks output returned by a subagent — confirm against callers.
    pub is_subagent_output: bool,
}
 117
 118impl AssistantMessage {
 119    pub fn to_markdown(&self, cx: &App) -> String {
 120        format!(
 121            "## Assistant\n\n{}\n\n",
 122            self.chunks
 123                .iter()
 124                .map(|chunk| chunk.to_markdown(cx))
 125                .join("\n\n")
 126        )
 127    }
 128}
 129
/// One piece of an [`AssistantMessage`].
#[derive(Debug, PartialEq)]
pub enum AssistantMessageChunk {
    /// Rendered as-is in markdown output.
    Message { block: ContentBlock },
    /// Wrapped in `<thinking>` tags in markdown output.
    Thought { block: ContentBlock },
}
 135
 136impl AssistantMessageChunk {
 137    pub fn from_str(
 138        chunk: &str,
 139        language_registry: &Arc<LanguageRegistry>,
 140        path_style: PathStyle,
 141        cx: &mut App,
 142    ) -> Self {
 143        Self::Message {
 144            block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
 145        }
 146    }
 147
 148    fn to_markdown(&self, cx: &App) -> String {
 149        match self {
 150            Self::Message { block } => block.to_markdown(cx).to_string(),
 151            Self::Thought { block } => {
 152                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 153            }
 154        }
 155    }
 156}
 157
/// A single entry of an agent thread: a user message, an assistant message,
/// or a tool call.
#[derive(Debug)]
pub enum AgentThreadEntry {
    UserMessage(UserMessage),
    AssistantMessage(AssistantMessage),
    ToolCall(ToolCall),
}
 164
 165impl AgentThreadEntry {
 166    pub fn is_indented(&self) -> bool {
 167        match self {
 168            Self::UserMessage(message) => message.indented,
 169            Self::AssistantMessage(message) => message.indented,
 170            Self::ToolCall(_) => false,
 171        }
 172    }
 173
 174    pub fn to_markdown(&self, cx: &App) -> String {
 175        match self {
 176            Self::UserMessage(message) => message.to_markdown(cx),
 177            Self::AssistantMessage(message) => message.to_markdown(cx),
 178            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 179        }
 180    }
 181
 182    pub fn user_message(&self) -> Option<&UserMessage> {
 183        if let AgentThreadEntry::UserMessage(message) = self {
 184            Some(message)
 185        } else {
 186            None
 187        }
 188    }
 189
 190    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 191        if let AgentThreadEntry::ToolCall(call) = self {
 192            itertools::Either::Left(call.diffs())
 193        } else {
 194            itertools::Either::Right(std::iter::empty())
 195        }
 196    }
 197
 198    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 199        if let AgentThreadEntry::ToolCall(call) = self {
 200            itertools::Either::Left(call.terminals())
 201        } else {
 202            itertools::Either::Right(std::iter::empty())
 203        }
 204    }
 205
 206    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 207        if let AgentThreadEntry::ToolCall(ToolCall {
 208            locations,
 209            resolved_locations,
 210            ..
 211        }) = self
 212        {
 213            Some((
 214                locations.get(ix)?.clone(),
 215                resolved_locations.get(ix)?.clone()?,
 216            ))
 217        } else {
 218            None
 219        }
 220    }
 221}
 222
/// Thread-side state of a tool call reported over ACP.
#[derive(Debug)]
pub struct ToolCall {
    pub id: acp::ToolCallId,
    /// Markdown entity holding the (possibly escaped or shortened) title.
    pub label: Entity<Markdown>,
    pub kind: acp::ToolKind,
    pub content: Vec<ToolCallContent>,
    pub status: ToolCallStatus,
    pub locations: Vec<acp::ToolCallLocation>,
    /// Starts out empty in `from_acp`; `AgentThreadEntry::location` reads it
    /// at the same index as `locations`.
    pub resolved_locations: Vec<Option<AgentLocation>>,
    pub raw_input: Option<serde_json::Value>,
    /// Rendering of `raw_input` produced by `markdown_for_raw_output`.
    pub raw_input_markdown: Option<Entity<Markdown>>,
    pub raw_output: Option<serde_json::Value>,
    /// Programmatic tool name read from meta (see `tool_name_from_meta`).
    pub tool_name: Option<SharedString>,
    /// Subagent session details read from meta
    /// (see `subagent_session_info_from_meta`).
    pub subagent_session_info: Option<SubagentSessionInfo>,
}
 238
 239impl ToolCall {
 240    fn from_acp(
 241        tool_call: acp::ToolCall,
 242        status: ToolCallStatus,
 243        language_registry: Arc<LanguageRegistry>,
 244        path_style: PathStyle,
 245        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 246        cx: &mut App,
 247    ) -> Result<Self> {
 248        let title = if tool_call.kind == acp::ToolKind::Execute {
 249            tool_call.title
 250        } else if tool_call.kind == acp::ToolKind::Edit {
 251            MarkdownEscaped(tool_call.title.as_str()).to_string()
 252        } else if let Some((first_line, _)) = tool_call.title.split_once("\n") {
 253            first_line.to_owned() + ""
 254        } else {
 255            tool_call.title
 256        };
 257        let mut content = Vec::with_capacity(tool_call.content.len());
 258        for item in tool_call.content {
 259            if let Some(item) = ToolCallContent::from_acp(
 260                item,
 261                language_registry.clone(),
 262                path_style,
 263                terminals,
 264                cx,
 265            )? {
 266                content.push(item);
 267            }
 268        }
 269
 270        let raw_input_markdown = tool_call
 271            .raw_input
 272            .as_ref()
 273            .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
 274
 275        let tool_name = tool_name_from_meta(&tool_call.meta);
 276
 277        let subagent_session_info = subagent_session_info_from_meta(&tool_call.meta);
 278
 279        let result = Self {
 280            id: tool_call.tool_call_id,
 281            label: cx
 282                .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
 283            kind: tool_call.kind,
 284            content,
 285            locations: tool_call.locations,
 286            resolved_locations: Vec::default(),
 287            status,
 288            raw_input: tool_call.raw_input,
 289            raw_input_markdown,
 290            raw_output: tool_call.raw_output,
 291            tool_name,
 292            subagent_session_info,
 293        };
 294        Ok(result)
 295    }
 296
 297    fn update_fields(
 298        &mut self,
 299        fields: acp::ToolCallUpdateFields,
 300        meta: Option<acp::Meta>,
 301        language_registry: Arc<LanguageRegistry>,
 302        path_style: PathStyle,
 303        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 304        cx: &mut App,
 305    ) -> Result<()> {
 306        let acp::ToolCallUpdateFields {
 307            kind,
 308            status,
 309            title,
 310            content,
 311            locations,
 312            raw_input,
 313            raw_output,
 314            ..
 315        } = fields;
 316
 317        if let Some(kind) = kind {
 318            self.kind = kind;
 319        }
 320
 321        if let Some(status) = status {
 322            self.status = status.into();
 323        }
 324
 325        if let Some(subagent_session_info) = subagent_session_info_from_meta(&meta) {
 326            self.subagent_session_info = Some(subagent_session_info);
 327        }
 328
 329        if let Some(title) = title {
 330            if self.kind == acp::ToolKind::Execute {
 331                for terminal in self.terminals() {
 332                    terminal.update(cx, |terminal, cx| {
 333                        terminal.update_command_label(&title, cx);
 334                    });
 335                }
 336            }
 337            self.label.update(cx, |label, cx| {
 338                if self.kind == acp::ToolKind::Execute {
 339                    label.replace(title, cx);
 340                } else if self.kind == acp::ToolKind::Edit {
 341                    label.replace(MarkdownEscaped(&title).to_string(), cx)
 342                } else if let Some((first_line, _)) = title.split_once("\n") {
 343                    label.replace(first_line.to_owned() + "", cx);
 344                } else {
 345                    label.replace(title, cx);
 346                }
 347            });
 348        }
 349
 350        if let Some(content) = content {
 351            let mut new_content_len = content.len();
 352            let mut content = content.into_iter();
 353
 354            // Reuse existing content if we can
 355            for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
 356                let valid_content =
 357                    old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
 358                if !valid_content {
 359                    new_content_len -= 1;
 360                }
 361            }
 362            for new in content {
 363                if let Some(new) = ToolCallContent::from_acp(
 364                    new,
 365                    language_registry.clone(),
 366                    path_style,
 367                    terminals,
 368                    cx,
 369                )? {
 370                    self.content.push(new);
 371                } else {
 372                    new_content_len -= 1;
 373                }
 374            }
 375            self.content.truncate(new_content_len);
 376        }
 377
 378        if let Some(locations) = locations {
 379            self.locations = locations;
 380        }
 381
 382        if let Some(raw_input) = raw_input {
 383            self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
 384            self.raw_input = Some(raw_input);
 385        }
 386
 387        if let Some(raw_output) = raw_output {
 388            if self.content.is_empty()
 389                && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 390            {
 391                self.content
 392                    .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 393                        markdown,
 394                    }));
 395            }
 396            self.raw_output = Some(raw_output);
 397        }
 398        Ok(())
 399    }
 400
 401    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 402        self.content.iter().filter_map(|content| match content {
 403            ToolCallContent::Diff(diff) => Some(diff),
 404            ToolCallContent::ContentBlock(_) => None,
 405            ToolCallContent::Terminal(_) => None,
 406        })
 407    }
 408
 409    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 410        self.content.iter().filter_map(|content| match content {
 411            ToolCallContent::Terminal(terminal) => Some(terminal),
 412            ToolCallContent::ContentBlock(_) => None,
 413            ToolCallContent::Diff(_) => None,
 414        })
 415    }
 416
 417    pub fn is_subagent(&self) -> bool {
 418        self.tool_name.as_ref().is_some_and(|s| s == "spawn_agent")
 419            || self.subagent_session_info.is_some()
 420    }
 421
 422    pub fn to_markdown(&self, cx: &App) -> String {
 423        let mut markdown = format!(
 424            "**Tool Call: {}**\nStatus: {}\n\n",
 425            self.label.read(cx).source(),
 426            self.status
 427        );
 428        for content in &self.content {
 429            markdown.push_str(content.to_markdown(cx).as_str());
 430            markdown.push_str("\n\n");
 431        }
 432        markdown
 433    }
 434
 435    async fn resolve_location(
 436        location: acp::ToolCallLocation,
 437        project: WeakEntity<Project>,
 438        cx: &mut AsyncApp,
 439    ) -> Option<ResolvedLocation> {
 440        let buffer = project
 441            .update(cx, |project, cx| {
 442                project
 443                    .project_path_for_absolute_path(&location.path, cx)
 444                    .map(|path| project.open_buffer(path, cx))
 445            })
 446            .ok()??;
 447        let buffer = buffer.await.log_err()?;
 448        let position = buffer.update(cx, |buffer, _| {
 449            let snapshot = buffer.snapshot();
 450            if let Some(row) = location.line {
 451                let column = snapshot.indent_size_for_line(row).len;
 452                let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 453                snapshot.anchor_before(point)
 454            } else {
 455                Anchor::min_for_buffer(snapshot.remote_id())
 456            }
 457        });
 458
 459        Some(ResolvedLocation { buffer, position })
 460    }
 461
 462    fn resolve_locations(
 463        &self,
 464        project: Entity<Project>,
 465        cx: &mut App,
 466    ) -> Task<Vec<Option<ResolvedLocation>>> {
 467        let locations = self.locations.clone();
 468        project.update(cx, |_, cx| {
 469            cx.spawn(async move |project, cx| {
 470                let mut new_locations = Vec::new();
 471                for location in locations {
 472                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 473                }
 474                new_locations
 475            })
 476        })
 477    }
 478}
 479
// Separate so we can hold a strong reference to the buffer
// for saving on the thread
/// A tool call location resolved to an open buffer and an anchor within it.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ResolvedLocation {
    /// Strong handle; converting to `AgentLocation` downgrades it.
    buffer: Entity<Buffer>,
    position: Anchor,
}
 487
 488impl From<&ResolvedLocation> for AgentLocation {
 489    fn from(value: &ResolvedLocation) -> Self {
 490        Self {
 491            buffer: value.buffer.downgrade(),
 492            position: value.position,
 493        }
 494    }
 495}
 496
/// Status of a [`ToolCall`] as tracked by the thread.
#[derive(Debug)]
pub enum ToolCallStatus {
    /// The tool call hasn't started running yet, but we start showing it to
    /// the user.
    Pending,
    /// The tool call is waiting for confirmation from the user.
    WaitingForConfirmation {
        options: PermissionOptions,
        // NOTE(review): presumably carries the option the user picked back to
        // whoever requested permission — confirm against the sender's creator.
        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
    },
    /// The tool call is currently running.
    InProgress,
    /// The tool call completed successfully.
    Completed,
    /// The tool call failed.
    Failed,
    /// The user rejected the tool call.
    Rejected,
    /// The user canceled generation so the tool call was canceled.
    Canceled,
}
 518
 519impl From<acp::ToolCallStatus> for ToolCallStatus {
 520    fn from(status: acp::ToolCallStatus) -> Self {
 521        match status {
 522            acp::ToolCallStatus::Pending => Self::Pending,
 523            acp::ToolCallStatus::InProgress => Self::InProgress,
 524            acp::ToolCallStatus::Completed => Self::Completed,
 525            acp::ToolCallStatus::Failed => Self::Failed,
 526            _ => Self::Pending,
 527        }
 528    }
 529}
 530
 531impl Display for ToolCallStatus {
 532    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 533        write!(
 534            f,
 535            "{}",
 536            match self {
 537                ToolCallStatus::Pending => "Pending",
 538                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 539                ToolCallStatus::InProgress => "In Progress",
 540                ToolCallStatus::Completed => "Completed",
 541                ToolCallStatus::Failed => "Failed",
 542                ToolCallStatus::Rejected => "Rejected",
 543                ToolCallStatus::Canceled => "Canceled",
 544            }
 545        )
 546    }
 547}
 548
/// Renderable content built up from ACP content blocks (see `append`).
#[derive(Debug, PartialEq, Clone)]
pub enum ContentBlock {
    /// Nothing has been appended yet.
    Empty,
    /// Markdown text; further appended blocks are added to this markdown.
    Markdown { markdown: Entity<Markdown> },
    /// A lone resource link; turned into markdown if more content is appended.
    ResourceLink { resource_link: acp::ResourceLink },
    /// A lone decoded image; turned into markdown if more content is appended.
    Image { image: Arc<gpui::Image> },
}
 556
 557impl ContentBlock {
 558    pub fn new(
 559        block: acp::ContentBlock,
 560        language_registry: &Arc<LanguageRegistry>,
 561        path_style: PathStyle,
 562        cx: &mut App,
 563    ) -> Self {
 564        let mut this = Self::Empty;
 565        this.append(block, language_registry, path_style, cx);
 566        this
 567    }
 568
 569    pub fn new_combined(
 570        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 571        language_registry: Arc<LanguageRegistry>,
 572        path_style: PathStyle,
 573        cx: &mut App,
 574    ) -> Self {
 575        let mut this = Self::Empty;
 576        for block in blocks {
 577            this.append(block, &language_registry, path_style, cx);
 578        }
 579        this
 580    }
 581
 582    pub fn append(
 583        &mut self,
 584        block: acp::ContentBlock,
 585        language_registry: &Arc<LanguageRegistry>,
 586        path_style: PathStyle,
 587        cx: &mut App,
 588    ) {
 589        match (&mut *self, &block) {
 590            (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
 591                *self = ContentBlock::ResourceLink {
 592                    resource_link: resource_link.clone(),
 593                };
 594            }
 595            (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
 596                if let Some(image) = Self::decode_image(image_content) {
 597                    *self = ContentBlock::Image { image };
 598                } else {
 599                    let new_content = Self::image_md(image_content);
 600                    *self = Self::create_markdown_block(new_content, language_registry, cx);
 601                }
 602            }
 603            (ContentBlock::Empty, _) => {
 604                let new_content = Self::block_string_contents(&block, path_style);
 605                *self = Self::create_markdown_block(new_content, language_registry, cx);
 606            }
 607            (ContentBlock::Markdown { markdown }, _) => {
 608                let new_content = Self::block_string_contents(&block, path_style);
 609                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 610            }
 611            (ContentBlock::ResourceLink { resource_link }, _) => {
 612                let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
 613                let new_content = Self::block_string_contents(&block, path_style);
 614                let combined = format!("{}\n{}", existing_content, new_content);
 615                *self = Self::create_markdown_block(combined, language_registry, cx);
 616            }
 617            (ContentBlock::Image { .. }, _) => {
 618                let new_content = Self::block_string_contents(&block, path_style);
 619                let combined = format!("`Image`\n{}", new_content);
 620                *self = Self::create_markdown_block(combined, language_registry, cx);
 621            }
 622        }
 623    }
 624
 625    fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
 626        use base64::Engine as _;
 627
 628        let bytes = base64::engine::general_purpose::STANDARD
 629            .decode(image_content.data.as_bytes())
 630            .ok()?;
 631        let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
 632        Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
 633    }
 634
 635    fn create_markdown_block(
 636        content: String,
 637        language_registry: &Arc<LanguageRegistry>,
 638        cx: &mut App,
 639    ) -> ContentBlock {
 640        ContentBlock::Markdown {
 641            markdown: cx
 642                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 643        }
 644    }
 645
 646    fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
 647        match block {
 648            acp::ContentBlock::Text(text_content) => text_content.text.clone(),
 649            acp::ContentBlock::ResourceLink(resource_link) => {
 650                Self::resource_link_md(&resource_link.uri, path_style)
 651            }
 652            acp::ContentBlock::Resource(acp::EmbeddedResource {
 653                resource:
 654                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 655                        uri,
 656                        ..
 657                    }),
 658                ..
 659            }) => Self::resource_link_md(uri, path_style),
 660            acp::ContentBlock::Image(image) => Self::image_md(image),
 661            _ => String::new(),
 662        }
 663    }
 664
 665    fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
 666        if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
 667            uri.as_link().to_string()
 668        } else {
 669            uri.to_string()
 670        }
 671    }
 672
 673    fn image_md(_image: &acp::ImageContent) -> String {
 674        "`Image`".into()
 675    }
 676
 677    pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 678        match self {
 679            ContentBlock::Empty => "",
 680            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 681            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 682            ContentBlock::Image { .. } => "`Image`",
 683        }
 684    }
 685
 686    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 687        match self {
 688            ContentBlock::Empty => None,
 689            ContentBlock::Markdown { markdown } => Some(markdown),
 690            ContentBlock::ResourceLink { .. } => None,
 691            ContentBlock::Image { .. } => None,
 692        }
 693    }
 694
 695    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 696        match self {
 697            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 698            _ => None,
 699        }
 700    }
 701
 702    pub fn image(&self) -> Option<&Arc<gpui::Image>> {
 703        match self {
 704            ContentBlock::Image { image } => Some(image),
 705            _ => None,
 706        }
 707    }
 708}
 709
/// One item of a tool call's content: a content block, a diff, or a terminal.
#[derive(Debug)]
pub enum ToolCallContent {
    ContentBlock(ContentBlock),
    Diff(Entity<Diff>),
    Terminal(Entity<Terminal>),
}
 716
 717impl ToolCallContent {
 718    pub fn from_acp(
 719        content: acp::ToolCallContent,
 720        language_registry: Arc<LanguageRegistry>,
 721        path_style: PathStyle,
 722        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 723        cx: &mut App,
 724    ) -> Result<Option<Self>> {
 725        match content {
 726            acp::ToolCallContent::Content(acp::Content { content, .. }) => {
 727                Ok(Some(Self::ContentBlock(ContentBlock::new(
 728                    content,
 729                    &language_registry,
 730                    path_style,
 731                    cx,
 732                ))))
 733            }
 734            acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
 735                Diff::finalized(
 736                    diff.path.to_string_lossy().into_owned(),
 737                    diff.old_text,
 738                    diff.new_text,
 739                    language_registry,
 740                    cx,
 741                )
 742            })))),
 743            acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
 744                .get(&terminal_id)
 745                .cloned()
 746                .map(|terminal| Some(Self::Terminal(terminal)))
 747                .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
 748            _ => Ok(None),
 749        }
 750    }
 751
 752    pub fn update_from_acp(
 753        &mut self,
 754        new: acp::ToolCallContent,
 755        language_registry: Arc<LanguageRegistry>,
 756        path_style: PathStyle,
 757        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 758        cx: &mut App,
 759    ) -> Result<bool> {
 760        let needs_update = match (&self, &new) {
 761            (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
 762                old_diff.read(cx).needs_update(
 763                    new_diff.old_text.as_deref().unwrap_or(""),
 764                    &new_diff.new_text,
 765                    cx,
 766                )
 767            }
 768            _ => true,
 769        };
 770
 771        if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
 772            if needs_update {
 773                *self = update;
 774            }
 775            Ok(true)
 776        } else {
 777            Ok(false)
 778        }
 779    }
 780
 781    pub fn to_markdown(&self, cx: &App) -> String {
 782        match self {
 783            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 784            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 785            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 786        }
 787    }
 788
 789    pub fn image(&self) -> Option<&Arc<gpui::Image>> {
 790        match self {
 791            Self::ContentBlock(content) => content.image(),
 792            _ => None,
 793        }
 794    }
 795}
 796
/// An update addressed to an existing tool call, identified by `id()`.
#[derive(Debug, PartialEq)]
pub enum ToolCallUpdate {
    /// A field-level update as received over ACP.
    UpdateFields(acp::ToolCallUpdate),
    /// An update carrying a diff entity.
    UpdateDiff(ToolCallUpdateDiff),
    /// An update carrying a terminal entity.
    UpdateTerminal(ToolCallUpdateTerminal),
}
 803
 804impl ToolCallUpdate {
 805    fn id(&self) -> &acp::ToolCallId {
 806        match self {
 807            Self::UpdateFields(update) => &update.tool_call_id,
 808            Self::UpdateDiff(diff) => &diff.id,
 809            Self::UpdateTerminal(terminal) => &terminal.id,
 810        }
 811    }
 812}
 813
 814impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 815    fn from(update: acp::ToolCallUpdate) -> Self {
 816        Self::UpdateFields(update)
 817    }
 818}
 819
 820impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 821    fn from(diff: ToolCallUpdateDiff) -> Self {
 822        Self::UpdateDiff(diff)
 823    }
 824}
 825
/// Payload of `ToolCallUpdate::UpdateDiff`.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateDiff {
    /// Id of the tool call the diff belongs to.
    pub id: acp::ToolCallId,
    pub diff: Entity<Diff>,
}
 831
 832impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 833    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 834        Self::UpdateTerminal(terminal)
 835    }
 836}
 837
/// Payload of `ToolCallUpdate::UpdateTerminal`.
#[derive(Debug, PartialEq)]
pub struct ToolCallUpdateTerminal {
    /// Id of the tool call the terminal belongs to.
    pub id: acp::ToolCallId,
    pub terminal: Entity<Terminal>,
}
 843
/// An ordered list of plan entries. The default plan is empty.
#[derive(Debug, Default)]
pub struct Plan {
    pub entries: Vec<PlanEntry>,
}
 848
/// Summary of a [`Plan`] computed by `Plan::stats`.
#[derive(Debug)]
pub struct PlanStats<'a> {
    /// The first entry whose status is in progress, if any.
    pub in_progress_entry: Option<&'a PlanEntry>,
    /// Number of entries whose status is pending.
    pub pending: u32,
    /// Number of entries whose status is completed.
    pub completed: u32,
}
 855
 856impl Plan {
 857    pub fn is_empty(&self) -> bool {
 858        self.entries.is_empty()
 859    }
 860
 861    pub fn stats(&self) -> PlanStats<'_> {
 862        let mut stats = PlanStats {
 863            in_progress_entry: None,
 864            pending: 0,
 865            completed: 0,
 866        };
 867
 868        for entry in &self.entries {
 869            match &entry.status {
 870                acp::PlanEntryStatus::Pending => {
 871                    stats.pending += 1;
 872                }
 873                acp::PlanEntryStatus::InProgress => {
 874                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 875                }
 876                acp::PlanEntryStatus::Completed => {
 877                    stats.completed += 1;
 878                }
 879                _ => {}
 880            }
 881        }
 882
 883        stats
 884    }
 885}
 886
/// A single entry of a [`Plan`].
#[derive(Debug)]
pub struct PlanEntry {
    /// The entry's text, held as a Markdown entity.
    pub content: Entity<Markdown>,
    /// Priority as reported by the protocol entry.
    pub priority: acp::PlanEntryPriority,
    /// Status as reported by the protocol entry.
    pub status: acp::PlanEntryStatus,
}
 893
 894impl PlanEntry {
 895    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 896        Self {
 897            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 898            priority: entry.priority,
 899            status: entry.status,
 900        }
 901    }
 902}
 903
/// Token accounting for a thread.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TokenUsage {
    /// Token limit; `ratio()` treats `0` as "limit unknown".
    pub max_tokens: u64,
    /// Tokens used so far; compared against `max_tokens` by `ratio()`.
    pub used_tokens: u64,
    pub input_tokens: u64,
    pub output_tokens: u64,
    pub max_output_tokens: Option<u64>,
}
 912
/// Fraction of `max_tokens` at or above which `TokenUsage::ratio` reports
/// `TokenUsageRatio::Warning`.
pub const TOKEN_USAGE_WARNING_THRESHOLD: f32 = 0.8;
 914
 915impl TokenUsage {
 916    pub fn ratio(&self) -> TokenUsageRatio {
 917        #[cfg(debug_assertions)]
 918        let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
 919            .unwrap_or(TOKEN_USAGE_WARNING_THRESHOLD.to_string())
 920            .parse()
 921            .unwrap();
 922        #[cfg(not(debug_assertions))]
 923        let warning_threshold: f32 = TOKEN_USAGE_WARNING_THRESHOLD;
 924
 925        // When the maximum is unknown because there is no selected model,
 926        // avoid showing the token limit warning.
 927        if self.max_tokens == 0 {
 928            TokenUsageRatio::Normal
 929        } else if self.used_tokens >= self.max_tokens {
 930            TokenUsageRatio::Exceeded
 931        } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
 932            TokenUsageRatio::Warning
 933        } else {
 934            TokenUsageRatio::Normal
 935        }
 936    }
 937}
 938
/// Severity of token usage relative to the limit, as computed by
/// `TokenUsage::ratio`. The derived ordering follows declaration order:
/// `Normal < Warning < Exceeded`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum TokenUsageRatio {
    /// Below the warning threshold, or the limit is unknown.
    Normal,
    /// At or above the warning threshold but below the limit.
    Warning,
    /// At or above the limit.
    Exceeded,
}
 945
/// Payload of `AcpThreadEvent::Retry`.
// NOTE(review): field meanings below are inferred from their names; confirm
// against the code that constructs this value.
#[derive(Debug, Clone)]
pub struct RetryStatus {
    /// Presumably the message of the error that triggered the retry.
    pub last_error: SharedString,
    /// Presumably the current attempt number.
    pub attempt: usize,
    /// Presumably the maximum number of attempts.
    pub max_attempts: usize,
    pub started_at: Instant,
    pub duration: Duration,
}
 954
/// State kept in `AcpThread::running_turn`; while it is `Some`, the thread
/// reports `ThreadStatus::Generating`.
struct RunningTurn {
    // NOTE(review): presumably matches `AcpThread::turn_id` at the time the
    // turn started — confirm where turns are created.
    id: u32,
    /// Task owned by the turn; dropping a `RunningTurn` drops the task too.
    send_task: Task<()>,
}
 959
/// A conversation thread backed by an [`AgentConnection`].
///
/// Holds the transcript entries, the current plan, title state, token usage,
/// terminal bookkeeping and the smooth-streaming text buffer. Emits
/// [`AcpThreadEvent`]s as its state changes.
pub struct AcpThread {
    session_id: acp::SessionId,
    work_dirs: Option<PathList>,
    parent_session_id: Option<acp::SessionId>,
    /// The committed title. `title()` returns this when no provisional title is set.
    title: SharedString,
    /// Display-only title that takes precedence over `title` until a real
    /// title is set.
    provisional_title: Option<SharedString>,
    /// Transcript entries in insertion order.
    entries: Vec<AgentThreadEntry>,
    plan: Plan,
    project: Entity<Project>,
    action_log: Entity<ActionLog>,
    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
    turn_id: u32,
    /// `Some` while a turn is running; drives `status()`.
    running_turn: Option<RunningTurn>,
    connection: Rc<dyn AgentConnection>,
    token_usage: Option<TokenUsage>,
    /// Latest prompt capabilities; refreshed by `_observe_prompt_capabilities`.
    prompt_capabilities: acp::PromptCapabilities,
    /// Background task that copies capability changes into
    /// `prompt_capabilities` and emits `PromptCapabilitiesUpdated`.
    _observe_prompt_capabilities: Task<anyhow::Result<()>>,
    terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
    pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
    pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
    had_error: bool,
    /// The user's unsent prompt text, persisted so it can be restored when reloading the thread.
    draft_prompt: Option<Vec<acp::ContentBlock>>,
    /// The initial scroll position for the thread view, set during session registration.
    ui_scroll_position: Option<gpui::ListOffset>,
    /// Buffer for smooth text streaming. Holds text that has been received from
    /// the model but not yet revealed in the UI. A timer task drains this buffer
    /// gradually to create a fluid typing effect instead of choppy chunk-at-a-time
    /// updates.
    streaming_text_buffer: Option<StreamingTextBuffer>,
}
 991
/// Text waiting to be revealed in a `Markdown` entity, plus the timer task
/// that reveals it a little at a time.
struct StreamingTextBuffer {
    /// Text received from the model but not yet appended to the Markdown source.
    pending: String,
    /// The number of bytes to reveal per timer turn.
    bytes_to_reveal_per_tick: usize,
    /// The Markdown entity being streamed into.
    target: Entity<Markdown>,
    /// Timer task that periodically moves text from `pending` into `target`.
    /// Held only to keep the task alive alongside the buffer.
    _reveal_task: Task<()>,
}
1002
// Timing constants for the smooth-streaming reveal.
impl StreamingTextBuffer {
    /// The number of milliseconds between each timer tick, controlling how quickly
    /// text is revealed.
    const TASK_UPDATE_MS: u64 = 16;
    /// The time in milliseconds to reveal the entire pending text. Together with
    /// `TASK_UPDATE_MS` it determines how many bytes are revealed per tick.
    const REVEAL_TARGET: f32 = 200.0;
}
1010
1011impl From<&AcpThread> for ActionLogTelemetry {
1012    fn from(value: &AcpThread) -> Self {
1013        Self {
1014            agent_telemetry_id: value.connection().telemetry_id(),
1015            session_id: value.session_id.0.clone(),
1016        }
1017    }
1018}
1019
/// Events emitted by [`AcpThread`].
#[derive(Debug)]
pub enum AcpThreadEvent {
    /// A new entry was appended to the transcript.
    NewEntry,
    /// The displayed title changed (committed or provisional).
    TitleUpdated,
    /// Token usage was replaced via `update_token_usage`.
    TokenUsageUpdated,
    /// The entry at the given index changed.
    EntryUpdated(usize),
    EntriesRemoved(Range<usize>),
    ToolAuthorizationRequested(acp::ToolCallId),
    ToolAuthorizationReceived(acp::ToolCallId),
    /// Carries the status passed to `update_retry_status`.
    Retry(RetryStatus),
    /// Carries the session id passed to `subagent_spawned`.
    SubagentSpawned(acp::SessionId),
    Stopped(acp::StopReason),
    Error,
    LoadError(LoadError),
    /// The thread's prompt capabilities were refreshed.
    PromptCapabilitiesUpdated,
    Refusal,
    /// Carries the commands from an `AvailableCommandsUpdate` session update.
    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
    /// Carries the mode id from a `CurrentModeUpdate` session update.
    ModeUpdated(acp::SessionModeId),
    /// Carries the options from a `ConfigOptionUpdate` session update.
    ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
}
1040
// Allows `AcpThread` to emit `AcpThreadEvent`s through its context.
impl EventEmitter<AcpThreadEvent> for AcpThread {}
1042
/// Events about a terminal, each tagged with the terminal's id.
// NOTE(review): presumably produced by whatever hosts the terminals and
// consumed by the thread — confirm against the producer/consumer.
#[derive(Debug, Clone)]
pub enum TerminalProviderEvent {
    /// A terminal was created.
    Created {
        terminal_id: acp::TerminalId,
        label: String,
        cwd: Option<PathBuf>,
        output_byte_limit: Option<u64>,
        terminal: Entity<::terminal::Terminal>,
    },
    /// Raw output bytes for a terminal.
    Output {
        terminal_id: acp::TerminalId,
        data: Vec<u8>,
    },
    /// A terminal's title changed.
    TitleChanged {
        terminal_id: acp::TerminalId,
        title: String,
    },
    /// A terminal exited with the given status.
    Exit {
        terminal_id: acp::TerminalId,
        status: acp::TerminalExitStatus,
    },
}
1065
/// Commands addressed to a terminal, each tagged with the terminal's id.
// NOTE(review): presumably sent to whatever hosts the terminals — confirm
// against the consumer.
#[derive(Debug, Clone)]
pub enum TerminalProviderCommand {
    /// Write the given bytes to the terminal's input.
    WriteInput {
        terminal_id: acp::TerminalId,
        bytes: Vec<u8>,
    },
    /// Resize the terminal to `cols` x `rows`.
    Resize {
        terminal_id: acp::TerminalId,
        cols: u16,
        rows: u16,
    },
    /// Close the terminal.
    Close {
        terminal_id: acp::TerminalId,
    },
}
1081
/// Coarse activity state of a thread, as returned by `AcpThread::status`.
#[derive(PartialEq, Eq, Debug)]
pub enum ThreadStatus {
    /// No turn is running.
    Idle,
    /// A turn is currently running.
    Generating,
}
1087
/// Error carried by `AcpThreadEvent::LoadError`. The `Display` impl renders
/// a human-readable message for each variant.
#[derive(Debug, Clone)]
pub enum LoadError {
    /// The found version is older than the minimum supported one.
    Unsupported {
        command: SharedString,
        current_version: SharedString,
        minimum_version: SharedString,
    },
    /// Installation failed; the payload is the failure message.
    FailedToInstall(SharedString),
    /// The server process exited with the given status.
    Exited {
        status: ExitStatus,
    },
    /// Any other failure, described by the payload message.
    Other(SharedString),
}
1101
1102impl Display for LoadError {
1103    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1104        match self {
1105            LoadError::Unsupported {
1106                command: path,
1107                current_version,
1108                minimum_version,
1109            } => {
1110                write!(
1111                    f,
1112                    "version {current_version} from {path} is not supported (need at least {minimum_version})"
1113                )
1114            }
1115            LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1116            LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1117            LoadError::Other(msg) => write!(f, "{msg}"),
1118        }
1119    }
1120}
1121
// Marks `LoadError` as a standard error type; all methods use the defaults.
impl Error for LoadError {}
1123
1124impl AcpThread {
1125    pub fn new(
1126        parent_session_id: Option<acp::SessionId>,
1127        title: impl Into<SharedString>,
1128        work_dirs: Option<PathList>,
1129        connection: Rc<dyn AgentConnection>,
1130        project: Entity<Project>,
1131        action_log: Entity<ActionLog>,
1132        session_id: acp::SessionId,
1133        mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1134        cx: &mut Context<Self>,
1135    ) -> Self {
1136        let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1137        let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1138            loop {
1139                let caps = prompt_capabilities_rx.recv().await?;
1140                this.update(cx, |this, cx| {
1141                    this.prompt_capabilities = caps;
1142                    cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1143                })?;
1144            }
1145        });
1146
1147        Self {
1148            parent_session_id,
1149            work_dirs,
1150            action_log,
1151            shared_buffers: Default::default(),
1152            entries: Default::default(),
1153            plan: Default::default(),
1154            title: title.into(),
1155            provisional_title: None,
1156            project,
1157            running_turn: None,
1158            turn_id: 0,
1159            connection,
1160            session_id,
1161            token_usage: None,
1162            prompt_capabilities,
1163            _observe_prompt_capabilities: task,
1164            terminals: HashMap::default(),
1165            pending_terminal_output: HashMap::default(),
1166            pending_terminal_exit: HashMap::default(),
1167            had_error: false,
1168            draft_prompt: None,
1169            ui_scroll_position: None,
1170            streaming_text_buffer: None,
1171        }
1172    }
1173
    /// Returns the parent session's id, if this thread has one.
    pub fn parent_session_id(&self) -> Option<&acp::SessionId> {
        self.parent_session_id.as_ref()
    }
1177
    /// Returns a copy of the most recently observed prompt capabilities.
    pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
        self.prompt_capabilities.clone()
    }
1181
    /// Returns the stored draft prompt, if any.
    pub fn draft_prompt(&self) -> Option<&[acp::ContentBlock]> {
        self.draft_prompt.as_deref()
    }
1185
    /// Replaces the stored draft prompt; `None` clears it. No event is emitted.
    pub fn set_draft_prompt(&mut self, prompt: Option<Vec<acp::ContentBlock>>) {
        self.draft_prompt = prompt;
    }
1189
    /// Returns the stored scroll position for the thread view, if any.
    pub fn ui_scroll_position(&self) -> Option<gpui::ListOffset> {
        self.ui_scroll_position
    }
1193
    /// Replaces the stored scroll position; `None` clears it. No event is emitted.
    pub fn set_ui_scroll_position(&mut self, position: Option<gpui::ListOffset>) {
        self.ui_scroll_position = position;
    }
1197
    /// Returns the agent connection this thread talks to.
    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
        &self.connection
    }
1201
    /// Returns the action log entity associated with this thread.
    pub fn action_log(&self) -> &Entity<ActionLog> {
        &self.action_log
    }
1205
    /// Returns the project entity associated with this thread.
    pub fn project(&self) -> &Entity<Project> {
        &self.project
    }
1209
1210    pub fn title(&self) -> SharedString {
1211        self.provisional_title
1212            .clone()
1213            .unwrap_or_else(|| self.title.clone())
1214    }
1215
    /// Returns `true` while a provisional title is set.
    pub fn has_provisional_title(&self) -> bool {
        self.provisional_title.is_some()
    }
1219
    /// Returns the transcript entries in insertion order.
    pub fn entries(&self) -> &[AgentThreadEntry] {
        &self.entries
    }
1223
    /// Returns this thread's session id.
    pub fn session_id(&self) -> &acp::SessionId {
        &self.session_id
    }
1227
    /// Returns the working directories given at construction, if any.
    pub fn work_dirs(&self) -> Option<&PathList> {
        self.work_dirs.as_ref()
    }
1231
1232    pub fn status(&self) -> ThreadStatus {
1233        if self.running_turn.is_some() {
1234            ThreadStatus::Generating
1235        } else {
1236            ThreadStatus::Idle
1237        }
1238    }
1239
    /// Returns the thread's `had_error` flag.
    pub fn had_error(&self) -> bool {
        self.had_error
    }
1243
1244    pub fn is_waiting_for_confirmation(&self) -> bool {
1245        for entry in self.entries.iter().rev() {
1246            match entry {
1247                AgentThreadEntry::UserMessage(_) => return false,
1248                AgentThreadEntry::ToolCall(ToolCall {
1249                    status: ToolCallStatus::WaitingForConfirmation { .. },
1250                    ..
1251                }) => return true,
1252                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1253            }
1254        }
1255        false
1256    }
1257
    /// Returns the most recently stored token usage, if any.
    pub fn token_usage(&self) -> Option<&TokenUsage> {
        self.token_usage.as_ref()
    }
1261
1262    pub fn has_pending_edit_tool_calls(&self) -> bool {
1263        for entry in self.entries.iter().rev() {
1264            match entry {
1265                AgentThreadEntry::UserMessage(_) => return false,
1266                AgentThreadEntry::ToolCall(
1267                    call @ ToolCall {
1268                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1269                        ..
1270                    },
1271                ) if call.diffs().next().is_some() => {
1272                    return true;
1273                }
1274                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1275            }
1276        }
1277
1278        false
1279    }
1280
1281    pub fn has_in_progress_tool_calls(&self) -> bool {
1282        for entry in self.entries.iter().rev() {
1283            match entry {
1284                AgentThreadEntry::UserMessage(_) => return false,
1285                AgentThreadEntry::ToolCall(ToolCall {
1286                    status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1287                    ..
1288                }) => {
1289                    return true;
1290                }
1291                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1292            }
1293        }
1294
1295        false
1296    }
1297
1298    pub fn used_tools_since_last_user_message(&self) -> bool {
1299        for entry in self.entries.iter().rev() {
1300            match entry {
1301                AgentThreadEntry::UserMessage(..) => return false,
1302                AgentThreadEntry::AssistantMessage(..) => continue,
1303                AgentThreadEntry::ToolCall(..) => return true,
1304            }
1305        }
1306
1307        false
1308    }
1309
1310    pub fn handle_session_update(
1311        &mut self,
1312        update: acp::SessionUpdate,
1313        cx: &mut Context<Self>,
1314    ) -> Result<(), acp::Error> {
1315        match update {
1316            acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1317                self.push_user_content_block(None, content, cx);
1318            }
1319            acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1320                self.push_assistant_content_block(content, false, cx);
1321            }
1322            acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1323                self.push_assistant_content_block(content, true, cx);
1324            }
1325            acp::SessionUpdate::ToolCall(tool_call) => {
1326                self.upsert_tool_call(tool_call, cx)?;
1327            }
1328            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1329                self.update_tool_call(tool_call_update, cx)?;
1330            }
1331            acp::SessionUpdate::Plan(plan) => {
1332                self.update_plan(plan, cx);
1333            }
1334            acp::SessionUpdate::SessionInfoUpdate(info_update) => {
1335                if let acp::MaybeUndefined::Value(title) = info_update.title {
1336                    let had_provisional = self.provisional_title.take().is_some();
1337                    let title: SharedString = title.into();
1338                    if title != self.title {
1339                        self.title = title;
1340                        cx.emit(AcpThreadEvent::TitleUpdated);
1341                    } else if had_provisional {
1342                        cx.emit(AcpThreadEvent::TitleUpdated);
1343                    }
1344                }
1345            }
1346            acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1347                available_commands,
1348                ..
1349            }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1350            acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1351                current_mode_id,
1352                ..
1353            }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1354            acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1355                config_options,
1356                ..
1357            }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1358            _ => {}
1359        }
1360        Ok(())
1361    }
1362
    /// Appends a user content block without indentation.
    ///
    /// Shorthand for `push_user_content_block_with_indent` with
    /// `indented = false`.
    pub fn push_user_content_block(
        &mut self,
        message_id: Option<UserMessageId>,
        chunk: acp::ContentBlock,
        cx: &mut Context<Self>,
    ) {
        self.push_user_content_block_with_indent(message_id, chunk, false, cx)
    }
1371
1372    pub fn push_user_content_block_with_indent(
1373        &mut self,
1374        message_id: Option<UserMessageId>,
1375        chunk: acp::ContentBlock,
1376        indented: bool,
1377        cx: &mut Context<Self>,
1378    ) {
1379        let language_registry = self.project.read(cx).languages().clone();
1380        let path_style = self.project.read(cx).path_style(cx);
1381        let entries_len = self.entries.len();
1382
1383        if let Some(last_entry) = self.entries.last_mut()
1384            && let AgentThreadEntry::UserMessage(UserMessage {
1385                id,
1386                content,
1387                chunks,
1388                indented: existing_indented,
1389                ..
1390            }) = last_entry
1391            && *existing_indented == indented
1392        {
1393            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1394            *id = message_id.or(id.take());
1395            content.append(chunk.clone(), &language_registry, path_style, cx);
1396            chunks.push(chunk);
1397            let idx = entries_len - 1;
1398            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1399        } else {
1400            let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1401            self.push_entry(
1402                AgentThreadEntry::UserMessage(UserMessage {
1403                    id: message_id,
1404                    content,
1405                    chunks: vec![chunk],
1406                    checkpoint: None,
1407                    indented,
1408                }),
1409                cx,
1410            );
1411        }
1412    }
1413
    /// Appends an assistant content block without indentation.
    ///
    /// Shorthand for `push_assistant_content_block_with_indent` with
    /// `indented = false`. `is_thought` selects between a thought chunk and a
    /// message chunk.
    pub fn push_assistant_content_block(
        &mut self,
        chunk: acp::ContentBlock,
        is_thought: bool,
        cx: &mut Context<Self>,
    ) {
        self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
    }
1422
    /// Appends an assistant content block (message or thought) to the transcript.
    ///
    /// Three paths, tried in order:
    /// 1. A text chunk that continues the last Markdown chunk of the same kind
    ///    is queued in the streaming buffer and revealed gradually.
    /// 2. Otherwise, if the last entry is an assistant message with the same
    ///    `indented` flag, the chunk is appended to its last chunk of the same
    ///    kind, or added as a new chunk.
    /// 3. Otherwise a new assistant message entry is pushed.
    pub fn push_assistant_content_block_with_indent(
        &mut self,
        chunk: acp::ContentBlock,
        is_thought: bool,
        indented: bool,
        cx: &mut Context<Self>,
    ) {
        let path_style = self.project.read(cx).path_style(cx);

        // For text chunks going to an existing Markdown block, buffer for smooth
        // streaming instead of appending all at once which may feel more choppy.
        if let acp::ContentBlock::Text(text_content) = &chunk {
            if let Some(markdown) = self.streaming_markdown_target(is_thought, indented) {
                // A streaming target only exists when there is a last entry,
                // so `entries_len - 1` cannot underflow here.
                let entries_len = self.entries.len();
                cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
                self.buffer_streaming_text(&markdown, text_content.text.clone(), cx);
                return;
            }
        }

        let language_registry = self.project.read(cx).languages().clone();
        let entries_len = self.entries.len();
        if let Some(last_entry) = self.entries.last_mut()
            && let AgentThreadEntry::AssistantMessage(AssistantMessage {
                chunks,
                indented: existing_indented,
                is_subagent_output: _,
            }) = last_entry
            && *existing_indented == indented
        {
            let idx = entries_len - 1;
            // Reveal any still-buffered text before changing the entry directly.
            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
            cx.emit(AcpThreadEvent::EntryUpdated(idx));
            match (chunks.last_mut(), is_thought) {
                // Same kind as the last chunk: extend it.
                (Some(AssistantMessageChunk::Message { block }), false)
                | (Some(AssistantMessageChunk::Thought { block }), true) => {
                    block.append(chunk, &language_registry, path_style, cx)
                }
                // Different kind (or no chunk yet): start a new chunk.
                _ => {
                    let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
                    if is_thought {
                        chunks.push(AssistantMessageChunk::Thought { block })
                    } else {
                        chunks.push(AssistantMessageChunk::Message { block })
                    }
                }
            }
        } else {
            let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
            let chunk = if is_thought {
                AssistantMessageChunk::Thought { block }
            } else {
                AssistantMessageChunk::Message { block }
            };

            self.push_entry(
                AgentThreadEntry::AssistantMessage(AssistantMessage {
                    chunks: vec![chunk],
                    indented,
                    is_subagent_output: false,
                }),
                cx,
            );
        }
    }
1488
1489    fn streaming_markdown_target(
1490        &self,
1491        is_thought: bool,
1492        indented: bool,
1493    ) -> Option<Entity<Markdown>> {
1494        let last_entry = self.entries.last()?;
1495        if let AgentThreadEntry::AssistantMessage(AssistantMessage {
1496            chunks,
1497            indented: existing_indented,
1498            ..
1499        }) = last_entry
1500            && *existing_indented == indented
1501            && let [.., chunk] = chunks.as_slice()
1502        {
1503            match (chunk, is_thought) {
1504                (
1505                    AssistantMessageChunk::Message {
1506                        block: ContentBlock::Markdown { markdown },
1507                    },
1508                    false,
1509                )
1510                | (
1511                    AssistantMessageChunk::Thought {
1512                        block: ContentBlock::Markdown { markdown },
1513                    },
1514                    true,
1515                ) => Some(markdown.clone()),
1516                _ => None,
1517            }
1518        } else {
1519            None
1520        }
1521    }
1522
1523    /// Add text to the streaming buffer. If the target changed (e.g. switching
1524    /// from thoughts to message text), flush the old buffer first.
1525    fn buffer_streaming_text(
1526        &mut self,
1527        markdown: &Entity<Markdown>,
1528        text: String,
1529        cx: &mut Context<Self>,
1530    ) {
1531        if let Some(buffer) = &mut self.streaming_text_buffer {
1532            if buffer.target.entity_id() == markdown.entity_id() {
1533                buffer.pending.push_str(&text);
1534
1535                buffer.bytes_to_reveal_per_tick = (buffer.pending.len() as f32
1536                    / StreamingTextBuffer::REVEAL_TARGET
1537                    * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1538                    .ceil() as usize;
1539                return;
1540            }
1541            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1542        }
1543
1544        let target = markdown.clone();
1545        let _reveal_task = self.start_streaming_reveal(cx);
1546        let pending_len = text.len();
1547        let bytes_to_reveal = (pending_len as f32 / StreamingTextBuffer::REVEAL_TARGET
1548            * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1549            .ceil() as usize;
1550        self.streaming_text_buffer = Some(StreamingTextBuffer {
1551            pending: text,
1552            bytes_to_reveal_per_tick: bytes_to_reveal,
1553            target,
1554            _reveal_task,
1555        });
1556    }
1557
1558    /// Flush all buffered streaming text into the Markdown entity immediately.
1559    fn flush_streaming_text(
1560        streaming_text_buffer: &mut Option<StreamingTextBuffer>,
1561        cx: &mut Context<Self>,
1562    ) {
1563        if let Some(buffer) = streaming_text_buffer.take() {
1564            if !buffer.pending.is_empty() {
1565                buffer
1566                    .target
1567                    .update(cx, |markdown, cx| markdown.append(&buffer.pending, cx));
1568            }
1569        }
1570    }
1571
    /// Spawns a foreground task that periodically drains
    /// `streaming_text_buffer.pending` into the target `Markdown` entity,
    /// producing smooth, continuous text output.
    ///
    /// Each tick (every `TASK_UPDATE_MS` milliseconds) appends up to
    /// `bytes_to_reveal_per_tick` bytes, rounded up to a character boundary.
    /// The task stops once the thread can no longer be updated or the
    /// streaming buffer has been removed; an empty buffer keeps it ticking.
    fn start_streaming_reveal(&self, cx: &mut Context<Self>) -> Task<()> {
        cx.spawn(async move |this, cx| {
            loop {
                cx.background_executor()
                    .timer(Duration::from_millis(StreamingTextBuffer::TASK_UPDATE_MS))
                    .await;

                let should_continue = this
                    .update(cx, |this, cx| {
                        // Buffer was flushed/removed: nothing left to reveal.
                        let Some(buffer) = &mut this.streaming_text_buffer else {
                            return false;
                        };

                        // Nothing pending right now; keep waiting for more text.
                        if buffer.pending.is_empty() {
                            return true;
                        }

                        let pending_len = buffer.pending.len();

                        // Never split a multi-byte character, and never read
                        // past the end of the pending text.
                        let byte_boundary = buffer
                            .pending
                            .ceil_char_boundary(buffer.bytes_to_reveal_per_tick)
                            .min(pending_len);

                        buffer.target.update(cx, |markdown: &mut Markdown, cx| {
                            markdown.append(&buffer.pending[..byte_boundary], cx);
                            buffer.pending.drain(..byte_boundary);
                        });

                        true
                    })
                    // The update fails once the thread is gone; stop then.
                    .unwrap_or(false);

                if !should_continue {
                    break;
                }
            }
        })
    }
1614
    /// Appends `entry` to the transcript and emits `NewEntry`.
    ///
    /// Any buffered streaming text is flushed first so it is fully revealed
    /// before the new entry appears.
    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
        self.entries.push(entry);
        cx.emit(AcpThreadEvent::NewEntry);
    }
1620
    /// Returns `true` when the connection offers a title setter for this session.
    pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
        self.connection.set_title(&self.session_id, cx).is_some()
    }
1624
1625    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1626        let had_provisional = self.provisional_title.take().is_some();
1627        if title != self.title {
1628            self.title = title.clone();
1629            cx.emit(AcpThreadEvent::TitleUpdated);
1630            if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1631                return set_title.run(title, cx);
1632            }
1633        } else if had_provisional {
1634            cx.emit(AcpThreadEvent::TitleUpdated);
1635        }
1636        Task::ready(Ok(()))
1637    }
1638
    /// Sets a provisional display title without propagating back to the
    /// underlying agent connection. This is used for quick preview titles
    /// (e.g. first 20 chars of the user message) that should be shown
    /// immediately but replaced once the LLM generates a proper title via
    /// `set_title`.
    ///
    /// Always emits `TitleUpdated`, even if the same provisional title was
    /// already set.
    pub fn set_provisional_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
        self.provisional_title = Some(title);
        cx.emit(AcpThreadEvent::TitleUpdated);
    }
1648
    /// Emits `SubagentSpawned` with the given session id. No state is changed.
    pub fn subagent_spawned(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::SubagentSpawned(session_id));
    }
1652
    /// Replaces the stored token usage (`None` clears it) and emits
    /// `TokenUsageUpdated`.
    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
        self.token_usage = usage;
        cx.emit(AcpThreadEvent::TokenUsageUpdated);
    }
1657
    /// Emits `Retry` with the given status. The status is not stored on the thread.
    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
        cx.emit(AcpThreadEvent::Retry(status));
    }
1661
1662    pub fn update_tool_call(
1663        &mut self,
1664        update: impl Into<ToolCallUpdate>,
1665        cx: &mut Context<Self>,
1666    ) -> Result<()> {
1667        let update = update.into();
1668        let languages = self.project.read(cx).languages().clone();
1669        let path_style = self.project.read(cx).path_style(cx);
1670
1671        let ix = match self.index_for_tool_call(update.id()) {
1672            Some(ix) => ix,
1673            None => {
1674                // Tool call not found - create a failed tool call entry
1675                let failed_tool_call = ToolCall {
1676                    id: update.id().clone(),
1677                    label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1678                    kind: acp::ToolKind::Fetch,
1679                    content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1680                        "Tool call not found".into(),
1681                        &languages,
1682                        path_style,
1683                        cx,
1684                    ))],
1685                    status: ToolCallStatus::Failed,
1686                    locations: Vec::new(),
1687                    resolved_locations: Vec::new(),
1688                    raw_input: None,
1689                    raw_input_markdown: None,
1690                    raw_output: None,
1691                    tool_name: None,
1692                    subagent_session_info: None,
1693                };
1694                self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1695                return Ok(());
1696            }
1697        };
1698        let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1699            unreachable!()
1700        };
1701
1702        match update {
1703            ToolCallUpdate::UpdateFields(update) => {
1704                let location_updated = update.fields.locations.is_some();
1705                call.update_fields(
1706                    update.fields,
1707                    update.meta,
1708                    languages,
1709                    path_style,
1710                    &self.terminals,
1711                    cx,
1712                )?;
1713                if location_updated {
1714                    self.resolve_locations(update.tool_call_id, cx);
1715                }
1716            }
1717            ToolCallUpdate::UpdateDiff(update) => {
1718                call.content.clear();
1719                call.content.push(ToolCallContent::Diff(update.diff));
1720            }
1721            ToolCallUpdate::UpdateTerminal(update) => {
1722                call.content.clear();
1723                call.content
1724                    .push(ToolCallContent::Terminal(update.terminal));
1725            }
1726        }
1727
1728        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1729
1730        Ok(())
1731    }
1732
1733    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1734    pub fn upsert_tool_call(
1735        &mut self,
1736        tool_call: acp::ToolCall,
1737        cx: &mut Context<Self>,
1738    ) -> Result<(), acp::Error> {
1739        let status = tool_call.status.into();
1740        self.upsert_tool_call_inner(tool_call.into(), status, cx)
1741    }
1742
1743    /// Fails if id does not match an existing entry.
1744    pub fn upsert_tool_call_inner(
1745        &mut self,
1746        update: acp::ToolCallUpdate,
1747        status: ToolCallStatus,
1748        cx: &mut Context<Self>,
1749    ) -> Result<(), acp::Error> {
1750        let language_registry = self.project.read(cx).languages().clone();
1751        let path_style = self.project.read(cx).path_style(cx);
1752        let id = update.tool_call_id.clone();
1753
1754        let agent_telemetry_id = self.connection().telemetry_id();
1755        let session = self.session_id();
1756        let parent_session_id = self.parent_session_id();
1757        if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1758            let status = if matches!(status, ToolCallStatus::Completed) {
1759                "completed"
1760            } else {
1761                "failed"
1762            };
1763            telemetry::event!(
1764                "Agent Tool Call Completed",
1765                agent_telemetry_id,
1766                session,
1767                parent_session_id,
1768                status
1769            );
1770        }
1771
1772        if let Some(ix) = self.index_for_tool_call(&id) {
1773            let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1774                unreachable!()
1775            };
1776
1777            call.update_fields(
1778                update.fields,
1779                update.meta,
1780                language_registry,
1781                path_style,
1782                &self.terminals,
1783                cx,
1784            )?;
1785            call.status = status;
1786
1787            cx.emit(AcpThreadEvent::EntryUpdated(ix));
1788        } else {
1789            let call = ToolCall::from_acp(
1790                update.try_into()?,
1791                status,
1792                language_registry,
1793                self.project.read(cx).path_style(cx),
1794                &self.terminals,
1795                cx,
1796            )?;
1797            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1798        };
1799
1800        self.resolve_locations(id, cx);
1801        Ok(())
1802    }
1803
1804    fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1805        self.entries
1806            .iter()
1807            .enumerate()
1808            .rev()
1809            .find_map(|(index, entry)| {
1810                if let AgentThreadEntry::ToolCall(tool_call) = entry
1811                    && &tool_call.id == id
1812                {
1813                    Some(index)
1814                } else {
1815                    None
1816                }
1817            })
1818    }
1819
1820    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1821        // The tool call we are looking for is typically the last one, or very close to the end.
1822        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1823        self.entries
1824            .iter_mut()
1825            .enumerate()
1826            .rev()
1827            .find_map(|(index, tool_call)| {
1828                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1829                    && &tool_call.id == id
1830                {
1831                    Some((index, tool_call))
1832                } else {
1833                    None
1834                }
1835            })
1836    }
1837
1838    pub fn tool_call(&self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1839        self.entries
1840            .iter()
1841            .enumerate()
1842            .rev()
1843            .find_map(|(index, tool_call)| {
1844                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1845                    && &tool_call.id == id
1846                {
1847                    Some((index, tool_call))
1848                } else {
1849                    None
1850                }
1851            })
1852    }
1853
1854    pub fn tool_call_for_subagent(&self, session_id: &acp::SessionId) -> Option<&ToolCall> {
1855        self.entries.iter().find_map(|entry| match entry {
1856            AgentThreadEntry::ToolCall(tool_call) => {
1857                if let Some(subagent_session_info) = &tool_call.subagent_session_info
1858                    && &subagent_session_info.session_id == session_id
1859                {
1860                    Some(tool_call)
1861                } else {
1862                    None
1863                }
1864            }
1865            _ => None,
1866        })
1867    }
1868
    /// Resolves the locations of the tool call identified by `id` in the
    /// background and stores the result on the tool call.
    ///
    /// Does nothing if no tool call with that id exists. Once resolution
    /// finishes, a snapshot of every resolved buffer is recorded in
    /// `shared_buffers`, the project's agent location may be moved to the last
    /// resolved location (only for threads without a parent session), and
    /// `EntryUpdated` is emitted if the resolved locations changed.
    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
        let project = self.project.clone();
        // Only threads without a parent session move the project's agent location.
        let should_update_agent_location = self.parent_session_id.is_none();
        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
            return;
        };
        let task = tool_call.resolve_locations(project, cx);
        cx.spawn(async move |this, cx| {
            let resolved_locations = task.await;

            this.update(cx, |this, cx| {
                let project = this.project.clone();

                // Record a snapshot of every buffer that was successfully resolved.
                for location in resolved_locations.iter().flatten() {
                    this.shared_buffers
                        .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
                }
                // The tool call may have been removed while resolution was in flight.
                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
                    return;
                };

                if let Some(Some(location)) = resolved_locations.last() {
                    project.update(cx, |project, cx| {
                        let should_ignore = if let Some(agent_location) = project
                            .agent_location()
                            .filter(|agent_location| agent_location.buffer == location.buffer)
                        {
                            let snapshot = location.buffer.read(cx).snapshot();
                            let old_position = agent_location.position.to_point(&snapshot);
                            let new_position = location.position.to_point(&snapshot);

                            // Ignore this so that when we get updates from the edit tool
                            // the position doesn't reset to the start of the line.
                            old_position.row == new_position.row
                                && old_position.column > new_position.column
                        } else {
                            false
                        };
                        if !should_ignore && should_update_agent_location {
                            project.set_agent_location(Some(location.into()), cx);
                        }
                    });
                }

                let resolved_locations = resolved_locations
                    .iter()
                    .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
                    .collect::<Vec<_>>();

                // Only notify observers when the resolved locations actually changed.
                if tool_call.resolved_locations != resolved_locations {
                    tool_call.resolved_locations = resolved_locations;
                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
                }
            })
        })
        .detach();
    }
1926
1927    pub fn request_tool_call_authorization(
1928        &mut self,
1929        tool_call: acp::ToolCallUpdate,
1930        options: PermissionOptions,
1931        cx: &mut Context<Self>,
1932    ) -> Result<Task<acp::RequestPermissionOutcome>> {
1933        let (tx, rx) = oneshot::channel();
1934
1935        let status = ToolCallStatus::WaitingForConfirmation {
1936            options,
1937            respond_tx: tx,
1938        };
1939
1940        let tool_call_id = tool_call.tool_call_id.clone();
1941        self.upsert_tool_call_inner(tool_call, status, cx)?;
1942        cx.emit(AcpThreadEvent::ToolAuthorizationRequested(
1943            tool_call_id.clone(),
1944        ));
1945
1946        Ok(cx.spawn(async move |this, cx| {
1947            let outcome = match rx.await {
1948                Ok(option) => acp::RequestPermissionOutcome::Selected(
1949                    acp::SelectedPermissionOutcome::new(option),
1950                ),
1951                Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1952            };
1953            this.update(cx, |_this, cx| {
1954                cx.emit(AcpThreadEvent::ToolAuthorizationReceived(tool_call_id))
1955            })
1956            .ok();
1957            outcome
1958        }))
1959    }
1960
1961    pub fn authorize_tool_call(
1962        &mut self,
1963        id: acp::ToolCallId,
1964        option_id: acp::PermissionOptionId,
1965        option_kind: acp::PermissionOptionKind,
1966        cx: &mut Context<Self>,
1967    ) {
1968        let Some((ix, call)) = self.tool_call_mut(&id) else {
1969            return;
1970        };
1971
1972        let new_status = match option_kind {
1973            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1974                ToolCallStatus::Rejected
1975            }
1976            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1977                ToolCallStatus::InProgress
1978            }
1979            _ => ToolCallStatus::InProgress,
1980        };
1981
1982        let curr_status = mem::replace(&mut call.status, new_status);
1983
1984        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1985            respond_tx.send(option_id).log_err();
1986        } else if cfg!(debug_assertions) {
1987            panic!("tried to authorize an already authorized tool call");
1988        }
1989
1990        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1991    }
1992
    /// Returns the plan currently tracked by this thread.
    pub fn plan(&self) -> &Plan {
        &self.plan
    }
1996
1997    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1998        let new_entries_len = request.entries.len();
1999        let mut new_entries = request.entries.into_iter();
2000
2001        // Reuse existing markdown to prevent flickering
2002        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
2003            let PlanEntry {
2004                content,
2005                priority,
2006                status,
2007            } = old;
2008            content.update(cx, |old, cx| {
2009                old.replace(new.content, cx);
2010            });
2011            *priority = new.priority;
2012            *status = new.status;
2013        }
2014        for new in new_entries {
2015            self.plan.entries.push(PlanEntry::from_acp(new, cx))
2016        }
2017        self.plan.entries.truncate(new_entries_len);
2018
2019        cx.notify();
2020    }
2021
2022    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
2023        self.plan
2024            .entries
2025            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
2026        cx.notify();
2027    }
2028
2029    #[cfg(any(test, feature = "test-support"))]
2030    pub fn send_raw(
2031        &mut self,
2032        message: &str,
2033        cx: &mut Context<Self>,
2034    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2035        self.send(vec![message.into()], cx)
2036    }
2037
2038    pub fn send(
2039        &mut self,
2040        message: Vec<acp::ContentBlock>,
2041        cx: &mut Context<Self>,
2042    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2043        let block = ContentBlock::new_combined(
2044            message.clone(),
2045            self.project.read(cx).languages().clone(),
2046            self.project.read(cx).path_style(cx),
2047            cx,
2048        );
2049        let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
2050        let git_store = self.project.read(cx).git_store().clone();
2051
2052        let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
2053            Some(UserMessageId::new())
2054        } else {
2055            None
2056        };
2057
2058        self.run_turn(cx, async move |this, cx| {
2059            this.update(cx, |this, cx| {
2060                this.push_entry(
2061                    AgentThreadEntry::UserMessage(UserMessage {
2062                        id: message_id.clone(),
2063                        content: block,
2064                        chunks: message,
2065                        checkpoint: None,
2066                        indented: false,
2067                    }),
2068                    cx,
2069                );
2070            })
2071            .ok();
2072
2073            let old_checkpoint = git_store
2074                .update(cx, |git, cx| git.checkpoint(cx))
2075                .await
2076                .context("failed to get old checkpoint")
2077                .log_err();
2078            this.update(cx, |this, cx| {
2079                if let Some((_ix, message)) = this.last_user_message() {
2080                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
2081                        git_checkpoint,
2082                        show: false,
2083                    });
2084                }
2085                this.connection.prompt(message_id, request, cx)
2086            })?
2087            .await
2088        })
2089    }
2090
    /// Returns whether the connection provides retry support for this session.
    pub fn can_retry(&self, cx: &App) -> bool {
        self.connection.retry(&self.session_id, cx).is_some()
    }
2094
2095    pub fn retry(
2096        &mut self,
2097        cx: &mut Context<Self>,
2098    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2099        self.run_turn(cx, async move |this, cx| {
2100            this.update(cx, |this, cx| {
2101                this.connection
2102                    .retry(&this.session_id, cx)
2103                    .map(|retry| retry.run(cx))
2104            })?
2105            .context("retrying a session is not supported")?
2106            .await
2107        })
2108    }
2109
    /// Runs `f` as a new turn, canceling any turn that is currently running.
    ///
    /// Completed plan entries are cleared and the error flag is reset up front.
    /// `f` only starts after the previous turn's cancellation has finished. The
    /// returned future resolves once the turn's response has been processed:
    /// `Ok(Some(response))` on success, `Ok(None)` if the turn's task was dropped
    /// before producing a response, and `Err` if the turn failed or stopped
    /// because the maximum number of tokens was reached.
    fn run_turn(
        &mut self,
        cx: &mut Context<Self>,
        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
        self.clear_completed_plan_entries(cx);
        self.had_error = false;

        let (tx, rx) = oneshot::channel();
        let cancel_task = self.cancel(cx);

        // Each turn gets a fresh id so a finished turn can tell whether it is
        // still the current one.
        self.turn_id += 1;
        let turn_id = self.turn_id;
        self.running_turn = Some(RunningTurn {
            id: turn_id,
            send_task: cx.spawn(async move |this, cx| {
                // Let the previous turn wind down before starting this one.
                cancel_task.await;
                tx.send(f(this, cx).await).ok();
            }),
        });

        cx.spawn(async move |this, cx| {
            let response = rx.await;

            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
                .await?;

            this.update(cx, |this, cx| {
                // Only threads without a parent session own the project's agent location.
                if this.parent_session_id.is_none() {
                    this.project
                        .update(cx, |project, cx| project.set_agent_location(None, cx));
                }
                let Ok(response) = response else {
                    // tx dropped, just return
                    return Ok(None);
                };

                let is_same_turn = this
                    .running_turn
                    .as_ref()
                    .is_some_and(|turn| turn_id == turn.id);

                // If the user submitted a follow up message, running_turn might
                // already point to a different turn. Therefore we only want to
                // take the task if it's the same turn.
                if is_same_turn {
                    this.running_turn.take();
                }

                match response {
                    Ok(r) => {
                        Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);

                        // Hitting the token limit is surfaced as an error.
                        if r.stop_reason == acp::StopReason::MaxTokens {
                            this.had_error = true;
                            cx.emit(AcpThreadEvent::Error);
                            log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
                            return Err(anyhow!("Max tokens reached"));
                        }

                        let canceled = matches!(r.stop_reason, acp::StopReason::Cancelled);
                        if canceled {
                            this.mark_pending_tools_as_canceled();
                        }

                        // Handle refusal - distinguish between user prompt and tool call refusals
                        if let acp::StopReason::Refusal = r.stop_reason {
                            this.had_error = true;
                            if let Some((user_msg_ix, _)) = this.last_user_message() {
                                // Check if there's a completed tool call with results after the last user message
                                // This indicates the refusal is in response to tool output, not the user's prompt
                                let has_completed_tool_call_after_user_msg =
                                    this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
                                        if let AgentThreadEntry::ToolCall(tool_call) = entry {
                                            // Check if the tool call has completed and has output
                                            matches!(tool_call.status, ToolCallStatus::Completed)
                                                && tool_call.raw_output.is_some()
                                        } else {
                                            false
                                        }
                                    });

                                if has_completed_tool_call_after_user_msg {
                                    // Refusal is due to tool output - don't truncate, just notify
                                    // The model refused based on what the tool returned
                                    cx.emit(AcpThreadEvent::Refusal);
                                } else {
                                    // User prompt was refused - truncate back to before the user message
                                    let range = user_msg_ix..this.entries.len();
                                    if range.start < range.end {
                                        this.entries.truncate(user_msg_ix);
                                        cx.emit(AcpThreadEvent::EntriesRemoved(range));
                                    }
                                    cx.emit(AcpThreadEvent::Refusal);
                                }
                            } else {
                                // No user message found, treat as general refusal
                                cx.emit(AcpThreadEvent::Refusal);
                            }
                        }

                        cx.emit(AcpThreadEvent::Stopped(r.stop_reason));
                        Ok(Some(r))
                    }
                    Err(e) => {
                        Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);

                        this.had_error = true;
                        cx.emit(AcpThreadEvent::Error);
                        log::error!("Error in run turn: {:?}", e);
                        Err(e)
                    }
                }
            })?
        })
        .boxed()
    }
2227
2228    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2229        let Some(turn) = self.running_turn.take() else {
2230            return Task::ready(());
2231        };
2232        self.connection.cancel(&self.session_id, cx);
2233
2234        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2235        self.mark_pending_tools_as_canceled();
2236
2237        // Wait for the send task to complete
2238        cx.background_spawn(turn.send_task)
2239    }
2240
2241    fn mark_pending_tools_as_canceled(&mut self) {
2242        for entry in self.entries.iter_mut() {
2243            if let AgentThreadEntry::ToolCall(call) = entry {
2244                let cancel = matches!(
2245                    call.status,
2246                    ToolCallStatus::Pending
2247                        | ToolCallStatus::WaitingForConfirmation { .. }
2248                        | ToolCallStatus::InProgress
2249                );
2250
2251                if cancel {
2252                    call.status = ToolCallStatus::Canceled;
2253                }
2254            }
2255        }
2256    }
2257
2258    /// Restores the git working tree to the state at the given checkpoint (if one exists)
2259    pub fn restore_checkpoint(
2260        &mut self,
2261        id: UserMessageId,
2262        cx: &mut Context<Self>,
2263    ) -> Task<Result<()>> {
2264        let Some((_, message)) = self.user_message_mut(&id) else {
2265            return Task::ready(Err(anyhow!("message not found")));
2266        };
2267
2268        let checkpoint = message
2269            .checkpoint
2270            .as_ref()
2271            .map(|c| c.git_checkpoint.clone());
2272
2273        // Cancel any in-progress generation before restoring
2274        let cancel_task = self.cancel(cx);
2275        let rewind = self.rewind(id.clone(), cx);
2276        let git_store = self.project.read(cx).git_store().clone();
2277
2278        cx.spawn(async move |_, cx| {
2279            cancel_task.await;
2280            rewind.await?;
2281            if let Some(checkpoint) = checkpoint {
2282                git_store
2283                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
2284                    .await?;
2285            }
2286
2287            Ok(())
2288        })
2289    }
2290
    /// Rewinds this thread to before the user message identified by `id`, removing
    /// it and all subsequent entries while rejecting all edits tracked by the
    /// action log. Terminals referenced by the removed entries are killed.
    /// Unlike `restore_checkpoint`, this method does not restore from git.
    ///
    /// Fails with "not supported" when the connection cannot truncate this session.
    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
        let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
            return Task::ready(Err(anyhow!("not supported")));
        };

        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
        let telemetry = ActionLogTelemetry::from(&*self);
        cx.spawn(async move |this, cx| {
            // Truncate on the connection side first; local entries are only
            // removed once that succeeded.
            cx.update(|cx| truncate.run(id.clone(), cx)).await?;
            this.update(cx, |this, cx| {
                if let Some((ix, _)) = this.user_message_mut(&id) {
                    // Collect all terminals from entries that will be removed
                    let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
                        .iter()
                        .flat_map(|entry| entry.terminals())
                        .filter_map(|terminal| terminal.read(cx).id().clone().into())
                        .collect();

                    let range = ix..this.entries.len();
                    this.entries.truncate(ix);
                    cx.emit(AcpThreadEvent::EntriesRemoved(range));

                    // Kill and remove the terminals
                    for terminal_id in terminals_to_remove {
                        if let Some(terminal) = this.terminals.remove(&terminal_id) {
                            terminal.update(cx, |terminal, cx| {
                                terminal.kill(cx);
                            });
                        }
                    }
                }
                // Edits are rejected even when the message is no longer present locally.
                this.action_log().update(cx, |action_log, cx| {
                    action_log.reject_all_edits(Some(telemetry), cx)
                })
            })?
            .await;
            Ok(())
        })
    }
2333
2334    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
2335        let git_store = self.project.read(cx).git_store().clone();
2336
2337        let Some((_, message)) = self.last_user_message() else {
2338            return Task::ready(Ok(()));
2339        };
2340        let Some(user_message_id) = message.id.clone() else {
2341            return Task::ready(Ok(()));
2342        };
2343        let Some(checkpoint) = message.checkpoint.as_ref() else {
2344            return Task::ready(Ok(()));
2345        };
2346        let old_checkpoint = checkpoint.git_checkpoint.clone();
2347
2348        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2349        cx.spawn(async move |this, cx| {
2350            let Some(new_checkpoint) = new_checkpoint
2351                .await
2352                .context("failed to get new checkpoint")
2353                .log_err()
2354            else {
2355                return Ok(());
2356            };
2357
2358            let equal = git_store
2359                .update(cx, |git, cx| {
2360                    git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2361                })
2362                .await
2363                .unwrap_or(true);
2364
2365            this.update(cx, |this, cx| {
2366                if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
2367                    if let Some(checkpoint) = message.checkpoint.as_mut() {
2368                        checkpoint.show = !equal;
2369                        cx.emit(AcpThreadEvent::EntryUpdated(ix));
2370                    }
2371                }
2372            })?;
2373
2374            Ok(())
2375        })
2376    }
2377
2378    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2379        self.entries
2380            .iter_mut()
2381            .enumerate()
2382            .rev()
2383            .find_map(|(ix, entry)| {
2384                if let AgentThreadEntry::UserMessage(message) = entry {
2385                    Some((ix, message))
2386                } else {
2387                    None
2388                }
2389            })
2390    }
2391
2392    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2393        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2394            if let AgentThreadEntry::UserMessage(message) = entry {
2395                if message.id.as_ref() == Some(id) {
2396                    Some((ix, message))
2397                } else {
2398                    None
2399                }
2400            } else {
2401                None
2402            }
2403        })
2404    }
2405
2406    pub fn read_text_file(
2407        &self,
2408        path: PathBuf,
2409        line: Option<u32>,
2410        limit: Option<u32>,
2411        reuse_shared_snapshot: bool,
2412        cx: &mut Context<Self>,
2413    ) -> Task<Result<String, acp::Error>> {
2414        // Args are 1-based, move to 0-based
2415        let line = line.unwrap_or_default().saturating_sub(1);
2416        let limit = limit.unwrap_or(u32::MAX);
2417        let project = self.project.clone();
2418        let action_log = self.action_log.clone();
2419        let should_update_agent_location = self.parent_session_id.is_none();
2420        cx.spawn(async move |this, cx| {
2421            let load = project.update(cx, |project, cx| {
2422                let path = project
2423                    .project_path_for_absolute_path(&path, cx)
2424                    .ok_or_else(|| {
2425                        acp::Error::resource_not_found(Some(path.display().to_string()))
2426                    })?;
2427                Ok::<_, acp::Error>(project.open_buffer(path, cx))
2428            })?;
2429
2430            let buffer = load.await?;
2431
2432            let snapshot = if reuse_shared_snapshot {
2433                this.read_with(cx, |this, _| {
2434                    this.shared_buffers.get(&buffer.clone()).cloned()
2435                })
2436                .log_err()
2437                .flatten()
2438            } else {
2439                None
2440            };
2441
2442            let snapshot = if let Some(snapshot) = snapshot {
2443                snapshot
2444            } else {
2445                action_log.update(cx, |action_log, cx| {
2446                    action_log.buffer_read(buffer.clone(), cx);
2447                });
2448
2449                let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
2450                this.update(cx, |this, _| {
2451                    this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2452                })?;
2453                snapshot
2454            };
2455
2456            let max_point = snapshot.max_point();
2457            let start_position = Point::new(line, 0);
2458
2459            if start_position > max_point {
2460                return Err(acp::Error::invalid_params().data(format!(
2461                    "Attempting to read beyond the end of the file, line {}:{}",
2462                    max_point.row + 1,
2463                    max_point.column
2464                )));
2465            }
2466
2467            let start = snapshot.anchor_before(start_position);
2468            let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2469
2470            if should_update_agent_location {
2471                project.update(cx, |project, cx| {
2472                    project.set_agent_location(
2473                        Some(AgentLocation {
2474                            buffer: buffer.downgrade(),
2475                            position: start,
2476                        }),
2477                        cx,
2478                    );
2479                });
2480            }
2481
2482            Ok(snapshot.text_for_range(start..end).collect::<String>())
2483        })
2484    }
2485
2486    pub fn write_text_file(
2487        &self,
2488        path: PathBuf,
2489        content: String,
2490        cx: &mut Context<Self>,
2491    ) -> Task<Result<()>> {
2492        let project = self.project.clone();
2493        let action_log = self.action_log.clone();
2494        let should_update_agent_location = self.parent_session_id.is_none();
2495        cx.spawn(async move |this, cx| {
2496            let load = project.update(cx, |project, cx| {
2497                let path = project
2498                    .project_path_for_absolute_path(&path, cx)
2499                    .context("invalid path")?;
2500                anyhow::Ok(project.open_buffer(path, cx))
2501            });
2502            let buffer = load?.await?;
2503            let snapshot = this.update(cx, |this, cx| {
2504                this.shared_buffers
2505                    .get(&buffer)
2506                    .cloned()
2507                    .unwrap_or_else(|| buffer.read(cx).snapshot())
2508            })?;
2509            let edits = cx
2510                .background_executor()
2511                .spawn(async move {
2512                    let old_text = snapshot.text();
2513                    text_diff(old_text.as_str(), &content)
2514                        .into_iter()
2515                        .map(|(range, replacement)| {
2516                            (snapshot.anchor_range_around(range), replacement)
2517                        })
2518                        .collect::<Vec<_>>()
2519                })
2520                .await;
2521
2522            if should_update_agent_location {
2523                project.update(cx, |project, cx| {
2524                    project.set_agent_location(
2525                        Some(AgentLocation {
2526                            buffer: buffer.downgrade(),
2527                            position: edits
2528                                .last()
2529                                .map(|(range, _)| range.end)
2530                                .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
2531                        }),
2532                        cx,
2533                    );
2534                });
2535            }
2536
2537            let format_on_save = cx.update(|cx| {
2538                action_log.update(cx, |action_log, cx| {
2539                    action_log.buffer_read(buffer.clone(), cx);
2540                });
2541
2542                let format_on_save = buffer.update(cx, |buffer, cx| {
2543                    buffer.edit(edits, None, cx);
2544
2545                    let settings = language::language_settings::language_settings(
2546                        buffer.language().map(|l| l.name()),
2547                        buffer.file(),
2548                        cx,
2549                    );
2550
2551                    settings.format_on_save != FormatOnSave::Off
2552                });
2553                action_log.update(cx, |action_log, cx| {
2554                    action_log.buffer_edited(buffer.clone(), cx);
2555                });
2556                format_on_save
2557            });
2558
2559            if format_on_save {
2560                let format_task = project.update(cx, |project, cx| {
2561                    project.format(
2562                        HashSet::from_iter([buffer.clone()]),
2563                        LspFormatTarget::Buffers,
2564                        false,
2565                        FormatTrigger::Save,
2566                        cx,
2567                    )
2568                });
2569                format_task.await.log_err();
2570
2571                action_log.update(cx, |action_log, cx| {
2572                    action_log.buffer_edited(buffer.clone(), cx);
2573                });
2574            }
2575
2576            project
2577                .update(cx, |project, cx| project.save_buffer(buffer, cx))
2578                .await
2579        })
2580    }
2581
2582    pub fn create_terminal(
2583        &self,
2584        command: String,
2585        args: Vec<String>,
2586        extra_env: Vec<acp::EnvVariable>,
2587        cwd: Option<PathBuf>,
2588        output_byte_limit: Option<u64>,
2589        cx: &mut Context<Self>,
2590    ) -> Task<Result<Entity<Terminal>>> {
2591        let env = match &cwd {
2592            Some(dir) => self.project.update(cx, |project, cx| {
2593                project.environment().update(cx, |env, cx| {
2594                    env.directory_environment(dir.as_path().into(), cx)
2595                })
2596            }),
2597            None => Task::ready(None).shared(),
2598        };
2599        let env = cx.spawn(async move |_, _| {
2600            let mut env = env.await.unwrap_or_default();
2601            // Disables paging for `git` and hopefully other commands
2602            env.insert("PAGER".into(), "".into());
2603            for var in extra_env {
2604                env.insert(var.name, var.value);
2605            }
2606            env
2607        });
2608
2609        let project = self.project.clone();
2610        let language_registry = project.read(cx).languages().clone();
2611        let is_windows = project.read(cx).path_style(cx).is_windows();
2612
2613        let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
2614        let terminal_task = cx.spawn({
2615            let terminal_id = terminal_id.clone();
2616            async move |_this, cx| {
2617                let env = env.await;
2618                let shell = project
2619                    .update(cx, |project, cx| {
2620                        project
2621                            .remote_client()
2622                            .and_then(|r| r.read(cx).default_system_shell())
2623                    })
2624                    .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2625                let (task_command, task_args) =
2626                    ShellBuilder::new(&Shell::Program(shell), is_windows)
2627                        .redirect_stdin_to_dev_null()
2628                        .build(Some(command.clone()), &args);
2629                let terminal = project
2630                    .update(cx, |project, cx| {
2631                        project.create_terminal_task(
2632                            task::SpawnInTerminal {
2633                                command: Some(task_command),
2634                                args: task_args,
2635                                cwd: cwd.clone(),
2636                                env,
2637                                ..Default::default()
2638                            },
2639                            cx,
2640                        )
2641                    })
2642                    .await?;
2643
2644                anyhow::Ok(cx.new(|cx| {
2645                    Terminal::new(
2646                        terminal_id,
2647                        &format!("{} {}", command, args.join(" ")),
2648                        cwd,
2649                        output_byte_limit.map(|l| l as usize),
2650                        terminal,
2651                        language_registry,
2652                        cx,
2653                    )
2654                }))
2655            }
2656        });
2657
2658        cx.spawn(async move |this, cx| {
2659            let terminal = terminal_task.await?;
2660            this.update(cx, |this, _cx| {
2661                this.terminals.insert(terminal_id, terminal.clone());
2662                terminal
2663            })
2664        })
2665    }
2666
2667    pub fn kill_terminal(
2668        &mut self,
2669        terminal_id: acp::TerminalId,
2670        cx: &mut Context<Self>,
2671    ) -> Result<()> {
2672        self.terminals
2673            .get(&terminal_id)
2674            .context("Terminal not found")?
2675            .update(cx, |terminal, cx| {
2676                terminal.kill(cx);
2677            });
2678
2679        Ok(())
2680    }
2681
2682    pub fn release_terminal(
2683        &mut self,
2684        terminal_id: acp::TerminalId,
2685        cx: &mut Context<Self>,
2686    ) -> Result<()> {
2687        self.terminals
2688            .remove(&terminal_id)
2689            .context("Terminal not found")?
2690            .update(cx, |terminal, cx| {
2691                terminal.kill(cx);
2692            });
2693
2694        Ok(())
2695    }
2696
2697    pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2698        self.terminals
2699            .get(&terminal_id)
2700            .context("Terminal not found")
2701            .cloned()
2702    }
2703
2704    pub fn to_markdown(&self, cx: &App) -> String {
2705        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2706    }
2707
2708    pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2709        cx.emit(AcpThreadEvent::LoadError(error));
2710    }
2711
2712    pub fn register_terminal_created(
2713        &mut self,
2714        terminal_id: acp::TerminalId,
2715        command_label: String,
2716        working_dir: Option<PathBuf>,
2717        output_byte_limit: Option<u64>,
2718        terminal: Entity<::terminal::Terminal>,
2719        cx: &mut Context<Self>,
2720    ) -> Entity<Terminal> {
2721        let language_registry = self.project.read(cx).languages().clone();
2722
2723        let entity = cx.new(|cx| {
2724            Terminal::new(
2725                terminal_id.clone(),
2726                &command_label,
2727                working_dir.clone(),
2728                output_byte_limit.map(|l| l as usize),
2729                terminal,
2730                language_registry,
2731                cx,
2732            )
2733        });
2734        self.terminals.insert(terminal_id.clone(), entity.clone());
2735        entity
2736    }
2737
2738    pub fn mark_as_subagent_output(&mut self, cx: &mut Context<Self>) {
2739        for entry in self.entries.iter_mut().rev() {
2740            if let AgentThreadEntry::AssistantMessage(assistant_message) = entry {
2741                assistant_message.is_subagent_output = true;
2742                cx.notify();
2743                return;
2744            }
2745        }
2746    }
2747
2748    pub fn on_terminal_provider_event(
2749        &mut self,
2750        event: TerminalProviderEvent,
2751        cx: &mut Context<Self>,
2752    ) {
2753        match event {
2754            TerminalProviderEvent::Created {
2755                terminal_id,
2756                label,
2757                cwd,
2758                output_byte_limit,
2759                terminal,
2760            } => {
2761                let entity = self.register_terminal_created(
2762                    terminal_id.clone(),
2763                    label,
2764                    cwd,
2765                    output_byte_limit,
2766                    terminal,
2767                    cx,
2768                );
2769
2770                if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
2771                    for data in chunks.drain(..) {
2772                        entity.update(cx, |term, cx| {
2773                            term.inner().update(cx, |inner, cx| {
2774                                inner.write_output(&data, cx);
2775                            })
2776                        });
2777                    }
2778                }
2779
2780                if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
2781                    entity.update(cx, |_term, cx| {
2782                        cx.notify();
2783                    });
2784                }
2785
2786                cx.notify();
2787            }
2788            TerminalProviderEvent::Output { terminal_id, data } => {
2789                if let Some(entity) = self.terminals.get(&terminal_id) {
2790                    entity.update(cx, |term, cx| {
2791                        term.inner().update(cx, |inner, cx| {
2792                            inner.write_output(&data, cx);
2793                        })
2794                    });
2795                } else {
2796                    self.pending_terminal_output
2797                        .entry(terminal_id)
2798                        .or_default()
2799                        .push(data);
2800                }
2801            }
2802            TerminalProviderEvent::TitleChanged { terminal_id, title } => {
2803                if let Some(entity) = self.terminals.get(&terminal_id) {
2804                    entity.update(cx, |term, cx| {
2805                        term.inner().update(cx, |inner, cx| {
2806                            inner.breadcrumb_text = title;
2807                            cx.emit(::terminal::Event::BreadcrumbsChanged);
2808                        })
2809                    });
2810                }
2811            }
2812            TerminalProviderEvent::Exit {
2813                terminal_id,
2814                status,
2815            } => {
2816                if let Some(entity) = self.terminals.get(&terminal_id) {
2817                    entity.update(cx, |_term, cx| {
2818                        cx.notify();
2819                    });
2820                } else {
2821                    self.pending_terminal_exit.insert(terminal_id, status);
2822                }
2823            }
2824        }
2825    }
2826}
2827
2828fn markdown_for_raw_output(
2829    raw_output: &serde_json::Value,
2830    language_registry: &Arc<LanguageRegistry>,
2831    cx: &mut App,
2832) -> Option<Entity<Markdown>> {
2833    match raw_output {
2834        serde_json::Value::Null => None,
2835        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2836            Markdown::new(
2837                value.to_string().into(),
2838                Some(language_registry.clone()),
2839                None,
2840                cx,
2841            )
2842        })),
2843        serde_json::Value::Number(value) => Some(cx.new(|cx| {
2844            Markdown::new(
2845                value.to_string().into(),
2846                Some(language_registry.clone()),
2847                None,
2848                cx,
2849            )
2850        })),
2851        serde_json::Value::String(value) => Some(cx.new(|cx| {
2852            Markdown::new(
2853                value.clone().into(),
2854                Some(language_registry.clone()),
2855                None,
2856                cx,
2857            )
2858        })),
2859        value => Some(cx.new(|cx| {
2860            let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
2861
2862            Markdown::new(
2863                format!("```json\n{}\n```", pretty_json).into(),
2864                Some(language_registry.clone()),
2865                None,
2866                cx,
2867            )
2868        })),
2869    }
2870}
2871
2872#[cfg(test)]
2873mod tests {
2874    use super::*;
2875    use anyhow::anyhow;
2876    use futures::{channel::mpsc, future::LocalBoxFuture, select};
2877    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2878    use indoc::indoc;
2879    use project::{AgentId, FakeFs, Fs};
2880    use rand::{distr, prelude::*};
2881    use serde_json::json;
2882    use settings::SettingsStore;
2883    use smol::stream::StreamExt as _;
2884    use std::{
2885        any::Any,
2886        cell::RefCell,
2887        path::Path,
2888        rc::Rc,
2889        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2890        time::Duration,
2891    };
2892    use util::{path, path_list::PathList};
2893
2894    fn init_test(cx: &mut TestAppContext) {
2895        env_logger::try_init().ok();
2896        cx.update(|cx| {
2897            let settings_store = SettingsStore::test(cx);
2898            cx.set_global(settings_store);
2899        });
2900    }
2901
2902    #[gpui::test]
2903    async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2904        init_test(cx);
2905
2906        let fs = FakeFs::new(cx.executor());
2907        let project = Project::test(fs, [], cx).await;
2908        let connection = Rc::new(FakeAgentConnection::new());
2909        let thread = cx
2910            .update(|cx| {
2911                connection.new_session(
2912                    project,
2913                    PathList::new(&[std::path::Path::new(path!("/test"))]),
2914                    cx,
2915                )
2916            })
2917            .await
2918            .unwrap();
2919
2920        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2921
2922        // Send Output BEFORE Created - should be buffered by acp_thread
2923        thread.update(cx, |thread, cx| {
2924            thread.on_terminal_provider_event(
2925                TerminalProviderEvent::Output {
2926                    terminal_id: terminal_id.clone(),
2927                    data: b"hello buffered".to_vec(),
2928                },
2929                cx,
2930            );
2931        });
2932
2933        // Create a display-only terminal and then send Created
2934        let lower = cx.new(|cx| {
2935            let builder = ::terminal::TerminalBuilder::new_display_only(
2936                ::terminal::terminal_settings::CursorShape::default(),
2937                ::terminal::terminal_settings::AlternateScroll::On,
2938                None,
2939                0,
2940                cx.background_executor(),
2941                PathStyle::local(),
2942            )
2943            .unwrap();
2944            builder.subscribe(cx)
2945        });
2946
2947        thread.update(cx, |thread, cx| {
2948            thread.on_terminal_provider_event(
2949                TerminalProviderEvent::Created {
2950                    terminal_id: terminal_id.clone(),
2951                    label: "Buffered Test".to_string(),
2952                    cwd: None,
2953                    output_byte_limit: None,
2954                    terminal: lower.clone(),
2955                },
2956                cx,
2957            );
2958        });
2959
2960        // After Created, buffered Output should have been flushed into the renderer
2961        let content = thread.read_with(cx, |thread, cx| {
2962            let term = thread.terminal(terminal_id.clone()).unwrap();
2963            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2964        });
2965
2966        assert!(
2967            content.contains("hello buffered"),
2968            "expected buffered output to render, got: {content}"
2969        );
2970    }
2971
2972    #[gpui::test]
2973    async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2974        init_test(cx);
2975
2976        let fs = FakeFs::new(cx.executor());
2977        let project = Project::test(fs, [], cx).await;
2978        let connection = Rc::new(FakeAgentConnection::new());
2979        let thread = cx
2980            .update(|cx| {
2981                connection.new_session(
2982                    project,
2983                    PathList::new(&[std::path::Path::new(path!("/test"))]),
2984                    cx,
2985                )
2986            })
2987            .await
2988            .unwrap();
2989
2990        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2991
2992        // Send Output BEFORE Created
2993        thread.update(cx, |thread, cx| {
2994            thread.on_terminal_provider_event(
2995                TerminalProviderEvent::Output {
2996                    terminal_id: terminal_id.clone(),
2997                    data: b"pre-exit data".to_vec(),
2998                },
2999                cx,
3000            );
3001        });
3002
3003        // Send Exit BEFORE Created
3004        thread.update(cx, |thread, cx| {
3005            thread.on_terminal_provider_event(
3006                TerminalProviderEvent::Exit {
3007                    terminal_id: terminal_id.clone(),
3008                    status: acp::TerminalExitStatus::new().exit_code(0),
3009                },
3010                cx,
3011            );
3012        });
3013
3014        // Now create a display-only lower-level terminal and send Created
3015        let lower = cx.new(|cx| {
3016            let builder = ::terminal::TerminalBuilder::new_display_only(
3017                ::terminal::terminal_settings::CursorShape::default(),
3018                ::terminal::terminal_settings::AlternateScroll::On,
3019                None,
3020                0,
3021                cx.background_executor(),
3022                PathStyle::local(),
3023            )
3024            .unwrap();
3025            builder.subscribe(cx)
3026        });
3027
3028        thread.update(cx, |thread, cx| {
3029            thread.on_terminal_provider_event(
3030                TerminalProviderEvent::Created {
3031                    terminal_id: terminal_id.clone(),
3032                    label: "Buffered Exit Test".to_string(),
3033                    cwd: None,
3034                    output_byte_limit: None,
3035                    terminal: lower.clone(),
3036                },
3037                cx,
3038            );
3039        });
3040
3041        // Output should be present after Created (flushed from buffer)
3042        let content = thread.read_with(cx, |thread, cx| {
3043            let term = thread.terminal(terminal_id.clone()).unwrap();
3044            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3045        });
3046
3047        assert!(
3048            content.contains("pre-exit data"),
3049            "expected pre-exit data to render, got: {content}"
3050        );
3051    }
3052
3053    /// Test that killing a terminal via Terminal::kill properly:
3054    /// 1. Causes wait_for_exit to complete (doesn't hang forever)
3055    /// 2. The underlying terminal still has the output that was written before the kill
3056    ///
3057    /// This test verifies that the fix to kill_active_task (which now also kills
3058    /// the shell process in addition to the foreground process) properly allows
3059    /// wait_for_exit to complete instead of hanging indefinitely.
3060    #[cfg(unix)]
3061    #[gpui::test]
3062    async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
3063        use std::collections::HashMap;
3064        use task::Shell;
3065        use util::shell_builder::ShellBuilder;
3066
3067        init_test(cx);
3068        cx.executor().allow_parking();
3069
3070        let fs = FakeFs::new(cx.executor());
3071        let project = Project::test(fs, [], cx).await;
3072        let connection = Rc::new(FakeAgentConnection::new());
3073        let thread = cx
3074            .update(|cx| {
3075                connection.new_session(
3076                    project.clone(),
3077                    PathList::new(&[Path::new(path!("/test"))]),
3078                    cx,
3079                )
3080            })
3081            .await
3082            .unwrap();
3083
3084        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3085
3086        // Create a real PTY terminal that runs a command which prints output then sleeps
3087        // We use printf instead of echo and chain with && sleep to ensure proper execution
3088        let (completion_tx, _completion_rx) = smol::channel::unbounded();
3089        let (program, args) = ShellBuilder::new(&Shell::System, false).build(
3090            Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
3091            &[],
3092        );
3093
3094        let builder = cx
3095            .update(|cx| {
3096                ::terminal::TerminalBuilder::new(
3097                    None,
3098                    None,
3099                    task::Shell::WithArguments {
3100                        program,
3101                        args,
3102                        title_override: None,
3103                    },
3104                    HashMap::default(),
3105                    ::terminal::terminal_settings::CursorShape::default(),
3106                    ::terminal::terminal_settings::AlternateScroll::On,
3107                    None,
3108                    vec![],
3109                    0,
3110                    false,
3111                    0,
3112                    Some(completion_tx),
3113                    cx,
3114                    vec![],
3115                    PathStyle::local(),
3116                )
3117            })
3118            .await
3119            .unwrap();
3120
3121        let lower_terminal = cx.new(|cx| builder.subscribe(cx));
3122
3123        // Create the acp_thread Terminal wrapper
3124        thread.update(cx, |thread, cx| {
3125            thread.on_terminal_provider_event(
3126                TerminalProviderEvent::Created {
3127                    terminal_id: terminal_id.clone(),
3128                    label: "printf output_before_kill && sleep 60".to_string(),
3129                    cwd: None,
3130                    output_byte_limit: None,
3131                    terminal: lower_terminal.clone(),
3132                },
3133                cx,
3134            );
3135        });
3136
3137        // Wait for the printf command to execute and produce output
3138        // Use real time since parking is enabled
3139        cx.executor().timer(Duration::from_millis(500)).await;
3140
3141        // Get the acp_thread Terminal and kill it
3142        let wait_for_exit = thread.update(cx, |thread, cx| {
3143            let term = thread.terminals.get(&terminal_id).unwrap();
3144            let wait_for_exit = term.read(cx).wait_for_exit();
3145            term.update(cx, |term, cx| {
3146                term.kill(cx);
3147            });
3148            wait_for_exit
3149        });
3150
3151        // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
3152        // Before the fix to kill_active_task, this would hang forever because
3153        // only the foreground process was killed, not the shell, so the PTY
3154        // child never exited and wait_for_completed_task never completed.
3155        let exit_result = futures::select! {
3156            result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
3157            _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
3158        };
3159
3160        assert!(
3161            exit_result.is_some(),
3162            "wait_for_exit should complete after kill, but it timed out. \
3163            This indicates kill_active_task is not properly killing the shell process."
3164        );
3165
3166        // Give the system a chance to process any pending updates
3167        cx.run_until_parked();
3168
3169        // Verify that the underlying terminal still has the output that was
3170        // written before the kill. This verifies that killing doesn't lose output.
3171        let inner_content = thread.read_with(cx, |thread, cx| {
3172            let term = thread.terminals.get(&terminal_id).unwrap();
3173            term.read(cx).inner().read(cx).get_content()
3174        });
3175
3176        assert!(
3177            inner_content.contains("output_before_kill"),
3178            "Underlying terminal should contain output from before kill, got: {}",
3179            inner_content
3180        );
3181    }
3182
3183    #[gpui::test]
3184    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
3185        init_test(cx);
3186
3187        let fs = FakeFs::new(cx.executor());
3188        let project = Project::test(fs, [], cx).await;
3189        let connection = Rc::new(FakeAgentConnection::new());
3190        let thread = cx
3191            .update(|cx| {
3192                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3193            })
3194            .await
3195            .unwrap();
3196
3197        // Test creating a new user message
3198        thread.update(cx, |thread, cx| {
3199            thread.push_user_content_block(None, "Hello, ".into(), cx);
3200        });
3201
3202        thread.update(cx, |thread, cx| {
3203            assert_eq!(thread.entries.len(), 1);
3204            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3205                assert_eq!(user_msg.id, None);
3206                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
3207            } else {
3208                panic!("Expected UserMessage");
3209            }
3210        });
3211
3212        // Test appending to existing user message
3213        let message_1_id = UserMessageId::new();
3214        thread.update(cx, |thread, cx| {
3215            thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
3216        });
3217
3218        thread.update(cx, |thread, cx| {
3219            assert_eq!(thread.entries.len(), 1);
3220            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3221                assert_eq!(user_msg.id, Some(message_1_id));
3222                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
3223            } else {
3224                panic!("Expected UserMessage");
3225            }
3226        });
3227
3228        // Test creating new user message after assistant message
3229        thread.update(cx, |thread, cx| {
3230            thread.push_assistant_content_block("Assistant response".into(), false, cx);
3231        });
3232
3233        let message_2_id = UserMessageId::new();
3234        thread.update(cx, |thread, cx| {
3235            thread.push_user_content_block(
3236                Some(message_2_id.clone()),
3237                "New user message".into(),
3238                cx,
3239            );
3240        });
3241
3242        thread.update(cx, |thread, cx| {
3243            assert_eq!(thread.entries.len(), 3);
3244            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
3245                assert_eq!(user_msg.id, Some(message_2_id));
3246                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
3247            } else {
3248                panic!("Expected UserMessage at index 2");
3249            }
3250        });
3251    }
3252
3253    #[gpui::test]
3254    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
3255        init_test(cx);
3256
3257        let fs = FakeFs::new(cx.executor());
3258        let project = Project::test(fs, [], cx).await;
3259        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3260            |_, thread, mut cx| {
3261                async move {
3262                    thread.update(&mut cx, |thread, cx| {
3263                        thread
3264                            .handle_session_update(
3265                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3266                                    "Thinking ".into(),
3267                                )),
3268                                cx,
3269                            )
3270                            .unwrap();
3271                        thread
3272                            .handle_session_update(
3273                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3274                                    "hard!".into(),
3275                                )),
3276                                cx,
3277                            )
3278                            .unwrap();
3279                    })?;
3280                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3281                }
3282                .boxed_local()
3283            },
3284        ));
3285
3286        let thread = cx
3287            .update(|cx| {
3288                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3289            })
3290            .await
3291            .unwrap();
3292
3293        thread
3294            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
3295            .await
3296            .unwrap();
3297
3298        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
3299        assert_eq!(
3300            output,
3301            indoc! {r#"
3302            ## User
3303
3304            Hello from Zed!
3305
3306            ## Assistant
3307
3308            <thinking>
3309            Thinking hard!
3310            </thinking>
3311
3312            "#}
3313        );
3314    }
3315
3316    #[gpui::test]
3317    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
3318        init_test(cx);
3319
3320        let fs = FakeFs::new(cx.executor());
3321        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
3322            .await;
3323        let project = Project::test(fs.clone(), [], cx).await;
3324        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
3325        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
3326        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3327            move |_, thread, mut cx| {
3328                let read_file_tx = read_file_tx.clone();
3329                async move {
3330                    let content = thread
3331                        .update(&mut cx, |thread, cx| {
3332                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3333                        })
3334                        .unwrap()
3335                        .await
3336                        .unwrap();
3337                    assert_eq!(content, "one\ntwo\nthree\n");
3338                    read_file_tx.take().unwrap().send(()).unwrap();
3339                    thread
3340                        .update(&mut cx, |thread, cx| {
3341                            thread.write_text_file(
3342                                path!("/tmp/foo").into(),
3343                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
3344                                cx,
3345                            )
3346                        })
3347                        .unwrap()
3348                        .await
3349                        .unwrap();
3350                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3351                }
3352                .boxed_local()
3353            },
3354        ));
3355
3356        let (worktree, pathbuf) = project
3357            .update(cx, |project, cx| {
3358                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3359            })
3360            .await
3361            .unwrap();
3362        let buffer = project
3363            .update(cx, |project, cx| {
3364                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
3365            })
3366            .await
3367            .unwrap();
3368
3369        let thread = cx
3370            .update(|cx| {
3371                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3372            })
3373            .await
3374            .unwrap();
3375
3376        let request = thread.update(cx, |thread, cx| {
3377            thread.send_raw("Extend the count in /tmp/foo", cx)
3378        });
3379        read_file_rx.await.ok();
3380        buffer.update(cx, |buffer, cx| {
3381            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
3382        });
3383        cx.run_until_parked();
3384        assert_eq!(
3385            buffer.read_with(cx, |buffer, _| buffer.text()),
3386            "zero\none\ntwo\nthree\nfour\nfive\n"
3387        );
3388        assert_eq!(
3389            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
3390            "zero\none\ntwo\nthree\nfour\nfive\n"
3391        );
3392        request.await.unwrap();
3393    }
3394
3395    #[gpui::test]
3396    async fn test_reading_from_line(cx: &mut TestAppContext) {
3397        init_test(cx);
3398
3399        let fs = FakeFs::new(cx.executor());
3400        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
3401            .await;
3402        let project = Project::test(fs.clone(), [], cx).await;
3403        project
3404            .update(cx, |project, cx| {
3405                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3406            })
3407            .await
3408            .unwrap();
3409
3410        let connection = Rc::new(FakeAgentConnection::new());
3411
3412        let thread = cx
3413            .update(|cx| {
3414                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3415            })
3416            .await
3417            .unwrap();
3418
3419        // Whole file
3420        let content = thread
3421            .update(cx, |thread, cx| {
3422                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3423            })
3424            .await
3425            .unwrap();
3426
3427        assert_eq!(content, "one\ntwo\nthree\nfour\n");
3428
3429        // Only start line
3430        let content = thread
3431            .update(cx, |thread, cx| {
3432                thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
3433            })
3434            .await
3435            .unwrap();
3436
3437        assert_eq!(content, "three\nfour\n");
3438
3439        // Only limit
3440        let content = thread
3441            .update(cx, |thread, cx| {
3442                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3443            })
3444            .await
3445            .unwrap();
3446
3447        assert_eq!(content, "one\ntwo\n");
3448
3449        // Range
3450        let content = thread
3451            .update(cx, |thread, cx| {
3452                thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
3453            })
3454            .await
3455            .unwrap();
3456
3457        assert_eq!(content, "two\nthree\n");
3458
3459        // Invalid
3460        let err = thread
3461            .update(cx, |thread, cx| {
3462                thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
3463            })
3464            .await
3465            .unwrap_err();
3466
3467        assert_eq!(
3468            err.to_string(),
3469            "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
3470        );
3471    }
3472
3473    #[gpui::test]
3474    async fn test_reading_empty_file(cx: &mut TestAppContext) {
3475        init_test(cx);
3476
3477        let fs = FakeFs::new(cx.executor());
3478        fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
3479        let project = Project::test(fs.clone(), [], cx).await;
3480        project
3481            .update(cx, |project, cx| {
3482                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3483            })
3484            .await
3485            .unwrap();
3486
3487        let connection = Rc::new(FakeAgentConnection::new());
3488
3489        let thread = cx
3490            .update(|cx| {
3491                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3492            })
3493            .await
3494            .unwrap();
3495
3496        // Whole file
3497        let content = thread
3498            .update(cx, |thread, cx| {
3499                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3500            })
3501            .await
3502            .unwrap();
3503
3504        assert_eq!(content, "");
3505
3506        // Only start line
3507        let content = thread
3508            .update(cx, |thread, cx| {
3509                thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
3510            })
3511            .await
3512            .unwrap();
3513
3514        assert_eq!(content, "");
3515
3516        // Only limit
3517        let content = thread
3518            .update(cx, |thread, cx| {
3519                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3520            })
3521            .await
3522            .unwrap();
3523
3524        assert_eq!(content, "");
3525
3526        // Range
3527        let content = thread
3528            .update(cx, |thread, cx| {
3529                thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
3530            })
3531            .await
3532            .unwrap();
3533
3534        assert_eq!(content, "");
3535
3536        // Invalid
3537        let err = thread
3538            .update(cx, |thread, cx| {
3539                thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
3540            })
3541            .await
3542            .unwrap_err();
3543
3544        assert_eq!(
3545            err.to_string(),
3546            "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
3547        );
3548    }
3549    #[gpui::test]
3550    async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
3551        init_test(cx);
3552
3553        let fs = FakeFs::new(cx.executor());
3554        fs.insert_tree(path!("/tmp"), json!({})).await;
3555        let project = Project::test(fs.clone(), [], cx).await;
3556        project
3557            .update(cx, |project, cx| {
3558                project.find_or_create_worktree(path!("/tmp"), true, cx)
3559            })
3560            .await
3561            .unwrap();
3562
3563        let connection = Rc::new(FakeAgentConnection::new());
3564
3565        let thread = cx
3566            .update(|cx| {
3567                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3568            })
3569            .await
3570            .unwrap();
3571
3572        // Out of project file
3573        let err = thread
3574            .update(cx, |thread, cx| {
3575                thread.read_text_file(path!("/foo").into(), None, None, false, cx)
3576            })
3577            .await
3578            .unwrap_err();
3579
3580        assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
3581    }
3582
3583    #[gpui::test]
3584    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
3585        init_test(cx);
3586
3587        let fs = FakeFs::new(cx.executor());
3588        let project = Project::test(fs, [], cx).await;
3589        let id = acp::ToolCallId::new("test");
3590
3591        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3592            let id = id.clone();
3593            move |_, thread, mut cx| {
3594                let id = id.clone();
3595                async move {
3596                    thread
3597                        .update(&mut cx, |thread, cx| {
3598                            thread.handle_session_update(
3599                                acp::SessionUpdate::ToolCall(
3600                                    acp::ToolCall::new(id.clone(), "Label")
3601                                        .kind(acp::ToolKind::Fetch)
3602                                        .status(acp::ToolCallStatus::InProgress),
3603                                ),
3604                                cx,
3605                            )
3606                        })
3607                        .unwrap()
3608                        .unwrap();
3609                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3610                }
3611                .boxed_local()
3612            }
3613        }));
3614
3615        let thread = cx
3616            .update(|cx| {
3617                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3618            })
3619            .await
3620            .unwrap();
3621
3622        let request = thread.update(cx, |thread, cx| {
3623            thread.send_raw("Fetch https://example.com", cx)
3624        });
3625
3626        run_until_first_tool_call(&thread, cx).await;
3627
3628        thread.read_with(cx, |thread, _| {
3629            assert!(matches!(
3630                thread.entries[1],
3631                AgentThreadEntry::ToolCall(ToolCall {
3632                    status: ToolCallStatus::InProgress,
3633                    ..
3634                })
3635            ));
3636        });
3637
3638        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
3639
3640        thread.read_with(cx, |thread, _| {
3641            assert!(matches!(
3642                &thread.entries[1],
3643                AgentThreadEntry::ToolCall(ToolCall {
3644                    status: ToolCallStatus::Canceled,
3645                    ..
3646                })
3647            ));
3648        });
3649
3650        thread
3651            .update(cx, |thread, cx| {
3652                thread.handle_session_update(
3653                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3654                        id,
3655                        acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3656                    )),
3657                    cx,
3658                )
3659            })
3660            .unwrap();
3661
3662        request.await.unwrap();
3663
3664        thread.read_with(cx, |thread, _| {
3665            assert!(matches!(
3666                thread.entries[1],
3667                AgentThreadEntry::ToolCall(ToolCall {
3668                    status: ToolCallStatus::Completed,
3669                    ..
3670                })
3671            ));
3672        });
3673    }
3674
3675    #[gpui::test]
3676    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3677        init_test(cx);
3678        let fs = FakeFs::new(cx.background_executor.clone());
3679        fs.insert_tree(path!("/test"), json!({})).await;
3680        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3681
3682        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3683            move |_, thread, mut cx| {
3684                async move {
3685                    thread
3686                        .update(&mut cx, |thread, cx| {
3687                            thread.handle_session_update(
3688                                acp::SessionUpdate::ToolCall(
3689                                    acp::ToolCall::new("test", "Label")
3690                                        .kind(acp::ToolKind::Edit)
3691                                        .status(acp::ToolCallStatus::Completed)
3692                                        .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
3693                                            "/test/test.txt",
3694                                            "foo",
3695                                        ))]),
3696                                ),
3697                                cx,
3698                            )
3699                        })
3700                        .unwrap()
3701                        .unwrap();
3702                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3703                }
3704                .boxed_local()
3705            }
3706        }));
3707
3708        let thread = cx
3709            .update(|cx| {
3710                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3711            })
3712            .await
3713            .unwrap();
3714
3715        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3716            .await
3717            .unwrap();
3718
3719        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3720    }
3721
3722    #[gpui::test(iterations = 10)]
3723    async fn test_checkpoints(cx: &mut TestAppContext) {
3724        init_test(cx);
3725        let fs = FakeFs::new(cx.background_executor.clone());
3726        fs.insert_tree(
3727            path!("/test"),
3728            json!({
3729                ".git": {}
3730            }),
3731        )
3732        .await;
3733        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3734
3735        let simulate_changes = Arc::new(AtomicBool::new(true));
3736        let next_filename = Arc::new(AtomicUsize::new(0));
3737        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3738            let simulate_changes = simulate_changes.clone();
3739            let next_filename = next_filename.clone();
3740            let fs = fs.clone();
3741            move |request, thread, mut cx| {
3742                let fs = fs.clone();
3743                let simulate_changes = simulate_changes.clone();
3744                let next_filename = next_filename.clone();
3745                async move {
3746                    if simulate_changes.load(SeqCst) {
3747                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3748                        fs.write(Path::new(&filename), b"").await?;
3749                    }
3750
3751                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3752                        panic!("expected text content block");
3753                    };
3754                    thread.update(&mut cx, |thread, cx| {
3755                        thread
3756                            .handle_session_update(
3757                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3758                                    content.text.to_uppercase().into(),
3759                                )),
3760                                cx,
3761                            )
3762                            .unwrap();
3763                    })?;
3764                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3765                }
3766                .boxed_local()
3767            }
3768        }));
3769        let thread = cx
3770            .update(|cx| {
3771                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3772            })
3773            .await
3774            .unwrap();
3775
3776        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3777            .await
3778            .unwrap();
3779        thread.read_with(cx, |thread, cx| {
3780            assert_eq!(
3781                thread.to_markdown(cx),
3782                indoc! {"
3783                    ## User (checkpoint)
3784
3785                    Lorem
3786
3787                    ## Assistant
3788
3789                    LOREM
3790
3791                "}
3792            );
3793        });
3794        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3795
3796        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3797            .await
3798            .unwrap();
3799        thread.read_with(cx, |thread, cx| {
3800            assert_eq!(
3801                thread.to_markdown(cx),
3802                indoc! {"
3803                    ## User (checkpoint)
3804
3805                    Lorem
3806
3807                    ## Assistant
3808
3809                    LOREM
3810
3811                    ## User (checkpoint)
3812
3813                    ipsum
3814
3815                    ## Assistant
3816
3817                    IPSUM
3818
3819                "}
3820            );
3821        });
3822        assert_eq!(
3823            fs.files(),
3824            vec![
3825                Path::new(path!("/test/file-0")),
3826                Path::new(path!("/test/file-1"))
3827            ]
3828        );
3829
3830        // Checkpoint isn't stored when there are no changes.
3831        simulate_changes.store(false, SeqCst);
3832        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3833            .await
3834            .unwrap();
3835        thread.read_with(cx, |thread, cx| {
3836            assert_eq!(
3837                thread.to_markdown(cx),
3838                indoc! {"
3839                    ## User (checkpoint)
3840
3841                    Lorem
3842
3843                    ## Assistant
3844
3845                    LOREM
3846
3847                    ## User (checkpoint)
3848
3849                    ipsum
3850
3851                    ## Assistant
3852
3853                    IPSUM
3854
3855                    ## User
3856
3857                    dolor
3858
3859                    ## Assistant
3860
3861                    DOLOR
3862
3863                "}
3864            );
3865        });
3866        assert_eq!(
3867            fs.files(),
3868            vec![
3869                Path::new(path!("/test/file-0")),
3870                Path::new(path!("/test/file-1"))
3871            ]
3872        );
3873
3874        // Rewinding the conversation truncates the history and restores the checkpoint.
3875        thread
3876            .update(cx, |thread, cx| {
3877                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3878                    panic!("unexpected entries {:?}", thread.entries)
3879                };
3880                thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3881            })
3882            .await
3883            .unwrap();
3884        thread.read_with(cx, |thread, cx| {
3885            assert_eq!(
3886                thread.to_markdown(cx),
3887                indoc! {"
3888                    ## User (checkpoint)
3889
3890                    Lorem
3891
3892                    ## Assistant
3893
3894                    LOREM
3895
3896                "}
3897            );
3898        });
3899        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3900    }
3901
3902    #[gpui::test]
3903    async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3904        use std::sync::atomic::AtomicUsize;
3905        init_test(cx);
3906
3907        let fs = FakeFs::new(cx.executor());
3908        let project = Project::test(fs, None, cx).await;
3909
3910        // Create a connection that simulates refusal after tool result
3911        let prompt_count = Arc::new(AtomicUsize::new(0));
3912        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3913            let prompt_count = prompt_count.clone();
3914            move |_request, thread, mut cx| {
3915                let count = prompt_count.fetch_add(1, SeqCst);
3916                async move {
3917                    if count == 0 {
3918                        // First prompt: Generate a tool call with result
3919                        thread.update(&mut cx, |thread, cx| {
3920                            thread
3921                                .handle_session_update(
3922                                    acp::SessionUpdate::ToolCall(
3923                                        acp::ToolCall::new("tool1", "Test Tool")
3924                                            .kind(acp::ToolKind::Fetch)
3925                                            .status(acp::ToolCallStatus::Completed)
3926                                            .raw_input(serde_json::json!({"query": "test"}))
3927                                            .raw_output(serde_json::json!({"result": "inappropriate content"})),
3928                                    ),
3929                                    cx,
3930                                )
3931                                .unwrap();
3932                        })?;
3933
3934                        // Now return refusal because of the tool result
3935                        Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
3936                    } else {
3937                        Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3938                    }
3939                }
3940                .boxed_local()
3941            }
3942        }));
3943
3944        let thread = cx
3945            .update(|cx| {
3946                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3947            })
3948            .await
3949            .unwrap();
3950
3951        // Track if we see a Refusal event
3952        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3953        let saw_refusal_event_captured = saw_refusal_event.clone();
3954        thread.update(cx, |_thread, cx| {
3955            cx.subscribe(
3956                &thread,
3957                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3958                    if matches!(event, AcpThreadEvent::Refusal) {
3959                        *saw_refusal_event_captured.lock().unwrap() = true;
3960                    }
3961                },
3962            )
3963            .detach();
3964        });
3965
3966        // Send a user message - this will trigger tool call and then refusal
3967        let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
3968        cx.background_executor.spawn(send_task).detach();
3969        cx.run_until_parked();
3970
3971        // Verify that:
3972        // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3973        // 2. The user message was NOT truncated
3974        assert!(
3975            *saw_refusal_event.lock().unwrap(),
3976            "Refusal event should be emitted for tool result refusals"
3977        );
3978
3979        thread.read_with(cx, |thread, _| {
3980            let entries = thread.entries();
3981            assert!(entries.len() >= 2, "Should have user message and tool call");
3982
3983            // Verify user message is still there
3984            assert!(
3985                matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3986                "User message should not be truncated"
3987            );
3988
3989            // Verify tool call is there with result
3990            if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3991                assert!(
3992                    tool_call.raw_output.is_some(),
3993                    "Tool call should have output"
3994                );
3995            } else {
3996                panic!("Expected tool call at index 1");
3997            }
3998        });
3999    }
4000
4001    #[gpui::test]
4002    async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
4003        init_test(cx);
4004
4005        let fs = FakeFs::new(cx.executor());
4006        let project = Project::test(fs, None, cx).await;
4007
4008        let refuse_next = Arc::new(AtomicBool::new(false));
4009        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4010            let refuse_next = refuse_next.clone();
4011            move |_request, _thread, _cx| {
4012                if refuse_next.load(SeqCst) {
4013                    async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
4014                        .boxed_local()
4015                } else {
4016                    async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
4017                        .boxed_local()
4018                }
4019            }
4020        }));
4021
4022        let thread = cx
4023            .update(|cx| {
4024                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4025            })
4026            .await
4027            .unwrap();
4028
4029        // Track if we see a Refusal event
4030        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4031        let saw_refusal_event_captured = saw_refusal_event.clone();
4032        thread.update(cx, |_thread, cx| {
4033            cx.subscribe(
4034                &thread,
4035                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4036                    if matches!(event, AcpThreadEvent::Refusal) {
4037                        *saw_refusal_event_captured.lock().unwrap() = true;
4038                    }
4039                },
4040            )
4041            .detach();
4042        });
4043
4044        // Send a message that will be refused
4045        refuse_next.store(true, SeqCst);
4046        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4047            .await
4048            .unwrap();
4049
4050        // Verify that a Refusal event WAS emitted for user prompt refusal
4051        assert!(
4052            *saw_refusal_event.lock().unwrap(),
4053            "Refusal event should be emitted for user prompt refusals"
4054        );
4055
4056        // Verify the message was truncated (user prompt refusal)
4057        thread.read_with(cx, |thread, cx| {
4058            assert_eq!(thread.to_markdown(cx), "");
4059        });
4060    }
4061
4062    #[gpui::test]
4063    async fn test_refusal(cx: &mut TestAppContext) {
4064        init_test(cx);
4065        let fs = FakeFs::new(cx.background_executor.clone());
4066        fs.insert_tree(path!("/"), json!({})).await;
4067        let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
4068
4069        let refuse_next = Arc::new(AtomicBool::new(false));
4070        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4071            let refuse_next = refuse_next.clone();
4072            move |request, thread, mut cx| {
4073                let refuse_next = refuse_next.clone();
4074                async move {
4075                    if refuse_next.load(SeqCst) {
4076                        return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
4077                    }
4078
4079                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
4080                        panic!("expected text content block");
4081                    };
4082                    thread.update(&mut cx, |thread, cx| {
4083                        thread
4084                            .handle_session_update(
4085                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
4086                                    content.text.to_uppercase().into(),
4087                                )),
4088                                cx,
4089                            )
4090                            .unwrap();
4091                    })?;
4092                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4093                }
4094                .boxed_local()
4095            }
4096        }));
4097        let thread = cx
4098            .update(|cx| {
4099                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4100            })
4101            .await
4102            .unwrap();
4103
4104        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4105            .await
4106            .unwrap();
4107        thread.read_with(cx, |thread, cx| {
4108            assert_eq!(
4109                thread.to_markdown(cx),
4110                indoc! {"
4111                    ## User
4112
4113                    hello
4114
4115                    ## Assistant
4116
4117                    HELLO
4118
4119                "}
4120            );
4121        });
4122
4123        // Simulate refusing the second message. The message should be truncated
4124        // when a user prompt is refused.
4125        refuse_next.store(true, SeqCst);
4126        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
4127            .await
4128            .unwrap();
4129        thread.read_with(cx, |thread, cx| {
4130            assert_eq!(
4131                thread.to_markdown(cx),
4132                indoc! {"
4133                    ## User
4134
4135                    hello
4136
4137                    ## Assistant
4138
4139                    HELLO
4140
4141                "}
4142            );
4143        });
4144    }
4145
4146    async fn run_until_first_tool_call(
4147        thread: &Entity<AcpThread>,
4148        cx: &mut TestAppContext,
4149    ) -> usize {
4150        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
4151
4152        let subscription = cx.update(|cx| {
4153            cx.subscribe(thread, move |thread, _, cx| {
4154                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
4155                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
4156                        return tx.try_send(ix).unwrap();
4157                    }
4158                }
4159            })
4160        });
4161
4162        select! {
4163            _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
4164                panic!("Timeout waiting for tool call")
4165            }
4166            ix = rx.next().fuse() => {
4167                drop(subscription);
4168                ix.unwrap()
4169            }
4170        }
4171    }
4172
4173    #[derive(Clone, Default)]
4174    struct FakeAgentConnection {
4175        auth_methods: Vec<acp::AuthMethod>,
4176        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
4177        set_title_calls: Rc<RefCell<Vec<SharedString>>>,
4178        on_user_message: Option<
4179            Rc<
4180                dyn Fn(
4181                        acp::PromptRequest,
4182                        WeakEntity<AcpThread>,
4183                        AsyncApp,
4184                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4185                    + 'static,
4186            >,
4187        >,
4188    }
4189
4190    impl FakeAgentConnection {
4191        fn new() -> Self {
4192            Self {
4193                auth_methods: Vec::new(),
4194                on_user_message: None,
4195                sessions: Arc::default(),
4196                set_title_calls: Default::default(),
4197            }
4198        }
4199
4200        #[expect(unused)]
4201        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
4202            self.auth_methods = auth_methods;
4203            self
4204        }
4205
4206        fn on_user_message(
4207            mut self,
4208            handler: impl Fn(
4209                acp::PromptRequest,
4210                WeakEntity<AcpThread>,
4211                AsyncApp,
4212            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4213            + 'static,
4214        ) -> Self {
4215            self.on_user_message.replace(Rc::new(handler));
4216            self
4217        }
4218    }
4219
4220    impl AgentConnection for FakeAgentConnection {
4221        fn agent_id(&self) -> AgentId {
4222            AgentId::new("fake")
4223        }
4224
4225        fn telemetry_id(&self) -> SharedString {
4226            "fake".into()
4227        }
4228
4229        fn auth_methods(&self) -> &[acp::AuthMethod] {
4230            &self.auth_methods
4231        }
4232
4233        fn new_session(
4234            self: Rc<Self>,
4235            project: Entity<Project>,
4236            work_dirs: PathList,
4237            cx: &mut App,
4238        ) -> Task<gpui::Result<Entity<AcpThread>>> {
4239            let session_id = acp::SessionId::new(
4240                rand::rng()
4241                    .sample_iter(&distr::Alphanumeric)
4242                    .take(7)
4243                    .map(char::from)
4244                    .collect::<String>(),
4245            );
4246            let action_log = cx.new(|_| ActionLog::new(project.clone()));
4247            let thread = cx.new(|cx| {
4248                AcpThread::new(
4249                    None,
4250                    "Test",
4251                    Some(work_dirs),
4252                    self.clone(),
4253                    project,
4254                    action_log,
4255                    session_id.clone(),
4256                    watch::Receiver::constant(
4257                        acp::PromptCapabilities::new()
4258                            .image(true)
4259                            .audio(true)
4260                            .embedded_context(true),
4261                    ),
4262                    cx,
4263                )
4264            });
4265            self.sessions.lock().insert(session_id, thread.downgrade());
4266            Task::ready(Ok(thread))
4267        }
4268
4269        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
4270            if self.auth_methods().iter().any(|m| m.id() == &method) {
4271                Task::ready(Ok(()))
4272            } else {
4273                Task::ready(Err(anyhow!("Invalid Auth Method")))
4274            }
4275        }
4276
4277        fn prompt(
4278            &self,
4279            _id: Option<UserMessageId>,
4280            params: acp::PromptRequest,
4281            cx: &mut App,
4282        ) -> Task<gpui::Result<acp::PromptResponse>> {
4283            let sessions = self.sessions.lock();
4284            let thread = sessions.get(&params.session_id).unwrap();
4285            if let Some(handler) = &self.on_user_message {
4286                let handler = handler.clone();
4287                let thread = thread.clone();
4288                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
4289            } else {
4290                Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
4291            }
4292        }
4293
4294        fn cancel(&self, _session_id: &acp::SessionId, _cx: &mut App) {}
4295
4296        fn truncate(
4297            &self,
4298            session_id: &acp::SessionId,
4299            _cx: &App,
4300        ) -> Option<Rc<dyn AgentSessionTruncate>> {
4301            Some(Rc::new(FakeAgentSessionEditor {
4302                _session_id: session_id.clone(),
4303            }))
4304        }
4305
4306        fn set_title(
4307            &self,
4308            _session_id: &acp::SessionId,
4309            _cx: &App,
4310        ) -> Option<Rc<dyn AgentSessionSetTitle>> {
4311            Some(Rc::new(FakeAgentSessionSetTitle {
4312                calls: self.set_title_calls.clone(),
4313            }))
4314        }
4315
4316        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
4317            self
4318        }
4319    }
4320
4321    struct FakeAgentSessionSetTitle {
4322        calls: Rc<RefCell<Vec<SharedString>>>,
4323    }
4324
4325    impl AgentSessionSetTitle for FakeAgentSessionSetTitle {
4326        fn run(&self, title: SharedString, _cx: &mut App) -> Task<Result<()>> {
4327            self.calls.borrow_mut().push(title);
4328            Task::ready(Ok(()))
4329        }
4330    }
4331
4332    struct FakeAgentSessionEditor {
4333        _session_id: acp::SessionId,
4334    }
4335
4336    impl AgentSessionTruncate for FakeAgentSessionEditor {
4337        fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
4338            Task::ready(Ok(()))
4339        }
4340    }
4341
4342    #[gpui::test]
4343    async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
4344        init_test(cx);
4345
4346        let fs = FakeFs::new(cx.executor());
4347        let project = Project::test(fs, [], cx).await;
4348        let connection = Rc::new(FakeAgentConnection::new());
4349        let thread = cx
4350            .update(|cx| {
4351                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4352            })
4353            .await
4354            .unwrap();
4355
4356        // Try to update a tool call that doesn't exist
4357        let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
4358        thread.update(cx, |thread, cx| {
4359            let result = thread.handle_session_update(
4360                acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4361                    nonexistent_id.clone(),
4362                    acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4363                )),
4364                cx,
4365            );
4366
4367            // The update should succeed (not return an error)
4368            assert!(result.is_ok());
4369
4370            // There should now be exactly one entry in the thread
4371            assert_eq!(thread.entries.len(), 1);
4372
4373            // The entry should be a failed tool call
4374            if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
4375                assert_eq!(tool_call.id, nonexistent_id);
4376                assert!(matches!(tool_call.status, ToolCallStatus::Failed));
4377                assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
4378
4379                // Check that the content contains the error message
4380                assert_eq!(tool_call.content.len(), 1);
4381                if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
4382                    match content_block {
4383                        ContentBlock::Markdown { markdown } => {
4384                            let markdown_text = markdown.read(cx).source();
4385                            assert!(markdown_text.contains("Tool call not found"));
4386                        }
4387                        ContentBlock::Empty => panic!("Expected markdown content, got empty"),
4388                        ContentBlock::ResourceLink { .. } => {
4389                            panic!("Expected markdown content, got resource link")
4390                        }
4391                        ContentBlock::Image { .. } => {
4392                            panic!("Expected markdown content, got image")
4393                        }
4394                    }
4395                } else {
4396                    panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
4397                }
4398            } else {
4399                panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
4400            }
4401        });
4402    }
4403
4404    /// Tests that restoring a checkpoint properly cleans up terminals that were
4405    /// created after that checkpoint, and cancels any in-progress generation.
4406    ///
4407    /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
4408    /// that were started after that checkpoint should be terminated, and any in-progress
4409    /// AI generation should be canceled.
4410    #[gpui::test]
4411    async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
4412        init_test(cx);
4413
4414        let fs = FakeFs::new(cx.executor());
4415        let project = Project::test(fs, [], cx).await;
4416        let connection = Rc::new(FakeAgentConnection::new());
4417        let thread = cx
4418            .update(|cx| {
4419                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4420            })
4421            .await
4422            .unwrap();
4423
4424        // Send first user message to create a checkpoint
4425        cx.update(|cx| {
4426            thread.update(cx, |thread, cx| {
4427                thread.send(vec!["first message".into()], cx)
4428            })
4429        })
4430        .await
4431        .unwrap();
4432
4433        // Send second message (creates another checkpoint) - we'll restore to this one
4434        cx.update(|cx| {
4435            thread.update(cx, |thread, cx| {
4436                thread.send(vec!["second message".into()], cx)
4437            })
4438        })
4439        .await
4440        .unwrap();
4441
4442        // Create 2 terminals BEFORE the checkpoint that have completed running
4443        let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4444        let mock_terminal_1 = cx.new(|cx| {
4445            let builder = ::terminal::TerminalBuilder::new_display_only(
4446                ::terminal::terminal_settings::CursorShape::default(),
4447                ::terminal::terminal_settings::AlternateScroll::On,
4448                None,
4449                0,
4450                cx.background_executor(),
4451                PathStyle::local(),
4452            )
4453            .unwrap();
4454            builder.subscribe(cx)
4455        });
4456
4457        thread.update(cx, |thread, cx| {
4458            thread.on_terminal_provider_event(
4459                TerminalProviderEvent::Created {
4460                    terminal_id: terminal_id_1.clone(),
4461                    label: "echo 'first'".to_string(),
4462                    cwd: Some(PathBuf::from("/test")),
4463                    output_byte_limit: None,
4464                    terminal: mock_terminal_1.clone(),
4465                },
4466                cx,
4467            );
4468        });
4469
4470        thread.update(cx, |thread, cx| {
4471            thread.on_terminal_provider_event(
4472                TerminalProviderEvent::Output {
4473                    terminal_id: terminal_id_1.clone(),
4474                    data: b"first\n".to_vec(),
4475                },
4476                cx,
4477            );
4478        });
4479
4480        thread.update(cx, |thread, cx| {
4481            thread.on_terminal_provider_event(
4482                TerminalProviderEvent::Exit {
4483                    terminal_id: terminal_id_1.clone(),
4484                    status: acp::TerminalExitStatus::new().exit_code(0),
4485                },
4486                cx,
4487            );
4488        });
4489
4490        let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4491        let mock_terminal_2 = cx.new(|cx| {
4492            let builder = ::terminal::TerminalBuilder::new_display_only(
4493                ::terminal::terminal_settings::CursorShape::default(),
4494                ::terminal::terminal_settings::AlternateScroll::On,
4495                None,
4496                0,
4497                cx.background_executor(),
4498                PathStyle::local(),
4499            )
4500            .unwrap();
4501            builder.subscribe(cx)
4502        });
4503
4504        thread.update(cx, |thread, cx| {
4505            thread.on_terminal_provider_event(
4506                TerminalProviderEvent::Created {
4507                    terminal_id: terminal_id_2.clone(),
4508                    label: "echo 'second'".to_string(),
4509                    cwd: Some(PathBuf::from("/test")),
4510                    output_byte_limit: None,
4511                    terminal: mock_terminal_2.clone(),
4512                },
4513                cx,
4514            );
4515        });
4516
4517        thread.update(cx, |thread, cx| {
4518            thread.on_terminal_provider_event(
4519                TerminalProviderEvent::Output {
4520                    terminal_id: terminal_id_2.clone(),
4521                    data: b"second\n".to_vec(),
4522                },
4523                cx,
4524            );
4525        });
4526
4527        thread.update(cx, |thread, cx| {
4528            thread.on_terminal_provider_event(
4529                TerminalProviderEvent::Exit {
4530                    terminal_id: terminal_id_2.clone(),
4531                    status: acp::TerminalExitStatus::new().exit_code(0),
4532                },
4533                cx,
4534            );
4535        });
4536
4537        // Get the second message ID to restore to
4538        let second_message_id = thread.read_with(cx, |thread, _| {
4539            // At this point we have:
4540            // - Index 0: First user message (with checkpoint)
4541            // - Index 1: Second user message (with checkpoint)
4542            // No assistant responses because FakeAgentConnection just returns EndTurn
4543            let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
4544                panic!("expected user message at index 1");
4545            };
4546            message.id.clone().unwrap()
4547        });
4548
4549        // Create a terminal AFTER the checkpoint we'll restore to.
4550        // This simulates the AI agent starting a long-running terminal command.
4551        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4552        let mock_terminal = cx.new(|cx| {
4553            let builder = ::terminal::TerminalBuilder::new_display_only(
4554                ::terminal::terminal_settings::CursorShape::default(),
4555                ::terminal::terminal_settings::AlternateScroll::On,
4556                None,
4557                0,
4558                cx.background_executor(),
4559                PathStyle::local(),
4560            )
4561            .unwrap();
4562            builder.subscribe(cx)
4563        });
4564
4565        // Register the terminal as created
4566        thread.update(cx, |thread, cx| {
4567            thread.on_terminal_provider_event(
4568                TerminalProviderEvent::Created {
4569                    terminal_id: terminal_id.clone(),
4570                    label: "sleep 1000".to_string(),
4571                    cwd: Some(PathBuf::from("/test")),
4572                    output_byte_limit: None,
4573                    terminal: mock_terminal.clone(),
4574                },
4575                cx,
4576            );
4577        });
4578
4579        // Simulate the terminal producing output (still running)
4580        thread.update(cx, |thread, cx| {
4581            thread.on_terminal_provider_event(
4582                TerminalProviderEvent::Output {
4583                    terminal_id: terminal_id.clone(),
4584                    data: b"terminal is running...\n".to_vec(),
4585                },
4586                cx,
4587            );
4588        });
4589
4590        // Create a tool call entry that references this terminal
4591        // This represents the agent requesting a terminal command
4592        thread.update(cx, |thread, cx| {
4593            thread
4594                .handle_session_update(
4595                    acp::SessionUpdate::ToolCall(
4596                        acp::ToolCall::new("terminal-tool-1", "Running command")
4597                            .kind(acp::ToolKind::Execute)
4598                            .status(acp::ToolCallStatus::InProgress)
4599                            .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
4600                                terminal_id.clone(),
4601                            ))])
4602                            .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
4603                    ),
4604                    cx,
4605                )
4606                .unwrap();
4607        });
4608
4609        // Verify terminal exists and is in the thread
4610        let terminal_exists_before =
4611            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4612        assert!(
4613            terminal_exists_before,
4614            "Terminal should exist before checkpoint restore"
4615        );
4616
4617        // Verify the terminal's underlying task is still running (not completed)
4618        let terminal_running_before = thread.read_with(cx, |thread, _cx| {
4619            let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
4620            terminal_entity.read_with(cx, |term, _cx| {
4621                term.output().is_none() // output is None means it's still running
4622            })
4623        });
4624        assert!(
4625            terminal_running_before,
4626            "Terminal should be running before checkpoint restore"
4627        );
4628
4629        // Verify we have the expected entries before restore
4630        let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
4631        assert!(
4632            entry_count_before > 1,
4633            "Should have multiple entries before restore"
4634        );
4635
4636        // Restore the checkpoint to the second message.
4637        // This should:
4638        // 1. Cancel any in-progress generation (via the cancel() call)
4639        // 2. Remove the terminal that was created after that point
4640        thread
4641            .update(cx, |thread, cx| {
4642                thread.restore_checkpoint(second_message_id, cx)
4643            })
4644            .await
4645            .unwrap();
4646
4647        // Verify that no send_task is in progress after restore
4648        // (cancel() clears the send_task)
4649        let has_send_task_after = thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4650        assert!(
4651            !has_send_task_after,
4652            "Should not have a send_task after restore (cancel should have cleared it)"
4653        );
4654
4655        // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
4656        let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
4657        assert_eq!(
4658            entry_count, 1,
4659            "Should have 1 entry after restore (only the first user message)"
4660        );
4661
4662        // Verify the 2 completed terminals from before the checkpoint still exist
4663        let terminal_1_exists = thread.read_with(cx, |thread, _| {
4664            thread.terminals.contains_key(&terminal_id_1)
4665        });
4666        assert!(
4667            terminal_1_exists,
4668            "Terminal 1 (from before checkpoint) should still exist"
4669        );
4670
4671        let terminal_2_exists = thread.read_with(cx, |thread, _| {
4672            thread.terminals.contains_key(&terminal_id_2)
4673        });
4674        assert!(
4675            terminal_2_exists,
4676            "Terminal 2 (from before checkpoint) should still exist"
4677        );
4678
4679        // Verify they're still in completed state
4680        let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
4681            let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
4682            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4683        });
4684        assert!(terminal_1_completed, "Terminal 1 should still be completed");
4685
4686        let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
4687            let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
4688            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4689        });
4690        assert!(terminal_2_completed, "Terminal 2 should still be completed");
4691
4692        // Verify the running terminal (created after checkpoint) was removed
4693        let terminal_3_exists =
4694            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4695        assert!(
4696            !terminal_3_exists,
4697            "Terminal 3 (created after checkpoint) should have been removed"
4698        );
4699
4700        // Verify total count is 2 (the two from before the checkpoint)
4701        let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
4702        assert_eq!(
4703            terminal_count, 2,
4704            "Should have exactly 2 terminals (the completed ones from before checkpoint)"
4705        );
4706    }
4707
4708    /// Tests that update_last_checkpoint correctly updates the original message's checkpoint
4709    /// even when a new user message is added while the async checkpoint comparison is in progress.
4710    ///
4711    /// This is a regression test for a bug where update_last_checkpoint would fail with
4712    /// "no checkpoint" if a new user message (without a checkpoint) was added between when
4713    /// update_last_checkpoint started and when its async closure ran.
4714    #[gpui::test]
4715    async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
4716        init_test(cx);
4717
4718        let fs = FakeFs::new(cx.executor());
4719        fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
4720            .await;
4721        let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;
4722
4723        let handler_done = Arc::new(AtomicBool::new(false));
4724        let handler_done_clone = handler_done.clone();
4725        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4726            move |_, _thread, _cx| {
4727                handler_done_clone.store(true, SeqCst);
4728                async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
4729            },
4730        ));
4731
4732        let thread = cx
4733            .update(|cx| {
4734                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4735            })
4736            .await
4737            .unwrap();
4738
4739        let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
4740        let send_task = cx.background_executor.spawn(send_future);
4741
4742        // Tick until handler completes, then a few more to let update_last_checkpoint start
4743        while !handler_done.load(SeqCst) {
4744            cx.executor().tick();
4745        }
4746        for _ in 0..5 {
4747            cx.executor().tick();
4748        }
4749
4750        thread.update(cx, |thread, cx| {
4751            thread.push_entry(
4752                AgentThreadEntry::UserMessage(UserMessage {
4753                    id: Some(UserMessageId::new()),
4754                    content: ContentBlock::Empty,
4755                    chunks: vec!["Injected message (no checkpoint)".into()],
4756                    checkpoint: None,
4757                    indented: false,
4758                }),
4759                cx,
4760            );
4761        });
4762
4763        cx.run_until_parked();
4764        let result = send_task.await;
4765
4766        assert!(
4767            result.is_ok(),
4768            "send should succeed even when new message added during update_last_checkpoint: {:?}",
4769            result.err()
4770        );
4771    }
4772
4773    /// Tests that when a follow-up message is sent during generation,
4774    /// the first turn completing does NOT clear `running_turn` because
4775    /// it now belongs to the second turn.
4776    #[gpui::test]
4777    async fn test_follow_up_message_during_generation_does_not_clear_turn(cx: &mut TestAppContext) {
4778        init_test(cx);
4779
4780        let fs = FakeFs::new(cx.executor());
4781        let project = Project::test(fs, [], cx).await;
4782
4783        // First handler waits for this signal before completing
4784        let (first_complete_tx, first_complete_rx) = futures::channel::oneshot::channel::<()>();
4785        let first_complete_rx = RefCell::new(Some(first_complete_rx));
4786
4787        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4788            move |params, _thread, _cx| {
4789                let first_complete_rx = first_complete_rx.borrow_mut().take();
4790                let is_first = params
4791                    .prompt
4792                    .iter()
4793                    .any(|c| matches!(c, acp::ContentBlock::Text(t) if t.text.contains("first")));
4794
4795                async move {
4796                    if is_first {
4797                        // First handler waits until signaled
4798                        if let Some(rx) = first_complete_rx {
4799                            rx.await.ok();
4800                        }
4801                    }
4802                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4803                }
4804                .boxed_local()
4805            }
4806        }));
4807
4808        let thread = cx
4809            .update(|cx| {
4810                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4811            })
4812            .await
4813            .unwrap();
4814
4815        // Send first message (turn_id=1) - handler will block
4816        let first_request = thread.update(cx, |thread, cx| thread.send_raw("first", cx));
4817        assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 1);
4818
4819        // Send second message (turn_id=2) while first is still blocked
4820        // This calls cancel() which takes turn 1's running_turn and sets turn 2's
4821        let second_request = thread.update(cx, |thread, cx| thread.send_raw("second", cx));
4822        assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 2);
4823
4824        let running_turn_after_second_send =
4825            thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
4826        assert_eq!(
4827            running_turn_after_second_send,
4828            Some(2),
4829            "running_turn should be set to turn 2 after sending second message"
4830        );
4831
4832        // Now signal first handler to complete
4833        first_complete_tx.send(()).ok();
4834
4835        // First request completes - should NOT clear running_turn
4836        // because running_turn now belongs to turn 2
4837        first_request.await.unwrap();
4838
4839        let running_turn_after_first =
4840            thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
4841        assert_eq!(
4842            running_turn_after_first,
4843            Some(2),
4844            "first turn completing should not clear running_turn (belongs to turn 2)"
4845        );
4846
4847        // Second request completes - SHOULD clear running_turn
4848        second_request.await.unwrap();
4849
4850        let running_turn_after_second =
4851            thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4852        assert!(
4853            !running_turn_after_second,
4854            "second turn completing should clear running_turn"
4855        );
4856    }
4857
4858    #[gpui::test]
4859    async fn test_send_returns_cancelled_response_and_marks_tools_as_cancelled(
4860        cx: &mut TestAppContext,
4861    ) {
4862        init_test(cx);
4863
4864        let fs = FakeFs::new(cx.executor());
4865        let project = Project::test(fs, [], cx).await;
4866
4867        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4868            move |_params, thread, mut cx| {
4869                async move {
4870                    thread
4871                        .update(&mut cx, |thread, cx| {
4872                            thread.handle_session_update(
4873                                acp::SessionUpdate::ToolCall(
4874                                    acp::ToolCall::new(
4875                                        acp::ToolCallId::new("test-tool"),
4876                                        "Test Tool",
4877                                    )
4878                                    .kind(acp::ToolKind::Fetch)
4879                                    .status(acp::ToolCallStatus::InProgress),
4880                                ),
4881                                cx,
4882                            )
4883                        })
4884                        .unwrap()
4885                        .unwrap();
4886
4887                    Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
4888                }
4889                .boxed_local()
4890            },
4891        ));
4892
4893        let thread = cx
4894            .update(|cx| {
4895                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4896            })
4897            .await
4898            .unwrap();
4899
4900        let response = thread
4901            .update(cx, |thread, cx| thread.send_raw("test message", cx))
4902            .await;
4903
4904        let response = response
4905            .expect("send should succeed")
4906            .expect("should have response");
4907        assert_eq!(
4908            response.stop_reason,
4909            acp::StopReason::Cancelled,
4910            "response should have Cancelled stop_reason"
4911        );
4912
4913        thread.read_with(cx, |thread, _| {
4914            let tool_entry = thread
4915                .entries
4916                .iter()
4917                .find_map(|e| {
4918                    if let AgentThreadEntry::ToolCall(call) = e {
4919                        Some(call)
4920                    } else {
4921                        None
4922                    }
4923                })
4924                .expect("should have tool call entry");
4925
4926            assert!(
4927                matches!(tool_entry.status, ToolCallStatus::Canceled),
4928                "tool should be marked as Canceled when response is Cancelled, got {:?}",
4929                tool_entry.status
4930            );
4931        });
4932    }
4933
4934    #[gpui::test]
4935    async fn test_provisional_title_replaced_by_real_title(cx: &mut TestAppContext) {
4936        init_test(cx);
4937
4938        let fs = FakeFs::new(cx.executor());
4939        let project = Project::test(fs, [], cx).await;
4940        let connection = Rc::new(FakeAgentConnection::new());
4941        let set_title_calls = connection.set_title_calls.clone();
4942
4943        let thread = cx
4944            .update(|cx| {
4945                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4946            })
4947            .await
4948            .unwrap();
4949
4950        // Initial title is the default.
4951        thread.read_with(cx, |thread, _| {
4952            assert_eq!(thread.title().as_ref(), "Test");
4953        });
4954
4955        // Setting a provisional title updates the display title.
4956        thread.update(cx, |thread, cx| {
4957            thread.set_provisional_title("Hello, can you help…".into(), cx);
4958        });
4959        thread.read_with(cx, |thread, _| {
4960            assert_eq!(thread.title().as_ref(), "Hello, can you help…");
4961        });
4962
4963        // The provisional title should NOT have propagated to the connection.
4964        assert_eq!(
4965            set_title_calls.borrow().len(),
4966            0,
4967            "provisional title should not propagate to the connection"
4968        );
4969
4970        // When the real title arrives via set_title, it replaces the
4971        // provisional title and propagates to the connection.
4972        let task = thread.update(cx, |thread, cx| {
4973            thread.set_title("Helping with Rust question".into(), cx)
4974        });
4975        task.await.expect("set_title should succeed");
4976        thread.read_with(cx, |thread, _| {
4977            assert_eq!(thread.title().as_ref(), "Helping with Rust question");
4978        });
4979        assert_eq!(
4980            set_title_calls.borrow().as_slice(),
4981            &[SharedString::from("Helping with Rust question")],
4982            "real title should propagate to the connection"
4983        );
4984    }
4985
4986    #[gpui::test]
4987    async fn test_session_info_update_replaces_provisional_title_and_emits_event(
4988        cx: &mut TestAppContext,
4989    ) {
4990        init_test(cx);
4991
4992        let fs = FakeFs::new(cx.executor());
4993        let project = Project::test(fs, [], cx).await;
4994        let connection = Rc::new(FakeAgentConnection::new());
4995
4996        let thread = cx
4997            .update(|cx| {
4998                connection.clone().new_session(
4999                    project,
5000                    PathList::new(&[Path::new(path!("/test"))]),
5001                    cx,
5002                )
5003            })
5004            .await
5005            .unwrap();
5006
5007        let title_updated_events = Rc::new(RefCell::new(0usize));
5008        let title_updated_events_for_subscription = title_updated_events.clone();
5009        thread.update(cx, |_thread, cx| {
5010            cx.subscribe(
5011                &thread,
5012                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
5013                    if matches!(event, AcpThreadEvent::TitleUpdated) {
5014                        *title_updated_events_for_subscription.borrow_mut() += 1;
5015                    }
5016                },
5017            )
5018            .detach();
5019        });
5020
5021        thread.update(cx, |thread, cx| {
5022            thread.set_provisional_title("Hello, can you help…".into(), cx);
5023        });
5024        assert_eq!(
5025            *title_updated_events.borrow(),
5026            1,
5027            "setting a provisional title should emit TitleUpdated"
5028        );
5029
5030        let result = thread.update(cx, |thread, cx| {
5031            thread.handle_session_update(
5032                acp::SessionUpdate::SessionInfoUpdate(
5033                    acp::SessionInfoUpdate::new().title("Helping with Rust question"),
5034                ),
5035                cx,
5036            )
5037        });
5038        result.expect("session info update should succeed");
5039
5040        thread.read_with(cx, |thread, _| {
5041            assert_eq!(thread.title().as_ref(), "Helping with Rust question");
5042            assert!(
5043                !thread.has_provisional_title(),
5044                "session info title update should clear provisional title"
5045            );
5046        });
5047
5048        assert_eq!(
5049            *title_updated_events.borrow(),
5050            2,
5051            "session info title update should emit TitleUpdated"
5052        );
5053        assert!(
5054            connection.set_title_calls.borrow().is_empty(),
5055            "session info title update should not propagate back to the connection"
5056        );
5057    }
5058}