acp_thread.rs

   1mod connection;
   2mod diff;
   3mod mention;
   4mod terminal;
   5use action_log::{ActionLog, ActionLogTelemetry};
   6use agent_client_protocol::{self as acp};
   7use anyhow::{Context as _, Result, anyhow};
   8use collections::HashSet;
   9pub use connection::*;
  10pub use diff::*;
  11use futures::{FutureExt, channel::oneshot, future::BoxFuture};
  12use gpui::{AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity};
  13use itertools::Itertools;
  14use language::language_settings::FormatOnSave;
  15use language::{Anchor, Buffer, BufferSnapshot, LanguageRegistry, Point, ToPoint, text_diff};
  16use markdown::Markdown;
  17pub use mention::*;
  18use project::lsp_store::{FormatTrigger, LspFormatTarget};
  19use project::{AgentLocation, Project, git_store::GitStoreCheckpoint};
  20use serde::{Deserialize, Serialize};
  21use serde_json::to_string_pretty;
  22use std::collections::HashMap;
  23use std::error::Error;
  24use std::fmt::{Formatter, Write};
  25use std::ops::Range;
  26use std::process::ExitStatus;
  27use std::rc::Rc;
  28use std::time::{Duration, Instant};
  29use std::{fmt::Display, mem, path::PathBuf, sync::Arc};
  30use task::{Shell, ShellBuilder};
  31pub use terminal::*;
  32use text::Bias;
  33use ui::App;
  34use util::markdown::MarkdownEscaped;
  35use util::path_list::PathList;
  36use util::{ResultExt, get_default_system_shell_preferring_bash, paths::PathStyle};
  37use uuid::Uuid;
  38
  39/// Key used in ACP ToolCall meta to store the tool's programmatic name.
  40/// This is a workaround since ACP's ToolCall doesn't have a dedicated name field.
  41pub const TOOL_NAME_META_KEY: &str = "tool_name";
  42
  43/// Helper to extract tool name from ACP meta
  44pub fn tool_name_from_meta(meta: &Option<acp::Meta>) -> Option<SharedString> {
  45    meta.as_ref()
  46        .and_then(|m| m.get(TOOL_NAME_META_KEY))
  47        .and_then(|v| v.as_str())
  48        .map(|s| SharedString::from(s.to_owned()))
  49}
  50
  51/// Helper to create meta with tool name
  52pub fn meta_with_tool_name(tool_name: &str) -> acp::Meta {
  53    acp::Meta::from_iter([(TOOL_NAME_META_KEY.into(), tool_name.into())])
  54}
  55
  56/// Key used in ACP ToolCall meta to store the session id and message indexes
  57pub const SUBAGENT_SESSION_INFO_META_KEY: &str = "subagent_session_info";
  58
  59#[derive(Clone, Debug, Deserialize, Serialize)]
  60pub struct SubagentSessionInfo {
  61    /// The session id of the subagent sessiont that was spawned
  62    pub session_id: acp::SessionId,
  63    /// The index of the message of the start of the "turn" run by this tool call
  64    pub message_start_index: usize,
  65    /// The index of the output of the message that the subagent has returned
  66    #[serde(skip_serializing_if = "Option::is_none")]
  67    pub message_end_index: Option<usize>,
  68}
  69
  70/// Helper to extract subagent session id from ACP meta
  71pub fn subagent_session_info_from_meta(meta: &Option<acp::Meta>) -> Option<SubagentSessionInfo> {
  72    meta.as_ref()
  73        .and_then(|m| m.get(SUBAGENT_SESSION_INFO_META_KEY))
  74        .and_then(|v| serde_json::from_value(v.clone()).ok())
  75}
  76
  77#[derive(Debug)]
  78pub struct UserMessage {
  79    pub id: Option<UserMessageId>,
  80    pub content: ContentBlock,
  81    pub chunks: Vec<acp::ContentBlock>,
  82    pub checkpoint: Option<Checkpoint>,
  83    pub indented: bool,
  84}
  85
  86#[derive(Debug)]
  87pub struct Checkpoint {
  88    git_checkpoint: GitStoreCheckpoint,
  89    pub show: bool,
  90}
  91
  92impl UserMessage {
  93    fn to_markdown(&self, cx: &App) -> String {
  94        let mut markdown = String::new();
  95        if self
  96            .checkpoint
  97            .as_ref()
  98            .is_some_and(|checkpoint| checkpoint.show)
  99        {
 100            writeln!(markdown, "## User (checkpoint)").unwrap();
 101        } else {
 102            writeln!(markdown, "## User").unwrap();
 103        }
 104        writeln!(markdown).unwrap();
 105        writeln!(markdown, "{}", self.content.to_markdown(cx)).unwrap();
 106        writeln!(markdown).unwrap();
 107        markdown
 108    }
 109}
 110
 111#[derive(Debug, PartialEq)]
 112pub struct AssistantMessage {
 113    pub chunks: Vec<AssistantMessageChunk>,
 114    pub indented: bool,
 115    pub is_subagent_output: bool,
 116}
 117
 118impl AssistantMessage {
 119    pub fn to_markdown(&self, cx: &App) -> String {
 120        format!(
 121            "## Assistant\n\n{}\n\n",
 122            self.chunks
 123                .iter()
 124                .map(|chunk| chunk.to_markdown(cx))
 125                .join("\n\n")
 126        )
 127    }
 128}
 129
 130#[derive(Debug, PartialEq)]
 131pub enum AssistantMessageChunk {
 132    Message { block: ContentBlock },
 133    Thought { block: ContentBlock },
 134}
 135
 136impl AssistantMessageChunk {
 137    pub fn from_str(
 138        chunk: &str,
 139        language_registry: &Arc<LanguageRegistry>,
 140        path_style: PathStyle,
 141        cx: &mut App,
 142    ) -> Self {
 143        Self::Message {
 144            block: ContentBlock::new(chunk.into(), language_registry, path_style, cx),
 145        }
 146    }
 147
 148    fn to_markdown(&self, cx: &App) -> String {
 149        match self {
 150            Self::Message { block } => block.to_markdown(cx).to_string(),
 151            Self::Thought { block } => {
 152                format!("<thinking>\n{}\n</thinking>", block.to_markdown(cx))
 153            }
 154        }
 155    }
 156}
 157
 158#[derive(Debug)]
 159pub enum AgentThreadEntry {
 160    UserMessage(UserMessage),
 161    AssistantMessage(AssistantMessage),
 162    ToolCall(ToolCall),
 163}
 164
 165impl AgentThreadEntry {
 166    pub fn is_indented(&self) -> bool {
 167        match self {
 168            Self::UserMessage(message) => message.indented,
 169            Self::AssistantMessage(message) => message.indented,
 170            Self::ToolCall(_) => false,
 171        }
 172    }
 173
 174    pub fn to_markdown(&self, cx: &App) -> String {
 175        match self {
 176            Self::UserMessage(message) => message.to_markdown(cx),
 177            Self::AssistantMessage(message) => message.to_markdown(cx),
 178            Self::ToolCall(tool_call) => tool_call.to_markdown(cx),
 179        }
 180    }
 181
 182    pub fn user_message(&self) -> Option<&UserMessage> {
 183        if let AgentThreadEntry::UserMessage(message) = self {
 184            Some(message)
 185        } else {
 186            None
 187        }
 188    }
 189
 190    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 191        if let AgentThreadEntry::ToolCall(call) = self {
 192            itertools::Either::Left(call.diffs())
 193        } else {
 194            itertools::Either::Right(std::iter::empty())
 195        }
 196    }
 197
 198    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 199        if let AgentThreadEntry::ToolCall(call) = self {
 200            itertools::Either::Left(call.terminals())
 201        } else {
 202            itertools::Either::Right(std::iter::empty())
 203        }
 204    }
 205
 206    pub fn location(&self, ix: usize) -> Option<(acp::ToolCallLocation, AgentLocation)> {
 207        if let AgentThreadEntry::ToolCall(ToolCall {
 208            locations,
 209            resolved_locations,
 210            ..
 211        }) = self
 212        {
 213            Some((
 214                locations.get(ix)?.clone(),
 215                resolved_locations.get(ix)?.clone()?,
 216            ))
 217        } else {
 218            None
 219        }
 220    }
 221}
 222
 223#[derive(Debug)]
 224pub struct ToolCall {
 225    pub id: acp::ToolCallId,
 226    pub label: Entity<Markdown>,
 227    pub kind: acp::ToolKind,
 228    pub content: Vec<ToolCallContent>,
 229    pub status: ToolCallStatus,
 230    pub locations: Vec<acp::ToolCallLocation>,
 231    pub resolved_locations: Vec<Option<AgentLocation>>,
 232    pub raw_input: Option<serde_json::Value>,
 233    pub raw_input_markdown: Option<Entity<Markdown>>,
 234    pub raw_output: Option<serde_json::Value>,
 235    pub tool_name: Option<SharedString>,
 236    pub subagent_session_info: Option<SubagentSessionInfo>,
 237}
 238
 239impl ToolCall {
 240    fn from_acp(
 241        tool_call: acp::ToolCall,
 242        status: ToolCallStatus,
 243        language_registry: Arc<LanguageRegistry>,
 244        path_style: PathStyle,
 245        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 246        cx: &mut App,
 247    ) -> Result<Self> {
 248        let title = if tool_call.kind == acp::ToolKind::Execute {
 249            tool_call.title
 250        } else if tool_call.kind == acp::ToolKind::Edit {
 251            MarkdownEscaped(tool_call.title.as_str()).to_string()
 252        } else if let Some((first_line, _)) = tool_call.title.split_once("\n") {
 253            first_line.to_owned() + ""
 254        } else {
 255            tool_call.title
 256        };
 257        let mut content = Vec::with_capacity(tool_call.content.len());
 258        for item in tool_call.content {
 259            if let Some(item) = ToolCallContent::from_acp(
 260                item,
 261                language_registry.clone(),
 262                path_style,
 263                terminals,
 264                cx,
 265            )? {
 266                content.push(item);
 267            }
 268        }
 269
 270        let raw_input_markdown = tool_call
 271            .raw_input
 272            .as_ref()
 273            .and_then(|input| markdown_for_raw_output(input, &language_registry, cx));
 274
 275        let tool_name = tool_name_from_meta(&tool_call.meta);
 276
 277        let subagent_session_info = subagent_session_info_from_meta(&tool_call.meta);
 278
 279        let result = Self {
 280            id: tool_call.tool_call_id,
 281            label: cx
 282                .new(|cx| Markdown::new(title.into(), Some(language_registry.clone()), None, cx)),
 283            kind: tool_call.kind,
 284            content,
 285            locations: tool_call.locations,
 286            resolved_locations: Vec::default(),
 287            status,
 288            raw_input: tool_call.raw_input,
 289            raw_input_markdown,
 290            raw_output: tool_call.raw_output,
 291            tool_name,
 292            subagent_session_info,
 293        };
 294        Ok(result)
 295    }
 296
 297    fn update_fields(
 298        &mut self,
 299        fields: acp::ToolCallUpdateFields,
 300        meta: Option<acp::Meta>,
 301        language_registry: Arc<LanguageRegistry>,
 302        path_style: PathStyle,
 303        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 304        cx: &mut App,
 305    ) -> Result<()> {
 306        let acp::ToolCallUpdateFields {
 307            kind,
 308            status,
 309            title,
 310            content,
 311            locations,
 312            raw_input,
 313            raw_output,
 314            ..
 315        } = fields;
 316
 317        if let Some(kind) = kind {
 318            self.kind = kind;
 319        }
 320
 321        if let Some(status) = status {
 322            self.status = status.into();
 323        }
 324
 325        if let Some(subagent_session_info) = subagent_session_info_from_meta(&meta) {
 326            self.subagent_session_info = Some(subagent_session_info);
 327        }
 328
 329        if let Some(title) = title {
 330            if self.kind == acp::ToolKind::Execute {
 331                for terminal in self.terminals() {
 332                    terminal.update(cx, |terminal, cx| {
 333                        terminal.update_command_label(&title, cx);
 334                    });
 335                }
 336            }
 337            self.label.update(cx, |label, cx| {
 338                if self.kind == acp::ToolKind::Execute {
 339                    label.replace(title, cx);
 340                } else if self.kind == acp::ToolKind::Edit {
 341                    label.replace(MarkdownEscaped(&title).to_string(), cx)
 342                } else if let Some((first_line, _)) = title.split_once("\n") {
 343                    label.replace(first_line.to_owned() + "", cx);
 344                } else {
 345                    label.replace(title, cx);
 346                }
 347            });
 348        }
 349
 350        if let Some(content) = content {
 351            let mut new_content_len = content.len();
 352            let mut content = content.into_iter();
 353
 354            // Reuse existing content if we can
 355            for (old, new) in self.content.iter_mut().zip(content.by_ref()) {
 356                let valid_content =
 357                    old.update_from_acp(new, language_registry.clone(), path_style, terminals, cx)?;
 358                if !valid_content {
 359                    new_content_len -= 1;
 360                }
 361            }
 362            for new in content {
 363                if let Some(new) = ToolCallContent::from_acp(
 364                    new,
 365                    language_registry.clone(),
 366                    path_style,
 367                    terminals,
 368                    cx,
 369                )? {
 370                    self.content.push(new);
 371                } else {
 372                    new_content_len -= 1;
 373                }
 374            }
 375            self.content.truncate(new_content_len);
 376        }
 377
 378        if let Some(locations) = locations {
 379            self.locations = locations;
 380        }
 381
 382        if let Some(raw_input) = raw_input {
 383            self.raw_input_markdown = markdown_for_raw_output(&raw_input, &language_registry, cx);
 384            self.raw_input = Some(raw_input);
 385        }
 386
 387        if let Some(raw_output) = raw_output {
 388            if self.content.is_empty()
 389                && let Some(markdown) = markdown_for_raw_output(&raw_output, &language_registry, cx)
 390            {
 391                self.content
 392                    .push(ToolCallContent::ContentBlock(ContentBlock::Markdown {
 393                        markdown,
 394                    }));
 395            }
 396            self.raw_output = Some(raw_output);
 397        }
 398        Ok(())
 399    }
 400
 401    pub fn diffs(&self) -> impl Iterator<Item = &Entity<Diff>> {
 402        self.content.iter().filter_map(|content| match content {
 403            ToolCallContent::Diff(diff) => Some(diff),
 404            ToolCallContent::ContentBlock(_) => None,
 405            ToolCallContent::Terminal(_) => None,
 406        })
 407    }
 408
 409    pub fn terminals(&self) -> impl Iterator<Item = &Entity<Terminal>> {
 410        self.content.iter().filter_map(|content| match content {
 411            ToolCallContent::Terminal(terminal) => Some(terminal),
 412            ToolCallContent::ContentBlock(_) => None,
 413            ToolCallContent::Diff(_) => None,
 414        })
 415    }
 416
 417    pub fn is_subagent(&self) -> bool {
 418        self.tool_name.as_ref().is_some_and(|s| s == "spawn_agent")
 419            || self.subagent_session_info.is_some()
 420    }
 421
 422    pub fn to_markdown(&self, cx: &App) -> String {
 423        let mut markdown = format!(
 424            "**Tool Call: {}**\nStatus: {}\n\n",
 425            self.label.read(cx).source(),
 426            self.status
 427        );
 428        for content in &self.content {
 429            markdown.push_str(content.to_markdown(cx).as_str());
 430            markdown.push_str("\n\n");
 431        }
 432        markdown
 433    }
 434
 435    async fn resolve_location(
 436        location: acp::ToolCallLocation,
 437        project: WeakEntity<Project>,
 438        cx: &mut AsyncApp,
 439    ) -> Option<ResolvedLocation> {
 440        let buffer = project
 441            .update(cx, |project, cx| {
 442                project
 443                    .project_path_for_absolute_path(&location.path, cx)
 444                    .map(|path| project.open_buffer(path, cx))
 445            })
 446            .ok()??;
 447        let buffer = buffer.await.log_err()?;
 448        let position = buffer.update(cx, |buffer, _| {
 449            let snapshot = buffer.snapshot();
 450            if let Some(row) = location.line {
 451                let column = snapshot.indent_size_for_line(row).len;
 452                let point = snapshot.clip_point(Point::new(row, column), Bias::Left);
 453                snapshot.anchor_before(point)
 454            } else {
 455                Anchor::min_for_buffer(snapshot.remote_id())
 456            }
 457        });
 458
 459        Some(ResolvedLocation { buffer, position })
 460    }
 461
 462    fn resolve_locations(
 463        &self,
 464        project: Entity<Project>,
 465        cx: &mut App,
 466    ) -> Task<Vec<Option<ResolvedLocation>>> {
 467        let locations = self.locations.clone();
 468        project.update(cx, |_, cx| {
 469            cx.spawn(async move |project, cx| {
 470                let mut new_locations = Vec::new();
 471                for location in locations {
 472                    new_locations.push(Self::resolve_location(location, project.clone(), cx).await);
 473                }
 474                new_locations
 475            })
 476        })
 477    }
 478}
 479
 480// Separate so we can hold a strong reference to the buffer
 481// for saving on the thread
 482#[derive(Clone, Debug, PartialEq, Eq)]
 483struct ResolvedLocation {
 484    buffer: Entity<Buffer>,
 485    position: Anchor,
 486}
 487
 488impl From<&ResolvedLocation> for AgentLocation {
 489    fn from(value: &ResolvedLocation) -> Self {
 490        Self {
 491            buffer: value.buffer.downgrade(),
 492            position: value.position,
 493        }
 494    }
 495}
 496
 497#[derive(Debug)]
 498pub enum ToolCallStatus {
 499    /// The tool call hasn't started running yet, but we start showing it to
 500    /// the user.
 501    Pending,
 502    /// The tool call is waiting for confirmation from the user.
 503    WaitingForConfirmation {
 504        options: PermissionOptions,
 505        respond_tx: oneshot::Sender<acp::PermissionOptionId>,
 506    },
 507    /// The tool call is currently running.
 508    InProgress,
 509    /// The tool call completed successfully.
 510    Completed,
 511    /// The tool call failed.
 512    Failed,
 513    /// The user rejected the tool call.
 514    Rejected,
 515    /// The user canceled generation so the tool call was canceled.
 516    Canceled,
 517}
 518
 519impl From<acp::ToolCallStatus> for ToolCallStatus {
 520    fn from(status: acp::ToolCallStatus) -> Self {
 521        match status {
 522            acp::ToolCallStatus::Pending => Self::Pending,
 523            acp::ToolCallStatus::InProgress => Self::InProgress,
 524            acp::ToolCallStatus::Completed => Self::Completed,
 525            acp::ToolCallStatus::Failed => Self::Failed,
 526            _ => Self::Pending,
 527        }
 528    }
 529}
 530
 531impl Display for ToolCallStatus {
 532    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 533        write!(
 534            f,
 535            "{}",
 536            match self {
 537                ToolCallStatus::Pending => "Pending",
 538                ToolCallStatus::WaitingForConfirmation { .. } => "Waiting for confirmation",
 539                ToolCallStatus::InProgress => "In Progress",
 540                ToolCallStatus::Completed => "Completed",
 541                ToolCallStatus::Failed => "Failed",
 542                ToolCallStatus::Rejected => "Rejected",
 543                ToolCallStatus::Canceled => "Canceled",
 544            }
 545        )
 546    }
 547}
 548
 549#[derive(Debug, PartialEq, Clone)]
 550pub enum ContentBlock {
 551    Empty,
 552    Markdown { markdown: Entity<Markdown> },
 553    ResourceLink { resource_link: acp::ResourceLink },
 554    Image { image: Arc<gpui::Image> },
 555}
 556
 557impl ContentBlock {
 558    pub fn new(
 559        block: acp::ContentBlock,
 560        language_registry: &Arc<LanguageRegistry>,
 561        path_style: PathStyle,
 562        cx: &mut App,
 563    ) -> Self {
 564        let mut this = Self::Empty;
 565        this.append(block, language_registry, path_style, cx);
 566        this
 567    }
 568
 569    pub fn new_combined(
 570        blocks: impl IntoIterator<Item = acp::ContentBlock>,
 571        language_registry: Arc<LanguageRegistry>,
 572        path_style: PathStyle,
 573        cx: &mut App,
 574    ) -> Self {
 575        let mut this = Self::Empty;
 576        for block in blocks {
 577            this.append(block, &language_registry, path_style, cx);
 578        }
 579        this
 580    }
 581
 582    pub fn append(
 583        &mut self,
 584        block: acp::ContentBlock,
 585        language_registry: &Arc<LanguageRegistry>,
 586        path_style: PathStyle,
 587        cx: &mut App,
 588    ) {
 589        match (&mut *self, &block) {
 590            (ContentBlock::Empty, acp::ContentBlock::ResourceLink(resource_link)) => {
 591                *self = ContentBlock::ResourceLink {
 592                    resource_link: resource_link.clone(),
 593                };
 594            }
 595            (ContentBlock::Empty, acp::ContentBlock::Image(image_content)) => {
 596                if let Some(image) = Self::decode_image(image_content) {
 597                    *self = ContentBlock::Image { image };
 598                } else {
 599                    let new_content = Self::image_md(image_content);
 600                    *self = Self::create_markdown_block(new_content, language_registry, cx);
 601                }
 602            }
 603            (ContentBlock::Empty, _) => {
 604                let new_content = Self::block_string_contents(&block, path_style);
 605                *self = Self::create_markdown_block(new_content, language_registry, cx);
 606            }
 607            (ContentBlock::Markdown { markdown }, _) => {
 608                let new_content = Self::block_string_contents(&block, path_style);
 609                markdown.update(cx, |markdown, cx| markdown.append(&new_content, cx));
 610            }
 611            (ContentBlock::ResourceLink { resource_link }, _) => {
 612                let existing_content = Self::resource_link_md(&resource_link.uri, path_style);
 613                let new_content = Self::block_string_contents(&block, path_style);
 614                let combined = format!("{}\n{}", existing_content, new_content);
 615                *self = Self::create_markdown_block(combined, language_registry, cx);
 616            }
 617            (ContentBlock::Image { .. }, _) => {
 618                let new_content = Self::block_string_contents(&block, path_style);
 619                let combined = format!("`Image`\n{}", new_content);
 620                *self = Self::create_markdown_block(combined, language_registry, cx);
 621            }
 622        }
 623    }
 624
 625    fn decode_image(image_content: &acp::ImageContent) -> Option<Arc<gpui::Image>> {
 626        use base64::Engine as _;
 627
 628        let bytes = base64::engine::general_purpose::STANDARD
 629            .decode(image_content.data.as_bytes())
 630            .ok()?;
 631        let format = gpui::ImageFormat::from_mime_type(&image_content.mime_type)?;
 632        Some(Arc::new(gpui::Image::from_bytes(format, bytes)))
 633    }
 634
 635    fn create_markdown_block(
 636        content: String,
 637        language_registry: &Arc<LanguageRegistry>,
 638        cx: &mut App,
 639    ) -> ContentBlock {
 640        ContentBlock::Markdown {
 641            markdown: cx
 642                .new(|cx| Markdown::new(content.into(), Some(language_registry.clone()), None, cx)),
 643        }
 644    }
 645
 646    fn block_string_contents(block: &acp::ContentBlock, path_style: PathStyle) -> String {
 647        match block {
 648            acp::ContentBlock::Text(text_content) => text_content.text.clone(),
 649            acp::ContentBlock::ResourceLink(resource_link) => {
 650                Self::resource_link_md(&resource_link.uri, path_style)
 651            }
 652            acp::ContentBlock::Resource(acp::EmbeddedResource {
 653                resource:
 654                    acp::EmbeddedResourceResource::TextResourceContents(acp::TextResourceContents {
 655                        uri,
 656                        ..
 657                    }),
 658                ..
 659            }) => Self::resource_link_md(uri, path_style),
 660            acp::ContentBlock::Image(image) => Self::image_md(image),
 661            _ => String::new(),
 662        }
 663    }
 664
 665    fn resource_link_md(uri: &str, path_style: PathStyle) -> String {
 666        if let Some(uri) = MentionUri::parse(uri, path_style).log_err() {
 667            uri.as_link().to_string()
 668        } else {
 669            uri.to_string()
 670        }
 671    }
 672
 673    fn image_md(_image: &acp::ImageContent) -> String {
 674        "`Image`".into()
 675    }
 676
 677    pub fn to_markdown<'a>(&'a self, cx: &'a App) -> &'a str {
 678        match self {
 679            ContentBlock::Empty => "",
 680            ContentBlock::Markdown { markdown } => markdown.read(cx).source(),
 681            ContentBlock::ResourceLink { resource_link } => &resource_link.uri,
 682            ContentBlock::Image { .. } => "`Image`",
 683        }
 684    }
 685
 686    pub fn markdown(&self) -> Option<&Entity<Markdown>> {
 687        match self {
 688            ContentBlock::Empty => None,
 689            ContentBlock::Markdown { markdown } => Some(markdown),
 690            ContentBlock::ResourceLink { .. } => None,
 691            ContentBlock::Image { .. } => None,
 692        }
 693    }
 694
 695    pub fn resource_link(&self) -> Option<&acp::ResourceLink> {
 696        match self {
 697            ContentBlock::ResourceLink { resource_link } => Some(resource_link),
 698            _ => None,
 699        }
 700    }
 701
 702    pub fn image(&self) -> Option<&Arc<gpui::Image>> {
 703        match self {
 704            ContentBlock::Image { image } => Some(image),
 705            _ => None,
 706        }
 707    }
 708}
 709
 710#[derive(Debug)]
 711pub enum ToolCallContent {
 712    ContentBlock(ContentBlock),
 713    Diff(Entity<Diff>),
 714    Terminal(Entity<Terminal>),
 715}
 716
 717impl ToolCallContent {
 718    pub fn from_acp(
 719        content: acp::ToolCallContent,
 720        language_registry: Arc<LanguageRegistry>,
 721        path_style: PathStyle,
 722        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 723        cx: &mut App,
 724    ) -> Result<Option<Self>> {
 725        match content {
 726            acp::ToolCallContent::Content(acp::Content { content, .. }) => {
 727                Ok(Some(Self::ContentBlock(ContentBlock::new(
 728                    content,
 729                    &language_registry,
 730                    path_style,
 731                    cx,
 732                ))))
 733            }
 734            acp::ToolCallContent::Diff(diff) => Ok(Some(Self::Diff(cx.new(|cx| {
 735                Diff::finalized(
 736                    diff.path.to_string_lossy().into_owned(),
 737                    diff.old_text,
 738                    diff.new_text,
 739                    language_registry,
 740                    cx,
 741                )
 742            })))),
 743            acp::ToolCallContent::Terminal(acp::Terminal { terminal_id, .. }) => terminals
 744                .get(&terminal_id)
 745                .cloned()
 746                .map(|terminal| Some(Self::Terminal(terminal)))
 747                .ok_or_else(|| anyhow::anyhow!("Terminal with id `{}` not found", terminal_id)),
 748            _ => Ok(None),
 749        }
 750    }
 751
 752    pub fn update_from_acp(
 753        &mut self,
 754        new: acp::ToolCallContent,
 755        language_registry: Arc<LanguageRegistry>,
 756        path_style: PathStyle,
 757        terminals: &HashMap<acp::TerminalId, Entity<Terminal>>,
 758        cx: &mut App,
 759    ) -> Result<bool> {
 760        let needs_update = match (&self, &new) {
 761            (Self::Diff(old_diff), acp::ToolCallContent::Diff(new_diff)) => {
 762                old_diff.read(cx).needs_update(
 763                    new_diff.old_text.as_deref().unwrap_or(""),
 764                    &new_diff.new_text,
 765                    cx,
 766                )
 767            }
 768            _ => true,
 769        };
 770
 771        if let Some(update) = Self::from_acp(new, language_registry, path_style, terminals, cx)? {
 772            if needs_update {
 773                *self = update;
 774            }
 775            Ok(true)
 776        } else {
 777            Ok(false)
 778        }
 779    }
 780
 781    pub fn to_markdown(&self, cx: &App) -> String {
 782        match self {
 783            Self::ContentBlock(content) => content.to_markdown(cx).to_string(),
 784            Self::Diff(diff) => diff.read(cx).to_markdown(cx),
 785            Self::Terminal(terminal) => terminal.read(cx).to_markdown(cx),
 786        }
 787    }
 788
 789    pub fn image(&self) -> Option<&Arc<gpui::Image>> {
 790        match self {
 791            Self::ContentBlock(content) => content.image(),
 792            _ => None,
 793        }
 794    }
 795}
 796
 797#[derive(Debug, PartialEq)]
 798pub enum ToolCallUpdate {
 799    UpdateFields(acp::ToolCallUpdate),
 800    UpdateDiff(ToolCallUpdateDiff),
 801    UpdateTerminal(ToolCallUpdateTerminal),
 802}
 803
 804impl ToolCallUpdate {
 805    fn id(&self) -> &acp::ToolCallId {
 806        match self {
 807            Self::UpdateFields(update) => &update.tool_call_id,
 808            Self::UpdateDiff(diff) => &diff.id,
 809            Self::UpdateTerminal(terminal) => &terminal.id,
 810        }
 811    }
 812}
 813
 814impl From<acp::ToolCallUpdate> for ToolCallUpdate {
 815    fn from(update: acp::ToolCallUpdate) -> Self {
 816        Self::UpdateFields(update)
 817    }
 818}
 819
 820impl From<ToolCallUpdateDiff> for ToolCallUpdate {
 821    fn from(diff: ToolCallUpdateDiff) -> Self {
 822        Self::UpdateDiff(diff)
 823    }
 824}
 825
 826#[derive(Debug, PartialEq)]
 827pub struct ToolCallUpdateDiff {
 828    pub id: acp::ToolCallId,
 829    pub diff: Entity<Diff>,
 830}
 831
 832impl From<ToolCallUpdateTerminal> for ToolCallUpdate {
 833    fn from(terminal: ToolCallUpdateTerminal) -> Self {
 834        Self::UpdateTerminal(terminal)
 835    }
 836}
 837
 838#[derive(Debug, PartialEq)]
 839pub struct ToolCallUpdateTerminal {
 840    pub id: acp::ToolCallId,
 841    pub terminal: Entity<Terminal>,
 842}
 843
 844#[derive(Debug, Default)]
 845pub struct Plan {
 846    pub entries: Vec<PlanEntry>,
 847}
 848
 849#[derive(Debug)]
 850pub struct PlanStats<'a> {
 851    pub in_progress_entry: Option<&'a PlanEntry>,
 852    pub pending: u32,
 853    pub completed: u32,
 854}
 855
 856impl Plan {
 857    pub fn is_empty(&self) -> bool {
 858        self.entries.is_empty()
 859    }
 860
 861    pub fn stats(&self) -> PlanStats<'_> {
 862        let mut stats = PlanStats {
 863            in_progress_entry: None,
 864            pending: 0,
 865            completed: 0,
 866        };
 867
 868        for entry in &self.entries {
 869            match &entry.status {
 870                acp::PlanEntryStatus::Pending => {
 871                    stats.pending += 1;
 872                }
 873                acp::PlanEntryStatus::InProgress => {
 874                    stats.in_progress_entry = stats.in_progress_entry.or(Some(entry));
 875                }
 876                acp::PlanEntryStatus::Completed => {
 877                    stats.completed += 1;
 878                }
 879                _ => {}
 880            }
 881        }
 882
 883        stats
 884    }
 885}
 886
 887#[derive(Debug)]
 888pub struct PlanEntry {
 889    pub content: Entity<Markdown>,
 890    pub priority: acp::PlanEntryPriority,
 891    pub status: acp::PlanEntryStatus,
 892}
 893
 894impl PlanEntry {
 895    pub fn from_acp(entry: acp::PlanEntry, cx: &mut App) -> Self {
 896        Self {
 897            content: cx.new(|cx| Markdown::new(entry.content.into(), None, None, cx)),
 898            priority: entry.priority,
 899            status: entry.status,
 900        }
 901    }
 902}
 903
 904#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 905pub struct TokenUsage {
 906    pub max_tokens: u64,
 907    pub used_tokens: u64,
 908    pub input_tokens: u64,
 909    pub output_tokens: u64,
 910    pub max_output_tokens: Option<u64>,
 911}
 912
 913pub const TOKEN_USAGE_WARNING_THRESHOLD: f32 = 0.8;
 914
 915impl TokenUsage {
 916    pub fn ratio(&self) -> TokenUsageRatio {
 917        #[cfg(debug_assertions)]
 918        let warning_threshold: f32 = std::env::var("ZED_THREAD_WARNING_THRESHOLD")
 919            .unwrap_or(TOKEN_USAGE_WARNING_THRESHOLD.to_string())
 920            .parse()
 921            .unwrap();
 922        #[cfg(not(debug_assertions))]
 923        let warning_threshold: f32 = TOKEN_USAGE_WARNING_THRESHOLD;
 924
 925        // When the maximum is unknown because there is no selected model,
 926        // avoid showing the token limit warning.
 927        if self.max_tokens == 0 {
 928            TokenUsageRatio::Normal
 929        } else if self.used_tokens >= self.max_tokens {
 930            TokenUsageRatio::Exceeded
 931        } else if self.used_tokens as f32 / self.max_tokens as f32 >= warning_threshold {
 932            TokenUsageRatio::Warning
 933        } else {
 934            TokenUsageRatio::Normal
 935        }
 936    }
 937}
 938
 939#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
 940pub enum TokenUsageRatio {
 941    Normal,
 942    Warning,
 943    Exceeded,
 944}
 945
 946#[derive(Debug, Clone)]
 947pub struct RetryStatus {
 948    pub last_error: SharedString,
 949    pub attempt: usize,
 950    pub max_attempts: usize,
 951    pub started_at: Instant,
 952    pub duration: Duration,
 953}
 954
 955struct RunningTurn {
 956    id: u32,
 957    send_task: Task<()>,
 958}
 959
 960pub struct AcpThread {
 961    session_id: acp::SessionId,
 962    work_dirs: Option<PathList>,
 963    parent_session_id: Option<acp::SessionId>,
 964    title: SharedString,
 965    provisional_title: Option<SharedString>,
 966    entries: Vec<AgentThreadEntry>,
 967    plan: Plan,
 968    project: Entity<Project>,
 969    action_log: Entity<ActionLog>,
 970    shared_buffers: HashMap<Entity<Buffer>, BufferSnapshot>,
 971    turn_id: u32,
 972    running_turn: Option<RunningTurn>,
 973    connection: Rc<dyn AgentConnection>,
 974    token_usage: Option<TokenUsage>,
 975    prompt_capabilities: acp::PromptCapabilities,
 976    _observe_prompt_capabilities: Task<anyhow::Result<()>>,
 977    terminals: HashMap<acp::TerminalId, Entity<Terminal>>,
 978    pending_terminal_output: HashMap<acp::TerminalId, Vec<Vec<u8>>>,
 979    pending_terminal_exit: HashMap<acp::TerminalId, acp::TerminalExitStatus>,
 980    had_error: bool,
 981    /// The user's unsent prompt text, persisted so it can be restored when reloading the thread.
 982    draft_prompt: Option<Vec<acp::ContentBlock>>,
 983    /// The initial scroll position for the thread view, set during session registration.
 984    ui_scroll_position: Option<gpui::ListOffset>,
 985    /// Buffer for smooth text streaming. Holds text that has been received from
 986    /// the model but not yet revealed in the UI. A timer task drains this buffer
 987    /// gradually to create a fluid typing effect instead of choppy chunk-at-a-time
 988    /// updates.
 989    streaming_text_buffer: Option<StreamingTextBuffer>,
 990}
 991
 992struct StreamingTextBuffer {
 993    /// Text received from the model but not yet appended to the Markdown source.
 994    pending: String,
 995    /// The number of bytes to reveal per timer turn.
 996    bytes_to_reveal_per_tick: usize,
 997    /// The Markdown entity being streamed into.
 998    target: Entity<Markdown>,
 999    /// Timer task that periodically moves text from `pending` into `source`.
1000    _reveal_task: Task<()>,
1001}
1002
1003impl StreamingTextBuffer {
1004    /// The number of milliseconds between each timer tick, controlling how quickly
1005    /// text is revealed.
1006    const TASK_UPDATE_MS: u64 = 16;
1007    /// The time in milliseconds to reveal the entire pending text.
1008    const REVEAL_TARGET: f32 = 200.0;
1009}
1010
1011impl From<&AcpThread> for ActionLogTelemetry {
1012    fn from(value: &AcpThread) -> Self {
1013        Self {
1014            agent_telemetry_id: value.connection().telemetry_id(),
1015            session_id: value.session_id.0.clone(),
1016        }
1017    }
1018}
1019
1020#[derive(Debug)]
1021pub enum AcpThreadEvent {
1022    NewEntry,
1023    TitleUpdated,
1024    TokenUsageUpdated,
1025    EntryUpdated(usize),
1026    EntriesRemoved(Range<usize>),
1027    ToolAuthorizationRequested(acp::ToolCallId),
1028    ToolAuthorizationReceived(acp::ToolCallId),
1029    Retry(RetryStatus),
1030    SubagentSpawned(acp::SessionId),
1031    Stopped(acp::StopReason),
1032    Error,
1033    LoadError(LoadError),
1034    PromptCapabilitiesUpdated,
1035    Refusal,
1036    AvailableCommandsUpdated(Vec<acp::AvailableCommand>),
1037    ModeUpdated(acp::SessionModeId),
1038    ConfigOptionsUpdated(Vec<acp::SessionConfigOption>),
1039}
1040
1041impl EventEmitter<AcpThreadEvent> for AcpThread {}
1042
1043#[derive(Debug, Clone)]
1044pub enum TerminalProviderEvent {
1045    Created {
1046        terminal_id: acp::TerminalId,
1047        label: String,
1048        cwd: Option<PathBuf>,
1049        output_byte_limit: Option<u64>,
1050        terminal: Entity<::terminal::Terminal>,
1051    },
1052    Output {
1053        terminal_id: acp::TerminalId,
1054        data: Vec<u8>,
1055    },
1056    TitleChanged {
1057        terminal_id: acp::TerminalId,
1058        title: String,
1059    },
1060    Exit {
1061        terminal_id: acp::TerminalId,
1062        status: acp::TerminalExitStatus,
1063    },
1064}
1065
1066#[derive(Debug, Clone)]
1067pub enum TerminalProviderCommand {
1068    WriteInput {
1069        terminal_id: acp::TerminalId,
1070        bytes: Vec<u8>,
1071    },
1072    Resize {
1073        terminal_id: acp::TerminalId,
1074        cols: u16,
1075        rows: u16,
1076    },
1077    Close {
1078        terminal_id: acp::TerminalId,
1079    },
1080}
1081
1082#[derive(PartialEq, Eq, Debug)]
1083pub enum ThreadStatus {
1084    Idle,
1085    Generating,
1086}
1087
1088#[derive(Debug, Clone)]
1089pub enum LoadError {
1090    Unsupported {
1091        command: SharedString,
1092        current_version: SharedString,
1093        minimum_version: SharedString,
1094    },
1095    FailedToInstall(SharedString),
1096    Exited {
1097        status: ExitStatus,
1098    },
1099    Other(SharedString),
1100}
1101
1102impl Display for LoadError {
1103    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1104        match self {
1105            LoadError::Unsupported {
1106                command: path,
1107                current_version,
1108                minimum_version,
1109            } => {
1110                write!(
1111                    f,
1112                    "version {current_version} from {path} is not supported (need at least {minimum_version})"
1113                )
1114            }
1115            LoadError::FailedToInstall(msg) => write!(f, "Failed to install: {msg}"),
1116            LoadError::Exited { status } => write!(f, "Server exited with status {status}"),
1117            LoadError::Other(msg) => write!(f, "{msg}"),
1118        }
1119    }
1120}
1121
1122impl Error for LoadError {}
1123
1124impl AcpThread {
1125    pub fn new(
1126        parent_session_id: Option<acp::SessionId>,
1127        title: impl Into<SharedString>,
1128        work_dirs: Option<PathList>,
1129        connection: Rc<dyn AgentConnection>,
1130        project: Entity<Project>,
1131        action_log: Entity<ActionLog>,
1132        session_id: acp::SessionId,
1133        mut prompt_capabilities_rx: watch::Receiver<acp::PromptCapabilities>,
1134        cx: &mut Context<Self>,
1135    ) -> Self {
1136        let prompt_capabilities = prompt_capabilities_rx.borrow().clone();
1137        let task = cx.spawn::<_, anyhow::Result<()>>(async move |this, cx| {
1138            loop {
1139                let caps = prompt_capabilities_rx.recv().await?;
1140                this.update(cx, |this, cx| {
1141                    this.prompt_capabilities = caps;
1142                    cx.emit(AcpThreadEvent::PromptCapabilitiesUpdated);
1143                })?;
1144            }
1145        });
1146
1147        Self {
1148            parent_session_id,
1149            work_dirs,
1150            action_log,
1151            shared_buffers: Default::default(),
1152            entries: Default::default(),
1153            plan: Default::default(),
1154            title: title.into(),
1155            provisional_title: None,
1156            project,
1157            running_turn: None,
1158            turn_id: 0,
1159            connection,
1160            session_id,
1161            token_usage: None,
1162            prompt_capabilities,
1163            _observe_prompt_capabilities: task,
1164            terminals: HashMap::default(),
1165            pending_terminal_output: HashMap::default(),
1166            pending_terminal_exit: HashMap::default(),
1167            had_error: false,
1168            draft_prompt: None,
1169            ui_scroll_position: None,
1170            streaming_text_buffer: None,
1171        }
1172    }
1173
1174    pub fn parent_session_id(&self) -> Option<&acp::SessionId> {
1175        self.parent_session_id.as_ref()
1176    }
1177
1178    pub fn prompt_capabilities(&self) -> acp::PromptCapabilities {
1179        self.prompt_capabilities.clone()
1180    }
1181
1182    pub fn draft_prompt(&self) -> Option<&[acp::ContentBlock]> {
1183        self.draft_prompt.as_deref()
1184    }
1185
1186    pub fn set_draft_prompt(&mut self, prompt: Option<Vec<acp::ContentBlock>>) {
1187        self.draft_prompt = prompt;
1188    }
1189
1190    pub fn ui_scroll_position(&self) -> Option<gpui::ListOffset> {
1191        self.ui_scroll_position
1192    }
1193
1194    pub fn set_ui_scroll_position(&mut self, position: Option<gpui::ListOffset>) {
1195        self.ui_scroll_position = position;
1196    }
1197
1198    pub fn connection(&self) -> &Rc<dyn AgentConnection> {
1199        &self.connection
1200    }
1201
1202    pub fn action_log(&self) -> &Entity<ActionLog> {
1203        &self.action_log
1204    }
1205
1206    pub fn project(&self) -> &Entity<Project> {
1207        &self.project
1208    }
1209
1210    pub fn title(&self) -> SharedString {
1211        self.provisional_title
1212            .clone()
1213            .unwrap_or_else(|| self.title.clone())
1214    }
1215
1216    pub fn has_provisional_title(&self) -> bool {
1217        self.provisional_title.is_some()
1218    }
1219
1220    pub fn entries(&self) -> &[AgentThreadEntry] {
1221        &self.entries
1222    }
1223
1224    pub fn session_id(&self) -> &acp::SessionId {
1225        &self.session_id
1226    }
1227
1228    pub fn work_dirs(&self) -> Option<&PathList> {
1229        self.work_dirs.as_ref()
1230    }
1231
1232    pub fn status(&self) -> ThreadStatus {
1233        if self.running_turn.is_some() {
1234            ThreadStatus::Generating
1235        } else {
1236            ThreadStatus::Idle
1237        }
1238    }
1239
1240    pub fn had_error(&self) -> bool {
1241        self.had_error
1242    }
1243
1244    pub fn is_waiting_for_confirmation(&self) -> bool {
1245        for entry in self.entries.iter().rev() {
1246            match entry {
1247                AgentThreadEntry::UserMessage(_) => return false,
1248                AgentThreadEntry::ToolCall(ToolCall {
1249                    status: ToolCallStatus::WaitingForConfirmation { .. },
1250                    ..
1251                }) => return true,
1252                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1253            }
1254        }
1255        false
1256    }
1257
1258    pub fn token_usage(&self) -> Option<&TokenUsage> {
1259        self.token_usage.as_ref()
1260    }
1261
1262    pub fn has_pending_edit_tool_calls(&self) -> bool {
1263        for entry in self.entries.iter().rev() {
1264            match entry {
1265                AgentThreadEntry::UserMessage(_) => return false,
1266                AgentThreadEntry::ToolCall(
1267                    call @ ToolCall {
1268                        status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1269                        ..
1270                    },
1271                ) if call.diffs().next().is_some() => {
1272                    return true;
1273                }
1274                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1275            }
1276        }
1277
1278        false
1279    }
1280
1281    pub fn has_in_progress_tool_calls(&self) -> bool {
1282        for entry in self.entries.iter().rev() {
1283            match entry {
1284                AgentThreadEntry::UserMessage(_) => return false,
1285                AgentThreadEntry::ToolCall(ToolCall {
1286                    status: ToolCallStatus::InProgress | ToolCallStatus::Pending,
1287                    ..
1288                }) => {
1289                    return true;
1290                }
1291                AgentThreadEntry::ToolCall(_) | AgentThreadEntry::AssistantMessage(_) => {}
1292            }
1293        }
1294
1295        false
1296    }
1297
1298    pub fn used_tools_since_last_user_message(&self) -> bool {
1299        for entry in self.entries.iter().rev() {
1300            match entry {
1301                AgentThreadEntry::UserMessage(..) => return false,
1302                AgentThreadEntry::AssistantMessage(..) => continue,
1303                AgentThreadEntry::ToolCall(..) => return true,
1304            }
1305        }
1306
1307        false
1308    }
1309
1310    pub fn handle_session_update(
1311        &mut self,
1312        update: acp::SessionUpdate,
1313        cx: &mut Context<Self>,
1314    ) -> Result<(), acp::Error> {
1315        match update {
1316            acp::SessionUpdate::UserMessageChunk(acp::ContentChunk { content, .. }) => {
1317                self.push_user_content_block(None, content, cx);
1318            }
1319            acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk { content, .. }) => {
1320                self.push_assistant_content_block(content, false, cx);
1321            }
1322            acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk { content, .. }) => {
1323                self.push_assistant_content_block(content, true, cx);
1324            }
1325            acp::SessionUpdate::ToolCall(tool_call) => {
1326                self.upsert_tool_call(tool_call, cx)?;
1327            }
1328            acp::SessionUpdate::ToolCallUpdate(tool_call_update) => {
1329                self.update_tool_call(tool_call_update, cx)?;
1330            }
1331            acp::SessionUpdate::Plan(plan) => {
1332                self.update_plan(plan, cx);
1333            }
1334            acp::SessionUpdate::AvailableCommandsUpdate(acp::AvailableCommandsUpdate {
1335                available_commands,
1336                ..
1337            }) => cx.emit(AcpThreadEvent::AvailableCommandsUpdated(available_commands)),
1338            acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
1339                current_mode_id,
1340                ..
1341            }) => cx.emit(AcpThreadEvent::ModeUpdated(current_mode_id)),
1342            acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
1343                config_options,
1344                ..
1345            }) => cx.emit(AcpThreadEvent::ConfigOptionsUpdated(config_options)),
1346            _ => {}
1347        }
1348        Ok(())
1349    }
1350
1351    pub fn push_user_content_block(
1352        &mut self,
1353        message_id: Option<UserMessageId>,
1354        chunk: acp::ContentBlock,
1355        cx: &mut Context<Self>,
1356    ) {
1357        self.push_user_content_block_with_indent(message_id, chunk, false, cx)
1358    }
1359
1360    pub fn push_user_content_block_with_indent(
1361        &mut self,
1362        message_id: Option<UserMessageId>,
1363        chunk: acp::ContentBlock,
1364        indented: bool,
1365        cx: &mut Context<Self>,
1366    ) {
1367        let language_registry = self.project.read(cx).languages().clone();
1368        let path_style = self.project.read(cx).path_style(cx);
1369        let entries_len = self.entries.len();
1370
1371        if let Some(last_entry) = self.entries.last_mut()
1372            && let AgentThreadEntry::UserMessage(UserMessage {
1373                id,
1374                content,
1375                chunks,
1376                indented: existing_indented,
1377                ..
1378            }) = last_entry
1379            && *existing_indented == indented
1380        {
1381            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1382            *id = message_id.or(id.take());
1383            content.append(chunk.clone(), &language_registry, path_style, cx);
1384            chunks.push(chunk);
1385            let idx = entries_len - 1;
1386            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1387        } else {
1388            let content = ContentBlock::new(chunk.clone(), &language_registry, path_style, cx);
1389            self.push_entry(
1390                AgentThreadEntry::UserMessage(UserMessage {
1391                    id: message_id,
1392                    content,
1393                    chunks: vec![chunk],
1394                    checkpoint: None,
1395                    indented,
1396                }),
1397                cx,
1398            );
1399        }
1400    }
1401
1402    pub fn push_assistant_content_block(
1403        &mut self,
1404        chunk: acp::ContentBlock,
1405        is_thought: bool,
1406        cx: &mut Context<Self>,
1407    ) {
1408        self.push_assistant_content_block_with_indent(chunk, is_thought, false, cx)
1409    }
1410
1411    pub fn push_assistant_content_block_with_indent(
1412        &mut self,
1413        chunk: acp::ContentBlock,
1414        is_thought: bool,
1415        indented: bool,
1416        cx: &mut Context<Self>,
1417    ) {
1418        let path_style = self.project.read(cx).path_style(cx);
1419
1420        // For text chunks going to an existing Markdown block, buffer for smooth
1421        // streaming instead of appending all at once which may feel more choppy.
1422        if let acp::ContentBlock::Text(text_content) = &chunk {
1423            if let Some(markdown) = self.streaming_markdown_target(is_thought, indented) {
1424                let entries_len = self.entries.len();
1425                cx.emit(AcpThreadEvent::EntryUpdated(entries_len - 1));
1426                self.buffer_streaming_text(&markdown, text_content.text.clone(), cx);
1427                return;
1428            }
1429        }
1430
1431        let language_registry = self.project.read(cx).languages().clone();
1432        let entries_len = self.entries.len();
1433        if let Some(last_entry) = self.entries.last_mut()
1434            && let AgentThreadEntry::AssistantMessage(AssistantMessage {
1435                chunks,
1436                indented: existing_indented,
1437                is_subagent_output: _,
1438            }) = last_entry
1439            && *existing_indented == indented
1440        {
1441            let idx = entries_len - 1;
1442            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1443            cx.emit(AcpThreadEvent::EntryUpdated(idx));
1444            match (chunks.last_mut(), is_thought) {
1445                (Some(AssistantMessageChunk::Message { block }), false)
1446                | (Some(AssistantMessageChunk::Thought { block }), true) => {
1447                    block.append(chunk, &language_registry, path_style, cx)
1448                }
1449                _ => {
1450                    let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1451                    if is_thought {
1452                        chunks.push(AssistantMessageChunk::Thought { block })
1453                    } else {
1454                        chunks.push(AssistantMessageChunk::Message { block })
1455                    }
1456                }
1457            }
1458        } else {
1459            let block = ContentBlock::new(chunk, &language_registry, path_style, cx);
1460            let chunk = if is_thought {
1461                AssistantMessageChunk::Thought { block }
1462            } else {
1463                AssistantMessageChunk::Message { block }
1464            };
1465
1466            self.push_entry(
1467                AgentThreadEntry::AssistantMessage(AssistantMessage {
1468                    chunks: vec![chunk],
1469                    indented,
1470                    is_subagent_output: false,
1471                }),
1472                cx,
1473            );
1474        }
1475    }
1476
1477    fn streaming_markdown_target(
1478        &self,
1479        is_thought: bool,
1480        indented: bool,
1481    ) -> Option<Entity<Markdown>> {
1482        let last_entry = self.entries.last()?;
1483        if let AgentThreadEntry::AssistantMessage(AssistantMessage {
1484            chunks,
1485            indented: existing_indented,
1486            ..
1487        }) = last_entry
1488            && *existing_indented == indented
1489            && let [.., chunk] = chunks.as_slice()
1490        {
1491            match (chunk, is_thought) {
1492                (
1493                    AssistantMessageChunk::Message {
1494                        block: ContentBlock::Markdown { markdown },
1495                    },
1496                    false,
1497                )
1498                | (
1499                    AssistantMessageChunk::Thought {
1500                        block: ContentBlock::Markdown { markdown },
1501                    },
1502                    true,
1503                ) => Some(markdown.clone()),
1504                _ => None,
1505            }
1506        } else {
1507            None
1508        }
1509    }
1510
1511    /// Add text to the streaming buffer. If the target changed (e.g. switching
1512    /// from thoughts to message text), flush the old buffer first.
1513    fn buffer_streaming_text(
1514        &mut self,
1515        markdown: &Entity<Markdown>,
1516        text: String,
1517        cx: &mut Context<Self>,
1518    ) {
1519        if let Some(buffer) = &mut self.streaming_text_buffer {
1520            if buffer.target.entity_id() == markdown.entity_id() {
1521                buffer.pending.push_str(&text);
1522
1523                buffer.bytes_to_reveal_per_tick = (buffer.pending.len() as f32
1524                    / StreamingTextBuffer::REVEAL_TARGET
1525                    * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1526                    .ceil() as usize;
1527                return;
1528            }
1529            Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1530        }
1531
1532        let target = markdown.clone();
1533        let _reveal_task = self.start_streaming_reveal(cx);
1534        let pending_len = text.len();
1535        let bytes_to_reveal = (pending_len as f32 / StreamingTextBuffer::REVEAL_TARGET
1536            * StreamingTextBuffer::TASK_UPDATE_MS as f32)
1537            .ceil() as usize;
1538        self.streaming_text_buffer = Some(StreamingTextBuffer {
1539            pending: text,
1540            bytes_to_reveal_per_tick: bytes_to_reveal,
1541            target,
1542            _reveal_task,
1543        });
1544    }
1545
1546    /// Flush all buffered streaming text into the Markdown entity immediately.
1547    fn flush_streaming_text(
1548        streaming_text_buffer: &mut Option<StreamingTextBuffer>,
1549        cx: &mut Context<Self>,
1550    ) {
1551        if let Some(buffer) = streaming_text_buffer.take() {
1552            if !buffer.pending.is_empty() {
1553                buffer
1554                    .target
1555                    .update(cx, |markdown, cx| markdown.append(&buffer.pending, cx));
1556            }
1557        }
1558    }
1559
1560    /// Spawns a foreground task that periodically drains
1561    /// `streaming_text_buffer.pending` into the target `Markdown` entity,
1562    /// producing smooth, continuous text output.
1563    fn start_streaming_reveal(&self, cx: &mut Context<Self>) -> Task<()> {
1564        cx.spawn(async move |this, cx| {
1565            loop {
1566                cx.background_executor()
1567                    .timer(Duration::from_millis(StreamingTextBuffer::TASK_UPDATE_MS))
1568                    .await;
1569
1570                let should_continue = this
1571                    .update(cx, |this, cx| {
1572                        let Some(buffer) = &mut this.streaming_text_buffer else {
1573                            return false;
1574                        };
1575
1576                        if buffer.pending.is_empty() {
1577                            return true;
1578                        }
1579
1580                        let pending_len = buffer.pending.len();
1581
1582                        let byte_boundary = buffer
1583                            .pending
1584                            .ceil_char_boundary(buffer.bytes_to_reveal_per_tick)
1585                            .min(pending_len);
1586
1587                        buffer.target.update(cx, |markdown: &mut Markdown, cx| {
1588                            markdown.append(&buffer.pending[..byte_boundary], cx);
1589                            buffer.pending.drain(..byte_boundary);
1590                        });
1591
1592                        true
1593                    })
1594                    .unwrap_or(false);
1595
1596                if !should_continue {
1597                    break;
1598                }
1599            }
1600        })
1601    }
1602
1603    fn push_entry(&mut self, entry: AgentThreadEntry, cx: &mut Context<Self>) {
1604        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
1605        self.entries.push(entry);
1606        cx.emit(AcpThreadEvent::NewEntry);
1607    }
1608
1609    pub fn can_set_title(&mut self, cx: &mut Context<Self>) -> bool {
1610        self.connection.set_title(&self.session_id, cx).is_some()
1611    }
1612
1613    pub fn set_title(&mut self, title: SharedString, cx: &mut Context<Self>) -> Task<Result<()>> {
1614        let had_provisional = self.provisional_title.take().is_some();
1615        if title != self.title {
1616            self.title = title.clone();
1617            cx.emit(AcpThreadEvent::TitleUpdated);
1618            if let Some(set_title) = self.connection.set_title(&self.session_id, cx) {
1619                return set_title.run(title, cx);
1620            }
1621        } else if had_provisional {
1622            cx.emit(AcpThreadEvent::TitleUpdated);
1623        }
1624        Task::ready(Ok(()))
1625    }
1626
1627    /// Sets a provisional display title without propagating back to the
1628    /// underlying agent connection. This is used for quick preview titles
1629    /// (e.g. first 20 chars of the user message) that should be shown
1630    /// immediately but replaced once the LLM generates a proper title via
1631    /// `set_title`.
1632    pub fn set_provisional_title(&mut self, title: SharedString, cx: &mut Context<Self>) {
1633        self.provisional_title = Some(title);
1634        cx.emit(AcpThreadEvent::TitleUpdated);
1635    }
1636
1637    pub fn subagent_spawned(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
1638        cx.emit(AcpThreadEvent::SubagentSpawned(session_id));
1639    }
1640
1641    pub fn update_token_usage(&mut self, usage: Option<TokenUsage>, cx: &mut Context<Self>) {
1642        self.token_usage = usage;
1643        cx.emit(AcpThreadEvent::TokenUsageUpdated);
1644    }
1645
1646    pub fn update_retry_status(&mut self, status: RetryStatus, cx: &mut Context<Self>) {
1647        cx.emit(AcpThreadEvent::Retry(status));
1648    }
1649
1650    pub fn update_tool_call(
1651        &mut self,
1652        update: impl Into<ToolCallUpdate>,
1653        cx: &mut Context<Self>,
1654    ) -> Result<()> {
1655        let update = update.into();
1656        let languages = self.project.read(cx).languages().clone();
1657        let path_style = self.project.read(cx).path_style(cx);
1658
1659        let ix = match self.index_for_tool_call(update.id()) {
1660            Some(ix) => ix,
1661            None => {
1662                // Tool call not found - create a failed tool call entry
1663                let failed_tool_call = ToolCall {
1664                    id: update.id().clone(),
1665                    label: cx.new(|cx| Markdown::new("Tool call not found".into(), None, None, cx)),
1666                    kind: acp::ToolKind::Fetch,
1667                    content: vec![ToolCallContent::ContentBlock(ContentBlock::new(
1668                        "Tool call not found".into(),
1669                        &languages,
1670                        path_style,
1671                        cx,
1672                    ))],
1673                    status: ToolCallStatus::Failed,
1674                    locations: Vec::new(),
1675                    resolved_locations: Vec::new(),
1676                    raw_input: None,
1677                    raw_input_markdown: None,
1678                    raw_output: None,
1679                    tool_name: None,
1680                    subagent_session_info: None,
1681                };
1682                self.push_entry(AgentThreadEntry::ToolCall(failed_tool_call), cx);
1683                return Ok(());
1684            }
1685        };
1686        let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1687            unreachable!()
1688        };
1689
1690        match update {
1691            ToolCallUpdate::UpdateFields(update) => {
1692                let location_updated = update.fields.locations.is_some();
1693                call.update_fields(
1694                    update.fields,
1695                    update.meta,
1696                    languages,
1697                    path_style,
1698                    &self.terminals,
1699                    cx,
1700                )?;
1701                if location_updated {
1702                    self.resolve_locations(update.tool_call_id, cx);
1703                }
1704            }
1705            ToolCallUpdate::UpdateDiff(update) => {
1706                call.content.clear();
1707                call.content.push(ToolCallContent::Diff(update.diff));
1708            }
1709            ToolCallUpdate::UpdateTerminal(update) => {
1710                call.content.clear();
1711                call.content
1712                    .push(ToolCallContent::Terminal(update.terminal));
1713            }
1714        }
1715
1716        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1717
1718        Ok(())
1719    }
1720
1721    /// Updates a tool call if id matches an existing entry, otherwise inserts a new one.
1722    pub fn upsert_tool_call(
1723        &mut self,
1724        tool_call: acp::ToolCall,
1725        cx: &mut Context<Self>,
1726    ) -> Result<(), acp::Error> {
1727        let status = tool_call.status.into();
1728        self.upsert_tool_call_inner(tool_call.into(), status, cx)
1729    }
1730
1731    /// Fails if id does not match an existing entry.
1732    pub fn upsert_tool_call_inner(
1733        &mut self,
1734        update: acp::ToolCallUpdate,
1735        status: ToolCallStatus,
1736        cx: &mut Context<Self>,
1737    ) -> Result<(), acp::Error> {
1738        let language_registry = self.project.read(cx).languages().clone();
1739        let path_style = self.project.read(cx).path_style(cx);
1740        let id = update.tool_call_id.clone();
1741
1742        let agent_telemetry_id = self.connection().telemetry_id();
1743        let session = self.session_id();
1744        let parent_session_id = self.parent_session_id();
1745        if let ToolCallStatus::Completed | ToolCallStatus::Failed = status {
1746            let status = if matches!(status, ToolCallStatus::Completed) {
1747                "completed"
1748            } else {
1749                "failed"
1750            };
1751            telemetry::event!(
1752                "Agent Tool Call Completed",
1753                agent_telemetry_id,
1754                session,
1755                parent_session_id,
1756                status
1757            );
1758        }
1759
1760        if let Some(ix) = self.index_for_tool_call(&id) {
1761            let AgentThreadEntry::ToolCall(call) = &mut self.entries[ix] else {
1762                unreachable!()
1763            };
1764
1765            call.update_fields(
1766                update.fields,
1767                update.meta,
1768                language_registry,
1769                path_style,
1770                &self.terminals,
1771                cx,
1772            )?;
1773            call.status = status;
1774
1775            cx.emit(AcpThreadEvent::EntryUpdated(ix));
1776        } else {
1777            let call = ToolCall::from_acp(
1778                update.try_into()?,
1779                status,
1780                language_registry,
1781                self.project.read(cx).path_style(cx),
1782                &self.terminals,
1783                cx,
1784            )?;
1785            self.push_entry(AgentThreadEntry::ToolCall(call), cx);
1786        };
1787
1788        self.resolve_locations(id, cx);
1789        Ok(())
1790    }
1791
1792    fn index_for_tool_call(&self, id: &acp::ToolCallId) -> Option<usize> {
1793        self.entries
1794            .iter()
1795            .enumerate()
1796            .rev()
1797            .find_map(|(index, entry)| {
1798                if let AgentThreadEntry::ToolCall(tool_call) = entry
1799                    && &tool_call.id == id
1800                {
1801                    Some(index)
1802                } else {
1803                    None
1804                }
1805            })
1806    }
1807
1808    fn tool_call_mut(&mut self, id: &acp::ToolCallId) -> Option<(usize, &mut ToolCall)> {
1809        // The tool call we are looking for is typically the last one, or very close to the end.
1810        // At the moment, it doesn't seem like a hashmap would be a good fit for this use case.
1811        self.entries
1812            .iter_mut()
1813            .enumerate()
1814            .rev()
1815            .find_map(|(index, tool_call)| {
1816                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1817                    && &tool_call.id == id
1818                {
1819                    Some((index, tool_call))
1820                } else {
1821                    None
1822                }
1823            })
1824    }
1825
1826    pub fn tool_call(&self, id: &acp::ToolCallId) -> Option<(usize, &ToolCall)> {
1827        self.entries
1828            .iter()
1829            .enumerate()
1830            .rev()
1831            .find_map(|(index, tool_call)| {
1832                if let AgentThreadEntry::ToolCall(tool_call) = tool_call
1833                    && &tool_call.id == id
1834                {
1835                    Some((index, tool_call))
1836                } else {
1837                    None
1838                }
1839            })
1840    }
1841
1842    pub fn tool_call_for_subagent(&self, session_id: &acp::SessionId) -> Option<&ToolCall> {
1843        self.entries.iter().find_map(|entry| match entry {
1844            AgentThreadEntry::ToolCall(tool_call) => {
1845                if let Some(subagent_session_info) = &tool_call.subagent_session_info
1846                    && &subagent_session_info.session_id == session_id
1847                {
1848                    Some(tool_call)
1849                } else {
1850                    None
1851                }
1852            }
1853            _ => None,
1854        })
1855    }
1856
1857    pub fn resolve_locations(&mut self, id: acp::ToolCallId, cx: &mut Context<Self>) {
1858        let project = self.project.clone();
1859        let should_update_agent_location = self.parent_session_id.is_none();
1860        let Some((_, tool_call)) = self.tool_call_mut(&id) else {
1861            return;
1862        };
1863        let task = tool_call.resolve_locations(project, cx);
1864        cx.spawn(async move |this, cx| {
1865            let resolved_locations = task.await;
1866
1867            this.update(cx, |this, cx| {
1868                let project = this.project.clone();
1869
1870                for location in resolved_locations.iter().flatten() {
1871                    this.shared_buffers
1872                        .insert(location.buffer.clone(), location.buffer.read(cx).snapshot());
1873                }
1874                let Some((ix, tool_call)) = this.tool_call_mut(&id) else {
1875                    return;
1876                };
1877
1878                if let Some(Some(location)) = resolved_locations.last() {
1879                    project.update(cx, |project, cx| {
1880                        let should_ignore = if let Some(agent_location) = project
1881                            .agent_location()
1882                            .filter(|agent_location| agent_location.buffer == location.buffer)
1883                        {
1884                            let snapshot = location.buffer.read(cx).snapshot();
1885                            let old_position = agent_location.position.to_point(&snapshot);
1886                            let new_position = location.position.to_point(&snapshot);
1887
1888                            // ignore this so that when we get updates from the edit tool
1889                            // the position doesn't reset to the startof line
1890                            old_position.row == new_position.row
1891                                && old_position.column > new_position.column
1892                        } else {
1893                            false
1894                        };
1895                        if !should_ignore && should_update_agent_location {
1896                            project.set_agent_location(Some(location.into()), cx);
1897                        }
1898                    });
1899                }
1900
1901                let resolved_locations = resolved_locations
1902                    .iter()
1903                    .map(|l| l.as_ref().map(|l| AgentLocation::from(l)))
1904                    .collect::<Vec<_>>();
1905
1906                if tool_call.resolved_locations != resolved_locations {
1907                    tool_call.resolved_locations = resolved_locations;
1908                    cx.emit(AcpThreadEvent::EntryUpdated(ix));
1909                }
1910            })
1911        })
1912        .detach();
1913    }
1914
1915    pub fn request_tool_call_authorization(
1916        &mut self,
1917        tool_call: acp::ToolCallUpdate,
1918        options: PermissionOptions,
1919        cx: &mut Context<Self>,
1920    ) -> Result<Task<acp::RequestPermissionOutcome>> {
1921        let (tx, rx) = oneshot::channel();
1922
1923        let status = ToolCallStatus::WaitingForConfirmation {
1924            options,
1925            respond_tx: tx,
1926        };
1927
1928        let tool_call_id = tool_call.tool_call_id.clone();
1929        self.upsert_tool_call_inner(tool_call, status, cx)?;
1930        cx.emit(AcpThreadEvent::ToolAuthorizationRequested(
1931            tool_call_id.clone(),
1932        ));
1933
1934        Ok(cx.spawn(async move |this, cx| {
1935            let outcome = match rx.await {
1936                Ok(option) => acp::RequestPermissionOutcome::Selected(
1937                    acp::SelectedPermissionOutcome::new(option),
1938                ),
1939                Err(oneshot::Canceled) => acp::RequestPermissionOutcome::Cancelled,
1940            };
1941            this.update(cx, |_this, cx| {
1942                cx.emit(AcpThreadEvent::ToolAuthorizationReceived(tool_call_id))
1943            })
1944            .ok();
1945            outcome
1946        }))
1947    }
1948
1949    pub fn authorize_tool_call(
1950        &mut self,
1951        id: acp::ToolCallId,
1952        option_id: acp::PermissionOptionId,
1953        option_kind: acp::PermissionOptionKind,
1954        cx: &mut Context<Self>,
1955    ) {
1956        let Some((ix, call)) = self.tool_call_mut(&id) else {
1957            return;
1958        };
1959
1960        let new_status = match option_kind {
1961            acp::PermissionOptionKind::RejectOnce | acp::PermissionOptionKind::RejectAlways => {
1962                ToolCallStatus::Rejected
1963            }
1964            acp::PermissionOptionKind::AllowOnce | acp::PermissionOptionKind::AllowAlways => {
1965                ToolCallStatus::InProgress
1966            }
1967            _ => ToolCallStatus::InProgress,
1968        };
1969
1970        let curr_status = mem::replace(&mut call.status, new_status);
1971
1972        if let ToolCallStatus::WaitingForConfirmation { respond_tx, .. } = curr_status {
1973            respond_tx.send(option_id).log_err();
1974        } else if cfg!(debug_assertions) {
1975            panic!("tried to authorize an already authorized tool call");
1976        }
1977
1978        cx.emit(AcpThreadEvent::EntryUpdated(ix));
1979    }
1980
1981    pub fn plan(&self) -> &Plan {
1982        &self.plan
1983    }
1984
1985    pub fn update_plan(&mut self, request: acp::Plan, cx: &mut Context<Self>) {
1986        let new_entries_len = request.entries.len();
1987        let mut new_entries = request.entries.into_iter();
1988
1989        // Reuse existing markdown to prevent flickering
1990        for (old, new) in self.plan.entries.iter_mut().zip(new_entries.by_ref()) {
1991            let PlanEntry {
1992                content,
1993                priority,
1994                status,
1995            } = old;
1996            content.update(cx, |old, cx| {
1997                old.replace(new.content, cx);
1998            });
1999            *priority = new.priority;
2000            *status = new.status;
2001        }
2002        for new in new_entries {
2003            self.plan.entries.push(PlanEntry::from_acp(new, cx))
2004        }
2005        self.plan.entries.truncate(new_entries_len);
2006
2007        cx.notify();
2008    }
2009
2010    fn clear_completed_plan_entries(&mut self, cx: &mut Context<Self>) {
2011        self.plan
2012            .entries
2013            .retain(|entry| !matches!(entry.status, acp::PlanEntryStatus::Completed));
2014        cx.notify();
2015    }
2016
2017    #[cfg(any(test, feature = "test-support"))]
2018    pub fn send_raw(
2019        &mut self,
2020        message: &str,
2021        cx: &mut Context<Self>,
2022    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2023        self.send(vec![message.into()], cx)
2024    }
2025
2026    pub fn send(
2027        &mut self,
2028        message: Vec<acp::ContentBlock>,
2029        cx: &mut Context<Self>,
2030    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2031        let block = ContentBlock::new_combined(
2032            message.clone(),
2033            self.project.read(cx).languages().clone(),
2034            self.project.read(cx).path_style(cx),
2035            cx,
2036        );
2037        let request = acp::PromptRequest::new(self.session_id.clone(), message.clone());
2038        let git_store = self.project.read(cx).git_store().clone();
2039
2040        let message_id = if self.connection.truncate(&self.session_id, cx).is_some() {
2041            Some(UserMessageId::new())
2042        } else {
2043            None
2044        };
2045
2046        self.run_turn(cx, async move |this, cx| {
2047            this.update(cx, |this, cx| {
2048                this.push_entry(
2049                    AgentThreadEntry::UserMessage(UserMessage {
2050                        id: message_id.clone(),
2051                        content: block,
2052                        chunks: message,
2053                        checkpoint: None,
2054                        indented: false,
2055                    }),
2056                    cx,
2057                );
2058            })
2059            .ok();
2060
2061            let old_checkpoint = git_store
2062                .update(cx, |git, cx| git.checkpoint(cx))
2063                .await
2064                .context("failed to get old checkpoint")
2065                .log_err();
2066            this.update(cx, |this, cx| {
2067                if let Some((_ix, message)) = this.last_user_message() {
2068                    message.checkpoint = old_checkpoint.map(|git_checkpoint| Checkpoint {
2069                        git_checkpoint,
2070                        show: false,
2071                    });
2072                }
2073                this.connection.prompt(message_id, request, cx)
2074            })?
2075            .await
2076        })
2077    }
2078
2079    pub fn can_retry(&self, cx: &App) -> bool {
2080        self.connection.retry(&self.session_id, cx).is_some()
2081    }
2082
2083    pub fn retry(
2084        &mut self,
2085        cx: &mut Context<Self>,
2086    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2087        self.run_turn(cx, async move |this, cx| {
2088            this.update(cx, |this, cx| {
2089                this.connection
2090                    .retry(&this.session_id, cx)
2091                    .map(|retry| retry.run(cx))
2092            })?
2093            .context("retrying a session is not supported")?
2094            .await
2095        })
2096    }
2097
2098    fn run_turn(
2099        &mut self,
2100        cx: &mut Context<Self>,
2101        f: impl 'static + AsyncFnOnce(WeakEntity<Self>, &mut AsyncApp) -> Result<acp::PromptResponse>,
2102    ) -> BoxFuture<'static, Result<Option<acp::PromptResponse>>> {
2103        self.clear_completed_plan_entries(cx);
2104        self.had_error = false;
2105
2106        let (tx, rx) = oneshot::channel();
2107        let cancel_task = self.cancel(cx);
2108
2109        self.turn_id += 1;
2110        let turn_id = self.turn_id;
2111        self.running_turn = Some(RunningTurn {
2112            id: turn_id,
2113            send_task: cx.spawn(async move |this, cx| {
2114                cancel_task.await;
2115                tx.send(f(this, cx).await).ok();
2116            }),
2117        });
2118
2119        cx.spawn(async move |this, cx| {
2120            let response = rx.await;
2121
2122            this.update(cx, |this, cx| this.update_last_checkpoint(cx))?
2123                .await?;
2124
2125            this.update(cx, |this, cx| {
2126                if this.parent_session_id.is_none() {
2127                    this.project
2128                        .update(cx, |project, cx| project.set_agent_location(None, cx));
2129                }
2130                let Ok(response) = response else {
2131                    // tx dropped, just return
2132                    return Ok(None);
2133                };
2134
2135                let is_same_turn = this
2136                    .running_turn
2137                    .as_ref()
2138                    .is_some_and(|turn| turn_id == turn.id);
2139
2140                // If the user submitted a follow up message, running_turn might
2141                // already point to a different turn. Therefore we only want to
2142                // take the task if it's the same turn.
2143                if is_same_turn {
2144                    this.running_turn.take();
2145                }
2146
2147                match response {
2148                    Ok(r) => {
2149                        Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2150
2151                        if r.stop_reason == acp::StopReason::MaxTokens {
2152                            this.had_error = true;
2153                            cx.emit(AcpThreadEvent::Error);
2154                            log::error!("Max tokens reached. Usage: {:?}", this.token_usage);
2155                            return Err(anyhow!("Max tokens reached"));
2156                        }
2157
2158                        let canceled = matches!(r.stop_reason, acp::StopReason::Cancelled);
2159                        if canceled {
2160                            this.mark_pending_tools_as_canceled();
2161                        }
2162
2163                        // Handle refusal - distinguish between user prompt and tool call refusals
2164                        if let acp::StopReason::Refusal = r.stop_reason {
2165                            this.had_error = true;
2166                            if let Some((user_msg_ix, _)) = this.last_user_message() {
2167                                // Check if there's a completed tool call with results after the last user message
2168                                // This indicates the refusal is in response to tool output, not the user's prompt
2169                                let has_completed_tool_call_after_user_msg =
2170                                    this.entries.iter().skip(user_msg_ix + 1).any(|entry| {
2171                                        if let AgentThreadEntry::ToolCall(tool_call) = entry {
2172                                            // Check if the tool call has completed and has output
2173                                            matches!(tool_call.status, ToolCallStatus::Completed)
2174                                                && tool_call.raw_output.is_some()
2175                                        } else {
2176                                            false
2177                                        }
2178                                    });
2179
2180                                if has_completed_tool_call_after_user_msg {
2181                                    // Refusal is due to tool output - don't truncate, just notify
2182                                    // The model refused based on what the tool returned
2183                                    cx.emit(AcpThreadEvent::Refusal);
2184                                } else {
2185                                    // User prompt was refused - truncate back to before the user message
2186                                    let range = user_msg_ix..this.entries.len();
2187                                    if range.start < range.end {
2188                                        this.entries.truncate(user_msg_ix);
2189                                        cx.emit(AcpThreadEvent::EntriesRemoved(range));
2190                                    }
2191                                    cx.emit(AcpThreadEvent::Refusal);
2192                                }
2193                            } else {
2194                                // No user message found, treat as general refusal
2195                                cx.emit(AcpThreadEvent::Refusal);
2196                            }
2197                        }
2198
2199                        cx.emit(AcpThreadEvent::Stopped(r.stop_reason));
2200                        Ok(Some(r))
2201                    }
2202                    Err(e) => {
2203                        Self::flush_streaming_text(&mut this.streaming_text_buffer, cx);
2204
2205                        this.had_error = true;
2206                        cx.emit(AcpThreadEvent::Error);
2207                        log::error!("Error in run turn: {:?}", e);
2208                        Err(e)
2209                    }
2210                }
2211            })?
2212        })
2213        .boxed()
2214    }
2215
2216    pub fn cancel(&mut self, cx: &mut Context<Self>) -> Task<()> {
2217        let Some(turn) = self.running_turn.take() else {
2218            return Task::ready(());
2219        };
2220        self.connection.cancel(&self.session_id, cx);
2221
2222        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2223        self.mark_pending_tools_as_canceled();
2224
2225        // Wait for the send task to complete
2226        cx.background_spawn(turn.send_task)
2227    }
2228
2229    fn mark_pending_tools_as_canceled(&mut self) {
2230        for entry in self.entries.iter_mut() {
2231            if let AgentThreadEntry::ToolCall(call) = entry {
2232                let cancel = matches!(
2233                    call.status,
2234                    ToolCallStatus::Pending
2235                        | ToolCallStatus::WaitingForConfirmation { .. }
2236                        | ToolCallStatus::InProgress
2237                );
2238
2239                if cancel {
2240                    call.status = ToolCallStatus::Canceled;
2241                }
2242            }
2243        }
2244    }
2245
2246    /// Restores the git working tree to the state at the given checkpoint (if one exists)
2247    pub fn restore_checkpoint(
2248        &mut self,
2249        id: UserMessageId,
2250        cx: &mut Context<Self>,
2251    ) -> Task<Result<()>> {
2252        let Some((_, message)) = self.user_message_mut(&id) else {
2253            return Task::ready(Err(anyhow!("message not found")));
2254        };
2255
2256        let checkpoint = message
2257            .checkpoint
2258            .as_ref()
2259            .map(|c| c.git_checkpoint.clone());
2260
2261        // Cancel any in-progress generation before restoring
2262        let cancel_task = self.cancel(cx);
2263        let rewind = self.rewind(id.clone(), cx);
2264        let git_store = self.project.read(cx).git_store().clone();
2265
2266        cx.spawn(async move |_, cx| {
2267            cancel_task.await;
2268            rewind.await?;
2269            if let Some(checkpoint) = checkpoint {
2270                git_store
2271                    .update(cx, |git, cx| git.restore_checkpoint(checkpoint, cx))
2272                    .await?;
2273            }
2274
2275            Ok(())
2276        })
2277    }
2278
2279    /// Rewinds this thread to before the entry at `index`, removing it and all
2280    /// subsequent entries while rejecting any action_log changes made from that point.
2281    /// Unlike `restore_checkpoint`, this method does not restore from git.
2282    pub fn rewind(&mut self, id: UserMessageId, cx: &mut Context<Self>) -> Task<Result<()>> {
2283        let Some(truncate) = self.connection.truncate(&self.session_id, cx) else {
2284            return Task::ready(Err(anyhow!("not supported")));
2285        };
2286
2287        Self::flush_streaming_text(&mut self.streaming_text_buffer, cx);
2288        let telemetry = ActionLogTelemetry::from(&*self);
2289        cx.spawn(async move |this, cx| {
2290            cx.update(|cx| truncate.run(id.clone(), cx)).await?;
2291            this.update(cx, |this, cx| {
2292                if let Some((ix, _)) = this.user_message_mut(&id) {
2293                    // Collect all terminals from entries that will be removed
2294                    let terminals_to_remove: Vec<acp::TerminalId> = this.entries[ix..]
2295                        .iter()
2296                        .flat_map(|entry| entry.terminals())
2297                        .filter_map(|terminal| terminal.read(cx).id().clone().into())
2298                        .collect();
2299
2300                    let range = ix..this.entries.len();
2301                    this.entries.truncate(ix);
2302                    cx.emit(AcpThreadEvent::EntriesRemoved(range));
2303
2304                    // Kill and remove the terminals
2305                    for terminal_id in terminals_to_remove {
2306                        if let Some(terminal) = this.terminals.remove(&terminal_id) {
2307                            terminal.update(cx, |terminal, cx| {
2308                                terminal.kill(cx);
2309                            });
2310                        }
2311                    }
2312                }
2313                this.action_log().update(cx, |action_log, cx| {
2314                    action_log.reject_all_edits(Some(telemetry), cx)
2315                })
2316            })?
2317            .await;
2318            Ok(())
2319        })
2320    }
2321
2322    fn update_last_checkpoint(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
2323        let git_store = self.project.read(cx).git_store().clone();
2324
2325        let Some((_, message)) = self.last_user_message() else {
2326            return Task::ready(Ok(()));
2327        };
2328        let Some(user_message_id) = message.id.clone() else {
2329            return Task::ready(Ok(()));
2330        };
2331        let Some(checkpoint) = message.checkpoint.as_ref() else {
2332            return Task::ready(Ok(()));
2333        };
2334        let old_checkpoint = checkpoint.git_checkpoint.clone();
2335
2336        let new_checkpoint = git_store.update(cx, |git, cx| git.checkpoint(cx));
2337        cx.spawn(async move |this, cx| {
2338            let Some(new_checkpoint) = new_checkpoint
2339                .await
2340                .context("failed to get new checkpoint")
2341                .log_err()
2342            else {
2343                return Ok(());
2344            };
2345
2346            let equal = git_store
2347                .update(cx, |git, cx| {
2348                    git.compare_checkpoints(old_checkpoint.clone(), new_checkpoint, cx)
2349                })
2350                .await
2351                .unwrap_or(true);
2352
2353            this.update(cx, |this, cx| {
2354                if let Some((ix, message)) = this.user_message_mut(&user_message_id) {
2355                    if let Some(checkpoint) = message.checkpoint.as_mut() {
2356                        checkpoint.show = !equal;
2357                        cx.emit(AcpThreadEvent::EntryUpdated(ix));
2358                    }
2359                }
2360            })?;
2361
2362            Ok(())
2363        })
2364    }
2365
2366    fn last_user_message(&mut self) -> Option<(usize, &mut UserMessage)> {
2367        self.entries
2368            .iter_mut()
2369            .enumerate()
2370            .rev()
2371            .find_map(|(ix, entry)| {
2372                if let AgentThreadEntry::UserMessage(message) = entry {
2373                    Some((ix, message))
2374                } else {
2375                    None
2376                }
2377            })
2378    }
2379
2380    fn user_message_mut(&mut self, id: &UserMessageId) -> Option<(usize, &mut UserMessage)> {
2381        self.entries.iter_mut().enumerate().find_map(|(ix, entry)| {
2382            if let AgentThreadEntry::UserMessage(message) = entry {
2383                if message.id.as_ref() == Some(id) {
2384                    Some((ix, message))
2385                } else {
2386                    None
2387                }
2388            } else {
2389                None
2390            }
2391        })
2392    }
2393
2394    pub fn read_text_file(
2395        &self,
2396        path: PathBuf,
2397        line: Option<u32>,
2398        limit: Option<u32>,
2399        reuse_shared_snapshot: bool,
2400        cx: &mut Context<Self>,
2401    ) -> Task<Result<String, acp::Error>> {
2402        // Args are 1-based, move to 0-based
2403        let line = line.unwrap_or_default().saturating_sub(1);
2404        let limit = limit.unwrap_or(u32::MAX);
2405        let project = self.project.clone();
2406        let action_log = self.action_log.clone();
2407        let should_update_agent_location = self.parent_session_id.is_none();
2408        cx.spawn(async move |this, cx| {
2409            let load = project.update(cx, |project, cx| {
2410                let path = project
2411                    .project_path_for_absolute_path(&path, cx)
2412                    .ok_or_else(|| {
2413                        acp::Error::resource_not_found(Some(path.display().to_string()))
2414                    })?;
2415                Ok::<_, acp::Error>(project.open_buffer(path, cx))
2416            })?;
2417
2418            let buffer = load.await?;
2419
2420            let snapshot = if reuse_shared_snapshot {
2421                this.read_with(cx, |this, _| {
2422                    this.shared_buffers.get(&buffer.clone()).cloned()
2423                })
2424                .log_err()
2425                .flatten()
2426            } else {
2427                None
2428            };
2429
2430            let snapshot = if let Some(snapshot) = snapshot {
2431                snapshot
2432            } else {
2433                action_log.update(cx, |action_log, cx| {
2434                    action_log.buffer_read(buffer.clone(), cx);
2435                });
2436
2437                let snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
2438                this.update(cx, |this, _| {
2439                    this.shared_buffers.insert(buffer.clone(), snapshot.clone());
2440                })?;
2441                snapshot
2442            };
2443
2444            let max_point = snapshot.max_point();
2445            let start_position = Point::new(line, 0);
2446
2447            if start_position > max_point {
2448                return Err(acp::Error::invalid_params().data(format!(
2449                    "Attempting to read beyond the end of the file, line {}:{}",
2450                    max_point.row + 1,
2451                    max_point.column
2452                )));
2453            }
2454
2455            let start = snapshot.anchor_before(start_position);
2456            let end = snapshot.anchor_before(Point::new(line.saturating_add(limit), 0));
2457
2458            if should_update_agent_location {
2459                project.update(cx, |project, cx| {
2460                    project.set_agent_location(
2461                        Some(AgentLocation {
2462                            buffer: buffer.downgrade(),
2463                            position: start,
2464                        }),
2465                        cx,
2466                    );
2467                });
2468            }
2469
2470            Ok(snapshot.text_for_range(start..end).collect::<String>())
2471        })
2472    }
2473
2474    pub fn write_text_file(
2475        &self,
2476        path: PathBuf,
2477        content: String,
2478        cx: &mut Context<Self>,
2479    ) -> Task<Result<()>> {
2480        let project = self.project.clone();
2481        let action_log = self.action_log.clone();
2482        let should_update_agent_location = self.parent_session_id.is_none();
2483        cx.spawn(async move |this, cx| {
2484            let load = project.update(cx, |project, cx| {
2485                let path = project
2486                    .project_path_for_absolute_path(&path, cx)
2487                    .context("invalid path")?;
2488                anyhow::Ok(project.open_buffer(path, cx))
2489            });
2490            let buffer = load?.await?;
2491            let snapshot = this.update(cx, |this, cx| {
2492                this.shared_buffers
2493                    .get(&buffer)
2494                    .cloned()
2495                    .unwrap_or_else(|| buffer.read(cx).snapshot())
2496            })?;
2497            let edits = cx
2498                .background_executor()
2499                .spawn(async move {
2500                    let old_text = snapshot.text();
2501                    text_diff(old_text.as_str(), &content)
2502                        .into_iter()
2503                        .map(|(range, replacement)| {
2504                            (snapshot.anchor_range_around(range), replacement)
2505                        })
2506                        .collect::<Vec<_>>()
2507                })
2508                .await;
2509
2510            if should_update_agent_location {
2511                project.update(cx, |project, cx| {
2512                    project.set_agent_location(
2513                        Some(AgentLocation {
2514                            buffer: buffer.downgrade(),
2515                            position: edits
2516                                .last()
2517                                .map(|(range, _)| range.end)
2518                                .unwrap_or(Anchor::min_for_buffer(buffer.read(cx).remote_id())),
2519                        }),
2520                        cx,
2521                    );
2522                });
2523            }
2524
2525            let format_on_save = cx.update(|cx| {
2526                action_log.update(cx, |action_log, cx| {
2527                    action_log.buffer_read(buffer.clone(), cx);
2528                });
2529
2530                let format_on_save = buffer.update(cx, |buffer, cx| {
2531                    buffer.edit(edits, None, cx);
2532
2533                    let settings = language::language_settings::language_settings(
2534                        buffer.language().map(|l| l.name()),
2535                        buffer.file(),
2536                        cx,
2537                    );
2538
2539                    settings.format_on_save != FormatOnSave::Off
2540                });
2541                action_log.update(cx, |action_log, cx| {
2542                    action_log.buffer_edited(buffer.clone(), cx);
2543                });
2544                format_on_save
2545            });
2546
2547            if format_on_save {
2548                let format_task = project.update(cx, |project, cx| {
2549                    project.format(
2550                        HashSet::from_iter([buffer.clone()]),
2551                        LspFormatTarget::Buffers,
2552                        false,
2553                        FormatTrigger::Save,
2554                        cx,
2555                    )
2556                });
2557                format_task.await.log_err();
2558
2559                action_log.update(cx, |action_log, cx| {
2560                    action_log.buffer_edited(buffer.clone(), cx);
2561                });
2562            }
2563
2564            project
2565                .update(cx, |project, cx| project.save_buffer(buffer, cx))
2566                .await
2567        })
2568    }
2569
2570    pub fn create_terminal(
2571        &self,
2572        command: String,
2573        args: Vec<String>,
2574        extra_env: Vec<acp::EnvVariable>,
2575        cwd: Option<PathBuf>,
2576        output_byte_limit: Option<u64>,
2577        cx: &mut Context<Self>,
2578    ) -> Task<Result<Entity<Terminal>>> {
2579        let env = match &cwd {
2580            Some(dir) => self.project.update(cx, |project, cx| {
2581                project.environment().update(cx, |env, cx| {
2582                    env.directory_environment(dir.as_path().into(), cx)
2583                })
2584            }),
2585            None => Task::ready(None).shared(),
2586        };
2587        let env = cx.spawn(async move |_, _| {
2588            let mut env = env.await.unwrap_or_default();
2589            // Disables paging for `git` and hopefully other commands
2590            env.insert("PAGER".into(), "".into());
2591            for var in extra_env {
2592                env.insert(var.name, var.value);
2593            }
2594            env
2595        });
2596
2597        let project = self.project.clone();
2598        let language_registry = project.read(cx).languages().clone();
2599        let is_windows = project.read(cx).path_style(cx).is_windows();
2600
2601        let terminal_id = acp::TerminalId::new(Uuid::new_v4().to_string());
2602        let terminal_task = cx.spawn({
2603            let terminal_id = terminal_id.clone();
2604            async move |_this, cx| {
2605                let env = env.await;
2606                let shell = project
2607                    .update(cx, |project, cx| {
2608                        project
2609                            .remote_client()
2610                            .and_then(|r| r.read(cx).default_system_shell())
2611                    })
2612                    .unwrap_or_else(|| get_default_system_shell_preferring_bash());
2613                let (task_command, task_args) =
2614                    ShellBuilder::new(&Shell::Program(shell), is_windows)
2615                        .redirect_stdin_to_dev_null()
2616                        .build(Some(command.clone()), &args);
2617                let terminal = project
2618                    .update(cx, |project, cx| {
2619                        project.create_terminal_task(
2620                            task::SpawnInTerminal {
2621                                command: Some(task_command),
2622                                args: task_args,
2623                                cwd: cwd.clone(),
2624                                env,
2625                                ..Default::default()
2626                            },
2627                            cx,
2628                        )
2629                    })
2630                    .await?;
2631
2632                anyhow::Ok(cx.new(|cx| {
2633                    Terminal::new(
2634                        terminal_id,
2635                        &format!("{} {}", command, args.join(" ")),
2636                        cwd,
2637                        output_byte_limit.map(|l| l as usize),
2638                        terminal,
2639                        language_registry,
2640                        cx,
2641                    )
2642                }))
2643            }
2644        });
2645
2646        cx.spawn(async move |this, cx| {
2647            let terminal = terminal_task.await?;
2648            this.update(cx, |this, _cx| {
2649                this.terminals.insert(terminal_id, terminal.clone());
2650                terminal
2651            })
2652        })
2653    }
2654
2655    pub fn kill_terminal(
2656        &mut self,
2657        terminal_id: acp::TerminalId,
2658        cx: &mut Context<Self>,
2659    ) -> Result<()> {
2660        self.terminals
2661            .get(&terminal_id)
2662            .context("Terminal not found")?
2663            .update(cx, |terminal, cx| {
2664                terminal.kill(cx);
2665            });
2666
2667        Ok(())
2668    }
2669
2670    pub fn release_terminal(
2671        &mut self,
2672        terminal_id: acp::TerminalId,
2673        cx: &mut Context<Self>,
2674    ) -> Result<()> {
2675        self.terminals
2676            .remove(&terminal_id)
2677            .context("Terminal not found")?
2678            .update(cx, |terminal, cx| {
2679                terminal.kill(cx);
2680            });
2681
2682        Ok(())
2683    }
2684
2685    pub fn terminal(&self, terminal_id: acp::TerminalId) -> Result<Entity<Terminal>> {
2686        self.terminals
2687            .get(&terminal_id)
2688            .context("Terminal not found")
2689            .cloned()
2690    }
2691
2692    pub fn to_markdown(&self, cx: &App) -> String {
2693        self.entries.iter().map(|e| e.to_markdown(cx)).collect()
2694    }
2695
2696    pub fn emit_load_error(&mut self, error: LoadError, cx: &mut Context<Self>) {
2697        cx.emit(AcpThreadEvent::LoadError(error));
2698    }
2699
2700    pub fn register_terminal_created(
2701        &mut self,
2702        terminal_id: acp::TerminalId,
2703        command_label: String,
2704        working_dir: Option<PathBuf>,
2705        output_byte_limit: Option<u64>,
2706        terminal: Entity<::terminal::Terminal>,
2707        cx: &mut Context<Self>,
2708    ) -> Entity<Terminal> {
2709        let language_registry = self.project.read(cx).languages().clone();
2710
2711        let entity = cx.new(|cx| {
2712            Terminal::new(
2713                terminal_id.clone(),
2714                &command_label,
2715                working_dir.clone(),
2716                output_byte_limit.map(|l| l as usize),
2717                terminal,
2718                language_registry,
2719                cx,
2720            )
2721        });
2722        self.terminals.insert(terminal_id.clone(), entity.clone());
2723        entity
2724    }
2725
2726    pub fn mark_as_subagent_output(&mut self, cx: &mut Context<Self>) {
2727        for entry in self.entries.iter_mut().rev() {
2728            if let AgentThreadEntry::AssistantMessage(assistant_message) = entry {
2729                assistant_message.is_subagent_output = true;
2730                cx.notify();
2731                return;
2732            }
2733        }
2734    }
2735
2736    pub fn on_terminal_provider_event(
2737        &mut self,
2738        event: TerminalProviderEvent,
2739        cx: &mut Context<Self>,
2740    ) {
2741        match event {
2742            TerminalProviderEvent::Created {
2743                terminal_id,
2744                label,
2745                cwd,
2746                output_byte_limit,
2747                terminal,
2748            } => {
2749                let entity = self.register_terminal_created(
2750                    terminal_id.clone(),
2751                    label,
2752                    cwd,
2753                    output_byte_limit,
2754                    terminal,
2755                    cx,
2756                );
2757
2758                if let Some(mut chunks) = self.pending_terminal_output.remove(&terminal_id) {
2759                    for data in chunks.drain(..) {
2760                        entity.update(cx, |term, cx| {
2761                            term.inner().update(cx, |inner, cx| {
2762                                inner.write_output(&data, cx);
2763                            })
2764                        });
2765                    }
2766                }
2767
2768                if let Some(_status) = self.pending_terminal_exit.remove(&terminal_id) {
2769                    entity.update(cx, |_term, cx| {
2770                        cx.notify();
2771                    });
2772                }
2773
2774                cx.notify();
2775            }
2776            TerminalProviderEvent::Output { terminal_id, data } => {
2777                if let Some(entity) = self.terminals.get(&terminal_id) {
2778                    entity.update(cx, |term, cx| {
2779                        term.inner().update(cx, |inner, cx| {
2780                            inner.write_output(&data, cx);
2781                        })
2782                    });
2783                } else {
2784                    self.pending_terminal_output
2785                        .entry(terminal_id)
2786                        .or_default()
2787                        .push(data);
2788                }
2789            }
2790            TerminalProviderEvent::TitleChanged { terminal_id, title } => {
2791                if let Some(entity) = self.terminals.get(&terminal_id) {
2792                    entity.update(cx, |term, cx| {
2793                        term.inner().update(cx, |inner, cx| {
2794                            inner.breadcrumb_text = title;
2795                            cx.emit(::terminal::Event::BreadcrumbsChanged);
2796                        })
2797                    });
2798                }
2799            }
2800            TerminalProviderEvent::Exit {
2801                terminal_id,
2802                status,
2803            } => {
2804                if let Some(entity) = self.terminals.get(&terminal_id) {
2805                    entity.update(cx, |_term, cx| {
2806                        cx.notify();
2807                    });
2808                } else {
2809                    self.pending_terminal_exit.insert(terminal_id, status);
2810                }
2811            }
2812        }
2813    }
2814}
2815
2816fn markdown_for_raw_output(
2817    raw_output: &serde_json::Value,
2818    language_registry: &Arc<LanguageRegistry>,
2819    cx: &mut App,
2820) -> Option<Entity<Markdown>> {
2821    match raw_output {
2822        serde_json::Value::Null => None,
2823        serde_json::Value::Bool(value) => Some(cx.new(|cx| {
2824            Markdown::new(
2825                value.to_string().into(),
2826                Some(language_registry.clone()),
2827                None,
2828                cx,
2829            )
2830        })),
2831        serde_json::Value::Number(value) => Some(cx.new(|cx| {
2832            Markdown::new(
2833                value.to_string().into(),
2834                Some(language_registry.clone()),
2835                None,
2836                cx,
2837            )
2838        })),
2839        serde_json::Value::String(value) => Some(cx.new(|cx| {
2840            Markdown::new(
2841                value.clone().into(),
2842                Some(language_registry.clone()),
2843                None,
2844                cx,
2845            )
2846        })),
2847        value => Some(cx.new(|cx| {
2848            let pretty_json = to_string_pretty(value).unwrap_or_else(|_| value.to_string());
2849
2850            Markdown::new(
2851                format!("```json\n{}\n```", pretty_json).into(),
2852                Some(language_registry.clone()),
2853                None,
2854                cx,
2855            )
2856        })),
2857    }
2858}
2859
2860#[cfg(test)]
2861mod tests {
2862    use super::*;
2863    use anyhow::anyhow;
2864    use futures::{channel::mpsc, future::LocalBoxFuture, select};
2865    use gpui::{App, AsyncApp, TestAppContext, WeakEntity};
2866    use indoc::indoc;
2867    use project::{AgentId, FakeFs, Fs};
2868    use rand::{distr, prelude::*};
2869    use serde_json::json;
2870    use settings::SettingsStore;
2871    use smol::stream::StreamExt as _;
2872    use std::{
2873        any::Any,
2874        cell::RefCell,
2875        path::Path,
2876        rc::Rc,
2877        sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
2878        time::Duration,
2879    };
2880    use util::{path, path_list::PathList};
2881
2882    fn init_test(cx: &mut TestAppContext) {
2883        env_logger::try_init().ok();
2884        cx.update(|cx| {
2885            let settings_store = SettingsStore::test(cx);
2886            cx.set_global(settings_store);
2887        });
2888    }
2889
2890    #[gpui::test]
2891    async fn test_terminal_output_buffered_before_created_renders(cx: &mut gpui::TestAppContext) {
2892        init_test(cx);
2893
2894        let fs = FakeFs::new(cx.executor());
2895        let project = Project::test(fs, [], cx).await;
2896        let connection = Rc::new(FakeAgentConnection::new());
2897        let thread = cx
2898            .update(|cx| {
2899                connection.new_session(
2900                    project,
2901                    PathList::new(&[std::path::Path::new(path!("/test"))]),
2902                    cx,
2903                )
2904            })
2905            .await
2906            .unwrap();
2907
2908        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2909
2910        // Send Output BEFORE Created - should be buffered by acp_thread
2911        thread.update(cx, |thread, cx| {
2912            thread.on_terminal_provider_event(
2913                TerminalProviderEvent::Output {
2914                    terminal_id: terminal_id.clone(),
2915                    data: b"hello buffered".to_vec(),
2916                },
2917                cx,
2918            );
2919        });
2920
2921        // Create a display-only terminal and then send Created
2922        let lower = cx.new(|cx| {
2923            let builder = ::terminal::TerminalBuilder::new_display_only(
2924                ::terminal::terminal_settings::CursorShape::default(),
2925                ::terminal::terminal_settings::AlternateScroll::On,
2926                None,
2927                0,
2928                cx.background_executor(),
2929                PathStyle::local(),
2930            )
2931            .unwrap();
2932            builder.subscribe(cx)
2933        });
2934
2935        thread.update(cx, |thread, cx| {
2936            thread.on_terminal_provider_event(
2937                TerminalProviderEvent::Created {
2938                    terminal_id: terminal_id.clone(),
2939                    label: "Buffered Test".to_string(),
2940                    cwd: None,
2941                    output_byte_limit: None,
2942                    terminal: lower.clone(),
2943                },
2944                cx,
2945            );
2946        });
2947
2948        // After Created, buffered Output should have been flushed into the renderer
2949        let content = thread.read_with(cx, |thread, cx| {
2950            let term = thread.terminal(terminal_id.clone()).unwrap();
2951            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
2952        });
2953
2954        assert!(
2955            content.contains("hello buffered"),
2956            "expected buffered output to render, got: {content}"
2957        );
2958    }
2959
2960    #[gpui::test]
2961    async fn test_terminal_output_and_exit_buffered_before_created(cx: &mut gpui::TestAppContext) {
2962        init_test(cx);
2963
2964        let fs = FakeFs::new(cx.executor());
2965        let project = Project::test(fs, [], cx).await;
2966        let connection = Rc::new(FakeAgentConnection::new());
2967        let thread = cx
2968            .update(|cx| {
2969                connection.new_session(
2970                    project,
2971                    PathList::new(&[std::path::Path::new(path!("/test"))]),
2972                    cx,
2973                )
2974            })
2975            .await
2976            .unwrap();
2977
2978        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
2979
2980        // Send Output BEFORE Created
2981        thread.update(cx, |thread, cx| {
2982            thread.on_terminal_provider_event(
2983                TerminalProviderEvent::Output {
2984                    terminal_id: terminal_id.clone(),
2985                    data: b"pre-exit data".to_vec(),
2986                },
2987                cx,
2988            );
2989        });
2990
2991        // Send Exit BEFORE Created
2992        thread.update(cx, |thread, cx| {
2993            thread.on_terminal_provider_event(
2994                TerminalProviderEvent::Exit {
2995                    terminal_id: terminal_id.clone(),
2996                    status: acp::TerminalExitStatus::new().exit_code(0),
2997                },
2998                cx,
2999            );
3000        });
3001
3002        // Now create a display-only lower-level terminal and send Created
3003        let lower = cx.new(|cx| {
3004            let builder = ::terminal::TerminalBuilder::new_display_only(
3005                ::terminal::terminal_settings::CursorShape::default(),
3006                ::terminal::terminal_settings::AlternateScroll::On,
3007                None,
3008                0,
3009                cx.background_executor(),
3010                PathStyle::local(),
3011            )
3012            .unwrap();
3013            builder.subscribe(cx)
3014        });
3015
3016        thread.update(cx, |thread, cx| {
3017            thread.on_terminal_provider_event(
3018                TerminalProviderEvent::Created {
3019                    terminal_id: terminal_id.clone(),
3020                    label: "Buffered Exit Test".to_string(),
3021                    cwd: None,
3022                    output_byte_limit: None,
3023                    terminal: lower.clone(),
3024                },
3025                cx,
3026            );
3027        });
3028
3029        // Output should be present after Created (flushed from buffer)
3030        let content = thread.read_with(cx, |thread, cx| {
3031            let term = thread.terminal(terminal_id.clone()).unwrap();
3032            term.read_with(cx, |t, cx| t.inner().read(cx).get_content())
3033        });
3034
3035        assert!(
3036            content.contains("pre-exit data"),
3037            "expected pre-exit data to render, got: {content}"
3038        );
3039    }
3040
3041    /// Test that killing a terminal via Terminal::kill properly:
3042    /// 1. Causes wait_for_exit to complete (doesn't hang forever)
3043    /// 2. The underlying terminal still has the output that was written before the kill
3044    ///
3045    /// This test verifies that the fix to kill_active_task (which now also kills
3046    /// the shell process in addition to the foreground process) properly allows
3047    /// wait_for_exit to complete instead of hanging indefinitely.
3048    #[cfg(unix)]
3049    #[gpui::test]
3050    async fn test_terminal_kill_allows_wait_for_exit_to_complete(cx: &mut gpui::TestAppContext) {
3051        use std::collections::HashMap;
3052        use task::Shell;
3053        use util::shell_builder::ShellBuilder;
3054
3055        init_test(cx);
3056        cx.executor().allow_parking();
3057
3058        let fs = FakeFs::new(cx.executor());
3059        let project = Project::test(fs, [], cx).await;
3060        let connection = Rc::new(FakeAgentConnection::new());
3061        let thread = cx
3062            .update(|cx| {
3063                connection.new_session(
3064                    project.clone(),
3065                    PathList::new(&[Path::new(path!("/test"))]),
3066                    cx,
3067                )
3068            })
3069            .await
3070            .unwrap();
3071
3072        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
3073
3074        // Create a real PTY terminal that runs a command which prints output then sleeps
3075        // We use printf instead of echo and chain with && sleep to ensure proper execution
3076        let (completion_tx, _completion_rx) = smol::channel::unbounded();
3077        let (program, args) = ShellBuilder::new(&Shell::System, false).build(
3078            Some("printf 'output_before_kill\\n' && sleep 60".to_owned()),
3079            &[],
3080        );
3081
3082        let builder = cx
3083            .update(|cx| {
3084                ::terminal::TerminalBuilder::new(
3085                    None,
3086                    None,
3087                    task::Shell::WithArguments {
3088                        program,
3089                        args,
3090                        title_override: None,
3091                    },
3092                    HashMap::default(),
3093                    ::terminal::terminal_settings::CursorShape::default(),
3094                    ::terminal::terminal_settings::AlternateScroll::On,
3095                    None,
3096                    vec![],
3097                    0,
3098                    false,
3099                    0,
3100                    Some(completion_tx),
3101                    cx,
3102                    vec![],
3103                    PathStyle::local(),
3104                )
3105            })
3106            .await
3107            .unwrap();
3108
3109        let lower_terminal = cx.new(|cx| builder.subscribe(cx));
3110
3111        // Create the acp_thread Terminal wrapper
3112        thread.update(cx, |thread, cx| {
3113            thread.on_terminal_provider_event(
3114                TerminalProviderEvent::Created {
3115                    terminal_id: terminal_id.clone(),
3116                    label: "printf output_before_kill && sleep 60".to_string(),
3117                    cwd: None,
3118                    output_byte_limit: None,
3119                    terminal: lower_terminal.clone(),
3120                },
3121                cx,
3122            );
3123        });
3124
3125        // Wait for the printf command to execute and produce output
3126        // Use real time since parking is enabled
3127        cx.executor().timer(Duration::from_millis(500)).await;
3128
3129        // Get the acp_thread Terminal and kill it
3130        let wait_for_exit = thread.update(cx, |thread, cx| {
3131            let term = thread.terminals.get(&terminal_id).unwrap();
3132            let wait_for_exit = term.read(cx).wait_for_exit();
3133            term.update(cx, |term, cx| {
3134                term.kill(cx);
3135            });
3136            wait_for_exit
3137        });
3138
3139        // KEY ASSERTION: wait_for_exit should complete within a reasonable time (not hang).
3140        // Before the fix to kill_active_task, this would hang forever because
3141        // only the foreground process was killed, not the shell, so the PTY
3142        // child never exited and wait_for_completed_task never completed.
3143        let exit_result = futures::select! {
3144            result = futures::FutureExt::fuse(wait_for_exit) => Some(result),
3145            _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(5))) => None,
3146        };
3147
3148        assert!(
3149            exit_result.is_some(),
3150            "wait_for_exit should complete after kill, but it timed out. \
3151            This indicates kill_active_task is not properly killing the shell process."
3152        );
3153
3154        // Give the system a chance to process any pending updates
3155        cx.run_until_parked();
3156
3157        // Verify that the underlying terminal still has the output that was
3158        // written before the kill. This verifies that killing doesn't lose output.
3159        let inner_content = thread.read_with(cx, |thread, cx| {
3160            let term = thread.terminals.get(&terminal_id).unwrap();
3161            term.read(cx).inner().read(cx).get_content()
3162        });
3163
3164        assert!(
3165            inner_content.contains("output_before_kill"),
3166            "Underlying terminal should contain output from before kill, got: {}",
3167            inner_content
3168        );
3169    }
3170
3171    #[gpui::test]
3172    async fn test_push_user_content_block(cx: &mut gpui::TestAppContext) {
3173        init_test(cx);
3174
3175        let fs = FakeFs::new(cx.executor());
3176        let project = Project::test(fs, [], cx).await;
3177        let connection = Rc::new(FakeAgentConnection::new());
3178        let thread = cx
3179            .update(|cx| {
3180                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3181            })
3182            .await
3183            .unwrap();
3184
3185        // Test creating a new user message
3186        thread.update(cx, |thread, cx| {
3187            thread.push_user_content_block(None, "Hello, ".into(), cx);
3188        });
3189
3190        thread.update(cx, |thread, cx| {
3191            assert_eq!(thread.entries.len(), 1);
3192            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3193                assert_eq!(user_msg.id, None);
3194                assert_eq!(user_msg.content.to_markdown(cx), "Hello, ");
3195            } else {
3196                panic!("Expected UserMessage");
3197            }
3198        });
3199
3200        // Test appending to existing user message
3201        let message_1_id = UserMessageId::new();
3202        thread.update(cx, |thread, cx| {
3203            thread.push_user_content_block(Some(message_1_id.clone()), "world!".into(), cx);
3204        });
3205
3206        thread.update(cx, |thread, cx| {
3207            assert_eq!(thread.entries.len(), 1);
3208            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[0] {
3209                assert_eq!(user_msg.id, Some(message_1_id));
3210                assert_eq!(user_msg.content.to_markdown(cx), "Hello, world!");
3211            } else {
3212                panic!("Expected UserMessage");
3213            }
3214        });
3215
3216        // Test creating new user message after assistant message
3217        thread.update(cx, |thread, cx| {
3218            thread.push_assistant_content_block("Assistant response".into(), false, cx);
3219        });
3220
3221        let message_2_id = UserMessageId::new();
3222        thread.update(cx, |thread, cx| {
3223            thread.push_user_content_block(
3224                Some(message_2_id.clone()),
3225                "New user message".into(),
3226                cx,
3227            );
3228        });
3229
3230        thread.update(cx, |thread, cx| {
3231            assert_eq!(thread.entries.len(), 3);
3232            if let AgentThreadEntry::UserMessage(user_msg) = &thread.entries[2] {
3233                assert_eq!(user_msg.id, Some(message_2_id));
3234                assert_eq!(user_msg.content.to_markdown(cx), "New user message");
3235            } else {
3236                panic!("Expected UserMessage at index 2");
3237            }
3238        });
3239    }
3240
3241    #[gpui::test]
3242    async fn test_thinking_concatenation(cx: &mut gpui::TestAppContext) {
3243        init_test(cx);
3244
3245        let fs = FakeFs::new(cx.executor());
3246        let project = Project::test(fs, [], cx).await;
3247        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3248            |_, thread, mut cx| {
3249                async move {
3250                    thread.update(&mut cx, |thread, cx| {
3251                        thread
3252                            .handle_session_update(
3253                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3254                                    "Thinking ".into(),
3255                                )),
3256                                cx,
3257                            )
3258                            .unwrap();
3259                        thread
3260                            .handle_session_update(
3261                                acp::SessionUpdate::AgentThoughtChunk(acp::ContentChunk::new(
3262                                    "hard!".into(),
3263                                )),
3264                                cx,
3265                            )
3266                            .unwrap();
3267                    })?;
3268                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3269                }
3270                .boxed_local()
3271            },
3272        ));
3273
3274        let thread = cx
3275            .update(|cx| {
3276                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3277            })
3278            .await
3279            .unwrap();
3280
3281        thread
3282            .update(cx, |thread, cx| thread.send_raw("Hello from Zed!", cx))
3283            .await
3284            .unwrap();
3285
3286        let output = thread.read_with(cx, |thread, cx| thread.to_markdown(cx));
3287        assert_eq!(
3288            output,
3289            indoc! {r#"
3290            ## User
3291
3292            Hello from Zed!
3293
3294            ## Assistant
3295
3296            <thinking>
3297            Thinking hard!
3298            </thinking>
3299
3300            "#}
3301        );
3302    }
3303
3304    #[gpui::test]
3305    async fn test_edits_concurrently_to_user(cx: &mut TestAppContext) {
3306        init_test(cx);
3307
3308        let fs = FakeFs::new(cx.executor());
3309        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\n"}))
3310            .await;
3311        let project = Project::test(fs.clone(), [], cx).await;
3312        let (read_file_tx, read_file_rx) = oneshot::channel::<()>();
3313        let read_file_tx = Rc::new(RefCell::new(Some(read_file_tx)));
3314        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
3315            move |_, thread, mut cx| {
3316                let read_file_tx = read_file_tx.clone();
3317                async move {
3318                    let content = thread
3319                        .update(&mut cx, |thread, cx| {
3320                            thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3321                        })
3322                        .unwrap()
3323                        .await
3324                        .unwrap();
3325                    assert_eq!(content, "one\ntwo\nthree\n");
3326                    read_file_tx.take().unwrap().send(()).unwrap();
3327                    thread
3328                        .update(&mut cx, |thread, cx| {
3329                            thread.write_text_file(
3330                                path!("/tmp/foo").into(),
3331                                "one\ntwo\nthree\nfour\nfive\n".to_string(),
3332                                cx,
3333                            )
3334                        })
3335                        .unwrap()
3336                        .await
3337                        .unwrap();
3338                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3339                }
3340                .boxed_local()
3341            },
3342        ));
3343
3344        let (worktree, pathbuf) = project
3345            .update(cx, |project, cx| {
3346                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3347            })
3348            .await
3349            .unwrap();
3350        let buffer = project
3351            .update(cx, |project, cx| {
3352                project.open_buffer((worktree.read(cx).id(), pathbuf), cx)
3353            })
3354            .await
3355            .unwrap();
3356
3357        let thread = cx
3358            .update(|cx| {
3359                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3360            })
3361            .await
3362            .unwrap();
3363
3364        let request = thread.update(cx, |thread, cx| {
3365            thread.send_raw("Extend the count in /tmp/foo", cx)
3366        });
3367        read_file_rx.await.ok();
3368        buffer.update(cx, |buffer, cx| {
3369            buffer.edit([(0..0, "zero\n".to_string())], None, cx);
3370        });
3371        cx.run_until_parked();
3372        assert_eq!(
3373            buffer.read_with(cx, |buffer, _| buffer.text()),
3374            "zero\none\ntwo\nthree\nfour\nfive\n"
3375        );
3376        assert_eq!(
3377            String::from_utf8(fs.read_file_sync(path!("/tmp/foo")).unwrap()).unwrap(),
3378            "zero\none\ntwo\nthree\nfour\nfive\n"
3379        );
3380        request.await.unwrap();
3381    }
3382
3383    #[gpui::test]
3384    async fn test_reading_from_line(cx: &mut TestAppContext) {
3385        init_test(cx);
3386
3387        let fs = FakeFs::new(cx.executor());
3388        fs.insert_tree(path!("/tmp"), json!({"foo": "one\ntwo\nthree\nfour\n"}))
3389            .await;
3390        let project = Project::test(fs.clone(), [], cx).await;
3391        project
3392            .update(cx, |project, cx| {
3393                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3394            })
3395            .await
3396            .unwrap();
3397
3398        let connection = Rc::new(FakeAgentConnection::new());
3399
3400        let thread = cx
3401            .update(|cx| {
3402                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3403            })
3404            .await
3405            .unwrap();
3406
3407        // Whole file
3408        let content = thread
3409            .update(cx, |thread, cx| {
3410                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3411            })
3412            .await
3413            .unwrap();
3414
3415        assert_eq!(content, "one\ntwo\nthree\nfour\n");
3416
3417        // Only start line
3418        let content = thread
3419            .update(cx, |thread, cx| {
3420                thread.read_text_file(path!("/tmp/foo").into(), Some(3), None, false, cx)
3421            })
3422            .await
3423            .unwrap();
3424
3425        assert_eq!(content, "three\nfour\n");
3426
3427        // Only limit
3428        let content = thread
3429            .update(cx, |thread, cx| {
3430                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3431            })
3432            .await
3433            .unwrap();
3434
3435        assert_eq!(content, "one\ntwo\n");
3436
3437        // Range
3438        let content = thread
3439            .update(cx, |thread, cx| {
3440                thread.read_text_file(path!("/tmp/foo").into(), Some(2), Some(2), false, cx)
3441            })
3442            .await
3443            .unwrap();
3444
3445        assert_eq!(content, "two\nthree\n");
3446
3447        // Invalid
3448        let err = thread
3449            .update(cx, |thread, cx| {
3450                thread.read_text_file(path!("/tmp/foo").into(), Some(6), Some(2), false, cx)
3451            })
3452            .await
3453            .unwrap_err();
3454
3455        assert_eq!(
3456            err.to_string(),
3457            "Invalid params: \"Attempting to read beyond the end of the file, line 5:0\""
3458        );
3459    }
3460
3461    #[gpui::test]
3462    async fn test_reading_empty_file(cx: &mut TestAppContext) {
3463        init_test(cx);
3464
3465        let fs = FakeFs::new(cx.executor());
3466        fs.insert_tree(path!("/tmp"), json!({"foo": ""})).await;
3467        let project = Project::test(fs.clone(), [], cx).await;
3468        project
3469            .update(cx, |project, cx| {
3470                project.find_or_create_worktree(path!("/tmp/foo"), true, cx)
3471            })
3472            .await
3473            .unwrap();
3474
3475        let connection = Rc::new(FakeAgentConnection::new());
3476
3477        let thread = cx
3478            .update(|cx| {
3479                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3480            })
3481            .await
3482            .unwrap();
3483
3484        // Whole file
3485        let content = thread
3486            .update(cx, |thread, cx| {
3487                thread.read_text_file(path!("/tmp/foo").into(), None, None, false, cx)
3488            })
3489            .await
3490            .unwrap();
3491
3492        assert_eq!(content, "");
3493
3494        // Only start line
3495        let content = thread
3496            .update(cx, |thread, cx| {
3497                thread.read_text_file(path!("/tmp/foo").into(), Some(1), None, false, cx)
3498            })
3499            .await
3500            .unwrap();
3501
3502        assert_eq!(content, "");
3503
3504        // Only limit
3505        let content = thread
3506            .update(cx, |thread, cx| {
3507                thread.read_text_file(path!("/tmp/foo").into(), None, Some(2), false, cx)
3508            })
3509            .await
3510            .unwrap();
3511
3512        assert_eq!(content, "");
3513
3514        // Range
3515        let content = thread
3516            .update(cx, |thread, cx| {
3517                thread.read_text_file(path!("/tmp/foo").into(), Some(1), Some(1), false, cx)
3518            })
3519            .await
3520            .unwrap();
3521
3522        assert_eq!(content, "");
3523
3524        // Invalid
3525        let err = thread
3526            .update(cx, |thread, cx| {
3527                thread.read_text_file(path!("/tmp/foo").into(), Some(5), Some(2), false, cx)
3528            })
3529            .await
3530            .unwrap_err();
3531
3532        assert_eq!(
3533            err.to_string(),
3534            "Invalid params: \"Attempting to read beyond the end of the file, line 1:0\""
3535        );
3536    }
3537    #[gpui::test]
3538    async fn test_reading_non_existing_file(cx: &mut TestAppContext) {
3539        init_test(cx);
3540
3541        let fs = FakeFs::new(cx.executor());
3542        fs.insert_tree(path!("/tmp"), json!({})).await;
3543        let project = Project::test(fs.clone(), [], cx).await;
3544        project
3545            .update(cx, |project, cx| {
3546                project.find_or_create_worktree(path!("/tmp"), true, cx)
3547            })
3548            .await
3549            .unwrap();
3550
3551        let connection = Rc::new(FakeAgentConnection::new());
3552
3553        let thread = cx
3554            .update(|cx| {
3555                connection.new_session(project, PathList::new(&[Path::new(path!("/tmp"))]), cx)
3556            })
3557            .await
3558            .unwrap();
3559
3560        // Out of project file
3561        let err = thread
3562            .update(cx, |thread, cx| {
3563                thread.read_text_file(path!("/foo").into(), None, None, false, cx)
3564            })
3565            .await
3566            .unwrap_err();
3567
3568        assert_eq!(err.code, acp::ErrorCode::ResourceNotFound);
3569    }
3570
3571    #[gpui::test]
3572    async fn test_succeeding_canceled_toolcall(cx: &mut TestAppContext) {
3573        init_test(cx);
3574
3575        let fs = FakeFs::new(cx.executor());
3576        let project = Project::test(fs, [], cx).await;
3577        let id = acp::ToolCallId::new("test");
3578
3579        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3580            let id = id.clone();
3581            move |_, thread, mut cx| {
3582                let id = id.clone();
3583                async move {
3584                    thread
3585                        .update(&mut cx, |thread, cx| {
3586                            thread.handle_session_update(
3587                                acp::SessionUpdate::ToolCall(
3588                                    acp::ToolCall::new(id.clone(), "Label")
3589                                        .kind(acp::ToolKind::Fetch)
3590                                        .status(acp::ToolCallStatus::InProgress),
3591                                ),
3592                                cx,
3593                            )
3594                        })
3595                        .unwrap()
3596                        .unwrap();
3597                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3598                }
3599                .boxed_local()
3600            }
3601        }));
3602
3603        let thread = cx
3604            .update(|cx| {
3605                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3606            })
3607            .await
3608            .unwrap();
3609
3610        let request = thread.update(cx, |thread, cx| {
3611            thread.send_raw("Fetch https://example.com", cx)
3612        });
3613
3614        run_until_first_tool_call(&thread, cx).await;
3615
3616        thread.read_with(cx, |thread, _| {
3617            assert!(matches!(
3618                thread.entries[1],
3619                AgentThreadEntry::ToolCall(ToolCall {
3620                    status: ToolCallStatus::InProgress,
3621                    ..
3622                })
3623            ));
3624        });
3625
3626        thread.update(cx, |thread, cx| thread.cancel(cx)).await;
3627
3628        thread.read_with(cx, |thread, _| {
3629            assert!(matches!(
3630                &thread.entries[1],
3631                AgentThreadEntry::ToolCall(ToolCall {
3632                    status: ToolCallStatus::Canceled,
3633                    ..
3634                })
3635            ));
3636        });
3637
3638        thread
3639            .update(cx, |thread, cx| {
3640                thread.handle_session_update(
3641                    acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
3642                        id,
3643                        acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
3644                    )),
3645                    cx,
3646                )
3647            })
3648            .unwrap();
3649
3650        request.await.unwrap();
3651
3652        thread.read_with(cx, |thread, _| {
3653            assert!(matches!(
3654                thread.entries[1],
3655                AgentThreadEntry::ToolCall(ToolCall {
3656                    status: ToolCallStatus::Completed,
3657                    ..
3658                })
3659            ));
3660        });
3661    }
3662
3663    #[gpui::test]
3664    async fn test_no_pending_edits_if_tool_calls_are_completed(cx: &mut TestAppContext) {
3665        init_test(cx);
3666        let fs = FakeFs::new(cx.background_executor.clone());
3667        fs.insert_tree(path!("/test"), json!({})).await;
3668        let project = Project::test(fs, [path!("/test").as_ref()], cx).await;
3669
3670        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3671            move |_, thread, mut cx| {
3672                async move {
3673                    thread
3674                        .update(&mut cx, |thread, cx| {
3675                            thread.handle_session_update(
3676                                acp::SessionUpdate::ToolCall(
3677                                    acp::ToolCall::new("test", "Label")
3678                                        .kind(acp::ToolKind::Edit)
3679                                        .status(acp::ToolCallStatus::Completed)
3680                                        .content(vec![acp::ToolCallContent::Diff(acp::Diff::new(
3681                                            "/test/test.txt",
3682                                            "foo",
3683                                        ))]),
3684                                ),
3685                                cx,
3686                            )
3687                        })
3688                        .unwrap()
3689                        .unwrap();
3690                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3691                }
3692                .boxed_local()
3693            }
3694        }));
3695
3696        let thread = cx
3697            .update(|cx| {
3698                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3699            })
3700            .await
3701            .unwrap();
3702
3703        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Hi".into()], cx)))
3704            .await
3705            .unwrap();
3706
3707        assert!(cx.read(|cx| !thread.read(cx).has_pending_edit_tool_calls()));
3708    }
3709
3710    #[gpui::test(iterations = 10)]
3711    async fn test_checkpoints(cx: &mut TestAppContext) {
3712        init_test(cx);
3713        let fs = FakeFs::new(cx.background_executor.clone());
3714        fs.insert_tree(
3715            path!("/test"),
3716            json!({
3717                ".git": {}
3718            }),
3719        )
3720        .await;
3721        let project = Project::test(fs.clone(), [path!("/test").as_ref()], cx).await;
3722
3723        let simulate_changes = Arc::new(AtomicBool::new(true));
3724        let next_filename = Arc::new(AtomicUsize::new(0));
3725        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3726            let simulate_changes = simulate_changes.clone();
3727            let next_filename = next_filename.clone();
3728            let fs = fs.clone();
3729            move |request, thread, mut cx| {
3730                let fs = fs.clone();
3731                let simulate_changes = simulate_changes.clone();
3732                let next_filename = next_filename.clone();
3733                async move {
3734                    if simulate_changes.load(SeqCst) {
3735                        let filename = format!("/test/file-{}", next_filename.fetch_add(1, SeqCst));
3736                        fs.write(Path::new(&filename), b"").await?;
3737                    }
3738
3739                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
3740                        panic!("expected text content block");
3741                    };
3742                    thread.update(&mut cx, |thread, cx| {
3743                        thread
3744                            .handle_session_update(
3745                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
3746                                    content.text.to_uppercase().into(),
3747                                )),
3748                                cx,
3749                            )
3750                            .unwrap();
3751                    })?;
3752                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3753                }
3754                .boxed_local()
3755            }
3756        }));
3757        let thread = cx
3758            .update(|cx| {
3759                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3760            })
3761            .await
3762            .unwrap();
3763
3764        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["Lorem".into()], cx)))
3765            .await
3766            .unwrap();
3767        thread.read_with(cx, |thread, cx| {
3768            assert_eq!(
3769                thread.to_markdown(cx),
3770                indoc! {"
3771                    ## User (checkpoint)
3772
3773                    Lorem
3774
3775                    ## Assistant
3776
3777                    LOREM
3778
3779                "}
3780            );
3781        });
3782        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3783
3784        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["ipsum".into()], cx)))
3785            .await
3786            .unwrap();
3787        thread.read_with(cx, |thread, cx| {
3788            assert_eq!(
3789                thread.to_markdown(cx),
3790                indoc! {"
3791                    ## User (checkpoint)
3792
3793                    Lorem
3794
3795                    ## Assistant
3796
3797                    LOREM
3798
3799                    ## User (checkpoint)
3800
3801                    ipsum
3802
3803                    ## Assistant
3804
3805                    IPSUM
3806
3807                "}
3808            );
3809        });
3810        assert_eq!(
3811            fs.files(),
3812            vec![
3813                Path::new(path!("/test/file-0")),
3814                Path::new(path!("/test/file-1"))
3815            ]
3816        );
3817
3818        // Checkpoint isn't stored when there are no changes.
3819        simulate_changes.store(false, SeqCst);
3820        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["dolor".into()], cx)))
3821            .await
3822            .unwrap();
3823        thread.read_with(cx, |thread, cx| {
3824            assert_eq!(
3825                thread.to_markdown(cx),
3826                indoc! {"
3827                    ## User (checkpoint)
3828
3829                    Lorem
3830
3831                    ## Assistant
3832
3833                    LOREM
3834
3835                    ## User (checkpoint)
3836
3837                    ipsum
3838
3839                    ## Assistant
3840
3841                    IPSUM
3842
3843                    ## User
3844
3845                    dolor
3846
3847                    ## Assistant
3848
3849                    DOLOR
3850
3851                "}
3852            );
3853        });
3854        assert_eq!(
3855            fs.files(),
3856            vec![
3857                Path::new(path!("/test/file-0")),
3858                Path::new(path!("/test/file-1"))
3859            ]
3860        );
3861
3862        // Rewinding the conversation truncates the history and restores the checkpoint.
3863        thread
3864            .update(cx, |thread, cx| {
3865                let AgentThreadEntry::UserMessage(message) = &thread.entries[2] else {
3866                    panic!("unexpected entries {:?}", thread.entries)
3867                };
3868                thread.restore_checkpoint(message.id.clone().unwrap(), cx)
3869            })
3870            .await
3871            .unwrap();
3872        thread.read_with(cx, |thread, cx| {
3873            assert_eq!(
3874                thread.to_markdown(cx),
3875                indoc! {"
3876                    ## User (checkpoint)
3877
3878                    Lorem
3879
3880                    ## Assistant
3881
3882                    LOREM
3883
3884                "}
3885            );
3886        });
3887        assert_eq!(fs.files(), vec![Path::new(path!("/test/file-0"))]);
3888    }
3889
3890    #[gpui::test]
3891    async fn test_tool_result_refusal(cx: &mut TestAppContext) {
3892        use std::sync::atomic::AtomicUsize;
3893        init_test(cx);
3894
3895        let fs = FakeFs::new(cx.executor());
3896        let project = Project::test(fs, None, cx).await;
3897
3898        // Create a connection that simulates refusal after tool result
3899        let prompt_count = Arc::new(AtomicUsize::new(0));
3900        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3901            let prompt_count = prompt_count.clone();
3902            move |_request, thread, mut cx| {
3903                let count = prompt_count.fetch_add(1, SeqCst);
3904                async move {
3905                    if count == 0 {
3906                        // First prompt: Generate a tool call with result
3907                        thread.update(&mut cx, |thread, cx| {
3908                            thread
3909                                .handle_session_update(
3910                                    acp::SessionUpdate::ToolCall(
3911                                        acp::ToolCall::new("tool1", "Test Tool")
3912                                            .kind(acp::ToolKind::Fetch)
3913                                            .status(acp::ToolCallStatus::Completed)
3914                                            .raw_input(serde_json::json!({"query": "test"}))
3915                                            .raw_output(serde_json::json!({"result": "inappropriate content"})),
3916                                    ),
3917                                    cx,
3918                                )
3919                                .unwrap();
3920                        })?;
3921
3922                        // Now return refusal because of the tool result
3923                        Ok(acp::PromptResponse::new(acp::StopReason::Refusal))
3924                    } else {
3925                        Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
3926                    }
3927                }
3928                .boxed_local()
3929            }
3930        }));
3931
3932        let thread = cx
3933            .update(|cx| {
3934                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
3935            })
3936            .await
3937            .unwrap();
3938
3939        // Track if we see a Refusal event
3940        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
3941        let saw_refusal_event_captured = saw_refusal_event.clone();
3942        thread.update(cx, |_thread, cx| {
3943            cx.subscribe(
3944                &thread,
3945                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
3946                    if matches!(event, AcpThreadEvent::Refusal) {
3947                        *saw_refusal_event_captured.lock().unwrap() = true;
3948                    }
3949                },
3950            )
3951            .detach();
3952        });
3953
3954        // Send a user message - this will trigger tool call and then refusal
3955        let send_task = thread.update(cx, |thread, cx| thread.send(vec!["Hello".into()], cx));
3956        cx.background_executor.spawn(send_task).detach();
3957        cx.run_until_parked();
3958
3959        // Verify that:
3960        // 1. A Refusal event WAS emitted (because it's a tool result refusal, not user prompt)
3961        // 2. The user message was NOT truncated
3962        assert!(
3963            *saw_refusal_event.lock().unwrap(),
3964            "Refusal event should be emitted for tool result refusals"
3965        );
3966
3967        thread.read_with(cx, |thread, _| {
3968            let entries = thread.entries();
3969            assert!(entries.len() >= 2, "Should have user message and tool call");
3970
3971            // Verify user message is still there
3972            assert!(
3973                matches!(entries[0], AgentThreadEntry::UserMessage(_)),
3974                "User message should not be truncated"
3975            );
3976
3977            // Verify tool call is there with result
3978            if let AgentThreadEntry::ToolCall(tool_call) = &entries[1] {
3979                assert!(
3980                    tool_call.raw_output.is_some(),
3981                    "Tool call should have output"
3982                );
3983            } else {
3984                panic!("Expected tool call at index 1");
3985            }
3986        });
3987    }
3988
3989    #[gpui::test]
3990    async fn test_user_prompt_refusal_emits_event(cx: &mut TestAppContext) {
3991        init_test(cx);
3992
3993        let fs = FakeFs::new(cx.executor());
3994        let project = Project::test(fs, None, cx).await;
3995
3996        let refuse_next = Arc::new(AtomicBool::new(false));
3997        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
3998            let refuse_next = refuse_next.clone();
3999            move |_request, _thread, _cx| {
4000                if refuse_next.load(SeqCst) {
4001                    async move { Ok(acp::PromptResponse::new(acp::StopReason::Refusal)) }
4002                        .boxed_local()
4003                } else {
4004                    async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }
4005                        .boxed_local()
4006                }
4007            }
4008        }));
4009
4010        let thread = cx
4011            .update(|cx| {
4012                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4013            })
4014            .await
4015            .unwrap();
4016
4017        // Track if we see a Refusal event
4018        let saw_refusal_event = Arc::new(std::sync::Mutex::new(false));
4019        let saw_refusal_event_captured = saw_refusal_event.clone();
4020        thread.update(cx, |_thread, cx| {
4021            cx.subscribe(
4022                &thread,
4023                move |_thread, _event_thread, event: &AcpThreadEvent, _cx| {
4024                    if matches!(event, AcpThreadEvent::Refusal) {
4025                        *saw_refusal_event_captured.lock().unwrap() = true;
4026                    }
4027                },
4028            )
4029            .detach();
4030        });
4031
4032        // Send a message that will be refused
4033        refuse_next.store(true, SeqCst);
4034        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4035            .await
4036            .unwrap();
4037
4038        // Verify that a Refusal event WAS emitted for user prompt refusal
4039        assert!(
4040            *saw_refusal_event.lock().unwrap(),
4041            "Refusal event should be emitted for user prompt refusals"
4042        );
4043
4044        // Verify the message was truncated (user prompt refusal)
4045        thread.read_with(cx, |thread, cx| {
4046            assert_eq!(thread.to_markdown(cx), "");
4047        });
4048    }
4049
4050    #[gpui::test]
4051    async fn test_refusal(cx: &mut TestAppContext) {
4052        init_test(cx);
4053        let fs = FakeFs::new(cx.background_executor.clone());
4054        fs.insert_tree(path!("/"), json!({})).await;
4055        let project = Project::test(fs.clone(), [path!("/").as_ref()], cx).await;
4056
4057        let refuse_next = Arc::new(AtomicBool::new(false));
4058        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4059            let refuse_next = refuse_next.clone();
4060            move |request, thread, mut cx| {
4061                let refuse_next = refuse_next.clone();
4062                async move {
4063                    if refuse_next.load(SeqCst) {
4064                        return Ok(acp::PromptResponse::new(acp::StopReason::Refusal));
4065                    }
4066
4067                    let acp::ContentBlock::Text(content) = &request.prompt[0] else {
4068                        panic!("expected text content block");
4069                    };
4070                    thread.update(&mut cx, |thread, cx| {
4071                        thread
4072                            .handle_session_update(
4073                                acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
4074                                    content.text.to_uppercase().into(),
4075                                )),
4076                                cx,
4077                            )
4078                            .unwrap();
4079                    })?;
4080                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4081                }
4082                .boxed_local()
4083            }
4084        }));
4085        let thread = cx
4086            .update(|cx| {
4087                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4088            })
4089            .await
4090            .unwrap();
4091
4092        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["hello".into()], cx)))
4093            .await
4094            .unwrap();
4095        thread.read_with(cx, |thread, cx| {
4096            assert_eq!(
4097                thread.to_markdown(cx),
4098                indoc! {"
4099                    ## User
4100
4101                    hello
4102
4103                    ## Assistant
4104
4105                    HELLO
4106
4107                "}
4108            );
4109        });
4110
4111        // Simulate refusing the second message. The message should be truncated
4112        // when a user prompt is refused.
4113        refuse_next.store(true, SeqCst);
4114        cx.update(|cx| thread.update(cx, |thread, cx| thread.send(vec!["world".into()], cx)))
4115            .await
4116            .unwrap();
4117        thread.read_with(cx, |thread, cx| {
4118            assert_eq!(
4119                thread.to_markdown(cx),
4120                indoc! {"
4121                    ## User
4122
4123                    hello
4124
4125                    ## Assistant
4126
4127                    HELLO
4128
4129                "}
4130            );
4131        });
4132    }
4133
4134    async fn run_until_first_tool_call(
4135        thread: &Entity<AcpThread>,
4136        cx: &mut TestAppContext,
4137    ) -> usize {
4138        let (mut tx, mut rx) = mpsc::channel::<usize>(1);
4139
4140        let subscription = cx.update(|cx| {
4141            cx.subscribe(thread, move |thread, _, cx| {
4142                for (ix, entry) in thread.read(cx).entries.iter().enumerate() {
4143                    if matches!(entry, AgentThreadEntry::ToolCall(_)) {
4144                        return tx.try_send(ix).unwrap();
4145                    }
4146                }
4147            })
4148        });
4149
4150        select! {
4151            _ = futures::FutureExt::fuse(cx.background_executor.timer(Duration::from_secs(10))) => {
4152                panic!("Timeout waiting for tool call")
4153            }
4154            ix = rx.next().fuse() => {
4155                drop(subscription);
4156                ix.unwrap()
4157            }
4158        }
4159    }
4160
4161    #[derive(Clone, Default)]
4162    struct FakeAgentConnection {
4163        auth_methods: Vec<acp::AuthMethod>,
4164        sessions: Arc<parking_lot::Mutex<HashMap<acp::SessionId, WeakEntity<AcpThread>>>>,
4165        set_title_calls: Rc<RefCell<Vec<SharedString>>>,
4166        on_user_message: Option<
4167            Rc<
4168                dyn Fn(
4169                        acp::PromptRequest,
4170                        WeakEntity<AcpThread>,
4171                        AsyncApp,
4172                    ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4173                    + 'static,
4174            >,
4175        >,
4176    }
4177
4178    impl FakeAgentConnection {
4179        fn new() -> Self {
4180            Self {
4181                auth_methods: Vec::new(),
4182                on_user_message: None,
4183                sessions: Arc::default(),
4184                set_title_calls: Default::default(),
4185            }
4186        }
4187
4188        #[expect(unused)]
4189        fn with_auth_methods(mut self, auth_methods: Vec<acp::AuthMethod>) -> Self {
4190            self.auth_methods = auth_methods;
4191            self
4192        }
4193
4194        fn on_user_message(
4195            mut self,
4196            handler: impl Fn(
4197                acp::PromptRequest,
4198                WeakEntity<AcpThread>,
4199                AsyncApp,
4200            ) -> LocalBoxFuture<'static, Result<acp::PromptResponse>>
4201            + 'static,
4202        ) -> Self {
4203            self.on_user_message.replace(Rc::new(handler));
4204            self
4205        }
4206    }
4207
4208    impl AgentConnection for FakeAgentConnection {
4209        fn agent_id(&self) -> AgentId {
4210            AgentId::new("fake")
4211        }
4212
4213        fn telemetry_id(&self) -> SharedString {
4214            "fake".into()
4215        }
4216
4217        fn auth_methods(&self) -> &[acp::AuthMethod] {
4218            &self.auth_methods
4219        }
4220
4221        fn new_session(
4222            self: Rc<Self>,
4223            project: Entity<Project>,
4224            work_dirs: PathList,
4225            cx: &mut App,
4226        ) -> Task<gpui::Result<Entity<AcpThread>>> {
4227            let session_id = acp::SessionId::new(
4228                rand::rng()
4229                    .sample_iter(&distr::Alphanumeric)
4230                    .take(7)
4231                    .map(char::from)
4232                    .collect::<String>(),
4233            );
4234            let action_log = cx.new(|_| ActionLog::new(project.clone()));
4235            let thread = cx.new(|cx| {
4236                AcpThread::new(
4237                    None,
4238                    "Test",
4239                    Some(work_dirs),
4240                    self.clone(),
4241                    project,
4242                    action_log,
4243                    session_id.clone(),
4244                    watch::Receiver::constant(
4245                        acp::PromptCapabilities::new()
4246                            .image(true)
4247                            .audio(true)
4248                            .embedded_context(true),
4249                    ),
4250                    cx,
4251                )
4252            });
4253            self.sessions.lock().insert(session_id, thread.downgrade());
4254            Task::ready(Ok(thread))
4255        }
4256
4257        fn authenticate(&self, method: acp::AuthMethodId, _cx: &mut App) -> Task<gpui::Result<()>> {
4258            if self.auth_methods().iter().any(|m| m.id() == &method) {
4259                Task::ready(Ok(()))
4260            } else {
4261                Task::ready(Err(anyhow!("Invalid Auth Method")))
4262            }
4263        }
4264
4265        fn prompt(
4266            &self,
4267            _id: Option<UserMessageId>,
4268            params: acp::PromptRequest,
4269            cx: &mut App,
4270        ) -> Task<gpui::Result<acp::PromptResponse>> {
4271            let sessions = self.sessions.lock();
4272            let thread = sessions.get(&params.session_id).unwrap();
4273            if let Some(handler) = &self.on_user_message {
4274                let handler = handler.clone();
4275                let thread = thread.clone();
4276                cx.spawn(async move |cx| handler(params, thread, cx.clone()).await)
4277            } else {
4278                Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
4279            }
4280        }
4281
4282        fn cancel(&self, _session_id: &acp::SessionId, _cx: &mut App) {}
4283
4284        fn truncate(
4285            &self,
4286            session_id: &acp::SessionId,
4287            _cx: &App,
4288        ) -> Option<Rc<dyn AgentSessionTruncate>> {
4289            Some(Rc::new(FakeAgentSessionEditor {
4290                _session_id: session_id.clone(),
4291            }))
4292        }
4293
4294        fn set_title(
4295            &self,
4296            _session_id: &acp::SessionId,
4297            _cx: &App,
4298        ) -> Option<Rc<dyn AgentSessionSetTitle>> {
4299            Some(Rc::new(FakeAgentSessionSetTitle {
4300                calls: self.set_title_calls.clone(),
4301            }))
4302        }
4303
4304        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
4305            self
4306        }
4307    }
4308
4309    struct FakeAgentSessionSetTitle {
4310        calls: Rc<RefCell<Vec<SharedString>>>,
4311    }
4312
4313    impl AgentSessionSetTitle for FakeAgentSessionSetTitle {
4314        fn run(&self, title: SharedString, _cx: &mut App) -> Task<Result<()>> {
4315            self.calls.borrow_mut().push(title);
4316            Task::ready(Ok(()))
4317        }
4318    }
4319
4320    struct FakeAgentSessionEditor {
4321        _session_id: acp::SessionId,
4322    }
4323
4324    impl AgentSessionTruncate for FakeAgentSessionEditor {
4325        fn run(&self, _message_id: UserMessageId, _cx: &mut App) -> Task<Result<()>> {
4326            Task::ready(Ok(()))
4327        }
4328    }
4329
4330    #[gpui::test]
4331    async fn test_tool_call_not_found_creates_failed_entry(cx: &mut TestAppContext) {
4332        init_test(cx);
4333
4334        let fs = FakeFs::new(cx.executor());
4335        let project = Project::test(fs, [], cx).await;
4336        let connection = Rc::new(FakeAgentConnection::new());
4337        let thread = cx
4338            .update(|cx| {
4339                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4340            })
4341            .await
4342            .unwrap();
4343
4344        // Try to update a tool call that doesn't exist
4345        let nonexistent_id = acp::ToolCallId::new("nonexistent-tool-call");
4346        thread.update(cx, |thread, cx| {
4347            let result = thread.handle_session_update(
4348                acp::SessionUpdate::ToolCallUpdate(acp::ToolCallUpdate::new(
4349                    nonexistent_id.clone(),
4350                    acp::ToolCallUpdateFields::new().status(acp::ToolCallStatus::Completed),
4351                )),
4352                cx,
4353            );
4354
4355            // The update should succeed (not return an error)
4356            assert!(result.is_ok());
4357
4358            // There should now be exactly one entry in the thread
4359            assert_eq!(thread.entries.len(), 1);
4360
4361            // The entry should be a failed tool call
4362            if let AgentThreadEntry::ToolCall(tool_call) = &thread.entries[0] {
4363                assert_eq!(tool_call.id, nonexistent_id);
4364                assert!(matches!(tool_call.status, ToolCallStatus::Failed));
4365                assert_eq!(tool_call.kind, acp::ToolKind::Fetch);
4366
4367                // Check that the content contains the error message
4368                assert_eq!(tool_call.content.len(), 1);
4369                if let ToolCallContent::ContentBlock(content_block) = &tool_call.content[0] {
4370                    match content_block {
4371                        ContentBlock::Markdown { markdown } => {
4372                            let markdown_text = markdown.read(cx).source();
4373                            assert!(markdown_text.contains("Tool call not found"));
4374                        }
4375                        ContentBlock::Empty => panic!("Expected markdown content, got empty"),
4376                        ContentBlock::ResourceLink { .. } => {
4377                            panic!("Expected markdown content, got resource link")
4378                        }
4379                        ContentBlock::Image { .. } => {
4380                            panic!("Expected markdown content, got image")
4381                        }
4382                    }
4383                } else {
4384                    panic!("Expected ContentBlock, got: {:?}", tool_call.content[0]);
4385                }
4386            } else {
4387                panic!("Expected ToolCall entry, got: {:?}", thread.entries[0]);
4388            }
4389        });
4390    }
4391
4392    /// Tests that restoring a checkpoint properly cleans up terminals that were
4393    /// created after that checkpoint, and cancels any in-progress generation.
4394    ///
4395    /// Reproduces issue #35142: When a checkpoint is restored, any terminal processes
4396    /// that were started after that checkpoint should be terminated, and any in-progress
4397    /// AI generation should be canceled.
4398    #[gpui::test]
4399    async fn test_restore_checkpoint_kills_terminal(cx: &mut TestAppContext) {
4400        init_test(cx);
4401
4402        let fs = FakeFs::new(cx.executor());
4403        let project = Project::test(fs, [], cx).await;
4404        let connection = Rc::new(FakeAgentConnection::new());
4405        let thread = cx
4406            .update(|cx| {
4407                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4408            })
4409            .await
4410            .unwrap();
4411
4412        // Send first user message to create a checkpoint
4413        cx.update(|cx| {
4414            thread.update(cx, |thread, cx| {
4415                thread.send(vec!["first message".into()], cx)
4416            })
4417        })
4418        .await
4419        .unwrap();
4420
4421        // Send second message (creates another checkpoint) - we'll restore to this one
4422        cx.update(|cx| {
4423            thread.update(cx, |thread, cx| {
4424                thread.send(vec!["second message".into()], cx)
4425            })
4426        })
4427        .await
4428        .unwrap();
4429
4430        // Create 2 terminals BEFORE the checkpoint that have completed running
4431        let terminal_id_1 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4432        let mock_terminal_1 = cx.new(|cx| {
4433            let builder = ::terminal::TerminalBuilder::new_display_only(
4434                ::terminal::terminal_settings::CursorShape::default(),
4435                ::terminal::terminal_settings::AlternateScroll::On,
4436                None,
4437                0,
4438                cx.background_executor(),
4439                PathStyle::local(),
4440            )
4441            .unwrap();
4442            builder.subscribe(cx)
4443        });
4444
4445        thread.update(cx, |thread, cx| {
4446            thread.on_terminal_provider_event(
4447                TerminalProviderEvent::Created {
4448                    terminal_id: terminal_id_1.clone(),
4449                    label: "echo 'first'".to_string(),
4450                    cwd: Some(PathBuf::from("/test")),
4451                    output_byte_limit: None,
4452                    terminal: mock_terminal_1.clone(),
4453                },
4454                cx,
4455            );
4456        });
4457
4458        thread.update(cx, |thread, cx| {
4459            thread.on_terminal_provider_event(
4460                TerminalProviderEvent::Output {
4461                    terminal_id: terminal_id_1.clone(),
4462                    data: b"first\n".to_vec(),
4463                },
4464                cx,
4465            );
4466        });
4467
4468        thread.update(cx, |thread, cx| {
4469            thread.on_terminal_provider_event(
4470                TerminalProviderEvent::Exit {
4471                    terminal_id: terminal_id_1.clone(),
4472                    status: acp::TerminalExitStatus::new().exit_code(0),
4473                },
4474                cx,
4475            );
4476        });
4477
4478        let terminal_id_2 = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4479        let mock_terminal_2 = cx.new(|cx| {
4480            let builder = ::terminal::TerminalBuilder::new_display_only(
4481                ::terminal::terminal_settings::CursorShape::default(),
4482                ::terminal::terminal_settings::AlternateScroll::On,
4483                None,
4484                0,
4485                cx.background_executor(),
4486                PathStyle::local(),
4487            )
4488            .unwrap();
4489            builder.subscribe(cx)
4490        });
4491
4492        thread.update(cx, |thread, cx| {
4493            thread.on_terminal_provider_event(
4494                TerminalProviderEvent::Created {
4495                    terminal_id: terminal_id_2.clone(),
4496                    label: "echo 'second'".to_string(),
4497                    cwd: Some(PathBuf::from("/test")),
4498                    output_byte_limit: None,
4499                    terminal: mock_terminal_2.clone(),
4500                },
4501                cx,
4502            );
4503        });
4504
4505        thread.update(cx, |thread, cx| {
4506            thread.on_terminal_provider_event(
4507                TerminalProviderEvent::Output {
4508                    terminal_id: terminal_id_2.clone(),
4509                    data: b"second\n".to_vec(),
4510                },
4511                cx,
4512            );
4513        });
4514
4515        thread.update(cx, |thread, cx| {
4516            thread.on_terminal_provider_event(
4517                TerminalProviderEvent::Exit {
4518                    terminal_id: terminal_id_2.clone(),
4519                    status: acp::TerminalExitStatus::new().exit_code(0),
4520                },
4521                cx,
4522            );
4523        });
4524
4525        // Get the second message ID to restore to
4526        let second_message_id = thread.read_with(cx, |thread, _| {
4527            // At this point we have:
4528            // - Index 0: First user message (with checkpoint)
4529            // - Index 1: Second user message (with checkpoint)
4530            // No assistant responses because FakeAgentConnection just returns EndTurn
4531            let AgentThreadEntry::UserMessage(message) = &thread.entries[1] else {
4532                panic!("expected user message at index 1");
4533            };
4534            message.id.clone().unwrap()
4535        });
4536
4537        // Create a terminal AFTER the checkpoint we'll restore to.
4538        // This simulates the AI agent starting a long-running terminal command.
4539        let terminal_id = acp::TerminalId::new(uuid::Uuid::new_v4().to_string());
4540        let mock_terminal = cx.new(|cx| {
4541            let builder = ::terminal::TerminalBuilder::new_display_only(
4542                ::terminal::terminal_settings::CursorShape::default(),
4543                ::terminal::terminal_settings::AlternateScroll::On,
4544                None,
4545                0,
4546                cx.background_executor(),
4547                PathStyle::local(),
4548            )
4549            .unwrap();
4550            builder.subscribe(cx)
4551        });
4552
4553        // Register the terminal as created
4554        thread.update(cx, |thread, cx| {
4555            thread.on_terminal_provider_event(
4556                TerminalProviderEvent::Created {
4557                    terminal_id: terminal_id.clone(),
4558                    label: "sleep 1000".to_string(),
4559                    cwd: Some(PathBuf::from("/test")),
4560                    output_byte_limit: None,
4561                    terminal: mock_terminal.clone(),
4562                },
4563                cx,
4564            );
4565        });
4566
4567        // Simulate the terminal producing output (still running)
4568        thread.update(cx, |thread, cx| {
4569            thread.on_terminal_provider_event(
4570                TerminalProviderEvent::Output {
4571                    terminal_id: terminal_id.clone(),
4572                    data: b"terminal is running...\n".to_vec(),
4573                },
4574                cx,
4575            );
4576        });
4577
4578        // Create a tool call entry that references this terminal
4579        // This represents the agent requesting a terminal command
4580        thread.update(cx, |thread, cx| {
4581            thread
4582                .handle_session_update(
4583                    acp::SessionUpdate::ToolCall(
4584                        acp::ToolCall::new("terminal-tool-1", "Running command")
4585                            .kind(acp::ToolKind::Execute)
4586                            .status(acp::ToolCallStatus::InProgress)
4587                            .content(vec![acp::ToolCallContent::Terminal(acp::Terminal::new(
4588                                terminal_id.clone(),
4589                            ))])
4590                            .raw_input(serde_json::json!({"command": "sleep 1000", "cd": "/test"})),
4591                    ),
4592                    cx,
4593                )
4594                .unwrap();
4595        });
4596
4597        // Verify terminal exists and is in the thread
4598        let terminal_exists_before =
4599            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4600        assert!(
4601            terminal_exists_before,
4602            "Terminal should exist before checkpoint restore"
4603        );
4604
4605        // Verify the terminal's underlying task is still running (not completed)
4606        let terminal_running_before = thread.read_with(cx, |thread, _cx| {
4607            let terminal_entity = thread.terminals.get(&terminal_id).unwrap();
4608            terminal_entity.read_with(cx, |term, _cx| {
4609                term.output().is_none() // output is None means it's still running
4610            })
4611        });
4612        assert!(
4613            terminal_running_before,
4614            "Terminal should be running before checkpoint restore"
4615        );
4616
4617        // Verify we have the expected entries before restore
4618        let entry_count_before = thread.read_with(cx, |thread, _| thread.entries.len());
4619        assert!(
4620            entry_count_before > 1,
4621            "Should have multiple entries before restore"
4622        );
4623
4624        // Restore the checkpoint to the second message.
4625        // This should:
4626        // 1. Cancel any in-progress generation (via the cancel() call)
4627        // 2. Remove the terminal that was created after that point
4628        thread
4629            .update(cx, |thread, cx| {
4630                thread.restore_checkpoint(second_message_id, cx)
4631            })
4632            .await
4633            .unwrap();
4634
4635        // Verify that no send_task is in progress after restore
4636        // (cancel() clears the send_task)
4637        let has_send_task_after = thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4638        assert!(
4639            !has_send_task_after,
4640            "Should not have a send_task after restore (cancel should have cleared it)"
4641        );
4642
4643        // Verify the entries were truncated (restoring to index 1 truncates at 1, keeping only index 0)
4644        let entry_count = thread.read_with(cx, |thread, _| thread.entries.len());
4645        assert_eq!(
4646            entry_count, 1,
4647            "Should have 1 entry after restore (only the first user message)"
4648        );
4649
4650        // Verify the 2 completed terminals from before the checkpoint still exist
4651        let terminal_1_exists = thread.read_with(cx, |thread, _| {
4652            thread.terminals.contains_key(&terminal_id_1)
4653        });
4654        assert!(
4655            terminal_1_exists,
4656            "Terminal 1 (from before checkpoint) should still exist"
4657        );
4658
4659        let terminal_2_exists = thread.read_with(cx, |thread, _| {
4660            thread.terminals.contains_key(&terminal_id_2)
4661        });
4662        assert!(
4663            terminal_2_exists,
4664            "Terminal 2 (from before checkpoint) should still exist"
4665        );
4666
4667        // Verify they're still in completed state
4668        let terminal_1_completed = thread.read_with(cx, |thread, _cx| {
4669            let terminal_entity = thread.terminals.get(&terminal_id_1).unwrap();
4670            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4671        });
4672        assert!(terminal_1_completed, "Terminal 1 should still be completed");
4673
4674        let terminal_2_completed = thread.read_with(cx, |thread, _cx| {
4675            let terminal_entity = thread.terminals.get(&terminal_id_2).unwrap();
4676            terminal_entity.read_with(cx, |term, _cx| term.output().is_some())
4677        });
4678        assert!(terminal_2_completed, "Terminal 2 should still be completed");
4679
4680        // Verify the running terminal (created after checkpoint) was removed
4681        let terminal_3_exists =
4682            thread.read_with(cx, |thread, _| thread.terminals.contains_key(&terminal_id));
4683        assert!(
4684            !terminal_3_exists,
4685            "Terminal 3 (created after checkpoint) should have been removed"
4686        );
4687
4688        // Verify total count is 2 (the two from before the checkpoint)
4689        let terminal_count = thread.read_with(cx, |thread, _| thread.terminals.len());
4690        assert_eq!(
4691            terminal_count, 2,
4692            "Should have exactly 2 terminals (the completed ones from before checkpoint)"
4693        );
4694    }
4695
4696    /// Tests that update_last_checkpoint correctly updates the original message's checkpoint
4697    /// even when a new user message is added while the async checkpoint comparison is in progress.
4698    ///
4699    /// This is a regression test for a bug where update_last_checkpoint would fail with
4700    /// "no checkpoint" if a new user message (without a checkpoint) was added between when
4701    /// update_last_checkpoint started and when its async closure ran.
4702    #[gpui::test]
4703    async fn test_update_last_checkpoint_with_new_message_added(cx: &mut TestAppContext) {
4704        init_test(cx);
4705
4706        let fs = FakeFs::new(cx.executor());
4707        fs.insert_tree(path!("/test"), json!({".git": {}, "file.txt": "content"}))
4708            .await;
4709        let project = Project::test(fs.clone(), [Path::new(path!("/test"))], cx).await;
4710
4711        let handler_done = Arc::new(AtomicBool::new(false));
4712        let handler_done_clone = handler_done.clone();
4713        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4714            move |_, _thread, _cx| {
4715                handler_done_clone.store(true, SeqCst);
4716                async move { Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)) }.boxed_local()
4717            },
4718        ));
4719
4720        let thread = cx
4721            .update(|cx| {
4722                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4723            })
4724            .await
4725            .unwrap();
4726
4727        let send_future = thread.update(cx, |thread, cx| thread.send_raw("First message", cx));
4728        let send_task = cx.background_executor.spawn(send_future);
4729
4730        // Tick until handler completes, then a few more to let update_last_checkpoint start
4731        while !handler_done.load(SeqCst) {
4732            cx.executor().tick();
4733        }
4734        for _ in 0..5 {
4735            cx.executor().tick();
4736        }
4737
4738        thread.update(cx, |thread, cx| {
4739            thread.push_entry(
4740                AgentThreadEntry::UserMessage(UserMessage {
4741                    id: Some(UserMessageId::new()),
4742                    content: ContentBlock::Empty,
4743                    chunks: vec!["Injected message (no checkpoint)".into()],
4744                    checkpoint: None,
4745                    indented: false,
4746                }),
4747                cx,
4748            );
4749        });
4750
4751        cx.run_until_parked();
4752        let result = send_task.await;
4753
4754        assert!(
4755            result.is_ok(),
4756            "send should succeed even when new message added during update_last_checkpoint: {:?}",
4757            result.err()
4758        );
4759    }
4760
4761    /// Tests that when a follow-up message is sent during generation,
4762    /// the first turn completing does NOT clear `running_turn` because
4763    /// it now belongs to the second turn.
4764    #[gpui::test]
4765    async fn test_follow_up_message_during_generation_does_not_clear_turn(cx: &mut TestAppContext) {
4766        init_test(cx);
4767
4768        let fs = FakeFs::new(cx.executor());
4769        let project = Project::test(fs, [], cx).await;
4770
4771        // First handler waits for this signal before completing
4772        let (first_complete_tx, first_complete_rx) = futures::channel::oneshot::channel::<()>();
4773        let first_complete_rx = RefCell::new(Some(first_complete_rx));
4774
4775        let connection = Rc::new(FakeAgentConnection::new().on_user_message({
4776            move |params, _thread, _cx| {
4777                let first_complete_rx = first_complete_rx.borrow_mut().take();
4778                let is_first = params
4779                    .prompt
4780                    .iter()
4781                    .any(|c| matches!(c, acp::ContentBlock::Text(t) if t.text.contains("first")));
4782
4783                async move {
4784                    if is_first {
4785                        // First handler waits until signaled
4786                        if let Some(rx) = first_complete_rx {
4787                            rx.await.ok();
4788                        }
4789                    }
4790                    Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
4791                }
4792                .boxed_local()
4793            }
4794        }));
4795
4796        let thread = cx
4797            .update(|cx| {
4798                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4799            })
4800            .await
4801            .unwrap();
4802
4803        // Send first message (turn_id=1) - handler will block
4804        let first_request = thread.update(cx, |thread, cx| thread.send_raw("first", cx));
4805        assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 1);
4806
4807        // Send second message (turn_id=2) while first is still blocked
4808        // This calls cancel() which takes turn 1's running_turn and sets turn 2's
4809        let second_request = thread.update(cx, |thread, cx| thread.send_raw("second", cx));
4810        assert_eq!(thread.read_with(cx, |t, _| t.turn_id), 2);
4811
4812        let running_turn_after_second_send =
4813            thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
4814        assert_eq!(
4815            running_turn_after_second_send,
4816            Some(2),
4817            "running_turn should be set to turn 2 after sending second message"
4818        );
4819
4820        // Now signal first handler to complete
4821        first_complete_tx.send(()).ok();
4822
4823        // First request completes - should NOT clear running_turn
4824        // because running_turn now belongs to turn 2
4825        first_request.await.unwrap();
4826
4827        let running_turn_after_first =
4828            thread.read_with(cx, |thread, _| thread.running_turn.as_ref().map(|t| t.id));
4829        assert_eq!(
4830            running_turn_after_first,
4831            Some(2),
4832            "first turn completing should not clear running_turn (belongs to turn 2)"
4833        );
4834
4835        // Second request completes - SHOULD clear running_turn
4836        second_request.await.unwrap();
4837
4838        let running_turn_after_second =
4839            thread.read_with(cx, |thread, _| thread.running_turn.is_some());
4840        assert!(
4841            !running_turn_after_second,
4842            "second turn completing should clear running_turn"
4843        );
4844    }
4845
4846    #[gpui::test]
4847    async fn test_send_returns_cancelled_response_and_marks_tools_as_cancelled(
4848        cx: &mut TestAppContext,
4849    ) {
4850        init_test(cx);
4851
4852        let fs = FakeFs::new(cx.executor());
4853        let project = Project::test(fs, [], cx).await;
4854
4855        let connection = Rc::new(FakeAgentConnection::new().on_user_message(
4856            move |_params, thread, mut cx| {
4857                async move {
4858                    thread
4859                        .update(&mut cx, |thread, cx| {
4860                            thread.handle_session_update(
4861                                acp::SessionUpdate::ToolCall(
4862                                    acp::ToolCall::new(
4863                                        acp::ToolCallId::new("test-tool"),
4864                                        "Test Tool",
4865                                    )
4866                                    .kind(acp::ToolKind::Fetch)
4867                                    .status(acp::ToolCallStatus::InProgress),
4868                                ),
4869                                cx,
4870                            )
4871                        })
4872                        .unwrap()
4873                        .unwrap();
4874
4875                    Ok(acp::PromptResponse::new(acp::StopReason::Cancelled))
4876                }
4877                .boxed_local()
4878            },
4879        ));
4880
4881        let thread = cx
4882            .update(|cx| {
4883                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4884            })
4885            .await
4886            .unwrap();
4887
4888        let response = thread
4889            .update(cx, |thread, cx| thread.send_raw("test message", cx))
4890            .await;
4891
4892        let response = response
4893            .expect("send should succeed")
4894            .expect("should have response");
4895        assert_eq!(
4896            response.stop_reason,
4897            acp::StopReason::Cancelled,
4898            "response should have Cancelled stop_reason"
4899        );
4900
4901        thread.read_with(cx, |thread, _| {
4902            let tool_entry = thread
4903                .entries
4904                .iter()
4905                .find_map(|e| {
4906                    if let AgentThreadEntry::ToolCall(call) = e {
4907                        Some(call)
4908                    } else {
4909                        None
4910                    }
4911                })
4912                .expect("should have tool call entry");
4913
4914            assert!(
4915                matches!(tool_entry.status, ToolCallStatus::Canceled),
4916                "tool should be marked as Canceled when response is Cancelled, got {:?}",
4917                tool_entry.status
4918            );
4919        });
4920    }
4921
4922    #[gpui::test]
4923    async fn test_provisional_title_replaced_by_real_title(cx: &mut TestAppContext) {
4924        init_test(cx);
4925
4926        let fs = FakeFs::new(cx.executor());
4927        let project = Project::test(fs, [], cx).await;
4928        let connection = Rc::new(FakeAgentConnection::new());
4929        let set_title_calls = connection.set_title_calls.clone();
4930
4931        let thread = cx
4932            .update(|cx| {
4933                connection.new_session(project, PathList::new(&[Path::new(path!("/test"))]), cx)
4934            })
4935            .await
4936            .unwrap();
4937
4938        // Initial title is the default.
4939        thread.read_with(cx, |thread, _| {
4940            assert_eq!(thread.title().as_ref(), "Test");
4941        });
4942
4943        // Setting a provisional title updates the display title.
4944        thread.update(cx, |thread, cx| {
4945            thread.set_provisional_title("Hello, can you help…".into(), cx);
4946        });
4947        thread.read_with(cx, |thread, _| {
4948            assert_eq!(thread.title().as_ref(), "Hello, can you help…");
4949        });
4950
4951        // The provisional title should NOT have propagated to the connection.
4952        assert_eq!(
4953            set_title_calls.borrow().len(),
4954            0,
4955            "provisional title should not propagate to the connection"
4956        );
4957
4958        // When the real title arrives via set_title, it replaces the
4959        // provisional title and propagates to the connection.
4960        let task = thread.update(cx, |thread, cx| {
4961            thread.set_title("Helping with Rust question".into(), cx)
4962        });
4963        task.await.expect("set_title should succeed");
4964        thread.read_with(cx, |thread, _| {
4965            assert_eq!(thread.title().as_ref(), "Helping with Rust question");
4966        });
4967        assert_eq!(
4968            set_title_calls.borrow().as_slice(),
4969            &[SharedString::from("Helping with Rust question")],
4970            "real title should propagate to the connection"
4971        );
4972    }
4973}